diff --git a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java index fa883e3..5b52ea5 100644 --- a/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java +++ b/src/main/java/com/amazon/redshift/core/v3/QueryExecutorImpl.java @@ -1908,553 +1908,588 @@ private void processResultsOnThread(ResultHandler handler, logger.log(LogLevel.DEBUG, " useRingBuffer={0}, handler.wantsScrollableResultSet()={1}, subQueries={2}, bothRowsAndStatus={3}", new Object[]{useRingBuffer, handler.wantsScrollableResultSet(), subQueries, bothRowsAndStatus}); } - - - while (!endQuery) { - c = pgStream.receiveChar(); - switch (c) { - case 'A': // Asynchronous Notify - receiveAsyncNotify(); - break; - - case '1': // Parse Complete (response to Parse) - pgStream.receiveInteger4(); // len, discarded - - SimpleQuery parsedQuery = pendingParseQueue.removeFirst(); - String parsedStatementName = parsedQuery.getStatementName(); - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE ParseComplete [{0}]", parsedStatementName); - - break; - - case 't': { // ParameterDescription - pgStream.receiveInteger4(); // len, discarded - - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE ParameterDescription"); - - DescribeRequest describeData = pendingDescribeStatementQueue.getFirst(); - SimpleQuery query = describeData.query; - SimpleParameterList params = describeData.parameterList; - boolean describeOnly = describeData.describeOnly; - // This might differ from query.getStatementName if the query was re-prepared - String origStatementName = describeData.statementName; - - int numParams = pgStream.receiveInteger2(); - - for (int i = 1; i <= numParams; i++) { - int typeOid = pgStream.receiveInteger4(); - params.setResolvedType(i, typeOid); - } - - // Since we can issue multiple Parse and DescribeStatement - // messages in a single network trip, we need to make - // sure the describe results we requested are still - // applicable to the latest parsed query. - // - if ((origStatementName == null && query.getStatementName() == null) - || (origStatementName != null - && origStatementName.equals(query.getStatementName()))) { - query.setPrepareTypes(params.getTypeOIDs()); - } - - if (describeOnly) { - msgLoopState.doneAfterRowDescNoData = true; - } else { - pendingDescribeStatementQueue.removeFirst(); - } - break; - } - - case '2': // Bind Complete (response to Bind) - pgStream.receiveInteger4(); // len, discarded - - Portal boundPortal = pendingBindQueue.removeFirst(); - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE BindComplete [{0}]", boundPortal); - - registerOpenPortal(boundPortal); - break; - - case '3': // Close Complete (response to Close) - pgStream.receiveInteger4(); // len, discarded - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE CloseComplete"); - break; - - case 'n': // No Data (response to Describe) - pgStream.receiveInteger4(); // len, discarded - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE NoData"); - - pendingDescribePortalQueue.removeFirst(); - - if (msgLoopState.doneAfterRowDescNoData) { - DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst(); - SimpleQuery currentQuery = describeData.query; - - Field[] fields = currentQuery.getFields(); - - if (fields != null) { // There was a resultset. 
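
A note on the framing both the old and new loops rely on: every backend message is a single type byte (the `c` dispatched on above) followed by an int32 length that counts itself but not the type byte. A minimal, self-contained sketch of that framing — the class and method here are illustrative stand-ins, not driver API:

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical reader; in the driver, pgStream.receiveChar()/receiveInteger4()
// perform the equivalent steps incrementally rather than buffering the body.
final class BackendFrame {
  final int type;    // e.g. '1', '2', 'T', 'D', 'C', 'Z'
  final byte[] body; // payload, excluding the type byte and the length word

  BackendFrame(int type, byte[] body) {
    this.type = type;
    this.body = body;
  }

  static BackendFrame read(DataInputStream in) throws IOException {
    int type = in.read();               // single-byte message code
    if (type < 0) {
      throw new IOException("EOF while reading message type");
    }
    int length = in.readInt();          // int32 length, includes its own 4 bytes
    byte[] body = new byte[length - 4];
    in.readFully(body);                 // per-type parsing happens in the switch
    return new BackendFrame(type, body);
  }
}
```
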
- tuples = new ArrayList(); - handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); - tuples = null; - msgLoopState.queueTuples = null; + try { + while (!endQuery) { + c = pgStream.receiveChar(); + if (RedshiftLogger.isEnable() && c!='D') { // not logging only in case of Data packet + logger.log(LogLevel.DEBUG, " FE=> Received packet of type:{0}={1}", c, (char)c); } - } - break; - - case 's': { // Portal Suspended (end of Execute) - // nb: this appears *instead* of CommandStatus. - // Must be a SELECT if we suspended, so don't worry about it. - - pgStream.receiveInteger4(); // len, discarded - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE PortalSuspended"); - - ExecuteRequest executeData = pendingExecuteQueue.removeFirst(); - SimpleQuery currentQuery = executeData.query; - Portal currentPortal = executeData.portal; - - Field[] fields = currentQuery.getFields(); - if (fields != null - && (tuples == null - && msgLoopState.queueTuples == null)) { - // When no results expected, pretend an empty resultset was returned - // Not sure if new ArrayList can be always replaced with emptyList - tuples = noResults ? Collections.emptyList() : new ArrayList(); - } - - if (msgLoopState.queueTuples != null) { - // Mark end of result - try { - msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(currentPortal); - } - catch (InterruptedException ie) { - // Handle interrupted exception - handler.handleError( - new RedshiftException(GT.tr("Interrupted exception retrieving query results."), - RedshiftState.UNEXPECTED_ERROR, ie)); - } - } - else - handler.handleResultRows(currentQuery, fields, tuples, currentPortal, null, rowCount, null); - - tuples = null; - msgLoopState.queueTuples = null; - - break; - } + switch (c) { + case 'A': // Asynchronous Notify + receiveAsyncNotify(); + break; + + case '1': // Parse Complete (response to Parse) + pgStream.receiveInteger4(); // len, discarded + + SimpleQuery parsedQuery = pendingParseQueue.removeFirst(); + String parsedStatementName = parsedQuery.getStatementName(); + + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE ParseComplete [{0}]", parsedStatementName); + + break; + + case 't': { // ParameterDescription + pgStream.receiveInteger4(); // len, discarded + + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE ParameterDescription"); + + DescribeRequest describeData = pendingDescribeStatementQueue.getFirst(); + SimpleQuery query = describeData.query; + SimpleParameterList params = describeData.parameterList; + boolean describeOnly = describeData.describeOnly; + // This might differ from query.getStatementName if the query was re-prepared + String origStatementName = describeData.statementName; + + int numParams = pgStream.receiveInteger2(); + + for (int i = 1; i <= numParams; i++) { + int typeOid = pgStream.receiveInteger4(); + params.setResolvedType(i, typeOid); + } + + // Since we can issue multiple Parse and DescribeStatement + // messages in a single network trip, we need to make + // sure the describe results we requested are still + // applicable to the latest parsed query. 
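
The `case 't'` block above consumes the ParameterDescription body inline: an int16 parameter count, then one int32 type OID per parameter. The same layout in isolation, assuming the body has already been read into a ByteBuffer (names are illustrative):

```java
import java.nio.ByteBuffer;

final class ParameterDescription {
  // Mirrors the receiveInteger2()/receiveInteger4() sequence in the hunk above.
  static int[] parseTypeOids(ByteBuffer body) {
    int numParams = body.getShort() & 0xFFFF; // int16 count, treated as unsigned
    int[] typeOids = new int[numParams];
    for (int i = 0; i < numParams; i++) {
      typeOids[i] = body.getInt();            // int32 type OID per parameter
    }
    return typeOids;
  }
}
```

The driver then records these OIDs via params.setResolvedType(i, typeOid), using 1-based parameter indexes.
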
+ // + if ((origStatementName == null && query.getStatementName() == null) + || (origStatementName != null + && origStatementName.equals(query.getStatementName()))) { + query.setPrepareTypes(params.getTypeOIDs()); + } + + if (describeOnly) { + msgLoopState.doneAfterRowDescNoData = true; + } else { + pendingDescribeStatementQueue.removeFirst(); + } + break; + } - case 'C': { // Command Status (end of Execute) - // Handle status. - String status = receiveCommandStatus(); - if (isFlushCacheOnDeallocate() - && (status.startsWith("DEALLOCATE ALL") || status.startsWith("DISCARD ALL"))) { - deallocateEpoch++; - } + case '2': // Bind Complete (response to Bind) + pgStream.receiveInteger4(); // len, discarded - msgLoopState.doneAfterRowDescNoData = false; - - ExecuteRequest executeData = pendingExecuteQueue.peekFirst(); - SimpleQuery currentQuery = executeData.query; - Portal currentPortal = executeData.portal; - - String nativeSql = currentQuery.getNativeQuery().nativeSql; - // Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc) - // silently rollback the transaction in the response to COMMIT statement - // in case the transaction has failed. - // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org - if (isRaiseExceptionOnSilentRollback() - && handler.getException() == null - && status.startsWith("ROLLBACK")) { - String message = null; - if (looksLikeCommit(nativeSql)) { - if (transactionFailCause == null) { - message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)"); - } else { - message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage()); - } - } else if (looksLikePrepare(nativeSql)) { - if (transactionFailCause == null) { - message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure is not known (check server logs?)"); - } else { - message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage()); - } - } - if (message != null) { - handler.handleError( - new RedshiftException( - message, RedshiftState.IN_FAILED_SQL_TRANSACTION, transactionFailCause)); - } - } + Portal boundPortal = pendingBindQueue.removeFirst(); + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE BindComplete [{0}]", boundPortal); - if (status.startsWith("SET")) { - // Scan only the first 1024 characters to - // avoid big overhead for long queries. 
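
The `SET` branch that follows invalidates server-prepared statements when `search_path` changes, but scans only a bounded prefix of the SQL so long statements stay cheap to check. A self-contained sketch of that bounded scan; the constant and field names are assumptions mirroring the hunk:

```java
final class SearchPathGuard {
  private static final int SCAN_LIMIT = 1024;
  private String lastSetSearchPathQuery;
  private int deallocateEpoch;

  void onSetCommandStatus(String nativeSql) {
    // String.lastIndexOf(str, fromIndex) searches backwards from fromIndex,
    // so only matches starting within the first SCAN_LIMIT chars are found.
    if (nativeSql.lastIndexOf("search_path", SCAN_LIMIT) != -1
        && !nativeSql.equals(lastSetSearchPathQuery)) {
      lastSetSearchPathQuery = nativeSql; // avoid repeat invalidation for the same SET
      deallocateEpoch++;                  // marks cached prepared statements stale
    }
  }
}
```
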
- if (nativeSql.lastIndexOf("search_path", 1024) != -1 - && !nativeSql.equals(lastSetSearchPathQuery)) { - // Search path was changed, invalidate prepared statement cache - lastSetSearchPathQuery = nativeSql; - deallocateEpoch++; - } - } + registerOpenPortal(boundPortal); + break; - if (!executeData.asSimple) { - pendingExecuteQueue.removeFirst(); - } else { - // For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message - } + case '3': // Close Complete (response to Close) + pgStream.receiveInteger4(); // len, discarded + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE CloseComplete"); + break; - // we want to make sure we do not add any results from these queries to the result set - if (currentQuery == autoSaveQuery - || currentQuery == releaseAutoSave) { - // ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query - break; - } + case 'n': // No Data (response to Describe) + pgStream.receiveInteger4(); // len, discarded + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE NoData"); - Field[] fields = currentQuery.getFields(); - if (fields != null - && (tuples == null - && msgLoopState.queueTuples == null)) { - // When no results expected, pretend an empty resultset was returned - // Not sure if new ArrayList can be always replaced with emptyList - tuples = noResults ? Collections.emptyList() : new ArrayList(); - } + pendingDescribePortalQueue.removeFirst(); - // If we received tuples we must know the structure of the - // resultset, otherwise we won't be able to fetch columns - // from it, etc, later. - if (fields == null - && (tuples != null - || msgLoopState.queueTuples != null)) { - throw new IllegalStateException( - "Received resultset tuples, but no field structure for them"); - } + if (msgLoopState.doneAfterRowDescNoData) { + DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst(); + SimpleQuery currentQuery = describeData.query; - if (fields != null - || (tuples != null - || msgLoopState.queueTuples != null)) { - // There was a resultset. - if (msgLoopState.queueTuples == null) - handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); - else { - // Mark end of result - try { - msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(); - } catch (InterruptedException ie) { - // Handle interrupted exception - handler.handleError( - new RedshiftException(GT.tr("Interrupted exception retrieving query results."), - RedshiftState.UNEXPECTED_ERROR, ie)); - } - } - - tuples = null; - msgLoopState.queueTuples = null; - rowCount = new int[1]; // Allocate for the next resultset - - if (bothRowsAndStatus) { - interpretCommandStatus(status, handler); - } - } else { - interpretCommandStatus(status, handler); - } + Field[] fields = currentQuery.getFields(); - if (executeData.asSimple) { - // Simple queries might return several resultsets, thus we clear - // fields, so queries like "select 1;update; select2" will properly - // identify that "update" did not return any results - currentQuery.setFields(null); - } + if (fields != null) { // There was a resultset. + tuples = new ArrayList(); + handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); + tuples = null; + msgLoopState.queueTuples = null; + } + } + break; + + case 's': { // Portal Suspended (end of Execute) + // nb: this appears *instead* of CommandStatus. + // Must be a SELECT if we suspended, so don't worry about it. 
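
PortalSuspended ('s') replaces CommandComplete when an Execute message's row limit is reached before the portal is exhausted; the client may then Execute the same portal again or close it. A sketch of the Execute message whose max-rows field drives that, shown as raw encoding (hypothetical helper over the standard v3 layout):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class ExecuteMessage {
  static byte[] encode(String portal, int maxRows) throws IOException {
    byte[] name = portal.getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(out);
    data.writeByte('E');
    data.writeInt(4 + name.length + 1 + 4); // int32 length covers itself
    data.write(name);
    data.writeByte(0);                      // NUL-terminated portal name
    data.writeInt(maxRows);                 // exceeding this suspends the portal
    return out.toByteArray();
  }
}
```

With maxRows = 0 the portal runs to completion and the server answers with CommandComplete instead.
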
+ + pgStream.receiveInteger4(); // len, discarded + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE PortalSuspended"); + + ExecuteRequest executeData = pendingExecuteQueue.removeFirst(); + SimpleQuery currentQuery = executeData.query; + Portal currentPortal = executeData.portal; + + Field[] fields = currentQuery.getFields(); + if (fields != null + && (tuples == null + && msgLoopState.queueTuples == null)) { + // When no results expected, pretend an empty resultset was returned + // Not sure if new ArrayList can be always replaced with emptyList + tuples = noResults ? Collections.emptyList() : new ArrayList(); + } + + if (msgLoopState.queueTuples != null) { + // Mark end of result + try { + msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(currentPortal); + } + catch (InterruptedException ie) { + // Handle interrupted exception + handler.handleError( + new RedshiftException(GT.tr("Interrupted exception retrieving query results."), + RedshiftState.UNEXPECTED_ERROR, ie)); + } + } + else + handler.handleResultRows(currentQuery, fields, tuples, currentPortal, null, rowCount, null); - if (currentPortal != null) { - currentPortal.close(); - } - break; - } + tuples = null; + msgLoopState.queueTuples = null; - case 'D': // Data Transfer (ongoing Execute response) - boolean skipRow = false; - Tuple tuple = null; - try { - tuple = pgStream.receiveTupleV3(); - } catch (OutOfMemoryError oome) { - if (!noResults) { - handler.handleError( - new RedshiftException(GT.tr("Ran out of memory retrieving query results."), - RedshiftState.OUT_OF_MEMORY, oome)); - } - } catch (SQLException e) { - handler.handleError(e); - } - if (!noResults) { - if(rowCount != null) { - if(maxRows > 0 && rowCount[0] >= maxRows) { - // Ignore any more rows until server fix not to send more rows than max rows. - skipRow = true; - } - else - rowCount[0] += 1; - } - - if (useRingBuffer) { - boolean firstRow = false; - if (msgLoopState.queueTuples == null) { - // i.e. First row - firstRow = true; - msgLoopState.queueTuples = new RedshiftRowsBlockingQueue(fetchSize, fetchRingBufferSize, logger); - } - - // Add row in the queue - if(!skipRow) { - try { - msgLoopState.queueTuples.put(tuple); - } catch (InterruptedException ie) { - // Handle interrupted exception - handler.handleError( - new RedshiftException(GT.tr("Interrupted exception retrieving query results."), - RedshiftState.UNEXPECTED_ERROR, ie)); - } - } - - if(firstRow) { - // There was a resultset. 
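
In the ring-buffer path shown in this hunk, the network loop is the producer and the application's ResultSet is the consumer, joined by a bounded blocking queue; an explicit end-of-rows marker (what checkAndAddEndOfRowsIndicator() appends) tells the reader the result is complete rather than merely not yet fetched. A minimal JDK-only stand-in, assuming rows shaped as byte[][]:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class RowQueueSketch {
  // Sentinel marking the end of the result set; never a real row.
  static final byte[][] END_OF_ROWS = new byte[0][];
  final BlockingQueue<byte[][]> queue = new ArrayBlockingQueue<>(1024);

  void produce(byte[][] row) throws InterruptedException {
    queue.put(row);              // blocks when the buffer is full (backpressure)
  }

  void finish() throws InterruptedException {
    queue.put(END_OF_ROWS);      // analogous to checkAndAddEndOfRowsIndicator()
  }

  byte[][] next() throws InterruptedException {
    byte[][] row = queue.take(); // blocks until a row or the sentinel arrives
    return row == END_OF_ROWS ? null : row;
  }
}
```

The bounded capacity is what gives backpressure: when the application stops reading, put() blocks and the driver stops pulling rows off the socket.
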
- ExecuteRequest executeData = pendingExecuteQueue.peekFirst(); - SimpleQuery currentQuery = executeData.query; - Field[] fields = currentQuery.getFields(); - - // Create a new ring buffer thread to process rows - m_ringBufferThread = new RingBufferThread(handler, flags, fetchSize, msgLoopState, subQueries, rowCount, maxRows); - - handler.handleResultRows(currentQuery, fields, null, null, msgLoopState.queueTuples, rowCount, m_ringBufferThread); - - if (RedshiftLogger.isEnable()) { - int length; - if (tuple == null) { - length = -1; - } else { - length = tuple.length(); - } - logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length); + break; } - - // Start the ring buffer thread - m_ringBufferThread.start(); - - // Return to break the message loop on the application thread - return; - } - else - if(m_ringBufferStopThread) - return; // Break the ring buffer thread loop - } - else { - if (tuples == null) { - tuples = new ArrayList(); - } - - if(!skipRow) - tuples.add(tuple); - } - } - - if (RedshiftLogger.isEnable()) { - int length; - if (tuple == null) { - length = -1; - } else { - length = tuple.length(); - } - logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length); - if (skipRow) { - logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1}, maxRows = {2}" - , skipRow, (rowCount!= null) ? rowCount[0] : 0, maxRows); - } - } - break; - - case 'E': - // Error Response (response to pretty much everything; backend then skips until Sync) - SQLException error = receiveErrorResponse(false); - handler.handleError(error); - if (willHealViaReparse(error)) { - // prepared statement ... is not valid kind of error - // Technically speaking, the error is unexpected, thus we invalidate other - // server-prepared statements just in case. - deallocateEpoch++; - if (RedshiftLogger.isEnable()) { - logger.log(LogLevel.DEBUG, " FE: received {0}, will invalidate statements. deallocateEpoch is now {1}", - new Object[]{error.getSQLState(), deallocateEpoch}); - } - } - // keep processing - break; - - case 'I': { // Empty Query (end of Execute) - pgStream.receiveInteger4(); - - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " <=BE EmptyQuery"); - - ExecuteRequest executeData = pendingExecuteQueue.removeFirst(); - Portal currentPortal = executeData.portal; - handler.handleCommandStatus("EMPTY", 0, 0); - if (currentPortal != null) { - currentPortal.close(); - } - break; - } + case 'C': { // Command Status (end of Execute) + // Handle status. + String status = receiveCommandStatus(); + if (isFlushCacheOnDeallocate() + && (status.startsWith("DEALLOCATE ALL") || status.startsWith("DISCARD ALL"))) { + deallocateEpoch++; + } + + msgLoopState.doneAfterRowDescNoData = false; + + ExecuteRequest executeData = pendingExecuteQueue.peekFirst(); + SimpleQuery currentQuery = executeData.query; + Portal currentPortal = executeData.portal; + + String nativeSql = currentQuery.getNativeQuery().nativeSql; + // Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc) + // silently rollback the transaction in the response to COMMIT statement + // in case the transaction has failed. 
+ // See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org + if (isRaiseExceptionOnSilentRollback() + && handler.getException() == null + && status.startsWith("ROLLBACK")) { + String message = null; + if (looksLikeCommit(nativeSql)) { + if (transactionFailCause == null) { + message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)"); + } else { + message = GT.tr("The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage()); + } + } else if (looksLikePrepare(nativeSql)) { + if (transactionFailCause == null) { + message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure is not known (check server logs?)"); + } else { + message = GT.tr("The database returned ROLLBACK, so the transaction cannot be prepared. Transaction failure cause is <<{0}>>", transactionFailCause.getMessage()); + } + } + if (message != null) { + handler.handleError( + new RedshiftException( + message, RedshiftState.IN_FAILED_SQL_TRANSACTION, transactionFailCause)); + } + } + + if (status.startsWith("SET")) { + // Scan only the first 1024 characters to + // avoid big overhead for long queries. + if (nativeSql.lastIndexOf("search_path", 1024) != -1 + && !nativeSql.equals(lastSetSearchPathQuery)) { + // Search path was changed, invalidate prepared statement cache + lastSetSearchPathQuery = nativeSql; + deallocateEpoch++; + } + } + + if (!executeData.asSimple) { + pendingExecuteQueue.removeFirst(); + } else { + // For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message + } + + // we want to make sure we do not add any results from these queries to the result set + if (currentQuery == autoSaveQuery + || currentQuery == releaseAutoSave) { + // ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "CommandStatus breaking to ignore SAVEPOINT or " + + "RELEASE SAVEPOINT status from autosave query"); + } + break; + } + + Field[] fields = currentQuery.getFields(); + if (fields != null + && (tuples == null + && msgLoopState.queueTuples == null)) { + // When no results expected, pretend an empty resultset was returned + // Not sure if new ArrayList can be always replaced with emptyList + tuples = noResults ? Collections.emptyList() : new ArrayList(); + } + + // If we received tuples we must know the structure of the + // resultset, otherwise we won't be able to fetch columns + // from it, etc, later. + if (fields == null + && (tuples != null + || msgLoopState.queueTuples != null)) { + throw new IllegalStateException( + "Received resultset tuples, but no field structure for them"); + } + + if (fields != null + || (tuples != null + || msgLoopState.queueTuples != null)) { + // There was a resultset. 
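
Most decisions in this CommandComplete handling key off the status tag text (status.startsWith("ROLLBACK"), "SET", "DEALLOCATE ALL", ...). DML tags also carry the affected-row count as their last token (e.g. "INSERT 0 42", "UPDATE 7"); a hedged sketch of extracting it — a simplification, not the driver's interpretCommandStatus():

```java
final class CommandStatusSketch {
  // Parses tags like "INSERT 0 42", "UPDATE 7", "SELECT 10", "ROLLBACK".
  // Returns 0 when the tag carries no trailing row count.
  static long affectedRows(String tag) {
    int lastSpace = tag.lastIndexOf(' ');
    if (lastSpace < 0) {
      return 0L; // e.g. "ROLLBACK", "BEGIN", "COMMIT"
    }
    try {
      return Long.parseLong(tag.substring(lastSpace + 1));
    } catch (NumberFormatException e) {
      return 0L; // trailing token is not a count, e.g. "DEALLOCATE ALL"
    }
  }
}
```
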
+ if (msgLoopState.queueTuples == null) + handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); + else { + // Mark end of result + try { + msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(); + } catch (InterruptedException ie) { + // Handle interrupted exception + handler.handleError( + new RedshiftException(GT.tr("Interrupted exception retrieving query results."), + RedshiftState.UNEXPECTED_ERROR, ie)); + } + } - case 'N': // Notice Response - SQLWarning warning = receiveNoticeResponse(); - handler.handleWarning(warning); - break; + tuples = null; + msgLoopState.queueTuples = null; + rowCount = new int[1]; // Allocate for the next resultset - case 'S': // Parameter Status - try { - receiveParameterStatus(); - } catch (SQLException e) { - handler.handleError(e); - endQuery = true; - } - break; + if (bothRowsAndStatus) { + interpretCommandStatus(status, handler); + } + } else { + interpretCommandStatus(status, handler); + } + + if (executeData.asSimple) { + // Simple queries might return several resultsets, thus we clear + // fields, so queries like "select 1;update; select2" will properly + // identify that "update" did not return any results + currentQuery.setFields(null); + } + + if (currentPortal != null) { + currentPortal.close(); + } + break; + } - case 'T': // Row Description (response to Describe) - Field[] fields = receiveFields(serverProtocolVersion); - tuples = new ArrayList(); + case 'D': // Data Transfer (ongoing Execute response) + boolean skipRow = false; + Tuple tuple = null; + try { + tuple = pgStream.receiveTupleV3(); + } catch (OutOfMemoryError oome) { + if (!noResults) { + handler.handleError( + new RedshiftException(GT.tr("Ran out of memory retrieving query results."), + RedshiftState.OUT_OF_MEMORY, oome)); + } + } catch (SQLException e) { + handler.handleError(e); + } + if (!noResults) { + if(rowCount != null) { + if(maxRows > 0 && rowCount[0] >= maxRows) { + // Ignore any more rows until server fix not to send more rows than max rows. + skipRow = true; + } + else + rowCount[0] += 1; + } - SimpleQuery query = pendingDescribePortalQueue.peekFirst(); - if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) { - pendingDescribePortalQueue.removeFirst(); - } - query.setFields(fields); + if (useRingBuffer) { + boolean firstRow = false; + if (msgLoopState.queueTuples == null) { + // i.e. First row + firstRow = true; + msgLoopState.queueTuples = new RedshiftRowsBlockingQueue(fetchSize, fetchRingBufferSize, logger); + } + + // Add row in the queue + if(!skipRow) { + try { + msgLoopState.queueTuples.put(tuple); + } catch (InterruptedException ie) { + // Handle interrupted exception + handler.handleError( + new RedshiftException(GT.tr("Interrupted exception retrieving query results."), + RedshiftState.UNEXPECTED_ERROR, ie)); + } + } + + if(firstRow) { + // There was a resultset. 
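
The DataRow branch above enforces maxRows on the client side: once rowCount[0] reaches the cap, further rows the server already sent are still read off the wire but discarded. That counting rule in isolation (the int[] wrapper mirrors the driver's mutable rowCount):

```java
final class RowCapSketch {
  // Returns true when the row should be skipped because the cap was reached.
  static boolean skipRow(int[] rowCount, int maxRows) {
    if (rowCount == null) {
      return false;          // no counting requested
    }
    if (maxRows > 0 && rowCount[0] >= maxRows) {
      return true;           // ignore rows the server sent beyond the cap
    }
    rowCount[0] += 1;
    return false;
  }
}
```
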
+ ExecuteRequest executeData = pendingExecuteQueue.peekFirst(); + SimpleQuery currentQuery = executeData.query; + Field[] fields = currentQuery.getFields(); + + // Create a new ring buffer thread to process rows + m_ringBufferThread = new RingBufferThread(handler, flags, fetchSize, msgLoopState, subQueries, rowCount, maxRows); + + handler.handleResultRows(currentQuery, fields, null, null, msgLoopState.queueTuples, rowCount, m_ringBufferThread); + + if (RedshiftLogger.isEnable()) { + int length; + if (tuple == null) { + length = -1; + } else { + length = tuple.length(); + } + logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length); + } + + // Start the ring buffer thread + m_ringBufferThread.start(); + + // Return to break the message loop on the application thread + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "DataRow exiting the message loop on the application thread"); + } + return; + } + else + if(m_ringBufferStopThread) { + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "DataRow exiting the ring buffer thread loop"); + } + return; // Break the ring buffer thread loop + } + } + else { + if (tuples == null) { + tuples = new ArrayList(); + } - if (msgLoopState.doneAfterRowDescNoData) { - DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst(); - SimpleQuery currentQuery = describeData.query; - currentQuery.setFields(fields); + if(!skipRow) + tuples.add(tuple); + } + } + + if (RedshiftLogger.isEnable()) { + int length; + if (tuple == null) { + length = -1; + } else { + length = tuple.length(); + } + logger.log(LogLevel.DEBUG, " <=BE DataRow(len={0})", length); + if (skipRow) { + logger.log(LogLevel.DEBUG, " skipRow={0}, rowCount = {1}, maxRows = {2}" + , skipRow, (rowCount!= null) ? rowCount[0] : 0, maxRows); + } + } + + break; + + case 'E': + // Error Response (response to pretty much everything; backend then skips until Sync) + SQLException error = receiveErrorResponse(false); + handler.handleError(error); + if (willHealViaReparse(error)) { + // prepared statement ... is not valid kind of error + // Technically speaking, the error is unexpected, thus we invalidate other + // server-prepared statements just in case. + deallocateEpoch++; + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, " FE: received {0}, will invalidate statements. deallocateEpoch is now {1}", + new Object[]{error.getSQLState(), deallocateEpoch}); + } + } + // keep processing + break; + + case 'I': { // Empty Query (end of Execute) + pgStream.receiveInteger4(); + + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE EmptyQuery"); + + ExecuteRequest executeData = pendingExecuteQueue.removeFirst(); + Portal currentPortal = executeData.portal; + handler.handleCommandStatus("EMPTY", 0, 0); + if (currentPortal != null) { + currentPortal.close(); + } + break; + } - if (msgLoopState.queueTuples != null) { - // TODO: is this possible? 
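
Earlier in this hunk, the first buffered DataRow starts a RingBufferThread that keeps draining rows while the application thread returns to its caller, and m_ringBufferStopThread later asks that thread to wind down. A generic sketch of the handoff-plus-stop-flag pattern using only JDK threading; the names are not the driver's:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class FetchThreadSketch {
  private final BlockingQueue<String> rows = new ArrayBlockingQueue<>(128);
  private volatile boolean stopThread; // analogous to m_ringBufferStopThread

  Thread startDraining() {
    Thread t = new Thread(() -> {
      try {
        int n = 0;
        while (!stopThread) {
          rows.put("row-" + n++); // stands in for reading further DataRows
        }
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
    }, "ring-buffer-sketch");
    t.start(); // caller returns immediately; the consumer takes from `rows`
    return t;
  }

  void stop() {
    stopThread = true; // a real implementation must also unblock a producer
  }                    // stuck in put() on a full queue, e.g. via interrupt
}
```
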
- } - - handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); - tuples = null; - msgLoopState.queueTuples = null; - } - break; + case 'N': // Notice Response + SQLWarning warning = receiveNoticeResponse(); + handler.handleWarning(warning); + break; + + case 'S': // Parameter Status + try { + receiveParameterStatus(); + } catch (SQLException e) { + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.ERROR, "ParameterStatus exiting processResultsOnThread loop with " + + "error={0}, state={1}", e.getMessage(), e.getSQLState()); + } + handler.handleError(e); + endQuery = true; + } + break; + + case 'T': // Row Description (response to Describe) + Field[] fields = receiveFields(serverProtocolVersion); + tuples = new ArrayList(); + + SimpleQuery query = pendingDescribePortalQueue.peekFirst(); + if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) { + pendingDescribePortalQueue.removeFirst(); + } + query.setFields(fields); + + if (msgLoopState.doneAfterRowDescNoData) { + DescribeRequest describeData = pendingDescribeStatementQueue.removeFirst(); + SimpleQuery currentQuery = describeData.query; + currentQuery.setFields(fields); + + if (msgLoopState.queueTuples != null) { + // TODO: is this possible? + } - case 'Z': // Ready For Query (eventual response to Sync) - receiveRFQ(); - if (!pendingExecuteQueue.isEmpty() && pendingExecuteQueue.peekFirst().asSimple) { - if (msgLoopState.queueTuples != null) { - try { - msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(); - } catch (InterruptedException ie) { - // Handle interrupted exception - handler.handleError( - new RedshiftException(GT.tr("Interrupted exception retrieving query results."), - RedshiftState.UNEXPECTED_ERROR, ie)); - } - } - tuples = null; - msgLoopState.queueTuples = null; - pgStream.clearResultBufferCount(); - - ExecuteRequest executeRequest = pendingExecuteQueue.removeFirst(); - // Simple queries might return several resultsets, thus we clear - // fields, so queries like "select 1;update; select2" will properly - // identify that "update" did not return any results - executeRequest.query.setFields(null); - - pendingDescribePortalQueue.removeFirst(); - if (!pendingExecuteQueue.isEmpty()) { - if (getTransactionState() == TransactionState.IDLE) { - handler.secureProgress(); - } - // process subsequent results (e.g. 
for cases like batched execution of simple 'Q' queries) - break; + handler.handleResultRows(currentQuery, fields, tuples, null, null, rowCount, null); + tuples = null; + msgLoopState.queueTuples = null; + } + break; + + case 'Z': // Ready For Query (eventual response to Sync) + receiveRFQ(); + if (!pendingExecuteQueue.isEmpty() && pendingExecuteQueue.peekFirst().asSimple) { + if (msgLoopState.queueTuples != null) { + try { + msgLoopState.queueTuples.checkAndAddEndOfRowsIndicator(); + } catch (InterruptedException ie) { + // Handle interrupted exception + handler.handleError( + new RedshiftException(GT.tr("Interrupted exception retrieving query results."), + RedshiftState.UNEXPECTED_ERROR, ie)); + } + } + tuples = null; + msgLoopState.queueTuples = null; + pgStream.clearResultBufferCount(); + + ExecuteRequest executeRequest = pendingExecuteQueue.removeFirst(); + // Simple queries might return several resultsets, thus we clear + // fields, so queries like "select 1;update; select2" will properly + // identify that "update" did not return any results + executeRequest.query.setFields(null); + + pendingDescribePortalQueue.removeFirst(); + if (!pendingExecuteQueue.isEmpty()) { + if (getTransactionState() == TransactionState.IDLE) { + handler.secureProgress(); + } + // process subsequent results (e.g. for cases like batched execution of simple 'Q' queries) + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "ReadyForQuery breaking to process subsequent results"); + } + break; + } + } + endQuery = true; + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "ReadyForQuery will exit from processResultsOnThread loop"); + } + + // Reset the statement name of Parses that failed. + while (!pendingParseQueue.isEmpty()) { + SimpleQuery failedQuery = pendingParseQueue.removeFirst(); + if (RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, "ReadyForQuery resetting statement name for failed parse:{0}", + failedQuery.getStatementName()); + } + failedQuery.unprepare(); + } + + pendingParseQueue.clear(); // No more ParseComplete messages expected. + // Pending "describe" requests might be there in case of error + // If that is the case, reset "described" status, so the statement is properly + // described on next execution + while (!pendingDescribeStatementQueue.isEmpty()) { + DescribeRequest request = pendingDescribeStatementQueue.removeFirst(); + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " FE marking setStatementDescribed(false) for query {0}", QuerySanitizer.filterCredentials(request.query.toString())); + request.query.setStatementDescribed(false); + } + while (!pendingDescribePortalQueue.isEmpty()) { + SimpleQuery describePortalQuery = pendingDescribePortalQueue.removeFirst(); + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " FE marking setPortalDescribed(false) for query {0}", QuerySanitizer.filterCredentials(describePortalQuery.toString())); + describePortalQuery.setPortalDescribed(false); + } + pendingBindQueue.clear(); // No more BindComplete messages expected. + pendingExecuteQueue.clear(); // No more query executions expected. + break; + + case 'G': // CopyInResponse + if(RedshiftLogger.isEnable()) { + logger.log(LogLevel.DEBUG, " <=BE CopyInResponse"); + logger.log(LogLevel.DEBUG, " FE=> CopyFail"); + } + + // COPY sub-protocol is not implemented yet + // We'll send a CopyFail message for COPY FROM STDIN so that + // server does not wait for the data. 
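
The CopyFail ('f') frame sent in the lines below is a length-prefixed, NUL-terminated error string. For clarity, the same bytes assembled in one place (hypothetical helper; COPY_ERROR_MESSAGE stands for the driver's constant):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class CopyFailMessage {
  static byte[] encode(String errorMessage) throws IOException {
    byte[] msg = errorMessage.getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream data = new DataOutputStream(out);
    data.writeByte('f');               // CopyFail
    data.writeInt(msg.length + 4 + 1); // int32 length: self + string + NUL
    data.write(msg);
    data.writeByte(0);                 // NUL terminator, like sendChar(0) below
    return out.toByteArray();
  }
}
```
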
+ + byte[] buf = Utils.encodeUTF8(COPY_ERROR_MESSAGE); + pgStream.sendChar('f'); + pgStream.sendInteger4(buf.length + 4 + 1); + pgStream.send(buf); + pgStream.sendChar(0); + pgStream.flush(); + sendSync(true); // send sync message + skipMessage(); // skip the response message + break; + + case 'H': // CopyOutResponse + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse"); + + skipMessage(); + // In case of CopyOutResponse, we cannot abort data transfer, + // so just throw an error and ignore CopyData messages + handler.handleError( + new RedshiftException(GT.tr(COPY_ERROR_MESSAGE), + RedshiftState.NOT_IMPLEMENTED)); + break; + + case 'c': // CopyDone + skipMessage(); + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE CopyDone"); + break; + + case 'd': // CopyData + skipMessage(); + if(RedshiftLogger.isEnable()) + logger.log(LogLevel.DEBUG, " <=BE CopyData"); + break; + + default: + throw new IOException("Unexpected packet type: " + c); } - } - endQuery = true; - - // Reset the statement name of Parses that failed. - while (!pendingParseQueue.isEmpty()) { - SimpleQuery failedQuery = pendingParseQueue.removeFirst(); - failedQuery.unprepare(); - } - - pendingParseQueue.clear(); // No more ParseComplete messages expected. - // Pending "describe" requests might be there in case of error - // If that is the case, reset "described" status, so the statement is properly - // described on next execution - while (!pendingDescribeStatementQueue.isEmpty()) { - DescribeRequest request = pendingDescribeStatementQueue.removeFirst(); - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " FE marking setStatementDescribed(false) for query {0}", QuerySanitizer.filterCredentials(request.query.toString())); - request.query.setStatementDescribed(false); - } - while (!pendingDescribePortalQueue.isEmpty()) { - SimpleQuery describePortalQuery = pendingDescribePortalQueue.removeFirst(); - if(RedshiftLogger.isEnable()) - logger.log(LogLevel.DEBUG, " FE marking setPortalDescribed(false) for query {0}", QuerySanitizer.filterCredentials(describePortalQuery.toString())); - describePortalQuery.setPortalDescribed(false); - } - pendingBindQueue.clear(); // No more BindComplete messages expected. - pendingExecuteQueue.clear(); // No more query executions expected. - break; - - case 'G': // CopyInResponse - if(RedshiftLogger.isEnable()) { - logger.log(LogLevel.DEBUG, " <=BE CopyInResponse"); - logger.log(LogLevel.DEBUG, " FE=> CopyFail"); - } - - // COPY sub-protocol is not implemented yet - // We'll send a CopyFail message for COPY FROM STDIN so that - // server does not wait for the data. 
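
The CopyOutResponse/CopyData/CopyDone cases above lean on skipMessage() to discard frames the driver does not interpret; under v3 framing, skipping just means honoring the length word. A standalone equivalent — the method name matches the driver's, but the implementation is an assumption:

```java
import java.io.DataInputStream;
import java.io.IOException;

final class SkipSketch {
  // Assumes the 1-byte type code was already consumed by the dispatch loop.
  static void skipMessage(DataInputStream in) throws IOException {
    int length = in.readInt();    // int32 length, includes its own 4 bytes
    long remaining = length - 4;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped <= 0) {         // skip() may return 0; fall back to read()
        if (in.read() < 0) {
          throw new IOException("EOF while skipping message body");
        }
        skipped = 1;
      }
      remaining -= skipped;
    }
  }
}
```
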
-
-          byte[] buf = Utils.encodeUTF8(COPY_ERROR_MESSAGE);
-          pgStream.sendChar('f');
-          pgStream.sendInteger4(buf.length + 4 + 1);
-          pgStream.send(buf);
-          pgStream.sendChar(0);
-          pgStream.flush();
-          sendSync(true); // send sync message
-          skipMessage(); // skip the response message
-          break;
-
-        case 'H': // CopyOutResponse
-          if(RedshiftLogger.isEnable())
-            logger.log(LogLevel.DEBUG, " <=BE CopyOutResponse");
-
-          skipMessage();
-          // In case of CopyOutResponse, we cannot abort data transfer,
-          // so just throw an error and ignore CopyData messages
-          handler.handleError(
-              new RedshiftException(GT.tr(COPY_ERROR_MESSAGE),
-                  RedshiftState.NOT_IMPLEMENTED));
-          break;
-
-        case 'c': // CopyDone
-          skipMessage();
-          if(RedshiftLogger.isEnable())
-            logger.log(LogLevel.DEBUG, " <=BE CopyDone");
-          break;
-
-        case 'd': // CopyData
-          skipMessage();
-          if(RedshiftLogger.isEnable())
-            logger.log(LogLevel.DEBUG, " <=BE CopyData");
-          break;
-
-        default:
-          throw new IOException("Unexpected packet type: " + c);
-      }
+      }
+    } catch (Exception e) {
+      if (RedshiftLogger.isEnable()) {
+        logger.log(LogLevel.ERROR, e, "Exception in query executor processResultsOnThread");
+      }
+      throw e;
     }
+  }

 /**
diff --git a/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java b/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
index 7f7fae5..f3cc110 100644
--- a/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
+++ b/src/main/java/com/amazon/redshift/jdbc/RedshiftCallableStatement.java
@@ -942,8 +942,25 @@ private void getResults() throws SQLException {
     ResultSet rs;
     synchronized (this) {
       checkClosed();
+      if (null == result) {
+        throw new RedshiftException(
+            GT.tr("Results cannot be retrieved from a CallableStatement before it is executed."),
+            RedshiftState.NO_DATA);
+      }
       rs = result.getResultSet();
     }
+
+    if (null == rs) {
+      if (!returnTypeSet) {
+        throw new RedshiftException(GT.tr("No function outputs were registered."),
+            RedshiftState.OBJECT_NOT_IN_STATE);
+      }
+      // Fail fast here too: falling through with a null rs would surface as a
+      // NullPointerException at rs.next() below.
+      throw new RedshiftException(GT.tr("A CallableStatement was executed with nothing returned."),
+          RedshiftState.NO_DATA);
+    }
+
     if (!rs.next()) {
       throw new RedshiftException(GT.tr("A CallableStatement was executed with nothing returned."),
           RedshiftState.NO_DATA);
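
For context on when the new getResults() guards fire: `result` stays null until execute() has run, and the result set can be absent when no OUT parameters were registered. Standard JDBC usage that satisfies all of the guards — the URL and procedure name are placeholders:

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Types;

final class CallableUsage {
  static int callAddOne(String url) throws SQLException {
    try (Connection conn = DriverManager.getConnection(url);
         CallableStatement cs = conn.prepareCall("{? = call add_one(?)}")) {
      cs.registerOutParameter(1, Types.INTEGER); // sets returnTypeSet
      cs.setInt(2, 41);
      cs.execute();                              // populates `result` before any getXxx()
      return cs.getInt(1);                       // safe: executed and registered
    }
  }
}
```
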