Permalink
Browse files

MB-7453: use time() for long running mapreduce task termination

I.e. because previous implementation used clock which is defined to
reflect CPU time of all process threads.

It should give us more precise long running task termination.

Change-Id: I8ab51cd94a769329be212d24dda6f275287751fe
Reviewed-on: http://review.couchbase.org/23582
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
Reviewed-on: http://review.couchbase.org/24952
  • Loading branch information...
1 parent 88cc07d commit 8a3a3591208c3b02b91aa991ede989a13e0223d7 Aliaksey Kandratsenka committed with fdmanana Dec 27, 2012
Showing with 11 additions and 9 deletions.
  1. +1 −1 src/mapreduce/mapreduce.cc
  2. +1 −1 src/mapreduce/mapreduce.h
  3. +9 −7 src/mapreduce/mapreduce_nif.cc
@@ -549,7 +549,7 @@ isolate_data_t *getIsolateData()
void taskStarted(map_reduce_ctx_t *ctx)
{
- ctx->taskStartTime = static_cast<long>((clock() / CLOCKS_PER_SEC) * 1000);
+ ctx->taskStartTime = time(NULL);
ctx->kvs = NULL;
}
@@ -69,7 +69,7 @@ typedef struct {
kv_pair_list_t *kvs;
unsigned int key;
ErlNifEnv *env;
- volatile long taskStartTime;
+ volatile time_t taskStartTime;
} map_reduce_ctx_t;
@@ -43,7 +43,8 @@
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_ERROR;
-static volatile int maxTaskDuration = 5000;
+// maxTaskDuration is in seconds
+static volatile int maxTaskDuration = 5;
static ErlNifResourceType *MAP_REDUCE_CTX_RES;
static ErlNifTid terminatorThreadId;
static ErlNifMutex *terminatorMutex;
@@ -335,7 +336,7 @@ ERL_NIF_TERM setTimeout(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return enif_make_badarg(env);
}
- maxTaskDuration = timeout;
+ maxTaskDuration = (timeout + 999) / 1000;
return ATOM_OK;
}
@@ -440,24 +441,25 @@ void free_map_reduce_context(ErlNifEnv *env, void *res) {
void *terminatorLoop(void *args)
{
std::map< unsigned int, map_reduce_ctx_t* >::iterator it;
- long now;
+ time_t now;
while (!shutdownTerminator) {
- now = static_cast<long>((clock() / CLOCKS_PER_SEC) * 1000);
enif_mutex_lock(terminatorMutex);
+ // due to truncation of second's fraction lets pretend we're one second before
+ now = time(NULL) - 1;
for (it = contexts.begin(); it != contexts.end(); ++it) {
map_reduce_ctx_t *ctx = (*it).second;
- if (ctx->taskStartTime > 0) {
- if ((now - ctx->taskStartTime) >= maxTaskDuration) {
+ if (ctx->taskStartTime >= 0) {
+ if (ctx->taskStartTime + maxTaskDuration < now) {
terminateTask(ctx);
}
}
}
enif_mutex_unlock(terminatorMutex);
- doSleep(maxTaskDuration);
+ doSleep(maxTaskDuration * 1000);
}
return NULL;

0 comments on commit 8a3a359

Please sign in to comment.