Skip to content

Commit

Permalink
Added support for sharding/partitioning cluster.
Browse files Browse the repository at this point in the history
A sharding clustering is a presence cluster where each node keeps its own set of watchers and presentities of an interest. Any new Published presentity is replicated to all the cluster nodes, but stored only by nodes interested in it (if there are watchers for that presentity). For initial subscriptions, there is quering mechnism inside the cluster - to try to fetch the presentity data from any other node.
  • Loading branch information
bogdan-iancu committed Mar 23, 2018
1 parent a196fb8 commit f7bed45
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 31 deletions.
101 changes: 97 additions & 4 deletions modules/presence/hash.c
Expand Up @@ -396,13 +396,22 @@ phtable_t* new_phtable(void)
LM_ERR("initializing lock [%d]\n", i);
goto error;
}

htable[i].entries= (pres_entry_t*)shm_malloc(sizeof(pres_entry_t));
if(htable[i].entries== NULL)
{
ERR_MEM(SHARE_MEM);
}
memset(htable[i].entries, 0, sizeof(pres_entry_t));
htable[i].entries->next= NULL;

htable[i].cq_entries= (cluster_query_entry_t*)shm_malloc(sizeof(cluster_query_entry_t));
if(htable[i].cq_entries== NULL)
{
ERR_MEM(SHARE_MEM);
}
memset(htable[i].cq_entries, 0, sizeof(cluster_query_entry_t));
htable[i].cq_entries->next= NULL;
}

return htable;
Expand All @@ -414,9 +423,10 @@ phtable_t* new_phtable(void)
{
if(htable[i].entries)
shm_free(htable[i].entries);
else
break;
lock_destroy(&htable[i].lock);
if(htable[i].cq_entries)
shm_free(htable[i].cq_entries);
if (htable[i].lock)
lock_destroy(&htable[i].lock);
}
shm_free(htable);
}
Expand All @@ -428,13 +438,15 @@ void destroy_phtable(void)
{
int i;
pres_entry_t* p, *prev_p;
cluster_query_entry_t* cq, *prev_cq;

if(pres_htable== NULL)
return;

for(i= 0; i< phtable_size; i++)
{
lock_destroy(&pres_htable[i].lock);

p= pres_htable[i].entries;
while(p)
{
Expand All @@ -444,6 +456,15 @@ void destroy_phtable(void)
shm_free(prev_p->sphere);
shm_free(prev_p);
}

cq= pres_htable[i].cq_entries;
while(cq)
{
prev_cq= cq;
cq= cq->next;
shm_free(prev_cq);
}

}
shm_free(pres_htable);
}
Expand Down Expand Up @@ -497,7 +518,8 @@ void update_pres_etag(pres_entry_t* p, str* etag)
p->etag_count++;
}

pres_entry_t* insert_phtable(str* pres_uri, int event, str* etag, char* sphere, int init_turn)
pres_entry_t* insert_phtable(str* pres_uri, int event, str* etag, char* sphere,
unsigned int flags, int init_turn)
{
unsigned int hash_code;
pres_entry_t* p= NULL;
Expand Down Expand Up @@ -526,6 +548,7 @@ pres_entry_t* insert_phtable(str* pres_uri, int event, str* etag, char* sphere,
strcpy(p->sphere, sphere);
}
p->event= event;
p->flags = flags;
update_pres_etag(p, etag);

hash_code= core_hash(pres_uri, NULL, phtable_size);
Expand Down Expand Up @@ -694,3 +717,73 @@ int update_phtable(presentity_t* presentity, str pres_uri, str body)
pkg_free(sphere);
return ret;
}


