Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 268 lines (235 sloc) 8.03 kb
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
ba5b814 Matt Ingenthron Added LICENSE and headers.
ingenthr authored
2 /*
3 * Copyright 2010 NorthScale, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
57e58b2 Trond Norbye Fixed warning generated from gcc
trondn authored
12 * distributed under the License is distributed on an "AS IS" BASIS,
ba5b814 Matt Ingenthron Added LICENSE and headers.
ingenthr authored
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
d067db6 Trond Norbye Include config.h as the first file in the source files
trondn authored
17 #include "config.h"
4cd99c8 Dustin Sallings Speed up tests by jumping out faster on shutdown.
dustin authored
18 #include <stdlib.h>
19
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
20 #include "flusher.hh"
21
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
22 bool FlusherStepper::callback(Dispatcher &d, TaskId t) {
23 return flusher->step(d, t);
24 }
25
e2f58c0 Chiyoung Seo Transition flusher's state to stopped for force shutdown
chiyoung authored
26 bool Flusher::stop(bool isForceShutdown) {
27 forceShutdownReceived = isForceShutdown;
28 enum flusher_state to = forceShutdownReceived ? stopped : stopping;
29 return transition_state(to);
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
30 }
31
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
32 void Flusher::wait(void) {
4289932 Dustin Sallings Log unexpectedly long flusher shutdowns.
dustin authored
33 hrtime_t startt(gethrtime());
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
34 while (_state != stopped) {
4cd99c8 Dustin Sallings Speed up tests by jumping out faster on shutdown.
dustin authored
35 usleep(1000);
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
36 }
4289932 Dustin Sallings Log unexpectedly long flusher shutdowns.
dustin authored
37 hrtime_t endt(gethrtime());
a6d97bb Aliaksey Kandratsenka (aka Aliaksei Kandratsenka) flusher wait time is in microseconds
alk authored
38 int udiff((endt - startt) / 1000);
39 if (udiff > 10000) {
4289932 Dustin Sallings Log unexpectedly long flusher shutdowns.
dustin authored
40 getLogger()->log(EXTENSION_LOG_WARNING, NULL,
a6d97bb Aliaksey Kandratsenka (aka Aliaksei Kandratsenka) flusher wait time is in microseconds
alk authored
41 "Had to wait %dus for shutdown\n", udiff);
4289932 Dustin Sallings Log unexpectedly long flusher shutdowns.
dustin authored
42 }
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
43 }
44
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
45 bool Flusher::pause(void) {
46 return transition_state(pausing);
47 }
48
49 bool Flusher::resume(void) {
50 return transition_state(running);
51 }
52
53 static bool validTransition(enum flusher_state from,
54 enum flusher_state to)
55 {
56 bool rv(true);
67f0337 Dustin Sallings Added initializing state to flusher.
dustin authored
57 if (from == initializing && to == running) {
1328451 Dustin Sallings Allow initializing -> stopping state in flusher.
dustin authored
58 } else if (from == initializing && to == stopping) {
67f0337 Dustin Sallings Added initializing state to flusher.
dustin authored
59 } else if (from == running && to == pausing) {
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
60 } else if (from == running && to == stopping) {
61 } else if (from == pausing && to == paused) {
62 } else if (from == stopping && to == stopped) {
63 } else if (from == paused && to == running) {
64 } else if (from == paused && to == stopping) {
65 } else if (from == pausing && to == stopping) {
66 } else {
67 rv = false;
68 }
69 return rv;
70 }
71
37415fc Dustin Sallings Compilation fixes for Linux.
dustin authored
72 const char * Flusher::stateName(enum flusher_state st) const {
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
73 static const char * const stateNames[] = {
67f0337 Dustin Sallings Added initializing state to flusher.
dustin authored
74 "initializing", "running", "pausing", "paused", "stopping", "stopped"
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
75 };
67f0337 Dustin Sallings Added initializing state to flusher.
dustin authored
76 assert(st >= initializing && st <= stopped);
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
77 return stateNames[st];
78 }
79
80 bool Flusher::transition_state(enum flusher_state to) {
81
82 getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
83 "Attempting transition from %s to %s\n",
84 stateName(_state), stateName(to));
85
e2f58c0 Chiyoung Seo Transition flusher's state to stopped for force shutdown
chiyoung authored
86 if (!forceShutdownReceived && !validTransition(_state, to)) {
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
87 return false;
88 }
89
90 getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "Transitioning from %s to %s\n",
91 stateName(_state), stateName(to));
92
93 _state = to;
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
94 //Reschedule the task
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
95 LockHolder lh(taskMutex);
87c6be1 Dustin Sallings Ensure the task ID gets assigned to the flusher before use.
dustin authored
96 assert(task.get());
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
97 dispatcher->cancel(task);
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
98 schedule_UNLOCKED();
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
99 return true;
100 }
101
37415fc Dustin Sallings Compilation fixes for Linux.
dustin authored
102 const char * Flusher::stateName() const {
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
103 return stateName(_state);
104 }
105
106 enum flusher_state Flusher::state() const {
107 return _state;
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
108 }
109
87c6be1 Dustin Sallings Ensure the task ID gets assigned to the flusher before use.
dustin authored
110 void Flusher::initialize(TaskId tid) {
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
111 assert(task.get() == tid.get());
4487e12 Dustin Sallings Fix time tracking during flusher warmup (and log it).
dustin authored
112 getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
113 "Initializing flusher; warming up\n");
114
51b371c Chiyoung Seo Bug 2692 - Warmup time is now returned in µs.
chiyoung authored
115 hrtime_t startTime = gethrtime();
1b03c26 Chiyoung Seo MB-3667 Wait for loading the vbucket state from DB during warmup
chiyoung authored
116 vbStateLoaded = false;
117 store->warmup(vbStateLoaded);
51b371c Chiyoung Seo Bug 2692 - Warmup time is now returned in µs.
chiyoung authored
118 store->stats.warmupTime.set((gethrtime() - startTime) / 1000);
ff9c38d Dustin Sallings Abstract the stat types.
dustin authored
119 store->stats.warmupComplete.set(true);
4487e12 Dustin Sallings Fix time tracking during flusher warmup (and log it).
dustin authored
120
121 getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
ff9c38d Dustin Sallings Abstract the stat types.
dustin authored
122 "Warmup completed in %ds\n", store->stats.warmupTime.get());
67f0337 Dustin Sallings Added initializing state to flusher.
dustin authored
123 transition_state(running);
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
124 }
125
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
126 void Flusher::schedule_UNLOCKED() {
127 dispatcher->schedule(shared_ptr<FlusherStepper>(new FlusherStepper(this)),
d5aca7d Chiyoung Seo Bug 2270: Adjusted dispatcher job priorities
chiyoung authored
128 &task, Priority::FlusherPriority);
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
129 assert(task.get());
130 }
131
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
132 void Flusher::start(void) {
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
133 LockHolder lh(taskMutex);
134 schedule_UNLOCKED();
2a1c9f8 Dustin Sallings Maintain flusher status within the flusher.
dustin authored
135 }
136
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
137 void Flusher::wake(void) {
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
138 LockHolder lh(taskMutex);
87c6be1 Dustin Sallings Ensure the task ID gets assigned to the flusher before use.
dustin authored
139 assert(task.get());
dc22189 Dustin Sallings Dispatcher and Flusher refactoring to prevent task ID races.
dustin authored
140 dispatcher->wake(task, &task);
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
141 }
142
143 bool Flusher::step(Dispatcher &d, TaskId tid) {
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
144 try {
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
145 switch (_state) {
146 case initializing:
87c6be1 Dustin Sallings Ensure the task ID gets assigned to the flusher before use.
dustin authored
147 initialize(tid);
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
148 return true;
149 case paused:
150 return false;
151 case pausing:
152 transition_state(paused);
153 return false;
154 case running:
155 {
decb73d Dustin Sallings Dynamically adjust gap between flusher runs based on work done.
dustin authored
156 doFlush();
90bd1d4 Dustin Sallings Always sleep a minimum of 1s between flushes.
dustin authored
157 if (_state == running) {
cab3abe Dustin Sallings MB-2941: Immediately reschedule flusher after preemption.
dustin authored
158 double tosleep = computeMinSleepTime();
159 if (tosleep > 0) {
160 d.snooze(tid, tosleep);
161 }
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
162 return true;
90bd1d4 Dustin Sallings Always sleep a minimum of 1s between flushes.
dustin authored
163 } else {
164 return false;
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
165 }
166 }
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
167 case stopping:
168 {
169 std::stringstream ss;
170 ss << "Shutting down flusher (Write of all dirty items)"
171 << std::endl;
172 getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "%s",
173 ss.str().c_str());
174 }
175 store->stats.min_data_age = 0;
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
176 completeFlush();
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
177 getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "Flusher stopped\n");
178 transition_state(stopped);
179 return false;
180 case stopped:
181 return false;
182 default:
183 getLogger()->log(EXTENSION_LOG_WARNING, NULL,
184 "Unexpected state in flusher: %s", stateName());
185 assert(false);
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
186 }
187 } catch(std::runtime_error &e) {
188 std::stringstream ss;
3329250 Dustin Sallings Typo fix.
dustin authored
189 ss << "Exception in flusher loop: " << e.what() << std::endl;
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
190 getLogger()->log(EXTENSION_LOG_WARNING, NULL, "%s",
191 ss.str().c_str());
192 assert(false);
193 }
57e58b2 Trond Norbye Fixed warning generated from gcc
trondn authored
194
195 // We should _NEVER_ get here (unless you compile with -DNDEBUG causing
196 // the assertions to be removed.. It's a bug, so we should abort and
197 // create a coredump
198 abort();
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
199 }
200
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
201 void Flusher::completeFlush() {
202 doFlush();
203 while (flushQueue) {
204 doFlush();
205 }
206 }
207
decb73d Dustin Sallings Dynamically adjust gap between flusher runs based on work done.
dustin authored
208 double Flusher::computeMinSleepTime() {
cab3abe Dustin Sallings MB-2941: Immediately reschedule flusher after preemption.
dustin authored
209 // If we were preempted, keep going!
210 if (flushQueue && !flushQueue->empty()) {
211 flushRv = 0;
212 prevFlushRv = 0;
213 return 0.0;
214 }
215
decb73d Dustin Sallings Dynamically adjust gap between flusher runs based on work done.
dustin authored
216 if (flushRv + prevFlushRv == 0) {
217 minSleepTime = std::min(minSleepTime * 2, 1.0);
218 } else {
219 minSleepTime = DEFAULT_MIN_SLEEP_TIME;
220 }
221 prevFlushRv = flushRv;
222 return std::max(static_cast<double>(flushRv), minSleepTime);
223 }
224
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
225 int Flusher::doFlush() {
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
226
227 // On a fresh entry, flushQueue is null and we need to build one.
228 if (!flushQueue) {
229 flushRv = store->stats.min_data_age;
230 flushQueue = store->beginFlush();
231 if (flushQueue) {
232 getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
233 "Beginning a write queue flush.\n");
68900ec Chiyoung Seo Integration of checkpoint support into the persistence module.
chiyoung authored
234 rejectQueue = new std::queue<queued_item>();
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
235 flushStart = ep_current_time();
236 }
237 }
238
239 // Now do the every pass thing.
240 if (flushQueue) {
241 if (!flushQueue->empty()) {
242 int n = store->flushSome(flushQueue, rejectQueue);
1ea523b Dustin Sallings Use dispatcher to run the flusher
dustin authored
243 if (_state == pausing) {
244 transition_state(paused);
245 }
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
246 flushRv = std::min(n, flushRv);
933ca26 Dustin Sallings Control flusher loops from within flusher.
dustin authored
247 }
248
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
249 if (flushQueue->empty()) {
976bfee Chiyoung Seo Update vbucket_state table with the last closed checkpoint persisted.
chiyoung authored
250 if (!rejectQueue->empty()) {
251 // Requeue the rejects.
252 store->requeueRejectedItems(rejectQueue);
253 } else {
254 store->completeFlush(flushStart);
255 getLogger()->log(EXTENSION_LOG_INFO, NULL,
256 "Completed a flush, age of oldest item was %ds\n",
257 flushRv);
258
259 delete rejectQueue;
260 rejectQueue = NULL;
261 flushQueue = NULL;
262 }
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
263 }
933ca26 Dustin Sallings Control flusher loops from within flusher.
dustin authored
264 }
7aa54e8 Dustin Sallings Kill the flusher's while() loop.
dustin authored
265
266 return flushRv;
dbd2dff Dustin Sallings Moved flusher implementation out of header file.
dustin authored
267 }
Something went wrong with that request. Please try again.