DRILL-6834: Introduce option to disable result set on CTAS, create view and drop table/view etc. for JDBC connection

- Added session-scoped option `drill.exec.fetch_resultset_for_ddl` to control whether an update count or a result set is returned for a JDBC session. By default the option is set to `true`, which ensures that a result set is returned;
- Updated Drill JDBC `DrillCursor` and `DrillStatement` to implement the desired behaviour;
- Added query-scoped option `drill.exec.query_sqlnode_kind` to determine whether a query is eligible to skip returning a result set when `drill.exec.fetch_resultset_for_ddl` is disabled. Eligible queries include CTAS, CREATE VIEW, CREATE FUNCTION, DROP TABLE, DROP VIEW, DROP FUNCTION, USE schema, SET option, REFRESH TABLE METADATA, etc.;
- Made `FragmentOptionManager` use `QueryOptionManager` as its fallback `OptionManager` and updated `InMemoryOptionManager`.
KazydubB committed Nov 21, 2018
1 parent 6ecaed7 commit e3f964f
Showing 23 changed files with 799 additions and 182 deletions.
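
For illustration, here is a minimal JDBC sketch of how a client might exercise the new behaviour. It is an assumption-laden sketch, not part of this commit: the session option is presumed to be toggled via `ALTER SESSION` under the name introduced below, and the connection URL, workspace, and table names are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CtasUpdateCountExample {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; point it at your Drillbit or ZooKeeper quorum.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost:31010");
         Statement stmt = conn.createStatement()) {

      // Assumption: the new session-scoped option is toggled via ALTER SESSION
      // under the name introduced in this commit.
      stmt.execute("ALTER SESSION SET `drill.exec.fetch_resultset_for_ddl` = false");

      // With the option disabled, DDL-like statements such as CTAS should report
      // an update count instead of a result set.
      int affected = stmt.executeUpdate(
          "CREATE TABLE dfs.tmp.`employee_copy` AS SELECT * FROM cp.`employee.json`");
      System.out.println("Rows affected: " + affected);
    }
  }
}
```

With `drill.exec.fetch_resultset_for_ddl` left at its default of `true`, the same CTAS continues to return a single-row result set summarizing the number of records written.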
@@ -876,4 +876,12 @@ public static String bootDefaultFor(String name) {
public static final String LIST_FILES_RECURSIVELY = "storage.list_files_recursively";
public static final BooleanValidator LIST_FILES_RECURSIVELY_VALIDATOR = new BooleanValidator(LIST_FILES_RECURSIVELY,
new OptionDescription("Enables recursive files listing when querying the `INFORMATION_SCHEMA.FILES` table or executing the SHOW FILES command. Default is false. (Drill 1.15+"));

public static final String FETCH_RESULT_SET_FOR_DDL = "drill.exec.fetch_resultset_for_ddl";
public static final BooleanValidator FETCH_RESULT_SET_FOR_DDL_VALIDATOR = new BooleanValidator(FETCH_RESULT_SET_FOR_DDL,
new OptionDescription("Controls whether to fetch result set for CREATE TABLE/VIEW, DROP TABLE/VIEW, SET, USE etc. queries"));

public static final String SQL_NODE_KIND = "drill.exec.query_sqlnode_kind";
public static final StringValidator SQL_NODE_KIND_VALIDATOR = new StringValidator(SQL_NODE_KIND,
new OptionDescription("Query-level type of sql node."));
}
@@ -55,6 +55,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
private WritableBatch batch;
private final BufferAllocator allocator;
private int recordCount = -1;
private int updateCount = -1;
private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
private SelectionVector2 sv2;
private long timeNs;
@@ -100,6 +101,7 @@ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, B
public void readFromStream(InputStream input) throws IOException {
final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
updateCount = batchDef.getUpdateCount();
if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
readSv2(input);
}
@@ -186,7 +186,8 @@ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment f
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
}
fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
OptionManager parentOptionManager = queryContext != null ? queryContext.getOptions() : context.getOptionManager();
fragmentOptions = new FragmentOptionManager(parentOptionManager, queryContext == null ? list : new OptionList());

executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint());

@@ -22,6 +22,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Writer;
@@ -32,6 +33,7 @@
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.vector.AllocationHelper;
@@ -152,6 +154,15 @@ private void addOutputContainerData() {
summaryVector.getMutator().setSafe(0, counter);
summaryVector.getMutator().setValueCount(1);

if (!context.getOptions().getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
final BigIntVector rowsAffected = (BigIntVector) container.getValueAccessorById(BigIntVector.class,
container.getValueVectorId(SchemaPath.getSimplePath(WritableBatch.ROWS_AFFECTED_HIDDEN_COLUMN_NAME))
.getFieldIds()).getValueVector();
AllocationHelper.allocate(rowsAffected, 1, 8);
rowsAffected.getMutator().setSafe(0, counter);
rowsAffected.getMutator().setValueCount(1);
}

container.setRecordCount(1);
}

@@ -171,6 +182,14 @@ protected void setupNewSchema() throws IOException {

container.addOrGet(fragmentIdField);
container.addOrGet(summaryField);

if (!context.getOptions().getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
// 3. Temporary vector holding the count of affected rows for the case when a result set is not to be returned.
final MaterializedField rowsAffected =
MaterializedField.create(WritableBatch.ROWS_AFFECTED_HIDDEN_COLUMN_NAME, Types.required(MinorType.BIGINT));
container.addOrGet(rowsAffected);
}

container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
} finally {
stats.stopSetup();
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.materialize;

import org.apache.calcite.sql.SqlKind;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
@@ -25,35 +27,44 @@
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.server.options.OptionManager;

public class VectorRecordMaterializer implements RecordMaterializer{
public class VectorRecordMaterializer implements RecordMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);

private QueryId queryId;
private RecordBatch batch;
private BufferAllocator allocator;
private OptionManager options;

public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) {
this.queryId = context.getHandle().getQueryId();
this.batch = batch;
this.allocator = oContext.getAllocator();
BatchSchema schema = batch.getSchema();
assert schema != null : "Schema must be defined.";

// for (MaterializedField f : batch.getSchema()) {
// logger.debug("New Field: {}", f);
// }
options = context.getOptions();
}

public QueryWritableBatch convertNext() {
//batch.getWritableBatch().getDef().getRecordCount()
WritableBatch w = batch.getWritableBatch().transfer(allocator);
QueryData.Builder builder = QueryData.newBuilder()
.setQueryId(queryId)
.setRowCount(batch.getRecordCount())
.setDef(w.getDef());
if (hasUpdateCount()) {
int count = w.getDef().getUpdateCount();
builder.setUpdateCount(count == -1 ? 0 : count);
}
return new QueryWritableBatch(builder.build(), w.getBuffers());
}

QueryData header = QueryData.newBuilder() //
.setQueryId(queryId) //
.setRowCount(batch.getRecordCount()) //
.setDef(w.getDef()).build();
QueryWritableBatch batch = new QueryWritableBatch(header, w.getBuffers());
return batch;
private boolean hasUpdateCount() {
if (options != null && !options.getOption(ExecConstants.FETCH_RESULT_SET_FOR_DDL_VALIDATOR)) {
String sqlQueryKind = options.getOption(ExecConstants.SQL_NODE_KIND_VALIDATOR);
SqlKind kind = SqlKind.valueOf(sqlQueryKind);
return SqlKind.DDL.contains(kind);
}
return false;
}
}
@@ -20,10 +20,12 @@
import java.io.IOException;

import org.apache.calcite.sql.SqlDescribeSchema;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
@@ -110,7 +112,11 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
final AbstractSqlHandler handler;
final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);

switch(sqlNode.getKind()){
// Actual kind of the node. Needed because many Drill SqlNode implementations,
// such as CREATE TABLE and DROP TABLE, report SqlKind.OTHER.
SqlKind kind = sqlNode.getKind();

switch(sqlNode.getKind()) {
case EXPLAIN:
handler = new ExplainHandler(config, textPlan);
break;
@@ -130,18 +136,22 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
case OTHER:
if(sqlNode instanceof SqlCreateTable) {
handler = ((DrillSqlCall)sqlNode).getSqlHandler(config, textPlan);
kind = SqlKind.CREATE_TABLE;
break;
}

if (sqlNode instanceof DrillSqlCall) {
handler = ((DrillSqlCall)sqlNode).getSqlHandler(config);
kind = SqlConverter.getSqlKind(sqlNode);
break;
}
// fallthrough
default:
handler = new DefaultSqlHandler(config, textPlan);
}

context.getOptions().setLocalOption(ExecConstants.SQL_NODE_KIND, kind.name());

try {
return handler.getPlan(sqlNode);
} catch(ValidationException e) {
@@ -25,6 +25,18 @@
import java.util.Properties;
import java.util.Set;

import org.apache.calcite.sql.SqlKind;
import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
import org.apache.drill.exec.planner.sql.parser.SqlCreateView;
import org.apache.drill.exec.planner.sql.parser.SqlDropFunction;
import org.apache.drill.exec.planner.sql.parser.SqlDropTable;
import org.apache.drill.exec.planner.sql.parser.SqlDropView;
import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
import org.apache.drill.exec.planner.sql.parser.SqlShowTables;
import org.apache.drill.exec.planner.sql.parser.SqlUseSchema;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -736,4 +748,34 @@ private static CalciteConnectionConfigImpl getConnectionConfig(boolean caseSensi
String.valueOf(caseSensitive));
return new CalciteConnectionConfigImpl(properties);
}

public static SqlKind getSqlKind(SqlNode node) {
SqlKind kind = node.getKind();

if (node instanceof SqlCreateFunction) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlCreateTable) {
kind = SqlKind.CREATE_TABLE;
} else if (node instanceof SqlCreateView) {
kind = SqlKind.CREATE_VIEW;
} else if (node instanceof SqlDropFunction) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlDropTable) {
kind = SqlKind.DROP_TABLE;
} else if (node instanceof SqlDropView) {
kind = SqlKind.DROP_VIEW;
} else if (node instanceof SqlRefreshMetadata) {
kind = SqlKind.OTHER_DDL;
} else if (node instanceof SqlShowFiles) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlShowSchemas) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlShowTables) {
kind = SqlKind.OTHER;
} else if (node instanceof SqlUseSchema) {
kind = SqlKind.OTHER_DDL;
}

return kind;
}
}
@@ -53,6 +53,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
private VectorContainer container = new VectorContainer();
private int valueCount;
private BatchSchema schema;
private int updateCount;

/**
* Constructs a loader using the given allocator for vector buffer allocation.
@@ -82,6 +83,7 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
}
container.zeroVectors();
valueCount = def.getRecordCount();
updateCount = def.getUpdateCount();
boolean schemaChanged = schema == null;

// Load vectors from the batch buffer, while tracking added and/or removed
@@ -107,17 +109,16 @@ public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeExcepti
// Field did not exist previously--is schema change.
schemaChanged = true;
vector = TypeHelper.getNewVector(fieldDef, allocator);
} else if (! vector.getField().getType().equals(fieldDef.getType())) {
} else if (!vector.getField().getType().equals(fieldDef.getType())) {
// Field had different type before--is schema change.
// clear previous vector
vector.clear();
schemaChanged = true;
vector = TypeHelper.getNewVector(fieldDef, allocator);

// If the field is a map, check if the map schema changed.
// If the field is a map, check if the map schema changed.

} else if (vector.getField().getType().getMinorType() == MinorType.MAP &&
! isSameSchema(vector.getField().getChildren(), field.getChildList())) {
} else if (vector.getField().getType().getMinorType() == MinorType.MAP && !isSameSchema(vector.getField().getChildren(), field.getChildList())) {

// The map schema changed. Discard the old map and create a new one.

@@ -19,6 +19,7 @@

import io.netty.buffer.DrillBuf;

import java.util.Arrays;
import java.util.List;

import org.apache.drill.exec.memory.BufferAllocator;
@@ -36,6 +37,8 @@
public class WritableBatch implements AutoCloseable {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);

public static final String ROWS_AFFECTED_HIDDEN_COLUMN_NAME = "$ROWS_AFFECTED_HIDDEN_COLUMN_NAME$";

private final RecordBatchDef def;
private final DrillBuf[] buffers;
private boolean cleared = false;
@@ -154,7 +157,7 @@ public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWra
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<DrillBuf> buffers = Lists.newArrayList();
List<SerializedField> metadata = Lists.newArrayList();

long updateCount = -1;
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());

@@ -164,17 +167,25 @@ public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector>
continue;
}

for (DrillBuf b : vv.getBuffers(true)) {
buffers.add(b);
if (ROWS_AFFECTED_HIDDEN_COLUMN_NAME.equals(vv.getField().getName())) {
updateCount = ((Long) vv.getAccessor().getObject(0));
vv.clear();
continue;
}

buffers.addAll(Arrays.asList(vv.getBuffers(true)));
// remove vv access to buffers.
vv.clear();
}

RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
.setCarriesTwoByteSelectionVector(isSV2).build();
WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
RecordBatchDef.Builder builder = RecordBatchDef.newBuilder()
.addAllField(metadata)
.setRecordCount(recordCount)
.setCarriesTwoByteSelectionVector(isSV2);
if (updateCount != -1) {
builder.setUpdateCount((int) updateCount);
}
return new WritableBatch(builder.build(), buffers);
}

public static WritableBatch get(VectorAccessible batch) {
@@ -23,13 +23,12 @@
import java.util.Map;

/**
* {@link OptionManager} that holds options within {@link FragmentContextImpl}.
* {@link OptionManager} that holds options within {@link org.apache.drill.exec.ops.FragmentContext}.
*/
public class FragmentOptionManager extends InMemoryOptionManager {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class);

public FragmentOptionManager(OptionManager systemOptions, OptionList options) {
super(systemOptions, getMapFromOptionList(options));
public FragmentOptionManager(OptionManager fallbackOptions, OptionList options) {
super(fallbackOptions, CaseInsensitiveMap.newHashMap(), getMapFromOptionList(options));
}

private static Map<String, OptionValue> getMapFromOptionList(final OptionList options) {
@@ -50,11 +49,6 @@ public void deleteLocalOption(String name) {
throw new UnsupportedOperationException();
}

@Override
public OptionValue getDefault(String optionName) {
return fallback.getDefault(optionName);
}

@Override
protected OptionValue.OptionScope getScope() {
throw new UnsupportedOperationException();