Permalink
Browse files

MB-7629 Add native viewgroup compactor

Implementation of native view index compactor.
The native program receives viewgroup header info through stdin
and it creates a new index file and recreates all view btrees
and id btree. The resulting index header is written out to stdout.

Change-Id: I0c9c9ff33d3e005f22cc848c85d708b158d806e4
Reviewed-on: http://review.couchbase.org/31666
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information...
1 parent a48827f commit cc98f867807a5402e8b9acc117a51c7c02c984bd @t3rm1n4l t3rm1n4l committed with fdmanana Jan 7, 2014
Showing with 492 additions and 0 deletions.
  1. +1 −0 .gitignore
  2. +4 −0 CMakeLists.txt
  3. +143 −0 src/views/bin/couch_view_group_compactor.c
  4. +326 −0 src/views/view_group.c
  5. +18 −0 src/views/view_group.h
View
1 .gitignore
@@ -81,3 +81,4 @@ xcuserdata/
/couch_view_group_cleanup
/couch_view_index_updater
/purge.couch
+/couch_view_group_compactor
View
4 CMakeLists.txt
@@ -94,6 +94,9 @@ TARGET_LINK_LIBRARIES(couch_view_group_cleanup couchstore)
ADD_EXECUTABLE(couch_view_index_updater src/views/bin/couch_view_index_updater.c)
TARGET_LINK_LIBRARIES(couch_view_index_updater couchstore)
+ADD_EXECUTABLE(couch_view_group_compactor src/views/bin/couch_view_group_compactor.c)
+TARGET_LINK_LIBRARIES(couch_view_group_compactor couchstore)
+
IF (INSTALL_HEADER_FILES)
INSTALL(FILES
include/libcouchstore/couch_db.h
@@ -118,6 +121,7 @@ INSTALL(TARGETS couch_dbdump
couch_view_index_builder
couch_view_group_cleanup
couch_view_index_updater
+ couch_view_group_compactor
RUNTIME DESTINATION bin)
CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.cmake.h.in
View
143 src/views/bin/couch_view_group_compactor.c
@@ -0,0 +1,143 @@
+/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
+
+/**
+ * @copyright 2014 Couchbase, Inc.
+ *
+ * @author Sarath Lakshman <sarath@couchbase.com>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+#include "config.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "../view_group.h"
+#include "../util.h"
+
+#define BUF_SIZE 8192
+
+static void die_on_exit_msg(void *args)
+{
+ char buf[4];
+ (void) args;
+
+ if (fread(buf, 1, 4, stdin) == 4 && !strncmp(buf, "exit", 4)) {
+ exit(1);
+ }
+}
+
+int main(int argc, char *argv[])
+{
+ view_group_info_t *group_info = NULL;
+ char buf[BUF_SIZE];
+ char *target_file = NULL;
+ size_t len;
+ int ret = COUCHSTORE_SUCCESS;
+ sized_buf header_buf = {NULL, 0};
+ sized_buf header_outbuf = {NULL, 0};
+ uint64_t inserted = 0;
+ view_error_t error_info;
+ cb_thread_t exit_thread;
+
+ (void) argc;
+ (void) argv;
+
+ /* Read target filepath */
+ if (couchstore_read_line(stdin, buf, BUF_SIZE) != buf) {
+ fprintf(stderr, "Error reading compaction target filepath\n");
+ ret = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
+ goto out;
+ }
+
+ len = strlen(buf);
+ target_file = (char *) malloc(len + 1);
+ if (target_file == NULL) {
+ fprintf(stderr, "Memory allocation failure\n");
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto out;
+ }
+
+ memcpy(target_file, buf, len);
+ target_file[len] = '\0';
+
+ group_info = couchstore_read_view_group_info(stdin, stderr);
+ if (group_info == NULL) {
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto out;
+ }
+
+ /* Read group header bin */
+ if (fscanf(stdin, "%lu\n", &header_buf.size) != 1) {
+ fprintf(stderr, "Error reading viewgroup header size\n");
+ ret = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
+ goto out;
+ }
+
+ header_buf.buf = malloc(header_buf.size);
+ if (header_buf.buf == NULL) {
+ fprintf(stderr, "Memory allocation failure\n");
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto out;
+ }
+
+ if (fread(header_buf.buf, header_buf.size, 1, stdin) != 1) {
+ fprintf(stderr,
+ "Error reading viewgroup header from stdin\n");
+ ret = COUCHSTORE_ERROR_INVALID_ARGUMENTS;
+ goto out;
+ }
+
+ /* Start a watcher thread to gracefully die on exit message */
+ ret = cb_create_thread(&exit_thread, die_on_exit_msg, NULL, 1);
+ if (ret < 0) {
+ fprintf(stderr, "Error starting stdin exit listener thread\n");
+ /* For differentiating from couchstore_error_t */
+ ret = -ret;
+ goto out;
+ }
+
+ ret = couchstore_compact_view_group(group_info,
+ target_file,
+ &header_buf,
+ &inserted,
+ &header_outbuf,
+ &error_info);
+
+ if (ret != COUCHSTORE_SUCCESS) {
+ if (error_info.error_msg != NULL && error_info.view_name != NULL) {
+ fprintf(stderr,
+ "Error compacting index for view `%s`, reason: %s\n",
+ error_info.view_name,
+ error_info.error_msg);
+ }
+ goto out;
+ }
+
+ fprintf(stdout, "Header Len : %lu\n", header_outbuf.size);
+ fwrite(header_outbuf.buf, header_outbuf.size, 1, stdout);
+ fprintf(stdout, "\n");
+
+ fprintf(stdout, "Results = inserts : %"PRIu64"\n", inserted);
+
+out:
+ couchstore_free_view_group_info(group_info);
+ free((void *) error_info.error_msg);
+ free((void *) error_info.view_name);
+ free((void *) header_buf.buf);
+ free((void *) header_outbuf.buf);
+ free(target_file);
+
+ return (ret < 0) ? (100 + ret) : ret;
+}
+
View
326 src/views/view_group.c
@@ -99,6 +99,34 @@ static couchstore_error_t update_view_btree(const char *source_file,
node_pointer **out_root,
view_error_t *error_info);
+static couchstore_error_t compact_view_fetchcb(couchfile_lookup_request *rq,
+ const sized_buf *k,
+ const sized_buf *v);
+
+static couchstore_error_t compact_btree(tree_file *source,
+ tree_file *target,
+ const node_pointer *root,
+ compare_info *cmp,
+ reduce_fn reduce_fun,
+ reduce_fn rereduce_fun,
+ view_reducer_ctx_t *red_ctx,
+ uint64_t *inserted,
+ node_pointer **out_root);
+
+static couchstore_error_t compact_id_btree(tree_file *source,
+ tree_file *target,
+ const node_pointer *root,
+ uint64_t *inserted,
+ node_pointer **out_root);
+
+static couchstore_error_t compact_view_btree(tree_file *source,
+ tree_file *target,
+ const view_btree_info_t *info,
+ const node_pointer *root,
+ uint64_t *inserted,
+ node_pointer **out_root,
+ view_error_t *error_info);
+
LIBCOUCHSTORE_API
view_group_info_t *couchstore_read_view_group_info(FILE *in_stream,
FILE *error_stream)
@@ -1301,3 +1329,301 @@ couchstore_error_t couchstore_update_view_group(view_group_info_t *info,
return ret;
}
+
+/* Add the kv pair to modify result */
+static couchstore_error_t compact_view_fetchcb(couchfile_lookup_request *rq,
+ const sized_buf *k,
+ const sized_buf *v)
+{
+ couchstore_error_t ret;
+ sized_buf *k_c, *v_c;
+ view_compact_ctx_t *ctx = (view_compact_ctx_t *) rq->callback_ctx;
+
+ if (k == NULL || v == NULL) {
+ return COUCHSTORE_ERROR_READ;
+ }
+
+ k_c = arena_copy_buf(ctx->transient_arena, k);
+ v_c = arena_copy_buf(ctx->transient_arena, v);
+ ret = mr_push_item(k_c, v_c, ctx->mr);
+ if (ret != COUCHSTORE_SUCCESS) {
+ return ret;
+ }
+
+ if (ctx->inserted) {
+ (*ctx->inserted)++;
+ }
+
+ if (ctx->mr->count == 0) {
+ arena_free_all(ctx->transient_arena);
+ }
+
+ return ret;
+}
+
+static couchstore_error_t compact_btree(tree_file *source,
+ tree_file *target,
+ const node_pointer *root,
+ compare_info *cmp,
+ reduce_fn reduce_fun,
+ reduce_fn rereduce_fun,
+ view_reducer_ctx_t *red_ctx,
+ uint64_t *inserted,
+ node_pointer **out_root)
+{
+ couchstore_error_t ret = COUCHSTORE_SUCCESS;
+ arena *transient_arena;
+ arena *persistent_arena;
+ couchfile_modify_result *modify_result;
+ couchfile_lookup_request lookup_rq;
+ view_compact_ctx_t compact_ctx;
+ sized_buf nullkey = {NULL, 0};
+ sized_buf *lowkeys = &nullkey;
+
+ if (!root) {
+ return COUCHSTORE_SUCCESS;
+ }
+
+ transient_arena = new_arena(0);
+ persistent_arena = new_arena(0);
+
+ if (transient_arena == NULL || persistent_arena == NULL) {
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto cleanup;
+ }
+
+ /* Create new btree on new file */
+ modify_result = new_btree_modres(persistent_arena,
+ transient_arena,
+ target,
+ cmp,
+ reduce_fun,
+ rereduce_fun,
+ red_ctx,
+ VIEW_KV_CHUNK_THRESHOLD + (VIEW_KV_CHUNK_THRESHOLD / 3),
+ VIEW_KP_CHUNK_THRESHOLD + (VIEW_KP_CHUNK_THRESHOLD / 3));
+ if (modify_result == NULL) {
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto cleanup;
+ }
+
+ compact_ctx.mr = modify_result;
+ compact_ctx.transient_arena = transient_arena;
+ compact_ctx.inserted = inserted;
+
+ lookup_rq.cmp.compare = cmp->compare;
+ lookup_rq.file = source;
+ lookup_rq.num_keys = 1;
+ lookup_rq.keys = &lowkeys;
+ lookup_rq.callback_ctx = &compact_ctx;
+ lookup_rq.fetch_callback = compact_view_fetchcb;
+ lookup_rq.node_callback = NULL;
+ lookup_rq.fold = 1;
+
+ ret = btree_lookup(&lookup_rq, root->pointer);
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ *out_root = complete_new_btree(modify_result, &ret);
+
+cleanup:
+ if (transient_arena != NULL) {
+ delete_arena(transient_arena);
+ }
+
+ if (persistent_arena != NULL) {
+ delete_arena(persistent_arena);
+ }
+
+ return ret;
+}
+
+static couchstore_error_t compact_id_btree(tree_file *source,
+ tree_file *target,
+ const node_pointer *root,
+ uint64_t *inserted,
+ node_pointer **out_root)
+{
+ couchstore_error_t ret;
+ compare_info cmp;
+
+ cmp.compare = id_btree_cmp;
+
+ ret = compact_btree(source,
+ target,
+ root,
+ &cmp,
+ view_id_btree_reduce,
+ view_id_btree_rereduce,
+ NULL,
+ inserted,
+ out_root);
+
+ return ret;
+}
+
+static couchstore_error_t compact_view_btree(tree_file *source,
+ tree_file *target,
+ const view_btree_info_t *info,
+ const node_pointer *root,
+ uint64_t *inserted,
+ node_pointer **out_root,
+ view_error_t *error_info)
+{
+ couchstore_error_t ret;
+ compare_info cmp;
+ view_reducer_ctx_t *red_ctx = NULL;
+ char *error_msg = NULL;
+
+ cmp.compare = view_btree_cmp;
+ red_ctx = make_view_reducer_ctx(info->reducers,
+ info->num_reducers,
+ &error_msg);
+ if (red_ctx == NULL) {
+ error_info->error_msg = (const char *) error_msg;
+ error_info->view_name = (const char *) strdup(info->names[0]);
+ return COUCHSTORE_ERROR_REDUCER_FAILURE;
+ }
+
+ ret = compact_btree(source,
+ target,
+ root,
+ &cmp,
+ view_btree_reduce,
+ view_btree_rereduce,
+ red_ctx,
+ inserted,
+ out_root);
+
+ if (ret != COUCHSTORE_SUCCESS) {
+ char *error_msg = NULL;
+
+ if (red_ctx->error != NULL) {
+ error_msg = strdup(red_ctx->error);
+ } else {
+ error_msg = view_error_msg(ret);
+ }
+ error_info->error_msg = (const char *) error_msg;
+ error_info->view_name = (const char *) strdup(info->names[0]);
+ }
+
+ free_view_reducer_ctx(red_ctx);
+
+ return ret;
+}
+
+LIBCOUCHSTORE_API
+couchstore_error_t couchstore_compact_view_group(view_group_info_t *info,
+ const char *target_file,
+ const sized_buf *header_buf,
+ uint64_t *inserted,
+ sized_buf *header_outbuf,
+ view_error_t *error_info)
+{
+ couchstore_error_t ret;
+ tree_file index_file;
+ tree_file compact_file;
+ index_header_t *header = NULL;
+ node_pointer *id_root = NULL;
+ node_pointer **view_roots = NULL;
+ int i;
+
+ error_info->view_name = NULL;
+ error_info->error_msg = NULL;
+ index_file.handle = NULL;
+ index_file.ops = NULL;
+ index_file.path = NULL;
+ compact_file.handle = NULL;
+ compact_file.ops = NULL;
+ compact_file.path = NULL;
+
+ /*
+ * TODO(sarath): Add filter function to avoid iterating vbs in cleanup
+ * bitmask.
+ * */
+
+ ret = decode_index_header(header_buf->buf, header_buf->size, &header);
+ if (ret < 0) {
+ goto cleanup;
+ }
+
+ view_roots = (node_pointer **) calloc(info->num_btrees,
+ sizeof(node_pointer *));
+ if (view_roots == NULL) {
+ ret = COUCHSTORE_ERROR_ALLOC_FAIL;
+ goto cleanup;
+ }
+
+ assert(info->num_btrees == header->num_views);
+
+ ret = open_view_group_file(info->filepath,
+ COUCHSTORE_OPEN_FLAG_RDONLY,
+ &index_file);
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ /*
+ * Open target file for compaction
+ * Expects that caller created the target file
+ */
+ ret = open_view_group_file(target_file, 0, &compact_file);
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ compact_file.pos = compact_file.ops->goto_eof(&compact_file.lastError,
+ compact_file.handle);
+ ret = compact_id_btree(&index_file, &compact_file,
+ header->id_btree_state,
+ inserted,
+ &id_root);
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ free(header->id_btree_state);
+ header->id_btree_state = id_root;
+ id_root = NULL;
+
+ for (i = 0; i < info->num_btrees; ++i) {
+ ret = compact_view_btree(&index_file,
+ &compact_file,
+ &info->btree_infos[i],
+ header->view_btree_states[i],
+ inserted,
+ &view_roots[i],
+ error_info);
+
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ free(header->view_btree_states[i]);
+ header->view_btree_states[i] = view_roots[i];
+ view_roots[i] = NULL;
+ }
+
+ ret = encode_index_header(header, &header_outbuf->buf, &header_outbuf->size);
+ if (ret != COUCHSTORE_SUCCESS) {
+ goto cleanup;
+ }
+
+ ret = COUCHSTORE_SUCCESS;
+
+cleanup:
+ free_index_header(header);
+ close_view_group_file(info);
+ tree_file_close(&index_file);
+ tree_file_close(&compact_file);
+ free(id_root);
+ if (view_roots != NULL) {
+ for (i = 0; i < info->num_btrees; ++i) {
+ free(view_roots[i]);
+ }
+ free(view_roots);
+ }
+
+ return ret;
+}
View
18 src/views/view_group.h
@@ -25,6 +25,8 @@
#include <stdio.h>
#include <libcouchstore/couch_db.h>
#include "index_header.h"
+#include "../arena.h"
+#include "../couch_btree.h"
#ifdef __cplusplus
extern "C" {
@@ -57,6 +59,13 @@ extern "C" {
uint64_t purged;
} view_group_update_stats_t;
+ /* Compaction context definition */
+ typedef struct _view_compact_ctx {
+ couchfile_modify_result *mr;
+ arena *transient_arena;
+ uint64_t *inserted;
+ } view_compact_ctx_t;
+
/* Read a view group definition from an input stream, and write any
errors to the optional error stream. */
LIBCOUCHSTORE_API
@@ -98,6 +107,15 @@ extern "C" {
sized_buf *header_outbuf,
view_error_t *error_info);
+ LIBCOUCHSTORE_API
+ couchstore_error_t couchstore_compact_view_group(
+ view_group_info_t *info,
+ const char *target_file,
+ const sized_buf *header_buf,
+ uint64_t *inserted,
+ sized_buf *header_outbuf,
+ view_error_t *error_info);
+
#ifdef __cplusplus
}
#endif

0 comments on commit cc98f86

Please sign in to comment.