Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update upstream serially between all processes in dyups module #1780

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion modules/ngx_http_upstream_dyups_module/ngx_http_dyups_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ typedef struct {
typedef struct {
ngx_flag_t enable;
ngx_flag_t trylock;
ngx_flag_t serial_update;
ngx_array_t dy_upstreams;/* ngx_http_dyups_srv_conf_t */
ngx_str_t shm_name;
ngx_uint_t shm_size;
Expand Down Expand Up @@ -72,13 +73,15 @@ typedef struct ngx_dyups_shctx_s {
ngx_queue_t msg_queue;
ngx_uint_t version;
ngx_dyups_status_t *status;
ngx_atomic_uint_t flag;
} ngx_dyups_shctx_t;


typedef struct ngx_dyups_global_ctx_s {
ngx_event_t msg_timer;
ngx_slab_pool_t *shpool;
ngx_dyups_shctx_t *sh;
ngx_int_t workers;
} ngx_dyups_global_ctx_t;


Expand Down Expand Up @@ -216,6 +219,13 @@ static ngx_command_t ngx_http_dyups_commands[] = {
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_dyups_main_conf_t, trylock),
NULL },

{ ngx_string("dyups_serial_update_upstream"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_HTTP_MAIN_CONF_OFFSET,
offsetof(ngx_http_dyups_main_conf_t, serial_update),
NULL },

ngx_null_command
};
Expand Down Expand Up @@ -318,6 +328,7 @@ ngx_http_dyups_create_main_conf(ngx_conf_t *cf)
dmcf->read_msg_timeout = NGX_CONF_UNSET_MSEC;
dmcf->read_msg_log = NGX_CONF_UNSET;
dmcf->trylock = NGX_CONF_UNSET;
dmcf->serial_update = NGX_CONF_UNSET;

return dmcf;
}
Expand Down Expand Up @@ -349,6 +360,10 @@ ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf)
if (dmcf->shm_size == NGX_CONF_UNSET_UINT) {
dmcf->shm_size = 2 * 1024 * 1024;
}

if (dmcf->serial_update == NGX_CONF_UNSET) {
dmcf->serial_update = 0;
}

return ngx_http_dyups_init_shm(cf, conf);
}
Expand Down Expand Up @@ -427,6 +442,7 @@ ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)

sh->version = 0;
sh->status = NULL;
sh->flag = 0;

return NGX_OK;
}
Expand Down Expand Up @@ -562,6 +578,8 @@ ngx_http_dyups_init_process(ngx_cycle_t *cycle)
}

ngx_http_dyups_api_enable = 1;

ngx_dyups_global_ctx.workers = ccf->worker_processes;

timer = &ngx_dyups_global_ctx.msg_timer;
ngx_memzero(timer, sizeof(ngx_event_t));
Expand Down Expand Up @@ -2063,9 +2081,13 @@ ngx_http_dyups_read_msg(ngx_event_t *ev)
ngx_slab_pool_t *shpool;
ngx_http_dyups_srv_conf_t *duscfs, *duscf;
ngx_http_dyups_main_conf_t *dmcf;
ngx_dyups_shctx_t *sh;
ngx_uint_t workers;

dmcf = ev->data;
shpool = ngx_dyups_global_ctx.shpool;
sh = ngx_dyups_global_ctx.sh;
workers = ngx_dyups_global_ctx.workers;

count = 0;
s_count = 0;
Expand Down Expand Up @@ -2096,7 +2118,16 @@ ngx_http_dyups_read_msg(ngx_event_t *ev)
}

#if (NGX_HTTP_UPSTREAM_CHECK)
if (!ngx_shmtx_trylock(&shpool->mutex)) {
if (dmcf->serial_update) {
if (sh->flag % workers != ngx_worker) {
ngx_log_error(NGX_LOG_DEBUG, ev->log, 0,
"[dyups] ngx_pid: %P, ngx_worker: %d, sh->flag: %d, not meet lock condition", ngx_pid, ngx_worker, sh->flag);
ngx_dyups_add_timer(ev, dmcf->read_msg_timeout);
return;
}
ngx_shmtx_lock(&shpool->mutex);
}
else if (!ngx_shmtx_trylock(&shpool->mutex)) {
goto finish;
}
#else
Expand All @@ -2108,6 +2139,14 @@ ngx_http_dyups_read_msg(ngx_event_t *ev)
ngx_shmtx_unlock(&shpool->mutex);

#if (NGX_HTTP_UPSTREAM_CHECK)
if (dmcf->serial_update) {
ngx_atomic_fetch_add(&sh->flag, 1);

ngx_atomic_cmp_set(&sh->flag, workers, 0);

ngx_log_error(NGX_LOG_DEBUG, ev->log, 0,
"[dyups] ngx_pid: %P, ngx_worker: %d, sh->flag: %d, finish read msg under locking condition", ngx_pid, ngx_worker, sh->flag);
}
finish:
#endif
ngx_dyups_add_timer(ev, dmcf->read_msg_timeout);
Expand All @@ -2129,13 +2168,15 @@ ngx_http_dyups_read_msg_locked(ngx_event_t *ev)
ngx_dyups_msg_t *msg;
ngx_dyups_shctx_t *sh;
ngx_dyups_status_t *status;
ngx_http_dyups_main_conf_t *dmcf;

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,
"[dyups] read msg %P", ngx_pid);

ccf = (ngx_core_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
ngx_core_module);

dmcf = ev->data;
sh = ngx_dyups_global_ctx.sh;
shpool = ngx_dyups_global_ctx.shpool;

Expand Down