Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
AJ ONeal committed Jan 24, 2011
1 parent f5ab9da commit 44e8706
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 166 deletions.
145 changes: 0 additions & 145 deletions paired-threaded-ipc/dummy-rc.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,151 +80,6 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)

static struct argp argp = { options, parse_opt, 0, doc };


/***********************************
*
* libev connection client
*
*/
static bool evn_stream_priv_send(struct evn_stream* stream, void* data, int size);

void evn_stream_end(EV_P_ struct evn_stream* stream)
{
evn_stream_destroy(EV_A_ stream);
}

bool evn_stream_write(EV_P_ struct evn_stream* stream, void* data, int size)
{
if (NULL == stream->_write_bufferlist)
{
if (evn_stream_priv_send(stream, data, size))
{
evn_debugs("All data was sent without queueing");
return true;
}
//stream->_write_bufferlist = evn_bufferlist_create(size, 0);
}
//evn_bufferlist_add(stream->_write_bufferlist, data, size);

// Ensure that we listen for EV_WRITE
if (!(stream->io.events & EV_WRITE))
{
ev_io_stop(EV_A_ &stream->io);
ev_io_set(&stream->io, stream->fd, EV_READ | EV_WRITE);
}
ev_io_start(EV_A_ &stream->io);

return false;
}

static bool evn_stream_priv_send(struct evn_stream* stream, void* data, int size)
{
const char* test_data = ".awesome_sauce";
//const int MAX_SEND = 4096;
int sent;

evn_debugs("priv_send");
if (NULL == data)
{
evn_debugs("no data, skipping");
return true;
}

// if the buffer exists, append the data to the buffer
// while sent != 0 and buffer != NULL
// void* data = bufferlist_peek(stream->bufferlist, MAX_SEND);
sent = send(stream->fd, test_data, strlen(test_data) + 1, MSG_DONTWAIT);
// bufferlist_shift_void(stream->bufferlist, sent);

return sent == strlen(test_data);
}

static void evn_stream_priv_on_write(EV_P_ ev_io *w, int revents)
{
struct evn_stream* stream = (struct evn_stream*) w;

evn_debugs("EV_WRITE");

evn_stream_priv_send(stream, NULL, 0);

// If the buffer is finally empty, send the `drain` event
if (NULL == stream->_write_bufferlist)
{
ev_io_stop(EV_A_ &stream->io);
ev_io_set(&stream->io, stream->fd, EV_READ);
ev_io_start(EV_A_ &stream->io);
evn_debugs("pre-drain");
if (stream->drain) { stream->drain(EV_A_ stream); }
// and the
return;
}
evn_debugs("post-null");
}

static inline void evn_stream_priv_on_activity(EV_P_ ev_io *w, int revents)
{
evn_debugs("Stream Activity");
if (revents & EV_READ)
{
// evn_stream_read_priv_cb
}
else if (revents & EV_WRITE)
{
evn_stream_priv_on_write(EV_A, w, revents);
}
else
{
// Never Happens
evn_debugs("[ERR] ev_io received something other than EV_READ or EV_WRITE");
}
}

static void evn_stream_priv_on_connect(EV_P_ ev_io *w, int revents)
{
struct evn_stream* stream = (struct evn_stream*) w;
evn_debugs("Stream Connect");

//ev_cb_set (ev_TYPE *watcher, callback)
//ev_io_set (&w, STDIN_FILENO, EV_READ);

ev_io_stop(EV_A_ &stream->io);
ev_io_init(&stream->io, evn_stream_priv_on_activity, stream->fd, EV_WRITE);
ev_io_start(EV_A_ &stream->io);
//ev_cb_set(&stream->io, evn_stream_priv_on_activity);

if (stream->connect) { stream->connect(EV_A_ stream); }
}

