In [21]:
# Create Level 4 implementation files: extractor, severity, and Streamlit app (STATIC MODEL).
from pathlib import Path
from textwrap import dedent
import tempfile

# 1) level4_severity.py
# Source text of the generated ``level4_severity.py`` module. The module
# defines SEVERITY_MAP (upper-case crime category name -> severity 1..5) and
# assign_severity(), which strips/upper-cases its argument and returns 0 for
# an empty or unmapped category.
# dedent(...).strip() drops the blank first/last lines of the literal; the
# resulting text is written to disk unchanged at the end of this cell.
severity_py = dedent('''
# level4_severity.py
# -------------------
# Simple severity mapping based on crime Category.

SEVERITY_MAP = {
    # Severity 1
    "NON-CRIMINAL": 1, "SUSPICIOUS OCCURRENCE": 1, "MISSING PERSON": 1, "RUNAWAY": 1, "RECOVERED VEHICLE": 1,
    # Severity 2
    "WARRANTS": 2, "OTHER OFFENSES": 2, "VANDALISM": 2, "TRESPASS": 2, "DISORDERLY CONDUCT": 2, "BAD CHECKS": 2,
    # Severity 3
    "LARCENY/THEFT": 3, "VEHICLE THEFT": 3, "FORGERY/COUNTERFEITING": 3, "DRUG/NARCOTIC": 3,
    "STOLEN PROPERTY": 3, "FRAUD": 3, "BRIBERY": 3, "EMBEZZLEMENT": 3,
    # Severity 4
    "ROBBERY": 4, "WEAPON LAWS": 4, "BURGLARY": 4, "EXTORTION": 4,
    # Severity 5
    "KIDNAPPING": 5, "ARSON": 5
}

def assign_severity(category: str) -> int:
    if not category:
        return 0
    cat = str(category).strip().upper()
    return SEVERITY_MAP.get(cat, 0)
''').strip()

