Skip to content

Commit

Permalink
DRILL-3285: Part 1--Prep., Hygiene: Mainly, adding comments.
Browse files Browse the repository at this point in the history
Added/edited comments:
- field doc. comments
- method doc. comments
- branch/block comments

Removed unused recordBatchCount and getRecordBatchCount().

Added logger call for spurious batch.

Various cleanup:
- Cleaned up logger.
- Added "final" on updateColumns().
- Wrapped some lines
- Misc. comment whitespace.
  • Loading branch information
dsbos authored and mehant committed Jun 22, 2015
1 parent 9c125b0 commit 228be48
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 30 deletions.
112 changes: 83 additions & 29 deletions exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillCursor.java
Expand Up @@ -31,33 +31,58 @@
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryDataBatch; import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;




class DrillCursor implements Cursor { class DrillCursor implements Cursor {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class); private static final Logger logger = getLogger( DrillCursor.class );


private static final String UNKNOWN = "--UNKNOWN--"; private static final String UNKNOWN = "--UNKNOWN--";


/** The associated java.sql.ResultSet implementation. */ /** The associated {@link java.sql.ResultSet} implementation. */
private final DrillResultSetImpl resultSet; private final DrillResultSetImpl resultSet;


/** Holds current batch of records (none before first load). */
private final RecordBatchLoader currentBatch; private final RecordBatchLoader currentBatch;

private final DrillResultSetImpl.ResultsListener resultsListener; private final DrillResultSetImpl.ResultsListener resultsListener;


// TODO: Doc.: Say what's started (set of rows? just current result batch?) /** Whether we're past the special first call to this.next() from
* DrillResultSetImpl.execute(). */
private boolean started = false; private boolean started = false;

/** Whether cursor is after the end of the sequence of records/rows. */
private boolean finished = false; private boolean finished = false;
// TODO: Doc.: Say what "readFirstNext" means.
/**
* Whether the next call to this.next() should just return {@code true} rather
* than trying to actually advance to the next record.
* <p>
* Currently, can be true only for first call to next().
* </p>
* <p>
* (Relates to loadInitialSchema()'s calling nextRowInternally()
* one "extra" time
* (extra relative to number of ResultSet.next() calls) at the beginning to
* get first batch and schema before Statement.execute...(...) even returns.
* </p>
*/
private boolean redoFirstNext = false; private boolean redoFirstNext = false;
// TODO: Doc.: First what? (First batch? record? "next" call/operation?)
/** Whether on first batch. (Re skipping spurious empty batches.) */
private boolean first = true; private boolean first = true;


/** ... corresponds to current schema. */
private DrillColumnMetaDataList columnMetaDataList; private DrillColumnMetaDataList columnMetaDataList;

/** Schema of current batch (null before first load). */
private BatchSchema schema; private BatchSchema schema;


/** Zero-based index of current record in record batch. */ /** Zero-based offset of current record in record batch.
* (Not <i>row</i> number.) */
private int currentRecordNumber = -1; private int currentRecordNumber = -1;
private long recordBatchCount;
private final DrillAccessorList accessors = new DrillAccessorList(); private final DrillAccessorList accessors = new DrillAccessorList();




Expand All @@ -80,61 +105,93 @@ protected int getCurrentRecordNumber() {
} }


@Override @Override
public List<Accessor> createAccessors(List<ColumnMetaData> types, Calendar localCalendar, Factory factory) { public List<Accessor> createAccessors(List<ColumnMetaData> types,
Calendar localCalendar, Factory factory) {
columnMetaDataList = (DrillColumnMetaDataList) types; columnMetaDataList = (DrillColumnMetaDataList) types;
return accessors; return accessors;
} }


