Skip to content

Commit

Permalink
Merge pull request #4 from spuhpointer/rbtree
Browse files Browse the repository at this point in the history
Works on rbtree support
  • Loading branch information
oskarsp committed Sep 28, 2023
2 parents c986bd5 + f0e6e65 commit 382cdb0
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 126 deletions.
10 changes: 4 additions & 6 deletions tmqueue/corhandle.c
Expand Up @@ -106,7 +106,7 @@ expublic tmq_corhash_t * tmq_cor_add(tmq_qhash_t *qhash, char *corrid_str)
* @param mmsg remove from CDL, remove from hash, if CDL is free, remove HASH
* entry
*/
expublic void tmq_cor_msg_del(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
expublic void tmq_cor_msg_del(tmq_memmsg_t *mmsg)
{
/* find the corhash entry, if have one remove from from hash
* remove msg from CDL
Expand All @@ -123,21 +123,19 @@ expublic void tmq_cor_msg_del(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)

/**
* Add message to corelator hash / linked list
* @param qconf queue configuration
* @param qhash queue entry (holds the ptr to cor hash)
* @param mmsg message to add
* @return Qerror code
*/
expublic int tmq_cor_msg_add(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
expublic int tmq_cor_msg_add(tmq_memmsg_t *mmsg)
{
int ret = EXSUCCEED;
int isNew = EXFALSE;

tmq_corhash_t * corhash = tmq_cor_find(qhash, mmsg->corrid_str);
tmq_corhash_t * corhash = tmq_cor_find(mmsg->qhash, mmsg->corrid_str);

if (NULL==corhash)
{
corhash=tmq_cor_add(qhash, mmsg->corrid_str);
corhash=tmq_cor_add(mmsg->qhash, mmsg->corrid_str);
}

if (NULL==corhash)
Expand Down
49 changes: 30 additions & 19 deletions tmqueue/forward.c
Expand Up @@ -108,6 +108,7 @@ expublic int volatile ndrx_G_fwd_force_wake = EXFALSE;

/**
* We assume we are locked.
* and check run against time.
* @param mmsg which was enqueued
*/
expublic void ndrx_forward_chkrun(tmq_memmsg_t *mmsg)
Expand Down Expand Up @@ -137,27 +138,36 @@ expublic void ndrx_forward_chkrun(tmq_memmsg_t *mmsg)
{
return;
}

conf = tmq_qconf_get_with_default(mmsg->msg->hdr.qname, NULL);
if (NULL!=conf)

/* if it's not auto, nothing to check & run */
if (!TMQ_AUTOQ_ISAUTO(mmsg->qconf->autoq))
{
return;
}

/*
* we can run the message, if it is in the current list
* or in future, but, it's time has come.
*/
if ( (mmsg->qstate & NDRX_TMQ_LOC_CURQ

|| mmsg->qstate & NDRX_TMQ_LOC_FUTQ && (mmsg->msg->qctl.flags &TPQTIME_ABS)
&& mmsg->msg->qctl.deq_time <= time(NULL) )
/* Ignore error of cnt... */
&&conf->workers > tmq_fwd_busy_cnt(mmsg->msg->hdr.qname, &p_stats))
{
if (tmq_is_auto_valid_for_deq(mmsg, conf) &&
/* Ignore error of cnt... */
conf->workers > tmq_fwd_busy_cnt(mmsg->msg->hdr.qname, &p_stats))

ndrx_G_fwd_force_wake=EXTRUE;

if (ndrx_G_fwd_into_sleep)
{

ndrx_G_fwd_force_wake=EXTRUE;

if (ndrx_G_fwd_into_sleep)
{
/* wakup from main sleep */
pthread_cond_signal(&M_wait_cond);
}
else if (ndrx_G_fwd_into_poolsleep)
{
/* wakup from pool sleep */
ndrx_thpool_signal_one(G_tmqueue_cfg.fwdthpool);
}
/* wakup from main sleep */
pthread_cond_signal(&M_wait_cond);
}
else if (ndrx_G_fwd_into_poolsleep)
{
/* wakup from pool sleep */
ndrx_thpool_signal_one(G_tmqueue_cfg.fwdthpool);
}
}
}
Expand Down Expand Up @@ -885,6 +895,7 @@ expublic void thread_process_forward (void *ptr, int *p_finish_off)
}
else
{
/* schedule next run of the msg */
msg->qctl.flags |= TPQTIME_ABS;
if ( 0 == msg->trycounter )
{
Expand Down
54 changes: 31 additions & 23 deletions tmqueue/inflight.c
Expand Up @@ -67,6 +67,9 @@ expublic int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_mem
int isNew = EXFALSE;
char corrid_str[TMCORRIDLEN_STR+1];

mmsg->qconf=qconf;
mmsg->qhash=qhash;

if (mmsg->msg->lockthreadid)
{
CDL_APPEND(qhash->q_infligh, mmsg);
Expand All @@ -92,7 +95,7 @@ expublic int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_mem
NDRX_LOG(log_debug, "Adding to corrid_hash [%s] of queue [%s]",
corrid_str,mmsg->msg->hdr.qname);

if (EXSUCCEED!=tmq_cor_msg_add(qconf, qhash, mmsg))
if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
{
NDRX_LOG(log_error, "Failed to add msg to corhash!");
EXFAIL_OUT(ret);
Expand All @@ -114,24 +117,24 @@ expublic int ndrx_infl_addmsg(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_mem
* Move message from current/cur/fut to inflight
* NOTE that context must be with M_q_lock locked.
*/
expublic int ndrx_infl_mov2infl(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
expublic int ndrx_infl_mov2infl(tmq_memmsg_t *mmsg)
{
int ret = EXSUCCEED;

if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
{
ndrx_rbt_delete(qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
ndrx_rbt_delete(mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;
}
else if (mmsg->qstate & NDRX_TMQ_LOC_CURQ)
{
ndrx_rbt_delete(qhash->q, (ndrx_rbt_node_t *)mmsg);
ndrx_rbt_delete(mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_CURQ;

/* remove from correlator too */
if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
{
tmq_cor_msg_del(qhash, mmsg);
tmq_cor_msg_del(mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_CORQ;
}
}
Expand All @@ -146,7 +149,7 @@ expublic int ndrx_infl_mov2infl(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
}

/* add message to inflight */
CDL_APPEND(qhash->q_infligh, mmsg);
CDL_APPEND(mmsg->qhash->q_infligh, mmsg);
mmsg->qstate |= NDRX_TMQ_LOC_INFL;

out:
Expand All @@ -161,31 +164,31 @@ expublic int ndrx_infl_mov2infl(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
* @param mmsg message to move
* @return EXSUCCEED/EXFAIL
*/
expublic int ndrx_infl_mov2cur(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
expublic int ndrx_infl_mov2cur(tmq_memmsg_t *mmsg)
{
int ret = EXSUCCEED;

if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
{
CDL_DELETE(qhash->q_infligh, mmsg);
CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_INFL;

/* enqueue to cur/cor or fut */
if ( (mmsg->msg->qctl.flags & TPQTIME_ABS) &&
(mmsg->msg->qctl.deq_time > (long)time(NULL)))
{
ndrx_rbt_insert(qhash->q_fut, (ndrx_rbt_node_t *)mmsg, NULL);
ndrx_rbt_insert(mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg, NULL);
mmsg->qstate |= NDRX_TMQ_LOC_FUTQ;
}
else
{
/* insert to cur */
ndrx_rbt_insert(qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
ndrx_rbt_insert(mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, NULL);
mmsg->qstate |= NDRX_TMQ_LOC_CURQ;

if (mmsg->msg->qctl.flags & TPQCORRID)
{
if (EXSUCCEED!=tmq_cor_msg_add(qconf, qhash, mmsg))
if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
{
NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
mmsg->msgid_str);
Expand Down Expand Up @@ -215,14 +218,14 @@ expublic int ndrx_infl_mov2cur(tmq_qconfig_t * qconf, tmq_qhash_t *qhash, tmq_me
* @param mmsg message to remove
* NOTE that context must be with M_q_lock locked.
*/
expublic int ndrx_infl_delmsg(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)
expublic int ndrx_infl_delmsg(tmq_memmsg_t *mmsg)
{
int ret = EXSUCCEED;

if (mmsg->qstate & NDRX_TMQ_LOC_CORQ)
{
/* remove from correlator Q */
tmq_cor_msg_del(qhash, mmsg);
tmq_cor_msg_del(mmsg);
}

if (mmsg->qstate & NDRX_TMQ_LOC_MSGIDHASH)
Expand All @@ -232,45 +235,48 @@ expublic int ndrx_infl_delmsg(tmq_qhash_t *qhash, tmq_memmsg_t *mmsg)

if (mmsg->qstate & NDRX_TMQ_LOC_INFL)
{
CDL_DELETE(qhash->q_infligh, mmsg);
CDL_DELETE(mmsg->qhash->q_infligh, mmsg);
}
else if (mmsg->qstate & NDRX_TMQ_LOC_FUTQ)
{
ndrx_rbt_delete(qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
ndrx_rbt_delete(mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
}

return ret;
}

/**
* Move message from future to cur or/and cor
* @param qconf queue config
* @param qhash queue hash
*/
expublic int ndrx_infl_fut2cur(tmq_qconfig_t * qconf, tmq_qhash_t *qhash)
expublic int ndrx_infl_fut2cur(tmq_qhash_t *qhash)
{
int ret = EXSUCCEED;
tmq_memmsg_t *mmsg = NULL;
int isNew = EXFALSE;

/* read from q_fut tree with smallest dec_time */
mmsg = (tmq_memmsg_t*)ndrx_rbt_leftmost(qhash->q_fut);
mmsg = (tmq_memmsg_t*)ndrx_rbt_leftmost(mmsg->qhash->q_fut);

/* enqueue to cur and cor if needed */
if ( mmsg->msg->qctl.deq_time <= (long)time(NULL) )
{
/* delete from q_fut */
ret = ndrx_infl_delmsg(qhash, mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_CURQ;
NDRX_LOG(log_debug, "Moving [%s] from future message list",
mmsg->msgid_str);

/* remove from future */
ndrx_rbt_delete(mmsg->qhash->q_fut, (ndrx_rbt_node_t *)mmsg);
mmsg->qstate &= ~NDRX_TMQ_LOC_FUTQ;

/* insert to cur */
ndrx_rbt_insert(qhash->q, (ndrx_rbt_node_t *)mmsg, &isNew);
ndrx_rbt_insert(mmsg->qhash->q, (ndrx_rbt_node_t *)mmsg, &isNew);
mmsg->qstate |= NDRX_TMQ_LOC_CURQ;

/* do the correlator too... if needed */
if (mmsg->msg->qctl.flags & TPQCORRID)
{
/* insert to cor */
if (EXSUCCEED!=tmq_cor_msg_add(qconf, qhash, mmsg))
if (EXSUCCEED!=tmq_cor_msg_add(mmsg))
{
NDRX_LOG(log_error, "Failed to add msg [%s] to corhash!",
mmsg->msgid_str);
Expand All @@ -282,3 +288,5 @@ expublic int ndrx_infl_fut2cur(tmq_qconfig_t * qconf, tmq_qhash_t *qhash)
out:
return ret;
}

/* vim: set ts=4 sw=4 et smartindent: */

0 comments on commit 382cdb0

Please sign in to comment.