From 060bfc6c41277907af523d7bb62e02eb91b5bc86 Mon Sep 17 00:00:00 2001 From: selvaganesang Date: Tue, 17 Apr 2018 05:09:04 +0000 Subject: [PATCH 1/2] [TRAFODION-3009] Streamline error handling in Executor utility commands Changes to avoid allocation of ComDiagsArea in some more places unless it is needed to pass errors or warnings. [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables Disabling USE_LIBHDFS_SCAN by default. The new implementation of Hdfs Scan is now used to scan the text type hive files. Hdfs Scan is now stopped gracefully when the hive scan is cancelled. This avoids the random core seen with new implementation. --- core/sql/executor/ExExeUtilCli.cpp | 4 +- core/sql/executor/ExExeUtilLoad.cpp | 202 ++---------------- core/sql/executor/ExHdfsScan.cpp | 2 + core/sql/sqlcomp/nadefaults.cpp | 2 +- .../java/org/trafodion/sql/HDFSClient.java | 2 +- 5 files changed, 27 insertions(+), 185 deletions(-) diff --git a/core/sql/executor/ExExeUtilCli.cpp b/core/sql/executor/ExExeUtilCli.cpp index aa1553b331..eb7ae04ffa 100644 --- a/core/sql/executor/ExExeUtilCli.cpp +++ b/core/sql/executor/ExExeUtilCli.cpp @@ -1094,7 +1094,7 @@ Lng32 ExeCliInterface::executeImmediate(const char * stmtStr, Lng32 retcode = 0; ComDiagsArea * tempDiags = NULL; - if (globalDiags != NULL && *globalDiags != NULL) + if (globalDiags != NULL && *globalDiags != NULL && (*globalDiags)->getNumber() > 0) { tempDiags = ComDiagsArea::allocate(heap_); tempDiags->mergeAfter(**globalDiags); @@ -1119,7 +1119,7 @@ Lng32 ExeCliInterface::executeImmediate(const char * stmtStr, { // Allocate the diagnostics area if needed // and populate the diagnostics conditions - if (*globalDiags == NULL && retcode != 0) { + if (*globalDiags == NULL && retcode != 0 && retcode != 100) { *globalDiags = ComDiagsArea::allocate(getHeap()); SQL_EXEC_MergeDiagnostics_Internal(**globalDiags); } diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index 5f33b72dc4..5024f8cdac 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -124,9 +124,7 @@ short ExExeUtilCreateTableAsTcb::work() ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals(); ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals(); ContextCli *currContext = masterGlob->getStatement()->getContext(); - - ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()-> - castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction(); + ExTransaction *ta = currContext->getTransaction(); while (1) { @@ -535,69 +533,24 @@ short ExExeUtilCreateTableAsTcb::work() case DONE_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = - pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - // insert into parent - qparent_.up->insert(); - + retcode = handleDone(); + if (retcode == 1) + return WORK_OK; step_ = INITIAL_; - qparent_.down->removeHead(); - return WORK_OK; } break; case ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = - pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = - ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap()); - else - diagsArea->incrRefCount (); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea (diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; - - } // switch } // while - - return WORK_OK; @@ -1679,74 +1632,20 @@ short ExExeUtilHBaseBulkLoadTcb::work() case DONE_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - + retcode = handleDone(); + if (retcode == 1) + return WORK_OK; masterGlob->setRowsAffected(rowsAffected_); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); step_ = INITIAL_; - qparent_.down->removeHead(); return WORK_OK; } break; case LOAD_ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - { - diagsArea->mergeAfter(*getDiagsArea()); - diagsArea->setRowCount(rowsAffected_); - } - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - - - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; @@ -2153,9 +2052,8 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ex_queue_entry * pentry_down = qparent_.down->getHeadEntry(); ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate); - - ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()-> - castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction(); + ExMasterStmtGlobals *masterGlob = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals(); + ExTransaction *ta = masterGlob->getStatement()->getContext()->getTransaction(); while (1) { @@ -2177,7 +2075,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() { ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, retcode, "ExpHbaseInterface_JNI::init"); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2327,7 +2224,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode, "HBaseClient_JNI::createSnapshot/verifySnapshot", snapshotsList_->at(i)->snapshotName->data() ); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2389,7 +2285,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode, "HBaseClient_JNI::createSnapshot/verifySnapshot", snapshotsList_->at(i)->snapshotName->data() ); - handleError(); step_ = UNLOAD_END_ERROR_; break; } @@ -2477,36 +2372,11 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() case DONE_: { - if (qparent_.up->isFull()) + retcode = handleDone(); + if (retcode == 1) return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_NO_DATA; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - diagsArea->setRowCount(rowsAffected_); - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); + masterGlob->setRowsAffected(rowsAffected_); step_ = INITIAL_; - qparent_.down->removeHead(); - freeResources(); return WORK_OK; } @@ -2514,36 +2384,9 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() case UNLOAD_ERROR_: { - if (qparent_.up->isFull()) - return WORK_OK; - - // Return EOF. - ex_queue_entry * up_entry = qparent_.up->getTailEntry(); - - up_entry->upState.parentIndex = pentry_down->downState.parentIndex; - - up_entry->upState.setMatchNo(0); - up_entry->upState.status = ex_queue::Q_SQLERROR; - - ComDiagsArea *diagsArea = up_entry->getDiagsArea(); - - if (diagsArea == NULL) - diagsArea = ComDiagsArea::allocate(getMyHeap()); - else - diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count - - if (getDiagsArea()) - diagsArea->mergeAfter(*getDiagsArea()); - - up_entry->setDiagsArea(diagsArea); - - // insert into parent - qparent_.up->insert(); - - pstate.matches_ = 0; - - - + retcode = handleError(); + if (retcode == 1) + return WORK_OK; step_ = DONE_; } break; @@ -2552,7 +2395,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work() } // while return WORK_OK; - } short ExExeUtilHBaseBulkUnLoadTcb::moveRowToUpQueue(const char * row, Lng32 len, @@ -3447,7 +3289,6 @@ short ExExeUtilLobExtractTcb::work() retcode = handleError(); if (retcode == 1) return WORK_OK; - step_ = DONE_; } break; @@ -3457,7 +3298,6 @@ short ExExeUtilLobExtractTcb::work() retcode = handleDone(); if (retcode == 1) return WORK_OK; - step_ = EMPTY_; return WORK_OK; } diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index f77d714651..d09f6cd354 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -255,6 +255,8 @@ ExHdfsScanTcb::~ExHdfsScanTcb() void ExHdfsScanTcb::freeResources() { + if (hdfsScan_ != NULL) + hdfsScan_->stop(); if (loggingFileName_ != NULL) { NADELETEBASIC(loggingFileName_, getHeap()); loggingFileName_ = NULL; diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 244a2bcb1a..0d7b41bcf4 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -3033,7 +3033,7 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"), // Use large queues on RHS of Flow/Nested Join when appropriate DDkwd__(USE_LARGE_QUEUES, "ON"), - DDkwd__(USE_LIBHDFS_SCAN, "ON"), + DDkwd__(USE_LIBHDFS_SCAN, "OFF"), DDkwd__(USE_MAINTAIN_CONTROL_TABLE, "OFF"), diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index ff78d3d0dd..0346bef891 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -470,7 +470,7 @@ public void stop() throws IOException { if (future_ != null) { try { - future_.get(200, TimeUnit.MILLISECONDS); + future_.get(30, TimeUnit.SECONDS); } catch(TimeoutException e) { logger_.error("Asynchronous Thread of HdfsScan is Cancelled (timeout), ", e); future_.cancel(true); From 37ab3c0331db8d8cd1ab026e1833bd492cea0e76 Mon Sep 17 00:00:00 2001 From: selvaganesang Date: Wed, 18 Apr 2018 20:03:24 +0000 Subject: [PATCH 2/2] [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables Fix for seabase/TEST031 failure with USE_LIBHDFS_SCAN turned 'OFF' --- core/sql/executor/ExHdfsScan.cpp | 55 +++++++++++++++++++------------- core/sql/executor/ExHdfsScan.h | 2 +- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index d09f6cd354..dcf0d07717 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -124,8 +124,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( , hdfsScan_(NULL) , hdfsStats_(NULL) , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries()) - , errBuf_(NULL) - + , numFiles_(0) { Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? glob->getDefaultHeap() : 0); @@ -320,8 +319,6 @@ void ExHdfsScanTcb::freeResources() NADELETE(logFileHdfsClient_, HdfsClient, getHeap()); if (hdfsScan_ != NULL) NADELETE(hdfsScan_, HdfsScan, getHeap()); - if (errBuf_ != NULL) - NADELETEBASIC(errBuf_, getHeap()); } NABoolean ExHdfsScanTcb::needStatsEntry() @@ -628,14 +625,23 @@ ExWorkProcRetcode ExHdfsScanTcb::work() else extraBytesRead_ = 0; // headRoom_ is the number of extra bytes to be read (rangeTailIOSize) - // If EOF is reached while reading the range and the extraBytes read - // is less than headRoom_ then process all the data till EOF - // TODO: If the whole range fits in one buffer, it is need too to process rows till EOF for the last range alone - // No easy way to identify that last range read, but can identify that it is not the first range. - // The rows could be read more than once if there are more than 2 ranges. - // Fix optimizer not to have more than 2 ranges in that case - if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && hdfo->getStartOffset() != 0) - extraBytesRead_ = 0; + // If the whole range fits in one buffer, it is needed to process rows till EOF for the last range alone. +/* + if (retArray_[IS_EOF] && (extraBytesRead_ < headRoom_) + && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) + extraBytesRead_ = 0; +*/ + if (numFiles_ <= 1) { + if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) + extraBytesRead_ = 0; + } + else { + // If EOF is reached while reading the range and the extraBytes read + // is less than headRoom_ then process all the data till EOF + if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ ) + extraBytesRead_ = 0; + } + bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_; prevRangeNum_ = retArray_[RANGE_NO]; headRoomCopied_ = 0; @@ -656,6 +662,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work() } else hdfsBufNextRow_ = (char *)bufBegin_; + QRLogger::log(CAT_SQL_EXE, LL_DEBUG, "FileName %s Offset %ld BytesToRead %ld BytesRead %ld RangeNo %d IsEOF %d BufBegin: 0x%lx BufEnd: 0x%lx BufLogicalEnd: 0x%lx headRoom %d extraBytes %d recordSkip %d ", + hdfo->fileName(), hdfo->getStartOffset(), hdfo->bytesToRead_, retArray_[BYTES_COMPLETED], retArray_[RANGE_NO], retArray_[IS_EOF], bufBegin_ , bufEnd_, bufLogicalEnd_, headRoom_, extraBytesRead_, recordSkip_); + // If the first record starts after the logical end, this record should have been processed by other ESPs + if ((BYTE *)hdfsBufNextRow_ > bufLogicalEnd_) { + headRoomCopied_ = 0; + hdfsBufNextRow_ = NULL; + } step_ = PROCESS_HDFS_ROW; } break; @@ -784,16 +797,14 @@ ExWorkProcRetcode ExHdfsScanTcb::work() ComDiagsArea * diagsArea = NULL; if (hdfsErrorDetail == ENOENT) { - if (errBuf_ != NULL) - NADELETEBASIC(errBuf_, getHeap()); Lng32 len = strlen(hdfsScanTdb().tableName()) + strlen(hdfsFileName_) + 100; - errBuf_ = new (getHeap()) char[len]; - snprintf(errBuf_, len, "%s (fileLoc: %s)", + char errBuf[len]; + snprintf(errBuf, len, "%s (fileLoc: %s)", hdfsScanTdb().tableName(), hdfsFileName_); ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(EXE_TABLE_NOT_FOUND), NULL, NULL, NULL, NULL, - errBuf_); + errBuf); } else ExRaiseSqlError(getHeap(), &diagsArea, @@ -1091,7 +1102,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work() int err = 0; char *startOfNextRow = extractAndTransformAsciiSourceToSqlRow(err, transformDiags, hdfsScanTdb().getHiveScanMode()); - + QRLogger::log(CAT_SQL_EXE, LL_DEBUG, "HdfsBufRow 0x%lx StartOfNextRow 0x%lx RowLength %ld ", hdfsBufNextRow_, startOfNextRow, + startOfNextRow-hdfsBufNextRow_); bool rowWillBeSelected = true; lastErrorCnd_ = NULL; if(err) @@ -1936,7 +1948,6 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, void ExHdfsScanTcb::computeRangesAtRuntime() { - int numFiles = 0; Int64 totalSize = 0; Int64 myShare = 0; Int64 runningSum = 0; @@ -1949,13 +1960,13 @@ void ExHdfsScanTcb::computeRangesAtRuntime() HDFS_FileInfo *fileInfos; HDFS_Client_RetCode hdfsClientRetcode; - hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles); + hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles_); ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error:hdfsClient->hdfsListDirectory returned an error") deallocateRuntimeRanges(); // in a first round, count the total number of bytes - for (int f=0; f