Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added min_tickets to the bulkheads #248

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
/tmp
/pkg
*.gem
*.log
*.orig
/html/
Gemfile.lock
vendor/
Expand All @@ -18,3 +20,5 @@ nohup.out

# IntelliJ/RubyMine/CLion project files
.idea
CMakeLists.txt
cmake-build-debug
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,25 +174,27 @@ You may now set quotas per worker:
```ruby
client = Redis.new(semian: {
name: "inventory",
quota: 0.5,
quota: 0.49,
min_tickets: 2,
success_threshold: 2,
error_threshold: 4,
error_timeout: 20
})

```

Per the above example, you no longer need to care about the number of tickets.
Per the above example, you no longer need to care about the number of tickets. Rather, the tickets shall be computed as a proportion of the number of active workers.

Rather, the tickets shall be computed as a proportion of the number of active workers.
In this case, we'd allow 49% of the workers on a particular host to connect to this redis resource.

In this case, we'd allow 50% of the workers on a particular host to connect to this redis resource.
In particular, 1 worker = 1 ticket (due to `ceil`), 2 workers = 2 tickets (due to `min_tickets`), 4 workers = 2 tickets (due to `ceil`), 100 workers = 49 tickets.

**Note**:

- You must pass **exactly** one of ticket or quota.
- Tickets available will be the ceiling of the quota ratio to the number of workers
- So, with one worker, there will always be a minimum of 1 ticket
- So, with one worker, there will always be a minimum of 1 ticket
- If you want to guarantee 2 tickets when there are 2 workers, use `min_tickets: 2`
- Workers in different processes will automatically unregister when the process exits.

#### Net::HTTP
Expand Down Expand Up @@ -484,7 +486,9 @@ still experimenting with ways to figure out optimal ticket numbers. Generally
something below half the number of workers on the server for endpoints that are
queried frequently has worked well for us.

* **tickets**. Number of workers that can concurrently access a resource.
* **tickets**. Number of workers that can concurrently access a resource. (Mutually exclusive with **quota**.)
* **quota**. Percentage of workers that can concurrently access a resource. (Mutually exclusive with **tickets**.)
* **min_tickets**. Minimum number of tickets to allow when using **quota**.
* **timeout**. Time to wait in seconds to acquire a ticket if there are no tickets left.
We recommend this to be `0` unless you have very few workers running (i.e.
less than ~5).
Expand Down
4 changes: 2 additions & 2 deletions ext/semian/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
have_func 'rb_thread_blocking_region'
have_func 'rb_thread_call_without_gvl'

$CFLAGS = "-D_GNU_SOURCE -Werror -Wall "
if ENV.key?('DEBUG')
$CFLAGS = "-D_GNU_SOURCE -Werror -Wall -std=gnu99 "
if ENV.key?('DEBUG') || ENV.key?('SEMIAN_DEBUG')
$CFLAGS << "-O0 -g -DDEBUG"
else
$CFLAGS << "-O3"
Expand Down
74 changes: 55 additions & 19 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ static VALUE
cleanup_semian_resource_acquire(VALUE self);

static void
check_tickets_xor_quota_arg(VALUE tickets, VALUE quota);
check_tickets_xor_quota_arg(VALUE tickets, VALUE min_tickets, VALUE quota);

static double
check_quota_arg(VALUE quota);

static int
check_tickets_arg(VALUE tickets);

static int
check_min_tickets_arg(VALUE min_tickets);

static long
check_permissions_arg(VALUE permissions);

Expand Down Expand Up @@ -200,24 +203,19 @@ semian_resource_key(VALUE self)
}

VALUE
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout)
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets)
{
long c_permissions;
double c_timeout;
double c_quota;
int c_tickets;
semian_resource_t *res = NULL;
const char *c_id_str = NULL;

// Check and cast arguments
check_tickets_xor_quota_arg(tickets, quota);
c_quota = check_quota_arg(quota);
c_tickets = check_tickets_arg(tickets);
c_permissions = check_permissions_arg(permissions);
c_id_str = check_id_arg(id);
c_timeout = check_default_timeout_arg(default_timeout);
check_tickets_xor_quota_arg(tickets, min_tickets, quota);
double c_quota = check_quota_arg(quota);
int c_tickets = check_tickets_arg(tickets);
long c_permissions = check_permissions_arg(permissions);
const char *c_id_str = check_id_arg(id);
double c_timeout = check_default_timeout_arg(default_timeout);
int c_min_tickets = check_min_tickets_arg(min_tickets);

// Build semian resource structure
semian_resource_t *res = NULL;
TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

// Populate struct fields
Expand All @@ -227,7 +225,7 @@ semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VAL
res->wait_time = -1;

// Initialize the semaphore set
initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_quota);
initialize_semaphore_set(res, c_id_str, c_permissions, c_tickets, c_min_tickets, c_quota);

