diff --git a/C/c4.exp b/C/c4.exp index f563b3c..e85c45c 100644 --- a/C/c4.exp +++ b/C/c4.exp @@ -38,3 +38,37 @@ _c4doc_insertRevision _c4doc_insertRevisionWithHistory _c4doc_setType _c4doc_save + +_kC4DefaultAllDocsOptions +_kC4DefaultChangesOptions +_kC4DefaultQueryOptions + +_c4key_new +_c4key_free +_c4Key_addNull +_c4key_addBool +_c4key_addNumber +_c4key_addString +_c4key_addMapKey +_c4key_beginArray +_c4key_endArray +_c4key_beginMap +_c4key_endMap + +_c4view_open +_c4view_close +_c4view_eraseIndex +_c4view_delete +_c4view_getTotalRows +_c4view_getLastSequenceIndexed +_c4view_getLastSequenceChangedAt + +_c4indexer_begin +_c4indexer_enumerateDocuments +_c4indexer_emit +_c4indexer_end + +#_c4view_query +#_c4queryenum_next +#_c4queryenum_free +#_c4queryrow_free diff --git a/C/c4Database.cc b/C/c4Database.cc index ad01e92..a5bc40f 100644 --- a/C/c4Database.cc +++ b/C/c4Database.cc @@ -6,18 +6,8 @@ // Copyright © 2015 Couchbase. All rights reserved. // -#include "slice.hh" -typedef forestdb::slice C4Slice; -typedef struct { - const void *buf; - size_t size; -} C4SliceResult; - -#define kC4SliceNull forestdb::slice::null - -#define C4_IMPL +#include "c4Impl.hh" #include "c4Database.h" -#undef C4_IMPL #include "Database.hh" #include "Document.hh" @@ -30,44 +20,36 @@ using namespace forestdb; // Size of ForestDB buffer cache allocated for a database -#define kDBBufferCacheSize (8*1024*1024) +static const size_t kDBBufferCacheSize = (8*1024*1024); // ForestDB Write-Ahead Log size (# of records) -#define kDBWALThreshold 1024 +static const size_t kDBWALThreshold = 1024; // How often ForestDB should check whether databases need auto-compaction -#define kAutoCompactInterval (5*60.0) +static const uint64_t kAutoCompactInterval = (5*60); -static void recordError(C4ErrorDomain domain, int code, C4Error* outError) { +void recordError(C4ErrorDomain domain, int code, C4Error* outError) { if (outError) { outError->domain = domain; outError->code = code; } } -static void recordHTTPError(int httpStatus, C4Error* outError) { +void recordHTTPError(int httpStatus, C4Error* outError) { recordError(HTTPDomain, httpStatus, outError); } -static void recordError(error e, C4Error* outError) { +void recordError(error e, C4Error* outError) { recordError(ForestDBDomain, e.status, outError); } -static void recordUnknownException(C4Error* outError) { +void recordUnknownException(C4Error* outError) { Warn("Unexpected C++ exception thrown from CBForest"); recordError(C4Domain, kC4ErrorInternalException, outError); } -#define catchError(OUTERR) \ - catch (error err) { \ - recordError(err, OUTERR); \ - } catch (...) { \ - recordUnknownException(OUTERR); \ - } - - void c4slice_free(C4Slice slice) { slice.free(); } @@ -127,6 +109,11 @@ struct c4Database { }; +forestdb::Database* internal(C4Database *db) { + return db->_db; +} + + C4Database* c4db_open(C4Slice path, bool readOnly, C4Error *outError) @@ -138,7 +125,7 @@ C4Database* c4db_open(C4Slice path, config.wal_flush_before_commit = true; config.seqtree_opt = true; config.compress_document_body = true; - config.compactor_sleep_duration = (uint64_t)kAutoCompactInterval; + config.compactor_sleep_duration = kAutoCompactInterval; auto c4db = new c4Database; try { @@ -321,7 +308,7 @@ struct C4DocumentInternal : public C4Document { } else { // Doc body (rev tree) isn't available, but we know most things about the current rev: selectedRev.revID = revID; - selectedRev.sequence = _versionedDoc.sequence(); + selectedRev.sequence = sequence; int revFlags = 0; if (flags & kExists) { revFlags |= kRevLeaf; @@ -337,6 +324,7 @@ struct C4DocumentInternal : public C4Document { void initRevID() { _revIDBuf = _versionedDoc.revID().expanded(); revID = _revIDBuf; + sequence = _versionedDoc.sequence(); } bool selectRevision(const Revision *rev, C4Error *outError =NULL) { @@ -599,6 +587,17 @@ bool c4doc_save(C4Document *doc, #pragma mark - DOC ENUMERATION: +const C4AllDocsOptions kC4DefaultAllDocsOptions = { + .inclusiveStart = true, + .inclusiveEnd = true, + .includeBodies = true +}; + +const C4ChangesOptions kC4DefaultChangesOptions = { + .includeBodies = true +}; + + struct C4DocEnumerator { C4Database *_database; DocEnumerator _e; @@ -628,14 +627,16 @@ void c4enum_free(C4DocEnumerator *e) { C4DocEnumerator* c4db_enumerateChanges(C4Database *database, C4SequenceNumber since, - bool withBodies, + const C4ChangesOptions *c4options, C4Error *outError) { try { + if (!c4options) + c4options = &kC4DefaultChangesOptions; auto options = DocEnumerator::Options::kDefault; options.inclusiveEnd = true; - options.includeDeleted = false; - if (!withBodies) + options.includeDeleted = c4options->includeDeleted; + if (!c4options->includeBodies) options.contentOptions = KeyStore::kMetaOnly; return new C4DocEnumerator(database, since+1, UINT64_MAX, options); } catchError(outError); @@ -646,18 +647,19 @@ C4DocEnumerator* c4db_enumerateChanges(C4Database *database, C4DocEnumerator* c4db_enumerateAllDocs(C4Database *database, C4Slice startDocID, C4Slice endDocID, - bool descending, - bool inclusiveEnd, - unsigned skip, - bool withBodies, + const C4AllDocsOptions *c4options, C4Error *outError) { try { + if (!c4options) + c4options = &kC4DefaultAllDocsOptions; auto options = DocEnumerator::Options::kDefault; - options.skip = skip; - options.descending = descending; - options.inclusiveEnd = inclusiveEnd; - if (!withBodies) + options.skip = c4options->skip; + options.descending = c4options->descending; + options.inclusiveStart = c4options->inclusiveStart; + options.inclusiveEnd = c4options->inclusiveEnd; + options.includeDeleted = c4options->includeDeleted; + if (!c4options->includeBodies) options.contentOptions = KeyStore::kMetaOnly; return new C4DocEnumerator(database, startDocID, endDocID, options); } catchError(outError); diff --git a/C/c4Database.h b/C/c4Database.h index ccf1e70..17a7846 100644 --- a/C/c4Database.h +++ b/C/c4Database.h @@ -111,9 +111,10 @@ extern "C" { /** Describes a version-controlled document. */ typedef struct { - C4DocumentFlags flags; - C4Slice docID; - C4Slice revID; + C4DocumentFlags flags; /**< Document flags */ + C4Slice docID; /**< Document ID */ + C4Slice revID; /**< RevID of current revision */ + C4SequenceNumber sequence; /**< Sequence at which doc was last updated */ struct { C4Slice revID; @@ -171,7 +172,36 @@ extern "C" { //////// DOCUMENT ENUMERATORS: + + /** Options for enumerating over all documents. */ + typedef struct { + bool descending; /**< If true, iteration goes by descending document IDs. */ + bool inclusiveStart; /**< If false, iteration starts just _after_ the startDocID. */ + bool inclusiveEnd; /**< If false, iteration stops just _before_ the endDocID. */ + unsigned skip; /**< The number of initial results to skip. */ + bool includeDeleted; /**< If true, include deleted documents. */ + bool includeBodies; /**< If false, document bodies will not be preloaded, just the + metadata (docID, revID, sequence, flags.) This is faster if you + don't need to access the revision tree or revision bodies. You + can still access all the data of the document, but it will + trigger loading the document body from the database. */ + } C4AllDocsOptions; + + /** Default all-docs enumeration options. */ + extern const C4AllDocsOptions kC4DefaultAllDocsOptions; + + /** Options for enumerating over database changes. */ + typedef struct { + bool includeDeleted; /**< If true, include deleted documents. */ + bool includeBodies; /**< If false, document bodies will not be preloaded. See + C4AllDocsOptions.includeBodies for more details. */ + } C4ChangesOptions; + + /** Default change-enumeration options. */ + extern const C4ChangesOptions kC4DefaultChangesOptions; + + /** Opaque handle to a document enumerator. */ typedef struct C4DocEnumerator C4DocEnumerator; @@ -182,15 +212,12 @@ extern "C" { Caller is responsible for freeing the enumerator when finished with it. @param database The database. @param since The sequence number to start _after_. Pass 0 to start from the beginning. - @param withBodies If false, document bodies will not be preloaded, just the metadata - (docID, revID, sequence, flags.) This is faster if you don't need to access the - revision tree or revision bodies. You can still access all the data of the document, - but it will trigger loading the document body from the database. + @param options Enumeration options (NULL for defaults). @param outError Error will be stored here on failure. @return A new enumerator, or NULL on failure. */ C4DocEnumerator* c4db_enumerateChanges(C4Database *database, C4SequenceNumber since, - bool withBodies, + const C4ChangesOptions *options, C4Error *outError); /** Creates an enumerator ordered by docID. @@ -200,23 +227,13 @@ extern "C" { @param database The database. @param startDocID The document ID to begin at. @param endDocID The document ID to end at. - @param descending If true, iteration goes by descending document ID. The startDocID must - be higher than the endDocID. - @param inclusiveEnd If false, iteration stops just _before_ the endDocID. - @param skip The number of initial results to skip. - @param withBodies If false, document bodies will not be preloaded, just the metadata - (docID, revID, sequence, flags.) This is faster if you don't need to access the - revision tree or revision bodies. You can still access all the data of the document, - but it will trigger loading the document body from the database. + @param options Enumeration options (NULL for defaults). @param outError Error will be stored here on failure. @return A new enumerator, or NULL on failure. */ C4DocEnumerator* c4db_enumerateAllDocs(C4Database *database, C4Slice startDocID, C4Slice endDocID, - bool descending, - bool inclusiveEnd, - unsigned skip, - bool withBodies, + const C4AllDocsOptions *options, C4Error *outError); /** Returns the next document from an enumerator, or NULL if there are no more. diff --git a/C/c4Impl.hh b/C/c4Impl.hh new file mode 100644 index 0000000..cbeba04 --- /dev/null +++ b/C/c4Impl.hh @@ -0,0 +1,49 @@ +// +// c4Impl.h +// CBForest +// +// Created by Jens Alfke on 9/15/15. +// Copyright © 2015 Couchbase. All rights reserved. +// + +#ifndef c4Impl_h +#define c4Impl_h + +#include "slice.hh" + +typedef forestdb::slice C4Slice; + +typedef struct { + const void *buf; + size_t size; +} C4SliceResult; + +#define kC4SliceNull forestdb::slice::null + + +#define C4_IMPL +#include "c4Database.h" + + +#include "Error.hh" +#include "Database.hh" + +using namespace forestdb; + +Database* internal(C4Database*); + + +void recordError(C4ErrorDomain domain, int code, C4Error* outError); +void recordHTTPError(int httpStatus, C4Error* outError); +void recordError(error e, C4Error* outError); +void recordUnknownException(C4Error* outError); + +#define catchError(OUTERR) \ + catch (error err) { \ + recordError(err, OUTERR); \ + } catch (...) { \ + recordUnknownException(OUTERR); \ + } + + +#endif /* c4Impl_h */ diff --git a/C/c4View.cc b/C/c4View.cc new file mode 100644 index 0000000..55481e0 --- /dev/null +++ b/C/c4View.cc @@ -0,0 +1,256 @@ +// +// c4View.cc +// CBForest +// +// Created by Jens Alfke on 9/15/15. +// Copyright © 2015 Couchbase. All rights reserved. +// + +#include "c4Impl.hh" +#include "c4View.h" +#include "Collatable.hh" +#include "MapReduceIndex.hh" + +using namespace forestdb; + + +// Size of ForestDB buffer cache allocated for a database +static const size_t kViewDBBufferCacheSize = (8*1024*1024); + +// ForestDB Write-Ahead Log size (# of records) +static const size_t kViewDBWALThreshold = 1024; + + +#pragma mark - KEYS: + + +struct c4Key : public Collatable { + c4Key() :Collatable() { } +}; + + +C4Key* c4key_new() { + return new c4Key(); +} + +void c4key_free(C4Key *key) { + delete key; +} + +void c4Key_addNull(C4Key *key) { + key->addNull(); +} + +void c4key_addBool(C4Key *key, bool b) { + key->addBool(b); +} + +void c4key_addNumber(C4Key *key, double n) { + *key << n; +} + +void c4key_addString(C4Key *key, C4Slice str) { + *key << str; +} + +void c4key_addMapKey(C4Key *key, C4Slice mapKey) { + *key << mapKey; +} + +void c4key_beginArray(C4Key *key) { + key->beginArray(); +} + +void c4key_endArray(C4Key *key) { + key->endArray(); +} + +void c4key_beginMap(C4Key *key) { + key->beginMap(); +} + +void c4key_endMap(C4Key *key) { + key->endMap(); +} + + +#pragma mark - VIEWS: + + +struct c4View { + c4View(C4Database *sourceDB, + Database *viewDB, + C4Slice name, + C4Slice version) + :_sourceDB(sourceDB), + _viewDB(viewDB), + _index(viewDB, (std::string)name, internal(sourceDB)->defaultKeyStore()), + _version(version) + { } + + C4Database *_sourceDB; + Database *_viewDB; + MapReduceIndex _index; + std::string _version; +}; + + +C4View* c4view_open(C4Database* db, + C4Slice path, + C4Slice viewName, + C4Slice version, + C4Error *outError) +{ + try { + auto config = Database::defaultConfig(); + config.flags = FDB_OPEN_FLAG_CREATE; + config.buffercache_size = kViewDBBufferCacheSize; + config.wal_threshold = kViewDBWALThreshold; + config.wal_flush_before_commit = true; + config.seqtree_opt = FDB_SEQTREE_NOT_USE; // indexes don't need by-sequence ordering + config.compaction_threshold = 50; + + auto viewDB = new Database((std::string)path, config); + return new c4View(db, viewDB, viewName, version); + } catchError(outError); + return NULL; +} + +/** Closes the view and frees the object. */ +bool c4view_close(C4View* view, C4Error *outError) { + try { + delete view; + return true; + } catchError(outError); + return false; +} + +bool c4view_eraseIndex(C4View *view, C4Error *outError) { + try { + Transaction t(view->_viewDB); + view->_index.erase(t); + return true; + } catchError(outError); + return false; +} + +bool c4view_delete(C4View *view, C4Error *outError) { + try { + view->_viewDB->deleteDatabase(); + delete view; + return true; + } catchError(outError) + return false; +} + + +uint64_t c4view_getTotalRows(C4View *view) { + try { + return view->_index.rowCount(); + } catchError(NULL); + return 0; +} + +C4SequenceNumber c4view_getLastSequenceIndexed(C4View *view) { + try { + return view->_index.lastSequenceIndexed(); + } catchError(NULL); + return 0; +} + +C4SequenceNumber c4view_getLastSequenceChangedAt(C4View *view) { + try { + return view->_index.lastSequenceChangedAt(); + } catchError(NULL); + return 0; +} + + +#pragma mark - INDEXING: + + +struct c4Indexer : public MapReduceIndexer { + c4Indexer(C4Database *db) + :MapReduceIndexer(), + _db(db) + { } + + virtual ~c4Indexer() { } + + C4Database* _db; +}; + + +C4Indexer* c4indexer_begin(C4Database *db, + C4View *views[], + int viewCount, + C4Error *outError) +{ + c4Indexer *indexer = NULL; + try { + indexer = new c4Indexer(db); + for (int i = 0; i < viewCount; ++i) { + auto t = new Transaction(views[i]->_viewDB); + indexer->addIndex(&views[i]->_index, t); + } + } catchError(outError); + if (indexer) + delete indexer; + return NULL; +} + + +C4DocEnumerator* c4indexer_enumerateDocuments(C4Indexer *indexer, C4Error *outError) { + try { + sequence startSequence = indexer->startingSequence(); + if (startSequence == UINT64_MAX) + return NULL; + auto options = kC4DefaultChangesOptions; + options.includeDeleted = true; + return c4db_enumerateChanges(indexer->_db, startSequence-1, &options, outError); + } catchError(outError); + return NULL; +} + +bool c4indexer_emit(C4Indexer *indexer, + C4Document *doc, + unsigned viewIndex, + unsigned emitCount, + C4Key* emittedKeys[], + C4Key* emittedValues[], + C4Error *outError) +{ + try { + std::vector keys, values; + if (!(doc->flags & kDeleted)) { + for (unsigned i = 0; i < emitCount; ++i) { + keys.push_back(*emittedKeys[i]); + values.push_back(*emittedValues[i]); + } + } + indexer->emitDocIntoView(doc->docID, doc->sequence, viewIndex, keys, values); + return true; + } catchError(outError) + return false; +} + + +bool c4indexer_end(C4Indexer *indexer, bool commit, C4Error *outError) { + try { + if (commit) + indexer->finished(); + delete indexer; + return true; + } catchError(outError) + return false; +} + + +#pragma mark - QUERIES: + + +const C4QueryOptions kC4DefaultQueryOptions = { + .limit = UINT_MAX, + .inclusiveStart = true, + .inclusiveEnd = true +}; diff --git a/C/c4View.h b/C/c4View.h index 430720d..faa9464 100644 --- a/C/c4View.h +++ b/C/c4View.h @@ -11,62 +11,155 @@ #ifndef c4View_h #define c4View_h -#include "c4.h" +#include "c4Database.h" #ifdef __cplusplus extern "C" { #endif + + //////// KEYS: + + + /** An opaque value used as a key in a view index. JSON-compatible. */ + typedef struct c4Key C4Key; + + C4Key* c4key_new(); + void c4key_free(C4Key*); + + void c4Key_addNull(C4Key*); + void c4key_addBool(C4Key*, bool); + void c4key_addNumber(C4Key*, double); + void c4key_addString(C4Key*, C4Slice); + + void c4key_addMapKey(C4Key*, C4Slice); + + void c4key_beginArray(C4Key*); + void c4key_endArray(C4Key*); + void c4key_beginMap(C4Key*); + void c4key_endMap(C4Key*); + + + typedef C4Slice C4KeyReader; + + typedef enum { + kC4Null, + kC4Bool, + kC4Number, + kC4String, + kC4Array, + kC4Map, + kC4EndSequence, + kC4Special, + kC4Error = 255 + } C4KeyItemType; + + C4KeyItemType c4key_peek(C4KeyReader*); + void c4key_next(C4KeyReader*); + bool c4Key_readBool(C4KeyReader*); + double c4Key_readNumber(C4KeyReader*); + C4SliceResult c4Key_readString(C4KeyReader*); // remember to free the result + + //////// VIEWS: - + /** Opaque handle to an opened view. */ typedef struct c4View C4View; - typedef void (*C4EmitFn)(void *emitContext, - C4Slice keyJSON, - C4Slice valueJSON); - - typedef bool (*C4MapFn)(void *mapContext, - C4Slice docID, - C4Slice revID, - C4Slice json, - C4EmitFn *emit, - void *emitContext, - C4Error *outError); - - /** Opens a view. */ - C4View* c4view_open(C4Slice path, - C4MapFn *mapFn, - //C4ReduceFn *reduceFn, //TODO + /** Opens a view, or creates it if the file doesn't already exist. + @param database The database the view is associated with. + @param path The filesystem path to the view index file. + @param viewName The name of the view. + @param version The version of the views map function. + @param outError On failure, error info will be stored here. + @return The new C4View, or NULL on failure. */ + C4View* c4view_open(C4Database *database, + C4Slice path, + C4Slice viewName, C4Slice version, C4Error *outError); /** Closes the view and frees the object. */ - void c4view_close(C4View* view); - - bool c4view_eraseIndex(C4View *view, C4Error *outError); - - bool c4view_delete(C4View *view, C4Error *outError); - - bool c4view_updateIndexes(C4View *views[], int viewCount, C4Error *outError); - - - uint64_t c4view_getTotalRows(C4View *view); - - C4SequenceNumber c4view_getLastSequenceIndexed(C4View *view); + bool c4view_close(C4View* view, C4Error*); + + /** Erases the view index, but doesn't delete the database file. */ + bool c4view_eraseIndex(C4View*, C4Error *outError); + + /** Deletes the database file and closes/frees the C4View. */ + bool c4view_delete(C4View*, C4Error *outError); + + + /** Returns the total number of rows in the view index. */ + uint64_t c4view_getTotalRows(C4View*); + + /** Returns the last database sequence number that's been indexed. + If this is less than the database's lastSequence, the view index is out of date. */ + C4SequenceNumber c4view_getLastSequenceIndexed(C4View*); + + /** Returns the last database sequence number that changed the view index. */ + C4SequenceNumber c4view_getLastSequenceChangedAt(C4View*); + + + //////// INDEXING: + + + /** Opaque reference to an indexing task. */ + typedef struct c4Indexer C4Indexer; + + /** Creates an indexing task on one or more views in a database. + @param db The database to index. + @param views An array of views whose indexes should be updated in parallel. + @param viewCount The number of views in the views[] array. + @param outError On failure, error info will be stored here. + @return A new C4Indexer, or NULL on failure. */ + C4Indexer* c4indexer_begin(C4Database *db, + C4View *views[], + int viewCount, + C4Error *outError); + + /** Creates an enumerator that will return all the documents that need to be (re)indexed. */ + C4DocEnumerator* c4indexer_enumerateDocuments(C4Indexer *indexer, + C4Error *outError); + + /** Emits new keys/values derived from one document, for one view. + This function needs to be called once for each (document, view) pair. Even if the view's map + function didn't emit anything, the old keys/values need to be cleaned up. + @param indexer The indexer task. + @param document The document being indexed. + @param viewNumber The position of the view in the indexer's views[] array. + @param emitCount The number of emitted key/value pairs. + @param emittedKeys Array of keys being emitted. + @param emittedValues Array of values being emitted. + @param outError On failure, error info will be stored here. + @return True on success, false on failure. */ + bool c4indexer_emit(C4Indexer *indexer, + C4Document *document, + unsigned viewNumber, + unsigned emitCount, + C4Key* emittedKeys[], + C4Key* emittedValues[], + C4Error *outError); - C4SequenceNumber c4view_getLastSequenceChangedAt(C4View *view); + /** Finishes an indexing task and frees the indexer reference. + @param indexer The indexer. + @param commit True to commit changes to the indexes, false to abort. + @param outError On failure, error info will be stored here. + @return True on success, false on failure. */ + bool c4indexer_end(C4Indexer *indexer, + bool commit, + C4Error *outError); //////// QUERYING: + /** Options for view queries. */ typedef struct { - unsigned prefixMatchLevel; unsigned skip; unsigned limit; unsigned groupLevel; + unsigned prefixMatchLevel; C4Slice startKeyJSON; C4Slice endKeyJSON; C4Slice startKeyDocID; @@ -84,22 +177,30 @@ extern "C" { bool group; } C4QueryOptions; + /** Default query options. */ + extern const C4QueryOptions kC4DefaultQueryOptions; + + /** Opaque reference to a view query result enumerator. */ typedef struct c4QueryEnumerator C4QueryEnumerator; typedef struct { - C4Slice keyJSON; - C4Slice valueJSON; + C4KeyReader key; + C4KeyReader value; C4Slice docID; } C4QueryRow; - C4QueryEnumerator* c4view_query(C4View *view, + /** Runs a query and returns an enumerator for the results. */ + C4QueryEnumerator* c4view_query(C4View*, const C4QueryOptions *options, C4Error *outError); + /** Returns the next result row from a view query, or NULL at the end of the results. */ C4QueryRow* c4queryenum_next(C4QueryEnumerator *e); + /** Frees a query enumerator. Must be called after you're finished with it. */ void c4queryenum_free(C4QueryEnumerator *e); + /** Frees a query row. */ void c4queryrow_free(C4QueryRow *row); #ifdef __cplusplus diff --git a/C/tests/c4DatabaseTest.cc b/C/tests/c4DatabaseTest.cc index 282b964..d8e9ad3 100644 --- a/C/tests/c4DatabaseTest.cc +++ b/C/tests/c4DatabaseTest.cc @@ -189,8 +189,7 @@ class C4DatabaseTest : public CppUnit::TestFixture { C4Document* doc; // No start or end ID: - e = c4db_enumerateAllDocs(db, kC4SliceNull, kC4SliceNull, - false, true, 0, false, &error); + e = c4db_enumerateAllDocs(db, kC4SliceNull, kC4SliceNull, NULL, &error); Assert(e); int i = 1; while (NULL != (doc = c4enum_nextDocument(e, &error))) { @@ -209,8 +208,7 @@ class C4DatabaseTest : public CppUnit::TestFixture { } // Start and end ID: - e = c4db_enumerateAllDocs(db, c4str("doc-007"), c4str("doc-090"), - false, true, 0, false, &error); + e = c4db_enumerateAllDocs(db, c4str("doc-007"), c4str("doc-090"), NULL, &error); Assert(e); i = 7; while (NULL != (doc = c4enum_nextDocument(e, &error))) { @@ -235,7 +233,9 @@ class C4DatabaseTest : public CppUnit::TestFixture { C4Document* doc; // Since start: - e = c4db_enumerateChanges(db, 0, false, &error); + C4ChangesOptions options = kC4DefaultChangesOptions; + options.includeBodies = false; + e = c4db_enumerateChanges(db, 0, &options, &error); Assert(e); C4SequenceNumber seq = 1; while (NULL != (doc = c4enum_nextDocument(e, &error))) { @@ -247,7 +247,7 @@ class C4DatabaseTest : public CppUnit::TestFixture { } // Since 6: - e = c4db_enumerateChanges(db, 6, false, &error); + e = c4db_enumerateChanges(db, 6, &options, &error); Assert(e); seq = 7; while (NULL != (doc = c4enum_nextDocument(e, &error))) { diff --git a/CBForest.xcodeproj/project.pbxproj b/CBForest.xcodeproj/project.pbxproj index e8e8e20..fd20001 100644 --- a/CBForest.xcodeproj/project.pbxproj +++ b/CBForest.xcodeproj/project.pbxproj @@ -125,6 +125,7 @@ 274D040F1BA75E5000FF7C35 /* c4DatabaseTest.cc in Sources */ = {isa = PBXBuildFile; fileRef = 274D04001BA75C0400FF7C35 /* c4DatabaseTest.cc */; }; 274D04111BA75E9C00FF7C35 /* libcppunit-1.13.0.dylib in Frameworks */ = {isa = PBXBuildFile; fileRef = 274D04101BA75E9C00FF7C35 /* libcppunit-1.13.0.dylib */; }; 274D04201BA892B100FF7C35 /* libCBForest-Interop.dylib in Frameworks */ = {isa = PBXBuildFile; fileRef = 720EA3F51BA7EAD9002B8416 /* libCBForest-Interop.dylib */; }; + 274D04251BA8A58200FF7C35 /* c4View.cc in Sources */ = {isa = PBXBuildFile; fileRef = 274D04241BA8A58200FF7C35 /* c4View.cc */; settings = {ASSET_TAGS = (); }; }; 275072C718E4AA5000A80C5A /* avltree.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2750729318E4857900A80C5A /* avltree.cc */; }; 275072C818E4AA5000A80C5A /* blockcache.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2750729418E4857900A80C5A /* blockcache.cc */; }; 275072C918E4AA5000A80C5A /* btree_kv.cc in Sources */ = {isa = PBXBuildFile; fileRef = 2750729518E4857900A80C5A /* btree_kv.cc */; }; @@ -639,6 +640,8 @@ 274D040A1BA75E1C00FF7C35 /* main.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = main.cpp; sourceTree = ""; }; 274D04101BA75E9C00FF7C35 /* libcppunit-1.13.0.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = "libcppunit-1.13.0.dylib"; path = "../../../../usr/local/Cellar/cppunit/1.13.2/lib/libcppunit-1.13.0.dylib"; sourceTree = ""; }; 274D04231BA8932800FF7C35 /* c4.exp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.exports; path = c4.exp; sourceTree = ""; }; + 274D04241BA8A58200FF7C35 /* c4View.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = c4View.cc; sourceTree = ""; }; + 274D04261BA8A5BC00FF7C35 /* c4Impl.hh */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = c4Impl.hh; sourceTree = ""; }; 2750723E18E3E52800A80C5A /* Foundation.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = Foundation.framework; path = System/Library/Frameworks/Foundation.framework; sourceTree = SDKROOT; }; 2750724418E3E52800A80C5A /* CBForest-Prefix.pch */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "CBForest-Prefix.pch"; sourceTree = ""; }; 2750724C18E3E5BB00A80C5A /* libforestdb.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = libforestdb.dylib; path = vendor/forestdb/build/libforestdb.dylib; sourceTree = ""; }; @@ -1246,8 +1249,10 @@ children = ( 2757DE5A1B9FC5C7002EE261 /* c4.c */, 2757DE591B9FC3F1002EE261 /* c4.h */, + 274D04261BA8A5BC00FF7C35 /* c4Impl.hh */, 2757DE561B9FC3C9002EE261 /* c4Database.cc */, 2757DE571B9FC3C9002EE261 /* c4Database.h */, + 274D04241BA8A58200FF7C35 /* c4View.cc */, 275607781BA21DAB00F58218 /* c4View.h */, 274D04231BA8932800FF7C35 /* c4.exp */, 274D03FF1BA7554700FF7C35 /* tests */, @@ -2183,6 +2188,7 @@ 720EA4141BA8D834002B8416 /* RevTree.cc in Sources */, 720EA4151BA8D834002B8416 /* Index.cc in Sources */, 720EA4121BA8D834002B8416 /* VersionedDocument.cc in Sources */, + 274D04251BA8A58200FF7C35 /* c4View.cc in Sources */, 720EA3E61BA7EAD9002B8416 /* c4Database.cc in Sources */, ); runOnlyForDeploymentPostprocessing = 0; diff --git a/CBForest/MapReduceIndex.cc b/CBForest/MapReduceIndex.cc index c8133f6..eca2970 100644 --- a/CBForest/MapReduceIndex.cc +++ b/CBForest/MapReduceIndex.cc @@ -299,8 +299,13 @@ namespace forestdb { emitter emit; if (!doc.deleted()) (*_map)(mappable, emit); // Call map function! - _lastSequenceIndexed = doc.sequence(); - if (IndexWriter(this,t).update(doc.key(), doc.sequence(), emit.keys, emit.values, _rowCount)) { + return emitForDocument(t, doc.key(), doc.sequence(), emit.keys, emit.values); + } + + bool MapReduceIndex::emitForDocument(Transaction& t, slice docID, sequence docSequence, std::vector keys, std::vector values) + { + _lastSequenceIndexed = docSequence; + if (IndexWriter(this,t).update(docID, docSequence, keys, values, _rowCount)) { _lastSequenceChangedAt = _lastSequenceIndexed; return true; } @@ -323,33 +328,42 @@ namespace forestdb { } - bool MapReduceIndexer::run() { - KeyStore sourceStore = _indexes[0]->sourceStore(); - _latestDbSequence = sourceStore.lastSequence(); + KeyStore MapReduceIndexer::sourceStore() { + return _indexes[0]->sourceStore(); + } + + + sequence MapReduceIndexer::startingSequence() { + _latestDbSequence = sourceStore().lastSequence(); // First find the minimum sequence that not all indexes have indexed yet. - // Also start a transaction for each index: sequence startSequence = _latestDbSequence+1; for (auto idx = _indexes.begin(); idx != _indexes.end(); ++idx) { sequence lastSequence = (*idx)->lastSequenceIndexed(); if (lastSequence < _latestDbSequence) { startSequence = std::min(startSequence, lastSequence+1); } else if (*idx == _triggerIndex) { - return false; // The trigger index doesn't need to be updated, so abort + return UINT64_MAX; // The trigger index doesn't need to be updated, so abort } _lastSequences.push_back(lastSequence); } + if (startSequence > _latestDbSequence) + startSequence = UINT64_MAX; // no updating needed + return startSequence; + } + bool MapReduceIndexer::run() { + sequence startSequence = startingSequence(); if (startSequence > _latestDbSequence) return false; // no updating needed // Enumerate all the documents: DocEnumerator::Options options = DocEnumerator::Options::kDefault; options.includeDeleted = true; - for (DocEnumerator e(sourceStore, startSequence, UINT64_MAX, options); e.next(); ) { + for (DocEnumerator e(sourceStore(), startSequence, UINT64_MAX, options); e.next(); ) { addDocument(*e); } - _finished = true; + finished(); return true; } @@ -375,4 +389,18 @@ namespace forestdb { updateDocInIndex(i, mappable); } + void MapReduceIndexer::emitDocIntoView(slice docID, + sequence docSequence, + unsigned viewNumber, + std::vector keys, + std::vector values) + { + emitter emit; + for (unsigned i = 0; i < keys.size(); ++i) + emit.emit(keys[i], values[i]); + _indexes[viewNumber]->emitForDocument(*_transactions[viewNumber], + docID, docSequence, + emit.keys, emit.values); + } + } diff --git a/CBForest/MapReduceIndex.hh b/CBForest/MapReduceIndex.hh index 333ad07..166630a 100644 --- a/CBForest/MapReduceIndex.hh +++ b/CBForest/MapReduceIndex.hh @@ -99,6 +99,8 @@ namespace forestdb { private: void saveState(Transaction& t); bool updateDocInIndex(Transaction&, const Mappable&); + bool emitForDocument(Transaction& t, slice docID, sequence docSequence, + std::vector keys, std::vector values); alloc_slice getSpecialEntry(slice docID, sequence, unsigned fullTextID); forestdb::KeyStore _sourceDatabase; @@ -125,10 +127,26 @@ namespace forestdb { /** If set, indexing will only occur if this index needs to be updated. */ void triggerOnIndex(MapReduceIndex* index) {_triggerIndex = index;} + KeyStore sourceStore(); + bool run(); + void finished() {_finished = true;} + sequence latestDbSequence() const {return _latestDbSequence;} + // Incremental mode: + + /** Determines at which sequence indexing should start. + Returns UINT64_MAX if no re-indexing is necessary. */ + sequence startingSequence(); + + void emitDocIntoView(slice docID, + sequence docSequence, + unsigned viewNumber, + std::vector keys, + std::vector values); + protected: /** Transforms the Document to a Mappable and invokes addMappable. The default implementation just uses the Mappable base class, i.e. doesn't do any work.