Skip to content

Commit

Permalink
Connect should be async
Browse files Browse the repository at this point in the history
  • Loading branch information
trondn committed Jan 7, 2011
1 parent 9a0229f commit db39157
Show file tree
Hide file tree
Showing 11 changed files with 425 additions and 210 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Expand Up @@ -42,6 +42,7 @@ libmembase_la_SOURCES = \
src/get.c \
src/handler.c \
src/packet.c \
src/server.c \
src/store.c \
src/tap.c \
src/utilities.c
Expand Down
4 changes: 1 addition & 3 deletions src/arithmetic.c
Expand Up @@ -70,9 +70,7 @@ libmembase_error_t libmembase_arithmetic(libmembase_t instance,
libmembase_server_start_packet(server, req.bytes, sizeof(req.bytes));
libmembase_server_write_packet(server, key, nkey);
libmembase_server_end_packet(server);

// @todo we might want to wait to flush the buffers..
libmembase_server_event_handler(0, EV_WRITE, server);
libmembase_server_send_packets(server);

return LIBMEMBASE_SUCCESS;
}
183 changes: 11 additions & 172 deletions src/create.c
Expand Up @@ -70,36 +70,6 @@ libmembase_t libmembase_create(const char *host,
return ret;
}

/**
* Release all allocated resources for this server instance
* @param server the server to destroy
*/
static void libmembase_destroy_server(libmembase_server_t *server)
{
if (server->sasl_conn != NULL) {
sasl_dispose(&server->sasl_conn);
}

if (server->ev_flags != 0) {
if (event_del(&server->ev_event) == -1) {
abort();
}
}

if (server->sock != INVALID_SOCKET) {
EVUTIL_CLOSESOCKET(server->sock);
}

if (server->ai != NULL) {
freeaddrinfo(server->ai);
}

free(server->output.data);
free(server->cmd_log.data);
free(server->input.data);
memset(server, 0xff, sizeof(*server));
}

LIBMEMBASE_API
void libmembase_destroy(libmembase_t instance)
{
Expand All @@ -121,33 +91,14 @@ void libmembase_destroy(libmembase_t instance)
}

for (size_t ii = 0; ii < instance->nservers; ++ii) {
libmembase_destroy_server(instance->servers + ii);
libmembase_server_destroy(instance->servers + ii);
}
free(instance->servers);

memset(instance, 0xff, sizeof(*instance));
free(instance);
}

/**
* Start the SASL auth for a given server by sending the SASL_LIST_MECHS
* packet to the server.
* @param server the server object to auth agains
*/
static void start_sasl_auth_server(libmembase_server_t *server)
{
protocol_binary_request_no_extras req = {
.message.header.request = {
.magic = PROTOCOL_BINARY_REQ,
.opcode = PROTOCOL_BINARY_CMD_SASL_LIST_MECHS,
.datatype = PROTOCOL_BINARY_RAW_BYTES
}
};
libmembase_server_complete_packet(server, req.bytes, sizeof(req.bytes));
// send the data and add it to libevent..
libmembase_server_event_handler(0, EV_WRITE, server);
}

/**
* Callback functions called from libsasl to get the username to use for
* authentication.
Expand Down Expand Up @@ -195,60 +146,6 @@ static int sasl_get_password(sasl_conn_t *conn, void *context, int id,
return SASL_OK;
}

/**
* Get the name of the local endpoint
* @param sock The socket to query the name for
* @param buffer The destination buffer
* @param buffz The size of the output buffer
* @return true if success, false otherwise
*/
static bool get_local_address(evutil_socket_t sock,
char *buffer,
size_t bufsz)
{
char h[NI_MAXHOST];
char p[NI_MAXSERV];
struct sockaddr_storage saddr;
socklen_t salen = sizeof(saddr);

if ((getsockname(sock, (struct sockaddr *)&saddr, &salen) < 0) ||
(getnameinfo((struct sockaddr *)&saddr, salen, h, sizeof(h),
p, sizeof(p), NI_NUMERICHOST | NI_NUMERICSERV) < 0) ||
(snprintf(buffer, bufsz, "%s;%s", h, p) < 0))
{
return false;
}

return true;
}

