Skip to content

Commit

Permalink
[patch] Added LOGS and Updated Trace for MHUB Metadata Connection (#132)
Browse files Browse the repository at this point in the history
* [patch] Test a change

* [patch] Updated Trace for mhub debugging

* [patch] Updated logs

* [patch] Add retryConnect when creating ot Metadata failed

* [patch] Fixed mhub unlock

* [patch] Updated mhub unlock

* [patch] Updated with spaces

* [patch] Removed bracket

* [patch] More space fixes

* [patch] More spaces fix

* [patch] Added 2 log points for createDataConnection

* [patch] Added logs points to data connection for debugging purpose
  • Loading branch information
baoquocphan authored and Jon Levell committed Jan 17, 2024
1 parent 308bfc4 commit f104592
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 26 deletions.
82 changes: 58 additions & 24 deletions server_proxy/src/pxmhub.c
Expand Up @@ -48,6 +48,7 @@
#include <sasl_scram.h>
#define MAX_MQTT_SERVERS 16


/*
* TLS methods
*/
Expand Down Expand Up @@ -2829,6 +2830,9 @@ static int mhubGetAddress(ism_server_t * server, ism_transport_t * transport, i
transport->server_addr = ism_transport_putString(transport, addr);
}
transport->serverport = port;
TRACE(5, "MHUB GetAddress: connect=%u name=%s server_addr=%s server_port=%u broker=%s\n",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);

req = ism_common_calloc(ISM_MEM_PROBE(ism_memory_proxy_eventstreams,16),1, sizeof(*req)+sizeof(*sigevt)+sizeof(*hints)+16);
sigevt = (struct sigevent *)(req+1);
Expand Down Expand Up @@ -3674,6 +3678,10 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
return receiveSASL(transport, inbuf, buflen, kind);
}

