Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 113 additions & 77 deletions lib/vquic/curl_quiche.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,66 @@ static void h3_stream_hash_free(curl_off_t id, void *stream)
h3_stream_ctx_free((struct stream_ctx *)stream);
}

static void check_resumes(struct Curl_cfilter *cf,
struct Curl_easy *data)
typedef bool cf_quiche_svisit(struct Curl_cfilter *cf,
struct Curl_easy *sdata,
struct stream_ctx *stream,
void *user_data);

struct cf_quiche_visit_ctx {
struct Curl_cfilter *cf;
struct Curl_multi *multi;
cf_quiche_svisit *cb;
void *user_data;
};

static bool cf_quiche_stream_do(curl_off_t mid, void *val, void *user_data)
{
struct cf_quiche_visit_ctx *vctx = user_data;
struct stream_ctx *stream = val;
struct Curl_easy *sdata = Curl_multi_get_handle(vctx->multi, mid);
if(sdata)
return vctx->cb(vctx->cf, sdata, stream, vctx->user_data);
return TRUE;
}

static void cf_quiche_for_all_streams(struct Curl_cfilter *cf,
struct Curl_multi *multi,
cf_quiche_svisit *do_cb,
void *user_data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct Curl_llist_node *e;
struct cf_quiche_visit_ctx vctx;
vctx.cf = cf;
vctx.multi = multi;
vctx.cb = do_cb;
vctx.user_data = user_data;
Curl_hash_offt_visit(&ctx->streams, cf_quiche_stream_do, &vctx);
}

DEBUGASSERT(data->multi);
for(e = Curl_llist_head(&data->multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *sdata = Curl_node_elem(e);
if(sdata->conn == data->conn) {
struct stream_ctx *stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->quic_flow_blocked) {
stream->quic_flow_blocked = FALSE;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
CURL_TRC_CF(data, cf, "[%"FMT_PRIu64"] unblock", stream->id);
}
}
static bool cf_quiche_do_resume(struct Curl_cfilter *cf,
struct Curl_easy *sdata,
struct stream_ctx *stream,
void *user_data)
{
(void)user_data;
if(stream->quic_flow_blocked) {
stream->quic_flow_blocked = FALSE;
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
CURL_TRC_CF(sdata, cf, "[%"FMT_PRIu64"] unblock", stream->id);
}
return TRUE;
}

static bool cf_quiche_do_expire(struct Curl_cfilter *cf,
struct Curl_easy *sdata,
struct stream_ctx *stream,
void *user_data)
{
(void)stream;
(void)user_data;
CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
return TRUE;
}

static CURLcode h3_data_setup(struct Curl_cfilter *cf,
Expand Down Expand Up @@ -285,52 +327,12 @@ static void h3_drain_stream(struct Curl_cfilter *cf,
}
}

static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_uint64_t stream_id,
struct stream_ctx **pstream)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream;

(void)cf;
stream = H3_STREAM_CTX(ctx, data);
if(stream && stream->id == stream_id) {
*pstream = stream;
return data;
}
else {
struct Curl_llist_node *e;
DEBUGASSERT(data->multi);
for(e = Curl_llist_head(&data->multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *sdata = Curl_node_elem(e);
if(sdata->conn != data->conn)
continue;
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->id == stream_id) {
*pstream = stream;
return sdata;
}
}
}
*pstream = NULL;
return NULL;
}

static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct Curl_llist_node *e;

DEBUGASSERT(data->multi);
CURL_TRC_CF(data, cf, "conn closed, expire all transfers");
for(e = Curl_llist_head(&data->multi->process); e; e = Curl_node_next(e)) {
struct Curl_easy *sdata = Curl_node_elem(e);
if(sdata == data || sdata->conn != data->conn)
continue;
CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
}
cf_quiche_for_all_streams(cf, data->multi, cf_quiche_do_expire, NULL);
}

/*
Expand Down Expand Up @@ -557,14 +559,48 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
return result;
}

static CURLcode cf_quiche_ev_process(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct stream_ctx *stream,
quiche_h3_event *ev)
{
CURLcode result = h3_process_event(cf, data, stream, ev);
h3_drain_stream(cf, data);
if(result)
CURL_TRC_CF(data, cf, "error processing event %s "
"for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev),
stream->id, result);
return result;
}

struct cf_quich_disp_ctx {
curl_uint64_t stream_id;
struct Curl_cfilter *cf;
struct Curl_multi *multi;
quiche_h3_event *ev;
CURLcode result;
};

static bool cf_quiche_disp_event(curl_off_t mid, void *val, void *user_data)
{
struct cf_quich_disp_ctx *dctx = user_data;
struct stream_ctx *stream = val;

if(stream->id == dctx->stream_id) {
struct Curl_easy *sdata = Curl_multi_get_handle(dctx->multi, mid);
if(sdata)
dctx->result = cf_quiche_ev_process(dctx->cf, sdata, stream, dctx->ev);
return FALSE; /* stop iterating */
}
return TRUE;
}

static CURLcode cf_poll_events(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = NULL;
struct Curl_easy *sdata;
quiche_h3_event *ev;
CURLcode result;

/* Take in the events and distribute them to the transfers. */
while(ctx->h3c) {
Expand All @@ -576,28 +612,27 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
CURL_TRC_CF(data, cf, "error poll: %"FMT_PRId64, stream3_id);
return CURLE_HTTP3;
}

sdata = get_stream_easy(cf, data, stream3_id, &stream);
if(!sdata || !stream) {
CURL_TRC_CF(data, cf, "discard event %s for unknown [%"FMT_PRId64"]",
cf_ev_name(ev), stream3_id);
}
else {
result = h3_process_event(cf, sdata, stream, ev);
h3_drain_stream(cf, sdata);
if(result) {
CURL_TRC_CF(data, cf, "error processing event %s "
"for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev),
stream3_id, result);
if(data == sdata) {
/* Only report this error to the caller if it is about the
* transfer we were called with. Otherwise we fail a transfer
* due to a problem in another one. */
quiche_h3_event_free(ev);
struct cf_quich_disp_ctx dctx;
dctx.stream_id = (curl_uint64_t)stream3_id;
dctx.cf = cf;
dctx.multi = data->multi;
dctx.ev = ev;
dctx.result = CURLE_OK;
stream = H3_STREAM_CTX(ctx, data);
if(stream && stream->id == dctx.stream_id) {
/* event for calling transfer */
CURLcode result = cf_quiche_ev_process(cf, data, stream, ev);
quiche_h3_event_free(ev);
if(result)
return result;
}
}
quiche_h3_event_free(ev);
else {
/* another transfer, do not return errors, as they are not for
* the calling transfer */
Curl_hash_offt_visit(&ctx->streams, cf_quiche_disp_event, &dctx);
quiche_h3_event_free(ev);
}
}
}
return CURLE_OK;
Expand Down Expand Up @@ -686,7 +721,8 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
if(rctx.pkts > 0) {
/* quiche digested ingress packets. It might have opened flow control
* windows again. */
check_resumes(cf, data);
DEBUGASSERT(data->multi);
cf_quiche_for_all_streams(cf, data->multi, cf_quiche_do_resume, NULL);
}
return cf_poll_events(cf, data);
}
Expand Down