Skip to content
Permalink
Browse files
HAWQ-1827. enable rm to check segment down within RUAlive
  • Loading branch information
ztao1987 committed Jan 9, 2022
1 parent 27ac97c commit 04fee35750bfe8dd83b7d1172f21d4e950d28ddc
Showing 6 changed files with 50 additions and 3 deletions.
@@ -414,7 +414,8 @@ int processAllCommFileDescs(void)
if ( CommBuffers[i]->ClientHostname.Str != NULL &&
CommBuffers[i]->ServerPort != 0 )
{
elog(DEBUG3, "Return FD %d Index %d.", CommBuffers[i]->FD, i);
elog(DEBUG3, "Return FD %d Index %d. host:port %s:%d", CommBuffers[i]->FD, i,
CommBuffers[i]->ClientHostname.Str, CommBuffers[i]->ServerPort);
returnAliveConnectionRemoteByHostname(
&(CommBuffers[i]->FD),
CommBuffers[i]->ClientHostname.Str,
@@ -659,6 +660,7 @@ int registerAsyncConnectionFileDesc(const char *address,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata,
bool useNewConnection,
AsyncCommBuffer *newcommbuffer)
{
int res = FUNC_RETURN_OK;
@@ -678,7 +680,13 @@ int registerAsyncConnectionFileDesc(const char *address,
return UTIL_NETWORK_FAIL_GETHOST;
}

if ( rm_enable_connpool )
if ( useNewConnection )
{
/* remove old connection in the connpool for RUAlive check */
removeAliveSocketConnection(address, resolvedaddr, port);
}

if ( !useNewConnection && rm_enable_connpool )
{
/* Try to get an alive connection from connection pool. */
fd = fetchAliveSocketConnection(address, resolvedaddr, port);
@@ -111,12 +111,13 @@ int sendRUAlive(char *seghostname)
context->MessageCleanUpHandler = sentRUAliveCleanUp;
context->UserData = (void *)segres;

/* Connect to HAWQ RM server */
/* always create new connection for RUAlive msg */
res = registerAsyncConnectionFileDesc(seghostname,
rm_segment_port,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
true,
&newcommbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -342,6 +343,7 @@ int increaseMemoryQuota(char *seghostname, GRMContainerSet containerset)
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
false,
&commbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -505,6 +507,7 @@ int decreaseMemoryQuota(char *seghostname,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
false,
&commbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -103,6 +103,7 @@ int sendIMAlive(int *errorcode,
ASYNCCOMM_READBYTES | ASYNCCOMM_WRITEBYTES,
&AsyncCommBufferHandlersMessage,
context,
false,
&newcommbuffer);
if ( res != FUNC_RETURN_OK )
{
@@ -101,6 +101,7 @@ int registerAsyncConnectionFileDesc(const char *address,
uint32_t actionmask,
AsyncCommBufferHandlers methods,
void *userdata,
bool useNewConnection,
AsyncCommBuffer *newcommbuffer);

/* Process all registered file descriptors. */
@@ -148,4 +148,8 @@ AddressString getAddressStringByHostName(const char *hostname);
int fetchAliveSocketConnection(const char *hostname,
AddressString address,
uint16_t port);

int removeAliveSocketConnection(const char *hostname,
AddressString address,
uint16_t port);
#endif /* RESOURCE_MANANGER_NETWORK_UTILITIES_H */
@@ -603,6 +603,36 @@ int fetchAliveSocketConnection(const char *hostname,
return res;
}

int removeAliveSocketConnection(const char *hostname,
AddressString address,
uint16_t port)
{
ConnAddressString connaddr = createConnAddressString(address, port);
SimpArray key;
setSimpleArrayRef(&key, (char *)connaddr, SIZEOFCONNADDRSTRING(connaddr));
PAIR pair = getHASHTABLENode(&ActiveConnections, (void *)&key);
if ( pair == NULL )
{
freeConnAddressString(connaddr);
return -1;
}
List *aliveconns = (List *)(pair->Value);
while( aliveconns != NULL )
{
int fd = lfirst_int(list_head(aliveconns));
MEMORY_CONTEXT_SWITCH_TO(PCONTEXT)
aliveconns = list_delete_first(aliveconns);
MEMORY_CONTEXT_SWITCH_BACK
elog(DEBUG3, "Removed FD %d for %s:%d.", fd, hostname, port);
closeConnectionRemote(&fd);
}
pair->Value = NULL;
removeHASHTABLENode(&ActiveConnections, (void *)&key);
freeConnAddressString(connaddr);

return FUNC_RETURN_OK;
}

void returnAliveConnectionRemoteByHostname(int *clientfd,
const char *hostname,
uint16_t port)

0 comments on commit 04fee35

Please sign in to comment.