Skip to content

Commit

Permalink
PHOENIX-1672 RegionScanner.nextRaw contract not implemented correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
apurtell committed Feb 22, 2015
1 parent c633151 commit 3d50147
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 180 deletions.
Expand Up @@ -375,7 +375,7 @@ GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable t
* @param limit TODO * @param limit TODO
*/ */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner s, final List<Expression> expressions, final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, long limit) throws IOException { final ServerAggregators aggregators, long limit) throws IOException {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
Expand Down Expand Up @@ -410,28 +410,30 @@ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment
HRegion region = c.getEnvironment().getRegion(); HRegion region = c.getEnvironment().getRegion();
region.startRegionOperation(); region.startRegionOperation();
try { try {
do { synchronized (scanner) {
List<Cell> results = new ArrayList<Cell>(); do {
// Results are potentially returned even when the return List<Cell> results = new ArrayList<Cell>();
// value of s.next is false // Results are potentially returned even when the return
// since this is an indication of whether or not there are // value of s.next is false
// more values after the // since this is an indication of whether or not there are
// ones returned // more values after the
hasMore = s.nextRaw(results); // ones returned
if (!results.isEmpty()) { hasMore = scanner.nextRaw(results);
result.setKeyValues(results); if (!results.isEmpty()) {
ImmutableBytesWritable key = result.setKeyValues(results);
ImmutableBytesWritable key =
TupleUtil.getConcatenatedValue(result, expressions); TupleUtil.getConcatenatedValue(result, expressions);
Aggregator[] rowAggregators = groupByCache.cache(key); Aggregator[] rowAggregators = groupByCache.cache(key);
// Aggregate values here // Aggregate values here
aggregators.aggregate(rowAggregators, result); aggregators.aggregate(rowAggregators, result);
} }
} while (hasMore && groupByCache.size() < limit); } while (hasMore && groupByCache.size() < limit);
}
} finally { } finally {
region.closeRegionOperation(); region.closeRegionOperation();
} }


RegionScanner regionScanner = groupByCache.getScanner(s); RegionScanner regionScanner = groupByCache.getScanner(scanner);


// Do not sort here, but sort back on the client instead // Do not sort here, but sort back on the client instead
// The reason is that if the scan ever extends beyond a region // The reason is that if the scan ever extends beyond a region
Expand All @@ -453,7 +455,7 @@ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment
* @throws IOException * @throws IOException
*/ */
private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s, final List<Expression> expressions, final Scan scan, final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, final long limit) throws IOException { final ServerAggregators aggregators, final long limit) throws IOException {


if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
Expand All @@ -466,12 +468,12 @@ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnviron


@Override @Override
public HRegionInfo getRegionInfo() { public HRegionInfo getRegionInfo() {
return s.getRegionInfo(); return scanner.getRegionInfo();
} }


@Override @Override
public void close() throws IOException { public void close() throws IOException {
s.close(); scanner.close();
} }


@Override @Override
Expand All @@ -488,32 +490,36 @@ public boolean next(List<Cell> results) throws IOException {
HRegion region = c.getEnvironment().getRegion(); HRegion region = c.getEnvironment().getRegion();
region.startRegionOperation(); region.startRegionOperation();
try { try {
do { synchronized (scanner) {
List<Cell> kvs = new ArrayList<Cell>(); do {
// Results are potentially returned even when the return List<Cell> kvs = new ArrayList<Cell>();
// value of s.next is false // Results are potentially returned even when the return
// since this is an indication of whether or not there // value of s.next is false
// are more values after the // since this is an indication of whether or not there
// ones returned // are more values after the
hasMore = s.nextRaw(kvs); // ones returned
if (!kvs.isEmpty()) { hasMore = scanner.nextRaw(kvs);
result.setKeyValues(kvs); if (!kvs.isEmpty()) {
key = TupleUtil.getConcatenatedValue(result, expressions); result.setKeyValues(kvs);
aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; key = TupleUtil.getConcatenatedValue(result, expressions);
if (!aggBoundary) { aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
aggregators.aggregate(rowAggregators, result); if (!aggBoundary) {
if (logger.isDebugEnabled()) { aggregators.aggregate(rowAggregators, result);
logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations(
"Row passed filters: " + kvs
+ ", aggregated values: " + ", aggregated values: "
+ Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan))); + Arrays.asList(rowAggregators),
ScanUtil.getCustomAnnotations(scan)));
}
currentKey = key;
} }
currentKey = key;
} }
} atLimit = rowCount + countOffset >= limit;
atLimit = rowCount + countOffset >= limit; // Do rowCount + 1 b/c we don't have to wait for a complete
// Do rowCount + 1 b/c we don't have to wait for a complete // row in the case of a DISTINCT with a LIMIT
// row in the case of a DISTINCT with a LIMIT } while (hasMore && !aggBoundary && !atLimit);
} while (hasMore && !aggBoundary && !atLimit); }
} finally { } finally {
region.closeRegionOperation(); region.closeRegionOperation();
} }
Expand Down Expand Up @@ -555,7 +561,7 @@ public boolean next(List<Cell> results) throws IOException {


@Override @Override
public long getMaxResultSize() { public long getMaxResultSize() {
return s.getMaxResultSize(); return scanner.getMaxResultSize();
} }
}; };
} }
Expand Down

0 comments on commit 3d50147

Please sign in to comment.