cluster_query_entry_t* insert_cluster_query(str* pres_uri, int event,
unsigned int hash_code)
{
cluster_query_entry_t* p;

p = (cluster_query_entry_t*)shm_malloc
(sizeof(cluster_query_entry_t)+ pres_uri->len);
if (p==NULL){
LM_ERR("failed to allocate shm mem (needed %d)\n",
(int)(sizeof(cluster_query_entry_t)+ pres_uri->len));
return NULL;
}

p->pres_uri.s = (char*)(p + 1);
memcpy(p->pres_uri.s, pres_uri->s, pres_uri->len);
p->pres_uri.len = pres_uri->len;

p->event = event;

p->next= pres_htable[hash_code].cq_entries->next;
pres_htable[hash_code].cq_entries->next= p;

return p;

}


cluster_query_entry_t* search_cluster_query(str* pres_uri, int event, unsigned int hash_code)
{
cluster_query_entry_t* p;

LM_DBG("pres_uri= %.*s, event=%d\n", pres_uri->len, pres_uri->s,
event);
p = pres_htable[hash_code].cq_entries->next;
while(p) {
if ( p->event==event && p->pres_uri.len==pres_uri->len &&
strncmp(p->pres_uri.s, pres_uri->s, pres_uri->len)== 0 ) {
return p;
}
p = p->next;
}
return NULL;
}

int delete_cluster_query(str* pres_uri, int event, unsigned int hash_code)
{
cluster_query_entry_t* p, *old_p;

LM_DBG("pres_uri= %.*s, event=%d\n", pres_uri->len, pres_uri->s,
event);
p = pres_htable[hash_code].cq_entries;
while(p->next) {
if ( p->next->event==event && p->next->pres_uri.len==pres_uri->len &&
strncmp(p->next->pres_uri.s, pres_uri->s, pres_uri->len)== 0 ) {
break;
}
p = p->next;
}
if (p->next==NULL)
return -1;

old_p = p->next;
p->next= p->next->next;
shm_free(old_p);
return 0;
}


31 changes: 28 additions & 3 deletions modules/presence/hash.h
Expand Up @@ -116,6 +116,8 @@ typedef struct subscription* (*mem_copy_subs_t)(struct subscription* s, int mem_

void free_subs(struct subscription* s);

#define PRES_FLAG_REPLICATED (1<<0)

/* presentity hash table */
typedef struct pres_entry
{
Expand All @@ -125,36 +127,59 @@ typedef struct pres_entry
char* sphere;
char etag[ETAG_LEN];
int etag_len;
unsigned int flags;
/* ordering */
unsigned int current_turn;
unsigned int last_turn;
struct pres_entry* next;
}pres_entry_t;

typedef struct cluster_query_entry
{
str pres_uri;
int event;
struct cluster_query_entry* next;
}cluster_query_entry_t;


typedef struct pres_htable
{
pres_entry_t* entries;
pres_entry_t *entries;
cluster_query_entry_t *cq_entries;
gen_lock_t lock;
}phtable_t;


phtable_t* new_phtable(void);
void destroy_phtable(void);

pres_entry_t* search_phtable(str* pres_uri, int event, unsigned int hash_code);

pres_entry_t* search_phtable_etag(str* pres_uri, int event,
str* etag, unsigned int hash_code);

void update_pres_etag(pres_entry_t* p, str* etag);

pres_entry_t* insert_phtable(str* pres_uri, int event, str* etag, char* sphere, int init_turn);
pres_entry_t* insert_phtable(str* pres_uri, int event, str* etag,
char* sphere, unsigned int flags, int init_turn);

int update_phtable(struct presentity* presentity, str pres_uri, str body);

void next_turn_phtable(pres_entry_t* p_p, unsigned int hash_code);

int delete_phtable(pres_entry_t* p, unsigned int hash_code);

int delete_phtable_query(str *pres_uri, int event, str* etag);

void destroy_phtable(void);


cluster_query_entry_t* insert_cluster_query(str* pres_uri, int event,
unsigned int hash_code);

cluster_query_entry_t* search_cluster_query(str* pres_uri, int event,
unsigned int hash_code);

int delete_cluster_query(str* pres_uri, int event, unsigned int hash_code);

#endif

0 comments on commit f7bed45

Please sign in to comment.