Skip to content

Commit

Permalink
On the trunk:
Browse files Browse the repository at this point in the history
mod_http2: separate mutex instances for each bucket beam, resulting in 
     less lock contention. input beams only created when necessary.



git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1784571 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
icing committed Feb 27, 2017
1 parent 89e33de commit 5f7e679
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 138 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0

*) mod_http2: separate mutex instances for each bucket beam, resulting in
less lock contention. input beams only created when necessary.
[Stefan Eissing]

*) mod_syslog: Support use of optional "tag" in syslog entries.
PR 60525. [Ben Rubson <ben.rubson gmail.com>, Jim Jagielski]

Expand Down
118 changes: 78 additions & 40 deletions modules/http2/h2_bucket_beam.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,19 @@ static apr_bucket *h2_beam_bucket(h2_bucket_beam *beam,
* bucket beam that can transport buckets across threads
******************************************************************************/

static void mutex_leave(void *ctx, apr_thread_mutex_t *lock)
{
apr_thread_mutex_unlock(lock);
}

static apr_status_t mutex_enter(void *ctx, h2_beam_lock *pbl)
{
h2_bucket_beam *beam = ctx;
pbl->mutex = beam->lock;
pbl->leave = mutex_leave;
return apr_thread_mutex_lock(pbl->mutex);
}

static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
h2_beam_mutex_enter *enter = beam->m_enter;
Expand Down Expand Up @@ -227,26 +240,37 @@ static apr_off_t bucket_mem_used(apr_bucket *b)
}
}

static int report_consumption(h2_bucket_beam *beam)
static int report_consumption(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
int rv = 0;
if (beam->cons_io_cb) {
beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
- beam->cons_bytes_reported);
apr_off_t len = beam->received_bytes - beam->cons_bytes_reported;
h2_beam_io_callback *cb = beam->cons_io_cb;

if (cb) {
void *ctx = beam->cons_ctx;

if (pbl) leave_yellow(beam, pbl);
cb(ctx, beam, len);
if (pbl) enter_yellow(beam, pbl);
rv = 1;
}
beam->cons_bytes_reported = beam->received_bytes;
beam->cons_bytes_reported += len;
return rv;
}

static void report_prod_io(h2_bucket_beam *beam, int force)
static void report_prod_io(h2_bucket_beam *beam, int force, h2_beam_lock *pbl)
{
if (force || beam->prod_bytes_reported != beam->sent_bytes) {
if (beam->prod_io_cb) {
beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
- beam->prod_bytes_reported);
apr_off_t len = beam->sent_bytes - beam->prod_bytes_reported;
if (force || len > 0) {
h2_beam_io_callback *cb = beam->prod_io_cb;
if (cb) {
void *ctx = beam->prod_ctx;

leave_yellow(beam, pbl);
cb(ctx, beam, len);
enter_yellow(beam, pbl);
}
beam->prod_bytes_reported = beam->sent_bytes;
beam->prod_bytes_reported += len;
}
}

Expand Down Expand Up @@ -293,10 +317,10 @@ static apr_size_t calc_space_left(h2_bucket_beam *beam)
static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
{
if (beam->timeout > 0) {
return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout);
}
else {
return apr_thread_cond_wait(beam->m_cond, lock);
return apr_thread_cond_wait(beam->cond, lock);
}
}

Expand All @@ -307,7 +331,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
apr_status_t status;
report_prod_io(beam, 1);
report_prod_io(beam, 1, pbl);
status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
Expand Down Expand Up @@ -378,8 +402,8 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
if (!bl.mutex) {
r_purge_sent(beam);
}
else if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
else if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
leave_yellow(beam, &bl);
}
Expand All @@ -399,8 +423,8 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
{
if (!beam->closed) {
beam->closed = 1;
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
}
return APR_SUCCESS;
Expand Down Expand Up @@ -445,7 +469,7 @@ static apr_status_t beam_send_cleanup(void *data)
/* sender is going away, clear up all references to its memory */
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam);
report_consumption(beam, NULL);
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
H2_BPROXY_REMOVE(proxy);
Expand Down Expand Up @@ -555,10 +579,16 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *pool,
H2_BPROXY_LIST_INIT(&beam->proxies);
beam->tx_mem_limits = 1;
beam->max_buf_size = max_buf_size;
apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);

