<a href="https://colab.research.google.com/github/bhagavanthai724/python-foundation-set/blob/main/14_log__analyser_%26_Incident_detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# accepts a raw log file as input
from typing import List
import sys

def read_lines_from_file(path: str) -> List[str]:
    """Read a text file and return its lines without trailing newlines.

    The file is decoded as UTF-8; bytes that cannot be decoded are silently
    dropped (``errors="ignore"``) instead of raising.

    Args:
        path: Location of the log file to read.

    Returns:
        One string per line, in file order, with the trailing ``"\\n"`` removed.
    """
    collected: List[str] = []
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        for raw_line in handle:
            collected.append(raw_line.rstrip("\n"))
    return collected

def read_lines_from_stdin() -> List[str]:
    """Consume standard input until EOF and return its lines.

    Returns:
        One string per input line, in order, with the trailing ``"\\n"`` removed.
    """
    collected: List[str] = []
    for raw_line in sys.stdin:
        collected.append(raw_line.rstrip("\n"))
    return collected

In [None]:
# parses logs
import re
from typing import List, Dict, Any
# Expected shape: "<timestamp> <LEVEL> <source> - <message>" (or ':' instead of '-').
_SIMPLE_RE = re.compile(r'^(?P<ts>\S+)\s+(?P<level>[A-Za-z]+)\s+(?P<src>\S+)\s*[-:]\s*(?P<msg>.*)$')

def parse_line(line: str) -> Dict[str, Any]:
    """Split one log line into raw timestamp, level, source and message fields.

    Args:
        line: A single log line.

    Returns:
        A dict with keys ``raw``, ``timestamp_raw``, ``level_raw``,
        ``source_raw`` and ``message_raw``. When the line does not match the
        expected shape, the three middle fields are ``None`` and
        ``message_raw`` holds the whole stripped line.
    """
    stripped = line.strip()
    # Start from the "unparseable" shape and overwrite the fields on a match.
    fields: Dict[str, Any] = {
        "raw": line,
        "timestamp_raw": None,
        "level_raw": None,
        "source_raw": None,
        "message_raw": stripped,
    }
    match = _SIMPLE_RE.match(stripped)
    if match is not None:
        fields["timestamp_raw"] = match.group("ts")
        fields["level_raw"] = match.group("level")
        fields["source_raw"] = match.group("src")
        fields["message_raw"] = match.group("msg").strip()
    return fields

