Skip to content

Commit

Permalink
mod_http2: fixes in input/output bucket handling
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741414 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
icing committed Apr 28, 2016
1 parent 0f704b9 commit 871b4a4
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 76 deletions.
12 changes: 9 additions & 3 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0

*) mod_http2: fixed a bug that caused mod_proxy_http2 to be called for window
updates on requests it had already reported done. Added synchronization
on early connection/stream close that lets ongoing requests safely drain
their input filters.
[Stefan Eissing]

*) mod_http2: scoreboard updates that summarize the h2 session (and replace
the last request information) will only happen when the session is idle or
in shutdown/done phase. [Stefan Eissing]

*) mod_http2: bucket beams now have safe mutex remove. Used for streams where
the task worker has finished and all processing happens in the same
thread again. [Stefan Eissing]
*) mod_http2: HTTP protocol string reported internally, logged and used in
SERVER_PROTOCOL changed to "HTTP/2.0" for HTTP/2 connections.
[Stefan Eissing]

*) mod_proxy, mod_ssl: Handle SSLProxy* directives in <Proxy> sections,
allowing per backend TLS configuration. [Yann Ylavic]
Expand Down
124 changes: 75 additions & 49 deletions modules/http2/h2_bucket_beam.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ struct h2_beam_proxy {
APR_RING_ENTRY(h2_beam_proxy) link;
h2_bucket_beam *beam;
apr_bucket *bred;
apr_size_t n;
};

static const char Dummy = '\0';
Expand Down Expand Up @@ -108,15 +109,16 @@ static void beam_bucket_destroy(void *data)

static apr_bucket * h2_beam_bucket_make(apr_bucket *b,
h2_bucket_beam *beam,
apr_bucket *bred)
apr_bucket *bred, apr_size_t n)
{
h2_beam_proxy *d;

d = apr_bucket_alloc(sizeof(*d), b->list);
H2_BPROXY_LIST_INSERT_TAIL(&beam->proxies, d);
d->beam = beam;
d->bred = bred;

d->n = n;

b = apr_bucket_shared_make(b, d, 0, bred? bred->length : 0);
b->type = &h2_bucket_type_beam;

Expand All @@ -125,14 +127,15 @@ static apr_bucket * h2_beam_bucket_make(apr_bucket *b,

static apr_bucket *h2_beam_bucket_create(h2_bucket_beam *beam,
apr_bucket *bred,
apr_bucket_alloc_t *list)
apr_bucket_alloc_t *list,
apr_size_t n)
{
apr_bucket *b = apr_bucket_alloc(sizeof(*b), list);

APR_BUCKET_INIT(b);
b->free = apr_bucket_free;
b->list = list;
return h2_beam_bucket_make(b, beam, bred);
return h2_beam_bucket_make(b, beam, bred, n);
}

/*static apr_status_t beam_bucket_setaside(apr_bucket *b, apr_pool_t *pool)
Expand Down Expand Up @@ -282,15 +285,10 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}

static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred)
{
APR_BUCKET_REMOVE(bred);
H2_BLIST_INSERT_TAIL(&beam->purge, bred);
}

static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
{
h2_beam_lock bl;
apr_bucket *b, *next;

if (enter_yellow(beam, &bl) == APR_SUCCESS) {
/* even when beam buckets are split, only the one where
Expand All @@ -300,8 +298,48 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
* bucket bred is about to be destroyed.
* remove it from the hold, where it should be now */
if (proxy->bred) {
h2_beam_prep_purge(beam, proxy->bred);
proxy->bred = NULL;
for (b = H2_BLIST_FIRST(&beam->hold);
b != H2_BLIST_SENTINEL(&beam->hold);
b = APR_BUCKET_NEXT(b)) {
if (b == proxy->bred) {
break;
}
}
if (b != H2_BLIST_SENTINEL(&beam->hold)) {
/* bucket is in hold as it should be, mark this one
* and all before it for purging. We might have placed meta
* buckets without a green proxy into the hold before it
* and schedule them for purging now */
for (b = H2_BLIST_FIRST(&beam->hold);
b != H2_BLIST_SENTINEL(&beam->hold);
b = next) {
next = APR_BUCKET_NEXT(b);
if (b == proxy->bred) {
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->purge, b);
break;
}
else if (APR_BUCKET_IS_METADATA(b)) {
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->purge, b);
}
else {
/* another data bucket before this one in hold. this
* is normal since DATA buckets need not be destroyed
* in order */
}
}

proxy->bred = NULL;
}
else {
/* it should be there unless we screwed up */
ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool,
APLOGNO() "h2_beam(%d-%s): emitted bucket not "
"in hold, n=%d", beam->id, beam->tag,
(int)proxy->n);
AP_DEBUG_ASSERT(!proxy->bred);
}
}
/* notify anyone waiting on space to become available */
if (!bl.mutex) {
Expand Down Expand Up @@ -344,45 +382,17 @@ static apr_status_t beam_close(h2_bucket_beam *beam)
return APR_SUCCESS;
}

