Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Move all RPC I/O to separate thread.

  • Loading branch information...
commit 4f7a51e9ed4accbd1be6fad38a9bbc70531cb37f 1 parent cdb4cd9
Jeff Garzik authored Jeff Garzik committed
Showing with 640 additions and 51 deletions.
  1. +1 −1  Makefile.am
  2. +243 −44 cpu-miner.c
  3. +251 −0 elist.h
  4. +10 −1 miner.h
  5. +135 −5 util.c
2  Makefile.am
View
@@ -13,7 +13,7 @@ INCLUDES = $(PTHREAD_FLAGS) -fno-strict-aliasing $(JANSSON_INCLUDES)
bin_PROGRAMS = minerd
-minerd_SOURCES = miner.h compat.h \
+minerd_SOURCES = elist.h miner.h compat.h \
cpu-miner.c util.c \
sha256_generic.c sha256_4way.c sha256_via.c \
sha256_cryptopp.c sha256_sse2_amd64.c
287 cpu-miner.c
View
@@ -4,7 +4,7 @@
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2 of the License, or (at your option)
+ * Software Foundation; either version 2 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
@@ -32,6 +32,25 @@
#define DEF_RPC_URL "http://127.0.0.1:8332/"
#define DEF_RPC_USERPASS "rpcuser:rpcpass"
+struct thr_info {
+ int id;
+ pthread_t pth;
+ struct thread_q *q;
+};
+
+enum workio_commands {
+ WC_GET_WORK,
+ WC_SUBMIT_WORK,
+};
+
+struct workio_cmd {
+ enum workio_commands cmd;
+ struct thr_info *thr;
+ union {
+ struct work *work;
+ } u;
+};
+
enum sha256_algos {
ALGO_C, /* plain C */
ALGO_4WAY, /* parallel SSE2 */
@@ -70,6 +89,8 @@ static enum sha256_algos opt_algo = ALGO_C;
static int opt_n_threads = 1;
static char *rpc_url;
static char *userpass;
+static struct thr_info *thr_info;
+static int work_thr_id;
struct option_help {
@@ -214,20 +235,21 @@ static bool work_decode(const json_t *val, struct work *work)
return false;
}
-static void submit_work(CURL *curl, struct work *work)
+static bool submit_upstream_work(CURL *curl, const struct work *work)
{
char *hexstr = NULL;
json_t *val, *res;
char s[345], timestr[64];
time_t now;
struct tm *tm;
+ bool rc = false;
now = time(NULL);
/* build hex string */
hexstr = bin2hex(work->data, sizeof(work->data));
if (!hexstr) {
- fprintf(stderr, "submit_work OOM\n");
+ fprintf(stderr, "submit_upstream_work OOM\n");
goto out;
}
@@ -242,7 +264,7 @@ static void submit_work(CURL *curl, struct work *work)
/* issue JSON-RPC request */
val = json_rpc_call(curl, rpc_url, userpass, s);
if (!val) {
- fprintf(stderr, "submit_work json_rpc_call failed\n");
+ fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
goto out;
}
@@ -256,11 +278,14 @@ static void submit_work(CURL *curl, struct work *work)
json_decref(val);
+ rc = true;
+
out:
free(hexstr);
+ return rc;
}
-static bool get_work(CURL *curl, struct work *work)
+static bool get_upstream_work(CURL *curl, struct work *work)
{
static const char *rpc_req =
"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
@@ -278,6 +303,120 @@ static bool get_work(CURL *curl, struct work *work)
return rc;
}
+static void workio_cmd_free(struct workio_cmd *wc)
+{
+ if (!wc)
+ return;
+
+ switch (wc->cmd) {
+ case WC_SUBMIT_WORK:
+ free(wc->u.work);
+ break;
+ default: /* do nothing */
+ break;
+ }
+
+ memset(wc, 0, sizeof(*wc)); /* poison */
+ free(wc);
+}
+
+static bool workio_get_work(struct workio_cmd *wc, CURL *curl)
+{
+ struct work *ret_work;
+ int failures = 0;
+
+ ret_work = calloc(1, sizeof(*ret_work));
+ if (!ret_work)
+ return false;
+
+ /* obtain new work from bitcoin via JSON-RPC */
+ while (!get_upstream_work(curl, ret_work)) {
+ fprintf(stderr, "json_rpc_call failed, ");
+
+ if ((opt_retries >= 0) && (++failures > opt_retries)) {
+ fprintf(stderr, "terminating workio thread\n");
+ free(ret_work);
+ return false;
+ }
+
+ /* pause, then restart work-request loop */
+ fprintf(stderr, "retry after %d seconds\n",
+ opt_fail_pause);
+ sleep(opt_fail_pause);
+ }
+
+ /* send work to requesting thread */
+ if (!tq_push(wc->thr->q, ret_work))
+ free(ret_work);
+
+ return true;
+}
+
+static bool workio_submit_work(struct workio_cmd *wc, CURL *curl)
+{
+ int failures = 0;
+
+ /* submit solution to bitcoin via JSON-RPC */
+ while (!submit_upstream_work(curl, wc->u.work)) {
+ if ((opt_retries >= 0) && (++failures > opt_retries)) {
+ fprintf(stderr, "...terminating workio thread\n");
+ return false;
+ }
+
+ /* pause, then restart work-request loop */
+ fprintf(stderr, "...retry after %d seconds\n",
+ opt_fail_pause);
+ sleep(opt_fail_pause);
+ }
+
+ return true;
+}
+
+static void *workio_thread(void *userdata)
+{
+ struct thr_info *mythr = userdata;
+ CURL *curl;
+ bool ok = true;
+
+ curl = curl_easy_init();
+ if (!curl) {
+ fprintf(stderr, "CURL initialization failed\n");
+ return NULL;
+ }
+
+ while (ok) {
+ struct workio_cmd *wc;
+
+ /* wait for workio_cmd sent to us, on our queue */
+ wc = tq_pop(mythr->q, NULL);
+ if (!wc) {
+ ok = false;
+ break;
+ }
+
+ /* process workio_cmd */
+ switch (wc->cmd) {
+ case WC_GET_WORK:
+ ok = workio_get_work(wc, curl);
+ break;
+ case WC_SUBMIT_WORK:
+ ok = workio_submit_work(wc, curl);
+ break;
+
+ default: /* should never happen */
+ ok = false;
+ break;
+ }
+
+ workio_cmd_free(wc);
+ }
+
+ tq_freeze(mythr->q);
+ curl_easy_cleanup(curl);
+
+ return NULL;
+}
+
static void hashmeter(int thr_id, const struct timeval *diff,
unsigned long hashes_done)
{
@@ -292,39 +431,82 @@ static void hashmeter(int thr_id, const struct timeval *diff,
khashes / secs);
}
-static void *miner_thread(void *thr_id_int)
+static bool get_work(struct thr_info *thr, struct work *work)
{
- int thr_id = (unsigned long) thr_id_int;
- int failures = 0;
- uint32_t max_nonce = 0xffffff;
- CURL *curl;
+ struct workio_cmd *wc;
+ struct work *work_heap;
- curl = curl_easy_init();
- if (!curl) {
- fprintf(stderr, "CURL initialization failed\n");
- return NULL;
+ /* fill out work request message */
+ wc = calloc(1, sizeof(*wc));
+ if (!wc)
+ return false;
+
+ wc->cmd = WC_GET_WORK;
+ wc->thr = thr;
+
+ /* send work request to workio thread */
+ if (!tq_push(thr_info[work_thr_id].q, wc)) {
+ workio_cmd_free(wc);
+ return false;
}
+ /* wait for response, a unit of work */
+ work_heap = tq_pop(thr->q, NULL);
+ if (!work_heap)
+ return false;
+
+ /* copy returned work into storage provided by caller */
+ memcpy(work, work_heap, sizeof(*work));
+ free(work_heap);
+
+ return true;
+}
+
+static bool submit_work(struct thr_info *thr, const struct work *work_in)
+{
+ struct workio_cmd *wc;
+
+ /* fill out work request message */
+ wc = calloc(1, sizeof(*wc));
+ if (!wc)
+ return false;
+
+ wc->u.work = malloc(sizeof(*work_in));
+ if (!wc->u.work)
+ goto err_out;
+
+ wc->cmd = WC_SUBMIT_WORK;
+ wc->thr = thr;
+ memcpy(wc->u.work, work_in, sizeof(*work_in));
+
+ /* send solution to workio thread */
+ if (!tq_push(thr_info[work_thr_id].q, wc))
+ goto err_out;
+
+ return true;
+
+err_out:
+ workio_cmd_free(wc);
+ return false;
+}
+
+static void *miner_thread(void *userdata)
+{
+ struct thr_info *mythr = userdata;
+ int thr_id = mythr->id;
+ uint32_t max_nonce = 0xffffff;
+
while (1) {
struct work work __attribute__((aligned(128)));
unsigned long hashes_done;
struct timeval tv_start, tv_end, diff;
bool rc;
- /* obtain new work from bitcoin */
- if (!get_work(curl, &work)) {
- fprintf(stderr, "json_rpc_call failed, ");
-
- if ((opt_retries >= 0) && (++failures > opt_retries)) {
- fprintf(stderr, "terminating thread\n");
- return NULL; /* exit thread */
- }
-
- /* pause, then restart work loop */
- fprintf(stderr, "retry after %d seconds\n",
- opt_fail_pause);
- sleep(opt_fail_pause);
- continue;
+ /* obtain new work from internal workio thread */
+ if (!get_work(mythr, &work)) {
+ fprintf(stderr, "work retrieval failed, exiting "
+ "mining thread %d\n", mythr->id);
+ goto out;
}
hashes_done = 0;
@@ -347,7 +529,7 @@ static void *miner_thread(void *thr_id_int)
max_nonce, &hashes_done);
rc = (rc5 == -1) ? false : true;
}
- break;
+ break;
#endif
#ifdef WANT_SSE2_4WAY
@@ -384,7 +566,7 @@ static void *miner_thread(void *thr_id_int)
default:
/* should never happen */
- return NULL;
+ goto out;
}
/* record scanhash elapsed time */
@@ -404,13 +586,12 @@ static void *miner_thread(void *thr_id_int)
max_nonce += 100000; /* small increase */
/* if nonce found, submit work */
- if (rc)
- submit_work(curl, &work);
-
- failures = 0;
+ if (rc && !submit_work(mythr, &work))
+ break;
}
- curl_easy_cleanup(curl);
+out:
+ tq_freeze(mythr->q);
return NULL;
}
@@ -564,8 +745,8 @@ static void parse_cmdline(int argc, char *argv[])
int main (int argc, char *argv[])
{
+ struct thr_info *thr;
int i;
- pthread_t *t_all;
rpc_url = strdup(DEF_RPC_URL);
userpass = strdup(DEF_RPC_USERPASS);
@@ -577,14 +758,33 @@ int main (int argc, char *argv[])
if (setpriority(PRIO_PROCESS, 0, 19))
perror("setpriority");
- t_all = calloc(opt_n_threads, sizeof(pthread_t));
- if (!t_all)
+ thr_info = calloc(opt_n_threads + 1, sizeof(*thr));
+ if (!thr_info)
+ return 1;
+
+ work_thr_id = opt_n_threads;
+ thr = &thr_info[work_thr_id];
+ thr->id = opt_n_threads;
+ thr->q = tq_new();
+ if (!thr->q)
+ return 1;
+
+ /* start work I/O thread */
+ if (pthread_create(&thr->pth, NULL, workio_thread, thr)) {
+ fprintf(stderr, "workio thread create failed\n");
return 1;
+ }
/* start mining threads */
for (i = 0; i < opt_n_threads; i++) {
- if (pthread_create(&t_all[i], NULL, miner_thread,
- (void *)(unsigned long) i)) {
+ thr = &thr_info[i];
+
+ thr->id = i;
+ thr->q = tq_new();
+ if (!thr->q)
+ return 1;
+
+ if (pthread_create(&thr->pth, NULL, miner_thread, thr)) {
fprintf(stderr, "thread %d create failed\n", i);
return 1;
}
@@ -597,11 +797,10 @@ int main (int argc, char *argv[])
opt_n_threads,
algo_names[opt_algo]);
- /* main loop - simply wait for all threads to exit */
- for (i = 0; i < opt_n_threads; i++)
- pthread_join(t_all[i], NULL);
+ /* main loop - simply wait for workio thread to exit */
+ pthread_join(thr_info[work_thr_id].pth, NULL);
- fprintf(stderr, "all threads dead, fred. exiting.\n");
+ fprintf(stderr, "workio thread dead, exiting.\n");
return 0;
}
251 elist.h
View
@@ -0,0 +1,251 @@
+#ifndef _LINUX_LIST_H
+#define _LINUX_LIST_H
+
+/*
+ * Simple doubly linked list implementation.
+ *
+ * Some of the internal functions ("__xxx") are useful when
+ * manipulating whole lists rather than single entries, as
+ * sometimes we already know the next/prev entries and we can
+ * generate better code by using them directly rather than
+ * using the generic single-entry routines.
+ */
+
+struct list_head {
+ struct list_head *next, *prev;
+};
+
+#define LIST_HEAD_INIT(name) { &(name), &(name) }
+
+#define LIST_HEAD(name) \
+ struct list_head name = LIST_HEAD_INIT(name)
+
+#define INIT_LIST_HEAD(ptr) do { \
+ (ptr)->next = (ptr); (ptr)->prev = (ptr); \
+} while (0)
+
+/*
+ * Insert a new entry between two known consecutive entries.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_add(struct list_head *new,
+ struct list_head *prev,
+ struct list_head *next)
+{
+ next->prev = new;
+ new->next = next;
+ new->prev = prev;
+ prev->next = new;
+}
+
+/**
+ * list_add - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it after
+ *
+ * Insert a new entry after the specified head.
+ * This is good for implementing stacks.
+ */
+static inline void list_add(struct list_head *new, struct list_head *head)
+{
+ __list_add(new, head, head->next);
+}
+
+/**
+ * list_add_tail - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it before
+ *
+ * Insert a new entry before the specified head.
+ * This is useful for implementing queues.
+ */
+static inline void list_add_tail(struct list_head *new, struct list_head *head)
+{
+ __list_add(new, head->prev, head);
+}
+
+/*
+ * Delete a list entry by making the prev/next entries
+ * point to each other.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_del(struct list_head *prev, struct list_head *next)
+{
+ next->prev = prev;
+ prev->next = next;
+}
+
+/**
+ * list_del - deletes entry from list.
+ * @entry: the element to delete from the list.
+ * Note: list_empty on entry does not return true after this, the entry is in an undefined state.
+ */
+static inline void list_del(struct list_head *entry)
+{
+ __list_del(entry->prev, entry->next);
+ entry->next = (void *) 0;
+ entry->prev = (void *) 0;
+}
+
+/**
+ * list_del_init - deletes entry from list and reinitialize it.
+ * @entry: the element to delete from the list.
+ */
+static inline void list_del_init(struct list_head *entry)
+{
+ __list_del(entry->prev, entry->next);
+ INIT_LIST_HEAD(entry);
+}
+
+/**
+ * list_move - delete from one list and add as another's head
+ * @list: the entry to move
+ * @head: the head that will precede our entry
+ */
+static inline void list_move(struct list_head *list, struct list_head *head)
+{
+ __list_del(list->prev, list->next);
+ list_add(list, head);
+}
+
+/**
+ * list_move_tail - delete from one list and add as another's tail
+ * @list: the entry to move
+ * @head: the head that will follow our entry
+ */
+static inline void list_move_tail(struct list_head *list,
+ struct list_head *head)
+{
+ __list_del(list->prev, list->next);
+ list_add_tail(list, head);
+}
+
+/**
+ * list_empty - tests whether a list is empty
+ * @head: the list to test.
+ */
+static inline int list_empty(struct list_head *head)
+{
+ return head->next == head;
+}
+
+static inline void __list_splice(struct list_head *list,
+ struct list_head *head)
+{
+ struct list_head *first = list->next;
+ struct list_head *last = list->prev;
+ struct list_head *at = head->next;
+
+ first->prev = head;
+ head->next = first;
+
+ last->next = at;
+ at->prev = last;
+}
+
+/**
+ * list_splice - join two lists
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice(struct list_head *list, struct list_head *head)
+{
+ if (!list_empty(list))
+ __list_splice(list, head);
+}
+
+/**
+ * list_splice_init - join two lists and reinitialise the emptied list.
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ *
+ * The list at @list is reinitialised
+ */
+static inline void list_splice_init(struct list_head *list,
+ struct list_head *head)
+{
+ if (!list_empty(list)) {
+ __list_splice(list, head);
+ INIT_LIST_HEAD(list);
+ }
+}
+
+/**
+ * list_entry - get the struct for this entry
+ * @ptr: the &struct list_head pointer.
+ * @type: the type of the struct this is embedded in.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_entry(ptr, type, member) \
+ ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
+
+/**
+ * list_for_each - iterate over a list
+ * @pos: the &struct list_head to use as a loop counter.
+ * @head: the head for your list.
+ */
+#define list_for_each(pos, head) \
+ for (pos = (head)->next; pos != (head); \
+ pos = pos->next)
+/**
+ * list_for_each_prev - iterate over a list backwards
+ * @pos: the &struct list_head to use as a loop counter.
+ * @head: the head for your list.
+ */
+#define list_for_each_prev(pos, head) \
+ for (pos = (head)->prev; pos != (head); \
+ pos = pos->prev)
+
+/**
+ * list_for_each_safe - iterate over a list safe against removal of list entry
+ * @pos: the &struct list_head to use as a loop counter.
+ * @n: another &struct list_head to use as temporary storage
+ * @head: the head for your list.
+ */
+#define list_for_each_safe(pos, n, head) \
+ for (pos = (head)->next, n = pos->next; pos != (head); \
+ pos = n, n = pos->next)
+
+/**
+ * list_for_each_entry - iterate over list of given type
+ * @pos: the type * to use as a loop counter.
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_for_each_entry(pos, head, member) \
+ for (pos = list_entry((head)->next, typeof(*pos), member); \
+ &pos->member != (head); \
+ pos = list_entry(pos->member.next, typeof(*pos), member))
+
+/**
+ * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry
+ * @pos: the type * to use as a loop counter.
+ * @n: another type * to use as temporary storage
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_safe(pos, n, head, member) \
+ for (pos = list_entry((head)->next, typeof(*pos), member), \
+ n = list_entry(pos->member.next, typeof(*pos), member); \
+ &pos->member != (head); \
+ pos = n, n = list_entry(n->member.next, typeof(*n), member))
+
+/**
+ * list_for_each_entry_continue - iterate over list of given type
+ * continuing after existing point
+ * @pos: the type * to use as a loop counter.
+ * @head: the head for your list.
+ * @member: the name of the list_struct within the struct.
+ */
+#define list_for_each_entry_continue(pos, head, member) \
+ for (pos = list_entry(pos->member.next, typeof(*pos), member), \
+ prefetch(pos->member.next); \
+ &pos->member != (head); \
+ pos = list_entry(pos->member.next, typeof(*pos), member), \
+ prefetch(pos->member.next))
+
+#endif
11 miner.h
View
@@ -70,7 +70,7 @@ extern bool opt_protocol;
extern const uint32_t sha256_init_state[];
extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
const char *rpc_req);
-extern char *bin2hex(unsigned char *p, size_t len);
+extern char *bin2hex(const unsigned char *p, size_t len);
extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);
extern unsigned int ScanHash_4WaySSE2(const unsigned char *pmidstate,
@@ -109,4 +109,13 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
extern bool fulltest(const unsigned char *hash, const unsigned char *target);
+struct thread_q;
+
+extern struct thread_q *tq_new(void);
+extern void tq_free(struct thread_q *tq);
+extern bool tq_push(struct thread_q *tq, void *data);
+extern void *tq_pop(struct thread_q *tq, const struct timespec *abstime);
+extern void tq_freeze(struct thread_q *tq);
+extern void tq_thaw(struct thread_q *tq);
+
#endif /* __MINER_H__ */
140 util.c
View
@@ -4,7 +4,7 @@
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2 of the License, or (at your option)
+ * Software Foundation; either version 2 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
@@ -14,9 +14,11 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <pthread.h>
#include <jansson.h>
#include <curl/curl.h>
#include "miner.h"
+#include "elist.h"
struct data_buffer {
void *buf;
@@ -28,11 +30,25 @@ struct upload_buffer {
size_t len;
};
+struct tq_ent {
+ void *data;
+ struct list_head q_node;
+};
+
+struct thread_q {
+ struct list_head q;
+
+ bool frozen;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+};
+
static void databuf_free(struct data_buffer *db)
{
if (!db)
return;
-
+
free(db->buf);
memset(db, 0, sizeof(*db));
@@ -163,7 +179,7 @@ json_t *json_rpc_call(CURL *curl, const char *url,
fprintf(stderr, "JSON-RPC call failed: %s\n", s);
free(s);
-
+
goto err_out;
}
@@ -179,13 +195,13 @@ json_t *json_rpc_call(CURL *curl, const char *url,
return NULL;
}
-char *bin2hex(unsigned char *p, size_t len)
+char *bin2hex(const unsigned char *p, size_t len)
{
int i;
char *s = malloc((len * 2) + 1);
if (!s)
return NULL;
-
+
for (i = 0; i < len; i++)
sprintf(s + (i * 2), "%02x", (unsigned int) p[i]);
@@ -296,3 +312,117 @@ bool fulltest(const unsigned char *hash, const unsigned char *target)
return true; /* FIXME: return rc; */
}
+
+struct thread_q *tq_new(void)
+{
+ struct thread_q *tq;
+
+ tq = calloc(1, sizeof(*tq));
+ if (!tq)
+ return NULL;
+
+ INIT_LIST_HEAD(&tq->q);
+ pthread_mutex_init(&tq->mutex, NULL);
+ pthread_cond_init(&tq->cond, NULL);
+
+ return tq;
+}
+
+void tq_free(struct thread_q *tq)
+{
+ struct tq_ent *ent, *iter;
+
+ if (!tq)
+ return;
+
+ list_for_each_entry_safe(ent, iter, &tq->q, q_node) {
+ list_del(&ent->q_node);
+ free(ent);
+ }
+
+ pthread_cond_destroy(&tq->cond);
+ pthread_mutex_destroy(&tq->mutex);
+
+ memset(tq, 0, sizeof(*tq)); /* poison */
+ free(tq);
+}
+
+static void tq_freezethaw(struct thread_q *tq, bool frozen)
+{
+ pthread_mutex_lock(&tq->mutex);
+
+ tq->frozen = frozen;
+
+ pthread_cond_signal(&tq->cond);
+ pthread_mutex_unlock(&tq->mutex);
+}
+
+void tq_freeze(struct thread_q *tq)
+{
+ tq_freezethaw(tq, true);
+}
+
+void tq_thaw(struct thread_q *tq)
+{
+ tq_freezethaw(tq, false);
+}
+
+bool tq_push(struct thread_q *tq, void *data)
+{
+ struct tq_ent *ent;
+ bool rc = true;
+
+ ent = calloc(1, sizeof(*ent));
+ if (!ent)
+ return false;
+
+ ent->data = data;
+ INIT_LIST_HEAD(&ent->q_node);
+
+ pthread_mutex_lock(&tq->mutex);
+
+ if (!tq->frozen) {
+ list_add_tail(&ent->q_node, &tq->q);
+ } else {
+ free(ent);
+ rc = false;
+ }
+
+ pthread_cond_signal(&tq->cond);
+ pthread_mutex_unlock(&tq->mutex);
+
+ return rc;
+}
+
+void *tq_pop(struct thread_q *tq, const struct timespec *abstime)
+{
+ struct tq_ent *ent;
+ void *rval = NULL;
+ int rc;
+
+ pthread_mutex_lock(&tq->mutex);
+
+ if (!list_empty(&tq->q))
+ goto pop;
+
+ if (abstime)
+ rc = pthread_cond_timedwait(&tq->cond, &tq->mutex, abstime);
+ else
+ rc = pthread_cond_wait(&tq->cond, &tq->mutex);
+ if (rc)
+ goto out;
+ if (list_empty(&tq->q))
+ goto out;
+
+pop:
+ ent = list_entry(tq->q.next, struct tq_ent, q_node);
+ rval = ent->data;
+
+ list_del(&ent->q_node);
+ free(ent);
+
+out:
+ pthread_mutex_unlock(&tq->mutex);
+ return rval;
+}
+
Please sign in to comment.
Something went wrong with that request. Please try again.