/**
* Get the name of the remote enpoint
* @param sock The socket to query the name for
* @param buffer The destination buffer
* @param buffz The size of the output buffer
* @return true if success, false otherwise
*/
static bool get_remote_address(evutil_socket_t sock,
char *buffer,
size_t bufsz)
{
char h[NI_MAXHOST];
char p[NI_MAXSERV];
struct sockaddr_storage saddr;
socklen_t salen = sizeof(saddr);

if ((getpeername(sock, (struct sockaddr *)&saddr, &salen) < 0) ||
(getnameinfo((struct sockaddr *)&saddr, salen, h, sizeof(h),
p, sizeof(p), NI_NUMERICHOST | NI_NUMERICSERV) < 0) ||
(snprintf(buffer, bufsz, "%s;%s", h, p) < 0))
{
return false;
}

return true;
}

/**
* Update the list of servers and connect to the new ones
* @param instance the instance to update the serverlist for.
Expand All @@ -272,9 +169,8 @@ static void libmembase_update_serverlist(libmembase_t instance)

// @todo we shouldn't kill all of them, but fix that later on (remember
// to cancel all ongoing crap etc..
libmembase_server_t *servers = instance->servers;
for (size_t ii = 0; ii < instance->nservers; ++ii) {
libmembase_destroy_server(instance->servers + ii);
libmembase_server_destroy(instance->servers + ii);
}
free(instance->servers);
instance->servers = NULL;
Expand All @@ -283,7 +179,7 @@ static void libmembase_update_serverlist(libmembase_t instance)
uint16_t max = (uint16_t)vbucket_config_get_num_vbuckets(instance->vbucket_config);
size_t num = (size_t)vbucket_config_get_num_servers(instance->vbucket_config);
instance->nservers = num;
servers = calloc(num, sizeof(libmembase_server_t));
instance->servers = calloc(num, sizeof(libmembase_server_t));

instance->sasl.name = vbucket_config_get_user(instance->vbucket_config);
memset(instance->sasl.password.buffer, 0,
Expand All @@ -302,61 +198,6 @@ static void libmembase_update_serverlist(libmembase_t instance)
};
memcpy(instance->sasl.callbacks, sasl_callbacks, sizeof(sasl_callbacks));

for (size_t ii = 0; ii < num; ++ii) {
servers[ii].instance = instance;
servers[ii].current_packet = (size_t)-1;

struct addrinfo hints = {
.ai_flags = AI_PASSIVE,
.ai_socktype = SOCK_STREAM,
.ai_family = AF_UNSPEC
};

char *h;
h = strdup(vbucket_config_get_server(instance->vbucket_config, (int)ii));
char *p = strchr(h, ':');
*p = '\0';
++p;

int error = getaddrinfo(h, p, &hints, &servers[ii].ai);
if (error == 0) {
/* @todo make the connects non-blocking */
struct addrinfo *ai = servers[ii].ai;
while (ai != NULL) {
servers[ii].sock = socket(ai->ai_family,
ai->ai_socktype,
ai->ai_protocol);
if (servers[ii].sock != -1) {
if (connect(servers[ii].sock, ai->ai_addr,
ai->ai_addrlen) != -1 &&
evutil_make_socket_nonblocking(servers[ii].sock) == 0) {

char local[NI_MAXHOST + NI_MAXSERV + 2];
char remote[NI_MAXHOST + NI_MAXSERV + 2];

get_local_address(servers[ii].sock, local,
sizeof(local));

get_remote_address(servers[ii].sock, remote,
sizeof(remote));

int ret = sasl_client_new("membase", h,
local, remote,
instance->sasl.callbacks, 0,
&servers[ii].sasl_conn);
assert(ret == SASL_OK);
break;
}
EVUTIL_CLOSESOCKET(servers[ii].sock);
servers[ii].sock = -1;
}
ai = ai->ai_next;
}
} else {
servers[ii].sock = -1;
servers[ii].ai = NULL;
}
}

