Skip to content

Commit

Permalink
Add long polling support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Garzik authored and jgarzik committed Mar 18, 2011
1 parent 6818c69 commit 7a87bee
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 21 deletions.
122 changes: 104 additions & 18 deletions cpu-miner.c
Expand Up @@ -31,12 +31,6 @@
#define DEF_RPC_URL "http://127.0.0.1:8332/"
#define DEF_RPC_USERPASS "rpcuser:rpcpass"

struct thr_info {
int id;
pthread_t pth;
struct thread_q *q;
};

enum workio_commands {
WC_GET_WORK,
WC_SUBMIT_WORK,
Expand Down Expand Up @@ -78,18 +72,21 @@ static const char *algo_names[] = {

bool opt_debug = false;
bool opt_protocol = false;
bool want_longpoll = true;
bool have_longpoll = false;
static bool opt_quiet = false;
static int opt_retries = 10;
static int opt_fail_pause = 30;
static int opt_scantime = 5;
int opt_scantime = 5;
static json_t *opt_config;
static const bool opt_time = true;
static enum sha256_algos opt_algo = ALGO_C;
static int opt_n_threads = 1;
static char *rpc_url;
static char *userpass;
static struct thr_info *thr_info;
struct thr_info *thr_info;
static int work_thr_id;
int longpoll_thr_id;
struct work_restart *work_restart = NULL;


Expand Down Expand Up @@ -130,6 +127,9 @@ static struct option_help options_help[] = {
{ "debug",
"(-D) Enable debug output (default: off)" },

{ "no-longpoll",
"Disable X-Long-Polling support (default: enabled)" },

{ "protocol-dump",
"(-P) Verbose dump of protocol-level activities (default: off)" },

Expand Down Expand Up @@ -170,6 +170,7 @@ static struct option options[] = {
{ "scantime", 1, NULL, 's' },
{ "url", 1, NULL, 1001 },
{ "userpass", 1, NULL, 1002 },
{ "no-longpoll", 0, NULL, 1003 },
{ }
};

Expand Down Expand Up @@ -262,7 +263,7 @@ static bool submit_upstream_work(CURL *curl, const struct work *work)
fprintf(stderr, "DBG: sending RPC call:\n%s", s);

/* issue JSON-RPC request */
val = json_rpc_call(curl, rpc_url, userpass, s);
val = json_rpc_call(curl, rpc_url, userpass, s, false, false);
if (!val) {
fprintf(stderr, "submit_upstream_work json_rpc_call failed\n");
goto out;
Expand All @@ -285,14 +286,16 @@ static bool submit_upstream_work(CURL *curl, const struct work *work)
return rc;
}

static const char *rpc_req =
"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";

static bool get_upstream_work(CURL *curl, struct work *work)
{
static const char *rpc_req =
"{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n";
json_t *val;
bool rc;

val = json_rpc_call(curl, rpc_url, userpass, rpc_req);
val = json_rpc_call(curl, rpc_url, userpass, rpc_req,
want_longpoll, false);
if (!val)
return false;

Expand Down Expand Up @@ -593,14 +596,76 @@ static void *miner_thread(void *userdata)
return NULL;
}

void restart_threads(void)
static void restart_threads(void)
{
int i;

for (i = 0; i < opt_n_threads; i++)
work_restart[i].restart = 1;
}

static void *longpoll_thread(void *userdata)
{
struct thr_info *mythr = userdata;
CURL *curl = NULL;
char *copy_start, *hdr_path, *lp_url = NULL;
bool need_slash = false;
int failures = 0;

hdr_path = tq_pop(mythr->q, NULL);
if (!hdr_path)
goto out;
copy_start = (*hdr_path == '/') ? (hdr_path + 1) : hdr_path;
if (rpc_url[strlen(rpc_url) - 1] != '/')
need_slash = true;

lp_url = malloc(strlen(rpc_url) + strlen(copy_start) + 2);
if (!lp_url)
goto out;

sprintf(lp_url, "%s%s%s", rpc_url, need_slash ? "/" : "", copy_start);

fprintf(stderr, "Long-polling activated for %s\n", lp_url);

curl = curl_easy_init();
if (!curl) {
fprintf(stderr, "CURL initialization failed\n");
goto out;
}

while (1) {
json_t *val;

val = json_rpc_call(curl, lp_url, userpass, rpc_req,
false, true);
if (val) {
failures = 0;
json_decref(val);
fprintf(stderr, "LONGPOLL detected new block\n");
restart_threads();
} else {
if (failures++ < 10) {
sleep(30);
fprintf(stderr,
"longpoll failed, sleeping for 30s\n");
} else {
fprintf(stderr,
"longpoll failed, ending thread\n");
goto out;
}
}
}

out:
free(hdr_path);
free(lp_url);
tq_freeze(mythr->q);
if (curl)
curl_easy_cleanup(curl);

return NULL;
}

static void show_usage(void)
{
int i;
Expand Down Expand Up @@ -696,6 +761,9 @@ static void parse_arg (int key, char *arg)
free(userpass);
userpass = strdup(arg);
break;
case 1003:
want_longpoll = false;
break;
default:
show_usage();
}
Expand Down Expand Up @@ -763,17 +831,18 @@ int main (int argc, char *argv[])
if (setpriority(PRIO_PROCESS, 0, 19))
perror("setpriority");

thr_info = calloc(opt_n_threads + 1, sizeof(*thr));
if (!thr_info)
return 1;

work_restart = calloc(opt_n_threads, sizeof(*work_restart));
if (!work_restart)
return 1;

thr_info = calloc(opt_n_threads + 2, sizeof(*thr));
if (!thr_info)
return 1;

/* init workio thread info */
work_thr_id = opt_n_threads;
thr = &thr_info[work_thr_id];
thr->id = opt_n_threads;
thr->id = work_thr_id;
thr->q = tq_new();
if (!thr->q)
return 1;
Expand All @@ -784,6 +853,23 @@ int main (int argc, char *argv[])
return 1;
}

/* init longpoll thread info */
if (want_longpoll) {
longpoll_thr_id = opt_n_threads + 1;
thr = &thr_info[longpoll_thr_id];
thr->id = longpoll_thr_id;
thr->q = tq_new();
if (!thr->q)
return 1;

/* start longpoll thread */
if (pthread_create(&thr->pth, NULL, longpoll_thread, thr)) {
fprintf(stderr, "longpoll thread create failed\n");
return 1;
}
} else
longpoll_thr_id = -1;

/* start mining threads */
for (i = 0; i < opt_n_threads; i++) {
thr = &thr_info[i];
Expand Down
14 changes: 12 additions & 2 deletions miner.h
Expand Up @@ -42,6 +42,12 @@
#define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
#endif

struct thr_info {
int id;
pthread_t pth;
struct thread_q *q;
};

static inline uint32_t swab32(uint32_t v)
{
#ifdef WANT_BUILTIN_BSWAP
Expand Down Expand Up @@ -70,7 +76,7 @@ extern bool opt_debug;
extern bool opt_protocol;
extern const uint32_t sha256_init_state[];
extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass,
const char *rpc_req);
const char *rpc_req, bool, bool);
extern char *bin2hex(const unsigned char *p, size_t len);
extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len);

Expand Down Expand Up @@ -110,15 +116,19 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);

