Skip to content

Commit

Permalink
DRILL-7095: Changes after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
arina-ielchiieva committed Mar 18, 2019
1 parent 0a48f6d commit 8836c62
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ private ExecConstants() {
public static final BooleanValidator JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR = new BooleanValidator(JSON_READER_ESCAPE_ANY_CHAR,
new OptionDescription("Enables the JSON record reader in Drill to escape any character. Default is false. (Drill 1.16+)"));

public static final String STORE_TABLE_USE_SCHEMA_FILE = "store.table.use_schema_file";
public static final BooleanValidator STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR = new BooleanValidator(STORE_TABLE_USE_SCHEMA_FILE,
new OptionDescription("Controls if schema file stored in table root directory will be used during query execution. (Drill 1.16+)"));

/**
* The column label (for directory levels) in results when querying files in a directory
* E.g. labels: dir0 dir1<pre>
Expand Down Expand Up @@ -972,8 +976,4 @@ public static String bootDefaultFor(String name) {
public static final String NDV_BLOOM_FILTER_FPOS_PROB = "exec.statistics.ndv_extrapolation_bf_fpprobability";
public static final LongValidator NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR = new PositiveLongValidator(NDV_BLOOM_FILTER_FPOS_PROB,
100, new OptionDescription("Controls trade-off between NDV statistic computation memory cost and sampling extrapolation accuracy"));

public static final String STORE_TABLE_USE_SCHEMA_FILE = "store.table.use_schema_file";
public static final BooleanValidator STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR = new BooleanValidator(STORE_TABLE_USE_SCHEMA_FILE,
new OptionDescription("Controls if schema file stored in table root directory will be used during query execution. (Drill 1.16+)"));
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR),
new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_UNION_TYPE),
new OptionDefinition(ExecConstants.TEXT_ESTIMATED_ROW_SIZE),
new OptionDefinition(ExecConstants.JSON_EXTENDED_TYPES),
Expand Down Expand Up @@ -281,8 +282,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR),
new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR,
new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)),
new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR),
new OptionDefinition(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE_VALIDATOR)
new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR)
};

CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options, TupleMetadata schema) throws IOException {
FormatSelection formatSelection = selection.getWith(lpPersistance, FormatSelection.class);
FormatPlugin plugin = getFormatPlugin(formatSelection.getFormat());
plugin.setSchema(schema);
return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options);
return plugin.getGroupScan(userName, formatSelection.getSelection(), columns, options, schema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ default AbstractGroupScan getGroupScan(String userName, FileSelection selection,
return getGroupScan(userName, selection, columns);
}

/**
 * Creates a group scan for the given file selection, accepting a table schema.
 * This default implementation ignores the provided schema and delegates to the
 * schema-less overload; format plugins that support schema provisioning
 * should override this method to make use of the schema.
 *
 * @param userName name of the user whom to impersonate while creating the scan
 * @param selection file selection to scan
 * @param columns list of projected columns
 * @param schema table schema to be used during read; ignored by this default implementation
 * @return group scan for the given selection
 * @throws IOException if the group scan cannot be created
 */
default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, TupleMetadata schema) throws IOException {
return getGroupScan(userName, selection, columns);
}

/**
 * Creates a group scan for the given file selection, accepting session options
 * and a table schema. This default implementation ignores the provided schema
 * and delegates to the options-only overload; format plugins that support
 * schema provisioning should override this method to make use of the schema.
 *
 * @param userName name of the user whom to impersonate while creating the scan
 * @param selection file selection to scan
 * @param columns list of projected columns
 * @param options session option manager; passed through to the delegated overload
 * @param schema table schema to be used during read; ignored by this default implementation
 * @return group scan for the given selection
 * @throws IOException if the group scan cannot be created
 */
default AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options, TupleMetadata schema) throws IOException {
return getGroupScan(userName, selection, columns, options);
}

boolean supportsStatistics();

TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException;
Expand All @@ -74,18 +82,4 @@ default AbstractGroupScan getGroupScan(String userName, FileSelection selection,
Configuration getFsConf();
DrillbitContext getContext();
String getName();

/**
* Sets table schema that will be used during data read.
*
* @param schema table schema
*/
default void setSchema(TupleMetadata schema) { }

/**
* Returns table schema to be used during data read.
*
* @return table schema
*/
default TupleMetadata getSchema() { return null; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public CloseableRecordBatch buildScan(

if (! columnExplorer.isStarQuery()) {
scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth());
columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth(), scan.getSchema());
scan.setOperatorId(scan.getOperatorId());
}

Expand Down Expand Up @@ -296,8 +296,7 @@ public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> p
}

@Override
protected FileScanFramework buildFramework(
EasySubScan scan) throws ExecutionSetupException {
protected FileScanFramework buildFramework(EasySubScan scan) {

final FileScanFramework framework = new FileScanFramework(
scan.getColumns(),
Expand Down Expand Up @@ -471,14 +470,14 @@ protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroup
}

@Override
public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
return new EasyWriter(child, location, partitionColumns, this);
}

@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
throws IOException {
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
private List<CompleteFileWork> chunks;
private List<EndpointAffinity> endpointAffinities;
private Path selectionRoot;
private final TupleMetadata schema;

@JsonCreator
public EasyGroupScan(
Expand All @@ -91,16 +92,6 @@ public EasyGroupScan(
schema);
}

public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, Path selectionRoot)
throws IOException {
this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot, null);
}

public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin,
List<SchemaPath> columns, Path selectionRoot) throws IOException {
this(userName, selection, formatPlugin, columns, selectionRoot, null);
}