/*
* Run through all of the vbuckets and build a map of what they need.
Expand All @@ -371,18 +212,16 @@ static void libmembase_update_serverlist(libmembase_t instance)
instance->vb_server_map[ii] = (uint16_t)idx;
}

instance->servers = servers;
/* Now initialize the servers */
for (size_t ii = 0; ii < num; ++ii) {
instance->servers[ii].instance = instance;
libmembase_server_initialize(instance->servers + ii, (int)ii);
}

if (vbucket_config_get_user(instance->vbucket_config) == NULL) {
if (instance->vbucket_state_listener != NULL) {
for (size_t ii = 0; ii < instance->nservers; ++ii) {
// fire notifications!
instance->vbucket_state_listener(instance->servers + ii);
}
}
} else {
/* Notify anyone interested in this event... */
if (instance->vbucket_state_listener != NULL) {
for (size_t ii = 0; ii < instance->nservers; ++ii) {
start_sasl_auth_server(instance->servers + ii);
instance->vbucket_state_listener(instance->servers + ii);
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/event.c
Expand Up @@ -127,6 +127,9 @@ static void do_send_data(libmembase_server_t *c)
return;
default:
// FIXME!
fprintf(stderr, "Failed to write data: %s\n",
strerror(errno));
fflush(stderr);
abort();
}
} else {
Expand Down Expand Up @@ -159,11 +162,11 @@ void libmembase_server_event_handler(evutil_socket_t sock, short which, void *ar
}

if (c->output.avail == 0) {
libmembase_server_update_event(c, EV_READ,
libmembase_server_event_handler);
libmembase_server_update_event(c, EV_READ,
libmembase_server_event_handler);
} else {
libmembase_server_update_event(c, EV_READ | EV_WRITE,
libmembase_server_event_handler);
libmembase_server_update_event(c, EV_READ | EV_WRITE,
libmembase_server_event_handler);
}

if (c->instance->execute) {
Expand Down
4 changes: 2 additions & 2 deletions src/get.c
Expand Up @@ -60,7 +60,7 @@ libmembase_error_t libmembase_mget(libmembase_t instance,

for (size_t ii = 0; ii < instance->nservers; ++ii) {
libmembase_server_t *server = instance->servers + ii;
if (server->output.avail > 0) {
if (server->output.avail > 0 || server->pending.avail > 0) {
protocol_binary_request_noop req = {
.message.header.request = {
.magic = PROTOCOL_BINARY_REQ,
Expand All @@ -71,7 +71,7 @@ libmembase_error_t libmembase_mget(libmembase_t instance,
};
libmembase_server_complete_packet(server, req.bytes,
sizeof(req.bytes));
libmembase_server_event_handler(0, EV_WRITE, server);
libmembase_server_send_packets(server);
}
}

Expand Down
15 changes: 7 additions & 8 deletions src/handler.c
Expand Up @@ -220,10 +220,12 @@ static void sasl_list_mech_response_handler(libmembase_server_t *server,
.bodylen = ntohl((uint32_t)(bodysize))
}
};
libmembase_server_start_packet(server, req.bytes, sizeof(req.bytes));
libmembase_server_write_packet(server, chosenmech, keylen);
libmembase_server_write_packet(server, data, len);
libmembase_server_end_packet(server);
libmembase_server_buffer_start_packet(server, &server->output,
req.bytes, sizeof(req.bytes));
libmembase_server_buffer_write_packet(server, &server->output,
chosenmech, keylen);
libmembase_server_buffer_write_packet(server, &server->output, data, len);
libmembase_server_buffer_end_packet(server, &server->output);

// send the data and add it to libevent..
libmembase_server_event_handler(0, EV_WRITE, server);
Expand All @@ -234,12 +236,9 @@ static void sasl_auth_response_handler(libmembase_server_t *server,
{
uint16_t ret = ntohs(res->response.status);
if (ret == PROTOCOL_BINARY_RESPONSE_SUCCESS) {
// I should put the server to the notification!
if (server->instance->vbucket_state_listener != NULL) {
server->instance->vbucket_state_listener(server);
}
sasl_dispose(&server->sasl_conn);
server->sasl_conn = NULL;
libmembase_server_connected(server);
} else if (ret == PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE) {
// I don't know how to step yet ;-)
abort();
Expand Down

0 comments on commit db39157

Please sign in to comment.