STORM-2306 : Messaging subsystem redesign. #2241
Conversation
Nice Work Roshan!
Had an initial look at the code and left a few comments (mostly minor).
conf/defaults.yaml
Outdated
@@ -146,7 +149,7 @@ supervisor.run.worker.as.user: false
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
-supervisor.worker.timeout.secs: 30
+supervisor.worker.timeout.secs: 30000
Is this really a deliberate change?
No... I need to revert that.
conf/defaults.yaml
Outdated
topology.transfer.batch.size: 10
topology.executor.receive.buffer.size: 50000
topology.producer.batch.size: 1000 # TODO: Roshan: rename
topology.flush.tuple.freq.millis: 5000
nit: Better to add a comment describing this property.
Was planning to document it somewhere. Do we have a place to document Storm settings?
I'm OK if they're added to Config / DaemonConfig with javadoc on each field. Other configurations are also documented in those classes.
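A minimal sketch of that suggestion, assuming the javadoc-on-constant style used by Config / DaemonConfig (the field name and wording here are illustrative, not the actual Storm constant):

```java
// Illustrative sketch of documenting a setting as a javadoc'ed constant in the
// style of org.apache.storm.Config. The field name and wording are hypothetical.
public class ConfigSketch {
    /**
     * How often (in milliseconds) a flush tuple is broadcast to executors so
     * that partially filled outgoing batches get delivered downstream.
     */
    public static final String TOPOLOGY_FLUSH_TUPLE_FREQ_MILLIS =
        "topology.flush.tuple.freq.millis";
}
```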
conf/defaults.yaml
Outdated
@@ -304,6 +303,7 @@ storm.cgroup.resources:
storm.cgroup.hierarchy.name: "storm"
storm.supervisor.cgroup.rootdir: "storm"
storm.cgroup.cgexec.cmd: "/bin/cgexec"
+storm.cgroup.cgexec.cmd: "/bin/cgexec"
May be an accidental copy; needs to be removed.
thanks!
@@ -30,4 +31,5 @@
void ack(Tuple input);
void fail(Tuple input);
void resetTimeout(Tuple input);
+void flush();
May want to add some javadoc about this. It seems we are ready to break the APIs with the new set of changes in this redesign.
This is not an external-facing API.
It is kind of an external-facing API, as it is given to bolt implementors in the prepare API. IOutputCollector can be wrapped by users to do any custom processing before tuples are emitted to target tasks.
(Random passer-by user: E.g. we do this for tests a lot, yes. On the other hand, this is for Storm 2.0, so API breakages are kind of expected. Then again, it should at least be thought through / discussed.)
I don't think they implement that interface... so it may not be an issue. We may need folks to just recompile their topologies with Storm 2.0 at most.
I think this is like a public API, but given that we ship the change in a major version only, I guess that might be fine. If we had alternatives it would be better though.
Do we support end users writing custom output collectors? This is public for internal use only, I think.
Right, there was no easy alternative.
LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
private void transferLocalBatch(List<AddressedTuple> tupleBatch) {
    try {
        for (int i = 0; i < tupleBatch.size(); i++) {
Does foreach have a significant perf issue?
See 3.8 in the design doc.
For non-random-access lists the perf will get worse. So change the signature of transferLocalBatch to accept ArrayList<AddressedTuple>.
yes indeed!
Nice observation @roshannaik and @arunmahadevan!
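The trade-off in this thread, sketched: an indexed loop over an ArrayList avoids the per-batch Iterator allocation of foreach, but degrades to O(n²) on a non-random-access list such as LinkedList, which is why narrowing the parameter type was suggested. Method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class LoopSketch {
    // Safe only for RandomAccess lists: get(i) is O(1) on ArrayList but O(i)
    // on LinkedList, making the whole loop quadratic there.
    static long sumIndexed(ArrayList<Integer> xs) {
        long s = 0;
        for (int i = 0; i < xs.size(); i++) {
            s += xs.get(i);
        }
        return s;
    }

    // foreach works for any List but allocates an Iterator per call, which is
    // the hot-path cost the indexed version avoids.
    static long sumForeach(List<Integer> xs) {
        long s = 0;
        for (int x : xs) {
            s += x;
        }
        return s;
    }
}
```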
}

public boolean transferLocal(AddressedTuple tuple) throws InterruptedException {
    workerData.checkSerialize(serializer, tuple);
I guess this should be done only when it is determined to be sending to a local task. So this should be pushed to just before queue.publish(tuple) below.
Good catch. Thanks!
private ArrayList<List<Integer>> choices;
-private AtomicInteger current;
+private int current = 0 ;
You changed this to non-thread-safe. Is this instance not shared by multiple executors?
chooseTasks is implemented based on the assumption of thread safety, so we should check this again. If it's thread-safe, it is better to leave a comment on this.
    }
}
-public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
Why do we need to have this passed in the constructor, when it can be derived as below from the existing arguments? This constructor change is spread across all usages.
srcComponent = context.getComponentId(taskId)
Because it's a hashmap lookup; this change was based on profiling.
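The trade-off above, sketched with simplified stand-in types (not the real GeneralTopologyContext / TupleImpl): deriving the source component inside the tuple costs a map lookup per tuple on the hot path, while having the caller pass it in lets the caller resolve it once per task and reuse it.

```java
import java.util.Map;

public class TupleSketch {
    private final String srcComponent;
    private final int taskId;

    // After the change: the caller supplies srcComponent, typically resolved
    // once per task rather than once per tuple.
    public TupleSketch(String srcComponent, int taskId) {
        this.srcComponent = srcComponent;
        this.taskId = taskId;
    }

    // Before the change, construction would effectively do
    //   this.srcComponent = context.getComponentId(taskId);  // map lookup per tuple
    public static TupleSketch derive(Map<Integer, String> taskToComponent, int taskId) {
        return new TupleSketch(taskToComponent.get(taskId), taskId);
    }

    public String getSourceComponent() { return srcComponent; } // no lookup here
    public int getSourceTask() { return taskId; }
}
```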
// Cache the msgs grouped by destination node
public void add(TaskMessage taskMsg) {
    int destId = taskMsg.task();
    ArrayList<TaskMessage> msgs = bundles.get(destId);
We can have bundles.computeIfAbsent(destId, integer -> new ArrayList<>()); removing the null check, creating the list, and putting it into the bundles map.
This is in the critical path... that lambda argument will result in allocating a new (lambda) object.
Non-capturing lambdas only have a single instance in a generated private static field, so the allocation cost is only paid once.
Of course, there is some extra cost in the indirection... then again, computeIfAbsent() is optimized in OpenJDK to do only a single hash + table lookup, whereas if (get() == null) put() does two of each if the get() fails. Overall the difference will be nonexistent / heavily dependent on the prevailing branch.
2 cents: either is fine to me.
Thanks for that sweet tip! Just to be sure, I would feel comfortable verifying with a profiler first... given this is in the critical path.
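The two idioms from this thread, side by side in a simplified form (String messages stand in for TaskMessage). As noted above, the non-capturing lambda k -> new ArrayList<>() is instantiated once by the JVM, so the remaining difference is lookup count and indirection, not allocation:

```java
import java.util.ArrayList;
import java.util.HashMap;

public class BundleSketch {
    final HashMap<Integer, ArrayList<String>> bundles = new HashMap<>();

    // Explicit null check: two traversals (get + put) on a miss, one on a hit.
    void addWithNullCheck(int destId, String msg) {
        ArrayList<String> msgs = bundles.get(destId);
        if (msgs == null) {
            msgs = new ArrayList<>();
            bundles.put(destId, msgs);
        }
        msgs.add(msg);
    }

    // computeIfAbsent: a single traversal in OpenJDK's HashMap, at the cost of
    // invoking the mapping function through an interface on a miss.
    void addWithComputeIfAbsent(int destId, String msg) {
        bundles.computeIfAbsent(destId, k -> new ArrayList<>()).add(msg);
    }
}
```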
addListRefToMap(this.bundles, entry.getKey(), entry.getValue());

// Cache the msgs grouped by destination node
public void add(TaskMessage taskMsg) {
It seems add and send are always invoked in the same thread (from JCQueue.Consumer's accept and flush) and there is no contention. Is that right?
right.
Hello Roshan,
Thank you very much for your huge work on improving Storm performance!
Regarding "competitive perf evaluation", would you say that the reasons why the SuperChief team moved away from Storm to their homebrewed stream processing system in 2015 (see http://blog.librato.com/posts/superchief) no longer hold?
Best regards,
Alexandre Vermeerbergen
2017-07-25 8:58 GMT+02:00 Satish Duggana <notifications@github.com>:
… ***@***.**** commented on this pull request.
Nice Work Roshan!
Had an initial look at the code and left few comments(mostly minor),
Overall LGTM.
------------------------------
In conf/defaults.yaml
<#2241 (comment)>:
> @@ -146,7 +149,7 @@ supervisor.run.worker.as.user: false
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
-supervisor.worker.timeout.secs: 30
+supervisor.worker.timeout.secs: 30000
Is this really a deliberate change?
------------------------------
In conf/defaults.yaml
<#2241 (comment)>:
> @@ -253,11 +247,16 @@ topology.trident.batch.emit.interval.millis: 500
topology.testing.always.try.serialize: false
topology.classpath: null
topology.environment: null
-topology.bolts.outgoing.overflow.buffer.enable: false
-topology.disruptor.wait.timeout.millis: 1000
-topology.disruptor.batch.size: 100
-topology.disruptor.batch.timeout.millis: 1
-topology.disable.loadaware.messaging: false
+topology.bolts.outgoing.overflow.buffer.enable: false # TODO: Roshan : Whats this ?
+topology.disruptor.wait.timeout.millis: 1000 # TODO: Roshan: not used, but we may/not want this behavior
+topology.transfer.buffer.size: 50000
+topology.transfer.batch.size: 10
+topology.executor.receive.buffer.size: 50000
+topology.producer.batch.size: 1000 # TODO: Roshan: rename
+topology.flush.tuple.freq.millis: 5000
nit: Better to add a comment describing about this property.
------------------------------
In conf/defaults.yaml
<#2241 (comment)>:
> @@ -304,6 +303,7 @@ storm.cgroup.resources:
storm.cgroup.hierarchy.name: "storm"
storm.supervisor.cgroup.rootdir: "storm"
storm.cgroup.cgexec.cmd: "/bin/cgexec"
+storm.cgroup.cgexec.cmd: "/bin/cgexec"
may be an accidental copy, needs to be removed.
------------------------------
In storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java
<#2241 (comment)>:
> @@ -30,4 +31,5 @@
void ack(Tuple input);
void fail(Tuple input);
void resetTimeout(Tuple input);
+ void flush();
May want to add some javadoc about the same. It seems we are ready to
break the APIs with new set of changes in this redesign.
------------------------------
In storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
<#2241 (comment)>:
>
- for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
- DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
- if (null != queue) {
- queue.publish(entry.getValue());
- } else {
- LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
+ private void transferLocalBatch(List<AddressedTuple> tupleBatch) {
+ try {
+ for (int i = 0; i < tupleBatch.size(); i++) {
Does foreach have significant perf issue?
------------------------------
In storm-client/src/jvm/org/apache/storm/daemon/Acker.java
<#2241 (comment)>:
> @@ -66,6 +67,7 @@ public void prepare(Map<String, Object> topoConf, TopologyContext context, Outpu
@Override
public void execute(Tuple input) {
+ long start = System.currentTimeMillis();
nit: start is never used.
------------------------------
In storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
<#2241 (comment)>:
> @@ -137,7 +137,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
public static class FieldsGrouper implements CustomStreamGrouping {
private Fields outFields;
- private List<Integer> targetTasks;
+ private ArrayList<List<Integer> > targetTasks;
nit: No need to change from List to ArrayList.
------------------------------
In storm-client/src/jvm/org/apache/storm/daemon/Task.java
<#2241 (comment)>:
> @@ -246,4 +291,26 @@ private void addTaskHooks() {
}
}
+ private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
nit: This can return Map<String, List<LoadAwareCustomStreamGrouping>
instead of implementations.
------------------------------
In storm-client/src/jvm/org/apache/storm/daemon/Task.java
<#2241 (comment)>:
> public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}
- List<Integer> outTasks = new ArrayList<>();
- if (!streamComponentToGrouper.containsKey(stream)) {
- throw new IllegalArgumentException("Unknown stream ID: " + stream);
- }
- if (null != streamComponentToGrouper.get(stream)) {
- // null value for __system
- for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+ ArrayList<Integer> outTasks = new ArrayList<>();
+
+ // TODO: PERF: expensive hashtable lookup in critical path
Is this an expensive hit? This map may not contain many keys(no of streams
defined for this task).
------------------------------
In storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
<#2241 (comment)>:
> this.outputCollectors = new ArrayList<>();
- for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
- Task taskData = entry.getValue();
+ for (int i=0; i<idToTask.size(); ++i) {
Why did we replace with this instead of using foreach with
idToTask.entrySet() as that does not need any map lookups?
------------------------------
In storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
<#2241 (comment)>:
> this.outputCollectors = new ArrayList<>();
- for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
- Task taskData = entry.getValue();
+ for (int i=0; i<idToTask.size(); ++i) {
+ Task taskData = idToTask.get(i);
+ if (taskData==null)
Is this a valid condition? I guess taskData can never be null.
------------------------------
In storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java
<#2241 (comment)>:
>
- public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
+ public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, ArrayList<Task> taskDatas) {
nit: taskDatas as List
------------------------------
In storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
<#2241 (comment)>:
> }
}
+
+ private void flushRemotes() throws InterruptedException {
+ workerData.flushRemotes();
+ }
+
+ public boolean transferLocal(AddressedTuple tuple) throws InterruptedException {
+ workerData.checkSerialize(serializer, tuple);
I guess this should be done only when it is determined to be sending to
local task. So, this should be pushed to just before queue.publish(tuple)
below.
------------------------------
In storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
<#2241 (comment)>:
> private ArrayList<List<Integer>> choices;
- private AtomicInteger current;
+ private int current = 0 ;
You changed this to non threadsafe. Is this instance not shared by
multiple components?
------------------------------
In storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
<#2241 (comment)>:
> }
}
- public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+ public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
Why do we need to have this to be passed in constructor as that can be
derived like below from the existing arguments? This constructor change is
spread across all usages.
srcComponent = context.getComponentId(taskId)
------------------------------
In storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
<#2241 (comment)>:
> private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-
- public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
- for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
- addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
+
+ // Cache the msgs grouped by destination node
+ public void add(TaskMessage taskMsg) {
+ int destId = taskMsg.task();
+ ArrayList<TaskMessage> msgs = bundles.get(destId);
we can have bundles.computeIfAbsent(destId, integer -> new ArrayList<>());
removing null check, creating list and put that into bundles map.
------------------------------
In storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
<#2241 (comment)>:
> private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-
- public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
- for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
- addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
+
+ // Cache the msgs grouped by destination node
+ public void add(TaskMessage taskMsg) {
It seems add and send are always invoked in the same thread from
(JCQueue.Consumer's accept and flush) and there is no contention. Is that
right?
@roshannaik Did you run any trident topologies in distributed mode with these changes?
@avermeer Looks like the SuperChief blog is relaying the same basic claims that Heron has marketed. Since you ask, I will share my opinions wrt Heron's claims.
@roshannaik Before going into this deeply, I would like to see a comparison between the current master branch and this patch (say, before and after) so that we can see the impact of the patch clearly. IMO this kind of comparison is basically required on every performance patch. @revans2 brought awesome analysis when introducing disruptor batching. It would be great if we could see similar analysis for this patch too, only if you don't really mind. You can put your analysis with details on it.
Let's not be too hard on the Heron community. Yes, in the past they've not been exactly friendly in terms of marketing and technical claims, but now that Heron is incubating as an Apache project (full disclosure: I'm a mentor), there's a chance that that might change. A collaborative relationship with the Heron community has potential benefit; an adversarial one, not so much.
@avermeer I think we learnt a lot and adopted some parts of them, like back pressure from Heron's paper (though we implemented it in a different way), but personally I also wonder about their points on performance gains, especially 1 computational thread per worker. Messages must be serialized even across workers on a node, and the cost of serialization/deserialization is not that cheap. Unlike TPC in the SQL world, I think there's no generally clear way of benchmarking in streaming, so claiming that A is N times faster than B can hide too much detail. Most frameworks have their own characteristics and hence excel at specific use cases or performance tests. Moreover, developers from A don't know B deeply, so most of the time they fail to set variables optimized specifically for the test. That's why I much prefer comparing with our own history instead of comparing with others.
@HeartSaVioR the current master clocks a little over 3 mill/sec for the ConstSpoutNullBolt topo without ACKing. On the latency end, with ACK enabled, I saw numbers in multiple seconds with batchSz=1 for master, and touching as low as ~30 microseconds with the new patch.

More analysis? I thought I might have spent too much time providing measurements of various kinds, both in the design docs and here. :-) Actually I have only presented a few pruned-down numbers that I felt were interesting, in which case I was a bit meticulous in recording them. Given the scope of this patch, meticulously recording and cataloging all the perf tests that I have done (both high-level profiler data and low-level hardware perf counters), and performing the many more tests that I could have done, would take forever. I felt the ROI was negative to pursue some of those measurements rigorously any further. If you feel some other critical measurements are necessary, bring it up.

Batching is a small aspect of this patch. I had done some quick tests to determine a default that appears to be a modest sweet spot for high-throughput, low-latency topos, but not enough to provide a rigorous analysis. As batch size is very dependent on topology components, deployment size and ack-mode, IMO the search for a more generally optimal batch size is not important right now. I think we will have more publish-worthy analysis once these changes stabilize and get more testing.

On a related note, about latency and ACK-mode: during my recent measurements I noticed an issue in ACK mode, both in current master and the optimized version, where perf degrades with time (for certain batch sizes)... something I had not noticed on the old version of master on which I had based my changes for a long time. I will share these observations soon.
Did a quick pass and left a few comments. Will go through the design doc and do an in-depth review later.
conf/defaults.yaml
Outdated
topology.disruptor.wait.timeout.millis: 1000 # TODO: Roshan: not used, but we may/not want this behavior
topology.transfer.buffer.size: 50000
topology.transfer.batch.size: 10
topology.executor.receive.buffer.size: 50000
Seems you are reusing this config as the JCQueue size? It's good to introduce a different config, since users might have overridden this value and it may not be an apt value for the JCQueue size.
The name is still good and means the same thing. Some other defaults are also likely to change in Storm 2.0... I didn't see a good reason to rename it. I have renamed every setting with the name 'disruptor' in it.
Agreed that the name looks good. If the value users might have overridden matters, let's give guidance on a new starting point for tuning in the release announcement doc or another doc.
FYI, I do have the notes in the javadoc comments as of now. Agree that we need to make a note of such settings, for which older/existing values may need review, along with the release announcement doc.
storm-client/pom.xml
Outdated
@@ -257,7 +258,7 @@
<!--Note - the version would be inherited-->
<configuration>
<excludes>**/generated/**</excludes>
-<maxAllowedViolations>10785</maxAllowedViolations>
+<maxAllowedViolations>50785</maxAllowedViolations>
There's no point in having a standard if we keep violating it. Please fix the checkstyle issues in the new code.
It was intended to get a PR out quickly for others to begin reviewing. These fixes can wait for the PR updates; they will surely get fixed.
Kind reminder, given that we are getting close.
Yes, fixed internally. Will be part of the next update.
@@ -137,7 +137,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<I
public static class FieldsGrouper implements CustomStreamGrouping {

private Fields outFields;
-private List<Integer> targetTasks;
+private ArrayList<List<Integer> > targetTasks;
Unless you are using some specific method of the implementation, it's always better to write code against the interfaces.
try {
    for (int i = 0; i < tupleBatch.size(); i++) {
        AddressedTuple tuple = tupleBatch.get(i);
        JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
Since you are optimizing the critical path, you might want to use an ArrayList or array for index-based lookups rather than a Map.
Yes, it did show up as a bottleneck during profiling. I recall trying to replace it with an ArrayList... and then backing out as it started getting a bit complicated... I think it had something to do with this map getting periodically refreshed. Let me take a look at it again.
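One way to square the array lookup with the periodic refresh mentioned above is to rebuild the whole array and swap it in via a single reference write. This is a hypothetical sketch, not the actual WorkerState code; the queue type is a stand-in, and it assumes task ids are small non-negative ints:

```java
import java.util.Map;

public class RoutingSketch<Q> {
    private volatile Object[] queuesByTaskId = new Object[0];

    // Called on refresh: build a fresh table, then publish it atomically.
    public void rebuild(Map<Integer, Q> taskToQueue) {
        int maxId = -1;
        for (int id : taskToQueue.keySet()) {
            maxId = Math.max(maxId, id);
        }
        Object[] fresh = new Object[maxId + 1];
        for (Map.Entry<Integer, Q> e : taskToQueue.entrySet()) {
            fresh[e.getKey()] = e.getValue();
        }
        queuesByTaskId = fresh; // single volatile write; readers never see a partial table
    }

    // Hot path: direct array index instead of a HashMap lookup.
    @SuppressWarnings("unchecked")
    public Q lookup(int taskId) {
        Object[] t = queuesByTaskId;
        return (taskId >= 0 && taskId < t.length) ? (Q) t[taskId] : null; // null == unknown task
    }
}
```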
sendSize, waitTimeOutMs, batchSize, batchTimeOutMs);
// Called by flush-tuple-timer thread
public boolean publishFlushTuple() {
    TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), Constants.SYSTEM_COMPONENT_ID,
Should this be constructed newly for each call? If this is a fixed value you can make it a static final value.
Sure... although this allocation doesn't matter much for perf, as it's not in the critical path. It's invoked once in a while by the timer.
public boolean publishFlushTuple() {
    TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), Constants.SYSTEM_COMPONENT_ID,
        (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID);
    AddressedTuple flushTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
Should this be constructed newly for each call? If this is a fixed value you can make it a static final value.
Right... I don't think it is necessary.
    (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID);
AddressedTuple flushTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
if( receiveQueue.tryPublish(flushTuple) ) {
    LOG.debug("Published Flush tuple to: {} ", getComponentId());
nit: getComponentId will get evaluated irrespective of the log level
That's fine... it's not doing any lookups, nor is it in the critical path.
return true;
}
else {
LOG.debug("RecvQ is currently full, will retry later. Unable to publish Flush tuple to : ", getComponentId());

nit: getComponentId will get evaluated irrespective of the log level
{} is missing in the log message.
will fix
@roshannaik I'll go through the code first.
public void add(Object obj) throws InterruptedException {
boolean inserted = q.publishInternal(obj);
while(!inserted) {
Thread.yield();

It may be better to busy spin a few times and then yield.
Some other changes are coming to that code soon.
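The spin-then-yield policy suggested above could be sketched like this. Names here are illustrative (a plain `Queue` stands in for JCQueue, and `SPIN_LIMIT` is an assumed tuning knob), not the actual API.

```java
import java.util.Queue;

// Sketch of the suggested backoff: busy-spin a bounded number of times before
// falling back to Thread.yield(). Not the real JCQueue insert path.
final class SpinThenYield {
    static final int SPIN_LIMIT = 100;

    // Retries a non-blocking offer; spins first, then yields the time slice.
    static void addWithBackoff(Queue<Object> q, Object obj) {
        int spins = 0;
        while (!q.offer(obj)) {
            if (spins++ < SPIN_LIMIT) {
                Thread.onSpinWait(); // cheap CPU hint while spinning (Java 9+)
            } else {
                Thread.yield();      // spin budget exhausted: give up the slice
            }
        }
    }
}
```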
I'm done with the first pass.
Lots of style violations observed, but I guess we can address them after we have some +1s on merging.
I didn't look at the classes in the JCQueue implementation since I don't have any knowledge of JCQueue. I'll try to look at it later, but feel OK skipping it given that there are at least two other reviewers reviewing this thoughtfully.
conf/defaults.yaml
Outdated
@@ -49,6 +49,9 @@ storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
storm.nimbus.retry.intervalceiling.millis: 60000
storm.auth.simple-white-list.users: []
storm.auth.simple-acl.users: []

The following three lines seem not relevant to this patch.
Thanks.. I think that got introduced somehow during manual reconciling/rebasing to latest master.
conf/defaults.yaml
Outdated
@@ -231,16 +228,13 @@ topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer"
topology.shellbolt.max.pending: 100
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.max.spout.pending: null # TODO: We dont need this any more

Looks like you're removing max spout pending and also the configuration for backpressure. Does this patch also touch the mechanism of backpressure?
And let's address this in the patch, or file an issue and remove the TODO.
Yes, back-pressure is addressed. Described in the design doc. A couple of little things are left related to BP: 1) ensure metrics get updated when retrying to publish to a downstream Q that is full; 2) remove max.spout.pending.
I guess it utilizes a blocking queue, which looks natural for the definition of back pressure. If so, we need to be sure there's no livelock or deadlock. AFAIK, Storm used a blocking queue before, and ended up adding an overflow buffer because of deadlock (dining philosophers).
The critical code path is now lock-free, so there should be no room for live/dead-locks. To be very precise, it's not exactly a blocking queue, as there is a retry loop that we can break out of at any time. There are some changes coming soon in this PR wrt what happens when the Q is full; I will also add that info to the design doc.
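The "retry loop we can break out of at any time" described above could look roughly like this sketch. The names (`tryPublish`, `maxRetries`) are illustrative, not the actual JCQueue API: the publish itself is non-blocking and the caller bounds the retries, so no thread blocks indefinitely on a full queue.

```java
import java.util.Queue;

// Sketch of a bounded, abortable retry around a non-blocking publish.
// Because offer() never blocks, there is no wait-for cycle that could deadlock.
final class BoundedRetryPublish {
    static boolean tryPublish(Queue<Object> q, Object tuple, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (q.offer(tuple)) {
                return true;   // published successfully
            }
            Thread.yield();    // queue full: back off briefly before retrying
        }
        return false;          // give up; caller decides what to do next
    }
}
```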
conf/defaults.yaml
Outdated
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.stats.sample.rate: 0.001

Is the change intended?
This is OK to change while doing performance tests (especially micro perf), but 0.1% might be too small for production. Users complained even when 5% of tuples are measured.
Other frameworks using dropwizard metrics don't even sample for meters, and I guess we will follow that rule for new metrics.
It was definitely impacting perf. There may be some tweaking needed in the sampler code (not sure). Also, sampling 5 out of every 100 seemed a bit excessive even for topos doing several thousand tuples per second. Does 1 out of every 100 seem more reasonable?
Like I said, some frameworks don't sample at all and measure everything for meters. JStorm is one example, and they (Alibaba) said "every day the JStorm Cluster handles 1.5 PB/2 Trillion messages" in 2015, which is 23,000,000 messages per second. Yes, we don't know how many servers they use, but they also claim their performance on the website http://jstorm.io:8080/Performance/ and metrics do not seem to be a considerable bottleneck.
One thing to consider is that the current metrics implementation may be really slower than the dropwizard one, so we may want to adjust the rate if we believe it is really a bottleneck.
Btw, in most cases users would want operational convenience over performance. We should consider first what users are facing day by day.
Sure, I agree... that's why I ask. Does 1 out of every 100 seem reasonable? Or do you feel we should continue to sample 5 out of 100? This change is based on impact observed during perf runs... but if it negatively impacts operational convenience then it's worth reverting.
Yes, I don't feel we need to change it if it's only for performance. We already know the sampling rate affects performance, and tolerate it.
Btw, as I commented, I think we have touched too many variables irrespective of the messaging subsystem and JCQueue. How can you prove introducing JCQueue brings better throughput and/or lower latency in this patch? How about the messaging subsystem? Let's not touch these variables, and address them in a separate issue or patch.
OK, I personally feel that sampling can be reduced a bit, but if you have a preference towards retaining it... I am OK with that. JCQueue is the core of the messaging system... not separate. The numbers demonstrate the improvement.
conf/defaults.yaml
Outdated
topology.disruptor.batch.size: 100
topology.disruptor.batch.timeout.millis: 1
topology.disable.loadaware.messaging: false
topology.bolts.outgoing.overflow.buffer.enable: false # TODO: Roshan : Whats this ?

In 0.9.x we didn't have an overflow buffer for bolts and it could deadlock (the dining philosophers problem), so we introduced the overflow buffer; but that opens the chance of OOM, so it's set to false by default.
After introducing disruptor batching and back pressure, we already made the queue unbounded, so we don't need the configuration any more.
OK, sounds like we can remove this in the PR.
conf/defaults.yaml
Outdated
topology.disruptor.wait.timeout.millis: 1000 # TODO: Roshan: not used, but we may/not want this behavior
topology.transfer.buffer.size: 50000
topology.transfer.batch.size: 10
topology.executor.receive.buffer.size: 50000

Agreed that the name looks good. If the value users might override matters, let's guide users to a new starting point for optimization in the release announcement doc or another doc.
if (s == null) // then stop running it
break;
if (s > 0)
Thread.sleep(s); // TODO: Roshan : need to do something about sleep strategy

Use Time.sleepSecs() to be compatible with time simulation. And please check the unit: it was seconds, but is effectively changed to milliseconds in this change.
@@ -291,7 +291,7 @@ private static TupleWindow makeTupleWindow(ArrayList<Tuple>... streams) {
MockContext mockContext = new MockContext(fieldNames);
for (Object[] record : data) {
TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), 0, streamName);
TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), "testSrc", 0, streamName);

Let's put the component id in the parameter of makeStream if possible.
@@ -313,7 +313,7 @@ private static TupleWindow makeTupleWindow(ArrayList<Tuple>... streams) {
ArrayList<Object> tupleValues = new ArrayList<>(1);
tupleValues.add(recordMap);
TupleImpl tuple = new TupleImpl(mockContext, tupleValues, 0, streamName);
TupleImpl tuple = new TupleImpl(mockContext, tupleValues, "testSrc", 0, streamName);

Let's put the component id in the parameter of makeNestedEventsStream if possible.
@@ -78,7 +78,7 @@ public Fields getComponentOutputFields(String componentId, String streamId) {
}

private Tuple getTuple(String streamId, final Fields fields, Values values) {
return new TupleImpl(getContext(fields), values, 1, streamId) {
return new TupleImpl(getContext(fields), values, "testSrc", 1, streamId) {

Let's put the component id in the parameter of getTuple if possible.
@@ -712,6 +712,6 @@ public static Tuple testTuple(List<Object> values, MkTupleParam param) {
new HashMap<>(),
new HashMap<>(),
new AtomicBoolean(false));
return new TupleImpl(context, values, 1, stream);
return new TupleImpl(context, values, "testSrc", 1, stream);

Let's use the component instead of "testSrc".
Btw, we should be careful if we drop something we provided. This patch seems focused on performance optimization (especially micro), and in some spots it might not always be better. One example is disabling load aware by default.
-1 Perhaps I am running into some odd issues here, so if I can be corrected I would be happy to change my vote, but nothing I have run with this patch is better in any way. Are all of the results from micro benchmarks? Did anyone run a real topology with this patch before posting all of these wonderful results to twitter? I am not one to swear but WTF? I built a stock 2.0.0-SNAPSHOT build (450ed63) and compared it to the exact same release with this patch merged on top of it (which was a clean merge). I am running
I ran the ThroughputVsLatency topology with several different options and no changes at all to the default storm.yaml. With this patch I found that.
I am happy to make all of my numbers public. I also plan to run them on a Linux box to see if it is any different.
Not done with the code review, but until my other concerns are addressed I don't see much reason to do more.
conf/defaults.yaml
Outdated
topology.flush.tuple.freq.millis: 5000
topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled]
topology.disable.loadaware.messaging: true # load aware messaging reduces throughput by ~20%.

I agree that turning it off by default may not be what we want. It really helps on heterogeneous clusters. And quite honestly a lot of the benchmarks that we have been running are nowhere near what I have seen happen in production. It gives you a nice feeling to say we can do X million tuples per second, but these are for bolts that do close to nothing. Most real bolts I have seen actually take some time to process.
@@ -76,6 +76,8 @@ public static StormTopology getTopology(Map<String, Object> conf) {
public static void main(String[] args) throws Exception {
int runTime = -1;
Config topoConf = new Config();
topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);

If we are setting this for all of the perf tests, then why is it not the default? Having a benchmark that does not show what people will get out of the box feels a bit fishy to me, even though everyone does it.
Higher numbers for that setting are only useful for very low latency spouts. The default setting assumes that you are likely to be using Kafka/HDFS kinds of spouts, which have higher latencies (amount of time spent inside nextTuple()).
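The intent of topology.spout.recvq.skips can be sketched with a small counter. This is an illustrative reconstruction, not the actual spout-loop code: with ACKs disabled, the spout polls its receive queue only once every N nextTuple() invocations.

```java
// Sketch of the topology.spout.recvq.skips idea: check the receive queue on
// every N-th nextTuple() invocation instead of on every one, so low-latency
// spouts spend less time on (usually empty) queue checks.
final class RecvQSkipCounter {
    private final int skips;
    private int invocations = 0;

    RecvQSkipCounter(int skips) { this.skips = skips; }

    // Returns true on every skips-th call, starting with the first.
    boolean shouldCheckRecvQ() {
        return (invocations++ % skips) == 0;
    }
}
```

With skips=3 the queue is checked on invocations 0, 3, 6, ...; a higher value suits spouts whose nextTuple() returns almost instantly, as the reply above explains.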
*/
@isPowerOf2
public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";

If this does not need to be a power of 2 that is fine, but we still want an annotation to say what type it is. I think @isPositiveNumber is probably good enough. You might want to make it an @isInteger too if you want to limit it to 32 bits.
indeed! my bad.
@@ -97,6 +97,8 @@ public void run() {
// events.
Time.sleep(1000);
}
if(Thread.interrupted())

@HeartSaVioR InterruptedException in most places in Storm is only used to shut the process down cleanly. This looks fine to me.
for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
ArrayList<Integer> outTasks = new ArrayList<>();
// TODO: PERF: expensive hashtable lookup in critical path

So like with all TODOs in the code, we either need to fix the TODO, remove the TODO because we are not going to do it, or remove the TODO and file a follow-on JIRA to do it later.
new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
}
}); // Subject.doAs(...)

nit: If we are changing this, can we just make it a lambda instead?
Subject.doAs(subject, () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials));
+1 on the nit, since it is simpler and not in the critical path.
private void setupFlushTupleTimer(final List<IRunningExecutor> executors) {
// StormTimer timerTask = workerState.getUserTimer();

+1
public void run() {
for (int i = 0; i < executors.size(); i++) {
IRunningExecutor exec = executors.get(i);
if(exec.getExecutorId().get(0) != -1) // dont send to system bolt

+1
return transferQueue;
}

public StormTimer getUserTimer() {
return userTimer;
}

final DisruptorQueue transferQueue;
final JCQueue transferQueue; //TODO: Roshan: transferQueue also needs to avoid double batching

Is this done? Looks kind of important.
I ran again with this exact version (5c0db92) and got the same results.
I tried on Linux too and got very similar results. The CPU and memory usage of the topology was lower but the actual throughput and latency of the topology was very similar.
@revans2 Do you mind posting your storm.yaml, or are you running with defaults? We will try to see if we can reproduce this same behavior on our side. If there are any bugs we will work to fix them, but it shows great potential on the perf improvements.
@harshach I am running with defaults in all cases. I build
Wait for everything to come up, and I can see the UI. Then I run some tests (I pull in the storm starter from the build because the packaged one does not build on its own).
First of all, I think I should be more careful about likes/retweets. Sorry about that; I just reverted them all. Regarding tweets, I think it heavily depends on which accounts (who) tweet it. If users or contributors are tweeting about this PR, we can take it like gossip and there's no problem. If committers or PMC members are doing it, that could be seen as a kind of publicizing, especially since this PR compares with other frameworks and claims Storm will be the no. 1 performer. If PMC members tweet about unsafe or not-yet-stable source, and it turns out differently, someone could feel we (the Storm community) hype, regardless of intention. If we are doing it with the official account (@apachestorm), that just matters, because the account could be seen as speaking on behalf of the Storm community or at least the PMC members. I was the one wondering why the official account retweeted a non-Storm related tweet like https://twitter.com/BoredElonMusk/status/889935279213223936 .
What I can suggest from here is breaking down STORM-2306 into "redesign messaging subsystem" and "switch to JCTools Queues", and trying out the latter part first.
@HeartSaVioR let's keep this discussion to reviews. This is not the forum to discuss what one should tweet or not; that's up to individuals. Nobody is trying to promote something that's not feasible; let's not try to be a moral authority here suggesting what one can or can't do.
@harshach If we had a PR only replacing the queue with JCTools, it would be easy to review and identify the benefit, or pros/cons, of replacing Disruptor with JCTools. The patch would be fairly simple to review and verify, and most of the issues raised here shouldn't arise there (because I guess most of them are not from the queue replacement).
@HeartSaVioR I don't mind breaking this into multiple PRs if it helps reviewing and merging. It's up to @roshannaik .
@revans2 @HeartSaVioR
I did the following changes:
If you look at lines 51-75, Apache master clearly couldn't handle the faster spout and starts timing out. Perf degrades considerably and very quickly. Whereas STORM-2306 not only was able to handle the faster spout and delivered stable processing, it started out being 10x faster and then improved to 35x faster compared to master.
From lines 3-45 you can see that with this patch we are getting under 10ms (depending on the topology) compared to an avg of 250ms+ (with batchSize=1000).
@harshach I have also done some performance tests, without modifying the TVL topology. The reason is that we should also care about non-performance-maximized topologies. For benchmarking a performance-maximized topology we also have ConstSpoutIdBoltNullBoltTopo, so let's not modify TVL and verify this patch works with all the cases. Since this patch doesn't seem to handle inter-worker communication properly, the set of tests we can do for now is very limited. Here's my machine spec used for the performance test:
and here's my number (just pasted as a raw number): My observation is that this patch looks impressive with a performance-maximized topology, but it also looks really bad (not acceptable) with a relatively idle topology. While we often publicize micro-benchmark results, in practice users would run much more idle topologies.
@HeartSaVioR It's not 12 executors per worker. If you don't pass a command-line argument, it sets the parallelism variable here to 4: https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java#L277 I still need to look at the TVL topology in more detail; before doing so I want to write a real-world topology that reads from Kafka and writes to another Kafka topic, and test that out. The issue here is not an idle topology but a topology that is throttling the input rate, and it's not idle. We have two metrics consumers with parallelism set to 1. How confident are we that the metrics consumers are not causing the issue? I'll look into these in more detail and update my findings here.
@harshach The second argument effectively represents worker count: you can see that the topology sets worker count to parallelism. I agree that the name is really misleading; even I ran tests with topology.workers instead of passing the second argument. (Need to run the tests again...)
Let me share a quick test result with passing
CPU usage was around 150~250%, mostly around 160%, which seemed to be a bit more stable, but still fluctuating at the small rate of 10000.
CPU usage was around 120%, no outstanding fluctuation (+-10%).
…- interworker=3mill/sec. withAck=700k/s,1.3ms
@roshannaik any update on this? Is there anything I can do to help get this done for 2.x?
@revans2 thanks very much for the offer to help, I think it might be useful for getting past the issue that is blocking this. Just updated the PR with these two key changes.
Current Status:
May be useful to sync up with you offline soon and see if looking into the issue together helps.
…ication - Fixed some issues found during multi-worker testing - Fixes to perf topos to pick up cmd line args correctly - Removing support for single producer mode in JCQueue - Almost all msg transfers are non-blocking now (other than metrics and credentials-change notifications). Allows spouts and bolts to processes metrics ticks even if output path is choked due to backpressure - Some temporary debug logs that need to be removed after testing - Needs more multiworker mode testing
- Adding workerID to BpStatus for better debuggability - Logging the length of an idle stretch for BP & max.spout.pending wait situations - Changes to defaults: topology.executor.receive.buffer.size=32k (rounding up to power of 2), topology.flush.tuple.freq.millis=1 (same as master) - minor fixes and improvements
…it Strategy settings + topology.flush.tuple.freq.millis -> topology.batch.flush.interval.millis )
- Added BackPressure unit tests
Another partial review done. I finished reviewing up to Task.java, and will continue to review when I can find more time.
topology.batch.flush.interval.millis: 1 # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1).
topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled]
topology.disable.loadaware.messaging: false # load aware messaging can degrade throughput

It may be better to describe the cases where we recommend enabling or disabling load aware messaging. The comment may mislead users into considering this option always better to disable.
Yes, that would be good, but I don't have a good enough handle on loadAware's perf characteristics to make good recommendations, other than noticing throughput degradation with the ConstSpout* topos. I had initially disabled it, but since there was (well founded) opposition from Bobby wrt disabling it, I reverted it and left the short comment.
Bobby mentioned some interesting improvements coming to LoadAware (like scheduling communicating tasks within the same worker instead of even distribution across workers)... so hopefully this situation may change.
Since this file is not conducive to verbose recommendations, perhaps we can put better guidance in Performance.md... if anyone has concrete input about it.
Bobby mentioned some interesting improvements coming to LoadAware (like scheduling communicating tasks within same worker instead of even distribution across workers)... so hopefully this situation may change.
Yes, basic locality awareness is already applied to load aware shuffle grouping, and I also expect improvements in determining locality to be available in the future.
Since this is file is not conducive to making verbose recommendations, perhaps we can put better guidance in the Performance.md... if anyone has some concrete inputs about it.
Agreed. Details could be put in Performance.md.
docs/Performance.md
Outdated
## Max.spout.pending
The back pressure mechanism no longer requires `topology.max.spout.pending`. It is recommend to set this to null (default).
According to the report posted in the issue, setting this option still shows better results, and hence the tests in the report utilize it. Do we want to document it as well, or is it still better to recommend users set this to null?
Thanks for catching that. This comment is no longer true. Will fix.
@Override
public void run() {
long start = System.currentTimeMillis();
// while (!Thread.interrupted()) {

Is it expected to toggle this by uncommenting it (and commenting out while (!halt) {)? If not, let's just remove this line.
will fix
@Override
public void run() {
long start = System.currentTimeMillis();
// while (!Thread.interrupted()) {

Same here.
fixed.
public void run() {
Handler handler = new Handler();
long start = System.currentTimeMillis();
// while(!Thread.interrupted()) {

Same here, and for the commented-out lines below.
fixed.
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
outTasks.addAll(compTasks);
outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit

Same here: we should decide what to do with the TODO.
IMHO I'm still not convinced that it can introduce a performance hit. The things I can imagine are allocating the backing array (only once per method call) and expanding the array, but unless we use fan-out in a huge topology, outTasks is expected to be small.
@@ -177,6 +196,35 @@ public BuiltinMetrics getBuiltInMetrics() {
return builtInMetrics;
}

// Non Blocking call. If cannot emmit to destination immediately, such tuples will be added to `pendingEmits` argument

nit: emmit -> emit
fixed.
/**
* Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
*/
public void sendToEventLogger(Executor executor, List values,

Will the method be called on every emit when the number of event logger executors is more than 0? If so, we may want to apply more optimizations here (not in this issue but in a follow-up issue - others can take it up).

Agree. Since topology.eventlogger.executors=0 (disabled) by default, i didn't spend time on this.
@@ -192,7 +240,7 @@ private TopologyContext mkTopologyContext(StormTopology topology) throws IOExcep
ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
taskId,
workerData.getPort(), workerData.getTaskIds(),
workerData.getPort(), workerData.getLocalTaskIds(),

The renamed method clarifies the meaning. Nice improvement.
@@ -186,9 +201,11 @@ public Runnable getSuicideCallback() {
final AtomicBoolean isTopologyActive;
final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;

// executors and taskIds running in this worker
// executors and localTaskIds running in this worker
final Set<List<Long>> executors;

Based on the comment, this could also be localExecutors.

Still in progress of reviewing... Will continue from SpoutOutputCollectorImpl.java
itr.remove();
changed = true;
} else {
// LOG.info("Task = {}, OverflowCount = {}, Q = {}", entry.getKey(), entry.getValue().getOverflowCount(), entry.getValue().getQueuedCount() );

Would we want to leave it with DEBUG, or remove the comment?

fixed.
new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
}
}); // Subject.doAs(...)

+1 on the nit, since it is simpler and not on a critical path.
@@ -193,6 +210,24 @@ public void run() {
});
}

/**
* Schedule a function to run recurrently
* @param delayMs the number of seconds to delay before running the function

the unit is not seconds but milliseconds.

indeed.
private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
final Long flushIntervalMicros = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));

The config name is in milliseconds, whereas the variable name says microseconds. If you want to convert it later, it would be better to do it here, or just rename it to milliseconds. It looks like it is used as milliseconds below, so I guess the variable name and the log messages below in this method are wrong.
private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
if (workerCount == 0) {

Don't we need to change it to if (workerCount <= 1) to cover workerCount == 1?

yes that was the intent.
if (consumeIdleCounter==0) {
LOG.debug("Invoking consume wait strategy");
}
consumeIdleCounter = consumeWaitStrategy.idle(consumeIdleCounter);

Could we let wait strategy instances handle the relevant counters (consumeIdleCounter, bpIdleCount)?

i think it is possible, but the counters have to be reset and examined for ==0 from outside, so it didn't seem much benefit to abstract it inside the strategy.

OK. Let's revisit if we get confused again.
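To illustrate the trade-off discussed above, here is a minimal hedged sketch (hypothetical names, not the PR's actual classes) of keeping the idle counter on the caller's side, so it can be reset on progress and examined for `== 0` outside the strategy:

```java
public class IdleCounterSketch {
    // Hypothetical minimal wait strategy: takes the current idle counter
    // and returns the updated one after idling.
    interface IWaitStrategy {
        int idle(int idleCounter);
    }

    // Simplest possible strategy: just count idle iterations.
    static final IWaitStrategy PROGRESSIVE = idleCounter -> idleCounter + 1;

    // Caller-side handling: the counter stays outside the strategy so the
    // caller can log once on the first idle iteration and reset on progress.
    static int afterConsume(int consumedCount, int idleCounter, IWaitStrategy strategy) {
        if (consumedCount == 0) {
            if (idleCounter == 0) {
                System.out.println("Invoking consume wait strategy"); // first idle iteration only
            }
            return strategy.idle(idleCounter);
        }
        return 0; // made progress: reset the counter
    }
}
```

Moving the counter into the strategy would require each strategy to expose reset/inspection hooks, which is the extra abstraction the author felt added little benefit.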
boltFailInfo.applyOn(taskData.getUserContext());
if (delta >= 0) {
boltFailInfo.applyOn(task.getUserContext());
if (delta != 0) {

Maybe you addressed the above only. It should stay the same.

storm/storm-core/src/clj/org/apache/storm/daemon/executor.clj
Lines 831 to 835 in b8f76af

(when (<= 0 delta)
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
final boolean isActive = stormActive.get();
public Long call() throws Exception {
int receiveCount = 0;
if (i++ == recvqCheckSkipCount) {

minor: we may be able to have a longer but better variable name than i, since it's not a temporary short-scope variable.

yes makes sense.
rmspCount = 0;
}

if ( receiveCount>1 ) {

Is it intentional to compare this with 1? I guess it looks natural to compare with 0, but you may have some reason to do it.

good observation. Actually it used to be 0. Then I noticed it was a somewhat common case for there to be just one item in the recvQ (metrics tick, flush tuple etc) during idle stretches. Once we consume that, we idle immediately, rather than check again. A minor optimization.
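The optimization described above can be sketched as follows (a simplified stand-in for the receive loop, with hypothetical names; the real code drains a JCQueue, not a `java.util` queue):

```java
import java.util.Queue;

public class RecvLoopSketch {
    // Returns true if it is worth polling the queue again before idling.
    // With "receiveCount > 1", consuming a single housekeeping item
    // (metrics tick, flush tuple) during an idle stretch goes straight to
    // the wait strategy instead of doing one more, likely empty, poll.
    static boolean worthPollingAgain(Queue<Object> recvQ) {
        int receiveCount = 0;
        while (recvQ.poll() != null) {
            receiveCount++; // real code would process the item here
        }
        return receiveCount > 1;
    }
}
```

With `> 0`, a lone metrics tick would trigger another full poll cycle before idling; with `> 1` the executor idles immediately after draining it.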
if (!transferQueue.isFull() && !throttleOn && !reachedMaxSpoutPending) {
for (ISpout spout : spouts) {
spout.nextTuple();
boolean pendingEmitsIsEmpty = tryFlushPendingEmits();

minor: The logic for when the spout is active is now somewhat long and contains lots of nested ifs and returns. May be better to refactor a bit.

yes i tried once and rolled it back, due to the two levels of callables. will try again.
One more comment I forgot:
I have seen this kind of line, idToTask.get(taskId - idToTaskBase), so many times. Given that it is not easy to understand unless we indicate how idToTask is built, it would be really better if we could find a way to abstract it.

Just finished reviewing, except JCQueue and related tests. I'll see if I can review them afterwards.
writeMessage(channel, batch);
}
} catch (IOException e) {
dropMessages(msgs);

Is !channel.isConnected() the only case for IOException? If so, it doesn't swallow the exception quietly, but it will swallow the exception quietly if there are other cases for IOException.

The writeMessage() is the only one that throws that IOException (after checking !channel.isConnected()). I think I should add a log msg also.
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map<String, Object> conf, final GeneralTopolo
_kryoInput = new Input(1);
}

public Tuple deserialize(byte[] ser) {
public TupleImpl deserialize(byte[] ser) {

Just a reminder: Let's add @Override, and just revert back if we don't leverage the explicit implementation type.
@@ -30,4 +31,5 @@
void ack(Tuple input);
void fail(Tuple input);
void resetTimeout(Tuple input);
void flush();

I think this is like a public API, but given that we ship the change in a major version only, I guess that might be fine. If we have alternatives it would be better though.
@Override
public void flush() {
//NOOP //TODO: Roshan: validate if this is OK

Please remove the TODO if it is addressed, or address this before merging.

Not sure yet. Wanted others' opinion. Would you know if it's OK for that to be a NO-OP?

We are adding this in this patch, so I'm also not sure about that.
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
this(context, values, taskId, streamId, MessageId.makeUnanchored());

//// Perf Note: Disabling these checks as they are a throughput bottleneck

Friendly reminder: if we are OK to drop schema validation, let's just remove the code. If we still want to validate the schema, let's leave it as it is. (Maybe we could pass schema as well, for the same reason as srcComponent.)
if (s==null) // then stop running it
break;
if (s>0)
Thread.sleep(s);

Please use Time.sleepSecs() for simulated time.
@@ -1505,4 +1507,20 @@ public static StormTopology addVersions(StormTopology topology) {
}
return defaultsConf;
}

public static <V> ArrayList<V> convertToArray(Map<Integer, V> srcMap, int start) {
Set<Integer> executorIds = srcMap.keySet();

Let's use a more generic variable name: ids would be enough.

indeed.
@@ -79,6 +79,7 @@
protected final long lowMemoryThresholdMB;
protected final long mediumMemoryThresholdMb;
protected final long mediumMemoryGracePeriodMs;
private static int port = 5006; // TODO: Roshan: remove this after stabilization

Friendly reminder: this should be addressed before merging.

yes. will do this last. needed for debugging.
@@ -747,6 +748,16 @@ protected String javaCmd(String cmd) {
commandList.add("-Dstorm.conf.file=" + topoConfFile);
commandList.add("-Dstorm.options=" + stormOptions);
commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
if (_topoConf.get("attach.debugger") != null) { // TODO: Roshan: remove this after stabilization

Friendly reminder: this should be addressed before merging.

Also reviewed the JCQueue related files: I've finished another pass of reviewing.
}
} // class DirectInserter

private static class BatchInserter implements Inserter {

minor: may be better to leave a comment that the class is not thread-safe. Even better to also add a comment to the ThreadLocal variable.

good point.
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);

Just curious: Is there a specific reason to have the temporary local variable b instead of directly using the local inserter variable?

To avoid casting. thdLocalBatcher.set(b) expects a BatchInserter. inserter is of the base type.
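The cast-avoidance point can be seen in this self-contained sketch (hypothetical, simplified types; the real BatchInserter takes a queue reference and batch size):

```java
import java.util.ArrayList;

public class ThreadLocalBatcherSketch {
    interface Inserter {
        void insert(Object o);
    }

    // Not thread-safe by design: each instance is confined to a single
    // thread via the ThreadLocal below, so no synchronization is needed.
    static class BatchInserter implements Inserter {
        final ArrayList<Object> currentBatch = new ArrayList<>();
        public void insert(Object o) {
            currentBatch.add(o);
        }
    }

    // One BatchInserter per producer thread.
    static final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<>();

    static Inserter getInserter(int producerBatchSz) {
        if (producerBatchSz > 1) {
            Inserter inserter = thdLocalBatcher.get();
            if (inserter == null) {
                // The concrete-typed temp 'b' lets set() compile without a
                // cast; 'inserter' is declared as the base Inserter type.
                BatchInserter b = new BatchInserter();
                thdLocalBatcher.set(b);
                inserter = b;
            }
            return inserter;
        }
        return o -> { }; // direct (non-batching) path, stubbed out here
    }
}
```

Assigning through `inserter` alone would require `thdLocalBatcher.set((BatchInserter) inserter)`, since the ThreadLocal is parameterized on the concrete type.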
}

public interface ExitCondition {

Predicate with proper variable names might be able to get rid of this interface.

yes. i had it that way originally, then felt that the call to keepRunning() made things more readable (made it clear that if it returns true we will keep running, as opposed to quit). I can go back to the Predicate interface if you feel this is not desirable.

OK makes sense. Let's keep it as it is.
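For reference, here is a minimal sketch of the readability argument (hypothetical loop driver, not the PR's actual consumer loop): a named single-method interface makes the call site read as intent rather than a bare `Predicate` test.

```java
public class ExitConditionSketch {
    // Named functional interface: "keepRunning()" at the call site states
    // that returning true means continue, rather than an anonymous test().
    public interface ExitCondition {
        boolean keepRunning();
    }

    // Drives a loop until the condition says stop (capped for the sketch);
    // returns the number of iterations executed.
    static int runLoop(ExitCondition exitCond, int maxIters) {
        int iters = 0;
        while (exitCond.keepRunning() && iters < maxIters) {
            iters++; // loop body would go here
        }
        return iters;
    }
}
```

Since it is a single-abstract-method interface, callers can still pass a lambda, e.g. `runLoop(() -> !halted, 1000)`, so nothing is lost versus `Predicate` except the generic name.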
for (int i = 0; i < producerNum; i++) {
producerThreads[i].join(TIMEOUT);
assertFalse("producer "+i+" is still alive", producerThreads[i].isAlive());
}
queue.haltWithInterrupt();
consumerThread.join(TIMEOUT);
assertFalse("consumer is still alive", consumerThread.isAlive());
//TODO need to fix this... assertFalse("consumer is still alive", consumerThread.isAlive());

Is it fixed, or does it still need to be fixed?

yes it's fixed. will remove the comment.
}

// check that tryPublish() & tryOverflowPublish() work as expected

Missed adding @Test, or intended?

thanks. will fix.
…hread (similar to master). Gives better interworker perf

Thanks @HeartSaVioR. Addressed most of your comments. Will get back on them.
for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
ArrayList<Integer> outTasks = new ArrayList<>();

// TODO: PERF: expensive hashtable lookup in critical path

May be possible to fix, but needs some work... one way is to see if streamIds can be internally represented as ints. Filed issue
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
outTasks.addAll(compTasks);
outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit

I understand your position, as I was surprised too. I have filed an issue and explained in it how I originally discovered this. I didn't pursue it too much, so I don't know why it is an issue (or if it still is), but felt it was a good idea to record that observation for later.
@Override
public void run() {
long start = System.currentTimeMillis();
// while (!Thread.interrupted()) {

fixed.
if (workerData.tryTransferRemote(addressedTuple, pendingEmits)) {
++remotesBatchSz;
if (remotesBatchSz >= producerBatchSz) {
workerData.tryFlushRemotes();

need to take a closer look. will get back on that.
try {
Task task = new Task(executor, taskId);
executor.sendUnanchored(
task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ?

@HeartSaVioR, @revans2, others: would you know why we have this call to sendUnanchored? I recall verifying on master (and 2306) that it doesn't get delivered anywhere. Could remove it if not needed.
Sorry it has taken me so long to get back to this. I did a quick pass through the code, but I need to spend some time to go more in depth. I expect to be doing this this afternoon and tomorrow. My biggest concern right now is around up-merging and getting the unit tests to pass. I don't see a reason to have JCQueueTest fail 100% of the time. I am also concerned that a lot of other unit tests fail 100% of the time with this patch too.
This PR page has become unusable. Most of the time it doesn't load. If it does load, it is too slow to use... even for just typing a comment. Let's stop using this and switch to the new PR (thanks for the suggestion @HeartSaVioR). There are a few recent unaddressed comments on this PR that I will track and address in the new PR.
Having spent a lot of time on this, I am happy to share some good news and some even better news with you.
Before venturing further, I must add that, to limit the scope of this PR, no attempt was made to improve ACK-ing mode perf. Although some big latency gains are seen in ACK mode, these are a side effect of the new messaging design, and work remains to be done to improve ACK mode.
Please see the design docs posted on the STORM-2306 jira for details on what is being done
So, first the good news .. a quick competitive evaluation:
1) Competitive Perf evaluation :
Here are some quick comparisons of Storm numbers taken on my laptop against numbers for similar/identical topologies published by Heron, Flink and Apex. I shall provide just a rolled-up summary here and leave the detailed analysis for later.
Storm numbers here were run on my MacBook Pro (2015) with 16GB ram and a single 4 core Intel i7 chip.
A) Compared To Heron and Flink:
Heron recently published this blog about the big perf improvements (~4-6x) they achieved.
https://blog.twitter.com/engineering/en_us/topics/open-source/2017/optimizing-twitter-heron.html
They ran it on dual 12-core Intel Xeon chips (they didn't say how many machines).
They use a simplified word count topology that I have emulated for comparison purposes and included as part of this PR (SimplifiedWordCountTopo).
Flink also publishes numbers for a similar setup here
https://flink.apache.org/features.html#streaming
Below are per core throughput numbers.
[:HERON:]
Acking Disabled: per core = ~475 k/sec.
Acking Enabled: per core = ~150 k/sec. Latency = 30ms
[:FLINK:]
Per core: ~1 mill/sec
[:STORM:]
Acking Disabled: per core = 2 mill/sec. (1 spout & 1 counter bolt)
Acking Enabled: per core = 0.6 mill/sec, Latency = 0.73 ms (+1 acker)
Takeaways:
B) Compared to Apex:
Apex appears to be the best performer among the open source lot, by a reasonably good margin. Some numbers they published in their early days (when it was called DataTorrent) were misleading/dubious IMO, but the newer numbers appear credible.
Here we look at how fast inter spout/bolt communication can be achieved using an ultra minimalist topology.
A ConstSpout emits a short string to a DevNull bolt that discards the tuples it receives. This topo has been in storm-perf for sometime now.
Apex provides numbers for an identical setup ... what they call "container local" performance, here:
https://www.datatorrent.com/blog/blog-apex-performance-benchmark/
Other than the fact that Storm numbers were on my laptop, these numbers are a good apples to apples comparison.
[:APEX:]
Container local Throughput : ~4.2 mill/sec
[:STORM:]
Worker local throughput : 8.1 mill/sec
2) Core messaging Performance
Now for the better news. The redesigned messaging system is actually much faster and able to move messages between threads at an astounding rate .... :
I have included JCQueuePerfTest.java in this PR to help get quick measurements from within the IDE.
That naturally begs the question: why is Storm pushing only 8.1 mill/sec between a ConstSpout and a DevNullBolt? The short answer is that there are big bottlenecks in other parts of the code. In this PR I have tackled some of those bottlenecks, but many still remain. We are faster than the competition, but still have room to be much, much faster. If anyone is interested in pursuing these to push Storm's perf to the next level, I am happy to point them in the right direction.
Again, please refer to the design docs in the JIRA for details on the new design and the rationale behind them.