Skip to content
Permalink
Browse files

Use C++11 threads

  • Loading branch information...
LBPHacker committed Apr 16, 2019
1 parent 9d92b77 commit 2e76b10619f9ead5e2a7ce165186a624415e4f94
@@ -21,8 +21,6 @@ namespace http
post_fields_last(NULL)
#endif
{
pthread_cond_init(&done_cv, NULL);
pthread_mutex_init(&rm_mutex, NULL);
easy = curl_easy_init();
RequestManager::Ref().AddRequest(this);
}
@@ -36,8 +34,6 @@ namespace http
curl_formfree(post_fields_first);
#endif
curl_slist_free_all(headers);
pthread_mutex_destroy(&rm_mutex);
pthread_cond_destroy(&done_cv);
}

void Request::AddHeader(ByteString name, ByteString value)
@@ -179,9 +175,10 @@ namespace http
curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, Request::WriteDataHandler);
}

pthread_mutex_lock(&rm_mutex);
rm_started = true;
pthread_mutex_unlock(&rm_mutex);
{
std::lock_guard<std::mutex> g(rm_mutex);
rm_started = true;
}
RequestManager::Ref().StartRequest(this);
}

@@ -194,27 +191,26 @@ namespace http
return ""; // shouldn't happen but just in case
}

pthread_mutex_lock(&rm_mutex);
while (!rm_finished)
{
pthread_cond_wait(&done_cv, &rm_mutex);
}
rm_started = false;
rm_canceled = true;
if (status_out)
ByteString response_out;
{
*status_out = status;
std::unique_lock<std::mutex> l(rm_mutex);
done_cv.wait(l, [this]() { return rm_finished; });
rm_started = false;
rm_canceled = true;
if (status_out)
{
*status_out = status;
}
response_out = std::move(response_body);
}
ByteString response_out = std::move(response_body);
pthread_mutex_unlock(&rm_mutex);

RequestManager::Ref().RemoveRequest(this);
return response_out;
}

void Request::CheckProgress(int *total, int *done)
{
pthread_mutex_lock(&rm_mutex);
std::lock_guard<std::mutex> g(rm_mutex);
if (total)
{
*total = rm_total;
@@ -223,43 +219,37 @@ namespace http
{
*done = rm_done;
}
pthread_mutex_unlock(&rm_mutex);
}

// returns true if the request has finished
bool Request::CheckDone()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_finished;
pthread_mutex_unlock(&rm_mutex);
return ret;
std::lock_guard<std::mutex> g(rm_mutex);
return rm_finished;
}

// returns true if the request was canceled
bool Request::CheckCanceled()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_canceled;
pthread_mutex_unlock(&rm_mutex);
return ret;
std::lock_guard<std::mutex> g(rm_mutex);
return rm_canceled;
}

// returns true if the request is running
bool Request::CheckStarted()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_started;
pthread_mutex_unlock(&rm_mutex);
return ret;
std::lock_guard<std::mutex> g(rm_mutex);
return rm_started;

}

// cancels the request, the request thread will delete the Request* when it finishes (do not use Request in any way after canceling)
void Request::Cancel()
{
pthread_mutex_lock(&rm_mutex);
rm_canceled = true;
pthread_mutex_unlock(&rm_mutex);
{
std::lock_guard<std::mutex> g(rm_mutex);
rm_canceled = true;
}
RequestManager::Ref().RemoveRequest(this);
}

@@ -3,10 +3,10 @@

#include <map>
#include "common/tpt-minmax.h" // for MSVC, ensures windows.h doesn't cause compile errors by defining min/max
#include "common/tpt-thread.h"
#include <mutex>
#include <condition_variable>
#include <curl/curl.h>
#include "common/String.h"
#undef GetUserName // pthreads defines this, breaks stuff

#if defined(CURL_AT_LEAST_VERSION) && CURL_AT_LEAST_VERSION(7, 55, 0)
# define REQUEST_USE_CURL_OFFSET_T
@@ -32,7 +32,7 @@ namespace http
volatile bool rm_finished;
volatile bool rm_canceled;
volatile bool rm_started;
pthread_mutex_t rm_mutex;
std::mutex rm_mutex;

bool added_to_multi;
int status;
@@ -46,7 +46,7 @@ namespace http
std::map<ByteString, ByteString> post_fields_map;
#endif

pthread_cond_t done_cv;
std::condition_variable done_cv;

static size_t WriteDataHandler(char * ptr, size_t size, size_t count, void * userdata);

@@ -21,36 +21,27 @@ namespace http
rt_shutting_down(false),
multi(NULL)
{
pthread_cond_init(&rt_cv, NULL);
pthread_mutex_init(&rt_mutex, NULL);
}

RequestManager::~RequestManager()
{
pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cv);
}

void RequestManager::Shutdown()
{
pthread_mutex_lock(&rt_mutex);
rt_shutting_down = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
{
std::lock_guard<std::mutex> g(rt_mutex);
rt_shutting_down = true;
}
rt_cv.notify_one();

pthread_join(worker_thread, NULL);
worker_thread.join();

curl_multi_cleanup(multi);
multi = NULL;
curl_global_cleanup();
}

TH_ENTRY_POINT void *RequestManager::RequestManagerHelper(void *obj)
{
((RequestManager *)obj)->Worker();
return NULL;
}