extern bool fulltest(const unsigned char *hash, const unsigned char *target);

extern int opt_scantime;
extern bool want_longpoll;
extern bool have_longpoll;
struct thread_q;

struct work_restart {
volatile unsigned long restart;
char padding[128 - sizeof(unsigned long)];
};

extern struct thr_info *thr_info;
extern int longpoll_thr_id;
extern struct work_restart *work_restart;
extern void restart_threads(void);

extern struct thread_q *tq_new(void);
extern void tq_free(struct thread_q *tq);
Expand Down
81 changes: 80 additions & 1 deletion util.c
Expand Up @@ -13,6 +13,7 @@

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <jansson.h>
#include <curl/curl.h>
Expand All @@ -29,6 +30,10 @@ struct upload_buffer {
size_t len;
};

struct header_info {
char *lp_path;
};

struct tq_ent {
void *data;
struct list_head q_node;
Expand Down Expand Up @@ -95,8 +100,62 @@ static size_t upload_data_cb(void *ptr, size_t size, size_t nmemb,
return len;
}

static size_t resp_hdr_cb(void *ptr, size_t size, size_t nmemb, void *user_data)
{
struct header_info *hi = user_data;
size_t remlen, slen, ptrlen = size * nmemb;
char *rem, *val = NULL, *key = NULL;
void *tmp;

if (opt_protocol)
printf("In resp_hdr_cb\n");

val = calloc(1, ptrlen);
key = calloc(1, ptrlen);
if (!key || !val)
goto out;

tmp = memchr(ptr, ':', ptrlen);
if (!tmp || (tmp == ptr)) /* skip empty keys / blanks */
goto out;
slen = tmp - ptr;
if ((slen + 1) == ptrlen) /* skip key w/ no value */
goto out;
memcpy(key, ptr, slen); /* store & nul term key */
key[slen] = 0;

rem = ptr + slen + 1; /* trim value's leading whitespace */
remlen = ptrlen - slen - 1;
while ((remlen > 0) && (isspace(*rem))) {
remlen--;
rem++;
}

memcpy(val, rem, remlen); /* store value, trim trailing ws */
val[remlen] = 0;
while ((*val) && (isspace(val[strlen(val) - 1]))) {
val[strlen(val) - 1] = 0;
}
if (!*val) /* skip blank value */
goto out;

if (opt_protocol)
printf("HTTP hdr(%s): %s\n", key, val);

if (!strcasecmp("X-Long-Polling", key)) {
hi->lp_path = val; /* steal memory reference */
val = NULL;
}

out:
free(key);
free(val);
return ptrlen;
}

json_t *json_rpc_call(CURL *curl, const char *url,
const char *userpass, const char *rpc_req)
const char *userpass, const char *rpc_req,
bool longpoll_scan, bool longpoll)
{
json_t *val, *err_val, *res_val;
int rc;
Expand All @@ -106,9 +165,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
struct curl_slist *headers = NULL;
char len_hdr[64];
char curl_err_str[CURL_ERROR_SIZE];
long timeout = longpoll ? (60 * 60) : (60 * 10);
struct header_info hi = { };
bool lp_scanning = false;

/* it is assumed that 'curl' is freshly [re]initialized at this pt */

if (longpoll_scan)
lp_scanning = want_longpoll && !have_longpoll;

if (opt_protocol)
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1);
curl_easy_setopt(curl, CURLOPT_URL, url);
Expand All @@ -121,6 +186,11 @@ json_t *json_rpc_call(CURL *curl, const char *url,
curl_easy_setopt(curl, CURLOPT_READDATA, &upload_data);
curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_err_str);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
if (lp_scanning) {
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, resp_hdr_cb);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, &hi);
}
if (userpass) {
curl_easy_setopt(curl, CURLOPT_USERPWD, userpass);
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
Expand Down Expand Up @@ -148,6 +218,15 @@ json_t *json_rpc_call(CURL *curl, const char *url,
goto err_out;
}

/* If X-Long-Polling was found, activate long polling */
if (hi.lp_path) {
have_longpoll = true;
opt_scantime = 60;
tq_push(thr_info[longpoll_thr_id].q, hi.lp_path);
} else
free(hi.lp_path);
hi.lp_path = NULL;

val = json_loads(all_data.buf, &err);
if (!val) {
fprintf(stderr, "JSON decode failed(%d): %s\n", err.line, err.text);
Expand Down

0 comments on commit 7a87bee

Please sign in to comment.