Skip to content

Commit

Permalink
v1.8.8 faster reaction to new request/responses
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Eissing committed Jan 13, 2017
1 parent e740921 commit 9e1d7d7
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 167 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
@@ -1,3 +1,8 @@
v1.8.8
--------------------------------------------------------------------------------
* streaming of request output now reacts timely to data
from other streams becoming available. Same for new incoming requests.

v1.8.7
--------------------------------------------------------------------------------
* fix for possible page fault when stream is resumed during session shutdown.
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Expand Up @@ -14,7 +14,7 @@
#

AC_PREREQ([2.69])
AC_INIT([mod_http2], [1.8.7], [stefan.eissing@greenbytes.de])
AC_INIT([mod_http2], [1.8.8], [stefan.eissing@greenbytes.de])

LT_PREREQ([2.2.6])
LT_INIT()
Expand Down
80 changes: 54 additions & 26 deletions mod_http2/h2_bucket_beam.c
Expand Up @@ -14,6 +14,7 @@
*/

#include <apr_lib.h>
#include <apr_atomic.h>
#include <apr_strings.h>
#include <apr_time.h>
#include <apr_buckets.h>
Expand Down Expand Up @@ -243,25 +244,29 @@ static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
}
}

static void report_consumption(h2_bucket_beam *beam, int force)
static int report_consumption(h2_bucket_beam *beam)
{
if (force || beam->received_bytes != beam->reported_consumed_bytes) {
if (beam->consumed_fn) {
beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
- beam->reported_consumed_bytes);
int rv = 0;
if (apr_atomic_read32(&beam->cons_ev_pending)) {
if (beam->cons_io_cb) {
beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
- beam->cons_bytes_reported);
rv = 1;
}
beam->reported_consumed_bytes = beam->received_bytes;
beam->cons_bytes_reported = beam->received_bytes;
apr_atomic_set32(&beam->cons_ev_pending, 0);
}
return rv;
}

static void report_production(h2_bucket_beam *beam, int force)
static void report_prod_io(h2_bucket_beam *beam, int force)
{
if (force || beam->sent_bytes != beam->reported_produced_bytes) {
if (beam->produced_fn) {
beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
- beam->reported_produced_bytes);
if (force || beam->prod_bytes_reported != beam->sent_bytes) {
if (beam->prod_io_cb) {
beam->prod_io_cb(beam->prod_ctx, beam, beam->sent_bytes
- beam->prod_bytes_reported);
}
beam->reported_produced_bytes = beam->sent_bytes;
beam->prod_bytes_reported = beam->sent_bytes;
}
}

Expand Down Expand Up @@ -322,7 +327,7 @@ static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
apr_status_t status;
report_production(beam, 1);
report_prod_io(beam, 1);
status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
Expand Down Expand Up @@ -453,7 +458,7 @@ static apr_status_t beam_send_cleanup(void *data)
/* sender has gone away, clear up all references to its memory */
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam, 0);
report_consumption(beam);
while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
H2_BPROXY_REMOVE(proxy);
Expand Down Expand Up @@ -634,7 +639,7 @@ void h2_beam_abort(h2_bucket_beam *beam)
beam->aborted = 1;
r_purge_sent(beam);
h2_blist_cleanup(&beam->send_list);
report_consumption(beam, 0);
report_consumption(beam);
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
Expand All @@ -650,7 +655,7 @@ apr_status_t h2_beam_close(h2_bucket_beam *beam)
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_sent(beam);
beam_close(beam);
report_consumption(beam, 0);
report_consumption(beam);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
Expand Down Expand Up @@ -851,12 +856,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
b = APR_BRIGADE_FIRST(red_brigade);
status = append_bucket(beam, b, block, &bl);
}
report_production(beam, force_report);
report_prod_io(beam, force_report);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
}
report_consumption(beam, 0);
report_consumption(beam);
leave_yellow(beam, &bl);
}
return status;
Expand All @@ -872,6 +877,7 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
int transferred = 0;
apr_status_t status = APR_SUCCESS;
apr_off_t remain = readbytes;
int transferred_buckets = 0;