LOG(INFO, Server, 982, "%u%s%s%u%s", "Mhub Metadata received: connect={0} name={1} server_addr={2} server_port={3} broker={4}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);

/*
* Check for the type of response based on the correlation ID
*/
Expand All @@ -3692,8 +3700,9 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
int topiclen = ism_kafka_getString(buf, &topicstr);
int part_cnt = ism_kafka_getInt4(buf);
if (ism_kafka_more(buf)<0 || part_cnt >= 0x10000 || topiclen < 1) {
TRACE(5, "MessageHub metadata incomplete: connect=%u name=%s\n", transport->index, transport->name);
rc = 2;
TRACE(5, "MessageHub metadata incomplete: connect=%u name=%s morebuf=%d part_cnt=%d topiclen=%d\n",
transport->index, transport->name,ism_kafka_more(buf)<0, part_cnt, topiclen);
rc = 2;
} else {
rc = processTopicMetadata(mhub, topicstr, topiclen, topicrc, part_cnt);
for (j=0; rc==0 && j<part_cnt; j++) {
Expand All @@ -3708,7 +3717,8 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
if (ism_kafka_more(buf) >= 0) {
processPartMetadata(mhub, brokers, broker_cnt, topicstr, topiclen, partid, partrc, leader);
} else {
TRACE(5, "MessageHub metadata incomplete: connect=%u name=%s\n", transport->index, transport->name);
TRACE(5, "MessageHub metadata incomplete: connect=%u name=%s morebuf=%d part_rc=%d part_id=%d leader=%d replica=%d\n",
transport->index, transport->name,ism_kafka_more(buf)<0, partrc, partid, leader, replica);
rc = 2;
}
}
Expand All @@ -3717,11 +3727,14 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
mhub->expectingMetadata = 0;
}
if (rc) {
LOG(ERROR, Server, 983, "%u%s%s%u%s", "Mhub Metadata is incomplete: connect={0} name={1} server_addr={2} server_port={3} broker={4}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);
transport->close(transport, ISMRC_BadClientData, 0, "MessageHub metadata incomplete");

ism_mhub_lock(mhub);
if(!g_shuttingDown && mhub->enabled==1)
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(mhub->retry++));
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(mhub->retry++));
ism_mhub_unlock(mhub);

return 1;
Expand All @@ -3733,7 +3746,9 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
mhub->stateChanged(mhub); /* Notify of state change */
}
pthread_spin_unlock(&mhub->lock);
TRACE(9, "MessageHub metadata process complete: connect=%u name=%s\n", transport->index, transport->name);
LOG(INFO, Server, 984, "%u%s%s%u%s", "Mhub Metadata processing is completed: connect={0} name={1} server_addr={2} server_port={3} broker={4}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);
}
return 0;
}
Expand All @@ -3750,7 +3765,7 @@ static int createMetadataConnection(ism_mhub_t * mhub) {
return 0;

ism_transport_t * transport = ism_transport_newOutgoing(NULL, 1);
TRACE(5, "Start mhub metadata connection: org=%s mhub=%s\n", mhub->tenant->name, mhub->id);
TRACE(5, "Creating mhub metadata connection: org=%s mhub=%s\n", mhub->tenant->name, mhub->id);
ism_tcp_init_transport(transport);
transport->originated = 1;
transport->protocol = "mhub_metadata";
Expand All @@ -3773,12 +3788,23 @@ static int createMetadataConnection(ism_mhub_t * mhub) {
mhub->metadata = transport;
int rc = ism_kafka_createConnection(transport, (ism_server_t *)mhub);
if(rc){
char * erbuf = alloca(2048);
ism_common_formatLastError(erbuf, 2048);
TRACE(7, "Failed create the metadata connection. name=%s rc=%d errmsg=%s\n", transport->clientID, rc, erbuf);
transport->close(transport, rc, 0, erbuf);
char * erbuf = alloca(2048);
ism_common_formatLastError(erbuf, 2048);
LOG(ERROR, Server, 980, "%u%s%s%u%s%d%s", "Failed to create the metadata connection: connect={0} name={1} server_addr={2} server_port={3} broker={4} rc={5} errmsg={6}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0], rc, erbuf);
transport->close(transport, rc, 0, erbuf);

//If the Metadata connection creation failed, retry again in a timer
ism_mhub_lock(mhub);
if(!g_shuttingDown && mhub->enabled==1)
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(mhub->retry++));
ism_mhub_unlock(mhub);

}else{
TRACE(6, "Start mhub metadata connection: connect=%u name=%s\n", transport->index, transport->name);
LOG(INFO, Server, 981, "%u%s%s%u%s", "Created mhub metadata connection: connect={0} name={1} server_addr={2} server_port={3} broker={4}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);
}
return 0;
}
Expand All @@ -3795,11 +3821,11 @@ static int mhubRetryConnect(ism_timer_t key, ism_time_t now, void * userdata) {

//Only create connection if MSPROXY is not shutdown
if(!g_shuttingDown){
createMetadataConnection(mhub);
createMetadataConnection(mhub);
}else{
TRACE(5, "Failed to connect metadata connection. Msproxy is shutting down. tenant=%s name=%s\n",
(mhub->tenant!=NULL)?mhub->tenant->name:"", mhub->name);
}
TRACE(5, "Failed to connect metadata connection. Msproxy is shutting down. tenant=%s name=%s\n",
(mhub->tenant!=NULL)?mhub->tenant->name:"", mhub->name);
}
return 0;
}

Expand All @@ -3813,15 +3839,18 @@ xUNUSED static int mhubDataRetryConnect(ism_timer_t key, ism_time_t now, void *
ism_common_cancelTimer(key);
pthread_mutex_lock(&mhub_part->lock);
ism_transport_t * transport= mhub_part->transport;
ism_kafka_con_t * pobj = (ism_kafka_con_t *)transport->pobj;
pthread_mutex_unlock(&mhub_part->lock);

if(!g_shuttingDown){
transport->ready = 7; //Set ready for Connect Timeout. See ddosTimer
int rc = ism_kafka_createConnection(transport, transport->pobj->server);
int rc = ism_kafka_createConnection(transport, pobj->server);
if(rc){

char * erbuf = alloca(2048);
ism_common_formatLastError(erbuf, 2048);
TRACE(7, "Failed create the data connection. name=%s rc=%d errmsg=%s\n", transport->clientID, rc, erbuf);
LOG(ERROR, Server, 987, "%u%s%s%u%d%d%d%s", "Failed to create the mhub data connection: connect={0} name={1} server_addr={2} server_port={3} partition={4} nodeid={5} rc={6} errmsg={7}",
transport->index, transport->name, transport->server_addr, transport->serverport, pobj->partID, pobj->nodeID, rc, erbuf);
transport->close(transport, rc, 0, erbuf);
}
} else {
Expand Down Expand Up @@ -3875,10 +3904,12 @@ static int createDataConnection(ism_mhub_t * mhub, mhub_topic_t * topic, int par
//Close the connection
char * erbuf = alloca(2048);
ism_common_formatLastError(erbuf, 2048);
TRACE(7, "Failed create the data connection. name=%s partition=%d nodeid=%d rc=%d errmsg=%s\n", transport->clientID, part, nodeid, rc, erbuf);
LOG(ERROR, Server, 985, "%u%s%s%u%d%d%d%s", "Failed to create the mhub data connection: connect={0} name={1} server_addr={2} server_port={3} partition={4} nodeid={5} rc={6} errmsg={7}",
transport->index, transport->name, transport->server_addr, transport->serverport, part, nodeid, rc, erbuf);
transport->close(transport, rc, 0, erbuf);
} else {
TRACE(6, "Start mhub data connection: connect=%u name=%s\n", transport->index, transport->name);
LOG(INFO, Server, 986, "%u%s%s%u%d%d", "Created mhub data connection: connect={0} name={1} server_addr={2} server_port={3} partition={4} nodeid={5}",
transport->index, transport->name, transport->server_addr, transport->serverport, part, nodeid);
}
return 0;
}
Expand All @@ -3898,7 +3929,7 @@ static int mhubCreateData(ism_timer_t key, ism_time_t now, void * userdata) {
broker.broker = info->broker;
broker.broker_len = strlen(info->broker);
broker.port = info->port;
TRACE(8, "mhubCreateData mhub=%s topic=%s broker=%s broker_len=%d port=%d\n", info->mhub->id, (topic ? topic->name : ""),
TRACE(5, "mhubCreateData mhub=%s topic=%s broker=%s broker_len=%d port=%d\n", info->mhub->id, (topic ? topic->name : ""),
broker.broker, broker.broker_len, broker.port);
if (topic) {
createDataConnection(info->mhub, topic, info->partid, info->leader, &broker);
Expand All @@ -3921,9 +3952,6 @@ static void mhubMetadataRequest(ism_mhub_t * mhub, ism_transport_t * transport)
if (g_shuttingDown)
return;

TRACE(6, "MessageHub MetadataRequest: connect=%u name=%s broker=%s:%u host=%s\n",
transport->index, transport->name, transport->server_addr, transport->serverport,
transport->client_host ? transport->client_host : "");
ism_kafka_putInt4(buf, MetadataRequest0);
ism_kafka_putInt4(buf, 0x20000); /* Correlation ID */
ism_kafka_putString(buf, ism_common_getHostnameInfo(), -1); /* Use our hostname as the kafka clientID */
Expand All @@ -3932,11 +3960,17 @@ static void mhubMetadataRequest(ism_mhub_t * mhub, ism_transport_t * transport)
for (i=0; i<mhub->topiccount; i++) {
mhub_topic_t * topic = mhub->topics[i];
ism_kafka_putString(buf, topic->name, -1);
TRACE(8, "MessageHub MetadataRequest for topic: %s\n", topic->name);
TRACE(5, "MessageHub MetadataRequest for topic: %s\n", topic->name);
topic_count++;
}
ism_kafka_putInt4At(buf, topic_count_pos, topic_count);
ism_kafka_putString(buf, transport->pobj->topic, -1);

//LOG when to send the Metadata Request with server info
LOG(INFO, Server, 979, "%u%s%s%u%s", "MessageHub metadatarequest submitted: connect={0} name={1} server_addr={2} server_port={3} broker={4}",
transport->index, transport->name, transport->server_addr, transport->serverport,
(mhub->trybroker>0)?mhub->brokers[mhub->trybroker-1]:mhub->brokers[0]);

transport->send(transport, buf->buf+4, buf->used-4, 0, SFLAG_FRAMESPACE);
}

Expand Down
4 changes: 2 additions & 2 deletions server_proxy/src/pxtcp.c
Expand Up @@ -4419,8 +4419,8 @@ static void moreOutgoing(ism_transport_t * transport, int rc, struct addrinfo *
transport->client_addr = ism_transport_putString(transport, tmpStr);
}

TRACE(7, "moreOutgoing: connect=%u server=[%s]:%u client=[%s]:%u state=0x%x sock=%d result=%s (%d)\n", transport->index,
transport->server_addr, transport->serverport,transport->client_addr, transport->clientport,
TRACE(7, "moreOutgoing: connect=%u server=[%s]:%u server_name=%s client=[%s]:%u state=0x%x sock=%d result=%s (%d)\n", transport->index,
transport->server_addr, transport->serverport, transport->tobj->servername, transport->client_addr, transport->clientport,
connection->state, sock, strerror(err), err);
}

Expand Down

0 comments on commit f104592

Please sign in to comment.