// TODO: Doc.: Specify what the return value actually means. (The wording /**
// "Moves to the next row" and "Whether moved" from the documentation of the * Advances this cursor to the next row, if any, or to after the sequence of
// implemented interface (net.hydromatic.avatica.Cursor) doesn't address * rows if no next row. However, the first call does not advance to the first
// moving past last row or how to evaluate "whether moved" on the first call. * row, only reading schema information.
// In particular, document what the return value indicates about whether we're * <p>
// currently at a valid row (or whether next() can be called again, or * Is to be called (once) from {@link DrillResultSetImpl#execute()}, and
// whatever it does indicate), especially the first time this next() called * then from {@link AvaticaResultSet#next()}.
// for a new result. * </p>
*
* @return whether cursor is positioned at a row (false when after end of
* results)
*/
@Override @Override
public boolean next() throws SQLException { public boolean next() throws SQLException {
if (!started) { if (!started) {
started = true; started = true;
redoFirstNext = true; redoFirstNext = true;
} else if (redoFirstNext && !finished) { } else if (redoFirstNext && !finished) {
// We have a deferred "not after end" to report--reset and report that.
redoFirstNext = false; redoFirstNext = false;
return true; return true;
} }


if (finished) { if (finished) {
// We're already after end of rows/records--just report that after end.
return false; return false;
} }


if (currentRecordNumber + 1 < currentBatch.getRecordCount()) { if (currentRecordNumber + 1 < currentBatch.getRecordCount()) {
// Next index is in within current batch--just increment to that record. // Have next row in current batch--just advance index and report "at a row."
currentRecordNumber++; currentRecordNumber++;
return true; return true;
} else { } else {
// Next index is not in current batch (including initial empty batch-- // No (more) records in any current batch--try to get first or next batch.
// (try to) get next batch. // (First call always takes this branch.)

try { try {
QueryDataBatch qrb = resultsListener.getNext(); QueryDataBatch qrb = resultsListener.getNext();
recordBatchCount++;
while (qrb != null && (qrb.getHeader().getRowCount() == 0 || qrb.getData() == null ) && !first) { // (Apparently:) Skip any spurious empty batches (batches that have
// zero rows and/or null data, other than the first batch (which carries
// the (initial) schema but no rows)).
while ( qrb != null
&& ( qrb.getHeader().getRowCount() == 0
|| qrb.getData() == null )
&& ! first ) {
// Empty message--dispose of and try to get another.
logger.warn( "Spurious batch read: {}", qrb );

qrb.release(); qrb.release();

qrb = resultsListener.getNext(); qrb = resultsListener.getNext();
recordBatchCount++;
if(qrb != null && qrb.getData()==null){ // NOTE: It is unclear why this check does not check getRowCount()
// as the loop condition above does.
if ( qrb != null && qrb.getData() == null ) {
// Got another batch with null data--dispose of and report "no more
// rows".

qrb.release(); qrb.release();

// NOTE: It is unclear why this returns false but doesn't set
// afterLastRow (as we do when we normally return false).
return false; return false;
} }
} }


first = false; first = false;


if (qrb == null) { if (qrb == null) {
currentBatch.clear(); // End of batches--clean up, set state to done, report after last row.

currentBatch.clear(); // (We load it so we clear it.)
finished = true; finished = true;
return false; return false;
} else { } else {
// Got next (or first) batch--reset record offset to beginning,
// assimilate schema if changed, ... ???

currentRecordNumber = 0; currentRecordNumber = 0;

final boolean changed; final boolean changed;
try { try {
changed = currentBatch.load(qrb.getHeader().getDef(), qrb.getData()); changed = currentBatch.load(qrb.getHeader().getDef(), qrb.getData());
Expand All @@ -146,6 +203,7 @@ public boolean next() throws SQLException {
if (changed) { if (changed) {
updateColumns(); updateColumns();
} }

if (redoFirstNext && currentBatch.getRecordCount() == 0) { if (redoFirstNext && currentBatch.getRecordCount() == 0) {
redoFirstNext = false; redoFirstNext = false;
} }
Expand Down Expand Up @@ -178,18 +236,14 @@ public boolean next() throws SQLException {
} }
} }


void updateColumns() { private void updateColumns() {
accessors.generateAccessors(this, currentBatch); accessors.generateAccessors(this, currentBatch);
columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, schema); columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, schema);
if (getResultSet().changeListener != null) { if (getResultSet().changeListener != null) {
getResultSet().changeListener.schemaChanged(schema); getResultSet().changeListener.schemaChanged(schema);
} }
} }


public long getRecordBatchCount() {
return recordBatchCount;
}

@Override @Override
public void close() { public void close() {
// currentBatch is owned by resultSet and cleaned up by // currentBatch is owned by resultSet and cleaned up by
Expand Down
Expand Up @@ -99,7 +99,7 @@ public DrillStatementImpl getStatement() {
* cancelation and no QueryCanceledSqlException had been thrown yet * cancelation and no QueryCanceledSqlException had been thrown yet
* for this ResultSet * for this ResultSet
* @throws AlreadyClosedSqlException if ResultSet is closed * @throws AlreadyClosedSqlException if ResultSet is closed
* @throws SQLException if error in calling {@link #isClosed()} * @throws SQLException if error in calling {@link #isClosed()}
*/ */
private void checkNotClosed() throws SQLException { private void checkNotClosed() throws SQLException {
if ( isClosed() ) { if ( isClosed() ) {
Expand Down Expand Up @@ -181,6 +181,9 @@ protected DrillResultSetImpl execute() throws SQLException{
// but JDBC client certainly could. // but JDBC client certainly could.
throw new SQLException( "Interrupted", e ); throw new SQLException( "Interrupted", e );
} }

// Read first (schema-only) batch to initialize result-set metadata from
// (initial) schema before Statement.execute...(...) returns result set:
cursor.next(); cursor.next();


return this; return this;
Expand Down

0 comments on commit 228be48

Please sign in to comment.