struct evn_stream* evn_create_connection(EV_P_ char* sock_path) {
int stream_fd;
struct evn_stream* stream;
struct sockaddr_un* sock;

stream_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (-1 == stream_fd) {
perror("[drc] socket");
exit(1);
}
stream = evn_stream_create(stream_fd);
sock = &stream->socket;

// initialize the connect callback so that it starts the stdin asap
ev_io_init(&stream->io, evn_stream_priv_on_connect, stream_fd, EV_WRITE);
ev_io_start(EV_A_ &stream->io);

sock->sun_family = AF_UNIX;
strcpy(sock->sun_path, sock_path);
stream->socket_len = strlen(sock->sun_path) + 1 + sizeof(sock->sun_family);

if (-1 == connect(stream_fd, (struct sockaddr *) sock, stream->socket_len)) {
perror("connect");
exit(EXIT_FAILURE);
}

return stream;
}


//
// Client Callbacks
//
Expand Down
159 changes: 150 additions & 9 deletions paired-threaded-ipc/evn.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#define EVN_MAX_RECV 4096
static int evn_server_unix_create(struct sockaddr_un* socket_un, char* sock_path);
static bool evn_stream_priv_send(struct evn_stream* stream, void* data, int size);
static char recv_data[EVN_MAX_RECV];

inline struct evn_stream* evn_stream_create(int fd) {
Expand All @@ -19,25 +20,28 @@ inline struct evn_stream* evn_stream_create(int fd) {
// stream->type = 0;
// stream->oneshot = false;
evn_set_nonblock(stream->fd);
ev_io_init(&stream->io, evn_stream_read_priv_cb, stream->fd, EV_READ);
ev_io_init(&stream->io, evn_stream_priv_on_read, stream->fd, EV_READ);

evn_debugs(".");
return stream;
}

int evn_stream_destroy(EV_P_ struct evn_stream* stream)
bool evn_stream_destroy(EV_P_ struct evn_stream* stream)
{
bool result;

// TODO delay freeing of server until streams have closed
// or link loop directly to stream?
ev_io_stop(EV_A_ &stream->io);
close(stream->fd);
if (stream->close) { stream->close(EV_A_ stream, false); }
result = close(stream->fd) ? true : false;
if (stream->close) { stream->close(EV_A_ stream, result); }
free(stream);
return 0;

return result;
}

// This callback is called when data is readable on the unix socket.
void evn_stream_read_priv_cb(EV_P_ ev_io *w, int revents)
void evn_stream_priv_on_read(EV_P_ ev_io *w, int revents)
{
void* data;
struct evn_exception error;
Expand Down Expand Up @@ -80,7 +84,7 @@ void evn_stream_read_priv_cb(EV_P_ ev_io *w, int revents)
}
}

void evn_server_connection_priv_cb(EV_P_ ev_io *w, int revents)
void evn_server_priv_on_connection(EV_P_ ev_io *w, int revents)
{
evn_debugs("new connection - EV_READ - server->fd has become readable");

Expand Down Expand Up @@ -162,7 +166,7 @@ int evn_server_destroy(EV_P_ struct evn_server* server)
return 0;
}

struct evn_server* evn_server_create(EV_P_ evn_server_connection_cb* on_connection)
struct evn_server* evn_server_create(EV_P_ evn_server_on_connection* on_connection)
{
struct evn_server* server = calloc(1, sizeof(struct evn_server));
server->EV_A = EV_A;
Expand Down Expand Up @@ -191,8 +195,145 @@ int evn_server_listen(struct evn_server* server, char* sock_path)
exit(EXIT_FAILURE);
}

ev_io_init(&server->io, evn_server_connection_priv_cb, server->fd, EV_READ);
ev_io_init(&server->io, evn_server_priv_on_connection, server->fd, EV_READ);
ev_io_start(server->EV_A_ &server->io);

return 0;
}

bool evn_stream_end(EV_P_ struct evn_stream* stream)
{
return evn_stream_destroy(EV_A_ stream);
}

