Skip to content

Commit

Permalink
[patch] Added code to request Metadata when publish failed (#133)
Browse files Browse the repository at this point in the history
* [patch] Added RequestMetaProcess in PublishEvent

* [patch] put needMetadata code into a common function.

* [patch] Updated comments
  • Loading branch information
baoquocphan authored and Jon Levell committed Jan 17, 2024
1 parent f104592 commit 66628c5
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions server_proxy/src/pxmhub.c
Expand Up @@ -2879,6 +2879,33 @@ static int requestShutdownTimer(ism_timer_t timer, ism_time_t timestamp, void *
return 0;
}

/**
* This function will determine whether to make a metadata request or create
* a new TCP connection to the boostrap server and make the metadata request.
*
* @param mhub the mhub object
*/
static int needMetadata(ism_mhub_t * mhub)
{
ism_mhub_lock(mhub);
if (mhub->enabled==1 && !mhub->expectingMetadata) {
if (mhub->metadata && mhub->metadata->pobj->state == TCP_CONNECTED) {
mhub->expectingMetadata = 1;
mhubMetadataRequest(mhub, mhub->metadata);
} else {
//Metadata is broken, need new transport.
mhub->prev_state = mhub->state;
mhub->state = MHS_Opening;
if (mhub->stateChanged) {
mhub->stateChanged(mhub); /* Notify of state change */
}
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(0));
}
}
ism_mhub_unlock(mhub);
return 0;
}


/*
* process a produce response
Expand Down Expand Up @@ -3218,22 +3245,7 @@ static int mhubReceiveKafka(ism_transport_t * transport, char * inbuf, int bufle
* Request metadata
*/
if (needmetadata > 0) {
ism_mhub_lock(mhub);
if (mhub->enabled==1 && !mhub->expectingMetadata) {
if (mhub->metadata && mhub->metadata->pobj->state == TCP_CONNECTED) {
mhub->expectingMetadata = 1;
mhubMetadataRequest(mhub, transport);
} else {
//Metadata is broken, need new transport.
mhub->prev_state = mhub->state;
mhub->state = MHS_Opening;
if (mhub->stateChanged) {
mhub->stateChanged(mhub); /* Notify of state change */
}
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(0));
}
}
ism_mhub_unlock(mhub);
needMetadata(mhub);
}
return 0;
}
Expand Down Expand Up @@ -5071,6 +5083,7 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl
int rc = 1;
kafka_produce_msg_t * event;
int eventlen;
int needmetadata=0;

/*
* Get topic and partition
Expand Down Expand Up @@ -5154,11 +5167,12 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl
//The Connection Record is not in the open state. or TCP connection is not in Open State
//Still keep the message in the Pending Q
//Will move it once the connection established.
if(transport!=NULL){
TRACE(5, "publishEvent: Partition Connection is not open. which=%d transport.index=%d transport.state=%d transport.ready=%d pending_msg_count=%d\n", which, transport->index, transport->state, transport->ready, mhub_part->kafka_msg_count);
}else{
TRACE(5, "publishEvent: Partition Connection is not open. which=%d pending_msg_count=%d\n", which, mhub_part->kafka_msg_count);
}
if(transport!=NULL){
TRACE(5, "publishEvent: Partition Connection is not open. which=%d transport.index=%d transport.state=%d transport.ready=%d pending_msg_count=%d\n", which, transport->index, transport->state, transport->ready, mhub_part->kafka_msg_count);
}else{
TRACE(5, "publishEvent: Partition Connection is not open. which=%d pending_msg_count=%d\n", which, mhub_part->kafka_msg_count);
}
needmetadata=1;
rc = 1;
}

Expand All @@ -5173,6 +5187,11 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl
}

pthread_mutex_unlock(&mhub_part->lock);

if(needmetadata > 0){
needMetadata(mhub);
}

return rc;
}

Expand Down

0 comments on commit 66628c5

Please sign in to comment.