Skip to content
Browse files

Allow asynchronous connections

This patch allows to use Bucket instance in completely asynchronous
environment like this, without blocking on connect:

  conn = Couchbase.new(:async => true)
  conn.run do
    conn.on_connect do |res|
      if res.success?
        #
        # schedule async requests
        #
      end
    end
  end

Change-Id: I8be3f6e687591019c9bb21ada097ca137933ce48
Reviewed-on: http://review.couchbase.org/23703
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information...
1 parent c4b3642 commit 6970994b5a72668229396f8b4280c52b306cd7a4 @avsej avsej committed Jan 4, 2013
View
14 RELEASE_NOTES.markdown
@@ -22,6 +22,20 @@ bugfixes. Do not forget to update this doc in every important patch.
end
end
+* [major] Allow to use Bucket instance in completely asynchronous
+ environment like this, without blocking on connect:
+
+ conn = Couchbase.new(:async => true)
+ conn.run do
+ conn.on_connect do |res|
+ if res.success?
+ #
+ # schedule async requests
+ #
+ end
+ end
+ end
+
## 1.2.1 (2012-12-28)
* [major] RCBC-101 Persistence constraints wasn't passed to mutation
View
5 ext/couchbase_ext/arithmetic.c
@@ -75,9 +75,10 @@ cb_bucket_arithmetic(int sign, int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, sign > 0 ? cb_sym_increment : cb_sym_decrement)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
159 ext/couchbase_ext/bucket.c
@@ -17,13 +17,45 @@
#include "couchbase_ext.h"
+ static VALUE
+trigger_on_connect_callback(VALUE self)
+{
+ struct cb_bucket_st *bucket = DATA_PTR(self);
+ VALUE on_connect_proc = bucket->on_connect_proc;
+ if (RTEST(on_connect_proc)) {
+ VALUE res = rb_class_new_instance(0, NULL, cb_cResult);
+ rb_ivar_set(res, cb_id_iv_error, bucket->exception);
+ bucket->exception = Qnil;
+ rb_ivar_set(res, cb_id_iv_operation, cb_sym_connect);
+ rb_ivar_set(res, cb_id_iv_value, self);
+ return rb_funcall(on_connect_proc, cb_id_call, 1, res);
+ } else {
+ bucket->trigger_connect_cb_on_set = 1;
+ return Qnil;
+ }
+}
+
static void
error_callback(lcb_t handle, lcb_error_t error, const char *errinfo)
{
struct cb_bucket_st *bucket = (struct cb_bucket_st *)lcb_get_cookie(handle);
lcb_breakout(handle);
bucket->exception = cb_check_error(error, errinfo, Qnil);
+ if (bucket->async && !bucket->connected) {
+ (void)trigger_on_connect_callback(bucket->self);
+ }
+}
+
+ static void
+configuration_callback(lcb_t handle, lcb_configuration_t config)
+{
+ struct cb_bucket_st *bucket = (struct cb_bucket_st *)lcb_get_cookie(handle);
+
+ if (config == LCB_CONFIGURATION_NEW) {
+ bucket->connected = 1;
+ (void)trigger_on_connect_callback(bucket->self);
+ }
}
void
@@ -63,6 +95,7 @@ cb_bucket_mark(void *ptr)
rb_gc_mark(bucket->password);
rb_gc_mark(bucket->exception);
rb_gc_mark(bucket->on_error_proc);
+ rb_gc_mark(bucket->on_connect_proc);
rb_gc_mark(bucket->key_prefix_val);
st_foreach(bucket->object_space, cb_bucket_mark_object_i, (st_data_t)bucket);
}
@@ -229,6 +262,7 @@ do_scan_connection_options(struct cb_bucket_st *bucket, int argc, VALUE *argv)
rb_raise(rb_eArgError, "Couchbase: unknown engine %s", RSTRING_PTR(ins));
}
}
+ bucket->async = RTEST(rb_hash_aref(opts, cb_sym_async));
} else {
opts = Qnil;
}
@@ -256,6 +290,7 @@ do_connect(struct cb_bucket_st *bucket)
lcb_destroy_io_ops(bucket->io);
bucket->handle = NULL;
bucket->io = NULL;
+ bucket->connected = 0;
}
{
@@ -308,6 +343,7 @@ do_connect(struct cb_bucket_st *bucket)
(void)lcb_set_http_data_callback(bucket->handle, cb_http_data_callback);
(void)lcb_set_observe_callback(bucket->handle, cb_observe_callback);
(void)lcb_set_unlock_callback(bucket->handle, cb_unlock_callback);
+ (void)lcb_set_configuration_callback(bucket->handle, configuration_callback);
if (bucket->timeout > 0) {
lcb_set_timeout(bucket->handle, bucket->timeout);
@@ -323,13 +359,15 @@ do_connect(struct cb_bucket_st *bucket)
rb_exc_raise(cb_check_error(err, "failed to connect libcouchbase instance to server", Qnil));
}
bucket->exception = Qnil;
- lcb_wait(bucket->handle);
- if (bucket->exception != Qnil) {
- lcb_destroy(bucket->handle);
- lcb_destroy_io_ops(bucket->io);
- bucket->handle = NULL;
- bucket->io = NULL;
- rb_exc_raise(bucket->exception);
+ if (!bucket->async) {
+ lcb_wait(bucket->handle);
+ if (bucket->exception != Qnil) {
+ lcb_destroy(bucket->handle);
+ lcb_destroy_io_ops(bucket->io);
+ bucket->handle = NULL;
+ bucket->io = NULL;
+ rb_exc_raise(bucket->exception);
+ }
}
}
@@ -408,6 +446,11 @@ cb_bucket_alloc(VALUE klass)
* :default :: Built-in engine (multi-thread friendly)
* :libevent :: libevent IO plugin from libcouchbase (optional)
* :libev :: libev IO plugin from libcouchbase (optional)
+ * @option options [true, false] :async (false) If true, the
+ * connection instance will be considered always asynchronous and
+ * IO interaction will be occured only when {Couchbase::Bucket#run}
+ * called. See {Couchbase::Bucket#on_connect} to hook your code
+ * after the instance will be connected.
*
* @example Initialize connection using default options
* Couchbase.new
@@ -454,12 +497,15 @@ cb_bucket_init(int argc, VALUE *argv, VALUE self)
bucket->default_format = cb_sym_document;
bucket->default_observe_timeout = 2500000;
bucket->on_error_proc = Qnil;
+ bucket->on_connect_proc = Qnil;
bucket->timeout = 0;
bucket->environment = cb_sym_production;
bucket->key_prefix_val = Qnil;
bucket->node_list = Qnil;
bucket->object_space = st_init_numtable();
bucket->destroying = 0;
+ bucket->connected = 0;
+ bucket->on_connect_proc = Qnil;
do_scan_connection_options(bucket, argc, argv);
do_connect(bucket);
@@ -513,9 +559,13 @@ cb_bucket_init_copy(VALUE copy, VALUE orig)
if (orig_b->on_error_proc != Qnil) {
copy_b->on_error_proc = rb_funcall(orig_b->on_error_proc, cb_id_dup, 0);
}
+ if (orig_b->on_connect_proc != Qnil) {
+ copy_b->on_connect_proc = rb_funcall(orig_b->on_connect_proc, cb_id_dup, 0);
+ }
copy_b->key_prefix_val = orig_b->key_prefix_val;
copy_b->object_space = st_init_numtable();
copy_b->destroying = 0;
+ copy_b->connected = 0;
do_connect(copy_b);
@@ -566,7 +616,7 @@ cb_bucket_reconnect(int argc, VALUE *argv, VALUE self)
cb_bucket_connected_p(VALUE self)
{
struct cb_bucket_st *bucket = DATA_PTR(self);
- return bucket->handle ? Qtrue : Qfalse;
+ return (bucket->handle && bucket->connected) ? Qtrue : Qfalse;
}
/* Document-method: async?
@@ -691,6 +741,51 @@ cb_bucket_on_error_get(VALUE self)
}
}
+ static
+VALUE trigger_on_connect_callback_block(VALUE nil, VALUE self)
+{
+ (void)nil;
+ return trigger_on_connect_callback(self);
+}
+
+ VALUE
+cb_bucket_on_connect_set(VALUE self, VALUE val)
+{
+ struct cb_bucket_st *bucket = DATA_PTR(self);
+
+ if (rb_respond_to(val, cb_id_call)) {
+ bucket->on_connect_proc = val;
+ if (bucket->trigger_connect_cb_on_set) {
+ bucket->trigger_connect_cb_on_set = 0;
+ if (bucket->async) {
+ VALUE args[] = {INT2FIX(0)};
+ /* setup timer with zero interval to call on_connect
+ * callback on the next tick */
+ rb_block_call(bucket->self, cb_id_create_timer, 1,
+ args, trigger_on_connect_callback_block, bucket->self);
+ } else {
+ trigger_on_connect_callback(self);
+ }
+ }
+ } else {
+ bucket->on_connect_proc = Qnil;
+ }
+
+ return bucket->on_connect_proc;
+}
+
+ VALUE
+cb_bucket_on_connect_get(VALUE self)
+{
+ struct cb_bucket_st *bucket = DATA_PTR(self);
+
+ if (rb_block_given_p()) {
+ return cb_bucket_on_connect_set(self, rb_block_proc());
+ } else {
+ return bucket->on_connect_proc;
+ }
+}
+
VALUE
cb_bucket_timeout_get(VALUE self)
{
@@ -984,7 +1079,7 @@ cb_bucket_inspect(VALUE self)
rb_id2name(SYM2ID(bucket->default_format)),
bucket->default_flags,
bucket->quiet ? "true" : "false",
- bucket->handle ? "true" : "false",
+ (bucket->handle && bucket->connected) ? "true" : "false",
bucket->timeout);
rb_str_buf_cat2(str, buf);
if (RTEST(bucket->key_prefix_val)) {
@@ -1015,12 +1110,14 @@ cb_maybe_do_loop(struct cb_bucket_st *bucket)
do_run(VALUE *args)
{
VALUE self = args[0], opts = args[1], proc = args[2], exc;
+ VALUE was_async = args[3];
struct cb_bucket_st *bucket = DATA_PTR(self);
if (bucket->handle == NULL) {
rb_raise(cb_eConnectError, "closed connection");
}
- if (bucket->async) {
+
+ if (bucket->running) {
rb_raise(cb_eInvalidError, "nested #run");
}
bucket->threshold = 0;
@@ -1033,12 +1130,29 @@ do_run(VALUE *args)
}
}
bucket->async = 1;
- cb_proc_call(bucket, proc, 1, self);
+ bucket->running = 1;
+ if (proc != Qnil) {
+ cb_proc_call(bucket, proc, 1, self);
+ }
+ if (bucket->exception != Qnil) {
+ exc = bucket->exception;
+ bucket->exception = Qnil;
+ if (was_async) {
+ cb_async_error_notify(bucket, exc);
+ /* XXX return here? */
+ } else {
+ rb_exc_raise(exc);
+ }
+ }
do_loop(bucket);
if (bucket->exception != Qnil) {
exc = bucket->exception;
bucket->exception = Qnil;
- rb_exc_raise(exc);
+ if (!was_async) {
+ rb_exc_raise(exc);
+ }
+ /* async connections notified immediately from the callbacks
+ * via cb_async_error_notify() */
}
return Qnil;
}
@@ -1049,7 +1163,9 @@ ensure_run(VALUE *args)
VALUE self = args[0];
struct cb_bucket_st *bucket = DATA_PTR(self);
- bucket->async = 0;
+ bucket->running = 0;
+ bucket->async = args[3];
+ bucket->running = args[4];
return Qnil;
}
@@ -1091,18 +1207,30 @@ ensure_run(VALUE *args)
* end
* # all commands were executed and sent is 3 now
*
+ * @example Use {Couchbase::Bucket#run} without block for async connection
+ * c = Couchbase.new(:async => true)
+ * c.run # ensure that instance connected
+ * c.set("foo", "bar"){|r| puts r.cas}
+ * c.run
+ *
* @return [nil]
*
* @raise [Couchbase::Error::Connect] if connection closed (see {Bucket#reconnect})
*/
VALUE
cb_bucket_run(int argc, VALUE *argv, VALUE self)
{
- VALUE args[3];
+ struct cb_bucket_st *bucket = DATA_PTR(self);
+ VALUE args[5];
- rb_need_block();
+ /* it is allowed to omit block for async connections */
+ if (!bucket->async) {
+ rb_need_block();
+ }
args[0] = self;
rb_scan_args(argc, argv, "01&", &args[1], &args[2]);
+ args[3] = bucket->async;
+ args[4] = bucket->running;
rb_ensure(do_run, (VALUE)args, ensure_run, (VALUE)args);
return Qnil;
}
@@ -1152,6 +1280,7 @@ cb_bucket_disconnect(VALUE self)
lcb_destroy_io_ops(bucket->io);
bucket->handle = NULL;
bucket->io = NULL;
+ bucket->connected = 0;
return Qtrue;
} else {
rb_raise(cb_eConnectError, "closed connection");
View
56 ext/couchbase_ext/couchbase_ext.c
@@ -35,11 +35,13 @@ VALUE cb_mURI;
ID cb_sym_add;
ID cb_sym_append;
ID cb_sym_assemble_hash;
+ID cb_sym_async;
ID cb_sym_body;
ID cb_sym_bucket;
ID cb_sym_cas;
ID cb_sym_chunked;
ID cb_sym_cluster;
+ID cb_sym_connect;
ID cb_sym_content_type;
ID cb_sym_create;
ID cb_sym_decrement;
@@ -101,6 +103,7 @@ ID cb_sym_version;
ID cb_sym_view;
ID cb_id_arity;
ID cb_id_call;
+ID cb_id_create_timer;
ID cb_id_delete;
ID cb_id_dump;
ID cb_id_dup;
@@ -552,6 +555,7 @@ Init_couchbase_ext(void)
* @return [String]
*/
rb_define_attr(cb_cResult, "value", 1, 0);
+ rb_define_alias(cb_cResult, "bucket", "value");
cb_id_iv_value = rb_intern("@value");
/* Document-method: cas
*
@@ -833,27 +837,62 @@ Init_couchbase_ext(void)
*
* This callback is using to deliver exceptions in asynchronous mode.
*
- * @yieldparam [Symbol] op The operation caused the error
- * @yieldparam [String] key The key which cause the error or +nil+
* @yieldparam [Exception] exc The exception instance
*
* @example Using lambda syntax
- * connection = Couchbase.new(:async => true)
- * connection.on_error = lambda {|op, key, exc| ... }
+ * connection = Couchbase.connect
+ * connection.on_error = lambda {|exc| ... }
* connection.run do |conn|
* conn.set("foo", "bar")
* end
*
* @example Using block syntax
- * connection = Couchbase.new(:async => true)
- * connection.on_error {|op, key, exc| ... }
- * ...
+ * connection = Couchbase.connect
+ * connection.on_error {|exc| ... }
+ * connection.run do |conn|
+ * conn.set("foo", "bar")
+ * end
*
* @return [Proc] the effective callback */
/* rb_define_attr(cb_cBucket, "on_error", 1, 1); */
rb_define_method(cb_cBucket, "on_error", cb_bucket_on_error_get, 0);
rb_define_method(cb_cBucket, "on_error=", cb_bucket_on_error_set, 1);
+ /* Document-method: on_connect
+ * Connection callback for asynchronous mode.
+ *
+ * @since 1.3.0
+ *
+ * This callback used to notify that bucket instance is connected
+ * and ready to handle requests in asynchronous mode.
+ *
+ * @yieldparam [Result] result The result instance, with valid
+ * properties +#error+, +#success?+, +#operation+ and +#bucket+
+ *
+ * @example Using lambda syntax
+ * connection = Couchbase.new(:async => true)
+ * connection.on_connect = lambda do |ret|
+ * if ret.success?
+ * conn.set("foo", "bar")
+ * end
+ * end
+ * connection.run
+ *
+ * @example Using block syntax
+ * connection = Couchbase.new(:async => true)
+ * connection.run do |conn|
+ * connection.on_connect do |ret|
+ * if ret.success?
+ * conn.set("foo", "bar")
+ * end
+ * end
+ * end
+ *
+ * @return [Proc] the effective callback */
+ /* rb_define_attr(cb_cBucket, "on_connect", 1, 1); */
+ rb_define_method(cb_cBucket, "on_connect", cb_bucket_on_connect_get, 0);
+ rb_define_method(cb_cBucket, "on_connect=", cb_bucket_on_connect_set, 1);
+
/* Document-method: url
*
* The config url for this connection.
@@ -1013,6 +1052,7 @@ Init_couchbase_ext(void)
/* Define cb_symbols */
cb_id_arity = rb_intern("arity");
cb_id_call = rb_intern("call");
+ cb_id_create_timer = rb_intern("create_timer");
cb_id_delete = rb_intern("delete");
cb_id_dump = rb_intern("dump");
cb_id_dup = rb_intern("dup");
@@ -1036,11 +1076,13 @@ Init_couchbase_ext(void)
cb_sym_add = ID2SYM(rb_intern("add"));
cb_sym_append = ID2SYM(rb_intern("append"));
cb_sym_assemble_hash = ID2SYM(rb_intern("assemble_hash"));
+ cb_sym_async = ID2SYM(rb_intern("async"));
cb_sym_body = ID2SYM(rb_intern("body"));
cb_sym_bucket = ID2SYM(rb_intern("bucket"));
cb_sym_cas = ID2SYM(rb_intern("cas"));
cb_sym_chunked = ID2SYM(rb_intern("chunked"));
cb_sym_cluster = ID2SYM(rb_intern("cluster"));
+ cb_sym_connect = ID2SYM(rb_intern("connect"));
cb_sym_content_type = ID2SYM(rb_intern("content_type"));
cb_sym_create = ID2SYM(rb_intern("create"));
cb_sym_decrement = ID2SYM(rb_intern("decrement"));
View
10 ext/couchbase_ext/couchbase_ext.h
@@ -91,6 +91,9 @@ struct cb_bucket_st
VALUE engine;
int async;
int quiet;
+ uint8_t connected; /* non-zero if instance has been connected. it is possible to defer connection with :async option */
+ uint8_t running; /* non-zero if event loop is running */
+ uint8_t trigger_connect_cb_on_set; /* if non-zero, the on_connect callback will be triggered immediately after set */
VALUE default_format; /* should update +default_flags+ on change */
uint32_t default_flags;
time_t default_ttl;
@@ -102,6 +105,7 @@ struct cb_bucket_st
size_t nbytes; /* the number of bytes scheduled to be sent */
VALUE exception; /* error delivered by error_callback */
VALUE on_error_proc; /* is using to deliver errors in async mode */
+ VALUE on_connect_proc; /* used to notify that instance ready to handle requests in async mode */
VALUE environment; /* sym_development or sym_production */
VALUE key_prefix_val;
VALUE node_list;
@@ -169,11 +173,13 @@ extern VALUE cb_mURI;
extern ID cb_sym_add;
extern ID cb_sym_append;
extern ID cb_sym_assemble_hash;
+extern ID cb_sym_async;
extern ID cb_sym_body;
extern ID cb_sym_bucket;
extern ID cb_sym_cas;
extern ID cb_sym_chunked;
extern ID cb_sym_cluster;
+extern ID cb_sym_connect;
extern ID cb_sym_content_type;
extern ID cb_sym_create;
extern ID cb_sym_decrement;
@@ -235,6 +241,7 @@ extern ID cb_sym_version;
extern ID cb_sym_view;
extern ID cb_id_arity;
extern ID cb_id_call;
+extern ID cb_id_create_timer;
extern ID cb_id_delete;
extern ID cb_id_dump;
extern ID cb_id_dup;
@@ -313,6 +320,7 @@ typedef void (*mark_f)(void *, struct cb_bucket_st*);
void cb_strip_key_prefix(struct cb_bucket_st *bucket, VALUE key);
VALUE cb_check_error(lcb_error_t rc, const char *msg, VALUE key);
VALUE cb_check_error_with_status(lcb_error_t rc, const char *msg, VALUE key, lcb_http_status_t status);
+int cb_bucket_connected_bang(struct cb_bucket_st *bucket, VALUE operation);
void cb_gc_protect_ptr(struct cb_bucket_st *bucket, void *ptr, mark_f mark_func);
void cb_gc_unprotect_ptr(struct cb_bucket_st *bucket, void *ptr);
VALUE cb_proc_call(struct cb_bucket_st *bucket, VALUE recv, int argc, ...);
@@ -378,6 +386,8 @@ VALUE cb_bucket_default_format_get(VALUE self);
VALUE cb_bucket_default_format_set(VALUE self, VALUE val);
VALUE cb_bucket_on_error_set(VALUE self, VALUE val);
VALUE cb_bucket_on_error_get(VALUE self);
+VALUE cb_bucket_on_connect_set(VALUE self, VALUE val);
+VALUE cb_bucket_on_connect_get(VALUE self);
VALUE cb_bucket_timeout_get(VALUE self);
VALUE cb_bucket_timeout_set(VALUE self, VALUE val);
VALUE cb_bucket_key_prefix_get(VALUE self);
View
5 ext/couchbase_ext/delete.c
@@ -107,9 +107,10 @@ cb_bucket_delete(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_delete)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
5 ext/couchbase_ext/get.c
@@ -223,9 +223,10 @@ cb_bucket_get(int argc, VALUE *argv, VALUE self)
lcb_error_t err = LCB_SUCCESS;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_get)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
4 ext/couchbase_ext/http.c
@@ -292,8 +292,8 @@ cb_http_request_perform(VALUE self)
lcb_error_t err;
struct cb_bucket_st *bucket = req->bucket;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_http_request)) {
+ return Qnil;
}
ctx = cb_context_alloc(bucket);
View
5 ext/couchbase_ext/observe.c
@@ -119,9 +119,10 @@ cb_bucket_observe(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_observe)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
5 ext/couchbase_ext/stats.c
@@ -113,9 +113,10 @@ cb_bucket_stats(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_stats)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
43 ext/couchbase_ext/store.c
@@ -40,6 +40,25 @@ storage_observe_callback(VALUE args, VALUE cookie)
return Qnil;
}
+ VALUE
+storage_opcode_to_sym(lcb_storage_t operation)
+{
+ switch(operation) {
+ case LCB_ADD:
+ return cb_sym_add;
+ case LCB_REPLACE:
+ return cb_sym_replace;
+ case LCB_SET:
+ return cb_sym_set;
+ case LCB_APPEND:
+ return cb_sym_append;
+ case LCB_PREPEND:
+ return cb_sym_prepend;
+ default:
+ return Qnil;
+ }
+}
+
void
cb_storage_callback(lcb_t handle, const void *cookie, lcb_storage_t operation,
lcb_error_t error, const lcb_store_resp_t *resp)
@@ -52,25 +71,7 @@ cb_storage_callback(lcb_t handle, const void *cookie, lcb_storage_t operation,
cb_strip_key_prefix(bucket, key);
cas = resp->v.v0.cas > 0 ? ULL2NUM(resp->v.v0.cas) : Qnil;
- switch(operation) {
- case LCB_ADD:
- ctx->operation = cb_sym_add;
- break;
- case LCB_REPLACE:
- ctx->operation = cb_sym_replace;
- break;
- case LCB_SET:
- ctx->operation = cb_sym_set;
- break;
- case LCB_APPEND:
- ctx->operation = cb_sym_append;
- break;
- case LCB_PREPEND:
- ctx->operation = cb_sym_prepend;
- break;
- default:
- ctx->operation = Qnil;
- }
+ ctx->operation = storage_opcode_to_sym(operation);
exc = cb_check_error(error, "failed to store value", key);
if (exc != Qnil) {
rb_ivar_set(exc, cb_id_iv_cas, cas);
@@ -120,8 +121,8 @@ cb_bucket_store(lcb_storage_t cmd, int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, storage_opcode_to_sym(cmd))) {
+ return Qnil;
}
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
View
5 ext/couchbase_ext/touch.c
@@ -130,9 +130,10 @@ cb_bucket_touch(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_touch)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
5 ext/couchbase_ext/unlock.c
@@ -120,9 +120,10 @@ cb_bucket_unlock(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_unlock)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
17 ext/couchbase_ext/utils.c
@@ -65,6 +65,23 @@ cb_async_error_notify(struct cb_bucket_st *bucket, VALUE exc)
}
}
+ int
+cb_bucket_connected_bang(struct cb_bucket_st *bucket, VALUE operation)
+{
+ if (bucket->handle == NULL || !bucket->connected) {
+ VALUE exc = rb_exc_new2(cb_eConnectError, "not connected to the server");
+ rb_ivar_set(exc, cb_id_iv_operation, operation);
+ rb_ivar_set(exc, cb_id_iv_value, bucket->self);
+ if (bucket->async) {
+ cb_async_error_notify(bucket, exc);
+ } else {
+ rb_exc_raise(exc);
+ }
+ return 0;
+ }
+ return 1;
+}
+
static VALUE
func_call_failed(VALUE ptr, VALUE exc)
{
View
5 ext/couchbase_ext/version.c
@@ -94,9 +94,10 @@ cb_bucket_version(int argc, VALUE *argv, VALUE self)
lcb_error_t err;
struct cb_params_st params;
- if (bucket->handle == NULL) {
- rb_raise(cb_eConnectError, "closed connection");
+ if (!cb_bucket_connected_bang(bucket, cb_sym_version)) {
+ return Qnil;
}
+
memset(&params, 0, sizeof(struct cb_params_st));
rb_scan_args(argc, argv, "0*&", &params.args, &proc);
if (!bucket->async && proc != Qnil) {
View
63 test/test_async.rb
@@ -250,4 +250,67 @@ def test_send_threshold
end
end
+ def test_asynchronous_connection
+ connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port, :async => true)
+ refute connection.connected?, "new asynchronous connection must be disconnected"
+ connection.on_connect do |res|
+ assert res.success?, "on_connect called with error #{res.error.inspect}"
+ assert_same connection, res.bucket
+ end
+ connection.run {}
+ assert connection.connected?, "it should be connected after first run"
+ end
+
+ def test_it_calls_callback_immediately_if_connected_sync
+ connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port)
+ assert connection.connected?, "connection wasn't established in sync mode"
+ called = false
+ connection.on_connect do |res|
+ assert res.success?, "on_connect called with error #{res.error.inspect}"
+ called = true
+ end
+ assert called, "the callback hasn't been called on set"
+ called = false
+ connection.on_connect do |res|
+ assert res.success?, "on_connect called with error #{res.error.inspect}"
+ called = true
+ end
+ refute called, "the callback must not be called on subsequent sets"
+ end
+
+ def test_it_calls_callback_immediately_if_connected_async
+ connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port, :async => true)
+ refute connection.connected?, "new asynchronous connection must be disconnected"
+ called = false
+ connection.run {}
+ assert connection.connected?, "the connection must be established"
+ connection.run do
+ connection.on_connect do |res|
+ assert res.success?, "on_connect called with error #{res.error.inspect}"
+ called = true
+ end
+ end
+ assert called, "the callback hasn't been called on set"
+ called = false
+ connection.run do
+ connection.on_connect do |res|
+ assert res.success?, "on_connect called with error #{res.error.inspect}"
+ called = true
+ end
+ end
+ refute called, "the callback must not be called on subsequent sets"
+ end
+
+ def test_it_returns_error_if_user_start_work_on_disconnected_instance_outside_on_connect_callback
+ connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port, :async => true)
+ refute connection.connected?, "new asynchronous connection must be disconnected"
+ error = nil
+ connection.on_error do |ex|
+ error = ex
+ end
+ connection.run do |c|
+ c.set("foo", "bar")
+ end
+ assert_instance_of(Couchbase::Error::Connect, error)
+ end
end

0 comments on commit 6970994

Please sign in to comment.
Something went wrong with that request. Please try again.