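/*
 * (Web-page text captured together with this file; not part of the C source.)
 *
 * Skip to content
 * HTTPS clone URL
 * Subversion checkout URL
 * You can clone with HTTPS or Subversion.
 * Download ZIP
 * branch: master
 * @andreiz
 * 397 lines (365 sloc) 12.359 kb
 */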
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef DLL_EXPORT
#define USE_STATIC_LIB
#endif
#if defined(__CYGWIN__)
#define USE_IPV6
#endif
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <zookeeper_log.h>
#include <time.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <limits.h>
#include <zoo_lock.h>
#include <stdbool.h>
#ifdef HAVE_SYS_UTSNAME_H
#include <sys/utsname.h>
#endif
#ifdef HAVE_GETPWUID_R
#include <pwd.h>
#endif
/*
 * Execute statement x only when logLevel equals ZOO_LOG_LEVEL_DEBUG.
 * The expansion is a bare `if` with a braced body (no do/while(0) wrapper),
 * so placing it directly before an `else` would attach that `else` to the
 * macro's `if`.
 */
#define IF_DEBUG(x) if (logLevel==ZOO_LOG_LEVEL_DEBUG) {x;}
/**
 * Prepare a lock handle that has no completion callback.
 *
 * The zh, path and acl pointers are stored as given (nothing is copied).
 * The handle starts with no lock node, no known owner and isOwner cleared,
 * and its internal pthread mutex is initialised with default attributes.
 *
 * @param mutex the lock handle to initialise
 * @param zh    the zookeeper handle to operate through
 * @param path  the parent path under which lock nodes are created
 * @param acl   the acl used when creating nodes
 * @return always 0
 */
ZOOAPI int zkr_lock_init(zkr_lock_mutex_t* mutex, zhandle_t* zh,
                         char* path, struct ACL_vector *acl) {
    /* lock state: nothing acquired, owner unknown */
    mutex->isOwner = 0;
    mutex->id = NULL;
    mutex->ownerid = NULL;

    /* this variant registers no callback */
    mutex->cbdata = NULL;
    mutex->completion = NULL;

    /* caller-provided context, stored by reference */
    mutex->acl = acl;
    mutex->path = path;
    mutex->zh = zh;

    pthread_mutex_init(&(mutex->pmutex), NULL);
    return 0;
}
/**
 * Prepare a lock handle together with a completion callback.
 *
 * Behaves like the callback-less initialiser, except that the supplied
 * completion function and its opaque cbdata are recorded on the handle.
 * The zh, path and acl pointers are stored as given (nothing is copied).
 *
 * @param mutex      the lock handle to initialise
 * @param zh         the zookeeper handle to operate through
 * @param path       the parent path under which lock nodes are created
 * @param acl        the acl used when creating nodes
 * @param completion callback stored on the handle
 * @param cbdata     opaque pointer passed back to the callback
 * @return always 0
 */
ZOOAPI int zkr_lock_init_cb(zkr_lock_mutex_t *mutex, zhandle_t* zh,
                            char *path, struct ACL_vector *acl,
                            zkr_lock_completion completion, void* cbdata) {
    /* lock state: nothing acquired, owner unknown */
    mutex->id = NULL;
    mutex->ownerid = NULL;
    mutex->isOwner = 0;

    /* callback and its context */
    mutex->cbdata = cbdata;
    mutex->completion = completion;

    /* caller-provided context, stored by reference */
    mutex->acl = acl;
    mutex->path = path;
    mutex->zh = zh;

    pthread_mutex_init(&(mutex->pmutex), NULL);
    return 0;
}
/**
 * Release the lock by deleting this handle's lock node.
 *
 * The delete is attempted up to three times for as long as the server
 * answers ZCONNECTIONLOSS, sleeping 0.5 ms between attempts. A node that
 * is already gone (ZNONODE) counts as a successful release. On success the
 * completion callback (if any) is invoked with 1, the stored node id is
 * freed and the isOwner flag is cleared so it does not keep reporting
 * ownership after the node has been removed.
 *
 * The handle's pthread mutex is taken for the duration of the call.
 *
 * @param mutex the lock handle
 * @return 0 when the node was removed (or did not exist),
 *         ZCONNECTIONLOSS when the delete did not succeed,
 *         ZSYSTEMERROR when the handle holds no lock node
 */
ZOOAPI int zkr_lock_unlock(zkr_lock_mutex_t *mutex) {
    int rc = ZSYSTEMERROR;

    pthread_mutex_lock(&(mutex->pmutex));
    if (mutex->id != NULL) {
        zhandle_t *zh = mutex->zh;
        /* "<path>/<id>" plus the terminating NUL (one spare byte) */
        size_t len = strlen(mutex->path) + strlen(mutex->id) + 2;
        char buf[len];
        struct timespec ts;
        int ret = ZCONNECTIONLOSS;
        int count = 0;

        snprintf(buf, len, "%s/%s", mutex->path, mutex->id);
        ts.tv_sec = 0;
        ts.tv_nsec = 500000;    /* 0.5 ms between attempts */

        while (ret == ZCONNECTIONLOSS && (count < 3)) {
            ret = zoo_delete(zh, buf, -1);
            if (ret == ZCONNECTIONLOSS) {
                LOG_DEBUG(("connectionloss while deleting the node"));
                nanosleep(&ts, 0);
                count++;
            }
        }

        if (ret == ZOK || ret == ZNONODE) {
            zkr_lock_completion completion = mutex->completion;
            if (completion != NULL) {
                completion(1, mutex->cbdata);
            }
            free(mutex->id);
            mutex->id = NULL;
            /* the node is gone, so the handle can no longer be the owner */
            mutex->isOwner = 0;
            rc = 0;
        } else {
            LOG_WARN(("not able to connect to server - giving up"));
            rc = ZCONNECTIONLOSS;
        }
    }
    pthread_mutex_unlock(&(mutex->pmutex));
    return rc;
}
/**
 * Free every string held by the vector and then the array itself,
 * leaving v->data set to 0. Does nothing when v->data is already null.
 * v->count is left untouched.
 */
static void free_String_vector(struct String_vector *v) {
    int32_t idx = 0;

    if (!v->data) {
        return;
    }
    while (idx < v->count) {
        free(v->data[idx]);
        idx++;
    }
    free(v->data);
    v->data = 0;
}
/**
 * qsort comparator for child node names.
 *
 * Each argument points at a `char *` name. Names are ordered by the text
 * that follows their last '-' (the sequence suffix). A name that contains
 * no '-' is compared as a whole instead of dereferencing a null pointer.
 *
 * @return <0, 0 or >0 as for strcmp on the two sort keys
 */
static int vstrcmp(const void* str1, const void* str2) {
    const char *left = *(const char * const *) str1;
    const char *right = *(const char * const *) str2;
    const char *left_dash = strrchr(left, '-');
    const char *right_dash = strrchr(right, '-');
    const char *left_key = (left_dash != NULL) ? left_dash + 1 : left;
    const char *right_key = (right_dash != NULL) ? right_dash + 1 : right;

    return strcmp(left_key, right_key);
}
/**
 * Sort the vector's child names in place using vstrcmp as the comparator.
 */
static void sort_children(struct String_vector *vector) {
    char **names = vector->data;
    size_t how_many = vector->count;

    qsort(names, how_many, sizeof names[0], vstrcmp);
}
/**
 * Return the sort key of a child name: the text after its last '-',
 * or the whole name when it contains no '-'.
 */
static const char* child_floor_seq(const char *name) {
    const char *dash = strrchr(name, '-');
    return (dash != NULL) ? dash + 1 : name;
}

/**
 * Find the predecessor of element in a list of child names.
 *
 * Names are compared by their sequence suffix (the text after the last
 * '-'), which is the same key the list is sorted by. Comparing whole names
 * would let the session-id part of the name decide the order and could
 * pick the wrong predecessor, or none at all, when several sessions
 * contend for the lock.
 *
 * @param sorted_data child names, sorted by sequence suffix
 * @param len         number of entries in sorted_data
 * @param element     the name whose predecessor is wanted
 * @return the last entry whose suffix sorts strictly before element's,
 *         or NULL when there is none; the pointer aliases sorted_data
 */
static char* child_floor(char **sorted_data, int len, char *element) {
    const char *element_seq = child_floor_seq(element);
    char *floor = NULL;
    int i;

    for (i = 0; i < len; i++) {
        if (strcmp(child_floor_seq(sorted_data[i]), element_seq) < 0) {
            floor = sorted_data[i];
        }
    }
    return floor;
}
/**
 * Watcher callback handed to retry_zoowexists by zkr_lock_operation.
 *
 * Whatever the event type, state or path, it simply re-runs zkr_lock_lock
 * on the lock handle that was passed as the watcher context. The result of
 * that call is not inspected.
 */
static void lock_watcher_fn(zhandle_t* zh, int type, int state,
                            const char* path, void *watcherCtx) {
    zkr_lock_mutex_t *lock = (zkr_lock_mutex_t*) watcherCtx;

    zkr_lock_lock(lock);
}
/**
 * Extract the last component of a path.
 *
 * @param str a path string
 * @return a newly allocated copy of everything after the final '/'
 *         (possibly an empty string), which the caller must free;
 *         NULL when str contains no '/'
 */
static char* getName(char* str) {
    char *last_slash = strrchr(str, '/');

    return (last_slash != NULL) ? strdup(last_slash + 1) : NULL;
}
/**
 * Fetch the children of path, retrying on connection loss.
 *
 * zoo_get_children is called (without a watch) until it returns anything
 * other than ZCONNECTIONLOSS or until `retry` attempts have been made.
 * After every connection loss the function sleeps for *ts.
 *
 * @return the result of the last attempt, or ZCONNECTIONLOSS when
 *         retry is not positive and no attempt was made
 */
static int retry_getchildren(zhandle_t *zh, char* path, struct String_vector *vector,
                             struct timespec *ts, int retry) {
    int rc = ZCONNECTIONLOSS;
    int attempt;

    for (attempt = 0; attempt < retry; attempt++) {
        rc = zoo_get_children(zh, path, 0, vector);
        if (rc != ZCONNECTIONLOSS) {
            break;
        }
        LOG_DEBUG(("connection loss to the server"));
        nanosleep(ts, 0);
    }
    return rc;
}
/**
 * Look for a child whose name starts with prefix.
 *
 * @return a newly allocated copy of the first matching child name
 *         (to be freed by the caller), or NULL when the vector has no
 *         data or no child matches
 */
static char* lookupnode(struct String_vector *vector, char *prefix) {
    int idx;

    if (!vector->data) {
        return NULL;
    }
    for (idx = 0; idx < vector->count; idx++) {
        char *candidate = vector->data[idx];
        if (strncmp(prefix, candidate, strlen(prefix)) == 0) {
            return strdup(candidate);
        }
    }
    return NULL;
}
/**
 * Call zoo_wexists on path with the given watcher, retrying on
 * connection loss.
 *
 * The call is repeated until it returns anything other than
 * ZCONNECTIONLOSS or until `retry` attempts have been made. After every
 * connection loss the function sleeps for *ts.
 *
 * @return the result of the last attempt, or ZCONNECTIONLOSS when
 *         retry is not positive and no attempt was made
 */
static int retry_zoowexists(zhandle_t *zh, char* path, watcher_fn watcher, void* ctx,
                            struct Stat *stat, struct timespec *ts, int retry) {
    int rc = ZCONNECTIONLOSS;
    int attempt;

    for (attempt = 0; attempt < retry; attempt++) {
        rc = zoo_wexists(zh, path, watcher, ctx, stat);
        if (rc != ZCONNECTIONLOSS) {
            break;
        }
        LOG_DEBUG(("connectionloss while setting watch on my predecessor"));
        nanosleep(ts, 0);
    }
    return rc;
}
/**
 * The main code that performs the lock / leader election step.
 *
 * It makes sure this session has its own ephemeral sequential node under
 * the lock path (re-using one whose name starts with "x-<session id>-" if
 * present, otherwise creating it), then lists and sorts the children:
 *  - the first child is recorded as the owner id;
 *  - if a child sorts before our node, a watch is placed on that
 *    predecessor and the handle is marked as not owning the lock;
 *  - otherwise, if our node is the first child, the handle is marked as
 *    owner and the completion callback (if any) is invoked with 0.
 *
 * Must be called with mutex->pmutex held; zkr_lock_lock does so. If the
 * predecessor cannot be watched, the mutex is released around the call to
 * zkr_lock_unlock and re-acquired before returning.
 *
 * @param mutex the lock handle
 * @param ts    pause used between retries of server calls
 * @return ZOK when the step completed (whether or not the lock is owned),
 *         otherwise a non-ZOK code; the caller only distinguishes ZOK from
 *         everything else
 */
static int zkr_lock_operation(zkr_lock_mutex_t *mutex, struct timespec *ts) {
    zhandle_t *zh = mutex->zh;
    char *path = mutex->path;
    struct Stat stat;
    int retry = 3;
    do {
        const clientid_t *cid = zoo_client_id(zh);
        char prefix[30];
        int ret = 0;
        struct String_vector vectorst;
        struct String_vector *vector = &vectorst;

        // NOTE(review): presumably zoo_client_id can yield NULL when the
        // handle is unusable - confirm against the client API. Reported as
        // a transient failure so the caller's retry loop decides.
        if (cid == NULL) {
            return ZCONNECTIONLOSS;
        }
        // session id as 16 hex digits; the cast makes one format string
        // valid on every platform
        snprintf(prefix, sizeof(prefix), "x-%016llx-",
                 (unsigned long long) cid->client_id);

        vectorst.data = NULL;
        vectorst.count = 0;
        ret = retry_getchildren(zh, path, vector, ts, retry);
        if (ret != ZOK)
            return ret;
        // NOTE(review): a previously stored id is overwritten here without
        // being freed. zkr_lock_isowner reads mutex->id without taking
        // pmutex, so freeing it at this point could race with such a reader.
        mutex->id = lookupnode(vector, prefix);
        free_String_vector(vector);
        if (mutex->id == NULL) {
            int len = strlen(path) + strlen(prefix) + 2;
            char buf[len];
            char retbuf[len+20];
            snprintf(buf, len, "%s/%s", path, prefix);
            ret = zoo_create(zh, buf, NULL, 0, mutex->acl,
                             ZOO_EPHEMERAL|ZOO_SEQUENCE, retbuf, (len+20));
            // do not want to retry the create since
            // we would end up creating more than one child
            if (ret != ZOK) {
                LOG_WARN(("could not create zoo node %s", buf));
                return ret;
            }
            mutex->id = getName(retbuf);
        }
        if (mutex->id != NULL) {
            ret = retry_getchildren(zh, path, vector, ts, retry);
            if (ret != ZOK) {
                LOG_WARN(("could not connect to server"));
                return ret;
            }
            // an empty listing means even our own node is missing; there is
            // no first child to read, so report a transient failure and let
            // the caller run the whole step again
            if (vector->data == NULL || vector->count <= 0) {
                free_String_vector(vector);
                return ZCONNECTIONLOSS;
            }
            // sort this list
            sort_children(vector);
            // NOTE(review): as with id above, the previous ownerid is
            // overwritten without being freed because zkr_lock_isowner and
            // zkr_lock_getid read it without holding pmutex.
            mutex->ownerid = strdup(vector->data[0]);
            char* lessthanme = child_floor(vector->data, vector->count, mutex->id);
            if (lessthanme != NULL) {
                int flen = strlen(mutex->path) + strlen(lessthanme) + 2;
                char last_child[flen];
                snprintf(last_child, flen, "%s/%s", mutex->path, lessthanme);
                ret = retry_zoowexists(zh, last_child, &lock_watcher_fn, mutex,
                                       &stat, ts, retry);
                // cannot watch my predecessor i am giving up
                // we need to be able to watch the predecessor
                // since if we do not become a leader the others
                // will keep waiting
                if (ret != ZOK) {
                    int watch_rc = ret;
                    free_String_vector(vector);
                    LOG_WARN(("unable to watch my predecessor"));
                    // zkr_lock_unlock takes pmutex itself and the mutex is
                    // created with default attributes, so it must not be
                    // called while we still hold it
                    pthread_mutex_unlock(&(mutex->pmutex));
                    ret = zkr_lock_unlock(mutex);
                    pthread_mutex_lock(&(mutex->pmutex));
                    // always report failure: either the reason the watch
                    // failed, or the reason the node could not be removed
                    return (ret == 0) ? watch_rc : ret;
                }
                // we are not the owner of the lock
                mutex->isOwner = 0;
                free_String_vector(vector);
                return ZOK;
            }
            // nobody sorts before us: we own the lock if we are the first
            // child. Decide before the vector (and its strings) is freed.
            int is_first = (strcmp(mutex->id, vector->data[0]) == 0);
            free_String_vector(vector);
            if (is_first) {
                LOG_DEBUG(("got the zoo lock owner - %s", mutex->id));
                mutex->isOwner = 1;
                if (mutex->completion != NULL) {
                    mutex->completion(0, mutex->cbdata);
                }
            }
            return ZOK;
        }
    } while (mutex->id == NULL);
    return ZOK;
}
/**
 * Try to acquire the lock.
 *
 * First makes sure the lock's parent path exists, creating it when the
 * server reports ZNONODE and retrying on ZCONNECTIONLOSS (at most four
 * rounds, 0.5 ms apart). Then runs zkr_lock_operation, again retrying up
 * to four times while it does not return ZOK.
 *
 * The whole call runs under the handle's pthread mutex. Ownership is
 * evaluated before that mutex is released, so the answer reflects the
 * state this call produced rather than whatever a concurrent lock or
 * unlock on the same handle wrote afterwards.
 *
 * @param mutex the lock handle
 * @return non-zero when this handle owns the lock, 0 otherwise
 */
ZOOAPI int zkr_lock_lock(zkr_lock_mutex_t *mutex) {
    pthread_mutex_lock(&(mutex->pmutex));
    zhandle_t *zh = mutex->zh;
    char *path = mutex->path;
    struct Stat stat;
    int exists = zoo_exists(zh, path, 0, &stat);
    int count = 0;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 500000;    /* 0.5 ms between attempts */
    // retry to see if the path exists and
    // create it if the path does not exist
    while ((exists == ZCONNECTIONLOSS || exists == ZNONODE) && (count < 4)) {
        count++;
        // retry the operation
        if (exists == ZCONNECTIONLOSS)
            exists = zoo_exists(zh, path, 0, &stat);
        else if (exists == ZNONODE)
            exists = zoo_create(zh, path, NULL, 0, mutex->acl, 0, NULL, 0);
        nanosleep(&ts, 0);
    }
    // need to check if we cannot still access the server
    int check_retry = ZCONNECTIONLOSS;
    count = 0;
    while (check_retry != ZOK && count < 4) {
        check_retry = zkr_lock_operation(mutex, &ts);
        if (check_retry != ZOK) {
            nanosleep(&ts, 0);
            count++;
        }
    }
    // read id/ownerid while still holding the mutex
    int owner = zkr_lock_isowner(mutex);
    pthread_mutex_unlock(&(mutex->pmutex));
    return owner;
}
/**
 * Return the parent path stored on the lock handle. The pointer is the one
 * kept in the handle; nothing is copied.
 */
ZOOAPI char* zkr_lock_getpath(zkr_lock_mutex_t *mutex) {
    char *lock_path = mutex->path;

    return lock_path;
}
/**
 * Tell whether this handle currently owns the lock.
 *
 * @return 1 when both the handle's own node id and the recorded owner id
 *         are set and equal, 0 otherwise
 */
ZOOAPI int zkr_lock_isowner(zkr_lock_mutex_t *mutex) {
    if (mutex->id == NULL) {
        return 0;
    }
    if (mutex->ownerid == NULL) {
        return 0;
    }
    return strcmp(mutex->id, mutex->ownerid) == 0;
}
/**
 * Return the owner id recorded on the lock handle (the ownerid field, not
 * the handle's own node id). The pointer is the one kept in the handle;
 * it may be NULL.
 */
ZOOAPI char* zkr_lock_getid(zkr_lock_mutex_t *mutex) {
    char *owner = mutex->ownerid;

    return owner;
}
/**
 * Release the resources held by a lock handle.
 *
 * Frees the stored node id and owner id and resets both pointers to NULL,
 * so that a repeated destroy or a later zkr_lock_isowner call does not
 * touch freed memory. The path, acl and completion references are cleared
 * (they are not freed here), isOwner is reset and the handle's pthread
 * mutex is destroyed.
 *
 * @param mutex the lock handle
 * @return always 0
 */
ZOOAPI int zkr_lock_destroy(zkr_lock_mutex_t* mutex) {
    /* free(NULL) is a no-op, so no guards are needed */
    free(mutex->id);
    mutex->id = NULL;
    free(mutex->ownerid);
    mutex->ownerid = NULL;

    mutex->path = NULL;
    mutex->acl = NULL;
    mutex->completion = NULL;
    mutex->isOwner = 0;

    pthread_mutex_destroy(&(mutex->pmutex));
    return 0;
}
/*
 * (Web-page text captured together with this file; not part of the C source.)
 *
 * Jump to Line
 * Something went wrong with that request. Please try again.
 */