Permalink
Browse files

ringuf read support in nfdumpp

  • Loading branch information...
root
root committed Dec 20, 2016
1 parent 4c95d13 commit e2d57c0c3830be902862a171526e0b17fe34c93e
Showing with 68 additions and 11 deletions.
  1. +6 −2 ChangeLog
  2. +10 −0 bin/flist.c
  3. +1 −0 bin/flist.h
  4. +51 −9 bin/nfdumpp.c
View
@@ -1,9 +1,13 @@
Revision history for C and Perl extension of nfdump.
1.26 2016-12-xx
- - ringbuf support
- - nfdumpp: new shm output format (-o shm)
+ - c: ringbuf support to handle data beween libnf instances
+ - c: new functions lnf_ring_init, lnf_ring_info, lnf_ring_read,
+ lnf_ring_write, lnf_ring_free
+ - c: new examples examples/lnf_ex13_ring_write.c, examples/lnf_ex14_ring_read.c
+ - nfdumpp: new shm (ringbuf) output format (-o shm)
- nfdumpp: new --loop-read option for continuos reading of input file(s)
+ - nfdumpp: new --shm-read option for read data from shm (ringbuf)
1.25 2016-12-04
- libnf and modules moved to separate repositories (libnf, libnf-ffilter, libnf-perl)
View
@@ -71,6 +71,16 @@ int flist_pop(flist_t **l, char *buff) {
return 0;
}
+int flist_is_empty(flist_t **l) {
+
+ if (*l == NULL) {
+ return 1;
+ } else {
+ return 0;
+ }
+
+}
+
int flist_count(flist_t **l) {
flist_t *node;
View
@@ -11,6 +11,7 @@ typedef struct flist_s {
int flist_init(flist_t **l);
int flist_push(flist_t **l, char *name);
int flist_pop(flist_t **l, char *buff);
+int flist_is_empty(flist_t **l);
int flist_count(flist_t **l);
int flist_lookup_dir(flist_t **l, char *path);
View
@@ -34,6 +34,7 @@ lnf_filter_t *filterp;
output_t output[MAX_OUTPUTS];
int numoutputs;
int loopread = 0;
+int shmread = 0;
char filter[1024];
#define NFDUMPP_FILTER_DEFAULT 0
@@ -46,40 +47,60 @@ struct option longopts[] = {
{ "num-threads", required_argument, NULL, 1 },
{ "filter-type", required_argument, NULL, 2 },
{ "loop-read", no_argument, NULL, 3 },
+ { "shm-read", no_argument, NULL, 4 },
{ 0, 0, 0, 0 }
};
/* process one file */
-int process_file(char *filename, lnf_filter_t *filterp);
-int process_file(char *filename, lnf_filter_t *filterp) {
+int process_file(char *name, lnf_filter_t *filterp);
+int process_file(char *name, lnf_filter_t *filterp) {
lnf_file_t *filep;
lnf_rec_t *recp;
+ lnf_ring_t *ringp;
// lnf_brec1_t brec;
int i = 0;
int tid;
int match;
int o;
+ int ret;
+ char buf[LNF_MAX_STRING];
tid = (int)pthread_self();
- if (strcmp(filename, "-") == 0) {
+ /* read data from shm/ringbuf */
+ if (shmread) {
+ if (lnf_ring_init(&ringp, name, 0) != LNF_OK) {
+ fprintf(stderr, "Can not initialise ring buffer %s\n", name);
+ lnf_error(buf, LNF_MAX_STRING);
+ fprintf(stderr, "%s\n", buf);
+ return 0;
+ }
+ /* read data from stdin */
+ } else if (strcmp(name, "-") == 0) {
if (lnf_open(&filep, NULL, LNF_READ, NULL) != LNF_OK) {
fprintf(stderr, "[#%x] Can not open stdin\n", tid);
return 0;
}
+ /* read data from regular file */
} else {
- if (lnf_open(&filep, filename, LNF_READ | loopread ? LNF_READ_LOOP : 0, NULL) != LNF_OK) {
- fprintf(stderr, "[#%x] Can not open file %s\n", tid, filename);
+ if (lnf_open(&filep, name, LNF_READ | loopread ? LNF_READ_LOOP : 0, NULL) != LNF_OK) {
+ fprintf(stderr, "[#%x] Can not open file %s\n", tid, name);
return 0;
}
-
}
lnf_rec_init(&recp);
- while (lnf_read(filep, recp) != LNF_EOF) {
+ while (1) {
+ if (shmread) {
+ ret = lnf_ring_read(ringp, recp);
+ } else {
+ ret = lnf_read(filep, recp);
+ }
+
+ if (ret == LNF_EOF) { break; } /* exit from while */
i++;
@@ -95,17 +116,20 @@ int process_file(char *filename, lnf_filter_t *filterp) {
output_write(&output[o], recp);
}
}
+ }
+ if (shmread) {
+ lnf_close(filep);
+ } else {
+ lnf_ring_free(ringp);
}
- lnf_close(filep);
// printf("[#%x] Total input records in file %s : %d\n", tid, filename, i);
return i;
}
-
/* thread loop */
void *process_thread(void *p);
void *process_thread(void *p) {
@@ -149,6 +173,7 @@ void *process_thread(void *p) {
}
+
int main(int argc, char **argv) {
// lnf_rec_t *recp;
@@ -211,7 +236,13 @@ int main(int argc, char **argv) {
case 3: /* loop read */
loopread = 1;
break;
+ case 4: /* shm read */
+ shmread = 1;
+ loopread = 1; /* shm read is loop read at the same time */
+ break;
case 'r':
+ flist_push(&flist, optarg);
+ break;
case 'R':
flist_lookup_dir(&flist, optarg);
break;
@@ -271,6 +302,7 @@ int main(int argc, char **argv) {
NUM_THREADS_FACTOR * 100, numthreads);
printf(" --filter-type = nfdump|libnf : use original nfdump filter or new libnf implementation \n");
printf(" --loop-read : read input files in endless loop \n");
+ printf(" --shm-read : read data from shm ring buffer \n");
printf("\n");
exit(1);
}
@@ -313,6 +345,15 @@ int main(int argc, char **argv) {
// lnf_filter_free(filterp);
}
+ /* if there are no files defined switch to shmloop mode and use default shm name */
+ if (flist_is_empty(&flist)) {
+ printf("\n\nXXXX \n\n");
+ flist_push(&flist, RINGBUF_NAME);
+ shmread = 1;
+ numthreads = 1;
+ }
+
+
output_start(&output[0]);
/* init progress bar */
@@ -323,6 +364,7 @@ int main(int argc, char **argv) {
/* prepare and run threads */
pthread_mutex_init(&mutex, NULL);
+ /* more tahn one input file -> create thread */
for (i = 0 ; i < numthreads ; i++ ) {
if ( pthread_create(&th[i], NULL, process_thread, NULL) < 0) {
fprintf(stderr, "Can not create thread for %d\n", i);

0 comments on commit e2d57c0

Please sign in to comment.