Skip to content

Commit

Permalink
Enhance the ticket information catch-up from other sites
Browse files Browse the repository at this point in the history
Signed-off-by: Jiaju Zhang <jjzhang@suse.de>
  • Loading branch information
jjzhang committed Mar 23, 2012
1 parent a3d3818 commit f16e6d1
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/booth.h
Expand Up @@ -58,6 +58,7 @@ typedef enum {
BOOTHC_CMD_LIST = 1,
BOOTHC_CMD_GRANT,
BOOTHC_CMD_REVOKE,
BOOTHC_CMD_CATCHUP,
} cmd_request_t;

typedef enum {
Expand Down
7 changes: 6 additions & 1 deletion src/main.c
Expand Up @@ -308,7 +308,6 @@ void process_connection(int ci)
log_error("connection %d read data error %d", ci, rv);
goto out;
}
h.len = 0;
}

switch (h.cmd) {
Expand All @@ -318,6 +317,7 @@ void process_connection(int ci)
break;

case BOOTHC_CMD_GRANT:
h.len = 0;
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
Expand All @@ -335,6 +335,7 @@ void process_connection(int ci)
break;

case BOOTHC_CMD_REVOKE:
h.len = 0;
site = data;
ticket = data + BOOTH_NAME_LEN;
if (!check_ticket(ticket)) {
Expand All @@ -351,6 +352,10 @@ void process_connection(int ci)
h.result = BOOTHC_RLT_REMOTE_OP;
break;

case BOOTHC_CMD_CATCHUP:
h.result = catchup_ticket(&data, h.len);
break;

default:
log_error("connection %d cmd %x unknown", ci, h.cmd);
break;
Expand Down
132 changes: 130 additions & 2 deletions src/ticket.c
Expand Up @@ -27,6 +27,8 @@
#include "pacemaker.h"
#include "list.h"
#include "log.h"
#include "booth.h"
#include "timer.h"
#include "paxos_lease.h"
#include "paxos.h"

Expand All @@ -39,6 +41,14 @@ struct booth_msghdr {
uint32_t len;
} __attribute__((packed));

struct ticket_msg {
char id[BOOTH_NAME_LEN+1];
uint32_t owner;
uint32_t expiry;
uint32_t ballot;
uint32_t result;
} __attribute__((packed));

struct ticket {
char id[BOOTH_NAME_LEN+1];
pl_handle_t handle;
Expand Down Expand Up @@ -174,7 +184,7 @@ static int ticket_broadcast(void *value, int len)
free(buf);
return rv;
}

#if 0
static int ticket_read(const void *name, int *owner, int *ballot,
unsigned long long *expires)
{
Expand All @@ -200,6 +210,100 @@ static int ticket_read(const void *name, int *owner, int *ballot,

return 0;
}
#endif
static int ticket_parse(struct ticket_msg *tmsg)
{
struct ticket *tk;
int found = 0;

if (!tmsg->result)
return -1;

list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, tmsg->id)) {
tk->owner = tmsg->owner;
tk->expires = current_time() + tmsg->expiry;
tk->ballot = tmsg->ballot;
found = 1;
break;
}
}

if (!found)
return -1;
else
return 0;
}

static int ticket_catchup(const void *name, int *owner, int *ballot,
unsigned long long *expires)
{
struct ticket *tk;
int i, s, buflen, rv = 0;
char *buf = NULL;
struct boothc_header *h;
struct ticket_msg *tmsg;
int myid = ticket_get_myid();

if (booth_conf->node[myid].type != ARBITRATOR) {
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, name)) {
pcmk_handler.load_ticket(tk->id,
&tk->owner,
&tk->ballot,
&tk->expires);
if (current_time() >= tk->expires) {
tk->owner = -1;
tk->expires = 0;
pcmk_handler.store_ticket(tk->id,
tk->owner,
tk->ballot,
tk->expires);
}
}
}
}

buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg);
buf = malloc(buflen);
if (!buf)
return -ENOMEM;
memset(buf, 0, buflen);

h = (struct boothc_header *)buf;
h->magic = BOOTHC_MAGIC;
h->version = BOOTHC_VERSION;
h->cmd = BOOTHC_CMD_CATCHUP;
h->len = sizeof(struct ticket_msg);
tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header));

for (i = 0; i < booth_conf->node_count; i++) {
if (booth_conf->node[i].type == SITE &&
!(booth_conf->node[i].local)) {
strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1);
s = booth_transport[TCP].open(&booth_conf->node[i]);
if (s < 0)
continue;
rv = booth_transport[TCP].send(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
continue;
}
memset(tmsg, 0, sizeof(struct ticket_msg));
rv = booth_transport[TCP].recv(s, buf, buflen);
if (rv < 0) {
booth_transport[TCP].close(s);
continue;
}
booth_transport[TCP].close(s);
ticket_parse(tmsg);
memset(tmsg, 0, sizeof(struct ticket_msg));
}
}

free(buf);
return rv;
}

static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
{
Expand Down Expand Up @@ -348,11 +452,35 @@ int list_ticket(char **pdata, unsigned int *len)
return 0;
}

int catchup_ticket(char **pdata, unsigned int len)
{
struct ticket_msg *tmsg;
struct ticket *tk;

assert(len == sizeof(struct ticket_msg));
tmsg = (struct ticket_msg *)(*pdata);
list_for_each_entry(tk, &ticket_list, list) {
if (!strcmp(tk->id, tmsg->id) && tk->owner == ticket_get_myid()
&& current_time() < tk->expires) {
tmsg->owner = tk->owner;
tmsg->expiry = tk->expires - current_time();
tmsg->ballot = tk->ballot;
tmsg->result = 1;
break;
}
}

if (!tmsg->result)
memset(*pdata, 0, len);

return 0;
}

const struct paxos_lease_operations ticket_operations = {
.get_myid = ticket_get_myid,
.send = ticket_send,
.broadcast = ticket_broadcast,
.catchup = ticket_read,
.catchup = ticket_catchup,
.notify = ticket_write,
};

Expand Down
1 change: 1 addition & 0 deletions src/ticket.h
Expand Up @@ -26,6 +26,7 @@ int check_site(char *site, int *local);
int grant_ticket(char *ticket, int force);
int revoke_ticket(char *ticket, int force);
int list_ticket(char **pdata, unsigned int *len);
int catchup_ticket(char **pdata, unsigned int len);
int ticket_recv(void *msg, int msglen);
int setup_ticket(void);

Expand Down
4 changes: 3 additions & 1 deletion src/transport.c
Expand Up @@ -23,6 +23,7 @@
#include <linux/rtnetlink.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
Expand Down Expand Up @@ -255,7 +256,7 @@ static void process_dead(int ci)

static void process_tcp_listener(int ci)
{
int fd, i;
int fd, i, one = 1;
socklen_t addrlen;
struct sockaddr addr;
struct tcp_conn *conn;
Expand All @@ -266,6 +267,7 @@ static void process_tcp_listener(int ci)
fd, errno);
return;
}
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));

conn = malloc(sizeof(struct tcp_conn));
if (!conn) {
Expand Down

0 comments on commit f16e6d1

Please sign in to comment.