def parse_lines(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse every non-empty line with ``parse_line``.

    Args:
        lines: Raw log lines; ``None`` entries and empty strings are skipped.

    Returns:
        One parsed dict per remaining line, in input order.
    """
    parsed: List[Dict[str, Any]] = []
    for entry in lines:
        if entry is None or entry == "":
            continue
        parsed.append(parse_line(entry))
    return parsed

In [None]:
# normalizes logs
from typing import Dict, Any, List
from datetime import datetime

# Lower-cased level spellings mapped to their canonical names.
_LEVEL_MAP = {
    "debug": "DEBUG",
    "info": "INFO",
    "information": "INFO",
    "warn": "WARNING",
    "warning": "WARNING",
    "error": "ERROR",
    "critical": "CRITICAL",
    "fatal": "CRITICAL",
}

def normalize_level(raw: str):
    """Map a raw level string to its canonical upper-case name.

    Args:
        raw: Level text as found in the log line; may be ``None`` or empty.

    Returns:
        ``"INFO"`` when ``raw`` is falsy; the canonical name when the
        stripped, lower-cased text is in ``_LEVEL_MAP``; otherwise the
        stripped text upper-cased.
    """
    if raw:
        cleaned = raw.strip()
        return _LEVEL_MAP.get(cleaned.lower(), cleaned.upper())
    return "INFO"

def normalize_timestamp(raw_ts):
    """Convert an ISO-8601 timestamp string to ``datetime.isoformat()`` form.

    Args:
        raw_ts: Timestamp text, or a falsy value when none was parsed.

    Returns:
        ``None`` for falsy input; the re-serialized ISO string when parsing
        succeeds; the input unchanged when anything goes wrong.
    """
    if not raw_ts:
        return None
    try:
        # A trailing "Z" is rewritten to an explicit UTC offset before parsing.
        iso_text = raw_ts.replace("Z", "+00:00") if raw_ts.endswith("Z") else raw_ts
        return datetime.fromisoformat(iso_text).isoformat()
    except Exception:
        # Best effort: keep whatever we were given rather than dropping it.
        return raw_ts

def normalize_record(parsed: Dict[str, Any]) -> Dict[str, Any]:
    """Build a normalized record from one parsed-line dict.

    Args:
        parsed: Dict carrying ``raw``, ``timestamp_raw``, ``level_raw``,
            ``source_raw`` and ``message_raw`` (missing keys read as ``None``).

    Returns:
        A new dict with keys ``raw``, ``timestamp``, ``level``, ``source``
        and ``message``. A falsy source becomes ``"unknown"`` and a falsy
        message becomes ``""``.
    """
    normalized: Dict[str, Any] = {}
    normalized["raw"] = parsed.get("raw")
    normalized["timestamp"] = normalize_timestamp(parsed.get("timestamp_raw"))
    normalized["level"] = normalize_level(parsed.get("level_raw"))
    source = parsed.get("source_raw")
    normalized["source"] = source if source else "unknown"
    message = parsed.get("message_raw")
    normalized["message"] = message if message else ""
    return normalized

def normalize_all(parsed_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply ``normalize_record`` to every parsed dict, preserving order."""
    normalized: List[Dict[str, Any]] = []
    for item in parsed_list:
        normalized.append(normalize_record(item))
    return normalized

In [None]:
# extracts structured key=value fields from the log message
import re
from typing import Dict, Any

# key=value where the value is double-quoted, single-quoted, or a bare token.
_KV_RE = re.compile(r'(?P<k>[A-Za-z0-9_\-]+)=(?P<v>"[^"]*"|\'[^\']*\'|\S+)')

def extract_kv(message: str) -> Dict[str, Any]:
    """Collect ``key=value`` pairs found in a log message.

    Args:
        message: Message text; ``None`` or empty yields an empty dict.

    Returns:
        Mapping of key to value string. A value that starts and ends with the
        same quote character has those quotes removed. When a key repeats,
        the last value wins.
    """
    def _unquote(text: str) -> str:
        # Drop one layer of matching single or double quotes, if present.
        opener = text[:1]
        if opener in ('"', "'") and text[-1:] == opener:
            return text[1:-1]
        return text

    return {
        found.group("k"): _unquote(found.group("v"))
        for found in _KV_RE.finditer(message or "")
    }

def enrich_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``record`` with key=value pairs from its message added.

    Pairs found by ``extract_kv`` in ``record["message"]`` are merged into the
    copy's ``"extracted"`` dict, overriding existing keys of the same name.

    Args:
        record: Normalized record; it is not modified.

    Returns:
        A new dict with the same entries as ``record`` plus an ``"extracted"``
        dict.
    """
    enriched = dict(record)  # shallow copy of the top-level mapping
    # The shallow copy still shares any nested "extracted" dict with the
    # caller, so build a fresh one instead of updating it in place. A missing
    # or None value is treated as empty.
    extracted = dict(enriched.get("extracted") or {})
    extracted.update(extract_kv(enriched.get("message", "")))
    enriched["extracted"] = extracted
    return enriched

In [None]:
# detects incidents and assigns a severity to each
from typing import Dict, Any, List
from collections import defaultdict

# Message substrings that raise an otherwise low-level record to SEV_HIGH.
_HIGH_KEYWORDS = ["exception", "traceback", "failed", "panic", "oom", "error", "fatal"]

def simple_severity(record: Dict[str, Any]) -> str:
    """Assign a severity label to one record.

    The level decides first: CRITICAL/FATAL -> ``SEV_CRITICAL``, ERROR ->
    ``SEV_HIGH``, WARNING -> ``SEV_MEDIUM``. For any other level the
    lower-cased message is scanned for the ``_HIGH_KEYWORDS`` substrings.

    Args:
        record: Dict with optional ``level`` and ``message`` entries; a
            missing or falsy level is treated as ``"INFO"``.

    Returns:
        One of ``"SEV_CRITICAL"``, ``"SEV_HIGH"``, ``"SEV_MEDIUM"``,
        ``"SEV_LOW"``.
    """
    text = (record.get("message") or "").lower()
    level_name = (record.get("level") or "INFO").upper()

    by_level = {
        "CRITICAL": "SEV_CRITICAL",
        "FATAL": "SEV_CRITICAL",
        "ERROR": "SEV_HIGH",
        "WARNING": "SEV_MEDIUM",
    }
    if level_name in by_level:
        return by_level[level_name]

    if any(keyword in text for keyword in _HIGH_KEYWORDS):
        return "SEV_HIGH"
    return "SEV_LOW"

def detect_incidents(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse records into one incident per distinct (source, message) pair.

    Each incident is built from the first record seen for its pair and carries
    how many records share that pair. A pair that occurs three or more times
    and would otherwise be ``SEV_LOW`` is raised to ``SEV_MEDIUM``.

    Args:
        records: Normalized (optionally enriched) record dicts.

    Returns:
        Incident dicts with keys ``timestamp``, ``source``, ``message``,
        ``count_same``, ``severity`` and ``extracted``, ordered by first
        appearance of each pair.
    """
    keyed = [((rec.get("source"), rec.get("message")), rec) for rec in records]

    tally = defaultdict(int)
    for pair, _ in keyed:
        tally[pair] += 1

    first_seen = {}
    for pair, rec in keyed:
        severity = simple_severity(rec)
        if pair in first_seen:
            # Only the first record of each pair becomes the incident.
            continue
        occurrences = tally[pair]
        if severity == "SEV_LOW" and occurrences >= 3:
            severity = "SEV_MEDIUM"
        first_seen[pair] = {
            "timestamp": rec.get("timestamp"),
            "source": rec.get("source"),
            "message": rec.get("message"),
            "count_same": occurrences,
            "severity": severity,
            "extracted": rec.get("extracted", {})
        }
    return list(first_seen.values())

In [None]:
# Integrates the log analyzer and incident detector into a single end-to-end pipeline that reads logs, builds a summary report, and saves it as JSON.
import argparse
import json
from step1_reader import read_lines_from_file, read_lines_from_stdin
from step2_parser import parse_lines
from step3_normalizer import normalize_all
from step4_extractor import enrich_record
from step5_incident_detector import detect_incidents

def run_pipeline(lines):
    """Run parse -> normalize -> enrich -> detect and build the summary report.

    Args:
        lines: Raw log lines as a sized sequence (``len(lines)`` is reported).

    Returns:
        A dict with:
            total_lines: number of input lines.
            total_parsed: number of records that came out of parsing.
            level_counts: mapping of normalized level to number of records.
            incidents_count: number of detected incidents.
            incidents: the incident dicts themselves.
            examples: the first ten enriched records.
    """
    # Local import so this function does not depend on file-level imports.
    from collections import Counter

    parsed = parse_lines(lines)
    normalized = normalize_all(parsed)
    enriched = [enrich_record(r) for r in normalized]
    incidents = detect_incidents(enriched)
    # Per-level tally, so the report also shows how records are distributed.
    level_counts = dict(Counter(r.get("level") for r in enriched))
    summary = {
        "total_lines": len(lines),
        "total_parsed": len(enriched),
        "level_counts": level_counts,
        "incidents_count": len(incidents),
        "incidents": incidents,
        "examples": enriched[:10],
    }
    return summary

def main():
    """Command-line entry point: read logs, run the pipeline, save the report.

    Options:
        -i / --input:  log file to read; standard input is used when omitted.
        -o / --output: path of the JSON report (defaults to ``report.json``).
    """
    parser = argparse.ArgumentParser(description="Simple 6-step log tool (beginner-friendly)")
    parser.add_argument("-i", "--input", default=None, help="Input log file path (omit for stdin)")
    parser.add_argument("-o", "--output", default="report.json", help="Output JSON file path (default report.json)")
    options = parser.parse_args()

    if not options.input:
        print("Reading from stdin (Ctrl-D to finish)...")
        lines = read_lines_from_stdin()
    else:
        lines = read_lines_from_file(options.input)

    report = run_pipeline(lines)
    with open(options.output, "w", encoding="utf-8") as out:
        json.dump(report, out, indent=2, ensure_ascii=False)
    print(f"Saved JSON report to {options.output}")

if __name__ == "__main__":
    main()

In [None]:
# Generates a unified JSON summary report,
"""{
  "title": "Project Summary",
  "date": "2025-12-04",
  "summary": "This report gives a quick overview of the project status.",
  "status": "On Track",
  "key_points": [
    "Work is progressing as planned.",
    "No major issues reported.",
    "Next milestone due next week."
  ],
  "tasks": [
    {
      "task": "Build API",
      "status": "Completed"
    },
    {
      "task": "Test Features",
      "status": "In Progress"
    },
    {
      "task": "Prepare Release Notes",
      "status": "Pending"
    }
  ],
  "risks": [
    {
      "risk": "Server downtime",
      "level": "Low"
    }
  ],
  "next_steps": [
    "Finish testing",
    "Start deployment",
    "Update documentation"
  ]
}
"""

In [None]:
# Provides a clean CLI entry point for running the entire workflow.
"""
WORKFLOW SUMMARY

1. Read Input Logs
   - Load log lines from a file or stdin.

2. Parse Log Entries
   - Split each line into timestamp, level, source, and message fields.

3. Normalize Fields
   - Convert timestamps to a consistent format.
   - Standardize log levels (INFO, WARNING, ERROR, etc.).

4. Extract Additional Data
   - Look for simple key=value pairs inside the log message.

5. Detect Incidents
   - Mark errors, warnings, and repeated messages as incidents.
   - Assign a simple severity level (LOW, MEDIUM, HIGH, CRITICAL).

6. Generate Summary Report
   - Build one unified JSON report that includes:
       • total lines processed
       • level counts
       • incident list
       • example parsed records

7. Run Entire Workflow via CLI
   - Users run the full pipeline using:
       python logtool.py -i input.log -o report.json
"""