Skip to content
This repository has been archived by the owner on Jan 31, 2020. It is now read-only.

Commit

Permalink
Merge pull request #10 from alepuccetti/cors-ft-aggr
Browse files Browse the repository at this point in the history
cors-ft-aggregate now takes files on the command line, not a list of files
  • Loading branch information
alistairking committed Jun 12, 2014
2 parents aebf0cf + bd8776b commit f96ca95
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 133 deletions.
24 changes: 14 additions & 10 deletions docs/doxygen/static/tools.md
Expand Up @@ -173,13 +173,16 @@ Usage
-----

~~~
usage: cors-ft-aggregate [-l] [-i interval] [-v value_field] -f field [-f field]... file_list
usage: cors-ft-aggregate [-l] [-i interval] [-v value_field] [-f field]... [-F file_list]
flowtuple_file [flowtuple_file]
-l treat the input files as containing legacy format data
-i <interval> new distribution interval in seconds. (default: 0)
a value of -1 aggregates to a single interval
a value of 0 uses the original interval
-v <value> field to use as aggregation value (default: packet_cnt)
-f <field> a tuple field to re-aggregate with
-i <interval> new distribution interval in seconds. (default: 0)
a value of -1 aggregates to a single interval
a value of 0 uses the original interval
-v <value> field to use as aggregation value (default: packet_cnt)
-f <field> a tuple field to re-aggregate with
-F <file_list> a file with the list flowtuple files
use '-' to read the list from standard input
Supported field names are:
src_ip, dst_ip, src_port, dst_port, protocol, ttl, tcp_flags,
Expand All @@ -192,10 +195,8 @@ Supported field names are:
- include the specified field of the original tuple in the re-aggregated
output
- can be used multiple times to specify a list of fields
- `file_list`
- file containing an ordered list of \ref plugins_ft binary files to be
re-aggregated
- use `-` to read the list from stdin (for use with `find (1)` etc).
- `flowtuple_file`
- Is possibile specify multiple sorted files on the command line

### Optional Arguments ###

Expand All @@ -219,6 +220,9 @@ Supported field names are:
- e.g. `src_ip` will give a value for each tuple which is the number of unique
source IP addresses which match the sub-tuple (as specified by the `field`
arguments)
- `file_list`
- a file with the list flowtuple files
- use `-` to read the list from standard input

### Example Uses ###

Expand Down
281 changes: 158 additions & 123 deletions tools/cors-ft-aggregate.c
Expand Up @@ -492,17 +492,121 @@ static int process_flowtuple(corsaro_flowtuple_t *tuple)
return 0;
}

/** Process a flowtuple file */
int process_flowtuple_file(char *file)
{
off_t len = 0;
corsaro_in_record_type_t type = CORSARO_IN_RECORD_TYPE_NULL;
corsaro_interval_t *interval_record;
corsaro_flowtuple_t *tuple;

fprintf(stderr, "processing %s\n", file);

/* this must be done before corsaro_init_output */
if(init_corsaro(file) != 0)
{
fprintf(stderr, "failed to init corsaro\n");
clean();
return -1;
}

/* dirty hack to not -1 on the last interval in the previous file */
if(last_interval_end.time > 0)
{
last_interval_end.time+=legacy;
}

while ((len = corsaro_in_read_record(corsaro, &type, record)) > 0) {
/* we want to know the current time, so we will watch for interval start
records */
if(type == CORSARO_IN_RECORD_TYPE_IO_INTERVAL_START)
{
interval_record = (corsaro_interval_t *)
corsaro_in_get_record_data(record);

if(interval_record->time <= last_dump_end.time)
{
fprintf(stderr, "ERROR: decrease in timestamp.\n"
"Are the input files sorted properly?\n");
clean();
return -1;
}

if(flowtuple_cnt == 0)
{
last_dump_end.time = interval_record->time;
next_interval = interval_record->time + interval;
}

/* an interval of 0 means dump at the source interval */
if(last_interval_end.time > 0)
{
/* this was a non-end interval, if it is legacy, subtract
one from the last_interval_end time */
last_interval_end.time-=legacy;
if(interval == 0)
{
dump_hash();
}
else if(interval > 0)
{
while(interval_record->time >= next_interval)
{
dump_hash();
next_interval += interval;
}
}
/* else, if interval < 0, only dump at the end */
}
}
else if(type == CORSARO_IN_RECORD_TYPE_IO_INTERVAL_END)
{
interval_record = (corsaro_interval_t *)
corsaro_in_get_record_data(record);

last_interval_end.time = interval_record->time;

}
else if(type == CORSARO_IN_RECORD_TYPE_FLOWTUPLE_FLOWTUPLE)
{
tuple = (corsaro_flowtuple_t *)corsaro_in_get_record_data(record);
flowtuple_cnt++;

process_flowtuple(tuple);
}

/* reset the type to NULL to indicate we don't care */
type = CORSARO_IN_RECORD_TYPE_NULL;
}

if(len < 0)
{
fprintf(stderr, "corsaro_in_read_record failed to read record\n");
clean();
return -1;
}

clean();

return 0;

}


