Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle bulkhead from the circuit breaker on state transition #247

Open
wants to merge 3 commits into
base: mkipper/global-circuit-breaker-simple-integer
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 73 additions & 2 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ check_permissions_arg(VALUE permissions);
static double
check_default_timeout_arg(VALUE default_timeout);

static int
check_throttle_arg(VALUE throttle);

static void
ms_to_timespec(long ms, struct timespec *ts);

Expand Down Expand Up @@ -103,8 +106,13 @@ semian_resource_reset_workers(VALUE self)
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

sem_meta_lock(res->sem_id);
// This SETVAL will purge the SEM_UNDO table
ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0);
{
// This SETVAL will purge the SEM_UNDO table
ret = semctl(res->sem_id, SI_SEM_REGISTERED_WORKERS, SETVAL, 0);
if (ret != -1) {
ret = semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0);
}
}
sem_meta_unlock(res->sem_id);

if (ret == -1) {
Expand Down Expand Up @@ -238,6 +246,54 @@ semian_resource_in_use(VALUE self)
return Qtrue;
}

VALUE
semian_resource_throttle(VALUE self, VALUE value)
{
semian_resource_t *res = NULL;
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

int val = check_throttle_arg(value);
if (val == -1) {
rb_raise(rb_eArgError, "Unknown throttle argument");
}

sem_meta_lock(res->sem_id);
{
// Get the current value
int tickets = get_sem_val(res->sem_id, SI_SEM_CONFIGURED_TICKETS);

if (val > 0) {
int op = tickets - val;
dprintf("Throttling sem_id:%d (op:%d)", res->sem_id, op);

// Use SEM_UNDO so if this process is terminated, the throttle is undone.
if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, -op, SEM_UNDO, NULL) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, op) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
} else {
int op = get_sem_val(res->sem_id, SI_SEM_TICKET_THROTTLE);
dprintf("Unthrottling sem_id:%d (op:%d)", res->sem_id, op);

if (perform_semop(res->sem_id, SI_SEM_CONFIGURED_TICKETS, op, SEM_UNDO, NULL) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
if (semctl(res->sem_id, SI_SEM_TICKET_THROTTLE, SETVAL, 0) == -1) {
sem_meta_unlock(res->sem_id);
rb_raise(eInternal, "Unknown error");
}
}
}
sem_meta_unlock(res->sem_id);

return Qnil;
}

static VALUE
cleanup_semian_resource_acquire(VALUE self)
{
Expand Down Expand Up @@ -319,6 +375,21 @@ check_default_timeout_arg(VALUE default_timeout)
return NUM2DBL(default_timeout);
}

static int
check_throttle_arg(VALUE throttle)
{
switch (rb_type(throttle)) {
case T_NIL:
case T_UNDEF:
return 0;
case T_FIXNUM:
case T_BIGNUM:
return RB_NUM2INT(throttle);
default:
return -1;
}
}

