Skip to content

Commit

Permalink
PHOENIX-374: Enable access to dynamic columns in * or cf.* selection
Browse files Browse the repository at this point in the history
  • Loading branch information
ChinmaySKulkarni committed Jan 22, 2019
1 parent 3bd426f commit a9951fa
Show file tree
Hide file tree
Showing 33 changed files with 2,133 additions and 106 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.phoenix.compile;

import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -110,9 +113,13 @@ private static void projectColumnFamily(PTable table, Scan scan, byte[] family)
}

public static RowProjector compile(StatementContext context, SelectStatement statement, GroupBy groupBy) throws SQLException {
return compile(context, statement, groupBy, Collections.<PColumn>emptyList(),
NULL_EXPRESSION// Pass null expression because we don't want empty key value to be projected
);
boolean wildcardIncludesDynamicCols = context.getConnection().getQueryServices()
.getConfiguration().getBoolean(WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB,
DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB);
return compile(context, statement, groupBy, Collections.<PColumn>emptyList(),
// Pass null expression because we don't want empty key value to be projected
NULL_EXPRESSION,
wildcardIncludesDynamicCols);
}

private static int getMinPKOffset(PTable table, PName tenantId) {
Expand Down Expand Up @@ -337,23 +344,29 @@ private static Expression coerceIfNecessary(int index, List<? extends PDatum> ta
/**
* Builds the projection for the scan
* @param context query context kept between compilation of different query clauses
* @param statement TODO
* @param statement the statement being compiled
* @param groupBy compiled GROUP BY clause
* @param targetColumns list of columns, parallel to aliasedNodes, that are being set for an
* UPSERT SELECT statement. Used to coerce expression types to the expected target type.
* @param where the where clause expression
* @param wildcardIncludesDynamicCols true if wildcard queries should include dynamic columns
* @return projector used to access row values during scan
* @throws SQLException
*/
public static RowProjector compile(StatementContext context, SelectStatement statement, GroupBy groupBy, List<? extends PDatum> targetColumns, Expression where) throws SQLException {
List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<KeyValueColumnExpression>();
List<ProjectedColumnExpression> arrayProjectedColumnRefs = new ArrayList<ProjectedColumnExpression>();
List<Expression> arrayKVFuncs = new ArrayList<Expression>();
List<Expression> arrayOldFuncs = new ArrayList<Expression>();
public static RowProjector compile(StatementContext context, SelectStatement statement,
GroupBy groupBy, List<? extends PDatum> targetColumns, Expression where,
boolean wildcardIncludesDynamicCols) throws SQLException {
List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<>();
List<ProjectedColumnExpression> arrayProjectedColumnRefs = new ArrayList<>();
List<Expression> arrayKVFuncs = new ArrayList<>();
List<Expression> arrayOldFuncs = new ArrayList<>();
Map<Expression, Integer> arrayExpressionCounts = new HashMap<>();
List<AliasedNode> aliasedNodes = statement.getSelect();
// Setup projected columns in Scan
SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy, arrayKVRefs, arrayKVFuncs, arrayExpressionCounts, arrayProjectedColumnRefs, arrayOldFuncs, statement);
List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>();
SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy, arrayKVRefs,
arrayKVFuncs, arrayExpressionCounts, arrayProjectedColumnRefs, arrayOldFuncs,
statement);
List<ExpressionProjector> projectedColumns = new ArrayList<>();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
PTable table = tableRef.getTable();
Expand Down Expand Up @@ -468,7 +481,9 @@ public static RowProjector compile(StatementContext context, SelectStatement sta
}

boolean isProjectEmptyKeyValue = false;
if (isWildcard) {
// Don't project known/declared column families into the scan if we want to support
// surfacing dynamic columns in wildcard queries
if (isWildcard && !wildcardIncludesDynamicCols) {
projectAllColumnFamilies(table, scan);
} else {
isProjectEmptyKeyValue = where == null || LiteralExpression.isTrue(where) || where.requiresFinalEvaluation();
Expand Down Expand Up @@ -501,7 +516,9 @@ public static RowProjector compile(StatementContext context, SelectStatement sta
// Ignore as this can happen for local indexes when the data table has a column family, but there are no covered columns in the family
}
}
return new RowProjector(projectedColumns, Math.max(estimatedKeySize, estimatedByteSize), isProjectEmptyKeyValue, resolver.hasUDFs(), isWildcard);
return new RowProjector(projectedColumns, Math.max(estimatedKeySize, estimatedByteSize),
isProjectEmptyKeyValue, resolver.hasUDFs(), isWildcard,
isWildcard || !projectedFamilies.isEmpty());
}

private static void projectAllColumnFamilies(PTable table, Scan scan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.phoenix.compile;

import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;

import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
Expand Down Expand Up @@ -50,7 +53,6 @@
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.optimize.Cost;
Expand Down Expand Up @@ -213,7 +215,11 @@ protected QueryPlan compileJoinQuery(StatementContext context, List<Object> bind
context.setCurrentTable(table.getTableRef());
PTable projectedTable = table.createProjectedTable(!projectPKColumns, context);
TupleProjector projector = new TupleProjector(projectedTable);
TupleProjector.serializeProjectorIntoScan(context.getScan(), projector);
boolean wildcardIncludesDynamicCols = context.getConnection().getQueryServices()
.getConfiguration().getBoolean(WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB,
DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB);
TupleProjector.serializeProjectorIntoScan(context.getScan(), projector,
wildcardIncludesDynamicCols);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
table.projectColumns(context.getScan());
return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
Expand Down Expand Up @@ -252,6 +258,9 @@ protected QueryPlan compileJoinQuery(StatementContext context, List<Object> bind
protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
byte[] emptyByteArray = new byte[0];
List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
boolean wildcardIncludesDynamicCols = context.getConnection().getQueryServices()
.getConfiguration().getBoolean(WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB,
DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB);
switch (strategy) {
case HASH_BUILD_RIGHT: {
boolean[] starJoinVector = joinTable.getStarJoinVector();
Expand Down Expand Up @@ -318,7 +327,8 @@ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementCo
}
hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), usePersistentCache, keyRangeLhsExpression, keyRangeRhsExpression);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector,
wildcardIncludesDynamicCols);
QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
Integer limit = null;
Expand Down Expand Up @@ -370,7 +380,8 @@ protected QueryPlan compileJoinQuery(JoinCompiler.Strategy strategy, StatementCo
PTable lhsTable = needsMerge ? lhsCtx.getResolver().getTables().get(0).getTable() : null;
int fieldPosition = needsMerge ? rhsProjTable.getColumns().size() - rhsProjTable.getPKColumns().size() : 0;
PTable projectedTable = needsMerge ? JoinCompiler.joinProjectedTables(rhsProjTable, lhsTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector,
wildcardIncludesDynamicCols);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
Expand Down Expand Up @@ -561,7 +572,12 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState
// definitively whether or not we'll traverse in row key order.
groupBy = groupBy.compile(context, innerPlanTupleProjector);
context.setResolver(resolver); // recover resolver
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
boolean wildcardIncludesDynamicCols = context.getConnection().getQueryServices()
.getConfiguration().getBoolean(WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB,
DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB);
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy,
asSubquery ? Collections.emptyList() : targetColumns, where,
wildcardIncludesDynamicCols);
OrderBy orderBy = OrderByCompiler.compile(
context,
select,
Expand All @@ -586,7 +602,9 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState
}

if (projectedTable != null) {
TupleProjector.serializeProjectorIntoScan(context.getScan(), new TupleProjector(projectedTable));
TupleProjector.serializeProjectorIntoScan(context.getScan(),
new TupleProjector(projectedTable), wildcardIncludesDynamicCols &&
projector.projectDynColsInWildcardQueries());
}

QueryPlan plan = innerPlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,38 @@ public class RowProjector {
private final boolean isProjectEmptyKeyValue;
private final boolean cloneRequired;
private final boolean hasUDFs;

private final boolean isProjectDynColsInWildcardQueries;

public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) {
this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue, projector.hasUDFs, projector.isProjectAll);
this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(),
isProjectEmptyKeyValue, projector.hasUDFs, projector.isProjectAll,
projector.isProjectDynColsInWildcardQueries);
}
/**
* Construct RowProjector based on a list of ColumnProjectors.
* @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause
* aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may
* be null.
* @param estimatedRowSize
* @param isProjectEmptyKeyValue
*/
public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) {
this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, false, false);
this(columnProjectors, estimatedRowSize, isProjectEmptyKeyValue, false, false, false);
}
/**
* Construct RowProjector based on a list of ColumnProjectors.
* @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause
* aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may
* be null.
* @param estimatedRowSize
* @param estimatedRowSize
* @param isProjectEmptyKeyValue
* @param hasUDFs
* @param isProjectAll
* @param isProjectDynColsInWildcardQueries
*/
public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue, boolean hasUDFs, boolean isProjectAll) {
public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize,
boolean isProjectEmptyKeyValue, boolean hasUDFs, boolean isProjectAll,
boolean isProjectDynColsInWildcardQueries) {
this.columnProjectors = Collections.unmodifiableList(columnProjectors);
int position = columnProjectors.size();
reverseIndex = ArrayListMultimap.<String, Integer>create();
Expand Down Expand Up @@ -107,6 +115,7 @@ public RowProjector(List<? extends ColumnProjector> columnProjectors, int estima
}
}
this.cloneRequired = cloneRequired || hasUDFs;
this.isProjectDynColsInWildcardQueries = isProjectDynColsInWildcardQueries;
}

