Skip to content

Commit

Permalink
Fix short read protection for GROUP BY queries
Browse files Browse the repository at this point in the history
patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-15459
  • Loading branch information
adelapena committed Aug 6, 2020
1 parent 7976777 commit 86a9261
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.11.8
* Fix short read protection for GROUP BY queries (CASSANDRA-15459)
* Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
Merged from 3.0:
* Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
Expand Down
68 changes: 34 additions & 34 deletions src/java/org/apache/cassandra/db/filter/DataLimits.java
Expand Up @@ -257,7 +257,7 @@ public static abstract class Counter extends StoppingTransformation<BaseRowItera
private final boolean enforceStrictLiveness;

// false means we do not propagate our stop signals onto the iterator, we only count
private boolean enforceLimits = true;
protected boolean enforceLimits = true;

protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
Expand Down Expand Up @@ -308,14 +308,14 @@ public RowIterator applyTo(RowIterator partition)
*
* @return the number of rows counted.
*/
public abstract int rowCounted();
public abstract int rowsCounted();

/**
* The number of rows counted in the current partition.
*
* @return the number of rows counted in the current partition.
*/
public abstract int rowCountedInCurrentPartition();
public abstract int rowsCountedInCurrentPartition();

public abstract boolean isDone();
public abstract boolean isDoneForPartition();
Expand Down Expand Up @@ -483,8 +483,8 @@ public float estimateTotalResults(ColumnFamilyStore cfs)

protected class CQLCounter extends Counter
{
protected int rowCounted;
protected int rowInCurrentPartition;
protected int rowsCounted;
protected int rowsInCurrentPartition;
protected final boolean countPartitionsWithOnlyStaticData;

protected boolean hasLiveStaticRow;
Expand All @@ -501,7 +501,7 @@ public CQLCounter(int nowInSec,
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowInCurrentPartition = 0;
rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}

Expand All @@ -519,47 +519,47 @@ public void onPartitionClose()
// Normally, we don't count static rows because, from a CQL point of view, they will be merged with the other
// rows in the partition. However, if we only have the static row, it will be returned as one row,
// so count it.
if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowsInCurrentPartition == 0)
incrementRowCount();
super.onPartitionClose();
}

protected void incrementRowCount()
{
if (++rowCounted >= rowLimit)
if (++rowsCounted >= rowLimit)
stop();
if (++rowInCurrentPartition >= perPartitionLimit)
if (++rowsInCurrentPartition >= perPartitionLimit)
stopInPartition();
}

public int counted()
{
return rowCounted;
return rowsCounted;
}

public int countedInCurrentPartition()
{
return rowInCurrentPartition;
return rowsInCurrentPartition;
}

public int rowCounted()
public int rowsCounted()
{
return rowCounted;
return rowsCounted;
}

public int rowCountedInCurrentPartition()
public int rowsCountedInCurrentPartition()
{
return rowInCurrentPartition;
return rowsInCurrentPartition;
}

public boolean isDone()
{
return rowCounted >= rowLimit;
return rowsCounted >= rowLimit;
}

public boolean isDoneForPartition()
{
return isDone() || rowInCurrentPartition >= perPartitionLimit;
return isDone() || rowsInCurrentPartition >= perPartitionLimit;
}
}