static void beam_shutdown(h2_bucket_beam *beam, int disconnect)
{
if (disconnect && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
/* If we are called before all green buckets we put out
* there have been destroyed, we need to disentangle ourself.
* We NULLify the beam and red buckets in every proxy from us, so
* a) red memory is no longer read
* b) destruction of the proxy no longer calls back to this beam
* This does not protect against races when red and green thread are still
* running concurrently and it does not protect from passed out red
* memory to still being accessed.
*/
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
H2_BPROXY_REMOVE(proxy);
proxy->beam = NULL;
if (proxy->bred) {
h2_beam_prep_purge(beam, proxy->bred);
proxy->bred = NULL;
}
}
}
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
beam_close(beam);
report_consumption(beam);
}

static apr_status_t beam_cleanup(void *data)
{
h2_bucket_beam *beam = data;

if (beam->green) {
apr_brigade_destroy(beam->green);
beam->green = NULL;
}
beam_shutdown(beam, 0);
beam_close(beam);
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
report_consumption(beam);
h2_blist_cleanup(&beam->purge);
h2_blist_cleanup(&beam->hold);

return APR_SUCCESS;
}

Expand Down Expand Up @@ -507,14 +517,29 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}

void h2_beam_shutdown(h2_bucket_beam *beam)
apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
{
apr_status_t status;
h2_beam_lock bl;

if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam_shutdown(beam, 1);
if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
beam_close(beam);
report_consumption(beam);

while (status == APR_SUCCESS
&& (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
|| (beam->green && !APR_BRIGADE_EMPTY(beam->green)))) {
if (block == APR_NONBLOCK_READ || !bl.mutex) {
status = APR_EAGAIN;
break;
}
status = wait_cond(beam, bl.mutex);
}
leave_yellow(beam, &bl);
}
return status;
}

static apr_status_t append_bucket(h2_bucket_beam *beam,
Expand Down Expand Up @@ -763,7 +788,8 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
* the red brigade.
* the beam bucket will notify us on destruction that bred is
* no longer needed. */
bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc);
bgreen = h2_beam_bucket_create(beam, bred, bb->bucket_alloc,
beam->buckets_sent++);
}

/* Place the red bucket into our hold, to be destroyed when no
Expand Down
12 changes: 9 additions & 3 deletions modules/http2/h2_bucket_beam.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ struct h2_bucket_beam {
apr_off_t sent_bytes; /* amount of bytes send */
apr_off_t received_bytes; /* amount of bytes received */
apr_off_t reported_bytes; /* amount of bytes reported as consumed */
apr_size_t buckets_sent;

unsigned int aborted : 1;
unsigned int closed : 1;
Expand Down Expand Up @@ -276,11 +277,16 @@ void h2_beam_abort(h2_bucket_beam *beam);
apr_status_t h2_beam_close(h2_bucket_beam *beam);

/**
* Empty the buffer and close.
*
* Empty any buffered data and return APR_SUCCESS when all buckets
* in transit have been handled. When called with APR_BLOCK_READ and
* with a mutex installed, will wait until this is the case. Otherwise
* APR_EAGAIN is returned.
* If a timeout is set on the beam, waiting might also time out and
* return APR_ETIMEUP.
*
* Call from the red side only.
*/
void h2_beam_shutdown(h2_bucket_beam *beam);
apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block);

