Cassandra 15752 trunk #606
Conversation
Force-pushed from 8f0f541 to f272a35
@@ -2100,22 +2112,33 @@ public RowIterator computeNext()
        }
    }

    private void updateConcurrencyFactor()
    @VisibleForTesting
    public void handleBatchCompleted()
+1 to placing the update of liveReturned here. AFAIK this method doesn't do anything other than update the concurrency factor, and we don't have any alternative implementations, so I think I'd prefer the old name for this method, updateConcurrencyFactor. As for the visibility change, I don't see where it is used in testing.
renamed back to updateConcurrencyFactor
I mulled over the idea of having a class whose only responsibility is tracking the command's completion state. In that case, you could imagine a method updating it in a general sense, taking the number of live rows returned and ranges queried in a round and automatically updating the concurrency factor (which you would just access to make the next batch). It would be pretty easy to test and would make RangeCommandIterator a little more focused, but the static computeConcurrencyFactor() basically gets us the same thing.

tl;dr I have no problems with the current structure.
// no live row returned, fetch all remaining ranges but hit the max instead
int cf = StorageProxy.RangeCommandIterator.computeConcurrencyFactor(100, 30, maxConccurrentRangeRequest, 500, 0);
assertEquals(maxConccurrentRangeRequest, cf); // because 100 - 30 = 70 > maxConccurrentRangeRequest
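The logic the test exercises can be sketched roughly as follows. This is a hypothetical reconstruction, not the actual Cassandra implementation: the parameter order (total ranges, ranges already queried, max concurrency factor, row limit, live rows returned) and the extrapolation formula are inferred from the test call and its comments.

```java
// Hypothetical sketch of a concurrency-factor computation matching the
// behavior asserted above; names and formula are assumptions.
public class ConcurrencyFactorSketch
{
    static int computeConcurrencyFactor(int totalRangeCount, int rangesQueried,
                                        int maxConcurrencyFactor, int limit, int liveReturned)
    {
        // never ask for more than the ranges that remain, and never less than 1
        maxConcurrencyFactor = Math.max(1, Math.min(maxConcurrencyFactor, totalRangeCount - rangesQueried));
        if (liveReturned == 0)
            return maxConcurrencyFactor; // no rows yet: fetch all remaining ranges, capped at the max

        // otherwise extrapolate rows-per-range from what we've seen so far
        int remainingRows = limit - liveReturned;
        float rowsPerRange = (float) liveReturned / rangesQueried;
        return Math.max(1, Math.min(maxConcurrencyFactor, Math.round(remainingRows / rowsPerRange)));
    }

    public static void main(String[] args)
    {
        // mirrors the test above: no live rows, 100 - 30 = 70 ranges remain, capped at max = 32
        System.out.println(computeConcurrencyFactor(100, 30, 32, 500, 0));
    }
}
```

With no live rows returned, the remaining 70 ranges exceed the cap, so the factor comes out as the max, which is what the assertion checks.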
I really like the comments here 👍
public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, int rangeCount)
Nit: We could break this line
public ForRangeRead(Keyspace keyspace,
                    ConsistencyLevel consistencyLevel,
                    AbstractBounds<PartitionPosition> range,
                    EndpointsForRange candidates,
                    EndpointsForRange contact,
                    int rangeCount)
+1
}

public AbstractBounds<PartitionPosition> range() { return range; }

/**
 * @return number of vnode ranges
Perhaps we could extend this to "number of vnode ranges intersected by the range", or rename the method to vnodesCount/subrangeCount, or something like that. I think that having a singular range and a range count at the same time might be a bit confusing to unaware readers.
good idea. updated the javadoc and method name to vnodeCount
private int concurrencyFactor;
// The two following "metric" are maintained to improve the concurrencyFactor
// when it was not good enough initially.
private int liveReturned;
private int rangesQueried;

public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, int maxConcurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
Nit: we could break this line
public RangeCommandIterator(RangeIterator ranges,
                            PartitionRangeReadCommand command,
                            int concurrencyFactor,
                            int maxConcurrencyFactor,
                            Keyspace keyspace,
                            ConsistencyLevel consistency,
                            long queryStartNanoTime)
+1
@@ -294,7 +294,7 @@ public void setUp()
static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
{
    return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
                                        ReplicaUtils.FULL_BOUNDS, replicas, targets);
                                        ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);
Nit: I think we don't need to break this line
+1
Force-pushed from f272a35 to a8d104c
@Test
public void testComputeConcurrencyFactor()
{
    int maxConccurrentRangeRequest = 32;
int maxConccurrentRangeRequest = 32;
int maxConcurrentRangeRequest = 32;
+1
}

@Test
public void testRangeCountWithRangeMerge()
nit: This could probably be the first resident of a new RangeMergerTest.
this method needs setTokens()... we can probably move it when refactoring RangeReadExecutor out of StorageProxy
assertEquals(tokens.size() + 1, data.rangesQueried());
}

private List<Token> updateTokens(List<Integer> values)
nit: Maybe setTokens(), given it clears the existing stuff?
+1
int num = Util.size(data);
assertEquals(rows, num);
assertEquals(tokens.size() + 1, data.rangesQueried());
It might be helpful to remind the future reader why there's a +1 here ;)
+1
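For future readers, the "+1" comes from the fact that N tokens split a full, non-wrapping bounds into N + 1 subranges. A toy illustration, with hypothetical token values rather than the test's actual tokens:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitCountSketch
{
    public static void main(String[] args)
    {
        long min = Long.MIN_VALUE, max = Long.MAX_VALUE;
        List<Long> tokens = List.of(-100L, 0L, 100L); // 3 tokens inside the full bounds

        // split (min, max] at each token: every token closes one subrange and opens the next
        List<long[]> subranges = new ArrayList<>();
        long left = min;
        for (long token : tokens)
        {
            subranges.add(new long[]{ left, token });
            left = token;
        }
        subranges.add(new long[]{ left, max }); // the trailing subrange is the "+1"

        System.out.println(subranges.size()); // tokens.size() + 1
    }
}
```

Three tokens produce four subranges, which is why the test expects `tokens.size() + 1` ranges queried.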
concurrentQueries.add(response);
readRepairs.add(response.readRepair);
++rangesQueried;
rangesQueried += range.vnodeCount();
i += range.vnodeCount();
@jasonstack @adelapena Say we have a concurrency factor of 2, and the next range actually is a merged range representing 3 vnodes. We'll actually exceed the concurrency factor by 1, but is the idea that we would otherwise not be able to make progress? It doesn't feel like it matters much either way, given the point of this whole mechanism is to limit queries to replicas, which it still does, but I wanted to make sure we're on the same page about the intent...
> Say we have a concurrency factor of 2, and the next range actually is a merged range representing 3 vnodes. We'll actually exceed the concurrency factor by 1

this is inevitable, unless the coordinator defers the range merging until it knows how many ranges it needs for the next batch. I think fetching more ranges in one replica read command isn't going to be very costly, but the cost of under-fetching is significantly higher.
Doesn't seem that over-fetching is going to be a problem, but we might add a comment about it.
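The overshoot being discussed can be sketched minimally. This is a hypothetical loop mirroring the `i += range.vnodeCount()` accounting in the diff above, not the actual sendNextRequests() code:

```java
public class OvershootSketch
{
    public static void main(String[] args)
    {
        int concurrencyFactor = 2;
        int[] vnodeCounts = { 1, 3, 1 }; // the second range is a merge of 3 vnodes

        // ranges are sent until the factor is reached; a merged range counting
        // several vnodes can therefore push the batch past the factor
        int i = 0, rangesQueried = 0;
        for (int vnodes : vnodeCounts)
        {
            if (i >= concurrencyFactor)
                break; // stop once the factor is met, possibly after overshooting
            rangesQueried += vnodes; // count every vnode the merged range covers
            i += vnodes;
        }
        System.out.println(rangesQueried); // 1 + 3 = 4, past the factor of 2
    }
}
```

The batch still terminates as soon as the factor is met; the only effect is that one merged range can carry the count a few vnodes over, which is the over-fetching the thread agrees is acceptable.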
@@ -2041,16 +2051,24 @@ public void close()
private DataLimits.Counter counter;
private PartitionIterator sentQueryIterator;

private int maxConcurrencyFactor;
Should be final?
+1
@jasonstack Left a few minor comments, but without making more invasive changes to StorageProxy, this looks pretty good. A bit more testing around RangeCommandIterator might be in order though.
PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
// avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
StorageProxy.RangeIterator rangeIterator = new StorageProxy.RangeIterator(command, keyspace, ConsistencyLevel.ONE);
StorageProxy.RangeCommandIterator data = new StorageProxy.RangeCommandIterator(rangeIterator, command, 1, 1000, keyspace, ConsistencyLevel.ONE, System.nanoTime());
@jasonstack I traced through this test and it actually looks like we are merging all 5 ranges into one ForRangeRead, and the reason we still get 5 from data.rangesQueried() is just that we allow overflow from the last ForRangeRead in sendNextRequests(). It feels like perhaps a couple more tests around sendNextRequests() would be helpful? If we parameterize RangeCommandIterator to optionally merge ranges, this would be easier. (It could even make the signature of the RangeCommandIterator constructor less busy, given keyspace and consistency are actually only used by the RangeMerger created inside it.)
> the reason we still get 5 from data.rangesQueried() is just that we allow overflow from the last ForRangeRead in sendNextRequests()

correct.. the coordinator needs to track the number of vnodes queried. The test will merge ranges. I will add some more tests and fix the test comments.
cfs.forceBlockingFlush();

PartitionRangeReadCommand command = (PartitionRangeReadCommand) Util.cmd(cfs).build();
// avoid merging ranges, so that it queries in multiple batches and check if liveReturned is updated correctly.
liveReturned appears to be zero if you assert on it here, which makes sense, given we don't get past the first batch.
right.. it won't update the concurrency factor if the iteration ends.
Force-pushed from a8d104c to 9f62012
rebased with latest trunk
@@ -1321,9 +1322,10 @@ private void assertRepairMetadata(Mutation mutation)
assertEquals(update.metadata().name, cfm.name);
}

@VisibleForTesting
Do we need this annotation?
removed..
…nd cap max concurrency factor by core * 10 patch by Zhao Yang; reviewed by Andres de la Peña, Caleb Rackliffe for CASSANDRA-15752
Force-pushed from 596d5d2 to a5d7eea
… property name (apache#606)
* added 'Void' type in Timer.java
* Either "static_scaling_parameters" xor "static_scaling_factors" is accepted; changed default readMultiplier to 1.0 instead of 0.5 for adaptive compaction cost calculations
* added coverage for 'static_scaling_factors' option
* renamed 'static_scaling_parameters' to 'scaling_parameters'
* fixed failing tests
CircleCI: https://circleci.com/workflow-run/440126c4-058d-4511-8301-16df2bddf5db