
HTTP/2 protocol deadlock when using flow control and when streams have very different data rates #16955


Closed
lf- opened this issue Apr 4, 2025 · 2 comments

lf- commented Apr 4, 2025

I did this

You can clone the whole repro from https://gist.github.com/lf-/276cb01858d894f4946787f69254a923.

Upstream bug report: https://git.lix.systems/lix-project/lix/issues/662

This is the bug I promised to report after #16280 was fixed.

What is going wrong?

Curl fails to send window size updates when HTTP/2 streams have very different throughput and transfers are being paused.
This presents as the slower transfer deadlocking, since Curl never grants the server more window and therefore never receives the remaining data.
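
For readers unfamiliar with the pattern, here is a minimal sketch of the pause/unpause mechanism involved (condensed from the full repro further down; the buffer limit and names are illustrative, not the actual Lix code):

#include <curl/curl.h>
#include <string>

// Write callback: buffer incoming body data and pause the transfer once the
// consumer falls behind. Returning CURL_WRITEFUNC_PAUSE tells libcurl to stop
// delivering data for this handle until it is explicitly unpaused.
static size_t onBody(void * contents, size_t size, size_t nmemb, void * userp)
{
    auto * buffer = static_cast<std::string *>(userp);
    if (buffer->size() > 1024 * 1024) {
        return CURL_WRITEFUNC_PAUSE;
    }
    buffer->append(static_cast<const char *>(contents), size * nmemb);
    return size * nmemb;
}

// Later, on the thread driving the multi handle, the consumer asks for more:
//     curl_easy_pause(handle, CURLPAUSE_CONT);
// For an HTTP/2 transfer the expectation is that unpausing also re-opens the
// stream's receive window; in this bug the WINDOW_UPDATE is never sent.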

Symptoms

The first transfer finishes; the second transfer has its initial window's worth of bytes buffered up, and the receiver is slowly drip-feeding them out.

<...>
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
curl: [0-0] [MULTI] [PERFORMING] multi_perform(running=2)
curl: [0-0] [TCP] recv(len=5) -> 5, err=0
curl: [0-0] [SSL] ossl_bio_cf_in_read(len=5) -> 5, err=0
curl: [0-0] [TCP] recv(len=33) -> 33, err=0
curl: [0-0] [SSL] ossl_bio_cf_in_read(len=33) -> 33, err=0
curl: [0-0] [SSL] cf_recv(len=16384) -> 16, 0
curl: [0-0] [HTTP/2] [0] ingress: read 16 bytes
curl: [0-0] [WRITE] [OUT] wrote 7 body bytes -> 7
curl: [0-0] [WRITE] [PAUSE] writing 7/7 bytes of type 1 -> 0
curl: [0-0] [WRITE] download_write body(type=1, blen=7) -> 0
curl: [0-0] [WRITE] client_write(type=1, len=7) -> 0
curl: [0-0] [WRITE] xfer_write_resp(len=7, eos=0) -> 0
curl: [0-0] [HTTP/2] [1] <- FRAME[DATA, len=7, eos=1, padlen=0]
curl: [0-0] [HTTP/2] [1] DATA, window=7/10485760
curl: [0-0] [HTTP/2] [1] DRAIN select_bits=1
curl: [0-0] [HTTP/2] [1] CLOSED
curl: [0-0] [HTTP/2] [1] DRAIN select_bits=1
curl: [0-0] [HTTP/2] [0] progress ingress: inbufg=0
curl: [0-0] [HTTP/2] [1] DRAIN select_bits=1
curl: [0-0] [HTTP/2] [0] progress ingress: done
curl: [0-0] [HTTP/2] [1] returning CLOSE
curl: [0-0] [HTTP/2] handle_stream_close -> 0, 0
curl: [0-0] [HTTP/2] [1] cf_recv(len=16384) -> 0 0, window=-1/-1, connection 933230227/1048576000
curl: [0-0] sendrecv_dl: we are done
curl: [0-0] nread == 0, stream closed, bailing
curl: [0-0] [WRITE] [PAUSE] writing 0/0 bytes of type 81 -> 0
curl: [0-0] [WRITE] download_write body(type=81, blen=0) -> 0
curl: [0-0] [WRITE] client_write(type=81, len=0) -> 0
curl: [0-0] [WRITE] xfer_write_resp(len=0, eos=1) -> 0
curl: [0-0] [MULTI] [PERFORMING] -> [DONE] (line 1856)
curl: [0-0] [MULTI] [DONE] multi_done: status: 0 prem: 0 done: 0
curl: [0-0] [WRITE] [OUT] done
curl: [0-0] [READ] client_reset, clear readers
curl: [0-x] [MULTI] [DONE] Connection still in use 1, no more multi_done now!
curl: [0-x] [MULTI] [DONE] -> [COMPLETED] (line 2471)
curl: [0-x] [MULTI] [COMPLETED] Expire cleared
curl: [0-x] [MULTI] [COMPLETED] -> [MSGSENT] (line 2566)
finished download of 'https://localhost:9999/foo'; curl status = 0, body = 104860013d bytes
curl: [0-x] [MULTI] [COMPLETED] removed, transfers=1
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[], timeouts=0, paused 0/1 (r/w)
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
!! UNPAUSE !! 0x5555c91e9d20 uri = https://localhost:9999/foo
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[], timeouts=0, paused 0/1 (r/w)
!! UNPAUSE !! 0x5555c91e9760 uri = https://localhost:9999/bar
curl: [1-0] [HTTP/2] [3] DRAIN select_bits=1
curl: [1-0] [HTTP/2] [3] stream now unpaused
curl: [1-0] [WRITE] [OUT] unpause
curl: [1-0] [WRITE] [OUT] paused, buffering 16384 more bytes (16384/67108864)

First one done!

<...>

curl: [1-0] [WRITE] [PAUSE] flushed 16384/901066 bytes, type=1 -> 0
!! PAUSE !!https://localhost:9999/bar
curl: [1-0] [WRITE] [OUT] wrote 16384 body bytes -> 268435457
curl: [1-0] [WRITE] [OUT] PAUSE requested by client
curl: [1-0] [WRITE] [OUT] paused, buffering 16384 more bytes (0/67108864)
curl: [1-0] [WRITE] [PAUSE] flushed 16384/884682 bytes, type=1 -> 0
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[], timeouts=0, paused 0/1 (r/w)
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[], timeouts=0, paused 0/1 (r/w)
!! UNPAUSE !! 0x5555c91e9760 uri = https://localhost:9999/bar
curl: [1-0] [HTTP/2] [3] DRAIN select_bits=1
curl: [1-0] [HTTP/2] [3] stream now unpaused
curl: [1-0] [WRITE] [OUT] unpause
curl: [1-0] [WRITE] [OUT] paused, buffering 16384 more bytes (16384/67108864)
curl: [1-0] [WRITE] [OUT] wrote 16384 body bytes -> 16384
curl: [1-0] [WRITE] [OUT] wrote 16384 body bytes -> 16384
curl: [1-0] [WRITE] [PAUSE] flushed 16384/868298 bytes, type=1 -> 0

<...>

curl: [1-0] [WRITE] [PAUSE] flushed 16384/65482 bytes, type=1 -> 0
curl: [1-0] [WRITE] [OUT] wrote 16384 body bytes -> 16384
curl: [1-0] [WRITE] [PAUSE] flushed 16384/49098 bytes, type=1 -> 0
curl: [1-0] [WRITE] [OUT] wrote 16384 body bytes -> 16384
curl: [1-0] [WRITE] [PAUSE] flushed 16384/32714 bytes, type=1 -> 0
curl: [1-0] [WRITE] [OUT] wrote 16330 body bytes -> 16330
curl: [1-0] [WRITE] [PAUSE] flushed 16330/16330 bytes, type=1 -> 0
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [TCP] recv(len=5) -> -1, err=81
curl: [1-0] [SSL] ossl_bio_cf_in_read(len=5) -> -1, err=81
curl: [1-0] [SSL] cf_recv(len=16384) -> -1, 81
curl: [1-0] [HTTP/2] [0] progress ingress: done
curl: [1-0] [HTTP/2] [3] cf_recv(len=16384) -> -1 81, window=10485760/10485760, connection 933230227/1048576000
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[fd=5 IN], timeouts=0
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [TCP] recv(len=5) -> -1, err=81
curl: [1-0] [SSL] ossl_bio_cf_in_read(len=5) -> -1, err=81
curl: [1-0] [SSL] cf_recv(len=16384) -> -1, 81
curl: [1-0] [HTTP/2] [0] progress ingress: done
curl: [1-0] [HTTP/2] [3] cf_recv(len=16384) -> -1 81, window=10485760/10485760, connection 933230227/1048576000
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[fd=5 IN], timeouts=0
!! UNPAUSE !! 0x5555c91e9760 uri = https://localhost:9999/bar
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [TCP] recv(len=5) -> -1, err=81
curl: [1-0] [SSL] ossl_bio_cf_in_read(len=5) -> -1, err=81
curl: [1-0] [SSL] cf_recv(len=16384) -> -1, 81
curl: [1-0] [HTTP/2] [0] progress ingress: done
curl: [1-0] [HTTP/2] [3] cf_recv(len=16384) -> -1 81, window=10485760/10485760, connection 933230227/1048576000
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[fd=5 IN], timeouts=0
!! UNPAUSE !! 0x5555c91e9760 uri = https://localhost:9999/bar

At this point it's stuck and will not make any forward progress:

curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [TCP] recv(len=5) -> -1, err=81
curl: [1-0] [SSL] ossl_bio_cf_in_read(len=5) -> -1, err=81
curl: [1-0] [SSL] cf_recv(len=16384) -> -1, 81
curl: [1-0] [HTTP/2] [0] progress ingress: done
curl: [1-0] [HTTP/2] [3] cf_recv(len=16384) -> -1 81, window=10485760/10485760, connection 933230227/1048576000
curl: [1-0] [MULTI] [PERFORMING] multi_wait pollset[fd=5 IN], timeouts=0
curl: [1-0] [MULTI] [PERFORMING] multi_perform(running=1)
curl: [1-0] [TCP] recv(len=5) -> -1, err=81
curl: [1-0] [SSL] ossl_bio_cf_in_read(len=5) -> -1, err=81
curl: [1-0] [SSL] cf_recv(len=16384) -> -1, 81
curl: [1-0] [HTTP/2] [0] progress ingress: done
curl: [1-0] [HTTP/2] [3] cf_recv(len=16384) -> -1 81, window=10485760/10485760, connection 933230227/1048576000

The streams have received the following approximate amounts of data at the time of deadlock:

(rr) p nix::recvd1
$1 = 104860013
(rr) p nix::recvd2
$2 = 10485760

I captured the traffic with SSLKEYLOGFILE and tcpdump and observed numerous WINDOW_UPDATE frames for stream 1, but only one (at the very beginning) for stream 3 (the slow one).

That is to say, stream 3 has more data to read from the server, but curl didn't ask the server for it and is sitting twiddling its thumbs hoping someone, anyone, will give it some data.
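
To confirm this from the decrypted capture, something along these lines should work (trace.pcap is a placeholder for the tcpdump output, and the field names assume Wireshark's HTTP/2 dissector; frame type 8 is WINDOW_UPDATE):

tshark -r trace.pcap -o tls.keylog_file:keys.log -Y 'http2.type == 8' \
    -T fields -e http2.streamid -e http2.window_update.window_size_increment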

Reproducer

This is a reproducer using a hacked-up version of the Lix HTTP library.
Two streams are fetched over the same connection, the second of which is much slower to read.
They are both fetching the same 100MB file of urandom garbage.
The data being fetched is zstd encoded (unsure if this matters; curl compression support is disabled for testing purposes).

mkdir trash
dd if=/dev/urandom of=trash/foo bs=1M count=1024
ln -s foo trash/bar

Caddyfile:

{
    local_certs
    skip_install_trust
    auto_https disable_redirects
    debug
}
:9999 {
    bind 127.0.0.1
    tls internal {
        on_demand
    }

    root * ./trash
    file_server

    encode {
        zstd
        match {
            header Content-Type *
        }
    }
}

Short version of the reproducer:

    auto ft = makeCurlFileTransfer2(0);
    auto a = ft->download("https://localhost:9999/foo");
    auto b = ft->download("https://localhost:9999/bar");

    auto af = std::async(std::launch::async, [&] {
        char c[1024];
        for (;;) {
            recvd1 += a->read(c, sizeof(c));
        }
    });
    auto bf = std::async(std::launch::async, [&] {
        char c[1024];
        for (;;) {
            recvd2 += b->read(c, sizeof(c));
            usleep(1000);
        }
    });

    return 0;

For the runnable version, see repro.cc and the Makefile; it has been checked to build with Clang 18 on NixOS.
I am deeply sorry about submitting a 600-line C++ repro.
I cut the code down by about 50% from the original, and it is now self-contained.
Reducing it further would require rewriting it, which would probably make it harder to work with while debugging.

If you need more reduction, I can do it, but it will take a while.

Set up the Caddy server (2.8.4 was used) as above (mkdir trash, dd some randomness), then run it:

caddy run --config Caddyfile

Run the repro:

$ make repro
$ SSLKEYLOGFILE=keys.log LD_PRELOAD=/path/to/curl/lib/libcurl.so CURL_DEBUG=all ./repro

If you don't want to copy-paste all of this, you can git clone https://gist.github.com/lf-/276cb01858d894f4946787f69254a923 repro/

Makefile
CXXFLAGS = -std=c++20 -g -O2
CXXFLAGS += $(shell pkg-config --cflags libcurl)
CC = $(CXX)
LDFLAGS += $(shell pkg-config --libs libcurl)
repro: repro.o
repro.cc
#include <cstring>
#include <curl/curl.h>
#include <iostream>
#include <inttypes.h>
#include <future>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <vector>
#include <unistd.h>

#include <cstdlib>
#include <list>
#include <mutex>
#include <condition_variable>
#include <cassert>
#include <optional>
#include <utility>

/**
 * Abstract source of binary data.
 */
struct Source
{
    virtual ~Source() {}

    /**
     * Store exactly ‘len’ bytes in the buffer pointed to by ‘data’.
     * It blocks until all the requested data is available, or throws
     * an error if it is not going to be available.
     */
    void operator()(char * data, size_t len);

    /**
     * Store up to ‘len’ in the buffer pointed to by ‘data’, and
     * return the number of bytes stored.  It blocks until at least
     * one byte is available.
     *
     * Should not return 0 (generally you want to throw EndOfFile), but nothing
     * stops that.
     *
     * \throws EndOfFile if there is no more data.
     */
    virtual size_t read(char * data, size_t len) = 0;

    virtual bool good()
    {
        return true;
    }
};

/**
 * This template class ensures synchronized access to a value of type
 * T. It is used as follows:
 *
 *   struct Data { int x; ... };
 *
 *   Sync<Data> data;
 *
 *   {
 *     auto data_(data.lock());
 *     data_->x = 123;
 *   }
 *
 * Here, "data" is automatically unlocked when "data_" goes out of
 * scope.
 */
template<class T, class M = std::mutex>
class Sync
{
private:
    M mutex;
    T data;

public:

    Sync() {}
    Sync(const T & data) : data(data) {}
    Sync(T && data) noexcept : data(std::move(data)) {}

    template<typename... Args>
    Sync(std::in_place_t, Args &&... args) : data(std::forward<Args>(args)...)
    {
    }

    class Lock
    {
    protected:
        // Non-owning pointer. This would be an
        // optional<reference_wrapper<Sync>> if it didn't break gdb accessing
        // Lock values (as of 2024-06-15, gdb 14.2)
        Sync * s;
        std::unique_lock<M> lk;
        friend Sync;
        Lock(Sync & s) : s(&s), lk(s.mutex) {}
        Lock(Sync & s, std::unique_lock<M> lk) : s(&s), lk(std::move(lk)) {}

        inline void checkLockingInvariants()
        {
            assert(s);
            assert(lk.owns_lock());
        }

    public:
        Lock(Lock && l) : s(l.s), lk(std::move(l.lk))
        {
            l.s = nullptr;
        }

        Lock(const Lock & l) = delete;

        ~Lock() = default;

        T * operator->()
        {
            checkLockingInvariants();
            return &s->data;
        }

        T & operator*()
        {
            checkLockingInvariants();
            return s->data;
        }

        /**
         * Wait for the given condition variable with no timeout.
         *
         * May spuriously wake up.
         */
        void wait(std::condition_variable & cv)
        {
            checkLockingInvariants();
            cv.wait(lk);
        }
    };

    /**
     * Lock this Sync and return a RAII guard object.
     */
    Lock lock()
    {
        return Lock(*this);
    }

    std::optional<Lock> tryLock()
    {
        if (std::unique_lock lk(mutex, std::try_to_lock_t{}); lk.owns_lock()) {
            return Lock{*this, std::move(lk)};
        } else {
            return std::nullopt;
        }
    }
};

struct curlFileTransfer2
{
    std::unique_ptr<CURLM, decltype([](auto * m) { curl_multi_cleanup(m); })> curlm;

    const unsigned int baseRetryTimeMs;

    void wakeup()
    {
        if (auto mc = curl_multi_wakeup(curlm.get())) {
            std::ostringstream err;
            err << "unexpected error from curl_multi_wakeup(): " << curl_multi_strerror(mc);
            throw std::runtime_error(err.str());
        }
    }

    void stopWorkerThread()
    {
        /* Signal the worker thread to exit. */
        {
            auto state(state_.lock());
            state->quit = true;
        }
        wakeup();
    }

    struct TransferItem
    {
        struct DownloadState
        {
            bool done = false;
            std::exception_ptr exc;
            std::string data;
        };

        std::string uri;
        Sync<DownloadState> downloadState;
        std::condition_variable downloadEvent;
        bool headersDone = false, metadataReturned = false;
        std::promise<void> metadataPromise;

        uint64_t bodySize = 0;

        std::unique_ptr<curl_slist, decltype([](auto * s) { curl_slist_free_all(s); })>
            requestHeaders;
        std::unique_ptr<CURL, decltype([](auto * c) { curl_easy_cleanup(c); })> req;
        // buffer to accompany the `req` above
        char errbuf[CURL_ERROR_SIZE];

        inline static const std::set<long>
            successfulStatuses{200, 201, 204, 206, 304, 0 /* other protocol */};
        void appendCurlHeader(std::string_view name, std::string_view value)
        {
            std::ostringstream ss;
            ss << name << ": " << value;
            auto header = ss.str();
            if (auto next = curl_slist_append(requestHeaders.get(), header.c_str())) {
                (void) requestHeaders.release(); // next now owns this pointer
                requestHeaders.reset(next);
            } else {
                abort();
            }
        }

        TransferItem(const std::string & uri) : uri(uri), req(curl_easy_init())
        {
            if (req == nullptr) {
                abort();
            }

            curl_easy_setopt(req.get(), CURLOPT_VERBOSE, 1);
            curl_easy_setopt(req.get(), CURLOPT_DEBUGFUNCTION, TransferItem::debugCallback);

            curl_easy_setopt(req.get(), CURLOPT_URL, uri.c_str());
            curl_easy_setopt(req.get(), CURLOPT_FOLLOWLOCATION, 1L);

            curl_easy_setopt(
                req.get(), CURLOPT_ACCEPT_ENCODING, nullptr
            ); // Disable internal handling
            appendCurlHeader("Accept-Encoding", "zstd");

            curl_easy_setopt(req.get(), CURLOPT_MAXREDIRS, 10);
            curl_easy_setopt(req.get(), CURLOPT_NOSIGNAL, 1);
            curl_easy_setopt(req.get(), CURLOPT_PIPEWAIT, 1);
            curl_easy_setopt(req.get(), CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
            curl_easy_setopt(req.get(), CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
            curl_easy_setopt(req.get(), CURLOPT_WRITEDATA, this);

            curl_easy_setopt(req.get(), CURLOPT_NOPROGRESS, 1);

            curl_easy_setopt(req.get(), CURLOPT_ERRORBUFFER, errbuf);
            errbuf[0] = 0;

            curl_easy_setopt(req.get(), CURLOPT_PROTOCOLS_STR, "http,https,ftp,ftps");

            curl_easy_setopt(req.get(), CURLOPT_HTTPHEADER, requestHeaders.get());
            // This is a repro, we don't need any of that "security"
            curl_easy_setopt(req.get(), CURLOPT_SSL_VERIFYPEER, 0);
        }

        void maybeFinishSetup()
        {
            if (headersDone) {
                return;
            }

            metadataPromise.set_value();
            metadataReturned = true;

            headersDone = true;
        }

        std::exception_ptr callbackException;

        size_t writeCallback(void * contents, size_t size, size_t nmemb)
        {
            const size_t realSize = size * nmemb;

            try {
                maybeFinishSetup();

                auto state = downloadState.lock();

                // when the buffer is full (as determined by a historical magic value) we
                // pause the transfer and wait for the receiver to unpause it when ready.
                if (state->data.size() > 1024 * 1024) {
                    std::cout << "!! PAUSE !!" << this->uri << "\n";
                    return CURL_WRITEFUNC_PAUSE;
                }

                state->data.append(static_cast<const char *>(contents), realSize);
                downloadEvent.notify_all();
                bodySize += realSize;
                return realSize;
            } catch (...) {
                callbackException = std::current_exception();
                return CURL_WRITEFUNC_ERROR;
            }
        }

        static size_t writeCallbackWrapper(void * contents, size_t size, size_t nmemb, void * userp)
        {
            return static_cast<TransferItem *>(userp)->writeCallback(contents, size, nmemb);
        }

        static int
        debugCallback(CURL * handle, curl_infotype type, char * data, size_t size, void * userptr)
        {
            if (type == CURLINFO_TEXT) {
                std::cout << "curl: " << std::string(data, size);
            }
            return 0;
        }

        void finish(CURLcode code)
        {
            maybeFinishSetup();

            printf(
                "finished download of '%s'; curl status = %d, body = %" PRIu64 "d bytes\n",
                uri.c_str(),
                code,
                bodySize
            );

            if (callbackException) {
                abort();
            }

            else if (code == CURLE_OK)
            {
                downloadState.lock()->done = true;
                downloadEvent.notify_all();
            } else {
                std::cout << "foo\n";
            }
        }
    };

    void unpause(const std::shared_ptr<TransferItem> & transfer)
    {
        auto lock = state_.lock();
        lock->unpause.push_back(transfer);
        wakeup();
    }

    struct State
    {
        bool quit = false;
        std::vector<std::shared_ptr<TransferItem>> incoming;
        std::vector<std::shared_ptr<TransferItem>> unpause;
    };

    Sync<State> state_;

    std::thread workerThread;

    void workerThreadEntry()
    {
        workerThreadMain();
        exit(1);
    }

    curlFileTransfer2(unsigned int baseRetryTimeMs)
        : curlm(curl_multi_init())
        , baseRetryTimeMs(baseRetryTimeMs)
    {
        if (curlm == nullptr) {
            abort();
        }

        static std::once_flag globalInit;
        std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);

        curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
        curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, 25);

        workerThread = std::thread([&]() { workerThreadEntry(); });
    }

    ~curlFileTransfer2()
    {
        stopWorkerThread();
        workerThread.join();
    }

    void workerThreadMain()
    {
        std::map<CURL *, std::shared_ptr<TransferItem>> items;

        bool quit = false;

        while (true) {
            /* Let curl do its thing. */
            int running;
            CURLMcode mc = curl_multi_perform(curlm.get(), &running);
            if (mc != CURLM_OK) {
                std::ostringstream err;
                err << "unexpected error from curl_multi_perform():" << curl_multi_strerror(mc);
                throw std::runtime_error(err.str());
            }

            /* Set the promises of any finished requests. */
            CURLMsg * msg;
            int left;
            while ((msg = curl_multi_info_read(curlm.get(), &left))) {
                if (msg->msg == CURLMSG_DONE) {
                    auto i = items.find(msg->easy_handle);
                    assert(i != items.end());
                    i->second->finish(msg->data.result);
                    curl_multi_remove_handle(curlm.get(), i->second->req.get());
                    items.erase(i);
                }
            }

            // only exit when all transfers are done (which will happen through the
            // progress callback issuing an abort in the case of user interruption)
            if (items.empty() && quit) {
                break;
            }

            /* Wait for activity, including wakeup events. */
            mc = curl_multi_poll(curlm.get(), nullptr, 0, 1000, nullptr);
            if (mc != CURLM_OK) {
                std::ostringstream err;
                err << "unexpected error from curl_multi_poll(): " << curl_multi_strerror(mc);
                throw std::runtime_error(err.str());
            }

            /* Add new curl requests from the incoming requests queue,
               except for requests that are embargoed (waiting for a
               retry timeout to expire). */

            std::vector<std::shared_ptr<TransferItem>> incoming;

            {
                auto unpause = [&] { return std::move(state_.lock()->unpause); }();
                for (auto & item : unpause) {
                    std::cout << "!! UNPAUSE !! " << item.get() << " uri = " << item->uri << "\n";
                    curl_easy_pause(item->req.get(), CURLPAUSE_CONT);
                }
            }

            {
                auto state(state_.lock());
                incoming = std::move(state->incoming);
                quit = state->quit;
            }

            for (auto & item : incoming) {
                std::cout << "starting download of " << item->uri << "\n";
                curl_multi_add_handle(curlm.get(), item->req.get());
                items[item->req.get()] = item;
            }
        }

        std::cout << "download thread shutting down\n";
    }

    void enqueueItem(std::shared_ptr<TransferItem> item)
    {
        {
            auto state(state_.lock());
            if (state->quit) {
                abort();
            }
            state->incoming.push_back(item);
        }
        wakeup();
    }

    std::unique_ptr<Source> enqueueFileTransfer(const std::string & uri)
    {
        auto source = std::make_unique<TransferSource>(*this, uri);
        source->awaitData();
        return std::move(source);
    }

    struct TransferSource : Source
    {
        curlFileTransfer2 & parent;
        std::string uri;

        std::shared_ptr<TransferItem> transfer;
        std::string chunk;
        std::string_view buffered;

        TransferSource(curlFileTransfer2 & parent, const std::string & uri)
            : parent(parent)
            , uri(uri)
        {
            startTransfer(uri);
        }

        void startTransfer(const std::string & uri)
        {
            transfer = std::make_shared<TransferItem>(uri);
            parent.enqueueItem(transfer);
            return transfer->metadataPromise.get_future().get();
        }

        bool awaitData()
        {
            auto waitForData = [&] {
                /* Grab data if available, otherwise wait for the download
                   thread to wake us up. */
                while (buffered.empty()) {
                    auto state(transfer->downloadState.lock());

                    if (!state->data.empty()) {
                        chunk = std::move(state->data);
                        buffered = chunk;
                        parent.unpause(transfer);
                    } else if (state->done) {
                        return false;
                    } else {
                        parent.unpause(transfer);
                        state.wait(transfer->downloadEvent);
                    }
                }

                return true;
            };
            return waitForData();
        }

        size_t read(char * data, size_t len) override
        {
            size_t total = 0;
            while (total < len && awaitData()) {
                const auto available = std::min(len - total, buffered.size());
                memcpy(data + total, buffered.data(), available);
                buffered.remove_prefix(available);
                total += available;
            }

            if (total == 0) {
                throw std::runtime_error("transfer finished");
            }

            return total;
        }
    };

    std::unique_ptr<Source> download(const std::string & uri)
    {
        return enqueueFileTransfer(uri);
    }
};

std::shared_ptr<curlFileTransfer2> makeCurlFileTransfer2(std::optional<unsigned int> baseRetryTimeMs
)
{
    return std::make_shared<curlFileTransfer2>(baseRetryTimeMs.value_or(250));
}

volatile size_t recvd1 = 0;
volatile size_t recvd2 = 0;

int main()
{
    auto ft = makeCurlFileTransfer2(0);
    auto a = ft->download("https://localhost:9999/foo");
    auto b = ft->download("https://localhost:9999/bar");

    auto af = std::async(std::launch::async, [&] {
        char c[1024];
        for (;;) {
            recvd1 += a->read(c, sizeof(c));
        }
    });
    auto bf = std::async(std::launch::async, [&] {
        char c[1024];
        for (;;) {
            recvd2 += b->read(c, sizeof(c));
            usleep(1000);
        }
    });

    return 0;
}

I expected the following

The second transfer of bar should not get stuck partially completed with no window size remaining.

curl/libcurl version

COMMIT ID: 8a45c28, which was HEAD at the time of writing

WARNING: this libcurl is Debug-enabled, do not use in production

curl 8.13.0-DEV (x86_64-pc-linux-gnu) libcurl/8.13.0-DEV OpenSSL/3.4.1 zlib/1.3.1 brotli/1.1.0 zstd/1.5.6 libidn2/2.3.7 libpsl/0.21.5 libssh2/1.11.1 nghttp2/1.64.0
Release-Date: [unreleased]
Protocols: dict file ftp ftps gopher gophers http https imap imaps ipfs ipns mqtt pop3 pop3s rtsp scp sftp smb smbs smtp smtps telnet tftp
Features: alt-svc AsynchDNS brotli Debug GSS-API HSTS HTTP2 HTTPS-proxy IDN IPv6 Kerberos Largefile libz NTLM PSL SPNEGO SSL threadsafe TLS-SRP TrackMemory UnixSockets zstd

If necessary, here is how to obtain an approximately byte-identical Curl to the one I used, via Nix: put this in a file, then run nix-build file.nix -A all. The results will be in result* symlinks. If you have to debug a Nix build of Curl, use NIX_DEBUG_INFO_DIRS=result-debug/lib/debug

let
  pkgs =
    import
      (builtins.fetchTarball "https://github.com/nixos/nixpkgs/archive/a18002797a7128bfc247a090b6c2349b4a1877f7.tar.gz")
      { };
in
pkgs.curl.overrideAttrs (old: {
  # src = builtins.fetchGit ./.;
  src = builtins.fetchTarball "https://github.com/curl/curl/archive/8a45c2851aeb6f3ec18ad5c39c4042ab516891dd.tar.gz";
  patches = [ ];
  postPatch = null;
  nativeBuildInputs = old.nativeBuildInputs or [ ] ++ [
    pkgs.buildPackages.autoreconfHook
    pkgs.buildPackages.updateAutotoolsGnuConfigScriptsHook
  ];
  configureFlags = old.configureFlags or [ ] ++ [
    "--enable-debug"
  ];
  env.CFLAGS = "-O2 -g";
  preConfigure = ''
    sed -e 's|/usr/bin|/no-such-path|g' -i.bak configure
    patchShebangs scripts/
  '';
})

operating system

Linux nucury 6.12.19 #1-NixOS SMP PREEMPT_DYNAMIC Thu Mar 13 12:02:20 UTC 2025 x86_64 GNU/Linux

vszakats added the HTTP/2 label Apr 4, 2025
icing self-assigned this Apr 4, 2025
Contributor

icing commented Apr 4, 2025

I think I can reproduce this in a local test case. The snag is that when the transfer gets paused and the HTTP/2 stream window gets exhausted, unpausing does not send a window update.

By default, curl's stream window size is 10 MB, which in your example is exactly the amount of data received by the stalled stream.

Working on a fix...

Contributor

icing commented Apr 4, 2025

I am pretty confident that #16960 should solve this issue. Can you verify this in your setup? Thanks.

bagder closed this as completed in 5fbd78e Apr 5, 2025
nbaws pushed a commit to nbaws/curl that referenced this issue Apr 26, 2025
When pausing a HTTP/2 transfer, the stream's local window size
is reduced to 0 to prevent the server from sending further data
which curl cannot write out to the application.

When unpausing again, the stream's window size was not correctly
increased again. The attempt to trigger a window update was
ignored by nghttp2, the server never received it and the transfer
stalled.

Add a debug feature to allow use of small window sizes which
reproduces this bug in test_02_21.

Fixes curl#16955
Closes curl#16960
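
For context, a minimal sketch of the kind of window adjustment involved on unpause, written against nghttp2's public API; this is not the actual patch from #16960, and session/stream_id stand in for the HTTP/2 filter's state:

#include <nghttp2/nghttp2.h>

/* Sketch only: after an unpause, raise the stream's local (receive) window
 * back up so the server is allowed to send DATA again. nghttp2 issues the
 * corresponding WINDOW_UPDATE when the window is enlarged. */
static int reopen_stream_window(nghttp2_session * session, int32_t stream_id,
                                int32_t window_size)
{
    int rv = nghttp2_session_set_local_window_size(
        session, NGHTTP2_FLAG_NONE, stream_id, window_size);
    if (rv != 0)
        return rv;
    /* Flush pending frames (including the WINDOW_UPDATE) to the peer. */
    return nghttp2_session_send(session);
}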