/* Called from the green thread to take buckets from the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
Expand Down Expand Up @@ -968,6 +974,8 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
APR_BUCKET_REMOVE(bred);
H2_BLIST_INSERT_TAIL(&beam->hold_list, bred);
beam->received_bytes += bred->length;
++transferred_buckets;

if (bgreen) {
APR_BRIGADE_INSERT_TAIL(bb, bgreen);
remain -= bgreen->length;
Expand Down Expand Up @@ -1000,6 +1008,13 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
}
}

if (transferred_buckets > 0) {
apr_atomic_set32(&beam->cons_ev_pending, 1);
if (beam->cons_ev_cb) {
beam->cons_ev_cb(beam->cons_ctx, beam);
}
}

if (beam->closed
&& (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
&& H2_BLIST_EMPTY(&beam->send_list)) {
Expand Down Expand Up @@ -1042,25 +1057,25 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
}

void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_io_callback *cb, void *ctx)
h2_beam_ev_callback *ev_cb,
h2_beam_io_callback *io_cb, void *ctx)
{
h2_beam_lock bl;

if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->consumed_fn = cb;
beam->consumed_ctx = ctx;
beam->cons_ev_cb = ev_cb;
beam->cons_io_cb = io_cb;
beam->cons_ctx = ctx;
leave_yellow(beam, &bl);
}
}

void h2_beam_on_produced(h2_bucket_beam *beam,
h2_beam_io_callback *cb, void *ctx)
h2_beam_io_callback *io_cb, void *ctx)
{
h2_beam_lock bl;

if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->produced_fn = cb;
beam->produced_ctx = ctx;
beam->prod_io_cb = io_cb;
beam->prod_ctx = ctx;
leave_yellow(beam, &bl);
}
}
Expand Down Expand Up @@ -1173,3 +1188,16 @@ int h2_beam_no_files(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
return 0;
}

int h2_beam_report_consumption(h2_bucket_beam *beam)
{
if (apr_atomic_read32(&beam->cons_ev_pending)) {
h2_beam_lock bl;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
int rv = report_consumption(beam);
leave_yellow(beam, &bl);
return rv;
}
}
return 0;
}

42 changes: 30 additions & 12 deletions mod_http2/h2_bucket_beam.h
Expand Up @@ -154,6 +154,7 @@ typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);

typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
apr_off_t bytes);
typedef void h2_beam_ev_callback(void *ctx, h2_bucket_beam *beam);

typedef struct h2_beam_proxy h2_beam_proxy;
typedef struct {
Expand Down Expand Up @@ -205,12 +206,17 @@ struct h2_bucket_beam {
h2_beam_mutex_enter *m_enter;
struct apr_thread_cond_t *m_cond;

apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
h2_beam_io_callback *consumed_fn;
void *consumed_ctx;
apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
h2_beam_io_callback *produced_fn;
void *produced_ctx;
apr_uint32_t cons_ev_pending; /* != 0, consumer event pending */
apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
h2_beam_ev_callback *cons_ev_cb;
h2_beam_io_callback *cons_io_cb;
void *cons_ctx;

apr_uint32_t prod_ev_pending; /* != 0, producer event pending */
apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */
h2_beam_io_callback *prod_io_cb;
void *prod_ctx;

h2_beam_can_beam_callback *can_beam_fn;
void *can_beam_ctx;
};
Expand Down Expand Up @@ -336,26 +342,38 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
* amount of bytes that have been consumed by the receiver, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
* @param cb the callback or NULL
* @param ev_cb the callback or NULL, called when bytes are consumed
* @param io_cb the callback or NULL, called on sender with bytes consumed
* @param ctx the context to use in callback invocation
*
* Call from the sender side, callbacks invoked on sender side.
* Call from the sender side, io callbacks invoked on sender side, ev callback
* from any side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_io_callback *cb, void *ctx);
h2_beam_ev_callback *ev_cb,
h2_beam_io_callback *io_cb, void *ctx);

/**
* Call any registered consumed handler, if any changes have happened
* since the last invocation.
* @return !=0 iff a handler has been called
*
* Needs to be invoked from the sending side.
*/
int h2_beam_report_consumption(h2_bucket_beam *beam);

/**
* Register a callback to be invoked on the receiver side with the
* amount of bytes that have been produces by the sender, since the
* last callback invocation or reset.
* @param beam the beam to set the callback on
* @param cb the callback or NULL
* @param io_cb the callback or NULL, called on receiver with bytes produced
* @param ctx the context to use in callback invocation
*
* Call from the receiver side, callbacks invoked on receiver side.
* Call from the receiver side, callbacks invoked on either side.
*/
void h2_beam_on_produced(h2_bucket_beam *beam,
h2_beam_io_callback *cb, void *ctx);
h2_beam_io_callback *io_cb, void *ctx);