public EasyGroupScan(
String userName,
FileSelection selection,
Expand All @@ -112,11 +103,9 @@ public EasyGroupScan(
super(userName);
this.selection = Preconditions.checkNotNull(selection);
this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
if (schema != null) {
this.formatPlugin.setSchema(schema);
}
this.columns = columns == null ? ALL_COLUMNS : columns;
this.selectionRoot = selectionRoot;
this.schema = schema;
initFromSelection(selection, formatPlugin);
}

Expand Down Expand Up @@ -152,6 +141,7 @@ private EasyGroupScan(final EasyGroupScan that) {
minWidth = that.minWidth;
mappings = that.mappings;
partitionDepth = that.partitionDepth;
schema = that.schema;
}

@JsonIgnore
Expand Down Expand Up @@ -224,7 +214,7 @@ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (endpointAffinities == null) {
logger.debug("chunks: {}", chunks.size());
logger.debug("Chunks size: {}", chunks.size());
endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
return endpointAffinities;
Expand Down Expand Up @@ -258,7 +248,7 @@ public EasySubScan getSpecificScan(int minorFragmentId) {
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));

EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
columns, selectionRoot, partitionDepth);
columns, selectionRoot, partitionDepth, schema);
subScan.setOperatorId(this.getOperatorId());
return subScan;
}
Expand All @@ -283,8 +273,8 @@ public FormatPluginConfig getFormatConfig() {

@Override
public String toString() {
final String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s]";
return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles());
String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s, columns=%s, files=%s, schema=%s]";
return String.format(pattern, selectionRoot, getFiles().size(), columns, getFiles(), schema);
}

@Override
Expand All @@ -295,7 +285,7 @@ public String getDigest() {
@Override
public GroupScan clone(List<SchemaPath> columns) {
if (!formatPlugin.supportsPushDown()) {
throw new IllegalStateException(String.format("%s doesn't support pushdown.", this.getClass().getSimpleName()));
throw new IllegalStateException(String.format("%s doesn't support push down.", this.getClass().getSimpleName()));
}
EasyGroupScan newScan = new EasyGroupScan(this);
newScan.columns = columns;
Expand All @@ -319,6 +309,6 @@ public boolean canPushdownProjects(List<SchemaPath> columns) {

@JsonProperty
public TupleMetadata getSchema() {
return formatPlugin.getSchema();
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class EasySubScan extends AbstractSubScan {
private final List<SchemaPath> columns;
private final Path selectionRoot;
private final int partitionDepth;
private final TupleMetadata schema;

@JsonCreator
public EasySubScan(
Expand All @@ -61,21 +62,22 @@ public EasySubScan(
super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(this.formatPlugin);
this.formatPlugin.setSchema(schema);
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
this.partitionDepth = partitionDepth;
this.schema = schema;
}

public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
List<SchemaPath> columns, Path selectionRoot, int partitionDepth) {
List<SchemaPath> columns, Path selectionRoot, int partitionDepth, TupleMetadata schema) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
this.partitionDepth = partitionDepth;
this.schema = schema;
}

@JsonProperty
Expand All @@ -100,7 +102,7 @@ public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?
public List<SchemaPath> getColumns() { return columns; }

@JsonProperty("schema")
public TupleMetadata getSchema() { return formatPlugin.getSchema(); }
public TupleMetadata getSchema() { return schema; }

@Override
public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public boolean supportsPushDown() {
@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
throws IOException {
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot);
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
Expand Down Expand Up @@ -190,8 +189,7 @@ public TextScanBatchCreator(TextFormatPlugin plugin,
}

@Override
protected ColumnsScanFramework buildFramework(
EasySubScan scan) throws ExecutionSetupException {
protected ColumnsScanFramework buildFramework(EasySubScan scan) {
ColumnsScanFramework framework = new ColumnsScanFramework(
scan.getColumns(),
scan.getWorkUnits(),
Expand Down Expand Up @@ -220,8 +218,6 @@ protected ColumnsScanFramework buildFramework(
}
}

private TupleMetadata schema;

public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
this(name, context, fsConf, storageConfig, new TextFormatConfig());
}
Expand All @@ -247,14 +243,14 @@ private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfi
}

@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, TupleMetadata schema)
throws IOException {
return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, schema);
}

@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
List<SchemaPath> columns, OptionManager options) throws IOException {
List<SchemaPath> columns, OptionManager options, TupleMetadata schema) throws IOException {
return new EasyGroupScan(userName, selection, this, columns,
selection.selectionRoot,
// Some paths provide a null option manager. In that case, default to a
Expand Down Expand Up @@ -344,14 +340,4 @@ protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroup
final double estRowCount = data / estimatedRowSize;
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
}

@Override
public void setSchema(TupleMetadata schema) {
this.schema = schema;
}

@Override
public TupleMetadata getSchema() {
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public WriterRecordBatch getWriterBatch(FragmentContext context, RecordBatch inc

@Override
public AbstractFileGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
return getGroupScan(userName, selection, columns, null);
return getGroupScan(userName, selection, columns, (OptionManager) null);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions exec/java-exec/src/main/resources/drill-module.conf
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ drill.exec.options: {
# Using common operators batch configuration unless the Parquet specific
# configuration is used
store.parquet.flat.batch.memory_size: 0,
store.table.use_schema_file: false,
store.partition.hash_distribute: false,
store.text.estimated_row_size_bytes: 100.0,
store.kafka.all_text_mode: false,
Expand Down Expand Up @@ -683,6 +684,5 @@ drill.exec.options: {
exec.query.return_result_set_for_ddl: true,
# ========= rm related options ===========
exec.rm.queryTags: "",
exec.rm.queues.wait_for_preferred_nodes: true,
store.table.use_schema_file: false
exec.rm.queues.wait_for_preferred_nodes: true
}

0 comments on commit 8836c62

Please sign in to comment.