return self;
}
Expand Down Expand Up @@ -259,10 +257,22 @@ check_permissions_arg(VALUE permissions)
}

static void
check_tickets_xor_quota_arg(VALUE tickets, VALUE quota)
check_tickets_xor_quota_arg(VALUE tickets, VALUE min_tickets, VALUE quota)
{
if ((TYPE(tickets) == T_NIL && TYPE(quota) == T_NIL) ||(TYPE(tickets) != T_NIL && TYPE(quota) != T_NIL)){
rb_raise(rb_eArgError, "Must pass exactly one of ticket or quota");
const char *msg = "Must pass exactly one of ticket or quota/min_tickets";
if (TYPE(quota) != T_NIL) {
if (TYPE(tickets) != T_NIL) {
dprintf("FOO");
rb_raise(rb_eArgError, msg);
}
} else if (TYPE(tickets) != T_NIL) {
if (TYPE(quota) != T_NIL || TYPE(min_tickets) != T_NIL) {
dprintf("FOO");
rb_raise(rb_eArgError, msg);
}
} else {
dprintf("FOO");
rb_raise(rb_eArgError, msg);
}
}

Expand Down Expand Up @@ -308,6 +318,32 @@ check_tickets_arg(VALUE tickets)
return c_tickets;
}

static int
check_min_tickets_arg(VALUE min_tickets)
{
int retval = -1;

switch (rb_type(min_tickets)) {
case T_NIL:
case T_UNDEF:
return -1;
case T_FLOAT:
rb_warn("semian min_tickets value %f is a float, converting to fixnum", RFLOAT_VALUE(min_tickets));
retval = (int) RFLOAT_VALUE(min_tickets);
break;
case T_FIXNUM:
retval = FIX2LONG(min_tickets); break;
default:
retval = -1; break;
}

if (retval <= 0 || retval > system_max_semaphore_count) {
rb_raise(rb_eArgError, "max_tickets must be in range [1,%d)", system_max_semaphore_count);
}

return retval;
}

static const char*
check_id_arg(VALUE id)
{
Expand Down
2 changes: 1 addition & 1 deletion ext/semian/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int system_max_semaphore_count;
* Creates a new Resource. Do not create resources directly. Use Semian.register.
*/
VALUE
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout);
semian_resource_initialize(VALUE self, VALUE id, VALUE tickets, VALUE quota, VALUE permissions, VALUE default_timeout, VALUE min_tickets);

