Skip to content

Commit

Permalink
adding shared memory and semaphore management
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Lauzon committed Mar 29, 2010
1 parent 489d805 commit 8cff952
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 4 deletions.
148 changes: 145 additions & 3 deletions libketama/ketama.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,86 @@

char k_error[255] = "";

int num_sem_ids = 0;
int num_shm_ids = 0;
int num_data = 0;
int sem_ids_size = 1024;
int shm_ids_size = 1024;
int shm_data_size = 1024;
int *sem_ids = NULL;
int *shm_ids = NULL;
int **shm_data = NULL;

static void
init_sem_id_tracker() {
sem_ids = malloc(sizeof(int)*1024);
}

static void
init_shm_id_tracker() {
shm_ids = malloc(sizeof(int)*1024);
}

static void
init_shm_data_tracker() {
shm_data = malloc(sizeof(int*)*1024);
}


static void
track_shm_data(int *data) {
if (num_data == shm_data_size) {
void *tmp = realloc(shm_data, sizeof(int*)*(shm_data_size + 1024));
if (tmp != NULL) {
shm_data = tmp;
} else {
sprintf( k_error, "Cannot realloc shm data tracker");
exit(1);
}

shm_data_size += 1024;
}

shm_data[num_data] = data;
num_data++;
}

static void
track_sem_id(int semid) {
if (num_sem_ids == sem_ids_size) {
void *tmp = realloc(sem_ids, sizeof(int)*(sem_ids_size + 1024));
if (tmp != NULL) {
sem_ids = tmp;
} else {
sprintf( k_error, "Cannot realloc semids");
exit(1);
}

sem_ids_size += 1024;
}

sem_ids[num_sem_ids] = semid;
num_sem_ids++;
}

static void
track_shm_id(int shmid) {
if (num_shm_ids == shm_ids_size) {
void *tmp = realloc(shm_ids, sizeof(int)*(shm_ids_size + 1024));
if (tmp != NULL) {
shm_ids = tmp;
} else {
sprintf( k_error, "Cannot realloc shmids");
exit(1);
}

shm_ids_size += 1024;
}

shm_ids[num_shm_ids] = shmid;
num_shm_ids++;
}

/** \brief Locks the semaphore.
* \param sem_set_id The semaphore handle that you want to lock. */
static void
Expand Down Expand Up @@ -73,15 +153,22 @@ ketama_sem_unlock( int sem_set_id )
static int
ketama_sem_init( key_t key )
{
if (sem_ids == NULL) {
init_sem_id_tracker();
}

int sem_set_id;

sem_set_id = semget( key, 1, 0 );
track_sem_id(sem_set_id);

if ( sem_set_id == -1 )
{
// create a semaphore set with ID SEM_ID
sem_set_id = semget( key, 1, IPC_CREAT | 0666 );
if ( sem_set_id == -1 )
track_sem_id(sem_set_id);

if ( sem_set_id == -1 )
{
strcpy( k_error, "Could not open semaphore!" );
return 0;
Expand Down Expand Up @@ -291,6 +378,14 @@ ketama_get_server( char* key, ketama_continuum cont )
static int
ketama_create_continuum( key_t key, char* filename )
{
if (shm_ids == NULL) {
init_shm_id_tracker();
}

if (shm_data == NULL) {
init_shm_data_tracker();
}

int shmid;
int* data; /* Pointer to shmem location */
unsigned int numservers = 0;
Expand Down Expand Up @@ -361,7 +456,9 @@ ketama_create_continuum( key_t key, char* filename )

/* Add data to shmmem */
shmid = shmget( key, MC_SHMSIZE, 0644 | IPC_CREAT );
data = shmat( shmid, (void *)0, 0 );
track_shm_id(shmid);

data = shmat( shmid, (void *)0, 0 );
if ( data == (void *)(-1) )
{
strcpy( k_error, "Can't open shmmem for writing." );
Expand Down Expand Up @@ -390,6 +487,14 @@ ketama_create_continuum( key_t key, char* filename )
int
ketama_roll( ketama_continuum* contptr, char* filename )
{
if (shm_ids == NULL) {
init_shm_id_tracker();
}

if (shm_data == NULL) {
init_shm_data_tracker();
}

strcpy( k_error, "" );

key_t key;
Expand Down Expand Up @@ -434,7 +539,9 @@ ketama_roll( ketama_continuum* contptr, char* filename )
while ( !fmodtime || modtime != *fmodtime )
{
shmid = shmget( key, MC_SHMSIZE, 0 ); // read only attempt.
data = shmat( shmid, (void *)0, SHM_RDONLY );
track_shm_id(shmid);

data = shmat( shmid, (void *)0, SHM_RDONLY );

if ( data == (void *)(-1) || (*contptr)->modtime != 0 )
{
Expand All @@ -455,6 +562,8 @@ ketama_roll( ketama_continuum* contptr, char* filename )
syslog( LOG_INFO, "ketama_create_continuum() successfully finished.\n" );*/

shmid = shmget( key, MC_SHMSIZE, 0 ); // read only attempt.
track_shm_id(shmid);

data = shmat( shmid, (void *)0, SHM_RDONLY );
ketama_sem_unlock( sem_set_id );
}
Expand All @@ -469,6 +578,8 @@ ketama_roll( ketama_continuum* contptr, char* filename )
(*contptr)->modtime = ++data;
(*contptr)->array = data + sizeof( void* );
fmodtime = (time_t*)( (*contptr)->modtime );

track_shm_data(data);
}

return 1;
Expand All @@ -478,6 +589,37 @@ ketama_roll( ketama_continuum* contptr, char* filename )
void
ketama_smoke( ketama_continuum contptr )
{
int i;
if (shm_data != NULL) {
for (i = 0; i < num_data; i++) {
shmdt(shm_data[i]);
}
free(shm_data);
shm_data = NULL;
num_data = 0;
shm_data_size = 1024;
}

if (sem_ids != NULL) {
for (i = 0; i < num_sem_ids; i++) {
semctl(sem_ids[i], 0, IPC_RMID, 0);
}
free(sem_ids);
sem_ids = NULL;
num_sem_ids = 0;
sem_ids_size = 1024;
}

if (shm_ids != NULL) {
for (i = 0; i < num_shm_ids; i++) {
shmctl(shm_ids[i], IPC_RMID, 0);
}
free(shm_ids);
shm_ids = NULL;
num_shm_ids = 0;
shm_ids_size = 1024;
}

free( contptr );
}

Expand Down
2 changes: 1 addition & 1 deletion libketama/ketama_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ int main(int argc, char **argv)

printf( "%u %u %s\n", kh, m->point, m->ip );
}

ketama_smoke(c);
return 0;
}

0 comments on commit 8cff952

Please sign in to comment.