In [133]:
import os
import sys


class LogProcessor:
    def __init__(self):
        self.log_level_counters = {}
        self.total_lines_processed = 0
        self.parse_failed = 0
        self.loggers = {}

        # keep contract explicit
        self.valid_levels = {"INFO", "WARN", "ERROR"}

    def inspect_log_file(self, file_path: str):
        if not os.path.exists(file_path):  # fail early file not found
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            with open(file_path, "r", encoding="utf-8") as zk_logs:
                for raw_line in zk_logs:
                    line = raw_line.strip()

                    if not line:
                        self.parse_failed += 1
                        continue

                    # Keep your approach: split once on " - "
                    event = line.split(" - ", 2)

                    if len(event) >= 2:
                        # event[1] is like: "WARN  [QuorumPeer...]" (level + bracket section)
                        level_token = event[1].split()[0] if event[1].split() else "UNKNOWN"
                        level = level_token if level_token in self.valid_levels else "UNKNOWN"

                        self._increment_log_level_counter(level)  # LogLevel counting

                        # IMPORTANT FIX:
                        # previously doing: event[1].split()[1]
                        # That isn't reliable. Extract logger/source from the actual bracket content in the full line.
                        raw_logger = self._extract_bracket_content(line)  # e.g. "QuorumPeer[myid=1](plain=...)..."
                        self._count_loggers(raw_logger)

                    else:
                        # malformed line (can't split into expected segments)
                        self._increment_log_level_counter("UNKNOWN")
                        self._count_loggers("")  # count as UNKNOWN_SOURCE

                    self.total_lines_processed += 1

                return self.get_file_report()

        except OSError as err:
            print(f"Failed to inspect file: {err}")
            raise

    def _increment_log_level_counter(self, log_level: str):
        self.log_level_counters[log_level] = self.log_level_counters.get(log_level, 0) + 1

    def _get_level_counters(self):
        # stable order for nicer reports
        ordered = ["INFO", "WARN", "ERROR", "UNKNOWN"]
        pieces = []
        for lvl in ordered:
            if lvl in self.log_level_counters:
                pieces.append(f"{lvl} {self.log_level_counters[lvl]}")

        # include any unexpected levels (just in case)
        for lvl, cnt in self.log_level_counters.items():
            if lvl not in ordered:
                pieces.append(f"{lvl} {cnt}")
        return "\t".join(pieces)

    # ------------------------
    # Logger extraction helpers
    # ------------------------

    def _extract_bracket_content(self, line: str) -> str:
        """
        Extract the first [...] block from the line.
        Returns "" if not found or malformed.
        """
        start = line.find("[")
        if start == -1:
            return ""
        end = line.find("]", start + 1)
        if end == -1 or end <= start:
            return ""
        return line[start + 1 : end]

    def sanitize_logger(self, logger: str) -> str:
        """
        Your original sanitization was deleting separators and "gluing" tokens together.
        That produced outputs like: 10.10.34.113888QuorumCnxManagerListener493

        Instead, we keep a STABLE definition of "logger/source":
          - take the bracket content (already extracted)
          - keep only the part BEFORE the first ':' (common ZK pattern)
            e.g. "NIOServerCxn.Factory:0.0.0.0/..." -> "NIOServerCxn.Factory"
          - if empty/malformed -> UNKNOWN_SOURCE
        """
        if not logger:
            return "UNKNOWN_SOURCE"

        # stable source name = left side before first ':'
        stable = logger.split(":", 1)[0].strip()
        if not stable:
            return "UNKNOWN_SOURCE"

        # light clean-up: keep readable chars, but DON'T glue meaningfully separated tokens
        # (we allow dots, underscores, and hyphens too)
        clean = []
        for ch in stable:
            if ch.isalnum() or ch in {".", "_", "-"}:
                clean.append(ch)
        stable_clean = "".join(clean).strip()

        return stable_clean if stable_clean else "UNKNOWN_SOURCE"

    def _count_loggers(self, logger: str):
        clean_logger = self.sanitize_logger(logger)
        self.loggers[clean_logger] = self.loggers.get(clean_logger, 0) + 1

    def get_top_five_loggers(self):
        """
        Your version returned the first 5 inserted, not the top 5 by count.
        This one sorts by count descending.
        """
        if not self.loggers:
            return "UNKNOWN_SOURCE 0"

        items = sorted(self.loggers.items(), key=lambda kv: kv[1], reverse=True)
        top_five = items[:5]
        return "\n".join(f"{name} {count}" for name, count in top_five)

    def get_file_report(self):
        return f"""
Log Level Counters:\t{self._get_level_counters()}
Total lines processed:\t{self.total_lines_processed}
Total Failed to process lines:\t{self.parse_failed}
Top 5 Loggers:
{self.get_top_five_loggers()}
"""





In [134]:
# ---- pseudo main (jupyter-friendly) ----
try:
    log_processor = LogProcessor()

    # keep your path style; adjust if needed in your notebook environment
    zookeeper_logs = "data/Zookeeper_2k.log"

    print(log_processor.inspect_log_file(zookeeper_logs))

except OSError as error:
    print(f"Failed to Load file with error: {error}")
    sys.exit(2)



Log Level Counters:	INFO 669	WARN 1318	ERROR 13
Total lines processed:	2000
Total Failed to process lines:	0
Top 5 Loggers:
SendWorker 576
RecvWorker 557
NIOServerCxn.Factory 222
QuorumPeermyid1 130
10.10.34.13 106



In [None]:
# ---- pseudo main (CLI-friendly: read path from sys.argv[1]) ----
try:
    log_processor = LogProcessor()

    # Validate user input
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <path_to_log_file>")
        sys.exit(2)

    zookeeper_logs = sys.argv[1].strip()
    if not zookeeper_logs:
        print("Error: log file path is empty.")
        sys.exit(2)

    # Run
    report = log_processor.inspect_log_file(zookeeper_logs)
    print(report)

except FileNotFoundError as error:
    print(f"Error: {error}")
    sys.exit(2)

except OSError as error:
    print(f"Failed to load file with error: {error}")
    sys.exit(2)
