Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 #2485

Merged
merged 4 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,36 +41,29 @@
import java.io.InputStream;
import java.io.InputStreamReader;

public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator> {
public class HttpdLogBatchReader implements ManagedReader {

private static final Logger logger = LoggerFactory.getLogger(HttpdLogBatchReader.class);
public static final String RAW_LINE_COL_NAME = "_raw";
public static final String MATCHED_COL_NAME = "_matched";
private final HttpdLogFormatConfig formatConfig;
private final int maxRecords;
private final EasySubScan scan;
private HttpdParser parser;
private FileSplit split;
private final HttpdParser parser;
private final FileDescrip file;
private InputStream fsStream;
private RowSetLoader rowWriter;
private final RowSetLoader rowWriter;
private BufferedReader reader;
private int lineNumber;
private CustomErrorContext errorContext;
private ScalarWriter rawLineWriter;
private ScalarWriter matchedWriter;
private final CustomErrorContext errorContext;
private final ScalarWriter rawLineWriter;
private final ScalarWriter matchedWriter;
private int errorCount;


public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, int maxRecords, EasySubScan scan) {
public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
this.formatConfig = formatConfig;
this.maxRecords = maxRecords;
this.scan = scan;
}

@Override
public boolean open(FileSchemaNegotiator negotiator) {
// Open the input stream to the log file
openFile(negotiator);
file = negotiator.file();
openFile();
errorContext = negotiator.parentErrorContext();
try {
parser = new HttpdParser(
Expand All @@ -92,7 +86,6 @@ public boolean open(FileSchemaNegotiator negotiator) {
parser.addFieldsToParser(rowWriter);
rawLineWriter = addImplicitColumn(RAW_LINE_COL_NAME, MinorType.VARCHAR);
matchedWriter = addImplicitColumn(MATCHED_COL_NAME, MinorType.BIT);
return true;
}

@Override
Expand All @@ -108,11 +101,6 @@ public boolean next() {
private boolean nextLine(RowSetLoader rowWriter) {
String line;

// Check if the limit has been reached
if (rowWriter.limitReached(maxRecords)) {
return false;
}

try {
line = reader.readLine();
if (line == null) {
Expand Down Expand Up @@ -164,19 +152,19 @@ public void close() {
try {
fsStream.close();
} catch (IOException e) {
logger.warn("Error when closing HTTPD file: {} {}", split.getPath().toString(), e.getMessage());
logger.warn("Error when closing HTTPD file: {} {}", file.split().getPath().toString(), e.getMessage());
}
fsStream = null;
}

private void openFile(FileSchemaNegotiator negotiator) {
split = negotiator.split();
private void openFile() {
Path path = file.split().getPath();
try {
fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
fsStream = file.fileSystem().openPossiblyCompressedStream(path);
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open input file: %s", split.getPath().toString())
.message("Failed to open input file: %s", path.toString())
.addContext(e.getMessage())
.build(logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
Expand All @@ -41,18 +41,16 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
private static class HttpLogReaderFactory extends FileReaderFactory {

private final HttpdLogFormatConfig config;
private final int maxRecords;
private final EasySubScan scan;

private HttpLogReaderFactory(HttpdLogFormatConfig config, int maxRecords, EasySubScan scan) {
private HttpLogReaderFactory(HttpdLogFormatConfig config, EasySubScan scan) {
this.config = config;
this.maxRecords = maxRecords;
this.scan = scan;
}

@Override
public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
return new HttpdLogBatchReader(config, maxRecords, scan);
public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
return new HttpdLogBatchReader(config, scan, negotiator);
}
}

Expand All @@ -76,24 +74,15 @@ private static EasyFormatConfig easyConfig(Configuration fsConf, HttpdLogFormatC
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
.readerOperatorType(OPERATOR_TYPE)
.scanVersion(ScanFrameworkVersion.EVF_V1)
.scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}

@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) {
return new HttpdLogBatchReader(formatConfig, scan.getMaxRecords(), scan);
}

@Override
protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
builder.setReaderFactory(new HttpLogReaderFactory(formatConfig, scan.getMaxRecords(), scan));

initScanBuilder(builder, scan);
protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
builder.readerFactory(new HttpLogReaderFactory(formatConfig, scan));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ public TupleMetadata setupParser()

dummy.addParseTarget(String.class.getMethod("indexOf", String.class), allParserPaths);

/*
If the column is not requested explicitly, remove it from the requested path list.
*/
// If the column is not requested explicitly, remove it from the requested path list.
if (!isStarQuery() &&
!isMetadataQuery() &&
!isOnlyImplicitColumns()) {
Expand Down