diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 34d16013d67ce9..0fb0daae54ffd1 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -177,6 +177,7 @@ struct h3_stream_ctx { struct bufq sendbuf; /* h3 request body */ struct bufq recvbuf; /* h3 response body */ size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */ + size_t upload_blocked_len; /* the amount written last and EGAINed */ size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */ uint64_t error3; /* HTTP/3 stream error code */ curl_off_t upload_left; /* number of request bytes left to upload */ @@ -272,12 +273,12 @@ static void pktx_init(struct pkt_io_ctx *pktx, ngtcp2_path_storage_zero(&pktx->ps); } -static CURLcode cf_process_ingress(struct Curl_cfilter *cf, +static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx); +static CURLcode cf_progress_egress(struct Curl_cfilter *cf, struct Curl_easy *data, struct pkt_io_ctx *pktx); -static CURLcode cf_flush_egress(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct pkt_io_ctx *pktx); static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, uint64_t datalen, void *user_data, void *stream_user_data); @@ -985,6 +986,63 @@ static ngtcp2_callbacks ng_callbacks = { NULL, /* early_data_rejected */ }; +/** + * Connection maintenance like timeouts on packet ACKs etc. are done by us, not + * the OS like for TCP. POLL events on the socket therefore are not + * sufficient. + * ngtcp2 tells us when it wants to be invoked again. We handle that via + * the `Curl_expire()` mechanisms. + */ +static CURLcode check_and_set_expiry(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + struct pkt_io_ctx local_pktx; + ngtcp2_tstamp expiry; + ngtcp2_duration timeout; + + if(!pktx) { + pktx_init(&local_pktx, cf, data); + pktx = &local_pktx; + } + else { + pktx->ts = timestamp(); + } + + expiry = ngtcp2_conn_get_expiry(ctx->qconn); + if(expiry != UINT64_MAX) { + if(expiry <= pktx->ts) { + CURLcode result; + int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts); + if(rv) { + failf(data, "ngtcp2_conn_handle_expiry returned error: %s", + ngtcp2_strerror(rv)); + ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); + return CURLE_SEND_ERROR; + } + timeout = 0; + result = cf_progress_ingress(cf, data, pktx); + if(result) + return result; + result = cf_progress_egress(cf, data, pktx); + if(result) + return result; + /* ask again, things might have changed */ + expiry = ngtcp2_conn_get_expiry(ctx->qconn); + } + + if(expiry > pktx->ts) { + timeout = expiry - pktx->ts; + if(timeout % NGTCP2_MILLISECONDS) { + timeout += NGTCP2_MILLISECONDS; + } + Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC); + } + } + return CURLE_OK; +} + static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, struct Curl_easy *data, curl_socket_t *socks) @@ -1022,7 +1080,7 @@ static void h3_drain_stream(struct Curl_cfilter *cf, (void)cf; bits = CURL_CSELECT_IN; - if(stream && !stream->send_closed && stream->upload_left) + if(stream && stream->upload_left && !stream->send_closed) bits |= CURL_CSELECT_OUT; if(data->state.dselect_bits != bits) { data->state.dselect_bits = bits; @@ -1420,7 +1478,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, report_consumed_data(cf, data, nread); } - if(cf_process_ingress(cf, data, &pktx)) { + if(cf_progress_ingress(cf, data, &pktx)) { *err = CURLE_RECV_ERROR; nread = -1; goto out; @@ -1450,10 +1508,17 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } out: - if(cf_flush_egress(cf, data, &pktx)) { + if(cf_progress_egress(cf, data, &pktx)) { *err = CURLE_SEND_ERROR; nread = -1; } + else { + CURLcode result2 = check_and_set_expiry(cf, data, &pktx); + if(result2) { + *err = result2; + nread = -1; + } + } DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d", stream? stream->id : -1, len, nread, *err)); CF_DATA_RESTORE(cf, save); @@ -1482,10 +1547,8 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, Curl_bufq_skip(&stream->sendbuf, skiplen); stream->sendbuf_len_in_flight -= skiplen; - /* `sendbuf` *might* now have more room. If so, resume this - * possibly paused stream. And also tell our transfer engine that - * it may continue KEEP_SEND if told to PAUSE. */ - if(!Curl_bufq_is_full(&stream->sendbuf)) { + /* Everything ACKed, we resume upload processing */ + if(!stream->sendbuf_len_in_flight) { int rv = nghttp3_conn_resume_stream(conn, stream_id); if(rv) { return NGTCP2_ERR_CALLBACK_FAILURE; @@ -1644,16 +1707,19 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf, else /* data sending without specifying the data amount up front */ stream->upload_left = -1; /* unknown */ - reader.read_data = cb_h3_read_req_body; - preader = &reader; break; default: /* there is not request body */ stream->upload_left = 0; /* no request body */ - preader = NULL; break; } + stream->send_closed = (stream->upload_left == 0); + if(!stream->send_closed) { + reader.read_data = cb_h3_read_req_body; + preader = &reader; + } + rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id, nva, nheader, preader, data); if(rc) { @@ -1691,6 +1757,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, ssize_t sent = 0; struct cf_call_data save; struct pkt_io_ctx pktx; + CURLcode result; CF_DATA_SAVE(save, cf, data); DEBUGASSERT(cf->connected); @@ -1699,10 +1766,10 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, pktx_init(&pktx, cf, data); *err = CURLE_OK; - if(stream && stream->closed) { - *err = CURLE_HTTP3; + result = cf_progress_ingress(cf, data, &pktx); + if(result) { + *err = result; sent = -1; - goto out; } if(!stream || stream->id < 0) { @@ -1712,32 +1779,64 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } } + else if(stream->upload_blocked_len) { + /* the data in `buf` has alread been submitted or added to the + * buffers, but have been EAGAINed on the last invocation. */ + DEBUGASSERT(len >= stream->upload_blocked_len); + if(len < stream->upload_blocked_len) { + /* Did we get called again with a smaller `len`? This should not + * happen. We are not prepared to handle that. */ + failf(data, "HTTP/3 send again with decreased length"); + *err = CURLE_HTTP3; + sent = -1; + goto out; + } + sent = (ssize_t)stream->upload_blocked_len; + stream->upload_blocked_len = 0; + } + else if(stream->closed) { + *err = CURLE_HTTP3; + sent = -1; + goto out; + } else { sent = Curl_bufq_write(&stream->sendbuf, buf, len, err); DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to " "sendbuf(len=%zu) -> %zd, %d", stream->id, len, sent, *err)); if(sent < 0) { - if(*err == CURLE_AGAIN) { - /* Can't add more to the send buf, needs to drain first. - * Pause the sending to avoid a busy loop. */ - data->req.keepon |= KEEP_SEND_HOLD; - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send", - stream->id)); - } goto out; } (void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id); } - if(cf_flush_egress(cf, data, &pktx)) { - *err = CURLE_SEND_ERROR; + result = cf_progress_egress(cf, data, &pktx); + if(result) { + *err = result; sent = -1; - goto out; + } + + if(stream && sent > 0 && stream->sendbuf_len_in_flight) { + /* We have unacknowledged DATA and cannot report success to our + * caller. Instead we EAGAIN and remember how much we have already + * "written" into our various internal connection buffers. + * We put the stream upload on HOLD, until this gets ACKed. */ + stream->upload_blocked_len = sent; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), " + "%zu bytes in flight -> EGAIN", stream->id, len, + stream->sendbuf_len_in_flight)); + *err = CURLE_AGAIN; + sent = -1; + data->req.keepon |= KEEP_SEND_HOLD; } out: + result = check_and_set_expiry(cf, data, &pktx); + if(result) { + *err = result; + sent = -1; + } CF_DATA_RESTORE(cf, save); return sent; } @@ -1837,15 +1936,15 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen, return CURLE_OK; } -static CURLcode cf_process_ingress(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct pkt_io_ctx *pktx) +static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { struct cf_ngtcp2_ctx *ctx = cf->ctx; struct pkt_io_ctx local_pktx; size_t pkts_chunk = 128, i; size_t pkts_max = 10 * pkts_chunk; - CURLcode result; + CURLcode result = CURLE_OK; if(!pktx) { pktx_init(&local_pktx, cf, data); @@ -1864,7 +1963,9 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, if(pktx->pkt_count < pkts_chunk) /* got less than we could */ break; /* give egress a chance before we receive more */ - result = cf_flush_egress(cf, data, pktx); + result = cf_progress_egress(cf, data, pktx); + if(result) /* error */ + break; } return result; } @@ -1976,18 +2077,15 @@ static ssize_t read_pkt_to_send(void *userp, return nwritten; } -static CURLcode cf_flush_egress(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct pkt_io_ctx *pktx) +static CURLcode cf_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx) { struct cf_ngtcp2_ctx *ctx = cf->ctx; - int rv; ssize_t nread; size_t max_payload_size, path_max_payload_size, max_pktcnt; size_t pktcnt = 0; size_t gsolen = 0; /* this disables gso until we have a clue */ - ngtcp2_tstamp expiry; - ngtcp2_duration timeout; CURLcode curlcode; struct pkt_io_ctx local_pktx; @@ -2000,14 +2098,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, ngtcp2_path_storage_zero(&pktx->ps); } - rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts); - if(rv) { - failf(data, "ngtcp2_conn_handle_expiry returned error: %s", - ngtcp2_strerror(rv)); - ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); - return CURLE_SEND_ERROR; - } - curlcode = vquic_flush(cf, data, &ctx->q); if(curlcode) { if(curlcode == CURLE_AGAIN) { @@ -2098,21 +2188,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, } out: - /* non-errored exit. check when we should run again. */ - expiry = ngtcp2_conn_get_expiry(ctx->qconn); - if(expiry != UINT64_MAX) { - if(expiry <= pktx->ts) { - timeout = 0; - } - else { - timeout = expiry - pktx->ts; - if(timeout % NGTCP2_MILLISECONDS) { - timeout += NGTCP2_MILLISECONDS; - } - } - Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC); - } - return CURLE_OK; } @@ -2172,11 +2247,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf, break; } case CF_CTRL_DATA_IDLE: - if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) { - if(cf_flush_egress(cf, data, NULL)) { - result = CURLE_SEND_ERROR; - } - } + result = check_and_set_expiry(cf, data, NULL); break; default: break; @@ -2398,16 +2469,16 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, result = cf_connect_start(cf, data, &pktx); if(result) goto out; - result = cf_flush_egress(cf, data, &pktx); + result = cf_progress_egress(cf, data, &pktx); /* we do not expect to be able to recv anything yet */ goto out; } - result = cf_process_ingress(cf, data, &pktx); + result = cf_progress_ingress(cf, data, &pktx); if(result) goto out; - result = cf_flush_egress(cf, data, &pktx); + result = cf_progress_egress(cf, data, &pktx); if(result) goto out; @@ -2464,6 +2535,9 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf, r_ip, r_port, curl_easy_strerror(result)); } #endif + if(!result && ctx->qconn) { + result = check_and_set_expiry(cf, data, &pktx); + } DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done)); CF_DATA_RESTORE(cf, save); return result; @@ -2535,7 +2609,7 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf, not in use by any other transfer, there shouldn't be any data here, only "protocol frames" */ *input_pending = FALSE; - if(cf_process_ingress(cf, data, NULL)) + if(cf_progress_ingress(cf, data, NULL)) alive = FALSE; else { alive = TRUE;