void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
Expand Down
33 changes: 22 additions & 11 deletions modules/http2/h2_mplx.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,15 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events)
{
conn_rec *slave = NULL;
int reuse_slave = 0;
apr_status_t status;

/* cleanup any buffered input */
h2_task_shutdown(task);
status = h2_task_shutdown(task, 0);
if (status != APR_SUCCESS){
ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO()
"h2_task(%s): shutdown", task->id);
}

if (events) {
/* Process outstanding events before destruction */
input_consumed_signal(m, task);
Expand Down Expand Up @@ -365,14 +371,24 @@ static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
if (task->worker_done) {
/* already finished or not even started yet */
h2_iq_remove(m->q, task->stream_id);
task_destroy(m, task, 1);
task_destroy(m, task, 0);
return 0;
}
else {
/* cleanup once task is done */
task->orphaned = 1;
if (task->input.beam) {
h2_beam_shutdown(task->input.beam);
apr_status_t status;
status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_stream(%ld-%d): wait on input shutdown",
m->id, task->stream_id);
status = h2_beam_shutdown(task->input.beam, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
"h2_stream(%ld-%d): input shutdown returned",
m->id, task->stream_id);
}
task->input.beam = NULL;
}
if (rst_error) {
Expand Down Expand Up @@ -471,13 +487,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
}
}

if (!h2_ihash_empty(m->tasks)) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c,
"h2_mplx(%ld): release_join, %d tasks still open",
m->id, (int)h2_ihash_count(m->tasks));
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy", m->id);
"h2_mplx(%ld): release_join (%d tasks left) -> destroy",
m->id, (int)h2_ihash_count(m->tasks));
leave_mutex(m, acquired);
h2_mplx_destroy(m);
/* all gone */
Expand Down Expand Up @@ -612,7 +624,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams)
else {
/* hang around until the h2_task is done, but
* shutdown input/output and send out any events asap. */
h2_task_shutdown(task);
h2_task_shutdown(task, 0);
input_consumed_signal(m, task);
}
}
Expand Down Expand Up @@ -951,7 +963,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
}

if (task->orphaned) {
/* TODO: add to purge list */
task_destroy(m, task, 0);
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
Expand Down
3 changes: 2 additions & 1 deletion modules/http2/h2_ngn_shed.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn,
ngn->no_finished++;
if (waslive) ngn->no_live--;
ngn->no_assigned--;

task->assigned = NULL;

return APR_SUCCESS;
}

Expand Down
4 changes: 2 additions & 2 deletions modules/http2/h2_session.c
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ static apr_status_t stream_release(h2_session *session,
}

return h2_conn_io_writeb(&session->io,
h2_bucket_eos_create(c->bucket_alloc, stream), 1);
h2_bucket_eos_create(c->bucket_alloc, stream), 0);
}

static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
Expand Down Expand Up @@ -618,7 +618,7 @@ static int on_send_data_cb(nghttp2_session *ngh2,
if (status == APR_SUCCESS && padlen) {
b = apr_bucket_immortal_create(immortal_zeros, padlen,
session->c->bucket_alloc);
status = h2_conn_io_writeb(&session->io, b, 1);
status = h2_conn_io_writeb(&session->io, b, 0);
}
}

Expand Down
19 changes: 13 additions & 6 deletions modules/http2/h2_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ static apr_status_t open_response(h2_task *task)
if (!response) {
/* This happens currently when ap_die(status, r) is invoked
* by a read request filter. */
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, task->c, APLOGNO(03204)
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, APLOGNO(03204)
"h2_task(%s): write without response for %s %s %s",
task->id,
task->request->method,
Expand Down Expand Up @@ -487,14 +487,21 @@ void h2_task_rst(h2_task *task, int error)
}
}

void h2_task_shutdown(h2_task *task)
apr_status_t h2_task_shutdown(h2_task *task, int block)
{
if (task->input.beam) {
h2_beam_shutdown(task->input.beam);
}
if (task->output.beam) {
h2_beam_shutdown(task->output.beam);
apr_status_t status;
status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ);
if (block && status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, task->c,
"h2_task(%s): output shutdown waiting", task->id);
status = h2_beam_shutdown(task->output.beam, APR_BLOCK_READ);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
"h2_task(%s): output shutdown done", task->id);
}
return status;
}
return APR_SUCCESS;
}

/*******************************************************************************
Expand Down
Loading

0 comments on commit 871b4a4

Please sign in to comment.