Skip to content

Commit

Permalink
Fix achcat buffers and signal handling
Browse files Browse the repository at this point in the history
* malloc buffer for received messages and resize on ACH_OVERFLOW
* Install signal handler to set shutdown flag and call ach_cancel()
  • Loading branch information
Neil T. Dantam committed Feb 18, 2015
1 parent a96e949 commit 07143d2
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions src/achcat.c
Expand Up @@ -66,6 +66,7 @@
#include <fcntl.h>
#include <inttypes.h>
#include <errno.h>
#include <stdbool.h>
#include "ach.h"
#include "achutil.h"

Expand All @@ -76,9 +77,11 @@ int subscribe(ach_channel_t *);
int pub_chan(ach_channel_t *, ach_attr_t *);
int sub_chan(ach_channel_t *, ach_attr_t *);

char pbuffer[4096];
char sbuffer[8];

ach_channel_t g_pub, g_sub;
bool g_sub_open = false;

char pbuffer[4096];

/* options */
int opt_msg_size = 256;
Expand All @@ -91,10 +94,19 @@ int opt_sub = 0;
FILE *fin;
FILE *fout;


volatile sig_atomic_t g_shutdown = 0;

static void sighandler(int sig, siginfo_t *siginfo, void *context) {
(void)sig; (void)siginfo; (void)context;
g_shutdown = 1;
if( g_sub_open ) {
enum ach_status r = ach_cancel(&g_sub, NULL);
if( ACH_OK != r ) {
const char msg[] = "failed to cancel\n";
ssize_t i = write(STDERR_FILENO, msg, sizeof(msg));
(void)i;
}
}
}

/** setup the signal handler */
Expand Down Expand Up @@ -130,8 +142,11 @@ void *publish_loop(void* pub) {


int publish( ach_channel_t *chan) {

sighandler_install();

enum ach_status r = ACH_OK;
while(1) {
while(! g_shutdown) {
char *fr;
/* get size */
fr = fgets( pbuffer, (int)sizeof(pbuffer), fin );
Expand All @@ -156,41 +171,52 @@ int publish( ach_channel_t *chan) {

int subscribe( ach_channel_t *chan) {

size_t size = 8;
char *buf = (char*)malloc(size);

sighandler_install();

ach_status_t r;
while(1) {
errno = 0;
while(!g_shutdown) {
size_t frame_size = 0;
size_t fr;
r = ach_get( chan, sbuffer, sizeof(sbuffer), &frame_size, NULL, 0 );
r = ach_get( chan, buf, size, &frame_size, NULL, ACH_O_WAIT );
switch(r) {
case ACH_OK:
case ACH_MISSED_FRAME:
break;
case ACH_STALE_FRAMES:
usleep(1000);
continue;
case ACH_OVERFLOW:
free(buf);
size = frame_size;
buf = (char*)malloc(frame_size);
continue;
case ACH_CANCELED:
continue;
default:
fprintf(stderr, "sub: ach_error: %s (%d), %s (%d)\n",
ach_result_to_string(r), r, strerror(errno), errno);
exit(-1);
}
/*fprintf(stderr, "sub: got %d bytes\n", frame_size);*/
/*fprintf(stderr, "sub: %s\n", sbuffer);*/
fr = fwrite( sbuffer, sizeof(char), frame_size, fout );
fr = fwrite( buf, sizeof(char), frame_size, fout );
if ( fr != frame_size ) {
r = ACH_OK;
break;
}
fflush(fout);
}

/*fprintf(stderr,"end of subscribe\n");*/
enum ach_status r2 = ach_close( chan );
if( ACH_OK != r2 ) {
fprintf(stderr, "another error on ach_close(): %s\n", ach_result_to_string(r));
}
return r;
return g_shutdown ? ACH_OK : r ;
}


Expand Down Expand Up @@ -241,23 +267,21 @@ int main( int argc, char **argv ) {
return 1;
}

ach_channel_t pub, sub;
memset( &pub, 0, sizeof(pub));
memset( &sub, 0, sizeof(sub));
{
enum ach_status r;
ach_attr_t attr;
ach_attr_init( &attr );
/*attr.map_anon = opt_pub && opt_sub;*/
if( opt_pub && ! opt_sub ) {
r = ach_open( &pub, opt_chan_name, &attr );
r = ach_open( &g_pub, opt_chan_name, &attr );
if( ACH_OK != r ) abort();
} else if( opt_sub && !opt_pub ) {
r = ach_open( &sub, opt_chan_name, &attr );
r = ach_open( &g_sub, opt_chan_name, &attr );
if( ACH_OK != r ) {
fprintf(stderr, "Could not sub channel: %s\n", ach_result_to_string(r) );
abort();
}
g_sub_open = true;
}else if (opt_pub && opt_sub ) {
ach_create_attr_t cattr;
ach_create_attr_init( &cattr );
Expand All @@ -270,9 +294,10 @@ int main( int argc, char **argv ) {
assert( cattr.shm );
attr.map = ACH_MAP_ANON;
attr.shm = cattr.shm;
r = ach_open( &sub, opt_chan_name, &attr );
r = ach_open( &g_sub, opt_chan_name, &attr );
if( ACH_OK != r ) abort();
r = ach_open( &pub, opt_chan_name, &attr );
g_sub_open = true;
r = ach_open( &g_pub, opt_chan_name, &attr );
if( ACH_OK != r ) abort();
} else {
assert(0);
Expand All @@ -283,16 +308,16 @@ int main( int argc, char **argv ) {
/* check for io case */
if( opt_pub && opt_sub ) {
pthread_t pub_thread;
pthread_create( &pub_thread, NULL, publish_loop, &pub );
pthread_create( &pub_thread, NULL, publish_loop, &g_pub );

subscribe(&sub);
subscribe(&g_sub);
void *v;
pthread_join(pub_thread, &v);
return 0;
}
/* normal cases */
if( opt_sub ) return subscribe(&sub);
if( opt_pub ) return publish(&pub);
if( opt_sub ) return subscribe(&g_sub);
if( opt_pub ) return publish(&g_pub);
assert( 0 );
return 0;
}

0 comments on commit 07143d2

Please sign in to comment.