*pbeam = beam;

status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT,
pool);
if (status == APR_SUCCESS) {
status = apr_thread_cond_create(&beam->cond, pool);
if (status == APR_SUCCESS) {
apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
*pbeam = beam;
}
}
return status;
}

Expand Down Expand Up @@ -586,19 +616,27 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)

void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
apr_thread_cond_t *cond,
void *m_ctx)
{
h2_beam_lock bl;

if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->m_enter = m_enter;
beam->m_ctx = m_ctx;
beam->m_cond = cond;
leave_yellow(beam, &bl);
}
}

void h2_beam_mutex_enable(h2_bucket_beam *beam)
{
h2_beam_mutex_set(beam, mutex_enter, beam);
}

void h2_beam_mutex_disable(h2_bucket_beam *beam)
{
h2_beam_mutex_set(beam, NULL, NULL);
}

void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
h2_beam_lock bl;
Expand Down Expand Up @@ -630,10 +668,10 @@ void h2_beam_abort(h2_bucket_beam *beam)
beam->aborted = 1;
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam);
report_consumption(beam, &bl);
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
leave_yellow(beam, &bl);
}
Expand All @@ -646,7 +684,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_sent(beam);
beam_close(beam);
report_consumption(beam);
report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
Expand Down Expand Up @@ -680,8 +718,8 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block)
status = APR_EAGAIN;
break;
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
status = wait_cond(beam, bl.mutex);
}
Expand Down Expand Up @@ -868,12 +906,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
b = APR_BRIGADE_FIRST(sender_bb);
status = append_bucket(beam, b, block, &bl);
}
report_prod_io(beam, force_report);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
report_prod_io(beam, force_report, &bl);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
}
report_consumption(beam);
report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return status;
Expand Down Expand Up @@ -1040,24 +1078,24 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
}

if (transferred) {
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
status = APR_SUCCESS;
}
else if (beam->closed) {
status = APR_EOF;
}
else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) {
status = wait_cond(beam, bl.mutex);
if (status != APR_SUCCESS) {
goto leave;
}
goto transfer;
}
else {
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
if (beam->cond) {
apr_thread_cond_broadcast(beam->cond);
}
status = APR_EAGAIN;
}
Expand Down Expand Up @@ -1198,7 +1236,7 @@ int h2_beam_report_consumption(h2_bucket_beam *beam)
h2_beam_lock bl;
int rv = 0;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
rv = report_consumption(beam);
rv = report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
return rv;
Expand Down
7 changes: 5 additions & 2 deletions modules/http2/h2_bucket_beam.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,10 @@ struct h2_bucket_beam {
unsigned int close_sent : 1;
unsigned int tx_mem_limits : 1; /* only memory size counts on transfers */

struct apr_thread_mutex_t *lock;
struct apr_thread_cond_t *cond;
void *m_ctx;
h2_beam_mutex_enter *m_enter;
struct apr_thread_cond_t *m_cond;

apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
h2_beam_ev_callback *cons_ev_cb;
Expand Down Expand Up @@ -315,9 +316,11 @@ apr_status_t h2_beam_wait_empty(h2_bucket_beam *beam, apr_read_type_e block);

void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
struct apr_thread_cond_t *cond,
void *m_ctx);

void h2_beam_mutex_enable(h2_bucket_beam *beam);
void h2_beam_mutex_disable(h2_bucket_beam *beam);

/**
* Set/get the timeout for blocking read/write operations. Only works
* if a mutex has been set for the beam.
Expand Down
Loading

0 comments on commit 5f7e679

Please sign in to comment.