
# ISW ORBAT ETL + WarSpotting Enrichment (MVP)

**Purpose:** Parse the ISW Russian ORBAT PDF (2023-10-12), extract a clean unit dataset and hierarchy,
build a robust alias bank, and enrich WarSpotting loss rows with ISW context (unit UID, echelon, service, military district).

**Key design choices (per user requirements):**
- **No garrison** fields (ignore basing locations)
- **No confidence scores** (we’ll skip bracket status for MVP)
- **Deterministic schema** and **linkable** to existing WarSpotting CSVs

**Outputs:**
- `isw_units.csv` — one row per unit
- `isw_hierarchy.csv` — parent→child edges
- `isw_aliases.csv` — aliases for matching
- `warspotting_enriched.csv` — original WarSpotting rows + ISW enrichment columns


In [None]:

# If running on Colab or a fresh environment, uncomment the following installs:
# %pip install pdfminer.six PyPDF2 rapidfuzz Unidecode pandas
#
# pdfminer.six  -> robust text extraction from PDFs
# PyPDF2        -> secondary extractor (fallback)
# rapidfuzz     -> fast, reliable fuzzy string matching
# Unidecode     -> simple transliteration for aliases
# pandas        -> WarSpotting CSV loading and enrichment


In [None]:

from pathlib import Path
import re
import csv
from typing import List, Dict, Tuple, Optional
import hashlib

try:
    from rapidfuzz import fuzz, process  # type: ignore
except Exception:
    fuzz = None
    process = None

try:
    from unidecode import unidecode  # type: ignore
except Exception:
    def unidecode(s: str) -> str:
        return s

# Absolute path of the directory the notebook is executed from.
BASE_DIR = Path(".").resolve()
# Root folder for the input files; the output folder is created beneath it.
DATA_DIR = Path("/mnt/data")
ISW_PDF = DATA_DIR / "October 12, 2023 Russian Orbat_Final.pdf"
WARSPOTTING_CSV = DATA_DIR / "warspotting_norm_2025-09-08 (1).csv"
OUT_DIR = DATA_DIR / "isw_orbat_etl"
# Side effect when this cell runs: creates the output folder (and any missing parents).
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Destination files for the generated tables.
UNITS_CSV = OUT_DIR / "isw_units.csv"
HIER_CSV  = OUT_DIR / "isw_hierarchy.csv"
ALIAS_CSV = OUT_DIR / "isw_aliases.csv"
ENRICHED_CSV = OUT_DIR / "warspotting_enriched.csv"

# Provenance values stamped onto every parsed unit row.
SOURCE_VERSION = "2023-10-12_ISW_ORBAT"
# NOTE(review): AS_OF does not match the 2023-10-12 date in SOURCE_VERSION — confirm this is intended.
AS_OF = "2023-01-01"


## PDF Text Extraction

In [None]:

