Skip to content

Commit

Permalink
Fix: race-conditions in GUI updates when downloading HTTP files
Browse files Browse the repository at this point in the history
  • Loading branch information
TrueBrain committed Jan 2, 2024
1 parent 628092f commit ef254a7
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/network/core/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct HTTPCallback {
* @param length the amount of received data, 0 when all data has been received.
* @note When nullptr is sent the HTTP socket handler is closed/freed.
*/
virtual void OnReceiveData(const char *data, size_t length) = 0;
virtual void OnReceiveData(std::unique_ptr<char[]> data, size_t length) = 0;

/**
* Check if there is a request to cancel the transfer.
Expand Down
60 changes: 48 additions & 12 deletions src/network/core/http_curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "../network_internal.h"

#include "http.h"
#include "http_shared.h"

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -44,6 +45,11 @@ static auto _certificate_directories = {
};
#endif /* UNIX */

static std::vector<HTTPThreadSafeCallback *> _http_callbacks;
static std::vector<HTTPThreadSafeCallback *> _new_http_callbacks;
static std::mutex _http_callback_mutex;
static std::mutex _new_http_callback_mutex;

/** Single HTTP request. */
class NetworkHTTPRequest {
public:
Expand All @@ -59,11 +65,19 @@ class NetworkHTTPRequest {
callback(callback),
data(data)
{
std::lock_guard<std::mutex> lock(_new_http_callback_mutex);
_new_http_callbacks.push_back(&this->callback);
}

~NetworkHTTPRequest()
{
std::lock_guard<std::mutex> lock(_http_callback_mutex);
_http_callbacks.erase(std::remove(_http_callbacks.begin(), _http_callbacks.end(), &this->callback), _http_callbacks.end());
}

const std::string uri; ///< URI to connect to.
HTTPCallback *callback; ///< Callback to send data back on.
const std::string data; ///< Data to send, if any.
const std::string uri; ///< URI to connect to.
HTTPThreadSafeCallback callback; ///< Callback to send data back on.
const std::string data; ///< Data to send, if any.
};

static std::thread _http_thread;
Expand Down Expand Up @@ -92,6 +106,20 @@ static std::string _http_ca_path = "";

/* static */ void NetworkHTTPSocketHandler::HTTPReceive()
{
std::lock_guard<std::mutex> lock(_http_callback_mutex);

{
std::lock_guard<std::mutex> lock_new(_new_http_callback_mutex);
if (!_new_http_callbacks.empty()) {
/* We delay adding new callbacks, as HandleQueue() below might add a new callback. */
_http_callbacks.insert(_http_callbacks.end(), _new_http_callbacks.begin(), _new_http_callbacks.end());
_new_http_callbacks.clear();
}
}

for (auto &callback : _http_callbacks) {
callback->HandleQueue();
}
}

void HttpThread()
Expand Down Expand Up @@ -163,22 +191,27 @@ void HttpThread()
/* Setup our (C-style) callback function which we pipe back into the callback. */
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, +[](char *ptr, size_t size, size_t nmemb, void *userdata) -> size_t {
Debug(net, 4, "HTTP callback: {} bytes", size * nmemb);
HTTPCallback *callback = static_cast<HTTPCallback *>(userdata);
callback->OnReceiveData(ptr, size * nmemb);
HTTPThreadSafeCallback *callback = static_cast<HTTPThreadSafeCallback *>(userdata);

/* Copy the buffer out of CURL. OnReceiveData() will free it when done. */
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size * nmemb);
memcpy(buffer.get(), ptr, size * nmemb);
callback->OnReceiveData(std::move(buffer), size * nmemb);

return size * nmemb;
});
curl_easy_setopt(curl, CURLOPT_WRITEDATA, request->callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &request->callback);

/* Create a callback from which we can cancel. Sadly, there is no other
* thread-safe way to do this. If the connection went idle, it can take
* up to a second before this callback is called. There is little we can
* do about this. */
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, +[](void *userdata, curl_off_t /*dltotal*/, curl_off_t /*dlnow*/, curl_off_t /*ultotal*/, curl_off_t /*ulnow*/) -> int {
const HTTPCallback *callback = static_cast<HTTPCallback *>(userdata);
return (callback->IsCancelled() || _http_thread_exit) ? 1 : 0;
const HTTPThreadSafeCallback *callback = static_cast<HTTPThreadSafeCallback *>(userdata);
return (callback->cancelled || _http_thread_exit) ? 1 : 0;
});
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, request->callback);
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, &request->callback);

