diff --git a/ext/semian/resource.c b/ext/semian/resource.c index bbd438c6..456c007d 100644 --- a/ext/semian/resource.c +++ b/ext/semian/resource.c @@ -20,6 +20,9 @@ check_permissions_arg(VALUE permissions); static double check_default_timeout_arg(VALUE default_timeout); +static int +check_throttle_arg(VALUE throttle); + static void ms_to_timespec(long ms, struct timespec *ts); @@ -103,8 +106,13 @@ semian_resource_reset_workers(VALUE self) TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); sem_meta_lock(res->sem_id); - // This SETVAL will purge the SEM_UNDO table - ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0); + { + // This SETVAL will purge the SEM_UNDO table + ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0); + if (ret != -1) { + ret = semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0); + } + } sem_meta_unlock(res->sem_id); if (ret == -1) { @@ -238,6 +246,54 @@ semian_resource_in_use(VALUE self) return Qtrue; } +VALUE +semian_resource_throttle(VALUE self, VALUE value) +{ + semian_resource_t *res = NULL; + TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res); + + int val = check_throttle_arg(value); + if (val == -1) { + rb_raise(rb_eArgError, "Unknown throttle argument"); + } + + sem_meta_lock(res->sem_id); + { + // Get the current value + int tickets = get_sem_val(res->sem_id, SI_SEM_CONFIGURED_TICKETS); + + if (val > 0) { + int op = tickets - val; + dprintf("Throttling sem_id:%d (op:%d)", res->sem_id, op); + + // Use SEM_UNDO so if this process is terminated, the throttle is undone. + if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, -op, SEM_UNDO, NULL) == -1) { + sem_meta_unlock(res->sem_id); + rb_raise(eInternal, "Unknown error"); + } + if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, op) == -1) { + sem_meta_unlock(res->sem_id); + rb_raise(eInternal, "Unknown error"); + } + } else { + int op = get_sem_val(res->sem_id, SI_SEM_TICKET_THROTTLE); + dprintf("Unthrottling sem_id:%d (op:%d)", res->sem_id, op); + + if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, op, SEM_UNDO, NULL) == -1) { + sem_meta_unlock(res->sem_id); + rb_raise(eInternal, "Unknown error"); + } + if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0) == -1) { + sem_meta_unlock(res->sem_id); + rb_raise(eInternal, "Unknown error"); + } + } + } + sem_meta_unlock(res->sem_id); + + return Qnil; +} + static VALUE cleanup_semian_resource_acquire(VALUE self) { @@ -319,6 +375,21 @@ check_default_timeout_arg(VALUE default_timeout) return NUM2DBL(default_timeout); } +static int +check_throttle_arg(VALUE throttle) +{ + switch (rb_type(throttle)) { + case T_NIL: + case T_UNDEF: + return 0; + case T_FIXNUM: + case T_BIGNUM: + return RB_NUM2INT(throttle); + default: + return -1; + } +} + static void ms_to_timespec(long ms, struct timespec *ts) { diff --git a/ext/semian/resource.h b/ext/semian/resource.h index 2b10a9fa..6e1b513b 100644 --- a/ext/semian/resource.h +++ b/ext/semian/resource.h @@ -130,4 +130,8 @@ semian_resource_alloc(VALUE klass); VALUE semian_resource_in_use(VALUE self); +// Throttle the bulkhead to a given value +VALUE +semian_resource_throttle(VALUE self, VALUE throttle); + #endif //SEMIAN_RESOURCE_H diff --git a/ext/semian/semian.c b/ext/semian/semian.c index cbabfb7b..db6bc93a 100644 --- a/ext/semian/semian.c +++ b/ext/semian/semian.c @@ -55,6 +55,7 @@ void Init_semian() rb_define_method(cResource, "reset_registered_workers!", semian_resource_reset_workers, 0); rb_define_method(cResource, "unregister_worker", semian_resource_unregister_worker, 0); rb_define_method(cResource, "in_use?", semian_resource_in_use, 0); + rb_define_method(cResource, "throttle", semian_resource_throttle, 1); id_wait_time = rb_intern("wait_time"); id_timeout = rb_intern("timeout"); diff --git a/ext/semian/simple_integer.c b/ext/semian/simple_integer.c index 84a68b9c..48cf4100 100644 --- a/ext/semian/simple_integer.c +++ b/ext/semian/simple_integer.c @@ -97,7 +97,7 @@ semian_simple_integer_increment(int argc, VALUE *argv, VALUE self) rb_scan_args(argc, argv, "01", &val); int value = check_increment_arg(val); - if (perform_semop(res->sem_id, 0, value, SEM_UNDO, NULL) == -1) { + if (perform_semop(res->sem_id, 0, value, 0, NULL) == -1) { rb_raise(eInternal, "error incrementing simple integer, errno: %d (%s)", errno, strerror(errno)); } diff --git a/ext/semian/sliding_window.c b/ext/semian/sliding_window.c index 17564bbc..db211a30 100644 --- a/ext/semian/sliding_window.c +++ b/ext/semian/sliding_window.c @@ -28,14 +28,6 @@ static const rb_data_type_t semian_simple_sliding_window_type = { .flags = RUBY_TYPED_FREE_IMMEDIATELY, }; -static void init_fn(void* ptr) -{ - semian_simple_sliding_window_shared_t* res = (semian_simple_sliding_window_shared_t*)ptr; - res->max_size = 0; - res->length = 0; - res->start = 0; -} - static int check_max_size_arg(VALUE max_size) { @@ -93,7 +85,10 @@ check_scale_factor_arg(VALUE scale_factor) static VALUE grow_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size) { - if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil; + if (new_max_size > SLIDING_WINDOW_MAX_SIZE) { + sem_meta_unlock(sem_id); + rb_raise(rb_eArgError, "Cannot grow window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE); + } int end = window->max_size ? (window->start + window->length) % window->max_size : 0; dprintf("Growing window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size); @@ -127,7 +122,10 @@ static void swap(int *a, int *b) { static VALUE shrink_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size) { - if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil; + if (new_max_size > SLIDING_WINDOW_MAX_SIZE) { + sem_meta_unlock(sem_id); + rb_raise(rb_eArgError, "Cannot shrink window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE); + } int new_length = (new_max_size > window->length) ? window->length : new_max_size; @@ -256,7 +254,7 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size, dprintf("Initializing simple sliding window '%s' (key: %lu)", buffer, res->key); res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS, 1); - res->shmem = get_or_create_shared_memory(res->key, init_fn); + res->shmem = get_or_create_shared_memory(res->key); res->error_threshold = check_max_size_arg(max_size); res->scale_factor = check_scale_factor_arg(scale_factor); @@ -396,8 +394,8 @@ semian_simple_sliding_window_clear(VALUE self) return self; } +#if 0 // Handy for debugging the sliding window, but too noisy for regular debugging. -/* static void dprint_window(semian_simple_sliding_window_shared_t *window) { dprintf("---"); @@ -407,7 +405,7 @@ static void dprint_window(semian_simple_sliding_window_shared_t *window) } dprintf("---"); } -*/ +#endif VALUE semian_simple_sliding_window_reject(VALUE self) diff --git a/ext/semian/sysv_semaphores.c b/ext/semian/sysv_semaphores.c index b63c7367..fafa364c 100644 --- a/ext/semian/sysv_semaphores.c +++ b/ext/semian/sysv_semaphores.c @@ -200,6 +200,7 @@ initialize_new_semaphore_values(int sem_id, long permissions) init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = 0; init_vals[SI_SEM_REGISTERED_WORKERS] = 0; init_vals[SI_SEM_LOCK] = 1; + init_vals[SI_SEM_TICKET_THROTTLE] = 0; if (semctl(sem_id, 0, SETALL, init_vals) == -1) { raise_semian_syscall_error("semctl()", errno); @@ -214,7 +215,7 @@ wait_for_new_semaphore_set(uint64_t key, long permissions) union semun sem_opts; sem_opts.buf = &sem_ds; - int sem_id = semget(key, 1, permissions); + int sem_id = semget((key_t)key, 1, permissions); if (sem_id == -1){ raise_semian_syscall_error("semget()", errno); } diff --git a/ext/semian/sysv_semaphores.h b/ext/semian/sysv_semaphores.h index 0049d67b..93175c21 100644 --- a/ext/semian/sysv_semaphores.h +++ b/ext/semian/sysv_semaphores.h @@ -53,12 +53,14 @@ typedef VALUE (*my_blocking_fn_t)(void*); // SI_SEM_TICKETS semaphore for the tickets currently issued // SI_SEM_CONFIGURED_TICKETS semaphore to track the desired number of tickets available for issue // SI_SEM_REGISTERED_WORKERS semaphore for the number of workers currently registered +// SI_SEM_TICKET_THROTTLE semaphore to track the current throttle // SI_NUM_SEMAPHORES always leave this as last entry for count to be accurate #define FOREACH_SEMINDEX(SEMINDEX) \ SEMINDEX(SI_SEM_LOCK) \ SEMINDEX(SI_SEM_TICKETS) \ SEMINDEX(SI_SEM_CONFIGURED_TICKETS) \ SEMINDEX(SI_SEM_REGISTERED_WORKERS) \ + SEMINDEX(SI_SEM_TICKET_THROTTLE) \ SEMINDEX(SI_NUM_SEMAPHORES) \ #define GENERATE_ENUM(ENUM) ENUM, diff --git a/ext/semian/sysv_shared_memory.c b/ext/semian/sysv_shared_memory.c index be1e4b7d..92aeffa1 100644 --- a/ext/semian/sysv_shared_memory.c +++ b/ext/semian/sysv_shared_memory.c @@ -2,26 +2,8 @@ #include "util.h" -#define TIMEOUT_MS (5 * 1e6) -#define WAIT_MS (10) -#define RETRIES (TIMEOUT_MS / WAIT_MS) - void* -wait_for_shared_memory(uint64_t key) -{ - for (int i = 0; i < RETRIES; ++i) { - int shmid = shmget(key, SHM_DEFAULT_SIZE, SHM_DEFAULT_PERMISSIONS); - if (shmid != -1) { - return shmat(shmid, NULL, 0); - } - usleep(WAIT_MS); - } - - rb_raise(rb_eArgError, "could not get shared memory"); -} - -void* -get_or_create_shared_memory(uint64_t key, shared_memory_init_fn fn) +get_or_create_shared_memory(uint64_t key) { void* shmem = NULL; if (!key) return NULL; @@ -29,18 +11,24 @@ get_or_create_shared_memory(uint64_t key, shared_memory_init_fn fn) dprintf("Creating shared memory (key: %lu)", key); int shmid = shmget(key, SHM_DEFAULT_SIZE, IPC_CREAT | IPC_EXCL | SHM_DEFAULT_PERMISSIONS); if (shmid != -1) { - dprintf("Created shared memory (key: %lu)", key); - + dprintf("Created shared memory (key:%lu sem_id:%d)", key, shmid); shmem = shmat(shmid, NULL, 0); if (shmem == (void*)-1) { rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); } - if (fn) fn(shmem); - shmctl(key, IPC_RMID, NULL); } else { - shmem = wait_for_shared_memory(key); + shmid = shmget(key, SHM_DEFAULT_SIZE, SHM_DEFAULT_PERMISSIONS); + if (shmid == -1) { + rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); + } + + dprintf("Got shared memory (key:%lu sem_id:%d)", key, shmid); + shmem = shmat(shmid, NULL, 0); + if (shmem == (void*)-1) { + rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno)); + } } return shmem; diff --git a/ext/semian/sysv_shared_memory.h b/ext/semian/sysv_shared_memory.h index ee84560c..9528dd46 100644 --- a/ext/semian/sysv_shared_memory.h +++ b/ext/semian/sysv_shared_memory.h @@ -8,12 +8,10 @@ // Default permissions for shared memory #define SHM_DEFAULT_PERMISSIONS 0660 -#define SHM_DEFAULT_SIZE 1024 - -typedef void (*shared_memory_init_fn)(void*); +#define SHM_DEFAULT_SIZE 4096 void* -get_or_create_shared_memory(uint64_t key, shared_memory_init_fn fn); +get_or_create_shared_memory(uint64_t key); void free_shared_memory(void* key); diff --git a/ext/semian/test/Makefile b/ext/semian/test/Makefile new file mode 100644 index 00000000..ad6496c9 --- /dev/null +++ b/ext/semian/test/Makefile @@ -0,0 +1,18 @@ +# Makefile + +CC := gcc -std=gnu99 -I. -I.. +TESTS := types_test + +all : test + +test : types_test + +types_test : types_test.c + $(CC) -o $@ $< + ./$@ + +clean : + rm -f *.o + rm -f $(TESTS) + +.PHONY : all test types_test clean diff --git a/ext/semian/test/types_test.c b/ext/semian/test/types_test.c new file mode 100644 index 00000000..61085574 --- /dev/null +++ b/ext/semian/test/types_test.c @@ -0,0 +1,36 @@ +#include +#include + +#include "types.h" + +void assert_equal_int(int actual, int expected, const char *message) +{ + if (actual != expected) { + fprintf(stderr, "Error: got %d, expected %d (%s)\n", actual, expected, message); + exit(EXIT_FAILURE); + } +} + +void assert_le_int(int actual, int expected, const char *message) +{ + if (actual > expected) { + fprintf(stderr, "Error: got %d, which is greater than %d (%s)\n", actual, expected, message); + exit(EXIT_FAILURE); + } +} + +void test_sliding_window() +{ + semian_simple_sliding_window_shared_t window; + assert_le_int(sizeof(window), 4096, "window size is greater than a page"); + assert_equal_int(sizeof(window.data), SLIDING_WINDOW_MAX_SIZE * sizeof(int), "window data size"); +} + +int main(int argc, char **argv) +{ + printf("Info: Running test\n"); + + test_sliding_window(); + + return EXIT_SUCCESS; +} diff --git a/ext/semian/types.h b/ext/semian/types.h index b6a10a72..81d31d5d 100644 --- a/ext/semian/types.h +++ b/ext/semian/types.h @@ -10,7 +10,7 @@ For custom type definitions specific to semian #include #include -#define SLIDING_WINDOW_MAX_SIZE 4096 +#define SLIDING_WINDOW_MAX_SIZE 1000 // For sysV semop syscals // see man semop diff --git a/lib/semian/circuit_breaker.rb b/lib/semian/circuit_breaker.rb index 788035ca..3a132fdf 100644 --- a/lib/semian/circuit_breaker.rb +++ b/lib/semian/circuit_breaker.rb @@ -93,12 +93,14 @@ def transition_to_close log_state_transition(:closed) @state.close! @errors.clear + throttle(nil) end def transition_to_open notify_state_transition(:open) log_state_transition(:open) @state.open! + throttle(1) end def transition_to_half_open @@ -106,6 +108,16 @@ def transition_to_half_open log_state_transition(:half_open) @state.half_open! @successes.reset + throttle(@success_count_threshold) + end + + def throttle(val = nil) + resource = Semian[@name] + return if resource.nil? + bulkhead = resource.bulkhead + return if bulkhead.nil? + + bulkhead.throttle(val) end def success_threshold_reached? diff --git a/test/resource_test.rb b/test/resource_test.rb index 769f6364..251010ec 100644 --- a/test/resource_test.rb +++ b/test/resource_test.rb @@ -495,6 +495,17 @@ def test_memsize assert_equal 128, ObjectSpace.memsize_of(r) end + def test_throttle + id = Time.now.strftime('%H:%M:%S.%N') + resource = create_resource(id, quota: 0.5, timeout: 0.1) + fork_workers(resource: id, count: 15, quota: 0.5, wait_for_timeout: true) + assert_equal(8, resource.tickets) + resource.throttle(1) + assert_equal(1, resource.tickets) + resource.throttle(nil) + assert_equal(8, resource.tickets) + end + def create_resource(*args) @resources ||= [] resource = Semian::Resource.new(*args) diff --git a/test/simple_sliding_window_test.rb b/test/simple_sliding_window_test.rb index 56813bcb..ffafd599 100644 --- a/test/simple_sliding_window_test.rb +++ b/test/simple_sliding_window_test.rb @@ -218,9 +218,18 @@ def test_scale_factor end end - def test_max_size - 8192.times do |i| - @sliding_window << i + def test_huge_sliding_window + id = Time.now.strftime('%H:%M:%S.%N') + window = ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 1000) + 4000.times do |i| + window << i + end + end + + def test_huge_sliding_window_fails + id = Time.now.strftime('%H:%M:%S.%N') + assert_raises ArgumentError do + ::Semian::ThreadSafe::SlidingWindow.new(id, max_size: 1001) end end