
Merge remote-tracking branch 'gerrit/2.0.2' into master

Change-Id: I0baa985bea4834ec5edad493f9793a2e52b6dcd4
commit 3b7d41d1b2fef98114ac66ff8a6e56e49dbe0eb2
2 parents: 845a1e4 + ad865a2
authored by mikewied
Showing with 632 additions and 831 deletions.
  1. +0 −15 configuration.json
  2. +91 −87 docs/engine-params.org
  3. +1 −20 docs/stats.org
  4. +1 −1  m4/couchbase.m4
  5. +43 −41 management/cbepctl
  6. +6 −22 management/cbstats
  7. +1 −1  src/access_scanner.cc
  8. +2 −1  src/backfill.cc
  9. +1 −3 src/backfill.h
  10. +20 −33 src/bgfetcher.cc
  11. +0 −2  src/bgfetcher.h
  12. +1 −3 src/blackhole-kvstore/blackhole.cc
  13. +5 −5 src/callbacks.h
  14. +1 −0  src/checkpoint.cc
  15. +7 −5 src/couch-kvstore/couch-kvstore.cc
  16. +119 −121 src/ep.cc
  17. +27 −58 src/ep.h
  18. +24 −63 src/ep_engine.cc
  19. +0 −27 src/ep_engine.h
  20. +58 −103 src/item_pager.cc
  21. +17 −17 src/item_pager.h
  22. +4 −15 src/kvstore.h
  23. +1 −31 src/statsnap.cc
  24. +4 −17 src/statsnap.h
  25. +21 −22 src/stored-value.cc
  26. +28 −29 src/stored-value.h
  27. +9 −14 src/tapconnection.cc
  28. +3 −5 src/tapconnection.h
  29. +2 −5 src/vbucket.cc
  30. +1 −1  src/vbucket.h
  31. +8 −0 tests/ep_test_apis.cc
  32. +3 −0  tests/ep_test_apis.h
  33. +123 −64 tests/ep_testsuite.cc
15 configuration.json
@@ -72,10 +72,6 @@
"default": "5",
"type": "size_t"
},
- "concurrentDB": {
- "default": "true",
- "type": "bool"
- },
"config_file": {
"default": "",
"dynamic": false,
@@ -276,17 +272,6 @@
}
}
},
- "pager_unbiased_period": {
- "default": "60",
- "descr": "Number of minutes since access scanner time in which item pager ignores items nru info",
- "type": "size_t",
- "validator": {
- "range": {
- "max": 720,
- "min": 1
- }
- }
- },
"postInitfile": {
"default": "",
"type": "std::string"
178 docs/engine-params.org
@@ -24,90 +24,94 @@ memcached like this:
* Parameters for the EP Engine
-| key | type | descr |
-|------------------------+--------+--------------------------------------------|
-| config_file | string | Path to additional parameters. |
-| dbname | string | Path to on-disk storage. |
-| ht_locks | int | Number of locks per hash table. |
-| ht_size | int | Number of buckets per hash table. |
-| max_item_size | int | Maximum number of bytes allowed for |
-| | | an item. |
-| max_size | int | Max cumulative item size in bytes. |
-| max_txn_size | int | Max number of disk mutations per |
-| | | transaction. |
-| mem_high_wat | int | Automatically evict when exceeding |
-| | | this size. |
-| mem_low_wat | int | Low water mark to aim for when evicting. |
-| couch_response_timeout | int | The maximum time to wait for couch to |
-| | | respond to a persistence request before |
-| | | resetting the connection (milliseconds) |
-| tap_backlog_limit | int | Max number of items allowed in a |
-| | | tap backfill |
-| tap_noop_interval | int | Number of seconds between a noop is sent |
-| | | on an idle connection |
-| tap_keepalive | int | Seconds to hold open named tap connections |
-| tap_bg_max_pending | int | Maximum number of pending bg fetch |
-| | | operations |
-| | | a tap queue may issue (before it must wait |
-| | | for responses to appear. |
-| tap_backoff_period | float | Number of seconds the tap connection |
-| | | should back off after receiving ETMPFAIL |
-| vb0 | bool | If true, start with an active vbucket 0 |
-| waitforwarmup | bool | Whether to block server start during |
-| | | warmup. |
-| warmup | bool | Whether to load existing data at startup. |
-| expiry_window | int | expiry window to not persist an object |
-| | | that is expired (or will be soon) |
-| exp_pager_stime | int | Sleep time for the pager that purges |
-| | | expired objects from memory and disk |
-| failpartialwarmup | bool | If false, continue running after failing |
-| | | to load some records. |
-| max_vbuckets | int | Maximum number of vbuckets expected (1024) |
-| concurrentDB | bool | True (default) if concurrent DB reads are |
-| | | permitted where possible. |
-| chk_remover_stime | int | Interval for the checkpoint remover that |
-| | | purges closed unreferenced checkpoints. |
-| chk_max_items | int | Number of max items allowed in a |
-| | | checkpoint |
-| chk_period | int | Time bound (in sec.) on a checkpoint |
-| max_checkpoints | int | Number of max checkpoints allowed per |
-| | | vbucket |
-| inconsistent_slave_chk | bool | True if we allow a "downstream" master to |
-| | | receive checkpoint begin/end messages |
-| item_num_based_new_chk | bool | Enable a new checkpoint creation if the |
-| | | number of items in a checkpoint is greater |
-| | | than the max number allowed |
-| | | along with normal get/set operations. |
-| tap_backfill_resident | float | Resident item threshold for only memory |
-| | | backfill to be kicked off |
-| keep_closed_chks | bool | True if we want to keep closed checkpoints |
-| | | in memory if the current memory usage is |
-| | | below high water mark |
-| bf_resident_threshold | float | Resident item threshold for only memory |
-| | | backfill to be kicked off |
-| getl_default_timeout | int | The default timeout for a getl lock in (s) |
-| getl_max_timeout | int | The maximum timeout for a getl lock in (s) |
-| mutation_mem_threshold | float | Memory threshold on the current bucket |
-| | | quota for accepting a new mutation |
-| tap_throttle_queue_cap | int | The maximum size of the disk write queue |
-| | | to throttle down tap-based replication. -1 |
-| | | means don't throttle. |
-| tap_throttle_threshold | float | Percentage of memory in use before we |
-| | | throttle tap streams |
-| tap_throttle_cap_pcnt | int | Percentage of total items in write queue |
-| | | to throttle tap input. 0 means use fixed |
-| | | throttle queue cap. |
-| klog_path | string | Path to the mutation key log. |
-| klog_block_size | int | Mutation key log block size. |
-| klog_flush | string | When to force buffer flushes during |
-| | | klog (off, commit1, commit2, full) |
-| klog_sync | string | When to fsync during klog. |
-| flushall_enabled | bool | True if we enable flush_all command; The |
-| | | default value is False. |
-| data_traffic_enabled | bool | True if we want to enable data traffic |
-| | | immediately after warmup completion |
-| alog_sleep_time | int | Interval of access scanner task in (min) |
-| alog_task_time | int | Hour (0~23) in GMT time at which access |
-| } | scanner will be scheduled to run. |
-| pager_active_vb_pcnt | int | Percentage of active vbucket items among |
-| | | all evicted items by item pager. |
+| key | type | descr |
+|-----------------------------+--------+--------------------------------------------|
+| config_file | string | Path to additional parameters. |
+| dbname | string | Path to on-disk storage. |
+| ht_locks | int | Number of locks per hash table. |
+| ht_size | int | Number of buckets per hash table. |
+| max_item_size | int | Maximum number of bytes allowed for |
+| | | an item. |
+| max_size | int | Max cumulative item size in bytes. |
+| max_txn_size | int | Max number of disk mutations per |
+| | | transaction. |
+| mem_high_wat | int | Automatically evict when exceeding |
+| | | this size. |
+| mem_low_wat | int | Low water mark to aim for when evicting. |
+| couch_response_timeout | int | The maximum time to wait for couch to |
+| | | respond to a persistence request before |
+| | | resetting the connection (milliseconds) |
+| tap_backlog_limit | int | Max number of items allowed in a |
+| | | tap backfill |
+| tap_noop_interval | int | Number of seconds between a noop is sent |
+| | | on an idle connection |
+| tap_keepalive | int | Seconds to hold open named tap connections |
+| tap_bg_max_pending | int | Maximum number of pending bg fetch |
+| | | operations |
+| | | a tap queue may issue (before it must wait |
+| | | for responses to appear. |
+| tap_backoff_period | float | Number of seconds the tap connection |
+| | | should back off after receiving ETMPFAIL |
+| vb0 | bool | If true, start with an active vbucket 0 |
+| waitforwarmup | bool | Whether to block server start during |
+| | | warmup. |
+| warmup | bool | Whether to load existing data at startup. |
+| expiry_window | int | expiry window to not persist an object |
+| | | that is expired (or will be soon) |
+| exp_pager_stime | int | Sleep time for the pager that purges |
+| | | expired objects from memory and disk |
+| failpartialwarmup | bool | If false, continue running after failing |
+| | | to load some records. |
+| max_vbuckets | int | Maximum number of vbuckets expected (1024) |
+| concurrentDB | bool | True (default) if concurrent DB reads are |
+| | | permitted where possible. |
+| chk_remover_stime | int | Interval for the checkpoint remover that |
+| | | purges closed unreferenced checkpoints. |
+| chk_max_items | int | Number of max items allowed in a |
+| | | checkpoint |
+| chk_period | int | Time bound (in sec.) on a checkpoint |
+| max_checkpoints | int | Number of max checkpoints allowed per |
+| | | vbucket |
+| inconsistent_slave_chk | bool | True if we allow a "downstream" master to |
+| | | receive checkpoint begin/end messages |
+| item_num_based_new_chk | bool | Enable a new checkpoint creation if the |
+| | | number of items in a checkpoint is greater |
+| | | than the max number allowed |
+| | | along with normal get/set operations. |
+| tap_backfill_resident | float | Resident item threshold for only memory |
+| | | backfill to be kicked off |
+| keep_closed_chks | bool | True if we want to keep closed checkpoints |
+| | | in memory if the current memory usage is |
+| | | below high water mark |
+| bf_resident_threshold | float | Resident item threshold for only memory |
+| | | backfill to be kicked off |
+| getl_default_timeout | int | The default timeout for a getl lock in (s) |
+| getl_max_timeout | int | The maximum timeout for a getl lock in (s) |
+| mutation_mem_threshold | float | Memory threshold on the current bucket |
+| | | quota for accepting a new mutation |
+| tap_throttle_queue_cap | int | The maximum size of the disk write queue |
+| | | to throttle down tap-based replication. -1 |
+| | | means don't throttle. |
+| tap_throttle_threshold | float | Percentage of memory in use before we |
+| | | throttle tap streams |
+| tap_throttle_cap_pcnt | int | Percentage of total items in write queue |
+| | | to throttle tap input. 0 means use fixed |
+| | | throttle queue cap. |
+| klog_path | string | Path to the mutation key log. |
+| klog_block_size | int | Mutation key log block size. |
+| klog_flush | string | When to force buffer flushes during |
+| | | klog (off, commit1, commit2, full) |
+| klog_sync | string | When to fsync during klog. |
+| flushall_enabled | bool | True if we enable flush_all command; The |
+| | | default value is False. |
+| data_traffic_enabled | bool | True if we want to enable data traffic |
+| | | immediately after warmup completion |
+| alog_sleep_time | int | Interval of access scanner task in (min) |
+| alog_task_time | int | Hour (0~23) in GMT time at which access |
+| | | scanner will be scheduled to run. |
+| pager_active_vb_pcnt | int | Percentage of active vbucket items among |
+| | | all evicted items by item pager. |
+| warmup_min_memory_threshold | int | Memory threshold (%) during warmup to |
+| | | enable traffic. |
+| warmup_min_items_threshold | int | Item num threshold (%) during warmup to |
+| | | enable traffic. |
21 docs/stats.org
@@ -158,13 +158,7 @@ For introductory information on stats within membase, start with the
| ep_bg_max_load | The longest load time (µs) |
| ep_bg_load_avg | The average time (µs) for an item to |
| | be loaded from the persistence layer |
-| ep_num_non_resident | The number of non-resident items | |
-| ep_store_max_concurrency | Maximum allowed concurrency at the |
-| | storage layer |
-| ep_store_max_readers | Maximum number of concurrent read-only |
-| | storage threads |
-| ep_store_max_readwrite | Maximum number of concurrent |
-| | read/writestorage threads |
+| ep_num_non_resident | The number of non-resident items |
| ep_bg_wait | The total elapse time for the wait |
| | queue |
| ep_bg_load | The total elapse time for items to be |
@@ -196,7 +190,6 @@ For introductory information on stats within membase, start with the
| | persistence |
| ep_chk_remover_stime | The time interval for purging closed |
| | checkpoints from memory |
-| ep_concurrentDB | Enables multiple dispatchers |
| ep_config_file | The location of the ep-engine config |
| | file |
| ep_couch_bucket | The name of this bucket |
@@ -253,9 +246,6 @@ For introductory information on stats within membase, start with the
| | that we should start sending temp oom |
| | or oom message when hitting |
| ep_pager_active_vb_pcnt | Active vbuckets paging percentage |
-| ep_pager_unbiased_period | Number of minutes since access scanner |
-| | time in which item pager ignores items |
-| | nru info |
| ep_tap_ack_grace_period | The amount of time to wait for a tap |
| | acks before disconnecting |
| ep_tap_ack_initial_sequence_number | The initial sequence number for a tap |
@@ -353,9 +343,6 @@ For introductory information on stats within membase, start with the
| vb_active_queue_pending | Total bytes of pending writes |
| vb_active_queue_fill | Total enqueued items |
| vb_active_queue_drain | Total drained items |
-| vb_active_num_ref_items | Number of referenced items |
-| vb_active_num_ref_ejects | Number of times referenced item values |
-| | got ejected |
*** Replica vBucket stats
@@ -380,9 +367,6 @@ For introductory information on stats within membase, start with the
| vb_replica_queue_pending | Total bytes of pending writes |
| vb_replica_queue_fill | Total enqueued items |
| vb_replica_queue_drain | Total drained items |
-| vb_replica_num_ref_items | Number of referenced items |
-| vb_replica_num_ref_ejects | Number of times referenced item values |
-| | got ejected |
*** Pending vBucket stats
@@ -407,9 +391,6 @@ For introductory information on stats within membase, start with the
| vb_pending_queue_pending | Total bytes of pending writes |
| vb_pending_queue_fill | Total enqueued items |
| vb_pending_queue_drain | Total drained items |
-| vb_pending_num_ref_items | Number of referenced items |
-| vb_pending_num_ref_ejects | Number of times referenced item values |
-| | got ejected |
** Tap stats
2  m4/couchbase.m4
@@ -63,7 +63,7 @@ AC_DEFUN([COUCHBASE_GENERIC_COMPILER], [
GCC_C89=-std=c89
GCC_C99=-std=gnu99
GCC_LDFLAGS=""
- GCC_CPP_WARNINGS="-Wall -pedantic -Wshadow -fdiagnostics-show-option -Wformat -fno-strict-aliasing -Wno-strict-aliasing -Wextra"
+ GCC_CPP_WARNINGS="-Wall -pedantic -Wshadow -fdiagnostics-show-option -Wformat -fno-strict-aliasing -Wno-strict-aliasing -Wextra -Wunused-variable"
GCC_C_COMPILER_WARNINGS="-Wundef -Wstrict-prototypes -Wmissing-prototypes -Wredundant-decls -Wmissing-declarations -Wcast-align"
GCC_CXX_COMPILER_WARNINGS="-std=gnu++98 -Woverloaded-virtual -Wnon-virtual-dtor -Wctor-dtor-privacy -Wno-long-long -Wno-redundant-decls"
84 management/cbepctl
@@ -116,52 +116,54 @@ Persistence:
Available params for "set":
Available params for set checkpoint_param:
- chk_max_items - Max number of items allowed in a checkpoint.
- chk_period - Time bound (in sec.) on a checkpoint.
- inconsistent_slave_chk - true if we allow a downstream master to receive
- checkpoint begin/end messages from the upstream
- master.
- item_num_based_new_chk - true if a new checkpoint can be created based
- on.
- the number of items in the open checkpoint.
- keep_closed_chks - true if we want to keep closed checkpoints in
- memory.
- as long as the current memory usage is below
- high water mark.
- max_checkpoints - Max number of checkpoints allowed per vbucket.
+ chk_max_items - Max number of items allowed in a checkpoint.
+ chk_period - Time bound (in sec.) on a checkpoint.
+ inconsistent_slave_chk - true if we allow a downstream master to receive
+ checkpoint begin/end messages from the upstream
+ master.
+ item_num_based_new_chk - true if a new checkpoint can be created based
+ on.
+ the number of items in the open checkpoint.
+ keep_closed_chks - true if we want to keep closed checkpoints in
+ memory.
+ as long as the current memory usage is below
+ high water mark.
+ max_checkpoints - Max number of checkpoints allowed per vbucket.
Available params for set flush_param:
- alog_sleep_time - Access scanner interval (minute)
- alog_task_time - Access scanner next task time (UTC)
- bg_fetch_delay - Delay before executing a bg fetch (test
- feature).
- couch_response_timeout - timeout in receiving a response from couchdb.
- exp_pager_stime - Expiry Pager Sleeptime.
- flushall_enabled - Enable flush operation.
- klog_compactor_queue_cap - queue cap to throttle the log compactor.
- klog_max_log_size - maximum size of a mutation log file allowed.
- klog_max_entry_ratio - max ratio of # of items logged to # of unique
- items.
- pager_active_vb_pcnt - Percentage of active vbuckets items among
- all ejected items by item pager.
- pager_unbiased_period - Period after last access scanner run during
- which item pager preserve working set.
- max_size - Max memory used by the server.
- max_txn_size - Maximum number of items in a flusher
- transaction.
- mem_high_wat - High water mark.
- mem_low_wat - Low water mark.
- mutation_mem_threshold - Memory threshold (%) on the current bucket quota
- for accepting a new mutation.
- timing_log - path to log detailed timing stats.
+ alog_sleep_time - Access scanner interval (minute)
+ alog_task_time - Access scanner next task time (UTC)
+ bg_fetch_delay - Delay before executing a bg fetch (test
+ feature).
+ couch_response_timeout - timeout in receiving a response from couchdb.
+ exp_pager_stime - Expiry Pager Sleeptime.
+ flushall_enabled - Enable flush operation.
+ klog_compactor_queue_cap - queue cap to throttle the log compactor.
+ klog_max_log_size - maximum size of a mutation log file allowed.
+ klog_max_entry_ratio - max ratio of # of items logged to # of unique
+ items.
+ pager_active_vb_pcnt - Percentage of active vbuckets items among
+ all ejected items by item pager.
+ max_size - Max memory used by the server.
+ max_txn_size - Maximum number of items in a flusher
+ transaction.
+ mem_high_wat - High water mark.
+ mem_low_wat - Low water mark.
+ mutation_mem_threshold - Memory threshold (%) on the current bucket quota
+ for accepting a new mutation.
+ timing_log - path to log detailed timing stats.
+ warmup_min_memory_threshold - Memory threshold (%) during warmup to enable
+ traffic
+ warmup_min_items_threshold - Item number threshold (%) during warmup to enable
+ traffic
Available params for "set tap_param":
- tap_keepalive - Seconds to hold a named tap connection.
- tap_throttle_queue_cap - Max disk write queue size to throttle tap
- streams ('infinite' means no cap).
- tap_throttle_threshold - Percentage of memory in use to throttle tap
- streams.
+ tap_keepalive - Seconds to hold a named tap connection.
+ tap_throttle_queue_cap - Max disk write queue size to throttle tap
+ streams ('infinite' means no cap).
+ tap_throttle_threshold - Percentage of memory in use to throttle tap
+ streams.
""")
c.addCommand('drain', drain, "drain")
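
The two new warmup thresholds listed in the flush_param help text above are runtime settings applied through cbepctl. As a purely illustrative sketch (the tool path, the localhost:11210 endpoint, the value 50, and the exact argument order are assumptions, not taken from this commit), they could be driven from Python like this:

# Hypothetical example: apply the new warmup thresholds via cbepctl.
# Assumes the usual "cbepctl host:port set flush_param <key> <value>" form;
# path, endpoint and values are placeholders.
import subprocess

for key, value in (("warmup_min_memory_threshold", "50"),
                   ("warmup_min_items_threshold", "50")):
    subprocess.check_call(["./management/cbepctl", "localhost:11210",
                           "set", "flush_param", key, value])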
28 management/cbstats
@@ -80,6 +80,12 @@ def time_label(s):
product = sz * product
sizeMap.insert(0, (l, product))
lbl, factor = itertools.dropwhile(lambda x: x[1] > s, sizeMap).next()
+
+ # Give some extra granularity for timings in minutes
+ if lbl == 'm':
+ mins = s / factor
+ secs = (s % factor) / (factor / 60)
+ return '%d%s:%02ds' % (mins, lbl, secs)
return "%d%s" % (s / factor, lbl)
def sec_label(s):
@@ -185,28 +191,6 @@ def stats_vkey(mc, key, vb):
def stats_all(mc):
stats_formatter(stats_perform(mc))
-def time_label(s):
- # -(2**64) -> '-inf'
- # 2**64 -> 'inf'
- # 0 -> '0'
- # 4 -> '4us'
- # 838384 -> '838ms'
- # 8283852 -> '8s'
- if s > BIG_VALUE:
- return 'inf'
- elif s < SMALL_VALUE:
- return '-inf'
- elif s == 0:
- return '0'
- product = 1
- sizes = (('us', 1), ('ms', 1000), ('s', 1000), ('m', 60))
- sizeMap = []
- for l,sz in sizes:
- product = sz * product
- sizeMap.insert(0, (l, product))
- lbl, factor = itertools.dropwhile(lambda x: x[1] > s, sizeMap).next()
- return "%d%s" % (s / factor, lbl)
-
@cmd
def stats_timings(mc):
h = stats_perform(mc, 'timings')
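
The diff above only shows the new minute-granularity branch as a fragment (and removes a duplicate definition of the same function). A minimal, self-contained sketch of the resulting time_label, keeping the script's Python 2 idiom, is below; BIG_VALUE and SMALL_VALUE are assumed stand-ins for the bounds defined elsewhere in cbstats:

# Sketch of the updated time_label with minute:second granularity.
import itertools

BIG_VALUE = 2 ** 60       # assumed placeholder for the script's upper bound
SMALL_VALUE = -(2 ** 60)  # assumed placeholder for the script's lower bound

def time_label(s):
    # 0 -> '0', 4 -> '4us', 838384 -> '838ms', 8283852 -> '8s',
    # 283852992 -> '4m:43s' (the new minute granularity)
    if s > BIG_VALUE:
        return 'inf'
    elif s < SMALL_VALUE:
        return '-inf'
    elif s == 0:
        return '0'
    product = 1
    sizes = (('us', 1), ('ms', 1000), ('s', 1000), ('m', 60))
    sizeMap = []
    for l, sz in sizes:
        product = sz * product
        sizeMap.insert(0, (l, product))
    lbl, factor = itertools.dropwhile(lambda x: x[1] > s, sizeMap).next()

    # Give some extra granularity for timings in minutes
    if lbl == 'm':
        mins = s / factor
        secs = (s % factor) / (factor / 60)
        return '%d%s:%02ds' % (mins, lbl, secs)
    return "%d%s" % (s / factor, lbl)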
2  src/access_scanner.cc
@@ -46,7 +46,7 @@ class ItemAccessVisitor : public VBucketVisitor {
}
void visit(StoredValue *v) {
- if (log != NULL && v->isReferenced(true, &currentBucket->ht)) {
+ if (log != NULL && v->isResident()) {
if (v->isExpired(startTime) || v->isDeleted()) {
LOG(EXTENSION_LOG_INFO, "INFO: Skipping expired/deleted item: %s",
v->getKey().c_str());
3  src/backfill.cc
@@ -237,6 +237,7 @@ bool BackFillVisitor::checkValidity() {
bool BackfillTask::callback(Dispatcher &d, TaskId &t) {
(void) t;
- epstore->visit(bfv, "Backfill task", &d, Priority::BackfillTaskPriority, true, 1);
+ engine->getEpStore()->visit(bfv, "Backfill task", &d,
+ Priority::BackfillTaskPriority, true, 1);
return false;
}
4 src/backfill.h
@@ -132,9 +132,8 @@ class BackfillTask : public DispatcherCallback {
public:
BackfillTask(EventuallyPersistentEngine *e, TapProducer *tc,
- EventuallyPersistentStore *s,
const VBucketFilter &backfillVBFilter):
- bfv(new BackFillVisitor(e, tc, backfillVBFilter)), engine(e), epstore(s) {}
+ bfv(new BackFillVisitor(e, tc, backfillVBFilter)), engine(e) {}
virtual ~BackfillTask() {}
@@ -146,7 +145,6 @@ class BackfillTask : public DispatcherCallback {
shared_ptr<BackFillVisitor> bfv;
EventuallyPersistentEngine *engine;
- EventuallyPersistentStore *epstore;
};
#endif // SRC_BACKFILL_H_
53 src/bgfetcher.cc 100755 → 100644
@@ -22,7 +22,7 @@
#include "bgfetcher.h"
#include "ep.h"
-const double BgFetcher::sleepInterval = 60.0;
+const double BgFetcher::sleepInterval = 1.0;
bool BgFetcherCallback::callback(Dispatcher &, TaskId &t) {
return bgfetcher->run(t);
@@ -74,7 +74,6 @@ void BgFetcher::doFetch(uint16_t vbId) {
if (totalfetches > 0) {
store->completeBGFetchMulti(vbId, fetchedItems, startTime);
stats.getMultiHisto.add((gethrtime()-startTime)/1000, totalfetches);
- total_num_fetched_items += totalfetches;
}
// failed requests will get requeued for retry within clearItems()
@@ -111,51 +110,39 @@ void BgFetcher::clearItems(uint16_t vbId) {
}
}
- total_num_requeued_items += numRequeuedItems;
+ if (numRequeuedItems) {
+ stats.numRemainingBgJobs.incr(numRequeuedItems);
+ }
}
bool BgFetcher::run(TaskId &tid) {
assert(tid.get());
- size_t total_num_items2fetch = 0;
-
- total_num_fetched_items = 0;
- total_num_requeued_items = 0;
-
- if(stats.numRemainingBgJobs.get()) {
- const VBucketMap &vbMap = store->getVBuckets();
- size_t numVbuckets = vbMap.getSize();
- size_t num_items2fetch;
- for (size_t vbid = 0; vbid < numVbuckets; vbid++) {
- RCPtr<VBucket> vb = vbMap.getBucket(vbid);
- num_items2fetch = 0;
- if (vb && (num_items2fetch =
- vb->getBGFetchItems(items2fetch))) {
- doFetch(vbid);
- total_num_items2fetch += num_items2fetch;
- items2fetch.clear();
- }
- }
+ size_t num_fetched_items = 0;
- stats.numRemainingBgJobs.decr(total_num_fetched_items);
-
- LOG(EXTENSION_LOG_DEBUG, "BgFetcher: total_num_items2fetch = %d "
- "total_num_fetched_items = %d totaL_num_requeued_items = %d",
- total_num_items2fetch, total_num_fetched_items,
- total_num_requeued_items);
+ const VBucketMap &vbMap = store->getVBuckets();
+ size_t numVbuckets = vbMap.getSize();
+ for (size_t vbid = 0; vbid < numVbuckets; vbid++) {
+ RCPtr<VBucket> vb = vbMap.getBucket(vbid);
+ assert(items2fetch.empty());
+ if (vb && vb->getBGFetchItems(items2fetch)) {
+ doFetch(vbid);
+ num_fetched_items += items2fetch.size();
+ items2fetch.clear();
+ }
}
- if(!stats.numRemainingBgJobs.get()) {
+ size_t remains = stats.numRemainingBgJobs.decr(num_fetched_items);
+ if (!remains) {
// wait a bit until next fetch request arrives
double sleep = std::max(store->getBGFetchDelay(), sleepInterval);
dispatcher->snooze(tid, sleep);
if (stats.numRemainingBgJobs.get()) {
- // check again numRemainingBgJobs, a new fetch request
- // could have arrvied right before calling above snooze()
- dispatcher->snooze(tid, 0);
+ // check again numRemainingBgJobs, a new fetch request
+ // could have arrvied right before calling above snooze()
+ dispatcher->snooze(tid, 0);
}
}
-
return true;
}
2  src/bgfetcher.h 100755 → 100644
@@ -118,8 +118,6 @@ class BgFetcher {
EventuallyPersistentStore *store;
Dispatcher *dispatcher;
vb_bgfetch_queue_t items2fetch;
- size_t total_num_fetched_items;
- size_t total_num_requeued_items;
TaskId task;
Mutex taskMutex;
EPStats &stats;
4 src/blackhole-kvstore/blackhole.cc
@@ -98,9 +98,7 @@ void BlackholeKVStore::dump(uint16_t, shared_ptr<Callback<GetValue> >)
StorageProperties BlackholeKVStore::getStorageProperties()
{
- size_t concurrency(10);
- StorageProperties rv(concurrency, concurrency - 1, 1, true, true,
- true, false);
+ StorageProperties rv(true, true, true, false);
return rv;
}
10 src/callbacks.h
@@ -33,13 +33,13 @@ class GetValue {
public:
GetValue() : value(NULL), id(-1),
status(ENGINE_KEY_ENOENT),
- partial(false), nru(false) { }
+ partial(false), nru(0xff) { }
explicit GetValue(Item *v, ENGINE_ERROR_CODE s=ENGINE_SUCCESS,
uint64_t i = -1,
- bool incomplete = false, bool reference = false) :
+ bool incomplete = false, uint8_t _nru = 0xff) :
value(v), id(i), status(s),
- partial(incomplete), nru(reference) { }
+ partial(incomplete), nru(_nru) { }
/**
* The value retrieved for the key.
@@ -70,7 +70,7 @@ class GetValue {
void setPartial() { partial = true; }
- bool isReferenced() const { return nru; }
+ uint8_t getNRUValue() const { return nru; }
void setValue(Item *i) { value = i; }
@@ -80,7 +80,7 @@ class GetValue {
uint64_t id;
ENGINE_ERROR_CODE status;
bool partial;
- bool nru;
+ uint8_t nru;
};
/**
1  src/checkpoint.cc
@@ -74,6 +74,7 @@ void Checkpoint::setState(checkpoint_state state) {
void Checkpoint::popBackCheckpointEndItem() {
if (!toWrite.empty() && toWrite.back()->getOperation() == queue_op_checkpoint_end) {
+ keyIndex.erase(toWrite.back()->getKey());
toWrite.pop_back();
}
}
12 src/couch-kvstore/couch-kvstore.cc
@@ -597,15 +597,19 @@ void CouchKVStore::getPersistedStats(std::map<std::string, std::string> &stats)
for (int i = 0; i < json_arr_size; ++i) {
cJSON *obj = cJSON_GetArrayItem(json_obj, i);
if (obj) {
- stats[obj->string] = obj->valuestring;
+ stats[obj->string] = obj->valuestring ? obj->valuestring : "";
}
}
cJSON_Delete(json_obj);
- } catch (const std::ifstream::failure& e) {
+ } catch (const std::ifstream::failure &e) {
LOG(EXTENSION_LOG_WARNING,
"Warning: failed to load the engine session stats "
" due to IO exception \"%s\"", e.what());
+ } catch (...) {
+ LOG(EXTENSION_LOG_WARNING,
+ "Warning: failed to load the engine session stats "
+ " due to IO exception");
}
delete[] buffer;
@@ -824,9 +828,7 @@ void CouchKVStore::dumpDeleted(uint16_t vb, shared_ptr<Callback<GetValue> > cb)
StorageProperties CouchKVStore::getStorageProperties()
{
- size_t concurrency(10);
- StorageProperties rv(concurrency, concurrency - 1, 1, true, true,
- true, true);
+ StorageProperties rv(true, true, true, true);
return rv;
}
240 src/ep.cc
@@ -132,7 +132,7 @@ class BGFetchCallback : public DispatcherCallback {
const std::string &k, uint16_t vbid,
uint64_t r, const void *c, bg_fetch_type_t t) :
ep(e), key(k), vbucket(vbid), rowid(r), cookie(c), type(t),
- counter(ep->bgFetchQueue), init(gethrtime()) {
+ init(gethrtime()) {
assert(ep);
assert(cookie);
}
@@ -155,9 +155,7 @@ class BGFetchCallback : public DispatcherCallback {
uint64_t rowid;
const void *cookie;
bg_fetch_type_t type;
- BGFetchCounter counter;
-
- hrtime_t init;
+ hrtime_t init;
};
/**
@@ -167,23 +165,14 @@ class VKeyStatBGFetchCallback : public DispatcherCallback {
public:
VKeyStatBGFetchCallback(EventuallyPersistentStore *e,
const std::string &k, uint16_t vbid,
- uint64_t r,
- const void *c, shared_ptr<Callback<GetValue> > cb) :
- ep(e), key(k), vbucket(vbid), rowid(r), cookie(c),
- lookup_cb(cb), counter(e->bgFetchQueue) {
+ uint64_t s, const void *c) :
+ ep(e), key(k), vbucket(vbid), bySeqNum(s), cookie(c) {
assert(ep);
assert(cookie);
- assert(lookup_cb);
}
bool callback(Dispatcher &, TaskId &) {
- RememberingCallback<GetValue> gcb;
-
- ep->getROUnderlying()->get(key, rowid, vbucket, gcb);
- gcb.waitForValue();
- assert(gcb.fired);
- lookup_cb->callback(gcb.val);
-
+ ep->completeStatsVKey(cookie, key, vbucket, bySeqNum);
return false;
}
@@ -197,10 +186,8 @@ class VKeyStatBGFetchCallback : public DispatcherCallback {
EventuallyPersistentStore *ep;
std::string key;
uint16_t vbucket;
- uint64_t rowid;
+ uint64_t bySeqNum;
const void *cookie;
- shared_ptr<Callback<GetValue> > lookup_cb;
- BGFetchCounter counter;
};
/**
@@ -252,28 +239,13 @@ class VBucketMemoryDeletionCallback : public DispatcherCallback {
*/
class VBucketDeletionCallback : public DispatcherCallback {
public:
- VBucketDeletionCallback(EventuallyPersistentStore *e, RCPtr<VBucket> &vb,
- EPStats &st, const void* c = NULL, bool rc = false) :
- ep(e), vbucket(vb->getId()),
- stats(st), cookie(c), recreate(rc) {}
+ VBucketDeletionCallback(EventuallyPersistentStore *e, uint16_t vbid,
+ const void* c = NULL, bool rc = false) :
+ ep(e), vbucket(vbid), cookie(c),
+ recreate(rc) {}
bool callback(Dispatcher &, TaskId &) {
- hrtime_t start_time(gethrtime());
- vbucket_del_result result = ep->completeVBucketDeletion(vbucket, recreate);
- if (result == vbucket_del_success || result == vbucket_del_invalid) {
- hrtime_t spent(gethrtime() - start_time);
- hrtime_t wall_time = spent / 1000;
- BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
- stats.diskVBDelHisto.add(wall_time);
- stats.vbucketDelMaxWalltime.setIfBigger(wall_time);
- stats.vbucketDelTotWalltime.incr(wall_time);
- if (cookie) {
- ep->getEPEngine().notifyIOComplete(cookie, ENGINE_SUCCESS);
- }
- return false;
- }
-
- return true;
+ return !ep->completeVBucketDeletion(vbucket, cookie, recreate);
}
std::string description() {
@@ -285,15 +257,13 @@ class VBucketDeletionCallback : public DispatcherCallback {
private:
EventuallyPersistentStore *ep;
uint16_t vbucket;
- EPStats &stats;
const void* cookie;
bool recreate;
};
EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine &theEngine,
KVStore *t,
- bool startVb0,
- bool concurrentDB) :
+ bool startVb0) :
engine(theEngine), stats(engine.getEpStats()), rwUnderlying(t),
storageProperties(t->getStorageProperties()), bgFetcher(NULL),
vbuckets(theEngine.getConfiguration()),
@@ -303,31 +273,15 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine
engine.getConfiguration().getAlogBlockSize()),
diskFlushAll(false), bgFetchDelay(0), snapshotVBState(false)
{
- LOG(EXTENSION_LOG_INFO, "Storage props: c=%ld/r=%ld/rw=%ld\n",
- storageProperties.maxConcurrency(),
- storageProperties.maxReaders(),
- storageProperties.maxWriters());
-
doPersistence = getenv("EP_NO_PERSISTENCE") == NULL;
dispatcher = new Dispatcher(theEngine, "RW_Dispatcher");
- if (storageProperties.maxConcurrency() > 1
- && storageProperties.maxReaders() > 1
- && concurrentDB) {
- roUnderlying = engine.newKVStore(true);
- roDispatcher = new Dispatcher(theEngine, "RO_Dispatcher");
- } else {
- roUnderlying = rwUnderlying;
- roDispatcher = dispatcher;
- }
- if (storageProperties.maxConcurrency() > 2
- && storageProperties.maxReaders() > 2
- && concurrentDB) {
- auxUnderlying = engine.newKVStore(true);
- auxIODispatcher = new Dispatcher(theEngine, "AUXIO_Dispatcher");
- } else {
- auxUnderlying = roUnderlying;
- auxIODispatcher = roDispatcher;
- }
+
+ roUnderlying = engine.newKVStore(true);
+ roDispatcher = new Dispatcher(theEngine, "RO_Dispatcher");
+
+ auxUnderlying = engine.newKVStore(true);
+ auxIODispatcher = new Dispatcher(theEngine, "AUXIO_Dispatcher");
+
nonIODispatcher = new Dispatcher(theEngine, "NONIO_Dispatcher");
flusher = new Flusher(this, dispatcher);
@@ -523,36 +477,29 @@ EventuallyPersistentStore::~EventuallyPersistentStore() {
stopWarmup();
stopFlusher();
stopBgFetcher();
- dispatcher->schedule(shared_ptr<DispatcherCallback>(new StatSnap(&engine, true)),
- NULL, Priority::StatSnapPriority, 0, false, true);
+
dispatcher->stop(forceShutdown);
- if (hasSeparateRODispatcher()) {
- roDispatcher->stop(forceShutdown);
- delete roDispatcher;
- delete roUnderlying;
- }
- if (hasSeparateAuxIODispatcher()) {
- auxIODispatcher->stop(forceShutdown);
- delete auxIODispatcher;
- delete auxUnderlying;
- }
+ roDispatcher->stop(forceShutdown);
+ auxIODispatcher->stop(forceShutdown);
nonIODispatcher->stop(forceShutdown);
delete flusher;
delete bgFetcher;
+ delete warmupTask;
+
delete dispatcher;
+ delete roDispatcher;
+ delete auxIODispatcher;
delete nonIODispatcher;
- delete warmupTask;
+
+ delete roUnderlying;
+ delete auxUnderlying;
}
void EventuallyPersistentStore::startDispatcher() {
dispatcher->start();
- if (hasSeparateRODispatcher()) {
- roDispatcher->start();
- }
- if (hasSeparateAuxIODispatcher()) {
- auxIODispatcher->start();
- }
+ roDispatcher->start();
+ auxIODispatcher->start();
}
void EventuallyPersistentStore::startNonIODispatcher() {
@@ -739,7 +686,7 @@ protocol_binary_response_status EventuallyPersistentStore::evictKey(const std::s
ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
const void *cookie,
bool force,
- bool trackReference) {
+ uint8_t nru) {
RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
if (!vb || vb->getState() == vbucket_state_dead) {
@@ -756,7 +703,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::set(const Item &itm,
bool cas_op = (itm.getCas() != 0);
- mutation_type_t mtype = vb->ht.set(itm, trackReference);
+ mutation_type_t mtype = vb->ht.set(itm, nru);
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (mtype) {
@@ -819,7 +766,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::add(const Item &itm,
}
ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm, bool meta,
- bool trackReference) {
+ uint8_t nru) {
RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
if (!vb ||
@@ -833,9 +780,9 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::addTAPBackfillItem(const Item &itm,
mutation_type_t mtype;
if (meta) {
- mtype = vb->ht.set(itm, 0, true, true, trackReference);
+ mtype = vb->ht.set(itm, 0, true, true, nru);
} else {
- mtype = vb->ht.set(itm, trackReference);
+ mtype = vb->ht.set(itm, nru);
}
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
@@ -959,10 +906,13 @@ void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
NULL, p, 0, false);
}
-vbucket_del_result
-EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid, bool recreate) {
+bool EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid,
+ const void* cookie,
+ bool recreate) {
LockHolder lh(vbsetMutex);
+ hrtime_t start_time(gethrtime());
+ vbucket_del_result result = vbucket_del_invalid;
RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
if (!vb || vb->getState() == vbucket_state_dead || vbuckets.isBucketDeletion(vbid)) {
lh.unlock();
@@ -982,13 +932,27 @@ EventuallyPersistentStore::completeVBucketDeletion(uint16_t vbid, bool recreate)
mutationLog.commit1();
mutationLog.commit2();
++stats.vbucketDeletions;
- return vbucket_del_success;
+ result = vbucket_del_success;
} else {
++stats.vbucketDeletionFail;
- return vbucket_del_fail;
+ result = vbucket_del_fail;
+ }
+ }
+
+ if (result == vbucket_del_success || result == vbucket_del_invalid) {
+ hrtime_t spent(gethrtime() - start_time);
+ hrtime_t wall_time = spent / 1000;
+ BlockTimer::log(spent, "disk_vb_del", stats.timingLog);
+ stats.diskVBDelHisto.add(wall_time);
+ stats.vbucketDelMaxWalltime.setIfBigger(wall_time);
+ stats.vbucketDelTotWalltime.incr(wall_time);
+ if (cookie) {
+ engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
}
+ return true;
}
- return vbucket_del_invalid;
+
+ return false;
}
void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
@@ -999,8 +963,8 @@ void EventuallyPersistentStore::scheduleVBDeletion(RCPtr<VBucket> &vb,
nonIODispatcher->schedule(mem_cb, NULL, Priority::VBMemoryDeletionPriority, delay, false);
if (vbuckets.setBucketDeletion(vb->getId(), true)) {
- shared_ptr<DispatcherCallback> cb(new VBucketDeletionCallback(this, vb,
- stats,
+ shared_ptr<DispatcherCallback> cb(new VBucketDeletionCallback(this,
+ vb->getId(),
cookie,
recreate));
dispatcher->schedule(cb,
@@ -1059,6 +1023,34 @@ bool EventuallyPersistentStore::resetVBucket(uint16_t vbid) {
return rv;
}
+extern "C" {
+ static void add_stat(const char *key, const uint16_t klen,
+ const char *val, const uint32_t vlen,
+ const void *cookie) {
+ assert(cookie);
+ void *ptr = const_cast<void *>(cookie);
+ std::map<std::string, std::string> *smap =
+ static_cast<std::map<std::string, std::string>*>(ptr);
+
+ std::string k(key, klen);
+ std::string v(val, vlen);
+ smap->insert(std::pair<std::string, std::string>(k, v));
+ }
+}
+
+void EventuallyPersistentStore::snapshotStats() {
+ std::map<std::string, std::string> smap;
+ bool rv = engine.getStats(&smap, NULL, 0, add_stat) == ENGINE_SUCCESS &&
+ engine.getStats(&smap, "tap", 3, add_stat) == ENGINE_SUCCESS;
+ if (rv && engine.isShutdownMode()) {
+ smap["ep_force_shutdown"] = engine.isForceShutdown() ? "true" : "false";
+ std::stringstream ss;
+ ss << ep_real_time();
+ smap["ep_shutdown_time"] = ss.str();
+ }
+ rwUnderlying->snapshotStats(smap);
+}
+
void EventuallyPersistentStore::updateBGStats(const hrtime_t init,
const hrtime_t start,
const hrtime_t stop) {
@@ -1088,11 +1080,6 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
hrtime_t init,
bg_fetch_type_t type) {
hrtime_t start(gethrtime());
- std::stringstream ss;
- ss << "Completed a background fetch, now at " << bgFetchQueue.get()
- << std::endl;
- LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
-
// Go find the data
RememberingCallback<GetValue> gcb;
if (BG_FETCH_METADATA == type) {
@@ -1148,6 +1135,7 @@ void EventuallyPersistentStore::completeBGFetch(const std::string &key,
hrtime_t stop = gethrtime();
updateBGStats(init, start, stop);
+ bgFetchQueue--;
delete gcb.val.getValue();
engine.notifyIOComplete(cookie, status);
@@ -1236,6 +1224,7 @@ void EventuallyPersistentStore::bgFetch(const std::string &key,
shared_ptr<BGFetchCallback> dcb(new BGFetchCallback(this, key,
vbucket,
rowid, cookie, type));
+ bgFetchQueue++;
assert(bgFetchQueue > 0);
ss << "Queued a background fetch, now at " << bgFetchQueue.get()
<< std::endl;
@@ -1281,11 +1270,11 @@ GetValue EventuallyPersistentStore::getInternal(const std::string &key,
bgFetch(key, vbucket, v->getId(), cookie);
}
return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId(), true,
- v->isReferenced());
+ v->getNRUValue());
}
GetValue rv(v->toItem(v->isLocked(ep_current_time()), vbucket),
- ENGINE_SUCCESS, v->getId(), false, v->isReferenced());
+ ENGINE_SUCCESS, v->getId(), false, v->getNRUValue());
return rv;
} else {
GetValue rv;
@@ -1356,7 +1345,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
const void *cookie,
bool force,
bool allowExisting,
- bool trackReference)
+ uint8_t nru)
{
RCPtr<VBucket> vb = getVBucket(itm.getVBucketId());
if (!vb || vb->getState() == vbucket_state_dead) {
@@ -1372,7 +1361,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setWithMeta(const Item &itm,
}
mutation_type_t mtype = vb->ht.set(itm, cas, allowExisting,
- true, trackReference);
+ true, nru);
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (mtype) {
@@ -1464,10 +1453,9 @@ GetValue EventuallyPersistentStore::getAndUpdateTtl(const std::string &key,
}
ENGINE_ERROR_CODE
-EventuallyPersistentStore::getFromUnderlying(const std::string &key,
- uint16_t vbucket,
- const void *cookie,
- shared_ptr<Callback<GetValue> > cb) {
+EventuallyPersistentStore::statsVKey(const std::string &key,
+ uint16_t vbucket,
+ const void *cookie) {
RCPtr<VBucket> vb = getVBucket(vbucket);
if (!vb) {
return ENGINE_NOT_MY_VBUCKET;
@@ -1481,7 +1469,8 @@ EventuallyPersistentStore::getFromUnderlying(const std::string &key,
shared_ptr<VKeyStatBGFetchCallback> dcb(new VKeyStatBGFetchCallback(this, key,
vbucket,
v->getId(),
- cookie, cb));
+ cookie));
+ bgFetchQueue++;
assert(bgFetchQueue > 0);
roDispatcher->schedule(dcb, NULL, Priority::VKeyStatBgFetcherPriority, bgFetchDelay);
return ENGINE_EWOULDBLOCK;
@@ -1490,6 +1479,26 @@ EventuallyPersistentStore::getFromUnderlying(const std::string &key,
}
}
+void EventuallyPersistentStore::completeStatsVKey(const void* cookie,
+ std::string &key,
+ uint16_t vbid,
+ uint64_t bySeqNum) {
+ RememberingCallback<GetValue> gcb;
+
+ roUnderlying->get(key, bySeqNum, vbid, gcb);
+ gcb.waitForValue();
+ assert(gcb.fired);
+
+ if (gcb.val.getStatus() == ENGINE_SUCCESS) {
+ engine.addLookupResult(cookie, gcb.val.getValue());
+ } else {
+ engine.addLookupResult(cookie, NULL);
+ }
+
+ bgFetchQueue--;
+ engine.notifyIOComplete(cookie, ENGINE_SUCCESS);
+}
+
bool EventuallyPersistentStore::getLocked(const std::string &key,
uint16_t vbucket,
Callback<GetValue> &cb,
@@ -1774,17 +1783,6 @@ class PersistenceCallback : public Callback<mutation_result>,
// mark this item clean only if current and stored cas
// value match
v->markClean();
- vbucket_state_t vbstate = vb->getState();
- if (vbstate != vbucket_state_active &&
- vbstate != vbucket_state_pending) {
- double current = static_cast<double>(stats->getTotalMemoryUsed());
- double lower = static_cast<double>(stats->mem_low_wat);
- // evict unreferenced replica items only
- if (current > lower && !v->isReferenced() &&
- vb->checkpointManager.eligibleForEviction(v->getKey())) {
- v->ejectValue(*stats, vb->ht);
- }
- }
}
}
85 src/ep.h
@@ -161,8 +161,7 @@ class EventuallyPersistentStore {
public:
EventuallyPersistentStore(EventuallyPersistentEngine &theEngine,
- KVStore *t, bool startVb0,
- bool concurrentDB);
+ KVStore *t, bool startVb0);
~EventuallyPersistentStore();
@@ -174,13 +173,13 @@ class EventuallyPersistentStore {
* @param cookie the cookie representing the client to store the item
* @param force override access to the vbucket even if the state of the
* vbucket would deny mutations.
- * @param trackReference true if we want to set the nru bit for the item
+ * @param nru the nru bit value for the item
* @return the result of the store operation
*/
ENGINE_ERROR_CODE set(const Item &item,
const void *cookie,
bool force = false,
- bool trackReference = true);
+ uint8_t nru = 0xff);
ENGINE_ERROR_CODE add(const Item &item, const void *cookie);
@@ -188,11 +187,11 @@ class EventuallyPersistentStore {
* Add an TAP backfill item into its corresponding vbucket
* @param item the item to be added
* @param meta contains meta info or not
- * @param trackReference true if we want to set the nru bit for the item
+ * @param nru the nru bit for the item
* @return the result of the operation
*/
ENGINE_ERROR_CODE addTAPBackfillItem(const Item &item, bool meta,
- bool trackReference=false);
+ uint8_t nru = 0xff);
/**
* Retrieve a value.
@@ -255,7 +254,7 @@ class EventuallyPersistentStore {
* @param force override vbucket states
* @param allowExisting set to false if you want set to fail if the
* item exists already
- * @param trackReference true if we want to set the nru bit for the item
+ * @param nru the nru bit for the item
* @return the result of the store operation
*/
ENGINE_ERROR_CODE setWithMeta(const Item &item,
@@ -263,7 +262,7 @@ class EventuallyPersistentStore {
const void *cookie,
bool force,
bool allowReplace,
- bool trackReference = false);
+ uint8_t nru = 0xff);
/**
* Retrieve a value, but update its TTL first
@@ -289,10 +288,12 @@ class EventuallyPersistentStore {
*
* @return a status resulting form executing the method
*/
- ENGINE_ERROR_CODE getFromUnderlying(const std::string &key,
- uint16_t vbucket,
- const void *cookie,
- shared_ptr<Callback<GetValue> > cb);
+ ENGINE_ERROR_CODE statsVKey(const std::string &key,
+ uint16_t vbucket,
+ const void *cookie);
+
+ void completeStatsVKey(const void* cookie, std::string &key, uint16_t vbid,
+ uint64_t bySeqNum);
protocol_binary_response_status evictKey(const std::string &key,
uint16_t vbucket,
@@ -366,13 +367,6 @@ class EventuallyPersistentStore {
}
/**
- * True if the RW dispatcher and RO dispatcher are distinct.
- */
- bool hasSeparateRODispatcher() {
- return dispatcher != roDispatcher;
- }
-
- /**
* Get the auxiliary IO dispatcher.
*/
Dispatcher* getAuxIODispatcher(void) {
@@ -381,13 +375,6 @@ class EventuallyPersistentStore {
}
/**
- * True if the RO dispatcher and auxiliary IO dispatcher are distinct.
- */
- bool hasSeparateAuxIODispatcher() {
- return roDispatcher != auxIODispatcher;
- }
-
- /**
* Get the current non-io dispatcher.
*
* Use this dispatcher to queue non-io jobs.
@@ -409,6 +396,11 @@ class EventuallyPersistentStore {
void stopBgFetcher(void);
/**
+ * Takes a snapshot of the current stats and persists them to disk.
+ */
+ void snapshotStats(void);
+
+ /**
* Enqueue a background fetch for a key.
*
* @param key the key to be bg fetched
@@ -478,9 +470,15 @@ class EventuallyPersistentStore {
ENGINE_ERROR_CODE setVBucketState(uint16_t vbid, vbucket_state_t state);
/**
- * Perform a fast vbucket deletion.
+ * Physically deletes a VBucket from disk. This function should only
+ * be called on a VBucket that has already been logically deleted.
+ *
+ * @param vbid The VBucket to physically delete
+ * @param cookie The connection that requested the deletion
+ * @param recreate Whether or not to recreate the VBucket after deletion
*/
- vbucket_del_result completeVBucketDeletion(uint16_t vbid, bool recreate);
+ bool completeVBucketDeletion(uint16_t vbid, const void* cookie,
+ bool recreate);
/**
* Deletes a vbucket
@@ -615,8 +613,6 @@ class EventuallyPersistentStore {
void resetAccessScannerTasktime() {
accessScanner.lastTaskRuntime = gethrtime();
- // notify item pager to check access scanner task time
- pager.biased = false;
}
/**
@@ -641,7 +637,7 @@ class EventuallyPersistentStore {
}
bool multiBGFetchEnabled() {
- return hasSeparateRODispatcher() && storageProperties.hasEfficientGet();
+ return storageProperties.hasEfficientGet();
}
void updateCachedResidentRatio(size_t activePerc, size_t replicaPerc) {
@@ -789,10 +785,6 @@ class EventuallyPersistentStore {
Atomic<size_t> activeRatio;
Atomic<size_t> replicaRatio;
} cachedResidentRatio;
- struct ItemPagerInfo {
- ItemPagerInfo() : biased(true) {}
- Atomic<bool> biased;
- } pager;
size_t transactionSize;
size_t lastTransTimePerItem;
size_t itemExpiryWindow;
@@ -803,27 +795,4 @@ class EventuallyPersistentStore {
DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
};
-/**
- * Object whose existence maintains a counter incremented.
- *
- * When the object is constructed, it increments the given counter,
- * when destructed, it decrements the counter.
- */
-class BGFetchCounter {
-public:
-
- BGFetchCounter(Atomic<size_t> &c) : counter(c) {
- ++counter;
- }
-
- ~BGFetchCounter() {
- --counter;
- assert(counter.get() < GIGANTOR);
- }
-
-private:
- Atomic<size_t> &counter;
-};
-
-
#endif // SRC_EP_H_
87 src/ep_engine.cc
@@ -37,6 +37,8 @@
#include "htresizer.h"
#include "memory_tracker.h"
#include "stats-info.h"
+#include "statsnap.h"
+
#define STATWRITER_NAMESPACE core_engine
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
@@ -89,20 +91,6 @@ static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
return rv;
}
-void LookupCallback::callback(GetValue &value) {
- if (value.getStatus() == ENGINE_SUCCESS) {
- engine->addLookupResult(cookie, value.getValue());
- } else {
- engine->addLookupResult(cookie, NULL);
- }
-
- if (forceSuccess) {
- engine->notifyIOComplete(cookie, ENGINE_SUCCESS);
- return;
- }
- engine->notifyIOComplete(cookie, value.getStatus());
-}
-
template <typename T>
static void validate(T v, T l, T h) {
if (v < l || v > h) {
@@ -428,8 +416,10 @@ extern "C" {
e->getConfiguration().setAlogTaskTime(v);
} else if (strcmp(keyz, "pager_active_vb_pcnt") == 0) {
e->getConfiguration().setPagerActiveVbPcnt(v);
- } else if (strcmp(keyz, "pager_unbiased_period") == 0) {
- e->getConfiguration().setPagerUnbiasedPeriod(v);
+ } else if (strcmp(keyz, "warmup_min_memory_threshold") == 0) {
+ e->getConfiguration().setWarmupMinMemoryThreshold(v);
+ } else if (strcmp(keyz, "warmup_min_items_threshold") == 0) {
+ e->getConfiguration().setWarmupMinItemsThreshold(v);
} else {
*msg = "Unknown config param";
rv = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
@@ -1167,7 +1157,7 @@ void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
if (loggerApi->get_level() <= severity) {
va_list va;
va_start(va, fmt);
- vsprintf(buffer, fmt, va);
+ vsnprintf(buffer, sizeof(buffer) - 1, fmt, va);
if (engine) {
logger->log(severity, NULL, "(%s) %s", engine->getName(), buffer);
} else {
@@ -1304,6 +1294,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
// Start updating the variables from the config!
HashTable::setDefaultNumBuckets(configuration.getHtSize());
HashTable::setDefaultNumLocks(configuration.getHtLocks());
+ StoredValue::setMutationMemoryThreshold(configuration.getMutationMemThreshold());
if (configuration.getMaxSize() == 0) {
configuration.setMaxSize(std::numeric_limits<size_t>::max());
@@ -1358,8 +1349,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::initialize(const char* config) {
}
databaseInitTime = ep_real_time() - start;
- epstore = new EventuallyPersistentStore(*this, kvstore, configuration.isVb0(),
- configuration.isConcurrentDB());
+ epstore = new EventuallyPersistentStore(*this, kvstore, configuration.isVb0());
if (epstore == NULL) {
return ENGINE_ENOMEM;
}
@@ -1394,6 +1384,9 @@ KVStore* EventuallyPersistentEngine::newKVStore(bool read_only) {
void EventuallyPersistentEngine::destroy(bool force) {
forceShutdown = force;
stopEngineThreads();
+ shared_ptr<DispatcherCallback> dist(new StatSnap(this, true));
+ getEpStore()->getDispatcher()->schedule(dist, NULL, Priority::StatSnapPriority,
+ 0, false, true);
tapConnMap->shutdownAllTapConnections();
}
@@ -1649,8 +1642,8 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
queueBackfill(backFillVBFilter, connection);
}
- bool referenced = false;
- Item *it = connection->getNextItem(cookie, vbucket, ret, referenced);
+ uint8_t nru = INITIAL_NRU_VALUE;
+ Item *it = connection->getNextItem(cookie, vbucket, ret, nru);
switch (ret) {
case TAP_CHECKPOINT_START:
case TAP_CHECKPOINT_END:
@@ -1659,7 +1652,7 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
*itm = it;
if (ret == TAP_MUTATION) {
*nes = TapEngineSpecific::packSpecificData(ret, connection, it->getSeqno(),
- referenced);
+ nru);
*es = connection->specificData;
} else if (ret == TAP_DELETION) {
*nes = TapEngineSpecific::packSpecificData(ret, connection, it->getSeqno());
@@ -2035,18 +2028,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::tapNotify(const void *cookie,
if (tc) {
bool meta = false;
- bool nru = false;
+ uint8_t nru = INITIAL_NRU_VALUE;
if (nengine >= TapEngineSpecific::sizeRevSeqno) {
uint64_t seqnum;
- uint8_t extra = 0;
TapEngineSpecific::readSpecificData(tap_event, engine_specific, nengine,
- &seqnum, &extra);
- // extract replicated item nru reference
- if (nengine == TapEngineSpecific::sizeTotal &&
- extra == TapEngineSpecific::nru)
- {
- nru = true;
- }
+ &seqnum, &nru);
itm->setCas(cas);
itm->setSeqno(seqnum);
meta = true;
@@ -2281,7 +2267,7 @@ void EventuallyPersistentEngine::startEngineThreads(void)
void EventuallyPersistentEngine::queueBackfill(const VBucketFilter &backfillVBFilter,
TapProducer *tc) {
- shared_ptr<DispatcherCallback> backfill_cb(new BackfillTask(this, tc, epstore,
+ shared_ptr<DispatcherCallback> backfill_cb(new BackfillTask(this, tc,
backfillVBFilter));
epstore->getNonIODispatcher()->schedule(backfill_cb, NULL,
Priority::BackfillTaskPriority,
@@ -2304,8 +2290,6 @@ bool VBucketCountVisitor::visitBucket(RCPtr<VBucket> &vb) {
htCacheSize += vb->ht.cacheSize;
numEjects += vb->ht.getNumEjects();
numExpiredItems += vb->numExpiredItems;
- numReferencedItems += vb->ht.getNumReferenced();
- numReferencedEjects += vb->ht.getNumReferencedEjects();
metaDataMemory += vb->ht.metaDataMemory;
opsCreate += vb->opsCreate;
opsUpdate += vb->opsUpdate;
@@ -2456,10 +2440,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_casted_stat("vb_active_queue_fill", activeCountVisitor.getQueueFill(), add_stat, cookie);
add_casted_stat("vb_active_queue_drain", activeCountVisitor.getQueueDrain(),
add_stat, cookie);
- add_casted_stat("vb_active_num_ref_items", activeCountVisitor.getReferenced(),
- add_stat, cookie);
- add_casted_stat("vb_active_num_ref_ejects", activeCountVisitor.getReferencedEjects(),
- add_stat, cookie);
add_casted_stat("vb_replica_num", replicaCountVisitor.getVBucketNumber(), add_stat, cookie);
add_casted_stat("vb_replica_curr_items", replicaCountVisitor.getNumItems(), add_stat, cookie);
@@ -2486,10 +2466,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_stat, cookie);
add_casted_stat("vb_replica_queue_fill", replicaCountVisitor.getQueueFill(), add_stat, cookie);
add_casted_stat("vb_replica_queue_drain", replicaCountVisitor.getQueueDrain(), add_stat, cookie);
- add_casted_stat("vb_replica_num_ref_items", replicaCountVisitor.getReferenced(),
- add_stat, cookie);
- add_casted_stat("vb_replica_num_ref_ejects", replicaCountVisitor.getReferencedEjects(),
- add_stat, cookie);
add_casted_stat("vb_pending_num", pendingCountVisitor.getVBucketNumber(), add_stat, cookie);
add_casted_stat("vb_pending_curr_items", pendingCountVisitor.getNumItems(), add_stat, cookie);
@@ -2516,10 +2492,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_stat, cookie);
add_casted_stat("vb_pending_queue_fill", pendingCountVisitor.getQueueFill(), add_stat, cookie);
add_casted_stat("vb_pending_queue_drain", pendingCountVisitor.getQueueDrain(), add_stat, cookie);
- add_casted_stat("vb_pending_num_ref_items", pendingCountVisitor.getReferenced(),
- add_stat, cookie);
- add_casted_stat("vb_pending_num_ref_ejects", pendingCountVisitor.getReferencedEjects(),
- add_stat, cookie);
add_casted_stat("vb_dead_num", deadCountVisitor.getVBucketNumber(), add_stat, cookie);
@@ -2649,13 +2621,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_stat, cookie);
}
- StorageProperties sprop(epstore->getStorageProperties());
- add_casted_stat("ep_store_max_concurrency", sprop.maxConcurrency(),
- add_stat, cookie);
- add_casted_stat("ep_store_max_readers", sprop.maxReaders(),
- add_stat, cookie);
- add_casted_stat("ep_store_max_readwrite", sprop.maxWriters(),
- add_stat, cookie);
add_casted_stat("ep_num_non_resident",
activeCountVisitor.getNonResident() +
pendingCountVisitor.getNonResident() +
@@ -3115,8 +3080,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doKeyStats(const void *cookie,
diskItem.reset();
}
} else if (validate) {
- shared_ptr<LookupCallback> cb(new LookupCallback(this, cookie, true));
- rv = epstore->getFromUnderlying(key, vbid, cookie, cb);
+ rv = epstore->statsVKey(key, vbid, cookie);
if (rv == ENGINE_NOT_MY_VBUCKET || rv == ENGINE_KEY_ENOENT) {
if (isDegradedMode()) {
return ENGINE_TMPFAIL;
@@ -3259,15 +3223,11 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doDispatcherStats(const void *cook
DispatcherState ds(epstore->getDispatcher()->getDispatcherState());
doDispatcherStat("dispatcher", ds, cookie, add_stat);
- if (epstore->hasSeparateRODispatcher()) {
- DispatcherState rods(epstore->getRODispatcher()->getDispatcherState());
- doDispatcherStat("ro_dispatcher", rods, cookie, add_stat);
- }
+ DispatcherState rods(epstore->getRODispatcher()->getDispatcherState());
+ doDispatcherStat("ro_dispatcher", rods, cookie, add_stat);
- if (epstore->hasSeparateAuxIODispatcher()) {
- DispatcherState tapds(epstore->getAuxIODispatcher()->getDispatcherState());
- doDispatcherStat("auxio_dispatcher", tapds, cookie, add_stat);
- }
+ DispatcherState tapds(epstore->getAuxIODispatcher()->getDispatcherState());
+ doDispatcherStat("auxio_dispatcher", tapds, cookie, add_stat);
DispatcherState nds(epstore->getNonIODispatcher()->getDispatcherState());
doDispatcherStat("nio_dispatcher", nds, cookie, add_stat);
@@ -3455,6 +3415,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::observe(const void* cookie,
// Get key stats
uint16_t keystatus = 0;
struct key_stats kstats;
+ memset(&kstats, 0, sizeof(key_stats));
ENGINE_ERROR_CODE rv = epstore->getKeyStats(key, vb_id, kstats, true);
if (rv == ENGINE_SUCCESS) {
if (kstats.logically_deleted) {
27 src/ep_engine.h
@@ -73,25 +73,6 @@ class EventuallyPersistentEngine;
class TapConnMap;
/**
- * Base storage callback for things that look up data.
- */
-class LookupCallback : public Callback<GetValue> {
-public:
- LookupCallback(EventuallyPersistentEngine *e, const void* c) :
- engine(e), cookie(c), forceSuccess(false) {}
-
- LookupCallback(EventuallyPersistentEngine *e, const void* c,
- const bool force) :
- engine(e), cookie(c), forceSuccess(force) {}
-
- virtual void callback(GetValue &value);
-private:
- EventuallyPersistentEngine *engine;
- const void *cookie;
- const bool forceSuccess;
-};
-
-/**
* Vbucket visitor that counts active vbuckets.
*/
class VBucketCountVisitor : public VBucketVisitor {
@@ -101,8 +82,6 @@ class VBucketCountVisitor : public VBucketVisitor {
numVbucket(0), htMemory(0),
htItemMemory(0), htCacheSize(0),
numEjects(0), numExpiredItems(0),
- numReferencedItems(0),
- numReferencedEjects(0),
metaDataMemory(0), opsCreate(0),
opsUpdate(0), opsDelete(0),
opsReject(0), queueSize(0),
@@ -138,10 +117,6 @@ class VBucketCountVisitor : public VBucketVisitor {
size_t getExpired() { return numExpiredItems; }
- size_t getReferenced() { return numReferencedItems; }
-
- size_t getReferencedEjects() { return numReferencedEjects; }
-
size_t getMetaDataMemory() { return metaDataMemory; }
size_t getHashtableMemory() { return htMemory; }
@@ -174,8 +149,6 @@ class VBucketCountVisitor : public VBucketVisitor {
size_t htCacheSize;
size_t numEjects;
size_t numExpiredItems;
- size_t numReferencedItems;
- size_t numReferencedEjects;
size_t metaDataMemory;
size_t opsCreate;
161 src/item_pager.cc
@@ -29,11 +29,8 @@
#include "ep_engine.h"
#include "item_pager.h"
-static const double EJECTION_RATIO_THRESHOLD(0.1);
static const size_t MAX_PERSISTENCE_QUEUE_SIZE = 1000000;
-const bool PagingConfig::phaseConfig[paging_max] = {false, true};
-
/**
* As part of the ItemPager, visit all of the objects in memory and
* eject some within a constrained probability
@@ -48,17 +45,18 @@ class PagingVisitor : public VBucketVisitor {
* @param s the store that will handle the bulk removal
* @param st the stats where we'll track what we've done
* @param pcnt percentage of objects to attempt to evict (0-1)
- * @param bias active vbuckets eviction probability bias multiplier (0-1)
* @param sfin pointer to a bool to be set to true after run completes
* @param pause flag indicating if PagingVisitor can pause between vbucket visits
- * @param nru false if ignoring reference bits
+ * @param bias active vbuckets eviction probability bias multiplier (0-1)
+ * @param phase pointer to the item pager's current phase, advanced when a pass completes
*/
PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt,
- bool *sfin, bool pause = false, double bias = 1, bool nru = true)
- : store(s), stats(st), randomEvict(PagingConfig::phaseConfig[0]), percent(pcnt),
+ bool *sfin, bool pause = false,
+ double bias = 1, item_pager_phase *phase = NULL)
+ : store(s), stats(st), percent(pcnt),
activeBias(bias), ejected(0), totalEjected(0), totalEjectionAttempts(0),
startTime(ep_real_time()), stateFinalizer(sfin), canPause(pause),
- useNru(nru) {}
+ completePhase(true), pager_phase(phase) {}
void visit(StoredValue *v) {
// Remember expired objects -- we're going to delete them.
@@ -68,25 +66,19 @@ class PagingVisitor : public VBucketVisitor {
}
// return if not ItemPager, which uses valid eviction percentage
- if (percent <= 0) {
+ if (percent <= 0 || !pager_phase) {
return;
}
// always evict unreferenced items, or randomly evict referenced item
- double r = randomEvict == false ?
+ double r = *pager_phase == PAGING_UNREFERENCED ?
1 : static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
- if ((useNru && !v->isReferenced()) || percent >= r) {
- ++totalEjectionAttempts;
- if (!v->eligibleForEviction()) {
- ++stats.numFailedEjects;
- return;
- }
- // Check if the key was already visited by all the cursors.
- bool can_evict =
- currentBucket->checkpointManager.eligibleForEviction(v->getKey());
- if (can_evict && v->ejectValue(stats, currentBucket->ht)) {
- ++ejected;
- }
+
+ if (*pager_phase == PAGING_UNREFERENCED && v->getNRUValue() == MAX_NRU_VALUE) {
+ doEviction(v);
+ } else if (*pager_phase == PAGING_RANDOM && v->incrNRUValue() == MAX_NRU_VALUE &&
+ r <= percent) {
+ doEviction(v);
}
}
@@ -94,8 +86,8 @@ class PagingVisitor : public VBucketVisitor {
update();
// fast path for expiry item pager
- if (percent <= 0 ) {
- return VBucketVisitor::visitBucket(vb);;
+ if (percent <= 0 || !pager_phase) {
+ return VBucketVisitor::visitBucket(vb);
}
// skip active vbuckets if active resident ratio is lower than replica
@@ -103,19 +95,19 @@ class PagingVisitor : public VBucketVisitor {
double lower = static_cast<double>(stats.mem_low_wat);
double high = static_cast<double>(stats.mem_high_wat);
if (vb->getState() == vbucket_state_active && current < high &&
- store.cachedResidentRatio.activeRatio <
- store.cachedResidentRatio.replicaRatio)
+ store.cachedResidentRatio.activeRatio < store.cachedResidentRatio.replicaRatio)
{
return false;
}
- // stop eviction whenever memory usage is below low watermark
if (current > lower) {
double p = (current - static_cast<double>(lower)) / current;
adjustPercent(p, vb->getState());
return VBucketVisitor::visitBucket(vb);
+ } else { // stop eviction whenever memory usage is below low watermark
+ completePhase = false;
+ return false;
}
- return false;
}
void update() {
@@ -145,6 +137,14 @@ class PagingVisitor : public VBucketVisitor {
if (stateFinalizer) {
*stateFinalizer = true;
}
+
+ if (pager_phase && completePhase) {
+ if (*pager_phase == PAGING_UNREFERENCED) {
+ *pager_phase = PAGING_RANDOM;
+ } else {
+ *pager_phase = PAGING_UNREFERENCED;
+ }
+ }
}
/**
@@ -163,13 +163,6 @@ class PagingVisitor : public VBucketVisitor {
*/
size_t getTotalEjectionAttempts() { return totalEjectionAttempts; }
- void configPaging(bool cfg) {
- randomEvict = cfg;
- if (store.getEPEngine().isDegradedMode()) {
- useNru = false;
- }
- }
-
private:
void adjustPercent(double prob, vbucket_state_t state) {
if (state == vbucket_state_replica ||
@@ -184,43 +177,40 @@ class PagingVisitor : public VBucketVisitor {
}
}
+ void doEviction(StoredValue *v) {
+ ++totalEjectionAttempts;
+ if (!v->eligibleForEviction()) {
+ ++stats.numFailedEjects;
+ return;
+ }
+ // Check if the key was already visited by all the cursors.
+ bool can_evict = currentBucket->checkpointManager.eligibleForEviction(v->getKey());
+ if (can_evict && v->ejectValue(stats, currentBucket->ht)) {
+ ++ejected;
+ }
+ }
+
std::list<std::pair<uint16_t, std::string> > expired;
EventuallyPersistentStore &store;
- EPStats &stats;
- bool randomEvict;
- double percent;
- double activeBias;
- size_t ejected;
- size_t totalEjected;
- size_t totalEjectionAttempts;
- time_t startTime;
- bool *stateFinalizer;
- bool canPause;
- bool useNru;
+ EPStats &stats;
+ double percent;
+ double activeBias;
+ size_t ejected;
+ size_t totalEjected;
+ size_t totalEjectionAttempts;
+ time_t startTime;
+ bool *stateFinalizer;
+ bool canPause;
+ bool completePhase;
+ item_pager_phase *pager_phase;
};
-bool ItemPager::checkAccessScannerTask() {
- if (store.pager.biased) {
- return true;
- }
-
- // compute time difference (in seconds) since last access scanner task
- hrtime_t tdiff = gethrtime() - store.accessScanner.lastTaskRuntime;
- tdiff /= 1000000000;
-
- Configuration &cfg = store.getEPEngine().getConfiguration();
- uint64_t period = cfg.getPagerUnbiasedPeriod() * 60;
- bool biased = (uint64_t)tdiff > period ? true : false;
- store.pager.biased = biased;
- return biased;
-}
-
bool ItemPager::callback(Dispatcher &d, TaskId &t) {
double current = static_cast<double>(stats.getTotalMemoryUsed());
double upper = static_cast<double>(stats.mem_high_wat);
double lower = static_cast<double>(stats.mem_low_wat);
- double sleepTime = 10;
+ double sleepTime = 5;
if (available && current > upper) {
++stats.pagerRuns;
@@ -236,47 +226,11 @@ bool ItemPager::callback(Dispatcher &d, TaskId &t) {
size_t activeEvictPerc = cfg.getPagerActiveVbPcnt();
double bias = static_cast<double>(activeEvictPerc) / 50;
- // ignore using NRU if it is still in the unbiased period
- bool nru = checkAccessScannerTask();
- if (!nru && phase == PagingConfig::paging_unreferenced) {
- ++phase;
- }
-
available = false;
shared_ptr<PagingVisitor> pv(new PagingVisitor(store, stats, toKill,
- &available, false, bias, nru));
- std::srand(ep_real_time());
- pv->configPaging(PagingConfig::phaseConfig[phase]);
+ &available,
+ false, bias, &phase));
store.visit(pv, "Item pager", &d, Priority::ItemPagerPriority);
-
- phase = phase + 1;
- if (stats.getTotalMemoryUsed() <= stats.mem_low_wat ||
- phase >= PagingConfig::paging_max) {
- phase = PagingConfig::paging_unreferenced;
- } else { // move fast to next paging phase if memory usage is still high
- sleepTime = 5;
- }
-
- double total_eject_attms = static_cast<double>(pv->getTotalEjectionAttempts());
- double total_ejected = static_cast<double>(pv->getTotalEjected());
- double ejection_ratio =
- total_eject_attms > 0 ? total_ejected / total_eject_attms : 0;
-
- if (stats.getTotalMemoryUsed() > stats.mem_high_wat &&
- ejection_ratio < EJECTION_RATIO_THRESHOLD)
- {
- const VBucketMap &vbuckets = store.getVBuckets();
- size_t num_vbuckets = vbuckets.getSize();
- for (size_t i = 0; i < num_vbuckets; ++i) {
- assert(i <= std::numeric_limits<uint16_t>::max());
- uint16_t vbid = static_cast<uint16_t>(i);
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
- if (!vb) {
- continue;
- }
- vb->checkpointManager.setCheckpointExtension(false);
- }
- }
}
d.snooze(t, sleepTime);
@@ -288,8 +242,9 @@ bool ExpiredItemPager::callback(Dispatcher &d, TaskId &t) {
++stats.expiryPagerRuns;
available = false;
- shared_ptr<PagingVisitor> pv(new PagingVisitor(store, stats,
- -1, &available, true));
+ shared_ptr<PagingVisitor> pv(new PagingVisitor(store, stats, -1,
+ &available,
+ true, 1, NULL));
store.visit(pv, "Expired item remover", &d, Priority::ItemPagerPriority,
true, 10);
}
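
The rewritten ItemPager above drops the access-scanner bias check (checkAccessScannerTask) and the ejection-ratio/checkpoint-extension fallback in favour of a two-phase cycle driven by the new item_pager_phase enum, flipped in the visitor's finalizer once a pass completes with completePhase still true. Below is a minimal, self-contained sketch of the per-item decision made in PagingVisitor::visit() under those two phases; SimpleValue, shouldEvict, and main are illustrative stand-ins, not part of the patch.

    #include <cstdint>
    #include <cstdlib>
    #include <iostream>

    enum item_pager_phase { PAGING_UNREFERENCED, PAGING_RANDOM };

    const uint8_t MAX_NRU_VALUE = 3;

    struct SimpleValue {
        uint8_t nru;
        uint8_t incrNRUValue() { return nru < MAX_NRU_VALUE ? ++nru : nru; }
    };

    // Phase 1 evicts only items whose NRU counter is already saturated; phase 2
    // ages every visited item and evicts a random subset of the saturated ones,
    // thinned out by the eviction percentage.
    bool shouldEvict(SimpleValue &v, item_pager_phase phase, double percent) {
        double r = (phase == PAGING_UNREFERENCED)
                       ? 1.0
                       : static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
        if (phase == PAGING_UNREFERENCED && v.nru == MAX_NRU_VALUE) {
            return true;   // already aged out: evict without touching the counter
        }
        if (phase == PAGING_RANDOM && v.incrNRUValue() == MAX_NRU_VALUE && r <= percent) {
            return true;   // aged on this visit and unlucky in the random draw
        }
        return false;
    }

    int main() {
        std::srand(42);
        SimpleValue cold = {MAX_NRU_VALUE};   // not referenced for several sweeps
        SimpleValue warm = {0};               // referenced recently
        std::cout << shouldEvict(cold, PAGING_UNREFERENCED, 0.3) << " "
                  << shouldEvict(warm, PAGING_UNREFERENCED, 0.3) << std::endl;  // prints "1 0"
        return 0;
    }

The sketch covers only the per-item test; in the hunk above the phase transition itself happens in complete(), and visitBucket() cancels the transition (completePhase = false) as soon as memory usage falls below the low water mark.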
34 src/item_pager.h
@@ -36,19 +36,12 @@ typedef std::pair<int64_t, int64_t> row_range_t;
class EventuallyPersistentStore;
/**
- * ItemPager visits replica vbuckets and active vbuckets in one phases.
- * The config_t is a bool value and it indicates whether random items ejection
- * will be performed.
+ * The item pager phase
*/
-class PagingConfig {
-public:
- static const short int paging_max = 2;
-
- static const short int paging_unreferenced = 0;
- static const short int paging_random = 1;
-
- static const bool phaseConfig[paging_max];
-};
+typedef enum {
+ PAGING_UNREFERENCED,
+ PAGING_RANDOM
+} item_pager_phase;
/**
* Dispatcher job responsible for periodically pushing data out of
@@ -64,19 +57,26 @@ class ItemPager : public DispatcherCallback {
* @param st the stats
*/
ItemPager(EventuallyPersistentStore *s, EPStats &st) :
- store(*s), stats(st), available(true), phase(PagingConfig::paging_unreferenced) {}
+ store(*s), stats(st), available(true), phase(PAGING_UNREFERENCED) {}
bool callback(Dispatcher &d, TaskId &t);
+ item_pager_phase getPhase() const {
+ return phase;
+ }
+
+ void setPhase(item_pager_phase item_phase) {
+ phase = item_phase;
+ }
+
std::string description() { return std::string("Paging out items."); }
private:
- bool checkAccessScannerTask();
EventuallyPersistentStore &store;
- EPStats &stats;
- bool available;
- short int phase;
+ EPStats &stats;
+ bool available;
+ item_pager_phase phase;
};
/**
19 src/kvstore.h
@@ -77,18 +77,10 @@ typedef std::map<uint16_t, vbucket_state> vbucket_map_t;
class StorageProperties {
public:
- StorageProperties(size_t c, size_t r, size_t w, bool evb, bool evd,
- bool pd, bool eget)
- : maxc(c), maxr(r), maxw(w), efficientVBDump(evb),
- efficientVBDeletion(evd), persistedDeletions(pd),
- efficientGet(eget) {}
-
- //! The maximum number of active queries.
- size_t maxConcurrency() const { return maxc; }
- //! Maximum number of active read-only connections.
- size_t maxReaders() const { return maxr; }
- //! Maximum number of active connections for read and write.
- size_t maxWriters() const { return maxw; }
+ StorageProperties(bool evb, bool evd, bool pd, bool eget)
+ : efficientVBDump(evb), efficientVBDeletion(evd),
+ persistedDeletions(pd), efficientGet(eget) {}
+
//! True if we can efficiently dump a single vbucket.
bool hasEfficientVBDump() const { return efficientVBDump; }
//! True if we can efficiently delete a vbucket all at once.
@@ -101,9 +93,6 @@ class StorageProperties {
bool hasEfficientGet() const { return efficientGet; }
private:
- size_t maxc;
- size_t maxr;
- size_t maxw;
bool efficientVBDump;
bool efficientVBDeletion;
bool persistedDeletions;
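
With this hunk the StorageProperties constructor loses its three concurrency limits, which is also why the ep_store_max_concurrency/readers/readwrite stats disappear from doEngineStats earlier in the diff. A hedged sketch of the slimmed-down class as the patch leaves it; the flag values passed in main are illustrative only, not what couch-kvstore actually reports, and the two accessors shown are the ones visible in the hunk above.

    #include <iostream>

    class StorageProperties {
    public:
        StorageProperties(bool evb, bool evd, bool pd, bool eget)
            : efficientVBDump(evb), efficientVBDeletion(evd),
              persistedDeletions(pd), efficientGet(eget) {}

        //! True if we can efficiently dump a single vbucket.
        bool hasEfficientVBDump() const { return efficientVBDump; }
        //! True if the underlying store has an efficient get path.
        bool hasEfficientGet() const { return efficientGet; }

    private:
        bool efficientVBDump;
        bool efficientVBDeletion;
        bool persistedDeletions;
        bool efficientGet;
    };

    int main() {
        // Illustrative flags: fast vbucket deletion and persisted deletions,
        // but no fast dump or fast get path.
        StorageProperties props(false, true, true, false);
        std::cout << "efficient vbucket dump: " << props.hasEfficientVBDump() << std::endl;
        return 0;
    }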
32 src/statsnap.cc
@@ -26,38 +26,8 @@
#include "ep_engine.h"
#include "statsnap.h"
-extern "C" {
- static void add_stat(const char *key, const uint16_t klen,
- const char *val, const uint32_t vlen,
- const void *cookie) {
- assert(cookie);
- void *cokie = const_cast<void *>(cookie);
- StatSnap *ssnap = static_cast<StatSnap *>(cokie);
-
- ObjectRegistry::onSwitchThread(ssnap->getEngine());
- std::string k(key, klen);
- std::string v(val, vlen);
- ssnap->getMap()[k] = v;
- }
-}
-
-bool StatSnap::getStats() {
- map.clear();
- bool rv = engine->getStats(this, NULL, 0, add_stat) == ENGINE_SUCCESS &&
- engine->getStats(this, "tap", 3, add_stat) == ENGINE_SUCCESS;
- if (rv && engine->isShutdownMode()) {
- map["ep_force_shutdown"] = engine->isForceShutdown() ? "true" : "false";
- std::stringstream ss;
- ss << ep_real_time();
- map["ep_shutdown_time"] = ss.str();
- }
- return rv;
-}
-
bool StatSnap::callback(Dispatcher &d, TaskId &t) {
- if (getStats()) {
- engine->getEpStore()->getRWUnderlying()->snapshotStats(map);
- }
+ engine->getEpStore()->snapshotStats();
if (runOnce) {
return false;
}
21 src/statsnap.h
@@ -29,32 +29,20 @@
const int STATSNAP_FREQ(60);
-// Forward declaration.
class EventuallyPersistentEngine;
/**
- * Periodically take a snapshot of the stats and record it in the main
- * DB.
+ * Periodically take a snapshot of the stats and record it in the main DB.
*/
class StatSnap : public DispatcherCallback {
public:
StatSnap(EventuallyPersistentEngine *e, bool runOneTimeOnly = false) :
- engine(e), runOnce(runOneTimeOnly) { }
-
- bool callback(Dispatcher &d, TaskId &t);
+ engine(e), runOnce(runOneTimeOnly) {}
/**
- * Grab stats from the engine.
- * @return true if the stat fetch was successful
+ * Called when the task is run to perform the stats snapshot.
*/
- bool getStats();
-
- /**
- * Get the current map of data.
- */
- std::map<std::string, std::string> &getMap() { return map; }
-
- EventuallyPersistentEngine *getEngine() { return engine; }
+ bool callback(Dispatcher &d, TaskId &t);
/**
* Description of task.
@@ -67,7 +55,6 @@ class StatSnap : public DispatcherCallback {
private:
EventuallyPersistentEngine *engine;
bool runOnce;
- std::map<std::string, std::string> map;
};
#endif // SRC_STATSNAP_H_
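
With getStats() and the local stats map removed, StatSnap is reduced to a thin DispatcherCallback that delegates to EventuallyPersistentStore::snapshotStats(). A minimal sketch of that callback pattern, assuming (as the runOnce branch in statsnap.cc suggests) that returning true asks the dispatcher to reschedule the task and false retires it; SnapshotTask and takeSnapshot are illustrative names, not the engine's API.

    #include <iostream>

    // Stand-ins for the dispatcher types; only the rescheduling contract is sketched.
    struct Dispatcher {};
    struct TaskId {};

    class SnapshotTask {
    public:
        explicit SnapshotTask(bool runOneTimeOnly) : runOnce(runOneTimeOnly) {}

        // Return value tells the dispatcher whether to run this task again.
        bool callback(Dispatcher &, TaskId &) {
            takeSnapshot();
            return !runOnce;   // one-shot tasks retire themselves after a single run
        }

    private:
        void takeSnapshot() { std::cout << "snapshotting stats" << std::endl; }
        bool runOnce;
    };

    int main() {
        Dispatcher d;
        TaskId t;
        SnapshotTask oneShot(true);
        bool again = oneShot.callback(d, t);
        std::cout << "reschedule: " << again << std::endl;   // reschedule: 0
        return 0;
    }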
43 src/stored-value.cc
@@ -72,32 +72,36 @@ bool StoredValue::ejectValue(EPStats &stats, HashTable &ht) {
++stats.numValueEjects;
++ht.numNonResidentItems;
++ht.numEjects;
- if (isReferenced()) {
- ++ht.numReferencedEjects;
- }
return true;
}
++stats.numFailedEjects;
return false;
}
-void StoredValue::referenced(HashTable &ht) {
- if (nru == false) {
- nru = true;
- ++ht.numReferenced;
+void StoredValue::referenced() {
+ if (nru > MIN_NRU_VALUE) {
+ --nru;
+ }
+}
+
+void StoredValue::setNRUValue(uint8_t nru_val) {
+ if (nru_val <= MAX_NRU_VALUE) {
+ nru = nru_val;
}
}
-bool StoredValue::isReferenced(bool reset, HashTable *ht) {
- bool ret = nru;
- if (reset && nru) {
- nru = false;
- assert(ht);
- --ht->numReferenced;
+uint8_t StoredValue::incrNRUValue() {
+ uint8_t ret = MAX_NRU_VALUE;
+ if (nru < MAX_NRU_VALUE) {
+ ret = ++nru;
}
return ret;
}
+uint8_t StoredValue::getNRUValue() {
+ return nru;
+}
+
bool StoredValue::unlocked_restoreValue(Item *itm, EPStats &stats,
HashTable &ht) {
// If cas == we loaded the object from our meta file, but
@@ -288,8 +292,6 @@ HashTableStatVisitor HashTable::clear(bool deactivate) {
numNonResidentItems.set(0);
memSize.set(0);
cacheSize.set(0);
- numReferenced.set(0);
- numReferencedEjects.set(0);
return rv;
}
@@ -452,10 +454,9 @@ void HashTable::visitDepth(HashTableDepthVisitor &visitor) {
add_type_t HashTable::unlocked_add(int &bucket_num,
const Item &val,
bool isDirty,
- bool storeVal,
- bool trackReference) {
+ bool storeVal) {
StoredValue *v = unlocked_find(val.getKey(), bucket_num,
- true, trackReference);
+ true, false);
add_type_t rv = ADD_SUCCESS;
if (v && !v->isDeleted() && !v->isExpired(ep_real_time())) {
rv = ADD_EXISTS;
@@ -497,8 +498,7 @@ add_type_t HashTable::unlocked_add(int &bucket_num,
}
if (v->isTempItem()) {
v->resetValue();
- } else {
- v->referenced(*this);
+ v->setNRUValue(MAX_NRU_VALUE);
}
}
@@ -518,8 +518,7 @@ add_type_t HashTable::unlocked_addTempDeletedItem(int &bucket_num,
return unlocked_add(bucket_num, itm,
false, // isDirty
- true, // storeVal
- false); // trackReference
+ true); // storeVal
}
void StoredValue::setMutationMemoryThreshold(double memThreshold) {
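
The nru member grows here from a single referenced bit into a 2-bit counter: referenced() steps an item toward MIN_NRU_VALUE on access, incrNRUValue() ages it toward MAX_NRU_VALUE during the random paging phase, setNRUValue() clamps explicit overrides (unlocked_add above sets MAX_NRU_VALUE on re-added temp items), and new items start at INITIAL_NRU_VALUE. A self-contained sketch of that lifecycle; the Counter struct and main are illustrative, while the constants match the new stored-value.h.

    #include <cstdint>
    #include <iostream>

    const uint8_t MAX_NRU_VALUE = 3;       // fully aged, first in line for eviction
    const uint8_t INITIAL_NRU_VALUE = 2;   // where a freshly stored item starts
    const uint8_t MIN_NRU_VALUE = 0;       // most recently used

    struct Counter {
        Counter() : nru(INITIAL_NRU_VALUE) {}

        // Access path: each read/write moves the item one step toward "recently used".
        void referenced() { if (nru > MIN_NRU_VALUE) { --nru; } }

        // Pager path: each random-phase visit ages the item one step toward eviction.
        uint8_t incrNRUValue() { return nru < MAX_NRU_VALUE ? ++nru : nru; }

        // Explicit override, ignored if the value is out of range.
        void setNRUValue(uint8_t v) { if (v <= MAX_NRU_VALUE) { nru = v; } }

        uint8_t nru;
    };

    int main() {
        Counter c;            // starts at 2
        c.referenced();       // a get drops it to 1
        c.incrNRUValue();     // one pager sweep: back to 2
        c.incrNRUValue();     // another sweep: 3, now an eviction candidate
        std::cout << static_cast<int>(c.nru) << std::endl;   // prints 3
        return 0;
    }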
57 src/stored-value.h
@@ -33,6 +33,13 @@
#include "queueditem.h"
#include "stats.h"
+// Max value for NRU bits
+const uint8_t MAX_NRU_VALUE = 3;
+// Initial value for NRU bits
+const uint8_t INITIAL_NRU_VALUE = 2;
+// Min value for NRU bits
+const uint8_t MIN_NRU_VALUE = 0;
+
// Forward declaration for StoredValue
class HashTable;
class StoredValueFactory;
@@ -55,9 +62,13 @@ class StoredValue {
::operator delete(p);
}
- bool isReferenced(bool reset=false, HashTable *ht=NULL);
+ uint8_t getNRUValue();
+
+ void setNRUValue(uint8_t nru_val);
+
+ uint8_t incrNRUValue();
- void referenced(HashTable &ht);
+ void referenced();
/**
* Mark this item as needing to be persisted.
@@ -554,7 +565,7 @@ class StoredValue {
cas = itm.getCas();
exptime = itm.getExptime();
resident = true;
- nru = false;
+ nru = INITIAL_NRU_VALUE;
lock_expiry = 0;
keylen = itm.getKey().length();
seqno = itm.getSeqno();
@@ -587,7 +598,7 @@ class StoredValue {
uint32_t flags; // 4 bytes
bool _isDirty : 1; // 1 bit
bool resident : 1; //!< True if this object's value is in memory.
- bool nru : 1; //!< True if referenced since last sweep
+ uint8_t nru : 2; //!< Not-recently-used counter (0 = recently used, MAX_NRU_VALUE = eviction candidate)
uint8_t keylen;
char keybytes[1]; //!< The key itself.
@@ -860,16 +871,6 @@ class HashTable {
size_t getNumEjects(void) { return numEjects; }
/**
- * Get the number of referenced items.
- */
- size_t getNumReferenced(void) { return numReferenced; }
-
- /**
- * Get the number of referenced items whose values are ejected.
- */
- size_t getNumReferencedEjects(void) { return numReferencedEjects; }
-
- /**
* Get the total item memory size in this hash table.
*/
size_t getItemMemory(void) { return memSize; }
@@ -945,12 +946,12 @@ class HashTable {
* doesn't contain meta data.