STORM-3488 Scheduling can cause RAS_Node resources to become negative #3114
Thanks for the fix. I have left a few comments.
I would also suggest simplifying the first topology so that its purpose is easier to understand, if that is not hard to do.
@@ -739,6 +740,20 @@ private double calculateSharedOffHeapNodeMemory(
return memorySharedWithinNode;
}

// special case for calculateSharedOffHeapMemory, where assignment is null
private double calculateSharedOffHeapNodeMemoryForUnassignedSlot(TopologyDetails td, ExecutorDetails exec) {
This should be another `calculateSharedOffHeapNodeMemory` function which invokes `calculateSharedOffHeapNodeMemory(nodeId, null, null)`.
Renamed as an overload. A unified parameter list would need four parameters; it seems cleaner to add an overload with two.
@@ -592,6 +592,7 @@ public boolean wouldFit(
} else {
WorkerResources wrAfter = calculateWorkerResources(td, wouldBeAssigned);
afterCpuTotal = wrAfter.get_cpu();
afterTotal += calculateSharedOffHeapNodeMemoryForUnassignedSlot(td, exec);
- Do we need to care about afterOnHeap?
- Shouldn't afterTotal be wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap() + sharedOffHeapNode?
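To make the second question concrete, here is a minimal sketch of the sum being proposed. It assumes plain `double` values in MB; the class and method names are hypothetical and are not part of Storm's `WorkerResources` API.

```java
// Hypothetical helper for illustration only; not Storm code.
final class AfterTotalSketch {
    /**
     * Total memory charged for the slot after the assignment: worker on-heap
     * plus worker off-heap plus the node-level shared off-heap memory.
     */
    static double afterTotal(double memOnHeap, double memOffHeap, double sharedOffHeapNode) {
        return memOnHeap + memOffHeap + sharedOffHeapNode;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not taken from the test in this PR.
        System.out.println(afterTotal(128.0, 64.0, 500.0));
    }
}
```

If only the shared term were added with `+=` to an uninitialized total, the two worker terms would be missing from the fit check, which is what the question is probing.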
builder.setBolt("bolt-1", new TestBolt(),
boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
builder.setBolt("bolt-2", new TestBolt(),
boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
Change to "bolt-2 shared off heap within node"?
builder.setBolt("bolt-2", new TestBolt(),
boltParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "bolt-2 shared node")).shuffleGrouping("bolt-1");
builder.setBolt("bolt-3", new TestBolt(),
boltParallelism).addSharedMemory(new SharedOnHeap(sharedOnHeap, "bolt-3 shared worker")).shuffleGrouping("bolt-2");
Change to "bolt-3 shared on heap within worker"?
// 2nd topology
builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(),
spoutParallelism).addSharedMemory(new SharedOffHeapWithinNode(sharedOffHeapNode, "spout shared node"));
Change to "spout shared off heap within node"?
builder.setSpout("spout", new TestSpout(),
spoutParallelism);
builder.setBolt("bolt-1", new TestBolt(),
boltParallelism).addSharedMemory(new SharedOffHeapWithinWorker(sharedOffHeapWorker, "bolt-1 shared off heap worker")).shuffleGrouping("spout");
Change to "bolt-1 shared off heap within worker"?
double sharedOffHeapWorker = 500;

Config conf = createClusterConfig(cpuPercent, memoryOnHeap, memoryOffHeap, null);
TopologyDetails topo[] = new TopologyDetails[2];
I suggest declaring this as `TopologyDetails[] topo`.

// schedule both topologies. this was triggering the negative resource event
// first topology gets evicted for higher priority (lower value) second topology to successfully schedule
topologies = new Topologies(topo[0], topo[1]);
Understood it now after talking to @dandsager1 offline. The problem here is that, without this bug fix, the second topology is scheduled without evicting the first one (even though the resources are not really sufficient) because of miscalculations. The node then ends up with negative resource balances.
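The failure mode described in this comment can be illustrated with a small sketch. It is a simplified model under stated assumptions, not Storm's scheduler code: a node is reduced to a single available-memory figure in MB, and every name below is hypothetical.

```java
// Simplified, hypothetical model of the miscalculation; not Storm code.
final class NegativeBalanceSketch {
    /** Fit check that leaves out node-level shared off-heap memory (the miscalculation). */
    static boolean wouldFitIgnoringShared(double availableMb, double workerMb) {
        return workerMb <= availableMb;
    }

    /** Fit check that also counts node-level shared off-heap memory. */
    static boolean wouldFitCountingShared(double availableMb, double workerMb, double sharedNodeMb) {
        return workerMb + sharedNodeMb <= availableMb;
    }

    /** Balance left on the node once the full usage is charged against it. */
    static double remainingAfterAssign(double availableMb, double workerMb, double sharedNodeMb) {
        return availableMb - (workerMb + sharedNodeMb);
    }

    public static void main(String[] args) {
        // Illustrative numbers only.
        double available = 600.0;
        double worker = 400.0;
        double sharedNode = 500.0;
        System.out.println(wouldFitIgnoringShared(available, worker));
        System.out.println(wouldFitCountingShared(available, worker, sharedNode));
        System.out.println(remainingAfterAssign(available, worker, sharedNode));
    }
}
```

In this model the incomplete check accepts the assignment, so nothing is evicted, and the balance goes negative once the shared memory is actually charged. The complete check rejects it, which is what would force the eviction the test expects.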
Overall looks good to me. Just a few comments.
@@ -739,6 +743,20 @@ private double calculateSharedOffHeapNodeMemory(
return memorySharedWithinNode;
}

// used when topology has no assignments - would be first executor
private double calculateSharedOffHeapNodeMemory(ExecutorDetails exec, TopologyDetails td) {
This is basically identical to the other method, so what we can do is have
private double calculateSharedOffHeapNodeMemory(ExecutorDetails exec, TopologyDetails td) {
return calculateSharedOffHeapNodeMemory(null, null, exec);
}
and update the other function to handle the null arguments.
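The delegation pattern suggested here can be sketched in isolation. This is a toy model, not the Storm method: shared regions are represented as a name-to-MB map, a region is assumed to be counted once per name, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a null-tolerant method plus a delegating overload; not Storm code.
final class SharedNodeMemoryOverloadSketch {
    /**
     * Full form: regions already placed on the node (null when the topology has
     * no assignment yet) combined with the regions of the executor being added.
     * Assumption for this sketch: a region is counted once per name.
     */
    static double sharedOffHeapNodeMemory(Map<String, Double> onNode, Map<String, Double> extra) {
        Map<String, Double> regions = new HashMap<>();
        if (onNode != null) {
            regions.putAll(onNode);
        }
        regions.putAll(extra);
        double total = 0.0;
        for (double mb : regions.values()) {
            total += mb;
        }
        return total;
    }

    /** Short form for the unassigned case: delegates with a null assignment. */
    static double sharedOffHeapNodeMemory(Map<String, Double> extra) {
        return sharedOffHeapNodeMemory(null, extra);
    }
}
```

Keeping the null handling in one place means the short overload carries no logic of its own, which is the duplication the comment is asking to remove.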
Should not be related. I think this is resolved with STORM-3495.
+1 Looks good to me
afterTotal = wrAfter.get_mem_off_heap() + wrAfter.get_mem_on_heap();
afterOnHeap = wrAfter.get_mem_on_heap();

afterTotal += calculateSharedOffHeapNodeMemory(ws.getNodeId(), assignment, td, exec);
We could have another function
private double calculateSharedOffHeapNodeMemory(TopologyDetails td, ExecutorDetails extra) {
return calculateSharedOffHeapNodeMemory(null, null, td, extra);
}
But I am fine with it as it is
+1 Thanks for cleaning up the code
There are checkstyle violations.
Since this is a bug fix, it should get into 2.1.x too. Could you please file a pull request against 2.1.x-branch after fixing this? Thanks