From 07143d21f273ce39e1dafa0f07aaae5df495e777 Mon Sep 17 00:00:00 2001 From: "Neil T. Dantam" Date: Wed, 18 Feb 2015 17:41:25 -0600 Subject: [PATCH] Fix achcat buffers and signal handling * malloc buffer for received messages and resize on ACH_OVERFLOW * Install signal handler to set shutdown flag and call ach_cancel() --- src/achcat.c | 63 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/src/achcat.c b/src/achcat.c index 55f07a5..5706e0f 100644 --- a/src/achcat.c +++ b/src/achcat.c @@ -66,6 +66,7 @@ #include #include #include +#include #include "ach.h" #include "achutil.h" @@ -76,9 +77,11 @@ int subscribe(ach_channel_t *); int pub_chan(ach_channel_t *, ach_attr_t *); int sub_chan(ach_channel_t *, ach_attr_t *); -char pbuffer[4096]; -char sbuffer[8]; +ach_channel_t g_pub, g_sub; +bool g_sub_open = false; + +char pbuffer[4096]; /* options */ int opt_msg_size = 256; @@ -91,10 +94,19 @@ int opt_sub = 0; FILE *fin; FILE *fout; - +volatile sig_atomic_t g_shutdown = 0; static void sighandler(int sig, siginfo_t *siginfo, void *context) { (void)sig; (void)siginfo; (void)context; + g_shutdown = 1; + if( g_sub_open ) { + enum ach_status r = ach_cancel(&g_sub, NULL); + if( ACH_OK != r ) { + const char msg[] = "failed to cancel\n"; + ssize_t i = write(STDERR_FILENO, msg, sizeof(msg)); + (void)i; + } + } } /** setup the signal handler */ @@ -130,8 +142,11 @@ void *publish_loop(void* pub) { int publish( ach_channel_t *chan) { + + sighandler_install(); + enum ach_status r = ACH_OK; - while(1) { + while(! g_shutdown) { char *fr; /* get size */ fr = fgets( pbuffer, (int)sizeof(pbuffer), fin ); @@ -156,14 +171,17 @@ int publish( ach_channel_t *chan) { int subscribe( ach_channel_t *chan) { + size_t size = 8; + char *buf = (char*)malloc(size); sighandler_install(); ach_status_t r; - while(1) { + errno = 0; + while(!g_shutdown) { size_t frame_size = 0; size_t fr; - r = ach_get( chan, sbuffer, sizeof(sbuffer), &frame_size, NULL, 0 ); + r = ach_get( chan, buf, size, &frame_size, NULL, ACH_O_WAIT ); switch(r) { case ACH_OK: case ACH_MISSED_FRAME: @@ -171,6 +189,13 @@ int subscribe( ach_channel_t *chan) { case ACH_STALE_FRAMES: usleep(1000); continue; + case ACH_OVERFLOW: + free(buf); + size = frame_size; + buf = (char*)malloc(frame_size); + continue; + case ACH_CANCELED: + continue; default: fprintf(stderr, "sub: ach_error: %s (%d), %s (%d)\n", ach_result_to_string(r), r, strerror(errno), errno); @@ -178,19 +203,20 @@ int subscribe( ach_channel_t *chan) { } /*fprintf(stderr, "sub: got %d bytes\n", frame_size);*/ /*fprintf(stderr, "sub: %s\n", sbuffer);*/ - fr = fwrite( sbuffer, sizeof(char), frame_size, fout ); + fr = fwrite( buf, sizeof(char), frame_size, fout ); if ( fr != frame_size ) { r = ACH_OK; break; } fflush(fout); } + /*fprintf(stderr,"end of subscribe\n");*/ enum ach_status r2 = ach_close( chan ); if( ACH_OK != r2 ) { fprintf(stderr, "another error on ach_close(): %s\n", ach_result_to_string(r)); } - return r; + return g_shutdown ? ACH_OK : r ; } @@ -241,23 +267,21 @@ int main( int argc, char **argv ) { return 1; } - ach_channel_t pub, sub; - memset( &pub, 0, sizeof(pub)); - memset( &sub, 0, sizeof(sub)); { enum ach_status r; ach_attr_t attr; ach_attr_init( &attr ); /*attr.map_anon = opt_pub && opt_sub;*/ if( opt_pub && ! opt_sub ) { - r = ach_open( &pub, opt_chan_name, &attr ); + r = ach_open( &g_pub, opt_chan_name, &attr ); if( ACH_OK != r ) abort(); } else if( opt_sub && !opt_pub ) { - r = ach_open( &sub, opt_chan_name, &attr ); + r = ach_open( &g_sub, opt_chan_name, &attr ); if( ACH_OK != r ) { fprintf(stderr, "Could not sub channel: %s\n", ach_result_to_string(r) ); abort(); } + g_sub_open = true; }else if (opt_pub && opt_sub ) { ach_create_attr_t cattr; ach_create_attr_init( &cattr ); @@ -270,9 +294,10 @@ int main( int argc, char **argv ) { assert( cattr.shm ); attr.map = ACH_MAP_ANON; attr.shm = cattr.shm; - r = ach_open( &sub, opt_chan_name, &attr ); + r = ach_open( &g_sub, opt_chan_name, &attr ); if( ACH_OK != r ) abort(); - r = ach_open( &pub, opt_chan_name, &attr ); + g_sub_open = true; + r = ach_open( &g_pub, opt_chan_name, &attr ); if( ACH_OK != r ) abort(); } else { assert(0); @@ -283,16 +308,16 @@ int main( int argc, char **argv ) { /* check for io case */ if( opt_pub && opt_sub ) { pthread_t pub_thread; - pthread_create( &pub_thread, NULL, publish_loop, &pub ); + pthread_create( &pub_thread, NULL, publish_loop, &g_pub ); - subscribe(&sub); + subscribe(&g_sub); void *v; pthread_join(pub_thread, &v); return 0; } /* normal cases */ - if( opt_sub ) return subscribe(&sub); - if( opt_pub ) return publish(&pub); + if( opt_sub ) return subscribe(&g_sub); + if( opt_pub ) return publish(&g_pub); assert( 0 ); return 0; }