Expand Down Expand Up @@ -639,7 +639,7 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
if (partitionKey.getKey().equals(lastReturnedKey))
{
rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
rowsInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
// lastReturnedKey is the last key for which we've returned rows in the first page.
// So, since we know we have returned rows, we know we have accounted for the static row
// if any already, so force hasLiveStaticRow to false so we make sure to not count it
Expand Down Expand Up @@ -825,7 +825,7 @@ public String toString()
@Override
public boolean isExhausted(Counter counter)
{
return ((GroupByAwareCounter) counter).rowCounted < rowLimit
return ((GroupByAwareCounter) counter).rowsCounted < rowLimit
&& counter.counted() < groupLimit;
}

Expand All @@ -843,12 +843,12 @@ protected class GroupByAwareCounter extends Counter
/**
* The number of rows counted so far.
*/
protected int rowCounted;
protected int rowsCounted;

/**
* The number of rows counted so far in the current partition.
*/
protected int rowCountedInCurrentPartition;
protected int rowsCountedInCurrentPartition;

/**
* The number of groups counted so far. A group is counted only once it is complete
Expand Down Expand Up @@ -919,12 +919,12 @@ public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
currentPartitionKey = partitionKey;
// If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
// If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
// because the pager needs to retrieve the count associated with the last value it has returned.
if (!isDone())
{
groupInCurrentPartition = 0;
rowCountedInCurrentPartition = 0;
rowsCountedInCurrentPartition = 0;
}
}

Expand All @@ -934,7 +934,7 @@ protected Row applyToStatic(Row row)
// It's possible that we're "done" if the partition we just started bumped the number of groups (in
// applyToPartition() above), in which case Transformation will still call this method. In that case, we
// want to ignore the static row; it should (and will) be returned with the next page/group if need be.
if (isDone())
if (enforceLimits && isDone())
{
hasLiveStaticRow = false; // The row has not been returned
return Rows.EMPTY_STATIC_ROW;
Expand All @@ -960,7 +960,7 @@ public Row applyToRow(Row row)

// That row may have made us increment the group count, which may mean we're done for this partition, in
// which case we shouldn't count this row (it won't be returned).
if (isDoneForPartition())
if (enforceLimits && isDoneForPartition())
{
hasGroupStarted = false;
return null;
Expand Down Expand Up @@ -989,21 +989,21 @@ public int countedInCurrentPartition()
}

@Override
public int rowCounted()
public int rowsCounted()
{
return rowCounted;
return rowsCounted;
}

@Override
public int rowCountedInCurrentPartition()
public int rowsCountedInCurrentPartition()
{
return rowCountedInCurrentPartition;
return rowsCountedInCurrentPartition;
}

protected void incrementRowCount()
{
rowCountedInCurrentPartition++;
if (++rowCounted >= rowLimit)
rowsCountedInCurrentPartition++;
if (++rowsCounted >= rowLimit)
stop();
}

Expand Down Expand Up @@ -1058,7 +1058,7 @@ public void onClose()
// 2) the end of the data is reached
// We know that the end of the data is reached if the group limit has not been reached
// and the number of rows counted is smaller than the internal page size.
if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
if (hasGroupStarted && groupCounted < groupLimit && rowsCounted < rowLimit)
{
incrementGroupCount();
incrementGroupInCurrentPartitionCount();
Expand Down Expand Up @@ -1311,12 +1311,12 @@ public int countedInCurrentPartition()
return cellsInCurrentPartition;
}

public int rowCounted()
public int rowsCounted()
{
throw new UnsupportedOperationException();
}

public int rowCountedInCurrentPartition()
public int rowsCountedInCurrentPartition()
{
throw new UnsupportedOperationException();
}
Expand Down
11 changes: 7 additions & 4 deletions src/java/org/apache/cassandra/service/DataResolver.java
Expand Up @@ -770,11 +770,12 @@ public UnfilteredPartitionIterator moreContents()
return executeReadCommand(cmd);
}

// Counts the number of rows for regular queries and the number of groups for GROUP BY queries
/** Returns the number of results counted by the counter */
private int counted(Counter counter)
{
// We are interested in the number of rows, but for GROUP BY queries 'counted' returns the number of groups.
return command.limits().isGroupByLimit()
? counter.rowCounted()
? counter.rowsCounted()
: counter.counted();
}

Expand Down Expand Up @@ -915,11 +916,13 @@ public UnfilteredRowIterator moreContents()
return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
}

// Counts the number of rows for regular queries and the number of groups for GROUP BY queries
/** Returns the number of results counted in the partition by the counter */
private int countedInCurrentPartition(Counter counter)
{
// We are interested in the number of rows, but for GROUP BY queries 'countedInCurrentPartition' returns
// the number of groups in the current partition.
return command.limits().isGroupByLimit()
? counter.rowCountedInCurrentPartition()
? counter.rowsCountedInCurrentPartition()
: counter.countedInCurrentPartition();
}

Expand Down

0 comments on commit 86a9261

Please sign in to comment.