Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'wickman'

Conflicts:
	CHANGELOG
	Manifest
	README
	Rakefile
	ext/zookeeper_c.c
	lib/zookeeper.rb
	test/test_basic.rb
  • Loading branch information...
commit 4ad2371a2f61e75a128f0944c10f66e6e3b50ff5 2 parents dcc47bd + 60249ff
Evan Weaver authored
View
3  CHANGELOG
@@ -1,4 +1,6 @@
+v0.3.0. Wickman's rewrite.
+
v0.2.2. Fix compatibility with stock Leopard fat-binary Ruby.
v0.2.1. No more camelcase classname.
@@ -6,3 +8,4 @@ v0.2.1. No more camelcase classname.
v0.2. Bundle C dependencies, like memcached.gem.
v0.1. First release.
+
View
14 Manifest
@@ -3,8 +3,20 @@ LICENSE
Manifest
README
Rakefile
+examples/cloud_config.rb
ext/extconf.rb
-ext/zkc-3.2.2.tar.gz
+ext/zkc-3.3.1.tar.gz
ext/zookeeper_c.c
+ext/zookeeper_lib.c
+ext/zookeeper_lib.h
lib/zookeeper.rb
+lib/zookeeper/acls.rb
+lib/zookeeper/callbacks.rb
+lib/zookeeper/constants.rb
+lib/zookeeper/exceptions.rb
+lib/zookeeper/stat.rb
test/test_basic.rb
+test/test_callback1.rb
+test/test_esoteric.rb
+test/test_watcher1.rb
+test/test_watcher2.rb
View
30 README
@@ -4,7 +4,14 @@ An interface to the Zookeeper distributed configuration server.
== License
+<<<<<<< HEAD
Copyright 2008 Phillip Pearson, and 2010 Twitter, Inc. Licensed under the MIT License. See the included LICENSE file. Portions copyright 2008-2010 the Apache Software Foundation, licensed under the Apache 2 license, and used with permission.
+=======
+Copyright 2008 Phillip Pearson, and 2010 Twitter, Inc. Licensed under the
+MIT License. See the included LICENSE file. Portions copyright 2008-2010
+the Apache Software Foundation, licensed under the Apache 2 license, and
+used with permission.
+>>>>>>> wickman
== Install
@@ -17,6 +24,7 @@ Connect to a server:
require 'rubygems'
require 'zookeeper'
z = Zookeeper.new("localhost:2181")
+<<<<<<< HEAD
Create, set and read nodes:
@@ -51,3 +59,25 @@ Acquire locks:
z.try_acquire "/parent/lock-", "content for the lock file" do |have_lock|
puts have_lock ? "we have the lock" : "we don't have the lock"
end
+=======
+ z.get_children(:path => "/")
+
+== Idioms
+
+ The following methods are initially supported:
+ get
+ set
+ get_children
+ stat
+ create
+ delete
+ get_acl
+ set_acl
+
+ All support async callbacks. get, get_children and stat support both
+ watchers and callbacks.
+
+ Calls take a dictionary of parameters. With the exception of set_acl, the
+ only required parameter is :path. Each call returns a dictionary with at
+ minimum two keys :req_id and :rc.
+>>>>>>> wickman
View
2  Rakefile
@@ -1,7 +1,7 @@
require 'echoe'
Echoe.new("zookeeper") do |p|
- p.author = "Phillip Pearson, Eric Maland, Evan Weaver"
+ p.author = "Phillip Pearson, Eric Maland, Evan Weaver, Brian Wickman"
p.project = "fauna"
p.summary = "An interface to the Zookeeper distributed configuration server."
p.url = "http://blog.evanweaver.com/files/doc/fauna/zookeeper/"
View
125 examples/cloud_config.rb
@@ -0,0 +1,125 @@
+require "rubygems"
+require "zookeeper"
+
+# A basic cloud-based YAML config library. Ruby Zookeeper client example.
+#
+# If you pass in a file as 'zk:/foo.yml/blah' it will go out to zookeeper.
+# Otherwise the file is assumed to be local. The yml file will get parsed
+# and cached locally, and keys after the .yml get interpreted as keys into
+# the YAML.
+#
+# e.g. get(zk:/config/service.yml/key1/key2/key3..) =>
+# zk.get(:path => /config/service.yml)
+# yaml <= YAML.parse(data)
+# yaml[key1][key2][key3]...
+#
+# If keys are unspecified, it returns the parsed YAML as one big object
+#
+# TODO if staleness is set to 0, read in YAML immediately before next
+# get(...)
+
+class CloudConfig
+ class NodeNotFound < StandardError; end
+ class BadPathError < StandardError; end
+
+ DEFAULT_SERVERS = "localhost:2181"
+
+ def initialize(zkservers = DEFAULT_SERVERS, staleness = 15) # maximum allowed staleness in seconds
+ @staleness = staleness
+ @lock = Mutex.new
+ @zkservers = DEFAULT_SERVERS
+
+ # cache
+ @data = {}
+ @zkcb = Zookeeper::WatcherCallback.new { dirty_callback(@zkcb.context) }
+ @zk = nil
+ end
+
+ def get(node)
+ filename, keys = extract_filename(node)
+
+ # read(filename) is potentially a zk call, so do not hold the lock during the read
+ if @lock.synchronize { !@data.has_key?(filename) }
+ d = YAML.load(read(filename))
+ @lock.synchronize { @data[filename] = d }
+ end
+
+ # synchronized b/c we potentially have a background thread updating data nodes from zk
+ # if keys is empty, return the whole file, otherwise roll up the keys
+ @lock.synchronize {
+ keys.empty? ? @data[filename] : keys.inject(@data[filename]) { |hash, key| hash[key] }
+ }
+ end
+
+ # todo:
+ # factor get-and-watch into a different subsystem (so you can have
+ # polling stat() ops on local filesystem.)
+ def read(yaml)
+ # read yaml file and register watcher. if watcher fires, set up
+ # background thread to do read and update data.
+ if yaml.match(/^zk:/)
+ @zk ||= init_zk
+ yaml = yaml['zk:'.length..-1] # strip off zk: from zk:/config/path.yml
+ resp = get_and_register(yaml)
+
+ if resp[:rc] != Zookeeper::ZOK
+ @zk.unregister_watcher(resp[:req_id])
+ raise NodeNotFound
+ end
+
+ resp[:data]
+ else
+ raise NodeNotFound unless File.exists?(yaml)
+ File.read(yaml)
+ end
+ end
+
+ def extract_filename(node)
+ path_elements = node.split("/")
+
+ yamlindex = path_elements.map{ |x| x.match("\.yml$") != nil }.index(true)
+ raise BadPathError unless yamlindex
+
+ yamlname = path_elements[0..yamlindex].join '/'
+ yamlkeys = path_elements[(yamlindex+1)..-1]
+
+ return yamlname, yamlkeys
+ end
+
+ private
+ def init_zk
+ Zookeeper.new(@zkservers)
+ end
+
+ def get_and_register(znode)
+ @zk.get(:path => znode, :watcher => @zkcb,
+ :watcher_context => { :path => znode,
+ :wait => rand(@staleness) })
+ end
+
+ def dirty_callback(context)
+ path = context[:path]
+ wait = context[:wait]
+
+ # Fire off a background update that waits a randomized period of time up
+ # to @staleness seconds.
+ Thread.new do
+ sleep wait
+ background_update(path)
+ end
+ end
+
+ def background_update(zkpath)
+ # do a synchronous get/register a new watcher
+ resp = get_and_register(zkpath)
+ if resp[:rc] != Zookeeper::ZOK
+ # puts "Unable to read #{zkpath} from Zookeeper!" @logger.error
+ zk.unregister_watcher(resp[:req_id])
+ else
+ # puts "Updating data."
+ d = YAML.load(resp[:data])
+ @lock.synchronize { @data["zk:#{zkpath}"] = d }
+ end
+ end
+end
+
View
BIN  ext/zkc-3.2.2.tar.gz
Binary file not shown
View
BIN  ext/zkc-3.3.1.tar.gz
Binary file not shown
View
731 ext/zookeeper_c.c
@@ -1,83 +1,79 @@
/* Ruby wrapper for the Zookeeper C API
* Phillip Pearson <pp@myelin.co.nz>
* Eric Maland <eric@twitter.com>
+ * Brian Wickman <wickman@twitter.com>
+ *
+ * This fork is a 90% rewrite of the original. It takes a more evented
+ * approach to isolate the ZK state machine from the ruby interpreter via an
+ * event queue. It's similar to the ZookeeperFFI version except that it
+ * actually works on MRI 1.8.
*/
#define THREADED
#include "ruby.h"
-
#include "c-client-src/zookeeper.h"
#include <errno.h>
#include <stdio.h>
+#include <stdlib.h>
+
+#include "zookeeper_lib.h"
static VALUE Zookeeper = Qnil;
-static VALUE eNoNode = Qnil;
-static VALUE eBadVersion = Qnil;
-struct zk_rb_data {
- zhandle_t *zh;
- clientid_t myid;
+struct zkrb_instance_data {
+ zhandle_t *zh;
+ clientid_t myid;
+ zkrb_queue_t *queue;
};
-static void watcher(zhandle_t *zh, int type, int state, const char *path, void *ctx) {
- VALUE self, watcher_id;
- (void)ctx;
- return; // watchers don't work in ruby yet
-
- self = (VALUE)zoo_get_context(zh);;
- watcher_id = rb_intern("watcher");
-
- fprintf(stderr,"C watcher %d state = %d for %s.\n", type, state, (path ? path: "null"));
- rb_funcall(self, watcher_id, 3, INT2FIX(type), INT2FIX(state), rb_str_new2(path));
-}
-
-#warning [emaland] incomplete - but easier to read!
-static void check_errors(int rc) {
- switch (rc) {
- case ZOK:
- /* all good! */
- break;
- case ZNONODE:
- rb_raise(eNoNode, "the node does not exist");
- break;
- case ZBADVERSION:
- rb_raise(eBadVersion, "expected version does not match actual version");
- break;
- default:
- rb_raise(rb_eRuntimeError, "unknown error returned from zookeeper: %d (%s)", rc, zerror(rc));
+typedef enum {
+ SYNC = 0,
+ ASYNC = 1,
+ SYNC_WATCH = 2,
+ ASYNC_WATCH = 3
+} zkrb_call_type;
+
+#define IS_SYNC(zkrbcall) ((zkrbcall)==SYNC || (zkrbcall)==SYNC_WATCH)
+#define IS_ASYNC(zkrbcall) ((zkrbcall)==ASYNC || (zkrbcall)==ASYNC_WATCH)
+
+static void free_zkrb_instance_data(struct zkrb_instance_data* ptr) {
+#warning [wickman] TODO: free queue
+#warning [wickman] TODO: fire off warning if queue is not empty
+ if (ptr->zh && zoo_state(ptr->zh) == ZOO_CONNECTED_STATE) {
+ zookeeper_close(ptr->zh);
}
}
-static void free_zk_rb_data(struct zk_rb_data* ptr) {
- zookeeper_close(ptr->zh);
-}
-
-static VALUE array_from_stat(const struct Stat* stat) {
- return rb_ary_new3(8,
- LL2NUM(stat->czxid),
- LL2NUM(stat->mzxid),
- LL2NUM(stat->ctime),
- LL2NUM(stat->mtime),
- INT2NUM(stat->version),
- INT2NUM(stat->cversion),
- INT2NUM(stat->aversion),
- LL2NUM(stat->ephemeralOwner));
-}
-
static VALUE method_initialize(VALUE self, VALUE hostPort) {
- VALUE data;
- struct zk_rb_data* zk = NULL;
-
Check_Type(hostPort, T_STRING);
- data = Data_Make_Struct(Zookeeper, struct zk_rb_data, 0, free_zk_rb_data, zk);
-
+ VALUE data;
+ struct zkrb_instance_data *zk_local_ctx = NULL;
+ data = Data_Make_Struct(Zookeeper,
+ struct zkrb_instance_data,
+ 0,
+ free_zkrb_instance_data,
+ zk_local_ctx);
+ zk_local_ctx->queue = zkrb_queue_alloc();
+
zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);
zoo_deterministic_conn_order(0);
-
- zk->zh = zookeeper_init(RSTRING(hostPort)->ptr, watcher, 10000, &zk->myid, (void*)self, 0);
- if (!zk->zh) {
+
+ zkrb_calling_context *ctx =
+ zkrb_calling_context_alloc(ZKRB_GLOBAL_REQ, zk_local_ctx->queue);
+
+ zk_local_ctx->zh =
+ zookeeper_init(
+ RSTRING(hostPort)->ptr,
+ zkrb_state_callback,
+ 10000,
+ &zk_local_ctx->myid,
+ ctx,
+ 0);
+
+#warning [wickman] TODO handle this properly on the Ruby side rather than C side
+ if (!zk_local_ctx->zh) {
rb_raise(rb_eRuntimeError, "error connecting to zookeeper: %d", errno);
}
@@ -87,194 +83,298 @@ static VALUE method_initialize(VALUE self, VALUE hostPort) {
}
#define FETCH_DATA_PTR(x, y) \
- struct zk_rb_data * y; \
- Data_Get_Struct(rb_iv_get(x, "@data"), struct zk_rb_data, y)
-
-static VALUE method_get_children(VALUE self, VALUE path) {
+ struct zkrb_instance_data * y; \
+ Data_Get_Struct(rb_iv_get(x, "@data"), struct zkrb_instance_data, y)
+
+#define STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, cb_ctx, w_ctx, call_type) \
+ if (TYPE(reqid) != T_FIXNUM && TYPE(reqid) != T_BIGNUM) { \
+ rb_raise(rb_eTypeError, "reqid must be Fixnum/Bignum"); \
+ return Qnil; \
+ } \
+ Check_Type(path, T_STRING); \
+ struct zkrb_instance_data * zk; \
+ Data_Get_Struct(rb_iv_get(self, "@data"), struct zkrb_instance_data, zk); \
+ zkrb_calling_context* cb_ctx = \
+ (async != Qfalse && async != Qnil) ? \
+ zkrb_calling_context_alloc(NUM2LL(reqid), zk->queue) : \
+ NULL; \
+ zkrb_calling_context* w_ctx = \
+ (watch != Qfalse && watch != Qnil) ? \
+ zkrb_calling_context_alloc(NUM2LL(reqid), zk->queue) : \
+ NULL; \
+ int a = (async != Qfalse && async != Qnil); \
+ int w = (watch != Qfalse && watch != Qnil); \
+ zkrb_call_type call_type; \
+ if (a) { if (w) { call_type = ASYNC_WATCH; } else { call_type = ASYNC; } } \
+ else { if (w) { call_type = SYNC_WATCH; } else { call_type = SYNC; } }
+
+static VALUE method_get_children(VALUE self, VALUE reqid, VALUE path, VALUE async, VALUE watch) {
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
+
struct String_vector strings;
- int i;
- VALUE output;
-
- Check_Type(path, T_STRING);
- FETCH_DATA_PTR(self, zk);
-
- check_errors(zoo_get_children(zk->zh, RSTRING(path)->ptr, 0, &strings));
+ struct Stat stat;
+
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_get_children2(zk->zh, RSTRING(path)->ptr, 0, &strings, &stat);
+ break;
+
+ case SYNC_WATCH:
+ rc = zoo_wget_children2(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, &strings, &stat);
+ break;
+
+ case ASYNC:
+ rc = zoo_aget_children2(zk->zh, RSTRING(path)->ptr, 0, zkrb_strings_stat_callback, data_ctx);
+ break;
+
+ case ASYNC_WATCH:
+ rc = zoo_awget_children2(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, zkrb_strings_stat_callback, data_ctx);
+ break;
+ }
- output = rb_ary_new();
- for (i = 0; i < strings.count; ++i) {
- rb_ary_push(output, rb_str_new2(strings.data[i]));
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ rb_ary_push(output, zkrb_string_vector_to_ruby(&strings));
+ rb_ary_push(output, zkrb_stat_to_rarray(&stat));
}
return output;
}
-static VALUE method_exists(VALUE self, VALUE path, VALUE watch) {
+static VALUE method_exists(VALUE self, VALUE reqid, VALUE path, VALUE async, VALUE watch) {
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
+
struct Stat stat;
- Check_Type(path, T_STRING);
- FETCH_DATA_PTR(self, zk);
-
- check_errors(zoo_exists(zk->zh, RSTRING(path)->ptr, (watch != Qfalse && watch != Qnil), &stat));
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_exists(zk->zh, RSTRING(path)->ptr, 0, &stat);
+ break;
+
+ case SYNC_WATCH:
+ rc = zoo_wexists(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, &stat);
+ break;
+
+ case ASYNC:
+ rc = zoo_aexists(zk->zh, RSTRING(path)->ptr, 0, zkrb_stat_callback, data_ctx);
+ break;
+
+ case ASYNC_WATCH:
+ rc = zoo_awexists(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, zkrb_stat_callback, data_ctx);
+ break;
+ }
- return array_from_stat(&stat);
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ rb_ary_push(output, zkrb_stat_to_rarray(&stat));
+ }
+ return output;
}
-static VALUE method_create(VALUE self, VALUE path, VALUE value, VALUE flags) {
- char realpath[10240];
-
- Check_Type(path, T_STRING);
- Check_Type(value, T_STRING);
+static VALUE method_create(VALUE self, VALUE reqid, VALUE path, VALUE data, VALUE async, VALUE acls, VALUE flags) {
+ VALUE watch = Qfalse;
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
+
+ struct Stat stat;
+ if (data != Qnil) Check_Type(data, T_STRING);
Check_Type(flags, T_FIXNUM);
+ const char *data_ptr = (data == Qnil) ? NULL : RSTRING(data)->ptr;
+ size_t data_len = (data == Qnil) ? -1 : RSTRING(data)->len;
+
+ struct ACL_vector *aclptr = NULL;
+ if (acls != Qnil) { aclptr = zkrb_ruby_to_aclvector(acls); }
+ char realpath[16384];
+
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_create(zk->zh, RSTRING(path)->ptr, data_ptr, data_len, aclptr, FIX2INT(flags), realpath, sizeof(realpath));
+ if (aclptr != NULL) deallocate_ACL_vector(aclptr);
+ break;
+ case ASYNC:
+ rc = zoo_acreate(zk->zh, RSTRING(path)->ptr, data_ptr, data_len, aclptr, FIX2INT(flags), zkrb_string_callback, data_ctx);
+ if (aclptr != NULL) deallocate_ACL_vector(aclptr);
+ break;
+ default:
+ /* TODO(wickman) raise proper argument error */
+ return Qnil;
+ break;
+ }
- FETCH_DATA_PTR(self, zk);
-
- check_errors(zoo_create(zk->zh, RSTRING(path)->ptr,
- RSTRING(value)->ptr, RSTRING(value)->len,
- &ZOO_OPEN_ACL_UNSAFE, FIX2INT(flags),
- realpath, sizeof(realpath)));
-
- return rb_str_new2(realpath);
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ return rb_ary_push(output, rb_str_new2(realpath));
+ }
+ return output;
}
-static VALUE method_delete(VALUE self, VALUE path, VALUE version) {
- Check_Type(path, T_STRING);
+static VALUE method_delete(VALUE self, VALUE reqid, VALUE path, VALUE version, VALUE async) {
+ VALUE watch = Qfalse;
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
Check_Type(version, T_FIXNUM);
-
- FETCH_DATA_PTR(self, zk);
-
- check_errors(zoo_delete(zk->zh, RSTRING(path)->ptr, FIX2INT(version)));
-
- return Qtrue;
-}
-
-static VALUE method_get(VALUE self, VALUE path) {
- char data[1024];
- int data_len = sizeof(data);
-
- struct Stat stat;
- memset(data, 0, sizeof(data));
-
- Check_Type(path, T_STRING);
- FETCH_DATA_PTR(self, zk);
- check_errors(zoo_get(zk->zh, RSTRING(path)->ptr, 0, data, &data_len, &stat));
+ int rc = 0;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_delete(zk->zh, RSTRING(path)->ptr, FIX2INT(version));
+ break;
+ case ASYNC:
+ rc = zoo_adelete(zk->zh, RSTRING(path)->ptr, FIX2INT(version), zkrb_void_callback, data_ctx);
+ break;
+ default:
+ /* TODO(wickman) raise proper argument error */
+ return Qnil;
+ break;
+ }
- return rb_ary_new3(2,
- rb_str_new(data, data_len),
- array_from_stat(&stat));
+ return INT2FIX(rc);
}
-static VALUE method_set(int argc, VALUE* argv, VALUE self)
-{
- VALUE v_path, v_data, v_version;
- int real_version = -1;
-
- FETCH_DATA_PTR(self, zk);
-
- rb_scan_args(argc, argv, "21", &v_path, &v_data, &v_version);
-
- Check_Type(v_path, T_STRING);
- Check_Type(v_data, T_STRING);
- Check_Type(v_version, T_FIXNUM);
- if(!NIL_P(v_version))
- real_version = FIX2INT(v_version);
+#define MAX_ZNODE_SIZE 1048576
- check_errors(zoo_set(zk->zh,
- RSTRING(v_path)->ptr,
- RSTRING(v_data)->ptr, RSTRING(v_data)->len,
- FIX2INT(v_version)));
+static VALUE method_get(VALUE self, VALUE reqid, VALUE path, VALUE async, VALUE watch) {
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
- return Qtrue;
-}
+ /* ugh */
+ char * data = malloc(MAX_ZNODE_SIZE);
+ int data_len = MAX_ZNODE_SIZE;
+ struct Stat stat;
-static void void_completion_callback(int rc, const void *data) {
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_get(zk->zh, RSTRING(path)->ptr, 0, data, &data_len, &stat);
+ break;
+
+ case SYNC_WATCH:
+ rc = zoo_wget(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, data, &data_len, &stat);
+ break;
+
+ case ASYNC:
+ rc = zoo_aget(zk->zh, RSTRING(path)->ptr, 0, zkrb_data_callback, data_ctx);
+ break;
+
+ case ASYNC_WATCH:
+ rc = zoo_awget(zk->zh, RSTRING(path)->ptr, zkrb_state_callback, watch_ctx, zkrb_data_callback, data_ctx);
+ break;
+ }
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ rb_ary_push(output, rb_str_new(data, data_len));
+ rb_ary_push(output, zkrb_stat_to_rarray(&stat));
+ }
+ free(data);
+ return output;
}
-static void string_completion_callback(int rc, const char *value, const void *data) {
+static VALUE method_set(VALUE self, VALUE reqid, VALUE path, VALUE data, VALUE async, VALUE version) {
+ VALUE watch = Qfalse;
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
-}
-
-#warning [emaland] to be implemented
-static VALUE method_set2(int argc, VALUE *argv, VALUE self) {
- // ZOOAPI int zoo_set2(zhandle_t *zh, const char *path, const char *buffer,
- // int buflen, int version, struct Stat *stat);
- return Qnil;
+ struct Stat stat;
+ if (data != Qnil) Check_Type(data, T_STRING);
+ const char *data_ptr = (data == Qnil) ? NULL : RSTRING(data)->ptr;
+ size_t data_len = (data == Qnil) ? -1 : RSTRING(data)->len;
+
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_set2(zk->zh, RSTRING(path)->ptr, data_ptr, data_len, FIX2INT(version), &stat);
+ break;
+ case ASYNC:
+ rc = zoo_aset(zk->zh, RSTRING(path)->ptr, data_ptr, data_len, FIX2INT(version),
+ zkrb_stat_callback, data_ctx);
+ break;
+ default:
+ /* TODO(wickman) raise proper argument error */
+ return Qnil;
+ break;
+ }
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ rb_ary_push(output, zkrb_stat_to_rarray(&stat));
+ }
+ return output;
}
-static VALUE method_set_acl(int argc, VALUE* argv, VALUE self) {
-/* STUB */
-/* VALUE v_path, v_data, v_version; */
-/* struct zk_rb_data* zk; */
-/* int real_version = -1; */
+static VALUE method_set_acl(VALUE self, VALUE reqid, VALUE path, VALUE acls, VALUE async, VALUE version) {
+ VALUE watch = Qfalse;
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
+ struct ACL_vector * aclptr = zkrb_ruby_to_aclvector(acls);
+
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_set_acl(zk->zh, RSTRING(path)->ptr, FIX2INT(version), aclptr);
+ deallocate_ACL_vector(aclptr);
+ break;
+ case ASYNC:
+ rc = zoo_aset_acl(zk->zh, RSTRING(path)->ptr, FIX2INT(version), aclptr, zkrb_void_callback, data_ctx);
+ deallocate_ACL_vector(aclptr);
+ break;
+ default:
+ /* TODO(wickman) raise proper argument error */
+ return Qnil;
+ break;
+ }
-/* rb_scan_args(argc, argv, "21", &v_path, &v_data, &v_version); */
-
-/* Check_Type(v_path, T_STRING); */
-/* Check_Type(v_data, T_STRING); */
-/* Check_Type(v_version, T_FIXNUM); */
+ return INT2FIX(rc);
+}
-/* if(!NIL_P(v_version)) */
-/* real_version = FIX2INT(v_version); */
+static VALUE method_get_acl(VALUE self, VALUE reqid, VALUE path, VALUE async) {
+ VALUE watch = Qfalse;
+ STANDARD_PREAMBLE(self, zk, reqid, path, async, watch, data_ctx, watch_ctx, call_type);
-/* Data_Get_Struct(rb_iv_get(self, "@data"), struct zk_rb_data, zk); */
+ struct ACL_vector acls;
+ struct Stat stat;
-/* check_errors(zoo_set(zk->zh, RSTRING(v_path)->ptr, */
-/* RSTRING(v_data)->ptr, RSTRING(v_data)->len, */
-/* FIX2INT(v_version))); */
+ int rc;
+ switch (call_type) {
+ case SYNC:
+ rc = zoo_get_acl(zk->zh, RSTRING(path)->ptr, &acls, &stat);
+ break;
+ case ASYNC:
+ rc = zoo_aget_acl(zk->zh, RSTRING(path)->ptr, zkrb_acl_callback, data_ctx);
+ break;
+ default:
+ /* TODO(wickman) raise proper argument error */
+ return Qnil;
+ break;
+ }
- return Qnil;
+ // do we need to deallocate the strings in the acl vector????
+ VALUE output = rb_ary_new();
+ rb_ary_push(output, INT2FIX(rc));
+ if (IS_SYNC(call_type) && rc == ZOK) {
+ rb_ary_push(output, zkrb_acl_vector_to_ruby(&acls));
+ rb_ary_push(output, zkrb_stat_to_rarray(&stat));
+ }
+ return output;
}
-/*
- PARAMETERS:
- zh: the zookeeper handle obtained by a call to zookeeper.init
- scheme: the id of authentication scheme. Natively supported:
- 'digest' password-based authentication
- cert: application credentials. The actual value depends on the scheme.
- completion: the routine to invoke when the request completes. One of
- the following result codes may be passed into the completion callback:
- OK operation completed successfully
- AUTHFAILED authentication failed
-
- RETURNS:
- OK on success or one of the following errcodes on failure:
- BADARGUMENTS - invalid input parameters
- INVALIDSTATE - zhandle state is either SESSION_EXPIRED_STATE or AUTH_FAI
-LED_STATE
- MARSHALLINGERROR - failed to marshall a request; possibly, out of memory
- SYSTEMERROR - a system error occured
-*/
-#warning [emaland] make these magically synchronous for now?
-static VALUE method_add_auth(VALUE self, VALUE scheme,
- VALUE cert, VALUE completion,
- VALUE completion_data) {
- struct zk_rb_data* zk;
- Data_Get_Struct(rb_iv_get(self, "@data"), struct zk_rb_data, zk);
-
- Check_Type(scheme, T_STRING);
- Check_Type(cert, T_STRING);
- // Check_Type(completion, T_OBJECT); // ???
+static VALUE method_get_next_event(VALUE self) {
+ FETCH_DATA_PTR(self, zk);
- check_errors(zoo_add_auth(zk->zh, RSTRING(scheme)->ptr,
- RSTRING(cert)->ptr, RSTRING(cert)->len,
- void_completion_callback, DATA_PTR(completion_data)));
- return Qtrue;
-}
-
-static VALUE method_async(VALUE self, VALUE path,
- VALUE completion, VALUE completion_data) {
- struct zk_rb_data* zk;
- Data_Get_Struct(rb_iv_get(self, "@data"), struct zk_rb_data, zk);
+ zkrb_event_t *event = zkrb_dequeue(zk->queue);
+ if (event == NULL) return Qnil;
- Check_Type(path, T_STRING);
- // Check_Type(completion, T_OBJECT); // ???
-
- check_errors(zoo_async(zk->zh, RSTRING(path)->ptr,
- string_completion_callback, DATA_PTR(completion_data)));
+ VALUE hash = zkrb_event_to_ruby(event);
+ zkrb_event_free(event);
+ return hash;
+}
- return Qtrue;
+static VALUE method_has_events(VALUE self) {
+ FETCH_DATA_PTR(self, zk);
+ return zkrb_peek(zk->queue) != NULL ? Qtrue : Qfalse;
}
static VALUE method_client_id(VALUE self) {
@@ -285,8 +385,8 @@ static VALUE method_client_id(VALUE self) {
static VALUE method_close(VALUE self) {
FETCH_DATA_PTR(self, zk);
- check_errors(zookeeper_close(zk->zh));
- return Qtrue;
+ int rc = zookeeper_close(zk->zh);
+ return INT2FIX(rc);
}
static VALUE method_deterministic_conn_order(VALUE self, VALUE yn) {
@@ -294,83 +394,14 @@ static VALUE method_deterministic_conn_order(VALUE self, VALUE yn) {
return Qnil;
}
-static VALUE id_to_ruby(struct Id *id) {
- VALUE hash = rb_hash_new();
- rb_hash_aset(hash, rb_str_new2("scheme"), rb_str_new2(id->scheme));
- rb_hash_aset(hash, rb_str_new2("id"), rb_str_new2(id->id));
- return hash;
-}
-
-static VALUE acl_to_ruby(struct ACL *acl) {
- VALUE hash = rb_hash_new();
- rb_hash_aset(hash, rb_str_new2("perms"), INT2NUM(acl->perms));
- rb_hash_aset(hash, rb_str_new2("id"), id_to_ruby(&(acl->id)));
- return hash;
-}
-
-static VALUE acl_vector_to_ruby(struct ACL_vector *acl_vector) {
- int i = 0;
- VALUE ary = rb_ary_new();
- for(i = 0; i < acl_vector->count; i++) {
- rb_ary_push(ary, acl_to_ruby(acl_vector->data+i));
- }
- return ary;
-}
-
-/*
- struct Stat {
- int64_t czxid;
- int64_t mzxid;
- int64_t ctime;
- int64_t mtime;
- int32_t version;
- int32_t cversion;
- int32_t aversion;
- int64_t ephemeralOwner;
- int32_t dataLength;
- int32_t numChildren;
- int64_t pzxid;
- }
-}
-*/
-static VALUE stat_to_ruby(struct Stat *stat) {
- VALUE hash = rb_hash_new();
- rb_hash_aset(hash, rb_str_new2("czxid"), UINT2NUM(stat->czxid));
- rb_hash_aset(hash, rb_str_new2("mzxid"), UINT2NUM(stat->mzxid));
- rb_hash_aset(hash, rb_str_new2("ctime"), UINT2NUM(stat->ctime));
- rb_hash_aset(hash, rb_str_new2("mtime"), UINT2NUM(stat->mtime));
- rb_hash_aset(hash, rb_str_new2("version"), INT2NUM(stat->version));
- rb_hash_aset(hash, rb_str_new2("cversion"), INT2NUM(stat->cversion));
- rb_hash_aset(hash, rb_str_new2("aversion"), INT2NUM(stat->aversion));
- rb_hash_aset(hash, rb_str_new2("ephemeralOwner"), UINT2NUM(stat->ephemeralOwner));
- rb_hash_aset(hash, rb_str_new2("dataLength"), INT2NUM(stat->dataLength));
- rb_hash_aset(hash, rb_str_new2("numChildren"), INT2NUM(stat->numChildren));
- rb_hash_aset(hash, rb_str_new2("pzxid"), UINT2NUM(stat->pzxid));
- return hash;
-}
-
-static VALUE method_get_acl(VALUE self, VALUE path) {
+static VALUE method_is_unrecoverable(VALUE self) {
FETCH_DATA_PTR(self, zk);
- Check_Type(path, T_STRING);
-
- // ZOOAPI int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
- // struct Stat *stat);
- struct ACL_vector acl;
- struct Stat stat;
- check_errors(zoo_get_acl(zk->zh, RSTRING(path)->ptr, &acl, &stat));
-
- VALUE result = rb_ary_new();
- rb_ary_push(result, acl_vector_to_ruby(&acl));
- rb_ary_push(result, stat_to_ruby(&stat));
- return result;
+ return is_unrecoverable(zk->zh) == ZINVALIDSTATE ? Qtrue : Qfalse;
}
-static VALUE method_is_unrecoverable(VALUE self) {
+static VALUE method_state(VALUE self) {
FETCH_DATA_PTR(self, zk);
- if(is_unrecoverable(zk->zh) == ZINVALIDSTATE)
- return Qtrue;
-
- return Qfalse;
+ return INT2NUM(zoo_state(zk->zh));
}
static VALUE method_recv_timeout(VALUE self) {
@@ -378,141 +409,55 @@ static VALUE method_recv_timeout(VALUE self) {
return INT2NUM(zoo_recv_timeout(zk->zh));
}
-#warning [emaland] make this a class method or global
+// how do you make a class method??
static VALUE method_set_debug_level(VALUE self, VALUE level) {
- FETCH_DATA_PTR(self, zk);
Check_Type(level, T_FIXNUM);
+ ZKRBDebugging = (FIX2INT(level) == ZOO_LOG_LEVEL_DEBUG);
zoo_set_debug_level(FIX2INT(level));
return Qnil;
}
-#warning [emaland] make this a class method or global
static VALUE method_zerror(VALUE self, VALUE errc) {
return rb_str_new2(zerror(FIX2INT(errc)));
}
-static VALUE method_state(VALUE self) {
- FETCH_DATA_PTR(self, zk);
- return INT2NUM(zoo_state(zk->zh));
-}
-
-#warning [emaland] make this a class method or global
-static VALUE method_set_log_stream(VALUE self, VALUE stream) {
- // convert stream to FILE*
- FILE *fp_stream = (FILE*)stream;
- zoo_set_log_stream(fp_stream);
- return Qnil;
-}
-
-static VALUE method_set_watcher(VALUE self, VALUE new_watcher) {
- FETCH_DATA_PTR(self, zk);
-#warning [emaland] needs to be tested/implemented
- return Qnil;
- // watcher_fn old_watcher = zoo_set_watcher(zk->zh, new_watcher);
- // return old_watcher;
-}
-
-void Init_zookeeper_c() {
- Zookeeper = rb_define_class("CZookeeper", rb_cObject);
-
+static void zkrb_define_methods(void) {
#define DEFINE_METHOD(method, args) { \
rb_define_method(Zookeeper, #method, method_ ## method, args); }
+#define DEFINE_CLASS_METHOD(method, args) { \
+ rb_define_singleton_method(Zookeeper, #method, method_ ## method, args); }
DEFINE_METHOD(initialize, 1);
- DEFINE_METHOD(get_children, 1);
- DEFINE_METHOD(exists, 2);
- DEFINE_METHOD(create, 3);
- DEFINE_METHOD(delete, 2);
- DEFINE_METHOD(get, 1);
- DEFINE_METHOD(set, -1);
-
- /* TODO */
- DEFINE_METHOD(add_auth, 3);
- DEFINE_METHOD(set_acl, -1);
- DEFINE_METHOD(async, 1);
+ DEFINE_METHOD(get_children, 4);
+ DEFINE_METHOD(exists, 4);
+ DEFINE_METHOD(create, 6);
+ DEFINE_METHOD(delete, 4);
+ DEFINE_METHOD(get, 4);
+ DEFINE_METHOD(set, 5);
+ DEFINE_METHOD(set_acl, 5);
+ DEFINE_METHOD(get_acl, 3);
DEFINE_METHOD(client_id, 0);
DEFINE_METHOD(close, 0);
DEFINE_METHOD(deterministic_conn_order, 1);
- DEFINE_METHOD(get_acl, 2);
DEFINE_METHOD(is_unrecoverable, 0);
DEFINE_METHOD(recv_timeout, 1);
- DEFINE_METHOD(set2, -1);
- DEFINE_METHOD(set_debug_level, 1);
- DEFINE_METHOD(set_log_stream, 1);
- DEFINE_METHOD(set_watcher, 2);
DEFINE_METHOD(state, 0);
+ // TODO
+ // DEFINE_METHOD(add_auth, 3);
+ // DEFINE_METHOD(async, 1);
+
+ // methods for the ruby-side event manager
+ DEFINE_METHOD(get_next_event, 0);
+ DEFINE_METHOD(has_events, 0);
+
+ // Make these class methods?
+ DEFINE_METHOD(set_debug_level, 1);
DEFINE_METHOD(zerror, 1);
+}
+void Init_zookeeper_c() {
+ ZKRBDebugging = 0;
- eNoNode = rb_define_class_under(Zookeeper, "NoNodeError", rb_eRuntimeError);
- eBadVersion = rb_define_class_under(Zookeeper, "BadVersionError", rb_eRuntimeError);
-
-#define EXPORT_CONST(x) { rb_define_const(Zookeeper, #x, INT2FIX(x)); }
-
- /* create flags */
- EXPORT_CONST(ZOO_EPHEMERAL);
- EXPORT_CONST(ZOO_SEQUENCE);
-
- /*
- session state
- */
- EXPORT_CONST(ZOO_EXPIRED_SESSION_STATE);
- EXPORT_CONST(ZOO_AUTH_FAILED_STATE);
- EXPORT_CONST(ZOO_CONNECTING_STATE);
- EXPORT_CONST(ZOO_ASSOCIATING_STATE);
- EXPORT_CONST(ZOO_CONNECTED_STATE);
-
- /* notifications */
- EXPORT_CONST(ZOOKEEPER_WRITE);
- EXPORT_CONST(ZOOKEEPER_READ);
-
- /* errors */
- EXPORT_CONST(ZOK);
- EXPORT_CONST(ZSYSTEMERROR);
- EXPORT_CONST(ZRUNTIMEINCONSISTENCY);
- EXPORT_CONST(ZDATAINCONSISTENCY);
- EXPORT_CONST(ZCONNECTIONLOSS);
- EXPORT_CONST(ZMARSHALLINGERROR);
- EXPORT_CONST(ZUNIMPLEMENTED);
- EXPORT_CONST(ZOPERATIONTIMEOUT);
- EXPORT_CONST(ZBADARGUMENTS);
- EXPORT_CONST(ZINVALIDSTATE);
-
- /** API errors. */
- EXPORT_CONST(ZAPIERROR);
- EXPORT_CONST(ZNONODE);
- EXPORT_CONST(ZNOAUTH);
- EXPORT_CONST(ZBADVERSION);
- EXPORT_CONST(ZNOCHILDRENFOREPHEMERALS);
- EXPORT_CONST(ZNODEEXISTS);
- EXPORT_CONST(ZNOTEMPTY);
- EXPORT_CONST(ZSESSIONEXPIRED);
- EXPORT_CONST(ZINVALIDCALLBACK);
- EXPORT_CONST(ZINVALIDACL);
- EXPORT_CONST(ZAUTHFAILED);
- EXPORT_CONST(ZCLOSING);
- EXPORT_CONST(ZNOTHING);
- EXPORT_CONST(ZSESSIONMOVED);
-
- /* debug levels */
- EXPORT_CONST(ZOO_LOG_LEVEL_ERROR);
- EXPORT_CONST(ZOO_LOG_LEVEL_WARN);
- EXPORT_CONST(ZOO_LOG_LEVEL_INFO);
- EXPORT_CONST(ZOO_LOG_LEVEL_DEBUG);
-
- /* ACL constants */
- EXPORT_CONST(ZOO_PERM_READ);
- EXPORT_CONST(ZOO_PERM_WRITE);
- EXPORT_CONST(ZOO_PERM_CREATE);
- EXPORT_CONST(ZOO_PERM_DELETE);
- EXPORT_CONST(ZOO_PERM_ADMIN);
- EXPORT_CONST(ZOO_PERM_ALL);
-
- /* Watch types */
- EXPORT_CONST(ZOO_CREATED_EVENT);
- EXPORT_CONST(ZOO_DELETED_EVENT);
- EXPORT_CONST(ZOO_CHANGED_EVENT);
- EXPORT_CONST(ZOO_CHILD_EVENT);
- EXPORT_CONST(ZOO_SESSION_EVENT);
- EXPORT_CONST(ZOO_NOTWATCHING_EVENT);
-
+ /* initialize Zookeeper class */
+ Zookeeper = rb_define_class("CZookeeper", rb_cObject);
+ zkrb_define_methods();
}
View
527 ext/zookeeper_lib.c
@@ -0,0 +1,527 @@
+/* Ruby wrapper for the Zookeeper C API
+
+This file contains three sets of helpers:
+ - the event queue that glues RB<->C together
+ - the completions that marshall data between RB<->C formats
+ - functions for translating between Ruby and C versions of ZK datatypes
+
+wickman@twitter.com
+*/
+
+#include "ruby.h"
+#include "zookeeper_lib.h"
+#include "c-client-src/zookeeper.h"
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define GET_SYM(str) ID2SYM(rb_intern(str))
+
+int ZKRBDebugging;
+
+/* push/pop is a misnomer, this is a queue */
+#warning [wickman] TODO enqueue, peek, dequeue => pthread_mutex_lock
+void zkrb_enqueue(zkrb_queue_t *q, zkrb_event_t *elt) {
+ q->tail->event = elt;
+ q->tail->next = (struct zkrb_event_ll_t *) malloc(sizeof(struct zkrb_event_ll_t));
+ q->tail = q->tail->next;
+ q->tail->event = NULL;
+ q->tail->next = NULL;
+}
+
+zkrb_event_t * zkrb_peek(zkrb_queue_t *q) {
+ if (q->head != NULL && q->head->event != NULL)
+ return q->head->event;
+ return NULL;
+}
+
+zkrb_event_t* zkrb_dequeue(zkrb_queue_t *q) {
+ if (q->head == NULL || q->head->event == NULL) {
+ return NULL;
+ } else {
+ struct zkrb_event_ll_t *old_root = q->head;
+ q->head = q->head->next;
+ zkrb_event_t *rv = old_root->event;
+ free(old_root);
+ return rv;
+ }
+}
+
+zkrb_queue_t *zkrb_queue_alloc(void) {
+ zkrb_queue_t *rq = malloc(sizeof(zkrb_queue_t));
+ rq->head = malloc(sizeof(struct zkrb_event_ll_t));
+ rq->head->event = NULL; rq->head->next = NULL;
+ rq->tail = rq->head;
+ return rq;
+}
+
+zkrb_event_t *zkrb_event_alloc(void) {
+ zkrb_event_t *rv = (zkrb_event_t *) malloc(sizeof(zkrb_event_t));
+ return rv;
+}
+
+void zkrb_event_free(zkrb_event_t *event) {
+ switch (event->type) {
+ case ZKRB_DATA: {
+ struct zkrb_data_completion *data_ctx = event->completion.data_completion;
+ free(data_ctx->data);
+ free(data_ctx->stat);
+ break;
+ }
+ case ZKRB_STAT: {
+ struct zkrb_stat_completion *stat_ctx = event->completion.stat_completion;
+ free(stat_ctx->stat);
+ break;
+ }
+ case ZKRB_STRING: {
+ struct zkrb_string_completion *string_ctx = event->completion.string_completion;
+ free(string_ctx->value);
+ break;
+ }
+ case ZKRB_STRINGS: {
+ struct zkrb_strings_completion *strings_ctx = event->completion.strings_completion;
+ int k;
+ for (k = 0; k < strings_ctx->values->count; ++k) free(strings_ctx->values->data[k]);
+ free(strings_ctx->values);
+ break;
+ }
+ case ZKRB_STRINGS_STAT: {
+ struct zkrb_strings_stat_completion *strings_stat_ctx = event->completion.strings_stat_completion;
+ int k;
+ for (k = 0; k < strings_stat_ctx->values->count; ++k) free(strings_stat_ctx->values->data[k]);
+ free(strings_stat_ctx->values);
+ free(strings_stat_ctx->stat);
+ break;
+ }
+ case ZKRB_ACL: {
+ struct zkrb_acl_completion *acl_ctx = event->completion.acl_completion;
+ if (acl_ctx->acl) {
+ deallocate_ACL_vector(acl_ctx->acl);
+ free(acl_ctx->acl);
+ }
+ free(acl_ctx->stat);
+ break;
+ }
+ case ZKRB_WATCHER: {
+ struct zkrb_watcher_completion *watcher_ctx = event->completion.watcher_completion;
+ free(watcher_ctx->path);
+ break;
+ }
+ case ZKRB_VOID: {
+ break;
+ }
+
+ default:
+#warning [wickman] TODO raise an exception?
+ fprintf(stderr, "ERROR?\n");
+ }
+ free(event);
+}
+
+/* this is called only from a method_get_latest_event, so the hash is
+ allocated on the proper thread stack */
+VALUE zkrb_event_to_ruby(zkrb_event_t *event) {
+ VALUE hash = rb_hash_new();
+
+ rb_hash_aset(hash, GET_SYM("req_id"), LL2NUM(event->req_id));
+ if (event->type != ZKRB_WATCHER)
+ rb_hash_aset(hash, GET_SYM("rc"), INT2FIX(event->rc));
+
+ switch (event->type) {
+ case ZKRB_DATA: {
+ struct zkrb_data_completion *data_ctx = event->completion.data_completion;
+ if (ZKRBDebugging) zkrb_print_stat(data_ctx->stat);
+ rb_hash_aset(hash, GET_SYM("data"), data_ctx->data ? rb_str_new2(data_ctx->data) : Qnil);
+ rb_hash_aset(hash, GET_SYM("stat"), data_ctx->stat ? zkrb_stat_to_rarray(data_ctx->stat) : Qnil);
+ break;
+ }
+ case ZKRB_STAT: {
+ struct zkrb_stat_completion *stat_ctx = event->completion.stat_completion;
+ rb_hash_aset(hash, GET_SYM("stat"), stat_ctx->stat ? zkrb_stat_to_rarray(stat_ctx->stat) : Qnil);
+ break;
+ }
+ case ZKRB_STRING: {
+ struct zkrb_string_completion *string_ctx = event->completion.string_completion;
+ rb_hash_aset(hash, GET_SYM("string"), string_ctx->value ? rb_str_new2(string_ctx->value) : Qnil);
+ break;
+ }
+ case ZKRB_STRINGS: {
+ struct zkrb_strings_completion *strings_ctx = event->completion.strings_completion;
+ rb_hash_aset(hash, GET_SYM("strings"), strings_ctx->values ? zkrb_string_vector_to_ruby(strings_ctx->values) : Qnil);
+ break;
+ }
+ case ZKRB_STRINGS_STAT: {
+ struct zkrb_strings_stat_completion *strings_stat_ctx = event->completion.strings_stat_completion;
+ rb_hash_aset(hash, GET_SYM("strings"), strings_stat_ctx->values ? zkrb_string_vector_to_ruby(strings_stat_ctx->values) : Qnil);
+ rb_hash_aset(hash, GET_SYM("stat"), strings_stat_ctx->stat ? zkrb_stat_to_rarray(strings_stat_ctx->stat) : Qnil);
+ break;
+ }
+ case ZKRB_ACL: {
+ struct zkrb_acl_completion *acl_ctx = event->completion.acl_completion;
+ rb_hash_aset(hash, GET_SYM("acl"), acl_ctx->acl ? zkrb_acl_vector_to_ruby(acl_ctx->acl) : Qnil);
+ rb_hash_aset(hash, GET_SYM("stat"), acl_ctx->stat ? zkrb_stat_to_rarray(acl_ctx->stat) : Qnil);
+ break;
+ }
+ case ZKRB_WATCHER: {
+ struct zkrb_watcher_completion *watcher_ctx = event->completion.watcher_completion;
+ rb_hash_aset(hash, GET_SYM("type"), INT2FIX(watcher_ctx->type));
+ rb_hash_aset(hash, GET_SYM("state"), INT2FIX(watcher_ctx->state));
+ rb_hash_aset(hash, GET_SYM("path"), watcher_ctx->path ? rb_str_new2(watcher_ctx->path) : Qnil);
+ break;
+ }
+ case ZKRB_VOID:
+ default:
+ break;
+ }
+
+ return hash;
+}
+
+void zkrb_print_stat(const struct Stat *s) {
+ fprintf(stderr, "stat {\n");
+ if (s != NULL) {
+ fprintf(stderr, "\t czxid: %lld\n", s->czxid);
+ fprintf(stderr, "\t mzxid: %lld\n", s->mzxid);
+ fprintf(stderr, "\t ctime: %lld\n", s->ctime);
+ fprintf(stderr, "\t mtime: %lld\n", s->mtime);
+ fprintf(stderr, "\t version: %d\n" , s->version);
+ fprintf(stderr, "\t cversion: %d\n" , s->cversion);
+ fprintf(stderr, "\t aversion: %d\n" , s->aversion);
+ fprintf(stderr, "\t ephemeralOwner: %lld\n", s->ephemeralOwner);
+ fprintf(stderr, "\t dataLength: %d\n" , s->dataLength);
+ fprintf(stderr, "\t numChildren: %d\n" , s->numChildren);
+ fprintf(stderr, "\t pzxid: %lld\n", s->pzxid);
+ } else {
+ fprintf(stderr, "\tNULL\n");
+ }
+ fprintf(stderr, "}\n");
+}
+
+zkrb_calling_context *zkrb_calling_context_alloc(int64_t req_id, zkrb_queue_t *queue) {
+ zkrb_calling_context *ctx = malloc(sizeof(zkrb_calling_context));
+ ctx->req_id = req_id;
+ ctx->queue = queue;
+ return ctx;
+}
+
+void zkrb_print_calling_context(zkrb_calling_context *ctx) {
+ fprintf(stderr, "calling context (%#x){\n", ctx);
+ fprintf(stderr, "\treq_id = %lld\n", ctx->req_id);
+ fprintf(stderr, "\tqueue = 0x%#x\n", ctx->queue);
+ fprintf(stderr, "}\n");
+}
+
+/*
+ process completions that get queued to the watcher queue, translate events
+ to completions that the ruby side dispatches via callbacks.
+*/
+
+#define ZKH_SETUP_EVENT(qptr, eptr) \
+ zkrb_calling_context *ctx = (zkrb_calling_context *) calling_ctx; \
+ zkrb_event_t *eptr = zkrb_event_alloc(); \
+ eptr->req_id = ctx->req_id; \
+ if (eptr->req_id != ZKRB_GLOBAL_REQ) free(ctx); \
+ zkrb_queue_t *qptr = ctx->queue;
+
+void zkrb_state_callback(
+ zhandle_t *zh, int type, int state, const char *path, void *calling_ctx) {
+ /* logging */
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_STATE WATCHER "
+ "type = %d, state = %d, path = 0x%#x, value = %s\n",
+ type, state, (void *) path, path ? path : "NULL");
+ }
+
+ /* save callback context */
+ struct zkrb_watcher_completion *wc = malloc(sizeof(struct zkrb_watcher_completion));
+ wc->type = type;
+ wc->state = state;
+ wc->path = malloc(strlen(path) + 1);
+ strcpy(wc->path, path);
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->type = ZKRB_WATCHER;
+ event->completion.watcher_completion = wc;
+
+ zkrb_enqueue(queue, event);
+}
+
+
+
+void zkrb_data_callback(
+ int rc, const char *value, int value_len, const struct Stat *stat, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_DATA WATCHER "
+ "rc = %d (%s), value = %s, len = %d\n",
+ rc, zerror(rc), value ? value : "NULL", value_len);
+ }
+
+ /* copy data completion */
+ struct zkrb_data_completion *dc = malloc(sizeof(struct zkrb_data_completion));
+ dc->data = dc->stat = NULL;
+ if (value != NULL) { dc->data = malloc(value_len); memcpy(dc->data, value, value_len); }
+ if (stat != NULL) { dc->stat = malloc(sizeof(struct Stat)); memcpy(dc->stat, stat, sizeof(struct Stat)); }
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_DATA;
+ event->completion.data_completion = dc;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_stat_callback(
+ int rc, const struct Stat *stat, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_STAT WATCHER "
+ "rc = %d (%s)\n", rc, zerror(rc));
+ }
+
+ struct zkrb_stat_completion *sc = malloc(sizeof(struct zkrb_stat_completion));
+ sc->stat = NULL;
+ if (stat != NULL) { sc->stat = malloc(sizeof(struct Stat)); memcpy(sc->stat, stat, sizeof(struct Stat)); }
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_STAT;
+ event->completion.stat_completion = sc;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_string_callback(
+ int rc, const char *string, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_STRING WATCHER "
+ "rc = %d (%s)\n", rc, zerror(rc));
+ }
+
+ struct zkrb_string_completion *sc = malloc(sizeof(struct zkrb_string_completion));
+ sc->value = NULL;
+ if (string != NULL) { sc->value = malloc(strlen(string) + 1); strcpy(sc->value, string); }
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_STRING;
+ event->completion.string_completion = sc;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_strings_callback(
+ int rc, const struct String_vector *strings, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_STRINGS WATCHER "
+ "rc = %d (%s), calling_ctx = 0x%#x\n", rc, zerror(rc), calling_ctx);
+ }
+
+ /* copy string vector */
+ struct zkrb_strings_completion *sc = malloc(sizeof(struct zkrb_strings_completion));
+ sc->values = (strings != NULL) ? zkrb_clone_string_vector(strings) : NULL;
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_STRINGS;
+ event->completion.strings_completion = sc;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_strings_stat_callback(
+ int rc, const struct String_vector *strings, const struct Stat *stat, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_STRINGS_STAT WATCHER "
+ "rc = %d (%s), calling_ctx = 0x%#x\n", rc, zerror(rc), calling_ctx);
+ }
+
+ struct zkrb_strings_stat_completion *sc = malloc(sizeof(struct zkrb_strings_stat_completion));
+ sc->stat = NULL;
+ if (stat != NULL) { sc->stat = malloc(sizeof(struct Stat)); memcpy(sc->stat, stat, sizeof(struct Stat)); }
+ sc->values = (strings != NULL) ? zkrb_clone_string_vector(strings) : NULL;
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_STRINGS_STAT;
+ event->completion.strings_completion = sc;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_void_callback(
+ int rc, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_VOID WATCHER "
+ "rc = %d (%s)\n", rc, zerror(rc));
+ }
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_VOID;
+ event->completion.void_completion = NULL;
+
+ zkrb_enqueue(queue, event);
+}
+
+void zkrb_acl_callback(
+ int rc, struct ACL_vector *acls, struct Stat *stat, const void *calling_ctx) {
+ if (ZKRBDebugging) {
+ fprintf(stderr, "ZOOKEEPER_C_ACL WATCHER "
+ "rc = %d (%s)\n", rc, zerror(rc));
+ }
+
+ struct zkrb_acl_completion *ac = malloc(sizeof(struct zkrb_acl_completion));
+ ac->acl = ac->stat = NULL;
+ if (acls != NULL) { ac->acl = zkrb_clone_acl_vector(acls); }
+ if (stat != NULL) { ac->stat = malloc(sizeof(struct Stat)); memcpy(ac->stat, stat, sizeof(struct Stat)); }
+
+ ZKH_SETUP_EVENT(queue, event);
+ event->rc = rc;
+ event->type = ZKRB_ACL;
+ event->completion.acl_completion = ac;
+
+ /* should be synchronized */
+ zkrb_enqueue(queue, event);
+}
+
+VALUE zkrb_id_to_ruby(struct Id *id) {
+ VALUE hash = rb_hash_new();
+ rb_hash_aset(hash, GET_SYM("scheme"), rb_str_new2(id->scheme));
+ rb_hash_aset(hash, GET_SYM("id"), rb_str_new2(id->id));
+ return hash;
+}
+
+VALUE zkrb_acl_to_ruby(struct ACL *acl) {
+ VALUE hash = rb_hash_new();
+ rb_hash_aset(hash, GET_SYM("perms"), INT2NUM(acl->perms));
+ rb_hash_aset(hash, GET_SYM("id"), zkrb_id_to_ruby(&(acl->id)));
+ return hash;
+}
+
+#warning [wickman] TODO test zkrb_ruby_to_aclvector
+struct ACL_vector * zkrb_ruby_to_aclvector(VALUE acl_ary) {
+ Check_Type(acl_ary, T_ARRAY);
+
+ struct ACL_vector *v = malloc(sizeof(struct ACL_vector));
+ allocate_ACL_vector(v, RARRAY(acl_ary)->len);
+
+ int k;
+ for (k = 0; k < v->count; ++k) {
+ VALUE acl_val = rb_ary_entry(acl_ary, k);
+ v->data[k] = zkrb_ruby_to_acl(acl_val);
+ }
+
+ return v;
+}
+
+#warning [wickman] TODO test zkrb_ruby_to_aclvector
+struct ACL zkrb_ruby_to_acl(VALUE rubyacl) {
+ struct ACL acl;
+
+ VALUE perms = rb_iv_get(rubyacl, "@perms");
+ VALUE rubyid = rb_iv_get(rubyacl, "@id");
+ acl.perms = NUM2INT(perms);
+ acl.id = zkrb_ruby_to_id(rubyid);
+
+ return acl;
+}
+
+#warning [wickman] TODO zkrb_ruby_to_id error checking? test
+struct Id zkrb_ruby_to_id(VALUE rubyid) {
+ struct Id id;
+
+ VALUE scheme = rb_iv_get(rubyid, "@scheme");
+ VALUE ident = rb_iv_get(rubyid, "@id");
+
+ if (scheme != Qnil) {
+ id.scheme = malloc(RSTRING(scheme)->len + 1);
+ strncpy(id.scheme, RSTRING(scheme)->ptr, RSTRING(scheme)->len);
+ } else {
+ id.scheme = NULL;
+ }
+
+ if (ident != Qnil) {
+ id.id = malloc(RSTRING(ident)->len + 1);
+ strncpy(id.id, RSTRING(ident)->ptr, RSTRING(ident)->len);
+ } else {
+ id.id = NULL;
+ }
+
+ return id;
+}
+
+VALUE zkrb_acl_vector_to_ruby(struct ACL_vector *acl_vector) {
+ int i = 0;
+ VALUE ary = rb_ary_new();
+ for(i = 0; i < acl_vector->count; i++) {
+ rb_ary_push(ary, zkrb_acl_to_ruby(acl_vector->data+i));
+ }
+ return ary;
+}
+
+VALUE zkrb_string_vector_to_ruby(struct String_vector *string_vector) {
+ int i = 0;
+ VALUE ary = rb_ary_new();
+ for(i = 0; i < string_vector->count; i++) {
+ rb_ary_push(ary, rb_str_new2(string_vector->data[i]));
+ }
+ return ary;
+}
+
+VALUE zkrb_stat_to_rarray(const struct Stat* stat) {
+ return rb_ary_new3(11,
+ LL2NUM(stat->czxid),
+ LL2NUM(stat->mzxid),
+ LL2NUM(stat->ctime),
+ LL2NUM(stat->mtime),
+ INT2NUM(stat->version),
+ INT2NUM(stat->cversion),
+ INT2NUM(stat->aversion),
+ LL2NUM(stat->ephemeralOwner),
+ INT2NUM(stat->dataLength),
+ INT2NUM(stat->numChildren),
+ LL2NUM(stat->pzxid));
+}
+
+VALUE zkrb_stat_to_rhash(const struct Stat *stat) {
+ VALUE ary = rb_hash_new();
+ rb_hash_aset(ary, GET_SYM("czxid"), LL2NUM(stat->czxid));
+ rb_hash_aset(ary, GET_SYM("mzxid"), LL2NUM(stat->mzxid));
+ rb_hash_aset(ary, GET_SYM("ctime"), LL2NUM(stat->ctime));
+ rb_hash_aset(ary, GET_SYM("mtime"), LL2NUM(stat->mtime));
+ rb_hash_aset(ary, GET_SYM("version"), INT2NUM(stat->version));
+ rb_hash_aset(ary, GET_SYM("cversion"), INT2NUM(stat->cversion));
+ rb_hash_aset(ary, GET_SYM("aversion"), INT2NUM(stat->aversion));
+ rb_hash_aset(ary, GET_SYM("ephemeralOwner"), LL2NUM(stat->ephemeralOwner));
+ rb_hash_aset(ary, GET_SYM("dataLength"), INT2NUM(stat->dataLength));
+ rb_hash_aset(ary, GET_SYM("numChildren"), INT2NUM(stat->numChildren));
+ rb_hash_aset(ary, GET_SYM("pzxid"), LL2NUM(stat->pzxid));
+ return ary;
+}
+
+#warning [wickman] TODO test zkrb_clone_acl_vector
+struct ACL_vector * zkrb_clone_acl_vector(struct ACL_vector * src) {
+ struct ACL_vector * dst = malloc(sizeof(struct ACL_vector));
+ allocate_ACL_vector(dst, src->count);
+ int k;
+ for (k = 0; k < src->count; ++k) {
+ struct ACL * elt = &src->data[k];
+ dst->data[k].id.scheme = malloc(strlen(elt->id.scheme)+1);
+ dst->data[k].id.id = malloc(strlen(elt->id.id)+1);
+ strcpy(dst->data[k].id.scheme, elt->id.scheme);
+ strcpy(dst->data[k].id.id, elt->id.id);
+ dst->data[k].perms = elt->perms;
+ }
+ return dst;
+}
+
+#warning [wickman] TODO test zkrb_clone_string_vector
+struct String_vector * zkrb_clone_string_vector(struct String_vector * src) {
+ struct String_vector * dst = malloc(sizeof(struct String_vector));
+ allocate_String_vector(dst, src->count);
+ int k;
+ for (k = 0; k < src->count; ++k) {
+ dst->data[k] = malloc(strlen(src->data[k]) + 1);
+ strcpy(dst->data[k], src->data[k]);
+ }
+ return dst;
+}
View
151 ext/zookeeper_lib.h
@@ -0,0 +1,151 @@
+#ifndef ZOOKEEPER_LIB_H
+#define ZOOKEEPER_LIB_H
+
+#include "ruby.h"
+#include "c-client-src/zookeeper.h"
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define ZK_TRUE 1
+#define ZK_FALSE 0
+#define ZKRB_GLOBAL_REQ -1
+
+extern int ZKRBDebugging;
+
+struct zkrb_data_completion {
+ char *data;
+ struct Stat *stat;
+};
+
+struct zkrb_stat_completion {
+ struct Stat *stat;
+};
+
+struct zkrb_void_completion {
+};
+
+struct zkrb_string_completion {
+ char *value;
+};
+
+struct zkrb_strings_completion {
+ struct String_vector *values;
+};
+
+struct zkrb_strings_stat_completion {
+ struct String_vector *values;
+ struct Stat *stat;
+};
+
+struct zkrb_acl_completion {
+ struct ACL_vector *acl;
+ struct Stat *stat;
+};
+
+struct zkrb_watcher_completion {
+ int type;
+ int state;
+ char *path;
+};
+
+typedef struct {
+ int64_t req_id;
+ int rc;
+
+ enum {
+ ZKRB_DATA = 0,
+ ZKRB_STAT = 1,
+ ZKRB_VOID = 2,
+ ZKRB_STRING = 3,
+ ZKRB_STRINGS = 4,
+ ZKRB_STRINGS_STAT = 5,
+ ZKRB_ACL = 6,
+ ZKRB_WATCHER = 7
+ } type;
+
+ union {
+ struct zkrb_data_completion *data_completion;
+ struct zkrb_stat_completion *stat_completion;
+ struct zkrb_void_completion *void_completion;
+ struct zkrb_string_completion *string_completion;
+ struct zkrb_strings_completion *strings_completion;
+ struct zkrb_strings_stat_completion *strings_stat_completion;
+ struct zkrb_acl_completion *acl_completion;
+ struct zkrb_watcher_completion *watcher_completion;
+ } completion;
+} zkrb_event_t;
+
+struct zkrb_event_ll_t {
+ zkrb_event_t *event;
+ struct zkrb_event_ll_t *next;
+};
+
+typedef struct {
+ struct zkrb_event_ll_t *head;
+ struct zkrb_event_ll_t *tail;
+} zkrb_queue_t;
+
+zkrb_queue_t * zkrb_queue_alloc(void);
+zkrb_event_t * zkrb_event_alloc(void);
+void zkrb_event_free(zkrb_event_t *ptr);
+
+
+/* push/pop is a misnomer, this is a queue */
+void zkrb_enqueue(zkrb_queue_t *queue, zkrb_event_t *elt);
+zkrb_event_t * zkrb_peek(zkrb_queue_t *queue);
+zkrb_event_t * zkrb_dequeue(zkrb_queue_t *queue);
+
+void zkrb_print_stat(const struct Stat *s);
+
+typedef struct {
+ int64_t req_id;
+ zkrb_queue_t *queue;
+} zkrb_calling_context;
+
+void zkrb_print_calling_context(zkrb_calling_context *ctx);
+zkrb_calling_context *zkrb_calling_context_alloc(int64_t req_id, zkrb_queue_t *queue);
+
+/*
+ default process completions that get queued into the ruby client event queue
+*/
+
+void zkrb_state_callback(
+ zhandle_t *zh, int type, int state, const char *path, void *calling_ctx);
+
+void zkrb_data_callback(
+ int rc, const char *value, int value_len, const struct Stat *stat, const void *calling_ctx);
+
+void zkrb_stat_callback(
+ int rc, const struct Stat *stat, const void *calling_ctx);
+
+void zkrb_string_callback(
+ int rc, const char *string, const void *calling_ctx);
+
+void zkrb_strings_callback(
+ int rc, const struct String_vector *strings, const void *calling_ctx);
+
+void zkrb_strings_stat_callback(
+ int rc, const struct String_vector *strings, const struct Stat* stat, const void *calling_ctx);
+
+void zkrb_void_callback(
+ int rc, const void *calling_ctx);
+
+void zkrb_acl_callback(
+ int rc, struct ACL_vector *acls, struct Stat *stat, const void *calling_ctx);
+
+VALUE zkrb_event_to_ruby(zkrb_event_t *event);
+VALUE zkrb_acl_to_ruby(struct ACL *acl);
+VALUE zkrb_acl_vector_to_ruby(struct ACL_vector *acl_vector);
+VALUE zkrb_id_to_ruby(struct Id *id);
+VALUE zkrb_string_vector_to_ruby(struct String_vector *string_vector);
+VALUE zkrb_stat_to_rarray(const struct Stat *stat);
+VALUE zkrb_stat_to_rhash(const struct Stat* stat);
+
+struct ACL_vector * zkrb_ruby_to_aclvector(VALUE acl_ary);
+struct ACL_vector * zkrb_clone_acl_vector(struct ACL_vector * src);
+struct String_vector * zkrb_clone_string_vector(struct String_vector * src);
+struct ACL zkrb_ruby_to_acl(VALUE rubyacl);
+struct Id zkrb_ruby_to_id(VALUE rubyid);
+
+#endif /* ZOOKEEPER_LIB_H */
View
258 lib/zookeeper.rb
@@ -1,78 +1,234 @@
# Ruby wrapper for the Zookeeper C API
-# Phillip Pearson <pp@myelin.co.nz>
require 'zookeeper_c'
-
-class ZkStat
- attr_reader :version
- def initialize(ary)
- @czxid, @mzxid, @ctime, @mtime, @version, @cversion, @aversion, @ephemeralOwner = ary
- end
-end
+require 'thread'
+require 'zookeeper/callbacks'
+require 'zookeeper/constants'
+require 'zookeeper/exceptions'
+require 'zookeeper/stat'
+require 'zookeeper/acls'
class Zookeeper < CZookeeper
- def initialize(host)
+ include ZookeeperCallbacks
+ include ZookeeperConstants
+ include ZookeeperExceptions
+ include ZookeeperACLs
+ include ZookeeperStat
+
+ ZKRB_GLOBAL_CB_REQ = -1
+
+ # debug levels
+ ZOO_LOG_LEVEL_ERROR = 1
+ ZOO_LOG_LEVEL_WARN = 2
+ ZOO_LOG_LEVEL_INFO = 3
+ ZOO_LOG_LEVEL_DEBUG = 4
+
+ def initialize(host, timeout = 10)
+ @watcher_reqs = { ZKRB_GLOBAL_CB_REQ => { :watcher => get_default_global_watcher } }
+ @completion_reqs = {}
+ @req_mutex = Mutex.new
+ @current_req_id = 1
super(host)
- @watchers = {} # path => [ block, block, ... ]
+
+ if timeout > 0
+ time_to_stop = Time.now + timeout
+ until state == Zookeeper::ZOO_CONNECTED_STATE
+ break if Time.now > time_to_stop
+ sleep 0.1
+ end
+
+ return nil if state != Zookeeper::ZOO_CONNECTED_STATE
+ end
+
+ setup_dispatch_thread!
end
+
+public
+ def get(options = {})
+ assert_supported_keys(options, [:path, :watcher, :watcher_context, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, value, stat = super(req_id, options[:path], options[:callback], options[:watcher])
- def exists(path, &blk)
- (@watchers[path] ||= []) << blk if blk
- ZkStat.new(super(path, !!blk))
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:data => value, :stat => Stat.new(stat))
end
+
+ def set(options = {})
+ assert_supported_keys(options, [:path, :data, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+ options[:version] ||= -1
- def stat(path, &blk)
- exists(path, &blk)
- rescue Zookeeper::NoNodeError
- nil
+ req_id = setup_call(options)
+ rc, stat = super(req_id, options[:path], options[:data], options[:callback], options[:version])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
end
+
+ def get_children(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, children, stat = super(req_id, options[:path], options[:callback], options[:watcher])
- def get(path)
- value, stat = super
- [value, ZkStat.new(stat)]
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:children => children, :stat => Stat.new(stat))
end
- def try_acquire(path, value)
- # create the parent node if it doesn't exist already
- create(path, "lock node", 0) unless stat(path)
+ def stat(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context, :watcher, :watcher_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, stat = exists(req_id, options[:path], options[:callback], options[:watcher])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:stat => Stat.new(stat))
+ end
+
+ def create(options = {})
+ assert_supported_keys(options, [:path, :data, :acl, :ephemeral, :sequence, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ flags = 0
+ flags |= ZOO_EPHEMERAL if options[:ephemeral]
+ flags |= ZOO_SEQUENCE if options[:sequence]
- # attempt to obtain the lock
- realpath = create("#{path}/lock-", value, Zookeeper::ZOO_EPHEMERAL | Zookeeper::ZOO_SEQUENCE)
- #puts "created lock node #{realpath}"
+ options[:acl] ||= ZOO_OPEN_ACL_UNSAFE
+
+ req_id = setup_call(options)
+ rc, newpath = super(req_id, options[:path], options[:data], options[:callback], options[:acl], flags)
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:path => newpath)
+ end
+
+ def delete(options = {})
+ assert_supported_keys(options, [:path, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+ options[:version] ||= -1
+
+ req_id = setup_call(options)
+ rc = super(req_id, options[:path], options[:version], options[:callback])
+
+ { :req_id => req_id, :rc => rc }
+ end
- # see if we got it
- serial = /lock-(\d+)$/.match(realpath).captures[0].to_i
- have_lock = true
- ls(path).each do |child|
- if m = /lock-(\d+)$/.match(child)
- if m.captures[0].to_i < serial
- have_lock = false
- break
- end
+ def set_acl(options = {})
+ assert_supported_keys(options, [:path, :acl, :version, :callback, :callback_context])
+ assert_required_keys(options, [:path, :acl])
+ options[:version] ||= -1
+
+ req_id = setup_call(options)
+ rc = super(req_id, options[:path], options[:acl], options[:callback], options[:version])
+
+ { :req_id => req_id, :rc => rc }
+ end
+
+ def get_acl(options = {})
+ assert_supported_keys(options, [:path, :callback, :callback_context])
+ assert_required_keys(options, [:path])
+
+ req_id = setup_call(options)
+ rc, acls, stat = super(req_id, options[:path], options[:callback])
+
+ rv = { :req_id => req_id, :rc => rc }
+ options[:callback] ? rv : rv.merge(:acl => acls, :stat => Stat.new(stat))
+ end
+
+private
+ def setup_dispatch_thread!
+ @dispatcher = Thread.new {
+ while true do
+ dispatch_next_callback
+ sleep 0.1
end
- end
+ }
+ end
- # call block
- yield(have_lock)
+ def dispatch_next_callback
+ hash = get_next_event
+ return nil unless hash
+
+ is_completion = hash.has_key?(:rc)
+
+ hash[:stat] = Stat.new(hash[:stat]) if hash.has_key?(:stat)
+ hash[:acl] = hash[:acl].map { |acl| ACL.new(acl) } if hash[:acl]
+
+ callback_context = is_completion ? get_completion(hash[:req_id]) : get_watcher(hash[:req_id])
+ callback = is_completion ? callback_context[:callback] : callback_context[:watcher]
+ hash[:context] = callback_context[:context]
+
+ # TODO: Eventually enforce derivation from Zookeeper::Callback
+ if callback.respond_to?(:call)
+ callback.call(hash)
+ else
+ puts "dispatch_next_callback found non-callback => #{callback.inspect}"
+ end
+ end
+
+ def setup_call(opts)
+ req_id = nil
+ @req_mutex.synchronize {
+ req_id = @current_req_id
+ @current_req_id += 1
+ setup_completion(req_id, opts) if opts[:callback]
+ setup_watcher(req_id, opts) if opts[:watcher]
+ }
+ req_id
+ end
+
+ def setup_watcher(req_id, call_opts)
+ @watcher_reqs[req_id] = { :watcher => call_opts[:watcher],
+ :context => call_opts[:watcher_context] }
+ end
- # release the lock
- #puts "deleting #{realpath}"
- delete(realpath, stat(realpath).version)
+ def setup_completion(req_id, call_opts)
+ @completion_reqs[req_id] = { :callback => call_opts[:callback],
+ :context => call_opts[:callback_context] }
+ end
+
+ def get_watcher(req_id)
+ @req_mutex.synchronize {
+ req_id != ZKRB_GLOBAL_CB_REQ ? @watcher_reqs.delete(req_id) : @watcher_reqs[req_id]
+ }
+ end
+
+ def get_completion(req_id)
+ @req_mutex.synchronize { @completion_reqs.delete(req_id) }
end
- def watcher(type, state, path)
- raise Exception("watchers don't work in ruby yet") # ... until I figure out how to synchronize access to the Ruby interpreter
+public
+ # TODO: Sanitize user mistakes by unregistering watchers from ops that
+ # don't return ZOK (except wexists)? Make users clean up after themselves for now.
+ def unregister_watcher(req_id)
+ @req_mutex.synchronize {
+ @watcher_reqs.delete(req_id)
+ }
+ end
- return unless type == ZOO_SESSION_EVENT
+private
+ def get_default_global_watcher
+ Proc.new { |args|
+ puts "Ruby ZK Global CB called type=#{event_by_value(args[:type])} state=#{state_by_value(args[:state])}"
+ }
+ end
- case state
- when ZOO_CONNECTED_STATE
- puts "ruby watcher; got an event for #{path}"
+ def assert_supported_keys(args, supported)
+ unless (args.keys - supported).empty?
+ raise ZookeeperException::BadArguments,
+ "Supported arguments are: #{supported.inspect}, but arguments #{args.keys.inspect} were supplied instead"
+ end
+ end
- when ZOO_AUTH_FAILED_STATE
- raise Exception, "auth failure"
- when ZOO_EXPIRED_SESSION_STATE
- raise Exception, "session expired"
+ def assert_required_keys(args, required)
+ unless (required - args.keys).empty?
+ raise ZookeeperException::BadArguments,
+ "Required arguments are: #{required.inspect}, but only the arguments #{args.keys.inspect} were supplied."
end
end
end
+
View
31 lib/zookeeper/acls.rb
@@ -0,0 +1,31 @@
+module ZookeeperACLs
+ class Id
+ attr_reader :scheme, :id
+ def initialize(hash)
+ @scheme = hash[:scheme]
+ @id = hash[:id]
+ end
+ end
+
+ class ACL
+ attr_reader :perms, :id
+ def initialize(hash)
+ @perms = hash[:perms]
+ @id = hash[:id]
+ end
+ end
+
+ ZOO_PERM_READ = 0
+ ZOO_PERM_WRITE = 1
+ ZOO_PERM_CREATE = 2
+ ZOO_PERM_DELETE = 4
+ ZOO_PERM_ADMIN = 8
+ ZOO_PERM_ALL = ZOO_PERM_READ | ZOO_PERM_WRITE | ZOO_PERM_CREATE | ZOO_PERM_DELETE | ZOO_PERM_ADMIN
+
+ ZOO_ANYONE_ID_UNSAFE = Id.new(:scheme => "world", :id => "anyone")
+ ZOO_AUTH_IDS = Id.new(:scheme => "auth", :id => "")
+
+ ZOO_OPEN_ACL_UNSAFE = [ACL.new(:perms => ZOO_PERM_ALL, :id => ZOO_ANYONE_ID_UNSAFE)]
+ ZOO_READ_ACL_UNSAFE = [ACL.new(:perms => ZOO_PERM_READ, :id => ZOO_ANYONE_ID_UNSAFE)]
+ ZOO_CREATOR_ALL_ACL = [ACL.new(:perms => ZOO_PERM_ALL, :id => ZOO_AUTH_IDS)]
+end
View
89 lib/zookeeper/callbacks.rb
@@ -0,0 +1,89 @@
+module ZookeeperCallbacks
+ class Callback
+ attr_reader :proc, :completed, :context
+
+ def initialize
+ @completed = false
+ @proc = Proc.new do |hash|
+ initialize_context(hash)
+ yield if block_given?
+ @completed = true
+ end
+ end
+
+ def call(*args)
+ puts "call passed #{args.inspect}"
+ @proc.call(*args)
+ end
+
+ def completed?
+ @completed
+ end
+
+ def initialize_context(hash)
+ @context = nil
+ end
+ end
+
+ class WatcherCallback < Callback
+ ## wexists, awexists, wget, awget, wget_children, awget_children
+ attr_reader :type, :state, :path
+
+ def initialize_context(hash)
+ @type, @state, @path, @context = hash[:type], hash[:state], hash[:path], hash[:context]
+ end
+ end
+
+ class DataCallback < Callback
+ ## aget, awget
+ attr_reader :return_code, :data, :stat
+
+ def initialize_context(hash)
+ @return_code, @data, @stat, @context = hash[:rc], hash[:data], hash[:stat], hash[:context]
+ end
+ end
+
+ class StringCallback < Callback
+ ## acreate, async
+ attr_reader :return_code, :path
+
+ def initialize_context(hash)
+ @return_code, @path, @context = hash[:rc], hash[:path], hash[:context]
+ end
+ end
+
+ class StringsCallback < Callback
+ ## aget_children, awget_children
+ attr_reader :return_code, :children, :stat
+
+ def initialize_context(hash)
+ @return_code, @children, @stat, @context = hash[:rc], hash[:children], hash[:stat], hash[:context]
+ end
+ end
+
+ class StatCallback < Callback
+ ## aset, aexists, awexists
+ attr_reader :return_code, :stat
+
+ def initialize_context(hash)
+ @return_code, @stat, @context = hash[:rc], hash[:stat], hash[:context]
+ end
+ end
+
+ class VoidCallback < Callback
+ ## adelete, aset_acl, add_auth
+ attr_reader :return_code
+
+ def initialize_context(hash)
+ @return_code, @context = hash[:rc], hash[:context]
+ end
+ end
+
+ class ACLCallback < Callback
+ ## aget_acl
+ attr_reader :return_code, :acl, :stat
+ def initialize_context(hash)
+ @return_code, @acl, @stat, @context = hash[:rc], hash[:acl], hash[:stat], hash[:context]
+ end
+ end
+end
View
54 lib/zookeeper/constants.rb
@@ -0,0 +1,54 @@
+module ZookeeperConstants
+ # file type masks
+ ZOO_EPHEMERAL = 1
+ ZOO_SEQUENCE = 2
+
+ # session state
+ ZOO_EXPIRED_SESSION_STATE = -112
+ ZOO_AUTH_FAILED_STATE = -113
+ ZOO_CONNECTING_STATE = 1
+ ZOO_ASSOCIATING_STATE = 2
+ ZOO_CONNECTED_STATE = 3
+
+ # watch types
+ ZOO_CREATED_EVENT = 1
+ ZOO_DELETED_EVENT = 2
+ ZOO_CHANGED_EVENT = 3
+ ZOO_CHILD_EVENT = 4
+ ZOO_SESSION_EVENT = -1
+ ZOO_NOTWATCHING_EVENT = -2
+
+ def print_events
+ puts "ZK events:"
+ ZookeeperConstants::constants.each do |c|
+ puts "\t #{c}" if c =~ /^ZOO..*EVENT$/
+ end
+ end
+
+ def print_states
+ puts "ZK states:"
+ ZookeeperConstants::constants.each do |c|
+ puts "\t #{c}" if c =~ /^ZOO..*STATE$/
+ end
+ end
+
+ def event_by_value(v)
+ return unless v
+ ZookeeperConstants::constants.each do |c|
+ next unless c =~ /^ZOO..*EVENT$/
+ if eval("ZookeeperConstants::#{c}") == v
+ return c
+ end
+ end
+ end
+
+ def state_by_value(v)
+ return unless v
+ ZookeeperConstants::constants.each do |c|
+ next unless c =~ /^ZOO..*STATE$/
+ if eval("ZookeeperConstants::#{c}") == v
+ return c
+ end
+ end
+ end
+end
View
91 lib/zookeeper/exceptions.rb
@@ -0,0 +1,91 @@
+module ZookeeperExceptions
+ # exceptions/errors
+ ZOK = 0
+ ZSYSTEMERROR = -1
+ ZRUNTIMEINCONSISTENCY = -2
+ ZDATAINCONSISTENCY = -3
+ ZCONNECTIONLOSS = -4
+ ZMARSHALLINGERROR = -5
+ ZUNIMPLEMENTED = -6
+ ZOPERATIONTIMEOUT = -7
+ ZBADARGUMENTS = -8
+ ZINVALIDSTATE = -9
+
+ # api errors
+ ZAPIERROR = -100
+ ZNONODE = -101
+ ZNOAUTH = -102
+ ZBADVERSION = -103
+ ZNOCHILDRENFOREPHEMERALS = -108
+ ZNODEEXISTS = -110
+ ZNOTEMPTY = -111
+ ZSESSIONEXPIRED = -112
+ ZINVALIDCALLBACK = -113
+ ZINVALIDACL = -114
+ ZAUTHFAILED = -115
+ ZCLOSING = -116
+ ZNOTHING = -117
+ ZSESSIONMOVED = -118
+
+ class ZookeeperException < Exception
+ class EverythingOk < ZookeeperException; end
+ class SystemError < ZookeeperException; end
+ class RunTimeInconsistency < ZookeeperException; end
+ class DataInconsistency < ZookeeperException; end
+ class ConnectionLoss < ZookeeperException; end
+ class MarshallingError < ZookeeperException; end
+ class Unimplemented < ZookeeperException; end
+ class OperationTimeOut < ZookeeperException; end
+ class BadArguments < ZookeeperException; end
+ class InvalidState < ZookeeperException; end
+ class ApiError < ZookeeperException; end
+ class NoNode < ZookeeperException; end
+ class NoAuth < ZookeeperException; end
+ class BadVersion < ZookeeperException; end
+ class NoChildrenForEphemerals < ZookeeperException; end
+ class NodeExists < ZookeeperException; end
+ class NotEmpty < ZookeeperException; end
+ class SessionExpired < ZookeeperException; end
+ class InvalidCallback < ZookeeperException; end
+ class InvalidACL < ZookeeperException; end
+ class AuthFailed < ZookeeperException; end
+ class Closing < ZookeeperException; end
+ class Nothing < ZookeeperException; end
+ class SessionMoved < ZookeeperException; end
+
+ def self.by_code(code)
+ case code
+ when ZOK then EverythingOk
+ when ZSYSTEMERROR then SystemError
+ when ZRUNTIMEINCONSISTENCY then RunTimeInconsistency
+ when ZDATAINCONSISTENCY then DataInconsistency
+ when ZCONNECTIONLOSS then ConnectionLoss
+ when ZMARSHALLINGERROR then MarshallingError
+ when ZUNIMPLEMENTED then Unimplemented
+ when ZOPERATIONTIMEOUT then OperationTimeOut
+ when ZBADARGUMENTS then BadArguments
+ when ZINVALIDSTATE then InvalidState
+ when ZAPIERROR then ApiError
+ when ZNONODE then NoNode
+ when ZNOAUTH then NoAuth
+ when ZBADVERSION then BadVersion
+ when ZNOCHILDRENFOREPHEMERALS then NoChildrenForEphemerals
+ when ZNODEEXISTS then NodeExists
+ when ZNOTEMPTY then NotEmpty
+ when ZSESSIONEXPIRED then SessionExpired
+ when ZINVALIDCALLBACK then InvalidCallback
+ when ZINVALIDACL then InvalidACL
+ when ZAUTHFAILED then AuthFailed
+ when ZCLOSING then Closing
+ when ZNOTHING then Nothing
+ when ZSESSIONMOVED then SessionMoved
+ else Exception.new("no exception defined for code #{code}")
+ end
+ end
+
+ def self.raise_on_error(code)
+ exc = self.by_code(code)
+ raise exc unless exc == EverythingOk
+ end
+ end
+end
View
12 lib/zookeeper/stat.rb
@@ -0,0 +1,12 @@
+module ZookeeperStat
+ class Stat
+ attr_reader :version, :exists
+ def initialize(val)
+ @exists = !!val
+ @czxid, @mzxid, @ctime, @mtime, @version, @cversion, @aversion,
+ @ephemeralOwner, @dataLength, @numChildren, @pzxid = val if val.is_a?(Array)
+ val.each { |k,v| instance_variable_set "@#{k}", v } if val.is_a?(Hash)
+ raise ArgumentError unless (val.is_a?(Hash) or val.is_a?(Array) or val == nil)
+ end
+ end
+end
View
55 test/test_basic.rb
@@ -1,63 +1,40 @@
+require 'rubygems'
+
HERE = File.expand_path(File.dirname(__FILE__))
require "#{HERE}/../lib/zookeeper"
z = Zookeeper.new("localhost:2181")
-puts "root: #{z.get_children("/").inspect}"
+puts "root: #{z.get_children(:path => "/").inspect}"
path = "/testing_node"
puts "working with path #{path}"
-stat = z.stat(path)
+h = z.stat(:path => path)
+stat = h[:stat]
puts "exists? #{stat.inspect}"
-unless stat.nil?
- z.get_children(path).each do |o|
+if stat.exists
+ z.get_children(:path => path)[:children].each do |o|
puts " child object: #{o}"
end
- puts "delete: #{z.delete(path, stat.version).inspect}"
+ puts "delete: #{z.delete(:path => path, :version => stat.version).inspect}"
end
-puts "create: #{z.create(path, "initial value", 0).inspect}"
+puts "create: #{z.create(:path => path, :data => 'initial value').inspect}"
-value, stat = z.get(path)
+v = z.get(:path => path)
+value, stat = v[:data], v[:stat]
puts "current value #{value}, stat #{stat.inspect}"
-puts "set: #{z.set(path, "this is a test", stat.version).inspect}"
+puts "set: #{z.set(:path => path, :data => 'this is a test', :version => stat.version).inspect}"