/*
* call-seq:
Expand Down
2 changes: 1 addition & 1 deletion ext/semian/semian.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void Init_semian()
eInternal = rb_const_get(cSemian, rb_intern("InternalError"));

rb_define_alloc_func(cResource, semian_resource_alloc);
rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 5);
rb_define_method(cResource, "initialize_semaphore", semian_resource_initialize, 6);
rb_define_method(cResource, "acquire", semian_resource_acquire, -1);
rb_define_method(cResource, "count", semian_resource_count, 0);
rb_define_method(cResource, "semid", semian_resource_id, 0);
Expand Down
3 changes: 2 additions & 1 deletion ext/semian/sysv_semaphores.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ raise_semian_syscall_error(const char *syscall, int error_num)
}

void
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota)
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota)
{

res->key = generate_key(id_str);
Expand Down Expand Up @@ -69,6 +69,7 @@ initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permis
configure_tickets_args_t configure_tickets_args = (configure_tickets_args_t){
.sem_id = res->sem_id,
.tickets = tickets,
.min_tickets = min_tickets,
.quota = quota,
};
rb_protect(
Expand Down
7 changes: 3 additions & 4 deletions ext/semian/sysv_semaphores.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and functions associated directly weth semops.

#include "types.h"
#include "tickets.h"
#include "util.h"

// Defines for ruby threading primitives
#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
Expand Down Expand Up @@ -71,7 +72,7 @@ raise_semian_syscall_error(const char *syscall, int error_num);

// Initialize the sysv semaphore structure
void
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, double quota);
initialize_semaphore_set(semian_resource_t* res, const char* id_str, long permissions, int tickets, int min_tickets, double quota);

// Set semaphore UNIX octal permissions
void
Expand Down Expand Up @@ -106,17 +107,15 @@ get_semaphore(int key);
void *
acquire_semaphore_without_gvl(void *p);

#ifdef DEBUG
static inline void
print_sem_vals(int sem_id)
{
printf("lock %d, tickets: %d configured: %d, registered workers %d\n",
dprintf("lock %d, tickets: %d configured: %d, registered workers %d",
get_sem_val(sem_id, SI_SEM_LOCK),
get_sem_val(sem_id, SI_SEM_TICKETS),
get_sem_val(sem_id, SI_SEM_CONFIGURED_TICKETS),
get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS)
);
}
#endif

#endif // SEMIAN_SEMSET_H
25 changes: 19 additions & 6 deletions ext/semian/tickets.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ static VALUE
update_ticket_count(int sem_id, int count);

static int
calculate_quota_tickets(int sem_id, double quota);
calculate_quota_tickets(int sem_id, double quota, int min_tickets);

// Must be called with the semaphore meta lock already acquired
VALUE
Expand All @@ -14,7 +14,7 @@ configure_tickets(VALUE value)
configure_tickets_args_t *args = (configure_tickets_args_t *)value;

if (args->quota > 0) {
args->tickets = calculate_quota_tickets(args->sem_id, args->quota);
args->tickets = calculate_quota_tickets(args->sem_id, args->quota, args->min_tickets);
}

/*
Expand Down Expand Up @@ -68,9 +68,22 @@ update_ticket_count(int sem_id, int tickets)
}

static int
calculate_quota_tickets (int sem_id, double quota)
min(const int a, const int b)
{
int tickets = 0;
tickets = (int) ceil(get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS) * quota);
return tickets;
return a < b ? a : b;
}

static int
max(const int a, const int b)
{
return a > b ? a : b;
}

static int
calculate_quota_tickets(int sem_id, double quota, int min_tickets)
{
int workers = get_sem_val(sem_id, SI_SEM_REGISTERED_WORKERS);
int tickets = (int) ceil(workers * quota);
dprintf("Calculating quota tickets - sem_id:%d quota:%0.2f%% workers:%d min_tickets:%d tickets:%d", sem_id, quota * 100.0, workers, min_tickets, tickets);
return min_tickets > 0 ? min(workers, max(tickets, min_tickets)) : tickets;
thegedge marked this conversation as resolved.
Show resolved Hide resolved
michaelkipper marked this conversation as resolved.
Show resolved Hide resolved
}
1 change: 1 addition & 0 deletions ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ union semun {
typedef struct {
int sem_id;
int tickets;
int min_tickets;
double quota;
} configure_tickets_args_t;

Expand Down
25 changes: 25 additions & 0 deletions ext/semian/util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef EXT_SEMIAN_UTIL_H
#define EXT_SEMIAN_UTIL_H

#include <stdarg.h>
#include <stdio.h>
#include <time.h>

#ifdef DEBUG
# define DEBUG_TEST 1
#else
# define DEBUG_TEST 0
#endif

#define dprintf(fmt, ...) \
do { \
if (DEBUG_TEST) { \
const pid_t pid = getpid(); \
struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \
struct tm t; localtime_r(&(ts.tv_sec), &t); \
char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \
printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
} while (0)

#endif // EXT_SEMIAN_UTIL_H
4 changes: 2 additions & 2 deletions lib/semian/resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def instance(*args)
end
end

def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0)
def initialize(name, tickets: nil, quota: nil, permissions: 0660, timeout: 0, min_tickets: nil)
if Semian.semaphores_enabled?
initialize_semaphore(name, tickets, quota, permissions, timeout) if respond_to?(:initialize_semaphore)
initialize_semaphore(name, tickets, quota, permissions, timeout, min_tickets) if respond_to?(:initialize_semaphore)
else
Semian.issue_disabled_semaphores_warning
end
Expand Down
20 changes: 20 additions & 0 deletions scripts/cleanup-ipc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash

ME=`whoami`

IPCS_S=`ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`
IPCS_M=`ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`
IPCS_Q=`ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" "`

for id in $IPCS_M; do
ipcrm -m $id;
done

for id in $IPCS_S; do
ipcrm -s $id;
done

for id in $IPCS_Q; do
ipcrm -q $id;
done
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also tack on an xargs for all of the above:

ipcs -s | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -s
ipcs -m | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -m
ipcs -q | egrep "0x[0-9a-f]+ [0-9]+" | grep $ME | cut -f2 -d" " | xargs -n1 ipcrm -q


Loading