diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 9458db26e8c..ba105920b80 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -31,9 +31,9 @@
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -75,7 +75,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
   private boolean rowKeyOnly;
 
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
-      List projectedColumns, FragmentContext context) throws OutOfMemoryException {
+      List projectedColumns, FragmentContext context) {
     hbaseConf = conf;
     hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
@@ -169,15 +169,16 @@ public int next() {
     done:
     for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
       Result result = null;
+      final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
       try {
-        if (operatorContext != null) {
-          operatorContext.getStats().startWait();
+        if (operatorStats != null) {
+          operatorStats.startWait();
         }
         try {
           result = resultScanner.next();
         } finally {
-          if (operatorContext != null) {
-            operatorContext.getStats().stopWait();
+          if (operatorStats != null) {
+            operatorStats.stopWait();
           }
         }
       } catch (IOException e) {
@@ -193,20 +194,20 @@ public int next() {
         rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
       }
       if (!rowKeyOnly) {
-        for (Cell cell : cells) {
-          int familyOffset = cell.getFamilyOffset();
-          int familyLength = cell.getFamilyLength();
-          byte[] familyArray = cell.getFamilyArray();
-          MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
+        for (final Cell cell : cells) {
+          final int familyOffset = cell.getFamilyOffset();
+          final int familyLength = cell.getFamilyLength();
+          final byte[] familyArray = cell.getFamilyArray();
+          final MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
 
-          int qualifierOffset = cell.getQualifierOffset();
-          int qualifierLength = cell.getQualifierLength();
-          byte[] qualifierArray = cell.getQualifierArray();
-          NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
+          final int qualifierOffset = cell.getQualifierOffset();
+          final int qualifierLength = cell.getQualifierLength();
+          final byte[] qualifierArray = cell.getQualifierArray();
+          final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
 
-          int valueOffset = cell.getValueOffset();
-          int valueLength = cell.getValueLength();
-          byte[] valueArray = cell.getValueArray();
+          final int valueOffset = cell.getValueOffset();
+          final int valueLength = cell.getValueLength();
+          final byte[] valueArray = cell.getValueArray();
           v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
         }
       }
@@ -246,7 +247,7 @@ private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qua
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (resultScanner != null) {
         resultScanner.close();
@@ -267,5 +268,4 @@ private void setOutputRowCount(int count) {
       rowKeyVector.getMutator().setValueCount(count);
     }
   }
-
 }
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 9e87ec6e854..dc1bae322ad 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -133,7 +133,7 @@ private void init() throws ExecutionSetupException {
     String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
     try {
       format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
-      Class c = Class.forName(sLib);
+      Class c = Class.forName(sLib);
       serde = (SerDe) c.getConstructor().newInstance();
       serde.initialize(job, properties);
     } catch (ReflectiveOperationException | SerDeException e) {
@@ -286,7 +286,6 @@ public int next() {
   }
 
   private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    boolean success;
     for (int i = 0; i < selectedColumnNames.size(); i++) {
       String columnName = selectedColumnNames.get(i);
       Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
@@ -311,7 +310,7 @@ private void setValueCountAndPopulatePartitionVectors(int recordCount) {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
index 69c45c2f474..463ae672cad 100755
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java
@@ -229,7 +229,7 @@ public int next() {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     AutoCloseables.close(resultSet, logger);
     AutoCloseables.close(statement, logger);
     AutoCloseables.close(connection, logger);
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index c8b06990728..dd5fcdf3fb6 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -52,7 +52,7 @@
 import com.mongodb.client.MongoDatabase;
 
 public class MongoRecordReader extends AbstractRecordReader {
-  static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
 
   private MongoCollection collection;
   private MongoCursor cursor;
@@ -187,7 +187,7 @@ public int next() {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
 
 }
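An aside on the HBaseRecordReader hunk above: the reader now looks up the OperatorStats handle once per batch instead of calling operatorContext.getStats() around every blocking scanner call, and the wait timer is still stopped in a finally block so a failed read cannot leave it running. A condensed restatement of that pattern follows; blockingCall() is a placeholder for the real resultScanner.next(), not a Drill API.

    // Sketch only: cache the stats handle, then bracket the blocking call.
    final OperatorStats operatorStats =
        operatorContext == null ? null : operatorContext.getStats();
    if (operatorStats != null) {
      operatorStats.startWait();        // start the wait-time clock
    }
    try {
      result = blockingCall();          // e.g. resultScanner.next() in the real reader
    } finally {
      if (operatorStats != null) {
        operatorStats.stopWait();       // always stop it, even when the call throws
      }
    }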
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 873ae76b536..1ac4f7be696 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -92,47 +92,49 @@ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Operat
     if (!readers.hasNext()) {
       throw new ExecutionSetupException("A scan batch must contain at least one reader.");
     }
-    this.currentReader = readers.next();
+    currentReader = readers.next();
     this.oContext = oContext;
 
     boolean setup = false;
     try {
       oContext.getStats().startProcessing();
-      this.currentReader.setup(oContext, mutator);
+      currentReader.setup(oContext, mutator);
       setup = true;
     } finally {
       // if we had an exception during setup, make sure to release existing data.
       if (!setup) {
-        currentReader.cleanup();
+        try {
+          currentReader.close();
+        } catch(final Exception e) {
+          throw new ExecutionSetupException(e);
+        }
       }
       oContext.getStats().stopProcessing();
     }
     this.partitionColumns = partitionColumns.iterator();
-    this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
+    partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
     this.selectedPartitionColumns = selectedPartitionColumns;
 
     // TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
     // options; so labelValue = null.
     final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
-    this.partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
+    partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
 
     addPartitionVectors();
   }
 
-  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator readers) throws ExecutionSetupException {
+  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator readers)
+      throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
        readers, Collections. emptyList(), Collections. emptyList());
   }
 
+  @Override
   public FragmentContext getContext() {
     return context;
   }
 
-  public OperatorContext getOperatorContext() {
-    return oContext;
-  }
-
   @Override
   public BatchSchema getSchema() {
     return schema;
@@ -156,6 +158,12 @@ private void releaseAssets() {
     container.zeroVectors();
   }
 
+  private void clearFieldVectorMap() {
+    for (final ValueVector v : fieldVectorMap.values()) {
+      v.clear();
+    }
+  }
+
   @Override
   public IterOutcome next() {
     if (done) {
@@ -169,15 +177,13 @@
       currentReader.allocate(fieldVectorMap);
     } catch (OutOfMemoryException | OutOfMemoryRuntimeException e) {
       logger.debug("Caught Out of Memory Exception", e);
-      for (ValueVector v : fieldVectorMap.values()) {
-        v.clear();
-      }
+      clearFieldVectorMap();
      return IterOutcome.OUT_OF_MEMORY;
     }
     while ((recordCount = currentReader.next()) == 0) {
       try {
         if (!readers.hasNext()) {
-          currentReader.cleanup();
+          currentReader.close();
           releaseAssets();
           done = true;
           if (mutator.isNewSchema()) {
@@ -196,7 +202,7 @@ public IterOutcome next() {
            fieldVectorMap.clear();
          }
 
-          currentReader.cleanup();
+          currentReader.close();
          currentReader = readers.next();
          partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
          currentReader.setup(oContext, mutator);
@@ -204,9 +210,7 @@
           currentReader.allocate(fieldVectorMap);
         } catch (OutOfMemoryException e) {
           logger.debug("Caught OutOfMemoryException");
-          for (ValueVector v : fieldVectorMap.values()) {
-            v.clear();
-          }
+          clearFieldVectorMap();
           return IterOutcome.OUT_OF_MEMORY;
         }
         addPartitionVectors();
@@ -249,7 +253,7 @@ public IterOutcome next() {
     }
   }
 
-  private void addPartitionVectors() throws ExecutionSetupException{
+  private void addPartitionVectors() throws ExecutionSetupException {
     try {
       if (partitionVectors != null) {
         for (ValueVector v : partitionVectors) {
@@ -258,8 +262,10 @@ private void addPartitionVectors() throws ExecutionSetupException{
       }
       partitionVectors = Lists.newArrayList();
       for (int i : selectedPartitionColumns) {
-        MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
-        ValueVector v = mutator.addField(field, NullableVarCharVector.class);
+        final MaterializedField field =
+            MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
+                Types.optional(MinorType.VARCHAR));
+        final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
         partitionVectors.add(v);
       }
     } catch(SchemaChangeException e) {
@@ -269,12 +275,12 @@ private void addPartitionVectors() throws ExecutionSetupException{
 
   private void populatePartitionVectors() {
     for (int index = 0; index < selectedPartitionColumns.size(); index++) {
-      int i = selectedPartitionColumns.get(index);
-      NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
+      final int i = selectedPartitionColumns.get(index);
+      final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
       if (partitionValues.length > i) {
-        String val = partitionValues[i];
+        final String val = partitionValues[i];
         AllocationHelper.allocate(v, recordCount, val.length());
-        byte[] bytes = val.getBytes();
+        final byte[] bytes = val.getBytes();
         for (int j = 0; j < recordCount; j++) {
           v.getMutator().setSafe(j, bytes, 0, bytes.length);
         }
@@ -306,27 +312,24 @@ public VectorWrapper getValueAccessorById(Class clazz, int... ids) {
     return container.getValueAccessorById(clazz, ids);
   }
 
-
-
   private class Mutator implements OutputMutator {
+    private boolean schemaChange = true;
 
-    boolean schemaChange = true;
-
-    @SuppressWarnings("unchecked")
     @Override
     public T addField(MaterializedField field, Class clazz) throws SchemaChangeException {
       // Check if the field exists
       ValueVector v = fieldVectorMap.get(field.key());
-
       if (v == null || v.getClass() != clazz) { // Field does not exist add it to the map and the output container
         v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
         if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+          throw new SchemaChangeException(String.format(
+              "The class that was provided %s does not correspond to the expected vector type of %s.",
+              clazz.getSimpleName(), v.getClass().getSimpleName()));
         }
-        ValueVector old = fieldVectorMap.put(field.key(), v);
-        if(old != null){
+        final ValueVector old = fieldVectorMap.put(field.key(), v);
+        if (old != null) {
          old.clear();
          container.remove(old);
        }
@@ -336,12 +339,12 @@ public T addField(MaterializedField field, Class claz
         schemaChange = true;
       }
 
-      return (T) v;
+      return clazz.cast(v);
     }
 
     @Override
     public void allocate(int recordCount) {
-      for (ValueVector v : fieldVectorMap.values()) {
+      for (final ValueVector v : fieldVectorMap.values()) {
         AllocationHelper.allocate(v, recordCount, 50, 10);
       }
     }
@@ -378,18 +381,17 @@ public WritableBatch getWritableBatch() {
   }
 
   @Override
-  public void close() {
+  public void close() throws Exception {
     container.clear();
-    for (ValueVector v : partitionVectors) {
+    for (final ValueVector v : partitionVectors) {
      v.clear();
     }
     fieldVectorMap.clear();
-    currentReader.cleanup();
+    currentReader.close();
   }
 
   @Override
   public VectorContainer getOutgoingContainer() {
     throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
   }
-
 }
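A note on the Mutator.addField hunk above: replacing the unchecked cast `return (T) v;` with `return clazz.cast(v);` is what allows the `@SuppressWarnings("unchecked")` annotation to be dropped, because Class.cast performs a runtime-checked cast. The same idiom in isolation, with an invented class name purely for illustration (not Drill code):

    // Sketch only: Class.cast() gives a runtime-checked cast with no unchecked warning.
    import java.util.HashMap;
    import java.util.Map;

    class TypedRegistry {
      private final Map<String, Object> values = new HashMap<>();

      void put(String key, Object value) {
        values.put(key, value);
      }

      <T> T get(String key, Class<T> clazz) {
        final Object v = values.get(key);
        return clazz.cast(v);   // fails fast with ClassCastException if the type is wrong
      }
    }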
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 61ccac5f344..c2ab0d04e6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -26,30 +26,27 @@
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.vector.ValueVector;
 
-public interface RecordReader {
-
+public interface RecordReader extends AutoCloseable {
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
 
   /**
    * Configure the RecordReader with the provided schema and the record batch that should be written to.
    *
+   * @param context operator context for the reader
    * @param output
    *          The place where output for a particular scan should be written. The record reader is responsible for
    *          mutating the set of schema values for that particular record.
    * @throws ExecutionSetupException
    */
-  public abstract void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
+  void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
 
-  public abstract void allocate(Map vectorMap) throws OutOfMemoryException;
+  void allocate(Map vectorMap) throws OutOfMemoryException;
 
   /**
    * Increment record reader forward, writing into the provided output batch.
    *
    * @return The number of additional records added to the output.
    */
-  public abstract int next();
-
-  public abstract void cleanup();
-
-}
\ No newline at end of file
+  int next();
+}
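This interface change is the core of the patch: cleanup() disappears and RecordReader inherits close() from AutoCloseable, so every reader's teardown is a checked-exception-throwing close(). One consequence, shown only as a sketch here, is that a reader could in principle be driven with try-with-resources; ScanBatch in this patch calls close() explicitly instead, wrapping the checked exception. The createReader() and consume() calls below are placeholders, not Drill APIs.

    // Sketch only: an AutoCloseable RecordReader under try-with-resources.
    try (RecordReader reader = createReader()) {     // hypothetical factory
      reader.setup(operatorContext, outputMutator);
      int records;
      while ((records = reader.next()) > 0) {
        consume(records);                            // placeholder downstream work
      }
    } // close() runs automatically here, even if setup() or next() throws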
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index a09cd536055..210ed9d37ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -351,7 +351,7 @@ private void ensure(final int length) {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     if (reader != null) {
       try {
         reader.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 8e78cf124fa..4d51199eb9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -57,7 +57,6 @@ public class JSONRecordReader extends AbstractRecordReader {
   private int recordCount;
   private long runningRecordCount = 0;
   private final FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
 
@@ -82,13 +81,14 @@ public JSONRecordReader(final FragmentContext fragmentContext, final String inpu
    * @param columns
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-      final List columns) throws OutOfMemoryException {
+  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
+      final DrillFileSystem fileSystem, final List columns) throws OutOfMemoryException {
     this(fragmentContext, null, embeddedContent, fileSystem, columns);
   }
 
-  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-      final List columns) throws OutOfMemoryException {
+  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
+      final JsonNode embeddedContent, final DrillFileSystem fileSystem,
+      final List columns) {
 
     Preconditions.checkArgument(
         (inputPath == null && embeddedContent != null) ||
@@ -96,9 +96,9 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp
         "One of inputPath or embeddedContent must be set but not both."
         );
 
-    if(inputPath != null){
+    if(inputPath != null) {
       this.hadoopPath = new Path(inputPath);
-    }else{
+    } else {
       this.embeddedContent = embeddedContent;
     }
 
@@ -113,7 +112,6 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp
 
   @Override
   public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
-    this.operatorContext = context;
     try{
       if (hadoopPath != null) {
         this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
@@ -131,7 +130,7 @@ public void setup(final OperatorContext context, final OutputMutator output) thr
     }
   }
 
-  private void setupParser() throws IOException{
+  private void setupParser() throws IOException {
     if(hadoopPath != null){
       jsonReader.setSource(stream);
     }else{
@@ -177,11 +176,11 @@ public int next() {
     ReadState write = null;
     // Stopwatch p = new Stopwatch().start();
     try{
-      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION){
+      outside: while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
 
-        if(write == ReadState.WRITE_SUCCEED){
+        if(write == ReadState.WRITE_SUCCEED) {
           // logger.debug("Wrote record.");
           recordCount++;
         }else{
@@ -198,7 +197,6 @@ public int next() {
       // System.out.println(String.format("Wrote %d records in %dms.", recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
 
       updateRunningCount();
-
       return recordCount;
 
     } catch (final Exception e) {
@@ -213,14 +211,9 @@ private void updateRunningCount() {
   }
 
   @Override
-  public void cleanup() {
-    try {
-      if(stream != null){
-        stream.close();
-      }
-    } catch (final IOException e) {
-      logger.warn("Failure while closing stream.", e);
+  public void close() throws Exception {
+    if(stream != null) {
+      stream.close();
     }
   }
-
 }
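Worth noting about the JSONRecordReader hunk above: close() now declares `throws Exception` and lets the stream's IOException propagate to the caller instead of logging and swallowing it, which is consistent with the AutoCloseable contract. A caller that prefers the old log-and-continue behaviour can still get it with the Drill helper used earlier in this patch by JdbcRecordReader; the snippet below is a sketch of the two options, with `stream` standing in for any Closeable resource.

    // Option 1 (what JSONRecordReader.close() now does): let the failure surface.
    @Override
    public void close() throws Exception {
      if (stream != null) {
        stream.close();                     // IOException propagates to the caller
      }
    }

    // Option 2 (as in JdbcRecordReader.close()): log and continue.
    AutoCloseables.close(stream, logger);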
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index ae11ba7abf3..ad65a9403ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -144,7 +144,7 @@ public int next() {
    * This would internally close the input stream we are reading from.
    */
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index fd97c484c2d..c742690147d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -24,7 +24,6 @@
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -38,23 +37,19 @@ import org.apache.drill.exec.vector.ValueVector;
 
 public class MockRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
 
-  private OutputMutator output;
-  private MockScanEntry config;
-  private FragmentContext context;
-  private BufferAllocator alcator;
+  private final MockScanEntry config;
+  private final FragmentContext context;
   private ValueVector[] valueVectors;
   private int recordsRead;
   private int batchRecordCount;
-  private FragmentContext fragmentContext;
   private OperatorContext operatorContext;
 
-  public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException {
+  public MockRecordReader(FragmentContext context, MockScanEntry config) {
     this.context = context;
     this.config = config;
-    this.fragmentContext=context;
   }
 
   private int getEstimatedRecordSize(MockColumn[] types) {
@@ -67,38 +62,26 @@ private int getEstimatedRecordSize(MockColumn[] types) {
 
   private MaterializedField getVector(String name, MajorType type, int length) {
     assert context != null : "Context shouldn't be null.";
-    MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
-
+    final MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
     return f;
-
-  }
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
   }
 
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
-      this.output = output;
-      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
+      final int estimateRowSize = getEstimatedRecordSize(config.getTypes());
       valueVectors = new ValueVector[config.getTypes().length];
       batchRecordCount = 250000 / estimateRowSize;
 
       for (int i = 0; i < config.getTypes().length; i++) {
-        MajorType type = config.getTypes()[i].getMajorType();
-        MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
-        Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        final MajorType type = config.getTypes()[i].getMajorType();
+        final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount);
+        final Class vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
         valueVectors[i] = output.addField(field, vvClass);
       }
     } catch (SchemaChangeException e) {
       throw new ExecutionSetupException("Failure while setting up fields", e);
     }
-
   }
 
   @Override
@@ -107,23 +90,20 @@ public int next() {
       return 0;
     }
 
-    int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
-
+    final int recordSetSize = Math.min(batchRecordCount, this.config.getRecords() - recordsRead);
     recordsRead += recordSetSize;
-    for (ValueVector v : valueVectors) {
-
-//      logger.debug(String.format("MockRecordReader: Generating %d records of random data for VV of type %s.", recordSetSize, v.getClass().getName()));
-      ValueVector.Mutator m = v.getMutator();
+    for (final ValueVector v : valueVectors) {
+      final ValueVector.Mutator m = v.getMutator();
       m.generateTestData(recordSetSize);
-
     }
+
     return recordSetSize;
   }
 
   @Override
   public void allocate(Map vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -132,7 +112,6 @@ public void allocate(Map vectorMap) throws OutOfMemoryExceptio
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index da6fbfbf57b..a4f5cac017b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -67,9 +67,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
 
   // TODO - should probably find a smarter way to set this, currently 1 megabyte
-  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
   public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
-  private static final String SEPERATOR = System.getProperty("file.separator");
 
   // used for clearing the last n bits of a byte
   public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
@@ -79,16 +77,16 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private int bitWidthAllFixedFields;
   private boolean allFieldsFixedLength;
   private int recordsPerBatch;
+  private OperatorContext operatorContext;
 
 //  private long totalRecords;
 //  private long rowGroupOffset;
 
-  private List columnStatuses;
+  private List> columnStatuses;
   private FileSystem fileSystem;
   private long batchSize;
   Path hadoopPath;
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
-  private OperatorContext operatorContext;
   // This is a parallel list to the columns list above, it is used to determine the subset of the project
   // pushdown columns that do not appear in this file
   private boolean[] columnsFound;
@@ -160,14 +158,6 @@ public long getBatchSize() {
     return batchSize;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   /**
    * @param type a fixed length type from the parquet library enum
    * @return the length in pageDataByteArray of the type
@@ -205,9 +195,13 @@ private boolean fieldSelected(MaterializedField field) {
     return false;
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
   @Override
-  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    this.operatorContext = context;
+  public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = operatorContext;
     if (!isStarQuery()) {
       columnsFound = new boolean[getColumns().size()];
       nullFilledVectors = new ArrayList<>();
@@ -276,7 +270,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
     try {
       ValueVector vector;
       SchemaElement schemaElement;
-      ArrayList varLengthColumns = new ArrayList<>();
+      final ArrayList varLengthColumns = new ArrayList<>();
       // initialize all of the column read status objects
       boolean fieldFixedLength;
       for (int i = 0; i < columns.size(); ++i) {
@@ -342,7 +336,7 @@ protected void handleAndRaise(String s, Exception e) {
   @Override
   public void allocate(Map vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -366,17 +360,17 @@ private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
   }
 
   private void resetBatch() {
-    for (ColumnReader column : columnStatuses) {
+    for (final ColumnReader column : columnStatuses) {
       column.valuesReadInCurrentPass = 0;
     }
 
-    for (VarLengthColumn r : varLengthReader.columns) {
+    for (final VarLengthColumn r : varLengthReader.columns) {
       r.valuesReadInCurrentPass = 0;
     }
   }
 
   public void readAllFixedFields(long recordsToRead) throws IOException {
-    for (ColumnReader crs : columnStatuses) {
+    for (ColumnReader crs : columnStatuses) {
       crs.processPages(recordsToRead);
     }
   }
@@ -386,7 +380,7 @@ public int next() {
     resetBatch();
     long recordsToRead = 0;
     try {
-      ColumnReader firstColumnStatus;
+      ColumnReader firstColumnStatus;
       if (columnStatuses.size() > 0) {
         firstColumnStatus = columnStatuses.iterator().next();
       }
@@ -404,7 +398,7 @@ public int next() {
           return 0;
         }
         recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
-        for (ValueVector vv : nullFilledVectors ) {
+        for (final ValueVector vv : nullFilledVectors ) {
          vv.getMutator().setValueCount( (int) recordsToRead);
        }
        mockRecordsRead += recordsToRead;
@@ -429,7 +423,7 @@ public int next() {
       // if we have requested columns that were not found in the file fill their vectors with null
       // (by simply setting the value counts inside of them, as they start null filled)
       if (nullFilledVectors != null) {
-        for (ValueVector vv : nullFilledVectors ) {
+        for (final ValueVector vv : nullFilledVectors ) {
          vv.getMutator().setValueCount(firstColumnStatus.getRecordsReadInCurrentPass());
        }
      }
@@ -451,13 +445,13 @@ public int next() {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
     // enable this for debugging when it is know that a whole file will be read
     // limit kills upstream operators once it has enough records, so this assert will fail
     // assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
     if (columnStatuses != null) {
-      for (ColumnReader column : columnStatuses) {
+      for (final ColumnReader column : columnStatuses) {
        column.clear();
      }
      columnStatuses.clear();
@@ -467,7 +461,7 @@ public void cleanup() {
     codecFactory.close();
 
     if (varLengthReader != null) {
-      for (VarLengthColumn r : varLengthReader.columns) {
+      for (final VarLengthColumn r : varLengthReader.columns) {
         r.clear();
       }
       varLengthReader.columns.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4c49def4e37..01a98537f53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -68,7 +68,6 @@
 import com.google.common.collect.Sets;
 
 public class DrillParquetReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
 
   // same as the DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH in ParquetRecordReader
 
@@ -155,7 +154,7 @@ public static MessageType getProjection(MessageType schema,
     }
 
     // loop through projection columns and add any columns that are missing from parquet schema to columnsNotFound list
-    outer: for (SchemaPath columnPath : modifiedColumns) {
+    for (SchemaPath columnPath : modifiedColumns) {
       boolean notFound = true;
       for (SchemaPath schemaPath : schemaPaths) {
         if (schemaPath.contains(columnPath)) {
@@ -191,7 +190,7 @@ public static MessageType getProjection(MessageType schema,
   @Override
   public void allocate(Map vectorMap) throws OutOfMemoryException {
     try {
-      for (ValueVector v : vectorMap.values()) {
+      for (final ValueVector v : vectorMap.values()) {
         AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
       }
     } catch (NullPointerException e) {
@@ -216,7 +215,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
       projection = schema;
     }
     if(columnsNotFound!=null && columnsNotFound.size()>0) {
-      nullFilledVectors = new ArrayList();
+      nullFilledVectors = new ArrayList<>();
       for(SchemaPath col: columnsNotFound){
         nullFilledVectors.add(
           (NullableIntVector)output.addField(MaterializedField.create(col,
@@ -234,7 +233,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
 
     ColumnIOFactory factory = new ColumnIOFactory(false);
     MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
-    Map paths = new HashMap();
+    Map paths = new HashMap<>();
 
     for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
       paths.put(md.getPath(), md);
@@ -273,7 +272,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
   }
 
   protected void handleAndRaise(String s, Exception e) {
-    cleanup();
+    close();
     String message = "Error in drill parquet reader (complex).\nMessage: " + s +
       "\nParquet Metadata: " + footer;
     throw new DrillRuntimeException(message, e);
@@ -319,7 +318,7 @@ public int next() {
     // if we have requested columns that were not found in the file fill their vectors with null
     // (by simply setting the value counts inside of them, as they start null filled)
     if (nullFilledVectors != null) {
-      for (ValueVector vv : nullFilledVectors ) {
+      for (final ValueVector vv : nullFilledVectors) {
         vv.getMutator().setValueCount(count);
       }
     }
@@ -328,7 +327,7 @@ public int next() {
 
   private int getPercentFilled() {
     int filled = 0;
-    for (ValueVector v : primitiveVectors) {
+    for (final ValueVector v : primitiveVectors) {
       filled = Math.max(filled, v.getAccessor().getValueCount() * 100 / v.getValueCapacity());
       if (v instanceof VariableWidthVector) {
         filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
@@ -343,7 +342,7 @@ private int getPercentFilled() {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (pageReadStore != null) {
         pageReadStore.close();
@@ -354,20 +353,13 @@ public void cleanup() {
     }
   }
 
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
+  static public class ProjectedColumnType {
+    public final String projectedColumnName;
+    public final MessageType type;
 
-  static public class ProjectedColumnType{
-    ProjectedColumnType(String projectedColumnName, MessageType type){
-      this.projectedColumnName=projectedColumnName;
-      this.type=type;
+    ProjectedColumnType(String projectedColumnName, MessageType type) {
+      this.projectedColumnName = projectedColumnName;
+      this.type = type;
     }
-
-    public String projectedColumnName;
-    public MessageType type;
-
-
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 543121fbe32..32fa31d2426 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -123,7 +123,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio
 
   @Override
   public void allocate(Map vectorMap) throws OutOfMemoryException {
-    for (ValueVector v : vectorMap.values()) {
+    for (final ValueVector v : vectorMap.values()) {
       AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
     }
   }
@@ -146,7 +146,6 @@ public int next() {
     injector.injectPause(operatorContext.getExecutionControls(), "read-next", logger);
     try {
       int i =0;
-      outside:
       while (doCurrent || iterator.hasNext()) {
         if (doCurrent) {
           doCurrent = false;
@@ -175,7 +174,6 @@ public int next() {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index c59ade9baf3..bc675af1fbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -52,17 +52,14 @@ import com.google.common.collect.Lists;
 
 public class DrillTextRecordReader extends AbstractRecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
 
-  static final String COL_NAME = "columns";
+  private static final String COL_NAME = "columns";
 
   private org.apache.hadoop.mapred.RecordReader reader;
-  private List vectors = Lists.newArrayList();
+  private final List vectors = Lists.newArrayList();
   private byte delimiter;
-  private int targetRecordCount;
   private FieldReference ref = new FieldReference(COL_NAME);
-  private FragmentContext fragmentContext;
-  private OperatorContext operatorContext;
   private RepeatedVarCharVector vector;
   private List columnIds = Lists.newArrayList();
   private LongWritable key;
@@ -71,9 +68,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   private FileSplit split;
   private long totalRecordsRead;
 
-  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context, char delimiter,
-      List columns) {
-    this.fragmentContext = context;
+  public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentContext context,
+      char delimiter, List columns) {
     this.delimiter = (byte) delimiter;
     this.split = split;
     setColumns(columns);
@@ -95,7 +91,6 @@ public DrillTextRecordReader(FileSplit split, Configuration fsConf, FragmentCont
       Collections.sort(columnIds);
       numCols = columnIds.size();
     }
-    targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
 
     TextInputFormat inputFormat = new TextInputFormat();
     JobConf job = new JobConf(fsConf);
@@ -122,14 +117,6 @@ public boolean apply(@Nullable SchemaPath path) {
       }).isPresent();
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
@@ -155,6 +142,7 @@ public int next() {
     int batchSize = 0;
     try {
       int recordCount = 0;
+      final RepeatedVarCharVector.Mutator mutator = vector.getMutator();
       while (recordCount < Character.MAX_VALUE && batchSize < 200*1000 && reader.next(key, value)) {
         int start;
         int end = -1;
@@ -162,7 +150,7 @@ public int next() {
         // index of the scanned field
         int p = 0;
         int i = 0;
-        vector.getMutator().startNewValue(recordCount);
+        mutator.startNewValue(recordCount);
         // Process each field in this line
         while (end < value.getLength() - 1) {
           if(numCols > 0 && p >= numCols) {
@@ -178,24 +166,24 @@ public int next() {
           }
         }
         if (numCols > 0 && i++ < columnIds.get(p)) {
-          vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0);
+          mutator.addSafe(recordCount, value.getBytes(), start + 1, 0);
           continue;
         }
         p++;
-        vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
+        mutator.addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
         batchSize += end - start;
       }
       recordCount++;
       totalRecordsRead++;
     }
-    for (ValueVector v : vectors) {
+    for (final ValueVector v : vectors) {
       v.getMutator().setValueCount(recordCount);
     }
-    vector.getMutator().setValueCount(recordCount);
-//    logger.debug("text scan batch size {}", batchSize);
+    mutator.setValueCount(recordCount);
+    // logger.debug("text scan batch size {}", batchSize);
     return recordCount;
   } catch(Exception e) {
-    cleanup();
+    close();
     handleAndRaise("Failure while parsing text. Parser was at record: " + (totalRecordsRead + 1), e);
   }
 
@@ -229,7 +217,7 @@ public int find(Text text, byte delimiter, int start) {
   }
 
   @Override
-  public void cleanup() {
+  public void close() {
     try {
       if (reader != null) {
         reader.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 8639e1cc9f8..efe877d1738 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -27,11 +27,11 @@
 
 import static org.junit.Assert.assertTrue;
 
-public class TestJsonRecordReader extends BaseTestQuery{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
+public class TestJsonRecordReader extends BaseTestQuery {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
 
   @Test
-  public void testComplexJsonInput() throws Exception{
+  public void testComplexJsonInput() throws Exception {
 //    test("select z[0]['orange'] from cp.`jsoninput/input2.json` limit 10");
     test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], z[1]['pink'] from cp.`jsoninput/input2.json` limit 10 ");
 //    test("select x from cp.`jsoninput/input2.json`");
@@ -45,14 +45,14 @@ public void testContainingArray() throws Exception {
   }
 
   @Test
-  public void testComplexMultipleTimes() throws Exception{
-    for(int i =0 ; i < 5; i++){
+  public void testComplexMultipleTimes() throws Exception {
+    for(int i =0 ; i < 5; i++) {
       test("select * from cp.`join/merge_join.json`");
     }
   }
 
   @Test
-  public void trySimpleQueryWithLimit() throws Exception{
+  public void trySimpleQueryWithLimit() throws Exception {
     test("select * from cp.`limit/test1.json` limit 10");
   }
 
@@ -90,9 +90,7 @@ public void testExceptionHandling() throws Exception {
 
   @Test //DRILL-1832
   public void testJsonWithNulls1() throws Exception {
-
     final String query="select * from cp.`jsoninput/twitter_43.json`";
-
     testBuilder()
             .sqlQuery(query)
             .unOrdered()
@@ -102,9 +100,7 @@ public void testJsonWithNulls1() throws Exception {
 
   @Test //DRILL-1832
   public void testJsonWithNulls2() throws Exception {
-
     final String query="select SUM(1) as `sum_Number_of_Records_ok` from cp.`/jsoninput/twitter_43.json` having (COUNT(1) > 0)";
-
     testBuilder()
             .sqlQuery(query)
             .unOrdered()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 8ded70317e1..14cfd8eef45 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -386,7 +386,6 @@ private void validateFooters(final List