diff --git a/Makefile.am b/Makefile.am index fa24eb25d..a98f5699f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -66,6 +66,7 @@ ep_la_SOURCES = \ command_ids.h \ common.hh \ config_static.h \ + crc32.c crc32.h \ dispatcher.cc dispatcher.hh \ ep.cc ep.hh \ ep_engine.cc ep_engine.h \ @@ -79,6 +80,7 @@ ep_la_SOURCES = \ item_pager.cc item_pager.hh \ kvstore.hh \ locks.hh \ + mutation_log.cc mutation_log.hh \ mutex.cc mutex.hh \ priority.cc priority.hh \ queueditem.cc queueditem.hh \ @@ -201,6 +203,7 @@ check_PROGRAMS=\ histo_test \ hrtime_test \ misc_test \ + mutation_log_test \ mutex_test \ pathexpand_test \ priority_test \ @@ -335,6 +338,14 @@ checkpoint_test_DEPENDENCIES = checkpoint.hh vbucket.hh \ libobjectregistry.la libconfiguration.la checkpoint_test_LDADD = libobjectregistry.la libconfiguration.la +mutation_log_test_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir) ${NO_WERROR} +mutation_log_test_SOURCES = t/mutation_log_test.cc mutation_log.hh \ + testlogger.cc mutation_log.cc \ + byteorder.h byteorder.c \ + crc32.h crc32.c +mutation_log_test_DEPENDENCIES = mutation_log.hh +mutation_log_test_LDADD = + hrtime_test_CXXFLAGS = $(AM_CXXFLAGS) -I$(top_srcdir) ${NO_WERROR} hrtime_test_SOURCES = t/hrtime_test.cc common.hh @@ -368,6 +379,7 @@ checkpoint_test_SOURCES += gethrtime.c management_cbdbconvert_SOURCES += gethrtime.c ep_testsuite_la_SOURCES += gethrtime.c hash_table_test_SOURCES += gethrtime.c +mutation_log_test_SOURCES += gethrtime.c endif if BUILD_BYTEORDER @@ -474,6 +486,7 @@ CLEANFILES += ep_la-probes.o ep_la-probes.lo \ .libs/cddbconvert-probes.o .libs/cddbconvert-probes.o \ .libs/atomic_ptr_test-probes.o \ .libs/checkpoint_test-probes.o \ + .libs/mutation_test-probes.o \ .libs/dispatcher_test-probes.o \ .libs/hash_table_test-probes.o \ .libs/vbucket_test-probes.o \ @@ -528,6 +541,12 @@ ep_testsuite_la-probes.lo: $(ep_testsuite_la_OBJECTS) dtrace/probes.h -s ${srcdir}/dtrace/probes.d \ $(checkpoint_test_OBJECTS) +.libs/mutation_test-probes.o: $(mutation_test_OBJECTS) dtrace/probes.h + $(DTRACE) $(DTRACEFLAGS) -G \ + -o .libs/mutation_test-probes.o \ + -s ${srcdir}/dtrace/probes.d \ + $(mutation_test_OBJECTS) + .libs/dispatcher_test-probes.o: $(dispatcher_test_OBJECTS) dtrace/probes.h $(DTRACE) $(DTRACEFLAGS) -G \ -o .libs/dispatcher_test-probes.o \ diff --git a/configuration.json b/configuration.json index 00e231acb..4fe7ad78a 100644 --- a/configuration.json +++ b/configuration.json @@ -162,6 +162,38 @@ "descr": "True if we want to keep the closed checkpoints for each vbucket unless the memory usage is above high water mark", "type": "bool" }, + "klog_block_size": { + "default": "4096", + "descr": "Logging block size.", + "type": "size_t" + }, + "klog_flush": { + "default": "commit2", + "descr": "When to flush the log (complete current block).", + "enum": [ + "off", + "commit1", + "commit2", + "full" + ], + "type": "std::string" + }, + "klog_path": { + "default": "", + "descr": "Path to the mutation key log.", + "type": "std::string" + }, + "klog_sync": { + "default": "commit2", + "descr": "When to sync the log.", + "enum": [ + "off", + "commit1", + "commit2", + "full" + ], + "type": "std::string" + }, "max_checkpoints": { "default": "2", "type": "size_t" diff --git a/crc32.c b/crc32.c new file mode 100644 index 000000000..ae362e18a --- /dev/null +++ b/crc32.c @@ -0,0 +1,124 @@ +/* Crc - 32 BIT ANSI X3.66 CRC checksum files */ + +#include +#include "crc32.h" + +/**********************************************************************\ +|* Demonstration program to compute the 32-bit CRC 
used as the frame *| +|* check sequence in ADCCP (ANSI X3.66, also known as FIPS PUB 71 *| +|* and FED-STD-1003, the U.S. versions of CCITT's X.25 link-level *| +|* protocol). The 32-bit FCS was added via the Federal Register, *| +|* 1 June 1982, p.23798. I presume but don't know for certain that *| +|* this polynomial is or will be included in CCITT V.41, which *| +|* defines the 16-bit CRC (often called CRC-CCITT) polynomial. FIPS *| +|* PUB 78 says that the 32-bit FCS reduces otherwise undetected *| +|* errors by a factor of 10^-5 over 16-bit FCS. *| +\**********************************************************************/ + +/* Need an unsigned type capable of holding 32 bits; */ + +/* Copyright (C) 1986 Gary S. Brown. You may use this program, or + code or tables extracted from it, as desired without restriction.*/ + +/* First, the polynomial itself and its table of feedback terms. The */ +/* polynomial is */ +/* X^32+X^26+X^23+X^22+X^16+X^12+X^11+X^10+X^8+X^7+X^5+X^4+X^2+X^1+X^0 */ +/* Note that we take it "backwards" and put the highest-order term in */ +/* the lowest-order bit. The X^32 term is "implied"; the LSB is the */ +/* X^31 term, etc. The X^0 term (usually shown as "+1") results in */ +/* the MSB being 1. */ + +/* Note that the usual hardware shift register implementation, which */ +/* is what we're using (we're merely optimizing it by doing eight-bit */ +/* chunks at a time) shifts bits into the lowest-order term. In our */ +/* implementation, that means shifting towards the right. Why do we */ +/* do it this way? Because the calculated CRC must be transmitted in */ +/* order from highest-order term to lowest-order term. UARTs transmit */ +/* characters in order from LSB to MSB. By storing the CRC this way, */ +/* we hand it to the UART in the order low-byte to high-byte; the UART */ +/* sends each low-bit to hight-bit; and the result is transmission bit */ +/* by bit from highest- to lowest-order term without requiring any bit */ +/* shuffling on our part. Reception works similarly. */ + +/* The feedback terms table consists of 256, 32-bit entries. Notes: */ +/* */ +/* 1. The table can be generated at runtime if desired; code to do so */ +/* is shown later. It might not be obvious, but the feedback */ +/* terms simply represent the results of eight shift/xor opera- */ +/* tions for all combinations of data and CRC register values. */ +/* */ +/* 2. The CRC accumulation logic is the same for all CRC polynomials, */ +/* be they sixteen or thirty-two bits wide. You simply choose the */ +/* appropriate table. Alternatively, because the table can be */ +/* generated at runtime, you can start by generating the table for */ +/* the polynomial in question and use exactly the same "updcrc", */ +/* if your application needn't simultaneously handle two CRC */ +/* polynomials. (Note, however, that XMODEM is strange.) */ +/* */ +/* 3. For 16-bit CRCs, the table entries need be only 16 bits wide; */ +/* of course, 32-bit entries work OK if the high 16 bits are zero. */ +/* */ +/* 4. The values must be right-shifted by eight bits by the "updcrc" */ +/* logic; the shift must be unsigned (bring in zeroes). On some */ +/* hardware you could probably optimize the shift in assembler by */ +/* using byte-swap instructions. 
*/ + +static uint32_t crc_32_tab[] = { /* CRC polynomial 0xedb88320 */ + 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, + 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988, + 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2, + 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7, + 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9, + 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172, + 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c, + 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59, + 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423, + 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924, + 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106, + 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433, + 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d, + 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e, + 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950, + 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65, + 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7, + 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0, + 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa, + 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f, + 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81, + 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a, + 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84, + 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1, + 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb, + 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc, + 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e, + 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b, + 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55, + 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236, + 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28, + 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d, + 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f, + 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38, + 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242, + 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777, + 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69, + 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2, + 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc, + 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9, + 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693, + 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94, + 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d +}; + +#define UPDC32(octet, crc) (crc_32_tab[((crc) ^ (octet)) & 0xff] ^ ((crc) >> 8)) + +uint32_t crc32buf(uint8_t *buf, size_t len) { + register uint32_t oldcrc32; + + oldcrc32 = 0xFFFFFFFF; + + for ( ; len; --len, ++buf) { + oldcrc32 = UPDC32(*buf, oldcrc32); + } + + return ~oldcrc32; +} diff --git a/crc32.h b/crc32.h 
new file mode 100644 index 000000000..b07e73adf --- /dev/null +++ b/crc32.h @@ -0,0 +1,8 @@ +#ifndef CRC32_H +#define CRC32_H 1 + +#include <stdint.h> + +uint32_t crc32buf(uint8_t *buf, size_t len); + +#endif /* CRC32_H */ diff --git a/docs/engine-params.org b/docs/engine-params.org index fd359cd2b..1c08aaf52 100644 --- a/docs/engine-params.org +++ b/docs/engine-params.org @@ -108,6 +108,12 @@ memcached like this: | | | means don't throttle. | | tap_throttle_threshold | float | Percentage of memory in use before we | | | | throttle tap streams | +| klog_path | string | Path to the mutation key log. | +| klog_block_size | int | Mutation key log block size. | +| klog_flush | string | When to force buffer flushes during | +| | | klog (off, commit1, commit2, full) | +| klog_sync | string | When to fsync during klog. | + ** Shard Patterns diff --git a/docs/klog.org b/docs/klog.org new file mode 100644 index 000000000..ddcb92d2b --- /dev/null +++ b/docs/klog.org @@ -0,0 +1,99 @@ +* Overview of the Key Mutation Log + +This change introduces a new on-disk binary format for logging major +key events only. These events are limited to the creation and +destruction of keys only -- modification of keys is out of scope, as +is storing values. + +* Flow + +The logging hooks into the persistence flow and does the following: + +After writing any new key or deleting any existing key from disk, a +new log entry is buffered. + +The buffer is written when a block is full or on a flush event +(configurable, usually after a commit1 or commit2 event). + +Before sending a commit to the underlying store, a commit1 event is +logged. + +After the underlying store completes its commit, a commit2 is logged. + +* Configuration + +There are four new engine parameters that come with this feature: + +** klog_path + +Where the events should be logged. An empty string (default) disables +logging. + +** klog_block_size + +The buffer/block size for log entries. The number should line up with +the underlying filesystem block size. Multiples may increase +throughput. + +** klog_flush + +Configures when the buffer should be force-flushed. There are four +possible values: + + off: never force a flush + commit1: force a flush after commit1 only + commit2: force a flush after commit2 only + full: force a flush after both commit1 and commit2 + +** klog_sync + +Configures when the file should be fsynced. There are four +possible values: + + off: never fsync + commit1: fsync after commit1 only + commit2: fsync after commit2 only + full: fsync after both commit1 and commit2 + +* Data Format + +Each file consists of a header and then an arbitrary number of blocks +which each contain an arbitrary number of records. All fields are +big-endian byte encoded. + +** Header + +The file begins with a header at least 4,096 bytes long. The +header defines some basic info about the file. + +- 32-bit version number (this document describes version 1) +- 32-bit block size +- 32-bit block count +- k/v properties to store additional tagged config + - 8-bit key len + - 8-bit value len + - key bytes + - value bytes +- terminated by zero-length key + +If the block size listed in the header is larger than 4,096 bytes, the +first block itself will be this length (assume anything extra is +zero-padded). + +** Block + +- checksum (16 bits, IEEE crc32 & 0xffff) +- record count (16 bits) +- []record + +Block size is variable. I've been using 4k for now. Blocks are 0 +padded at the end when we need to sync or we can't fit more entries.
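As an illustrative sketch (not something this change adds): once a complete block has been read into memory, the checksum described above can be verified with the crc32buf() routine from crc32.c. The helper name check_block and its exact shape are hypothetical; ntohs() handles the big-endian fields.

#include <arpa/inet.h>   /* ntohs */
#include <stdint.h>
#include <string.h>

extern "C" {
#include "crc32.h"
}

/* Returns the record count of a valid block, or -1 when the stored 16-bit
 * checksum does not match the low 16 bits of an IEEE CRC-32 computed over
 * the rest of the block (record count, records and zero padding). */
static int check_block(uint8_t *block, size_t block_size) {
    uint16_t stored;
    memcpy(&stored, block, sizeof(stored));        /* bytes 0-1: crc16, big-endian */
    stored = ntohs(stored);

    uint16_t computed = crc32buf(block + 2, block_size - 2) & 0xffff;
    if (computed != stored) {
        return -1;
    }

    uint16_t records;
    memcpy(&records, block + 2, sizeof(records));  /* bytes 2-3: record count */
    return ntohs(records);
}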
+ +** Record + +- rowid (64-bit) +- vbucket (16-bit) +- magic (0x45) +- type (8-bit) +- key len (8-bit) +- key ([]byte) diff --git a/docs/stats.org b/docs/stats.org index 06b1316fc..7422875fa 100644 --- a/docs/stats.org +++ b/docs/stats.org @@ -473,6 +473,9 @@ form to describe when time was spent doing various things: | disk_commit | waiting for a commit after a batch of updates | | disk_invalid_item_del | Waiting for disk to delete a chunk of invalid | | | items with the old vbucket version | +| klogPadding | Amount of wasted "padding" space in the klog. | +| klogFlushTime | Time spent flushing the klog. | +| klogSyncTime | Time spent syncing the klog. | ** Hash Stats @@ -560,6 +563,17 @@ Note that tcmalloc stats are not available on some operating systems | tcmalloc_current_thread_cache_bytes | A measure of some of the memory | | | TCMalloc is using for small objects. | +** Key Log + +Stats =klog= shows counts what's going on with the key mutation log. + +| size | The size of the logfile. | +| count_new | Number of "new key" events in the log. | +| count_del | Number of "deleted key" events in the log. | +| count_del_all | Number of "delete all" events in the log. | +| count_commit1 | Number of "commit1" events in the log. | +| count_commit2 | Number of "commit2" events in the log. | + * Details ** Ages diff --git a/ep.cc b/ep.cc index 07ea46f51..98b5a57e1 100644 --- a/ep.cc +++ b/ep.cc @@ -446,8 +446,10 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine engine(theEngine), stats(engine.getEpStats()), rwUnderlying(t), storageProperties(t->getStorageProperties()), vbuckets(theEngine.getConfiguration()), + mutationLog(theEngine.getConfiguration().getKlogPath(), + theEngine.getConfiguration().getKlogBlockSize()), diskFlushAll(false), - tctx(stats, t, theEngine.observeRegistry), + tctx(stats, t, mutationLog, theEngine.observeRegistry), bgFetchDelay(0) { getLogger()->log(EXTENSION_LOG_INFO, NULL, @@ -528,6 +530,9 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine persistenceCheckpointIds[i] = 0; } + bool syncset(mutationLog.setSyncConfig(theEngine.getConfiguration().getKlogSync())); + assert(syncset); + startDispatcher(); startFlusher(); startNonIODispatcher(); @@ -1122,6 +1127,11 @@ EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid, uint16_t vbver lh.unlock(); if (rwUnderlying->delVBucket(vbid, vbver)) { vbuckets.setBucketDeletion(vbid, false); + mutationLog.deleteAll(vbid); + // This is happening in an independent transaction, so + // we're going go ahead and commit it out. 
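+ // Writing both commit markers here leaves the log "clean": the warmup + // harvester only applies entries once it has seen a following commit2.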
+ mutationLog.commit1(); + mutationLog.commit2(); ++stats.vbucketDeletions; return vbucket_del_success; } else { @@ -2185,9 +2195,11 @@ class PersistenceCallback : public Callback, public: PersistenceCallback(const queued_item &qi, std::queue *q, - EventuallyPersistentStore *st, + EventuallyPersistentStore *st, MutationLog *ml, rel_time_t qd, rel_time_t d, EPStats *s) : - queuedItem(qi), rq(q), store(st), queued(qd), dirtied(d), stats(s) { + queuedItem(qi), rq(q), store(st), mutationLog(ml), + queued(qd), dirtied(d), stats(s) { + assert(rq); assert(s); } @@ -2197,6 +2209,8 @@ class PersistenceCallback : public Callback, if (value.first == 1) { stats->totalPersisted++; if (value.second > 0) { + mutationLog->newItem(queuedItem->getVBucketId(), queuedItem->getKey(), + value.second); ++stats->newItems; setId(value.second); } @@ -2265,6 +2279,9 @@ class PersistenceCallback : public Callback, ++stats->delItems; ++vb->opsDelete; } + + mutationLog->delItem(queuedItem->getVBucketId(), queuedItem->getKey()); + // We have succesfully removed an item from the disk, we // may now remove it from the hash table. if (vb) { @@ -2315,6 +2332,7 @@ class PersistenceCallback : public Callback, const queued_item queuedItem; std::queue *rq; EventuallyPersistentStore *store; + MutationLog *mutationLog; rel_time_t queued; rel_time_t dirtied; EPStats *stats; @@ -2323,6 +2341,15 @@ class PersistenceCallback : public Callback, int EventuallyPersistentStore::flushOneDeleteAll() { rwUnderlying->reset(); + // Log a flush of every known vbucket. + std::vector vbs(vbuckets.getBuckets()); + for (std::vector::iterator it(vbs.begin()); it != vbs.end(); ++it) { + mutationLog.deleteAll(static_cast(*it)); + } + // This is happening in an independent transaction, so we're going + // go ahead and commit it out. + mutationLog.commit1(); + mutationLog.commit2(); diskFlushAll.cas(true, false); return 1; } @@ -2428,7 +2455,7 @@ int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi, rowid == -1 ? 
"disk_insert" : "disk_update", stats.timingLog); PersistenceCallback *cb; - cb = new PersistenceCallback(qi, rejectQueue, this, + cb = new PersistenceCallback(qi, rejectQueue, this, &mutationLog, queued, dirtied, &stats); tctx.addCallback(cb); rwUnderlying->set(itm, qi->getVBucketVersion(), *cb); @@ -2444,8 +2471,8 @@ int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi, BlockTimer timer(&stats.diskDelHisto, "disk_delete", stats.timingLog); PersistenceCallback *cb; - cb = new PersistenceCallback(qi, rejectQueue, this, queued, - dirtied, &stats); + cb = new PersistenceCallback(qi, rejectQueue, this, &mutationLog, + queued, dirtied, &stats); if (rowid > 0) { uint16_t vbid(qi->getVBucketId()); uint16_t vbver(vbuckets.getBucketVersion(vbid)); @@ -2586,6 +2613,59 @@ void EventuallyPersistentStore::warmupCompleted() { } } +static void warmupLogCallback(void *arg, uint16_t vb, uint16_t vbver, + const std::string &key, uint64_t rowid) { + shared_ptr > *cb = reinterpret_cast >*>(arg); + Item *itm = new Item(key.data(), key.size(), + 0, // flags + 0, // exp + NULL, 0, // data + 0, // CAS + rowid, + vb); + + GetValue gv(itm, ENGINE_SUCCESS, rowid, vbver, NULL, true /* partial */); + + (*cb)->callback(gv); +} + +bool EventuallyPersistentStore::warmupFromLog(const std::map, + vbucket_state> &state, + shared_ptr > cb) { + + bool rv(true); + + MutationLogHarvester harvester(mutationLog); + for (std::map, vbucket_state>::const_iterator it = state.begin(); + it != state.end(); ++it) { + + harvester.setVbVer(it->first.first, it->first.second); + } + + hrtime_t start(gethrtime()); + + harvester.load(); + + hrtime_t end1(gethrtime()); + + getLogger()->log(EXTENSION_LOG_DEBUG, NULL, + "Completed log read in %s with %d entries\n", + hrtime2text(end1 - start).c_str(), harvester.total()); + + harvester.apply(&cb, &warmupLogCallback); + mutationLog.resetCounts(harvester.getItemsSeen()); + + hrtime_t end2(gethrtime()); + getLogger()->log(EXTENSION_LOG_DEBUG, NULL, + "Completed repopulation from log in %dms\n", + ((end2 - end1) / 1000000)); + + // Anything left in the "loading" map at this point is uncommitted. + // TODO: Forward reconciliation of uncommitted data. + + return rv; +} + void EventuallyPersistentStore::warmup(const std::map, vbucket_state> &st, bool keysOnly) { @@ -2609,7 +2689,9 @@ EventuallyPersistentStore::warmup(const std::map, } if (keysOnly) { - roUnderlying->dumpKeys(vbids, cb); + if (!warmupFromLog(st, cb)) { + roUnderlying->dumpKeys(vbids, cb); + } } else { roUnderlying->dump(cb); invalidItemDbPager->createRangeList(); @@ -2679,7 +2761,8 @@ void LoadStorageKVPairCallback::callback(GetValue &val) { epstore->getInvalidItemDbPager()->addInvalidItem(i, val.getVBucketVersion()); getLogger()->log(EXTENSION_LOG_WARNING, NULL, - "Received invalid item.. ignored"); + "Received invalid item (v %d != v %d).. 
ignored", + val.getVBucketVersion(), vb_version); delete i; return; @@ -2791,10 +2874,12 @@ void TransactionContext::leave(int completed) { void TransactionContext::commit() { BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog); rel_time_t cstart = ep_current_time(); + mutationLog.commit1(); while (!underlying->commit()) { sleep(1); ++stats.commitFailed; } + mutationLog.commit2(); ++stats.flusherCommits; std::list::iterator iter; diff --git a/ep.hh b/ep.hh index 112cd0dfb..de380d991 100644 --- a/ep.hh +++ b/ep.hh @@ -48,6 +48,7 @@ extern EXTENSION_LOGGER_DESCRIPTOR *getLogger(void); #include "vbucket.hh" #include "vbucketmap.hh" #include "item_pager.hh" +#include "mutation_log.hh" #define MAX_BG_FETCH_DELAY 900 @@ -299,8 +300,9 @@ class PersistenceCallback; class TransactionContext { public: - TransactionContext(EPStats &st, KVStore *ks, ObserveRegistry &obsReg) - : stats(st), underlying(ks), _remaining(0), intxn(false), + TransactionContext(EPStats &st, KVStore *ks, MutationLog &log, + ObserveRegistry &obsReg) + : stats(st), underlying(ks), mutationLog(log), _remaining(0), intxn(false), observeRegistry(obsReg) {} /** @@ -373,6 +375,7 @@ public: private: EPStats &stats; KVStore *underlying; + MutationLog &mutationLog; int _remaining; Atomic txnSize; Atomic numUncommittedItems; @@ -835,10 +838,18 @@ public: */ void completeDegradedMode(); + + /** + * Get access to the mutation log. + */ + const MutationLog *getMutationLog() const { return &mutationLog; } + protected: // Method called by the flusher std::map, vbucket_state> loadVBucketState(); + bool warmupFromLog(const std::map, vbucket_state> &state, + shared_ptr >cb); void warmup(const std::map, vbucket_state> &state, bool keysOnly); void warmupCompleted(); @@ -939,6 +950,8 @@ private: VBucketMap vbuckets; SyncObject mutex; + MutationLog mutationLog; + // The writing queue is used by the flusher thread to keep // track of the objects it works on. 
It should _not_ be used // by any other threads (because the flusher use it without diff --git a/ep_engine.cc b/ep_engine.cc index c05b23a71..a11920f61 100644 --- a/ep_engine.cc +++ b/ep_engine.cc @@ -2932,7 +2932,19 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doTimingStats(const void *cookie, add_casted_stat("disk_invalid_item_del", stats.diskInvaidItemDelHisto, add_stat, cookie); - add_casted_stat("online_update_revert", stats.checkpointRevertHisto, add_stat, cookie); + add_casted_stat("online_update_revert", stats.checkpointRevertHisto, + add_stat, cookie); + + // Mutation Log + const MutationLog *mutationLog(epstore->getMutationLog()); + if (mutationLog->isEnabled()) { + add_casted_stat("klogPadding", mutationLog->paddingHisto, + add_stat, cookie); + add_casted_stat("klogFlushTime", mutationLog->flushTimeHisto, + add_stat, cookie); + add_casted_stat("klogSyncTime", mutationLog->syncTimeHisto, + add_stat, cookie); + } return ENGINE_SUCCESS; } @@ -3012,6 +3024,21 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doObserveStats(const void* cookie, return ENGINE_SUCCESS; } +ENGINE_ERROR_CODE EventuallyPersistentEngine::doKlogStats(const void* cookie, + ADD_STAT add_stat) { + const MutationLog *mutationLog(epstore->getMutationLog()); + add_casted_stat("size", mutationLog->logSize, add_stat, cookie); + for (int i(0); i < MUTATION_LOG_TYPES; ++i) { + size_t v(mutationLog->itemsLogged[i]); + if (v > 0) { + char key[32]; + snprintf(key, sizeof(key), "count_%s", mutation_log_type_names[i]); + add_casted_stat(key, v, add_stat, cookie); + } + } + return ENGINE_SUCCESS; +} + ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(const void* cookie, const char* stat_key, int nkey, @@ -3041,6 +3068,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::getStats(const void* cookie, rv = doVBucketStats(cookie, add_stat, true, false); } else if (nkey == 10 && strncmp(stat_key, "checkpoint", 10) == 0) { rv = doCheckpointStats(cookie, add_stat); + } else if (nkey == 4 && strncmp(stat_key, "klog", 10) == 0) { + rv = doKlogStats(cookie, add_stat); } else if (nkey == 7 && strncmp(stat_key, "timings", 7) == 0) { rv = doTimingStats(cookie, add_stat); } else if (nkey == 10 && strncmp(stat_key, "dispatcher", 10) == 0) { diff --git a/ep_engine.h b/ep_engine.h index 6c85ae1c0..b835dbb3a 100644 --- a/ep_engine.h +++ b/ep_engine.h @@ -683,6 +683,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 { ENGINE_ERROR_CODE doObserveStats(const void* cookie, ADD_STAT add_s, const char* stat_key, int nkey); ENGINE_ERROR_CODE doEngineStats(const void *cookie, ADD_STAT add_stat); + ENGINE_ERROR_CODE doKlogStats(const void *cookie, ADD_STAT add_stat); ENGINE_ERROR_CODE doMemoryStats(const void *cookie, ADD_STAT add_stat); ENGINE_ERROR_CODE doVBucketStats(const void *cookie, ADD_STAT add_stat, bool prevStateRequested, diff --git a/ep_testsuite.cc b/ep_testsuite.cc index 96f157053..99465d7d2 100644 --- a/ep_testsuite.cc +++ b/ep_testsuite.cc @@ -109,6 +109,7 @@ bool abort_msg(const char *expr, const char *msg, int line) { static void rmdb(void) { remove("/tmp/test.db"); + remove("/tmp/mutation.log"); remove("/tmp/test.db-0.sqlite"); remove("/tmp/test.db-1.sqlite"); remove("/tmp/test.db-2.sqlite"); diff --git a/flusher.cc b/flusher.cc index a5a921640..e82021fd6 100644 --- a/flusher.cc +++ b/flusher.cc @@ -158,7 +158,8 @@ void Flusher::initialize(TaskId tid) { warmupStartTime = gethrtime(); initialVbState = store->loadVBucketState(); - if (store->getROUnderlying()->isKeyDumpSupported()) { + if 
(store->getMutationLog()->isEnabled() + || store->getROUnderlying()->isKeyDumpSupported()) { transition_state(loading_keys); } else { transition_state(loading_data); diff --git a/flusher.hh b/flusher.hh index 99603bb67..c2c7be61a 100644 --- a/flusher.hh +++ b/flusher.hh @@ -5,6 +5,7 @@ #include "common.hh" #include "ep.hh" #include "dispatcher.hh" +#include "mutation_log.hh" enum flusher_state { initializing, diff --git a/management/cbstats b/management/cbstats index 2f6f586d1..fdbda05de 100755 --- a/management/cbstats +++ b/management/cbstats @@ -2,6 +2,7 @@ import clitool import sys +import math import itertools import re @@ -51,6 +52,38 @@ def stats_formatter(stats, prefix=" ", cmp=None): s = stat + ":" print "%s%s%s" % (prefix, s.ljust(longest), val) +def time_label(s): + # -(2**64) -> '-inf' + # 2**64 -> 'inf' + # 0 -> '0' + # 4 -> '4us' + # 838384 -> '838ms' + # 8283852 -> '8s' + if s > BIG_VALUE: + return 'inf' + elif s < SMALL_VALUE: + return '-inf' + elif s == 0: + return '0' + product = 1 + sizes = (('us', 1), ('ms', 1000), ('s', 1000), ('m', 60)) + sizeMap = [] + for l,sz in sizes: + product = sz * product + sizeMap.insert(0, (l, product)) + lbl, factor = itertools.dropwhile(lambda x: x[1] > s, sizeMap).next() + return "%d%s" % (s / factor, lbl) + +def size_label(s): + if s == 0: + return "0" + sizes=['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB'] + e = math.floor(math.log(s, 1024)) + suffix = sizes[int(e)] + return "%d%s" % (s/(1024 ** math.floor(e)), suffix) + +@cmd + def histograms(mc, raw_stats): def seg(k, v): # Parse ('some_stat_x_y', 'v') into (('some_stat', x, y), v) @@ -75,8 +108,10 @@ def histograms(mc, raw_stats): dd = {} totals = {} longest = 0 + labelers = {'klogPadding': size_label} for s in stats: - lbl = "%s - %s" % (time_label(s[0][1]), time_label(s[0][2])) + labeler = labelers.get(s[0][0], time_label) + lbl = "%s - %s" % (labeler(s[0][1]), labeler(s[0][2])) longest = max(longest, len(lbl) + 1) k = s[0][0] l = dd.get(k, []) diff --git a/mutation_log.cc b/mutation_log.cc new file mode 100644 index 000000000..08d08bf3f --- /dev/null +++ b/mutation_log.cc @@ -0,0 +1,510 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ + +#include "config.h" +#include + +#include + +#include "mutation_log.hh" + +extern "C" { +#include "crc32.h" +} + +const char *mutation_log_type_names[] = { + "new", "del", "del_all", "commit1", "commit2", NULL +}; + +static void writeFully(int fd, const uint8_t *buf, size_t nbytes) { + while (nbytes > 0) { + ssize_t written = write(fd, buf, nbytes); + assert(written >= 0); + + nbytes -= written; + buf += written; + } +} + +uint64_t MutationLogEntry::rowid() const { + return ntohll(_rowid); +} + +MutationLog::MutationLog(const std::string &path, + const size_t bs) + : paddingHisto(GrowingWidthGenerator(0, 8, 1.5), 32), + logPath(path), + blockSize(bs), + blockPos(HEADER_RESERVED), + file(-1), + entries(0), + entryBuffer(static_cast(calloc(MutationLogEntry::len(256), 1))), + blockBuffer(static_cast(calloc(bs, 1))), + syncConfig(DEFAULT_SYNC_CONF) { + + assert(entryBuffer); + assert(blockBuffer); + if (logPath == "") { + file = DISABLED_FD; + } + open(); +} + +MutationLog::~MutationLog() { + flush(); + if (file >= 0) { + close(file); + } + free(entryBuffer); + free(blockBuffer); +} + +void MutationLog::newItem(uint16_t vbucket, const std::string &key, uint64_t rowid) { + MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, + rowid, ML_NEW, vbucket, key); + writeEntry(mle); +} + +void MutationLog::delItem(uint16_t 
vbucket, const std::string &key) { + MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, + 0, ML_DEL, vbucket, key); + writeEntry(mle); +} + +void MutationLog::deleteAll(uint16_t vbucket) { + MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, + 0, ML_DEL_ALL, vbucket, ""); + writeEntry(mle); +} + +void MutationLog::commit1() { + MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, + 0, ML_COMMIT1, 0, ""); + writeEntry(mle); + if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) { + flush(); + } + if ((getSyncConfig() & SYNC_COMMIT_1) != 0) { + BlockTimer timer(&syncTimeHisto); + fsync(file); + } +} + +void MutationLog::commit2() { + MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, + 0, ML_COMMIT2, 0, ""); + writeEntry(mle); + if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) { + flush(); + } + if ((getSyncConfig() & SYNC_COMMIT_2) != 0) { + BlockTimer timer(&syncTimeHisto); + fsync(file); + } +} + +void MutationLog::writeInitialBlock() { + assert(isEnabled()); + headerBlock.set(blockSize); + + writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock)); + + int lseek_result = lseek(file, std::max(static_cast(MIN_LOG_HEADER_SIZE), + headerBlock.blockSize() * headerBlock.blockCount()) + - 1, SEEK_SET); + assert(lseek_result > 0); + uint8_t zero(0); + writeFully(file, &zero, sizeof(zero)); +} + +void MutationLog::readInitialBlock() { + uint8_t buf[MIN_LOG_HEADER_SIZE]; + ssize_t bytesread = pread(file, buf, sizeof(buf), 0); + assert(bytesread == sizeof(buf)); + + headerBlock.set(buf, sizeof(buf)); + + assert(headerBlock.version() == LOG_VERSION); + assert(headerBlock.blockCount() == 1); + + blockSize = headerBlock.blockSize(); +} + +void MutationLog::prepareWrites() { + if (isEnabled()) { + int lseek_result = lseek(file, 0, SEEK_END); + assert(lseek_result > 0); + assert(lseek_result % blockSize == 0); + logSize = static_cast(lseek_result); + } +} + +static uint8_t parseConfigString(const std::string &s) { + uint8_t rv(0); + if (s == "off") { + rv = 0; + } else if (s == "commit1") { + rv = 1; + } else if (s == "commit2") { + rv = 2; + } else if (s == "full") { + rv = 3; + } else { + rv = 0xff; + } + return rv; +} + +bool MutationLog::setSyncConfig(const std::string &s) { + uint8_t v(parseConfigString(s)); + if (v != 0xff) { + syncConfig = (syncConfig & ~SYNC_FULL) | v; + } + return v != 0xff; +} + +bool MutationLog::setFlushConfig(const std::string &s) { + uint8_t v(parseConfigString(s)); + if (v != 0xff) { + syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2); + } + return v != 0xff; +} + +void MutationLog::open() { + if (!isEnabled()) { + return; + } + file = ::open(const_cast(logPath.c_str()), O_RDWR|O_CREAT, 0666); + assert(file >= 0); + struct stat st; + int stat_result = fstat(file, &st); + assert(stat_result == 0); + + if (st.st_size > 0) { + readInitialBlock(); + } else { + writeInitialBlock(); + } + + prepareWrites(); +} + +void MutationLog::flush() { + if (isEnabled() && blockPos > HEADER_RESERVED) { + BlockTimer timer(&flushTimeHisto); + + if (blockPos < blockSize) { + size_t padding(blockSize - blockPos); + memset(blockBuffer + blockPos, 0x00, padding); + paddingHisto.add(padding); + } + + entries = htons(entries); + memcpy(blockBuffer + 2, &entries, sizeof(entries)); + + uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2)); + uint16_t crc16(htons(crc32 & 0xffff)); + memcpy(blockBuffer, &crc16, sizeof(crc16)); + + writeFully(file, blockBuffer, blockSize); + logSize += blockSize; + + blockPos = HEADER_RESERVED; + entries = 0; + } +} + 
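+/* + * The block laid out by flush() above is therefore: bytes 0-1 hold the low + * 16 bits of an IEEE CRC-32 over bytes 2..blockSize-1 (big-endian), bytes + * 2-3 hold the record count (big-endian), packed MutationLogEntry records + * start at byte 4 (HEADER_RESERVED), and the remainder is zero padding + * accounted for in paddingHisto. + */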
+void MutationLog::writeEntry(MutationLogEntry *mle) { + if (!isEnabled()) { + return; + } + size_t len(mle->len()); + if (blockPos + len > blockSize) { + flush(); + } + assert(len < blockSize); + + memcpy(blockBuffer + blockPos, mle, len); + blockPos += len; + ++entries; + + ++itemsLogged[mle->type()]; + + delete mle; +} + +static const char* logType(uint8_t t) { + switch(t) { + case ML_NEW: + return "new"; + break; + case ML_DEL: + return "del"; + break; + case ML_DEL_ALL: + return "delall"; + break; + case ML_COMMIT1: + return "commit1"; + break; + case ML_COMMIT2: + return "commit2"; + break; + } + return "UNKNOWN"; +} + +// ---------------------------------------------------------------------- +// Mutation log iterator +// ---------------------------------------------------------------------- + +MutationLog::iterator::iterator(const MutationLog *l, bool e) + : log(l), + entryBuf(NULL), + buf(NULL), + p(buf), + offset(l->header().blockSize() * l->header().blockCount()), + items(0), isEnd(e) { + + assert(log); +} + +MutationLog::iterator::iterator(const MutationLog::iterator& mit) + : log(mit.log), + entryBuf(NULL), + buf(NULL), + p(NULL), + offset(mit.offset), + items(0), + isEnd(mit.isEnd) { + + assert(log); + if (mit.buf != NULL) { + buf = static_cast(calloc(1, log->header().blockSize())); + assert(buf); + memcpy(buf, mit.buf, log->header().blockSize()); + p = buf + (mit.p - mit.buf); + } + if (mit.entryBuf != NULL) { + buf = static_cast(calloc(1, LOG_ENTRY_BUF_SIZE)); + assert(entryBuf); + memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE); + } +} + +MutationLog::iterator::~iterator() { + free(entryBuf); + free(buf); +} + +void MutationLog::iterator::prepItem() { + MutationLogEntry *e = MutationLogEntry::newEntry(p, bufferBytesRemaining()); + if (entryBuf == NULL) { + entryBuf = static_cast(calloc(1, LOG_ENTRY_BUF_SIZE)); + assert(entryBuf); + } + memcpy(entryBuf, p, e->len()); +} + +MutationLog::iterator& MutationLog::iterator::operator++() { + if (--items == 0) { + nextBlock(); + } else { + size_t l(operator*()->len()); + p += l; + + prepItem(); + } + return *this; +} + +MutationLog::iterator& MutationLog::iterator::operator++(int) { + abort(); + return *this; +} + +bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) { + return log->fd() == rhs.log->fd() + && ( + (isEnd == rhs.isEnd) + || (offset == rhs.offset + && items == rhs.items)); +} + +bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) { + return ! 
operator==(rhs); +} + +const MutationLogEntry* MutationLog::iterator::operator*() { + assert(entryBuf != NULL); + return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE); +} + +size_t MutationLog::iterator::bufferBytesRemaining() { + return log->header().blockSize() - (p - buf); +} + +void MutationLog::iterator::nextBlock() { + if (buf == NULL) { + buf = static_cast(calloc(1, log->header().blockSize())); + assert(buf); + } + ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(), offset); + if (bytesread < log->header().blockSize()) { + isEnd = true; + return; + } + assert(bytesread == log->header().blockSize()); + offset += bytesread; + p = buf; + + uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2)); + uint16_t computed_crc16(crc32 & 0xffff); + uint16_t retrieved_crc16; + memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16)); + retrieved_crc16 = ntohs(retrieved_crc16); + assert(computed_crc16 == retrieved_crc16); + + memcpy(&items, buf + 2, 2); + items = ntohs(items); + + p = p + 4; + + prepItem(); +} + +void MutationLog::resetCounts(size_t *items) { + for (int i(0); i < MUTATION_LOG_TYPES; ++i) { + itemsLogged[i] = items[i]; + } +} + +// ---------------------------------------------------------------------- +// Reading entries +// ---------------------------------------------------------------------- + +bool MutationLogHarvester::load() { + bool clean(false); + std::set shouldClear; + for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) { + const MutationLogEntry *le = *it; + ++itemsSeen[le->type()]; + clean = false; + + switch (le->type()) { + case ML_DEL: + // FALLTHROUGH + case ML_NEW: + if (vbid_set.find(le->vbucket()) != vbid_set.end()) { + loading[le->vbucket()][le->key()] = std::make_pair(le->rowid(), le->type()); + } + break; + case ML_COMMIT2: { + clean = true; + for (std::set::iterator vit(shouldClear.begin()); vit != shouldClear.end(); ++vit) { + committed[*vit].clear(); + } + shouldClear.clear(); + + for (std::set::const_iterator vit = vbid_set.begin(); vit != vbid_set.end(); ++vit) { + uint16_t vb(*vit); + + unordered_map::iterator copyit2; + for (copyit2 = loading[vb].begin(); + copyit2 != loading[vb].end(); + ++copyit2) { + + mutation_log_event_t t = copyit2->second; + + switch (t.second) { + case ML_NEW: + committed[vb][copyit2->first] = t.first; + break; + case ML_DEL: + committed[vb].erase(copyit2->first); + break; + default: + abort(); + } + } + } + } + loading.clear(); + break; + case ML_COMMIT1: + // nothing in particular + break; + case ML_DEL_ALL: + if (vbid_set.find(le->vbucket()) != vbid_set.end()) { + loading[le->vbucket()].clear(); + shouldClear.insert(le->vbucket()); + } + break; + default: + abort(); + } + } + return clean; +} + +void MutationLogHarvester::apply(void *arg, mlCallback mlc) { + for (std::set::const_iterator it = vbid_set.begin(); + it != vbid_set.end(); ++it) { + uint16_t vb(*it); + + for (unordered_map::iterator it2 = committed[vb].begin(); + it2 != committed[vb].end(); ++it2) { + const std::string key(it2->first); + uint64_t rowid(it2->second); + + mlc(arg, vb, vbids[vb], key, rowid); + } + } +} + +std::vector MutationLogHarvester::getUncommitted() { + std::vector rv; + + for (std::set::const_iterator vit = vbid_set.begin(); vit != vbid_set.end(); ++vit) { + uint16_t vb(*vit); + mutation_log_uncommitted_t leftover; + leftover.vbucket = vb; + + unordered_map::iterator copyit2; + for (copyit2 = loading[vb].begin(); + copyit2 != loading[vb].end(); + ++copyit2) { + + mutation_log_event_t t = 
copyit2->second; + leftover.key = copyit2->first; + leftover.rowid = t.first; + leftover.type = static_cast(t.second); + + rv.push_back(leftover); + } + } + + return rv; +} + +size_t MutationLogHarvester::total() { + size_t rv(0); + for (int i = 0; i < MUTATION_LOG_TYPES; ++i) { + rv += itemsSeen[i]; + } + return rv; +} + +// ---------------------------------------------------------------------- +// Output of entries +// ---------------------------------------------------------------------- + +std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) { + out << "{MutationLogEntry rowid=" << mle.rowid() + << ", vbucket=" << mle.vbucket() + << ", magic=0x" << std::hex << static_cast(mle.magic) + << std::dec + << ", type=" << logType(mle.type()) + << ", key=``" << mle.key() << "''"; + return out; +} diff --git a/mutation_log.hh b/mutation_log.hh new file mode 100644 index 000000000..46239b89a --- /dev/null +++ b/mutation_log.hh @@ -0,0 +1,379 @@ +#ifndef MUTATION_LOG_HH +#define MUTATION_LOG_HH 1 + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +#include "common.hh" +#include "atomic.hh" +#include "histo.hh" + +#define ML_BUFLEN (128 * 1024 * 1024) + +const size_t MIN_LOG_HEADER_SIZE(4096); +const uint8_t MUTATION_LOG_MAGIC(0x45); +const size_t HEADER_RESERVED(4); +const uint32_t LOG_VERSION(1); +const size_t LOG_ENTRY_BUF_SIZE(512); +const int DISABLED_FD(-3); + +const uint8_t SYNC_COMMIT_1(1); +const uint8_t SYNC_COMMIT_2(2); +const uint8_t SYNC_FULL(SYNC_COMMIT_1 | SYNC_COMMIT_2); +const uint8_t FLUSH_COMMIT_1(4); +const uint8_t FLUSH_COMMIT_2(8); +const uint8_t FLUSH_FULL(FLUSH_COMMIT_1 | FLUSH_COMMIT_2); + +const uint8_t DEFAULT_SYNC_CONF(FLUSH_COMMIT_2 | SYNC_COMMIT_2); + +class LogHeaderBlock { +public: + LogHeaderBlock() : _version(htonl(LOG_VERSION)), _blockSize(0), _blockCount(0) { + } + + void set(uint32_t bs, uint32_t bc=1) { + _blockSize = htonl(bs); + _blockCount = htonl(bc); + } + + void set(const uint8_t *buf, size_t buflen) { + assert(buflen == MIN_LOG_HEADER_SIZE); + int offset(0); + memcpy(&_version, buf + offset, sizeof(_version)); + offset += sizeof(_version); + memcpy(&_blockSize, buf + offset, sizeof(_blockSize)); + offset += sizeof(_blockSize); + memcpy(&_blockCount, buf + offset, sizeof(_blockCount)); + offset += sizeof(_blockCount); + } + + uint32_t version() const { + return ntohl(_version); + } + + uint32_t blockSize() const { + return ntohl(_blockSize); + } + + uint32_t blockCount() const { + return ntohl(_blockCount); + } + +private: + + uint32_t _version; + uint32_t _blockSize; + uint32_t _blockCount; +}; + +typedef enum { + ML_NEW, ML_DEL, ML_DEL_ALL, ML_COMMIT1, ML_COMMIT2 +} mutation_log_type_t; + +#define MUTATION_LOG_TYPES 5 + +extern const char *mutation_log_type_names[]; + +class MutationLogEntry { +public: + + static MutationLogEntry* newEntry(uint8_t *buf, + uint64_t r, mutation_log_type_t t, + uint16_t vb, const std::string &k) { + return new (buf) MutationLogEntry(r, t, vb, k); + } + + static MutationLogEntry* newEntry(uint8_t *buf, size_t buflen) { + assert(buflen >= len(0)); + MutationLogEntry *me = reinterpret_cast(buf); + assert(me->magic == MUTATION_LOG_MAGIC); + assert(buflen >= me->len()); + return me; + } + + void operator delete(void *) { + // Statically buffered. There is no delete. 
+ } + + static size_t len(size_t klen) { + // 13 == the exact empty record size as will be packed into + // the layout + return 13 + klen; + } + + size_t len() const { + return MutationLogEntry::len(keylen); + } + + const std::string key() const { + return std::string(_key, keylen); + } + + uint64_t rowid() const; + + uint16_t vbucket() const { + return ntohs(_vbucket); + } + + uint8_t type() const { + return _type; + } + +private: + + friend std::ostream& operator<< (std::ostream& out, + const MutationLogEntry &e); + + MutationLogEntry(uint64_t r, mutation_log_type_t t, + uint16_t vb, const std::string &k) + : _rowid(htonll(r)), _vbucket(htons(vb)), magic(MUTATION_LOG_MAGIC), + _type(static_cast(t)), + keylen(static_cast(k.length())) { + assert(k.length() <= std::numeric_limits::max()); + memcpy(_key, k.data(), k.length()); + } + + uint64_t _rowid; + uint16_t _vbucket; + uint8_t magic; + uint8_t _type; + uint8_t keylen; + char _key[1]; + + DISALLOW_COPY_AND_ASSIGN(MutationLogEntry); +}; + +std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle); + +class MutationLog { +public: + + MutationLog(const std::string &path, const size_t bs=4096); + + ~MutationLog(); + + void newItem(uint16_t vbucket, const std::string &key, uint64_t rowid); + + void delItem(uint16_t vbucket, const std::string &key); + + void deleteAll(uint16_t vbucket); + + void commit1(); + + void commit2(); + + void flush(); + + bool isEnabled() const { + return file != DISABLED_FD; + } + + LogHeaderBlock header() const { + return headerBlock; + } + + void setSyncConfig(uint8_t sconf) { + syncConfig = sconf; + } + + uint8_t getSyncConfig() const { + return syncConfig & SYNC_FULL; + } + + uint8_t getFlushConfig() const { + return syncConfig & FLUSH_FULL; + } + + bool setSyncConfig(const std::string &s); + bool setFlushConfig(const std::string &s); + + /** + * Reset the item type counts to the given values. + * + * This is used by the loader as part of initialization. + */ + void resetCounts(size_t *); + + /** + * An iterator for the mutation log. + */ + class iterator : public std::iterator { + public: + + iterator(const iterator& mit); + + ~iterator(); + + iterator& operator++(); + + iterator& operator++(int); + + bool operator==(const iterator& rhs); + + bool operator!=(const iterator& rhs); + + const MutationLogEntry* operator*(); + + private: + + friend class MutationLog; + + iterator(const MutationLog *l, bool e=false); + + void nextBlock(); + size_t bufferBytesRemaining(); + void prepItem(); + + const MutationLog *log; + uint8_t *entryBuf; + uint8_t *buf; + uint8_t *p; + off_t offset; + uint16_t items; + bool isEnd; + }; + + iterator begin() { + iterator it(iterator(this)); + it.nextBlock(); + return it; + } + + iterator end() { + return iterator(this, true); + } + + //! Items logged by type. + Atomic itemsLogged[MUTATION_LOG_TYPES]; + //! Histogram of block padding sizes. + Histogram paddingHisto; + //! Flush time histogram. + Histogram flushTimeHisto; + //! Sync time histogram. + Histogram syncTimeHisto; + //! 
Size of the log + Atomic logSize; + +private: + + void open(); + + void writeEntry(MutationLogEntry *mle); + + void writeInitialBlock(); + void readInitialBlock(); + + void prepareWrites(); + + int fd() const { return file; } + + LogHeaderBlock headerBlock; + const std::string &logPath; + size_t blockSize; + size_t blockPos; + int file; + uint16_t entries; + uint8_t *entryBuffer; + uint8_t *blockBuffer; + uint8_t syncConfig; + + DISALLOW_COPY_AND_ASSIGN(MutationLog); +}; + +/// @cond DETAILS + +//! rowid, (uint8_t)mutation_log_type_t +typedef std::pair mutation_log_event_t; + +/// @endcond + +/** + * MutationLogHarvester::apply callback type. + */ +typedef void (*mlCallback)(void*, uint16_t, uint16_t, const std::string &, uint64_t); + +/** + * Type for mutation log leftovers. + */ +struct mutation_log_uncommitted_t { + std::string key; + uint64_t rowid; + mutation_log_type_t type; + uint16_t vbucket; +}; + +/** + * Read log entries back from the log to reconstruct the state. + */ +class MutationLogHarvester { +public: + MutationLogHarvester(MutationLog &ml) : mlog(ml) { + memset(vbids, 0, sizeof(vbids)); + memset(itemsSeen, 0, sizeof(itemsSeen)); + } + + /** + * Set a vbucket version before loading. + * + * Provided versions will be given to the callbacks at apply time. + * vbuckets that are not registered here will not be considered. + */ + void setVbVer(uint16_t vb, uint16_t ver) { + vbids[vb] = ver; + vbid_set.insert(vb); + } + + /** + * Load the entries from the file. + * + * @return true if the file was clean and can likely be trusted. + */ + bool load(); + + /** + * Apply the processed log entries through the given function. + */ + void apply(void *arg, mlCallback mlc); + + /** + * Get the total number of entries found in the log. + */ + size_t total(); + + /** + * Get all of the counts of log entries by type. + */ + size_t *getItemsSeen() { + return itemsSeen; + } + + /** + * Get the list of uncommitted keys and stuff from the log. + */ + std::vector getUncommitted(); + +private: + + MutationLog &mlog; + + std::set vbid_set; + uint16_t vbids[65536]; + + unordered_map > committed; + unordered_map > loading; + size_t itemsSeen[MUTATION_LOG_TYPES]; +}; + +#endif /* MUTATION_LOG_HH */ diff --git a/t/mutation_log_test.cc b/t/mutation_log_test.cc new file mode 100644 index 000000000..bb6d1682d --- /dev/null +++ b/t/mutation_log_test.cc @@ -0,0 +1,322 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#include "config.h" +#include +#include +#include + +#include +#include +#include +#include + +#include "assert.h" +#include "mutation_log.hh" + +#define TMP_LOG_FILE "/tmp/mlt_test.log" + +static void testUnconfigured() { + MutationLog ml(""); + assert(!ml.isEnabled()); + ml.newItem(3, "somekey", 931); + ml.delItem(3, "somekey"); + ml.deleteAll(3); + ml.commit1(); + ml.commit2(); + ml.flush(); + + assert(ml.begin() == ml.end()); +} + +static void testSyncSet() { + + // Some basics + assert(SYNC_COMMIT_1 | SYNC_COMMIT_2 == SYNC_FULL); + assert(FLUSH_COMMIT_1 | FLUSH_COMMIT_2 == FLUSH_FULL); + // No overlap + assert((FLUSH_FULL & ~SYNC_FULL) == FLUSH_FULL); + assert((SYNC_FULL & ~FLUSH_FULL) == SYNC_FULL); + + // + // Now the real tests. 
+ // + MutationLog ml(""); + + assert(ml.setSyncConfig("off")); + assert(ml.getSyncConfig() == 0); + + assert(ml.setSyncConfig("commit1")); + assert(ml.getSyncConfig() == SYNC_COMMIT_1); + + assert(ml.setSyncConfig("commit2")); + assert(ml.getSyncConfig() == SYNC_COMMIT_2); + + assert(ml.setSyncConfig("full")); + assert(ml.getSyncConfig() == SYNC_COMMIT_1|SYNC_COMMIT_2); + + assert(!ml.setSyncConfig("otherwise")); + + // reset + assert(ml.setSyncConfig("off")); + assert(ml.getSyncConfig() == 0); + + // + // Flush tests + // + assert(ml.setFlushConfig("commit1")); + assert(ml.getFlushConfig() == FLUSH_COMMIT_1); + + assert(ml.setFlushConfig("commit2")); + assert(ml.getFlushConfig() == FLUSH_COMMIT_2); + + assert(ml.setFlushConfig("full")); + assert(ml.getFlushConfig() == FLUSH_COMMIT_1|FLUSH_COMMIT_2); + + assert(!ml.setFlushConfig("otherwise")); + + // reset + assert(ml.setSyncConfig("off")); + assert(ml.getSyncConfig() == 0); + assert(ml.setFlushConfig("off")); + assert(ml.getFlushConfig() == 0); + + // + // Try both + // + + assert(ml.setSyncConfig("commit1")); + assert(ml.setFlushConfig("commit2")); + assert(ml.getSyncConfig() == SYNC_COMMIT_1); + assert(ml.getFlushConfig() == FLUSH_COMMIT_2); + + // Swap them and apply in reverse order. + assert(ml.setFlushConfig("commit1")); + assert(ml.setSyncConfig("commit2")); + assert(ml.getSyncConfig() == SYNC_COMMIT_2); + assert(ml.getFlushConfig() == FLUSH_COMMIT_1); +} + +static void loaderFun(void *arg, uint16_t vb, uint16_t, + const std::string &k, uint64_t rowid) { + std::map *maps = reinterpret_cast *>(arg); + maps[vb][k] = rowid; +} + +static void testLogging() { + remove(TMP_LOG_FILE); + + { + MutationLog ml(TMP_LOG_FILE); + + ml.newItem(3, "key1", 1); + ml.newItem(2, "key1", 2); + ml.commit1(); + ml.commit2(); + ml.newItem(3, "key2", 3); + ml.delItem(3, "key1"); + ml.commit1(); + ml.commit2(); + // Remaining: 3:key2, 2:key1 + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 2); + assert(ml.itemsLogged[ML_COMMIT2] == 2); + } + + { + MutationLog ml(TMP_LOG_FILE); + MutationLogHarvester h(ml); + h.setVbVer(1, 1); + h.setVbVer(2, 1); + h.setVbVer(3, 1); + + assert(h.load()); + + assert(h.getItemsSeen()[ML_NEW] == 3); + assert(h.getItemsSeen()[ML_DEL] == 1); + assert(h.getItemsSeen()[ML_COMMIT1] == 2); + assert(h.getItemsSeen()[ML_COMMIT2] == 2); + + // Check stat copying + ml.resetCounts(h.getItemsSeen()); + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 2); + assert(ml.itemsLogged[ML_COMMIT2] == 2); + + // See if we got what we expect. 
+ std::map maps[4]; + h.apply(&maps, loaderFun); + + assert(maps[0].size() == 0); + assert(maps[1].size() == 0); + assert(maps[2].size() == 1); + assert(maps[3].size() == 1); + + assert(maps[2].find("key1") != maps[2].end()); + assert(maps[3].find("key2") != maps[3].end()); + } + + remove(TMP_LOG_FILE); +} + +static void testDelAll() { + remove(TMP_LOG_FILE); + + { + MutationLog ml(TMP_LOG_FILE); + + ml.newItem(3, "key1", 1); + ml.newItem(2, "key1", 2); + ml.commit1(); + ml.commit2(); + ml.newItem(3, "key2", 3); + ml.deleteAll(3); + ml.commit1(); + ml.commit2(); + // Remaining: 2:key1 + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL] == 0); + assert(ml.itemsLogged[ML_DEL_ALL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 2); + assert(ml.itemsLogged[ML_COMMIT2] == 2); + } + + { + MutationLog ml(TMP_LOG_FILE); + MutationLogHarvester h(ml); + h.setVbVer(1, 1); + h.setVbVer(2, 1); + h.setVbVer(3, 1); + + assert(h.load()); + + assert(h.getItemsSeen()[ML_NEW] == 3); + assert(h.getItemsSeen()[ML_DEL_ALL] == 1); + assert(h.getItemsSeen()[ML_COMMIT1] == 2); + assert(h.getItemsSeen()[ML_COMMIT2] == 2); + + // Check stat copying + ml.resetCounts(h.getItemsSeen()); + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL_ALL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 2); + assert(ml.itemsLogged[ML_COMMIT2] == 2); + + // See if we got what we expect. + std::map maps[4]; + h.apply(&maps, loaderFun); + + assert(maps[0].size() == 0); + assert(maps[1].size() == 0); + assert(maps[2].size() == 1); + assert(maps[3].size() == 0); + + assert(maps[2].find("key1") != maps[2].end()); + } + + remove(TMP_LOG_FILE); +} + +static bool leftover_compare(mutation_log_uncommitted_t a, + mutation_log_uncommitted_t b) { + if (a.vbucket != b.vbucket) { + return a.vbucket < b.vbucket; + } + + if (a.key != b.key) { + return a.key < b.key; + } + + if (a.type != b.type) { + return a.type < b.type; + } + + return false; +} + +static void testLoggingDirty() { + remove(TMP_LOG_FILE); + + { + MutationLog ml(TMP_LOG_FILE); + + ml.newItem(3, "key1", 1); + ml.newItem(2, "key1", 2); + ml.commit1(); + ml.commit2(); + // These two will be dropped from the normal loading path + // because there's no commit. + ml.newItem(3, "key2", 3); + ml.delItem(3, "key1"); + // Remaining: 3:key1, 2:key1 + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 1); + assert(ml.itemsLogged[ML_COMMIT2] == 1); + } + + { + MutationLog ml(TMP_LOG_FILE); + MutationLogHarvester h(ml); + h.setVbVer(1, 1); + h.setVbVer(2, 1); + h.setVbVer(3, 1); + + assert(!h.load()); + + assert(h.getItemsSeen()[ML_NEW] == 3); + assert(h.getItemsSeen()[ML_DEL] == 1); + assert(h.getItemsSeen()[ML_COMMIT1] == 1); + assert(h.getItemsSeen()[ML_COMMIT2] == 1); + + // Check stat copying + ml.resetCounts(h.getItemsSeen()); + + assert(ml.itemsLogged[ML_NEW] == 3); + assert(ml.itemsLogged[ML_DEL] == 1); + assert(ml.itemsLogged[ML_COMMIT1] == 1); + assert(ml.itemsLogged[ML_COMMIT2] == 1); + + // See if we got what we expect. 
+ std::map maps[4]; + h.apply(&maps, loaderFun); + + assert(maps[0].size() == 0); + assert(maps[1].size() == 0); + assert(maps[2].size() == 1); + assert(maps[3].size() == 1); + + assert(maps[2].find("key1") != maps[2].end()); + assert(maps[3].find("key1") != maps[3].end()); + assert(maps[3].find("key2") == maps[3].end()); + + // Check the leftovers + std::vector leftovers(h.getUncommitted()); + std::sort(leftovers.begin(), leftovers.end(), leftover_compare); + assert(leftovers.size() == 2); + assert(leftovers[0].vbucket == 3); + assert(leftovers[0].key == "key1"); + assert(leftovers[0].type == ML_DEL); + assert(leftovers[1].vbucket == 3); + assert(leftovers[1].key == "key2"); + assert(leftovers[1].type == ML_NEW); + assert(leftovers[1].rowid == 3); + } + + remove(TMP_LOG_FILE); +} + +int main(int, char **) { + testUnconfigured(); + testSyncSet(); + testLogging(); + testDelAll(); + testLoggingDirty(); + + return 0; +} diff --git a/win32/Makefile.mingw b/win32/Makefile.mingw index b3d8b0262..bc146e819 100644 --- a/win32/Makefile.mingw +++ b/win32/Makefile.mingw @@ -60,6 +60,7 @@ EP_ENGINE_CC_SRC = \ item_pager.cc \ mc-kvstore/mc-engine.cc \ mc-kvstore/mc-kvstore.cc \ + mutation_log.cc \ mutex.cc \ objectregistry.cc \ observe_registry.cc \ @@ -83,6 +84,7 @@ EP_ENGINE_CC_SRC = \ EP_ENGINE_C_SRC = \ byteorder.c \ + crc32.c \ embedded/sqlite3.c \ gethrtime.c \ tools/cJSON.c
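One more illustrative sketch, separate from the patch itself: the iterator added in mutation_log.hh is enough to write a small offline dumper for an existing key log, which is handy for checking what a warmup would replay. Everything below is hypothetical apart from the MutationLog/MutationLogEntry API shown in the diff; /tmp/mutation.log is just the example path the test suite happens to clean up.

#include <iostream>
#include <string>

#include "mutation_log.hh"

int main(int argc, char **argv) {
    // MutationLog keeps a reference to the path it is given, so keep the
    // string alive in a local rather than passing a temporary.
    std::string path(argc > 1 ? argv[1] : "/tmp/mutation.log");

    MutationLog log(path);   // opens the file (creating it if necessary)

    size_t n = 0;
    for (MutationLog::iterator it = log.begin(); it != log.end(); ++it) {
        std::cout << **it << std::endl;   // prints rowid, vbucket, type and key
        ++n;
    }
    std::cout << n << " entries" << std::endl;
    return 0;
}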