# 2) level4_extraction.py
extraction_py = dedent('''
# level4_extraction.py
# ---------------------
from __future__ import annotations
import re
from typing import List, Dict, Any
from datetime import datetime
import pandas as pd

# Optional dependencies handled gracefully
def _extract_text_pdfplumber(path: str) -> str:
    try:
        import pdfplumber
    except Exception:
        return ""
    text_parts = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            try:
                text_parts.append(page.extract_text() or "")
            except Exception:
                continue
    return "\\n".join(text_parts).strip()

def _extract_text_pypdf(path: str) -> str:
    try:
        from PyPDF2 import PdfReader
    except Exception:
        return ""
    text_parts = []
    try:
        reader = PdfReader(path)
        for page in reader.pages:
            try:
                text_parts.append(page.extract_text() or "")
            except Exception:
                continue
    except Exception:
        return ""
    return "\\n".join(text_parts).strip()

def extract_text_from_pdf(path: str) -> str:
    # Try multiple extractors; return the first with sufficient words
    for fn in (_extract_text_pdfplumber, _extract_text_pypdf):
        txt = fn(path)
        if txt and len(txt.split()) > 3:
            return txt
    return ""

# (Optional) OCR for scanned PDFs if pytesseract & pillow are installed
def ocr_pdf_first_n_pages(path: str, n_pages: int = 3, dpi: int = 300) -> str:
    try:
        import pytesseract
        from pdf2image import convert_from_path
    except Exception:
        return ""
    text_parts = []
    try:
        images = convert_from_path(path, dpi=dpi, first_page=1, last_page=n_pages)
        for img in images:
            try:
                text_parts.append(pytesseract.image_to_string(img) or "")
            except Exception:
                continue
    except Exception:
        return ""
    return "\\n".join(text_parts).strip()

# ---------------- Parsing ----------------
DATE_PATTERNS = [
    r"(\\b\\d{4}-\\d{2}-\\d{2}\\b)",
    r"(\\b\\d{2}/\\d{2}/\\d{4}\\b)",
    r"(\\b\\d{1,2}-\\d{1,2}-\\d{2,4}\\b)",
]
TIME_PATTERNS = [r"(\\b\\d{1,2}:\\d{2}(?::\\d{2})?\\s?(?:AM|PM|am|pm)?\\b)"]
PD_DISTRICT_PATTERNS = [r"\\b(PD\\s*District|Police\\s*District|District)[:\\-\\s]+([A-Za-z\\s]+)\\b"]
ADDRESS_PATTERNS = [r"\\bAddress[:\\-\\s]+(.+)$", r"\\bLocation[:\\-\\s]+(.+)$"]
COORD_PATTERNS = [
    r"Latitude\\s*\\(?Y\\)?[:\\-\\s]*(-?\\d+\\.\\d+)\\b.*Longitude\\s*\\(?X\\)?[:\\-\\s]*(-?\\d+\\.\\d+)\\b",
    r"\\bLat[:\\-\\s]*(-?\\d+\\.\\d+)[,\\s]+Lon[g]?[:\\-\\s]*(-?\\d+\\.\\d+)\\b",
]

def _first_group(patterns: List[str], text: str, flags=re.MULTILINE) -> str | None:
    for pat in patterns:
        m = re.search(pat, text, flags)
        if m:
            return m.group(m.lastindex or 0)
    return None

def parse_fields_from_text(text: str) -> Dict[str, Any]:
    out = {
        "Descript": text.strip(),
        "Dates": None,
        "Address": None,
        "PdDistrict": None,
        "Longitude (X)": None,
        "Latitude (Y)": None,
    }
    # Parse date and (optionally) time
    date_raw = _first_group(DATE_PATTERNS, text) or ""
    time_raw = _first_group(TIME_PATTERNS, text) or ""
    date_val = None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m-%d-%Y", "%m-%d-%y"):
        try:
            if date_raw:
                date_val = datetime.strptime(date_raw, fmt)
                break
        except ValueError:
            continue
    if date_val and time_raw:
        time_clean = time_raw.upper().replace(" ", "")
        for tfmt in ("%H:%M", "%H:%M:%S", "%I:%M%p", "%I:%M:%S%p"):
            try:
                t = datetime.strptime(time_clean, tfmt)
                date_val = date_val.replace(hour=t.hour, minute=t.minute, second=getattr(t, "second", 0))
                break
            except ValueError:
                continue
    out["Dates"] = date_val.isoformat(sep=" ") if date_val else None

    # Parse PD district
    pd_m = None
    for pat in PD_DISTRICT_PATTERNS:
        m = re.search(pat, text, flags=re.IGNORECASE)
        if m:
            pd_m = m.group(m.lastindex or 0)
            break
    out["PdDistrict"] = pd_m.strip() if pd_m else None

    # Parse address (take first line only if multi-line)
    addr = None
    for pat in ADDRESS_PATTERNS:
        m = re.search(pat, text, flags=re.IGNORECASE | re.MULTILINE)
        if m:
            addr = m.group(m.lastindex or 0)
            if "\\n" in addr:
                addr = addr.split("\\n", 1)[0]
            break
    out["Address"] = addr.strip() if addr else None

    # Parse coordinates if present
    for pat in COORD_PATTERNS:
        m = re.search(pat, text, flags=re.IGNORECASE | re.DOTALL)
        if m:
            try:
                lat = float(m.group(1)); lon = float(m.group(2))
                out["Latitude (Y)"] = float(lat)
                out["Longitude (X)"] = float(lon)
            except Exception:
                pass
            break
    return out

def features_from_rows(rows: List[Dict[str, Any]]) -> pd.DataFrame:
    # Build a clean DataFrame and derive basic time features
    df = pd.DataFrame(rows)
    for col in ["Descript", "Address", "PdDistrict"]:
        if col in df.columns:
            df[col] = df[col].fillna("").astype(str).str.strip()

    if "Dates" in df.columns:
        df["Dates"] = pd.to_datetime(df["Dates"], errors="coerce")
        df["Year"] = df["Dates"].dt.year
        df["Month"] = df["Dates"].dt.month
        df["DayOfMonth"] = df["Dates"].dt.day
        df["Hour"] = df["Dates"].dt.hour
        df["DayOfWeek"] = df["Dates"].dt.day_name()
    else:
        df["Dates"] = pd.NaT
        df["Year"] = None; df["Month"] = None; df["DayOfMonth"] = None; df["Hour"] = None; df["DayOfWeek"] = None

    for c in ["Latitude (Y)", "Longitude (X)"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # Reorder columns for consistency
    preferred_cols = [
        "Dates","Descript","Address","PdDistrict",
        "Latitude (Y)","Longitude (X)",
        "Year","Month","DayOfMonth","Hour","DayOfWeek"
    ]
    preferred_in_df = [c for c in preferred_cols if c in df.columns]
    rest = [c for c in df.columns if c not in preferred_in_df]
    df = df[preferred_in_df + rest]
    return df
''').strip()