void RequestManager::Initialise(ByteString Proxy)
{
curl_global_init(CURL_GLOBAL_DEFAULT);
@@ -64,32 +55,33 @@ namespace http

user_agent = ByteString::Build("PowderToy/", SAVE_VERSION, ".", MINOR_VERSION, " (", IDENT_PLATFORM, "; ", IDENT_BUILD, "; M", MOD_ID, ") TPTPP/", SAVE_VERSION, ".", MINOR_VERSION, ".", BUILD_NUM, IDENT_RELTYPE, ".", SNAPSHOT_ID);

pthread_create(&worker_thread, NULL, &RequestManager::RequestManagerHelper, this);
worker_thread = std::thread([this]() { Worker(); });
}

void RequestManager::Worker()
{
bool shutting_down = false;
while (!shutting_down)
{
pthread_mutex_lock(&rt_mutex);
if (!requests_added_to_multi)
{
while (!rt_shutting_down && requests_to_add.empty() && !requests_to_start && !requests_to_remove)
std::unique_lock<std::mutex> l(rt_mutex);
if (!requests_added_to_multi)
{
pthread_cond_wait(&rt_cv, &rt_mutex);
while (!rt_shutting_down && requests_to_add.empty() && !requests_to_start && !requests_to_remove)
{
rt_cv.wait(l);
}
}
shutting_down = rt_shutting_down;
requests_to_remove = false;
requests_to_start = false;
for (Request *request : requests_to_add)
{
request->status = 0;
requests.insert(request);
}
requests_to_add.clear();
}
shutting_down = rt_shutting_down;
requests_to_remove = false;
requests_to_start = false;
for (Request *request : requests_to_add)
{
request->status = 0;
requests.insert(request);
}
requests_to_add.clear();
pthread_mutex_unlock(&rt_mutex);

if (multi && requests_added_to_multi)
{
@@ -157,52 +149,60 @@ namespace http
std::set<Request *> requests_to_remove;
for (Request *request : requests)
{
pthread_mutex_lock(&request->rm_mutex);
if (shutting_down)
{
// In the weird case that a http::Request::Simple* call is
// waiting on this Request, we should fail the request
// instead of cancelling it ourselves.
request->status = 610;
}
if (!request->rm_canceled && request->rm_started && !request->added_to_multi && !request->status)
bool signal_done = false;

{
if (multi && request->easy)
std::lock_guard<std::mutex> g(request->rm_mutex);
if (shutting_down)
{
MultiAdd(request);
// In the weird case that a http::Request::Simple* call is
// waiting on this Request, we should fail the request
// instead of cancelling it ourselves.
request->status = 610;
}
else
if (!request->rm_canceled && request->rm_started && !request->added_to_multi && !request->status)
{
request->status = 604;
if (multi && request->easy)
{
MultiAdd(request);
}
else
{
request->status = 604;
}
}
}
if (!request->rm_canceled && request->rm_started && !request->rm_finished)
{
if (multi && request->easy)
if (!request->rm_canceled && request->rm_started && !request->rm_finished)
{
if (multi && request->easy)
{
#ifdef REQUEST_USE_CURL_OFFSET_T
curl_easy_getinfo(request->easy, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &request->rm_total);
curl_easy_getinfo(request->easy, CURLINFO_SIZE_DOWNLOAD_T, &request->rm_done);
curl_easy_getinfo(request->easy, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &request->rm_total);
curl_easy_getinfo(request->easy, CURLINFO_SIZE_DOWNLOAD_T, &request->rm_done);
#else
double total, done;
curl_easy_getinfo(request->easy, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &total);
curl_easy_getinfo(request->easy, CURLINFO_SIZE_DOWNLOAD, &done);
request->rm_total = (curl_off_t)total;
request->rm_done = (curl_off_t)done;
double total, done;
curl_easy_getinfo(request->easy, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &total);
curl_easy_getinfo(request->easy, CURLINFO_SIZE_DOWNLOAD, &done);
request->rm_total = (curl_off_t)total;
request->rm_done = (curl_off_t)done;
#endif
}
if (request->status)
{
request->rm_finished = true;
MultiRemove(request);
signal_done = true;
}
}
if (request->status)
if (request->rm_canceled)
{
request->rm_finished = true;
MultiRemove(request);
pthread_cond_signal(&request->done_cv);
requests_to_remove.insert(request);
}
}
if (request->rm_canceled)

if (signal_done)
{
requests_to_remove.insert(request);
request->done_cv.notify_one();
}
pthread_mutex_unlock(&request->rm_mutex);
}
for (Request *request : requests_to_remove)
{
@@ -235,25 +235,28 @@ namespace http

void RequestManager::AddRequest(Request *request)
{
pthread_mutex_lock(&rt_mutex);
requests_to_add.insert(request);
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
{
std::lock_guard<std::mutex> g(rt_mutex);
requests_to_add.insert(request);
}
rt_cv.notify_one();
}

void RequestManager::StartRequest(Request *request)
{
pthread_mutex_lock(&rt_mutex);
requests_to_start = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
{
std::lock_guard<std::mutex> g(rt_mutex);
requests_to_start = true;
}
rt_cv.notify_one();
}

void RequestManager::RemoveRequest(Request *request)
{
pthread_mutex_lock(&rt_mutex);
requests_to_remove = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
{
std::lock_guard<std::mutex> g(rt_mutex);
requests_to_remove = true;
}
rt_cv.notify_one();
}
}
Oops, something went wrong.

0 comments on commit 2e76b10

Please sign in to comment.
You can’t perform that action at this time.