From e2a234776817933467400116ef2987db965ce479 Mon Sep 17 00:00:00 2001
From: Paul Rogers
Date: Thu, 15 Jun 2017 22:46:56 -0700
Subject: [PATCH] DRILL-5590: Bugs in CSV field matching, null columns

Please see the problem and solution descriptions in DRILL-5590.

Also cleaned up some dead code left over from DRILL-5498.
---
 .../compliant/CompliantTextRecordReader.java  | 73 +------------------
 .../text/compliant/FieldVarCharOutput.java    | 73 +++++++++++--------
 .../text/compliant/RepeatedVarCharOutput.java |  5 +-
 .../store/easy/text/compliant/TestCsv.java    | 22 ++++++
 4 files changed, 68 insertions(+), 105 deletions(-)
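The heart of the field-matching fix is in FieldVarCharOutput below: selected column names are now resolved against the file's header row through a case-insensitive map instead of a case-sensitive indexOf() lookup. A minimal standalone sketch of the idea, using plain java.util as a stand-in for Drill's CaseInsensitiveMap (class and method names here are illustrative, not the patch's own):

    import java.util.Map;
    import java.util.TreeMap;

    class HeaderMatcher {
      // Returns the position of a selected column within the CSV header row,
      // matching case-insensitively, or -1 if the file has no such column.
      static int columnIndex(String[] fieldNames, String selected) {
        Map<String, Integer> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 0; i < fieldNames.length; i++) {
          headers.put(fieldNames[i], i);
        }
        Integer index = headers.get(selected);
        return index == null ? -1 : index;  // -1: reader fills the column with empty values
      }
    }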
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 4a35c3bebb8..700958486e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -17,15 +17,9 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import com.univocity.parsers.common.TextParsingException;
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-import java.util.Map;
 
 import javax.annotation.Nullable;
 
@@ -36,16 +30,16 @@
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.CallBack;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.mapred.FileSplit;
 
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import org.apache.drill.exec.expr.TypeHelper;
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
 
 // New text reader, complies with the RFC 4180 standard for text/csv files
 public class CompliantTextRecordReader extends AbstractRecordReader {
@@ -255,63 +249,4 @@ public void close() {
       logger.warn("Exception while closing stream.", e);
     }
   }
-
-  /**
-   * TextRecordReader during its first phase read to extract header should pass its own
-   * OutputMutator to avoid reshaping query output.
-   * This class provides OutputMutator for header extraction.
-   */
-  private class HeaderOutputMutator implements OutputMutator {
-    private final Map<String, ValueVector> fieldVectorMap = Maps.newHashMap();
-
-    @SuppressWarnings("resource")
-    @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      ValueVector v = fieldVectorMap.get(field);
-      if (v == null || v.getClass() != clazz) {
-        // Field does not exist add it to the map
-        v = TypeHelper.getNewVector(field, oContext.getAllocator());
-        if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format(
-              "Class %s was provided, expected %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
-        }
-        fieldVectorMap.put(field.getPath(), v);
-      }
-      return clazz.cast(v);
-    }
-
-    @Override
-    public void allocate(int recordCount) {
-      //do nothing for now
-    }
-
-    @Override
-    public boolean isNewSchema() {
-      return false;
-    }
-
-    @Override
-    public DrillBuf getManagedBuffer() {
-      return null;
-    }
-
-    @Override
-    public CallBack getCallBack() {
-      return null;
-    }
-
-    /**
-     * Since this OutputMutator is passed by TextRecordReader to get the header out
-     * the mutator might not get cleaned up elsewhere. TextRecordReader will call
-     * this method to clear any allocations
-     */
-    public void close() {
-      for (final ValueVector v : fieldVectorMap.values()) {
-        v.clear();
-      }
-      fieldVectorMap.clear();
-    }
-
-  }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
index 494c593caa9..b8343d1ebc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/FieldVarCharOutput.java
@@ -17,8 +17,16 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -26,12 +34,6 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.VarCharVector;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * Class is responsible for generating record batches for text file inputs. We generate
  * a record batch with a set of varchar vectors. A varchar vector contains all the field
@@ -61,8 +63,9 @@ class FieldVarCharOutput extends TextOutput {
   private boolean rowHasData= false;
   private static final int MAX_FIELD_LENGTH = 1024 * 64;
   private int recordCount = 0;
-  private int batchIndex = 0;
   private int maxField = 0;
+  private int[] nullCols;
+  private byte nullValue[] = new byte[0];
 
   /**
    * We initialize and add the varchar vector for each incoming field in this
@@ -77,6 +80,7 @@ public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Col
 
     int totalFields = fieldNames.length;
     List<String> outputColumns = new ArrayList<>(Arrays.asList(fieldNames));
+    List<Integer> nullColumns = new ArrayList<>();
 
     if (isStarQuery) {
       maxField = totalFields - 1;
@@ -84,11 +88,14 @@ public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Col
       Arrays.fill(selectedFields, true);
     } else {
       List<Integer> columnIds = new ArrayList<Integer>();
-      String pathStr;
-      int index;
+      Map<String, Integer> headers = CaseInsensitiveMap.newHashMap();
+      for (int i = 0; i < fieldNames.length; i++) {
+        headers.put(fieldNames[i], i);
+      }
 
       for (SchemaPath path : columns) {
-        pathStr = path.getRootSegment().getPath();
+        int index;
+        String pathStr = path.getRootSegment().getPath();
         if (pathStr.equals(COL_NAME) && path.getRootSegment().getChild() != null) {
           //TODO: support both field names and columns index along with predicate pushdown
           throw UserException
@@ -98,12 +105,15 @@ public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Col
               .addContext("column index", path.getRootSegment().getChild())
               .build(logger);
         } else {
-          index = outputColumns.indexOf(pathStr);
-          if (index < 0) {
+          Integer value = headers.get(pathStr);
+          if (value == null) {
             // found col that is not a part of fieldNames, add it
             // this col might be part of some another scanner
             index = totalFields++;
             outputColumns.add(pathStr);
+            nullColumns.add(index);
+          } else {
+            index = value;
           }
         }
         columnIds.add(index);
@@ -128,6 +138,12 @@ public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Col
 
     this.fieldBytes = new byte[MAX_FIELD_LENGTH];
 
+    // Keep track of the null columns to be filled in.
+
+    nullCols = new int[nullColumns.size()];
+    for (int i = 0; i < nullCols.length; i++) {
+      nullCols[i] = nullColumns.get(i);
+    }
   }
 
   /**
@@ -135,11 +151,10 @@ public FieldVarCharOutput(OutputMutator outputMutator, String [] fieldNames, Col
    */
   @Override
   public void startBatch() {
-    this.recordCount = 0;
-    this.batchIndex = 0;
-    this.currentFieldIndex= -1;
-    this.collect = true;
-    this.fieldOpen = false;
+    recordCount = 0;
+    currentFieldIndex= -1;
+    collect = true;
+    fieldOpen = false;
   }
 
   @Override
@@ -173,7 +188,7 @@ public void append(byte data) {
   public boolean endField() {
     fieldOpen = false;
 
-    if(collect) {
+    if (collect) {
       assert currentVector != null;
       currentVector.getMutator().setSafe(recordCount, fieldBytes, 0, currentDataPointer);
     }
@@ -192,25 +207,20 @@ public boolean endEmptyField() {
 
   @Override
   public void finishRecord() {
-    if(fieldOpen){
+    if (fieldOpen){
       endField();
     }
 
+    // Fill in null (really empty) values.
+
+    for (int i = 0; i < nullCols.length; i++) {
+      vectors[nullCols[i]].getMutator().setSafe(recordCount, nullValue, 0, 0);
+    }
     recordCount++;
   }
 
-  // Sets the record count in this batch within the value vector
   @Override
-  public void finishBatch() {
-    batchIndex++;
-
-    for (int i = 0; i <= maxField; i++) {
-      if (this.vectors[i] != null) {
-        this.vectors[i].getMutator().setValueCount(batchIndex);
-      }
-    }
-
-  }
+  public void finishBatch() { }
 
   @Override
   public long getRecordCount() {
@@ -221,5 +231,4 @@ public long getRecordCount() {
   public boolean rowHasData() {
     return this.rowHasData;
   }
-
-  }
+}
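The second half of the fix: a selected column that is absent from the file is recorded in nullCols, and finishRecord() writes a zero-length value into that column's vector for every record, so it reads back as an empty VARCHAR rather than leaving the vector short. A rough sketch of the invariant, with plain arrays standing in for Drill's VarCharVectors (illustrative only, not the vector API):

    // After each record, every "missing" column gets a zero-length value so all
    // output columns stay in step with recordCount.
    static void fillNullColumns(byte[][] row, int[] nullCols) {
      byte[] nullValue = new byte[0];  // reads back as "", not as SQL NULL
      for (int col : nullCols) {
        // mirrors vectors[col].getMutator().setSafe(recordCount, nullValue, 0, 0)
        row[col] = nullValue;
      }
    }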
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index eda2feb611f..156d6c2c39c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -345,9 +345,6 @@ public void finishRecord() {
     return out;
   }
 
-  // Sets the record count in this batch within the value vector
   @Override
-  public void finishBatch() {
-    mutator.setValueCount(batchIndex);
-  }
+  public void finishBatch() { }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
index 7d38cf96506..c18adc914a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsv.java
@@ -135,6 +135,28 @@ public void testInvalidCsvHeaders() throws IOException {
       .verifyAndClear(actual);
   }
 
+  // Test fix for DRILL-5590
+  @Test
+  public void testCsvHeadersCaseInsensitive() throws IOException {
+    String fileName = "case2.csv";
+    buildFile(fileName, validHeaders);
+    String sql = "SELECT A, b, C FROM `dfs.data`.`" + fileName + "`";
+    RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+    BatchSchema expectedSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .build();
+    assertEquals(expectedSchema, actual.batchSchema());
+
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+        .add("10", "foo", "bar")
+        .build();
+    new RowSetComparison(expected)
+        .verifyAndClear(actual);
+  }
+
   private String makeStatement(String fileName) {
     return "SELECT * FROM `dfs.data`.`" + fileName + "`";
   }
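The new test exercises the case-insensitive half of the fix. The null-column half could be covered in the same style; a hypothetical companion test following the pattern of testCsvHeadersCaseInsensitive (the fixture name, the selected column "d", and the expected empty-string value are assumptions, not part of this patch):

    // Hypothetical: a selected column absent from the file should surface as an
    // empty VARCHAR rather than being dropped or failing the query.
    @Test
    public void testMissingColumnIsEmpty() throws IOException {
      String fileName = "case3.csv";  // assumed fixture name
      buildFile(fileName, validHeaders);
      String sql = "SELECT a, d FROM `dfs.data`.`" + fileName + "`";
      RowSet actual = client.queryBuilder().sql(sql).rowSet();

      BatchSchema expectedSchema = new SchemaBuilder()
          .add("a", MinorType.VARCHAR)
          .add("d", MinorType.VARCHAR)
          .build();
      assertEquals(expectedSchema, actual.batchSchema());

      RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
          .add("10", "")  // "d" is filled with a zero-length value per the fix
          .build();
      new RowSetComparison(expected)
          .verifyAndClear(actual);
    }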