/** Print usage information to stderr */
static void usage(const char *name)
{
fprintf(stderr,
"usage: %s [-l] [-i interval] [-v value_field] [-f field]... file_list\n"
" -l treat the input files as containing legacy format data\n"
" -i <interval> new distribution interval in seconds. (default: 0)\n"
" a value of -1 aggregates to a single interval\n"
" a value of 0 uses the original interval\n"
" -v <value> field to use as aggregation value (default: packet_cnt)\n"
" -f <field> a tuple field to re-aggregate with\n"
"usage: %s [-l] [-i interval] [-v value_field] [-f field]... [-F file_list] \n"
" flowtuple_file [flowtuple_file]\n"
" -l treat the input files as containing legacy format data\n"
" -i <interval> new distribution interval in seconds. (default: 0)\n"
" a value of -1 aggregates to a single interval\n"
" a value of 0 uses the original interval\n"
" -v <value> field to use as aggregation value (default: packet_cnt)\n"
" -f <field> a tuple field to re-aggregate with\n"
" -F <file_list> a file with the list flowtuple files\n"
" use '-' to read the list from standard input\n"
"\n"
"Supported field names are:\n"
" src_ip, dst_ip, src_port, dst_port, protocol, ttl, tcp_flags, \n"
Expand All @@ -518,22 +622,14 @@ int main(int argc, char *argv[])

int field_cnt = 0;

/** The name of the file which contains the list of input files */
char *flist_name = NULL;
/** A pointer to the file which contains the list of input files */
FILE *flist = NULL;
/** The file currently being processed by corsaro */
char file[1024];

corsaro_in_record_type_t type = CORSARO_IN_RECORD_TYPE_NULL;
off_t len = 0;

corsaro_interval_t *interval_record;
corsaro_flowtuple_t *tuple;

int wanted_n_fields = 0;

while((opt = getopt(argc, argv, "li:f:v:?")) >= 0)
while((opt = getopt(argc, argv, "li:f:F:v:?")) >= 0)
{
switch(opt)
{
Expand Down Expand Up @@ -562,6 +658,27 @@ int main(int argc, char *argv[])
}
break;

case 'F':
/* check if a list of files has been already specified */
if (flist != NULL) {
fprintf(stderr,"a list of file has been already specified \n"
"this file is ignored: %s\n",optarg);
break;
}

/* check if the list of file is on stdin or in a file */
if(strcmp(optarg, "-") == 0)
{
flist = stdin;
}
else if((flist = fopen(optarg, "r")) == NULL)
{
fprintf(stderr, "failed to open list of input files (%s)\n"
"NB: File List MUST be sorted\n", optarg);
return -1;
}
break;

case 'v':
if(value_field >= 0)
{
Expand Down Expand Up @@ -614,33 +731,12 @@ int main(int argc, char *argv[])
exit(-1);
}

if(optind != argc - 1)
{
usage(argv[0]);
exit(-1);
}

if(value_field < 0)
{
fprintf(stderr, "No value field specified. Defaulting to packet count\n");
value_field = VALUE;
}

/* argv[1] is the list of corsaro files */
flist_name = argv[optind];

/* read each file in the list */
if(strcmp(flist_name, "-") == 0)
{
flist = stdin;
}
else if((flist = fopen(flist_name, "r")) == NULL)
{
fprintf(stderr, "failed to open list of input files (%s)\n"
"NB: File List MUST be sorted\n", flist_name);
return -1;
}

/* initialize the hash */
if(value_field == VALUE)
{
Expand All @@ -651,98 +747,38 @@ int main(int argc, char *argv[])
sixt_f = kh_init(sixt_map);
}

while(fgets(file, sizeof(file), flist) != NULL)
/* check if the user specified a list file */
if(flist != NULL)
{
/* chomp off the newline */
file[strlen(file)-1] = '\0';

fprintf(stderr, "processing %s\n", file);

/* this must be done before corsaro_init_output */
if(init_corsaro(file) != 0)
/* flist point to a file of list of paths */
while(fgets(file, sizeof(file), flist) != NULL)
{
fprintf(stderr, "failed to init corsaro\n");
clean();
return -1;
/* chomp off the newline */
file[strlen(file)-1] = '\0';
/* process flowtuple file */
if(process_flowtuple_file(file) == -1)
{
return -1;
}
}

/* dirty hack to not -1 on the last interval in the previous file */
if(last_interval_end.time > 0)
fclose(flist);
}
else /* user specified file directly on the command line */
{
if(optind >= argc)
{
last_interval_end.time+=legacy;
/* no files has been specified */
usage(argv[0]);
exit(-1);
}

while ((len = corsaro_in_read_record(corsaro, &type, record)) > 0) {
/* we want to know the current time, so we will watch for interval start
records */
if(type == CORSARO_IN_RECORD_TYPE_IO_INTERVAL_START)
{
interval_record = (corsaro_interval_t *)
corsaro_in_get_record_data(record);

if(interval_record->time <= last_dump_end.time)
{
fprintf(stderr, "ERROR: decrease in timestamp.\n"
"Are the input files sorted properly?\n");
clean();
return -1;
}

if(flowtuple_cnt == 0)
{
last_dump_end.time = interval_record->time;
next_interval = interval_record->time + interval;
}

/* an interval of 0 means dump at the source interval */
if(last_interval_end.time > 0)
{
/* this was a non-end interval, if it is legacy, subtract
one from the last_interval_end time */
last_interval_end.time-=legacy;
if(interval == 0)
{
dump_hash();
}
else if(interval > 0)
{
while(interval_record->time >= next_interval)
{
dump_hash();
next_interval += interval;
}
}
/* else, if interval < 0, only dump at the end */
}
}
else if(type == CORSARO_IN_RECORD_TYPE_IO_INTERVAL_END)
{
interval_record = (corsaro_interval_t *)
corsaro_in_get_record_data(record);

last_interval_end.time = interval_record->time;

}
else if(type == CORSARO_IN_RECORD_TYPE_FLOWTUPLE_FLOWTUPLE)
{
tuple = (corsaro_flowtuple_t *)corsaro_in_get_record_data(record);
flowtuple_cnt++;

process_flowtuple(tuple);
}

/* reset the type to NULL to indicate we don't care */
type = CORSARO_IN_RECORD_TYPE_NULL;
}

if(len < 0)
/* iterate over all the files on the command line */
for (int i = optind; i < argc; i++)
{
fprintf(stderr, "corsaro_in_read_record failed to read record\n");
clean();
return -1;
if(process_flowtuple_file(argv[i]) == -1)
{
return -1;
}
}

clean();
}

/* dump again if the hash is not empty */
Expand All @@ -766,6 +802,5 @@ int main(int argc, char *argv[])
sixt_v = NULL;
}

fclose(flist);
return 0;
}

0 comments on commit f96ca95

Please sign in to comment.