From 085ec1a3e41574c176183fba7629ffd2dc856c28 Mon Sep 17 00:00:00 2001 From: Wen Lin Date: Mon, 21 Dec 2015 18:51:58 +0800 Subject: [PATCH] HAWQ-267. Add check for AM heartbeat thread status --- depends/libyarn/src/CMakeLists.txt | 2 +- .../src/libyarnclient/LibYarnClient.cpp | 49 ++++++++++++++++--- .../libyarn/src/libyarnclient/LibYarnClient.h | 1 + .../src/libyarnclient/LibYarnClientC.cpp | 2 +- .../ApplicationClientProtocol.cpp | 4 +- .../resourcebroker_LIBYARN_proc.c | 3 +- 6 files changed, 50 insertions(+), 11 deletions(-) diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt index e9657cc0ab..d19b3b23ed 100644 --- a/depends/libyarn/src/CMakeLists.txt +++ b/depends/libyarn/src/CMakeLists.txt @@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) SET(libyarn_VERSION_MAJOR 0) SET(libyarn_VERSION_MINOR 1) -SET(libyarn_VERSION_PATCH 9) +SET(libyarn_VERSION_PATCH 10) SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}") SET(libyarn_VERSION_API 1) SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src) diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp index 3fb7e5b2ec..5208e059f1 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp +++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp @@ -42,7 +42,7 @@ LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort, amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(amHost), amPort(amPort), am_tracking_url(am_tracking_url), heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""), - keepRun(false){ + keepRun(false), needHeartbeatAlive(false){ pthread_mutex_init( &(heartbeatLock), NULL ); amrmClient = NULL; @@ -220,6 +220,7 @@ int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) { "error code %d", rc); throw std::runtime_error( "Fail to create heart-beat thread."); } + needHeartbeatAlive = true; #endif LOG(INFO,"LibYarnClient::createJob, after AM register to RM, a heartbeat thread has been started"); @@ -272,6 +273,7 @@ int LibYarnClient::forceKillJob(string &jobId) { throw std::invalid_argument("The jobId is wrong, please check the jobId argument"); } + needHeartbeatAlive = false; for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { ostringstream key; Container *container = it->second; @@ -484,6 +486,10 @@ int LibYarnClient::allocateResources(string &jobId, throw std::invalid_argument("The jobId is wrong, check the jobId argument"); } + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; list allocatedContainerCache; list preContainerReports; @@ -622,6 +628,11 @@ int LibYarnClient::releaseResources(string &jobId,int64_t releaseContainerIds[], if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } + + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient; //1) asksBlank list asksBlank; @@ -705,6 +716,10 @@ int LibYarnClient::activeResources(string &jobId,int64_t activeContainerIds[],in if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + LOG(INFO, "LibYarnClient::activeResources, activeResources started"); for (int i = 0; i < activeContainerSize; i++){ @@ -772,12 +787,13 @@ int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus finalStatus) } #endif - try{ - if (jobId != clientJobId) { - throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); - } + try{ + if (jobId != clientJobId) { + throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); + } + needHeartbeatAlive = false; + //1. we should stop all containers related with this job - //ContainerManagement cmgmt; for (map::iterator it = jobIdContainers.begin(); it != jobIdContainers. end(); it++) { ostringstream key; Container *container = it->second; @@ -820,6 +836,11 @@ int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport &applica if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } + + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + LOG(INFO,"LibYarnClient::getApplicationReport, appId[cluster_timestamp:%lld,id:%d]", clientAppId.getClusterTimestamp(), clientAppId.getId()); applicationReport = ((ApplicationClient*) appClient)->getApplicationReport(clientAppId); @@ -848,6 +869,11 @@ int LibYarnClient::getContainerReports(string &jobId,list &cont if (jobId != clientJobId) { throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } + + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + LOG(INFO,"LibYarnClient::getContainerReports, appId[cluster_timestamp:%lld,id:%d]", clientAppId.getClusterTimestamp(), clientAppId.getId()); containerReports = ((ApplicationClient*) appClient)->getContainers(clientAppAttempId); @@ -872,6 +898,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int throw std::invalid_argument("The jobId is wrong,please check the jobId argument"); } + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + for (int i = 0; i < containerSize; i++) { int64_t containerId = containerIds[i]; map::iterator it = jobIdContainers.find(containerId); @@ -908,6 +938,10 @@ int LibYarnClient::getContainerStatuses(string &jobId,int64_t containerIds[],int int LibYarnClient::getQueueInfo(string &queue, bool includeApps, bool includeChildQueues, bool recursive,QueueInfo &queueInfo) { try{ + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } + queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue, includeApps, includeChildQueues, recursive); return FR_SUCCEEDED; @@ -927,6 +961,9 @@ int LibYarnClient::getQueueInfo(string &queue, bool includeApps, int LibYarnClient::getClusterNodes(list &states,list &nodeReports) { try{ + if (!keepRun && needHeartbeatAlive) { + throw std::runtime_error("LibYarnClient::libyarn AM heartbeat thread has stopped."); + } nodeReports = ((ApplicationClient*) appClient)->getClusterNodes(states); return FR_SUCCEEDED; } diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.h b/depends/libyarn/src/libyarnclient/LibYarnClient.h index 9ce51be427..24ccb6d86f 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClient.h +++ b/depends/libyarn/src/libyarnclient/LibYarnClient.h @@ -142,6 +142,7 @@ namespace libyarn { list askRequests; volatile bool keepRun; + bool needHeartbeatAlive; #ifdef MOCKTEST private: /* diff --git a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp index 51971fdf68..c558e315d7 100644 --- a/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp +++ b/depends/libyarn/src/libyarnclient/LibYarnClientC.cpp @@ -263,7 +263,7 @@ extern "C" { goto exit_err; preferredAllocatedSize = allocatedPreferredList.size(); - preferredAllocatedArray = (LibYarnResource_t *)(sizeof(LibYarnResource_t) * preferredAllocatedSize); + preferredAllocatedArray = (LibYarnResource_t *)malloc(sizeof(LibYarnResource_t) * preferredAllocatedSize); if(preferredAllocatedArray == NULL) { setErrorMessage("LibYarnClientC::fail to allocate memory for resource array"); goto exit_err; diff --git a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp index 96691525fc..ac0f026fb7 100644 --- a/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp +++ b/depends/libyarn/src/libyarnserver/ApplicationClientProtocol.cpp @@ -170,11 +170,11 @@ GetApplicationReportResponse ApplicationClientProtocol::getApplicationReport( rpc getContainers (GetContainersRequestProto) returns (GetContainersResponseProto); message GetContainersRequestProto { - optional ApplicationIdProto application_id = 1; + optional ApplicationIdProto application_attempt_id = 1; } message GetContainersResponseProto { - repeated ContainerReportProto containers_reports = 1; + repeated ContainerReportProto containers = 1; } */ GetContainersResponse ApplicationClientProtocol::getContainers(GetContainersRequest &request){ diff --git a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c index 814a578873..d3028c3ef6 100644 --- a/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c +++ b/src/backend/resourcemanager/resourcebroker/resourcebroker_LIBYARN_proc.c @@ -1787,7 +1787,8 @@ int RB2YARN_getContainerReport(RB_GRMContainerStat *ctnstats, int *size) freeContainerStatusArray(ctnstatarr, ctnstatsize); } - freeContainerReportArray(ctnrparr, arrsize); + if(ctnrparr != NULL && arrsize > 0) + freeContainerReportArray(ctnrparr, arrsize); return yarnres; }