diff --git a/server_proxy/src/pxmhub.c b/server_proxy/src/pxmhub.c index 074acde7..deeb2e1f 100644 --- a/server_proxy/src/pxmhub.c +++ b/server_proxy/src/pxmhub.c @@ -2879,6 +2879,33 @@ static int requestShutdownTimer(ism_timer_t timer, ism_time_t timestamp, void * return 0; } +/** + * This function will determine whether to make a metadata request or create + * a new TCP connection to the boostrap server and make the metadata request. + * + * @param mhub the mhub object + */ +static int needMetadata(ism_mhub_t * mhub) +{ + ism_mhub_lock(mhub); + if (mhub->enabled==1 && !mhub->expectingMetadata) { + if (mhub->metadata && mhub->metadata->pobj->state == TCP_CONNECTED) { + mhub->expectingMetadata = 1; + mhubMetadataRequest(mhub, mhub->metadata); + } else { + //Metadata is broken, need new transport. + mhub->prev_state = mhub->state; + mhub->state = MHS_Opening; + if (mhub->stateChanged) { + mhub->stateChanged(mhub); /* Notify of state change */ + } + ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(0)); + } + } + ism_mhub_unlock(mhub); + return 0; +} + /* * process a produce response @@ -3218,22 +3245,7 @@ static int mhubReceiveKafka(ism_transport_t * transport, char * inbuf, int bufle * Request metadata */ if (needmetadata > 0) { - ism_mhub_lock(mhub); - if (mhub->enabled==1 && !mhub->expectingMetadata) { - if (mhub->metadata && mhub->metadata->pobj->state == TCP_CONNECTED) { - mhub->expectingMetadata = 1; - mhubMetadataRequest(mhub, transport); - } else { - //Metadata is broken, need new transport. - mhub->prev_state = mhub->state; - mhub->state = MHS_Opening; - if (mhub->stateChanged) { - mhub->stateChanged(mhub); /* Notify of state change */ - } - ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubRetryConnect, mhub, retryDelay(0)); - } - } - ism_mhub_unlock(mhub); + needMetadata(mhub); } return 0; } @@ -5071,6 +5083,7 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl int rc = 1; kafka_produce_msg_t * event; int eventlen; + int needmetadata=0; /* * Get topic and partition @@ -5154,11 +5167,12 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl //The Connection Record is not in the open state. or TCP connection is not in Open State //Still keep the message in the Pending Q //Will move it once the connection established. - if(transport!=NULL){ - TRACE(5, "publishEvent: Partition Connection is not open. which=%d transport.index=%d transport.state=%d transport.ready=%d pending_msg_count=%d\n", which, transport->index, transport->state, transport->ready, mhub_part->kafka_msg_count); - }else{ - TRACE(5, "publishEvent: Partition Connection is not open. which=%d pending_msg_count=%d\n", which, mhub_part->kafka_msg_count); - } + if(transport!=NULL){ + TRACE(5, "publishEvent: Partition Connection is not open. which=%d transport.index=%d transport.state=%d transport.ready=%d pending_msg_count=%d\n", which, transport->index, transport->state, transport->ready, mhub_part->kafka_msg_count); + }else{ + TRACE(5, "publishEvent: Partition Connection is not open. which=%d pending_msg_count=%d\n", which, mhub_part->kafka_msg_count); + } + needmetadata=1; rc = 1; } @@ -5173,6 +5187,11 @@ int ism_mhub_publishEvent(ism_mhub_t * mhub, mqtt_pmsg_t * pmsg, const char * cl } pthread_mutex_unlock(&mhub_part->lock); + + if(needmetadata > 0){ + needMetadata(mhub); + } + return rc; }