Permalink
Browse files

RCBC-27 EventMachine plugin

Introduce plugin which allows to use EventMachine as event loop for
couchbase client.

With Ruby 1.9 and Fibers it is possible to use blocking interface,
otherwise connection should be asynchronouse and connection.run should not
be used.

Change-Id: I6a5bd7c86f9ce5f3b7090b0c58bc411d6b6a52f7
Reviewed-on: http://review.couchbase.org/23347
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information...
1 parent b6ebc57 commit 89f76f3cbf39476e49bcd478019b6bb2121524a7 @funny-falcon funny-falcon committed with avsej Jan 17, 2013
View
55 README.markdown
@@ -6,9 +6,6 @@ are related libraries available:
* [couchbase-model][6] the ActiveModel implementation, git repository:
[https://github.com/couchbase/couchbase-ruby-model][7]
-* [em-couchbase][8] EventMachine friendly implementation of couchbase
- client, git repository: [https://github.com/couchbase/couchbase-ruby-client-em][9]
-
## SUPPORT
If you found an issue, please file it in our [JIRA][1]. Also you are
@@ -520,13 +517,61 @@ Note that errors object in view results usually goes *after* the rows,
so you will likely receive a number of view results successfully before
the error is detected.
+## Engines
+
+As far as couchbase gem uses [libcouchbase][8] as the backend, you can
+choose from several asynchronous IO options:
+
+* `:default` this one is used by default and implemented as the part
+ of the ruby extensions (this mean you don't need any dependencies
+ apart from libcouchbase2-core and libcouchbase-dev to build and use
+ it). This engine honours ruby GVL, so when it comes to waiting for
+ IO operations from kernel it release the GVL allowing interpreter to
+ run your code. This technique isn't available on windows, but down't
+ worry `:default` engine still accessible and will pick up statically
+ linked on that platform `:libevent` engine.
+
+* `:libev` and `:libevent`, these two engines require installed
+ libcouchbase2-libev and libcouchbase2-libevent packages
+ correspondingly. Currently they aren't so friendly to GVL but still
+ useful.
+
+* `:eventmachine` engine. From version 1.2.2 it is possible to use
+ great [EventMachine][9] library as underlying IO backend and
+ integrate couchbase gem to your current asynchronous application.
+ This engine will be only accessible on the MRI ruby 1.9+. Checkout
+ simple example of usage:
+
+ require 'eventmachine'
+ require 'couchbase'
+
+ EM.epoll = true if EM.epoll?
+ EM.kqueue = true if EM.kqueue?
+ EM.run do
+ con = Couchbase.connect :engine => :eventmachine, :async => true
+ con.on_connect do |res|
+ puts "connected: #{res.inspect}"
+ if res.success?
+ con.set("emfoo", "bar") do |res|
+ puts "set: #{res.inspect}"
+ con.get("emfoo") do |res|
+ puts "get: #{res.inspect}"
+ EM.stop
+ end
+ end
+ else
+ EM.stop
+ end
+ end
+ end
+
[1]: http://couchbase.com/issues/browse/RCBC
[2]: http://freenode.net/irc_servers.shtml
[3]: http://www.couchbase.com/develop/c/current
[4]: https://github.com/mxcl/homebrew/pulls/avsej
[5]: http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped
[6]: https://rubygems.org/gems/couchbase-model
[7]: https://github.com/couchbase/couchbase-ruby-model
-[8]: https://rubygems.org/gems/em-couchbase
-[9]: https://github.com/couchbase/couchbase-ruby-client-em
+[8]: http://www.couchbase.com/develop/c/current
+[9]: http://rubygems.org/gems/eventmachine
View
98 RELEASE_NOTES.markdown
@@ -16,25 +16,53 @@ bugfixes. Do not forget to update this doc in every important patch.
* [minor] View#fetch_all - async method for fetching all records
- conn.run do
- doc.recent_posts.fetch_all do |posts|
- do_something_with_all_posts(posts)
- end
- end
+ conn.run do
+ doc.recent_posts.fetch_all do |posts|
+ do_something_with_all_posts(posts)
+ end
+ end
* [major] Allow to use Bucket instance in completely asynchronous
environment like this, without blocking on connect:
- conn = Couchbase.new(:async => true)
- conn.run do
- conn.on_connect do |res|
- if res.success?
- #
- # schedule async requests
- #
- end
- end
- end
+ conn = Couchbase.new(:async => true)
+ conn.run do
+ conn.on_connect do |res|
+ if res.success?
+ #
+ # schedule async requests
+ #
+ end
+ end
+ end
+
+* [major] RCBC-27 EventMachine plugin to integrate with EventMachine
+ library. Note that the plugin is experimental at this stage.
+ Example:
+
+ require 'eventmachine'
+ require 'couchbase'
+
+ EM.epoll = true if EM.epoll?
+ EM.kqueue = true if EM.kqueue?
+ EM.run do
+ con = Couchbase.connect(:engine => :eventmachine, :async => true)
+ con.on_connect do |res|
+ puts "connected: #{res.inspect}"
+ if res.success?
+ con.set("emfoo", "bar") do |res|
+ puts "set: #{res.inspect}"
+ con.get("emfoo") do |res|
+ puts "get: #{res.inspect}"
+ EM.stop
+ end
+ end
+ else
+ EM.stop
+ end
+ end
+ end
+
## 1.2.1 (2012-12-28)
@@ -120,11 +148,11 @@ bugfixes. Do not forget to update this doc in every important patch.
* RCBC-52 Implement bucket create/delete operations. Examples:
- conn = Couchbase::Cluster.new(:hostname => "localhost",
- :username => "Administrator", :password => "secret")
- conn.create_bucket("my_protected_bucket",
- :ram_quota => 500, # megabytes
- :sasl_password => "s3cr3tBuck3t")
+ conn = Couchbase::Cluster.new(:hostname => "localhost",
+ :username => "Administrator", :password => "secret")
+ conn.create_bucket("my_protected_bucket",
+ :ram_quota => 500, # megabytes
+ :sasl_password => "s3cr3tBuck3t")
* Propagate status code for HTTP responses
@@ -168,13 +196,13 @@ bugfixes. Do not forget to update this doc in every important patch.
* RCBC-28 Implement Bucket#unlock
- # Unlock the single key
- val, _, cas = c.get("foo", :lock => true, :extended => true)
- c.unlock("foo", :cas => cas)
+ # Unlock the single key
+ val, _, cas = c.get("foo", :lock => true, :extended => true)
+ c.unlock("foo", :cas => cas)
- # Unlock several keys
- c.unlock("foo" => cas1, :bar => cas2)
- #=> {"foo" => true, "bar" => true}
+ # Unlock several keys
+ c.unlock("foo" => cas1, :bar => cas2)
+ #=> {"foo" => true, "bar" => true}
* Fix CAS conversion for Bucket#delete method for 32-bit systems
@@ -191,7 +219,7 @@ bugfixes. Do not forget to update this doc in every important patch.
* RCBC-37 Allow to pass intial list of nodes which will allow to
iterate addresses until alive node will be found.
- Couchbase.connect(:node_list => ['example.com:8091', 'example.org:8091', 'example.net'])
+ Couchbase.connect(:node_list => ['example.com:8091', 'example.org:8091', 'example.net'])
* RCBC-70 Fixed UTF-8 in the keys. Original discussion
https://groups.google.com/d/topic/couchbase/bya0lSf9uGE/discussion
@@ -208,15 +236,15 @@ bugfixes. Do not forget to update this doc in every important patch.
* RCBC-6 Implement Bucket#observe command to query durable state.
Examples:
- # Query state of single key
- c.observe("foo")
- #=> [#<Couchbase::Result:0x00000001650df0 ...>, ...]
+ # Query state of single key
+ c.observe("foo")
+ #=> [#<Couchbase::Result:0x00000001650df0 ...>, ...]
- # Query state of multiple keys
- keys = ["foo", "bar"]
- stats = c.observe(keys)
- stats.size #=> 2
- stats["foo"] #=> [#<Couchbase::Result:0x00000001650df0 ...>, ...]
+ # Query state of multiple keys
+ keys = ["foo", "bar"]
+ stats = c.observe(keys)
+ stats.size #=> 2
+ stats["foo"] #=> [#<Couchbase::Result:0x00000001650df0 ...>, ...]
* RCBC-49 Storage functions with durability requirements
View
1 couchbase.gemspec
@@ -44,4 +44,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'mini_portile'
s.add_development_dependency 'yajl-ruby', '~> 1.1.0'
s.add_development_dependency 'active_support'
+ s.add_development_dependency 'eventmachine'
end
View
33 ext/couchbase_ext/bucket.c
@@ -257,6 +257,10 @@ do_scan_connection_options(struct cb_bucket_st *bucket, int argc, VALUE *argv)
bucket->engine = cb_sym_libev;
} else if (arg == cb_sym_libevent) {
bucket->engine = cb_sym_libevent;
+#ifdef BUILD_EVENTMACHINE_PLUGIN
+ } else if (arg == cb_sym_eventmachine) {
+ bucket->engine = cb_sym_eventmachine;
+#endif
} else {
VALUE ins = rb_funcall(arg, rb_intern("inspect"), 0);
rb_raise(rb_eArgError, "Couchbase: unknown engine %s", RSTRING_PTR(ins));
@@ -279,6 +283,17 @@ do_scan_connection_options(struct cb_bucket_st *bucket, int argc, VALUE *argv)
rb_str_freeze(bucket->authority);
}
+ static VALUE
+em_disconnect_block(VALUE unused, VALUE self)
+{
+ struct cb_bucket_st *bucket = DATA_PTR(self);
+ if (bucket->handle) {
+ return cb_bucket_disconnect(self);
+ }
+ (void)unused;
+ return Qnil;
+}
+
static void
do_connect(struct cb_bucket_st *bucket)
{
@@ -302,6 +317,11 @@ do_connect(struct cb_bucket_st *bucket)
ciops.v.v0.type = LCB_IO_OPS_LIBEVENT;
} else if (bucket->engine == cb_sym_libev) {
ciops.v.v0.type = LCB_IO_OPS_LIBEV;
+ } else if (bucket->engine == cb_sym_eventmachine) {
+ ciops.version = 1;
+ ciops.v.v1.sofile = NULL;
+ ciops.v.v1.symbol = "cb_create_ruby_em_io_opts";
+ ciops.v.v1.cookie = bucket;
} else {
#ifdef _WIN32
ciops.v.v0.type = LCB_IO_OPS_DEFAULT;
@@ -359,6 +379,10 @@ do_connect(struct cb_bucket_st *bucket)
rb_exc_raise(cb_check_error(err, "failed to connect libcouchbase instance to server", Qnil));
}
bucket->exception = Qnil;
+ if (bucket->engine == cb_sym_eventmachine && !bucket->async_disconnect_hook_set) {
+ bucket->async_disconnect_hook_set = 1;
+ rb_block_call(em_m, cb_id_add_shutdown_hook, 0, NULL, em_disconnect_block, bucket->self);
+ }
if (!bucket->async) {
lcb_wait(bucket->handle);
if (bucket->exception != Qnil) {
@@ -443,9 +467,10 @@ cb_bucket_alloc(VALUE klass)
* {Bucket#incr} and {Bucket#decr}).
* @option options [Symbol] :engine (:default) the IO engine to use
* Currently following engines are supported:
- * :default :: Built-in engine (multi-thread friendly)
- * :libevent :: libevent IO plugin from libcouchbase (optional)
- * :libev :: libev IO plugin from libcouchbase (optional)
+ * :default :: Built-in engine (multi-thread friendly)
+ * :libevent :: libevent IO plugin from libcouchbase (optional)
+ * :libev :: libev IO plugin from libcouchbase (optional)
+ * :eventmachine :: EventMachine plugin (builtin, but requires EM gem and ruby 1.9+)
* @option options [true, false] :async (false) If true, the
* connection instance will be considered always asynchronous and
* IO interaction will be occured only when {Couchbase::Bucket#run}
@@ -506,6 +531,7 @@ cb_bucket_init(int argc, VALUE *argv, VALUE self)
bucket->destroying = 0;
bucket->connected = 0;
bucket->on_connect_proc = Qnil;
+ bucket->async_disconnect_hook_set = 0;
do_scan_connection_options(bucket, argc, argv);
do_connect(bucket);
@@ -556,6 +582,7 @@ cb_bucket_init_copy(VALUE copy, VALUE orig)
copy_b->environment = orig_b->environment;
copy_b->timeout = orig_b->timeout;
copy_b->exception = Qnil;
+ copy_b->async_disconnect_hook_set = 0;
if (orig_b->on_error_proc != Qnil) {
copy_b->on_error_proc = rb_funcall(orig_b->on_error_proc, cb_id_dup, 0);
}
View
30 ext/couchbase_ext/couchbase_ext.c
@@ -30,6 +30,7 @@ VALUE cb_mError;
VALUE cb_mMarshal;
VALUE cb_mMultiJson;
VALUE cb_mURI;
+VALUE em_m;
/* Symbols */
ID cb_sym_add;
@@ -57,6 +58,7 @@ ID cb_sym_development;
ID cb_sym_document;
ID cb_sym_engine;
ID cb_sym_environment;
+ID cb_sym_eventmachine;
ID cb_sym_extended;
ID cb_sym_flags;
ID cb_sym_format;
@@ -101,6 +103,7 @@ ID cb_sym_unlock;
ID cb_sym_username;
ID cb_sym_version;
ID cb_sym_view;
+ID cb_id_add_shutdown_hook;
ID cb_id_arity;
ID cb_id_call;
ID cb_id_create_timer;
@@ -127,6 +130,7 @@ ID cb_id_iv_time_to_replicate;
ID cb_id_iv_value;
ID cb_id_load;
ID cb_id_match;
+ID cb_id_next_tick;
ID cb_id_observe_and_wait;
ID cb_id_parse;
ID cb_id_parse_body_bang;
@@ -194,6 +198,9 @@ Init_couchbase_ext(void)
{
VALUE interned;
+ /* just a holder for EventMachine module */
+ em_m = 0;
+
cb_mMultiJson = rb_const_get(rb_cObject, rb_intern("MultiJson"));
cb_mURI = rb_const_get(rb_cObject, rb_intern("URI"));
cb_mMarshal = rb_const_get(rb_cObject, rb_intern("Marshal"));
@@ -888,6 +895,26 @@ Init_couchbase_ext(void)
* end
* end
*
+ * @example
+ * EM.run do
+ * pool = Pool.new
+ * connection = Couchbase.new(:engine => :eventmachine, :async => true)
+ * connection.on_connect do |result|
+ * unless result.success?
+ * $stderr.puts "Could not connect to CouchBase #{result.error}"
+ * else
+ * pool.add result.bucket
+ * end
+ * end
+ * end
+ *
+ * @example
+ * EM.run do
+ * pool = Pool.new
+ * connection = Couchbase.new(:engine => :eventmachine, :async => true)
+ * connection.on_connect = pool.method(:couchbase_connect_callback)
+ * end
+ *
* @return [Proc] the effective callback */
/* rb_define_attr(cb_cBucket, "on_connect", 1, 1); */
rb_define_method(cb_cBucket, "on_connect", cb_bucket_on_connect_get, 0);
@@ -1050,6 +1077,7 @@ Init_couchbase_ext(void)
rb_define_method(cb_cTimer, "cancel", cb_timer_cancel, 0);
/* Define cb_symbols */
+ cb_id_add_shutdown_hook = rb_intern("add_shutdown_hook");
cb_id_arity = rb_intern("arity");
cb_id_call = rb_intern("call");
cb_id_create_timer = rb_intern("create_timer");
@@ -1061,6 +1089,7 @@ Init_couchbase_ext(void)
cb_id_host = rb_intern("host");
cb_id_load = rb_intern("load");
cb_id_match = rb_intern("match");
+ cb_id_next_tick = rb_intern("next_tick");
cb_id_observe_and_wait = rb_intern("observe_and_wait");
cb_id_parse = rb_intern("parse");
cb_id_parse_body_bang = rb_intern("parse_body!");
@@ -1097,6 +1126,7 @@ Init_couchbase_ext(void)
cb_sym_document = ID2SYM(rb_intern("document"));
cb_sym_engine = ID2SYM(rb_intern("engine"));
cb_sym_environment = ID2SYM(rb_intern("environment"));
+ cb_sym_eventmachine = ID2SYM(rb_intern("eventmachine"));
cb_sym_extended = ID2SYM(rb_intern("extended"));
cb_sym_flags = ID2SYM(rb_intern("flags"));
cb_sym_format = ID2SYM(rb_intern("format"));
View
39 ext/couchbase_ext/couchbase_ext.h
@@ -23,6 +23,9 @@
#endif
#include "couchbase_config.h"
+#ifdef HAVE_RB_FIBER_YIELD
+#define BUILD_EVENTMACHINE_PLUGIN
+#endif
#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif
@@ -111,6 +114,7 @@ struct cb_bucket_st
VALUE node_list;
st_table *object_space;
char destroying;
+ char async_disconnect_hook_set;
VALUE self; /* the pointer to bucket representation in ruby land */
};
@@ -168,6 +172,7 @@ extern VALUE cb_mError;
extern VALUE cb_mMarshal;
extern VALUE cb_mMultiJson;
extern VALUE cb_mURI;
+extern VALUE em_m;
/* Symbols */
extern ID cb_sym_add;
@@ -195,6 +200,7 @@ extern ID cb_sym_development;
extern ID cb_sym_document;
extern ID cb_sym_engine;
extern ID cb_sym_environment;
+extern ID cb_sym_eventmachine;
extern ID cb_sym_extended;
extern ID cb_sym_flags;
extern ID cb_sym_format;
@@ -239,6 +245,7 @@ extern ID cb_sym_unlock;
extern ID cb_sym_username;
extern ID cb_sym_version;
extern ID cb_sym_view;
+extern ID cb_id_add_shutdown_hook;
extern ID cb_id_arity;
extern ID cb_id_call;
extern ID cb_id_create_timer;
@@ -265,6 +272,7 @@ extern ID cb_id_iv_time_to_replicate;
extern ID cb_id_iv_value;
extern ID cb_id_load;
extern ID cb_id_match;
+extern ID cb_id_next_tick;
extern ID cb_id_observe_and_wait;
extern ID cb_id_parse;
extern ID cb_id_parse_body_bang;
@@ -570,7 +578,38 @@ struct cb_params_st
void cb_params_destroy(struct cb_params_st *params);
void cb_params_build(struct cb_params_st *params);
+/* common plugin functions */
+lcb_ssize_t cb_io_recv(struct lcb_io_opt_st *iops, lcb_socket_t sock, void *buffer, lcb_size_t len, int flags);
+lcb_ssize_t cb_io_recvv(struct lcb_io_opt_st *iops, lcb_socket_t sock, struct lcb_iovec_st *iov, lcb_size_t niov);
+lcb_ssize_t cb_io_send(struct lcb_io_opt_st *iops, lcb_socket_t sock, const void *msg, lcb_size_t len, int flags);
+lcb_ssize_t cb_io_sendv(struct lcb_io_opt_st *iops, lcb_socket_t sock, struct lcb_iovec_st *iov, lcb_size_t niov);
+lcb_socket_t cb_io_socket(struct lcb_io_opt_st *iops, int domain, int type, int protocol);
+void cb_io_close(struct lcb_io_opt_st *iops, lcb_socket_t sock);
+int cb_io_connect(struct lcb_io_opt_st *iops, lcb_socket_t sock, const struct sockaddr *name, unsigned int namelen);
+
+/* plugin init functions */
LIBCOUCHBASE_API
lcb_error_t cb_create_ruby_mt_io_opts(int version, lcb_io_opt_t *io, void *arg);
+
+/* shortcut functions */
+ static inline VALUE
+rb_funcall_0(VALUE self, ID method)
+{
+ return rb_funcall2(self, method, 0, NULL);
+}
+
+ static inline VALUE
+rb_funcall_1(VALUE self, ID method, VALUE arg)
+{
+ return rb_funcall2(self, method, 1, &arg);
+}
+
+ static inline VALUE
+rb_funcall_2(VALUE self, ID method, VALUE arg1, VALUE arg2)
+{
+ VALUE args[2] = {arg1, arg2};
+ return rb_funcall2(self, method, 2, args);
+}
+
#endif
View
450 ext/couchbase_ext/eventmachine_plugin.c
@@ -0,0 +1,450 @@
+/* vim: ft=c et ts=8 sts=4 sw=4 cino=
+ *
+ * Copyright 2012 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "couchbase_ext.h"
+
+#ifdef BUILD_EVENTMACHINE_PLUGIN
+
+#include <errno.h>
+
+VALUE cb_mEm;
+VALUE cb_cEmSocket;
+VALUE em_cPeriodicTimer;
+VALUE cb_cEmEvent;
+VALUE rb_mObSpace;
+ID cb_id_add_timer;
+ID cb_id_cancel_timer;
+ID cb_id_detach;
+ID cb_id_define_finalizer;
+ID cb_id_iv_event;
+ID cb_id_notify_readable_p;
+ID cb_id_notify_writable_p;
+ID cb_id_set_notify_readable;
+ID cb_id_set_notify_writable;
+ID cb_id_undefine_finalizer;
+ID cb_id_watch;
+VALUE cb_sym_clear_holder;
+VALUE cb_sym_resume;
+
+typedef struct rb_em_event rb_em_event;
+typedef struct rb_em_loop rb_em_loop;
+struct rb_em_event {
+ lcb_socket_t socket;
+ void *cb_data;
+ void (*handler)(lcb_socket_t sock, short which, void *cb_data);
+ VALUE holder;
+ lcb_uint32_t usec;
+ short canceled;
+ short current_flags;
+ VALUE self;
+ rb_em_loop *loop;
+};
+
+struct rb_em_loop {
+ VALUE fiber;
+ struct cb_bucket_st *bucket;
+};
+
+ static void
+rb_em_event_mark(void *p)
+{
+ if (p) {
+ rb_em_event *ev = p;
+ rb_gc_mark(ev->holder);
+ rb_gc_mark(ev->loop->bucket->self);
+ }
+}
+
+ static void
+rb_em_event_free(void *p)
+{
+ if (p) {
+ rb_em_event *ev = p;
+ ev->self = 0;
+ ev->holder = 0;
+ ev->loop = NULL;
+ }
+}
+
+ static void
+rb_em_event_run_callback(rb_em_event *ev, short flags)
+{
+ if (ev->loop->fiber) {
+ ev->current_flags = flags;
+ rb_fiber_resume(ev->loop->fiber, 1, &ev->self);
+ } else {
+ ev->handler(ev->socket, flags, ev->cb_data);
+ }
+}
+
+ static VALUE
+rb_em_event_call(VALUE self)
+{
+ rb_em_event *ev;
+ Data_Get_Struct(self, rb_em_event, ev);
+
+ ev->holder = 0;
+ rb_em_event_run_callback(ev, 0);
+
+ if (!ev->canceled && !ev->holder) {
+ ev->holder = rb_funcall_2(em_m, cb_id_add_timer, rb_float_new((double)ev->usec / 1.0e6), self);
+ }
+
+ return Qnil;
+}
+
+ static VALUE
+rb_em_event_clear_holder(VALUE self)
+{
+ rb_em_event *ev;
+ Data_Get_Struct(self, rb_em_event, ev);
+
+ ev->holder = 0;
+
+ return Qnil;
+}
+
+ static void
+rb_em_event_setup_finalizer(rb_em_event *ev)
+{
+ rb_funcall_2(rb_mObSpace, cb_id_define_finalizer, ev->holder,
+ rb_obj_method(ev->self, cb_sym_clear_holder));
+}
+
+ static void
+rb_em_event_clear_finalizer(rb_em_event *ev)
+{
+ rb_funcall_1(rb_mObSpace, cb_id_undefine_finalizer, ev->holder);
+}
+
+ static VALUE
+rb_em_socket_notify_readable(VALUE self)
+{
+ VALUE event = rb_ivar_get(self, cb_id_iv_event);
+ rb_em_event *ev;
+
+ if (RTEST(event)) {
+ Data_Get_Struct(event, rb_em_event, ev);
+ rb_em_event_run_callback(ev, LCB_READ_EVENT);
+ } else {
+ rb_funcall_0(self, cb_id_detach);
+ }
+
+ return Qnil;
+}
+
+ static VALUE
+rb_em_socket_notify_writable(VALUE self)
+{
+ VALUE event = rb_ivar_get(self, cb_id_iv_event);
+ rb_em_event *ev;
+
+ if (RTEST(event)) {
+ Data_Get_Struct(event, rb_em_event, ev);
+ rb_em_event_run_callback(ev, LCB_WRITE_EVENT);
+ } else {
+ rb_funcall_0(self, cb_id_detach);
+ }
+
+ return Qnil;
+}
+
+ static void
+cb_gc_em_loop_mark(void *p, struct cb_bucket_st *bucket)
+{
+ rb_em_loop *loop = p;
+ rb_gc_mark(loop->fiber);
+ (void)bucket;
+}
+
+ static rb_em_loop *
+rb_em_loop_create(struct cb_bucket_st *bucket)
+{
+ rb_em_loop *loop = calloc(1, sizeof(*loop));
+ loop->bucket = bucket;
+ cb_gc_protect_ptr(bucket, loop, cb_gc_em_loop_mark);
+ return loop;
+}
+
+ static void
+rb_em_loop_destroy(rb_em_loop *loop)
+{
+ cb_gc_unprotect_ptr(loop->bucket, loop);
+ free(loop);
+}
+
+ static void
+initialize_event_machine_plugin() {
+ VALUE em_cConnection;
+
+ rb_mObSpace = rb_const_get(rb_cObject, rb_intern("ObjectSpace"));
+
+ em_m = rb_const_get(rb_cObject, rb_intern("EM"));
+ em_cConnection = rb_const_get(em_m, rb_intern("Connection"));
+ em_cPeriodicTimer = rb_const_get(em_m, rb_intern("PeriodicTimer"));
+
+ cb_mEm = rb_define_module_under(cb_mCouchbase, "EM");
+
+ cb_cEmEvent = rb_define_class_under(cb_mEm, "Event", rb_cObject);
+ rb_define_method(cb_cEmEvent, "call", rb_em_event_call, 0);
+ rb_define_method(cb_cEmEvent, "clear_holder", rb_em_event_clear_holder, 0);
+
+ cb_cEmSocket = rb_define_class_under(cb_mEm, "Socket", em_cConnection);
+ rb_define_method(cb_cEmSocket, "notify_readable", rb_em_socket_notify_readable, 0);
+ rb_define_method(cb_cEmSocket, "notify_writable", rb_em_socket_notify_writable, 0);
+
+ cb_id_add_timer = rb_intern("add_timer");
+ cb_id_cancel_timer = rb_intern("cancel_timer");
+ cb_id_define_finalizer = rb_intern("define_finalizer");
+ cb_id_detach = rb_intern("detach");
+ cb_id_iv_event = rb_intern("@event");
+ cb_id_notify_readable_p = rb_intern("notify_readable?");
+ cb_id_notify_writable_p = rb_intern("notify_writable?");
+ cb_id_set_notify_readable = rb_intern("notify_readable=");
+ cb_id_set_notify_writable = rb_intern("notify_writable=");
+ cb_id_undefine_finalizer = rb_intern("undefine_finalizer");
+ cb_id_watch = rb_intern("watch");
+ cb_sym_clear_holder = ID2SYM(rb_intern("clear_holder"));
+ cb_sym_resume = ID2SYM(rb_intern("resume"));
+}
+
+ static void
+cb_gc_em_event_mark(void *p, struct cb_bucket_st *bucket)
+{
+ rb_em_event *ev = p;
+ rb_gc_mark(ev->self);
+ (void)bucket;
+}
+
+ static void *
+lcb_io_create_event(struct lcb_io_opt_st *iops)
+{
+ rb_em_loop *loop = iops->v.v0.cookie;
+ rb_em_event *ev = calloc(1, sizeof(rb_em_event));
+ VALUE res = Data_Wrap_Struct(cb_cEmEvent, rb_em_event_mark, rb_em_event_free, ev);
+ cb_gc_protect_ptr(loop->bucket, ev, cb_gc_em_event_mark);
+ ev->self = res;
+ ev->loop = loop;
+ ev->socket = -1;
+
+ return ev;
+}
+
+ static inline void
+rb_em_event_dealloc(rb_em_event *ev, rb_em_loop *loop)
+{
+ if (ev->self) {
+ DATA_PTR(ev->self) = 0;
+ }
+ cb_gc_unprotect_ptr(loop->bucket, ev);
+ free(ev);
+}
+
+ static int
+lcb_io_update_event(struct lcb_io_opt_st *iops,
+ lcb_socket_t sock,
+ void *event,
+ short flags,
+ void *cb_data,
+ void (*handler)(lcb_socket_t sock,
+ short which,
+ void *cb_data))
+{
+ rb_em_event *ev = event;
+
+ if (ev->holder == 0) {
+ ev->holder = rb_funcall_2(em_m, cb_id_watch, INT2FIX(sock), cb_cEmSocket);
+ rb_ivar_set(ev->holder, cb_id_iv_event, ev->self);
+ rb_em_event_setup_finalizer(ev);
+ }
+
+ ev->socket = sock;
+ ev->cb_data = cb_data;
+ ev->handler = handler;
+
+ rb_funcall_1(ev->holder, cb_id_set_notify_readable, (flags & LCB_READ_EVENT) ? Qtrue : Qfalse);
+ rb_funcall_1(ev->holder, cb_id_set_notify_writable, (flags & LCB_WRITE_EVENT) ? Qtrue : Qfalse);
+
+ (void)iops;
+ return 0;
+}
+
+ static void
+lcb_io_delete_event(struct lcb_io_opt_st *iops,
+ lcb_socket_t sock,
+ void *event)
+{
+ rb_em_event *ev = event;
+ if (ev->holder) {
+ rb_funcall_1(ev->holder, cb_id_set_notify_readable, Qfalse);
+ rb_funcall_1(ev->holder, cb_id_set_notify_writable, Qfalse);
+ }
+ (void)sock;
+ (void)iops;
+}
+
+ static void
+lcb_io_destroy_event(struct lcb_io_opt_st *iops,
+ void *event)
+{
+ rb_em_loop *loop = iops->v.v0.cookie;
+ rb_em_event *ev = event;
+ if (ev->holder) {
+ rb_em_event_clear_finalizer(ev);
+ rb_ivar_set(ev->holder, cb_id_iv_event, Qfalse);
+ rb_funcall_0(ev->holder, cb_id_detach);
+ ev->holder = 0;
+ }
+ rb_em_event_dealloc(ev, loop);
+}
+
+#define lcb_io_create_timer lcb_io_create_event
+
+ static int
+lcb_io_update_timer(struct lcb_io_opt_st *iops, void *timer,
+ lcb_uint32_t usec, void *cb_data,
+ void (*handler)(lcb_socket_t sock, short which, void *cb_data))
+{
+ rb_em_event *ev = timer;
+
+ if (ev->holder) {
+ rb_funcall_1(em_m, cb_id_cancel_timer, ev->holder);
+ ev->holder = 0;
+ }
+
+ ev->socket = (lcb_socket_t)-1;
+ ev->cb_data = cb_data;
+ ev->handler = handler;
+ ev->usec = usec;
+ ev->canceled = 0;
+ ev->holder = rb_funcall_2(em_m, cb_id_add_timer, rb_float_new((double)usec / 1.0e6), ev->self);
+
+ (void)iops;
+ return 0;
+}
+
+ static void
+lcb_io_delete_timer(struct lcb_io_opt_st *iops, void *timer)
+{
+ rb_em_event *ev = timer;
+
+ if (ev->holder) {
+ rb_funcall_1(em_m, cb_id_cancel_timer, ev->holder);
+ ev->holder = 0;
+ }
+ ev->canceled = 1;
+ (void)iops;
+}
+
+ static void
+lcb_io_destroy_timer(struct lcb_io_opt_st *iops, void *timer)
+{
+ rb_em_loop *loop = iops->v.v0.cookie;
+ rb_em_event *ev = timer;
+ if (!ev->canceled) {
+ lcb_io_delete_timer(iops, timer);
+ }
+ rb_em_event_dealloc(ev, loop);
+}
+
+ static void
+lcb_io_run_event_loop(struct lcb_io_opt_st *iops)
+{
+ rb_em_loop *loop = iops->v.v0.cookie;
+ VALUE fiber = rb_fiber_current();
+ VALUE event;
+ rb_em_event *ev;
+ loop->fiber = fiber;
+ for(;;) {
+ event = rb_fiber_yield(0, NULL);
+ if (!RTEST(event)) break;
+ Data_Get_Struct(event, rb_em_event, ev);
+ ev->handler(ev->socket, ev->current_flags, ev->cb_data);
+ }
+}
+
+ static void
+lcb_io_stop_event_loop(struct lcb_io_opt_st *iops)
+{
+ rb_em_loop *loop = iops->v.v0.cookie;
+ VALUE fiber = loop->fiber;
+ loop->fiber = 0;
+ if (fiber) {
+ VALUE method = rb_obj_method(fiber, cb_sym_resume);
+ rb_funcall_1(em_m, cb_id_next_tick, method);
+ }
+}
+
+ static void
+lcb_destroy_io_opts(struct lcb_io_opt_st *iops)
+{
+ rb_em_loop_destroy((rb_em_loop*)iops->v.v0.cookie);
+}
+
+ LIBCOUCHBASE_API lcb_error_t
+cb_create_ruby_em_io_opts(int version, lcb_io_opt_t *io, void *arg)
+{
+ struct lcb_io_opt_st *ret;
+ rb_em_loop *loop;
+ struct cb_bucket_st *bucket = arg;
+
+ if (version != 0) {
+ return LCB_PLUGIN_VERSION_MISMATCH;
+ }
+
+ if (!em_m) initialize_event_machine_plugin();
+
+ ret = calloc(1, sizeof(*ret));
+ if (ret == NULL) {
+ free(ret);
+ return LCB_CLIENT_ENOMEM;
+ }
+
+ ret->version = 0;
+ ret->dlhandle = NULL;
+ ret->destructor = lcb_destroy_io_opts;
+ /* consider that struct isn't allocated by the library,
+ * `need_cleanup' flag might be set in lcb_create() */
+ ret->v.v0.need_cleanup = 0;
+ ret->v.v0.recv = cb_io_recv;
+ ret->v.v0.send = cb_io_send;
+ ret->v.v0.recvv = cb_io_recvv;
+ ret->v.v0.sendv = cb_io_sendv;
+ ret->v.v0.socket = cb_io_socket;
+ ret->v.v0.close = cb_io_close;
+ ret->v.v0.connect = cb_io_connect;
+ ret->v.v0.delete_event = lcb_io_delete_event;
+ ret->v.v0.destroy_event = lcb_io_destroy_event;
+ ret->v.v0.create_event = lcb_io_create_event;
+ ret->v.v0.update_event = lcb_io_update_event;
+
+ ret->v.v0.delete_timer = lcb_io_delete_timer;
+ ret->v.v0.destroy_timer = lcb_io_destroy_timer;
+ ret->v.v0.create_timer = lcb_io_create_timer;
+ ret->v.v0.update_timer = lcb_io_update_timer;
+
+ ret->v.v0.run_event_loop = lcb_io_run_event_loop;
+ ret->v.v0.stop_event_loop = lcb_io_stop_event_loop;
+
+ loop = rb_em_loop_create(bucket);
+ ret->v.v0.cookie = loop;
+
+ *io = ret;
+ return LCB_SUCCESS;
+}
+
+#endif
View
1 ext/couchbase_ext/extconf.rb
@@ -140,6 +140,7 @@ def die(message)
have_func("rb_thread_blocking_region")
have_func("poll", "poll.h")
have_func("ppoll", "poll.h")
+have_func("rb_fiber_yield")
define("_GNU_SOURCE")
create_header("couchbase_config.h")
create_makefile("couchbase_ext")
View
154 ext/couchbase_ext/multithread_plugin.c
@@ -24,149 +24,9 @@
#include <rubysig.h>
#endif
#include <errno.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#ifdef HAVE_FCNTL_H
-#include <fcntl.h>
-#endif
#ifdef HAVE_POLL
#include <poll.h>
#endif
-#define INVALID_SOCKET (-1)
-
-/* Copied from libev plugin */
- static lcb_ssize_t
-lcb_io_recv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
- void *buffer, lcb_size_t len, int flags)
-{
- lcb_ssize_t ret = recv(sock, buffer, len, flags);
- if (ret < 0) {
- iops->v.v0.error = errno;
- }
- return ret;
-}
-
- static lcb_ssize_t
-lcb_io_recvv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
- struct lcb_iovec_st *iov, lcb_size_t niov)
-{
- struct msghdr msg;
- struct iovec vec[2];
- lcb_ssize_t ret;
-
- if (niov != 2) {
- return -1;
- }
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = vec;
- msg.msg_iovlen = iov[1].iov_len ? (lcb_size_t)2 : (lcb_size_t)1;
- msg.msg_iov[0].iov_base = iov[0].iov_base;
- msg.msg_iov[0].iov_len = iov[0].iov_len;
- msg.msg_iov[1].iov_base = iov[1].iov_base;
- msg.msg_iov[1].iov_len = iov[1].iov_len;
- ret = recvmsg(sock, &msg, 0);
-
- if (ret < 0) {
- iops->v.v0.error = errno;
- }
-
- return ret;
-}
-
- static lcb_ssize_t
-lcb_io_send(struct lcb_io_opt_st *iops, lcb_socket_t sock,
- const void *msg, lcb_size_t len, int flags)
-{
- lcb_ssize_t ret = send(sock, msg, len, flags);
- if (ret < 0) {
- iops->v.v0.error = errno;
- }
- return ret;
-}
-
- static lcb_ssize_t
-lcb_io_sendv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
- struct lcb_iovec_st *iov, lcb_size_t niov)
-{
- struct msghdr msg;
- struct iovec vec[2];
- lcb_ssize_t ret;
-
- if (niov != 2) {
- return -1;
- }
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = vec;
- msg.msg_iovlen = iov[1].iov_len ? (lcb_size_t)2 : (lcb_size_t)1;
- msg.msg_iov[0].iov_base = iov[0].iov_base;
- msg.msg_iov[0].iov_len = iov[0].iov_len;
- msg.msg_iov[1].iov_base = iov[1].iov_base;
- msg.msg_iov[1].iov_len = iov[1].iov_len;
- ret = sendmsg(sock, &msg, 0);
-
- if (ret < 0) {
- iops->v.v0.error = errno;
- }
- return ret;
-}
-
- static int
-make_socket_nonblocking(lcb_socket_t sock)
-{
- int flags;
- if ((flags = fcntl(sock, F_GETFL, NULL)) < 0) {
- return -1;
- }
- if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) {
- return -1;
- }
-
- return 0;
-}
-
- static int
-close_socket(lcb_socket_t sock)
-{
- return close(sock);
-}
-
- static lcb_socket_t
-lcb_io_socket(struct lcb_io_opt_st *iops, int domain, int type,
- int protocol)
-{
- lcb_socket_t sock = socket(domain, type, protocol);
- if (sock == INVALID_SOCKET) {
- iops->v.v0.error = errno;
- } else {
- if (make_socket_nonblocking(sock) != 0) {
- int error = errno;
- iops->v.v0.close(iops, sock);
- iops->v.v0.error = error;
- sock = INVALID_SOCKET;
- }
- }
-
- return sock;
-}
-
- static void
-lcb_io_close(struct lcb_io_opt_st *iops, lcb_socket_t sock)
-{
- close_socket(sock);
- (void)iops;
-}
-
- static int
-lcb_io_connect(struct lcb_io_opt_st *iops, lcb_socket_t sock,
- const struct sockaddr *name, unsigned int namelen)
-{
- int ret = connect(sock, name, (socklen_t)namelen);
- if (ret < 0) {
- iops->v.v0.error = errno;
- }
- return ret;
-}
/* events sorted array */
typedef struct rb_mt_event rb_mt_event;
@@ -1183,13 +1043,13 @@ cb_create_ruby_mt_io_opts(int version, lcb_io_opt_t *io, void *arg)
/* consider that struct isn't allocated by the library,
* `need_cleanup' flag might be set in lcb_create() */
ret->v.v0.need_cleanup = 0;
- ret->v.v0.recv = lcb_io_recv;
- ret->v.v0.send = lcb_io_send;
- ret->v.v0.recvv = lcb_io_recvv;
- ret->v.v0.sendv = lcb_io_sendv;
- ret->v.v0.socket = lcb_io_socket;
- ret->v.v0.close = lcb_io_close;
- ret->v.v0.connect = lcb_io_connect;
+ ret->v.v0.recv = cb_io_recv;
+ ret->v.v0.send = cb_io_send;
+ ret->v.v0.recvv = cb_io_recvv;
+ ret->v.v0.sendv = cb_io_sendv;
+ ret->v.v0.socket = cb_io_socket;
+ ret->v.v0.close = cb_io_close;
+ ret->v.v0.connect = cb_io_connect;
ret->v.v0.delete_event = lcb_io_delete_event;
ret->v.v0.destroy_event = lcb_io_destroy_event;
ret->v.v0.create_event = lcb_io_create_event;
View
151 ext/couchbase_ext/plugin_common.c
@@ -0,0 +1,151 @@
+#include "couchbase_ext.h"
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#ifndef RUBY_WIN32_H
+# include <unistd.h>
+#ifdef HAVE_FCNTL_H
+# include <fcntl.h>
+#endif
+#define INVALID_SOCKET (-1)
+#else /* RUBY_WIN32_h */
+static st_table *socket_2_fd = NULL;
+#endif
+
+/* Copied from libev plugin */
+ lcb_ssize_t
+cb_io_recv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
+ void *buffer, lcb_size_t len, int flags)
+{
+ lcb_ssize_t ret = recv(sock, buffer, len, flags);
+ if (ret < 0) {
+ iops->v.v0.error = errno;
+ }
+ return ret;
+}
+
+ lcb_ssize_t
+cb_io_recvv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
+ struct lcb_iovec_st *iov, lcb_size_t niov)
+{
+ struct msghdr msg;
+ struct iovec vec[2];
+ lcb_ssize_t ret;
+
+ if (niov != 2) {
+ return -1;
+ }
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = vec;
+ msg.msg_iovlen = iov[1].iov_len ? (lcb_size_t)2 : (lcb_size_t)1;
+ msg.msg_iov[0].iov_base = iov[0].iov_base;
+ msg.msg_iov[0].iov_len = iov[0].iov_len;
+ msg.msg_iov[1].iov_base = iov[1].iov_base;
+ msg.msg_iov[1].iov_len = iov[1].iov_len;
+ ret = recvmsg(sock, &msg, 0);
+
+ if (ret < 0) {
+ iops->v.v0.error = errno;
+ }
+
+ return ret;
+}
+
+ lcb_ssize_t
+cb_io_send(struct lcb_io_opt_st *iops, lcb_socket_t sock,
+ const void *msg, lcb_size_t len, int flags)
+{
+ lcb_ssize_t ret = send(sock, msg, len, flags);
+ if (ret < 0) {
+ iops->v.v0.error = errno;
+ }
+ return ret;
+}
+
+ lcb_ssize_t
+cb_io_sendv(struct lcb_io_opt_st *iops, lcb_socket_t sock,
+ struct lcb_iovec_st *iov, lcb_size_t niov)
+{
+ struct msghdr msg;
+ struct iovec vec[2];
+ lcb_ssize_t ret;
+
+ if (niov != 2) {
+ return -1;
+ }
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = vec;
+ msg.msg_iovlen = iov[1].iov_len ? (lcb_size_t)2 : (lcb_size_t)1;
+ msg.msg_iov[0].iov_base = iov[0].iov_base;
+ msg.msg_iov[0].iov_len = iov[0].iov_len;
+ msg.msg_iov[1].iov_base = iov[1].iov_base;
+ msg.msg_iov[1].iov_len = iov[1].iov_len;
+ ret = sendmsg(sock, &msg, 0);
+
+ if (ret < 0) {
+ iops->v.v0.error = errno;
+ }
+ return ret;
+}
+
+ static int
+make_socket_nonblocking(lcb_socket_t sock)
+{
+ int flags = 0;
+#ifdef F_GETFL
+ if ((flags = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return -1;
+ }
+#endif
+ if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) {
+ return -1;
+ }
+
+ return 0;
+}
+
+ static int
+close_socket(lcb_socket_t sock)
+{
+ return close(sock);
+}
+
+ lcb_socket_t
+cb_io_socket(struct lcb_io_opt_st *iops, int domain, int type,
+ int protocol)
+{
+ lcb_socket_t sock = socket(domain, type, protocol);
+ if (sock == INVALID_SOCKET) {
+ iops->v.v0.error = errno;
+ } else {
+ if (make_socket_nonblocking(sock) != 0) {
+ int error = errno;
+ iops->v.v0.close(iops, sock);
+ iops->v.v0.error = error;
+ sock = INVALID_SOCKET;
+ }
+ }
+
+ return sock;
+}
+
+ void
+cb_io_close(struct lcb_io_opt_st *iops, lcb_socket_t sock)
+{
+ close_socket(sock);
+ (void)iops;
+}
+
+ int
+cb_io_connect(struct lcb_io_opt_st *iops, lcb_socket_t sock,
+ const struct sockaddr *name, unsigned int namelen)
+{
+ int ret = connect(sock, name, (socklen_t)namelen);
+ if (ret < 0) {
+ iops->v.v0.error = errno;
+ }
+ return ret;
+}
+
View
70 test/test_eventmachine.rb
@@ -0,0 +1,70 @@
+# Author:: Couchbase <info@couchbase.com>
+# Copyright:: 2011, 2012 Couchbase, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require File.join(File.dirname(__FILE__), 'setup')
+require 'eventmachine'
+
+class TestEventmachine < MiniTest::Unit::TestCase
+
+ def setup
+ @mock = start_mock
+ end
+
+ def teardown
+ stop_mock(@mock)
+ end
+
+ if RUBY_VERSION.to_f >= 1.9
+
+ def test_trivial_set
+ EM.run do
+ conn = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
+ :engine => :eventmachine, :async => true)
+ conn.on_connect do |res|
+ assert res.success?, "connection must be successful"
+ conn.set(uniq_id, "bar") do |res|
+ assert res.success?, "the operation must be successful"
+ assert res.cas > 0, "the CAS value must be non-zero"
+ EM.stop
+ end
+ end
+ end
+ end
+
+ def test_trivial_get
+ EM.run do
+ conn = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
+ :engine => :eventmachine, :async => true)
+ conn.on_connect do |res|
+ assert res.success?, "connection must be successful"
+ conn.set(uniq_id, "bar") do |res|
+ assert res.success?, "the set operation must be successful"
+ cas = res.cas
+ conn.get(uniq_id) do |res|
+ assert res.success?, "the get operation must be successful"
+ assert_equal "bar", res.value
+ assert_equal cas, res.cas
+ EM.stop
+ end
+ end
+ end
+ end
+ end
+
+ end
+
+end

0 comments on commit 89f76f3

Please sign in to comment.