PHOENIX-6507 DistinctAggregatingResultIterator should keep original tuple order of the AggregatingResultIterator
comnetwork committed Jul 15, 2021
1 parent 9c9dd0b commit a073d5b
Showing 4 changed files with 406 additions and 80 deletions.
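
The change below replaces the old approach of draining the delegate into a HashSet and then iterating that set (which discards the delegate's ordering) with on-the-fly de-duplication in next(), so distinct tuples are emitted in the order the underlying AggregatingResultIterator produces them. The sketch below illustrates only that general pattern; it is plain Java with simplified types (a String iterator instead of Phoenix's Tuple and ResultEntry) and is not code from this commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class StreamingDistinctSketch {

    // Emits the first occurrence of each element, preserving upstream order.
    // Collecting everything into a HashSet first and iterating that set would
    // lose the upstream order, because HashSet iteration order is arbitrary.
    static <T> List<T> distinctPreservingOrder(Iterator<T> upstream) {
        Set<T> seen = new HashSet<>();   // elements already emitted
        List<T> out = new ArrayList<>();
        while (upstream.hasNext()) {
            T next = upstream.next();
            if (seen.add(next)) {        // add() returns false for duplicates
                out.add(next);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> upstream = Arrays.asList("a11", "a31", "a31", "b11", "a31");
        // Prints [a11, a31, b11]: duplicates dropped, original order kept.
        System.out.println(distinctPreservingOrder(upstream.iterator()));
    }
}

The commit applies the same idea with a Set<ResultEntry> field (resultEntries) and tuples pulled from targetAggregatingResultIterator.next(), returning each first-seen tuple immediately instead of materializing the whole distinct set up front.
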
@@ -331,5 +331,76 @@ private void doTestOrderByOptimizeForClientAggregatePlanBug4820(boolean desc ,bo
}
}

@Test
public void testDistinctAggregatingResultIteratorBug6507() throws Exception {
doTestDistinctAggregatingResultIteratorBug6507(false, false);
doTestDistinctAggregatingResultIteratorBug6507(false, true);
doTestDistinctAggregatingResultIteratorBug6507(true, false);
doTestDistinctAggregatingResultIteratorBug6507(true, true);
}

private void doTestDistinctAggregatingResultIteratorBug6507(boolean desc ,boolean salted) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = null;
try {
conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
String sql = "create table " + tableName + "( "+
" pk1 varchar not null , " +
" pk2 varchar not null, " +
" pk3 varchar not null," +
" v1 varchar, " +
" v2 varchar, " +
" CONSTRAINT TEST_PK PRIMARY KEY ( "+
"pk1 "+(desc ? "desc" : "")+", "+
"pk2 "+(desc ? "desc" : "")+", "+
"pk3 "+(desc ? "desc" : "")+
" )) "+(salted ? "SALT_BUCKETS =4" : "split on('b')");
conn.createStatement().execute(sql);

conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a11','a12','a13','a14','a15')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a21','a22','a23','a24','a25')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','a33','a38','a35')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b11','b12','b13','b14','b15')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b21','b22','b23','b24','b25')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b31','b32','b33','b34','b35')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','c12','c13','a34','a35')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','c13','a34','a35')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','d13','a35','a35')");
conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('d31','a32','c13','a35','a35')");
conn.commit();

sql = "select distinct pk1,max(v1) from "+tableName+" group by pk1,pk2,pk3 order by pk1,pk2,pk3";

ResultSet rs = conn.prepareStatement(sql).executeQuery();
assertResultSet(rs, new Object[][]{
{"a11","a14"},
{"a21","a24"},
{"a31","a38"},
{"a31","a34"},
{"a31","a35"},
{"b11","b14"},
{"b21","b24"},
{"b31","b34"},
{"d31","a35"}});

sql = "select distinct pk2,max(v1) from "+tableName+" group by pk2,pk3 order by pk2,pk3";

rs = conn.prepareStatement(sql).executeQuery();
assertResultSet(rs, new Object[][]{
{"a12","a14"},
{"a22","a24"},
{"a32","a38"},
{"a32","a35"},
{"b12","b14"},
{"b22","b24"},
{"b32","b34"},
{"c12","a34"}});
} finally {
if(conn != null) {
conn.close();
}
}
}
}

@@ -45,7 +45,12 @@ public class KeyValueColumnExpression extends ColumnExpression {

public KeyValueColumnExpression() {
}


public KeyValueColumnExpression(final byte[] cf, final byte[] cq) {
this.cf = cf;
this.cq = cq;
}

public KeyValueColumnExpression(PColumn column) {
super(column);
this.cf = column.getFamilyName().getBytes();
@@ -18,18 +18,17 @@
package org.apache.phoenix.iterate;

import java.sql.SQLException;
- import java.util.Collections;
- import java.util.Iterator;
+ import java.util.Arrays;
import java.util.List;
import java.util.Set;

- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.aggregator.Aggregator;
+ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.tuple.Tuple;

import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
@@ -43,27 +42,44 @@
* @since 1.2
*/
public class DistinctAggregatingResultIterator implements AggregatingResultIterator {
- private final AggregatingResultIterator delegate;
+ /**
+ * Original AggregatingResultIterator
+ */
+ private final AggregatingResultIterator targetAggregatingResultIterator;
private final RowProjector rowProjector;
- private Iterator<ResultEntry> resultIterator;
- private final ImmutableBytesWritable ptr1 = new ImmutableBytesWritable();
- private final ImmutableBytesWritable ptr2 = new ImmutableBytesWritable();
+ /**
+ * Cached tuples already seen.
+ */
+ private final Set<ResultEntry> resultEntries =
+ Sets.<ResultEntry>newHashSet();

private class ResultEntry {
+ /**
+ * cached hashCode.
+ */
private final int hashCode;
private final Tuple result;
+ /**
+ * cached column values.
+ */
+ private final ImmutableBytesPtr[] columnValues;

ResultEntry(Tuple result) {
- final int prime = 31;
this.result = result;
- int hashCode = 0;
- for (ColumnProjector column : rowProjector.getColumnProjectors()) {
- Expression e = column.getExpression();
- if (e.evaluate(this.result, ptr1)) {
- hashCode = prime * hashCode + ptr1.hashCode();
+ this.columnValues =
+ new ImmutableBytesPtr[rowProjector.getColumnCount()];
+ int columnIndex = 0;
+ for (ColumnProjector columnProjector : rowProjector.getColumnProjectors()) {
+ Expression expression = columnProjector.getExpression();
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ if (!expression.evaluate(this.result, ptr)) {
+ columnValues[columnIndex] = null;
+ } else {
+ columnValues[columnIndex] = ptr;
}
+ columnIndex++;
}
- this.hashCode = hashCode;
+ this.hashCode = Arrays.hashCode(columnValues);
}

@Override
@@ -78,105 +94,66 @@ public boolean equals(Object o) {
return false;
}
ResultEntry that = (ResultEntry) o;
- for (ColumnProjector column : rowProjector.getColumnProjectors()) {
- Expression e = column.getExpression();
- boolean isNull1 = !e.evaluate(this.result, ptr1);
- boolean isNull2 = !e.evaluate(that.result, ptr2);
- if (isNull1 && isNull2) {
- return true;
- }
- if (isNull1 || isNull2) {
- return false;
- }
- if (ptr1.compareTo(ptr2) != 0) {
- return false;
- }
- }
- return true;
+ return Arrays.equals(this.columnValues, that.columnValues);
}

@Override
public int hashCode() {
return hashCode;
}

- Tuple getResult() {
- return result;
- }
}

- protected ResultIterator getDelegate() {
- return delegate;
- }

public DistinctAggregatingResultIterator(AggregatingResultIterator delegate,
RowProjector rowProjector) {
- this.delegate = delegate;
+ this.targetAggregatingResultIterator = delegate;
this.rowProjector = rowProjector;
}

@Override
public Tuple next() throws SQLException {
- Iterator<ResultEntry> iterator = getResultIterator();
- if (iterator.hasNext()) {
- ResultEntry entry = iterator.next();
- Tuple tuple = entry.getResult();
- aggregate(tuple);
- return tuple;
- }
- resultIterator = Collections.emptyIterator();
- return null;
- }
-
- private Iterator<ResultEntry> getResultIterator() throws SQLException {
- if (resultIterator != null) {
- return resultIterator;
- }
-
- Set<ResultEntry> entries = Sets.<ResultEntry>newHashSet(); // TODO: size?
- try {
- for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
- ResultEntry entry = new ResultEntry(result);
- entries.add(entry);
+ while (true) {
+ Tuple nextTuple = this.targetAggregatingResultIterator.next();
+ if (nextTuple == null) {
+ return null;
+ }
+ ResultEntry resultEntry = new ResultEntry(nextTuple);
+ if (!this.resultEntries.contains(resultEntry)) {
+ this.resultEntries.add(resultEntry);
+ return nextTuple;
}
- } finally {
- delegate.close();
}

- resultIterator = entries.iterator();
- return resultIterator;
}

@Override
- public void close() {
- resultIterator = Collections.emptyIterator();
- }

+ public void close() throws SQLException {
+ this.targetAggregatingResultIterator.close();
+ }

@Override
public void explain(List<String> planSteps) {
- delegate.explain(planSteps);
+ targetAggregatingResultIterator.explain(planSteps);
planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
}

@Override
public void explain(List<String> planSteps,
ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
- delegate.explain(planSteps, explainPlanAttributesBuilder);
+ targetAggregatingResultIterator.explain(
+ planSteps,
+ explainPlanAttributesBuilder);
explainPlanAttributesBuilder.setClientDistinctFilter(
rowProjector.toString());
planSteps.add("CLIENT DISTINCT ON " + rowProjector.toString());
}

@Override
public Aggregator[] aggregate(Tuple result) {
- return delegate.aggregate(result);
+ return targetAggregatingResultIterator.aggregate(result);
}

- @Override
- public String toString() {
- return "DistinctAggregatingResultIterator [delegate=" + delegate
- + ", rowProjector=" + rowProjector + ", resultIterator="
- + resultIterator + ", ptr1=" + ptr1 + ", ptr2=" + ptr2 + "]";
- }
+ @Override
+ public String toString() {
+ return "DistinctAggregatingResultIterator [targetAggregatingResultIterator=" + targetAggregatingResultIterator
+ + ", rowProjector=" + rowProjector;
+ }
}