def extract_text_pdf(path: Path) -> str:
    """Extract the text of a PDF, preferring pdfminer.six and falling back to PyPDF2.

    Both extractors are optional dependencies and are imported lazily. Any
    failure (missing package, unreadable file, parser error) is reported with
    a ``[warn]`` message and treated as "no text from this extractor" rather
    than raised, because extraction is best-effort.

    Args:
        path: Location of the PDF file.

    Returns:
        The pdfminer.six text when it is non-blank; otherwise the page texts
        PyPDF2 managed to extract, each followed by a newline. Returns an
        empty string when neither extractor produced anything.
    """
    try:
        from pdfminer.high_level import extract_text as pdfminer_extract_text  # type: ignore
        primary = pdfminer_extract_text(str(path))
        if primary and primary.strip():
            return primary
        # Blank pdfminer output is discarded so it is not prepended to the
        # fallback text below.
    except Exception as e:
        print("[warn] pdfminer.six failed:", e)

    # Collected outside the try so pages read before a mid-document failure
    # are still returned.
    pages = []
    try:
        import PyPDF2  # type: ignore
        with open(path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                pages.append((page.extract_text() or "") + "\n")
    except Exception as e:
        print("[warn] PyPDF2 failed:", e)

    return "".join(pages)

# Report up front whether the ISW source PDF is available at the configured path.
if ISW_PDF.exists():
    print(f"[ok] ISW PDF found at: {ISW_PDF}")
else:
    print(f"[warn] ISW PDF not found at: {ISW_PDF}")


## Parsing Spec & Classifiers

In [None]:

# Ordered (echelon, regex) pairs. Order matters for first-match consumers:
# "Army Corps" has to be tried before the bare "Army" alternative.
ECHELON_PATTERNS = [
    ("army_corps", r"\bArmy Corps\b|\bAC\b"),
    ("army",       r"\bCombined Arms Army\b|\bCAA\b|\bArmy\b"),
    # No trailing \b after the literal dot: "\.\b" only matches when a word
    # character follows the period, so "Div. " or a line-final "Div." would
    # never be recognised.
    ("division",   r"\bDivision\b|\bDiv\."),
    ("brigade",    r"\bBrigade\b|\bBde\b"),
    ("regiment",   r"\bRegiment\b|\bReg\."),
    ("battalion",  r"\bBattalion\b|\bBn\b|\bBttn\b"),
]

# Service code -> keywords looked up as case-insensitive substrings of a line.
# NOTE(review): generic keywords such as "Army" or "Navy" also occur inside
# unit names, not only in section headings — confirm against the PDF text.
SERVICE_HEADERS = {
    "army": ["Ground Forces", "Combined Arms Army", "Army"],
    "navy_coastal": ["Coastal Defense", "Naval Infantry", "Navy"],
    "vdv": ["Airborne Forces", "VDV", "Air Assault"],
    "gru_spetsnaz": ["Spetsnaz", "GRU"]
}

# Military-district code -> heading text that introduces that district.
MD_HEADERS = {
    "WMD": ["Western Military District"],
    "SMD": ["Southern Military District"],
    "CMD": ["Central Military District"],
    "EMD": ["Eastern Military District"],
    "NFM": ["Northern Fleet"]
}

# Five-digit military unit number introduced by "в/ч", "v/ch" or "vch".
# Case-insensitive so capitalised spellings ("V/CH", "В/Ч") are recognised too.
VCH_REGEX = re.compile(r"(?:в/ч|v/ch|vch)\s*[:#]?\s*(\d{5})", re.IGNORECASE)

# Matches text that contains a bracketed span (greedy, first "[" to last "]").
IS_BRACKETED = re.compile(r"\[.*\]")

def normalize_unit_text(s: str) -> str:
    """Return a lowercase, punctuation-light form of a unit name.

    The text is passed through ``unidecode``, trimmed and lowercased; every
    character that is not a word character, whitespace, hyphen or slash is
    replaced by a space; runs of whitespace are then collapsed to one space.
    ``None`` or an empty value yields an empty string.
    """
    lowered = unidecode(s or "").strip().lower()
    without_punct = re.sub(r"[^\w\s\-\/]", " ", lowered)
    return re.sub(r"\s+", " ", without_punct).strip()

def extract_number_token(name: str) -> Optional[str]:
    """Return the first standalone 1-4 digit number in ``name``, without any ordinal suffix.

    The number may carry an English ordinal suffix (st/nd/rd/th), which is not
    included in the result. Returns ``None`` when no such token exists.
    """
    found = re.search(r"\b(\d{1,4})(?:st|nd|rd|th)?\b", name)
    if found is None:
        return None
    return found.group(1)

# English unit-type phrase (lowercase) -> short abbreviation used when building aliases.
# Key order may matter to consumers that take the first phrase found as a
# substring: keep specific phrases ahead of the generic single-word entries.
ABBREV_MAP = {
    "motorized rifle regiment": "mrr",
    "motor rifle regiment": "mrr",
    "motorized rifle division": "mrd",
    "motor rifle division": "mrd",
    "tank regiment": "tr",
    "tank division": "td",
    "air assault regiment": "aar",
    "airborne regiment": "abr",
    "artillery regiment": "ar",
    "motorized rifle brigade": "mrb",
    "motor rifle brigade": "mrb",
    "separate motorized rifle brigade": "smrb",
    "separate tank battalion": "stb",
    "air assault brigade": "aab",
    "separate artillery brigade": "sab",
    "brigade": "bde",
    "division": "div",
    "regiment": "reg",
    "battalion": "bn",
    "army corps": "ac",
    "combined arms army": "caa",
    "army": "army",
}

# English unit-type phrase (lowercase) -> Russian equivalent for native-language aliases.
# Only a subset of the ABBREV_MAP keys has an entry here (e.g. the
# "motor rifle ..." spellings and the "separate ..." types do not).
RU_MAP = {
    "motorized rifle regiment": "мотострелковый полк",
    "motorized rifle division": "мотострелковая дивизия",
    "tank regiment": "танковый полк",
    "tank division": "танковая дивизия",
    "air assault regiment": "десантно-штурмовой полк",
    "air assault brigade": "десантно-штурмовая бригада",
    "regiment": "полк",
    "division": "дивизия",
    "brigade": "бригада",
    "battalion": "батальон",
}


In [None]:

def detect_echelon(line: str) -> Optional[str]:
    """Return the echelon of the first ``ECHELON_PATTERNS`` entry matching ``line``.

    Patterns are tried in list order with case-insensitive search; ``None`` is
    returned when no pattern matches.
    """
    hits = (
        kind
        for kind, regex in ECHELON_PATTERNS
        if re.search(regex, line, flags=re.IGNORECASE)
    )
    return next(hits, None)

def generate_unit_uid(vch: Optional[str], name: str, path_hint: str) -> str:
    """Build a unit identifier.

    When ``vch`` is truthy the identifier is ``ISW23:VCH_<vch>``. Otherwise it
    is ``ISW23:HASH_`` followed by the first 10 hex digits of the SHA-1 of
    ``"<name>|<path_hint>"`` (UTF-8 encoded).
    """
    if not vch:
        digest = hashlib.sha1("|".join((name, path_hint)).encode("utf-8")).hexdigest()
        return f"ISW23:HASH_{digest[:10]}"
    return f"ISW23:VCH_{vch}"


In [None]:

def generate_aliases(unit_name_official: str):
    """Build the alias list for one unit.

    Args:
        unit_name_official: Unit name as parsed from the source text.

    Returns:
        A list of ``(alias_text, lang, kind)`` tuples, in this order:
        the trimmed official name; when both a unit number and a known unit
        type are found, the number+abbreviation variants (spaced and
        concatenated, upper and lower case, plus an English-ordinal form);
        when the type also has a Russian equivalent, two Russian forms; and
        finally the normalised name.
    """
    official = unit_name_official.strip()
    aliases = [(official, "EN", "official")]

    norm = normalize_unit_text(official)
    num = extract_number_token(norm)

    # Try the longest phrases first so that a specific type such as
    # "separate motorized rifle brigade" is not shadowed by a shorter phrase
    # it contains ("motorized rifle brigade", "brigade").
    unit_type = next(
        (k for k in sorted(ABBREV_MAP, key=len, reverse=True) if k in norm),
        None,
    )

    if num and unit_type:
        abbrev = ABBREV_MAP[unit_type]
        spaced = f"{num} {abbrev}"
        joined = f"{num}{abbrev}"
        aliases.append((spaced.upper(), "EN", "abbrev_num_space"))
        aliases.append((joined.upper(), "EN", "abbrev_num_concat"))
        aliases.append((spaced.lower(), "EN", "abbrev_num_space"))
        aliases.append((joined.lower(), "EN", "abbrev_num_concat"))

        # English ordinals: 1st/2nd/3rd, but 11th/12th/13th; everything else "th".
        value = int(num)
        if 10 <= value % 100 <= 20:
            suffix = "th"
        else:
            suffix = {1: "st", 2: "nd", 3: "rd"}.get(value % 10, "th")
        aliases.append((f"{num}{suffix} {abbrev}".lower(), "EN", "abbrev_ordinal"))

        ru = RU_MAP.get(unit_type)
        if ru:
            # The Russian ordinal ending agrees with the noun's gender:
            # feminine nouns (дивизия, бригада) take "-я", masculine ones
            # (полк, батальон) take "-й".
            ru_suffix = "я" if ru.endswith(("я", "а")) else "й"
            aliases.append((f"{num} {ru}", "RU", "ru_native_num"))
            aliases.append((f"{num}-{ru_suffix} {ru}", "RU", "ru_native_num_ord"))

    aliases.append((norm, "EN", "normalized"))
    return aliases


In [None]:

def parse_isw_text(raw_text: str):
    """Parse extracted ORBAT text into unit, hierarchy and alias rows.

    The text is processed line by line (blank lines dropped). A line that
    contains a ``SERVICE_HEADERS`` or ``MD_HEADERS`` keyword updates the
    current service / military district. A line in which ``detect_echelon``
    finds an echelon keyword becomes a unit; its parent is the most recently
    seen unit of the nearest higher echelon that is currently set.

    Args:
        raw_text: Full text extracted from the PDF.

    Returns:
        ``(units, edges, aliases)`` — three lists of dicts whose keys match
        the ``isw_units``, ``isw_hierarchy`` and ``isw_aliases`` tables.
    """
    # Highest to lowest; position in this list defines "higher" and "lower".
    # NOTE(review): army_corps is ranked above army here — confirm intended.
    hierarchy_order = ["army_corps", "army", "division", "brigade", "regiment", "battalion"]

    units = []
    edges = []
    aliases = []

    current_service = None
    current_md = None
    last_unit_by_level = {level: None for level in hierarchy_order}

    lines = [ln.strip() for ln in raw_text.splitlines() if ln.strip()]

    for ln in lines:
        lowered = ln.lower()

        for service, keys in SERVICE_HEADERS.items():
            if any(k.lower() in lowered for k in keys):
                current_service = service
                break

        for md, keys in MD_HEADERS.items():
            if any(k.lower() in lowered for k in keys):
                current_md = md
                break

        echelon = detect_echelon(ln)
        if not echelon:
            continue

        m = VCH_REGEX.search(ln)
        vch = m.group(1) if m else None

        # Strip the unit-number tag, then leading bullets and stray separators.
        name_official = VCH_REGEX.sub("", ln).strip(" -–—:·•")
        name_official = re.sub(r"^[\u2022\-\*•]+", "", name_official).strip()

        # Derive the hash context from the same service/district recorded on
        # the row, so it neither loses the district when a service keyword
        # appears nor grows every time a district heading repeats.
        path_hint = " > ".join(p for p in (current_service, current_md) if p)
        uid = generate_unit_uid(vch, name_official, path_hint)

        units.append({
            "unit_uid": uid,
            "unit_name_official": name_official,
            "echelon": echelon,
            "service": current_service or "",
            "military_district": current_md or "",
            "as_of": AS_OF,
            "source_version": SOURCE_VERSION,
        })

        # Walk upwards from the next-higher echelon until a known unit is found.
        e_idx = hierarchy_order.index(echelon)
        parent_uid = None
        parent_level = ""
        for higher in reversed(hierarchy_order[:e_idx]):
            if last_unit_by_level[higher]:
                parent_uid = last_unit_by_level[higher]
                parent_level = higher
                break

        # This unit becomes the current one at its level; anything remembered
        # at lower levels belonged to the previous unit and is forgotten.
        last_unit_by_level[echelon] = uid
        for lower in hierarchy_order[e_idx + 1:]:
            last_unit_by_level[lower] = None

        if parent_uid:
            edges.append({
                "parent_uid": parent_uid,
                "child_uid": uid,
                # Level of the parent actually found, which may be several
                # echelons above the child when intermediate levels are absent.
                "parent_level": parent_level,
                "child_level": echelon,
            })

        aliases.extend(
            {
                "unit_uid": uid,
                "alias_text": alias_text,
                "lang": lang,
                "kind": kind,
            }
            for alias_text, lang, kind in generate_aliases(name_official)
        )

    return units, edges, aliases


In [None]:

# Column order of isw_units.csv.
ISW_UNITS_HEADER = [
    "unit_uid",
    "unit_name_official",
    "echelon",
    "service",
    "military_district",
    "as_of",
    "source_version",
]

# Column order of isw_hierarchy.csv (one row per parent -> child edge).
ISW_HIER_HEADER = [
    "parent_uid",
    "child_uid",
    "parent_level",
    "child_level",
]

# Column order of isw_aliases.csv.
ISW_ALIAS_HEADER = [
    "unit_uid",
    "alias_text",
    "lang",
    "kind",
]

def write_csv(path: Path, rows: List[Dict], header: List[str]):
    """Write ``rows`` to ``path`` as UTF-8 CSV with exactly the columns in ``header``.

    Keys missing from a row are written as empty strings and keys not listed
    in ``header`` are dropped. An existing file is overwritten.
    """
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle,
            fieldnames=header,
            restval="",
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(rows)

# Progress message for this cell: the header lists and the CSV writer are defined.
print("[ok] Headers prepared.")


## Pilot Extraction (Optional)

In [None]:

# Pilot run: extract the PDF text, parse it and write the three ISW tables.
# Skipped entirely when the PDF is missing; nothing is written when the
# extracted text is blank.
if not ISW_PDF.exists():
    print("[skip] ISW PDF not present; load later.")
else:
    raw = extract_text_pdf(ISW_PDF)
    if raw.strip():
        units, edges, aliases = parse_isw_text(raw)
        print(f"[info] Parsed units: {len(units)} | edges: {len(edges)} | aliases: {len(aliases)}")

        write_csv(UNITS_CSV, units, ISW_UNITS_HEADER)
        write_csv(HIER_CSV, edges, ISW_HIER_HEADER)
        write_csv(ALIAS_CSV, aliases, ISW_ALIAS_HEADER)

        print(f"[ok] Wrote: {UNITS_CSV}")
        print(f"[ok] Wrote: {HIER_CSV}")
        print(f"[ok] Wrote: {ALIAS_CSV}")
    else:
        print("[warn] PDF text extraction returned empty text.")


## WarSpotting Load & Normalization

In [None]:

import pandas as pd

def read_warspotting(path: Path) -> 'pd.DataFrame':
    """Load the WarSpotting CSV and blank out missing values in its text columns.

    Only the known text columns that are actually present in the file are
    touched; all other columns are returned exactly as pandas parsed them.
    """
    text_columns = (
        "unit_canonical",
        "unit_text",
        "model",
        "platform_class",
        "location",
        "tags",
        "status",
        "source",
    )
    frame = pd.read_csv(path)
    present = [name for name in text_columns if name in frame.columns]
    for name in present:
        frame[name] = frame[name].fillna("")
    return frame

def extract_vch_from_text(s: str) -> Optional[str]:
    """Return the military unit number captured by ``VCH_REGEX`` in ``s``.

    Returns ``None`` when ``s`` is not a string or carries no such number.
    """
    if isinstance(s, str):
        hit = VCH_REGEX.search(s)
        if hit:
            return hit.group(1)
    return None

def norm_for_matching(s: str) -> str:
    """Normalise arbitrary text into the key form used for alias lookups.

    The value is stringified (falsy values become ""), passed through
    ``unidecode`` and lowercased; every character that is neither a word
    character nor whitespace becomes a space; whitespace runs are collapsed
    and the ends trimmed.
    """
    lowered = unidecode(str(s or "")).lower()
    spaced = re.sub(r"[^\w\s]", " ", lowered)
    return re.sub(r"\s+", " ", spaced).strip()

# Load the WarSpotting table when the file is available; ws_df stays None otherwise.
ws_df = read_warspotting(WARSPOTTING_CSV) if WARSPOTTING_CSV.exists() else None
if ws_df is None:
    print("[warn] WarSpotting CSV not found; set the correct path.")
else:
    print("[ok] WarSpotting loaded:", len(ws_df))


## Matching & Enrichment (ISW → WarSpotting)

In [None]:

def load_isw_tables():
    """Read the units, hierarchy and alias CSVs written by the ETL step.

    Returns:
        ``(units, hierarchy, aliases)`` DataFrames. A table whose file does
        not exist is returned as an empty frame with the expected columns.
    """
    import pandas as pd

    def _read_or_empty(csv_path, columns):
        if csv_path.exists():
            return pd.read_csv(csv_path)
        return pd.DataFrame(columns=columns)

    return (
        _read_or_empty(UNITS_CSV, ISW_UNITS_HEADER),
        _read_or_empty(HIER_CSV, ISW_HIER_HEADER),
        _read_or_empty(ALIAS_CSV, ISW_ALIAS_HEADER),
    )

def build_alias_bank(alias_df):
    """Group aliases by unit.

    Returns a dict mapping each ``unit_uid`` to a list holding every alias
    text (as a string) together with its ``norm_for_matching`` form, with
    duplicates removed and first-seen order kept.
    """
    bank = {}
    for uid, group in alias_df.groupby("unit_uid"):
        ordered = {}  # dict used as an insertion-ordered set
        for raw in group["alias_text"].astype(str):
            ordered.setdefault(raw, None)
            ordered.setdefault(norm_for_matching(raw), None)
        bank[uid] = list(ordered)
    return bank

def invert_alias_bank(bank):
    """Invert a ``{unit_uid: [alias, ...]}`` bank into ``{normalised_alias: [unit_uid, ...]}``.

    Each unit appears at most once per key. Several aliases of one unit often
    normalise to the same key (a raw form and its already-normalised form);
    without de-duplication that unit would be listed repeatedly and a unique
    match would look ambiguous to callers that check the list length.

    Args:
        bank: Mapping of unit UID to an iterable of alias strings.

    Returns:
        Mapping of ``norm_for_matching(alias)`` to the list of distinct unit
        UIDs owning an alias with that normal form, in first-seen order.
    """
    inv = {}
    for uid, aliases in bank.items():
        for a in aliases:
            owners = inv.setdefault(norm_for_matching(a), [])
            if uid not in owners:
                owners.append(uid)
    return inv

def enrich_warspotting(ws, units_df, alias_df, fuzzy_threshold=92):
    """Attach ISW unit context to WarSpotting rows.

    Each row is matched against the ISW tables in three stages, stopping at
    the first hit:

    1. a military unit number found in ``unit_canonical``/``unit_text`` that
       corresponds to a known ``ISW23:VCH_<n>`` unit;
    2. an exact match of the normalised ``unit_canonical`` or ``unit_text``
       against the alias bank (ambiguous hits are resolved by unit number,
       otherwise the first candidate wins);
    3. a fuzzy match (rapidfuzz ``WRatio``) against all normalised aliases,
       only when rapidfuzz is importable.

    Args:
        ws: WarSpotting DataFrame. It is not modified; a copy is returned.
        units_df: ISW units table (needs ``unit_uid`` plus descriptive columns).
        alias_df: ISW alias table (``unit_uid``, ``alias_text``).
        fuzzy_threshold: Minimum fuzzy score accepted in stage 3.

    Returns:
        A copy of ``ws`` with the columns ``isw_unit_uid``,
        ``isw_unit_name_official``, ``isw_echelon``, ``isw_service`` and
        ``isw_military_district``; unmatched rows hold empty strings.
    """
    import pandas as pd  # local import, as in load_isw_tables

    ws = ws.copy()

    bank = build_alias_bank(alias_df)
    inv = invert_alias_bank(bank)

    # to_dict(orient="index") rejects a non-unique index, so keep the first
    # row of any repeated unit_uid instead of failing the whole run.
    units_map = (
        units_df.drop_duplicates(subset="unit_uid", keep="first")
        .set_index("unit_uid")
        .to_dict(orient="index")
    )

    def unit_field(uid, key):
        # Empty CSV cells come back from pandas as NaN; report them as "".
        value = units_map.get(uid, {}).get(key, "")
        return "" if pd.isna(value) else value

    def cell(row, col):
        # Missing column or NaN cell -> "", so "nan" never enters matching.
        value = row.get(col, "")
        return "" if pd.isna(value) else str(value)

    # Build the fuzzy-search universe once per call, not once per row. It
    # stays empty when rapidfuzz is unavailable, which disables stage 3.
    alias_universe = []
    rev_map = {}
    if process is not None and fuzz is not None:
        for uid, alist in bank.items():
            for a in alist:
                na = norm_for_matching(a)
                if na and na not in rev_map:
                    rev_map[na] = uid
                    alias_universe.append(na)

    def try_match(row):
        # Stage 1: explicit military unit number.
        text_combo = " ".join([cell(row, "unit_canonical"), cell(row, "unit_text")])
        vch = extract_vch_from_text(text_combo)
        if vch:
            uid = f"ISW23:VCH_{vch}"
            if uid in units_map:
                return uid

        # Stage 2: exact alias hit.
        for col in ("unit_canonical", "unit_text"):
            cand = norm_for_matching(cell(row, col))
            if not cand or cand not in inv:
                continue
            cands = list(dict.fromkeys(inv[cand]))  # distinct, order kept
            if len(cands) == 1:
                return cands[0]
            lead_num = extract_number_token(cand)
            if lead_num:
                for cu in cands:
                    official = str(unit_field(cu, "unit_name_official"))
                    # Compare whole numbers: a substring test would let "1"
                    # match a unit numbered "12".
                    if extract_number_token(official) == lead_num:
                        return cu
            return cands[0]

        # Stage 3: fuzzy alias hit. `process` and `fuzz` are the module-level
        # names; importing them inside this function would make them local
        # and break any reference that precedes the import.
        if alias_universe:
            for col in ("unit_canonical", "unit_text"):
                query = norm_for_matching(cell(row, col))
                if not query:
                    continue
                matches = process.extract(query, alias_universe, scorer=fuzz.WRatio, limit=5)
                for alias_text, score, _idx in matches:
                    if score >= fuzzy_threshold:
                        return rev_map[alias_text]

        return None

    # Collect results positionally and assign whole columns, so the outcome
    # does not depend on what the frame's index labels are.
    matched = [try_match(row) for _, row in ws.iterrows()]

    ws["isw_unit_uid"] = [uid or "" for uid in matched]
    for out_col, unit_key in (
        ("isw_unit_name_official", "unit_name_official"),
        ("isw_echelon", "echelon"),
        ("isw_service", "service"),
        ("isw_military_district", "military_district"),
    ):
        ws[out_col] = [unit_field(uid, unit_key) if uid else "" for uid in matched]
    return ws

# Enrichment needs the WarSpotting CSV plus the ISW units and alias tables;
# re-run this cell once all of them exist.
if not (WARSPOTTING_CSV.exists() and UNITS_CSV.exists() and ALIAS_CSV.exists()):
    print("[info] Skipping enrichment run for now; ensure ISW tables and WarSpotting CSV exist.")
else:
    units_df, hier_df, alias_df = load_isw_tables()
    enriched = enrich_warspotting(ws_df, units_df, alias_df)
    enriched.to_csv(ENRICHED_CSV, index=False)
    print(f"[ok] Enriched CSV written: {ENRICHED_CSV} (rows={len(enriched)})")


## QA & Integrity Checks

In [None]:

def qa_isw_tables(units_df, hier_df, alias_df):
    """Print basic integrity diagnostics for the three ISW tables.

    Reports the row counts, up to five duplicated ``unit_uid`` values, up to
    five edge ``child_uid`` values with no unit row, and summary statistics
    of the number of aliases per unit. Nothing is returned.
    """
    n_units, n_edges, n_aliases = len(units_df), len(hier_df), len(alias_df)
    print("Units:", n_units, "Edges:", n_edges, "Aliases:", n_aliases)

    dup_uids = []
    if n_units > 0:
        uid_col = units_df["unit_uid"]
        dup_uids = uid_col[uid_col.duplicated()].unique().tolist()
    if dup_uids:
        print("[warn] duplicate unit_uid:", dup_uids[:5], "…")

    if n_edges > 0:
        missing = set(hier_df["child_uid"]) - set(units_df["unit_uid"])
        if missing:
            print("[warn] edges reference missing child uids:", list(missing)[:5], "…")

    if n_aliases > 0:
        per_unit = alias_df.groupby("unit_uid").size()
        alias_counts = per_unit.describe()
        print("Alias coverage (per unit):\n", alias_counts)

# Example:
# import pandas as pd
# u, h, a = load_isw_tables()
# qa_isw_tables(u, h, a)



## Notes / Tuning

- Adjust `ECHELON_PATTERNS`, `SERVICE_HEADERS`, `MD_HEADERS` to exactly match headings in the ISW PDF once you view `extract_text_pdf` output.
- Expand `ABBREV_MAP`/`RU_MAP` as you encounter more unit types.
- For speed, build the fuzzy-matching alias universe once per enrichment run rather than once per WarSpotting row.
- For tighter matches, add parent-path hints (e.g., prefer matches under “20th CAA” when the text mentions it).
