In [None]:
import polars as pl
import traceback
# Raise the number of table rows polars renders when a DataFrame is displayed to 30.
pl.Config(tbl_rows=30)

In [None]:
# Absolute path of the application log file analysed in this notebook.
LOG_FILE = '/usr/local/var/log/ichrisbirch/ichrisbirch.log'

In [None]:
def log_file_to_polars_df(filename: str, debug=False, return_errors=False) -> pl.DataFrame:
    """Parse a delimited log file into a typed polars DataFrame.

    Each line is expected to have the shape::

        <log_level> <timestamp> <logger_name>:<func_name>:<lineno> | <message>

    where ``log_level`` may be wrapped in square brackets (they are stripped).
    Lines that do not match this shape are skipped and counted as errors; a
    one-line summary of the error count is always printed.

    Args:
        filename: Path of the log file to read.
        debug: When true, print the collected diagnostics (traceback, the
            previous line, the offending line and the next good line) for
            every line that failed to parse.
        return_errors: Currently unused; kept so existing callers that pass
            it keep working.

    Returns:
        A DataFrame with the columns ``log_level``, ``timestamp``,
        ``logger_name``, ``func_name``, ``lineno`` and ``message``, cast to
        the dtypes declared in ``schema`` below. The frame is empty (but has
        all columns) when no line could be parsed.

    Raises:
        OSError: Propagated from ``open`` when the file cannot be read.
    """
    schema = {
        'log_level': pl.Categorical,
        'timestamp': pl.Datetime,
        'logger_name': pl.String,
        'func_name': pl.String,
        'lineno': pl.Int16,
        'message': pl.String,
    }
    num_errors = 0
    previous_line = ''
    errors = []
    was_error = False

    def _process_log_line(line: str):
        """Return the fields of one raw line as a dict, or None if it is malformed."""
        nonlocal num_errors, previous_line, was_error
        try:
            # maxsplit=1: only the first '|' separates header from message, so
            # messages that themselves contain '|' are no longer rejected.
            header, message = line.strip().split('|', maxsplit=1)
            # Split from the right so extra padding after the level stays
            # attached to the level (and is stripped below).
            log_level, timestamp, source = header.strip().rsplit(' ', maxsplit=2)
            logger_name, func_name, lineno = source.strip().split(':')
        except ValueError:
            # A wrong number of pieces in any of the unpackings above.
            num_errors += 1
            errors.append(traceback.format_exc())
            errors.append(f'PRE LINE: {previous_line.strip()}')
            errors.append(f'ERR LINE: {line.strip()}')
            previous_line = line
            was_error = True
            return None

        if was_error:
            # First good line after a failure: record it for context.
            errors.append(f'NXT LINE: {line.strip()}\n')
        previous_line = line
        was_error = False
        return {
            'log_level': log_level.strip('[] ').strip(),
            'timestamp': timestamp.strip(),
            'logger_name': logger_name.strip(),
            'func_name': func_name.strip(),
            'lineno': lineno.strip(),
            'message': message.strip(),
        }

    def _cast_columns(df, schema: dict):
        """Select the schema's columns from df, each cast to its declared dtype."""
        casts = [pl.col(k).cast(v) for k, v in schema.items()]
        return df.select(*casts)

    def _process_log_file(filename):
        """Read the file line by line and return the successfully parsed rows."""
        with open(filename) as f:
            lines = []
            for line in f:
                if processed := _process_log_line(line):
                    lines.append(processed)
        return lines

    log_lines = _process_log_file(filename)
    if log_lines:
        df = pl.DataFrame(log_lines)
    else:
        # No parsable line: build an empty all-string frame so the casts below
        # still find every column instead of failing on a column-less frame.
        df = pl.DataFrame(schema={name: pl.String for name in schema})
    converted = _cast_columns(df, schema)

    # Report failures as a true percentage of all lines read; guard the
    # division so an empty file does not raise ZeroDivisionError.
    total_lines = num_errors + len(log_lines)
    error_pct = round(100 * num_errors / total_lines, 4) if total_lines else 0.0
    print(f'total errors while processing: {num_errors}/{total_lines} - {error_pct}%')
    if debug:
        print()
        for error in errors:
            print(error)
    return converted


# Parse the log file once; the frame is reused by the cells below.
# The bare `df` expression is presumably there so the notebook renders the frame as cell output.
df = log_file_to_polars_df(LOG_FILE, debug=False)
df

In [None]:
# Bar chart of how many log records exist per level, most frequent first.
(
    df.group_by('log_level')
    .agg(pl.len())
    .sort('len', descending=True)
    .plot.bar(x='log_level')
)

In [None]:
# Record count per (logger, level) pair, largest groups first.
(
    df.group_by('logger_name', 'log_level')
    .len()
    .sort('len', descending=True)
)

In [None]:
# Record count per emitting function, largest groups first.
(
    df.group_by('func_name')
    .len()
    .sort('len', descending=True)
)

In [None]:
# Number of ERROR-level records per logger, largest groups first.
(
    df.filter(pl.col('log_level') == 'ERROR')
    .group_by('logger_name')
    .len()
    .sort('len', descending=True)
)