bool evn_stream_write(EV_P_ struct evn_stream* stream, void* data, int size)
{
if (NULL == stream->_write_bufferlist)
{
if (evn_stream_priv_send(stream, data, size))
{
evn_debugs("All data was sent without queueing");
return true;
}
//stream->_write_bufferlist = evn_bufferlist_create(size, 0);
}
//evn_bufferlist_add(stream->_write_bufferlist, data, size);

// Ensure that we listen for EV_WRITE
if (!(stream->io.events & EV_WRITE))
{
ev_io_stop(EV_A_ &stream->io);
ev_io_set(&stream->io, stream->fd, EV_READ | EV_WRITE);
}
ev_io_start(EV_A_ &stream->io);

return false;
}

static bool evn_stream_priv_send(struct evn_stream* stream, void* data, int size)
{
const char* test_data = ".awesome_sauce";
//const int MAX_SEND = 4096;
int sent;

evn_debugs("priv_send");
if (NULL == data)
{
evn_debugs("no data, skipping");
return true;
}

// if the buffer exists, append the data to the buffer
// while sent != 0 and buffer != NULL
// void* data = bufferlist_peek(stream->bufferlist, MAX_SEND);
sent = send(stream->fd, test_data, strlen(test_data) + 1, MSG_DONTWAIT);
// bufferlist_shift_void(stream->bufferlist, sent);

return sent == strlen(test_data);
}

static void evn_stream_priv_on_write(EV_P_ ev_io *w, int revents)
{
struct evn_stream* stream = (struct evn_stream*) w;

evn_debugs("EV_WRITE");

evn_stream_priv_send(stream, NULL, 0);

// If the buffer is finally empty, send the `drain` event
if (NULL == stream->_write_bufferlist)
{
ev_io_stop(EV_A_ &stream->io);
ev_io_set(&stream->io, stream->fd, EV_READ);
ev_io_start(EV_A_ &stream->io);
evn_debugs("pre-drain");
if (stream->drain) { stream->drain(EV_A_ stream); }
// and the
return;
}
evn_debugs("post-null");
}

static inline void evn_stream_priv_on_activity(EV_P_ ev_io *w, int revents)
{
evn_debugs("Stream Activity");
if (revents & EV_READ)
{
// evn_stream_read_priv_cb
}
else if (revents & EV_WRITE)
{
evn_stream_priv_on_write(EV_A, w, revents);
}
else
{
// Never Happens
evn_debugs("[ERR] ev_io received something other than EV_READ or EV_WRITE");
}
}

static void evn_stream_priv_on_connect(EV_P_ ev_io *w, int revents)
{
struct evn_stream* stream = (struct evn_stream*) w;
evn_debugs("Stream Connect");

//ev_cb_set (ev_TYPE *watcher, callback)
//ev_io_set (&w, STDIN_FILENO, EV_READ);

ev_io_stop(EV_A_ &stream->io);
ev_io_init(&stream->io, evn_stream_priv_on_activity, stream->fd, EV_WRITE);
ev_io_start(EV_A_ &stream->io);
//ev_cb_set(&stream->io, evn_stream_priv_on_activity);

if (stream->connect) { stream->connect(EV_A_ stream); }
}

struct evn_stream* evn_create_connection(EV_P_ char* sock_path)
{
int stream_fd;
struct evn_stream* stream;
struct sockaddr_un* sock;

stream_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (-1 == stream_fd) {
perror("[drc] socket");
exit(1);
}
stream = evn_stream_create(stream_fd);
sock = &stream->socket;

// initialize the connect callback so that it starts the stdin asap
ev_io_init(&stream->io, evn_stream_priv_on_connect, stream_fd, EV_WRITE);
ev_io_start(EV_A_ &stream->io);

sock->sun_family = AF_UNIX;
strcpy(sock->sun_path, sock_path);
stream->socket_len = strlen(sock->sun_path) + 1 + sizeof(sock->sun_family);

if (-1 == connect(stream_fd, (struct sockaddr *) sock, stream->socket_len)) {
perror("connect");
exit(EXIT_FAILURE);
}

return stream;
}
Loading

0 comments on commit 44e8706

Please sign in to comment.