-
-
Notifications
You must be signed in to change notification settings - Fork 7.1k
Closed
Labels
Description
I did this
I wrote the following program, which at a high level does the following:
1. fire two requests to the same URL concurrently
2. pause each of them after 16 MB are received
3. fire two more requests to the same URL concurrently, 4s after the first requests
4. pause them after 16 MB are received
5. cancel the requests fired in step 3, 4s after they were started
6. cancel the requests fired in step 1, 11s after they were started
Essentially it tries to cancel requests after they were paused.
#include <curl/curl.h>
#include <event2/event.h>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>
namespace {
// URL requested by every transfer in this program.
constexpr const char* kUrl =
"https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/"
"BigBuckBunny.mp4";
// Number of easy handles created per batch of requests.
constexpr int kRequestCount = 2;
// Value passed to CURLOPT_HTTP_VERSION for every transfer.
constexpr int kHttpVersion = CURL_HTTP_VERSION_2;
// Forward declaration: MultiContext's timeout_event initializer names this
// callback before its definition appears.
void TimeoutEvent(evutil_socket_t, short, void* handle);
/// Passes a pointer through after verifying that it is not null.
///
/// @param p Pointer to validate.
/// @return `p`, unchanged.
/// @throws std::runtime_error if `p` is null.
template <typename T>
T* CheckNotNull(T* p) {
  if (p != nullptr) {
    return p;
  }
  throw std::runtime_error("Unexpected null pointer.");
}
/// Validates an integer status code (used here for the libevent calls).
///
/// @param code Status to validate; negative values are treated as failures.
/// @throws std::runtime_error whose message contains the code when it is
///         negative.
void Check(int code) {
  if (code >= 0) {
    return;
  }
  std::ostringstream message;
  message << "Unexpected error code: " << code;
  throw std::runtime_error(message.str());
}
/// Validates a result of the libcurl easy interface.
///
/// @param code Result to validate.
/// @throws std::runtime_error carrying curl_easy_strerror's text unless
///         `code` is CURLE_OK.
void Check(CURLcode code) {
  if (code == CURLE_OK) {
    return;
  }
  throw std::runtime_error(curl_easy_strerror(code));
}
/// Validates a result of the libcurl multi interface.
///
/// @param code Result to validate.
/// @throws std::runtime_error carrying curl_multi_strerror's text unless
///         `code` is CURLM_OK.
void Check(CURLMcode code) {
  if (code == CURLM_OK) {
    return;
  }
  throw std::runtime_error(curl_multi_strerror(code));
}
/// std::unique_ptr deleter that releases a CURLM multi handle.
struct CurlMultiDeleter {
  /// Cleans up `handle` with curl_multi_cleanup.
  ///
  /// Check() reports failures by throwing, and an exception leaving this
  /// noexcept function would call std::terminate. The failure is therefore
  /// caught here and reported on stderr instead.
  void operator()(CURLM* handle) const noexcept {
    try {
      Check(curl_multi_cleanup(handle));
    } catch (const std::exception& error) {
      std::cerr << "curl_multi_cleanup failed: " << error.what() << '\n';
    }
  }
};
/// std::unique_ptr deleter for an easy handle that was added to a multi
/// handle: it detaches the easy handle and then cleans it up.
struct CurlHandleDeleter {
  /// Removes `handle` from `multi_handle`, then calls curl_easy_cleanup.
  ///
  /// Check() reports failures by throwing, and an exception leaving this
  /// noexcept function would call std::terminate. A removal failure is
  /// caught and reported on stderr so that the easy handle is still cleaned
  /// up afterwards.
  void operator()(CURL* handle) const noexcept {
    try {
      Check(curl_multi_remove_handle(multi_handle, handle));
    } catch (const std::exception& error) {
      std::cerr << "curl_multi_remove_handle failed: " << error.what()
                << '\n';
    }
    curl_easy_cleanup(handle);
  }
  /// Multi handle the easy handle is removed from (not owned).
  CURLM* multi_handle;
};
/// std::unique_ptr deleter that frees a libevent event_base.
struct EventBaseDeleter {
  /// Hands `base` to event_base_free.
  void operator()(event_base* base) const noexcept { event_base_free(base); }
};
/// std::unique_ptr deleter that unschedules and frees a libevent event.
struct EventDeleter {
  /// Calls event_del on `ev`, then event_free.
  ///
  /// Check() reports failures by throwing, and an exception leaving this
  /// noexcept function would call std::terminate. An event_del failure is
  /// caught and reported on stderr so that the event is still freed.
  /// The parameter is named `ev` so it does not shadow the `event` type.
  void operator()(event* ev) const noexcept {
    try {
      Check(event_del(ev));
    } catch (const std::exception& error) {
      std::cerr << "event_del failed: " << error.what() << '\n';
    }
    event_free(ev);
  }
};
/// Scope guard for libcurl's global state: the constructor calls
/// curl_global_init and the destructor calls curl_global_cleanup.
class CurlGlobalInitializer {
 public:
  /// @throws std::runtime_error (via Check) if curl_global_init fails.
  CurlGlobalInitializer() { Check(curl_global_init(CURL_GLOBAL_DEFAULT)); }
  ~CurlGlobalInitializer() noexcept { curl_global_cleanup(); }

