DRILL-8171: Convert SequenceFile to EVF2
luocooong committed Mar 19, 2022
1 parent 7ef203b commit cf6ccd9
Showing 2 changed files with 60 additions and 98 deletions.
SequenceFileBatchReader.java
@@ -21,12 +21,14 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -36,7 +38,6 @@
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -46,31 +47,47 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SequenceFileBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
 
-  private final SequenceFileFormatConfig config;
-  private final EasySubScan scan;
-  private FileSplit split;
-  private String queryUserName;
-  private String opUserName;
+  private final FileDescrip file;
+  private final String opUserName;
+  private final String queryUserName;
   public static final String KEY_SCHEMA = "binary_key";
   public static final String VALUE_SCHEMA = "binary_value";
   private final BytesWritable key = new BytesWritable();
   private final BytesWritable value = new BytesWritable();
-  private final int maxRecords;
-  private RowSetLoader loader;
-  private ScalarWriter keyWriter;
-  private ScalarWriter valueWriter;
+  private final RowSetLoader loader;
+  private final ScalarWriter keyWriter;
+  private final ScalarWriter valueWriter;
   private RecordReader<BytesWritable, BytesWritable> reader;
-  private CustomErrorContext errorContext;
-  private Stopwatch watch;
+  private final CustomErrorContext errorContext;
+  private final Stopwatch watch;
 
-  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
-    this.config = config;
-    this.scan = scan;
-    this.maxRecords = scan.getMaxRecords();
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    opUserName = scan.getUserName();
+    queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
+
+    negotiator.tableSchema(defineMetadata(), true);
+    logger.trace("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failure in initial sequencefile reader")
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    ResultSetLoader setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(KEY_SCHEMA);
+    valueWriter = loader.scalar(VALUE_SCHEMA);
+    watch = Stopwatch.createStarted();
+  }
 
   private TupleMetadata defineMetadata() {
Expand All @@ -82,12 +99,7 @@ private TupleMetadata defineMetadata() {

   private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
     final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
-    split = negotiator.split();
-    // After defined the split, We should also define the errorContext.
-    errorContext = negotiator.parentErrorContext();
-    opUserName = scan.getUserName();
-    queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
-    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    final JobConf jobConf = new JobConf(file.fileSystem().getConf());
     jobConf.setInputFormat(inputFormat.getClass());
     reader = getRecordReader(inputFormat, jobConf);
   }
@@ -100,87 +112,48 @@ private RecordReader<BytesWritable, BytesWritable> getRecordReader(
       return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
         @Override
         public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
-          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+          return inputFormat.getRecordReader(file.split(), jobConf, Reporter.NULL);
         }
       });
     } catch (IOException | InterruptedException e) {
       throw UserException
         .dataReadError(e)
-        .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d. "
-          + e.getMessage(), split.getPath(), split.getStart(), split.getLength())
+        .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          file.split().getPath(), file.split().getStart(), file.split().getLength())
+        .addContext(e.getMessage())
         .addContext(errorContext)
         .build(logger);
     }
   }
 
   @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    negotiator.tableSchema(defineMetadata(), true);
-    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
-    // open Sequencefile
-    try {
-      processReader(negotiator);
-    } catch (ExecutionSetupException e) {
-      throw UserException
-        .dataReadError(e)
-        .message("Failure in initial sequencefile reader. " + e.getMessage())
-        .addContext(errorContext)
-        .build(logger);
-    }
-    ResultSetLoader setLoader = negotiator.build();
-    loader = setLoader.writer();
-    keyWriter = loader.scalar(KEY_SCHEMA);
-    valueWriter = loader.scalar(VALUE_SCHEMA);
-    return true;
-  }
-
-  @Override
   public boolean next() {
     int recordCount = 0;
-    if (watch == null) {
-      watch = Stopwatch.createStarted();
-    }
     try {
       while (!loader.isFull()) {
         if (reader.next(key, value)) {
           loader.start();
           keyWriter.setBytes(key.getBytes(), key.getLength());
           valueWriter.setBytes(value.getBytes(), value.getLength());
           loader.save();
           ++ recordCount;
         } else {
           logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
           return false;
         }
-        if (loader.limitReached(maxRecords)) {
-          logger.debug("Reader fetch {} records in {} ms", loader.rowCount(), watch.elapsed(TimeUnit.MILLISECONDS));
-          watch.stop();
-          return false;
-        }
       }
-      return true;
     } catch (IOException e) {
       throw UserException
         .dataReadError(e)
-        .message("An error occurred while reading the next key/value pair from the sequencefile reader. "
-          + e.getMessage())
+        .message("An error occurred while reading the next key/value pair")
+        .addContext(e.getMessage())
        .addContext(errorContext)
        .build(logger);
     }
+    return true;
   }
 
   @Override
   public void close() {
-    try {
-      // The reader not support AutoCloseable, must be closed by invoke close().
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (IOException e) {
-      throw UserException
-        .dataReadError(e)
-        .message("Failed closing sequencefile reader. " + e.getMessage())
-        .addContext(errorContext)
-        .build(logger);
-    }
+    AutoCloseables.closeSilently(reader);
   }
 }
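
The converted reader is easier to study outside a diff. Below is a minimal sketch of the same EVF2 pattern, not part of this commit: a hypothetical reader that returns each line of a text file as a VARCHAR row. The LineBatchReader name and the "line" column are invented for illustration, and DrillFileSystem.openPossiblyCompressedStream() is assumed to be the right open call; everything else (constructor-time negotiation, next(), close()) mirrors the v3 APIs exercised above.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.drill.common.AutoCloseables;
    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;
    import org.apache.drill.exec.vector.accessor.ScalarWriter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LineBatchReader implements ManagedReader {

      private static final Logger logger = LoggerFactory.getLogger(LineBatchReader.class);

      // All setup happens in the constructor, so the writers can be final.
      private final RowSetLoader loader;
      private final ScalarWriter lineWriter;
      private BufferedReader in;

      public LineBatchReader(FileSchemaNegotiator negotiator) {
        FileDescrip file = negotiator.file();
        // Declare a fixed schema up front, as the sequencefile reader does.
        TupleMetadata schema = new SchemaBuilder()
          .addNullable("line", MinorType.VARCHAR)
          .buildSchema();
        negotiator.tableSchema(schema, true);
        loader = negotiator.build().writer();
        lineWriter = loader.scalar("line");
        try {
          // Assumed helper on DrillFileSystem; adjust to the actual open call.
          in = new BufferedReader(new InputStreamReader(
            file.fileSystem().openPossiblyCompressedStream(file.split().getPath()),
            StandardCharsets.UTF_8));
        } catch (IOException e) {
          throw UserException
            .dataReadError(e)
            .message("Failed to open %s", file.split().getPath())
            .addContext(negotiator.parentErrorContext())
            .build(logger);
        }
      }

      @Override
      public boolean next() {
        try {
          // Fill the batch until EVF2 says it is full; the framework enforces
          // any LIMIT itself, so no maxRecords bookkeeping is needed here.
          while (!loader.isFull()) {
            String line = in.readLine();
            if (line == null) {
              return false;    // EOF: no more batches after this one
            }
            loader.start();
            lineWriter.setString(line);
            loader.save();
          }
          return true;         // batch full; EVF2 will call next() again
        } catch (IOException e) {
          throw UserException
            .dataReadError(e)
            .addContext("Failed reading line-oriented file")
            .build(logger);
        }
      }

      @Override
      public void close() {
        AutoCloseables.closeSilently(in);
      }
    }

Note what the constructor absorbs: in EVF1 this work lived in open(), so fields such as loader could not be final; in EVF2 the negotiator arrives with the constructor, and the framework enforces any LIMIT on its own, which is presumably why the maxRecords field and the loader.limitReached(maxRecords) check disappear from next() in this commit.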
SequenceFileFormatPlugin.java
@@ -17,16 +17,15 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -52,7 +51,7 @@ private static EasyFormatConfig easyConfig(Configuration fsConf, SequenceFileFormatConfig pluginConfig) {
       .extensions(pluginConfig.getExtensions())
       .fsConf(fsConf)
       .readerOperatorType(OPERATOR_TYPE)
-      .scanVersion(ScanFrameworkVersion.EVF_V1)
+      .scanVersion(ScanFrameworkVersion.EVF_V2)
       .supportsLimitPushdown(true)
       .supportsProjectPushdown(true)
       .defaultName(SequenceFileFormatConfig.NAME)
@@ -70,24 +69,14 @@ public SequenceFileReaderFactory(SequenceFileFormatConfig config, EasySubScan scan) {
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SequenceFileBatchReader(config, scan);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new SequenceFileBatchReader(config, scan, negotiator);
     }
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
-      throws ExecutionSetupException {
-    return new SequenceFileBatchReader(formatConfig, scan);
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SequenceFileReaderFactory(formatConfig, scan));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new SequenceFileReaderFactory(formatConfig, scan));
   }
 }
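
The plugin-side wiring is correspondingly small. As a companion sketch (again illustrative, not from this commit), a factory for the hypothetical LineBatchReader above would look like this; the diff's SequenceFileReaderFactory follows the same shape, just carrying the format config and sub-scan through to the reader constructor.

    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;

    public class LineReaderFactory extends FileReaderFactory {

      // EVF2 calls this once per file, handing over the negotiator that the
      // reader consumes in its constructor; there is no separate open() step.
      @Override
      public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
        return new LineBatchReader(negotiator);
      }
    }

With the factory registered through builder.readerFactory() in configureScan(), the EVF1 trio of frameworkBuilder(), initScanBuilder() and newBatchReader() is no longer needed, and .scanVersion(ScanFrameworkVersion.EVF_V2) in easyConfig() tells EasyFormatPlugin to drive the v3 scan lifecycle. builder.nullType(Types.optional(MinorType.VARCHAR)) keeps the old default type for projected columns the reader cannot supply.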
