Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

MB-7629, MB-7656 Add mapreduce library

To be used by the view compactor, view builder and view updater.
This is a C++ library, that links against the V8 JavaScript engine,
but it exposes a pure C API to the outside.

Change-Id: I4f80f7d490e31dd3b43e50007a6c23133baaa7b5
Reviewed-on: http://review.couchbase.org/25257
Tested-by: Fulu Li <fulu@couchbase.com>
Reviewed-by: Fulu Li <fulu@couchbase.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
commit e534452a12b3ddf72c53ce8b81fc8b9e541eff76 1 parent d909183
@fdmanana fdmanana authored fdmanana committed
View
23 Makefile.am
@@ -33,7 +33,7 @@ lib_LTLIBRARIES = libcouchstore.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libcouchstore.pc
-noinst_LTLIBRARIES = librfc1321.la libbyteswap.la
+noinst_LTLIBRARIES = librfc1321.la libbyteswap.la libmapreduce.la
libbyteswap_la_SOURCES = src/byteswap.c
@@ -43,6 +43,16 @@ librfc1321_la_SOURCES = \
src/rfc1321/global.h
librfc1321_la_CFLAGS = $(AM_CFLAGS) ${NO_WERROR}
+libmapreduce_la_CXXFLAGS = $(AM_CXXFLAGS) -DLIBMAPREDUCE_INTERNAL=1
+libmapreduce_la_SOURCES = \
+ src/views/mapreduce/mapreduce.h \
+ src/views/mapreduce/mapreduce_internal.h \
+ src/views/mapreduce/visibility.h \
+ src/views/mapreduce/mapreduce_c.cc \
+ src/views/mapreduce/mapreduce.cc
+libmapreduce_la_LDFLAGS = $(AM_LDFLAGS) $(AM_CXX_LDFLAGS) $(V8_LIB_FLAGS) -pthread
+libmapreduce_la_LIBADD = $(V8_FLAGS)
+
libcouchstore_la_SOURCES = \
config_static.h \
src/arena.c \
@@ -87,7 +97,7 @@ libcouchstore_la_SOURCES = \
src/views/sorted_list.c
-libcouchstore_la_LDFLAGS = $(AM_LDFLAGS) $(ICU_LOCAL_LDFLAGS) -version-info $(LIBCOUCHSTORE_API_CURRENT):$(LIBCOUCHSTORE_API_REVISION):$(LIBCOUCHSTORE_API_AGE) -no-undefined -lsnappy -lpthread
+libcouchstore_la_LDFLAGS = $(AM_LDFLAGS) $(ICU_LOCAL_LDFLAGS) -version-info $(LIBCOUCHSTORE_API_CURRENT):$(LIBCOUCHSTORE_API_REVISION):$(LIBCOUCHSTORE_API_AGE) -no-undefined -lstdc++ -lsnappy -lpthread
if WINDOWS
libcouchstore_la_SOURCES += src/os_win.c
@@ -97,7 +107,7 @@ libcouchstore_la_SOURCES += src/os.c
endif
libcouchstore_la_CFLAGS = $(AM_CFLAGS) $(ICU_LOCAL_CFLAGS) -DLIBCOUCHSTORE_INTERNAL=1 -Wstrict-aliasing=2 -pedantic
-libcouchstore_la_LIBADD = librfc1321.la libbyteswap.la $(ICU_LOCAL_LIBS)
+libcouchstore_la_LIBADD = librfc1321.la libbyteswap.la libmapreduce.la $(ICU_LOCAL_LIBS)
couch_dbdump_SOURCES = src/dbdump.c
couch_dbdump_DEPENDENCIES = libcouchstore.la
@@ -170,6 +180,11 @@ TESTS = ${check_PROGRAMS}
testapp_SOURCES = tests/testapp.c src/util.c tests/macros.h tests/indexer_test.c
testapp_SOURCES += \
+ tests/mapreduce/mapreduce_tests.h \
+ tests/mapreduce/tests.c \
+ tests/mapreduce/map.c \
+ tests/mapreduce/reduce.c \
+ tests/mapreduce/builtin.c \
tests/views/view_tests.h \
tests/views/tests.c \
tests/views/collate_json_test.c \
@@ -178,7 +193,7 @@ testapp_SOURCES += \
tests/views/sorted_lists.c
testapp_CFLAGS = $(AM_CFLAGS)
testapp_DEPENDENCIES = libcouchstore.la libbyteswap.la
-testapp_LDADD = libcouchstore.la libbyteswap.la
+testapp_LDADD = libcouchstore.la libbyteswap.la libmapreduce.la
test: check-TESTS $(extra_tests)
View
32 configure.ac
@@ -130,6 +130,38 @@ AC_SUBST(ICU_LOCAL_CFLAGS)
AC_SUBST(ICU_LOCAL_LDFLAGS)
AC_SUBST(ICU_LOCAL_BIN)
+AC_LANG_PUSH([C++])
+
+AC_ARG_WITH([v8-lib], [AC_HELP_STRING([--with-v8-lib=PATH],
+ [set PATH to the V8 library directory])],
+ [
+ V8_LIB_FLAGS="-L$withval -lv8"
+ ], [
+ V8_LIB_FLAGS="-lv8"
+ ])
+AC_SUBST(V8_LIB_FLAGS)
+
+AC_ARG_WITH([v8-include], [AC_HELP_STRING([--with-v8-include=PATH],
+ [set PATH to the V8 include directory])], [
+ V8_FLAGS="-I$withval"
+ ], [
+ V8_FLAGS="-I/usr/include/v8"
+ V8_FLAGS="$V8_FLAGS -I/opt/local/include"
+ V8_FLAGS="$V8_FLAGS -I/usr/local/include/v8"
+ V8_FLAGS="$V8_FLAGS -I/opt/local/include/v8"
+])
+AC_SUBST(V8_FLAGS)
+
+AC_CHECK_HEADER([v8.h], [], [AC_MSG_ERROR([Could not find the V8 JavaScript engine library.])])
+
+AC_LANG_POP([C++])
+
+dnl Check whether the user's system supports pthread
+AC_SEARCH_LIBS(pthread_create, pthread)
+if test "x$ac_cv_search_pthread_create" = "xno"; then
+ AC_MSG_ERROR([Can't enable threads without the POSIX thread library.])
+fi
+
AH_TOP([
#ifndef CONFIG_H
#define CONFIG_H
View
594 src/views/mapreduce/mapreduce.cc
@@ -0,0 +1,594 @@
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#include "mapreduce.h"
+#include "mapreduce_internal.h"
+#include <iostream>
+#include <cstring>
+#include <stdlib.h>
+#include <v8.h>
+
+
+using namespace v8;
+
+typedef struct {
+ Persistent<Object> jsonObject;
+ Persistent<Function> jsonParseFun;
+ Persistent<Function> stringifyFun;
+ mapreduce_ctx_t *ctx;
+} isolate_data_t;
+
+
+static const char *SUM_FUNCTION_STRING =
+ "(function(values) {"
+ " var sum = 0;"
+ " for (var i = 0; i < values.length; ++i) {"
+ " sum += values[i];"
+ " }"
+ " return sum;"
+ "})";
+
+static const char *DATE_FUNCTION_STRING =
+ // I wish it was on the prototype, but that will require bigger
+ // C changes as adding to the date prototype should be done on
+ // process launch. The code you see here may be faster, but it
+ // is less JavaScripty.
+ // "Date.prototype.toArray = (function() {"
+ "(function(date) {"
+ " date = date.getUTCDate ? date : new Date(date);"
+ " return isFinite(date.valueOf()) ?"
+ " [date.getUTCFullYear(),"
+ " (date.getUTCMonth() + 1),"
+ " date.getUTCDate(),"
+ " date.getUTCHours(),"
+ " date.getUTCMinutes(),"
+ " date.getUTCSeconds()] : null;"
+ "})";
+
+static const char *BASE64_FUNCTION_STRING =
+ "(function(b64) {"
+ " var i, j, l, tmp, scratch, arr = [];"
+ " var lookup = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';"
+ " if (typeof b64 !== 'string') {"
+ " throw 'Input is not a string';"
+ " }"
+ " if (b64.length % 4 > 0) {"
+ " throw 'Invalid base64 source.';"
+ " }"
+ " scratch = b64.indexOf('=');"
+ " scratch = scratch > 0 ? b64.length - scratch : 0;"
+ " l = scratch > 0 ? b64.length - 4 : b64.length;"
+ " for (i = 0, j = 0; i < l; i += 4, j += 3) {"
+ " tmp = (lookup.indexOf(b64[i]) << 18) | (lookup.indexOf(b64[i + 1]) << 12);"
+ " tmp |= (lookup.indexOf(b64[i + 2]) << 6) | lookup.indexOf(b64[i + 3]);"
+ " arr.push((tmp & 0xFF0000) >> 16);"
+ " arr.push((tmp & 0xFF00) >> 8);"
+ " arr.push(tmp & 0xFF);"
+ " }"
+ " if (scratch === 2) {"
+ " tmp = (lookup.indexOf(b64[i]) << 2) | (lookup.indexOf(b64[i + 1]) >> 4);"
+ " arr.push(tmp & 0xFF);"
+ " } else if (scratch === 1) {"
+ " tmp = (lookup.indexOf(b64[i]) << 10) | (lookup.indexOf(b64[i + 1]) << 4);"
+ " tmp |= (lookup.indexOf(b64[i + 2]) >> 2);"
+ " arr.push((tmp >> 8) & 0xFF);"
+ " arr.push(tmp & 0xFF);"
+ " }"
+ " return arr;"
+ "})";
+
+
+static void doInitContext(mapreduce_ctx_t *ctx);
+static Persistent<Context> createJsContext();
+static Handle<Function> compileFunction(const std::string &function);
+static std::string exceptionString(const TryCatch &tryCatch);
+static void loadFunctions(mapreduce_ctx_t *ctx,
+ const std::list<std::string> &function_sources);
+static Handle<Value> emit(const Arguments &args);
+static inline isolate_data_t *getIsolateData();
+static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj);
+static inline Handle<Value> jsonParse(const mapreduce_json_t &thing);
+static inline void taskStarted(mapreduce_ctx_t *ctx);
+static inline void taskFinished(mapreduce_ctx_t *ctx);
+static void freeKvListEntries(kv_list_int_t &kvs);
+static void freeJsonListEntries(json_results_list_t &list);
+static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list);
+
+
+void initContext(mapreduce_ctx_t *ctx,
+ const std::list<std::string> &function_sources)
+{
+ doInitContext(ctx);
+
+ try {
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+
+ loadFunctions(ctx, function_sources);
+ } catch (...) {
+ destroyContext(ctx);
+ throw;
+ }
+}
+
+
+void destroyContext(mapreduce_ctx_t *ctx)
+{
+ {
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+
+ for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
+ (*ctx->functions)[i].Dispose();
+ }
+ delete ctx->functions;
+
+ isolate_data_t *isoData = getIsolateData();
+ isoData->jsonObject.Dispose();
+ isoData->jsonObject.Clear();
+ isoData->jsonParseFun.Dispose();
+ isoData->jsonParseFun.Clear();
+ isoData->stringifyFun.Dispose();
+ isoData->stringifyFun.Clear();
+ delete isoData;
+
+ ctx->jsContext.Dispose();
+ ctx->jsContext.Clear();
+ }
+
+ ctx->isolate->Dispose();
+}
+
+
+static void doInitContext(mapreduce_ctx_t *ctx)
+{
+ ctx->isolate = Isolate::New();
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+
+ ctx->jsContext = createJsContext();
+ Context::Scope contextScope(ctx->jsContext);
+
+ Handle<Object> jsonObject = Local<Object>::Cast(ctx->jsContext->Global()->Get(String::New("JSON")));
+ Handle<Function> parseFun = Local<Function>::Cast(jsonObject->Get(String::New("parse")));
+ Handle<Function> stringifyFun = Local<Function>::Cast(jsonObject->Get(String::New("stringify")));
+
+ isolate_data_t *isoData = new isolate_data_t();
+ isoData->jsonObject = Persistent<Object>::New(jsonObject);
+ isoData->jsonParseFun = Persistent<Function>::New(parseFun);
+ isoData->stringifyFun = Persistent<Function>::New(stringifyFun);
+ isoData->ctx = ctx;
+
+ ctx->isolate->SetData(isoData);
+ ctx->taskStartTime = -1;
+}
+
+
+static Persistent<Context> createJsContext()
+{
+ HandleScope handleScope;
+ Handle<ObjectTemplate> global = ObjectTemplate::New();
+
+ global->Set(String::New("emit"), FunctionTemplate::New(emit));
+
+ Persistent<Context> context = Context::New(NULL, global);
+ Context::Scope contextScope(context);
+
+ Handle<Function> sumFun = compileFunction(SUM_FUNCTION_STRING);
+ context->Global()->Set(String::New("sum"), sumFun);
+
+ Handle<Function> decodeBase64Fun = compileFunction(BASE64_FUNCTION_STRING);
+ context->Global()->Set(String::New("decodeBase64"), decodeBase64Fun);
+
+ Handle<Function> dateToArrayFun = compileFunction(DATE_FUNCTION_STRING);
+ context->Global()->Set(String::New("dateToArray"), dateToArrayFun);
+
+ return context;
+}
+
+
+void mapDoc(mapreduce_ctx_t *ctx,
+ const mapreduce_json_t &doc,
+ const mapreduce_json_t &meta,
+ mapreduce_map_result_list_t *results)
+{
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+ Handle<Value> docObject = jsonParse(doc);
+ Handle<Value> metaObject = jsonParse(meta);
+
+ if (!metaObject->IsObject()) {
+ throw MapReduceError(MAPREDUCE_INVALID_ARG, "metadata is not a JSON object");
+ }
+
+ Handle<Value> funArgs[] = { docObject, metaObject };
+
+ taskStarted(ctx);
+ kv_list_int_t kvs;
+ ctx->kvs = &kvs;
+
+ for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
+ mapreduce_map_result_t mapResult;
+ Handle<Function> fun = (*ctx->functions)[i];
+ TryCatch trycatch;
+ Handle<Value> result = fun->Call(fun, 2, funArgs);
+
+ if (!result.IsEmpty()) {
+ mapResult.error = MAPREDUCE_SUCCESS;
+ mapResult.result.kvs.length = kvs.size();
+ size_t sz = sizeof(mapreduce_kv_t) * mapResult.result.kvs.length;
+ mapResult.result.kvs.kvs = (mapreduce_kv_t *) malloc(sz);
+ if (mapResult.result.kvs.kvs == NULL) {
+ freeKvListEntries(kvs);
+ throw std::bad_alloc();
+ }
+ kv_list_int_t::iterator it = kvs.begin();
+ for (int j = 0; it != kvs.end(); ++it, ++j) {
+ mapResult.result.kvs.kvs[j] = *it;
+ }
+ } else {
+ freeKvListEntries(kvs);
+
+ if (!trycatch.CanContinue()) {
+ throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
+ }
+
+ mapResult.error = MAPREDUCE_RUNTIME_ERROR;
+ std::string exceptString = exceptionString(trycatch);
+ size_t len = exceptString.length();
+
+ mapResult.result.error_msg = (char *) malloc(len + 1);
+ if (mapResult.result.error_msg == NULL) {
+ throw std::bad_alloc();
+ }
+ memcpy(mapResult.result.error_msg, exceptString.data(), len);
+ mapResult.result.error_msg[len] = '\0';
+ }
+
+ results->list[i] = mapResult;
+ results->length += 1;
+ kvs.clear();
+ }
+
+ taskFinished(ctx);
+}
+
+
+json_results_list_t runReduce(mapreduce_ctx_t *ctx,
+ const mapreduce_json_list_t &keys,
+ const mapreduce_json_list_t &values)
+{
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+ Handle<Array> keysArray = jsonListToJsArray(keys);
+ Handle<Array> valuesArray = jsonListToJsArray(values);
+ json_results_list_t results;
+
+ Handle<Value> args[] = { keysArray, valuesArray, Boolean::New(false) };
+
+ taskStarted(ctx);
+
+ for (unsigned int i = 0; i < ctx->functions->size(); ++i) {
+ Handle<Function> fun = (*ctx->functions)[i];
+ TryCatch trycatch;
+ Handle<Value> result = fun->Call(fun, 3, args);
+
+ if (result.IsEmpty()) {
+ freeJsonListEntries(results);
+
+ if (!trycatch.CanContinue()) {
+ throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
+ }
+
+ throw MapReduceError(MAPREDUCE_RUNTIME_ERROR, exceptionString(trycatch));
+ }
+
+ try {
+ mapreduce_json_t jsonResult = jsonStringify(result);
+ results.push_back(jsonResult);
+ } catch(...) {
+ freeJsonListEntries(results);
+ throw;
+ }
+ }
+
+ taskFinished(ctx);
+
+ return results;
+}
+
+
+mapreduce_json_t runReduce(mapreduce_ctx_t *ctx,
+ int reduceFunNum,
+ const mapreduce_json_list_t &keys,
+ const mapreduce_json_list_t &values)
+{
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+
+ reduceFunNum -= 1;
+ if (reduceFunNum < 0 ||
+ static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
+ throw MapReduceError(MAPREDUCE_INVALID_ARG, "invalid reduce function number");
+ }
+
+ Handle<Function> fun = (*ctx->functions)[reduceFunNum];
+ Handle<Array> keysArray = jsonListToJsArray(keys);
+ Handle<Array> valuesArray = jsonListToJsArray(values);
+ Handle<Value> args[] = { keysArray, valuesArray, Boolean::New(false) };
+
+ taskStarted(ctx);
+
+ TryCatch trycatch;
+ Handle<Value> result = fun->Call(fun, 3, args);
+
+ taskFinished(ctx);
+
+ if (result.IsEmpty()) {
+ if (!trycatch.CanContinue()) {
+ throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
+ }
+
+ throw MapReduceError(MAPREDUCE_RUNTIME_ERROR, exceptionString(trycatch));
+ }
+
+ return jsonStringify(result);
+}
+
+
+mapreduce_json_t runRereduce(mapreduce_ctx_t *ctx,
+ int reduceFunNum,
+ const mapreduce_json_list_t &reductions)
+{
+ Locker locker(ctx->isolate);
+ Isolate::Scope isolateScope(ctx->isolate);
+ HandleScope handleScope;
+ Context::Scope contextScope(ctx->jsContext);
+
+ reduceFunNum -= 1;
+ if (reduceFunNum < 0 ||
+ static_cast<unsigned int>(reduceFunNum) >= ctx->functions->size()) {
+ throw MapReduceError(MAPREDUCE_INVALID_ARG, "invalid reduce function number");
+ }
+
+ Handle<Function> fun = (*ctx->functions)[reduceFunNum];
+ Handle<Array> valuesArray = jsonListToJsArray(reductions);
+ Handle<Value> args[] = { Null(), valuesArray, Boolean::New(true) };
+
+ taskStarted(ctx);
+
+ TryCatch trycatch;
+ Handle<Value> result = fun->Call(fun, 3, args);
+
+ taskFinished(ctx);
+
+ if (result.IsEmpty()) {
+ if (!trycatch.CanContinue()) {
+ throw MapReduceError(MAPREDUCE_TIMEOUT, "timeout");
+ }
+
+ throw MapReduceError(MAPREDUCE_RUNTIME_ERROR, exceptionString(trycatch));
+ }
+
+ return jsonStringify(result);
+}
+
+
+void terminateTask(mapreduce_ctx_t *ctx)
+{
+ V8::TerminateExecution(ctx->isolate);
+ taskFinished(ctx);
+}
+
+
+static void freeKvListEntries(kv_list_int_t &kvs)
+{
+ kv_list_int_t::iterator it = kvs.begin();
+
+ for ( ; it != kvs.end(); ++it) {
+ mapreduce_kv_t kv = *it;
+ free(kv.key.json);
+ free(kv.value.json);
+ }
+ kvs.clear();
+}
+
+
+static void freeJsonListEntries(json_results_list_t &list)
+{
+ json_results_list_t::iterator it = list.begin();
+
+ for ( ; it != list.end(); ++it) {
+ free((*it).json);
+ }
+ list.clear();
+}
+
+
+static Handle<Function> compileFunction(const std::string &funSource)
+{
+ HandleScope handleScope;
+ TryCatch trycatch;
+ Handle<String> source = String::New(funSource.data(), funSource.length());
+ Handle<Script> script = Script::Compile(source);
+
+ if (script.IsEmpty()) {
+ throw MapReduceError(MAPREDUCE_SYNTAX_ERROR, exceptionString(trycatch));
+ }
+
+ Handle<Value> result = script->Run();
+
+ if (result.IsEmpty()) {
+ throw MapReduceError(MAPREDUCE_SYNTAX_ERROR, exceptionString(trycatch));
+ }
+
+ if (!result->IsFunction()) {
+ throw MapReduceError(MAPREDUCE_SYNTAX_ERROR,
+ std::string("Invalid function: ") + funSource.c_str());
+ }
+
+ return handleScope.Close(Handle<Function>::Cast(result));
+}
+
+
+static std::string exceptionString(const TryCatch &tryCatch)
+{
+ HandleScope handleScope;
+ String::Utf8Value exception(tryCatch.Exception());
+ const char *exceptionString = (*exception);
+
+ if (exceptionString) {
+ return std::string(exceptionString);
+ }
+
+ return std::string("runtime error");
+}
+
+
+static void loadFunctions(mapreduce_ctx_t *ctx,
+ const std::list<std::string> &function_sources)
+{
+ HandleScope handleScope;
+
+ ctx->functions = new function_vector_t();
+
+ std::list<std::string>::const_iterator it = function_sources.begin();
+
+ for ( ; it != function_sources.end(); ++it) {
+ Handle<Function> fun = compileFunction(*it);
+
+ ctx->functions->push_back(Persistent<Function>::New(fun));
+ }
+}
+
+
+static Handle<Value> emit(const Arguments &args)
+{
+ isolate_data_t *isoData = getIsolateData();
+
+ if (isoData->ctx->kvs == NULL) {
+ return Undefined();
+ }
+
+ try {
+ mapreduce_kv_t result;
+
+ result.key = jsonStringify(args[0]);
+ result.value = jsonStringify(args[1]);
+ isoData->ctx->kvs->push_back(result);
+
+ return Undefined();
+ } catch(Handle<Value> &ex) {
+ return ThrowException(ex);
+ }
+}
+
+
+static inline isolate_data_t *getIsolateData()
+{
+ Isolate *isolate = Isolate::GetCurrent();
+ return reinterpret_cast<isolate_data_t*>(isolate->GetData());
+}
+
+
+static inline mapreduce_json_t jsonStringify(const Handle<Value> &obj)
+{
+ isolate_data_t *isoData = getIsolateData();
+ Handle<Value> args[] = { obj };
+ TryCatch trycatch;
+ Handle<Value> result = isoData->stringifyFun->Call(isoData->jsonObject, 1, args);
+
+ if (result.IsEmpty()) {
+ throw trycatch.Exception();
+ }
+
+ mapreduce_json_t jsonResult;
+
+ if (!result->IsUndefined()) {
+ Handle<String> str = Handle<String>::Cast(result);
+ jsonResult.length = str->Utf8Length();
+ jsonResult.json = (char *) malloc(jsonResult.length);
+ if (jsonResult.json == NULL) {
+ throw std::bad_alloc();
+ }
+ str->WriteUtf8(jsonResult.json, jsonResult.length,
+ NULL, String::NO_NULL_TERMINATION);
+ } else {
+ jsonResult.length = sizeof("null") - 1;
+ jsonResult.json = (char *) malloc(jsonResult.length);
+ if (jsonResult.json == NULL) {
+ throw std::bad_alloc();
+ }
+ memcpy(jsonResult.json, "null", jsonResult.length);
+ }
+
+ // Caller responsible for freeing jsonResult.json
+ return jsonResult;
+}
+
+
+static inline Handle<Value> jsonParse(const mapreduce_json_t &thing)
+{
+ isolate_data_t *isoData = getIsolateData();
+ Handle<Value> args[] = { String::New(thing.json, thing.length) };
+ TryCatch trycatch;
+ Handle<Value> result = isoData->jsonParseFun->Call(isoData->jsonObject, 1, args);
+
+ if (result.IsEmpty()) {
+ throw MapReduceError(MAPREDUCE_RUNTIME_ERROR, exceptionString(trycatch));
+ }
+
+ return result;
+}
+
+
+static inline void taskStarted(mapreduce_ctx_t *ctx)
+{
+ ctx->taskStartTime = time(NULL);
+ ctx->kvs = NULL;
+}
+
+
+static inline void taskFinished(mapreduce_ctx_t *ctx)
+{
+ ctx->taskStartTime = -1;
+}
+
+
+static inline Handle<Array> jsonListToJsArray(const mapreduce_json_list_t &list)
+{
+ Handle<Array> array = Array::New(list.length);
+
+ for (int i = 0 ; i < list.length; ++i) {
+ Handle<Value> v = jsonParse(list.values[i]);
+ array->Set(Number::New(i), v);
+ }
+
+ return array;
+}
View
185 src/views/mapreduce/mapreduce.h
@@ -0,0 +1,185 @@
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+/**
+ * This is the public, plain C interface to expose to the outside.
+ **/
+
+#ifndef _MAPREDUCE_H
+#define _MAPREDUCE_H
+
+#include "config.h"
+#include "visibility.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ typedef struct {
+ char *json;
+ int length;
+ } mapreduce_json_t;
+
+ typedef struct {
+ mapreduce_json_t *values;
+ int length;
+ } mapreduce_json_list_t;
+
+ typedef struct {
+ mapreduce_json_t key;
+ mapreduce_json_t value;
+ } mapreduce_kv_t;
+
+ typedef enum {
+ MAPREDUCE_SUCCESS,
+ MAPREDUCE_SYNTAX_ERROR,
+ MAPREDUCE_RUNTIME_ERROR,
+ MAPREDUCE_ALLOC_ERROR,
+ MAPREDUCE_INVALID_ARG,
+ MAPREDUCE_TIMEOUT
+ } mapreduce_error_t;
+
+ typedef struct {
+ mapreduce_kv_t *kvs;
+ int length;
+ } mapreduce_kv_list_t;
+
+ typedef struct {
+ mapreduce_error_t error;
+ union {
+ /* valid if error is MAPREDUCE_SUCCESS */
+ mapreduce_kv_list_t kvs;
+ /* valid if error is other than MAPREDUCE_SUCCESS */
+ char *error_msg;
+ } result;
+ } mapreduce_map_result_t;
+
+ typedef struct {
+ mapreduce_map_result_t *list;
+ int length;
+ } mapreduce_map_result_list_t;
+
+
+
+ /**
+ * If return value other than MAPREDUCE_SUCCESS, error_msg might be
+ * assigned an error message, for which the caller is responsible to
+ * deallocate via mapreduce_free_error_msg().
+ **/
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
+ int num_functions,
+ void **context,
+ char **error_msg);
+
+ /**
+ * If return value is MAPREDUCE_SUCCESS, the caller is responsible for
+ * free'ing result output parameter with a call to
+ * mapreduce_free_map_result_list().
+ */
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_map(void *context,
+ const mapreduce_json_t *doc,
+ const mapreduce_json_t *meta,
+ mapreduce_map_result_list_t **result);
+
+ LIBMAPREDUCE_API
+ void mapreduce_free_json_list(mapreduce_json_list_t *list);
+
+ LIBMAPREDUCE_API
+ void mapreduce_free_json(mapreduce_json_t *value);
+
+ LIBMAPREDUCE_API
+ void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list);
+
+ LIBMAPREDUCE_API
+ void mapreduce_free_error_msg(char *error_msg);
+
+ /**
+ * If return value other than MAPREDUCE_SUCCESS, error_msg might be
+ * assigned an error message, for which the caller is responsible to
+ * deallocate via mapreduce_free_error_msg().
+ **/
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
+ int num_functions,
+ void **context,
+ char **error_msg);
+
+ /**
+ * If return value other than MAPREDUCE_SUCCESS, error_msg might be
+ * assigned an error message, for which the caller is responsible to
+ * deallocate via mapreduce_free_error_msg().
+ *
+ * If return value is MAPREDUCE_SUCCESS, the caller is responsible for
+ * free'ing result output parameter with a call to
+ * mapreduce_free_json_list().
+ */
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_reduce_all(void *context,
+ const mapreduce_json_list_t *keys,
+ const mapreduce_json_list_t *values,
+ mapreduce_json_list_t **result,
+ char **error_msg);
+
+ /**
+ * If return value other than MAPREDUCE_SUCCESS, error_msg might be
+ * assigned an error message, for which the caller is responsible to
+ * deallocate via mapreduce_free_error_msg().
+ *
+ * If return value is MAPREDUCE_SUCCESS, the caller is responsible for
+ * free'ing result output parameter with a call to
+ * mapreduce_free_json().
+ **/
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_reduce(void *context,
+ int reduceFunNum,
+ const mapreduce_json_list_t *keys,
+ const mapreduce_json_list_t *values,
+ mapreduce_json_t **result,
+ char **error_msg);
+
+ /**
+ * If return value other than MAPREDUCE_SUCCESS, error_msg might be
+ * assigned an error message, for which the caller is responsible to
+ * deallocate via mapreduce_free_error_msg().
+ *
+ * If return value is MAPREDUCE_SUCCESS, the caller is responsible for
+ * free'ing result output parameter with a call to
+ * mapreduce_free_json().
+ **/
+ LIBMAPREDUCE_API
+ mapreduce_error_t mapreduce_rereduce(void *context,
+ int reduceFunNum,
+ const mapreduce_json_list_t *reductions,
+ mapreduce_json_t **result,
+ char **error_msg);
+
+ LIBMAPREDUCE_API
+ void mapreduce_free_context(void *context);
+
+ LIBMAPREDUCE_API
+ void mapreduce_set_timeout(unsigned int seconds);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
View
440 src/views/mapreduce/mapreduce_c.cc
@@ -0,0 +1,440 @@
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+/**
+ * Implementation of all exported (public) functions, pure C.
+ **/
+
+#include "mapreduce.h"
+#include "mapreduce_internal.h"
+#include <iostream>
+#include <map>
+#include <cstring>
+#include <assert.h>
+#include <pthread.h>
+
+#if defined(WIN32) || defined(_WIN32)
+#include <windows.h>
+#define doSleep(Secs) Sleep(Secs * 1000)
+#else
+#include <unistd.h>
+#define doSleep(Secs) sleep(Secs)
+#endif
+
+static const char *MEM_ALLOC_ERROR_MSG = "memory allocation failure";
+
+static pthread_t terminator_thread;
+static bool terminator_thread_created = false;
+static volatile unsigned int terminator_timeout = 5;
+
+static std::map<uintptr_t, mapreduce_ctx_t *> ctx_registry;
+static pthread_mutex_t ctx_registry_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+static void make_function_list(const char *sources[],
+ int num_sources,
+ std::list<std::string> &list);
+
+static void copy_error_msg(const std::string &msg, char **to);
+
+static void register_ctx(mapreduce_ctx_t *ctx);
+static void unregister_ctx(mapreduce_ctx_t *ctx);
+static void *terminator_loop(void *);
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_start_map_context(const char *map_functions[],
+ int num_functions,
+ void **context,
+ char **error_msg)
+{
+ try {
+ mapreduce_ctx_t *ctx = new mapreduce_ctx_t();
+ std::list<std::string> functions_list;
+
+ make_function_list(map_functions, num_functions, functions_list);
+ initContext(ctx, functions_list);
+ register_ctx(ctx);
+ *context = (void *) ctx;
+ } catch (MapReduceError &e) {
+ copy_error_msg(e.getMsg(), error_msg);
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+ *error_msg = NULL;
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_map(void *context,
+ const mapreduce_json_t *doc,
+ const mapreduce_json_t *meta,
+ mapreduce_map_result_list_t **result)
+{
+ mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
+
+ *result = (mapreduce_map_result_list_t *) malloc(sizeof(**result));
+ if (*result == NULL) {
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ int num_funs = ctx->functions->size();
+ size_t sz = sizeof(mapreduce_map_result_t) * num_funs;
+ (*result)->list = (mapreduce_map_result_t *) malloc(sz);
+
+ if ((*result)->list == NULL) {
+ free(*result);
+ *result = NULL;
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ (*result)->length = 0;
+ try {
+ mapDoc(ctx, *doc, *meta, *result);
+ } catch (MapReduceError &e) {
+ mapreduce_free_map_result_list(*result);
+ *result = NULL;
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ mapreduce_free_map_result_list(*result);
+ *result = NULL;
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ assert((*result)->length == num_funs);
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_start_reduce_context(const char *reduce_functions[],
+ int num_functions,
+ void **context,
+ char **error_msg)
+{
+ try {
+ mapreduce_ctx_t *ctx = new mapreduce_ctx_t();
+ std::list<std::string> functions_list;
+
+ make_function_list(reduce_functions, num_functions, functions_list);
+ initContext(ctx, functions_list);
+ register_ctx(ctx);
+ *context = (void *) ctx;
+ } catch (MapReduceError &e) {
+ copy_error_msg(e.getMsg(), error_msg);
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ *error_msg = NULL;
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_reduce_all(void *context,
+ const mapreduce_json_list_t *keys,
+ const mapreduce_json_list_t *values,
+ mapreduce_json_list_t **result,
+ char **error_msg)
+{
+ mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
+
+ try {
+ json_results_list_t list = runReduce(ctx, *keys, *values);
+ size_t sz = list.size();
+ json_results_list_t::iterator it = list.begin();
+
+ assert(sz == ctx->functions->size());
+
+ *result = (mapreduce_json_list_t *) malloc(sizeof(**result));
+ if (*result == NULL) {
+ for ( ; it != list.end(); ++it) {
+ free((*it).json);
+ }
+ throw std::bad_alloc();
+ }
+
+ (*result)->length = sz;
+ (*result)->values = (mapreduce_json_t *) malloc(sizeof(mapreduce_json_t) * sz);
+ if ((*result)->values == NULL) {
+ free(*result);
+ for ( ; it != list.end(); ++it) {
+ free((*it).json);
+ }
+ throw std::bad_alloc();
+ }
+ for (int i = 0; it != list.end(); ++it, ++i) {
+ (*result)->values[i] = *it;
+ }
+ } catch (MapReduceError &e) {
+ copy_error_msg(e.getMsg(), error_msg);
+ *result = NULL;
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
+ *result = NULL;
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ *error_msg = NULL;
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_reduce(void *context,
+ int reduceFunNum,
+ const mapreduce_json_list_t *keys,
+ const mapreduce_json_list_t *values,
+ mapreduce_json_t **result,
+ char **error_msg)
+{
+ mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
+
+ try {
+ mapreduce_json_t red = runReduce(ctx, reduceFunNum, *keys, *values);
+
+ *result = (mapreduce_json_t *) malloc(sizeof(**result));
+ if (*result == NULL) {
+ free(red.json);
+ throw std::bad_alloc();
+ }
+ **result = red;
+ } catch (MapReduceError &e) {
+ copy_error_msg(e.getMsg(), error_msg);
+ *result = NULL;
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
+ *result = NULL;
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ *error_msg = NULL;
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+mapreduce_error_t mapreduce_rereduce(void *context,
+ int reduceFunNum,
+ const mapreduce_json_list_t *reductions,
+ mapreduce_json_t **result,
+ char **error_msg)
+{
+ mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
+
+ try {
+ mapreduce_json_t red = runRereduce(ctx, reduceFunNum, *reductions);
+
+ *result = (mapreduce_json_t *) malloc(sizeof(**result));
+ if (*result == NULL) {
+ free(red.json);
+ throw std::bad_alloc();
+ }
+ **result = red;
+ } catch (MapReduceError &e) {
+ copy_error_msg(e.getMsg(), error_msg);
+ *result = NULL;
+ return e.getError();
+ } catch (std::bad_alloc &) {
+ copy_error_msg(MEM_ALLOC_ERROR_MSG, error_msg);
+ *result = NULL;
+ return MAPREDUCE_ALLOC_ERROR;
+ }
+
+ *error_msg = NULL;
+ return MAPREDUCE_SUCCESS;
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_free_context(void *context)
+{
+ if (context != NULL) {
+ mapreduce_ctx_t *ctx = (mapreduce_ctx_t *) context;
+
+ unregister_ctx(ctx);
+ destroyContext(ctx);
+ }
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_free_json(mapreduce_json_t *value)
+{
+ if (value != NULL) {
+ free(value->json);
+ free(value);
+ }
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_free_json_list(mapreduce_json_list_t *list)
+{
+ if (list != NULL) {
+ for (int i = 0; i < list->length; ++i) {
+ free(list->values[i].json);
+ }
+ free(list->values);
+ free(list);
+ }
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_free_map_result_list(mapreduce_map_result_list_t *list)
+{
+ if (list == NULL) {
+ return;
+ }
+
+ for (int i = 0; i < list->length; ++i) {
+ mapreduce_map_result_t mr = list->list[i];
+
+ switch (mr.error) {
+ case MAPREDUCE_SUCCESS:
+ {
+ mapreduce_kv_list_t kvs = mr.result.kvs;
+
+ for (int j = 0; j < kvs.length; ++j) {
+ mapreduce_kv_t kv = kvs.kvs[j];
+ free(kv.key.json);
+ free(kv.value.json);
+ }
+ free(kvs.kvs);
+ }
+ break;
+ default:
+ free(mr.result.error_msg);
+ break;
+ }
+ }
+
+ free(list->list);
+ free(list);
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_free_error_msg(char *error_msg)
+{
+ free(error_msg);
+}
+
+
+LIBMAPREDUCE_API
+void mapreduce_set_timeout(unsigned int seconds)
+{
+ terminator_timeout = seconds;
+}
+
+
+static void make_function_list(const char *sources[],
+ int num_sources,
+ std::list<std::string> &list)
+{
+ for (int i = 0; i < num_sources; ++i) {
+ std::string source;
+ size_t len = strlen(sources[i]);
+
+ source.reserve(1 + len + 1);
+ source += '(';
+ source.append(sources[i], len);
+ source += ')';
+
+ list.push_back(source);
+ }
+}
+
+
+static void copy_error_msg(const std::string &msg, char **to)
+{
+ if (to != NULL) {
+ size_t len = msg.length();
+
+ *to = (char *) malloc(len + 1);
+ if (*to != NULL) {
+ msg.copy(*to, len);
+ (*to)[len] = '\0';
+ }
+ }
+}
+
+
+static void register_ctx(mapreduce_ctx_t *ctx)
+{
+ uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
+
+ pthread_mutex_lock(&ctx_registry_mutex);
+
+ if (!terminator_thread_created) {
+ int ret = pthread_create(&terminator_thread, NULL, terminator_loop, NULL);
+ if (ret != 0) {
+ std::cerr << "Error creating terminator thread: " << ret << std::endl;
+ exit(1);
+ }
+ terminator_thread_created = true;
+ }
+
+ ctx_registry[key] = ctx;
+ pthread_mutex_unlock(&ctx_registry_mutex);
+}
+
+
+static void unregister_ctx(mapreduce_ctx_t *ctx)
+{
+ uintptr_t key = reinterpret_cast<uintptr_t>(ctx);
+
+ pthread_mutex_lock(&ctx_registry_mutex);
+ ctx_registry.erase(key);
+ pthread_mutex_unlock(&ctx_registry_mutex);
+}
+
+
+static void *terminator_loop(void *)
+{
+ std::map<uintptr_t, mapreduce_ctx_t *>::iterator it;
+ time_t now;
+
+ while (true) {
+ pthread_mutex_lock(&ctx_registry_mutex);
+ now = time(NULL);
+ for (it = ctx_registry.begin(); it != ctx_registry.end(); ++it) {
+ mapreduce_ctx_t *ctx = (*it).second;
+
+ if (ctx->taskStartTime >= 0) {
+ if (ctx->taskStartTime + terminator_timeout < now) {
+ terminateTask(ctx);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&ctx_registry_mutex);
+ doSleep(terminator_timeout);
+ }
+
+ return NULL;
+}
View
103 src/views/mapreduce/mapreduce_internal.h
@@ -0,0 +1,103 @@
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+/**
+ * This is a private header, do not include it in other applications/lirbaries.
+ **/
+
+#ifndef _MAPREDUCE_INTERNAL_H
+#define _MAPREDUCE_INTERNAL_H
+
+#include "mapreduce.h"
+#include <iostream>
+#include <string>
+#include <list>
+#include <vector>
+#include <stdlib.h>
+#include <stdint.h>
+#include <time.h>
+#include <v8.h>
+
+
+class MapReduceError;
+
+typedef std::list<mapreduce_json_t> json_results_list_t;
+typedef std::list<mapreduce_kv_t> kv_list_int_t;
+typedef std::vector< v8::Persistent<v8::Function> > function_vector_t;
+
+typedef struct {
+ v8::Persistent<v8::Context> jsContext;
+ v8::Isolate *isolate;
+ function_vector_t *functions;
+ kv_list_int_t *kvs;
+ volatile time_t taskStartTime;
+} mapreduce_ctx_t;
+
+
+void initContext(mapreduce_ctx_t *ctx,
+ const std::list<std::string> &function_sources);
+
+void destroyContext(mapreduce_ctx_t *ctx);
+
+void mapDoc(mapreduce_ctx_t *ctx,
+ const mapreduce_json_t &doc,
+ const mapreduce_json_t &meta,
+ mapreduce_map_result_list_t *result);
+
+json_results_list_t runReduce(mapreduce_ctx_t *ctx,
+ const mapreduce_json_list_t &keys,
+ const mapreduce_json_list_t &values);
+
+mapreduce_json_t runReduce(mapreduce_ctx_t *ctx,
+ int reduceFunNum,
+ const mapreduce_json_list_t &keys,
+ const mapreduce_json_list_t &values);
+
+mapreduce_json_t runRereduce(mapreduce_ctx_t *ctx,
+ int reduceFunNum,
+ const mapreduce_json_list_t &reductions);
+
+void terminateTask(mapreduce_ctx_t *ctx);
+
+
+
+class MapReduceError {
+public:
+ MapReduceError(const mapreduce_error_t error, const char *msg)
+ : _error(error), _msg(msg) {
+ }
+
+ MapReduceError(const mapreduce_error_t error, const std::string &msg)
+ : _error(error), _msg(msg) {
+ }
+
+ mapreduce_error_t getError() const {
+ return _error;
+ }
+
+ const std::string& getMsg() const {
+ return _msg;
+ }
+
+private:
+ const mapreduce_error_t _error;
+ const std::string _msg;
+};
+
+
+#endif
View
27 src/views/mapreduce/visibility.h
@@ -0,0 +1,27 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+#ifndef _MAPREDUCE_VISIBILITY_H
+#define _MAPREDUCE_VISIBILITY_H
+
+#if defined(LIBMAPREDUCE_INTERNAL)
+
+#ifdef __SUNPRO_C
+#define LIBMAPREDUCE_API __global
+#elif defined(HAVE_VISIBILITY) && HAVE_VISIBILITY
+#define LIBMAPREDUCE_API __attribute__ ((visibility("default")))
+#elif defined(_MSC_VER)
+#define LIBMAPREDUCE_API extern __declspec(dllexport)
+#else
+#define LIBMAPREDUCE_API
+#endif
+
+#else
+
+#ifdef _MSC_VER
+#define LIBMAPREDUCE_API extern __declspec(dllimport)
+#else
+#define LIBMAPREDUCE_API
+#endif
+
+#endif
+
+#endif
View
167 tests/mapreduce/builtin.c
@@ -0,0 +1,167 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#include "mapreduce_tests.h"
+#include <string.h>
+
+
+#define DOC_BODY "{" \
+ " \"values\": [10, -7, 20, 1]," \
+ " \"bin\": \"aGVsbG8gd29ybGQh\"," \
+ " \"date\":\"+033658-09-27T01:46:40.000Z\"" \
+ "}"
+
+static const mapreduce_json_t doc = {
+ .json = DOC_BODY,
+ .length = sizeof(DOC_BODY) - 1
+};
+static const mapreduce_json_t meta = {
+ .json = "{\"id\":\"doc1\"}",
+ .length = sizeof("{\"id\":\"doc1\"}") - 1
+};
+
+static void test_sum_function();
+static void test_b64decode_function();
+static void test_date_to_array_function();
+
+
+void builtin_tests()
+{
+ TPRINT("Running mapreduce builtin tests\n");
+
+ for (int i = 0; i < 100; ++i) {
+ test_sum_function();
+ test_b64decode_function();
+ test_date_to_array_function();
+ }
+}
+
+
+static void test_sum_function()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { emit(meta.id, sum(doc.values)); }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc, &meta, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("24") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "24",
+ (sizeof("24") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
+
+
+static void test_b64decode_function()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) {"
+ " emit(meta.id, String.fromCharCode.apply(this, decodeBase64(doc.bin)));"
+ "}"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc, &meta, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("\"hello world!\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "\"hello world!\"",
+ (sizeof("\"hello world!\"") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
+
+
+static void test_date_to_array_function()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { emit(meta.id, dateToArray(doc.date)); }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc, &meta, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("[33658,9,27,1,46,40]") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "[33658,9,27,1,46,40]",
+ (sizeof("[33658,9,27,1,46,40]") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
View
511 tests/mapreduce/map.c
@@ -0,0 +1,511 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#include "mapreduce_tests.h"
+#include <string.h>
+
+
+static const mapreduce_json_t doc1 = {
+ .json = "{\"value\": 1}",
+ .length = sizeof("{\"value\": 1}") - 1
+};
+static const mapreduce_json_t meta1 = {
+ .json = "{\"id\":\"doc1\"}",
+ .length = sizeof("{\"id\":\"doc1\"}") - 1
+};
+
+static const mapreduce_json_t doc2 = {
+ .json = "{\"value\": 2}",
+ .length = sizeof("{\"value\": 2}") - 1
+};
+static const mapreduce_json_t meta2 = {
+ .json = "{\"id\":\"doc2\"}",
+ .length = sizeof("{\"id\":\"doc2\"}") - 1
+};
+
+static const mapreduce_json_t doc3 = {
+ .json = "{\"value\": 3}",
+ .length = sizeof("{\"value\": 3}") - 1
+};
+static const mapreduce_json_t meta3 = {
+ .json = "{\"id\":\"doc3\"}",
+ .length = sizeof("{\"id\":\"doc3\"}") - 1
+};
+
+static void test_bad_syntax_functions();
+static void test_runtime_exception();
+static void test_runtime_error();
+static void test_map_no_emit();
+static void test_map_single_emit();
+static void test_map_multiple_emits();
+static void test_timeout();
+
+
+void map_tests()
+{
+ TPRINT("Running map tests\n");
+
+ mapreduce_set_timeout(1);
+ test_timeout();
+
+ for (int i = 0; i < 100; ++i) {
+ test_bad_syntax_functions();
+ test_runtime_exception();
+ test_runtime_error();
+ test_map_no_emit();
+ test_map_single_emit();
+ test_map_multiple_emits();
+ }
+
+ test_timeout();
+}
+
+
+static void test_bad_syntax_functions()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { emit(meta.id, null); }",
+ "function(doc, meta { emit(doc.field, meta.id); }"
+ };
+
+ ret = mapreduce_start_map_context(functions, 2, &context, &error_msg);
+ assert(ret == MAPREDUCE_SYNTAX_ERROR);
+ assert(error_msg != NULL);
+ assert(strlen(error_msg) > 0);
+ assert(context == NULL);
+
+ mapreduce_free_error_msg(error_msg);
+}
+
+
+static void test_runtime_exception()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { throw('foobar'); }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_RUNTIME_ERROR);
+ assert(result->list[0].result.error_msg != NULL);
+ assert(strcmp("foobar", result->list[0].result.error_msg) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
+
+
+static void test_runtime_error()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { emit(doc.foo.bar, meta.id); }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_RUNTIME_ERROR);
+ assert(result->list[0].result.error_msg != NULL);
+ assert(strcmp("TypeError: Cannot read property 'bar' of undefined",
+ result->list[0].result.error_msg) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
+
+
+static void test_map_no_emit()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { }",
+ "function(doc, meta) { if (doc.value > 12345) { emit(meta.id, null); } }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 2, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 2);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 0);
+
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
+
+
+static void test_map_single_emit()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) { emit(meta.id, doc.value); }",
+ "function(doc, meta) { emit(doc.value, meta.id); }"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 2, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 2);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("1") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "1",
+ (sizeof("1") - 1)) == 0);
+
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 1);
+ assert(result->list[1].result.kvs.kvs[0].key.length == (sizeof("1") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].key.json,
+ "1",
+ (sizeof("1") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[0].value.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].value.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+
+ ret = mapreduce_map(context, &doc2, &meta2, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 2);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc2\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc2\"",
+ (sizeof("\"doc2\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("2") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "2",
+ (sizeof("2") - 1)) == 0);
+
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 1);
+ assert(result->list[1].result.kvs.kvs[0].key.length == (sizeof("2") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].key.json,
+ "2",
+ (sizeof("2") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[0].value.length == (sizeof("\"doc2\"") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].value.json,
+ "\"doc2\"",
+ (sizeof("\"doc2\"") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+
+ mapreduce_free_context(context);
+}
+
+
+static void test_map_multiple_emits()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) {\n"
+ " if (doc.value != 1) { throw('foobar'); } else { emit(meta.id, doc.value); }\n"
+ "}\n",
+ "function(doc, meta) {\n"
+ " emit(doc.value, meta.id);\n"
+ " emit([meta.id, doc.value], null);\n"
+ " emit(doc.value * 5, -doc.value);\n"
+ "}\n",
+ "function(doc, meta) {\n"
+ " if (doc.value != 3) { emit(doc.value, 0); } else { emit(meta.id, doc.value.f.z); }\n"
+ "}\n"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 3, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ /* map doc1 */
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 3);
+ assert(result->list != NULL);
+
+ /* function 1 */
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("1") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "1",
+ (sizeof("1") - 1)) == 0);
+
+ /* function 2 */
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 3);
+ assert(result->list[1].result.kvs.kvs[0].key.length == (sizeof("1") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].key.json,
+ "1",
+ (sizeof("1") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[0].value.length == (sizeof("\"doc1\"") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].value.json,
+ "\"doc1\"",
+ (sizeof("\"doc1\"") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].key.length == (sizeof("[\"doc1\",1]") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].key.json,
+ "[\"doc1\",1]",
+ (sizeof("[\"doc1\",1]") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].value.length == (sizeof("null") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].value.json,
+ "null",
+ (sizeof("null") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].key.length == (sizeof("5") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].key.json,
+ "5",
+ (sizeof("5") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].value.length == (sizeof("-1") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].value.json,
+ "-1",
+ (sizeof("-1") - 1)) == 0);
+
+ /* function 3 */
+ assert(result->list[2].error == MAPREDUCE_SUCCESS);
+ assert(result->list[2].result.kvs.length == 1);
+ assert(result->list[2].result.kvs.kvs[0].key.length == (sizeof("1") - 1));
+ assert(memcmp(result->list[2].result.kvs.kvs[0].key.json,
+ "1",
+ (sizeof("1") - 1)) == 0);
+ assert(result->list[2].result.kvs.kvs[0].value.length == (sizeof("0") - 1));
+ assert(memcmp(result->list[2].result.kvs.kvs[0].value.json,
+ "0",
+ (sizeof("0") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+
+ /* map doc2 */
+ ret = mapreduce_map(context, &doc2, &meta2, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 3);
+ assert(result->list != NULL);
+
+ /* function 1 */
+ assert(result->list[0].error == MAPREDUCE_RUNTIME_ERROR);
+ assert(result->list[0].result.error_msg != NULL);
+ assert(strcmp("foobar", result->list[0].result.error_msg) == 0);
+
+ /* function 2 */
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 3);
+ assert(result->list[1].result.kvs.kvs[0].key.length == (sizeof("2") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].key.json,
+ "2",
+ (sizeof("2") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[0].value.length == (sizeof("\"doc2\"") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].value.json,
+ "\"doc2\"",
+ (sizeof("\"doc2\"") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].key.length == (sizeof("[\"doc2\",2]") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].key.json,
+ "[\"doc2\",2]",
+ (sizeof("[\"doc2\",2]") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].value.length == (sizeof("null") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].value.json,
+ "null",
+ (sizeof("null") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].key.length == (sizeof("10") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].key.json,
+ "10",
+ (sizeof("10") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].value.length == (sizeof("-2") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].value.json,
+ "-2",
+ (sizeof("-2") - 1)) == 0);
+
+ /* function 3 */
+ assert(result->list[2].error == MAPREDUCE_SUCCESS);
+ assert(result->list[2].result.kvs.length == 1);
+ assert(result->list[2].result.kvs.kvs[0].key.length == (sizeof("2") - 1));
+ assert(memcmp(result->list[2].result.kvs.kvs[0].key.json,
+ "2",
+ (sizeof("2") - 1)) == 0);
+ assert(result->list[2].result.kvs.kvs[0].value.length == (sizeof("0") - 1));
+ assert(memcmp(result->list[2].result.kvs.kvs[0].value.json,
+ "0",
+ (sizeof("0") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+
+ /* map doc3 */
+ ret = mapreduce_map(context, &doc3, &meta3, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 3);
+ assert(result->list != NULL);
+
+ /* function 1 */
+ assert(result->list[0].error == MAPREDUCE_RUNTIME_ERROR);
+ assert(result->list[0].result.error_msg != NULL);
+ assert(strcmp("foobar", result->list[0].result.error_msg) == 0);
+
+ /* function 2 */
+ assert(result->list[1].error == MAPREDUCE_SUCCESS);
+ assert(result->list[1].result.kvs.length == 3);
+ assert(result->list[1].result.kvs.kvs[0].key.length == (sizeof("3") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].key.json,
+ "3",
+ (sizeof("3") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[0].value.length == (sizeof("\"doc3\"") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[0].value.json,
+ "\"doc3\"",
+ (sizeof("\"doc3\"") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].key.length == (sizeof("[\"doc3\",3]") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].key.json,
+ "[\"doc3\",3]",
+ (sizeof("[\"doc3\",3]") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[1].value.length == (sizeof("null") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[1].value.json,
+ "null",
+ (sizeof("null") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].key.length == (sizeof("15") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].key.json,
+ "15",
+ (sizeof("15") - 1)) == 0);
+ assert(result->list[1].result.kvs.kvs[2].value.length == (sizeof("-3") - 1));
+ assert(memcmp(result->list[1].result.kvs.kvs[2].value.json,
+ "-3",
+ (sizeof("-3") - 1)) == 0);
+
+ /* function 3 */
+ assert(result->list[2].error == MAPREDUCE_RUNTIME_ERROR);
+ assert(result->list[2].result.error_msg != NULL);
+ assert(strcmp("TypeError: Cannot read property 'z' of undefined",
+ result->list[2].result.error_msg) == 0);
+
+ mapreduce_free_map_result_list(result);
+
+ mapreduce_free_context(context);
+}
+
+
+static void test_timeout()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(doc, meta) {"
+ " if (doc.value === 1) {"
+ " while (true) { };"
+ " } else {"
+ " emit(meta.id, doc.value);"
+ " }"
+ "}"
+ };
+ mapreduce_map_result_list_t *result = NULL;
+
+ ret = mapreduce_start_map_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ ret = mapreduce_map(context, &doc1, &meta1, &result);
+ assert(ret == MAPREDUCE_TIMEOUT);
+ assert(result == NULL);
+
+ ret = mapreduce_map(context, &doc2, &meta2, &result);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->list != NULL);
+
+ assert(result->list[0].error == MAPREDUCE_SUCCESS);
+ assert(result->list[0].result.kvs.length == 1);
+ assert(result->list[0].result.kvs.kvs[0].key.length == (sizeof("\"doc2\"") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].key.json,
+ "\"doc2\"",
+ (sizeof("\"doc2\"") - 1)) == 0);
+ assert(result->list[0].result.kvs.kvs[0].value.length == (sizeof("2") - 1));
+ assert(memcmp(result->list[0].result.kvs.kvs[0].value.json,
+ "2",
+ (sizeof("2") - 1)) == 0);
+
+ mapreduce_free_map_result_list(result);
+ mapreduce_free_context(context);
+}
View
39 tests/mapreduce/mapreduce_tests.h
@@ -0,0 +1,39 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#ifndef _MAPREDUCE_TESTS_H
+#define _MAPREDUCE_TESTS_H
+
+#include "config.h"
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include "../macros.h"
+#include "../../src/views/mapreduce/mapreduce.h"
+
+#define TPRINT(...) fprintf(stderr, __VA_ARGS__)
+
+/* main test function */
+void mapreduce_tests();
+void map_tests();
+void reduce_tests();
+void builtin_tests();
+
+#endif
View
424 tests/mapreduce/reduce.c
@@ -0,0 +1,424 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+/**
+ * @copyright 2013 Couchbase, Inc.
+ *
+ * @author Filipe Manana <filipe@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#include "mapreduce_tests.h"
+#include <string.h>
+
+
+static const mapreduce_json_t key1 = {
+ .json = "[1,1]",
+ .length = sizeof("[1,1]") - 1
+};
+static const mapreduce_json_t value1 = {
+ .json = "11",
+ .length = sizeof("11") - 1
+};
+
+static const mapreduce_json_t key2 = {
+ .json = "[2,2]",
+ .length = sizeof("[2,2]") - 1
+};
+static const mapreduce_json_t value2 = {
+ .json = "22",
+ .length = sizeof("22") - 1
+};
+
+static const mapreduce_json_t key3 = {
+ .json = "[3,3]",
+ .length = sizeof("[3,3]") - 1
+};
+static const mapreduce_json_t value3 = {
+ .json = "33",
+ .length = sizeof("33") - 1
+};
+
+static const mapreduce_json_t key4 = {
+ .json = "[4,4]",
+ .length = sizeof("[4,4]") - 1
+};
+static const mapreduce_json_t value4 = {
+ .json = "44",
+ .length = sizeof("44") - 1
+};
+
+
+static mapreduce_json_list_t *all_keys();
+static mapreduce_json_list_t *all_values();
+static void free_json_list(mapreduce_json_list_t *list);
+
+static void test_bad_syntax_functions();
+static void test_runtime_exception();
+static void test_runtime_error();
+static void test_reduce_emits();
+static void test_reduce_and_rereduce_success();
+static void test_timeout();
+
+
+void reduce_tests()
+{
+ TPRINT("Running reduce tests\n");
+
+ mapreduce_set_timeout(1);
+ test_timeout();
+
+ for (int i = 0; i < 100; ++i) {
+ test_bad_syntax_functions();
+ test_runtime_exception();
+ test_runtime_error();
+ test_reduce_emits();
+ test_reduce_and_rereduce_success();
+ }
+
+ test_timeout();
+}
+
+
+static void test_bad_syntax_functions()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(key, values, rereduce) { return values.length; }",
+ "function(key, values, rereduce) { return values.length * 2;"
+ };
+
+ ret = mapreduce_start_reduce_context(functions, 2, &context, &error_msg);
+ assert(ret == MAPREDUCE_SYNTAX_ERROR);
+ assert(error_msg != NULL);
+ assert(strlen(error_msg) > 0);
+ assert(context == NULL);
+
+ mapreduce_free_error_msg(error_msg);
+}
+
+
+static void test_runtime_exception()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(key, values, rereduce) { throw('foobar'); }"
+ };
+ mapreduce_json_list_t *result = NULL;
+
+ ret = mapreduce_start_reduce_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ mapreduce_json_list_t *keys = all_keys();
+ mapreduce_json_list_t *values = all_values();
+
+ ret = mapreduce_reduce_all(context, keys, values, &result, &error_msg);
+ assert(ret == MAPREDUCE_RUNTIME_ERROR);
+ assert(result == NULL);
+ assert(error_msg != NULL);
+
+ mapreduce_free_error_msg(error_msg);
+ mapreduce_free_context(context);
+ free_json_list(keys);
+ free_json_list(values);
+}
+
+
+static void test_runtime_error()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(key, values, rereduce) { return sum(values); }",
+ "function(key, values, rereduce) { return values[0].foo.bar; }"
+ };
+ mapreduce_json_list_t *result = NULL;
+
+ ret = mapreduce_start_reduce_context(functions, 2, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ mapreduce_json_list_t *keys = all_keys();
+ mapreduce_json_list_t *values = all_values();
+
+ /* reduce all */
+ ret = mapreduce_reduce_all(context, keys, values, &result, &error_msg);
+ assert(ret == MAPREDUCE_RUNTIME_ERROR);
+ assert(result == NULL);
+ assert(error_msg != NULL);
+ assert(strcmp("TypeError: Cannot read property 'bar' of undefined",
+ error_msg) == 0);
+
+ mapreduce_free_error_msg(error_msg);
+ error_msg = NULL;
+
+ mapreduce_json_t *reduction = NULL;
+
+ /* reduce single function (2nd) */
+
+ ret = mapreduce_reduce(context, 2, keys, values, &reduction, &error_msg);
+ assert(ret == MAPREDUCE_RUNTIME_ERROR);
+ assert(reduction == NULL);
+ assert(error_msg != NULL);
+ assert(strcmp("TypeError: Cannot read property 'bar' of undefined",
+ error_msg) == 0);
+
+ mapreduce_free_error_msg(error_msg);
+
+ /* reduce single function (1st), should succeed */
+
+ ret = mapreduce_reduce(context, 1, keys, values, &reduction, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(reduction != NULL);
+ assert(error_msg == NULL);
+ assert(reduction->length == (sizeof("110") - 1));
+ assert(strncmp(reduction->json, "110", sizeof("110") - 1) == 0);
+
+ mapreduce_free_json(reduction);
+
+ mapreduce_free_context(context);
+ free_json_list(keys);
+ free_json_list(values);
+}
+
+
+static void test_reduce_emits()
+{
+ void *context = NULL;
+ char *error_msg = NULL;
+ mapreduce_error_t ret;
+ const char *functions[] = {
+ "function(key, values, rereduce) { emit(key, values); return sum(values); }"
+ };
+ mapreduce_json_list_t *result = NULL;
+
+ ret = mapreduce_start_reduce_context(functions, 1, &context, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(context != NULL);
+
+ mapreduce_json_list_t *keys = all_keys();
+ mapreduce_json_list_t *values = all_values();
+
+ ret = mapreduce_reduce_all(context, keys, values, &result, &error_msg);
+ assert(ret == MAPREDUCE_SUCCESS);
+ assert(error_msg == NULL);
+ assert(result != NULL);
+ assert(result->length == 1);
+ assert(result->values[0].length == (sizeof("110") - 1));
+ assert(strncmp("110", result->values[0].json, sizeof("110") - 1) == 0);