  // Copying is disabled: a copy's destructor would run curl_global_cleanup
  // an extra time without a matching curl_global_init.
  CurlGlobalInitializer(const CurlGlobalInitializer&) = delete;
  CurlGlobalInitializer& operator=(const CurlGlobalInitializer&) = delete;
};
// State shared by all transfers: the curl multi handle, the event loop it is
// driven by, and the libevent timer used for curl's timeouts.
// Members are initialized in declaration order, and timeout_event's
// initializer reads both event_loop and handle, so the order below must be
// kept and event_loop must be supplied when the struct is created.
struct MultiContext {
// Owning multi handle, created by curl_multi_init.
std::unique_ptr<CURLM, CurlMultiDeleter> handle{
CheckNotNull(curl_multi_init())};
// Event loop the timer is created on; not owned and has no default value.
event_base* event_loop;
// Timer event (fd -1, no event flags) that invokes TimeoutEvent with the raw
// multi handle as its argument.
std::unique_ptr<event, EventDeleter> timeout_event{
CheckNotNull(event_new(event_loop, /*fd=*/-1,
/*events=*/0, TimeoutEvent, handle.get()))};
};
/// Per-transfer state. A pointer to this struct is installed as
/// CURLOPT_WRITEDATA and as the user data of the cancellation timers.
struct HandleContext {
  /// Owning easy handle; its deleter also detaches it from the multi handle.
  std::unique_ptr<CURL, CurlHandleDeleter> handle;
  /// Bytes counted by WriteCallback since the last pause.
  size_t bytes_read = 0;
  /// Not read or written by the code visible in this file.
  int resume_count = 0;
  /// Event loop the transfer belongs to (not owned). Defaults to nullptr so a
  /// default-initialized context never holds an indeterminate pointer.
  event_base* event_loop = nullptr;
  /// Shared multi context (not owned). Defaults to nullptr for the same
  /// reason as event_loop.
  MultiContext* context = nullptr;
};
/// Creates an easy handle and adds it to `multi_handle`.
///
/// @param multi_handle Multi handle the new easy handle is attached to.
/// @return Owning pointer whose deleter detaches the easy handle from
///         `multi_handle` before cleaning it up.
/// @throws std::runtime_error if the easy handle cannot be created or added;
///         in the latter case the easy handle is cleaned up first.
auto MakeHandle(CURLM* multi_handle) {
  CURL* easy = CheckNotNull(curl_easy_init());
  const CURLMcode add_result = curl_multi_add_handle(multi_handle, easy);
  if (add_result != CURLM_OK) {
    curl_easy_cleanup(easy);
    throw std::runtime_error(curl_multi_strerror(add_result));
  }
  using OwnedHandle = std::unique_ptr<CURL, CurlHandleDeleter>;
  return OwnedHandle(easy, CurlHandleDeleter{multi_handle});
}
/// Drains the multi handle's message queue, logging "TRANSFER DONE" to
/// stderr for every CURLMSG_DONE message.
///
/// @param multi_handle Multi handle whose messages are read.
void ProcessEvents(CURLM* multi_handle) {
  int pending = 0;
  while (CURLMsg* msg = curl_multi_info_read(multi_handle, &pending)) {
    if (msg->msg == CURLMSG_DONE) {
      std::cerr << "TRANSFER DONE\n";
    }
  }
}
/// CURLOPT_WRITEFUNCTION callback that discards the data and pauses the
/// transfer once 16 MiB have been counted.
///
/// @param size     Element size reported by curl.
/// @param nmemb    Element count reported by curl.
/// @param userdata Pointer to the transfer's HandleContext.
/// @return `size * nmemb` while below the threshold; otherwise the counter is
///         reset, the pause is logged, and CURL_WRITEFUNC_PAUSE is returned.
size_t WriteCallback(char* /*ptr*/, size_t size, size_t nmemb, void* userdata) {
  constexpr size_t kPauseThreshold = 16 * 1024 * 1024;
  auto* transfer = static_cast<HandleContext*>(userdata);
  const size_t chunk_bytes = size * nmemb;
  transfer->bytes_read += chunk_bytes;
  if (transfer->bytes_read < kPauseThreshold) {
    return chunk_bytes;
  }
  transfer->bytes_read = 0;
  std::cerr << "PAUSING " << transfer << '\n';
  return CURL_WRITEFUNC_PAUSE;
}
/// libevent callback for the curl timeout timer: tells the multi handle that
/// its timeout expired and then drains the completion messages.
///
/// @param handle The multi handle, passed as the event's user argument.
void TimeoutEvent(evutil_socket_t, short, void* handle) {
  int still_running = 0;
  const CURLMcode result =
      curl_multi_socket_action(handle, CURL_SOCKET_TIMEOUT, 0, &still_running);
  Check(result);
  ProcessEvents(handle);
}
/// libevent callback for socket activity: translates the libevent readiness
/// flags into curl's CURL_CSELECT_* mask, forwards them to the multi handle,
/// and then drains the completion messages.
///
/// @param fd     Socket that became ready.
/// @param event  libevent flags (EV_READ / EV_WRITE) describing the activity.
/// @param handle The multi handle, passed as the event's user argument.
void SocketEvent(evutil_socket_t fd, short event, void* handle) {
  int action_mask = 0;
  if (event & EV_READ) {
    action_mask |= CURL_CSELECT_IN;
  }
  if (event & EV_WRITE) {
    action_mask |= CURL_CSELECT_OUT;
  }
  int still_running = 0;
  Check(curl_multi_socket_action(handle, fd, action_mask, &still_running));
  ProcessEvents(handle);
}
/// CURLMOPT_SOCKETFUNCTION callback: keeps one libevent event per socket in
/// sync with what curl wants to be notified about.
///
/// @param socket  Socket curl is reporting on.
/// @param what    CURL_POLL_* request for that socket.
/// @param userp   Pointer to the MultiContext (CURLMOPT_SOCKETDATA).
/// @param socketp Event previously attached to the socket with
///                curl_multi_assign, or null.
/// @return Always 0.
int SocketCallback(CURL*, curl_socket_t socket, int what, void* userp,
                   void* socketp) {
  // In every case the event previously attached to this socket is discarded.
  if (auto* stale = reinterpret_cast<event*>(socketp)) {
    event_free(stale);
  }
  if (what == CURL_POLL_REMOVE) {
    return 0;
  }

  auto* multi = reinterpret_cast<MultiContext*>(userp);
  int flags = EV_PERSIST;
  if (what & CURL_POLL_IN) {
    flags |= EV_READ;
  }
  if (what & CURL_POLL_OUT) {
    flags |= EV_WRITE;
  }
  event* watcher = CheckNotNull(event_new(multi->event_loop, socket,
                                          static_cast<short>(flags),
                                          SocketEvent, multi->handle.get()));
  // Attach the new event to the socket so the next callback receives it.
  Check(curl_multi_assign(multi->handle.get(), socket, watcher));
  Check(event_add(watcher, /*timeout=*/nullptr));
  return 0;
}
/// CURLMOPT_TIMERFUNCTION callback: (re)arms or cancels the libevent timer
/// that drives curl's timeouts.
///
/// @param timeout_ms Delay in milliseconds, or -1 to cancel the timer.
/// @param userp      Pointer to the MultiContext (CURLMOPT_TIMERDATA).
/// @return Always 0.
int TimerCallback(CURLM*, long timeout_ms, void* userp) {
  auto* multi = reinterpret_cast<MultiContext*>(userp);
  auto* timer = multi->timeout_event.get();
  if (timeout_ms == -1) {
    Check(event_del(timer));
    return 0;
  }
  // Split the millisecond delay into whole seconds plus microseconds.
  timeval delay{};
  delay.tv_sec = static_cast<decltype(delay.tv_sec)>(timeout_ms / 1000);
  delay.tv_usec =
      static_cast<decltype(delay.tv_usec)>(timeout_ms % 1000 * 1000);
  Check(event_add(timer, &delay));
  return 0;
}
} // namespace
// Reproduction driver: starts kRequestCount transfers immediately, schedules a
// second batch from a timer, and schedules timers that remove every transfer
// from the multi handle, then runs the libevent loop.
int main() {
// Initializes libcurl's global state for the lifetime of main.
CurlGlobalInitializer initializer;
std::cerr << curl_version() << '\n';
std::unique_ptr<event_base, EventBaseDeleter> event_loop(
CheckNotNull(event_base_new()));
// event_loop must be supplied here: MultiContext's timeout_event initializer
// reads it.
MultiContext context{.event_loop = event_loop.get()};
// Register the socket and timer callbacks; both receive &context as user data.
Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETFUNCTION,
SocketCallback));
Check(curl_multi_setopt(context.handle.get(), CURLMOPT_SOCKETDATA, &context));
Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERFUNCTION,
TimerCallback));
Check(curl_multi_setopt(context.handle.get(), CURLMOPT_TIMERDATA, &context));
// First batch: kRequestCount value-initialized contexts. Addresses of list
// elements are handed to curl and libevent as user data below; std::list
// keeps them valid when the timer callback later appends more elements.
std::list<HandleContext> handle_contexts{kRequestCount};
for (auto& handle_context : handle_contexts) {
handle_context.context = &context;
handle_context.event_loop = event_loop.get();
CURL* handle =
(handle_context.handle = MakeHandle(context.handle.get())).get();
Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
Check(curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
}
// Bundle passed through the void* user data of the timer callback below.
struct Data {
MultiContext* context;
event_base* event_loop;
std::list<HandleContext>* handle_contexts;
} data = {.context = &context,
.event_loop = event_loop.get(),
.handle_contexts = &handle_contexts};
{
// One-shot timer that creates the second batch of transfers.
// NOTE(review): the issue description says this happens after 4s, but the
// timer is set to 3s — confirm which is intended.
timeval tv{.tv_sec = 3};
Check(event_base_once(
event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
[](evutil_socket_t, short, void* userdata) {
auto* context = static_cast<Data*>(userdata);
for (int i = 0; i < kRequestCount; i++) {
HandleContext& handle_context =
context->handle_contexts->emplace_back();
handle_context.context = context->context;
handle_context.event_loop = context->event_loop;
CURL* handle = (handle_context.handle =
MakeHandle(context->context->handle.get()))
.get();
Check(curl_easy_setopt(handle, CURLOPT_URL, kUrl));
Check(
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, WriteCallback));
Check(curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handle_context));
Check(curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, kHttpVersion));
// One-shot timer, 4s after this transfer was created, that cancels it.
timeval tv{.tv_sec = 4};
Check(event_base_once(
context->event_loop, /*fd=*/-1, /*events=*/EV_TIMEOUT,
[](evutil_socket_t, short, void* userdata) {
auto* context = static_cast<HandleContext*>(userdata);
std::cerr << "TIMEOUT DELAYED TRANSFER " << context << '\n';
// Only detaches the easy handle from the multi handle; the easy handle
// itself is cleaned up later by CurlHandleDeleter.
Check(curl_multi_remove_handle(context->context->handle.get(),
context->handle.get()));
},
&handle_context, &tv));
}
},
&data, &tv));
}
// This loop runs before the event loop starts, so handle_contexts holds only
// the first batch here: each of those transfers gets an 11s cancel timer.
for (auto& handle_context : handle_contexts) {
timeval tv{.tv_sec = 11};
Check(event_base_once(
event_loop.get(), /*fd=*/-1, /*events=*/EV_TIMEOUT,
[](evutil_socket_t, short, void* userdata) {
auto* context = static_cast<HandleContext*>(userdata);
std::cerr << "TIMEOUT TRANSFER " << context << '\n';
// Only detaches the easy handle from the multi handle; the easy handle
// itself is cleaned up later by CurlHandleDeleter.
Check(curl_multi_remove_handle(context->context->handle.get(),
context->handle.get()));
},
&handle_context, &tv));
}
// Run the event loop with default flags; the transfers and timers registered
// above are driven from here.
Check(event_base_loop(event_loop.get(), /*flags=*/0));
return 0;
}
This results in the following memory leak when running with address sanitizer:
Detected memory leaks
Direct leak of 1048656 byte(s) in 1 object(s) allocated from:
at 0x563f7e2385ea __interceptor_realloc.part.0 (asan_malloc_linux.cpp.o)
at 0x563f7e293621 curl_dbg_realloc (memdebug.c:265)
at 0x563f7e2e5aec dyn_nappend (dynbuf.c:100)
at 0x563f7e2e58e9 Curl_dyn_addn (dynbuf.c:164)
at 0x563f7e2a7423 pausewrite (sendf.c:236)
at 0x563f7e2a6e15 chop_write (sendf.c:268)
at 0x563f7e2a6b47 Curl_client_write (sendf.c:386)
at 0x563f7e2b97aa readwrite_data (transfer.c:706)
at 0x563f7e2b88a5 Curl_readwrite (transfer.c:1111)
at 0x563f7e29ff52 multi_runsingle (multi.c:2436)
at 0x563f7e2a19ba multi_socket (multi.c:3219)
at 0x563f7e2a1b51 curl_multi_socket_action (multi.c:3340)
at 0x563f7e283d55 (anonymous namespace)::SocketEvent(int, short, void*) (curl.cc:142)
at 0x563f7e3512d7 event_persist_closure (event.c:1659)
at 0x563f7e3508de event_process_active_single_queue (event.c:1718)
at 0x563f7e34b0b4 event_process_active (event.c:1819)
at 0x563f7e349ebb event_base_loop (event.c:2068)
at 0x563f7e281c1e main (curl.cc:270)
at 0x7f95d37e978f
Direct leak of 16392 byte(s) in 1 object(s) allocated from:
at 0x563f7e2385ea __interceptor_realloc.part.0 (asan_malloc_linux.cpp.o)
at 0x563f7e293621 curl_dbg_realloc (memdebug.c:265)
at 0x563f7e2e5aec dyn_nappend (dynbuf.c:100)
at 0x563f7e2e58e9 Curl_dyn_addn (dynbuf.c:164)
at 0x563f7e2a7423 pausewrite (sendf.c:236)
at 0x563f7e2a6f92 chop_write (sendf.c:311)
at 0x563f7e2a6b47 Curl_client_write (sendf.c:386)
at 0x563f7e2b97aa readwrite_data (transfer.c:706)
at 0x563f7e2b88a5 Curl_readwrite (transfer.c:1111)
at 0x563f7e29ff52 multi_runsingle (multi.c:2436)
at 0x563f7e2a19ba multi_socket (multi.c:3219)
at 0x563f7e2a1b51 curl_multi_socket_action (multi.c:3340)
at 0x563f7e283d55 (anonymous namespace)::SocketEvent(int, short, void*) (curl.cc:142)
at 0x563f7e3512d7 event_persist_closure (event.c:1659)
at 0x563f7e3508de event_process_active_single_queue (event.c:1718)
at 0x563f7e34b0b4 event_process_active (event.c:1819)
at 0x563f7e349ebb event_base_loop (event.c:2068)
at 0x563f7e281c1e main (curl.cc:270)
at 0x7f95d37e978f
Interestingly enough, the memory leak doesn't occur when forcing HTTP version 1.1, so the issue may be specific to HTTP/2.
I expected the following
No memory leaks.
curl/libcurl version
libcurl/7.88.1-DEV OpenSSL/3.0.8 zlib/1.2.13 c-ares/1.19.0 nghttp2/1.51.0
operating system
Linux razer 6.1.23-1-MANJARO #1 SMP PREEMPT_DYNAMIC Thu Apr 6 19:47:04 UTC 2023 x86_64 GNU/Linux
Happens on Windows as well.
Reactions are currently unavailable