Skip to content

Commit

Permalink
MB-2771 - Support for the force shutdown.
Browse files Browse the repository at this point in the history
This change adapts the bucket engine's destroy API to support
the force shutdown.

Change-Id: Ie9d813daefbc02f123567949bb2d81baf57116c4
Reviewed-on: http://review.membase.org/4378
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information
chiyoung authored and alk committed Feb 2, 2011
1 parent 65a868b commit d838bf3
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
47 changes: 41 additions & 6 deletions bucket_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ typedef struct proxied_engine_handle {
TAP_ITERATOR tap_iterator;
/* ON_DISCONNECT handling */
bool wants_disconnects;
/* Force shutdown flag */
bool force_shutdown;
EVENT_CALLBACK cb;
const void *cb_data;
} proxied_engine_handle_t;
Expand Down Expand Up @@ -85,7 +87,8 @@ static const char *get_default_bucket_config(void);

static ENGINE_ERROR_CODE bucket_initialize(ENGINE_HANDLE* handle,
const char* config_str);
static void bucket_destroy(ENGINE_HANDLE* handle);
static void bucket_destroy(ENGINE_HANDLE* handle,
const bool force);
static ENGINE_ERROR_CODE bucket_item_allocate(ENGINE_HANDLE* handle,
const void* cookie,
item **item,
Expand Down Expand Up @@ -364,10 +367,11 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface,

static void *engine_destroyer(void *arg) {
proxied_engine_handle_t *peh = (proxied_engine_handle_t*)arg;

assert(peh);
assert(peh->state == STATE_STOPPING);

peh->pe.v1->destroy(peh->pe.v0);
peh->pe.v1->destroy(peh->pe.v0, peh->force_shutdown);
bucket_engine.upstream_server->stat->release_stats(peh->stats);

int locked = pthread_mutex_lock(&bucket_engine.retention_mutex) == 0;
Expand Down Expand Up @@ -451,6 +455,7 @@ static ENGINE_ERROR_CODE create_bucket(struct bucket_engine *e,
peh->name = strdup(bucket_name);
peh->name_len = strlen(peh->name);
peh->state = STATE_RUNNING;
peh->force_shutdown = false;

ENGINE_ERROR_CODE rv = ENGINE_FAILED;

Expand Down Expand Up @@ -482,7 +487,7 @@ static ENGINE_ERROR_CODE create_bucket(struct bucket_engine *e,
// This was already verified, but we'll check it anyway
assert(peh->pe.v0->interface == 1);
if (peh->pe.v1->initialize(peh->pe.v0, config) != ENGINE_SUCCESS) {
peh->pe.v1->destroy(peh->pe.v0);
peh->pe.v1->destroy(peh->pe.v0, false);
genhash_delete_all(e->engines, bucket_name, strlen(bucket_name));
if (msg) {
snprintf(msg, msglen,
Expand Down Expand Up @@ -632,7 +637,7 @@ static ENGINE_HANDLE *load_engine(const char *soname, const char *config_str,
if (engine->interface == 1) {
ENGINE_HANDLE_V1 *v1 = (ENGINE_HANDLE_V1*)engine;
if (v1->initialize(engine, config_str) != ENGINE_SUCCESS) {
v1->destroy(engine);
v1->destroy(engine, false);
fprintf(stderr, "Failed to initialize instance. Error code: %d\n",
error);
dlclose(handle);
Expand Down Expand Up @@ -771,7 +776,7 @@ static ENGINE_ERROR_CODE bucket_initialize(ENGINE_HANDLE* handle,
}

if (dv1->initialize(se->default_engine.pe.v0, config_str) != ENGINE_SUCCESS) {
dv1->destroy(se->default_engine.pe.v0);
dv1->destroy(se->default_engine.pe.v0, false);
return ENGINE_FAILED;
}
}
Expand All @@ -787,7 +792,9 @@ static ENGINE_ERROR_CODE bucket_initialize(ENGINE_HANDLE* handle,
return ENGINE_SUCCESS;
}

static void bucket_destroy(ENGINE_HANDLE* handle) {
static void bucket_destroy(ENGINE_HANDLE* handle,
const bool force) {
(void)force;
struct bucket_engine* se = get_handle(handle);

if (se->initialized) {
Expand Down Expand Up @@ -1247,13 +1254,41 @@ static ENGINE_ERROR_CODE handle_delete_bucket(ENGINE_HANDLE* handle,

EXTRACT_KEY(breq, keyz);

size_t bodylen = ntohl(breq->message.header.request.bodylen)
- ntohs(breq->message.header.request.keylen);
if (bodylen >= (1 << 16)) {
return ENGINE_DISCONNECT;
}
char config[bodylen + 1];
memcpy(config, ((char*)request) + sizeof(breq->message.header)
+ ntohs(breq->message.header.request.keylen), bodylen);
config[bodylen] = 0x00;

bool force = false;
if (config[0] != 0) {
struct config_item items[2] = {
{.key = "force",
.datatype = DT_BOOL,
.value.dt_bool = &force},
{.key = NULL}
};

if (bucket_get_server_api()->core->parse_config(config, items, stderr) != 0) {
const char *msg = "Invalid config parameters";
response(msg, strlen(msg), "", 0, "", 0, 0,
PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
return ENGINE_SUCCESS;
}
}

bool found = false;
if (pthread_mutex_lock(&e->engines_mutex) == 0) {
proxied_engine_handle_t *peh = genhash_find(e->engines, keyz,
strlen(keyz));
if (peh && peh->state == STATE_RUNNING) {
found = true;
peh->state = STATE_STOPPING;
peh->force_shutdown = force;
release_handle(peh);
}
pthread_mutex_unlock(&e->engines_mutex);
Expand Down
5 changes: 4 additions & 1 deletion management/delete_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@
if __name__ == '__main__':
mc = mc_bin_client.MemcachedClient(sys.argv[1])
mc.sasl_auth_plain(sys.argv[2], sys.argv[3])
mc.bucket_delete(sys.argv[4])
config = "force=false"
if len(sys.argv) == 6:
config = sys.argv[5]
mc.bucket_delete(sys.argv[4], config)
4 changes: 2 additions & 2 deletions management/mc_bin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ def bucket_create(self, name, engine, config=""):
return self._doCmd(memcacheConstants.CMD_CREATE_BUCKET, name,
"\0".join([engine, config]))

def bucket_delete(self, name):
def bucket_delete(self, name, config=""):
"""Delete a bucket."""
return self._doCmd(memcacheConstants.CMD_DELETE_BUCKET, name, '')
return self._doCmd(memcacheConstants.CMD_DELETE_BUCKET, name, config)

def bucket_expand(self, name, new_size):
"""Create a bucket."""
Expand Down
7 changes: 5 additions & 2 deletions mock_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ENGINE_ERROR_CODE create_instance(uint64_t interface,
static const engine_info* mock_get_info(ENGINE_HANDLE* handle);
static ENGINE_ERROR_CODE mock_initialize(ENGINE_HANDLE* handle,
const char* config_str);
static void mock_destroy(ENGINE_HANDLE* handle);
static void mock_destroy(ENGINE_HANDLE* handle,
const bool force);
static ENGINE_ERROR_CODE mock_item_allocate(ENGINE_HANDLE* handle,
const void* cookie,
item **item,
Expand Down Expand Up @@ -284,7 +285,9 @@ static ENGINE_ERROR_CODE mock_initialize(ENGINE_HANDLE* handle,
return ENGINE_SUCCESS;
}

static void mock_destroy(ENGINE_HANDLE* handle) {
static void mock_destroy(ENGINE_HANDLE* handle,
const bool force) {
(void)force;
struct mock_engine* se = get_handle(handle);

if (se->initialized) {
Expand Down
10 changes: 5 additions & 5 deletions testapp.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ static ENGINE_HANDLE *load_engine(const char *soname, const char *config_str) {
if (engine->interface == 1) {
ENGINE_HANDLE_V1 *v1 = (ENGINE_HANDLE_V1*)engine;
if (v1->initialize(engine, config_str) != ENGINE_SUCCESS) {
v1->destroy(engine);
v1->destroy(engine, false);
fprintf(stderr, "Failed to initialize instance. Error code: %d\n",
error);
dlclose(handle);
Expand Down Expand Up @@ -802,12 +802,12 @@ static enum test_result test_delete_bucket(ENGINE_HANDLE *h,
strlen(value), 9258, 3600);
assert(rv == ENGINE_SUCCESS);

pkt = create_packet(DELETE_BUCKET, "someuser", "");
pkt = create_packet(DELETE_BUCKET, "someuser", "force=false");
rv = h1->unknown_command(h, adm_cookie, pkt, add_response);
free(pkt);
assert(rv == ENGINE_SUCCESS);

pkt = create_packet(DELETE_BUCKET, "someuser", "");
pkt = create_packet(DELETE_BUCKET, "someuser", "force=false");
rv = h1->unknown_command(h, adm_cookie, pkt, add_response);
free(pkt);
assert(rv == ENGINE_SUCCESS);
Expand Down Expand Up @@ -892,7 +892,7 @@ static enum test_result test_delete_bucket_concurrent(ENGINE_HANDLE *h,

usleep(1000);

pkt = create_packet(DELETE_BUCKET, "someuser", "");
pkt = create_packet(DELETE_BUCKET, "someuser", "force=false");
rv = h1->unknown_command(h, adm_cookie, pkt, add_response);
free(pkt);
assert(rv == ENGINE_SUCCESS);
Expand Down Expand Up @@ -1337,7 +1337,7 @@ static enum test_result run_test(struct test test) {
disconnect_all_connections(connstructs);
destroy_event_handlers();
connstructs = NULL;
h->destroy((ENGINE_HANDLE*)h);
h->destroy((ENGINE_HANDLE*)h, false);
genhash_free(stats_hash);
#ifndef USE_GCOV
exit((int)ret);
Expand Down

0 comments on commit d838bf3

Please sign in to comment.