From 64e1ab20ab9b3725d57ddf9386d4c721bb4f6b37 Mon Sep 17 00:00:00 2001 From: dbarclay Date: Sun, 27 Sep 2015 13:55:21 -0700 Subject: [PATCH 1/2] DRILL-3641: Doc. RecordBatch.IterOutcome (enumerators and possible sequences). [RecordBatch, AbstractRecordBatch] Documented RecordBatch.IterOutcome (RecordBatch.next() protocol) much more. Also moved AbstractRecordBatch.BatchState's documentation text from non-documentation comments to documentation comments. --- .../exec/record/AbstractRecordBatch.java | 18 +- .../apache/drill/exec/record/RecordBatch.java | 232 +++++++++++++++--- 2 files changed, 208 insertions(+), 42 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index d8f703ed485..8eeb194e39c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -65,12 +65,18 @@ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, } protected static enum BatchState { - BUILD_SCHEMA, // Need to build schema and return - FIRST, // This is still the first data batch - NOT_FIRST, // The first data batch has already been returned - STOP, // The query most likely failed, we need to propagate STOP to the root - OUT_OF_MEMORY, // Out of Memory while building the Schema...Ouch! - DONE // All work is done, no more data to be sent + /** Need to build schema and return. */ + BUILD_SCHEMA, + /** This is still the first data batch. */ + FIRST, + /** The first data batch has already been returned. */ + NOT_FIRST, + /** The query most likely failed, we need to propagate STOP to the root. */ + STOP, + /** Out of Memory while building the Schema...Ouch! */ + OUT_OF_MEMORY, + /** All work is done, no more data to be sent. */ + DONE } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 6f10a1cbbf6..995393b46f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -23,29 +23,183 @@ import org.apache.drill.exec.record.selection.SelectionVector4; /** - * A record batch contains a set of field values for a particular range of records. In the case of a record batch - * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not - * change unless the next() IterOutcome is a *_NEW_SCHEMA type. - * - * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids - * provided utilizing getValueVectorId(); + * A record batch contains a set of field values for a particular range of + * records. + *

+ * <p>
+ *   In the case of a record batch composed of ValueVectors, ideally a batch
+ *   fits within L2 cache (~256kB per core). The set of value vectors does
+ *   not change except during a call to {@link #next()} that returns
+ *   {@link IterOutcome#OK_NEW_SCHEMA}.
+ * </p>
+ * <p>
+ *   A key thing to know is that the Iterator provided by a record batch must
+ *   align with the rank positions of the field IDs provided using
+ *   {@link #getValueVectorId}.
+ * </p>
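+ * <p>
+ *   For example (for illustration only), if this batch's iterator yields the
+ *   vectors for fields {@code a}, {@code b}, and {@code c} in that order,
+ *   then the {@link TypedFieldId} returned by {@code getValueVectorId()} for
+ *   field {@code b} must carry field ID 1, matching {@code b}'s rank
+ *   position in that iteration order.
+ * </p>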

 */
public interface RecordBatch extends VectorAccessible {
-  /* max batch size, limited by 2-byte-length in SV2 : 65536 = 2^16 */
+  /* max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */
  public static final int MAX_BATCH_SIZE = 65536;

  /**
-   * Describes the outcome of a RecordBatch being incremented forward.
+   * Describes the outcome of incrementing a RecordBatch forward by a call to
+   * {@link #next()}.
+   *

+ * <p>
+ *   Key characteristics of the return value sequence:
+ * </p>
+ * <ul>
+ *   <li>{@code OK_NEW_SCHEMA} appears at least once before any {@code OK}
+ *     or terminal {@code NONE} appears.</li>
+ *   <li>The sequence always ends with {@code NONE} (normal completion) or
+ *     {@code STOP} (abnormal termination).</li>
+ *   <li>{@code NOT_YET} and {@code OUT_OF_MEMORY} can appear at any point
+ *     before the terminal value.</li>
+ * </ul>
+ * <p>
+ *   Details:
+ * </p>
+ * <p>
+ *   For normal completion, the basic sequence of return values from calls
+ *   to {@code next()} on a {@code RecordBatch} is:
+ * </p>
+ * <ol>
+ *   <li>an {@link #OK_NEW_SCHEMA} value followed by zero or more
+ *     {@link #OK} values,</li>
+ *   <li>zero or more subsequences each having an {@code OK_NEW_SCHEMA}
+ *     value followed by zero or more {@code OK} values, and then</li>
+ *   <li>a {@link #NONE} value.</li>
+ * </ol>
+ * <p>
+ *   In addition to that basic sequence, {@link #NOT_YET} and
+ *   {@link #OUT_OF_MEMORY} values can appear anywhere in the subsequence
+ *   before the terminal value ({@code NONE} or {@code STOP}).
+ * </p>
+ * <p>
+ *   For abnormal termination, the sequence is truncated (before the
+ *   {@code NONE}) and ends with {@link #STOP}. That is, the sequence begins
+ *   with a subsequence that is some prefix of a normal-completion sequence
+ *   and that does not contain {@code NONE}, and ends with {@code STOP}.
+ * </p>
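+ * <p>
+ *   For example (illustrative only), a query that fails after two data
+ *   batches have been delivered might produce the sequence
+ *   {@code OK_NEW_SCHEMA}, {@code OK}, {@code OK}, {@code STOP}.
+ * </p>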

+ * <p>
+ *   (The normal-completion return sequence is matched by the following
+ *   regular-expression-style grammar:
+ * <pre>
+ *     ( ( NOT_YET | OUT_OF_MEMORY )*  OK_NEW_SCHEMA
+ *       ( ( NOT_YET | OUT_OF_MEMORY )*  OK )*
+ *     )+
+ *     ( NOT_YET | OUT_OF_MEMORY )*  NONE
+ * </pre>
+ *   )
+ * </p>
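+ * <p>
+ *   For illustration only, here is a sketch of the loop a downstream caller
+ *   might use to consume this sequence. (The {@code processRecords} and
+ *   {@code updateSchema} callees are hypothetical, not part of this
+ *   interface.)
+ * </p>
+ * <pre>
+ *   outer:
+ *   while (true) {
+ *     switch (incoming.next()) {
+ *       case OK_NEW_SCHEMA:
+ *         updateSchema(incoming.getSchema()); // adapt to the new schema,
+ *         processRecords(incoming);           // then consume any records
+ *         break;
+ *       case OK:
+ *         processRecords(incoming);           // same schema; just consume
+ *         break;
+ *       case NOT_YET:
+ *       case OUT_OF_MEMORY:
+ *         break;              // try again later (or propagate the value)
+ *       case NONE:
+ *       case STOP:
+ *         break outer;        // terminal values; never call next() again
+ *     }
+ *   }
+ * </pre>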

*/ public static enum IterOutcome { - NONE, // No more records were found. - OK, // A new range of records have been provided. - OK_NEW_SCHEMA, // A full collection of records - STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext - // to understand the current state of things. - NOT_YET, // used by batches that haven't received incoming data yet. - OUT_OF_MEMORY // an upstream operator was unable to allocate memory. A batch receiving this should release memory if it can + /** + * Normal completion of batch. + *

+ * <p>
+ *   The call to {@link #next()} read no records, the batch has and will
+ *   have no more results to return, and {@code next()} must not be called
+ *   again.
+ * </p>
+ * <p>
+ *   This value will be returned only after {@link #OK_NEW_SCHEMA} has been
+ *   returned at least once (though not necessarily on the immediately
+ *   preceding call).
+ * </p>
+ */ + NONE, + + /** + * Zero or more records with same schema. + *

+ * <p>
+ *   The call to {@link #next()} read zero or more records, the schema has
+ *   not changed since the last time {@code OK_NEW_SCHEMA} was returned, and
+ *   the batch will have more results to return (at least a terminal
+ *   {@code NONE} or {@code STOP}). ({@code next()} should be called again.)
+ * </p>
+ * <p>
+ *   This will be returned only after {@link #OK_NEW_SCHEMA} has been
+ *   returned at least once (though not necessarily on the immediately
+ *   preceding call).
+ * </p>
+ */ + OK, + + /** + * New schema, maybe with records. + *

+ * <p>
+ *   The call to {@link #next()} changed the schema and vector structures
+ *   and read zero or more records, and the batch will have more results to
+ *   return (at least a terminal {@code NONE} or {@code STOP}).
+ *   ({@code next()} should be called again.)
+ * </p>
+ */ + OK_NEW_SCHEMA, + + /** + * Non-completion (abnormal) termination. + *

+ * <p>
+ *   The call to {@link #next()} reports that the query has terminated other
+ *   than by normal completion, and that the caller must not call any of the
+ *   schema-access or data-access methods nor call {@code next()} again.
+ * </p>
+ * <p>
+ *   The caller can consume its QueryContext to understand the current state
+ *   of things.
+ * </p>
+ */ + STOP, + + /** + * No data yet. + *

+ * <p>
+ *   The call to {@link #next()} read no data, and the batch will have more
+ *   results to return in the future (at least a terminal {@code NONE} or
+ *   {@code STOP}). The caller should call {@code next()} again, but should
+ *   do so later (including by returning {@code NOT_YET} to its own caller).
+ * </p>
+ * <p>
+ *   Normally, the caller should perform any locally available work while
+ *   waiting for incoming data from the callee, for example, doing partial
+ *   sorts on already received data while waiting for additional data to
+ *   sort.
+ * </p>
+ * <p>
+ *   Used by batches that haven't received incoming data yet.
+ * </p>
+ */ + NOT_YET, + + /** + * Out of memory (not fatal). + *

+ * <p>
+ *   The call to {@link #next()} (in this operator or an upstream operator)
+ *   was unable to allocate memory and did not read any records, and the
+ *   batch will have more results to return (at least a terminal
+ *   {@code NONE} or {@code STOP}). The caller should release memory if it
+ *   can (including by returning {@code OUT_OF_MEMORY} to its own caller)
+ *   and then call {@code next()} again.
+ * </p>
+ */
+    OUT_OF_MEMORY
  }

  public static enum SetupOutcome {
@@ -53,30 +207,25 @@ public static enum SetupOutcome {
  }

  /**
-   * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
-   * level information.
-   *
-   * @return
+   * Gets the FragmentContext of the current query fragment. Useful for
+   * reporting failure information or other query-level information.
   */
  public FragmentContext getContext();

  /**
-   * Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided.
-   *
-   * @return
+   * Gets the schema of the current RecordBatch. The schema changes if and only
+   * if {@link #next()} returns {@link IterOutcome#OK_NEW_SCHEMA}.
   */
  public BatchSchema getSchema();

  /**
-   * Provide the number of records that are within this record count
-   *
-   * @return
+   * Gets the number of records that are within this record batch.
   */
  public int getRecordCount();

  /**
-   * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine
-   * what has happened.
+   * Informs child nodes that this query should be terminated. Child nodes
+   * should use the QueryContext to determine what has happened.
   */
  public void kill(boolean sendUpstream);

@@ -88,9 +237,10 @@ public static enum SetupOutcome {
  public VectorContainer getOutgoingContainer();

  /**
-   * Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
-   * same as the ordinal position of the field within the Iterator provided this classes implementation of
-   * Iterable.
+   * Gets the value vector type and ID for the given schema path.
+   * The TypedFieldId should store a fieldId which is the same as the ordinal
+   * position of the field within the Iterator provided by this class's
+   * implementation of Iterable.
   *
   * @param path
   *          The path where the vector should be located.
   * @return
   *          TypedFieldId
   */
  public abstract TypedFieldId getValueVectorId(SchemaPath path);
+
  @Override
  public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);

  /**
-   * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
-   * IterOutcome.NONE, the consumer should no longer next(). Behavior at this point is undetermined and likely to throw
-   * an exception.
+   * Updates the data in each Field reading interface for the next range of
+   * records.
+   *

+ * <p>
+ *   Once a RecordBatch's {@code next()} has returned {@link IterOutcome#NONE}
+ *   or {@link IterOutcome#STOP}, the consumer should no longer call
+ *   {@code next()}. Behavior at this point is undefined and likely to
+ *   throw an exception.
+ * </p>
+ * <p>
+ *   See {@link IterOutcome} for the protocol (possible sequences of return
+ *   values).
+ * </p>
+ * * * @return An IterOutcome describing the result of the iteration. */ public IterOutcome next(); /** - * Get a writable version of this batch. Takes over owernship of existing buffers. - * - * @return + * Gets a writable version of this batch. Takes over ownership of existing + * buffers. */ public WritableBatch getWritableBatch(); From ca57fc05c9a5f1a1b111798a0e86c222f1da2f8c Mon Sep 17 00:00:00 2001 From: dbarclay Date: Sun, 27 Sep 2015 13:49:45 -0700 Subject: [PATCH 2/2] DRILL-2288: Fix: No OK_NEW_SCHEMA for 0-row source; missing downstream schema. Core fix: - Fixed ScanBatch.Mutator's isNewSchema() to stop reporting "new schema" spuriously at end of JSON file with complex types. - Fixed ScanBatch.next() to return OK_NEW_SCHEMA instead of NONE for certain case of zero-row source/reader with new schema. - Fixed UnionAllRecordBatch to handle correct IterOutcome sequence from now-corrected ScanBatch.next(). (Was DRILL-3659.) Also: - Added unit test (checking involved metadata at JDBC level). [Drill2288GetColumnsMetadataWhenNoRowsTest, empty.json] - Enhanced IteratorValidatorBatchIterator, mainly to validate IterOutcome value sequence from next(). - Documented SchemaChangeCallBack and renamed method for clarity. [SchemaChangeCallBack, ScanBatch, AbstractSingleRecordBatch] - Other [ScanBatch]: - Added/edited various code and doc. comments. - Renamed boolean schemaChange -> schemaChanged. - Various code cleanup (e.g., "final", SuppressWarnings, whitespace). --- .../drill/exec/physical/impl/ScanBatch.java | 73 ++++-- .../impl/union/UnionAllRecordBatch.java | 43 ++-- .../IteratorValidatorBatchIterator.java | 213 +++++++++++++++--- .../record/AbstractSingleRecordBatch.java | 2 +- .../exec/vector/SchemaChangeCallBack.java | 26 ++- ...l2288GetColumnsMetadataWhenNoRowsTest.java | 190 ++++++++++++++++ exec/jdbc/src/test/resources/empty.json | 0 7 files changed, 469 insertions(+), 78 deletions(-) create mode 100644 exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2288GetColumnsMetadataWhenNoRowsTest.java create mode 100644 exec/jdbc/src/test/resources/empty.json diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 1ac4f7be696..e32ea32b23e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -85,8 +85,12 @@ public class ScanBatch implements CloseableRecordBatch { private boolean done = false; private SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private boolean hasReadNonEmptyFile = false; - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext, - Iterator readers, List partitionColumns, List selectedPartitionColumns) throws ExecutionSetupException { + + + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + OperatorContext oContext, Iterator readers, + List partitionColumns, + List selectedPartitionColumns) throws ExecutionSetupException { this.context = context; this.readers = readers; if (!readers.hasNext()) { @@ -123,7 +127,8 @@ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Operat addPartitionVectors(); } - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator readers) + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + Iterator readers) throws ExecutionSetupException { 
this(subScanConfig, context,
         context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
@@ -183,18 +188,28 @@ public IterOutcome next() {
       while ((recordCount = currentReader.next()) == 0) {
         try {
           if (!readers.hasNext()) {
+            // We're on the last reader, and it has no (more) rows.
+
             currentReader.close();
             releaseAssets();
-            done = true;
+            done = true;  // have any future call to next() return NONE
+
+            if (mutator.isNewSchema()) {
+              // This last reader has a new schema (e.g., we have a zero-row
+              // file or other source). (Note that some sources have a
+              // non-null/non-trivial schema even when there are no rows.)
+
+              container.buildSchema(SelectionVectorMode.NONE);
              schema = container.getSchema();
+
+              return IterOutcome.OK_NEW_SCHEMA;
+            }
             return IterOutcome.NONE;
           }
+          // At this point, the reader that hit its end is not the last reader.
+
           // If all the files we have read so far are just empty, the schema is not useful
-          if(!hasReadNonEmptyFile) {
+          if (! hasReadNonEmptyFile) {
             container.clear();
             for (ValueVector v : fieldVectorMap.values()) {
               v.clear();
@@ -221,6 +236,7 @@ public IterOutcome next() {
             return IterOutcome.STOP;
           }
         }
+        // At this point, the current reader has read 1 or more rows.
+
         hasReadNonEmptyFile = true;
         populatePartitionVectors();
@@ -264,7 +280,7 @@ private void addPartitionVectors() throws ExecutionSetupException {
       for (int i : selectedPartitionColumns) {
         final MaterializedField field =
             MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
-                                     Types.optional(MinorType.VARCHAR));
+                Types.optional(MinorType.VARCHAR));
         final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
         partitionVectors.add(v);
       }
@@ -313,19 +329,26 @@ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
   }

   private class Mutator implements OutputMutator {
-    private boolean schemaChange = true;
+    /** Whether the schema has changed since the last inquiry (via
+     *  {@link #isNewSchema}). Is true before the first inquiry. */
+    private boolean schemaChanged = true;
+
+
+    @SuppressWarnings("unchecked")
     @Override
-    public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      // Check if the field exists
+    public <T extends ValueVector> T addField(MaterializedField field,
+                                              Class<T> clazz) throws SchemaChangeException {
+      // Check if the field exists.
       ValueVector v = fieldVectorMap.get(field.key());
       if (v == null || v.getClass() != clazz) {
-        // Field does not exist add it to the map and the output container
+        // Field does not exist--add it to the map and the output container.
         v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
         if (!clazz.isAssignableFrom(v.getClass())) {
-          throw new SchemaChangeException(String.format(
-              "The class that was provided %s does not correspond to the expected vector type of %s.",
-              clazz.getSimpleName(), v.getClass().getSimpleName()));
+          throw new SchemaChangeException(
+              String.format(
+                  "The class that was provided, %s, does not correspond to the "
+                  + "expected vector type of %s.",
+                  clazz.getSimpleName(), v.getClass().getSimpleName()));
         }

         final ValueVector old = fieldVectorMap.put(field.key(), v);
@@ -335,8 +358,8 @@ public T addField(MaterializedField field, Class claz
       }
       container.add(v);
-      // Adding new vectors to the container mark that the schema has changed
-      schemaChange = true;
+      // Added new vectors to the container--mark that the schema has changed.
+      schemaChanged = true;
     }

     return clazz.cast(v);
@@ -349,11 +372,21 @@ public void allocate(int recordCount) {
       }
     }

+    /**
+     * Reports whether the schema has changed (field was added or re-added)
+     * since the last call to {@link #isNewSchema}. Returns true at first call.
+     */
     @Override
     public boolean isNewSchema() {
-      // Check if top level schema has changed, second condition checks if one of the deeper map schema has changed
-      if (schemaChange || callBack.getSchemaChange()) {
-        schemaChange = false;
+      // Check if top-level schema or any of the deeper map schemas has changed.

+      // Note: Callback's getSchemaChangedAndReset() must get called in order
+      // to reset it and avoid false reports of schema changes in the future.
+      // (Be careful with the short-circuit OR (||) operator.)

+      boolean deeperSchemaChanged = callBack.getSchemaChangedAndReset();
+      if (schemaChanged || deeperSchemaChanged) {
+        schemaChanged = false;
         return true;
       }
       return false;
@@ -392,6 +425,8 @@ public void close() throws Exception {

   @Override
   public VectorContainer getOutgoingContainer() {
-    throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+    throw new UnsupportedOperationException(
+        String.format("You should not call getOutgoingContainer() for class %s",
+            this.getClass().getCanonicalName()));
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 445568b8620..357269de183 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -103,7 +103,7 @@ public SelectionVector4 getSelectionVector4() {
   public IterOutcome innerNext() {
     try {
       IterOutcome upstream = unionAllInput.nextBatch();
-      logger.debug("Upstream of Union-All: ", upstream.toString());
+      logger.debug("Upstream of Union-All: {}", upstream);
       switch(upstream) {
         case NONE:
         case OUT_OF_MEMORY:
@@ -306,28 +306,36 @@ public IterOutcome nextBatch() throws SchemaChangeException {
         case OUT_OF_MEMORY:
           return iterLeft;

-        case NONE:
-          throw new SchemaChangeException("The left input of Union-All should not come from an empty data source");
-
         default:
-          throw new IllegalStateException(String.format("Unknown state %s.", iterLeft));
+          throw new IllegalStateException(
+              String.format("Unexpected state %s.", iterLeft));
       }

       IterOutcome iterRight = rightSide.nextBatch();
       switch(iterRight) {
         case OK_NEW_SCHEMA:
           // Unless there is no record batch on the left side of the inputs,
-          // always start processing from the left side
+          // always start processing from the left side.
           unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-          inferOutputFields();
-          break;
-
-        case NONE:
-          // If the right input side comes from an empty data source,
-          // use the left input side's schema directly
-          unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
-          inferOutputFieldsFromLeftSide();
-          rightIsFinish = true;
+          // If the record count of the first batch from the right input is
+          // zero, there are two possibilities:
+          // 1. The right side is an empty input (e.g., file).
+          // 2. There will be more records carried by later batches.
+ if (rightSide.getRecordBatch().getRecordCount() == 0) { + iterRight = rightSide.nextBatch(); + + if (iterRight == IterOutcome.NONE) { + // Case 1: The right side was an empty input. + inferOutputFieldsFromLeftSide(); + rightIsFinish = true; + } else { + // Case 2: There are more records carried by the latter batches. + inferOutputFields(); + } + } else { + inferOutputFields(); + } break; case STOP: @@ -335,7 +343,8 @@ public IterOutcome nextBatch() throws SchemaChangeException { return iterRight; default: - throw new IllegalStateException(String.format("Unknown state %s.", iterRight)); + throw new IllegalStateException( + String.format("Unexpected state %s.", iterRight)); } upstream = IterOutcome.OK_NEW_SCHEMA; @@ -387,7 +396,7 @@ public IterOutcome nextBatch() throws SchemaChangeException { return upstream; default: - throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported"); + throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome)); } } else { IterOutcome iterOutcome = leftSide.nextBatch(); @@ -535,4 +544,4 @@ public IterOutcome nextBatch() { } } } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index efd155e1f64..9b32577df46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -32,36 +32,85 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.VectorValidator; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.*; + + public class IteratorValidatorBatchIterator implements CloseableRecordBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); static final boolean VALIDATE_VECTORS = false; - private IterOutcome state = IterOutcome.NOT_YET; + /** For logging/debuggability only. */ + private static volatile int instanceCount; + + /** For logging/debuggability only. */ + private final int instNum; + { + instNum = ++instanceCount; + } + + /** + * The upstream batch, calls to which and return values from which are + * checked by this validator. + */ private final RecordBatch incoming; - private boolean first = true; + + /** Incoming batch's type (simple class name); for logging/debuggability + * only. */ + private final String batchTypeName; + + /** Exception state of incoming batch; last value thrown by its next() + * method. */ + private Throwable exceptionState = null; + + /** Main state of incoming batch; last value returned by its next() method. */ + private IterOutcome batchState = null; + + /** + * {@link IterOutcome} return value sequence validation state. + * (Only needs enough to validate returns of OK.) + */ + private enum ValidationState { + /** Initial state: Have not gotten any OK_NEW_SCHEMA yet and not + * terminated. OK is not allowed yet. */ + INITIAL_NO_SCHEMA, + /** Have gotten OK_NEW_SCHEMA already and not terminated. OK is allowed + * now. */ + HAVE_SCHEMA, + /** Terminal state: Have seen NONE or STOP. Nothing more is allowed. 
*/ + TERMINAL + } + + /** High-level IterOutcome sequence state. */ + private ValidationState validationState = ValidationState.INITIAL_NO_SCHEMA; + public IteratorValidatorBatchIterator(RecordBatch incoming) { this.incoming = incoming; + batchTypeName = incoming.getClass().getSimpleName(); + + // (Log construction and close() at same level to bracket instance's activity.) + logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName); } - private void validateReadState() { - switch (state) { + private void validateReadState(String operation) { + switch (batchState) { case OK: case OK_NEW_SCHEMA: return; default: throw new IllegalStateException( - String - .format( - "You tried to do a batch data read operation when you were in a state of %s. You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", - state.name())); + String.format( + "Batch data read operation (%s) attempted when last next() call" + + " on batch [#%d, %s] returned %s (not %s or %s).", + operation, instNum, batchTypeName, batchState, OK, OK_NEW_SCHEMA)); } } @Override public Iterator> iterator() { - validateReadState(); + validateReadState("iterator()"); return incoming.iterator(); } @@ -77,7 +126,7 @@ public BatchSchema getSchema() { @Override public int getRecordCount() { - validateReadState(); + validateReadState("getRecordCount()"); return incoming.getRecordCount(); } @@ -88,19 +137,19 @@ public void kill(boolean sendUpstream) { @Override public SelectionVector2 getSelectionVector2() { - validateReadState(); + validateReadState("getSelectionVector2()"); return incoming.getSelectionVector2(); } @Override public SelectionVector4 getSelectionVector4() { - validateReadState(); + validateReadState("getSelectionVector4()"); return incoming.getSelectionVector4(); } @Override public TypedFieldId getValueVectorId(SchemaPath path) { - validateReadState(); + validateReadState("getValueVectorId(SchemaPath)"); return incoming.getValueVectorId(path); } @@ -112,48 +161,140 @@ public VectorWrapper getValueAccessorById(Class clazz, int... ids) { @Override public IterOutcome next() { - if (state == IterOutcome.NONE ) { - throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); - } - state = incoming.next(); - if (first) { - first = !first; - } + logger.trace( "[#{}; on {}]: next() called.", instNum, batchTypeName); + final IterOutcome prevBatchState = batchState; + try { - if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { - BatchSchema schema = incoming.getSchema(); - if (schema == null) { - return state; + // Check whether next() should even have been called in current state. + if ( null != exceptionState ) { + throw new IllegalStateException( + String.format( + "next() [on #%d; %s] called again after it threw %s (after" + + " returning %s). Caller should not have called next() again.", + instNum, batchTypeName, exceptionState, batchState)); } - - if (schema.getFieldCount() == 0) { - throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed."); + // (Note: This could use validationState.) + if (batchState == NONE || batchState == STOP) { + throw new IllegalStateException( + String.format( + "next() [on #%d, %s] called again after it returned %s." 
+ + " Caller should not have called next() again.", + instNum, batchTypeName, batchState)); } - if (incoming.getRecordCount() > MAX_BATCH_SIZE) { - throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); + + // Now get result from upstream next(). + batchState = incoming.next(); + + logger.trace("[#{}; on {}]: incoming next() return: ({} ->) {}", + instNum, batchTypeName, prevBatchState, batchState ); + + // Check state transition and update high-level state. + switch (batchState) { + case OK_NEW_SCHEMA: + // OK_NEW_SCHEMA is allowed at any time, except if terminated (checked + // above). + // OK_NEW_SCHEMA moves to have-seen-schema state. + validationState = ValidationState.HAVE_SCHEMA; + break; + case OK : + // OK is allowed as long as OK_NEW_SCHEMA was seen, except if terminated + // (checked above). + if ( validationState != ValidationState.HAVE_SCHEMA ) { + throw new IllegalStateException( + String.format( + "next() returned %s without first returning %s [#%d, %s]", + batchState, OK_NEW_SCHEMA, instNum, batchTypeName)); + } + // OK doesn't change high-level state. + break; + case NONE: + // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if + // already terminated (checked above). + if ( validationState != ValidationState.HAVE_SCHEMA ) { + throw new IllegalStateException( + String.format( + "next() returned %s without first returning %s [#%d, %s]", + batchState, OK_NEW_SCHEMA, instNum, batchTypeName)); + } + // NONE moves to terminal high-level state. + validationState = ValidationState.TERMINAL; + break; + case STOP: + // STOP is allowed at any time, except if already terminated (checked + // above). + // STOP moves to terminal high-level state. + validationState = ValidationState.TERMINAL; + break; + case NOT_YET: + case OUT_OF_MEMORY: + // NOT_YET and OUT_OF_MEMORY are allowed at any time, except if + // terminated (checked above). + // NOT_YET and OUT_OF_MEMORY OK don't change high-level state. + break; + default: + throw new AssertionError( + "Unhandled new " + IterOutcome.class.getSimpleName() + " value " + + batchState); + //break; } - if (VALIDATE_VECTORS) { - VectorValidator.validate(incoming); + // Validate schema when available. + if (batchState == OK || batchState == OK_NEW_SCHEMA) { + BatchSchema schema = incoming.getSchema(); + logger.trace("[#{}; on {}]: incoming next() return: #records = {}, schema = {}", + instNum, batchTypeName, incoming.getRecordCount(), schema ); + if (schema == null) { + return batchState; + } + if (schema.getFieldCount() == 0) { + throw new IllegalStateException( + String.format( + "Incoming batch [#%d, %s] has an empty schema. 
This is not allowed.", + instNum, batchTypeName)); + } + if (incoming.getRecordCount() > MAX_BATCH_SIZE) { + throw new IllegalStateException( + String.format( + "Incoming batch [#%d, %s] has size %d, which is beyond the" + + " limit of %d", + instNum, batchTypeName, incoming.getRecordCount(), MAX_BATCH_SIZE + )); + } + + if (VALIDATE_VECTORS) { + VectorValidator.validate(incoming); + } } - } - return state; + return batchState; + } + catch ( RuntimeException | Error e ) { + exceptionState = e; + logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}", + instNum, batchTypeName, prevBatchState, exceptionState ); + throw e; + } } @Override public WritableBatch getWritableBatch() { - validateReadState(); + validateReadState("getWritableBatch()"); return incoming.getWritableBatch(); } @Override public void close() { + // (Log construction and close() calls at same logging level to bracket + // instance's activity.) + logger.trace( "[#{}; on {}]: close() called, state = {} / {}.", + instNum, batchTypeName, batchState, exceptionState); } @Override public VectorContainer getOutgoingContainer() { - throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + throw new UnsupportedOperationException( + String.format("You should not call getOutgoingContainer() for class %s", + this.getClass().getCanonicalName())); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index e84057bc939..4f91317de30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -104,7 +104,7 @@ public IterOutcome innerNext() { } // Check if schema has changed - if (callBack.getSchemaChange()) { + if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java index de05131fdae..4c2491ceaa9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java @@ -20,16 +20,32 @@ import org.apache.drill.exec.util.CallBack; + public class SchemaChangeCallBack implements CallBack { - private boolean schemaChange = false; + private boolean schemaChanged = false; + + /** + * Constructs a schema-change callback with the schema-changed state set to + * {@code false}. + */ + public SchemaChangeCallBack() { + } + /** + * Sets the schema-changed state to {@code true}. + */ + @Override public void doWork() { - schemaChange = true; + schemaChanged = true; } - public boolean getSchemaChange() { - final boolean current = schemaChange; - schemaChange = false; + /** + * Returns the value of schema-changed state, resetting the + * schema-changed state to {@code false}. 
+ */ + public boolean getSchemaChangedAndReset() { + final boolean current = schemaChanged; + schemaChanged = false; return current; } } diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2288GetColumnsMetadataWhenNoRowsTest.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2288GetColumnsMetadataWhenNoRowsTest.java new file mode 100644 index 00000000000..7c489dbc754 --- /dev/null +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/Drill2288GetColumnsMetadataWhenNoRowsTest.java @@ -0,0 +1,190 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.jdbc.test; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.drill.jdbc.Driver; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + + +/** + * Tests from DRILL-2288, in which schema information wasn't propagated when a + * scan yielded an empty (zero-row) result set. + */ +public class Drill2288GetColumnsMetadataWhenNoRowsTest { + + private static Connection connection; + + + @BeforeClass + public static void setUpConnection() throws SQLException { + // (Note: Can't use JdbcTest's connect(...) because JdbcTest closes + // Connection--and other JDBC objects--on test method failure, but this test + // class uses some objects across methods.) + connection = new Driver().connect( "jdbc:drill:zk=local", null ); + } + + @AfterClass + public static void tearDownConnection() throws SQLException { + connection.close(); + } + + + /** + * Tests that an empty JSON file (having zero records) no longer triggers + * breakage in schema propagation. (Case failed before; columns a, b and c + * didn't show up.) + */ + @Test + public void testEmptyJsonFileDoesntSuppressNetSchema1() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = stmt.executeQuery( "SELECT a, b, c, * FROM cp.`empty.json`" ); + + ResultSetMetaData metadata = results.getMetaData(); + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + @Test + public void testEmptyJsonFileDoesntSuppressNetSchema2() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = stmt.executeQuery( "SELECT a FROM cp.`empty.json`" ); + + ResultSetMetaData metadata = results.getMetaData(); + assertThat( metadata.getColumnCount(), equalTo( 1 ) ); + + assertThat( "Unexpected non-empty results. 
Test rot?", + false, equalTo( results.next() ) ); + } + + /** + * Tests that an INFORMATION_SCHEMA.TABLES query that has zero rows because of + * a (simple-enough) filter expression using column TABLE_SCHEMA (which + * supports pushdown) still has all columns. (Case failed before; had zero + * columns.) + */ + @Test + public void testInfoSchemaTablesZeroRowsBy_TABLE_SCHEMA_works() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = + stmt.executeQuery( "SELECT * FROM INFORMATION_SCHEMA.`TABLES`" + + " WHERE TABLE_SCHEMA = ''" ); + + // Result set should still have columns even though there are no rows: + ResultSetMetaData metadata = results.getMetaData(); + assertThat( "ResultSetMetaData.getColumnCount()", + metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + /** (Worked before (because TABLE_CATALOG test not pushed down).) */ + @Test + public void testInfoSchemaTablesZeroRowsBy_TABLE_CATALOG_works() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = + stmt.executeQuery( "SELECT * FROM INFORMATION_SCHEMA.`TABLES`" + + " WHERE TABLE_CATALOG = ''" ); + + ResultSetMetaData metadata = results.getMetaData(); + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + /** (Failed before (because TABLE_NAME test is pushed down).) */ + @Test + public void testInfoSchemaTablesZeroRowsBy_TABLE_NAME_works() + throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = + stmt.executeQuery( + "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME = ''" ); + + ResultSetMetaData metadata = results.getMetaData(); + + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + /** (Worked before.) */ + @Test + public void testInfoSchemaTablesZeroRowsByLimitWorks() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = + stmt.executeQuery( + "SELECT * FROM INFORMATION_SCHEMA.`TABLES` LIMIT 0" ); + + ResultSetMetaData metadata = results.getMetaData(); + + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + /** (Worked before.) */ + @Test + public void testInfoSchemaTablesZeroRowsByWhereFalseWorks() throws Exception { + Statement stmt = connection.createStatement(); + ResultSet results = + stmt.executeQuery( + "SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE FALSE" ); + + ResultSetMetaData metadata = results.getMetaData(); + + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + + assertThat( "Unexpected non-empty results. Test rot?", + false, equalTo( results.next() ) ); + } + + /** (Failed before (because table schema and name tests are pushed down).) */ + @Test + public void testGetTablesZeroRowsByTableSchemaOrNameWorks() throws Exception { + DatabaseMetaData dbMetadata = connection.getMetaData(); + + ResultSet results = dbMetadata.getTables( "NoSuchCatalog", "NoSuchSchema", + "NoSuchTable", new String[0] ); + + ResultSetMetaData metadata = results.getMetaData(); + assertThat( metadata.getColumnCount(), not( equalTo( 0 ) ) ); + assertThat( "Unexpected non-empty results. 
Test rot?", + false, equalTo( results.next() ) ); + } + + +} diff --git a/exec/jdbc/src/test/resources/empty.json b/exec/jdbc/src/test/resources/empty.json new file mode 100644 index 00000000000..e69de29bb2d