diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java index f69aa407efc..761f68bb47a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileScanFramework.java @@ -62,7 +62,8 @@ public class FileScanFramework extends ManagedScanFramework { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileScanFramework.class); + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(FileScanFramework.class); /** * The file schema negotiator adds no behavior at present, but is @@ -80,7 +81,6 @@ public interface FileSchemaNegotiator extends SchemaNegotiator { /** * Gives the Drill file system for this operator. */ - DrillFileSystem fileSystem(); /** @@ -186,6 +186,10 @@ public ManagedReader next() { return newReader(); } + public CustomErrorContext errorContext() { + return fileFramework == null ? null : fileFramework.errorContext(); + } + public abstract ManagedReader newReader(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java index 52203da0b73..a7a46fe4e4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java @@ -135,10 +135,17 @@ public interface ReaderFactory { public static class ScanFrameworkBuilder extends ScanOrchestratorBuilder { protected ReaderFactory readerFactory; + protected String userName; public void setReaderFactory(ReaderFactory readerFactory) { this.readerFactory = readerFactory; } + + public ReaderFactory readerFactory() { return readerFactory; } + + public void setUserName(String userName) { + this.userName = userName; + } } // Inputs diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java index bc303ecfa44..74ea512e85c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java @@ -64,8 +64,15 @@ public interface SchemaNegotiator { * Specify an advanced error context which allows the reader to * fill in custom context values. */ + void setErrorContext(CustomErrorContext context); + /** + * The name of the user running the query. + */ + + String userName(); + /** * Specify the table schema if this is an early-schema reader. Need * not be called for a late-schema readers. 
The schema provided here, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java index ab70734342e..15008c12b8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java @@ -100,6 +100,11 @@ public void setBatchSize(int maxRecordsPerBatch) { batchSize = maxRecordsPerBatch; } + @Override + public String userName() { + return framework.builder.userName; + } + /** * Callback from the schema negotiator to build the schema from information from * both the table and scan operator. Returns the result set loader to be used diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 74439a45e12..a1701222aac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -23,9 +23,7 @@ import java.util.Map; import java.util.Set; -import org.apache.drill.exec.physical.base.MetadataProviderManager; -import org.apache.drill.shaded.guava.com.google.common.base.Functions; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.apache.drill.common.exceptions.CustomErrorContext; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; @@ -35,17 +33,21 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.MetadataProviderManager; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.impl.ScanBatch; -import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch; +import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; @@ -62,11 +64,13 @@ import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.schedule.CompleteFileWork; -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.Path; - +import org.apache.drill.shaded.guava.com.google.common.base.Functions; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Base class for various file readers. @@ -80,6 +84,8 @@ public abstract class EasyFormatPlugin implements FormatPlugin { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class); + /** * Defines the static, programmer-defined options for this plugin. These * options are attributes of how the plugin works. The plugin config, @@ -104,214 +110,43 @@ public static class EasyFormatConfig { public boolean supportsProjectPushdown; public boolean supportsAutoPartitioning; + public boolean supportsStatistics; public int readerOperatorType = -1; public int writerOperatorType = -1; - } - /** - * Creates the scan batch to use with the plugin. Drill supports the "classic" - * style of scan batch and readers, along with the newer size-aware, - * component-based version. The implementation of this class assembles the - * readers and scan batch operator as needed for each version. - */ + // Choose whether to use the "traditional" or "enhanced" reader + // structure. Can also be selected at runtime by overriding + // useEnhancedScan(). - public interface ScanBatchCreator { - CloseableRecordBatch buildScan( - final FragmentContext context, EasySubScan scan) - throws ExecutionSetupException; + public boolean useEnhancedScan; } /** - * Use the original scanner based on the {@link RecordReader} interface. - * Requires that the storage plugin roll its own solutions for null columns. - * Is not able to limit vector or batch sizes. Retained or backward - * compatibility with "classic" format plugins which have not yet been - * upgraded to use the new framework. + * Builds the readers for the revised (enhanced) scan framework. */ - - public static class ClassicScanBatchCreator implements ScanBatchCreator { + private static class EasyReaderFactory extends FileReaderFactory { private final EasyFormatPlugin plugin; + private final EasySubScan scan; + private final FragmentContext context; - public ClassicScanBatchCreator(EasyFormatPlugin plugin) { + public EasyReaderFactory(EasyFormatPlugin plugin, + final EasySubScan scan, FragmentContext context) { this.plugin = plugin; + this.scan = scan; + this.context = context; } @Override - public CloseableRecordBatch buildScan( - final FragmentContext context, EasySubScan scan) throws ExecutionSetupException { - final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); - - if (! 
columnExplorer.isStarQuery()) { - scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), - columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth(), scan.getSchema()); - scan.setOperatorId(scan.getOperatorId()); - } - - final OperatorContext oContext = context.newOperatorContext(scan); - final DrillFileSystem dfs; + public ManagedReader newReader() { try { - dfs = oContext.newFileSystem(plugin.easyConfig().fsConf); - } catch (final IOException e) { - throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); - } - - final List readers = new LinkedList<>(); - final List> implicitColumns = Lists.newArrayList(); - Map mapWithMaxColumns = Maps.newLinkedHashMap(); - final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; - for (final FileWork work : scan.getWorkUnits()) { - final RecordReader recordReader = getRecordReader( - plugin, context, dfs, work, scan.getColumns(), scan.getUserName()); - readers.add(recordReader); - final List partitionValues = ColumnExplorer.listPartitionValues( - work.getPath(), scan.getSelectionRoot(), false); - final Map implicitValues = columnExplorer.populateImplicitColumns( - work.getPath(), partitionValues, supportsFileImplicitColumns); - implicitColumns.add(implicitValues); - if (implicitValues.size() > mapWithMaxColumns.size()) { - mapWithMaxColumns = implicitValues; - } - } - - // all readers should have the same number of implicit columns, add missing ones with value null - final Map diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); - for (final Map map : implicitColumns) { - map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); + return plugin.newBatchReader(scan, context.getOptions()); + } catch (ExecutionSetupException e) { + throw UserException.validationError(e) + .addContext("Reason", "Failed to create a batch reader") + .addContext(errorContext()) + .build(logger); } - - return new ScanBatch(context, oContext, readers, implicitColumns); - } - - /** - * Create a record reader given a file system, a file description and other - * information. For backward compatibility, calls the plugin method by - * default. - * - * @param plugin - * the plugin creating the scan - * @param context - * fragment context for the fragment running the scan - * @param dfs - * Drill's distributed file system facade - * @param fileWork - * description of the file to scan - * @param columns - * list of columns to project - * @param userName - * the name of the user performing the scan - * @return a scan operator - * @throws ExecutionSetupException - * if anything goes wrong - */ - - public RecordReader getRecordReader(EasyFormatPlugin plugin, - FragmentContext context, DrillFileSystem dfs, FileWork fileWork, - List columns, String userName) throws ExecutionSetupException { - return plugin.getRecordReader(context, dfs, fileWork, columns, userName); - } - } - - /** - * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader} - * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes. - * Handles most projection tasks automatically. Able to limit - * vector and batch sizes. Use this for new format plugins. 
- */ - - public abstract static class ScanFrameworkCreator - implements ScanBatchCreator { - - protected EasyFormatPlugin plugin; - - public ScanFrameworkCreator(EasyFormatPlugin plugin) { - this.plugin = plugin; - } - - /** - * Builds the revised {@link FileBatchReader}-based scan batch. - * - * @param context - * @param scan - * @return - * @throws ExecutionSetupException - */ - - @Override - public CloseableRecordBatch buildScan( - final FragmentContext context, - final EasySubScan scan) throws ExecutionSetupException { - - // Assemble the scan operator and its wrapper. - - try { - final FileScanBuilder builder = frameworkBuilder(scan); - builder.setProjection(scan.getColumns()); - builder.setFiles(scan.getWorkUnits()); - builder.setConfig(plugin.easyConfig().fsConf); - - // The text readers use required Varchar columns to represent null columns. - - builder.allowRequiredNullColumns(true); - final Path selectionRoot = scan.getSelectionRoot(); - if (selectionRoot != null) { - builder.metadataOptions().setSelectionRoot(selectionRoot); - builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth()); - } - FileScanFramework framework = builder.buildFileFramework(); - return new OperatorRecordBatch( - context, scan, - new ScanOperatorExec( - framework)); - } catch (final UserException e) { - // Rethrow user exceptions directly - throw e; - } catch (final Throwable e) { - // Wrap all others - throw new ExecutionSetupException(e); - } - } - - /** - * Create the plugin-specific framework that manages the scan. The framework - * creates batch readers one by one for each file or block. It defines semantic - * rules for projection. It handles "early" or "late" schema readers. A typical - * framework builds on standardized frameworks for files in general or text - * files in particular. - * - * @param scan the physical operation definition for the scan operation. Contains - * one or more files to read. (The Easy format plugin works only for files.) - * @return the scan framework which orchestrates the scan operation across - * potentially many files - * @throws ExecutionSetupException for all setup failures - */ - protected abstract FileScanBuilder frameworkBuilder( - EasySubScan scan) throws ExecutionSetupException; - } - - /** - * Generic framework creator for files that just use the basic file - * support: metadata, etc. Specialized use cases (special "columns" - * column, say) will require a specialized implementation. 
- */ - - public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator { - - private final FileReaderFactory readerCreator; - - public FileScanFrameworkCreator(EasyFormatPlugin plugin, - FileReaderFactory readerCreator) { - super(plugin); - this.readerCreator = readerCreator; - } - - @Override - protected FileScanBuilder frameworkBuilder( - EasySubScan scan) throws ExecutionSetupException { - - FileScanBuilder builder = new FileScanBuilder(); - builder.setReaderFactory(readerCreator); - return builder; - } } @@ -428,26 +263,173 @@ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs protected CloseableRecordBatch getReaderBatch(final FragmentContext context, final EasySubScan scan) throws ExecutionSetupException { - return scanBatchCreator(context.getOptions()).buildScan(context, scan); + if (useEnhancedScan(context.getOptions())) { + return buildScan(context, scan); + } else { + return buildScanBatch(context, scan); + } + } + + /** + * Choose whether to use the enhanced scan based on the row set and scan + * framework, or the "traditional" ad-hoc structure based on ScanBatch. + * Normally set as a config option. Override this method if you want to + * make the choice based on a system/session option. + * + * @return true to use the enhanced scan framework, false for the + * traditional scan-batch framework + */ + + protected boolean useEnhancedScan(OptionManager options) { + return easyConfig.useEnhancedScan; + } + + /** + * Use the original scanner based on the {@link RecordReader} interface. + * Requires that the storage plugin roll its own solutions for null columns. + * Is not able to limit vector or batch sizes. Retained for backward + * compatibility with "classic" format plugins which have not yet been + * upgraded to use the new framework. + */ + + private CloseableRecordBatch buildScanBatch(FragmentContext context, + EasySubScan scan) throws ExecutionSetupException { + final ColumnExplorer columnExplorer = + new ColumnExplorer(context.getOptions(), scan.getColumns()); + + if (! 
columnExplorer.isStarQuery()) { + scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), + columnExplorer.getTableColumns(), scan.getSelectionRoot(), + scan.getPartitionDepth(), scan.getSchema()); + scan.setOperatorId(scan.getOperatorId()); + } + + final OperatorContext oContext = context.newOperatorContext(scan); + final DrillFileSystem dfs; + try { + dfs = oContext.newFileSystem(easyConfig().fsConf); + } catch (final IOException e) { + throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e); + } + + final List readers = new LinkedList<>(); + final List> implicitColumns = Lists.newArrayList(); + Map mapWithMaxColumns = Maps.newLinkedHashMap(); + final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null; + for (final FileWork work : scan.getWorkUnits()) { + final RecordReader recordReader = getRecordReader( + context, dfs, work, scan.getColumns(), scan.getUserName()); + readers.add(recordReader); + final List partitionValues = ColumnExplorer.listPartitionValues( + work.getPath(), scan.getSelectionRoot(), false); + final Map implicitValues = columnExplorer.populateImplicitColumns( + work.getPath(), partitionValues, supportsFileImplicitColumns); + implicitColumns.add(implicitValues); + if (implicitValues.size() > mapWithMaxColumns.size()) { + mapWithMaxColumns = implicitValues; + } + } + + // all readers should have the same number of implicit columns, add missing ones with value null + final Map diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); + for (final Map map : implicitColumns) { + map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); + } + + return new ScanBatch(context, oContext, readers, implicitColumns); } /** - * Create the scan batch creator. Needed only when using the revised scan batch. In that - * case, override the readerIterator() method on the custom scan batch - * creator implementation. + * Revised scanner based on the revised {@link org.apache.drill.exec.physical.rowSet.ResultSetLoader} + * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} classes. + * Handles most projection tasks automatically. Able to limit + * vector and batch sizes. Use this for new format plugins. + */ + + private CloseableRecordBatch buildScan(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { + + // Assemble the scan operator and its wrapper. 
+ + try { + final FileScanBuilder builder = frameworkBuilder(context.getOptions(), scan); + builder.setProjection(scan.getColumns()); + builder.setFiles(scan.getWorkUnits()); + builder.setConfig(easyConfig().fsConf); + builder.setUserName(scan.getUserName()); + + // Pass along the output schema, if any + + builder.setOutputSchema(scan.getSchema()); + final Path selectionRoot = scan.getSelectionRoot(); + if (selectionRoot != null) { + builder.metadataOptions().setSelectionRoot(selectionRoot); + builder.metadataOptions().setPartitionDepth(scan.getPartitionDepth()); + } + + // Add batch reader, if none specified + + if (builder.readerFactory() == null) { + builder.setReaderFactory(new EasyReaderFactory(this, scan, context)); + } + + // Add error context, if none is specified + + if (builder.errorContext() == null) { + builder.setContext( + new CustomErrorContext() { + @Override + public void addContext(UserException.Builder builder) { + builder.addContext("Format plugin:", + EasyFormatPlugin.this.getClass().getSimpleName()); + builder.addContext("Plugin config name:", getName()); + } + }); + } + + FileScanFramework framework = builder.buildFileFramework(); + return new OperatorRecordBatch(context, scan, + new ScanOperatorExec(framework)); + } catch (final UserException e) { + // Rethrow user exceptions directly + throw e; + } catch (final Throwable e) { + // Wrap all others + throw new ExecutionSetupException(e); + } + } + + public ManagedReader newBatchReader( + EasySubScan scan, OptionManager options) throws ExecutionSetupException { + throw new ExecutionSetupException("Must implement newBatchReader() if using the enhanced framework."); + } + + /** + * Create the plugin-specific framework that manages the scan. The framework + * creates batch readers one by one for each file or block. It defines semantic + * rules for projection. It handles "early" or "late" schema readers. A typical + * framework builds on standardized frameworks for files in general or text + * files in particular. * - * @return the strategy for creating the scan batch for this plugin + * @param scan the physical operation definition for the scan operation. Contains + * one or more files to read. (The Easy format plugin works only for files.) 
+ * @return the scan framework which orchestrates the scan operation across + * potentially many files + * @throws ExecutionSetupException for all setup failures */ - protected ScanBatchCreator scanBatchCreator(OptionManager options) { - return new ClassicScanBatchCreator(this); + protected FileScanBuilder frameworkBuilder( + OptionManager options, EasySubScan scan) throws ExecutionSetupException { + throw new ExecutionSetupException("Must implement frameworkBuilder() if using the enhanced framework."); } public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) { return false; } - public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException; + public RecordWriter getRecordWriter(FragmentContext context, + EasyWriter writer) throws IOException { + throw new UnsupportedOperationException("unimplemented"); + } public StatisticsRecordWriter getStatisticsRecordWriter(FragmentContext context, EasyWriter writer) throws IOException { @@ -519,4 +501,18 @@ public Set getOptimizerRules() { public int getReaderOperatorType() { return easyConfig.readerOperatorType; } public int getWriterOperatorType() { return easyConfig.writerOperatorType; } + + @Override + public boolean supportsStatistics() { return easyConfig.supportsStatistics; } + + @Override + public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); + } + + @Override + public void writeStatistics(TableStatistics statistics, FileSystem fs, + Path statsTablePath) throws IOException { + throw new UnsupportedOperationException("unimplemented"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index f764c38c566..3d122edf9ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -28,9 +28,8 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -42,7 +41,6 @@ import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder; import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; -import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; @@ -67,7 +65,6 @@ import org.apache.drill.exec.vector.accessor.convert.AbstractConvertFromString; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.mapred.FileSplit; @@ -199,70 +196,6 @@ public ManagedReader newReader() { } } - /** - * Builds the V3 text scan operator. - */ - private static class TextScanBatchCreator extends ScanFrameworkCreator { - - private final TextFormatPlugin textPlugin; - - public TextScanBatchCreator(TextFormatPlugin plugin) { - super(plugin); - textPlugin = plugin; - } - - @Override - protected FileScanBuilder frameworkBuilder( - EasySubScan scan) throws ExecutionSetupException { - ColumnsScanBuilder builder = new ColumnsScanBuilder(); - builder.setReaderFactory(new ColumnsReaderFactory(textPlugin)); - - // Provide custom error context - builder.setContext( - new CustomErrorContext() { - @Override - public void addContext(UserException.Builder builder) { - builder.addContext("Format plugin:", PLUGIN_NAME); - builder.addContext("Plugin config name:", textPlugin.getName()); - builder.addContext("Extract headers:", - Boolean.toString(textPlugin.getConfig().isHeaderExtractionEnabled())); - builder.addContext("Skip headers:", - Boolean.toString(textPlugin.getConfig().isSkipFirstLine())); - } - }); - - // If this format has no headers, or wants to skip them, - // then we must use the columns column to hold the data. - - builder.requireColumnsArray( - ! textPlugin.getConfig().isHeaderExtractionEnabled()); - - // Text files handle nulls in an unusual way. Missing columns - // are set to required Varchar and filled with blanks. Yes, this - // means that the SQL statement or code cannot differentiate missing - // columns from empty columns, but that is how CSV and other text - // files have been defined within Drill. - - builder.setNullType( - MajorType.newBuilder() - .setMinorType(MinorType.VARCHAR) - .setMode(DataMode.REQUIRED) - .build()); - - // Pass along the output schema, if any - - builder.setOutputSchema(scan.getSchema()); - - // CSV maps blank columns to nulls (for nullable non-string columns), - // or to the default value (for non-nullable non-string columns.) - - builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP, - AbstractConvertFromString.BLANK_AS_NULL); - - return builder; - } - } - public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) { this(name, context, fsConf, storageConfig, new TextFormatConfig()); } @@ -284,6 +217,9 @@ private static EasyFormatConfig easyConfig(Configuration fsConf, TextFormatConfi config.defaultName = PLUGIN_NAME; config.readerOperatorType = CoreOperatorType.TEXT_SUB_SCAN_VALUE; config.writerOperatorType = CoreOperatorType.TEXT_WRITER_VALUE; + + // Uncomment this, and remove useEnhancedScan(), when v2 is retired + //config.useEnhancedScan = true; return config; } @@ -304,16 +240,12 @@ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, } @Override - protected ScanBatchCreator scanBatchCreator(OptionManager options) { + protected boolean useEnhancedScan(OptionManager options) { // Create the "legacy", "V2" reader or the new "V3" version based on // the result set loader. This code should be temporary: the two // readers provide identical functionality for the user; only the // internals differ. - if (options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY)) { - return new TextScanBatchCreator(this); - } else { - return new ClassicScanBatchCreator(this); - } + return options.getBoolean(ExecConstants.ENABLE_V3_TEXT_READER_KEY); } // TODO: Remove this once the V2 reader is removed. 
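[Editor's note] For a format plugin migrating to the new structure, the hooks added above are all that is required: opt in through useEnhancedScan (the config flag or the method override), then supply a frameworkBuilder() and a batch reader. A minimal sketch under those assumptions follows; LogFormatPlugin, LogFormatConfig and LogBatchReader are illustrative names only (constructor and config class omitted), not part of this patch.

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;

// Hypothetical plugin showing the minimal overrides for the enhanced scan.
public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {

  @Override
  protected boolean useEnhancedScan(OptionManager options) {
    // Alternatively, set EasyFormatConfig.useEnhancedScan when building
    // the config passed to the EasyFormatPlugin constructor.
    return true;
  }

  @Override
  protected FileScanBuilder frameworkBuilder(OptionManager options,
      EasySubScan scan) throws ExecutionSetupException {
    // buildScan() fills in the projection, files, config, user name, output
    // schema, a default reader factory and a default error context.
    return new FileScanBuilder();
  }

  @Override
  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
      EasySubScan scan, OptionManager options) {
    return new LogBatchReader();  // hypothetical ManagedReader implementation
  }
}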
@@ -336,6 +268,53 @@ public RecordReader getRecordReader(FragmentContext context, } } + @Override + protected FileScanBuilder frameworkBuilder( + OptionManager options, EasySubScan scan) throws ExecutionSetupException { + ColumnsScanBuilder builder = new ColumnsScanBuilder(); + builder.setReaderFactory(new ColumnsReaderFactory(this)); + + // If this format has no headers, or wants to skip them, + // then we must use the columns column to hold the data. + + builder.requireColumnsArray( + ! getConfig().isHeaderExtractionEnabled()); + + // Text files handle nulls in an unusual way. Missing columns + // are set to required Varchar and filled with blanks. Yes, this + // means that the SQL statement or code cannot differentiate missing + // columns from empty columns, but that is how CSV and other text + // files have been defined within Drill. + + builder.setNullType(Types.required(MinorType.VARCHAR)); + + // CSV maps blank columns to nulls (for nullable non-string columns), + // or to the default value (for non-nullable non-string columns.) + + builder.setConversionProperty(AbstractConvertFromString.BLANK_ACTION_PROP, + AbstractConvertFromString.BLANK_AS_NULL); + + // The text readers use required Varchar columns to represent null columns. + + builder.allowRequiredNullColumns(true); + + // Provide custom error context + builder.setContext( + new CustomErrorContext() { + @Override + public void addContext(UserException.Builder builder) { + builder.addContext("Format plugin:", PLUGIN_NAME); + builder.addContext("Plugin config name:", getName()); + builder.addContext("Extract headers:", + Boolean.toString(getConfig().isHeaderExtractionEnabled())); + builder.addContext("Skip first line:", + Boolean.toString(getConfig().isSkipFirstLine())); + } + }); + + return builder; + } + @Override public RecordWriter getRecordWriter(final FragmentContext context, final EasyWriter writer) throws IOException { final Map options = new HashMap<>(); @@ -354,21 +333,6 @@ public RecordWriter getRecordWriter(final FragmentContext context, final EasyWri return recordWriter; } - @Override - public boolean supportsStatistics() { - return false; - } - - @Override - public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { - throw new UnsupportedOperationException("unimplemented"); - } - - @Override - public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) { - throw new UnsupportedOperationException("unimplemented"); - } - @Override protected ScanStats getScanStats(final PlannerSettings settings, final EasyGroupScan scan) { long data = 0; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java index 645af3018af..7abbf3d0fe4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java @@ -926,7 +926,7 @@ public void testColumnsIndexV3() throws IOException { assertTrue(e.getMessage().contains("Format plugin: text")); assertTrue(e.getMessage().contains("Plugin config name: csv")); assertTrue(e.getMessage().contains("Extract headers: true")); - assertTrue(e.getMessage().contains("Skip headers: false")); + assertTrue(e.getMessage().contains("Skip first line: false")); } catch (Exception e) { fail(); } finally { @@ -974,7 +974,7 @@ public void 
testColumnsIndexMissingV3() throws IOException { assertTrue(e.getMessage().contains("Format plugin: text")); assertTrue(e.getMessage().contains("Plugin config name: csv")); assertTrue(e.getMessage().contains("Extract headers: true")); - assertTrue(e.getMessage().contains("Skip headers: false")); + assertTrue(e.getMessage().contains("Skip first line: false")); } catch (Exception e) { fail(); } finally {
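[Editor's note] The error-context strings asserted in these tests come from the plugin-level CustomErrorContext installed in frameworkBuilder(). The per-reader counterparts are the new SchemaNegotiator methods, userName() and setErrorContext(). A rough sketch of how a batch reader might use them; LogBatchReader is an illustrative name, not part of this patch.

import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.rowSet.ResultSetLoader;

// Illustrative reader: picks up the query user and supplies reader-level
// error context through the negotiator additions in this patch.
public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {

  private ResultSetLoader loader;

  @Override
  public boolean open(FileSchemaNegotiator negotiator) {
    final String user = negotiator.userName();   // new in this patch
    negotiator.setErrorContext(new CustomErrorContext() {
      @Override
      public void addContext(UserException.Builder builder) {
        builder.addContext("Query user:", user);
      }
    });
    loader = negotiator.build();
    return true;
  }

  @Override
  public boolean next() {
    return false;   // a real reader would write rows through the loader
  }

  @Override
  public void close() { }
}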