void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx);
Expand Down
20 changes: 2 additions & 18 deletions mod_http2/h2_config.c
Expand Up @@ -64,7 +64,6 @@ static h2_config defconf = {
0, /* copy files across threads */
NULL, /* push list */
0, /* early hints, http status 103 */
2, /* TLS records flush count */
};

void h2_config_init(apr_pool_t *pool)
Expand Down Expand Up @@ -100,7 +99,6 @@ static void *h2_config_create(apr_pool_t *pool,
conf->copy_files = DEF_VAL;
conf->push_list = NULL;
conf->early_hints = DEF_VAL;
conf->tls_flush_count = DEF_VAL;
return conf;
}

Expand Down Expand Up @@ -153,7 +151,6 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
n->push_list = add->push_list? add->push_list : base->push_list;
}
n->early_hints = H2_CONFIG_GET(add, base, early_hints);
n->tls_flush_count = H2_CONFIG_GET(add, base, tls_flush_count);
return n;
}

Expand Down Expand Up @@ -211,8 +208,6 @@ apr_int64_t h2_config_geti64(const h2_config *conf, h2_config_var_t var)
return H2_CONFIG_GET(conf, &defconf, copy_files);
case H2_CONF_EARLY_HINTS:
return H2_CONFIG_GET(conf, &defconf, early_hints);
case H2_CONF_TLS_FLUSH_COUNT:
return H2_CONFIG_GET(conf, &defconf, tls_flush_count);
default:
return DEF_VAL;
}
Expand Down Expand Up @@ -314,7 +309,7 @@ static const char *h2_conf_set_stream_max_mem_size(cmd_parms *parms,
static const char *h2_add_alt_svc(cmd_parms *parms,
void *arg, const char *value)
{
if (value && strlen(value)) {
if (value && *value) {
h2_config *cfg = (h2_config *)h2_config_sget(parms->server);
h2_alt_svc *as = h2_alt_svc_parse(value, parms->pool);
if (!as) {
Expand Down Expand Up @@ -412,7 +407,7 @@ static const char *h2_conf_add_push_priority(cmd_parms *cmd, void *_cfg,
h2_priority *priority;
int weight;

if (!strlen(ctype)) {
if (!*ctype) {
return "1st argument must be a mime-type, like 'text/css' or '*'";
}

Expand Down Expand Up @@ -510,15 +505,6 @@ static const char *h2_conf_set_tls_cooldown_secs(cmd_parms *parms,
return NULL;
}

static const char *h2_conf_set_tls_flush_count(cmd_parms *parms,
void *arg, const char *value)
{
h2_config *cfg = (h2_config *)h2_config_sget(parms->server);
cfg->tls_flush_count = (int)apr_atoi64(value);
(void)arg;
return NULL;
}

static const char *h2_conf_set_push_diary_size(cmd_parms *parms,
void *arg, const char *value)
{
Expand Down Expand Up @@ -657,8 +643,6 @@ const command_rec h2_cmds[] = {
RSRC_CONF, "number of bytes on TLS connection before doing max writes"),
AP_INIT_TAKE1("H2TLSCoolDownSecs", h2_conf_set_tls_cooldown_secs, NULL,
RSRC_CONF, "seconds of idle time on TLS before shrinking writes"),
AP_INIT_TAKE1("H2TLSFlushCount", h2_conf_set_tls_flush_count, NULL,
RSRC_CONF, "number of max TLS records before output is flushed"),
AP_INIT_TAKE1("H2Push", h2_conf_set_push, NULL,
RSRC_CONF, "off to disable HTTP/2 server push"),
AP_INIT_TAKE23("H2PushPriority", h2_conf_add_push_priority, NULL,
Expand Down
2 changes: 0 additions & 2 deletions mod_http2/h2_config.h
Expand Up @@ -42,7 +42,6 @@ typedef enum {
H2_CONF_PUSH_DIARY_SIZE,
H2_CONF_COPY_FILES,
H2_CONF_EARLY_HINTS,
H2_CONF_TLS_FLUSH_COUNT,
} h2_config_var_t;

struct apr_hash_t;
Expand Down Expand Up @@ -80,7 +79,6 @@ typedef struct h2_config {
int copy_files; /* if files shall be copied vs setaside on output */
apr_array_header_t *push_list;/* list of h2_push_res configurations */
int early_hints; /* support status code 103 */
int tls_flush_count; /* max # of TLS records until output flushed */
} h2_config;


Expand Down

0 comments on commit 9e1d7d7

Please sign in to comment.