public RowProjector cloneIfNecessary() {
Expand All @@ -129,7 +138,8 @@ public RowProjector cloneIfNecessary() {
}
}
return new RowProjector(clonedColProjectors,
this.estimatedSize, this.isProjectEmptyKeyValue, this.hasUDFs, this.isProjectAll);
this.estimatedSize, this.isProjectEmptyKeyValue, this.hasUDFs, this.isProjectAll,
this.isProjectDynColsInWildcardQueries);
}

public boolean projectEveryRow() {
Expand All @@ -139,6 +149,14 @@ public boolean projectEveryRow() {
public boolean projectEverything() {
return isProjectAll;
}

public boolean hasUDFs() {
return hasUDFs;
}

public boolean projectDynColsInWildcardQueries() {
return isProjectDynColsInWildcardQueries;
}

public List<? extends ColumnProjector> getColumnProjectors() {
return columnProjectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public static PTable createProjectedTable(SelectStatement select, StatementConte
.setBaseColumnCount(BASE_TABLE_BASE_COLUMN_COUNT)
.setExcludedColumns(ImmutableList.of())
.setPhysicalNames(ImmutableList.of())
.setColumns(projectedColumns)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,6 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException {
PTable projectedTable = PTableImpl.builderWithColumns(table, projectedColumns)
.setExcludedColumns(ImmutableList.of())
.setDefaultFamilyName(PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table)))
.setColumns(projectedColumns)
.build();

SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
Expand Down
Loading

0 comments on commit a9951fa

Please sign in to comment.