From 887051c8a1ba57b0ad47278dce2fc9361a2afb9c Mon Sep 17 00:00:00 2001 From: Zalo Correa Date: Tue, 20 Feb 2018 16:57:40 -0800 Subject: [PATCH 1/5] [TRAFODION-2883] Preliminary Scale Enhancements - Increased cluster node limit to 1024 - Added timestamps to node down system message - Added timestamps and values to registry change notifications - Fixed monitor trace causing memory overwrites --- core/sqf/monitor/linux/cluster.cxx | 3 +- core/sqf/monitor/linux/monitor.cxx | 170 +++++++++++---- core/sqf/monitor/linux/msgdef.h | 48 +---- core/sqf/monitor/linux/pnode.cxx | 13 ++ core/sqf/monitor/linux/reqprocinfo.cxx | 83 +++++--- core/sqf/monitor/linux/reqqueue.cxx | 9 +- core/sqf/monitor/linux/shell.cxx | 273 +++++++++++++++---------- core/sqf/monitor/test/runtest | 3 +- core/sqf/sqenvcom.sh | 7 + core/sqf/sql/scripts/sqnodes.pm | 4 +- core/sqf/src/tm/tm_internal.h | 1 - core/sqf/src/tm/tmlibmsg.h | 4 +- core/sqf/src/tm/tools/dtmci.cpp | 2 - core/sqf/src/tm/tools/pwd.cpp | 2 - core/sqf/src/tm/tools/tmshutdown.cpp | 1 - 15 files changed, 379 insertions(+), 244 deletions(-) diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index c22e8ead6a..49697ddaea 100644 --- a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -4166,8 +4166,8 @@ void CCluster::ReIntegrateSock( int initProblem ) { for (int i=0; inid = process->GetNid(); @@ -555,36 +556,94 @@ char * CMonitor::ProcCopy(char *bufPtr, CProcess *process) , process->GetPid() , process->GetVerifier() ); - char * stringData = &procObj->stringData; + char *stringData = &procObj->stringData; - // Copy the program name - procObj->nameLen = strlen(process->GetName()) + 1; - memcpy(stringData, process->GetName(), procObj->nameLen ); - stringData += procObj->nameLen; + if (strlen(process->GetName())) + { + // Copy the program name + procObj->nameLen = strlen(process->GetName()) + 1; + memcpy(stringData, process->GetName(), procObj->nameLen ); + stringData += 
procObj->nameLen; + stringDataLen = procObj->nameLen; + } + else + { + procObj->nameLen = 0; + } - // Copy the port - procObj->portLen = strlen(process->GetPort()) + 1; - memcpy(stringData, process->GetPort(), procObj->portLen ); - stringData += procObj->portLen; + if (strlen(process->GetPort())) + { + // Copy the port + procObj->portLen = strlen(process->GetPort()) + 1; + memcpy(stringData, process->GetPort(), procObj->portLen ); + stringData += procObj->portLen; + stringDataLen += procObj->portLen; + } + else + { + procObj->portLen = 0; + } if (process->IsPersistent()) { - // Copy the standard in file name - procObj->infileLen = strlen(process->infile()) + 1; - memcpy(stringData, process->infile(), procObj->infileLen); - stringData += procObj->infileLen; + if (strlen(process->infile())) + { + // Copy the standard in file name + procObj->infileLen = strlen(process->infile()) + 1; + memcpy(stringData, process->infile(), procObj->infileLen); + stringData += procObj->infileLen; + stringDataLen += procObj->infileLen; + } + else + { + procObj->infileLen = 0; + } - // Copy the standard out file name - procObj->outfileLen = strlen(process->outfile()) + 1; - memcpy(stringData, process->outfile(), procObj->outfileLen ); - stringData += procObj->outfileLen; + if (strlen(process->outfile())) + { + // Copy the standard out file name + procObj->outfileLen = strlen(process->outfile()) + 1; + memcpy(stringData, process->outfile(), procObj->outfileLen ); + stringData += procObj->outfileLen; + stringDataLen += procObj->outfileLen; + } + else + { + procObj->outfileLen = 0; + } - // Copy the program argument strings procObj->argvLen = process->userArgvLen(); - memcpy(stringData, process->userArgv(), procObj->argvLen); - stringData += procObj->argvLen; + if (procObj->argvLen) + { + // Copy the program argument strings + memcpy(stringData, process->userArgv(), procObj->argvLen); + stringData += procObj->argvLen; + stringDataLen += procObj->argvLen; + } + + procObj->persistent = true; 
- procObj->persistent = true; + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + trace_printf( "%s@%d - Packing process string data:\n" + " name(%d) =%s\n" + " port(%d) =%s\n" + " infile(%d) =%s\n" + " outfile(%d) =%s\n" + " userArgv(%d) =%s\n" + " stringData(%d) =%s\n" + , method_name, __LINE__ + , procObj->nameLen + , process->GetName() + , procObj->portLen + , process->GetPort() + , procObj->infileLen + , process->infile() + , procObj->outfileLen + , process->outfile() + , procObj->argvLen + , procObj->argvLen?process->userArgv():"" + , stringDataLen + , stringDataLen?&procObj->stringData:"" ); } else { @@ -658,6 +717,9 @@ void CMonitor::UnpackProcObjs( char *&buffer, int procCount ) CNode * node = NULL; CProcess * process = NULL; + int stringDataLen; + char *name = NULL; + char *port = NULL; char *infile = NULL; char *outfile = NULL; char *userargv = NULL; @@ -671,45 +733,72 @@ void CMonitor::UnpackProcObjs( char *&buffer, int procCount ) { procObj = (struct clone_def *)buffer; + stringDataLen = 0; stringData = &procObj->stringData; node = Nodes->GetLNode (procObj->nid)->GetNode(); - if (procObj->infileLen) + if (procObj->nameLen) { - infile = &stringData[procObj->nameLen + procObj->portLen]; + name = &procObj->stringData; + stringDataLen += procObj->nameLen; } - else + + if (procObj->portLen) { - infile = NULL; + port = &stringData[stringDataLen]; + stringDataLen += procObj->portLen; } - if (procObj->outfileLen) + if (procObj->infileLen) { - outfile = &stringData[procObj->nameLen + procObj->portLen + procObj->infileLen]; + infile = &stringData[stringDataLen]; + stringDataLen += procObj->infileLen; } - else + + if (procObj->outfileLen) { - outfile = NULL; + outfile = &stringData[stringDataLen]; + stringDataLen += procObj->outfileLen; } if (procObj->argvLen) { - userargv = &stringData[procObj->nameLen + procObj->portLen - + procObj->infileLen + procObj->outfileLen]; - } - else - { - userargv = NULL; + userargv = &stringData[stringDataLen]; + 
stringDataLen += procObj->argvLen; } + if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) + trace_printf( "%s@%d - Unpacking process string data:\n" + " stringData(%d) =%s\n" + " name(%d) =%s\n" + " port(%d) =%s\n" + " infile(%d) =%s\n" + " outfile(%d) =%s\n" + " userArgc =%d\n" + " userArgv(%d) =%s\n" + , method_name, __LINE__ + , stringDataLen + , stringDataLen?&procObj->stringData:"" + , procObj->nameLen + , procObj->nameLen?name:"" + , procObj->portLen + , procObj->portLen?port:"" + , procObj->infileLen + , procObj->infileLen?infile:"" + , procObj->outfileLen + , procObj->outfileLen?outfile:"" + , procObj->argc + , procObj->argvLen + , procObj->argvLen?userargv:"" ); + process = node->CloneProcess (procObj->nid, procObj->type, procObj->priority, procObj->backup, procObj->unhooked, - &stringData[0], // process name - &stringData[procObj->nameLen], // port + procObj->nameLen?name:(char *)"", + procObj->portLen?port:(char *)"", procObj->os_pid, procObj->verifier, procObj->parent_nid, @@ -720,8 +809,8 @@ void CMonitor::UnpackProcObjs( char *&buffer, int procCount ) procObj->pathStrId, procObj->ldpathStrId, procObj->programStrId, - infile, - outfile, + procObj->infileLen?infile:(char *)"", + procObj->outfileLen?outfile:(char *)"", &procObj->creation_time); if ( process && procObj->argvLen ) @@ -734,8 +823,7 @@ void CMonitor::UnpackProcObjs( char *&buffer, int procCount ) process->SetPersistent(true); } - buffer = &stringData[procObj->nameLen + procObj->portLen + procObj->infileLen - + procObj->outfileLen + procObj->argvLen]; + buffer = &stringData[stringDataLen]; } TRACE_EXIT; diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h index 3f5c8c8cba..3532ac8db6 100644 --- a/core/sqf/monitor/linux/msgdef.h +++ b/core/sqf/monitor/linux/msgdef.h @@ -61,9 +61,11 @@ // NOTE: Increase with caution as this number // is also used to gather local CPU statistics // and a large number may degrade performance -#define MAX_NODES 256 // This 
can be higher when needed and will +#define MAX_NODES TC_NODES_MAX // This can be higher when needed and will // have performance implications - // Increment by 64 to match node state bitmask + // NOTE: Must increment by 64 to match node state + // bitmask. See trafconfig.h TC_NODES_MAX in + // Trafodion Configuration API #define MAX_LNODES_PER_NODE 1 // The 1 is a per physical node limit // (it can be more, but it is not currently used) #define MAX_LNODES (MAX_NODES*MAX_LNODES_PER_NODE) @@ -211,20 +213,9 @@ typedef enum { RoleType_Aggregation = 0x0002, // Maps to ZoneType_Aggregation, Backend or Any RoleType_Storage = 0x0004 // Maps to ZoneType_Storage, Backend or Any } RoleType; -#if 0 -typedef enum { - ZoneType_Undefined = 0x0000, // No zone type defined - ZoneType_Edge = 0x0001, // Zone of service only nodes - ZoneType_Aggregation = 0x0002, // Zone of compute only nodes - ZoneType_Storage = 0x0004, // Zone of storage only nodes - ZoneType_Excluded = 0x0010, // Excluded cores - ZoneType_Any = ( ZoneType_Edge | ZoneType_Aggregation | ZoneType_Storage ), - ZoneType_Frontend = ( ZoneType_Edge | ZoneType_Aggregation ), - ZoneType_Backend = ( ZoneType_Aggregation | ZoneType_Storage ) -} ZoneType; -#else + typedef TcZoneType_t ZoneType; -#endif + // Service Request types // note: other data structures depend on the ordering of the REQTYPE elements. 
// if the ordering changes corresponding changes must be made to @@ -330,32 +321,9 @@ typedef enum { // types, add any new message types // before this one } MSGTYPE; -#if 0 -typedef enum { - ProcessType_Undefined=0, // No process type as been defined - ProcessType_TSE, // Identifies a Table Storage Engine (DP2) - ProcessType_DTM, // Identifies a Distributed Transaction Monitor process - ProcessType_ASE, // Identifies a Audit Storage Engine (ADP) - ProcessType_Generic, // Identifies a generic process - ProcessType_Watchdog, // Identifies the monitor's watchdog processes - ProcessType_AMP, // Identifies a AMP process - ProcessType_Backout, // Identifies a Backout process - ProcessType_VolumeRecovery, // Identifies a Volume Recovery process - ProcessType_MXOSRVR, // Identifies a MXOSRVR process - ProcessType_SPX, // Identifies a SeaPilot ProXy process - ProcessType_SSMP, // Identifies a SQL Statistics Merge Process (SSMP) - ProcessType_PSD, // Identifies the monitor's process start daemon processes - ProcessType_SMS, // Identifies a SeaMonster Service process - ProcessType_TMID, // Identifies a Transaction Management ID process - ProcessType_PERSIST, // Identifies a generic persistent process - - ProcessType_Invalid // marks the end of the process - // types, add any new process - // types before this one -} PROCESSTYPE; -#else + typedef TcProcessType_t PROCESSTYPE; -#endif + typedef enum { ShutdownLevel_Undefined=-1, ShutdownLevel_Normal=0, // Wait for all transactions and processes to end diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx index 8aa067144f..75c2137d03 100644 --- a/core/sqf/monitor/linux/pnode.cxx +++ b/core/sqf/monitor/linux/pnode.cxx @@ -1977,6 +1977,13 @@ void CNodeContainer::UnpackNodeMappings( intBuffPtr_t &buffer, int nodeMapCount int pnid, pnidConfig; + // lock sync thread since we are making a change the monitor's + // operational view of the cluster + if ( !Emulate_Down ) + { + Monitor->EnterSyncCycle(); + } + 
for (int count = 0; count < nodeMapCount; count++) { pnidConfig = *buffer++; @@ -1991,6 +1998,12 @@ void CNodeContainer::UnpackNodeMappings( intBuffPtr_t &buffer, int nodeMapCount UpdateCluster(); + // unlock sync thread + if ( !Emulate_Down ) + { + Monitor->ExitSyncCycle(); + } + TRACE_EXIT; return; } diff --git a/core/sqf/monitor/linux/reqprocinfo.cxx b/core/sqf/monitor/linux/reqprocinfo.cxx index f7eeefe8be..4148f360ec 100644 --- a/core/sqf/monitor/linux/reqprocinfo.cxx +++ b/core/sqf/monitor/linux/reqprocinfo.cxx @@ -287,11 +287,14 @@ void CExtProcInfoReq::performRequest() verifier_ = msg_->u.request.u.process_info.verifier; processName_ = msg_->u.request.u.process_info.process_name; + int pnid = -1; int target_nid = -1; int target_pid = -1; string target_process_name; Verifier_t target_verifier = -1; - CProcess *requester = NULL; + CClusterConfig *clusterConfig = NULL; + CLNodeConfig *lnodeConfig = NULL; + CProcess *requester = NULL; target_nid = msg_->u.request.u.process_info.target_nid; target_pid = msg_->u.request.u.process_info.target_pid; @@ -390,42 +393,54 @@ void CExtProcInfoReq::performRequest() , msg_->u.request.u.process_info.type); } - if (target_pid == -1) + clusterConfig = Nodes->GetClusterConfig(); + if (clusterConfig) { - // get info for all processes in node - if (target_nid >= 0 && target_nid < Nodes->GetLNodesConfigMax()) + if (clusterConfig->IsConfigReady()) { - count = ProcessInfo_BuildReply(Nodes->GetNode(target_nid)->GetFirstProcess(), - msg_, - msg_->u.request.u.process_info.type, - false, - msg_->u.request.u.process_info.target_process_pattern); - } - } - else - { - // get info for single process in node - if ((requester->GetType() == ProcessType_TSE || - requester->GetType() == ProcessType_ASE || - requester->GetType() == ProcessType_AMP) && - (requester->GetNid() == target_nid && - requester->GetPid() == target_pid)) - { - ProcessInfo_CopyData(requester, - msg_->u.reply.u.process_info.process[0]); - count = 1; - } - else if 
(target_nid >= 0 && target_nid < Nodes->GetLNodesConfigMax()) - { // find by nid/pid (check node state, don't check process state, backup is Ok) - CProcess *process = Nodes->GetProcess( target_nid - , target_pid - , target_verifier - , true, false, true ); - if (process) + lnodeConfig = clusterConfig->GetLNodeConfig( target_nid ); + if (lnodeConfig) { - ProcessInfo_CopyData(process, - msg_->u.reply.u.process_info.process[0]); - count = 1; + + if (target_pid == -1) + { + // get info for all processes in node + if (target_nid >= 0 && target_nid < Nodes->GetLNodesConfigMax()) + { + count = ProcessInfo_BuildReply(Nodes->GetNode(target_nid)->GetFirstProcess(), + msg_, + msg_->u.request.u.process_info.type, + false, + msg_->u.request.u.process_info.target_process_pattern); + } + } + else + { + // get info for single process in node + if ((requester->GetType() == ProcessType_TSE || + requester->GetType() == ProcessType_ASE || + requester->GetType() == ProcessType_AMP) && + (requester->GetNid() == target_nid && + requester->GetPid() == target_pid)) + { + ProcessInfo_CopyData(requester, + msg_->u.reply.u.process_info.process[0]); + count = 1; + } + else if (target_nid >= 0 && target_nid < Nodes->GetLNodesConfigMax()) + { // find by nid/pid (check node state, don't check process state, backup is Ok) + CProcess *process = Nodes->GetProcess( target_nid + , target_pid + , target_verifier + , true, false, true ); + if (process) + { + ProcessInfo_CopyData(process, + msg_->u.reply.u.process_info.process[0]); + count = 1; + } + } + } } } } diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx index becb0cdfd1..e4e8dbd2bb 100644 --- a/core/sqf/monitor/linux/reqqueue.cxx +++ b/core/sqf/monitor/linux/reqqueue.cxx @@ -2355,7 +2355,6 @@ void CIntReviveReq::performRequest() if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) trace_printf("%s@%d - Spare Nodes List unpacked\n", method_name, __LINE__); - //Nodes->UnpackNodeMappings( 
(intBuffPtr_t&)buffer, header.nodeMapCount_ ); Nodes->UnpackNodeMappings( (intBuffPtr_t&)buffer, header.nodeMapCount_ ); if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) @@ -2469,8 +2468,8 @@ void CIntSnapshotReq::performRequest() } // estimate size of snapshot buffer - // about 100 bytes per process, 2 times total - int procSize = Nodes->ProcessCount() * 2 * 100; + // about 500 bytes per process, 2 times total + int procSize = Nodes->ProcessCount() * 2 * 500; int idsSize = Nodes->GetSNodesCount() * sizeof(int); // spare pnids idsSize += (Nodes->GetPNodesCount() + Nodes->GetLNodesCount()) * sizeof(int); // pnid/nid map idsSize += Nodes->GetLNodesCount() * sizeof(int); // nids @@ -2481,7 +2480,7 @@ void CIntSnapshotReq::performRequest() mem_log_write(MON_REQQUEUE_SNAPSHOT_4, procSize, idsSize); - snapshotBuf = (char *) malloc (procSize + idsSize); + snapshotBuf = (char *) malloc (procSize + idsSize); if (!snapshotBuf) { @@ -2497,6 +2496,7 @@ void CIntSnapshotReq::performRequest() clock_gettime(CLOCK_REALTIME, &startTime); + memset( snapshotBuf, 0, (procSize + idsSize) ); char *buf = snapshotBuf; CCluster::snapShotHeader_t header; @@ -2550,6 +2550,7 @@ void CIntSnapshotReq::performRequest() return; } + memset( compBuf, 0, compSize ); z_result = compress((Bytef *)compBuf, (unsigned long *)&compSize, (Bytef *)snapshotBuf, header.fullSize_); diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx index 5ad73abfeb..7d7db50cf7 100644 --- a/core/sqf/monitor/linux/shell.cxx +++ b/core/sqf/monitor/linux/shell.cxx @@ -1102,15 +1102,16 @@ void recv_notice_msg(struct message_def *recv_msg, int ) switch (recv_msg->type ) { case MsgType_Change: - printf ("[%s] Configuration Change Notice for Group: %s Key: %s\n", - MyName, + printf ("[%s] %s - Configuration Change Notice for Group: %s Key: %s Value: %s\n", + MyName, time_string(), recv_msg->u.request.u.change.group, - recv_msg->u.request.u.change.key); + 
recv_msg->u.request.u.change.key, + recv_msg->u.request.u.change.value); break; case MsgType_Event: - printf("[%s] Event %d received\n", - MyName, recv_msg->u.request.u.event_notice.event_id); + printf("[%s] %s - Event %d received\n", + MyName, time_string(), recv_msg->u.request.u.event_notice.event_id); break; case MsgType_NodeAdded: @@ -1177,8 +1178,8 @@ void recv_notice_msg(struct message_def *recv_msg, int ) break; case MsgType_NodeDown: - printf ("[%s] Node %d (%s) is DOWN\n", - MyName, recv_msg->u.request.u.down.nid, + printf ("[%s] %s - Node %d (%s) is DOWN\n", + MyName, time_string(), recv_msg->u.request.u.down.nid, recv_msg->u.request.u.down.node_name ); NodeState[recv_msg->u.request.u.down.nid] = false; @@ -1211,15 +1212,15 @@ void recv_notice_msg(struct message_def *recv_msg, int ) case MsgType_NodePrepare: - printf("[%s] Node %s (%d) node-up preparation, takeover=%s\n", - MyName, recv_msg->u.request.u.prepare.node_name, + printf("[%s] %s - Node %s (%d) node-up preparation, takeover=%s\n", + MyName, time_string(), recv_msg->u.request.u.prepare.node_name, recv_msg->u.request.u.prepare.nid, ((recv_msg->u.request.u.prepare.takeover)? "true": "false")); break; case MsgType_NodeQuiesce: - printf ("[%s] Node %d (%s) is QUIESCEd\n", - MyName, msg->u.request.u.quiesce.nid, + printf ("[%s] %s - Node %d (%s) is QUIESCEd\n", + MyName, time_string(), msg->u.request.u.quiesce.nid, msg->u.request.u.quiesce.node_name ); NodeState[msg->u.request.u.quiesce.nid] = false; if ( waitDeathPending ) @@ -1248,15 +1249,15 @@ void recv_notice_msg(struct message_def *recv_msg, int ) case MsgType_ProcessCreated: if ( recv_msg->u.request.u.process_created.return_code == MPI_SUCCESS ) { - printf ("[%s] Process %s successfully created. Nid=%d, Pid=%d\n", - MyName, recv_msg->u.request.u.process_created.process_name, + printf ("[%s] %s - Process %s successfully created. 
Nid=%d, Pid=%d\n", + MyName, time_string(), recv_msg->u.request.u.process_created.process_name, recv_msg->u.request.u.process_created.nid, recv_msg->u.request.u.process_created.pid); } else { - printf ("[%s] Process %s NOT created. Nid=%d, Pid=%d\n", - MyName, recv_msg->u.request.u.process_created.process_name, + printf ("[%s] %s - Process %s NOT created. Nid=%d, Pid=%d\n", + MyName, time_string(), recv_msg->u.request.u.process_created.process_name, recv_msg->u.request.u.process_created.nid, recv_msg->u.request.u.process_created.pid); } @@ -1265,15 +1266,15 @@ void recv_notice_msg(struct message_def *recv_msg, int ) case MsgType_ProcessDeath: if ( recv_msg->u.request.u.death.aborted ) { - printf ("[%s] Process %s abnormally terminated. Nid=%d, Pid=%d\n", - MyName, recv_msg->u.request.u.death.process_name, + printf ("[%s] %s - Process %s abnormally terminated. Nid=%d, Pid=%d\n", + MyName, time_string(), recv_msg->u.request.u.death.process_name, recv_msg->u.request.u.death.nid, recv_msg->u.request.u.death.pid); } else { - printf ("[%s] Process %s terminated normally. Nid=%d, Pid=%d\n", - MyName, recv_msg->u.request.u.death.process_name, + printf ("[%s] %s - Process %s terminated normally. 
Nid=%d, Pid=%d\n", + MyName, time_string(), recv_msg->u.request.u.death.process_name, recv_msg->u.request.u.death.nid, recv_msg->u.request.u.death.pid); } @@ -1298,18 +1299,18 @@ void recv_notice_msg(struct message_def *recv_msg, int ) break; case MsgType_Shutdown: - printf("[%s] Shutdown notice, level=%d received\n", - MyName, recv_msg->u.request.u.shutdown.level); + printf("[%s] %s - Shutdown notice, level=%d received\n", + MyName, time_string(), recv_msg->u.request.u.shutdown.level); nodePendingComplete(); break; case MsgType_TmSyncAbort: - printf("[%s] TmSync abort notice received\n", - MyName); + printf("[%s] %s - TmSync abort notice received\n", + MyName, time_string()); break; case MsgType_TmSyncCommit: - printf("[%s] TmSync commit notice received\n", - MyName); + printf("[%s] %s - TmSync commit notice received\n", + MyName, time_string()); break; case MsgType_ReintegrationError: @@ -1321,8 +1322,8 @@ void recv_notice_msg(struct message_def *recv_msg, int ) break; default: - printf("[%s] Unexpected notice type(%d) received\n", - MyName, recv_msg->type); + printf("[%s] %s - Unexpected notice type(%d) received\n", + MyName, time_string(), recv_msg->type); } @@ -3868,7 +3869,7 @@ int node_up( int nid, char *node_name, bool nowait ) // If this is a real cluster if ( nid == -1 ) { - // Get current physical state of target nodes + // Get current physical state of all nodes if ( !update_node_state( node_name, false ) ) { return( rc ) ; @@ -4133,7 +4134,8 @@ void persist_config( char *prefix ) } if (!foundConfig) { - printf ("[%s] Persistent process configuration does not exist\n", MyName); + printf("[%s] %s - Persistent process configuration does not exist\n" + , MyName, time_string() ); } } @@ -4213,7 +4215,8 @@ void persist_info( char *prefix ) } if (!foundConfig) { - printf ("[%s] Persistent process configuration does not exist\n", MyName); + printf("[%s] %s - Persistent process configuration does not exist\n" + , MyName, time_string() ); } } @@ -4268,7 +4271,9 
@@ bool persist_process_kill( CPersistConfig *persistConfig ) , persistZones ); if ( !find_process( processName ) ) { - printf( "Persistent process %s does not exist\n", processName); + printf( "[%s] %s - Persistent process %s does not exist\n" + , MyName, time_string() + , processName ); continue; } kill_process( -1, -1, processName, true ); @@ -4288,7 +4293,9 @@ bool persist_process_kill( CPersistConfig *persistConfig ) , persistZones ); if ( !find_process( processName ) ) { - printf( "Persistent process %s does not exist\n", processName); + printf( "[%s] %s - Persistent process %s does not exist\n" + , MyName, time_string() + , processName ); break; } kill_process( -1, -1, processName, true ); @@ -4306,7 +4313,9 @@ bool persist_process_kill( CPersistConfig *persistConfig ) , persistZones ); if ( !find_process( processName ) ) { - printf( "Persistent process %s does not exist\n", processName); + printf( "[%s] %s - Persistent process %s does not exist\n" + , MyName, time_string() + , processName ); break; } kill_process( -1, -1, processName, true ); @@ -4377,7 +4386,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) , persistZones ); if ( find_process( processName ) ) { - printf( "Persistent process %s already exists\n", processName); + printf( "[%s] %s - Persistent process %s already exists\n" + , MyName, time_string() + , processName ); continue; } if (programArgc) @@ -4403,7 +4414,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) //, (char *)persistConfig->GetProgramName() ); if (pid > 0) { - printf( "Persistent process %s created\n", processName); + printf( "[%s] %s - Persistent process %s created\n" + , MyName, time_string() + , processName ); if (process_type == ProcessType_DTM) { DTMexists = true; @@ -4431,7 +4444,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) , persistZones ); if ( find_process( processName ) ) { - printf( "Persistent process %s already exists\n", processName); + printf( "[%s] %s - Persistent 
process %s already exists\n" + , MyName, time_string() + , processName ); break; } if (programArgc) @@ -4456,7 +4471,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) , programNameAndArgs ); if (pid > 0) { - printf( "Persistent process %s created\n", processName); + printf( "[%s] %s - Persistent process %s created\n" + , MyName, time_string() + , processName ); if (process_type == ProcessType_DTM) { DTMexists = true; @@ -4482,7 +4499,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) , persistZones ); if ( find_process( processName ) ) { - printf( "Persistent process %s already exists\n", processName); + printf( "[%s] %s - Persistent process %s already exists\n" + , MyName, time_string() + , processName ); break; } if (programArgc) @@ -4507,7 +4526,9 @@ bool persist_process_start( CPersistConfig *persistConfig ) , programNameAndArgs ); if (pid > 0) { - printf( "Persistent process %s created\n", processName); + printf( "[%s] %s - Persistent process %s created\n" + , MyName, time_string() + , processName ); if (process_type == ProcessType_DTM) { DTMexists = true; @@ -4516,8 +4537,8 @@ bool persist_process_start( CPersistConfig *persistConfig ) else { if ( trace_settings & TRACE_SHELL_CMD ) - trace_printf("%s@%d [%s] persistexec failed!\n", - method_name, __LINE__, MyName); + trace_printf("%s@%d [%s] persist exec failed!\n" + , method_name, __LINE__, MyName); } break; default: @@ -6177,7 +6198,7 @@ void help_cmd (void) printf ("[%s] -- persist exec \n", MyName); printf ("[%s] -- persist info []\n", MyName); printf ("[%s] -- persist kill \n", MyName); - printf ("[%s] -- ps [{CS|DTM|GEN|PSD|SMS|SSMP|WDG}] [|]\n", MyName); + printf ("[%s] -- ps [{CS|DTM|GEN|PSD|SMS|SSMP|WDG}] [||]\n", MyName); printf ("[%s] -- pwd\n", MyName); printf ("[%s] -- quit\n", MyName); printf ("[%s] -- scanbufs\n", MyName); @@ -6372,6 +6393,7 @@ void monstats_cmd (char *) void node_cmd (char *cmd_tail) { int nid; + int pnid; char token[MAX_TOKEN]; char delimiter; 
char *cmd = cmd_tail; @@ -6423,7 +6445,7 @@ void node_cmd (char *cmd_tail) { sprintf( msgString, "[%s] Node add is not available with Virtual Nodes!",MyName); write_startup_log( msgString ); - printf ("[%s] Node add is not available with Virtual Nodes!\n", MyName); + printf ("%s\n", msgString); } else { @@ -6474,7 +6496,7 @@ void node_cmd (char *cmd_tail) { sprintf( msgString, "[%s] Node delete is not available with Virtual Nodes!",MyName); write_startup_log( msgString ); - printf ("[%s] Node delete is not available with Virtual Nodes!\n", MyName); + printf ("%s\n", msgString); } else { @@ -6487,7 +6509,7 @@ void node_cmd (char *cmd_tail) { sprintf( msgString, "[%s] Node delete is not enabled, to enable export SQ_ELASTICY_ENABLED=1",MyName); write_startup_log( msgString ); - printf ("[%s] Node delete is not enabled, to enable export SQ_ELASTICY_ENABLED=1\n", MyName); + printf ("%s\n", msgString); } } } @@ -6516,15 +6538,15 @@ void node_cmd (char *cmd_tail) if ( *cmd ) { nid = atoi (cmd); - if ((!isNumeric(cmd)) || (nid >= LNodesConfigMax) || (nid < 0)) + pnid = get_pnid_by_nid( nid ); + if ( pnid == -1 ) { - printf ("[%s] Invalid nid\n", MyName); - } - else - { - node_info(nid); - CurNodes = NumLNodes-NumDown; + printf( "[%s] Node id %d does not exist in configuration!\n" + , MyName, nid ); + return; } + node_info(nid); + CurNodes = NumLNodes-NumDown; } else { @@ -6546,7 +6568,7 @@ void node_cmd (char *cmd_tail) { sprintf( msgString, "[%s] Node name is not available with Virtual Nodes!",MyName); write_startup_log( msgString ); - printf ("[%s] Node name is not available with Virtual Nodes!\n", MyName); + printf ("%s\n", msgString); } else { @@ -6559,7 +6581,7 @@ void node_cmd (char *cmd_tail) { sprintf( msgString, "[%s] Node name is not enabled, to enable export SQ_ELASTICY_ENABLED=1",MyName); write_startup_log( msgString ); - printf ("[%s] Node name is not enabled, to enable export SQ_ELASTICY_ENABLED=1\n", MyName); + printf ("%s\n", msgString); } } } @@ -6714,9 
+6736,9 @@ void node_config_cmd( char *cmd ) char *cmd_tail = cmd; char delim; - char msgString[MAX_BUFFER] = { 0 }; char token[MAX_TOKEN] = { 0 }; int nid = -1; + int pnid = -1; if ( trace_settings & TRACE_SHELL_CMD ) trace_printf ("%s@%d [%s] processing node config command.\n", @@ -6730,33 +6752,22 @@ void node_config_cmd( char *cmd ) if ( isNumeric( token ) ) { nid = atoi (token); - if (nid < 0 || nid > LNodesConfigMax - 1) + pnid = get_pnid_by_nid( nid ); + if ( pnid == -1 ) { - sprintf( msgString, "[%s] Node id is not configured!",MyName); - write_startup_log( msgString ); - printf ("%s\n", msgString); - return; + printf( "[%s] Node id %d does not exist in configuration!\n" + , MyName, nid ); + return; } - snprintf( msgString, sizeof(msgString) - , "[%s] Executing node config. (nid=%s)" - , MyName, token ); - write_startup_log( msgString ); } else { if ( get_node_name( token ) != 0 ) { - sprintf( msgString, "[%s] Node %s is not configured!" - , MyName, token); - write_startup_log( msgString ); - printf( "[%s] Node %s is not configured!\n" - , MyName, token); + printf( "[%s] Node %s does not exist in configuration!\n" + , MyName, token ); return; } - snprintf( msgString, sizeof(msgString) - , "[%s] Executing node config. (node_name=%s)" - , MyName, token ); - write_startup_log( msgString ); } } @@ -6793,11 +6804,10 @@ void node_delete_cmd( char *cmd ) { if ( get_node_name( token ) != 0 ) { - sprintf( msgString, "[%s] Node %s is not configured!" + sprintf( msgString, "[%s] Node %s does not exist in configuration!" , MyName, token); write_startup_log( msgString ); - printf( "[%s] Node %s is not configured!\n" - , MyName, token); + printf ("%s\n", msgString); return; } STRCPY(node_name, token); @@ -6805,13 +6815,14 @@ void node_delete_cmd( char *cmd ) , "[%s] Executing node delete. 
(node_name=%s)" , MyName, node_name ); write_startup_log( msgString ); + printf ("%s\n", msgString); } } else { sprintf( msgString, "[%s] Invalid node delete options syntax!",MyName); write_startup_log( msgString ); - printf ("[%s] Invalid node delete options syntax!\n", MyName); + printf ("%s\n", msgString); return; } @@ -6824,6 +6835,7 @@ void node_down_cmd( char *cmd ) int numLNodes = -1; int nid; + int pnid; char *cmd_tail = cmd; char delim; char msgString[MAX_BUFFER] = { 0 }; @@ -6852,41 +6864,45 @@ void node_down_cmd( char *cmd ) } write_startup_log( msgString ); printf ("%s\n", msgString); + nid = atoi (token); - if (nid < 0 || nid > LNodesConfigMax - 1) + pnid = get_pnid_by_nid( nid ); + if ( pnid == -1 ) { - sprintf( msgString, "[%s] Invalid node id!",MyName); + sprintf( msgString, "[%s] Node id %d does not exist in configuration!" + , MyName, nid); write_startup_log( msgString ); printf ("%s\n", msgString); - return; + return; } } else { - if ( get_node_name( token ) != 0 ) - { - sprintf( msgString, "[%s] Node %s is not configured!" - , MyName, token); - write_startup_log( msgString ); - printf ("%s\n", msgString); - return; - } - STRCPY(node_name, token); - nid = get_first_nid( node_name ); if (cmd_tail[0] != 0) { snprintf( msgString, sizeof(msgString) , "[%s] Executing node down. (node_name=%s) \"%s\"" - , MyName, node_name, cmd_tail ); + , MyName, token, cmd_tail ); } else { snprintf( msgString, sizeof(msgString) , "[%s] Executing node down. (node_name=%s)" - , MyName, node_name ); + , MyName, token ); } write_startup_log( msgString ); printf ("%s\n", msgString); + + if ( get_node_name( token ) != 0 ) + { + sprintf( msgString, "[%s] Node %s does not exist in configuration!" 
+ , MyName, token); + write_startup_log( msgString ); + printf ("%s\n", msgString); + return; + } + STRCPY(node_name, token); + nid = get_first_nid( node_name ); } numLNodes = get_lnodes_count( nid ); @@ -6895,7 +6911,6 @@ void node_down_cmd( char *cmd ) return; } - int pnid; int zid = -1; STATE state; @@ -6907,7 +6922,7 @@ void node_down_cmd( char *cmd ) { sprintf( msgString, "[%s] Node is already down! (nid=%d, state=%s)\n", MyName, nid, StateString(state) ); write_startup_log( msgString ); - printf ("[%s] Node is already down! (nid=%d, state=%s)\n", MyName, nid, StateString(state) ); + printf ("%s\n", msgString); return; } else @@ -6918,7 +6933,7 @@ void node_down_cmd( char *cmd ) { sprintf( msgString, "[%s] Multiple logical nodes in physical node. Use '!' to down all logical nodes in physical node\n", MyName); write_startup_log( msgString ); - printf ("[%s] Multiple logical nodes in physical node. Use '!' to down all logical nodes in physical node\n", MyName); + printf ("%s\n", msgString); return; } } @@ -7058,7 +7073,7 @@ void node_up_cmd( char *cmd, char delimiter ) { sprintf( msgString, "[%s] Invalid up options syntax!",MyName); write_startup_log( msgString ); - printf ("[%s] Invalid up options syntax!\n", MyName); + printf ("%s\n", msgString); delimiter = ' '; break; } @@ -7068,7 +7083,7 @@ void node_up_cmd( char *cmd, char delimiter ) { sprintf( msgString, "[%s] Invalid up syntax!",MyName); write_startup_log( msgString ); - printf ("[%s] Invalid up syntax!\n", MyName); + printf ("%s\n", msgString); } else if (delimiter == '}') { @@ -7080,6 +7095,11 @@ void node_up_cmd( char *cmd, char delimiter ) { if ( VirtualNodes ) { + sprintf( msgString, "[%s] Executing node up. 
(nid=%s)" + , MyName, cmd_tail); + write_startup_log( msgString ); + printf ("%s\n", msgString); + get_token( cmd_tail, token, &delim ); if ( isNumeric( token ) ) { @@ -7088,7 +7108,7 @@ void node_up_cmd( char *cmd, char delimiter ) { sprintf( msgString, "[%s] Invalid node id!",MyName); write_startup_log( msgString ); - printf ("[%s] Invalid node id!\n", MyName); + printf ("%s\n", msgString); } else { @@ -7100,27 +7120,47 @@ void node_up_cmd( char *cmd, char delimiter ) { sprintf( msgString, "[%s] Invalid node id!",MyName); write_startup_log( msgString ); - printf ("[%s] Invalid node id!\n", MyName); + printf ("%s\n", msgString); } } else { - if ( get_node_name( cmd_tail ) == 0 ) + sprintf( msgString, "[%s] Executing node up. (node=%s)" + , MyName, cmd_tail); + write_startup_log( msgString ); + printf ("%s\n", msgString); + + get_token( cmd_tail, token, &delim ); + if ( isNumeric( token ) ) + { + sprintf( msgString, "[%s] Invalid node name (%s)!" + , MyName, token); + write_startup_log( msgString ); + printf ("%s\n", msgString); + return; + } + else { - if ( ClusterConfig.GetStorageType() == TCDBSQLITE) + if ( get_node_name( token ) == 0 ) { - if ( copy_config_db( cmd_tail ) == 0 ) + if ( ClusterConfig.GetStorageType() == TCDBSQLITE) { - node_up( -1, cmd_tail, nowait ); + if ( copy_config_db( cmd_tail ) != 0 ) + { + return; + } } } + else + { + sprintf( msgString, "[%s] Node %s does not exist in configuration!" 
+ , MyName, token); + write_startup_log( msgString ); + printf ("%s\n", msgString); + return; + } } - else - { - sprintf( msgString, "[%s] Invalid node name!",MyName); - write_startup_log( msgString ); - printf ("[%s] Invalid node name!\n", MyName); - } + node_up( -1, cmd_tail, nowait ); } } } @@ -7485,6 +7525,7 @@ void ps_cmd (char *cmd_tail, char delimiter) { int nid; int pid; + int pnid; char process_name[MAX_PROCESS_NAME]; char token[MAX_TOKEN]; PROCESSTYPE process_type = ProcessType_Undefined; @@ -7546,7 +7587,7 @@ void ps_cmd (char *cmd_tail, char delimiter) } } - // check if we have a process or + // check if we have a process or or if (isdigit (*cmd_tail)) { cmd_tail = get_token (cmd_tail, token, &delimiter); @@ -7558,7 +7599,15 @@ void ps_cmd (char *cmd_tail, char delimiter) } else { - printf ("[%s] Invalid process Nid,Pid!\n", MyName); + nid = atoi (token); + pid = -1; + //printf ("[%s] Invalid process Nid,Pid!\n", MyName); + //return; + } + pnid = get_pnid_by_nid( nid ); + if ( pnid == -1 ) + { + printf( "[%s] Invalid node, nid=%d\n", MyName, nid ); return; } } @@ -8251,8 +8300,6 @@ bool process_command( char *token, char *cmd_tail, char delimiter ) } else if (strcmp (token, "up") == 0) { - sprintf( msgString, "[%s] Executing node up. 
(node=%s)",MyName,cmd_tail); - write_startup_log( msgString ); if (Started) { node_up_cmd( cmd_tail, delimiter ); diff --git a/core/sqf/monitor/test/runtest b/core/sqf/monitor/test/runtest index 9c08949451..8c1193b87c 100755 --- a/core/sqf/monitor/test/runtest +++ b/core/sqf/monitor/test/runtest @@ -102,7 +102,8 @@ fi # # Setup test execution # -export PATH=$PATH:$PWD/Linux-x86_64/64/dbg +ARCH=`arch` +export PATH=$PATH:$PWD/Linux-${ARCH}/64/dbg cd $TRAF_HOME/monitor/test echo $PWD diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh index eec2a3bc0a..7058637a2e 100644 --- a/core/sqf/sqenvcom.sh +++ b/core/sqf/sqenvcom.sh @@ -707,6 +707,13 @@ export SQ_MON_EPOLL_RETRY_COUNT=4 # Trafodion Configuration Zookeeper store #export TC_ZCONFIG_SESSION_TIMEOUT=120 +# increase SQ_MON,ZCLIENT,WDT timeout only to jenkins env. +if [[ "$TRAF_HOME" == *"/home/jenkins"* ]]; then +export SQ_MON_EPOLL_WAIT_TIMEOUT=20 +export SQ_MON_ZCLIENT_SESSION_TIMEOUT=360 +export SQ_WDT_KEEPALIVETIMERVALUE=360 +fi + # set to 0 to disable phandle verifier export SQ_PHANDLE_VERIFIER=1 diff --git a/core/sqf/sql/scripts/sqnodes.pm b/core/sqf/sql/scripts/sqnodes.pm index 36d8f0c588..0d09565117 100644 --- a/core/sqf/sql/scripts/sqnodes.pm +++ b/core/sqf/sql/scripts/sqnodes.pm @@ -279,10 +279,10 @@ sub verifyParse displayStmt($stmtOk); print " Error: node-id not specified\n"; } - elsif ($nodeId > 255) + elsif ($nodeId > 1023) { displayStmt($stmtOk); - print " Error: node-id must be in the range 0..255.\n"; + print " Error: node-id must be in the range 0..1023.\n"; } if (@cores == 0) { diff --git a/core/sqf/src/tm/tm_internal.h b/core/sqf/src/tm/tm_internal.h index bca0f77f63..cf735b2e48 100644 --- a/core/sqf/src/tm/tm_internal.h +++ b/core/sqf/src/tm/tm_internal.h @@ -46,7 +46,6 @@ #define MAX_FILE_NAME 64 #define MAX_NUM_TRANS 1024 -#define MAX_NODES 256 #define MAX_SYNC_TXS 50 #define MAX_RECEIVE_BUFFER 200000 // low number for testing diff --git a/core/sqf/src/tm/tmlibmsg.h 
b/core/sqf/src/tm/tmlibmsg.h index c8eea04475..1049b7d651 100644 --- a/core/sqf/src/tm/tmlibmsg.h +++ b/core/sqf/src/tm/tmlibmsg.h @@ -43,8 +43,8 @@ //#include "dumapp.h" +#include "trafconf/trafconfig.h" #include "dtm/tm.h" - #include "dtm/xa.h" #include "rm.h" #include "../../inc/fs/feerrors.h" //legacy error codes for SQL @@ -86,7 +86,7 @@ #define MAX_NUM_TRANS 5000 #define STEADYSTATE_LOW_TRANS 5 #define STEADYSTATE_HIGH_TRANS 1000 -#define MAX_NODES 256 +#define MAX_NODES TC_NODES_MAX #define MAX_SYNC_TXS 50 #define MAX_TXN_TAGS UINT_MAX diff --git a/core/sqf/src/tm/tools/dtmci.cpp b/core/sqf/src/tm/tools/dtmci.cpp index fff798f856..5d197347a0 100644 --- a/core/sqf/src/tm/tools/dtmci.cpp +++ b/core/sqf/src/tm/tools/dtmci.cpp @@ -50,8 +50,6 @@ DEFINE_EXTERN_COMP_DOVERS(dtmci) const char ga_timestamp[] = "v 3.1.0, Nov 26, 2014"; -#define MAX_NODES 256 - using namespace std; extern const char *ms_getenv_str(const char *pp_key); diff --git a/core/sqf/src/tm/tools/pwd.cpp b/core/sqf/src/tm/tools/pwd.cpp index f95c126d0e..97606a22c0 100644 --- a/core/sqf/src/tm/tools/pwd.cpp +++ b/core/sqf/src/tm/tools/pwd.cpp @@ -47,8 +47,6 @@ DEFINE_EXTERN_COMP_DOVERS(dtmci) const char ga_timestamp[] = "v 3.1.0, Nov 26, 2014"; -#define MAX_NODES 256 - using namespace std; extern const char *ms_getenv_str(const char *pp_key); diff --git a/core/sqf/src/tm/tools/tmshutdown.cpp b/core/sqf/src/tm/tools/tmshutdown.cpp index 5961c5b047..b0ca8b0553 100644 --- a/core/sqf/src/tm/tools/tmshutdown.cpp +++ b/core/sqf/src/tm/tools/tmshutdown.cpp @@ -37,7 +37,6 @@ // Version DEFINE_EXTERN_COMP_DOVERS(tmshutdown) -#define MAX_NODES 256 #define MAX_ARGLEN 32 using namespace std; From bded0e843f8b600a5459c5353bbdc9f59d6d6551 Mon Sep 17 00:00:00 2001 From: Zalo Correa Date: Tue, 27 Feb 2018 17:34:25 -0800 Subject: [PATCH 2/5] [TRAFODION-2883] Preliminary Scale Enhancements - Added timestamps to node down system message - Added timestamps and values to registry change notifications - Fixed monitor 
trace causing memory overwrites - Implemented AGENT mode monitor functionality o This is a preliminary change to remove dependency on OpenMPI during initialization of an operational cluster by creating a cluster of one node (MASTER monitor) where other remote nodes (SLAVE monitors) join the cluster through the MASTER - Implemented MASTER monitor selection logic - Scale bug fixes found when creating clusters greater than 120 nodes --- .../include/common/evl_sqlog_eventnum.h | 14 + core/sqf/monitor/linux/cluster.cxx | 291 ++++++++++++-- core/sqf/monitor/linux/cluster.h | 7 + core/sqf/monitor/linux/commaccept.cxx | 108 ++--- core/sqf/monitor/linux/mlio.cxx | 8 +- core/sqf/monitor/linux/monitor.cxx | 370 +++++++++++++++--- core/sqf/monitor/linux/monitor.h | 2 - core/sqf/monitor/linux/pnode.cxx | 55 +-- core/sqf/monitor/linux/process.cxx | 75 +++- core/sqf/monitor/linux/reqqueue.cxx | 1 + core/sqf/monitor/linux/zclient.cxx | 343 +++++++++++++++- core/sqf/monitor/linux/zclient.h | 5 + core/sqf/sqenvcom.sh | 12 + core/sqf/sql/scripts/sqnodes.pm | 4 +- core/sqf/src/trafconf/clusterconf.cpp | 10 + core/sqf/src/trafconf/clusterconf.h | 4 + 16 files changed, 1131 insertions(+), 178 deletions(-) diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h index 96c3df9305..0418c70544 100644 --- a/core/sqf/export/include/common/evl_sqlog_eventnum.h +++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h @@ -255,6 +255,12 @@ #define MON_MONITOR_MAIN_9 101020109 #define MON_MONITOR_MAIN_10 101020110 #define MON_MONITOR_MAIN_11 101020111 +#define MON_MONITOR_MAIN_12 101020112 +#define MON_MONITOR_MAIN_13 101020113 +#define MON_MONITOR_MAIN_14 101020114 +#define MON_MONITOR_MAIN_15 101020115 +#define MON_MONITOR_MAIN_16 101020116 +#define MON_MONITOR_MAIN_17 101020117 #define MON_MONITOR_TMLEADER_1 101020201 #define MON_MONITOR_TMLEADER_2 101020202 #define MON_MONITOR_DEATH_HANDLER_1 101020301 @@ -895,6 +901,14 @@ #define 
MON_ZCLIENT_ISZNODEEXPIRED_2 101371802 #define MON_ZCLIENT_CHECKMYZNODE_1 101371901 #define MON_ZCLIENT_CHECKMYZNODE_2 101371902 +#define MON_ZCLIENT_AMICONFIGUREDMASTER_1 101372101 +#define MON_ZCLIENT_AMICONFIGUREDMASTER_2 101372102 +#define MON_ZCLIENT_WAITFORANDRETURNMASTER 101372103 +#define MON_ZCLIENT_CREATEMASTERZNODE 101372104 +#define MON_ZCLIENT_WATCHMASTERNODEDELETE_1 101372105 +#define MON_ZCLIENT_WATCHMASTERNODEDELETE_2 101372106 +#define MON_ZCLIENT_WATCHMASTERNODEDELETE_3 101372107 +#define MON_ZCLIENT_CREATEORSETMASTERWATCH 101372108 /* Module: zconfig.cxx = 38 */ #define ZCONFIG_ZCONFIG_1 101380101 diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index 49697ddaea..d1b3e912ff 100644 --- a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -67,6 +67,11 @@ using namespace std; extern bool IAmIntegrating; extern bool IAmIntegrated; extern bool IsRealCluster; +extern bool IsAgentMode; +extern bool IsMaster; +extern bool IsMPIChild; +extern char MasterMonitorName[MAX_PROCESS_PATH]; +extern char Node_name[MPI_MAX_PROCESSOR_NAME]; extern bool ZClientEnabled; extern char IntegratingMonitorPort[MPI_MAX_PORT_NAME]; extern char MyCommPort[MPI_MAX_PORT_NAME]; @@ -289,11 +294,12 @@ void CCluster::NodeTmReady( int nid ) if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf( "%s@%d - TmReady, nid=%d, tm count=%d, soft node down=%d\n" + trace_printf( "%s@%d - TmReady, nid=%d, tm count=%d, soft node down=%d, LNodesCount=%d\n" , method_name, __LINE__ , nid , tmReadyCount_ - , MyNode->IsSoftNodeDown() ); + , MyNode->IsSoftNodeDown() + , MyNode->GetLNodesCount() ); } MyNode->StartPStartDPersistentDTM( nid ); @@ -352,6 +358,131 @@ void CCluster::NodeReady( CNode *spareNode ) TRACE_EXIT; } +// Assign leaders as required +// Current leaders are TM Leader and Monitor Leader +void CCluster::AssignLeaders( int pnid, bool checkProcess ) +{ + const char method_name[] 
= "CCluster::AssignLeaders"; + TRACE_ENTRY; + + AssignTmLeader ( pnid, checkProcess ); + AssignMonitorLeader ( pnid ); + + TRACE_EXIT; +} + +// Assign montior lead in the case of failure +void CCluster::AssignMonitorLeader( int pnid ) +{ + const char method_name[] = "CCluster::AssignMonitorLeader"; + TRACE_ENTRY; + + int i = 0; + int rc = 0; + + int lMonitorLeaderPNid = MonitorLeaderPNid; + CNode *node = NULL; + + if (MonitorLeaderPNid != pnid) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d" " - (MasterMonitor) returning, pnid %d != monitorLead %d\n" + , method_name, __LINE__, pnid, MonitorLeaderPNid ); + } + return; + } + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf( "%s@%d" " - (MasterMonitor) Node " "%d" " MonitorLeader failed!\n" + , method_name, __LINE__, MonitorLeaderPNid ); + } + + for (i=0; iGetPNid() + , node->GetName() + , NodePhaseString(node->GetPhase()) + , node->IsSoftNodeDown()); + } + + if ( node->IsSpareNode() || + node->IsSoftNodeDown() || + node->GetState() != State_Up || + node->GetPhase() != Phase_Ready ) + { + continue; // skip this node for any of the above reasons + } + + MonitorLeaderPNid = node->GetPNid(); + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " - Node " "%d" " is the new MonitorLeaderPNid." 
"\n", method_name, __LINE__, MonitorLeaderPNid); + } + + if (ZClientEnabled) + { + rc = ZClient->CreateMasterZNode ( node->GetName() ); + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader CreateMasterZNode with rc = %d\n", method_name, __LINE__, rc); + } + if ( (rc == ZOK) || (rc == ZNODEEXISTS) ) + { + if ( IsAgentMode ) + { + rc = ZClient->WatchMasterNode( node->GetName( ) ); + if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC) ) + { + trace_printf( "%s@%d" " (MasterMonitor) AssignMonitorLeader WatchMasterNode with rc = %d\n", method_name, __LINE__, rc ); + } + } + } + else + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) + { + trace_printf("%s@%d" " (MasterMonitor) AssignMonitorLeader Unable to set create or set watch\n", method_name, __LINE__); + } + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], Unable to set create or set watch on master node %s\n" + , method_name, node->GetName() ); + mon_log_write(MON_ZCLIENT_CREATEORSETMASTERWATCH, SQ_LOG_ERR, buf); + } + } + + break; + } + + TRACE_EXIT; +} + // Assigns a new TMLeader if given pnid is same as TmLeaderNid // TmLeader is a logical node num. // pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen. 
@@ -494,6 +625,7 @@ CCluster::CCluster (void) configPNodesMax_ (-1), NodeMap (NULL), TmLeaderNid (-1), + MonitorLeaderPNid (-1), tmReadyCount_(0), minRecvCount_(4096), recvBuffer_(NULL), @@ -529,6 +661,7 @@ CCluster::CCluster (void) const char method_name[] = "CCluster::CCluster"; TRACE_ENTRY; + configMaster_ = -1; MPI_Comm_set_errhandler(MPI_COMM_WORLD,MPI_ERRORS_RETURN); char *env = getenv("SQ_MON_CHECK_SEQNUM"); @@ -548,6 +681,9 @@ CCluster::CCluster (void) CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); configPNodesMax_ = clusterConfig->GetPNodesConfigMax(); + // get master from CClusterConfig + configMaster_ = clusterConfig->GetConfigMaster(); + // Compute minimum "sync cycles" per second. The minimum is 1/10 // the expected number, assuming "next_test_delay" cycles per second (where // next_test_delay is in microseconds). @@ -640,6 +776,21 @@ CCluster::~CCluster (void) const char method_name[] = "CCluster::~CCluster"; TRACE_ENTRY; + if (epollFD_ != -1) + { + close( epollFD_ ); + } + + if (commSock_ != -1) + { + close( commSock_ ); + } + + if (syncSock_ != -1) + { + close( syncSock_ ); + } + delete [] comms_; delete [] otherMonRank_; delete [] socks_; @@ -677,26 +828,26 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[]) unsigned long long seqNum = 0; - for (int i = 0; i < GetConfigPNodesMax(); i++) + for (int i = 0; i < GetConfigPNodesCount(); i++) { if (trace_settings & TRACE_RECOVERY) { - trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum ); + trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[indexToPnid_[i]].seq_num, seqNum ); } - if (nodestate[i].seq_num > 1) + if (nodestate[indexToPnid_[i]].seq_num > 1) { if (seqNum == 0) { - seqNum = nodestate[i].seq_num; + seqNum = nodestate[indexToPnid_[i]].seq_num; } else { - assert(nodestate[i].seq_num == seqNum); + assert(nodestate[indexToPnid_[i]].seq_num 
== seqNum); } } if (trace_settings & TRACE_RECOVERY) { - trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum ); + trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[indexToPnid_[i]].seq_num, seqNum ); } } @@ -857,6 +1008,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state) if ( ZClientEnabled ) { ZClient->WatchNodeDelete( node->GetName() ); + ZClient->WatchNodeMasterDelete( node->GetName() ); } } } @@ -875,7 +1027,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state) if ( Emulate_Down ) { IAmIntegrated = false; - AssignTmLeader(pnid, false); + AssignLeaders(pnid, false); } TRACE_EXIT; @@ -976,7 +1128,7 @@ void CCluster::SoftNodeDown( int pnid ) } IAmIntegrated = false; - AssignTmLeader(pnid, false); + AssignLeaders(pnid, false); TRACE_EXIT; } @@ -1237,8 +1389,8 @@ int CCluster::HardNodeUp( int pnid, char *node_name ) TRACE_ENTRY; if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) - trace_printf( "%s@%d - pnid=%d, name=%s\n" - , method_name, __LINE__, pnid, node_name ); + trace_printf( "%s@%d - pnid=%d, name=%s (MyPNID = %d)\n" + , method_name, __LINE__, pnid, node_name, MyPNID ); if ( pnid == -1 ) { @@ -2252,7 +2404,7 @@ void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, { case SyncType_TmData: if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); + trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d), (phase=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid, MyNode->GetPhase()); if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() != Phase_Ready ) { MyNode->CheckActivationPhase(); @@ -2871,16 +3023,49 @@ void CCluster::InitializeConfigCluster( void ) InitServerSock(); } - // The new monitor in a real cluster initializes all - // existing nodes to a down state. 
- // ReIntegrate() will set the state to up when communication is established. - if ( IAmIntegrating ) + if (trace_settings & TRACE_INIT) + { + trace_printf( "%s@%d (MasterMonitor) IAmIntegrating=%d," + " IsAgentMode=%d, IsMaster=%d," + " MasterMonitorName=%s, Node_name=%s\n" + , method_name, __LINE__ + , IAmIntegrating + , IsAgentMode, IsMaster, MasterMonitorName, Node_name ); + } + + if (IAmIntegrating || IsAgentMode) { + int TmLeaderPNid = -1; + if (IsMaster) + { + TmLeaderNid = Nodes->GetFirstNid(); + TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); + } + // Non-master monitors in AGENT mode in a real cluster initialize all + // remote nodes to a down state. The master monitor and the joining + // monitors will set the joining node state to up as part of the node + // re-integration processing as monitor processes join the cluster + // through the master. for (int i=0; i < clusterConfig->GetPNodesCount(); i++) { - if ( Node[indexToPnid_[i]] && Node[indexToPnid_[i]]->GetPNid() != MyPNID ) + if (Node[indexToPnid_[i]]) { - Node[indexToPnid_[i]]->SetState( State_Down ); + if (Node[indexToPnid_[i]]->GetPNid() == MyPNID) + { // Set bit indicating node is up + upNodes_.upNodes[indexToPnid_[i]/MAX_NODE_BITMASK] |= + (1ull << (indexToPnid_[i]%MAX_NODE_BITMASK)); + } + else + { // Set node state to down + Node[indexToPnid_[i]]->SetState( State_Down ); + if (IsMaster) + { + if (TmLeaderPNid == indexToPnid_[i]) + { + AssignTmLeader(indexToPnid_[i], false); + } + } + } } } } @@ -3060,6 +3245,23 @@ void CCluster::InitializeConfigCluster( void ) if (nodeNames) delete [] nodeNames; } + if ( CommType == CommType_Sockets ) + { + // Allgather() cluster sockets are established as remote + // monitor processes join the cluster + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + for ( int i =0; i < clusterConfig->GetPNodesCount() ; i++ ) + { + trace_printf( "%s@%d %s (%d), state=%s, socks_[%d]=%d\n" + , method_name, __LINE__ + , Node[indexToPnid_[i]]->GetName() + , 
Node[indexToPnid_[i]]->GetPNid() + , StateString(Node[indexToPnid_[i]]->GetState()) + , indexToPnid_[i], socks_[indexToPnid_[i]]); + } + } + } if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { for ( int i =0; i < MAX_NODE_MASKS ; i++ ) @@ -3072,7 +3274,10 @@ void CCluster::InitializeConfigCluster( void ) // Kill the MPICH hydra_pmi_proxy to prevent it from killing all // processes in cluster when mpirun or monitor processes are killed - kill( getppid(), SIGKILL ); + if (!IsAgentMode || (IsAgentMode && IsMPIChild)) + { + kill( getppid(), SIGKILL ); + } TRACE_EXIT; } @@ -3807,10 +4012,31 @@ void CCluster::ReIntegrateSock( int initProblem ) TEST_POINT( TP010_NODE_UP ); // Connect with my creator monitor - joinSock_ = Monitor->Connect( IntegratingMonitorPort ); - if ( joinSock_ < 0 ) + bool lv_done = false; + bool lv_did_not_connect_in_first_attempt = false; + while ( ! lv_done ) { - HandleReintegrateError( joinSock_, Reintegrate_Err1, -1, NULL, true ); + joinSock_ = Monitor->Connect( IntegratingMonitorPort ); + if ( joinSock_ < 0 ) + { + if ( IsAgentMode ) + { + lv_did_not_connect_in_first_attempt = true; + sleep( 15 ); + } + else + { + HandleReintegrateError( joinSock_, Reintegrate_Err1, -1, NULL, true ); + } + } + else + { + if ( lv_did_not_connect_in_first_attempt ) + { + sleep( 10 ); + } + lv_done = true; + } } mem_log_write(CMonLog::MON_REINTEGRATE_4, MyPNID); @@ -4281,8 +4507,6 @@ void CCluster::SetIntegratingPNid( int pnid ) TRACE_ENTRY; integratingPNid_ = pnid; - // Indicate to the commAcceptor thread to stop accepting connections - CommAccept.stopAccepting(); TRACE_EXIT; } @@ -6181,8 +6405,8 @@ void CCluster::HandleDownNode( int pnid ) if (trace_settings & TRACE_INIT) trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName()); - // assign new TmLeader if TMLeader node is dead. 
- AssignTmLeader(pnid, false); + // assign new leaders if needed + AssignLeaders(pnid, false); // Build available list of spare nodes CNode *spareNode; @@ -8276,6 +8500,19 @@ int CCluster::MkSrvSock( int *pport ) return ( -1 ); } + int reuse = 1; // sockopt reuse option + if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) ) + { + char la_buf[MON_STRING_BUF_SIZE]; + int err = errno; + sprintf( la_buf, "[%s], setsockopt(SO_REUSEADDR) failed! errno=%d (%s)\n" + , method_name, err, strerror( err )); + mon_log_write(MON_CLUSTER_MKSRVSOCK_4, SQ_LOG_ERR, la_buf); + close( sock ); + return ( -1 ); + } + + // Bind socket. size = sizeof(sockinfo); memset( (char *) &sockinfo, 0, size ); diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h index 58d3540e78..6b658ae72c 100644 --- a/core/sqf/monitor/linux/cluster.h +++ b/core/sqf/monitor/linux/cluster.h @@ -113,7 +113,9 @@ class CCluster #ifndef USE_BARRIER void ArmWakeUpSignal (void); #endif + void AssignLeaders( int pnid, bool checkProcess ); void AssignTmLeader( int pnid, bool checkProcess ); + void AssignMonitorLeader( int pnid ); void stats(); void CompleteSyncCycle() { syncCycle_.lock(); syncCycle_.wait(); syncCycle_.unlock(); } @@ -124,6 +126,8 @@ class CCluster void ExpediteDown( void ); inline int GetTmLeader( void ) { return( TmLeaderNid); } inline void SetTmLeader( int tmLeaderNid ) { TmLeaderNid = tmLeaderNid; } + inline int GetMonitorLeader( void ) { return( MonitorLeaderPNid); } + inline void SetMonitorLeader( int monitorLeaderPNid ) { MonitorLeaderPNid = monitorLeaderPNid; } int GetDownedNid( void ); inline int GetTmSyncPNid( void ) { return( TmSyncPNid ); } // Physical Node ID of current TmSync operations master void InitClusterComm(int worldSize, int myRank, int *rankToPnid); @@ -177,6 +181,7 @@ class CCluster bool ReinitializeConfigCluster( bool nodeAdded, int pnid ); int incrGetVerifierNum(); + int getConfigMaster() { return configMaster_; } enum { 
SYNC_MAX_RESPONSIVE = 1 }; // Max seconds before sync thread is "stuck" @@ -201,6 +206,7 @@ class CCluster int syncSock_; int epollFD_; int *indexToPnid_; + int configMaster_; CNode **Node; // array of nodes CLNode **LNode; // array of logical nodes @@ -229,6 +235,7 @@ class CCluster int configPNodesMax_; // max # of physical nodes that can be configured int *NodeMap; // Mapping of Node ranks to COMM_WORLD ranks int TmLeaderNid; // Nid of currently assigned TM Leader node + int MonitorLeaderPNid; // PNid of currently assigned Monitor leader node int tmReadyCount_; // # of DTM processes ready for transactions size_t minRecvCount_; // minimum size of receive buffer for allgather diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx index 21b30a6655..11c12d7c96 100644 --- a/core/sqf/monitor/linux/commaccept.cxx +++ b/core/sqf/monitor/linux/commaccept.cxx @@ -556,6 +556,25 @@ void CCommAccept::processNewSock( int joinFd ) node= Nodes->GetNode( nodeId.nodeName ); + if ( node == NULL ) + { + close( joinFd ); + + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], got connection from unknown " + "node %d (%s). 
Ignoring it.\n" + , method_name + , nodeId.pnid + , nodeId.nodeName); + mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf); + + // Requests is complete, begin accepting connections again + CommAccept.startAccepting(); + + return; + } + if ( nodeId.ping ) { // Reply with my node info @@ -595,6 +614,10 @@ void CCommAccept::processNewSock( int joinFd ) , method_name, node?node->GetName():"", ErrorMsg(rc)); mon_log_write(MON_COMMACCEPT_19, SQ_LOG_ERR, buf); } + + // Requests is complete, begin accepting connections again + CommAccept.startAccepting(); + return; } @@ -607,53 +630,6 @@ void CCommAccept::processNewSock( int joinFd ) , nodeId.creatorShellVerifier ); } - int pnid = -1; - if ( node != NULL ) - { // Store port numbers for the node - char commPort[MPI_MAX_PORT_NAME]; - char syncPort[MPI_MAX_PORT_NAME]; - strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME); - strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME); - char *pch1; - char *pch2; - pnid = nodeId.pnid; - - node->SetCommPort( commPort ); - pch1 = strtok (commPort,":"); - pch1 = strtok (NULL,":"); - node->SetCommSocketPort( atoi(pch1) ); - - node->SetSyncPort( syncPort ); - pch2 = strtok (syncPort,":"); - pch2 = strtok (NULL,":"); - node->SetSyncSocketPort( atoi(pch2) ); - - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) - { - trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n" - , method_name, __LINE__ - , node->GetPNid() - , node->GetName() - , pch1, atoi(pch1) - , pch2, atoi(pch2) ); - } - } - else - { - close( joinFd ); - - char buf[MON_STRING_BUF_SIZE]; - snprintf( buf, sizeof(buf) - , "[%s], got connection from unknown " - "node %d (%s). 
Ignoring it.\n" - , method_name - , nodeId.pnid - , nodeId.nodeName); - mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf); - - return; - } - // Sanity check, re-integrating node must be down if ( node->GetState() != State_Down ) { @@ -672,9 +648,43 @@ void CCommAccept::processNewSock( int joinFd ) , StateString(node->GetState())); mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf); + // Requests is complete, begin accepting connections again + CommAccept.startAccepting(); + return; } + int pnid = -1; + + // Store port numbers for the node + char commPort[MPI_MAX_PORT_NAME]; + char syncPort[MPI_MAX_PORT_NAME]; + strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME); + strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME); + char *pch1; + char *pch2; + pnid = nodeId.pnid; + + node->SetCommPort( commPort ); + pch1 = strtok (commPort,":"); + pch1 = strtok (NULL,":"); + node->SetCommSocketPort( atoi(pch1) ); + + node->SetSyncPort( syncPort ); + pch2 = strtok (syncPort,":"); + pch2 = strtok (NULL,":"); + node->SetSyncSocketPort( atoi(pch2) ); + + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n" + , method_name, __LINE__ + , node->GetPNid() + , node->GetName() + , pch1, atoi(pch1) + , pch2, atoi(pch2) ); + } + mem_log_write(CMonLog::MON_CONNTONEWMON_4, pnid); if ( MyNode->IsCreator() ) @@ -916,6 +926,8 @@ void CCommAccept::commAcceptorIB() interComm = MPI_COMM_NULL; rc = MPI_Comm_accept( MyCommPort, MPI_INFO_NULL, 0, MPI_COMM_SELF, &interComm ); + // Stop accepting connections until this request completes + CommAccept.stopAccepting(); } else { @@ -988,6 +1000,8 @@ void CCommAccept::commAcceptorSock() mem_log_write(CMonLog::MON_CONNTONEWMON_1); joinFd = Monitor->AcceptCommSock(); + // Stop accepting connections until this request completes + CommAccept.stopAccepting(); } else { diff --git a/core/sqf/monitor/linux/mlio.cxx b/core/sqf/monitor/linux/mlio.cxx index 61803f8345..7db35ec714 
100644 --- a/core/sqf/monitor/linux/mlio.cxx +++ b/core/sqf/monitor/linux/mlio.cxx @@ -1261,7 +1261,13 @@ SQ_LocalIOToClient::SQ_LocalIOToClient(int nid) if (cmid == -1) { if (trace_settings & TRACE_INIT) - trace_printf("%s@%d" " failed shmget(" "%d" "), errno=" "%d" "\n", method_name, __LINE__, (shsize), errno); + { + int err = errno; + char la_buf[MON_STRING_BUF_SIZE]; + trace_printf( "%s@%d" " failed shmget(%d), errno=%d (%s)\n" + , method_name, __LINE__ + , (shsize), err, strerror(err) ); + } if ( errno == EEXIST) { // and try getting it with a smaller size diff --git a/core/sqf/monitor/linux/monitor.cxx b/core/sqf/monitor/linux/monitor.cxx index 70df7cc57d..124b1ffd67 100755 --- a/core/sqf/monitor/linux/monitor.cxx +++ b/core/sqf/monitor/linux/monitor.cxx @@ -53,6 +53,7 @@ using namespace std; #include "tmsync.h" #include "cluster.h" #include "monitor.h" +#include "props.h" #ifdef DMALLOC #include "dm.h" @@ -99,12 +100,16 @@ char MySyncPort[MPI_MAX_PORT_NAME] = {'\0'}; char Node_name[MPI_MAX_PROCESSOR_NAME] = {'\0'}; sigset_t SigSet; bool Emulate_Down = false; -long next_test_delay = 10000; // in usec. - +long next_test_delay = 100000; // in usec. 
(default 100 msec) +CClusterConfig *ClusterConfig = NULL; bool IAmIntegrating = false; bool IAmIntegrated = false; char IntegratingMonitorPort[MPI_MAX_PORT_NAME] = {'\0'}; bool IsRealCluster = true; +bool IsAgentMode = false; +bool IsMaster = false; +bool IsMPIChild = false; +char MasterMonitorName[MAX_PROCESS_PATH]= {'\0'}; CommType_t CommType = CommType_Undefined; bool SMSIntegrating = false; int CreatorShellPid = -1; @@ -865,9 +870,9 @@ void HandleNodeExpiration( const char *nodeName ) TRACE_EXIT; } -void CMonitor::CreateZookeeperClient( void ) +void CreateZookeeperClient( void ) { - const char method_name[] = "CMonitor::CreateZookeeperClient"; + const char method_name[] = "CreateZookeeperClient"; TRACE_ENTRY; if ( ZClientEnabled ) @@ -961,9 +966,9 @@ void CMonitor::CreateZookeeperClient( void ) TRACE_EXIT; } -void CMonitor::StartZookeeperClient( void ) +void StartZookeeperClient( void ) { - const char method_name[] = "CMonitor::StartZookeeperClient"; + const char method_name[] = "StartZookeeperClient"; TRACE_ENTRY; int rc = -1; @@ -1043,19 +1048,71 @@ int main (int argc, char *argv[]) char temp_fname[MAX_PROCESS_PATH]; char buf[MON_STRING_BUF_SIZE]; unsigned int initSleepTime = 1; // 1 second + mallopt(M_ARENA_MAX, 4); // call to limit the number of arena's of monitor to 4.This call doesn't seem to have any effect ! CALL_COMP_DOVERS(monitor, argc, argv); const char method_name[] = "main"; + if (argc < 2) { + printf("error: monitor needs an argument...exitting...\n"); + exit(0); + } + + int lv_arg_index = 1; + while ( lv_arg_index < argc ) + { + // Installations like Cloudera Manager, the monitor is started in AGENT mode + if ( strcmp( argv[lv_arg_index], "COLD_AGENT" ) == 0 ) + { + IsAgentMode = true; + } + + lv_arg_index++; + } + // Set flag to indicate whether we are operating in a real cluster // or a virtual cluster. This is used throughout the monitor when // behavior differs for a real vs. virtual cluster environment. 
- if ( getenv("SQ_VIRTUAL_NODES") ) + if ( !IsAgentMode ) { - IsRealCluster = false; - Emulate_Down = true; + if ( getenv( "SQ_VIRTUAL_NODES" ) ) + { + IsRealCluster = false; + Emulate_Down = true; + } + if (IsRealCluster) + { + // The monitor processes may be started by MPIrun utility + env = getenv("SQ_MON_CREATOR"); + if ( env != NULL && strcmp(env, "MPIRUN") == 0 ) + { + IsMPIChild = true; + } + // The monitor can be set to run in AGENT mode + env = getenv("SQ_MON_RUN_MODE"); + if ( env != NULL && strcmp(env, "AGENT") == 0 ) + { + IsAgentMode = true; + } + } + } + + if ( IsAgentMode ) + { + MON_Props xprops( true ); + xprops.load( "monitor.env" ); + MON_Smap_Enum xenum( &xprops ); + while ( xenum.more( ) ) + { + char *xkey = xenum.next( ); + const char *xvalue = xprops.get( xkey ); + if ( xkey && xkey[0] && xvalue ) + { + setenv( xkey, xvalue, 1 ); + } + } } MonLog = new CMonLog( "log4cxx.monitor.mon.config", "MON", "alt.mon", -1, -1, getpid(), "$MONITOR" ); @@ -1240,7 +1297,7 @@ int main (int argc, char *argv[]) abort(); } - if (argc > 3 && strcmp (argv[2], "-integrate") == 0) + if ((!IsAgentMode) && (argc > 3 && strcmp (argv[2], "-integrate") == 0)) { switch( CommType ) { @@ -1257,13 +1314,13 @@ int main (int argc, char *argv[]) } break; case CommType_Sockets: - if ( isdigit (*argv[3]) ) + if ( IsAgentMode || isdigit (*argv[3]) ) { // In agent mode and when re-integrating (node up), all // monitors processes start as a cluster of 1 and join to the // creator monitor to establish the real cluster. - // Therefore, MyPNID will always be zero when in and - // it is necessary to use the node name to obtain the correct + // Therefore, MyPNID will always be zero them it is + // necessary to use the node name to obtain the correct // from the configuration which occurs when creating the // CMonitor object down below. 
By setting MyPNID to -1, when the // CCluster::InitializeConfigCluster() invoked during the creation @@ -1306,8 +1363,15 @@ int main (int argc, char *argv[]) // Trace cannot be specified on startup command but need to // check for trace environment variable settings. MonTrace->mon_trace_init("0", NULL); + + } + + if (IsAgentMode) + { + CreatorShellPid = 1000; // per monitor.sh + CreatorShellVerifier = 0; } - else + if (argc == 3 && isdigit(*argv[2]) ) { MonTrace->mon_trace_init(argv[2], "STDOUT"); @@ -1398,8 +1462,12 @@ int main (int argc, char *argv[]) MonStats->MonitorBusyIncr(); snprintf(buf, sizeof(buf), - "[CMonitor::main], %s, Started! CommType: %s\n" - , CALL_COMP_GETVERS2(monitor), CommTypeString( CommType )); + "[CMonitor::main], %s, Started! CommType: %s (%s%s%s)\n" + , CALL_COMP_GETVERS2(monitor) + , CommTypeString( CommType ) + , IsRealCluster?"RealCluster":"VirtualCluster" + , IsAgentMode?"/AgentMode":"" + , IsMPIChild?"/MPIChild":"" ); mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf); #ifdef DMALLOC @@ -1420,11 +1488,230 @@ int main (int argc, char *argv[]) // Create thread for monitoring redirected i/o. // This is also used for monitor logs, so start it early. Redirector.start(); + + // Create global configuration now + ClusterConfig = new CClusterConfig(); + if (ClusterConfig) + { + bool traceEnabled = (trace_settings & TRACE_TRAFCONFIG) ? 
true : false; + if (ClusterConfig->Initialize( traceEnabled, MonTrace->getTraceFileName())) + { + if (!ClusterConfig->LoadConfig()) + { + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s], Failed to load cluster configuration.\n", method_name); + mon_log_write(MON_MONITOR_MAIN_12, SQ_LOG_CRIT, la_buf); + + abort(); + } + } + else + { + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s], Failed to open cluster configuration.\n", method_name); + mon_log_write(MON_MONITOR_MAIN_13, SQ_LOG_CRIT, la_buf); + + abort(); + } + } + else + { + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s], Failed to allocate cluster configuration.\n", method_name); + mon_log_write(MON_MONITOR_MAIN_14, SQ_LOG_CRIT, la_buf); + + abort(); + } + + // Set up zookeeper and determine the master + if ( IsAgentMode || IsRealCluster ) + { + // Zookeeper client is enabled only in a real cluster + env = getenv("SQ_MON_ZCLIENT_ENABLED"); + + if ( env ) + { + if ( env && isdigit(*env) ) + { + if ( strcmp(env,"0")==0 ) + { + ZClientEnabled = false; + } + } + } + + if ( ZClientEnabled ) + { + CreateZookeeperClient( ); + } + } + else + { + ZClientEnabled = false; + } + + if (IsAgentMode) + { + if ((ZClientEnabled) && (ZClient != NULL)) + { + // Do not wait, just see if one exists + const char *masterMonitor = ZClient->WaitForAndReturnMaster(false); - // CNodeContainer loads static configuration from database - Nodes = new CNodeContainer (); + if (masterMonitor) + { + strcpy (MasterMonitorName, masterMonitor); + // unfortunately, we have to do this to see if we are the master before + // other things are set up. 
This is how we must do that + if (strcmp(Node_name, masterMonitor) == 0) + { + IsMaster = true; + } + else + { + IsMaster = false; + } + } + else + { + strcpy (MasterMonitorName, ClusterConfig->GetConfigMasterByName()); + if (strcmp (Node_name, ClusterConfig->GetConfigMasterByName()) == 0) + { + IsMaster = true; + } + else + { + IsMaster = false; + } + } + + } + } + + if (IsAgentMode) + { + if (!IsMaster) + { + MyPNID=-1; + SMSIntegrating = IAmIntegrating = true; + char *monitorPort = getenv ("MONITOR_COMM_PORT"); + if (monitorPort) + { + strcpy( IntegratingMonitorPort, MasterMonitorName); + strcat( IntegratingMonitorPort, ":"); + strcat( IntegratingMonitorPort, monitorPort); + } + if (trace_settings & TRACE_INIT) + { + trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I am NOT the master, " + "MyPNID=%d, master port=%s\n" + , method_name, __LINE__ + , MyPNID, IntegratingMonitorPort ); + } + } + else + { + if (trace_settings & TRACE_INIT) + { + trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I am the master, MyPNID=%d\n" + , method_name, __LINE__, MyPNID ); + } + IAmIntegrating = false; + } + } + Nodes = new CNodeContainer (); Config = new CConfigContainer (); - Monitor = new CMonitor (procTermSig); + Monitor = new CMonitor (procTermSig); + + if ( IsAgentMode ) + { + if (trace_settings & TRACE_INIT) + { + trace_printf( "%s@%d MyPNID=%d\n" + , method_name, __LINE__, MyPNID ); + } + MonLog->setPNid( MyPNID ); + } + + if (IsAgentMode) + { + CNode *myNode = Nodes->GetNode(MyPNID); + const char *masterMonitor=NULL; + if (myNode == NULL) + { + char la_buf[MON_STRING_BUF_SIZE]; + sprintf( la_buf + , "[%s], Failed to get my Node, MyPNID=%d\n" + , method_name, MyPNID ); + mon_log_write(MON_MONITOR_MAIN_15, SQ_LOG_CRIT, la_buf); + + abort(); + } + + if ((ZClientEnabled) && (ZClient != NULL)) + { + CNode *masterNode = Nodes->GetNode(MasterMonitorName); + if (!masterNode) + { + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsMaster 
== %d, masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__, IsMaster, MasterMonitorName); + } + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s], Failed to get my Master Node.\n", method_name); + mon_log_write(MON_MONITOR_MAIN_16, SQ_LOG_CRIT, la_buf); + + abort(); + } + else + { + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsMaster == %d, masterNode=%s\n", method_name, __LINE__, IsMaster, masterNode->GetName() ); + } + } + Monitor->SetMonitorLeader( masterNode->GetPNid() ); + if (MyPNID == masterNode->GetPNid()) + { + ZClient->CreateMasterZNode ( myNode->GetName() ); + strcpy (MasterMonitorName, myNode->GetName()); + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsMaster == %d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, MyPNID); + } + } + else + { + masterMonitor = ZClient->WaitForAndReturnMaster(true); + CNode *masterNode = NULL; + if (masterMonitor) + { + strcpy (MasterMonitorName, masterMonitor); + masterNode = Nodes->GetNode(MasterMonitorName); + } + + if (masterNode) + { + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsMaster == %d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, masterNode->GetPNid()); + } + Monitor->SetMonitorLeader( masterNode->GetPNid() ); + } + else + { + if (trace_settings & TRACE_INIT) + { + trace_printf("%s@%d (MasterMonitor) IsMaster == %d, masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__, IsMaster, MasterMonitorName); + } + char la_buf[MON_STRING_BUF_SIZE]; + sprintf(la_buf, "[%s], Failed to get my Master Node.\n", method_name); + mon_log_write(MON_MONITOR_MAIN_17, SQ_LOG_CRIT, la_buf); + + abort(); + } + } + } + } if (!IAmIntegrating) { Config->Init (); @@ -1493,7 +1780,6 @@ int main (int argc, char *argv[]) { strcpy (Node_name, myNode->GetName()); } - // create with no caching, user read/write, group read/write, other read fd = open( port_fname , O_RDWR | 
O_TRUNC | O_CREAT | O_DIRECT @@ -1539,7 +1825,6 @@ int main (int argc, char *argv[]) MPI_Abort(MPI_COMM_SELF,99); } free( ioBuffer ); - int ret = SQ_theLocalIOToClient->initWorker(); if (ret) { @@ -1566,33 +1851,7 @@ int main (int argc, char *argv[]) printf("%s@%d" " RLIMIT_SIGPENDING cur=%d, max=%d\n", method_name, __LINE__, (int)Rl.rlim_cur, (int)Rl.rlim_max); } } - - if ( IsRealCluster ) - { - // Zookeeper client is enabled only in a real cluster - env = getenv("SQ_MON_ZCLIENT_ENABLED"); - if ( env ) - { - if ( env && isdigit(*env) ) - { - if ( strcmp(env,"0")==0 ) - { - ZClientEnabled = false; - } - } - } - - if ( ZClientEnabled ) - { - Monitor->CreateZookeeperClient(); - } - } - else - { - ZClientEnabled = false; - } - - if ( IAmIntegrating ) + if ( IAmIntegrating ) { // This monitor is integrating to (joining) an existing cluster Monitor->ReIntegrate( 0 ); @@ -1602,7 +1861,7 @@ int main (int argc, char *argv[]) trace_printf("%s@%d" " After UpdateCluster" "\n", method_name, __LINE__); } else - { + { Monitor->EnterSyncCycle(); done = Monitor->exchangeNodeData(); Monitor->ExitSyncCycle(); @@ -1618,7 +1877,18 @@ int main (int argc, char *argv[]) { if ( ZClientEnabled ) { - Monitor->StartZookeeperClient(); + { + StartZookeeperClient(); + // Set watch for master + if (IsAgentMode) + { + ZClient->WatchMasterNode( MasterMonitorName ); + } + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d (MasterMonitor) set watch for MasterMonitorName %s\n", method_name, __LINE__, MasterMonitorName ); + } + } } } diff --git a/core/sqf/monitor/linux/monitor.h b/core/sqf/monitor/linux/monitor.h index 1b44c57017..49308b9c50 100644 --- a/core/sqf/monitor/linux/monitor.h +++ b/core/sqf/monitor/linux/monitor.h @@ -63,7 +63,6 @@ friend class CExternalReq; ~CMonitor( void ); bool CompleteProcessStartup( struct message_def *msg ); - void CreateZookeeperClient( void ); void IncOpenCount(void); void IncNoticeCount(void); void IncProcessCount(void); @@ -71,7 +70,6 
@@ friend class CExternalReq; void DecrNoticeCount(void); void DecrProcessCount(void); void StartPrimitiveProcesses( void ); - void StartZookeeperClient( void ); void openProcessMap ( void ); void writeProcessMapEntry ( const char * buf ); void writeProcessMapBegin( const char *name diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx index 75c2137d03..485d013cbc 100644 --- a/core/sqf/monitor/linux/pnode.cxx +++ b/core/sqf/monitor/linux/pnode.cxx @@ -49,6 +49,8 @@ using namespace std; #include "replicate.h" #include "reqqueue.h" +#include "healthcheck.h" + extern CReqQueue ReqQueue; extern char MyPath[MAX_PROCESS_PATH]; extern int MyPNID; @@ -64,9 +66,13 @@ extern CNode *MyNode; extern CMonStats *MonStats; extern CRedirector Redirector; extern CReplicate Replicator; +extern CHealthCheck HealthCheck; extern CMonTrace *MonTrace; - +extern bool IsAgentMode; extern bool IAmIntegrating; +extern char MasterMonitorName[MAX_PROCESS_PATH]; +extern char Node_name[MPI_MAX_PROCESSOR_NAME]; +extern CClusterConfig *ClusterConfig; const char *StateString( STATE state); const char *SyncStateString( SyncState state); @@ -464,13 +470,14 @@ void CNode::CheckActivationPhase( void ) int tmCount = 0; CLNode *lnode; CProcess *process; - bool tmReady; + bool tmReady = false; const char method_name[] = "CNode::CheckActivationPhase"; TRACE_ENTRY; // check for a TM process in each lnode lnode = GetFirstLNode(); + tmReady = lnode ? 
true : false; for ( ; lnode ; lnode = lnode->GetNextP() ) { @@ -1701,8 +1708,11 @@ void CNodeContainer::AddNodes( ) } else { - if (pnid >= maxNode) // only for workstation acting as single node - rank = -1; + if (pnid >= maxNode) // only for workstation acting as single node +// || (IsAgentMode &&(strcmp( MasterMonitorName, Node_name ) != 0))) + { + rank = -1; // -1 creates node in down state + } node = new CNode( (char *)pnodeConfig->GetName(), pnid, rank ); assert( node != NULL ); } @@ -3134,7 +3144,7 @@ void CNodeContainer::SetupCluster( CNode ***pnode_list, CLNode ***lnode_list, in if ( node->GetState() == State_Up && node->IsSpareNode() ) { spareNodesConfigList_.push_back( node ); - if ( IAmIntegrating ) + if (IAmIntegrating) { // do nothing. spareNodesList will get populated in the join phase. } @@ -3166,40 +3176,11 @@ void CNodeContainer::LoadConfig( void ) const char method_name[] = "CNodeContainer::LoadConfig"; TRACE_ENTRY; + // The configuration is now global. To minimize impact for the time being, just set the local + // pointer to the global configuration if ( !clusterConfig_ ) { - clusterConfig_ = new CClusterConfig(); - } - if ( clusterConfig_ ) - { - bool traceEnabled = (trace_settings & TRACE_TRAFCONFIG) ? true : false; - if ( clusterConfig_->Initialize( traceEnabled, MonTrace->getTraceFileName() ) ) - { - if ( ! 
clusterConfig_->LoadConfig() ) - { - char la_buf[MON_STRING_BUF_SIZE]; - sprintf(la_buf, "[%s], Failed to load cluster configuration.\n", method_name); - mon_log_write(MON_NODECONT_LOAD_CONFIG_1, SQ_LOG_CRIT, la_buf); - - abort(); - } - } - else - { - char la_buf[MON_STRING_BUF_SIZE]; - sprintf(la_buf, "[%s], Failed to open cluster configuration.\n", method_name); - mon_log_write(MON_NODECONT_LOAD_CONFIG_2, SQ_LOG_CRIT, la_buf); - - abort(); - } - } - else - { - char la_buf[MON_STRING_BUF_SIZE]; - sprintf(la_buf, "[%s], Failed to allocate cluster configuration.\n", method_name); - mon_log_write(MON_NODECONT_LOAD_CONFIG_3, SQ_LOG_CRIT, la_buf); - - abort(); + clusterConfig_ = ClusterConfig; } TRACE_EXIT; diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx index 6a8e08bec4..bce018bbdd 100644 --- a/core/sqf/monitor/linux/process.cxx +++ b/core/sqf/monitor/linux/process.cxx @@ -72,6 +72,9 @@ extern CReqQueue ReqQueue; #include "replicate.h" +extern bool IsAgentMode; +extern bool IsMaster; + extern bool PidMap; extern int Measure; extern int trace_level; @@ -1651,13 +1654,39 @@ bool CProcess::Create (CProcess *parent, int & result) } string LDpath; - if ( ldpathStrId_.nid != -1 ) - Config->strIdToString(ldpathStrId_, LDpath); - if ( !LDpath.empty() ) + static bool sv_getenv_ld_library_path_done = false; + static string sv_ld_library_path; + if (IsAgentMode) { - setEnvStrVal ( childEnv, nextEnv, "LD_LIBRARY_PATH", LDpath.c_str() ); + if (! 
sv_getenv_ld_library_path_done) + { + sv_getenv_ld_library_path_done = true; + sv_ld_library_path = getenv( "LD_LIBRARY_PATH" ) ? getenv( "LD_LIBRARY_PATH" ) : ""; // getenv() yields NULL when unset; a std::string must never be assigned NULL + if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d" " - LD_LIBRARY_PATH = " "%s" "\n", method_name, __LINE__, sv_ld_library_path.c_str() ); + } + } + LDpath = sv_ld_library_path; if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) - trace_printf("%s@%d - LD_LIBRARY_PATH = %s\n", method_name, __LINE__, LDpath.c_str()); + { + trace_printf( "%s@%d" " - LD_LIBRARY_PATH = " "%s" "\n", method_name, __LINE__, LDpath.c_str() ); + } + } + else + { + if (ldpathStrId_.nid != -1) + { + Config->strIdToString( ldpathStrId_, LDpath ); + } + } + if (!LDpath.empty()) + { + setEnvStrVal( childEnv, nextEnv, "LD_LIBRARY_PATH", LDpath.c_str( ) ); + if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d - LD_LIBRARY_PATH = %s\n", method_name, __LINE__, LDpath.c_str() ); + } } setEnvStr ( childEnv, nextEnv, "LD_BIND_NOW=true" ); @@ -1695,15 +1724,39 @@ bool CProcess::Create (CProcess *parent, int & result) trace_printf("%s@%d - PWD=%s\n", method_name, __LINE__, pwd.c_str()); } - - string path; - if ( pathStrId_.nid != -1 ) - Config->strIdToString( pathStrId_, path); - setEnvStrVal ( childEnv, nextEnv, "PATH", path.c_str() ); + static bool sv_getenv_path_done = false; + static string sv_path; + if (IsAgentMode) + { + if (! 
sv_getenv_path_done) + { + sv_getenv_path_done = true; + sv_path = getenv( "PATH" ) ? getenv( "PATH" ) : ""; // getenv() yields NULL when unset; a std::string must never be assigned NULL + if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__, sv_path.c_str() ); + } + } + path = sv_path; + if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) + { + trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__, path.c_str() ); + } + } + else + { + if (pathStrId_.nid != -1) + { + Config->strIdToString( pathStrId_, path ); + } + } + setEnvStrVal( childEnv, nextEnv, "PATH", path.c_str( ) ); if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_PROCESS_DETAIL)) - trace_printf("%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__, path.c_str()); + { + trace_printf( "%s@%d" " - PATH = " "%s" "\n", method_name, __LINE__, path.c_str() ); + } // Set values from registry as environment variables setEnvFromRegistry ( childEnv, nextEnv ); diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx index e4e8dbd2bb..b4d2529906 100644 --- a/core/sqf/monitor/linux/reqqueue.cxx +++ b/core/sqf/monitor/linux/reqqueue.cxx @@ -56,6 +56,7 @@ extern int req_type_startup; extern bool IAmIntegrating; extern bool IAmIntegrated; + extern CommType_t CommType; CReqResource::CReqResource() diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx index 36a0600c38..107cf322aa 100644 --- a/core/sqf/monitor/linux/zclient.cxx +++ b/core/sqf/monitor/linux/zclient.cxx @@ -488,6 +488,103 @@ int CZClient::ZooExistRetry(zhandle_t *zh, const char *path, int watch, struct S return rc; } +const char* CZClient::WaitForAndReturnMaster( bool doWait ) +{ + const char method_name[] = "CZClient::WaitForAndReturnMaster"; + TRACE_ENTRY; + + bool found = false; + int rc = -1; + int retries = 0; + 
Stat stat; + + struct String_vector nodes = {0, NULL}; + stringstream ss; + ss.str( "" ); + ss << 
zkRootNode_.c_str() + << zkRootNodeInstance_.c_str() + << ZCLIENT_MASTER_ZNODE; + string masterMonitor( ss.str( ) ); + + // wait up to 3 minutes (180 one-second retries) before giving up. + while ( (!found) && (retries < 180)) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d trafCluster=%s\n" + , method_name, __LINE__, masterMonitor.c_str() ); + } + // Verify the existence of the parent ZCLIENT_MASTER_ZNODE + rc = ZooExistRetry( ZHandle, masterMonitor.c_str( ), 0, &stat ); + + if ( rc == ZNONODE ) + { + if (doWait == false) break; + // Not created yet: pace and count the retry so the retry limit applies here too + usleep(1000000); + retries++; + continue; + } + else if ( rc == ZOK ) + { + // Now get the list of available znodes in the cluster. + // + // This will return child znodes for each monitor process that has + // registered, including this process. 
+ rc = zoo_get_children( ZHandle, masterMonitor.c_str( ), 0, &nodes ); + if ( nodes.count > 0 ) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d nodes.count=%d\n" + , method_name, __LINE__ + , nodes.count ); + } + found = true; + } + else + { + if (doWait == false) + { + break; + } + usleep(1000000); // sleep for a second as to not overwhelm the system + retries++; + continue; + } + } + + else // error + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d Error (MasterMonitor) WaitForAndReturnMaster returned rc (%d), retries %d\n" + , method_name, __LINE__, rc, retries ); + } + char buf[MON_STRING_BUF_SIZE]; + snprintf( buf, sizeof(buf) + , "[%s], ZooExistRetry() for %s failed with error %s\n" + , method_name, masterMonitor.c_str( ), zerror(rc)); + mon_log_write(MON_ZCLIENT_WAITFORANDRETURNMASTER, SQ_LOG_ERR, buf); + break; + } + } + + const char *master = NULL; // should we assert nodes.count == 1? 
+ if (found) + { + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + { + trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s)\n" + , method_name, __LINE__, masterMonitor.c_str() ); + } + master = nodes.data[0]; // NOTE(review): 'nodes' is never deallocated here, so the pointer stays valid for the caller + } + + TRACE_EXIT; + return master; +} + int CZClient::GetClusterZNodes( String_vector *nodes ) { const char method_name[] = "CZClient::GetClusterZNodes"; @@ -700,7 +797,7 @@ void CZClient::HandleExpiredZNode( void ) int CZClient::InitializeZClient( void ) { - const char method_name[] = "CZClient::MakeClusterZNodes"; + const char method_name[] = "CZClient::InitializeZClient"; TRACE_ENTRY; int rc; @@ -799,6 +896,67 @@ bool CZClient::IsZNodeExpired( const char *nodeName, int &zerr ) return( expired ); } +int CZClient::CreateMasterZNode( const char *nodeName ) +{ + const char method_name[] = "CZClient::CreateMasterZNode"; + TRACE_ENTRY; + + int rc; + int retries = 0; + + stringstream masterpath; + masterpath.str( "" ); + masterpath << zkRootNode_.c_str() + << zkRootNodeInstance_.c_str() + << ZCLIENT_MASTER_ZNODE<< "/" + << nodeName; + + string monZnode = masterpath.str( ); + + stringstream ss; + ss.str( "" ); + ss < ZNodeList_t; @@ -137,6 +138,7 @@ class CZClient : public CLock , const char *instanceZNode ); ~CZClient( void ); + int CreateMasterZNode( const char *nodeName ); int GetSessionTimeout( void) { return( zkSessionTimeout_ ); } bool IsZNodeExpired( const char *nodeName, int &zerr ); void MonitorZCluster( void ); @@ -148,8 +150,11 @@ class CZClient : public CLock int StartWork( void ); void StopMonitoring( void ); void TriggerCheck( int type, const char *znodePath ); + const char* WaitForAndReturnMaster( bool doWait ); int WatchNode( const char *nodeName ); + int WatchMasterNode( const char *nodeName ); int WatchNodeDelete( const char *nodeName ); + int WatchNodeMasterDelete( const char *nodeName ); private: int ZooExistRetry(zhandle_t *zh, const char 
*path, int watch, struct Stat *stat); diff --git a/core/sqf/sqenvcom.sh 
b/core/sqf/sqenvcom.sh index 7058637a2e..c5a9164f82 100644 --- a/core/sqf/sqenvcom.sh +++ b/core/sqf/sqenvcom.sh @@ -673,6 +673,18 @@ export SQ_LUNMGR_VERBOSITY=1 # Control SQ default startup behavior (c=cold, w=warm, if removed sqstart will autocheck) export SQ_STARTUP=r +# Monitor process creator: +# MPIRUN - monitor process is created by mpirun +# Uncomment SQ_MON_CREATOR when running monitor in AGENT mode +#export SQ_MON_CREATOR=MPIRUN + +# Monitor process run mode: +# AGENT - monitor process runs in agent mode versus MPI collective +# Uncomment the three environment variables below +#export SQ_MON_RUN_MODE=AGENT +#export MONITOR_COMM_PORT=23399 +#export MONITOR_SYNC_PORT=23398 + # Alternative logging capability in monitor export SQ_MON_ALTLOG=0 diff --git a/core/sqf/sql/scripts/sqnodes.pm b/core/sqf/sql/scripts/sqnodes.pm index 0d09565117..36d8f0c588 100644 --- a/core/sqf/sql/scripts/sqnodes.pm +++ b/core/sqf/sql/scripts/sqnodes.pm @@ -279,10 +279,10 @@ sub verifyParse displayStmt($stmtOk); print " Error: node-id not specified\n"; } - elsif ($nodeId > 1023) + elsif ($nodeId > 255) { displayStmt($stmtOk); - print " Error: node-id must be in the range 0..1023.\n"; + print " Error: node-id must be in the range 0..255.\n"; } if (@cores == 0) { diff --git a/core/sqf/src/trafconf/clusterconf.cpp b/core/sqf/src/trafconf/clusterconf.cpp index 0ebffeb963..039b87db66 100644 --- a/core/sqf/src/trafconf/clusterconf.cpp +++ b/core/sqf/src/trafconf/clusterconf.cpp @@ -49,6 +49,7 @@ using namespace std; CClusterConfig::CClusterConfig( void ) : CPNodeConfigContainer(TC_NODES_MAX) , CLNodeConfigContainer(TC_NODES_MAX) + , configMaster_(-1) , nodeReady_(false) , persistReady_(false) , newPNodeConfig_(true) @@ -61,6 +62,8 @@ CClusterConfig::CClusterConfig( void ) const char method_name[] = "CClusterConfig::CClusterConfig"; TRACE_ENTRY; + memset( &configMasterName_, 0, TC_PROCESSOR_NAME_MAX ); + TRACE_EXIT; } @@ -373,6 +376,13 @@ bool CClusterConfig::LoadNodeConfig( void ) for 
(int i =0; i < nodeCount; i++ ) { ProcessLNode( nodeConfigData[i], pnodeConfigInfo, lnodeConfigInfo ); + // We want to pick the first configured node so all monitors pick the same one + // This only comes into play for a Trafodion start from scratch + if (i == 0) + { + configMaster_ = pnodeConfigInfo.pnid; + strcpy (configMasterName_ ,pnodeConfigInfo.nodename); + } AddNodeConfiguration( pnodeConfigInfo, lnodeConfigInfo ); } diff --git a/core/sqf/src/trafconf/clusterconf.h b/core/sqf/src/trafconf/clusterconf.h index 1a8942f023..ff4b17e328 100644 --- a/core/sqf/src/trafconf/clusterconf.h +++ b/core/sqf/src/trafconf/clusterconf.h @@ -43,6 +43,8 @@ class CClusterConfig : public CPNodeConfigContainer void Clear( void ); bool DeleteNodeConfig( int pnid ); + int GetConfigMaster ( ) { return configMaster_;} + char * GetConfigMasterByName() {return configMasterName_;} bool Initialize( void ); bool Initialize( bool traceEnabled, const char *traceFile ); void InitCoreMask( cpu_set_t &coreMask ); @@ -73,6 +75,8 @@ class CClusterConfig : public CPNodeConfigContainer protected: private: + int configMaster_; + char configMasterName_[TC_PROCESSOR_NAME_MAX]; bool nodeReady_; // true when node configuration loaded bool persistReady_; // true when persist configuration loaded bool newPNodeConfig_; From 3d7855b6f64733ad3776fc421cb598883acbb6bf Mon Sep 17 00:00:00 2001 From: Zalo Correa Date: Wed, 28 Feb 2018 15:23:31 -0800 Subject: [PATCH 3/5] Code review fixes. 
--- core/sqf/monitor/linux/cluster.cxx | 128 ++++++++++++++--------------- core/sqf/monitor/linux/cluster.h | 25 +++--- core/sqf/monitor/linux/pnode.cxx | 1 - core/sqf/monitor/linux/tmsync.cxx | 10 +-- core/sqf/monitor/linux/zclient.cxx | 8 +- 5 files changed, 87 insertions(+), 85 deletions(-) diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx index d1b3e912ff..83ea9231a3 100644 --- a/core/sqf/monitor/linux/cluster.cxx +++ b/core/sqf/monitor/linux/cluster.cxx @@ -380,45 +380,46 @@ void CCluster::AssignMonitorLeader( int pnid ) int i = 0; int rc = 0; - int lMonitorLeaderPNid = MonitorLeaderPNid; + int monitorLeaderPNid = monitorLeaderPNid_; CNode *node = NULL; - if (MonitorLeaderPNid != pnid) + if (monitorLeaderPNid_ != pnid) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { trace_printf( "%s@%d" " - (MasterMonitor) returning, pnid %d != monitorLead %d\n" - , method_name, __LINE__, pnid, MonitorLeaderPNid ); + , method_name, __LINE__, pnid, monitorLeaderPNid_ ); } - return; + TRACE_EXIT; + return; } if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { trace_printf( "%s@%d" " - (MasterMonitor) Node " "%d" " MonitorLeader failed!\n" - , method_name, __LINE__, MonitorLeaderPNid ); + , method_name, __LINE__, monitorLeaderPNid_ ); } for (i=0; iGetPNid(); + monitorLeaderPNid_ = node->GetPNid(); if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf("%s@%d" " - Node " "%d" " is the new MonitorLeaderPNid." "\n", method_name, __LINE__, MonitorLeaderPNid); + trace_printf("%s@%d" " - Node " "%d" " is the new monitorLeaderPNid_." 
"\n", method_name, __LINE__, monitorLeaderPNid_); } if (ZClientEnabled) @@ -483,7 +484,7 @@ void CCluster::AssignMonitorLeader( int pnid ) TRACE_EXIT; } -// Assigns a new TMLeader if given pnid is same as TmLeaderNid +// Assigns a new TMLeader if given pnid is same as tmLeaderNid_ // TmLeader is a logical node num. // pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen. void CCluster::AssignTmLeader( int pnid, bool checkProcess ) @@ -495,15 +496,15 @@ void CCluster::AssignTmLeader( int pnid, bool checkProcess ) CNode *node = NULL; CProcess *process = NULL; - int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); + int TmLeaderPNid = LNode[tmLeaderNid_]->GetNode()->GetPNid(); if (TmLeaderPNid != pnid) { - node = LNode[TmLeaderNid]->GetNode(); + node = LNode[tmLeaderNid_]->GetNode(); if (checkProcess) { - process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM ); + process = LNode[tmLeaderNid_]->GetProcessLByType( ProcessType_DTM ); if (process) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) @@ -544,7 +545,7 @@ void CCluster::AssignTmLeader( int pnid, bool checkProcess ) if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { trace_printf( "%s@%d" " - Node " "%d" " TmLeader failed! 
(checkProcess=%d)\n" - , method_name, __LINE__, TmLeaderNid, checkProcess ); + , method_name, __LINE__, tmLeaderNid_, checkProcess ); } for (i=0; iGetFirstLNode()->GetNid(); + tmLeaderNid_ = node->GetFirstLNode()->GetNid(); if (checkProcess) { - process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM ); + process = LNode[tmLeaderNid_]->GetProcessLByType( ProcessType_DTM ); if (!process) { continue; // skip this node no DTM process exists @@ -599,7 +600,7 @@ void CCluster::AssignTmLeader( int pnid, bool checkProcess ) if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) { - trace_printf("%s@%d" " - Node " "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid); + trace_printf("%s@%d" " - Node " "%d" " is the new TmLeader." "\n", method_name, __LINE__, tmLeaderNid_); } break; @@ -618,14 +619,13 @@ CCluster::CCluster (void) ,epollFD_(-1), Node (NULL), LNode (NULL), - TmSyncPNid (-1), - CurNodes (0), - CurProcs (0), + tmSyncPNid_ (-1), + currentNodes_ (0), configPNodesCount_ (-1), configPNodesMax_ (-1), - NodeMap (NULL), - TmLeaderNid (-1), - MonitorLeaderPNid (-1), + nodeMap_ (NULL), + tmLeaderNid_ (-1), + monitorLeaderPNid_ (-1), tmReadyCount_(0), minRecvCount_(4096), recvBuffer_(NULL), @@ -795,10 +795,10 @@ CCluster::~CCluster (void) delete [] otherMonRank_; delete [] socks_; delete [] sockPorts_; - if (NodeMap) + if (nodeMap_) { - delete [] NodeMap; - NodeMap = NULL; + delete [] nodeMap_; + nodeMap_ = NULL; } delete [] recvBuffer2_; @@ -2416,7 +2416,7 @@ void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, // Begin a Slave Sync Start if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf("%s@%d - Slave Sync Start on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); - TmSyncPNid = pnid; + tmSyncPNid_ = pnid; Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) { @@ -2430,12 +2430,12 @@ void 
CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, trace_printf("%s@%d - Sync State Collision! Node %s (pnid=%d) TmSyncState=(%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState()) ); if ( MyNode->GetTmSyncState() == SyncState_Continue ) { - if ( pnid > TmSyncPNid ) + if ( pnid > tmSyncPNid_ ) // highest node id will continue { // They take priority ... we abort if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf("%s@%d - Aborting Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[Monitor->TmSyncPNid]->GetName(), Monitor->TmSyncPNid); + trace_printf("%s@%d - Aborting Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[Monitor->tmSyncPNid_]->GetName(), Monitor->tmSyncPNid_); MyNode->SetTmSyncState( SyncState_Null ); if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState() ) ); @@ -2443,7 +2443,7 @@ void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, // Continue with other node's Slave TmSync Start request if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf("%s@%d - Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); - TmSyncPNid = pnid; + tmSyncPNid_ = pnid; Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) { @@ -2467,7 +2467,7 @@ void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, // Continue with other node's Slave TmSync Start request if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf("%s@%d - Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); - TmSyncPNid = pnid; + tmSyncPNid_ = pnid; Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); if 
(trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) { @@ -2783,9 +2783,9 @@ void CCluster::HandleMyNodeMsg (struct internal_msg_def *recv_msg, case SyncType_TmData: if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[MyPNID]->GetName(), MyPNID); - TmSyncPNid = MyPNID; + tmSyncPNid_ = MyPNID; if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf("%s@%d - Sync communicated, TmSyncPNid=%d\n", method_name, __LINE__, TmSyncPNid); + trace_printf("%s@%d - Sync communicated, tmSyncPNid_=%d\n", method_name, __LINE__, tmSyncPNid_); if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() != Phase_Ready ) { MyNode->CheckActivationPhase(); @@ -2974,7 +2974,7 @@ void CCluster::InitializeConfigCluster( void ) int rankToPnid[worldSize]; CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); - CurNodes = worldSize; + currentNodes_ = worldSize; if ( IsRealCluster ) { @@ -3038,10 +3038,10 @@ void CCluster::InitializeConfigCluster( void ) int TmLeaderPNid = -1; if (IsMaster) { - TmLeaderNid = Nodes->GetFirstNid(); - TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); + tmLeaderNid_ = Nodes->GetFirstNid(); + TmLeaderPNid = LNode[tmLeaderNid_]->GetNode()->GetPNid(); } - // Non-master monitors in AGENT mode in a real cluster initialize all + // Monitors processes in AGENT mode in a real cluster initialize all // remote nodes to a down state. The master monitor and the joining // monitors will set the joining node state to up as part of the node // re-integration processing as monitor processes join the cluster @@ -3179,8 +3179,8 @@ void CCluster::InitializeConfigCluster( void ) delete [] commPortNums; delete [] syncPortNums; - TmLeaderNid = Nodes->GetFirstNid(); - int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); + tmLeaderNid_ = Nodes->GetFirstNid(); + int TmLeaderPNid = LNode[tmLeaderNid_]->GetNode()->GetPNid(); // Any nodes not in the initial MPI_COMM_WORLD are down. 
for (int i=0; iSetSyncPort( nodeInfo[i].syncPort ); Node[i]->SetState( State_Up ); @@ -4163,7 +4163,7 @@ void CCluster::ReIntegrateSock( int initProblem ) } otherMonRank_[nodeInfo[i].pnid] = 0; - ++CurNodes; + ++currentNodes_; // Store port numbers for the node strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME); @@ -4310,7 +4310,7 @@ void CCluster::ReIntegrateSock( int initProblem ) } otherMonRank_[nodeInfo[i].pnid] = 0; - ++CurNodes; + ++currentNodes_; // Store port numbers for the node strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME); @@ -4583,7 +4583,7 @@ void CCluster::setNewComm( int pnid ) close( socks_[pnid] ); socks_[pnid] = -1; } - --CurNodes; + --currentNodes_; } if (trace_settings & TRACE_RECOVERY) @@ -4595,7 +4595,7 @@ void CCluster::setNewComm( int pnid ) comms_[it->pnid] = it->comm; otherMonRank_[it->pnid] = it->otherRank; - ++CurNodes; + ++currentNodes_; // Set bit indicating node is up upNodes_.upNodes[it->pnid/MAX_NODE_BITMASK] |= (1ull << (it->pnid%MAX_NODE_BITMASK)); @@ -4686,14 +4686,14 @@ void CCluster::setNewSock( int pnid ) shutdown( socks_[pnid], SHUT_RDWR); close( socks_[pnid] ); socks_[pnid] = -1; - --CurNodes; + --currentNodes_; } CNode *node= Nodes->GetNode( it->pnid ); socks_[it->pnid] = it->socket; sockPorts_[it->pnid] = node->GetSyncSocketPort(); otherMonRank_[it->pnid] = it->otherRank; - ++CurNodes; + ++currentNodes_; if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { @@ -6040,7 +6040,7 @@ void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], // Evaluate each active (up) node in the cluster int pnodesCount = 0; for (int index = 0; - index < GetConfigPNodesMax() && pnodesCount < CurNodes; + index < GetConfigPNodesMax() && pnodesCount < currentNodes_; ++index) { if ( nodestate[index].seq_num != 0 ) @@ -6104,11 +6104,11 @@ void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) { - trace_printf("%s@%d concurringNodes=%d, 
CurNodes=%d\n", - method_name, __LINE__, concurringNodes, CurNodes); + trace_printf("%s@%d concurringNodes=%d, currentNodes_=%d\n", + method_name, __LINE__, concurringNodes, currentNodes_); } - if (concurringNodes == CurNodes) + if (concurringNodes == currentNodes_) { // General agreement that node is down, proceed to mark it down CNode *downNode = Nodes->GetNode( it->exitedPnid ); @@ -6149,7 +6149,7 @@ void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], "%d but only %d of %d nodes also lost the " "connection. See up: %s. See down: %s. So node " "%d is going down (at seq #%lld).\n", method_name, - it->exitedPnid, concurringNodes, CurNodes, + it->exitedPnid, concurringNodes, currentNodes_, setSeesUp.c_str(), setSeesDown.c_str(), MyPNID, seqNum_ ); mon_log_write(MON_CLUSTER_VALIDATE_STATE_2, SQ_LOG_ERR, buf); @@ -6198,7 +6198,7 @@ void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], int pnodesCount2 = 0; for (int remIndex = 0; - remIndex < GetConfigPNodesMax() && pnodesCount2 < CurNodes; + remIndex < GetConfigPNodesMax() && pnodesCount2 < currentNodes_; ++remIndex) { bool someExited = false; @@ -6248,7 +6248,7 @@ void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], { // This remote node sees node pnid as up int pnodesCount3 = 0; for (int exitedPNid = 0; - exitedPNid < GetConfigPNodesMax() && pnodesCount3 < CurNodes; + exitedPNid < GetConfigPNodesMax() && pnodesCount3 < currentNodes_; ++exitedPNid) { CNode *exitedNode = Nodes->GetNode( /*indexToPnid_[remIndex]*/exitedPNid ); @@ -6666,7 +6666,7 @@ void CCluster::UpdateClusterState( bool &doShutdown, abort(); } Node[index]->SetState( State_Down ); - --CurNodes; + --currentNodes_; // Clear bit in set of "up nodes" upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK)); } @@ -6738,7 +6738,7 @@ void CCluster::UpdateClusterState( bool &doShutdown, // Programmer bonehead! 
abort(); } - --CurNodes; + --currentNodes_; // Clear bit in set of "up nodes" upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK)); @@ -7068,14 +7068,14 @@ bool CCluster::checkIfDone ( ) if (trace_settings & TRACE_SYNC_DETAIL) trace_printf("%s@%d - Node %d shutdown level=%d, state=%s. Process " - "count=%d, internal state=%d, CurNodes=%d, " + "count=%d, internal state=%d, currentNodes_=%d, " "local process count=%d\n", method_name, __LINE__, MyNode->GetPNid(), MyNode->GetShutdownLevel(), StateString(MyNode->GetState()), Nodes->ProcessCount(), MyNode->getInternalState(), - CurNodes, MyNode->GetNumProcs()); + currentNodes_, MyNode->GetNumProcs()); // Check if we are also done if (( MyNode->GetState() != State_Down ) && @@ -7094,7 +7094,7 @@ bool CCluster::checkIfDone ( ) return false; } else if ( (Nodes->ProcessCount() <= - (CurNodes*MAX_PRIMITIVES)) // only WDGs alive + (currentNodes_*MAX_PRIMITIVES)) // only WDGs alive && !MyNode->isInQuiesceState() // post-quiescing will // expire WDG (cluster) && !waitForWatchdogExit_ ) // WDG not yet exiting diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h index 6b658ae72c..ff49e56e22 100644 --- a/core/sqf/monitor/linux/cluster.h +++ b/core/sqf/monitor/linux/cluster.h @@ -124,12 +124,12 @@ class CCluster void DoDeviceReq(char * ldevname); void ExpediteDown( void ); - inline int GetTmLeader( void ) { return( TmLeaderNid); } - inline void SetTmLeader( int tmLeaderNid ) { TmLeaderNid = tmLeaderNid; } - inline int GetMonitorLeader( void ) { return( MonitorLeaderPNid); } - inline void SetMonitorLeader( int monitorLeaderPNid ) { MonitorLeaderPNid = monitorLeaderPNid; } + inline int GetTmLeader( void ) { return( tmLeaderNid_ ); } + inline void SetTmLeader( int tmLeaderNid ) { tmLeaderNid_ = tmLeaderNid; } + inline int GetMonitorLeader( void ) { return( monitorLeaderPNid_); } + inline void SetMonitorLeader( int monitorLeaderPNid ) { monitorLeaderPNid_ = monitorLeaderPNid; } int 
GetDownedNid( void ); - inline int GetTmSyncPNid( void ) { return( TmSyncPNid ); } // Physical Node ID of current TmSync operations master + inline int GetTmSyncPNid( void ) { return( tmSyncPNid_ ); } // Physical Node ID of current TmSync operations master void InitClusterComm(int worldSize, int myRank, int *rankToPnid); void addNewComm(int nid, int otherRank, MPI_Comm comm); void addNewSock(int nid, int otherRank, int sockFd ); @@ -210,7 +210,7 @@ class CCluster CNode **Node; // array of nodes CLNode **LNode; // array of logical nodes - int TmSyncPNid; // Physical Node ID of current TmSync operations master + int tmSyncPNid_; // Physical Node ID of current TmSync operations master void AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer @@ -229,15 +229,14 @@ class CCluster CLock syncCycle_; private: - int CurNodes; // Current # of nodes in the cluster - int CurProcs; // Current # if processes alive in MPI_COMM_WORLD + int currentNodes_; // Current # of nodes in the cluster int configPNodesCount_; // # of physical nodes configured int configPNodesMax_; // max # of physical nodes that can be configured - int *NodeMap; // Mapping of Node ranks to COMM_WORLD ranks - int TmLeaderNid; // Nid of currently assigned TM Leader node - int MonitorLeaderPNid; // PNid of currently assigned Monitor leader node - int tmReadyCount_; // # of DTM processes ready for transactions - size_t minRecvCount_; // minimum size of receive buffer for allgather + int *nodeMap_; // Mapping of Node ranks to COMM_WORLD ranks + int tmLeaderNid_; // Nid of currently assigned TM Leader node + int monitorLeaderPNid_; // PNid of currently assigned Monitor leader node + int tmReadyCount_; // # of DTM processes ready for transactions + size_t minRecvCount_; // minimum size of receive buffer for allgather // Pointer to array of "sync_buffer_def" structures. Used by // ShareWithPeers in "Allgather" operation. 
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx index 485d013cbc..783640fe05 100644 --- a/core/sqf/monitor/linux/pnode.cxx +++ b/core/sqf/monitor/linux/pnode.cxx @@ -1709,7 +1709,6 @@ void CNodeContainer::AddNodes( ) else { if (pnid >= maxNode) // only for workstation acting as single node -// || (IsAgentMode &&(strcmp( MasterMonitorName, Node_name ) != 0))) { rank = -1; // -1 creates node in down state } diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx index 60d9f40a36..b87f0f4383 100644 --- a/core/sqf/monitor/linux/tmsync.cxx +++ b/core/sqf/monitor/linux/tmsync.cxx @@ -321,7 +321,7 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync ) exchangeTmSyncData( sync, false ); syncCycle_.unlock(); ExchangeTmSyncState( false ); - if (( Monitor->TmSyncPNid == MyPNID ) && + if (( Monitor->tmSyncPNid_ == MyPNID ) && ( Nodes->GetTmState( SyncState_Start ) == SyncState_Start ) ) { // send unsolicited messages to other TMs in @@ -353,7 +353,7 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync ) else { if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) - trace_printf("%s@%d" " - Tm Sync failed to start, TmSyncPNid=%d, MyPNID=%d, " "TmSyncState=%d, expecting=%d\n", method_name, __LINE__, TmSyncPNid, MyPNID, Nodes->GetTmState( SyncState_Start ), SyncState_Start); + trace_printf("%s@%d" " - Tm Sync failed to start, tmSyncPNid_=%d, MyPNID=%d, " "TmSyncState=%d, expecting=%d\n", method_name, __LINE__, tmSyncPNid_, MyPNID, Nodes->GetTmState( SyncState_Start ), SyncState_Start); if (MyNode->GetTmSyncState() == SyncState_Start) { MyNode->SetTmSyncState( SyncState_Null ); @@ -449,7 +449,7 @@ void CTmSync_Container::EndTmSync( MSGTYPE type ) { trace_printf("%s@%d - Request (%p) nid=%d, handle=%d, tag=%d, unsol=%d, comp=%d\n", method_name, __LINE__, req, req->Nid, req->Handle, req->Tag, req->Unsolicited, req->Completed); } - if ( TmSyncPNid == MyPNID ) + if ( tmSyncPNid_ == MyPNID ) { 
if ( MyNode->GetLNodesCount() > 1 ) { @@ -666,7 +666,7 @@ void CTmSync_Container::ProcessTmSyncReply( struct message_def * msg ) TmSyncReplyCode |= msg->u.reply.u.unsolicited_tm_sync.return_code; tmsync_req->Completed = true; UnsolicitedComplete( msg ); - if ( TmSyncPNid == MyPNID ) + if ( tmSyncPNid_ == MyPNID ) { if (trace_settings & (TRACE_REQUEST | TRACE_TMSYNC)) trace_printf("%s@%d - Local Unsolicited TmSync reply, handle=" @@ -1102,7 +1102,7 @@ bool CTmSync_Container::TmSyncPending( void ) trace_printf("%s@%d" " - PendingTmSync=%d, total=%d, replies=%d, pending=%d\n", method_name, __LINE__, PendingSlaveTmSync, GetTotalSlaveTmSyncCount(), GetTmSyncReplies(), GetPendingSlaveTmSyncCount() ); if (( MyNode->GetTmSyncState() == SyncState_Abort ) && - ( TmSyncPNid != MyPNID ) && + ( tmSyncPNid_ != MyPNID ) && ( GetTmSyncReplies() == GetTotalSlaveTmSyncCount() ) ) { CommitTmDataBlock( MPI_ERR_UNKNOWN ); diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx index 107cf322aa..1c133ca0f7 100644 --- a/core/sqf/monitor/linux/zclient.cxx +++ b/core/sqf/monitor/linux/zclient.cxx @@ -523,6 +523,8 @@ const char* CZClient::WaitForAndReturnMaster( bool doWait ) { break; } + usleep(1000000); // sleep for a second as to not overwhelm the system + retries++; continue; } else if ( rc == ZOK ) @@ -549,14 +551,14 @@ const char* CZClient::WaitForAndReturnMaster( bool doWait ) break; } usleep(1000000); // sleep for a second as to not overwhelm the system - retries++; + retries++; continue; } } else // error { - if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) + if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { trace_printf( "%s@%d Error (MasterMonitor) WaitForAndReturnMaster returned rc (%d), retries %d\n" , method_name, __LINE__, rc, retries ); @@ -946,6 +948,8 @@ int CZClient::CreateMasterZNode( const char *nodeName ) , "[%s], RegisterZNode(%s) failed with error %s\n" , method_name, monData.c_str(), zerror(rc) ); 
mon_log_write(MON_ZCLIENT_CREATEMASTERZNODE, SQ_LOG_ERR, buf); + + TRACE_EXIT; return(rc); // Return the error } if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) From 717f9c3d25870c02d8a719f9fc15e55c0d4f1be8 Mon Sep 17 00:00:00 2001 From: Zalo Correa Date: Wed, 28 Feb 2018 15:52:37 -0800 Subject: [PATCH 4/5] More code review fixes. --- core/sqf/monitor/linux/mlio.cxx | 1 - core/sqf/monitor/linux/zclient.cxx | 11 ++++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/sqf/monitor/linux/mlio.cxx b/core/sqf/monitor/linux/mlio.cxx index 7db35ec714..b62cd46541 100644 --- a/core/sqf/monitor/linux/mlio.cxx +++ b/core/sqf/monitor/linux/mlio.cxx @@ -1263,7 +1263,6 @@ SQ_LocalIOToClient::SQ_LocalIOToClient(int nid) if (trace_settings & TRACE_INIT) { int err = errno; - char la_buf[MON_STRING_BUF_SIZE]; trace_printf( "%s@%d" " failed shmget(%d), errno=%d (%s)\n" , method_name, __LINE__ , (shsize), err, strerror(err) ); diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx index 1c133ca0f7..0ca03b161f 100644 --- a/core/sqf/monitor/linux/zclient.cxx +++ b/core/sqf/monitor/linux/zclient.cxx @@ -580,6 +580,7 @@ const char* CZClient::WaitForAndReturnMaster( bool doWait ) trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s)\n" , method_name, __LINE__, masterMonitor.c_str() ); } + TRACE_EXIT; return nodes.data[0]; } @@ -1811,7 +1812,7 @@ int CZClient::WatchNodeMasterDelete( const char *nodeName ) // This is fine since we call it indiscriminately if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d (MasterMonitor) WatchNodeMasterDelete deleted %s, with rc == ZNONODE (fine)\n" + trace_printf( "%s@%d (MasterMonitor) WatchNodeMasterDelete already deleted %s, with rc == ZNONODE (fine)\n" , method_name, __LINE__ , nodeName ); } @@ -1821,15 +1822,15 @@ int CZClient::WatchNodeMasterDelete( const char *nodeName ) { if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) { - trace_printf( "%s@%d 
(MasterMonitor) WatchNodeMasterDelete deleted %s, with rc == ZOK\n" + trace_printf( "%s@%d (MasterMonitor) znode (%s) already deleted or cannot be accessed, rc=%d (%s)\n" , method_name, __LINE__ - , nodeName ); + , nodeName, rc, zerror(rc) ); } rc = ZOK; char buf[MON_STRING_BUF_SIZE]; snprintf( buf, sizeof(buf) - , "[%s], znode (%s) already deleted or cannot be accessed!\n" - , method_name, nodeName ); + , "[%s], znode (%s) already deleted or cannot be accessed, rc=%d (%s)\n" + , method_name, nodeName, rc, zerror(rc) ); mon_log_write(MON_ZCLIENT_WATCHMASTERNODEDELETE_2, SQ_LOG_INFO, buf); } else From 87739b81a022ae6005d810aadac7fc3c6071cf1d Mon Sep 17 00:00:00 2001 From: Zalo Correa Date: Wed, 28 Feb 2018 16:17:26 -0800 Subject: [PATCH 5/5] Code review fix. Missed one :-( --- core/sqf/src/trafconf/clusterconf.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sqf/src/trafconf/clusterconf.cpp b/core/sqf/src/trafconf/clusterconf.cpp index 039b87db66..907919de8d 100644 --- a/core/sqf/src/trafconf/clusterconf.cpp +++ b/core/sqf/src/trafconf/clusterconf.cpp @@ -381,7 +381,7 @@ bool CClusterConfig::LoadNodeConfig( void ) if (i == 0) { configMaster_ = pnodeConfigInfo.pnid; - strcpy (configMasterName_ ,pnodeConfigInfo.nodename); + strncpy( configMasterName_ , pnodeConfigInfo.nodename, sizeof(configMasterName_) ); } AddNodeConfiguration( pnodeConfigInfo, lnodeConfigInfo ); }