/* Perform the request. */
CURLcode res = curl_easy_perform(curl);
Expand All @@ -187,15 +220,18 @@ void HttpThread()

if (res == CURLE_OK) {
Debug(net, 1, "HTTP request succeeded");
request->callback->OnReceiveData(nullptr, 0);
request->callback.OnReceiveData(nullptr, 0);
} else {
long status_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status_code);

/* No need to be verbose about rate limiting. */
Debug(net, (request->callback->IsCancelled() || _http_thread_exit || status_code == HTTP_429_TOO_MANY_REQUESTS) ? 1 : 0, "HTTP request failed: status_code: {}, error: {}", status_code, curl_easy_strerror(res));
request->callback->OnFailure();
Debug(net, (request->callback.cancelled || _http_thread_exit || status_code == HTTP_429_TOO_MANY_REQUESTS) ? 1 : 0, "HTTP request failed: status_code: {}, error: {}", status_code, curl_easy_strerror(res));
request->callback.OnFailure();
}

/* Wait till the callback tells us all data is dequeued. */
request->callback.WaitTillEmpty();
}

curl_easy_cleanup(curl);
Expand Down
118 changes: 118 additions & 0 deletions src/network/core/http_shared.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* This file is part of OpenTTD.
* OpenTTD is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, version 2.
* OpenTTD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OpenTTD. If not, see <http://www.gnu.org/licenses/>.
*/

/**
* @file http_shared.h Shared functions for implementations of HTTP requests.
*/

#ifndef NETWORK_CORE_HTTP_SHARED_H
#define NETWORK_CORE_HTTP_SHARED_H

#include "http.h"

#include <condition_variable>
#include <mutex>
#include <vector>

/** Converts a HTTPCallback to a Thread-Safe variant. */
class HTTPThreadSafeCallback {
private:
/** Entries on the queue for later handling. */
class Callback {
public:
Callback(std::unique_ptr<char[]> data, size_t length) : data(std::move(data)), length(length), failure(false) {}
Callback() : data(nullptr), length(0), failure(true) {}

std::unique_ptr<char[]> data;
size_t length;
bool failure;
};

public:
/**
* Similar to HTTPCallback::OnFailure, but thread-safe.
*/
void OnFailure()
{
std::lock_guard<std::mutex> lock(this->mutex);
this->queue.emplace_back();
}

/**
* Similar to HTTPCallback::OnReceiveData, but thread-safe.
*/
void OnReceiveData(std::unique_ptr<char[]> data, size_t length)
{
std::lock_guard<std::mutex> lock(this->mutex);
this->queue.emplace_back(std::move(data), length);
}

/**
* Process everything on the queue.
*
* Should be called from the Game Thread.
*/
void HandleQueue()
{
this->cancelled = callback->IsCancelled();

std::lock_guard<std::mutex> lock(this->mutex);

for (auto &item : this->queue) {
if (item.failure) {
this->callback->OnFailure();
} else {
this->callback->OnReceiveData(std::move(item.data), item.length);
}
}

this->queue.clear();
this->queue_cv.notify_all();
}

/**
* Wait till the queue is dequeued.
*/
void WaitTillEmpty()
{
std::unique_lock<std::mutex> lock(this->mutex);

while (!queue.empty()) {
this->queue_cv.wait(lock);
}
}

/**
* Check if the queue is empty.
*/
bool IsQueueEmpty()
{
std::lock_guard<std::mutex> lock(this->mutex);
return this->queue.empty();
}

HTTPThreadSafeCallback(HTTPCallback *callback) : callback(callback) {}

~HTTPThreadSafeCallback()
{
std::lock_guard<std::mutex> lock(this->mutex);

/* Clear the list and notify explicitly. */
queue.clear();
queue_cv.notify_all();
}

std::atomic<bool> cancelled = false;

private:
HTTPCallback *callback; ///< The callback to send data back on.
std::mutex mutex; ///< Mutex to protect the queue.
std::vector<Callback> queue; ///< Queue of data to send back.
std::condition_variable queue_cv; ///< Condition variable to wait for the queue to be empty.
};

#endif /* NETWORK_CORE_HTTP_SHARED_H */

0 comments on commit ef254a7

Please sign in to comment.