Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Enhance the ticket information catch-up from other sites

Signed-off-by: Jiaju Zhang <jjzhang@suse.de>
  • Loading branch information...
commit f16e6d1a7750856d13c3f1530ac9f4e24b70199b 1 parent a3d3818
Jiaju Zhang jjzhang authored
1  src/booth.h
@@ -58,6 +58,7 @@ typedef enum {
58 58 BOOTHC_CMD_LIST = 1,
59 59 BOOTHC_CMD_GRANT,
60 60 BOOTHC_CMD_REVOKE,
  61 + BOOTHC_CMD_CATCHUP,
61 62 } cmd_request_t;
62 63
63 64 typedef enum {
7 src/main.c
@@ -308,7 +308,6 @@ void process_connection(int ci)
308 308 log_error("connection %d read data error %d", ci, rv);
309 309 goto out;
310 310 }
311   - h.len = 0;
312 311 }
313 312
314 313 switch (h.cmd) {
@@ -318,6 +317,7 @@ void process_connection(int ci)
318 317 break;
319 318
320 319 case BOOTHC_CMD_GRANT:
  320 + h.len = 0;
321 321 site = data;
322 322 ticket = data + BOOTH_NAME_LEN;
323 323 if (!check_ticket(ticket)) {
@@ -335,6 +335,7 @@ void process_connection(int ci)
335 335 break;
336 336
337 337 case BOOTHC_CMD_REVOKE:
  338 + h.len = 0;
338 339 site = data;
339 340 ticket = data + BOOTH_NAME_LEN;
340 341 if (!check_ticket(ticket)) {
@@ -351,6 +352,10 @@ void process_connection(int ci)
351 352 h.result = BOOTHC_RLT_REMOTE_OP;
352 353 break;
353 354
  355 + case BOOTHC_CMD_CATCHUP:
  356 + h.result = catchup_ticket(&data, h.len);
  357 + break;
  358 +
354 359 default:
355 360 log_error("connection %d cmd %x unknown", ci, h.cmd);
356 361 break;
132 src/ticket.c
@@ -27,6 +27,8 @@
27 27 #include "pacemaker.h"
28 28 #include "list.h"
29 29 #include "log.h"
  30 +#include "booth.h"
  31 +#include "timer.h"
30 32 #include "paxos_lease.h"
31 33 #include "paxos.h"
32 34
@@ -39,6 +41,14 @@ struct booth_msghdr {
39 41 uint32_t len;
40 42 } __attribute__((packed));
41 43
  44 +struct ticket_msg {
  45 + char id[BOOTH_NAME_LEN+1];
  46 + uint32_t owner;
  47 + uint32_t expiry;
  48 + uint32_t ballot;
  49 + uint32_t result;
  50 +} __attribute__((packed));
  51 +
42 52 struct ticket {
43 53 char id[BOOTH_NAME_LEN+1];
44 54 pl_handle_t handle;
@@ -174,7 +184,7 @@ static int ticket_broadcast(void *value, int len)
174 184 free(buf);
175 185 return rv;
176 186 }
177   -
  187 +#if 0
178 188 static int ticket_read(const void *name, int *owner, int *ballot,
179 189 unsigned long long *expires)
180 190 {
@@ -200,6 +210,100 @@ static int ticket_read(const void *name, int *owner, int *ballot,
200 210
201 211 return 0;
202 212 }
  213 +#endif
  214 +static int ticket_parse(struct ticket_msg *tmsg)
  215 +{
  216 + struct ticket *tk;
  217 + int found = 0;
  218 +
  219 + if (!tmsg->result)
  220 + return -1;
  221 +
  222 + list_for_each_entry(tk, &ticket_list, list) {
  223 + if (!strcmp(tk->id, tmsg->id)) {
  224 + tk->owner = tmsg->owner;
  225 + tk->expires = current_time() + tmsg->expiry;
  226 + tk->ballot = tmsg->ballot;
  227 + found = 1;
  228 + break;
  229 + }
  230 + }
  231 +
  232 + if (!found)
  233 + return -1;
  234 + else
  235 + return 0;
  236 +}
  237 +
  238 +static int ticket_catchup(const void *name, int *owner, int *ballot,
  239 + unsigned long long *expires)
  240 +{
  241 + struct ticket *tk;
  242 + int i, s, buflen, rv = 0;
  243 + char *buf = NULL;
  244 + struct boothc_header *h;
  245 + struct ticket_msg *tmsg;
  246 + int myid = ticket_get_myid();
  247 +
  248 + if (booth_conf->node[myid].type != ARBITRATOR) {
  249 + list_for_each_entry(tk, &ticket_list, list) {
  250 + if (!strcmp(tk->id, name)) {
  251 + pcmk_handler.load_ticket(tk->id,
  252 + &tk->owner,
  253 + &tk->ballot,
  254 + &tk->expires);
  255 + if (current_time() >= tk->expires) {
  256 + tk->owner = -1;
  257 + tk->expires = 0;
  258 + pcmk_handler.store_ticket(tk->id,
  259 + tk->owner,
  260 + tk->ballot,
  261 + tk->expires);
  262 + }
  263 + }
  264 + }
  265 + }
  266 +
  267 + buflen = sizeof(struct boothc_header) + sizeof(struct ticket_msg);
  268 + buf = malloc(buflen);
  269 + if (!buf)
  270 + return -ENOMEM;
  271 + memset(buf, 0, buflen);
  272 +
  273 + h = (struct boothc_header *)buf;
  274 + h->magic = BOOTHC_MAGIC;
  275 + h->version = BOOTHC_VERSION;
  276 + h->cmd = BOOTHC_CMD_CATCHUP;
  277 + h->len = sizeof(struct ticket_msg);
  278 + tmsg = (struct ticket_msg *)(buf + sizeof(struct boothc_header));
  279 +
  280 + for (i = 0; i < booth_conf->node_count; i++) {
  281 + if (booth_conf->node[i].type == SITE &&
  282 + !(booth_conf->node[i].local)) {
  283 + strncpy(tmsg->id, name, BOOTH_NAME_LEN + 1);
  284 + s = booth_transport[TCP].open(&booth_conf->node[i]);
  285 + if (s < 0)
  286 + continue;
  287 + rv = booth_transport[TCP].send(s, buf, buflen);
  288 + if (rv < 0) {
  289 + booth_transport[TCP].close(s);
  290 + continue;
  291 + }
  292 + memset(tmsg, 0, sizeof(struct ticket_msg));
  293 + rv = booth_transport[TCP].recv(s, buf, buflen);
  294 + if (rv < 0) {
  295 + booth_transport[TCP].close(s);
  296 + continue;
  297 + }
  298 + booth_transport[TCP].close(s);
  299 + ticket_parse(tmsg);
  300 + memset(tmsg, 0, sizeof(struct ticket_msg));
  301 + }
  302 + }
  303 +
  304 + free(buf);
  305 + return rv;
  306 +}
203 307
204 308 static int ticket_write(pl_handle_t handle, struct paxos_lease_result *result)
205 309 {
@@ -348,11 +452,35 @@ int list_ticket(char **pdata, unsigned int *len)
348 452 return 0;
349 453 }
350 454
  455 +int catchup_ticket(char **pdata, unsigned int len)
  456 +{
  457 + struct ticket_msg *tmsg;
  458 + struct ticket *tk;
  459 +
  460 + assert(len == sizeof(struct ticket_msg));
  461 + tmsg = (struct ticket_msg *)(*pdata);
  462 + list_for_each_entry(tk, &ticket_list, list) {
  463 + if (!strcmp(tk->id, tmsg->id) && tk->owner == ticket_get_myid()
  464 + && current_time() < tk->expires) {
  465 + tmsg->owner = tk->owner;
  466 + tmsg->expiry = tk->expires - current_time();
  467 + tmsg->ballot = tk->ballot;
  468 + tmsg->result = 1;
  469 + break;
  470 + }
  471 + }
  472 +
  473 + if (!tmsg->result)
  474 + memset(*pdata, 0, len);
  475 +
  476 + return 0;
  477 +}
  478 +
351 479 const struct paxos_lease_operations ticket_operations = {
352 480 .get_myid = ticket_get_myid,
353 481 .send = ticket_send,
354 482 .broadcast = ticket_broadcast,
355   - .catchup = ticket_read,
  483 + .catchup = ticket_catchup,
356 484 .notify = ticket_write,
357 485 };
358 486
1  src/ticket.h
@@ -26,6 +26,7 @@ int check_site(char *site, int *local);
26 26 int grant_ticket(char *ticket, int force);
27 27 int revoke_ticket(char *ticket, int force);
28 28 int list_ticket(char **pdata, unsigned int *len);
  29 +int catchup_ticket(char **pdata, unsigned int len);
29 30 int ticket_recv(void *msg, int msglen);
30 31 int setup_ticket(void);
31 32
4 src/transport.c
@@ -23,6 +23,7 @@
23 23 #include <linux/rtnetlink.h>
24 24 #include <arpa/inet.h>
25 25 #include <netinet/in.h>
  26 +#include <netinet/tcp.h>
26 27 #include <unistd.h>
27 28 #include <fcntl.h>
28 29 #include <errno.h>
@@ -255,7 +256,7 @@ static void process_dead(int ci)
255 256
256 257 static void process_tcp_listener(int ci)
257 258 {
258   - int fd, i;
  259 + int fd, i, one = 1;
259 260 socklen_t addrlen;
260 261 struct sockaddr addr;
261 262 struct tcp_conn *conn;
@@ -266,6 +267,7 @@ static void process_tcp_listener(int ci)
266 267 fd, errno);
267 268 return;
268 269 }
  270 + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&one, sizeof(one));
269 271
270 272 conn = malloc(sizeof(struct tcp_conn));
271 273 if (!conn) {

0 comments on commit f16e6d1

Please sign in to comment.
Something went wrong with that request. Please try again.