# 3) streamlit_app_level4.py  (Static model loading + safer inference)
# Source text of the generated Streamlit app. Flow: load a static text
# classifier -> extract fields from uploaded PDFs -> let the user edit the
# table -> predict category, map it to a severity, and preview on a map.
app_py = dedent('''
# streamlit_app_level4.py
# -----------------------
import io
import numpy as np
import joblib
import pandas as pd
import streamlit as st
from pathlib import Path
from typing import List, Dict, Any
import tempfile

from level4_extraction import extract_text_from_pdf, ocr_pdf_first_n_pages, parse_fields_from_text, features_from_rows
from level4_severity import assign_severity

st.set_page_config(page_title="CityX - Level 4: PDF Extraction & Inference", page_icon="📄", layout="wide")
st.title("Police Report Extraction → Classification → Severity")

# --------- Static model path ---------
DEFAULT_MODEL_PATH = Path("../models/crime_category_text_model.pkl")

class _VecClfWrapper:
    """Expose a (vectorizer, classifier) pair through the estimator API on raw texts."""
    def __init__(self, vectorizer, classifier):
        self.vectorizer = vectorizer
        self.classifier = classifier

    def predict(self, texts):
        return self.classifier.predict(self.vectorizer.transform(texts))

    def __getattr__(self, name):
        # Only reached when normal lookup fails. The scoring methods exist on
        # the wrapper only if the wrapped classifier has them, so hasattr()
        # on the wrapper tells the truth.
        if name in ("predict_proba", "decision_function"):
            scorer = getattr(self.classifier, name)
            return lambda texts: scorer(self.vectorizer.transform(texts))
        raise AttributeError(name)

# Flexible checker: accepts a Pipeline or a dict containing a pipeline or (vectorizer + classifier)
def _normalize_loaded_object(obj):
    """Return an object with predict(texts); raise ValueError for unsupported formats."""
    # If it's a pipeline-like with predict
    if hasattr(obj, "predict"):
        return obj
    # If it's a dictionary
    if isinstance(obj, dict):
        for key in ("pipeline", "model", "estimator", "clf"):
            if key in obj and hasattr(obj[key], "predict"):
                return obj[key]
        if "vectorizer" in obj and "classifier" in obj and hasattr(obj["classifier"], "predict"):
            return _VecClfWrapper(obj["vectorizer"], obj["classifier"])
    raise ValueError("Unsupported model format. Expect a Pipeline or dict with 'pipeline' or ('vectorizer' + 'classifier').")

@st.cache_resource
def _load_model_cached(path_str: str):
    # SECURITY: joblib.load unpickles the file and can execute arbitrary code.
    # Only point the app at model files you trust.
    return _normalize_loaded_object(joblib.load(path_str))

def load_static_model(path: Path):
    """Return (model, None) on success or (None, error_message) on failure.

    Only successful loads are cached, so a model file that appears later is
    picked up on the next rerun instead of a cached failure being replayed.
    """
    if not path.exists():
        return None, f"Model not found at: {path}"
    try:
        return _load_model_cached(str(path)), None
    except Exception as e:
        return None, f"Failed to load model: {e}"

with st.sidebar:
    st.header("⚙️ Settings")
    # Allow user to see/override the static model path (optional)
    model_path_str = st.text_input("Static model path", value=str(DEFAULT_MODEL_PATH))
    use_ocr = st.checkbox("Try OCR fallback for scanned PDFs (slower)", value=False)
    st.divider()
    st.markdown("**Batch PDF upload**")
    pdf_files = st.file_uploader("Police report PDFs", type=["pdf"], accept_multiple_files=True, key="pdf_upl")

# --- Load static model once ---
model_status = st.empty()
clf, load_err = load_static_model(Path(model_path_str))
if clf is None:
    # Do not stop the app: extraction and manual review work without a model.
    model_status.warning((load_err or "Could not load the static model.") + " Extraction still works, but prediction is disabled.")

# --- Extraction ---
st.subheader("1) Extract key fields from uploaded PDFs")
rows: List[Dict[str, Any]] = []

if pdf_files:
    progress = st.progress(0)

    for i, upl in enumerate(pdf_files):
        progress.progress((i+1)/len(pdf_files))

        # Streamlit versions differ: sometimes .getbuffer(), other times .read()
        try:
            data = upl.getbuffer()
        except Exception:
            data = upl.read()

        # Write to a uniquely named temp file instead of trusting the uploaded
        # file name (no path tricks, no collisions between sessions).
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as tmp:
            tmp.write(data)
            tmp_path = Path(tmp.name)
        try:
            text = extract_text_from_pdf(str(tmp_path))
            if not text and use_ocr:
                text = ocr_pdf_first_n_pages(str(tmp_path), n_pages=2, dpi=200)
        finally:
            # Reports may be sensitive: never leave copies in the temp dir.
            try:
                tmp_path.unlink()
            except OSError:
                pass

        if not text:
            st.warning(f"Could not extract text from {upl.name}. It might be a scanned PDF and OCR is disabled or unavailable.")
            continue

        parsed = parse_fields_from_text(text)
        parsed["__source_pdf__"] = upl.name
        rows.append(parsed)

    progress.progress(1.0)

if not rows:
    st.info("Upload one or more PDF files to extract. Parsed results will appear here.")
else:
    df_extracted = features_from_rows(rows)
    st.write("**Extracted (editable) table** – adjust any fields if needed before inference:")
    edited = st.data_editor(df_extracted, num_rows="dynamic", use_container_width=True, key="editor_level4")

    st.subheader("2) Run Level-2 Classifier on Descriptions → Predict Category & Severity")
    can_infer = clf is not None and "Descript" in edited.columns
    infer_btn = st.button("Predict", type="primary", disabled=not can_infer)

    if infer_btn and can_infer:
        X_text = edited["Descript"].fillna("").astype(str).tolist()
        y_pred = None
        conf = None
        try:
            y_pred = clf.predict(X_text)
        except Exception as e:
            st.error(f"Prediction failed: {e}")

        # Confidence is optional: a scoring failure must not discard predictions.
        if y_pred is not None:
            try:
                if hasattr(clf, "decision_function"):
                    arr = clf.decision_function(X_text)
                    conf = np.abs(arr) if getattr(arr, "ndim", 1) == 1 else arr.max(axis=1)
                elif hasattr(clf, "predict_proba"):
                    arr = clf.predict_proba(X_text)
                    conf = arr.max(axis=1)
            except Exception as e:
                conf = None
                st.warning(f"Confidence scores unavailable: {e}")

        if y_pred is None or len(y_pred) != len(edited):
            st.error(f"Prediction returned {0 if y_pred is None else len(y_pred)} results for {len(edited)} rows. Check model format or input.")
        else:
            result = edited.copy()
            result["PredictedCategory"] = pd.Series(y_pred, index=result.index).astype(str).str.upper()
            result["AssignedSeverity"] = result["PredictedCategory"].apply(assign_severity)
            if conf is not None:
                result["Confidence"] = pd.Series(conf, index=result.index).round(3)

            st.success("Inference complete.")
            st.dataframe(result, use_container_width=True)

            # Mean of an all-NaN/absent column would be NaN; show 0 instead.
            avg_conf = result["Confidence"].mean() if "Confidence" in result.columns else 0.0
            if pd.isna(avg_conf):
                avg_conf = 0.0

            c1, c2, c3 = st.columns(3)
            with c1: st.metric("Reports", len(result))
            with c2: st.metric("Unique predicted types", result["PredictedCategory"].nunique())
            with c3: st.metric("Avg confidence", round(float(avg_conf), 3))

            st.download_button("Download Predictions (CSV)", data=result.to_csv(index=False), file_name="predictions_level4.csv", mime="text/csv")
            st.download_button("Download Predictions (JSON)", data=result.to_json(orient="records"), file_name="predictions_level4.json", mime="application/json")

            # Quick map preview (if coordinates available)
            try:
                import folium
                from streamlit_folium import folium_static
                df_map = result.dropna(subset=["Latitude (Y)", "Longitude (X)"])
                if not df_map.empty:
                    m = folium.Map(
                        location=[df_map["Latitude (Y)"].median(), df_map["Longitude (X)"].median()],
                        zoom_start=12,
                        tiles="CartoDB positron"
                    )

                    for _, r in df_map.iterrows():
                        folium.CircleMarker(
                            location=[r["Latitude (Y)"], r["Longitude (X)"]],
                            radius=5,
                            popup=f"{r.get('PredictedCategory','?')} | Sev {r.get('AssignedSeverity','?')}",
                        ).add_to(m)
                    st.write("**Map preview (if coordinates present):**")
                    folium_static(m, width=1000, height=450)
                else:
                    st.info("No valid coordinates to map. You can add/edit Lat/Lon in the table above.")
            except Exception as e:
                st.info(f"Map preview unavailable: {e}")
''').strip()

# Write files to disk (same creation mechanism as your original)
# Each generated module is written as UTF-8 into the current working
# directory, in the fixed order severity -> extraction -> app; the resulting
# paths stay available as out1/out2/out3 and are printed as confirmation.
_generated_sources = (
    ("level4_severity.py", severity_py),
    ("level4_extraction.py", extraction_py),
    ("streamlit_app_level4.py", app_py),
)
_written_paths = []
for _file_name, _source_text in _generated_sources:
    _target = Path(_file_name)
    # No-op for bare file names; kept so nested targets would also work.
    _target.parent.mkdir(parents=True, exist_ok=True)
    _target.write_text(_source_text, encoding="utf-8")
    _written_paths.append(_target)
out1, out2, out3 = _written_paths
print(out1, out2, out3)


level4_severity.py level4_extraction.py streamlit_app_level4.py
