Addressed code review comments
KazydubB committed Nov 23, 2018
1 parent e3f964f commit fac1d40
Showing 30 changed files with 307 additions and 484 deletions.
@@ -877,11 +877,8 @@ public static String bootDefaultFor(String name) {
public static final BooleanValidator LIST_FILES_RECURSIVELY_VALIDATOR = new BooleanValidator(LIST_FILES_RECURSIVELY,
new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. Default is false. (Drill 1.15+"));

public static final String FETCH_RESULT_SET_FOR_DDL = "drill.exec.fetch_resultset_for_ddl";
public static final BooleanValidator FETCH_RESULT_SET_FOR_DDL_VALIDATOR = new BooleanValidator(FETCH_RESULT_SET_FOR_DDL,
new OptionDescription("Controls whether to fetch result set for CREATE TABLE/VIEW, DROP TABLE/VIEW, SET, USE etc. queries"));

public static final String SQL_NODE_KIND = "drill.exec.query_sqlnode_kind";
public static final StringValidator SQL_NODE_KIND_VALIDATOR = new StringValidator(SQL_NODE_KIND,
new OptionDescription("Query-level type of sql node."));
public static final String RETURN_RESULT_SET_FOR_DDL = "exec.return_result_set_for_ddl";
public static final BooleanValidator RETURN_RESULT_SET_FOR_DDL_VALIDATOR = new BooleanValidator(RETURN_RESULT_SET_FOR_DDL,
new OptionDescription("Controls whether to return result set for CREATE TABLE/VIEW, DROP TABLE/VIEW, SET, USE etc. queries. " +
"If set to `false` affected rows count will be returned instead and result set will be `null`"));
}
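
For illustration, here is a minimal JDBC sketch of how a client might exercise the renamed `exec.return_result_set_for_ddl` option; the connection URL, table name, and queries are hypothetical examples, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ReturnResultSetForDdlExample {
  public static void main(String[] args) throws Exception {
    // Assumes a locally reachable Drillbit; URL and table name are hypothetical.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // With the option off, DDL is expected to report an affected-rows count
      // (surfacing as the JDBC update count) instead of a result set.
      stmt.execute("ALTER SESSION SET `exec.return_result_set_for_ddl` = false");
      boolean hasResultSet =
          stmt.execute("CREATE TABLE dfs.tmp.example_t AS SELECT * FROM cp.`employee.json`");
      System.out.println(hasResultSet
          ? "result set returned"
          : "affected rows: " + stmt.getUpdateCount());
    }
  }
}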
@@ -55,7 +55,6 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
private WritableBatch batch;
private final BufferAllocator allocator;
private int recordCount = -1;
private int updateCount = -1;
private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
private SelectionVector2 sv2;
private long timeNs;
@@ -101,7 +100,6 @@ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, B
public void readFromStream(InputStream input) throws IOException {
final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
updateCount = batchDef.getUpdateCount();
if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
readSv2(input);
}
@@ -186,8 +186,7 @@ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment f
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
}
OptionManager parentOptionManager = queryContext != null ? queryContext.getOptions() : context.getOptionManager();
fragmentOptions = new FragmentOptionManager(parentOptionManager, queryContext == null ? list : new OptionList());
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);

executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());

@@ -154,7 +154,7 @@ private void addOutputContainerData() {
summaryVector.getMutator().setSafe(0, counter);
summaryVector.getMutator().setValueCount(1);

if (!context.getOptions().getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
if (!context.getOptions().getOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL_VALIDATOR)) {
final BigIntVector rowsAffected = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
container.getValueVectorId(SchemaPath.getSimplePath(WritableBatch.ROWS_AFFECTED_HIDDEN_COLUMN_NAME))
.getFieldIds()).getValueVector();
@@ -183,7 +183,7 @@ protected void setupNewSchema() throws IOException {
container.addOrGet(fragmentIdField);
container.addOrGet(summaryField);

if (!context.getOptions().getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
if (!context.getOptions().getOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL_VALIDATOR)) {
// 3. Temporary vector to hold the count of updated rows when the result set is not to be returned.
final MaterializedField rowsAffected =
MaterializedField.create(WritableBatch.ROWS_AFFECTED_HIDDEN_COLUMN_NAME, Types.required(MinorType.BIGINT));
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.materialize;

import org.apache.calcite.sql.SqlKind;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -52,19 +51,10 @@ public QueryWritableBatch convertNext() {
.setQueryId(queryId)
.setRowCount(batch.getRecordCount())
.setDef(w.getDef());
if (hasUpdateCount()) {
int count = w.getDef().getUpdateCount();
builder.setUpdateCount(count == -1 ? 0 : count);
if (!options.getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL)) {
int count = w.getDef().getAffectedRowsCount();
builder.setAffectedRowsCount(count == -1 ? 0 : count);
}
return new QueryWritableBatch(builder.build(), w.getBuffers());
}

private boolean hasUpdateCount() {
if (options != null && !options.getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
String sqlQueryKind = options.getOption(ExecConstants.SQL_NODE_KIND_VALIDATOR);
SqlKind kind = SqlKind.valueOf(sqlQueryKind);
return SqlKind.DDL.contains(kind);
}
return false;
}
}
@@ -37,7 +37,6 @@
import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
@@ -112,10 +111,6 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
final AbstractSqlHandler handler;
final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);

// Actual type of the node. Needed because many SqlNode Drill implementations
// such as CREATE TABLE, DROP TABLE etc. have SqlKind.OTHER
SqlKind kind = sqlNode.getKind();

switch(sqlNode.getKind()) {
case EXPLAIN:
handler = new ExplainHandler(config, textPlan);
@@ -133,24 +128,28 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
handler = new DescribeSchemaHandler(config);
break;
}
case CREATE_TABLE:
handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan);
break;
case DROP_TABLE:
case CREATE_VIEW:
case DROP_VIEW:
case OTHER_DDL:
case OTHER:
if(sqlNode instanceof SqlCreateTable) {
handler = ((DrillSqlCall)sqlNode).getSqlHandler(config, textPlan);
kind = SqlKind.CREATE_TABLE;
break;
}

if (sqlNode instanceof DrillSqlCall) {
handler = ((DrillSqlCall)sqlNode).getSqlHandler(config);
kind = SqlConverter.getSqlKind(sqlNode);
handler = ((DrillSqlCall) sqlNode).getSqlHandler(config);
break;
}
// fallthrough
default:
handler = new DefaultSqlHandler(config, textPlan);
}

context.getOptions().setLocalOption(ExecConstants.SQL_NODE_KIND, kind.name());
boolean returnResultSet = context.getOptions().getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL);
// Determine whether a result set should be returned for the query, based on `exec.return_result_set_for_ddl`
// and the SQL node kind. Overrides the option at the query level.
context.getOptions().setLocalOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL,
returnResultSet || !SqlKind.DDL.contains(sqlNode.getKind()));

try {
return handler.getPlan(sqlNode);
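
For illustration, the query-level override above boils down to one rule: a result set is produced unless the option is off and the statement's kind is DDL. A standalone sketch of that rule (class and method names are illustrative only, not from the patch):

import org.apache.calcite.sql.SqlKind;

class ResultSetRule {
  // Mirrors the value passed to setLocalOption above: return a result set
  // when the option is enabled, or whenever the node kind is not DDL.
  static boolean effectiveReturnResultSet(boolean optionValue, SqlKind kind) {
    return optionValue || !SqlKind.DDL.contains(kind);
  }
}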
@@ -25,18 +25,6 @@
import java.util.Properties;
import java.util.Set;

import org.apache.calcite.sql.SqlKind;
import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.planner.sql.parser.SqlCreateView;
import org.apache.drill.exec.planner.sql.parser.SqlDropFunction;
import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
import org.apache.drill.exec.planner.sql.parser.SqlDropView;
import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -748,34 +736,4 @@ private static CalciteConnectionConfigImpl getConnectionConfig(boolean caseSensi
String.valueOf(caseSensitive));
return new CalciteConnectionConfigImpl(properties);
}

public static SqlKind getSqlKind(SqlNode node) {
SqlKind kind = node.getKind();

if (node instanceof SqlCreateFunction) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlCreateTable) {
kind = SqlKind.CREATE_TABLE;
} else if (node instanceof SqlCreateView) {
kind = SqlKind.CREATE_VIEW;
} else if (node instanceof SqlDropFunction) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlDropTable) {
kind = SqlKind.DROP_TABLE;
} else if (node instanceof SqlDropView) {
kind = SqlKind.DROP_VIEW;
} else if (node instanceof SqlRefreshMetadata) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlShowFiles) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlShowSchemas) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlShowTables) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlUseSchema) {
kind = SqlKind.OTHER_DDL;
}

return kind;
}
}
@@ -36,7 +36,7 @@ public class SqlCreateFunction extends DrillSqlCall {

private final SqlNode jar;

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_FUNCTION", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_FUNCTION", SqlKind.OTHER_DDL) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlCreateFunction(pos, operands[0]);
@@ -41,7 +41,7 @@
import org.apache.drill.exec.util.Pointer;

public class SqlCreateTable extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.CREATE_TABLE) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
Preconditions.checkArgument(operands.length == 6, "SqlCreateTable.createCall() has to get 6 operands!");
@@ -37,7 +37,7 @@
import java.util.List;

public class SqlCreateView extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.CREATE_VIEW) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlCreateView(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], operands[2], (SqlLiteral) operands[3]);
@@ -36,7 +36,7 @@ public class SqlDropFunction extends DrillSqlCall {

private final SqlNode jar;

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_FUNCTION", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_FUNCTION", SqlKind.OTHER_DDL) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlDropFunction(pos, operands[0]);
@@ -35,7 +35,7 @@
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;

public class SqlDropTable extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_TABLE", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_TABLE", SqlKind.DROP_TABLE) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlDropTable(pos, (SqlIdentifier) operands[0], (SqlLiteral) operands[1]);
@@ -35,7 +35,7 @@
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;

public class SqlDropView extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_VIEW", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_VIEW", SqlKind.DROP_VIEW) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlDropView(pos, (SqlIdentifier) operands[0], (SqlLiteral) operands[1]);
@@ -40,7 +40,7 @@
* REFRESH TABLE METADATA tblname
*/
public class SqlRefreshMetadata extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REFRESH_TABLE_METADATA", SqlKind.OTHER) {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REFRESH_TABLE_METADATA", SqlKind.OTHER_DDL) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlRefreshMetadata(pos, (SqlIdentifier) operands[0]);
@@ -38,8 +38,7 @@
*/
public class SqlUseSchema extends DrillSqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("USE_SCHEMA", SqlKind.OTHER){
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE_SCHEMA", SqlKind.OTHER_DDL) {
@Override
public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
return new SqlUseSchema(pos, (SqlIdentifier) operands[0]);
@@ -53,7 +53,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
private VectorContainer container = new VectorContainer();
private int valueCount;
private BatchSchema schema;
private int updateCount;

/**
* Constructs a loader using the given allocator for vector buffer allocation.
@@ -83,7 +82,6 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
}
container.zeroVectors();
valueCount = def.getRecordCount();
updateCount = def.getUpdateCount();
boolean schemaChanged = schema == null;

// Load vectors from the batch buffer, while tracking added and/or removed
@@ -109,16 +107,17 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
// Field did not exist previously--is schema change.
schemaChanged = true;
vector = TypeHelper.getNewVector(fieldDef, allocator);
} else if (!vector.getField().getType().equals(fieldDef.getType())) {
} else if (! vector.getField().getType().equals(fieldDef.getType())) {
// Field had different type before--is schema change.
// clear previous vector
vector.clear();
schemaChanged = true;
vector = TypeHelper.getNewVector(fieldDef, allocator);

// If the field is a map, check if the map schema changed.
// If the field is a map, check if the map schema changed.

} else if (vector.getField().getType().getMinorType() == MinorType.MAP && !isSameSchema(vector.getField().getChildren(), field.getChildList())) {
} else if (vector.getField().getType().getMinorType() == MinorType.MAP &&
! isSameSchema(vector.getField().getChildren(), field.getChildList())) {

// The map schema changed. Discard the old map and create a new one.

@@ -157,7 +157,7 @@ public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWra
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<DrillBuf> buffers = Lists.newArrayList();
List<SerializedField> metadata = Lists.newArrayList();
long updateCount = -1;
long affectedRowsCount = -1;
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());

@@ -168,7 +168,7 @@ public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector>
}

if (ROWS_AFFECTED_HIDDEN_COLUMN_NAME.equals(vv.getField().getName())) {
updateCount = ((Long) vv.getAccessor().getObject(0));
affectedRowsCount = ((Long) vv.getAccessor().getObject(0));
vv.clear();
continue;
}
@@ -182,8 +182,8 @@ public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector>
.addAllField(metadata)
.setRecordCount(recordCount)
.setCarriesTwoByteSelectionVector(isSV2);
if (updateCount != -1) {
builder.setUpdateCount((int) updateCount);
if (affectedRowsCount != -1) {
builder.setAffectedRowsCount((int) affectedRowsCount);
}
return new WritableBatch(builder.build(), buffers);
}
@@ -23,12 +23,13 @@
import java.util.Map;

/**
* {@link OptionManager} that holds options within {@link org.apache.drill.exec.ops.FragmentContext}.
* {@link OptionManager} that holds options within {@link FragmentContextImpl}.
*/
public class FragmentOptionManager extends InMemoryOptionManager {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);

public FragmentOptionManager(OptionManager fallbackOptions, OptionList options) {
super(fallbackOptions, CaseInsensitiveMap.newHashMap(), getMapFromOptionList(options));
public FragmentOptionManager(OptionManager systemOptions, OptionList options) {
super(systemOptions, getMapFromOptionList(options));
}

private static Map<String, OptionValue> getMapFromOptionList(final OptionList options) {
@@ -49,6 +50,11 @@ public void deleteLocalOption(String name) {
throw new UnsupportedOperationException();
}

@Override
public OptionValue getDefault(String optionName) {
return fallback.getDefault(optionName);
}

@Override
protected OptionValue.OptionScope getScope() {
throw new UnsupportedOperationException();
