DRILL-6952: Host compliant text reader on the row set framework
The result set loader allows controlling batch sizes. The new scan framework,
built on top of the result set loader, handles projection, implicit columns, null
columns and more. This commit converts the "new" ("compliant") text reader
to use the new framework. Session options select between the V2 ("new") and V3
(row-set based) versions. Unit tests demonstrate V3 functionality.

closes #1683
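Usage sketch (illustrative, not part of the commit message): the option key added in this change selects the row-set based (V3) reader for a session; the file path below is a placeholder, and the interplay with the existing V2 option is handled by the plugin code in this commit.

ALTER SESSION SET `exec.storage.enable_v3_text_reader` = true;
SELECT * FROM dfs.`/path/to/data.csv`;
ALTER SESSION RESET `exec.storage.enable_v3_text_reader`;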
Paul Rogers authored and arina-ielchiieva committed Mar 11, 2019
1 parent 4a79e2a commit d585452
Showing 59 changed files with 5,141 additions and 696 deletions.
@@ -713,6 +713,14 @@ private ExecConstants() {
   public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY,
       new OptionDescription("Enables the text reader that complies with the RFC 4180 standard for text/csv files."));
 
+  public static final String ENABLE_V3_TEXT_READER_KEY = "exec.storage.enable_v3_text_reader";
+  public static final OptionValidator ENABLE_V3_TEXT_READER = new BooleanValidator(ENABLE_V3_TEXT_READER_KEY,
+      new OptionDescription("Enables the row set based version of the text/csv reader."));
+
+  public static final String MIN_READER_WIDTH_KEY = "exec.storage.min_width";
+  public static final OptionValidator MIN_READER_WIDTH = new LongValidator(MIN_READER_WIDTH_KEY,
+      new OptionDescription("Min width for text readers, mostly for testing."));
+
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
 
   public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";
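For reference, a sketch (values are arbitrary, not from this commit) of how the two options defined above could be inspected and exercised from SQL:

SELECT * FROM sys.options WHERE name LIKE 'exec.storage.%';
ALTER SESSION SET `exec.storage.min_width` = 2;
ALTER SESSION RESET `exec.storage.min_width`;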
@@ -31,7 +31,6 @@
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
 public abstract class AbstractExchange extends AbstractSingle implements Exchange {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
 
   // Ephemeral info for generating execution fragments.
   protected int senderMajorFragmentId;
@@ -77,7 +76,7 @@ public ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint>
   }
 
   /**
-   * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrances
+   * Get a default endpoint affinity map where affinity of a Drillbit is proportional to the number of its occurrences
    * in given endpoint list.
    *
    * @param fragmentEndpoints Drillbit endpoint assignments of fragments.
@@ -111,7 +110,6 @@ public final void setupSenders(int majorFragmentId, List<DrillbitEndpoint> sende
     setupSenders(senderLocations);
   }
 
-
   @Override
   public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
     this.receiverMajorFragmentId = majorFragmentId;
@@ -22,7 +22,6 @@
 import org.apache.drill.exec.store.dfs.FileSelection;
 
 public abstract class AbstractFileGroupScan extends AbstractGroupScan implements FileGroupScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFileGroupScan.class);
 
   public AbstractFileGroupScan(String userName) {
     super(userName);
@@ -46,5 +45,4 @@ public FileGroupScan clone(FileSelection selection) throws IOException {
   public boolean supportsPartitionFilterPushdown() {
     return true;
   }
-
 }
@@ -25,8 +25,7 @@
 import org.apache.drill.common.graph.GraphVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
-public abstract class AbstractSubScan extends AbstractBase implements SubScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
+public abstract class AbstractSubScan extends AbstractBase implements SubScan {
 
   public AbstractSubScan(String userName) {
     super(userName);
@@ -499,7 +499,6 @@ public void clear() {
     schemaChanged = false;
   }
 
-  @SuppressWarnings("resource")
   private <T extends ValueVector> T addField(MaterializedField field,
       Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
     Map<String, ValueVector> fieldVectorMap;
@@ -42,7 +42,6 @@
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.DrillFuncHolderExpr;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -77,21 +76,21 @@
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
 
+  private static final String EMPTY_STRING = "";
+
   private Projector projector;
   private List<ValueVector> allocationVectors;
   private List<ComplexWriter> complexWriters;
   private List<FieldReference> complexFieldReferencesList;
   private boolean hasRemainder = false;
-  private int remainderIndex = 0;
+  private int remainderIndex;
   private int recordCount;
 
   private ProjectMemoryManager memoryManager;
 
-  private static final String EMPTY_STRING = "";
   private boolean first = true;
   private boolean wasNone = false; // whether a NONE iter outcome was already seen
+  private ColumnExplorer columnExplorer;
 
   private class ClassifierResult {
     public boolean isStar = false;
@@ -114,21 +113,20 @@ private void clear() {
 
   public ProjectRecordBatch(final Project pop, final RecordBatch incoming, final FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
+    columnExplorer = new ColumnExplorer(context.getOptions());
   }
 
   @Override
   public int getRecordCount() {
     return recordCount;
   }
 
   @Override
   protected void killIncoming(final boolean sendUpstream) {
     super.killIncoming(sendUpstream);
     hasRemainder = false;
   }
 
   @Override
   public IterOutcome innerNext() {
     if (wasNone) {
@@ -145,7 +143,7 @@ public IterOutcome innerNext() {
 
   @Override
   public VectorContainer getOutgoingContainer() {
-    return this.container;
+    return container;
   }
 
   @Override
@@ -204,22 +202,21 @@ protected IterOutcome doWork() {
           }
           incomingRecordCount = incoming.getRecordCount();
           memoryManager.update();
           logger.trace("doWork():[1] memMgr RC {}, incoming rc {}, incoming {}, Project {}",
               memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
         }
       }
     }
 
     if (complexWriters != null && getLastKnownOutcome() == EMIT) {
-      throw new UnsupportedOperationException("Currently functions producing complex types as output is not " +
+      throw new UnsupportedOperationException("Currently functions producing complex types as output are not " +
         "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
         "function in the projection list of outermost query.");
     }
 
     first = false;
     container.zeroVectors();
 
-
     int maxOuputRecordCount = memoryManager.getOutputRowCount();
     logger.trace("doWork():[2] memMgr RC {}, incoming rc {}, incoming {}, project {}",
         memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this);
@@ -233,7 +230,6 @@ protected IterOutcome doWork() {
     long projectEndTime = System.currentTimeMillis();
     logger.trace("doWork(): projection: records {}, time {} ms", outputRecords, (projectEndTime - projectStartTime));
 
-
     if (outputRecords < incomingRecordCount) {
       setValueCount(outputRecords);
       hasRemainder = true;
@@ -277,7 +273,7 @@ private void handleRemainder() {
     final int projRecords = projector.projectRecords(this.incoming, remainderIndex, recordsToProcess, 0);
     long projectEndTime = System.currentTimeMillis();
 
-    logger.trace("handleRemainder: projection: " + "records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
+    logger.trace("handleRemainder: projection: records {}, time {} ms", projRecords,(projectEndTime - projectStartTime));
 
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
@@ -463,7 +459,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
           final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
           final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
           memoryManager.addNewField(vv, write);
-          final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+          cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
         }
       }
       continue;
@@ -546,7 +542,7 @@ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaCha
         final TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
         final boolean useSetSafe = !(ouputVector instanceof FixedWidthVector);
         final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
-        final HoldingContainer hc = cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
+        cg.addExpr(write, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
         memoryManager.addNewField(ouputVector, write);
 
         // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
@@ -590,7 +586,7 @@ protected boolean setupNewSchema() throws SchemaChangeException {
   }
 
   private boolean isImplicitFileColumn(ValueVector vvIn) {
-    return ColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
+    return columnExplorer.isImplicitFileColumn(vvIn.getField().getName());
   }
 
   private List<NamedExpression> getExpressionList() {
@@ -24,7 +24,7 @@
 import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
 import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
 import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
-import org.apache.drill.exec.store.easy.text.compliant.RepeatedVarCharOutput;
+import org.apache.drill.exec.store.easy.text.compliant.v3.TextReader;
 
 /**
  * Parses the `columns` array. Doing so is surprisingly complex.
@@ -113,14 +113,14 @@ public boolean parse(RequestedColumn inCol) {
 
     if (inCol.isArray()) {
       int maxIndex = inCol.maxIndex();
-      if (maxIndex > RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS) {
+      if (maxIndex > TextReader.MAXIMUM_NUMBER_COLUMNS) {
         throw UserException
           .validationError()
           .message(String.format(
             "`columns`[%d] index out of bounds, max supported size is %d",
-            maxIndex, RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS))
+            maxIndex, TextReader.MAXIMUM_NUMBER_COLUMNS))
           .addContext("Column", inCol.name())
-          .addContext("Maximum index", RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+          .addContext("Maximum index", TextReader.MAXIMUM_NUMBER_COLUMNS)
           .build(logger);
       }
     }
@@ -69,6 +69,7 @@ public interface FileSchemaNegotiator extends SchemaNegotiator {
   private List<FileSplit> spilts = new ArrayList<>();
   private Iterator<FileSplit> splitIter;
   private Path scanRootDir;
+  private int partitionDepth;
   protected DrillFileSystem dfs;
   private FileMetadataManager metadataManager;
 
@@ -82,12 +83,18 @@ public BaseFileScanFramework(List<SchemaPath> projection,
 
   /**
    * Specify the selection root for a directory scan, if any.
-   * Used to populate partition columns.
+   * Used to populate partition columns. Also, specify the maximum
+   * partition depth.
+   *
    * @param rootPath Hadoop file path for the directory
+   * @param partitionDepth maximum partition depth across all files
+   * within this logical scan operator (files in this scan may be
+   * shallower)
    */
 
-  public void setSelectionRoot(Path rootPath) {
+  public void setSelectionRoot(Path rootPath, int partitionDepth) {
     this.scanRootDir = rootPath;
+    this.partitionDepth = partitionDepth;
   }
 
   @Override
@@ -122,7 +129,10 @@ protected void configure() {
 
     metadataManager = new FileMetadataManager(
         context.getFragmentContext().getOptions(),
+        true, // Expand partition columns with wildcard
+        false, // Put partition columns after table columns
        scanRootDir,
+        partitionDepth,
        paths);
     scanOrchestrator.withMetadata(metadataManager);
   }
@@ -46,6 +46,8 @@ public class FileMetadataColumnsParser implements ScanProjectionParser {
 
   private boolean hasImplicitCols;
 
+  private boolean expandPartitionsAtEnd;
 
   public FileMetadataColumnsParser(FileMetadataManager metadataManager) {
     this.metadataManager = metadataManager;
     partitionPattern = Pattern.compile(metadataManager.partitionDesignator + "(\\d+)", Pattern.CASE_INSENSITIVE);
@@ -123,8 +125,10 @@ private boolean buildMetadataColumn(FileMetadataColumnDefn defn,
   }
 
   private void buildWildcard() {
-    if (metadataManager.useLegacyWildcardExpansion &&
-        metadataManager.useLegacyExpansionLocation) {
+    if (!metadataManager.useLegacyWildcardExpansion) {
+      return;
+    }
+    if (metadataManager.useLegacyExpansionLocation) {
 
       // Star column: this is a SELECT * query.
 
@@ -134,6 +138,8 @@ private void buildWildcard() {
       // set is constant across all files.
 
       expandPartitions();
+    } else {
+      expandPartitionsAtEnd = true;
     }
   }
 
@@ -144,8 +150,7 @@ public void validate() {
     // feature to expand partitions for wildcards, and we want the
     // partitions after data columns.
 
-    if (builder.hasWildcard() && metadataManager.useLegacyWildcardExpansion &&
-        ! metadataManager.useLegacyExpansionLocation) {
+    if (expandPartitionsAtEnd) {
       expandPartitions();
     }
   }
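For orientation, an illustrative sketch (the directory path is hypothetical): the wildcard-expansion logic above controls whether, and where, a SELECT * over a partitioned directory adds the implicit dir0, dir1, ... partition columns; explicitly projected partition columns are unaffected.

-- Wildcard query: partition columns expanded per the legacy-expansion settings handled above
SELECT * FROM dfs.`/data/logs`;
-- Explicit partition columns are always available
SELECT dir0, dir1, COUNT(*) FROM dfs.`/data/logs` GROUP BY dir0, dir1;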
