diff --git a/.gitignore b/.gitignore index 95d00207..3834a68b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ /tmp /pkg *.gem +*.log +*.orig /html/ Gemfile.lock vendor/ @@ -18,3 +20,5 @@ nohup.out # IntelliJ/RubyMine/CLion project files .idea +CMakeLists.txt +cmake-build-debug diff --git a/.travis.yml b/.travis.yml index 46be05f4..352b8fc0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,15 +13,32 @@ before_install: services: - docker +gemfile: + - Gemfile + - gemfiles/hiredis-0-6.gemfile + - gemfiles/mysql2-0-4-10.gemfile + - gemfiles/mysql2-0-5-0.gemfile + +env: + - SEMIAN_CIRCUIT_BREAKER_IMPL=worker + - SEMIAN_CIRCUIT_BREAKER_IMPL=host + matrix: - include: - - env: GEMFILE=gemfiles/hiredis-0-6.gemfile - - env: GEMFILE=gemfiles/mysql2-0-4-10.gemfile - - env: GEMFILE=gemfiles/mysql2-0-5-0.gemfile - - env: GEMFILE=Gemfile + exclude: + - gemfile: gemfiles/hiredis-0-6.gemfile + env: SEMIAN_CIRCUIT_BREAKER_IMPL=host + - gemfile: gemfiles/mysql2-0-4-10.gemfile + env: SEMIAN_CIRCUIT_BREAKER_IMPL=host + - gemfile: gemfiles/mysql2-0-5-0.gemfile + env: SEMIAN_CIRCUIT_BREAKER_IMPL=host script: - - docker build --build-arg BUNDLE_GEMFILE="$GEMFILE" -t shopify/semian-ci:latest -f dockerfiles/semian-ci . + - | + docker build \ + --build-arg BUNDLE_GEMFILE="$BUNDLE_GEMFILE" \ + --build-arg SEMIAN_CIRCUIT_BREAKER_IMPL="$SEMIAN_CIRCUIT_BREAKER_IMPL" \ + -t shopify/semian-ci:latest \ + -f dockerfiles/semian-ci . - travis_retry docker-compose -f docker-compose.ci.yml up --force-recreate --exit-code-from semian notifications: diff --git a/Gemfile b/Gemfile index 383b1c51..f4ef742b 100644 --- a/Gemfile +++ b/Gemfile @@ -3,4 +3,5 @@ gemspec group :development, :test do gem 'rubocop' + gem 'minitest-reporters' end diff --git a/README.md b/README.md index 8a9159e1..32da4895 100644 --- a/README.md +++ b/README.md @@ -448,6 +448,33 @@ There are three configuration parameters for circuit breakers in Semian: * **success_threshold**. The amount of successes on the circuit until closing it again, that is to start accepting all requests to the circuit. * **half_open_resource_timeout**. Timeout for the resource in seconds when the circuit is half-open (only supported for MySQL and Net::HTTP). +* **scale_factor**. When using [host-based circuits](#host-based-circuits), the + scaling factor to determine how to scale `error_threshold * num_workers` to + achieve faster circuit opens. + +#### Host-Based Circuits + +On systems with [SysV support][sysv], we can share circuit error information +between processes on a server. This means that those processes can effectively +share information between each other about resource health, leading to faster, +more efficient opening of circuits. + +As an example, imagine a system with _N_ processes on a host with an error +threshold of _E_, and a client timeout of _T_. By default, the system needs to +see _N * E_ errors to open the circuit. We can reduce this by using the +`scale_factor` configuration parameter. If we set `scale_factor` to _1 / N_, +the total number of errors we'd need to see server-wide is still _E_. In +this configuration, we can reduce the time-to-open for a circuit from _E * T_ +to simply _T_ (provided that _N_ is greater than _E_). + +You should run a simulation with your workloads to determine an efficient +scaling factor that will produce a time-to-open reduction but isn't too +sensitive. + +The circuit breaker implementation is based on the environment variable +`SEMIAN_CIRCUIT_BREAKER_IMPL`. Set it to `worker` to disable sharing circuit +state and `host` to enable host-based circuits. The default is `worker` for +backward compatibility. ### Bulkheading diff --git a/Rakefile b/Rakefile index b111f0e9..aca13d6b 100644 --- a/Rakefile +++ b/Rakefile @@ -41,8 +41,6 @@ require 'rake/testtask' Rake::TestTask.new 'test' do |t| t.libs = %w(lib test) t.pattern = "test/*_test.rb" - t.verbose = false - t.warning = false end # ========================================================== diff --git a/dockerfiles/semian-ci b/dockerfiles/semian-ci index 7af9afb6..4313c2ff 100644 --- a/dockerfiles/semian-ci +++ b/dockerfiles/semian-ci @@ -7,7 +7,10 @@ RUN apt-get update \ RUN gem install bundler -WORKDIR /app +WORKDIR /home/travis/build/Shopify/semian COPY . . ARG BUNDLE_GEMFILE ENV BUNDLE_GEMFILE="${BUNDLE_GEMFILE}" + +ARG SEMIAN_CIRCUIT_BREAKER_IMPL +ENV SEMIAN_CIRCUIT_BREAKER_IMPL="${SEMIAN_CIRCUIT_BREAKER_IMPL}" diff --git a/ext/semian/extconf.rb b/ext/semian/extconf.rb index ac2c511b..88ab4e3f 100644 --- a/ext/semian/extconf.rb +++ b/ext/semian/extconf.rb @@ -23,8 +23,8 @@ have_func 'rb_thread_blocking_region' have_func 'rb_thread_call_without_gvl' -$CFLAGS = "-D_GNU_SOURCE -Werror -Wall " -if ENV.key?('DEBUG') +$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=gnu99 " +if ENV.key?('DEBUG') || ENV.key?('SEMIAN_DEBUG') $CFLAGS << "-O0 -g -DDEBUG" else $CFLAGS << "-O3" diff --git a/ext/semian/resource.c b/ext/semian/resource.c index 0eaeec5d..bbd438c6 100644 --- a/ext/semian/resource.c +++ b/ext/semian/resource.c @@ -1,5 +1,7 @@ #include "resource.h" +#include "util.h" + static VALUE cleanup_semian_resource_acquire(VALUE self); @@ -15,9 +17,6 @@ check_tickets_arg(VALUE tickets); static long check_permissions_arg(VALUE permissions); -static const -char *check_id_arg(VALUE id); - static double check_default_timeout_arg(VALUE default_timeout); @@ -118,13 +117,12 @@ semian_resource_reset_workers(VALUE self) VALUE semian_resource_unregister_worker(VALUE self) { - int ret; semian_resource_t *res = NULL; - TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); sem_meta_lock(res->sem_id); - ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL); + dprintf("Unregistering worker for sem_id:%d", res->sem_id); + int ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL); sem_meta_unlock(res->sem_id); if ( ret == -1) { @@ -202,22 +200,16 @@ semian_resource_key(VALUE self) VALUE semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout) { - long c_permissions; - double c_timeout; - double c_quota; - int c_tickets; - semian_resource_t *res = NULL; - const char *c_id_str = NULL; - // Check and cast arguments check_tickets_xor_quota_arg(tickets, quota); - c_quota = check_quota_arg(quota); - c_tickets = check_tickets_arg(tickets); - c_permissions = check_permissions_arg(permissions); - c_id_str = check_id_arg(id); - c_timeout = check_default_timeout_arg(default_timeout); + double c_quota = check_quota_arg(quota); + int c_tickets = check_tickets_arg(tickets); + long c_permissions = check_permissions_arg(permissions); + const char *c_id_str = check_id_arg(id); + double c_timeout = check_default_timeout_arg(default_timeout); // Build semian resource structure + semian_resource_t *res = NULL; TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); // Populate struct fields @@ -314,23 +306,6 @@ check_tickets_arg(VALUE tickets) return c_tickets; } -static const char* -check_id_arg(VALUE id) -{ - const char *c_id_str = NULL; - - if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) { - rb_raise(rb_eTypeError, "id must be a symbol or string"); - } - if (TYPE(id) == T_SYMBOL) { - c_id_str = rb_id2name(rb_to_id(id)); - } else if (TYPE(id) == T_STRING) { - c_id_str = RSTRING_PTR(id); - } - - return c_id_str; -} - static double check_default_timeout_arg(VALUE default_timeout) { @@ -361,6 +336,8 @@ static inline void semian_resource_free(void *ptr) { semian_resource_t *res = (semian_resource_t *) ptr; + dprintf("Freeing resource sem_id:%d", res->sem_id); + if (res->name) { free(res->name); res->name = NULL; diff --git a/ext/semian/semian.c b/ext/semian/semian.c index f4d0c3ea..cbabfb7b 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -1,5 +1,7 @@ #include "semian.h" +static int use_c_circuits(); + void Init_semian() { VALUE cSemian, cResource; @@ -64,4 +66,31 @@ void Init_semian() /* Maximum number of tickets available on this system. */ rb_define_const(cSemian, "MAX_TICKETS", INT2FIX(system_max_semaphore_count)); + + if (use_c_circuits()) { + Init_SimpleInteger(); + Init_SlidingWindow(); + } +} + +static int +use_c_circuits() { + char *circuit_impl = getenv("SEMIAN_CIRCUIT_BREAKER_IMPL"); + if (circuit_impl == NULL) { + fprintf(stderr, "Warning: Defaulting to Semian worker-based circuit breaker implementation\n"); + return 0; + } else { + if (!strcmp(circuit_impl, "worker")) { + fprintf(stderr, "Info: Semian using worker-based circuit implementation\n"); + return 0; + } else if (!strcmp(circuit_impl, "host")) { + fprintf(stderr, "Info: Semian using host-based circuit implementation\n"); + return 1; + } else { + fprintf(stderr, "Warning: Unknown Semian circuit breaker implementation: '%s'\n", circuit_impl); + return 0; + } + } + + rb_raise(rb_eArgError, "Unknown Semian circuit breaker implementation"); } diff --git a/ext/semian/semian.h b/ext/semian/semian.h index 55090931..000bd593 100644 --- a/ext/semian/semian.h +++ b/ext/semian/semian.h @@ -8,6 +8,8 @@ Implements Init_semian, which is used as C/Ruby entrypoint. #define SEMIAN_H #include "resource.h" +#include "simple_integer.h" +#include "sliding_window.h" void Init_semian(); diff --git a/ext/semian/simple_integer.c b/ext/semian/simple_integer.c new file mode 100644 index 00000000..48cf4100 --- /dev/null +++ b/ext/semian/simple_integer.c @@ -0,0 +1,155 @@ +#include "simple_integer.h" + +#include "sysv_semaphores.h" +#include "types.h" +#include "util.h" + +void +semian_simple_integer_dfree(void* ptr) +{ +} + +size_t +semian_simple_integer_dsize(const void* ptr) +{ + return sizeof(semian_simple_integer_t); +} + +static const rb_data_type_t semian_simple_integer_type = { + .wrap_struct_name = "semian_simple_integer", + .function = { + .dmark = NULL, + .dfree = semian_simple_integer_dfree, + .dsize = semian_simple_integer_dsize, + }, + .data = NULL, + .flags = RUBY_TYPED_FREE_IMMEDIATELY, +}; + +int check_increment_arg(VALUE val) +{ + VALUE retval; + + switch (rb_type(val)) { + case T_NIL: + case T_UNDEF: + retval = 1; break; + case T_FLOAT: + rb_warn("incrementing SingleInteger by a floating point value, converting to fixnum"); + retval = (int)(RFLOAT_VALUE(val)); break; + case T_FIXNUM: + case T_BIGNUM: + retval = RB_NUM2INT(val); break; + default: + rb_raise(rb_eArgError, "unknown type for val: %d", TYPE(val)); + } + + return retval; +} + +void +Init_SimpleInteger() +{ + dprintf("Init_SimpleInteger\n"); + + VALUE cSemian = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSimple = rb_const_get(cSemian, rb_intern("Simple")); + VALUE cSimpleInteger = rb_const_get(cSimple, rb_intern("Integer")); + + rb_define_alloc_func(cSimpleInteger, semian_simple_integer_alloc); + rb_define_method(cSimpleInteger, "initialize_simple_integer", semian_simple_integer_initialize, 1); + rb_define_method(cSimpleInteger, "increment", semian_simple_integer_increment, -1); + rb_define_method(cSimpleInteger, "reset", semian_simple_integer_reset, 0); + rb_define_method(cSimpleInteger, "value", semian_simple_integer_value_get, 0); + rb_define_method(cSimpleInteger, "value=", semian_simple_integer_value_set, 1); +} + +VALUE +semian_simple_integer_alloc(VALUE klass) +{ + semian_simple_integer_t *res; + VALUE obj = TypedData_Make_Struct(klass, semian_simple_integer_t, &semian_simple_integer_type, res); + return obj; +} + +VALUE +semian_simple_integer_initialize(VALUE self, VALUE name) +{ + semian_simple_integer_t *res; + TypedData_Get_Struct(self, semian_simple_integer_t, &semian_simple_integer_type, res); + res->key = generate_key(to_s(name)); + + dprintf("Initializing simple integer '%s' (key: %lu)", to_s(name), res->key); + res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS, 0); + + return self; +} + +VALUE +semian_simple_integer_increment(int argc, VALUE *argv, VALUE self) +{ + semian_simple_integer_t *res; + TypedData_Get_Struct(self, semian_simple_integer_t, &semian_simple_integer_type, res); + + // This is definitely the worst API ever. + // https://silverhammermba.github.io/emberb/c/#parsing-arguments + VALUE val; + rb_scan_args(argc, argv, "01", &val); + + int value = check_increment_arg(val); + if (perform_semop(res->sem_id, 0, value, 0, NULL) == -1) { + rb_raise(eInternal, "error incrementing simple integer, errno: %d (%s)", errno, strerror(errno)); + } + + // Return the current value, but know that there is a race condition here: + // It's not necessarily the same value after incrementing above, since + // semop() doesn't return the modified value. + int retval = get_sem_val(res->sem_id, 0); + if (retval == -1) { + rb_raise(eInternal, "error getting simple integer, errno: %d (%s)", errno, strerror(errno)); + } + + return RB_INT2NUM(retval); +} + +VALUE +semian_simple_integer_reset(VALUE self) +{ + semian_simple_integer_t *res; + TypedData_Get_Struct(self, semian_simple_integer_t, &semian_simple_integer_type, res); + + if (set_sem_val(res->sem_id, 0, 0) == -1) { + rb_raise(eInternal, "error resetting simple integer, errno: %d (%s)", errno, strerror(errno)); + } + + return Qnil; +} + +VALUE +semian_simple_integer_value_get(VALUE self) +{ + semian_simple_integer_t *res; + TypedData_Get_Struct(self, semian_simple_integer_t, &semian_simple_integer_type, res); + + int val = get_sem_val(res->sem_id, 0); + if (val == -1) { + rb_raise(eInternal, "error getting simple integer, errno: %d (%s)", errno, strerror(errno)); + } + + return RB_INT2NUM(val); +} + +VALUE +semian_simple_integer_value_set(VALUE self, VALUE val) +{ + semian_simple_integer_t *res; + TypedData_Get_Struct(self, semian_simple_integer_t, &semian_simple_integer_type, res); + + VALUE to_i = rb_funcall(val, rb_intern("to_i"), 0); + int value = RB_NUM2INT(to_i); + if (set_sem_val(res->sem_id, 0, value) == -1) { + rb_raise(eInternal, "error setting simple integer, errno: %d (%s)", errno, strerror(errno)); + } + + return Qnil; +} diff --git a/ext/semian/simple_integer.h b/ext/semian/simple_integer.h new file mode 100644 index 00000000..e0463a58 --- /dev/null +++ b/ext/semian/simple_integer.h @@ -0,0 +1,15 @@ +#ifndef EXT_SEMIAN_SIMPLE_INTEGER_H +#define EXT_SEMIAN_SIMPLE_INTEGER_H + +#include + +void Init_SimpleInteger(); + +VALUE semian_simple_integer_alloc(VALUE klass); +VALUE semian_simple_integer_initialize(VALUE self, VALUE name); +VALUE semian_simple_integer_increment(int argc, VALUE *argv, VALUE self); +VALUE semian_simple_integer_reset(VALUE self); +VALUE semian_simple_integer_value_get(VALUE self); +VALUE semian_simple_integer_value_set(VALUE self, VALUE val); + +#endif // EXT_SEMIAN_SIMPLE_INTEGER_H \ No newline at end of file diff --git a/ext/semian/sliding_window.c b/ext/semian/sliding_window.c new file mode 100644 index 00000000..db211a30 --- /dev/null +++ b/ext/semian/sliding_window.c @@ -0,0 +1,483 @@ +#include "sliding_window.h" + +#include "util.h" +#include "sysv_semaphores.h" +#include "sysv_shared_memory.h" + +void +semian_simple_sliding_window_dfree(void* ptr) +{ + semian_simple_sliding_window_t* res = (semian_simple_sliding_window_t*)ptr; + free_shared_memory(res->shmem); +} + +size_t +semian_simple_sliding_window_dsize(const void* ptr) +{ + return sizeof(semian_simple_sliding_window_t); +} + +static const rb_data_type_t semian_simple_sliding_window_type = { + .wrap_struct_name = "semian_simple_sliding_window", + .function = { + .dmark = NULL, + .dfree = semian_simple_sliding_window_dfree, + .dsize = semian_simple_sliding_window_dsize, + }, + .data = NULL, + .flags = RUBY_TYPED_FREE_IMMEDIATELY, +}; + +static int +check_max_size_arg(VALUE max_size) +{ + int retval = -1; + switch (rb_type(max_size)) { + case T_NIL: + case T_UNDEF: + retval = SLIDING_WINDOW_MAX_SIZE; break; + case T_FLOAT: + rb_warn("semian sliding window max_size is a float, converting to fixnum"); + retval = (int)(RFLOAT_VALUE(max_size)); break; + case T_FIXNUM: + case T_BIGNUM: + retval = RB_NUM2INT(max_size); break; + default: + rb_raise(rb_eArgError, "unknown type for max_size: %d", TYPE(max_size)); + } + + if (retval <= 0) { + rb_raise(rb_eArgError, "max_size must be greater than zero"); + } else if (retval > SLIDING_WINDOW_MAX_SIZE) { + rb_raise(rb_eArgError, "max_size cannot be greater than %d", SLIDING_WINDOW_MAX_SIZE); + } + + return retval; +} + +static float +check_scale_factor_arg(VALUE scale_factor) +{ + float retval = 1.0; + switch(rb_type(scale_factor)) { + case T_NIL: + case T_UNDEF: + retval = 1.0; break; + case T_FLOAT: + retval = rb_float_value(scale_factor); break; + case T_FIXNUM: + case T_BIGNUM: + rb_warn("semian sliding window scale_factor is an int, converting to float"); + retval = (float)RB_NUM2INT(scale_factor); break; + default: + rb_raise(rb_eArgError, "unknown type for scale_factor: %d", TYPE(scale_factor)); + } + + if (retval <= 0.0) { + rb_raise(rb_eArgError, "scale_factor must be greater than zero"); + } else if (retval > 1.0) { + rb_raise(rb_eArgError, "scale_factor cannot be greater than 1.0"); + } + + return retval; +} + +static VALUE +grow_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size) +{ + if (new_max_size > SLIDING_WINDOW_MAX_SIZE) { + sem_meta_unlock(sem_id); + rb_raise(rb_eArgError, "Cannot grow window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE); + } + + int end = window->max_size ? (window->start + window->length) % window->max_size : 0; + dprintf("Growing window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size); + + if (window->length == 0) { + window->start = 0; + } else if (end > window->start) { + // Easy case - the window doesn't wrap around + } else { + // Hard case - the window wraps, and data might need to move + int offset = new_max_size - window->max_size; + for (int i = offset - window->start - 1; i >= 0; --i) { + int srci = window->start + i; + int dsti = window->start + offset + i; + window->data[dsti] = window->data[srci]; + } + window->start += offset; + } + + window->max_size = new_max_size; + + return RB_INT2NUM(new_max_size); +} + +static void swap(int *a, int *b) { + int c = *a; + *a = *b; + *b = c; +} + +static VALUE +shrink_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size) +{ + if (new_max_size > SLIDING_WINDOW_MAX_SIZE) { + sem_meta_unlock(sem_id); + rb_raise(rb_eArgError, "Cannot shrink window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE); + } + + int new_length = (new_max_size > window->length) ? window->length : new_max_size; + + int end = window->max_size ? (window->start + window->length) % window->max_size : 0; + dprintf("Shrinking window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size); + + if (window->length == 0) { + window->start = 0; + } else if (end > window->start) { + // Easy case - the window doesn't wrap around + window->start = window->start + new_length; + } else { + // Hard case - the window wraps, so re-index the data + // Adapted from http://www.cplusplus.com/reference/algorithm/rotate/ + int first = 0; + int middle = (end - new_max_size + window->max_size) % window->max_size; + int last = window->max_size; + int next = middle; + while (first != next) { + swap(&window->data[first++], &window->data[next++]); + if (next == last) { + next = middle; + } else if (first == middle) { + middle = next; + } + } + window->start = 0; + } + + window->max_size = new_max_size; + window->length = new_length; + + return RB_INT2NUM(new_max_size); +} + +static VALUE +resize_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size) +{ + if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil; + + if (window->max_size < new_max_size) { + return grow_window(sem_id, window, new_max_size); + } else if (window->max_size > new_max_size) { + return shrink_window(sem_id, window, new_max_size); + } + + return Qnil; +} + +// Get the C object for a Ruby instance +static semian_simple_sliding_window_t* +get_object(VALUE self) +{ + semian_simple_sliding_window_t *res; + TypedData_Get_Struct(self, semian_simple_sliding_window_t, &semian_simple_sliding_window_type, res); + return res; +} + +void +Init_SlidingWindow() +{ + dprintf("Init_SlidingWindow"); + + VALUE cSemian = rb_const_get(rb_cObject, rb_intern("Semian")); + VALUE cSimple = rb_const_get(cSemian, rb_intern("Simple")); + VALUE cSlidingWindow = rb_const_get(cSimple, rb_intern("SlidingWindow")); + + rb_define_alloc_func(cSlidingWindow, semian_simple_sliding_window_alloc); + rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 3); + rb_define_method(cSlidingWindow, "size", semian_simple_sliding_window_size, 0); + rb_define_method(cSlidingWindow, "length", semian_simple_sliding_window_size, 0); // Alias + rb_define_method(cSlidingWindow, "resize_to", semian_simple_sliding_window_resize_to, 1); + rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size_get, 0); + rb_define_method(cSlidingWindow, "max_size=", semian_simple_sliding_window_max_size_set, 1); + rb_define_method(cSlidingWindow, "values", semian_simple_sliding_window_values, 0); + rb_define_method(cSlidingWindow, "last", semian_simple_sliding_window_last, 0); + rb_define_method(cSlidingWindow, "push", semian_simple_sliding_window_push, 1); + rb_define_method(cSlidingWindow, "<<", semian_simple_sliding_window_push, 1); // Alias + rb_define_method(cSlidingWindow, "clear", semian_simple_sliding_window_clear, 0); + rb_define_method(cSlidingWindow, "destroy", semian_simple_sliding_window_clear, 0); // Alias + rb_define_method(cSlidingWindow, "reject!", semian_simple_sliding_window_reject, 0); +} + +VALUE +semian_simple_sliding_window_alloc(VALUE klass) +{ + semian_simple_sliding_window_t *res; + VALUE obj = TypedData_Make_Struct(klass, semian_simple_sliding_window_t, &semian_simple_sliding_window_type, res); + return obj; +} + +static int +get_number_of_registered_workers(semian_simple_sliding_window_t* res) +{ + int sem_id = semget(res->parent_key, SI_NUM_SEMAPHORES, SEM_DEFAULT_PERMISSIONS); + if (sem_id == -1) { + dprintf("Warning: Could not get semaphore for key=%lu", res->parent_key); + return 1; + } + + int retval = semctl(sem_id, SI_SEM_REGISTERED_WORKERS, GETVAL); + if (retval == -1) { + dprintf("Warning: Could not get SI_SEM_REGISTERED_WORKERS for sem_id=%d", sem_id); + return 1; + } + + return retval; +} + +static int max(int a, int b) { + return a > b ? a : b; +} + +VALUE +semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor) +{ + semian_simple_sliding_window_t *res = get_object(self); + + char buffer[1024]; + strcpy(buffer, to_s(name)); + strcat(buffer, "_sliding_window"); + res->key = generate_key(buffer); + + // Store the parent key, not the parent sem_id, since it might not exist yet. + res->parent_key = generate_key(to_s(name)); + + dprintf("Initializing simple sliding window '%s' (key: %lu)", buffer, res->key); + res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS, 1); + res->shmem = get_or_create_shared_memory(res->key); + res->error_threshold = check_max_size_arg(max_size); + res->scale_factor = check_scale_factor_arg(scale_factor); + + sem_meta_lock(res->sem_id); + { + int workers = get_number_of_registered_workers(res); + float scale = (workers > 1) ? res->scale_factor : 1.0; // TODO: Parameterize + int error_threshold = max(res->error_threshold, (int) ceil(workers * scale * res->error_threshold)); + + dprintf(" workers:%d scale:%0.2f error_threshold:%d", workers, scale, error_threshold); + resize_window(res->sem_id, res->shmem, error_threshold); + } + sem_meta_unlock(res->sem_id); + + return self; +} + +VALUE +semian_simple_sliding_window_size(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval; + + sem_meta_lock(res->sem_id); + { + retval = RB_INT2NUM(res->shmem->length); + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval = Qnil; + + int new_max_size = RB_NUM2INT(new_size); + if (new_max_size < 1) { + rb_raise(rb_eArgError, "cannot resize to %d", new_max_size); + } + + sem_meta_lock(res->sem_id); + { + retval = resize_window(res->sem_id, res->shmem, new_max_size); + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_max_size_get(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval; + + sem_meta_lock(res->sem_id); + { + retval = RB_INT2NUM(res->shmem->max_size); + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval; + + int new_max_size = RB_NUM2INT(new_size); + if (new_max_size < 1) { + rb_raise(rb_eArgError, "max_size must be positive"); + } + + sem_meta_lock(res->sem_id); + { + retval = resize_window(res->sem_id, res->shmem, new_max_size); + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_values(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval; + + sem_meta_lock(res->sem_id); + { + retval = rb_ary_new_capa(res->shmem->length); + for (int i = 0; i < res->shmem->length; ++i) { + int index = (res->shmem->start + i) % res->shmem->max_size; + int value = res->shmem->data[index]; + rb_ary_store(retval, i, RB_INT2NUM(value)); + } + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_last(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + VALUE retval; + + sem_meta_lock(res->sem_id); + { + int index = (res->shmem->start + res->shmem->length - 1) % res->shmem->max_size; + retval = RB_INT2NUM(res->shmem->data[index]); + } + sem_meta_unlock(res->sem_id); + + return retval; +} + +VALUE +semian_simple_sliding_window_clear(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + + sem_meta_lock(res->sem_id); + { + dprintf("Clearing sliding window"); + res->shmem->length = 0; + res->shmem->start = 0; + } + sem_meta_unlock(res->sem_id); + + return self; +} + +#if 0 +// Handy for debugging the sliding window, but too noisy for regular debugging. +static void dprint_window(semian_simple_sliding_window_shared_t *window) +{ + dprintf("---"); + for (int i = 0; i < window->length; ++i) { + const int index = (window->start + i) % window->max_size; + dprintf(" %0d: data[%d] = %d", i, index, window->data[index]); + } + dprintf("---"); +} +#endif + +VALUE +semian_simple_sliding_window_reject(VALUE self) +{ + semian_simple_sliding_window_t *res = get_object(self); + + rb_need_block(); + + sem_meta_lock(res->sem_id); + { + semian_simple_sliding_window_shared_t *window = res->shmem; + const int start = window->start; + const int length = window->length; + const int max_size = window->max_size; + + if (max_size && length) { + int wptr = (start + length + max_size - 1) % max_size; + + // Walk the sliding window backward, from the last element to the first, + // pushing the entries to the back of the ring. When we've gone through + // every element, set the start pointer to the new location. + // + // Example, deleting "2": + // S E S E + // [x,x,0,1,2,3,x,x] --> [x,x,x,0,1,3,x,x] + // 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + // + // The runtime of this algorithm is theta(n), but n tends to be small. + // + dprintf("Before reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size); + for (int i = 0; i < length; ++i) { + const int rptr = (start + length + max_size - i - 1) % max_size; + + const int value = window->data[rptr]; + if (RTEST(rb_yield(RB_INT2NUM(value)))) { + window->length--; + } else { + window->data[wptr] = value; + wptr = (wptr + max_size - 1) % max_size; + } + } + + window->start = (wptr + 1) % max_size; + dprintf("After reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size); + } + } + sem_meta_unlock(res->sem_id); + + return self; +} + +VALUE +semian_simple_sliding_window_push(VALUE self, VALUE value) +{ + semian_simple_sliding_window_t *res = get_object(self); + + sem_meta_lock(res->sem_id); + { + dprintf("Before: start:%d length:%d max_size:%d", res->shmem->start, res->shmem->length, res->shmem->max_size); + // If the window is full, make room by popping off the front. + if (res->shmem->length == res->shmem->max_size) { + res->shmem->length--; + res->shmem->start = (res->shmem->start + 1) % res->shmem->max_size; + } + + // Push onto the back of the window. + int index = (res->shmem->start + res->shmem->length) % res->shmem->max_size; + res->shmem->length++; + res->shmem->data[index] = RB_NUM2INT(value); + dprintf("Pushed %d onto data[%d] (length %d)", RB_NUM2INT(value), index, res->shmem->length); + } + sem_meta_unlock(res->sem_id); + + return self; +} diff --git a/ext/semian/sliding_window.h b/ext/semian/sliding_window.h new file mode 100644 index 00000000..c9471a06 --- /dev/null +++ b/ext/semian/sliding_window.h @@ -0,0 +1,21 @@ +#ifndef EXT_SEMIAN_SLIDING_WINDOW_H +#define EXT_SEMIAN_SLIDING_WINDOW_H + +#include +#include "types.h" + +void Init_SlidingWindow(); + +VALUE semian_simple_sliding_window_alloc(VALUE klass); +VALUE semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, VALUE scale_factor); +VALUE semian_simple_sliding_window_size(VALUE self); +VALUE semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size); +VALUE semian_simple_sliding_window_max_size_get(VALUE self); +VALUE semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size); +VALUE semian_simple_sliding_window_push(VALUE self, VALUE value); +VALUE semian_simple_sliding_window_values(VALUE self); +VALUE semian_simple_sliding_window_last(VALUE self); +VALUE semian_simple_sliding_window_clear(VALUE self); +VALUE semian_simple_sliding_window_reject(VALUE self); + +#endif // EXT_SEMIAN_SLIDING_WINDOW_H diff --git a/ext/semian/sysv_semaphores.c b/ext/semian/sysv_semaphores.c index bed7e4f3..7c0b0c52 100644 --- a/ext/semian/sysv_semaphores.c +++ b/ext/semian/sysv_semaphores.c @@ -1,14 +1,13 @@ #include "sysv_semaphores.h" -#include -static key_t -generate_key(const char *name); +#include +#include "util.h" static void * acquire_semaphore(void *p); static int -wait_for_new_semaphore_set(key_t key, long permissions); +wait_for_new_semaphore_set(uint64_t key, long permissions); static void initialize_new_semaphore_values(int sem_id, long permissions); @@ -30,10 +29,12 @@ raise_semian_syscall_error(const char *syscall, int error_num) void initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota) { - res->key = generate_key(id_str); + dprintf("Initializing semaphore set for key:%lu", res->key); + res->strkey = (char*) malloc((2 /*for 0x*/+ sizeof(uint64_t) /*actual key*/+ 1 /*null*/) * sizeof(char)); sprintf(res->strkey, "0x%08x", (unsigned int) res->key); + res->sem_id = semget(res->key, SI_NUM_SEMAPHORES, IPC_CREAT | IPC_EXCL | permissions); /* @@ -43,6 +44,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis if (res->sem_id != -1) { // Happy path - we are the first worker, initialize the semaphore set. initialize_new_semaphore_values(res->sem_id, permissions); + dprintf("Created semaphore set (key:%lu sem_id:%d)", res->key, res->sem_id); } else { // Something went wrong if (errno != EEXIST) { @@ -59,6 +61,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis Ensure that a worker for this process is registered. Note that from ruby we ensure that at most one worker may be registered per process. */ + dprintf("Registering worker for sem_id:%d", res->sem_id); if (perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, 1, SEM_UNDO, NULL) == -1) { rb_raise(eInternal, "error incrementing registered workers, errno: %d (%s)", errno, strerror(errno)); } @@ -126,6 +129,16 @@ get_sem_val(int sem_id, int sem_index) return ret; } +int +set_sem_val(int sem_id, int sem_index, int val) +{ + int ret = semctl(sem_id, sem_index, SETVAL, val); + if (ret == -1) { + rb_raise(eInternal, "error setting value of %s for sem %d, errno: %d (%s)", SEMINDEX_STRING[sem_index], sem_id, errno, strerror(errno)); + } + return ret; +} + void sem_meta_lock(int sem_id) { @@ -164,9 +177,7 @@ acquire_semaphore(void *p) semian_resource_t *res = (semian_resource_t *) p; res->error = 0; res->wait_time = -1; -#ifdef DEBUG - print_sem_vals(res->sem_id); -#endif + dprint_sem_vals("acquire_semaphore", res->sem_id); struct timespec begin, end; int benchmark_result = clock_gettime(CLOCK_MONOTONIC, &begin); @@ -181,31 +192,6 @@ acquire_semaphore(void *p) return NULL; } -static key_t -generate_key(const char *name) -{ - char semset_size_key[20]; - char *uniq_id_str; - - // It is necessary for the cardinatily of the semaphore set to be part of the key - // or else sem_get will complain that we have requested an incorrect number of sems - // for the desired key, and have changed the number of semaphores for a given key - sprintf(semset_size_key, "_NUM_SEMS_%d", SI_NUM_SEMAPHORES); - uniq_id_str = malloc(strlen(name)+strlen(semset_size_key)+1); - strcpy(uniq_id_str, name); - strcat(uniq_id_str, semset_size_key); - - union { - unsigned char str[SHA_DIGEST_LENGTH]; - key_t key; - } digest; - SHA1((const unsigned char *) uniq_id_str, strlen(uniq_id_str), digest.str); - free(uniq_id_str); - /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */ - return digest.key; -} - - static void initialize_new_semaphore_values(int sem_id, long permissions) { @@ -218,28 +204,22 @@ initialize_new_semaphore_values(int sem_id, long permissions) if (semctl(sem_id, 0, SETALL, init_vals) == -1) { raise_semian_syscall_error("semctl()", errno); } -#ifdef DEBUG - print_sem_vals(sem_id); -#endif + dprint_sem_vals("initialize_new_semaphore_values", sem_id); } static int -wait_for_new_semaphore_set(key_t key, long permissions) +wait_for_new_semaphore_set(uint64_t key, long permissions) { - int i; - int sem_id = -1; - union semun sem_opts; struct semid_ds sem_ds; - + union semun sem_opts; sem_opts.buf = &sem_ds; - sem_id = semget(key, 1, permissions); + int sem_id = semget((key_t)key, 1, permissions); if (sem_id == -1){ raise_semian_syscall_error("semget()", errno); } - for (i = 0; i < ((INTERNAL_TIMEOUT * MICROSECONDS_IN_SECOND) / INIT_WAIT); i++) { - + for (int i = 0; i < ((INTERNAL_TIMEOUT * MICROSECONDS_IN_SECOND) / INIT_WAIT); i++) { if (semctl(sem_id, 0, IPC_STAT, sem_opts) == -1) { raise_semian_syscall_error("semctl()", errno); } @@ -269,3 +249,37 @@ diff_timespec_ms(struct timespec *end, struct timespec *begin) long begin_ms = (begin->tv_sec * 1e3) + (begin->tv_nsec / 1e6); return end_ms - begin_ms; } + +int +initialize_single_semaphore(uint64_t key, long permissions, int value) +{ + dprintf("Initializing single semaphore for key:%lu with value %d", key, value); + int sem_id = semget(key, 1, IPC_CREAT | IPC_EXCL | permissions); + + /* + This approach is based on http://man7.org/tlpi/code/online/dist/svsem/svsem_good_init.c.html + which avoids race conditions when initializing semaphore sets. + */ + if (sem_id != -1) { + // Happy path - we are the first worker, initialize the semaphore set. + dprintf("Created semaphore (key:%lu sem_id:%d)", key, sem_id); + if (semctl(sem_id, 0, SETVAL, value) == -1) { + raise_semian_syscall_error("semctl() failed to set semaphore initial value", errno); + } + } else { + if (errno == EEXIST) { + // The semaphore set already exists, ensure it is initialized + sem_id = wait_for_new_semaphore_set(key, permissions); + } else { + raise_semian_syscall_error("semget() failed to initialize semaphore values", errno); + } + } + + set_semaphore_permissions(sem_id, permissions); + + // Set otime for the first time by acquiring the sem lock. + sem_meta_unlock(sem_id); // +1 + sem_meta_lock(sem_id); // -1 + + return sem_id; +} diff --git a/ext/semian/sysv_semaphores.h b/ext/semian/sysv_semaphores.h index c3f2d106..0049d67b 100644 --- a/ext/semian/sysv_semaphores.h +++ b/ext/semian/sysv_semaphores.h @@ -11,13 +11,13 @@ and functions associated directly weth semops. #include #include -#include #include #include #include #include "types.h" #include "tickets.h" +#include "util.h" // Defines for ruby threading primitives #if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H) @@ -28,6 +28,8 @@ and functions associated directly weth semops. // 1.9 typedef VALUE (*my_blocking_fn_t)(void*); #define WITHOUT_GVL(fn,a,ubf,b) rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b)) +#else +#define WITHOUT_GVL(fn,a,ubf,b) #endif // Time to wait for timed ops to complete @@ -39,6 +41,10 @@ typedef VALUE (*my_blocking_fn_t)(void*); // Helper definition to prevent magic number for conversion of microseconds to seconds #define MICROSECONDS_IN_SECOND 1000000 +// Default permissions for semaphore set - execute permissions are not meaningful for +// semaphores +#define SEM_DEFAULT_PERMISSIONS 0660 + // Here we define an enum value and string representation of each semaphore // This allows us to key the sem value and string rep in sync easily // utilizing pre-processor macros. @@ -90,6 +96,10 @@ perform_semop(int sem_id, short index, short op, short flags, struct timespec *t int get_sem_val(int sem_id, int sem_index); +// Set the current number of tickets in a semaphore by its semaphore index +int +set_sem_val(int sem_id, int sem_index, int val); + // Obtain an exclusive lock on the semaphore set critical section void sem_meta_lock(int sem_id); @@ -106,17 +116,22 @@ get_semaphore(int key); void * acquire_semaphore_without_gvl(void *p); -#ifdef DEBUG +// Initializes a semaphore set with a single semaphore, for general purpose +// locking +int +initialize_single_semaphore(uint64_t key, long permissions, int value); + static inline void -print_sem_vals(int sem_id) +dprint_sem_vals(const char *msg, int sem_id) { - printf("lock %d, tickets: %d configured: %d, registered workers %d\n", - get_sem_val(sem_id, SI_SEM_LOCK), - get_sem_val(sem_id, SI_SEM_TICKETS), - get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS), - get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) + dprintf("%s (sem_id:%d lock:%d tickets:%d configured:%d, registered_workers:%d)", + msg, + sem_id, + get_sem_val(sem_id, SI_SEM_LOCK), + get_sem_val(sem_id, SI_SEM_TICKETS), + get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS), + get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) ); } -#endif #endif // SEMIAN_SEMSET_H diff --git a/ext/semian/sysv_shared_memory.c b/ext/semian/sysv_shared_memory.c new file mode 100644 index 00000000..92aeffa1 --- /dev/null +++ b/ext/semian/sysv_shared_memory.c @@ -0,0 +1,42 @@ +#include "sysv_shared_memory.h" + +#include "util.h" + +void* +get_or_create_shared_memory(uint64_t key) +{ + void* shmem = NULL; + if (!key) return NULL; + + dprintf("Creating shared memory (key: %lu)", key); + int shmid = shmget(key, SHM_DEFAULT_SIZE, IPC_CREAT | IPC_EXCL | SHM_DEFAULT_PERMISSIONS); + if (shmid != -1) { + dprintf("Created shared memory (key:%lu sem_id:%d)", key, shmid); + shmem = shmat(shmid, NULL, 0); + if (shmem == (void*)-1) { + rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); + } + + shmctl(key, IPC_RMID, NULL); + } else { + shmid = shmget(key, SHM_DEFAULT_SIZE, SHM_DEFAULT_PERMISSIONS); + if (shmid == -1) { + rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); + } + + dprintf("Got shared memory (key:%lu sem_id:%d)", key, shmid); + shmem = shmat(shmid, NULL, 0); + if (shmem == (void*)-1) { + rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); + } + } + + return shmem; +} + +void +free_shared_memory(void* shmem) +{ + if (!shmem) return; + shmdt(shmem); +} diff --git a/ext/semian/sysv_shared_memory.h b/ext/semian/sysv_shared_memory.h new file mode 100644 index 00000000..9528dd46 --- /dev/null +++ b/ext/semian/sysv_shared_memory.h @@ -0,0 +1,19 @@ +#ifndef EXT_SEMIAN_SYSV_SHARED_MEMORY_H +#define EXT_SEMIAN_SYSV_SHARED_MEMORY_H + +#include +#include +#include +#include + +// Default permissions for shared memory +#define SHM_DEFAULT_PERMISSIONS 0660 +#define SHM_DEFAULT_SIZE 4096 + +void* +get_or_create_shared_memory(uint64_t key); + +void +free_shared_memory(void* key); + +#endif // EXT_SEMIAN_SYSV_SHARED_MEMORY_H diff --git a/ext/semian/test/Makefile b/ext/semian/test/Makefile new file mode 100644 index 00000000..ad6496c9 --- /dev/null +++ b/ext/semian/test/Makefile @@ -0,0 +1,18 @@ +# Makefile + +CC := gcc -std=gnu99 -I. -I.. +TESTS := types_test + +all : test + +test : types_test + +types_test : types_test.c + $(CC) -o $@ $< + ./$@ + +clean : + rm -f *.o + rm -f $(TESTS) + +.PHONY : all test types_test clean diff --git a/ext/semian/test/types_test.c b/ext/semian/test/types_test.c new file mode 100644 index 00000000..61085574 --- /dev/null +++ b/ext/semian/test/types_test.c @@ -0,0 +1,36 @@ +#include +#include + +#include "types.h" + +void assert_equal_int(int actual, int expected, const char *message) +{ + if (actual != expected) { + fprintf(stderr, "Error: got %d, expected %d (%s)\n", actual, expected, message); + exit(EXIT_FAILURE); + } +} + +void assert_le_int(int actual, int expected, const char *message) +{ + if (actual > expected) { + fprintf(stderr, "Error: got %d, which is greater than %d (%s)\n", actual, expected, message); + exit(EXIT_FAILURE); + } +} + +void test_sliding_window() +{ + semian_simple_sliding_window_shared_t window; + assert_le_int(sizeof(window), 4096, "window size is greater than a page"); + assert_equal_int(sizeof(window.data), SLIDING_WINDOW_MAX_SIZE * sizeof(int), "window data size"); +} + +int main(int argc, char **argv) +{ + printf("Info: Running test\n"); + + test_sliding_window(); + + return EXIT_SUCCESS; +} diff --git a/ext/semian/tickets.c b/ext/semian/tickets.c index 369f3058..cb4ff6e0 100644 --- a/ext/semian/tickets.c +++ b/ext/semian/tickets.c @@ -49,9 +49,7 @@ update_ticket_count(int sem_id, int tickets) delta = tickets - get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS); -#ifdef DEBUG - print_sem_vals(sem_id); -#endif + dprint_sem_vals("update_ticket_count", sem_id); if (perform_semop(sem_id, SI_SEM_TICKETS, delta, 0, &ts) == -1) { if (delta < 0 && errno == EAGAIN) { rb_raise(eTimeout, "timeout while trying to update ticket count"); diff --git a/ext/semian/types.h b/ext/semian/types.h index de0254ec..81d31d5d 100644 --- a/ext/semian/types.h +++ b/ext/semian/types.h @@ -10,6 +10,8 @@ For custom type definitions specific to semian #include #include +#define SLIDING_WINDOW_MAX_SIZE 1000 + // For sysV semop syscals // see man semop union semun { @@ -38,4 +40,28 @@ typedef struct { long wait_time; } semian_resource_t; +// Internal simple integer structure +typedef struct { + uint64_t key; + int sem_id; +} semian_simple_integer_t; + +// Shared simple sliding window structure +typedef struct { + int max_size; + int length; + int start; + int data[SLIDING_WINDOW_MAX_SIZE]; +} semian_simple_sliding_window_shared_t; + +// Internal simple sliding window structure +typedef struct { + uint64_t key; + int sem_id; + uint64_t parent_key; + int error_threshold; + float scale_factor; + semian_simple_sliding_window_shared_t* shmem; +} semian_simple_sliding_window_t; + #endif // SEMIAN_TYPES_H diff --git a/ext/semian/util.c b/ext/semian/util.c new file mode 100644 index 00000000..44b2e61e --- /dev/null +++ b/ext/semian/util.c @@ -0,0 +1,52 @@ +#include "util.h" + +const char* check_id_arg(VALUE id) +{ + if (TYPE(id) != T_SYMBOL && TYPE(id) != T_STRING) { + rb_raise(rb_eTypeError, "id must be a symbol or string"); + } + + const char *c_id_str = NULL; + if (TYPE(id) == T_SYMBOL) { + c_id_str = rb_id2name(rb_to_id(id)); + } else if (TYPE(id) == T_STRING) { + c_id_str = RSTRING_PTR(id); + } + + return c_id_str; +} + +key_t generate_key(const char *name) +{ + char semset_size_key[128]; + char *uniq_id_str; + + // It is necessary for the cardinatily of the semaphore set to be part of the key + // or else sem_get will complain that we have requested an incorrect number of sems + // for the desired key, and have changed the number of semaphores for a given key + const int NUM_SEMAPHORES = 4; + sprintf(semset_size_key, "_NUM_SEMS_%d", NUM_SEMAPHORES); + uniq_id_str = malloc(strlen(name) + strlen(semset_size_key) + 1); + strcpy(uniq_id_str, name); + strcat(uniq_id_str, semset_size_key); + + union { + unsigned char str[SHA_DIGEST_LENGTH]; + key_t key; + } digest; + SHA1((const unsigned char *) uniq_id_str, strlen(uniq_id_str), digest.str); + free(uniq_id_str); + /* TODO: compile-time assertion that sizeof(key_t) > SHA_DIGEST_LENGTH */ + return digest.key; +} + +const char* to_s(VALUE obj) { + if (RB_TYPE_P(obj, T_STRING)) { + return RSTRING_PTR(obj); + } else if (RB_TYPE_P(obj, T_SYMBOL)) { + return rb_id2name(SYM2ID(obj)); + } + + rb_raise(rb_eArgError, "could not convert object to string"); + return NULL; +} diff --git a/ext/semian/util.h b/ext/semian/util.h new file mode 100644 index 00000000..8a2470bc --- /dev/null +++ b/ext/semian/util.h @@ -0,0 +1,34 @@ +#ifndef EXT_SEMIAN_UTIL_H +#define EXT_SEMIAN_UTIL_H + +#include +#include +#include + +#include +#include + +#if defined(DEBUG) || defined(SEMIAN_DEBUG) +# define DEBUG_TEST 1 +#else +# define DEBUG_TEST 0 +#endif + +#define dprintf(fmt, ...) \ + do { \ + if (DEBUG_TEST) { \ + const pid_t pid = getpid(); \ + struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \ + struct tm t; localtime_r(&(ts.tv_sec), &t); \ + char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \ + printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \ + } \ + } while (0) + +const char* check_id_arg(VALUE id); + +key_t generate_key(const char *name); + +const char* to_s(VALUE obj); + +#endif // EXT_SEMIAN_UTIL_H diff --git a/lib/semian.rb b/lib/semian.rb index 7cdd504f..ebeef4ee 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -259,6 +259,7 @@ def create_circuit_breaker(name, **options) exceptions: Array(exceptions) + [::Semian::BaseError], half_open_resource_timeout: options[:half_open_resource_timeout], implementation: implementation(**options), + scale_factor: options[:scale_factor], ) end @@ -295,7 +296,19 @@ def require_keys!(required = [], **options) end end +def hostname + v = %w(KUBE_HOSTNAME KUBE_HOST_NAME KUBE_NODENAME KUBE_NODE_NAME NODENAME NODE_NAME HOSTNAME HOST_NAME) + var = v.find { |x| ENV.include?(x) } + ENV[var] if var +end + +def force_host_circuits? + return false unless ENV.include?('SEMIAN_CIRCUIT_BREAKER_FORCE_HOST') + ENV['SEMIAN_CIRCUIT_BREAKER_FORCE_HOST'].split(',').include?(hostname) +end + if Semian.semaphores_enabled? + ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] = 'host' if force_host_circuits? require 'semian/semian' else Semian::MAX_TICKETS = 0 diff --git a/lib/semian/circuit_breaker.rb b/lib/semian/circuit_breaker.rb index a19a0f36..788035ca 100644 --- a/lib/semian/circuit_breaker.rb +++ b/lib/semian/circuit_breaker.rb @@ -3,21 +3,24 @@ class CircuitBreaker #:nodoc: extend Forwardable def_delegators :@state, :closed?, :open?, :half_open? + def_delegators :@errors, :size, :max_size, :values attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error def initialize(name, exceptions:, success_threshold:, error_threshold:, - error_timeout:, implementation:, half_open_resource_timeout: nil) + error_timeout:, implementation:, half_open_resource_timeout: nil, scale_factor: nil) @name = name.to_sym @success_count_threshold = success_threshold @error_count_threshold = error_threshold + @scale_factor = scale_factor @error_timeout = error_timeout @exceptions = exceptions @half_open_resource_timeout = half_open_resource_timeout - @errors = implementation::SlidingWindow.new(max_size: @error_count_threshold) - @successes = implementation::Integer.new - @state = implementation::State.new + @errors = implementation::SlidingWindow.new(name, max_size: @error_count_threshold, scale_factor: @scale_factor) + @successes = implementation::Integer.new("#{name}_successes") + state_val = implementation::Integer.new("#{name}_state") + @state = implementation::State.new(state_val) reset end @@ -62,7 +65,7 @@ def mark_failed(error) def mark_success return unless half_open? - @successes.increment + @successes.increment(1) transition_to_close if success_threshold_reached? end @@ -131,7 +134,7 @@ def push_time(window, time: Time.now) def log_state_transition(new_state) return if @state.nil? || new_state == @state.value - str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}." + str = "[#{self.class.name}] State transition from #{@state} to #{new_state}." str << " success_count=#{@successes.value} error_count=#{@errors.size}" str << " success_count_threshold=#{@success_count_threshold} error_count_threshold=#{@error_count_threshold}" str << " error_timeout=#{@error_timeout} error_last_at=\"#{@errors.last}\"" @@ -159,5 +162,9 @@ def maybe_with_half_open_resource_timeout(resource, &block) result end + + def to_s + "" + end end end diff --git a/lib/semian/protected_resource.rb b/lib/semian/protected_resource.rb index 73c10fac..5c032083 100644 --- a/lib/semian/protected_resource.rb +++ b/lib/semian/protected_resource.rb @@ -4,7 +4,7 @@ class ProtectedResource def_delegators :@bulkhead, :destroy, :count, :semid, :tickets, :registered_workers def_delegators :@circuit_breaker, :reset, :mark_failed, :mark_success, :request_allowed?, - :open?, :closed?, :half_open? + :open?, :closed?, :half_open?, :size, :max_size, :values attr_reader :bulkhead, :circuit_breaker, :name attr_accessor :updated_at diff --git a/lib/semian/resource.rb b/lib/semian/resource.rb index 1dd72658..0505bfa8 100644 --- a/lib/semian/resource.rb +++ b/lib/semian/resource.rb @@ -36,6 +36,18 @@ def count 0 end + def size + 0 + end + + def max_size + 0 + end + + def values + [] + end + def tickets 0 end diff --git a/lib/semian/simple_integer.rb b/lib/semian/simple_integer.rb index 51bc498b..8e1b8ff9 100644 --- a/lib/semian/simple_integer.rb +++ b/lib/semian/simple_integer.rb @@ -5,15 +5,22 @@ module Simple class Integer #:nodoc: attr_accessor :value - def initialize + def initialize(name) + initialize_simple_integer(name) if respond_to?(:initialize_simple_integer) reset end + def use_host_circuits + ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] == 'host' + end + def increment(val = 1) + raise StandardError, "Shouldn't call increment if using host circuits" if use_host_circuits @value += val end def reset + raise StandardError, "Shouldn't call reset if using host circuits" if use_host_circuits @value = 0 end diff --git a/lib/semian/simple_sliding_window.rb b/lib/semian/simple_sliding_window.rb index c31cda02..6896e0ca 100644 --- a/lib/semian/simple_sliding_window.rb +++ b/lib/semian/simple_sliding_window.rb @@ -3,25 +3,44 @@ module Semian module Simple class SlidingWindow #:nodoc: - extend Forwardable - - def_delegators :@window, :size, :last - attr_reader :max_size - # A sliding window is a structure that stores the most @max_size recent timestamps # like this: if @max_size = 4, current time is 10, @window =[5,7,9,10]. # Another push of (11) at 11 sec would make @window [7,9,10,11], shifting off 5. - def initialize(max_size:) + def initialize(name, max_size:, scale_factor: nil) + initialize_sliding_window(name, max_size, scale_factor) if respond_to?(:initialize_sliding_window) + + @name = name.to_sym @max_size = max_size @window = [] end + def use_host_circuits + ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] == 'host' + end + + def size + raise StandardError, "Shouldn't call size if using host circuits" if use_host_circuits + @window.size + end + + def last + raise StandardError, "Shouldn't call last if using host circuits" if use_host_circuits + @window.last + end + + def values + raise StandardError, "Shouldn't call values if using host circuits" if use_host_circuits + @window + end + def reject!(&block) + raise StandardError, "Shouldn't call reject! if using host circuits" if use_host_circuits @window.reject!(&block) end def push(value) + raise StandardError, "Shouldn't call push if using host circuits" if use_host_circuits resize_to(@max_size - 1) # make room @window << value self @@ -29,14 +48,28 @@ def push(value) alias_method :<<, :push def clear + raise StandardError, "Shouldn't call clear if using host circuits" if use_host_circuits @window.clear self end alias_method :destroy, :clear + def max_size + raise StandardError, "Shouldn't call max_size if using host circuits" if use_host_circuits + @max_size + end + + def max_size=(value) + raise StandardError, "Shouldn't call max_size= if using host circuits" if use_host_circuits + raise ArgumentError, "max_size must be positive" if value <= 0 + @max_size = value + resize_to(value) + end + private def resize_to(size) + raise StandardError, "Shouldn't call resize_to if using host circuits" if use_host_circuits @window = @window.last(size) if @window.size >= size end end diff --git a/lib/semian/simple_state.rb b/lib/semian/simple_state.rb index 322d333d..0357a345 100644 --- a/lib/semian/simple_state.rb +++ b/lib/semian/simple_state.rb @@ -1,34 +1,43 @@ module Semian module Simple class State #:nodoc: - def initialize + extend Forwardable + + def_delegators :@value, :value + + # State constants. Looks like a flag, but is actually an enum. + UNKNOWN = 0x0 + CLOSED = 0x1 + OPEN = 0x2 + HALF_OPEN = 0x4 + + def initialize(value) + @value = value reset end - attr_reader :value - def open? - value == :open + @value.value == OPEN end def closed? - value == :closed + @value.value == CLOSED end def half_open? - value == :half_open + @value.value == HALF_OPEN end def open! - @value = :open + @value.value = OPEN end def close! - @value = :closed + @value.value = CLOSED end def half_open! - @value = :half_open + @value.value = HALF_OPEN end def reset @@ -38,6 +47,21 @@ def reset def destroy reset end + + def to_s + case @value.value + when UNKNOWN + "unknown" + when CLOSED + "closed" + when OPEN + "open" + when HALF_OPEN + "half_open" + else + "" + end + end end end diff --git a/lib/semian/unprotected_resource.rb b/lib/semian/unprotected_resource.rb index 8a88b826..dd2c3d3a 100644 --- a/lib/semian/unprotected_resource.rb +++ b/lib/semian/unprotected_resource.rb @@ -29,6 +29,18 @@ def count 0 end + def size + 0 + end + + def max_size + 0 + end + + def values + [] + end + def semid 0 end diff --git a/scripts/cleanup_ipc.sh b/scripts/cleanup_ipc.sh new file mode 100755 index 00000000..b491f249 --- /dev/null +++ b/scripts/cleanup_ipc.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +ME=`whoami` + +IPCS_S=`ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` +IPCS_M=`ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` +IPCS_Q=`ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "` + +for id in $IPCS_M; do + ipcrm -m $id; +done + +for id in $IPCS_S; do + ipcrm -s $id; +done + +for id in $IPCS_Q; do + ipcrm -q $id; +done + diff --git a/test/adapter_test.rb b/test/adapter_test.rb index 791a03b2..55a9549d 100644 --- a/test/adapter_test.rb +++ b/test/adapter_test.rb @@ -21,10 +21,10 @@ def test_adapter_registers_consumer end def test_unregister - skip if ENV["SKIP_FLAKY_TESTS"] + skip if flaky client = Semian::AdapterTestClient.new(quota: 0.5) assert_nil(Semian.resources[:testing]) - resource = Semian.register(:testing, tickets: 2, error_threshold: 0, error_timeout: 0, success_threshold: 0) + resource = Semian.register(:testing, tickets: 2, error_threshold: 1, error_timeout: 0, success_threshold: 0) assert_equal(Semian.resources[:testing], resource) assert_equal 1, resource.registered_workers diff --git a/test/circuit_breaker_test.rb b/test/circuit_breaker_test.rb index be4c01c9..d97e4d9d 100644 --- a/test/circuit_breaker_test.rb +++ b/test/circuit_breaker_test.rb @@ -4,15 +4,45 @@ class TestCircuitBreaker < Minitest::Test include CircuitBreakerHelper def setup + id = Time.now.strftime('%H:%M:%S.%N') @strio = StringIO.new Semian.logger = Logger.new @strio begin - Semian.destroy(:testing) + Semian.destroy(id) rescue nil end - Semian.register(:testing, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) - @resource = Semian[:testing] + Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) + @resource = Semian[id] + @resource.reset + end + + def test_destroy + id = Time.now.strftime('%H:%M:%S.%N') + + # Create the resource and check that it was reset. + Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) + resource = Semian[id] + assert_equal(0, resource.size) + assert_equal(2, resource.max_size) + assert_equal([], resource.values) + + # Open the circuit. + open_circuit!(resource, 2) + assert_equal(2, resource.size) + assert_equal(2, resource.max_size) + + # Destroy the resource and check that it was destroyed. + Semian.destroy(id) + resource = Semian[id] + assert_nil(resource, "Resource was not destroyed") + + # Re-create the resource and check that it was reset. + Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1) + resource = Semian[id] + assert_equal(0, resource.size) + assert_equal(2, resource.max_size) + assert_equal([], resource.values) end def test_acquire_yield_when_the_circuit_is_closed @@ -40,6 +70,7 @@ def test_after_error_timeout_is_elapsed_requests_are_attempted_again end def test_until_success_threshold_is_reached_a_single_error_will_reopen_the_circuit + assert_equal(0, @resource.size) half_open_cicuit! trigger_error! assert_circuit_opened @@ -148,6 +179,10 @@ def test_semian_wide_env_var_disables_circuit_breaker end class RawResource + def initialize + @timeout = 2 + end + def timeout @timeout || 2 end diff --git a/test/grpc_test.rb b/test/grpc_test.rb index 6fc8a630..0ecb1095 100644 --- a/test/grpc_test.rb +++ b/test/grpc_test.rb @@ -64,7 +64,7 @@ def test_unavailable_server_opens_the_circuit end def test_timeout_opens_the_circuit - skip if ENV["SKIP_FLAKY_TESTS"] + skip if flaky stub = build_insecure_stub(EchoStub, host: "#{SemianConfig['toxiproxy_upstream_host']}:#{SemianConfig['grpc_toxiproxy_port']}", opts: {timeout: 0.1}) run_services_on_server(@server, services: [EchoService]) do Toxiproxy['semian_test_grpc'].downstream(:latency, latency: 1000).apply do @@ -86,7 +86,7 @@ def test_timeout_opens_the_circuit def test_instrumentation notified = false subscriber = Semian.subscribe do |event, resource, scope, adapter| - next if event != :success + next unless event == :success notified = true assert_equal Semian[@host], resource diff --git a/test/helpers/circuit_breaker_helper.rb b/test/helpers/circuit_breaker_helper.rb index 7d86ae0d..55fda301 100644 --- a/test/helpers/circuit_breaker_helper.rb +++ b/test/helpers/circuit_breaker_helper.rb @@ -14,7 +14,9 @@ def half_open_cicuit!(resource = @resource, backwards_time_travel = 10) end def trigger_error!(resource = @resource, error = SomeError) - resource.acquire { raise error } + resource.acquire do + raise error + end rescue error end diff --git a/test/lru_hash_test.rb b/test/lru_hash_test.rb index 0793f30a..138a97ca 100644 --- a/test/lru_hash_test.rb +++ b/test/lru_hash_test.rb @@ -113,8 +113,9 @@ def test_clean_instrumentation notified = false subscriber = Semian.subscribe do |event, resource, scope, adapter, payload| + next unless event == :lru_hash_gc + notified = true - assert_equal :lru_hash_gc, event assert_equal @lru_hash, resource assert_nil scope assert_nil adapter @@ -234,6 +235,7 @@ def create_circuit_breaker(name, exceptions = true, bulkhead = false, error_time exceptions: [::Semian::BaseError], half_open_resource_timeout: nil, implementation: implementation, + scale_factor: 1.0, ) circuit_breaker.mark_failed(nil) if exceptions Semian::ProtectedResource.new(name, create_bulkhead(name, bulkhead), circuit_breaker) diff --git a/test/net_http_test.rb b/test/net_http_test.rb index 726d9d36..33672034 100644 --- a/test/net_http_test.rb +++ b/test/net_http_test.rb @@ -361,7 +361,7 @@ def test_5xxs_trip_circuit_when_fatal_server_flag_enabled end def test_5xxs_dont_raise_exceptions_unless_fatal_server_flag_enabled - skip if ENV["SKIP_FLAKY_TESTS"] + skip if flaky with_semian_configuration do with_server do http = Net::HTTP.new(SemianConfig['http_host'], SemianConfig['http_port_service_a']) diff --git a/test/resource_test.rb b/test/resource_test.rb index ad837852..769f6364 100644 --- a/test/resource_test.rb +++ b/test/resource_test.rb @@ -1,12 +1,12 @@ require 'test_helper' +require 'objspace' class TestResource < Minitest::Test include ResourceHelper - # Time epsilon to account for super fast machines - EPSILON = 0.1 - def setup + @workers = [] + @resources = [] Semian.destroy(:testing) rescue nil @@ -56,7 +56,7 @@ def test_register_with_quota def test_unregister_past_0 workers = 10 - resource = Semian.register(:testing, tickets: workers * 2, error_threshold: 0, error_timeout: 0, success_threshold: 0) + resource = Semian.register(:testing, tickets: workers * 2, error_threshold: 1, error_timeout: 0, success_threshold: 0) fork_workers(count: workers, tickets: 0, timeout: 0.5, wait_for_timeout: true) do Semian.unregister(:testing) @@ -71,7 +71,7 @@ def test_unregister_past_0 def test_reset_registered_workers workers = 10 - resource = Semian.register(:testing, tickets: 1, error_threshold: 0, error_timeout: 0, success_threshold: 0) + resource = Semian.register(:testing, tickets: 1, error_threshold: 1, error_timeout: 0, success_threshold: 0) fork_workers(count: workers - 1, tickets: 0, timeout: 0.5, wait_for_timeout: true) @@ -141,10 +141,9 @@ def test_acquire_return_val end def test_acquire_timeout - fork_workers(count: 2, tickets: 1, timeout: 1, wait_for_timeout: true) + fork_workers(count: 2, tickets: 1, timeout: 1, wait_for_timeout: true, epsilon: 0.1) signal_workers('TERM') - timeouts = count_worker_timeouts - assert 1, timeouts + assert_equal(1, count_worker_timeouts) end def test_acquire_timeout_override @@ -156,8 +155,7 @@ def test_acquire_timeout_override signal_workers('TERM') - timeouts = count_worker_timeouts - assert 0, timeouts + assert_equal(0, count_worker_timeouts) end def test_acquire_with_fork @@ -382,10 +380,12 @@ def test_sem_undo end end + # TODO(michaelkipper): Shouldn't need to rescue InternalError, this test + # should deterministically throw SyscallError. def test_destroy resource = create_resource :testing, tickets: 1 resource.destroy - assert_raises Semian::SyscallError do + assert_raises(Semian::InternalError, Semian::SyscallError) do resource.acquire {} end end @@ -490,6 +490,11 @@ def test_multiple_register_with_fork assert_equal 0, timeouts end + def test_memsize + r = create_resource :testing, tickets: 1 + assert_equal 128, ObjectSpace.memsize_of(r) + end + def create_resource(*args) @resources ||= [] resource = Semian::Resource.new(*args) @@ -515,7 +520,7 @@ def destroy_resources # Active workers are accumulated in the instance variable @workers, # and workers must be cleaned up between tests by the teardown script # An exit value of 100 is to keep track of timeouts, 0 for success. - def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, timeout: 0.1, wait_for_timeout: false) + def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, timeout: 0.1, wait_for_timeout: false, epsilon: 0) fail 'Must provide at least one of tickets or quota' unless tickets || quota @workers ||= [] @@ -541,13 +546,12 @@ def fork_workers(count:, resource: :testing, quota: nil, tickets: nil, timeout: end sleep rescue => e - puts "Unhandled exception occurred in worker" puts e exit! 2 end end end - sleep((count / 2.0).ceil * timeout + EPSILON) if wait_for_timeout # give time for threads to timeout + sleep((count / 2.0).ceil * timeout + epsilon) if wait_for_timeout # give time for threads to timeout end def count_worker_timeouts diff --git a/test/semian_test.rb b/test/semian_test.rb index 65d6958f..db504b6f 100644 --- a/test/semian_test.rb +++ b/test/semian_test.rb @@ -91,4 +91,15 @@ def test_disabled_via_semian_wide_env_var ensure ENV.delete('SEMIAN_DISABLED') end + + def test_force_host_circuits + refute force_host_circuits? + ENV['SEMIAN_CIRCUIT_BREAKER_FORCE_HOST'] = 'machine-1,machine-2,machine-3' + refute force_host_circuits? + ENV['KUBE_HOSTNAME'] = 'machine-2' + assert force_host_circuits? + ensure + ENV.delete('SEMIAN_CIRCUIT_BREAKER_FORCE_HOST') + ENV.delete('KUBE_HOSTNAME') + end end diff --git a/test/simple_integer_test.rb b/test/simple_integer_test.rb index 2df82439..aaeec381 100644 --- a/test/simple_integer_test.rb +++ b/test/simple_integer_test.rb @@ -2,11 +2,12 @@ class TestSimpleInteger < Minitest::Test def setup - @integer = ::Semian::ThreadSafe::Integer.new + id = Time.now.strftime('%H:%M:%S.%N') + @integer = ::Semian::ThreadSafe::Integer.new(id) end def teardown - @integer.destroy + @integer.destroy unless @integer.nil? end module IntegerTestCases @@ -14,7 +15,7 @@ def test_access_value assert_equal(0, @integer.value) @integer.value = 99 assert_equal(99, @integer.value) - time_now = (Time.now).to_i + time_now = (Time.now).to_i % (2**15) @integer.value = time_now assert_equal(time_now, @integer.value) @integer.value = 6 @@ -41,6 +42,44 @@ def test_reset @integer.reset assert_equal(0, @integer.value) end + + def assert_equal_with_retry(expected) + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time < 1.0 + return true if expected == yield + sleep(0.1) + end + + assert_equal(expected, yield) + end + + # Without locks, this only passes around 1 in every 5 runs + def test_increment_race + process_count = 16 + 10.times do + pids = [] + + id = Time.now.strftime('%H:%M:%S.%N') + @integer = ::Semian::ThreadSafe::Integer.new(id) + + process_count.times do + pids << fork do + @integer.increment(1) + sleep(60) + end + end + + if ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] == 'host' + # Host-based circuits: Forked processes should increment the host integer. + assert_equal_with_retry(process_count) { @integer.value } + else + # Worker-based circuits: Forked processes should increment the worker integer. + assert_equal_with_retry(0) { @integer.value } + end + ensure + pids.each { |pid| Process.kill('TERM', pid) } + end + end end include IntegerTestCases diff --git a/test/simple_sliding_window_test.rb b/test/simple_sliding_window_test.rb index 2e10d18e..a96efd55 100644 --- a/test/simple_sliding_window_test.rb +++ b/test/simple_sliding_window_test.rb @@ -2,12 +2,31 @@ class TestSimpleSlidingWindow < Minitest::Test def setup - @sliding_window = ::Semian::ThreadSafe::SlidingWindow.new(max_size: 6) + id = Time.now.strftime('%H:%M:%S.%N') + @sliding_window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 6) @sliding_window.clear end def teardown - @sliding_window.destroy + @sliding_window.destroy unless @sliding_window.nil? + end + + def test_clear + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 6) + window << 1 << 2 << 3 + assert_equal(3, window.size) + window.clear + assert_equal(0, window.size) + end + + def test_destroy + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 6) + window << 1 << 2 << 3 + assert_equal(3, window.size) + window.destroy + assert_equal(0, window.size) end def test_sliding_window_push @@ -24,18 +43,201 @@ def test_sliding_window_edge_falloff assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6, 7], 6) end - def resize_to_less_than_1_raises + def test_sliding_window_reject + @sliding_window << 0 << 1 << 2 << 3 << 4 << 5 << 6 << 7 + assert_equal(6, @sliding_window.size) + assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6, 7], 6) + @sliding_window.reject! { |val| val <= 3 } + assert_sliding_window(@sliding_window, [4, 5, 6, 7], 6) + end + + def test_sliding_window_reject_non_contiguous + @sliding_window << 0 << 1 << 2 << 3 << 4 << 5 << 6 << 7 + assert_equal(6, @sliding_window.size) + assert_sliding_window(@sliding_window, [2, 3, 4, 5, 6, 7], 6) + @sliding_window.reject! { |val| val == 3 } + assert_sliding_window(@sliding_window, [2, 4, 5, 6, 7], 6) + end + + def test_resize_to_less_than_1_raises + assert_raises ArgumentError do + @sliding_window.max_size = 0 + end + end + + def test_resize_to_1_works + assert_sliding_window(@sliding_window, [], 6) + @sliding_window.max_size = 1 + assert_sliding_window(@sliding_window, [], 1) + end + + def test_resize_to_simple + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 + assert_sliding_window(window, [0, 1], 4) + window.max_size = 8 + assert_sliding_window(window, [0, 1], 8) + window << 2 << 3 << 4 << 5 + assert_sliding_window(window, [0, 1, 2, 3, 4, 5], 8) + end + + def test_resize_to_simple_full + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 << 2 << 3 + assert_sliding_window(window, [0, 1, 2, 3], 4) + window.max_size = 8 + assert_sliding_window(window, [0, 1, 2, 3], 8) + window << 4 << 5 + assert_sliding_window(window, [0, 1, 2, 3, 4, 5], 8) + end + + def test_resize_to_simple_floating + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 << 2 << 3 + assert_sliding_window(window, [0, 1, 2, 3], 4) + window.reject! { |val| val < 2 } + assert_sliding_window(window, [2, 3], 4) + window.max_size = 8 + assert_sliding_window(window, [2, 3], 8) + window << 4 << 5 + assert_sliding_window(window, [2, 3, 4, 5], 8) + end + + def test_resize_to_hard + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 << 2 << 3 << 4 << 5 + assert_sliding_window(window, [2, 3, 4, 5], 4) + window.max_size = 8 + assert_sliding_window(window, [2, 3, 4, 5], 8) + window << 6 << 7 + assert_sliding_window(window, [2, 3, 4, 5, 6, 7], 8) + window << 8 << 9 + assert_sliding_window(window, [2, 3, 4, 5, 6, 7, 8, 9], 8) + end + + def test_resize_to_shrink_simple + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 + assert_sliding_window(window, [0, 1], 4) + window.max_size = 2 + assert_sliding_window(window, [0, 1], 2) + end + + def test_resize_to_shrink_simple_full + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 << 2 << 3 + assert_sliding_window(window, [0, 1, 2, 3], 4) + window.max_size = 2 + assert_sliding_window(window, [2, 3], 2) + end + + def test_resize_to_shrink_hard + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + window << 0 << 1 << 2 << 3 << 4 << 5 + assert_sliding_window(window, [2, 3, 4, 5], 4) + window.max_size = 2 + assert_sliding_window(window, [4, 5], 2) + end + + def test_resize_to_shrink_all_index + 8.times do |offset| + id = Time.now.strftime("%H:%M:%S.%N-#{offset}") + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + offset.times { window << offset } + window << 0 << 1 << 2 << 3 + assert_sliding_window(window, [0, 1, 2, 3], 4) + window.max_size = 2 + assert_sliding_window(window, [2, 3], 2) + end + end + + def test_resize_to_grow_all_index + 8.times do |offset| + id = Time.now.strftime("%H:%M:%S.%N-#{offset}") + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 4) + offset.times { window << offset } + window << 0 << 1 << 2 << 3 + assert_sliding_window(window, [0, 1, 2, 3], 4) + window.max_size = 8 + assert_sliding_window(window, [0, 1, 2, 3], 8) + window << 4 << 5 << 6 << 7 + assert_sliding_window(window, [0, 1, 2, 3, 4, 5, 6, 7], 8) + window << 8 << 9 + assert_sliding_window(window, [2, 3, 4, 5, 6, 7, 8, 9], 8) + end + end + + def test_scale_factor + pids = [] + skip unless ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] == 'host' + + id = Time.now.strftime("%H:%M:%S.%N") + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 1, scale_factor: 0.5) + + fork_worker = Proc.new do |i| + pid = fork do + # TODO(michaelkipper): We need to create a bulkhead here, because the sliding window + # has a coupling to it via the registered worker semaphore. This + # sucks, and should be removed by refactoring the bulkhead out of + # the Semian resource concept. + ::Semian::Resource.new(id, tickets: 1) + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 10, scale_factor: 0.5) + window << (i * 100) + sleep(3600) + end + + puts "Forker worker #{pid}" + sleep(0.1) + pid + end + + pids << fork_worker.call(1) + assert_equal(10, window.max_size) + pids << fork_worker.call(2) + assert_equal(10, window.max_size) + pids << fork_worker.call(3) + assert_equal(15, window.max_size) + pids << fork_worker.call(4) + assert_equal(20, window.max_size) + + assert_equal(4, window.size) + ensure + pids.each do |pid| + begin + Process.kill('TERM', pid) + rescue + nil + end + end + end + + def test_huge_sliding_window + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 1000) + 4000.times do |i| + window << i + end + end + + def test_huge_sliding_window_fails + skip unless ENV['SEMIAN_CIRCUIT_BREAKER_IMPL'] == 'host' + id = Time.now.strftime('%H:%M:%S.%N') assert_raises ArgumentError do - @sliding_window.resize_to 0 + ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 1001) end end private def assert_sliding_window(sliding_window, array, max_size) - # Get private member, the sliding_window doesn't expose the entire array - data = sliding_window.instance_variable_get("@window") - assert_equal(array, data) - assert_equal(max_size, sliding_window.max_size) + assert_equal(array, sliding_window.values, "Window contents were different") + assert_equal(max_size, sliding_window.max_size, "Window max_size was not equal") end end diff --git a/test/simple_state_test.rb b/test/simple_state_test.rb index 0da9dd53..9e8a9032 100644 --- a/test/simple_state_test.rb +++ b/test/simple_state_test.rb @@ -2,7 +2,9 @@ class TestSimpleEnum < Minitest::Test def setup - @state = ::Semian::ThreadSafe::State.new + id = Time.now.strftime('%H:%M:%S.%N') + state_val = ::Semian::ThreadSafe::Integer.new(id) + @state = ::Semian::ThreadSafe::State.new(state_val) end def teardown @@ -17,25 +19,25 @@ def test_start_closed? def test_open @state.open! assert @state.open? - assert_equal @state.value, :open + assert_equal ::Semian::Simple::State::OPEN, @state.value end def test_close @state.close! assert @state.closed? - assert_equal @state.value, :closed + assert_equal ::Semian::Simple::State::CLOSED, @state.value end def test_half_open @state.half_open! assert @state.half_open? - assert_equal @state.value, :half_open + assert_equal ::Semian::Simple::State::HALF_OPEN, @state.value end def test_reset @state.reset assert @state.closed? - assert_equal @state.value, :closed + assert_equal ::Semian::Simple::State::CLOSED, @state.value end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 32f0d63e..4adfc30b 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,4 +1,5 @@ require 'minitest/autorun' +require 'minitest/reporters' require 'semian' require 'semian/mysql2' require 'semian/redis' @@ -62,3 +63,9 @@ class Minitest::Test include BackgroundHelper end + +Minitest::Reporters.use! [Minitest::Reporters::SpecReporter.new()] + +def flaky + ENV["SKIP_FLAKY_TESTS"] +end