static void
ms_to_timespec(long ms, struct timespec *ts)
{
Expand Down
4 changes: 4 additions & 0 deletions ext/semian/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,8 @@ semian_resource_alloc(VALUE klass);
VALUE
semian_resource_in_use(VALUE self);

// Throttle the bulkhead to a given value
VALUE
semian_resource_throttle(VALUE self, VALUE throttle);

#endif //SEMIAN_RESOURCE_H
1 change: 1 addition & 0 deletions ext/semian/semian.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void Init_semian()
rb_define_method(cResource, "reset_registered_workers!", semian_resource_reset_workers, 0);
rb_define_method(cResource, "unregister_worker", semian_resource_unregister_worker, 0);
rb_define_method(cResource, "in_use?", semian_resource_in_use, 0);
rb_define_method(cResource, "throttle", semian_resource_throttle, 1);

id_wait_time = rb_intern("wait_time");
id_timeout = rb_intern("timeout");
Expand Down
2 changes: 1 addition & 1 deletion ext/semian/simple_integer.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ semian_simple_integer_increment(int argc, VALUE *argv, VALUE self)
rb_scan_args(argc, argv, "01", &val);

int value = check_increment_arg(val);
if (perform_semop(res->sem_id, 0, value, SEM_UNDO, NULL) == -1) {
if (perform_semop(res->sem_id, 0, value, 0, NULL) == -1) {
rb_raise(eInternal, "error incrementing simple integer, errno: %d (%s)", errno, strerror(errno));
}

Expand Down
24 changes: 11 additions & 13 deletions ext/semian/sliding_window.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ static const rb_data_type_t semian_simple_sliding_window_type = {
.flags = RUBY_TYPED_FREE_IMMEDIATELY,
};

static void init_fn(void* ptr)
{
semian_simple_sliding_window_shared_t* res = (semian_simple_sliding_window_shared_t*)ptr;
res->max_size = 0;
res->length = 0;
res->start = 0;
}

static int
check_max_size_arg(VALUE max_size)
{
Expand Down Expand Up @@ -93,7 +85,10 @@ check_scale_factor_arg(VALUE scale_factor)
static VALUE
grow_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) {
sem_meta_unlock(sem_id);
rb_raise(rb_eArgError, "Cannot grow window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE);
}

int end = window->max_size ? (window->start + window->length) % window->max_size : 0;
dprintf("Growing window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size);
Expand Down Expand Up @@ -127,7 +122,10 @@ static void swap(int *a, int *b) {
static VALUE
shrink_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) {
sem_meta_unlock(sem_id);
rb_raise(rb_eArgError, "Cannot shrink window to %d (MAX_SIZE=%d)", new_max_size, SLIDING_WINDOW_MAX_SIZE);
}

int new_length = (new_max_size > window->length) ? window->length : new_max_size;

Expand Down Expand Up @@ -256,7 +254,7 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size,

dprintf("Initializing simple sliding window '%s' (key: %lu)", buffer, res->key);
res->sem_id = initialize_single_semaphore(res->key, SEM_DEFAULT_PERMISSIONS, 1);
res->shmem = get_or_create_shared_memory(res->key, init_fn);
res->shmem = get_or_create_shared_memory(res->key);
res->error_threshold = check_max_size_arg(max_size);
res->scale_factor = check_scale_factor_arg(scale_factor);

Expand Down Expand Up @@ -396,8 +394,8 @@ semian_simple_sliding_window_clear(VALUE self)
return self;
}

#if 0
// Handy for debugging the sliding window, but too noisy for regular debugging.
/*
static void dprint_window(semian_simple_sliding_window_shared_t *window)
{
dprintf("---");
Expand All @@ -407,7 +405,7 @@ static void dprint_window(semian_simple_sliding_window_shared_t *window)
}
dprintf("---");
}
*/
#endif

VALUE
semian_simple_sliding_window_reject(VALUE self)
Expand Down
3 changes: 2 additions & 1 deletion ext/semian/sysv_semaphores.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ initialize_new_semaphore_values(int sem_id, long permissions)
init_vals[SI_SEM_TICKETS] = init_vals[SI_SEM_CONFIGURED_TICKETS] = 0;
init_vals[SI_SEM_REGISTERED_WORKERS] = 0;
init_vals[SI_SEM_LOCK] = 1;
init_vals[SI_SEM_TICKET_THROTTLE] = 0;

if (semctl(sem_id, 0, SETALL, init_vals) == -1) {
raise_semian_syscall_error("semctl()", errno);
Expand All @@ -214,7 +215,7 @@ wait_for_new_semaphore_set(uint64_t key, long permissions)
union semun sem_opts;
sem_opts.buf = &sem_ds;

int sem_id = semget(key, 1, permissions);
int sem_id = semget((key_t)key, 1, permissions);
if (sem_id == -1){
raise_semian_syscall_error("semget()", errno);
}
Expand Down
2 changes: 2 additions & 0 deletions ext/semian/sysv_semaphores.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ typedef VALUE (*my_blocking_fn_t)(void*);
// SI_SEM_TICKETS semaphore for the tickets currently issued
// SI_SEM_CONFIGURED_TICKETS semaphore to track the desired number of tickets available for issue
// SI_SEM_REGISTERED_WORKERS semaphore for the number of workers currently registered
// SI_SEM_TICKET_THROTTLE semaphore to track the current throttle
// SI_NUM_SEMAPHORES always leave this as last entry for count to be accurate
#define FOREACH_SEMINDEX(SEMINDEX) \
SEMINDEX(SI_SEM_LOCK) \
SEMINDEX(SI_SEM_TICKETS) \
SEMINDEX(SI_SEM_CONFIGURED_TICKETS) \
SEMINDEX(SI_SEM_REGISTERED_WORKERS) \
SEMINDEX(SI_SEM_TICKET_THROTTLE) \
SEMINDEX(SI_NUM_SEMAPHORES) \

#define GENERATE_ENUM(ENUM) ENUM,
Expand Down
36 changes: 12 additions & 24 deletions ext/semian/sysv_shared_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,33 @@

#include "util.h"

#define TIMEOUT_MS (5 * 1e6)
#define WAIT_MS (10)
#define RETRIES (TIMEOUT_MS / WAIT_MS)

void*
wait_for_shared_memory(uint64_t key)
{
for (int i = 0; i < RETRIES; ++i) {
int shmid = shmget(key, SHM_DEFAULT_SIZE, SHM_DEFAULT_PERMISSIONS);
if (shmid != -1) {
return shmat(shmid, NULL, 0);
}
usleep(WAIT_MS);
}

rb_raise(rb_eArgError, "could not get shared memory");
}

void*
get_or_create_shared_memory(uint64_t key, shared_memory_init_fn fn)
get_or_create_shared_memory(uint64_t key)
{
void* shmem = NULL;
if (!key) return NULL;

dprintf("Creating shared memory (key: %lu)", key);
int shmid = shmget(key, SHM_DEFAULT_SIZE, IPC_CREAT | IPC_EXCL | SHM_DEFAULT_PERMISSIONS);
if (shmid != -1) {
dprintf("Created shared memory (key: %lu)", key);

dprintf("Created shared memory (key:%lu sem_id:%d)", key, shmid);
shmem = shmat(shmid, NULL, 0);
if (shmem == (void*)-1) {
rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno));
}

if (fn) fn(shmem);

shmctl(key, IPC_RMID, NULL);
} else {
shmem = wait_for_shared_memory(key);
shmid = shmget(key, SHM_DEFAULT_SIZE, SHM_DEFAULT_PERMISSIONS);
if (shmid == -1) {
rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno));
}

dprintf("Got shared memory (key:%lu sem_id:%d)", key, shmid);
shmem = shmat(shmid, NULL, 0);
if (shmem == (void*)-1) {
rb_raise(rb_eArgError, "could not get shared memory (%s)", strerror(errno));
}
}

return shmem;
Expand Down
6 changes: 2 additions & 4 deletions ext/semian/sysv_shared_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@

// Default permissions for shared memory
#define SHM_DEFAULT_PERMISSIONS 0660
#define SHM_DEFAULT_SIZE 1024

typedef void (*shared_memory_init_fn)(void*);
#define SHM_DEFAULT_SIZE 4096

void*
get_or_create_shared_memory(uint64_t key, shared_memory_init_fn fn);
get_or_create_shared_memory(uint64_t key);

void
free_shared_memory(void* key);
Expand Down
18 changes: 18 additions & 0 deletions ext/semian/test/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Makefile

CC := gcc -std=gnu99 -I. -I..
TESTS := types_test

all : test

test : types_test

types_test : types_test.c
$(CC) -o $@ $<
./$@

clean :
rm -f *.o
rm -f $(TESTS)

.PHONY : all test types_test clean
36 changes: 36 additions & 0 deletions ext/semian/test/types_test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include <stdio.h>
#include <stdlib.h>

#include "types.h"

void assert_equal_int(int actual, int expected, const char *message)
{
if (actual != expected) {
fprintf(stderr, "Error: got %d, expected %d (%s)\n", actual, expected, message);
exit(EXIT_FAILURE);
}
}

void assert_le_int(int actual, int expected, const char *message)
{
if (actual > expected) {
fprintf(stderr, "Error: got %d, which is greater than %d (%s)\n", actual, expected, message);
exit(EXIT_FAILURE);
}
}

void test_sliding_window()
{
semian_simple_sliding_window_shared_t window;
assert_le_int(sizeof(window), 4096, "window size is greater than a page");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about sysconf(_SC_PAGESIZE) instead of 4096?

assert_equal_int(sizeof(window.data), SLIDING_WINDOW_MAX_SIZE * sizeof(int), "window data size");
}

int main(int argc, char **argv)
{
printf("Info: Running test\n");

test_sliding_window();

return EXIT_SUCCESS;
}
2 changes: 1 addition & 1 deletion ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ For custom type definitions specific to semian
#include <sys/sem.h>
#include <sys/time.h>

#define SLIDING_WINDOW_MAX_SIZE 4096
#define SLIDING_WINDOW_MAX_SIZE 1000

// For sysV semop syscals
// see man semop
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,31 @@ def transition_to_close
log_state_transition(:closed)
@state.close!
@errors.clear
throttle(nil)
end

def transition_to_open
notify_state_transition(:open)
log_state_transition(:open)
@state.open!
throttle(1)
end

def transition_to_half_open
notify_state_transition(:half_open)
log_state_transition(:half_open)
@state.half_open!
@successes.reset
throttle(@success_count_threshold)
end

def throttle(val = nil)
resource = Semian[@name]
return if resource.nil?
bulkhead = resource.bulkhead
return if bulkhead.nil?

bulkhead.throttle(val)
end

def success_threshold_reached?
Expand Down