From b041abd941bb1c8aabd78eccbae73695ea1cf8f3 Mon Sep 17 00:00:00 2001 From: Sandhya Sundaresan Date: Wed, 30 May 2018 18:55:46 +0000 Subject: [PATCH 1/2] Support to provide a locking mechanism for LOB insert/update operations --- core/sql/bin/SqlciErrors.txt | 1 + core/sql/cli/Cli.cpp | 15 ++++ core/sql/cli/Cli.h | 8 +- core/sql/cli/CliExtern.cpp | 122 +++++++++++++++++++++++++++ core/sql/cli/Context.cpp | 53 ++++++++++++ core/sql/cli/Context.h | 6 +- core/sql/cli/SessionDefaults.cpp | 1 + core/sql/comexe/ComTdbExeUtil.h | 9 +- core/sql/executor/ExExeUtil.h | 1 + core/sql/executor/ExExeUtilLoad.cpp | 70 +++++++++++++-- core/sql/exp/ExpErrorEnums.h | 1 + core/sql/exp/ExpLOB.cpp | 113 +++++++++++++++++++++---- core/sql/exp/ExpLOB.h | 15 +++- core/sql/exp/ExpLOBenums.h | 2 +- core/sql/exp/ExpLOBexternal.h | 4 +- core/sql/generator/GenItemFunc.cpp | 10 ++- core/sql/generator/GenPreCode.cpp | 4 +- core/sql/generator/GenRelExeUtil.cpp | 6 ++ core/sql/parser/sqlparser.y | 4 +- core/sql/runtimestats/SqlStats.cpp | 22 ++++- core/sql/runtimestats/SqlStats.h | 3 + core/sql/runtimestats/rts_msg.cpp | 39 +++++++++ core/sql/runtimestats/rts_msg.h | 40 ++++++++- core/sql/runtimestats/sscpipc.cpp | 53 ++++++++++++ core/sql/runtimestats/sscpipc.h | 1 + core/sql/runtimestats/ssmpipc.cpp | 52 ++++++++++++ core/sql/runtimestats/ssmpipc.h | 5 +- core/sql/sqlcomp/DefaultConstants.h | 1 + core/sql/sqlcomp/nadefaults.cpp | 5 ++ 29 files changed, 628 insertions(+), 38 deletions(-) diff --git a/core/sql/bin/SqlciErrors.txt b/core/sql/bin/SqlciErrors.txt index 8146be4013..30500946e9 100644 --- a/core/sql/bin/SqlciErrors.txt +++ b/core/sql/bin/SqlciErrors.txt @@ -1597,6 +1597,7 @@ $1~String1 -------------------------------- 8555 ZZZZZ 99999 ADVANCED CRTCL DIALOUT An internal error occurred in the SQL executor in the disk process. 8556 ZZZZZ 99999 BEGINNER MAJOR DBADMIN An error occurred while accessing HBase table $0~string0. $1~string1 8557 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The file name passed to externaltolob exceeds 256 bytes. +8558 ZZZZZ 99999 BEGINNER MAJOR DBADMIN The LOB operation is prevented due to conflicting access $0~string0. 8570 ZZZZZ 99999 ADVANCED MAJOR DBADMIN SQL could not allocate sufficient memory to build query. 8571 ZZZZZ 99999 ADVANCED MAJOR DBADMIN SQL could not allocate sufficient memory to execute query. 8572 ZZZZZ 99999 ADVANCED CRTCL DIALOUT The statement has incurred a fatal error and must be deallocated. diff --git a/core/sql/cli/Cli.cpp b/core/sql/cli/Cli.cpp index a06aadb492..a942bcaddd 100644 --- a/core/sql/cli/Cli.cpp +++ b/core/sql/cli/Cli.cpp @@ -8088,6 +8088,21 @@ Lng32 SQLCLI_GetSecInvalidKeys(CliGlobals *cliGlobals, return retcode; } +Lng32 SQLCLI_SetLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId + ) +{ + return cliGlobals->currContext()->setLobLock(lobLockId); +} +Lng32 SQLCLI_CheckLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId, + /*OUT */ NABoolean *found + ) +{ + Int32 retcode = 0; + retcode = cliGlobals->currContext()->checkLobLock(lobLockId, found); + return retcode; +} Lng32 SQLCLI_GetStatistics2(CliGlobals *cliGlobals, /* IN */ short statsReqType, /* IN */ char *statsReqStr, diff --git a/core/sql/cli/Cli.h b/core/sql/cli/Cli.h index bc8ae16a0c..a8a6b9a54c 100644 --- a/core/sql/cli/Cli.h +++ b/core/sql/cli/Cli.h @@ -763,7 +763,13 @@ Lng32 SQLCLI_GetSecInvalidKeys(CliGlobals *cliGlobals, /* IN/OUT */ Int32 *returnedNumSiKeys, /* IN/OUT */ Int64 *maxTimestamp); - +Lng32 SQLCLI_SetLobLock(CliGlobals *cliGlobals, + /* IN */ char * lobLockId + ); +Lng32 SQLCLI_CheckLobLock(CliGlobals *cliGlobals, + /* IN */ char *lobLockId, + /*OUT */ NABoolean *found + ); Lng32 SQLCLI_GetStatistics2(CliGlobals *cliGlobals, /* IN */ short statsReqType, /* IN */ char *statsReqStr, diff --git a/core/sql/cli/CliExtern.cpp b/core/sql/cli/CliExtern.cpp index 38a766c950..69e4717276 100644 --- a/core/sql/cli/CliExtern.cpp +++ b/core/sql/cli/CliExtern.cpp @@ -87,6 +87,7 @@ #include "Context.h" #include #include "QRLogger.h" +#include "ExpLOBenums.h" extern char ** environ; @@ -6122,6 +6123,7 @@ Lng32 SQL_EXEC_SetSecInvalidKeys( return retcode; } + Lng32 SQL_EXEC_GetSecInvalidKeys( /* IN */ Int64 prevTimestamp, /* IN/OUT */ SQL_QIKEY siKeys[], @@ -6165,6 +6167,126 @@ Lng32 SQL_EXEC_GetSecInvalidKeys( return retcode; } +Lng32 SQL_EXEC_SetLobLock(/* IN */ char *llid) +{ + + Lng32 retcode = 0; + if (!llid || strlen(llid) == 0 ) + return retcode; + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + char llidAdd[LOB_LOCK_ID_SIZE+1]; + // Prepend a '+' to indicate we are setting a new lock in the + // shared segement + llidAdd[0] = '+'; + memcpy(&llidAdd[1],llid,LOB_LOCK_ID_SIZE); + retcode = SQLCLI_SetLobLock(GetCliGlobals(), + (char *)llidAdd); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} + +Lng32 SQL_EXEC_ReleaseLobLock(/* IN */ char *llid) +{ + Lng32 retcode = 0; + if (!llid || strlen(llid) ==0 ) + return retcode; + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + char llidDel[LOB_LOCK_ID_SIZE+1]; + // Prepend a '-' to indicate we are removing this lock from the + // shared segement + llidDel[0] = '-'; + memcpy(&llidDel[1],llid,LOB_LOCK_ID_SIZE); + retcode = SQLCLI_SetLobLock(GetCliGlobals(), + (char *)llidDel); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} +Lng32 SQL_EXEC_CheckLobLock(/* IN */ char * llid, /* IN */ NABoolean *found) +{ + Lng32 retcode=0; + if (!llid || (strlen(llid)==0)) + { + *found = FALSE; + return retcode; + } + CLISemaphore *tmpSemaphore = NULL; + ContextCli *threadContext; + CLI_NONPRIV_PROLOGUE(retcode); + try + { + tmpSemaphore = getCliSemaphore(threadContext); + tmpSemaphore->get(); + threadContext->incrNumOfCliCalls(); + retcode = SQLCLI_CheckLobLock(GetCliGlobals(), + llid, found); + } + catch(...) + { + retcode = -CLI_INTERNAL_ERROR; +#if defined(_THROW_EXCEPTIONS) + if (cliWillThrow()) + { + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + throw; + } +#endif + } + threadContext->decrNumOfCliCalls(); + tmpSemaphore->release(); + + + RecordError(NULL, retcode); + return retcode; +} Lng32 SQL_EXEC_GetStatistics2( /* IN */ short statsReqType, /* IN */ char *statsReqStr, diff --git a/core/sql/cli/Context.cpp b/core/sql/cli/Context.cpp index 7aee780fa6..ac09a9cf06 100644 --- a/core/sql/cli/Context.cpp +++ b/core/sql/cli/Context.cpp @@ -3163,6 +3163,59 @@ Lng32 ContextCli::setSecInvalidKeys( } +Int32 ContextCli::checkLobLock(char *inLobLockId, NABoolean *found) +{ + Int32 retcode = 0; + *found = FALSE; + CliGlobals *cliGlobals = getCliGlobals(); + StatsGlobals *statsGlobals = GetCliGlobals()->getStatsGlobals(); + if (cliGlobals->getStatsGlobals() == NULL) + { + (diagsArea_) << DgSqlCode(-EXE_RTS_NOT_STARTED); + return diagsArea_.mainSQLCODE(); + } + statsGlobals->checkLobLock(cliGlobals,inLobLockId); + if (inLobLockId != NULL) + *found = TRUE; + return retcode; +} +Lng32 ContextCli::setLobLock( + /* IN */ char *lobLockId // objID+column number + ) +{ + CliGlobals *cliGlobals = getCliGlobals(); + if (cliGlobals->getStatsGlobals() == NULL) + { + (diagsArea_) << DgSqlCode(-EXE_RTS_NOT_STARTED); + return diagsArea_.mainSQLCODE(); + } + ComDiagsArea *tempDiagsArea = &diagsArea_; + tempDiagsArea->clear(); + + IpcServer *ssmpServer = ssmpManager_->getSsmpServer(exHeap(), + cliGlobals->myNodeName(), + cliGlobals->myCpu(), tempDiagsArea); + if (ssmpServer == NULL) + return diagsArea_.mainSQLCODE(); + + SsmpClientMsgStream *ssmpMsgStream = new (cliGlobals->getIpcHeap()) + SsmpClientMsgStream((NAHeap *)cliGlobals->getIpcHeap(), + ssmpManager_, tempDiagsArea); + ssmpMsgStream->addRecipient(ssmpServer->getControlConnection()); + LobLockRequest *llMsg = + new (cliGlobals->getIpcHeap()) LobLockRequest( + cliGlobals->getIpcHeap(), + lobLockId); + *ssmpMsgStream << *llMsg; + // Call send with no timeout. + ssmpMsgStream->send(); + // I/O is now complete. + llMsg->decrRefCount(); + cliGlobals->getEnvironment()->deleteCompletedMessages(); + ssmpManager_->cleanupDeletedSsmpServers(); + return diagsArea_.mainSQLCODE(); + +} ExStatisticsArea *ContextCli::getMergedStats( /* IN */ short statsReqType, /* IN */ char *statsReqStr, diff --git a/core/sql/cli/Context.h b/core/sql/cli/Context.h index d64ac096d3..c000714805 100644 --- a/core/sql/cli/Context.h +++ b/core/sql/cli/Context.h @@ -1006,7 +1006,11 @@ class ContextCli : public ExGod { Lng32 setSecInvalidKeys( /* IN */ Int32 numSiKeys, /* IN */ SQL_QIKEY siKeys[]); - + Int32 checkLobLock(char* inLobLockId, NABoolean *found); + + Lng32 setLobLock( + /* IN */ char *lobLockId// objID+column number + ); Lng32 holdAndSetCQD(const char * defaultName, const char * defaultValue); Lng32 restoreCQD(const char * defaultName); diff --git a/core/sql/cli/SessionDefaults.cpp b/core/sql/cli/SessionDefaults.cpp index 966ce77280..1e656bdc20 100644 --- a/core/sql/cli/SessionDefaults.cpp +++ b/core/sql/cli/SessionDefaults.cpp @@ -792,6 +792,7 @@ static const AQRInfo::AQRErrorMap aqrErrorMap[] = // locked row timeout AQREntry( 8550, 73, 2, 0, 0, 0, "", 0, 0), AQREntry( 8550, 78, 1, 60, 0, 0, "", 0, 0), + AQREntry( 8558, 0 , 2, 30, 0, 0, "", 0, 0), AQREntry( 8551, 12, 1, 60, 0, 0, "", 0, 0), diff --git a/core/sql/comexe/ComTdbExeUtil.h b/core/sql/comexe/ComTdbExeUtil.h index 47164608ff..5ae7084314 100644 --- a/core/sql/comexe/ComTdbExeUtil.h +++ b/core/sql/comexe/ComTdbExeUtil.h @@ -2921,6 +2921,7 @@ class ComTdbExeUtilLobExtract : public ComTdbExeUtil APPEND_OR_CREATE = 0x0080, RETRIEVE_HDFSFILENAME= 0x0100, RETRIEVE_OFFSET=0x0200 + }; @@ -3023,6 +3024,11 @@ class ComTdbExeUtilLobUpdate : public ComTdbExeUtil void setReplace(NABoolean v) {(v ? flags_ |= REPLACE_ : flags_ &= ~REPLACE_); }; NABoolean isReplace() { return (flags_ & REPLACE_) != 0; }; + + void setLobLocking(NABoolean v) + {(v ? flags_ |= LOB_LOCKING_ : flags_ &= ~LOB_LOCKING_); }; + NABoolean lobLocking() { return (flags_ & LOB_LOCKING_) != 0; }; + void setUpdateSize(Int64 upd_size){ updateSize_ = upd_size;}; Int64 updateSize() { return updateSize_;} void setTotalBufSize(Int64 bufSize) { totalBufSize_ = bufSize;}; @@ -3041,7 +3047,8 @@ class ComTdbExeUtilLobUpdate : public ComTdbExeUtil ERROR_IF_EXISTS_ = 0x0001, TRUNCATE_ = 0x0002, APPEND_ = 0x0004, - REPLACE_=0x0008 + REPLACE_=0x0008, + LOB_LOCKING_=0x0010 }; NABasicPtr handle_; Int32 handleLen_; diff --git a/core/sql/executor/ExExeUtil.h b/core/sql/executor/ExExeUtil.h index a3f197de8d..aec3ccba20 100644 --- a/core/sql/executor/ExExeUtil.h +++ b/core/sql/executor/ExExeUtil.h @@ -3083,6 +3083,7 @@ class ExExeUtilLobUpdateTcb : public ExExeUtilTcb ExLobStats lobStats_; char statusString_[200]; fstream indata_; + char lobLockId_[12]; ExLobGlobals *exLobGlobals_; }; // ----------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index bcd52f2bfc..e5624300aa 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -3325,7 +3325,7 @@ ExExeUtilLobUpdateTcb::ExExeUtilLobUpdateTcb lobHandleLen_ = 2050; lobHandle_[0] = '\0'; exLobGlobals_=NULL; - + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); ExpLOBinterfaceInit(exLobGlobals_,currContext->exHeap(),currContext,TRUE, lobTdb().getLobHdfsServer(), lobTdb().getLobHdfsPort()); @@ -3349,7 +3349,6 @@ short ExExeUtilLobUpdateTcb::work() { Lng32 cliRC = 0; Lng32 retcode = 0; - // if no parent request, return if (qparent_.down->isEmpty()) return WORK_OK; @@ -3459,6 +3458,25 @@ short ExExeUtilLobUpdateTcb::work() char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_); + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_ , &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + + step_=HANDLE_ERROR_; + break; + } + } retcode = ExpLOBInterfaceUpdate(lobGlobs, lobTdb().getLobHdfsServer(), lobTdb().getLobHdfsPort(), @@ -3521,6 +3539,24 @@ short ExExeUtilLobUpdateTcb::work() lobDataLen_ = lobTdb().totalBufSize_; strcpy(lobLoc_, lobTdb().getLobLocation()); + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + step_=HANDLE_ERROR_; + break; + } + } char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; @@ -3586,8 +3622,25 @@ short ExExeUtilLobUpdateTcb::work() lobDataLen_ = lobTdb().totalBufSize_; strcpy(lobLoc_, lobTdb().getLobLocation()); - - + + if (lobTdb().lobLocking()) + { + ExpLOBoper::genLobLockId(uid,lobNum,lobLockId_);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(lobLockId_, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(lobLockId_); + } + else if (found) + { + memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); + ExRaiseSqlError(getHeap(), &diagsArea_, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + step_=HANDLE_ERROR_; + break; + } + } char outLobHandle[LOB_HANDLE_LEN]; Int32 outHandleLen; Int64 requestTag = 0; @@ -3653,8 +3706,13 @@ short ExExeUtilLobUpdateTcb::work() case HANDLE_ERROR_: { retcode = handleError(); + if (retcode == 1) - return WORK_OK; + { + if (lobLockId_[0] && lobTdb().lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(lobLockId_); + return WORK_OK; + } step_ = DONE_; } @@ -3662,6 +3720,8 @@ short ExExeUtilLobUpdateTcb::work() case DONE_: { retcode = handleDone(); + if(lobLockId_[0] && lobTdb().lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(lobLockId_); if (retcode == 1) return WORK_OK; diff --git a/core/sql/exp/ExpErrorEnums.h b/core/sql/exp/ExpErrorEnums.h index 8227cb8d98..2068c75424 100644 --- a/core/sql/exp/ExpErrorEnums.h +++ b/core/sql/exp/ExpErrorEnums.h @@ -175,6 +175,7 @@ enum ExeErrorCode EXE_ERROR_STREAM_OVERFLOW = 8553, EXE_EID_INTERNAL_ERROR = 8555, EXE_HBASE_ACCESS_ERROR = 8556, + EXE_LOB_CONCURRENT_ACCESS_ERROR = 8558, EXE_LAST_ERROR_FROM_FS_DP2 = 8569, // --------------------------------------------------------------------- diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp index 5d3bf053a0..2ef8ae4f9e 100644 --- a/core/sql/exp/ExpLOB.cpp +++ b/core/sql/exp/ExpLOB.cpp @@ -278,7 +278,6 @@ Lng32 ExpLOBoper::dropLOB(ExLobGlobals * exLobGlob, ContextCli *currContext, Lng32 rc = 0; - // Call ExeLOBinterface to create the LOB // Call ExeLOBinterface to drop the LOB rc = ExpLOBinterfaceDrop(exLobGlob,hdfsServer, hdfsPort, lobName, lobLoc); @@ -442,6 +441,19 @@ Lng32 ExpLOBoper::extractFromLOBhandle(Int16 *flags, return 0; } +// 12 byte lock identifier uniquely identifies the LOB file that is being +// locked. +// Each LOB column has a unique lob number and +// each column has a unique data file. +void ExpLOBoper::genLobLockId(Int64 objid, Int32 lobNum, char *llid) +{ + memset(llid,'\0',LOB_LOCK_ID_SIZE); + if (objid != -1 && lobNum != -1) + { + memcpy(llid,&objid,sizeof(Int64)) ; + memcpy(&(llid[sizeof(Int64)]),&lobNum,sizeof(Int32)); + } +} // creates LOB handle in string format. void ExpLOBoper::createLOBhandleString(Int16 flags, @@ -664,6 +676,7 @@ void ExpLOBinsert::displayContents(Space * space, const char * displayStr, space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short)); } + ex_expr::exp_return_type ExpLOBiud::insertDesc(char *op_data[], CollHeap*h, ComDiagsArea** diagsArea) @@ -1010,18 +1023,45 @@ ex_expr::exp_return_type ExpLOBinsert::eval(char *op_data[], { ex_expr::exp_return_type err; - + Int32 retcode = 0; + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid); + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else + { + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + + return ex_expr::EXPR_ERROR; + } + } err = insertDesc(op_data, h, diagsArea); if (err == ex_expr::EXPR_ERROR) - return err; + { + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + return err; + } if(fromEmpty()) - return err; + { + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + return err; + } char * handle = op_data[0]; Lng32 handleLen = getOperand(0)->getLength(); err = insertData(handleLen, handle, op_data, h, diagsArea); - + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); return err; } @@ -1183,7 +1223,7 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], CollHeap*h, ComDiagsArea** diagsArea) { - Lng32 rc; + Lng32 rc, retcode = 0; Lng32 lobOperStatus = checkLobOperStatus(); if (lobOperStatus == DO_NOTHING_) @@ -1230,6 +1270,24 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], lobHandle); //op_data[2]); if (sDescSyskey == -1) //updating empty lob { + + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else if (found) + { + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(EXE_LOB_CONCURRENT_ACCESS_ERROR)); + return ex_expr::EXPR_ERROR; + } + } ex_expr::exp_return_type err = insertDesc(op_data, h, diagsArea); if (err == ex_expr::EXPR_ERROR) return err; @@ -1237,7 +1295,8 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], char * handle = op_data[0]; handleLen = getOperand(0)->getLength(); err = insertData(handleLen, handle, op_data, h, diagsArea); - + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); return err; } @@ -1320,6 +1379,27 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], lobLen = 0; so = Lob_Memory; } + + char llid[LOB_LOCK_ID_SIZE]; + if (lobLocking()) + { + ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid);; + NABoolean found = FALSE; + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (! retcode && !found) + { + retcode = SQL_EXEC_SetLobLock(llid); + } + else if (found) + { + Int32 lobError = LOB_DATA_FILE_LOCK_ERROR; + ExRaiseSqlError(h, diagsArea, + (ExeErrorCode)(8558), NULL,(Int32 *)&lobError, + NULL, NULL, (char*)"ExpLOBInterfaceInsert", + getLobErrStr(LOB_DATA_FILE_LOCK_ERROR),NULL); + return ex_expr::EXPR_ERROR; + } + } if (isAppend() && !fromEmpty()) { rc = ExpLOBInterfaceUpdateAppend @@ -1366,8 +1446,9 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], fromDescKey, fromDescTS, lobMaxSize, getLobMaxChunkMemSize(),getLobGCLimit()); } - - if (rc < 0) + if (lobLocking()) + retcode = SQL_EXEC_ReleaseLobLock(llid); + if (rc < 0) { Lng32 intParam1 = -rc; ExRaiseSqlError(h, diagsArea, @@ -1375,15 +1456,15 @@ ex_expr::exp_return_type ExpLOBupdate::eval(char *op_data[], &cliError, NULL, (char*)"ExpLOBInterfaceUpdate", (char*)"ExpLOBInterfaceUpdate",getLobErrStr(intParam1)); return ex_expr::EXPR_ERROR; - } + } - // update lob handle with the returned values - str_cpy_all(result, lobHandle, handleLen); - // str_cpy_all(result, op_data[2], handleLen); - // ExpLOBoper::updLOBhandle(sDescSyskey, 0, result); - getOperand(0)->setVarLength(handleLen, op_data[-MAX_OPERANDS]); + // update lob handle with the returned values + str_cpy_all(result, lobHandle, handleLen); + // str_cpy_all(result, op_data[2], handleLen); + // ExpLOBoper::updLOBhandle(sDescSyskey, 0, result); + getOperand(0)->setVarLength(handleLen, op_data[-MAX_OPERANDS]); - return ex_expr::EXPR_OK; + return ex_expr::EXPR_OK; } diff --git a/core/sql/exp/ExpLOB.h b/core/sql/exp/ExpLOB.h index 9c398764c9..231fa20f0f 100644 --- a/core/sql/exp/ExpLOB.h +++ b/core/sql/exp/ExpLOB.h @@ -137,6 +137,7 @@ class ExpLOBoper : public ex_clause { Int64 uid, Lng32 lobNum); static Lng32 initLOBglobal(ExLobGlobals *& lobGlob, NAHeap *heap, ContextCli *currContext,char *server, Int32 port ); + static void genLobLockId(Int64 objUid,Int32 lobNum, char *llid); // Extracts values from the LOB handle stored at ptr static Lng32 extractFromLOBhandle(Int16 *flags, @@ -392,7 +393,16 @@ class ExpLOBiud : public ExpLOBoper { { (v) ? liudFlags_ |= FROM_EXTERNAL: liudFlags_ &= ~FROM_EXTERNAL; }; - + + NABoolean lobLocking() + { + return ((liudFlags_ & LOB_LOCKING) != 0); + }; + + inline void setLobLocking(NABoolean v) + { + (v) ? liudFlags_ |= LOB_LOCKING: liudFlags_ &= ~LOB_LOCKING; + }; protected: Int64 objectUID_; @@ -407,7 +417,8 @@ class ExpLOBiud : public ExpLOBoper { FROM_EXTERNAL = 0x0020, FROM_BUFFER = 0x0040, FROM_EMPTY = 0x0080, - FROM_LOB_EXTERNAL = 0x0100 + FROM_LOB_EXTERNAL = 0x0100, + LOB_LOCKING = 0x0200 }; Lng32 liudFlags_; diff --git a/core/sql/exp/ExpLOBenums.h b/core/sql/exp/ExpLOBenums.h index 9c14dafd24..9729cbccbc 100644 --- a/core/sql/exp/ExpLOBenums.h +++ b/core/sql/exp/ExpLOBenums.h @@ -32,7 +32,7 @@ #define MAX_BLACK_BOX_LEN 2048 #define LOB_DESC_HEADER_KEY 1 #define NUM_WORKER_THREADS 2 - +#define LOB_LOCK_ID_SIZE 12 // 2 threads at most, one to read and the other to pick up next read from preOpen diff --git a/core/sql/exp/ExpLOBexternal.h b/core/sql/exp/ExpLOBexternal.h index 657ef1233f..201036a015 100644 --- a/core/sql/exp/ExpLOBexternal.h +++ b/core/sql/exp/ExpLOBexternal.h @@ -171,7 +171,9 @@ Lng32 SQL_EXEC_LOBddlInterface /*IN*/ Int64 lobMaxSize, /*IN*/ NABoolean lobTrace ); - +Lng32 SQL_EXEC_SetLobLock(/* IN */ char *llid); +Lng32 SQL_EXEC_ReleaseLobLock(/* IN */ char *llid); +Lng32 SQL_EXEC_CheckLobLock(/* IN */ char *llid, /* IN */ Int32 *found); /*************************************************************************** Called by loader to load or extract buffers of data. diff --git a/core/sql/generator/GenItemFunc.cpp b/core/sql/generator/GenItemFunc.cpp index 6121110cd4..60a8f19533 100644 --- a/core/sql/generator/GenItemFunc.cpp +++ b/core/sql/generator/GenItemFunc.cpp @@ -2673,6 +2673,11 @@ short LOBinsert::codeGen(Generator * generator) else if(obj_ == LOBoper::EMPTY_LOB_) li->setFromEmpty(TRUE); + if (CmpCommon::getDefault(LOB_LOCKING) == DF_ON) + li->setLobLocking(TRUE); + else + li->setLobLocking(FALSE); + li->lobNum() = lobNum(); li->setLobStorageType(lobStorageType()); li->setLobStorageLocation((char*)lobStorageLocation().data()); @@ -2748,7 +2753,10 @@ short LOBupdate::codeGen(Generator * generator) lu->setFromBuffer(TRUE); else if(obj_ == LOBoper::EMPTY_LOB_) lu->setFromEmpty(TRUE); - + if (CmpCommon::getDefault(LOB_LOCKING) == DF_ON) + lu->setLobLocking(TRUE); + else + lu->setLobLocking(FALSE); lu->lobNum() = lobNum(); lu->setLobStorageType(lobStorageType()); lu->setLobStorageLocation((char*)lobStorageLocation().data()); diff --git a/core/sql/generator/GenPreCode.cpp b/core/sql/generator/GenPreCode.cpp index 9832c446c4..4d65701354 100644 --- a/core/sql/generator/GenPreCode.cpp +++ b/core/sql/generator/GenPreCode.cpp @@ -4520,7 +4520,9 @@ RelExpr * GenericUpdate::preCodeGen(Generator * generator, { oltOptInfo().setOltOpt(FALSE); generator->oltOptInfo()->setOltOpt(FALSE); - generator->setAqrEnabled(FALSE); + //enabling AQR to take care of the lock conflict error 8558 that + // should be retried. + // generator->setAqrEnabled(FALSE); generator->setUpdAbortOnError(TRUE); generator->setUpdSavepointOnError(FALSE); } diff --git a/core/sql/generator/GenRelExeUtil.cpp b/core/sql/generator/GenRelExeUtil.cpp index 570c3e56fb..013b48a0b5 100644 --- a/core/sql/generator/GenRelExeUtil.cpp +++ b/core/sql/generator/GenRelExeUtil.cpp @@ -4336,6 +4336,8 @@ short ExeUtilLobUpdate::codeGen(Generator * generator) const char* f = ActiveSchemaDB()->getDefaults(). getValue(LOB_STORAGE_FILE_DIR); + + char *lobLoc = space->allocateAlignedSpace(strlen(f) + 1); strcpy(lobLoc, f); ComTdbExeUtilLobUpdate * exe_util_lobupdate_tdb = new(space) @@ -4383,6 +4385,10 @@ short ExeUtilLobUpdate::codeGen(Generator * generator) exe_util_lobupdate_tdb->setAppend(TRUE); else exe_util_lobupdate_tdb->setAppend(FALSE); + if((ActiveSchemaDB()->getDefaults()).getToken(LOB_LOCKING) == DF_ON) + exe_util_lobupdate_tdb->setLobLocking(TRUE); + else + exe_util_lobupdate_tdb->setLobLocking(FALSE); generator->initTdbFields(exe_util_lobupdate_tdb); diff --git a/core/sql/parser/sqlparser.y b/core/sql/parser/sqlparser.y index b84546da17..0f52e33723 100755 --- a/core/sql/parser/sqlparser.y +++ b/core/sql/parser/sqlparser.y @@ -11728,7 +11728,7 @@ blob_optional_left_len_right: '(' NUMERIC_LITERAL_EXACT_NO_SCALE optional_lob_un if (CmpCommon::getDefault(TRAF_BLOB_AS_VARCHAR) == DF_ON) { - $$ = (Int64)CmpCommon::getDefault(TRAF_MAX_CHARACTER_COL_LENGTH ); + $$ = (Int64)CmpCommon::getDefaultNumeric(TRAF_MAX_CHARACTER_COL_LENGTH ); } else { @@ -11764,7 +11764,7 @@ clob_optional_left_len_right: '(' NUMERIC_LITERAL_EXACT_NO_SCALE optional_lob_un if (CmpCommon::getDefault(TRAF_CLOB_AS_VARCHAR) == DF_ON) { - $$ = (Int64)CmpCommon::getDefault(TRAF_MAX_CHARACTER_COL_LENGTH ); + $$ = (Int64)CmpCommon::getDefaultNumeric(TRAF_MAX_CHARACTER_COL_LENGTH ); } else { diff --git a/core/sql/runtimestats/SqlStats.cpp b/core/sql/runtimestats/SqlStats.cpp index 0b64e02f9d..aec2999cc4 100644 --- a/core/sql/runtimestats/SqlStats.cpp +++ b/core/sql/runtimestats/SqlStats.cpp @@ -73,6 +73,7 @@ StatsGlobals::StatsGlobals(void *baseAddr, short envType, Lng32 maxSegSize) , maxPid_(0) , pidToCheck_(0) , ssmpDumpedTimestamp_(0) + , lobLocks_(NULL) { statsHeap_.setSharedMemory(); //Phandle wrapper in porting layer @@ -112,6 +113,7 @@ void StatsGlobals::init() stmtStatsList_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); rmsStats_ = new (&statsHeap_) ExRMSStats(&statsHeap_); recentSikeys_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); + lobLocks_ = new (&statsHeap_) SyncHashQueue(&statsHeap_, 512); rmsStats_->setCpu(cpu_); rmsStats_->setRmsVersion(version_); rmsStats_->setRmsEnvType(rtsEnvType_); @@ -1082,7 +1084,25 @@ Lng32 StatsGlobals::updateStats(ComDiagsArea &diags, SQLQUERY_ID *query_id, void diags << DgSqlCode(-CLI_INTERNAL_ERROR); return retcode; } - +Int32 StatsGlobals::checkLobLock(CliGlobals *cliGlobals, char *&lobLockId) +{ + int error = getStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + if ((lobLocks_ ==NULL) || lobLocks_->isEmpty()) + { + lobLockId = NULL; + releaseStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + return 0; + } + lobLocks_->position(lobLockId,LOB_LOCK_ID_SIZE); + //Look in the current chain for a match + while (lobLocks_->getCurr() != NULL && memcmp(lobLockId, (char *)(lobLocks_->getCurr()),LOB_LOCK_ID_SIZE) !=0 ) + lobLocks_->getNext(); + if (lobLocks_->getCurr() == NULL) + lobLockId = NULL; + + releaseStatsSemaphore(cliGlobals->getSemId(), cliGlobals->myPin()); + return 0; +} Lng32 StatsGlobals::getSecInvalidKeys( CliGlobals * cliGlobals, Int64 lastCallTimestamp, diff --git a/core/sql/runtimestats/SqlStats.h b/core/sql/runtimestats/SqlStats.h index 0377c3d034..5e23b0cfaa 100644 --- a/core/sql/runtimestats/SqlStats.h +++ b/core/sql/runtimestats/SqlStats.h @@ -473,6 +473,7 @@ class StatsGlobals SQL_QIKEY [], Int32 maxNumSiKeys, Int32 *returnedNumSiKeys); + Int32 checkLobLock(CliGlobals *cliGlobals,char *&lobLockId); void mergeNewSikeys(Int32 numSikeys, SQL_QIKEY sikeys[]); @@ -489,6 +490,7 @@ class StatsGlobals SB_Phandle_Type *getSsmpProcHandle() { return &ssmpProcHandle_; } SB_Phandle_Type *getSscpProcHandle() { return &sscpProcHandle_; } SyncHashQueue *getRecentSikeys() { return recentSikeys_; } + SyncHashQueue *getLobLocks() { return lobLocks_;} void setSsmpProcSemId(Long semId) { ssmpProcSemId_ = semId; } Long &getSsmpProcSemId() { return ssmpProcSemId_; } void setSscpProcSemId(Long semId) { sscpProcSemId_ = semId; } @@ -531,6 +533,7 @@ class StatsGlobals pid_t maxPid_; Int64 ssmpDumpedTimestamp_; MemoryMonitor *memMonitor_; + SyncHashQueue *lobLocks_; }; StatsGlobals * shareStatsSegment(Int32 &shmid, NABoolean checkForSSMP = TRUE); short getMasterCpu(char *uniqueStmtId, Lng32 uniqueStmtIdLen, char *nodeName, short maxLen, short &cpu); diff --git a/core/sql/runtimestats/rts_msg.cpp b/core/sql/runtimestats/rts_msg.cpp index 69146cf512..65b8497791 100644 --- a/core/sql/runtimestats/rts_msg.cpp +++ b/core/sql/runtimestats/rts_msg.cpp @@ -883,3 +883,42 @@ void SecInvalidKeyRequest::unpackObj(IpcMessageObjType objType, sikPtr_ = NULL; } +LobLockRequest::LobLockRequest(NAMemory *heap, + char *lobLockId + ) : + RtsMessageObj(LOB_LOCK_REQ, + CurrLobLockVersionNumber, heap) +{ + memcpy(lobLockId_,lobLockId,LOB_LOCK_ID_SIZE+1); +} + +LobLockRequest::~LobLockRequest() +{ + memset(lobLockId_,0,LOB_LOCK_ID_SIZE+1); +} + +IpcMessageObjSize LobLockRequest::packedLength() +{ + IpcMessageObjSize result = baseClassPackedLength(); + result += sizeof(lobLockId_); + return result; +} + +IpcMessageObjSize LobLockRequest::packObjIntoMessage( + IpcMessageBufferPtr buffer) +{ + IpcMessageObjSize result = packBaseClassIntoMessage(buffer); + result += packStrIntoBuffer(buffer, lobLockId_,LOB_LOCK_ID_SIZE+1); + return result; +} + +void LobLockRequest::unpackObj(IpcMessageObjType objType, + IpcMessageObjVersion objVersion, + NABoolean sameEndianness, + IpcMessageObjSize objSize, + IpcConstMessageBufferPtr buffer) +{ + unpackBaseClass(buffer); + unpackStrFromBuffer(buffer,lobLockId_,LOB_LOCK_ID_SIZE+1); +} + diff --git a/core/sql/runtimestats/rts_msg.h b/core/sql/runtimestats/rts_msg.h index 7f833c0b31..af110749cc 100644 --- a/core/sql/runtimestats/rts_msg.h +++ b/core/sql/runtimestats/rts_msg.h @@ -32,7 +32,7 @@ #include "ComSmallDefs.h" #include "ComCextdecs.h" #include "Int64.h" - +#include "ExpLOBenums.h" #include #include "sqlcli.h" @@ -73,7 +73,7 @@ const Int32 CurrSuspendQueryReplyVersionNumber = 100; const Int32 CurrActivateQueryReqVersionNumber = 100; const Int32 CurrActivateQueryReplyVersionNumber = 100; const Int32 CurrSecurityInvalidKeyVersionNumber = 100; - +const Int32 CurrLobLockVersionNumber=100; // // An enumeration of all IPC objects for RTS Servers. // Includes both message objects and stream objects. @@ -107,12 +107,13 @@ enum RtsMessageObjType CANCEL_QUERY_KILL_SERVERS_REQ, // 9019 CANCEL_QUERY_KILL_SERVERS_REPLY, // 9020 SECURITY_INVALID_KEY_REQ, // 9021 - + LOB_LOCK_REQ, // 9022 // Object Types RTS_QUERY_ID = IPC_MSG_RTS_FIRST + 500, // 9500 RTS_EXPLAIN_FRAG, // 9501 - RTS_DIAGNOSTICS_AREA = IPC_SQL_DIAG_AREA + RTS_DIAGNOSTICS_AREA = IPC_SQL_DIAG_AREA, + }; typedef Int64 RtsHandle; @@ -1194,5 +1195,36 @@ class SecInvalidKeyRequest: public RtsMessageObj }; + +// This message is sent from the CLI's ContextCli::setLobLock +// to MXSSMP. It is also sent from MXSSMP to MXSSCP. +class LobLockRequest: public RtsMessageObj +{ +public: + LobLockRequest(NAMemory *heap) + : RtsMessageObj(LOB_LOCK_REQ, + CurrLobLockVersionNumber, heap) + { + memset(lobLockId_,0, sizeof(lobLockId_)); + } + + LobLockRequest(NAMemory *heap, + char *lobId ); + + virtual ~LobLockRequest(); + + IpcMessageObjSize packedLength(); + IpcMessageObjSize packObjIntoMessage(IpcMessageBufferPtr buffer); + void unpackObj(IpcMessageObjType objType, + IpcMessageObjVersion objVersion, + NABoolean sameEndianness, + IpcMessageObjSize objSize, + IpcConstMessageBufferPtr buffer); + + char *getLobLockId() {return lobLockId_; } + +private: + char lobLockId_[LOB_LOCK_ID_SIZE+1];//allow for the lock as well as a '+' or '-' +}; #endif // _RTS_EXE_IPC_H_ diff --git a/core/sql/runtimestats/sscpipc.cpp b/core/sql/runtimestats/sscpipc.cpp index b3823840aa..ccb8dbb37b 100755 --- a/core/sql/runtimestats/sscpipc.cpp +++ b/core/sql/runtimestats/sscpipc.cpp @@ -215,6 +215,9 @@ void SscpNewIncomingConnectionStream::actOnReceive(IpcConnection *connection) case SECURITY_INVALID_KEY_REQ: processSecInvReq(); break; + case LOB_LOCK_REQ: + processLobLockReq(); + break; default: ex_assert(FALSE,"Invalid request for first client message"); } @@ -907,3 +910,53 @@ void SscpNewIncomingConnectionStream::processSecInvReq() request->decrRefCount(); } +void SscpNewIncomingConnectionStream::processLobLockReq() +{ + IpcMessageObjVersion msgVer = getNextObjVersion(); + + ex_assert(msgVer <= currRtsStatsReqVersionNumber, "Up-rev message received."); + NAHeap *statsHeap = NULL; + LobLockRequest *request = new(getHeap()) + LobLockRequest(getHeap()); + + *this >> *request; + ex_assert( !moreObjects(), "unknown object follows LobLockRequest."); + SscpGlobals *sscpGlobals = getSscpGlobals(); + StatsGlobals *statsGlobals = sscpGlobals->getStatsGlobals(); + int error = statsGlobals->getStatsSemaphore(sscpGlobals->getSemId(), + sscpGlobals->myPin()); + statsHeap = statsGlobals->getStatsHeap(); + char *ll = new (statsHeap) char [LOB_LOCK_ID_SIZE]; + memcpy(ll,request->getLobLockId(),LOB_LOCK_ID_SIZE+1); + SyncHashQueue *lobLockList = statsGlobals->getLobLocks(); + if (ll[0] == '+') // If it's a positive value, we are supposed to insert it. + lobLockList->insert(&ll[1],LOB_LOCK_ID_SIZE,&ll[1]); + else if (ll[0] =='-') + { + //negative value means we need to remove/release it from the list + lobLockList->position((char *)&ll[1], LOB_LOCK_ID_SIZE); + while (lobLockList->getCurr() && + memcmp(lobLockList->getCurr(), &ll[1],LOB_LOCK_ID_SIZE)!= 0) + lobLockList->getNext(); + + lobLockList->remove((char *)&ll[1], LOB_LOCK_ID_SIZE,lobLockList->getCurr()); + } + else + ex_assert(FALSE,"invalid lob lock id in LobLockRequest"); + + + statsGlobals->releaseStatsSemaphore(sscpGlobals->getSemId(), + sscpGlobals->myPin()); + clearAllObjects(); + setType(IPC_MSG_SSCP_REPLY); + setVersion(CurrSscpReplyMessageVersion); + + RmsGenericReply *reply = new(getHeap()) + RmsGenericReply(getHeap()); + + *this << *reply; + + send(FALSE); + reply->decrRefCount(); + request->decrRefCount(); +} diff --git a/core/sql/runtimestats/sscpipc.h b/core/sql/runtimestats/sscpipc.h index 3fcc04e3d5..dea7623055 100644 --- a/core/sql/runtimestats/sscpipc.h +++ b/core/sql/runtimestats/sscpipc.h @@ -110,6 +110,7 @@ class SscpNewIncomingConnectionStream : public IpcMessageStream void processKillServersReq(); void suspendActivateSchedulers(); void processSecInvReq(); + void processLobLockReq(); private: SscpGlobals *sscpGlobals_; diff --git a/core/sql/runtimestats/ssmpipc.cpp b/core/sql/runtimestats/ssmpipc.cpp index 9831d6f3e7..504ef7645a 100755 --- a/core/sql/runtimestats/ssmpipc.cpp +++ b/core/sql/runtimestats/ssmpipc.cpp @@ -1187,6 +1187,9 @@ void SsmpNewIncomingConnectionStream::actOnReceive(IpcConnection *connection) case SECURITY_INVALID_KEY_REQ: actOnSecInvalidKeyReq(connection); break; + case LOB_LOCK_REQ: + actOnLobLockReq(connection); + break; default: ex_assert(FALSE,"Invalid request from client"); } @@ -1618,7 +1621,32 @@ void SsmpNewIncomingConnectionStream::actOnActivateQueryReq( "expected an RTS_QUERY_ID following a SuspendQueryRequest"); } +void SsmpNewIncomingConnectionStream::actOnLobLockReq( + IpcConnection *connection) +{ + IpcMessageObjVersion msgVer = getNextObjVersion(); + ex_assert(msgVer <= CurrLobLockVersionNumber, + "Up-rev message received."); + LobLockRequest *llReq= new (getHeap()) LobLockRequest(getHeap()); + *this >> *llReq; + setHandle(llReq->getHandle()); + ex_assert(!moreObjects(),"Unexpected objects following LobLockRequest"); + clearAllObjects(); +// Forward request to all mxsscps. + ssmpGlobals_->allocateServers(); + SscpClientMsgStream *sscpMsgStream = new (heap_) + SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this); + sscpMsgStream->setUsedToSendLLMsgs(); + ssmpGlobals_->addRecipients(sscpMsgStream); + sscpMsgStream->clearAllObjects(); + *sscpMsgStream << *llReq; + llReq->decrRefCount(); + sscpMsgStream->send(FALSE); + + // Reply to client when the msgs to mxsscp have all completed. The reply + // is made from the sscpMsgStream's callback. +} void SsmpNewIncomingConnectionStream::actOnSecInvalidKeyReq( IpcConnection *connection) { @@ -2322,6 +2350,11 @@ void SscpClientMsgStream::actOnReceiveAllComplete() replySik(); break; } + case LL: + { + replyLL(); + break; + } default: { ex_assert(FALSE, "Unknown completionProcessing_ flag."); @@ -2353,6 +2386,25 @@ void SscpClientMsgStream::replySik() reply->decrRefCount(); } +void SscpClientMsgStream::replyLL() +{ + RmsGenericReply *reply = new(getHeap()) + RmsGenericReply(getHeap()); + + *ssmpStream_ << *reply; + + if (ssmpStream_->getSscpDiagsArea()) + { + // Pass errors from communication w/ SSCPs back to the + // client. + *ssmpStream_ << *(ssmpStream_->getSscpDiagsArea()); + ssmpStream_->clearSscpDiagsArea(); + } + + ssmpStream_->send(FALSE); + reply->decrRefCount(); +} + void SscpClientMsgStream::sendMergedStats() { StmtStats *stmtStats; diff --git a/core/sql/runtimestats/ssmpipc.h b/core/sql/runtimestats/ssmpipc.h index d3e5a9b481..7171e9bc1e 100644 --- a/core/sql/runtimestats/ssmpipc.h +++ b/core/sql/runtimestats/ssmpipc.h @@ -238,6 +238,7 @@ class SsmpNewIncomingConnectionStream : public IpcMessageStream void actOnSuspendQueryReq(IpcConnection *connection); void actOnActivateQueryReq(IpcConnection *connection); void actOnSecInvalidKeyReq(IpcConnection *connection); + void actOnLobLockReq(IpcConnection *connection); void getProcessStats(short reqType, short subReqType, pid_t pid); @@ -345,7 +346,9 @@ class SscpClientMsgStream : public IpcMessageStream inline short getDetailLevel() { return detailLevel_; } inline void setUsedToSendCbMsgs() { completionProcessing_ = CB; } inline void setUsedToSendSikMsgs() { completionProcessing_ = SIK; } + inline void setUsedToSendLLMsgs() { completionProcessing_ = LL; } void replySik(); + void replyLL(); inline short getSubReqType() { return subReqType_; } inline void setSubReqType(short subReqType) { subReqType_ = subReqType; } private: @@ -361,7 +364,7 @@ class SscpClientMsgStream : public IpcMessageStream short numSqlProcs_; short numCpus_; StmtStats *stmtStats_; - enum { STATS, CB, SIK } completionProcessing_; + enum { STATS, CB, SIK,LL} completionProcessing_; short detailLevel_; short subReqType_; }; diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 0096424db5..1a9304ead4 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -2670,6 +2670,7 @@ enum DefaultConstants LOB_GC_LIMIT_SIZE, LOB_INPUT_LIMIT_FOR_BATCH, + LOB_LOCKING, // Should the DISK POOL be turned on when replicating the DDL using COPY DDL REPLICATE_DISK_POOL, diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 1c47c54be9..3622db10fe 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -1753,6 +1753,11 @@ SDDkwd__(ISO_MAPPING, (char *)SQLCHARSETSTRING_ISO88591), DD_____(LOB_HDFS_SERVER, "default"), // For JDBC/ODBC batch operations, LOB size limited to 4K bytes DDint__(LOB_INPUT_LIMIT_FOR_BATCH, "16384"), + // Control the locking via RMS shared lock. This ensures the CLI and HDFS + // operations for any LOB UID are done under a lock so concurrent operations + // wont conflict and cause incosistent data. For non concurrent applications, + // we can turn this off as a performance enhancement. + DDkwd__(LOB_LOCKING, "ON"), // Size of memoryin Megabytes used to perform I/O to lob data file // default size is 128MB . Change to adjust memory usage. DDint__(LOB_MAX_CHUNK_MEM_SIZE, "128"), From bf8d8d6ca1442fced0e99e442c64416dd6bd6e0e Mon Sep 17 00:00:00 2001 From: Sandhya Sundaresan Date: Wed, 6 Jun 2018 21:40:32 +0000 Subject: [PATCH 2/2] Address review comments --- core/sql/cli/Context.cpp | 41 ++++++++++++++++++--- core/sql/cli/SessionDefaults.cpp | 2 +- core/sql/executor/ExExeUtil.h | 2 +- core/sql/executor/ExExeUtilLoad.cpp | 2 +- core/sql/exp/ExpLOB.cpp | 20 ++++++++++- core/sql/runtimestats/ssmpipc.cpp | 55 +++++++++++++++++++++++------ 6 files changed, 103 insertions(+), 19 deletions(-) diff --git a/core/sql/cli/Context.cpp b/core/sql/cli/Context.cpp index ac09a9cf06..53d277ea59 100644 --- a/core/sql/cli/Context.cpp +++ b/core/sql/cli/Context.cpp @@ -3184,17 +3184,50 @@ Lng32 ContextCli::setLobLock( ) { CliGlobals *cliGlobals = getCliGlobals(); + NABoolean releasingLock = FALSE; if (cliGlobals->getStatsGlobals() == NULL) { (diagsArea_) << DgSqlCode(-EXE_RTS_NOT_STARTED); return diagsArea_.mainSQLCODE(); } ComDiagsArea *tempDiagsArea = &diagsArea_; + IpcServer *ssmpServer = NULL; tempDiagsArea->clear(); - - IpcServer *ssmpServer = ssmpManager_->getSsmpServer(exHeap(), - cliGlobals->myNodeName(), - cliGlobals->myCpu(), tempDiagsArea); + if (lobLockId[0] == '-') + releasingLock = TRUE; + // Get an ssmp node to talk to. Picking one based off of lobLockId should + // make it unique and avoid clash with another node that may be attempting + // to lock the same lob + if (!releasingLock) + { + Int32 nodeCount = 0; + Int32 rc = msg_mon_get_node_info(&nodeCount, 0, NULL); + Int32 targetNodeId = 0; + Int32 lockHash = 0; + char myNodeName[MAX_SEGMENT_NAME_LEN+1]; + MS_Mon_Node_Info_Type nodeInfo; + for (int i = 0; i < LOB_LOCK_ID_SIZE; i++) + lockHash +=(unsigned char)lobLockId[i]; + if (nodeCount) + targetNodeId = lockHash%nodeCount; + rc = msg_mon_get_node_info_detail(targetNodeId, &nodeInfo); + if (rc == 0) + strcpy(myNodeName, nodeInfo.node[0].node_name); + else + myNodeName[0] = '\0'; + + ssmpServer = ssmpManager_->getSsmpServer(exHeap(), + //cliGlobals->myNodeName(), + //cliGlobals->myCpu(), + myNodeName, + targetNodeId, + tempDiagsArea); + } + else + ssmpServer = ssmpManager_->getSsmpServer(exHeap(), + cliGlobals->myNodeName(), + cliGlobals->myCpu(), + tempDiagsArea); if (ssmpServer == NULL) return diagsArea_.mainSQLCODE(); diff --git a/core/sql/cli/SessionDefaults.cpp b/core/sql/cli/SessionDefaults.cpp index 1e656bdc20..388e02f7f9 100644 --- a/core/sql/cli/SessionDefaults.cpp +++ b/core/sql/cli/SessionDefaults.cpp @@ -792,7 +792,7 @@ static const AQRInfo::AQRErrorMap aqrErrorMap[] = // locked row timeout AQREntry( 8550, 73, 2, 0, 0, 0, "", 0, 0), AQREntry( 8550, 78, 1, 60, 0, 0, "", 0, 0), - AQREntry( 8558, 0 , 2, 30, 0, 0, "", 0, 0), + AQREntry( 8558, 0 , 2, 10, 0, 0, "", 0, 0), AQREntry( 8551, 12, 1, 60, 0, 0, "", 0, 0), diff --git a/core/sql/executor/ExExeUtil.h b/core/sql/executor/ExExeUtil.h index aec3ccba20..ea69e5e67f 100644 --- a/core/sql/executor/ExExeUtil.h +++ b/core/sql/executor/ExExeUtil.h @@ -3083,7 +3083,7 @@ class ExExeUtilLobUpdateTcb : public ExExeUtilTcb ExLobStats lobStats_; char statusString_[200]; fstream indata_; - char lobLockId_[12]; + char lobLockId_[LOB_LOCK_ID_SIZE]; ExLobGlobals *exLobGlobals_; }; // ----------------------------------------------------------------------- diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp index e5624300aa..160a6bb254 100644 --- a/core/sql/executor/ExExeUtilLoad.cpp +++ b/core/sql/executor/ExExeUtilLoad.cpp @@ -3632,7 +3632,7 @@ short ExExeUtilLobUpdateTcb::work() { retcode = SQL_EXEC_SetLobLock(lobLockId_); } - else if (found) + else if (found || retcode ) { memset(lobLockId_,'\0',LOB_LOCK_ID_SIZE); ExRaiseSqlError(getHeap(), &diagsArea_, diff --git a/core/sql/exp/ExpLOB.cpp b/core/sql/exp/ExpLOB.cpp index 2ef8ae4f9e..10b2bc1a1e 100644 --- a/core/sql/exp/ExpLOB.cpp +++ b/core/sql/exp/ExpLOB.cpp @@ -1029,10 +1029,27 @@ ex_expr::exp_return_type ExpLOBinsert::eval(char *op_data[], { ExpLOBoper::genLobLockId(objectUID_,lobNum(),llid); NABoolean found = FALSE; - retcode = SQL_EXEC_CheckLobLock(llid, &found); + int trycount = 0; + while (trycount < 3) + { + retcode = SQL_EXEC_CheckLobLock(llid, &found); + if (found || retcode ) + { + sleep(5); + trycount++; + } + else + trycount =3; + } if (! retcode && !found) { retcode = SQL_EXEC_SetLobLock(llid); + if (retcode) + { + ExRaiseSqlError(h, diagsArea, + retcode); + return ex_expr::EXPR_ERROR; + } } else { @@ -1041,6 +1058,7 @@ ex_expr::exp_return_type ExpLOBinsert::eval(char *op_data[], return ex_expr::EXPR_ERROR; } + } err = insertDesc(op_data, h, diagsArea); if (err == ex_expr::EXPR_ERROR) diff --git a/core/sql/runtimestats/ssmpipc.cpp b/core/sql/runtimestats/ssmpipc.cpp index 504ef7645a..e56c761421 100755 --- a/core/sql/runtimestats/ssmpipc.cpp +++ b/core/sql/runtimestats/ssmpipc.cpp @@ -1625,6 +1625,9 @@ void SsmpNewIncomingConnectionStream::actOnLobLockReq( IpcConnection *connection) { IpcMessageObjVersion msgVer = getNextObjVersion(); + StatsGlobals *statsGlobals; + NABoolean releasingLock = FALSE; + CliGlobals *cliGlobals = GetCliGlobals(); ex_assert(msgVer <= CurrLobLockVersionNumber, "Up-rev message received."); LobLockRequest *llReq= new (getHeap()) LobLockRequest(getHeap()); @@ -1632,17 +1635,47 @@ void SsmpNewIncomingConnectionStream::actOnLobLockReq( setHandle(llReq->getHandle()); ex_assert(!moreObjects(),"Unexpected objects following LobLockRequest"); clearAllObjects(); -// Forward request to all mxsscps. - ssmpGlobals_->allocateServers(); - SscpClientMsgStream *sscpMsgStream = new (heap_) - SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this); - sscpMsgStream->setUsedToSendLLMsgs(); - ssmpGlobals_->addRecipients(sscpMsgStream); - sscpMsgStream->clearAllObjects(); - *sscpMsgStream << *llReq; - llReq->decrRefCount(); - sscpMsgStream->send(FALSE); - + //check and set the lock in the local shared segment + statsGlobals = ssmpGlobals_->getStatsGlobals(); + char *inLobLockId = NULL; + inLobLockId = llReq->getLobLockId(); + if (inLobLockId[0] == '-') //we are releasing this lock. No need to check. + inLobLockId = NULL; + else + { + inLobLockId = &inLobLockId[1]; + statsGlobals->checkLobLock(cliGlobals, inLobLockId); + } + + if (inLobLockId) + { + //It's already set, don't propagate + if (sscpDiagsArea_== NULL) + sscpDiagsArea_ = ComDiagsArea::allocate(ssmpGlobals_->getHeap()); + *sscpDiagsArea_<< DgSqlCode(-EXE_LOB_CONCURRENT_ACCESS_ERROR); + RmsGenericReply *reply = new(getHeap()) + RmsGenericReply(getHeap()); + + *this << *reply; + *this << *sscpDiagsArea_; + this->clearSscpDiagsArea(); + send(FALSE); + reply->decrRefCount(); + } + else + { + + // Forward request to all mxsscps. + ssmpGlobals_->allocateServers(); + SscpClientMsgStream *sscpMsgStream = new (heap_) + SscpClientMsgStream(heap_, getIpcEnv(), ssmpGlobals_, this); + sscpMsgStream->setUsedToSendLLMsgs(); + ssmpGlobals_->addRecipients(sscpMsgStream); + sscpMsgStream->clearAllObjects(); + *sscpMsgStream << *llReq; + llReq->decrRefCount(); + sscpMsgStream->send(FALSE); + } // Reply to client when the msgs to mxsscp have all completed. The reply // is made from the sscpMsgStream's callback.