Skip to content

Commit

Permalink
Fixed a segfault that occurred when using the ignore* options.
Browse files Browse the repository at this point in the history
This problem emerged after the implementation of the multithread feature in
commit 1e116d0. Additionally, this commit refactors some of the code to align
more closely with the processing methodology used prior to the introduction of
multithread parsing.
  • Loading branch information
allinurl committed Dec 31, 2023
1 parent 0550432 commit 728fd7d
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 87 deletions.
13 changes: 8 additions & 5 deletions src/goaccess.c
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,7 @@ read_client (void *ptr_data) {
/* Parse tailed lines */
static void
parse_tail_follow (GLog *glog, FILE *fp) {
GLogItem *logitem;
GLogItem *logitem = NULL;
#ifdef WITH_GETLINE
char *buf = NULL;
#else
Expand All @@ -783,13 +783,16 @@ parse_tail_follow (GLog *glog, FILE *fp) {
while (fgets (buf, LINE_BUFFER, fp) != NULL) {
#endif
pthread_mutex_lock (&gdns_thread.mutex);
logitem = parse_line (glog, buf, 0);
if ((parse_line (glog, buf, 0, &logitem)) == 0 && logitem != NULL) {
printf ("enter prccess_log\n");
process_log (logitem);
}
if (logitem != NULL) {
if (logitem->errstr == NULL)
process_log (logitem);
count_process (glog);
free_glog (logitem);
logitem = NULL;
}
printf ("**count_process\n");
//count_process (glog);
pthread_mutex_unlock (&gdns_thread.mutex);
glog->bytes += strlen (buf);
#ifdef WITH_GETLINE
Expand Down
178 changes: 101 additions & 77 deletions src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -1848,65 +1848,90 @@ parse_json_format (GLogItem *logitem, char *str) {
return parse_json_string (logitem, str, parse_json_specifier);
}

/* Atomically raises glog->lp.ts to the timestamp derived from logitem->dt,
 * i.e., glog->lp.ts = max (glog->lp.ts, mktime (&logitem->dt)).
 *
 * On error (if mktime fails), -1 is returned and glog->lp.ts is left
 * untouched.
 * On success, the timestamp computed from logitem->dt is returned, whether or
 * not it replaced the value stored in glog->lp.ts.
 *
 * NOTE(review): the int return type narrows the 64-bit timestamp; confirm
 * that callers only compare the result against -1. */
static int
atomic_lpts_update (GLog *glog, GLogItem *logitem) {
  int64_t oldts = 0, newts = 0;

  newts = mktime (&logitem->dt);
  /* mktime() reports failure as (time_t) -1; never publish that sentinel
   * into the shared timestamp. */
  if (newts == -1)
    return -1;

  /* Atomic max: always compare against the value actually stored before
   * attempting the swap, so a stored 0 is not blindly replaced by a smaller
   * (negative) timestamp. */
  for (;;) {
    oldts = glog->lp.ts;
    if (oldts >= newts)
      break;    /* stored timestamp is already the newest */
    if (__sync_bool_compare_and_swap (&glog->lp.ts, oldts, newts))
      break;    /* we won the race and stored newts */
    /* another thread changed lp.ts in between; reread and retry */
  }

  return newts;
}

/* Frees the given logitem via free_glog() and returns ret unchanged.
 *
 * Lets a caller release the log item and propagate its status code in a
 * single return statement. */
static int
cleanup_logitem (int ret, GLogItem *logitem) {
free_glog (logitem);
return ret;
}

/* Process a line from the log and store it accordingly taking into
* account multiple parsing options prior to setting data into the
* corresponding data structure.
*
* On error, logitem->errstr will contain the error message. */
GLogItem *
parse_line (GLog *glog, char *line, int dry_run) {
int64_t oldts, newts;
GLogItem *logitem;
int ret = 0;
int
parse_line (GLog *glog, char *line, int dry_run, GLogItem **logitem_out) {
char *fmt = conf.log_format;

logitem = init_log_item (glog);
int ret = 0;
GLogItem *logitem = NULL;

/* soft ignore these lines */
if (valid_line (line)) {
logitem->errstr = xstrdup ("Invalid line");
return logitem;
printf ("^^^^soft invalid\n");
return -1;
}

logitem = init_log_item (glog);

/* Parse a line of log, and fill structure with appropriate values */
if (conf.is_json_log_format)
ret = parse_json_format (logitem, line);
else
ret = parse_format (logitem, line, fmt);

/* invalid log line (format issue) */
if (ret) {
if (logitem->errstr == NULL)
logitem->errstr = xstrdup ("Parse format error");
printf ("^^^^invalid fields\n");
process_invalid (glog, logitem, line);
return logitem;
return cleanup_logitem (ret, logitem);
}

if (!glog->piping && conf.fname_as_vhost && glog->fname_as_vhost)
logitem->vhost = xstrdup (glog->fname_as_vhost);

if (verify_missing_fields (logitem)) {
logitem->errstr = xstrdup ("Missing fields");
/* valid format but missing fields */
if (ret || (ret = verify_missing_fields (logitem))) {
printf ("^^^^missing fields\n");
process_invalid (glog, logitem, line);
return logitem;
return cleanup_logitem (ret, logitem);
}
// glog->lp.ts = max(glog->lp.ts, logitem->dt)
newts = mktime (&logitem->dt);
for (;;) {
oldts = glog->lp.ts;
if (oldts >= newts)
break;
if (__sync_bool_compare_and_swap (&glog->lp.ts, oldts, newts))
break;

/* From here on, valid format but possible ignoring of lines */
if (atomic_lpts_update (glog, logitem) == -1)
return cleanup_logitem (ret, logitem);

if (should_restore_from_disk (glog)) {
printf ("^^^^restore from disk\n");
return cleanup_logitem (ret, logitem);
}
if (newts == -1)
return logitem;

if (should_restore_from_disk (glog))
return logitem;
count_process (glog);

/* testing log only */
if (dry_run)
return logitem;
return cleanup_logitem (ret, logitem);

/* agent will be null in cases where %u is not specified */
if (logitem->agent == NULL) {
Expand All @@ -1917,51 +1942,71 @@ parse_line (GLog *glog, char *line, int dry_run) {
logitem->ignorelevel = ignore_line (logitem);
/* ignore line */
if (logitem->ignorelevel == IGNORE_LEVEL_PANEL)
return logitem;
return cleanup_logitem (ret, logitem);

if (is_404 (logitem))
logitem->is_404 = 1;
else if (is_static (logitem->req))
logitem->is_static = 1;

logitem->uniq_key = get_uniq_visitor_key (logitem);
*logitem_out = logitem;

return logitem;
return ret;
}

/* Entry point to process the given line from the log.
*
* On error, NULL is returned.
* On success or soft ignores, GLogItem is returned. */
static GLogItem *
read_line (GLog *glog, char *line, int *test, int *cnt, int dry_run) {
GLogItem *logitem;
read_line (GLog *glog, char *line, int *test, uint32_t *cnt, int dry_run) {
GLogItem *logitem = NULL;
int ret = 0;

/* start processing log line */
logitem = parse_line (glog, line, dry_run);
if (logitem != NULL) {
/* soft ignore */
if (logitem->errstr != NULL && strcmp (logitem->errstr, "Invalid line") == 0)
return logitem;
/* flip the test if at least one valid record was found during our log format
* test */
if (logitem->errstr == NULL)
*test = 0;
}
/* Begin processing the log line - in case of an invalid log format, flip
* the test only if there's at least one valid record discovered during the log
* format test. This condition applies solely when reading a log from the
* beginning, not when tailing an ongoing log. */
if ((ret = parse_line (glog, line, dry_run, &logitem)) == 0)
*test = 0;

/* soft ignore these lines from parse_line */
if (ret == -1)
return NULL;

/* reached num of lines to test and no valid records were found, log
* format is likely not matching */
if (conf.num_tests && ++(*cnt) >= (int) conf.num_tests && *test) {
if (conf.num_tests && ++(*cnt) >= conf.num_tests && *test) {
uncount_processed (glog);
uncount_invalid (glog);
if (logitem != NULL)
free_glog (logitem);
return NULL;
}

return logitem;
}

/* Thread entry point: converts the job's chunk of raw lines into log items.
 *
 * Each of the job->p lines is passed to read_line() and the result stored in
 * the matching job->logitems slot. While the log-format test is still active
 * (job->test) and job->cnt has reached conf.num_tests, no further lines are
 * parsed and conf.stop_processing is raised instead. When built
 * WITH_GETLINE, every line buffer is released once it has been visited.
 *
 * Always returns a null pointer. */
static void *
read_lines_thread (void *arg) {
  GJob *job = (GJob *) arg;
  int idx = 0;

  while (idx < job->p) {
    /* keep parsing unless we are still testing and the test budget is spent */
    int keep_reading = !job->test || job->cnt < conf.num_tests;

    if (keep_reading) {
      job->logitems[idx] =
        read_line (job->glog, job->lines[idx], &job->test, &job->cnt, job->dry_run);
    } else {
      conf.stop_processing = 1;
    }

#ifdef WITH_GETLINE
    free (job->lines[idx]);
#endif
    idx++;
  }

  return NULL;
}

/* A replacement for GNU getline() to dynamically expand fgets buffer.
*
* On error, NULL is returned.
Expand Down Expand Up @@ -2006,37 +2051,16 @@ fgetline (FILE *fp) {
return NULL;
}

/* Worker routine that parses one chunk of lines into log items.
 *
 * Walks the job->p lines of the job; each line is handed to read_line() and
 * its result stored in job->logitems at the same index. If the log-format
 * test is still pending (job->test) and job->cnt is no longer below
 * conf.num_tests, the line is not parsed and conf.stop_processing is set.
 * With WITH_GETLINE defined, each line buffer is freed after its iteration.
 *
 * Always returns a null pointer. */
void *
read_lines_thread (void *arg) {
  GJob *job = arg;
  int i;

  for (i = 0; i < job->p; ++i) {
    if (job->test && job->cnt >= conf.num_tests) {
      /* test budget exhausted without leaving test mode: stop the threads */
      conf.stop_processing = 1;
    } else {
      job->logitems[i] =
        read_line (job->glog, job->lines[i], &job->test, &job->cnt, job->dry_run);
    }

#ifdef WITH_GETLINE
    free (job->lines[i]);
#endif
  }

  return NULL;
}

void *
process_lines_thread (void *arg) {
GJob *job = (GJob *) arg;
for (int i = 0; i < job->p; i++) {
if (job->logitems[i] == NULL)
break;
if (!job->dry_run && job->logitems[i]->errstr == NULL)
if (job->logitems[i] != NULL && !job->dry_run && job->logitems[i]->errstr == NULL) {
printf ("==process_log\n");

process_log (job->logitems[i]);
count_process (job->glog);
free_glog (job->logitems[i]);
free_glog (job->logitems[i]);
}
}
return (void *) 0;
}
Expand All @@ -2049,8 +2073,8 @@ process_lines_thread (void *arg) {
* On success, 0 is returned. */
static int
read_lines (FILE *fp, GLog *glog, int dry_run) {
int b, k, cnt = 0, test = conf.num_tests > 0 ? 1 : 0;
void *status;
int b = 0, k = 0, cnt = 0, test = conf.num_tests > 0 ? 1 : 0;
void *status = NULL;
char *s = NULL;
GJob jobs[2][conf.jobs];
pthread_t threads[conf.jobs];
Expand All @@ -2065,11 +2089,11 @@ read_lines (FILE *fp, GLog *glog, int dry_run) {
jobs[b][k].test = test;
jobs[b][k].dry_run = dry_run;
jobs[b][k].running = 0;
jobs[b][k].logitems = calloc (conf.chunk_size, sizeof (GLogItem));
jobs[b][k].lines = calloc (conf.chunk_size, sizeof (char *));
jobs[b][k].logitems = xcalloc (conf.chunk_size, sizeof (GLogItem));
jobs[b][k].lines = xcalloc (conf.chunk_size, sizeof (char *));
#ifndef WITH_GETLINE
for (int i = 0; i < conf.chunk_size; i++)
jobs[b][k].lines[i] = calloc (LINE_BUFFER, sizeof (char));
jobs[b][k].lines[i] = xcalloc (LINE_BUFFER, sizeof (char));
#endif
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ typedef struct Logs_ {

/* Pthread jobs for multi-thread */
typedef struct GJob_ {
int p, cnt, test, dry_run, running;
uint32_t cnt;
int p, test, dry_run, running;
GLog *glog;
GLogItem **logitems;
char **lines;
Expand Down Expand Up @@ -194,18 +195,17 @@ typedef struct GRawData_ {
char *extract_by_delim (const char **str, const char *end);
char *fgetline (FILE * fp);
char **test_format (Logs * logs, int *len);
int parse_line (GLog * glog, char *line, int dry_run, GLogItem ** logitem_out);
int parse_log (Logs * logs, int dry_run);
GLogItem *parse_line (GLog * glog, char *line, int dry_run);
void *read_lines_thread (void *arg);
void *process_lines_thread (void *arg);
int set_glog (Logs * logs, const char *filename);
int set_initial_persisted_data (GLog * glog, FILE * fp, const char *fn);
int set_log (Logs * logs, const char *value);
void free_glog (GLogItem * logitem);
void free_logerrors (GLog * glog);
void free_logs (Logs * logs);
void free_glog (GLogItem * logitem);
void free_raw_data (GRawData * raw_data);
void output_logerrors (void);
void *process_lines_thread (void *arg);
void reset_struct (Logs * logs);

GLogItem *init_log_item (GLog * glog);
Expand Down

0 comments on commit 728fd7d

Please sign in to comment.