# Docling parser, LLM enrichment, IOC extraction Pipeline

In [None]:
#1. Setup
#!pip install -q docling pypdfium2 pillow pyarrow fastparquet
import os, re, json, hashlib, base64, ipaddress, urllib3, sys
from datetime import datetime
from pathlib import Path
import pandas as pd
import tldextract

# OpenAI api key for enrichment, make sure you have enough API budget in OpenAI for your queries
OPENAI_API_KEY = "YOUR_OPENAI_TOKEN"

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
#2. Configuration
DATA_DIR = Path("./data_inputs")
OUT_DIR = Path("./generated")
OUT_DIR.mkdir(exist_ok=True, parents=True)
PARSED_DIR = OUT_DIR / "parsed"; PARSED_DIR.mkdir(exist_ok=True)


print("DATA_DIR:", DATA_DIR.resolve())
print("OUT_DIR:", OUT_DIR.resolve())

# MISP url and key to push IOCs to MISP
MISP_URL = "https://localhost"
MISP_KEY = "YOUR_MISP_TOKEN"
VERIFY_SSL = False

DATA_DIR: C:\Users\ronal\Desktop\LLM Labs\data_inputs
OUT_DIR: C:\Users\ronal\Desktop\LLM Labs\generated


In [None]:
#!pip install ipywidgets        # HF token for Hugging Face login: YOUR_HF_TOKEN
from huggingface_hub import login
login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
#3. Docling parser
import warnings
warnings.filterwarnings("ignore", message=".*pin_memory.*")

def sha1(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8", errors="ignore")).hexdigest()

def as_plaintext_fallback(p: Path) -> str:
    try:
        return p.read_text(errors="ignore")
    except Exception:
        return ""

def docling_available() -> bool:
    try:
        import docling
        return True
    except Exception:
        return False

rows = []

if docling_available():
    from docling.document_converter import DocumentConverter
    converter = DocumentConverter()         # uses defaults, includes PDF, Office, HTML, etc.

    for f in sorted(DATA_DIR.rglob("*")):
        if not f.is_file():
            continue
        
        try:
            result = converter.convert(f)            # returns a ConversionResult
            doc = result.document                    # unified document abstraction
            md = doc.export_to_markdown()            # nice for manual review
            txt = doc.export_to_text()               # flat text for regex/LLM
        except Exception as e:
            # fallback for simple formats if Docling fails
            if f.suffix.lower() in {".txt", ".md", ".html", ".htm"}:
                txt = as_plaintext_fallback(f)
                md  = txt
            else:
                print(f"[Docling] failed on {f.name}: {e}", file=sys.stderr)
                continue

        # Save markdown sidecar (optional, helpful to inspect context around IOCs)
        sidecar = (PARSED_DIR / (f.name + ".md"))
        sidecar.write_text(md, encoding="utf-8", errors="ignore")

        rows.append({
            "source": f.name,
            "path": str(f),
            "sha1": sha1(txt[:200000]),     # content hash for dedupe/versioning
            "text": txt,
            "markdown_path": str(sidecar),
            "bytes": f.stat().st_size
        })
else:
    print("[Info] Docling not installed; using plaintext fallback for txt/md/html.", file=sys.stderr)
    for f in sorted(DATA_DIR.rglob("*")):
        if not f.is_file():
            continue
        if f.suffix.lower() in {".txt", ".md", ".html", ".htm"}:
            txt = as_plaintext_fallback(f)
            rows.append({
                "source": f.name,
                "path": str(f),
                "sha1": sha1(txt[:200000]),
                "text": txt,
                "markdown_path": "",
                "bytes": f.stat().st_size
            })

parsed_df = pd.DataFrame(rows)
parquet_path = OUT_DIR / "parsed_docs.parquet"
parsed_df.to_parquet(parquet_path, index=False)
print(f"Parsed docs: {len(parsed_df)} -> {parquet_path} (and markdown in {PARSED_DIR}).")

texts = {row.source: row.text for row in parsed_df.itertuples()}
print(f"texts available to IOC extractor: {len(texts)} files.")

Parameter `strict_text` has been deprecated and will be ignored.


Parsed docs: 1 -> generated\parsed_docs.parquet (and markdown in generated\parsed).
texts available to IOC extractor: 1 files.


In [None]:
#4. IOC Extraction
IOC_REGEX = {
    "ipv4": re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b"),
    "md5": re.compile(r"\b[a-fA-F0-9]{32}\b"),
    "sha1": re.compile(r"\b[a-fA-F0-9]{40}\b"),
    "sha256": re.compile(r"\b[a-fA-F0-9]{64}\b"),
    "url": re.compile(r"\bhttps?://[\w\-\.\/:\?#\[\]@!$&'()*+,;=%]+", re.IGNORECASE),
    "email": re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"),
}

def is_domain(token: str) -> bool:
    ext = tldextract.extract(token)         # split a string into domain, subdomain, and suffix by using PSL(Public Suffix List)-aware validation
    return bool(ext.domain and ext.suffix)  # true if there is a second level domain and a known public suffix

def extract_domains(text: str):
    candidates = set()
    for token in re.findall(r"[\w.-]+\.[a-zA-Z]{2,}", text):    # find dotty tokens look like domain
        if token.lower().startswith("http"):                    # exclude http string which already has been counted
            continue
        if is_domain(token):                                    # to keep only strings that tldextract recognizes as a registable domain
            candidates.add(token.lower())
    return sorted(candidates)

def extract_iocs_from_text(text: str):
    out = {k: [] for k in ["ipv4", "md5", "sha1", "sha256", "url", "email","domain"]}   # prepare an output dict for each IOC type
    for k, rx in IOC_REGEX.items():
        if k == "url":
            out[k] = list({m.group(0).strip(').,;"\'') for m in rx.finditer(text)})     # trim common trailing punctuation like ).,;"' that often clings to links in prose.
        else:
            out[k] = list({m.group(0) for m in rx.finditer(text)})
    out["domain"] = extract_domains(text)
    return out

def load_texts_from_dir(path: Path):
    texts = {}
    for p in path.glob("**/*"):
        if p.is_file() and p.suffix.lower() in {".txt",".md",".html",".htm",".log"}:    # load input files with these extensions
            try:
                texts[p.name] = p.read_text(errors="ignore")
            except Exception:
                pass
    return texts

#texts = load_texts_from_dir(DATA_DIR)
#print(f"Loaded {len(texts)} text files from {DATA_DIR}.")

rows = []
for fname, content in texts.items():
    iocs = extract_iocs_from_text(content)
    for t, values in iocs.items():
        for v in values:
            rows.append({"source": fname, "type": t, "value": v})

ioc_df = pd.DataFrame(rows).drop_duplicates().reset_index(drop=True)
ioc_csv = OUT_DIR / "iocs_extracted.csv"
ioc_df.to_csv(ioc_csv, index=False)
print(f"Extracted {len(ioc_df)} IOCs -> {ioc_csv}.")
ioc_df.head(10)

Extracted 75 IOCs -> generated\iocs_extracted.csv.


Unnamed: 0,source,type,value
0,group-ib-conti-threat-research-2022-en.pdf,ipv4,127.0.0.1
1,group-ib-conti-threat-research-2022-en.pdf,ipv4,21.9.2.172
2,group-ib-conti-threat-research-2022-en.pdf,md5,0762764e298c369a2de8afaec5174ed9
3,group-ib-conti-threat-research-2022-en.pdf,md5,1c6363248c917b9b2a0e37e547cb1bd5
4,group-ib-conti-threat-research-2022-en.pdf,md5,04a5b5ecf057134a96ba9beac224c672
5,group-ib-conti-threat-research-2022-en.pdf,md5,6078dbad380775d01ce9cf91cbe23d7b
6,group-ib-conti-threat-research-2022-en.pdf,md5,c720441cc3603483defcad7f2476c220
7,group-ib-conti-threat-research-2022-en.pdf,md5,790cfe1f9b1f7a1b8805f3c581aeb1c3
8,group-ib-conti-threat-research-2022-en.pdf,md5,bcf121ba763f4a0c07113046e5103900
9,group-ib-conti-threat-research-2022-en.pdf,md5,01a584f26eace00ff96f6511bab5bfee


In [None]:
#5. check extracted IOCs reputation in VirusTotal
import requests, time

VT_API_KEY = "YOUR_VT_TOKEN"
RATE_LIMIT_QPM = int("100")     # queries per minute; free VT API keys are low.
SLEEP_SEC = max(60.0 / max(1, RATE_LIMIT_QPM), 10.0)
CHUNK_CHECKPOINT = 25           # write csv checkpoint after this many lookups
START_OFFSET = int("0")
MAX_ITEMS = int("250")          # process at most this many items this run
VT_ONLY_NEW = True              # skip items already present in vt_reputation.csv file

IOC_CSV = OUT_DIR / "iocs_extracted.csv"    # input
OUT_CSV = OUT_DIR / "vt_reputation.csv"     # output

# load IOCs from in-memory df if present, else from csv file
if "ioc_df" in globals() and isinstance(ioc_df, pd.DataFrame) and not ioc_df.empty:
    src_df = ioc_df.copy()
else:
    if not IOC_CSV.exists():
        raise FileNotFoundError(f"IOC csv not found: {IOC_CSV}. Run Part 1 script to generate it first.")
    src_df = pd.read_csv(IOC_CSV)

# canonicalize and split
def get_values(df, t):
    return sorted(set(df.loc[df["type"] == t, "value"].dropna().astype(str)))

ips = get_values(src_df, "ipv4")
domains = get_values(src_df, "domain")
urls = get_values(src_df, "url")
hashes = sorted(set(pd.concat([
    src_df.loc[src_df["type"] == "md5", "value"],
    src_df.loc[src_df["type"] == "sha1", "value"],
    src_df.loc[src_df["type"] == "sha256", "value"],
], axis=0).dropna().astype(str)))

def vt_headers():
    return {"x-apikey": VT_API_KEY}

def vt_url_for(kind, value):
    if kind == "ip":
        return f"https://www.virustotal.com/api/v3/ip_addresses/{value}"
    if kind == "domain":
        return f"https://www.virustotal.com/api/v3/domains/{value}"
    if kind == "url":
        url_id = base64.urlsafe_b64encode(value.encode()).decode().strip("=")   # url_id is urlsafe base64 of the url, without padding
        return f"https://www.virustotal.com/api/v3/urls/{url_id}"
    if kind == "file":
        return f"https://www.virustotal.com/api/v3/files/{value}"
    raise ValueError(kind)

def classify_kind(value, explicit_type=None):
    if explicit_type:
        return explicit_type
    try:
        ipaddress.IPv4Address(value)
        return "ip"
    except Exception:
        pass
    if value.startswith("http://") or value.startswith("https://"):
        return "url"
    if len(value) in (32, 40, 64) and all(c in "0123456789abcdefABCDEF" for c in value):
        return "file"
    return "domain"

def parse_stats(kind, vt_json):
    stats = {"malicious": None, "suspicious": None, "undetected": None, "harmless": None}
    rep = None
    last_date = None
    try:
        attr = vt_json.get("data", {}).get("attributes", {})
        s = attr.get("last_analysis_stats") or {}
        stats.update({k: s.get(k) for k in stats.keys()})
        rep = attr.get("reputation")
        last_date = attr.get("last_analysis_date") or attr.get("creation_date")
    except Exception:
        pass
    return stats, rep, last_date

# load existing results to support resumable runs
if OUT_CSV.exists():
    out_df = pd.read_csv(OUT_CSV)
    already = set(zip(out_df["type"], out_df["value"]))
else:
    out_df = pd.DataFrame(columns=["type","value","malicious","suspicious","undetected","harmless","reputation","last_analysis_date","http_status","error"])
    already = set()

def iter_items():       # ordered priority: file hashes -> urls -> domains -> ips
    for kind, seq in (("file", hashes), ("url", urls), ("domain", domains), ("ip", ips)):
        for v in seq:
            yield kind, v

items = list(iter_items())

# slice by offset/limit
slice_items = items[START_OFFSET: START_OFFSET + MAX_ITEMS]

print(f"Planned lookups this run: {len(slice_items)} (from offset {START_OFFSET})")

rows = []
seen = 0
last_req_ts = 0.0

session = requests.Session()

for kind, value in slice_items:
    if VT_ONLY_NEW and (kind, value) in already:    # skip IOCs which are already checked
        continue
    elapsed = time.time() - last_req_ts
    if elapsed < SLEEP_SEC:
        time.sleep(SLEEP_SEC - elapsed)
    last_req_ts = time.time()

    url = vt_url_for(kind, value)
    status = None
    err = None
    stats, rep, when = {}, None, None

    try:
        r = session.get(url, headers=vt_headers(), timeout=30)
        status = r.status_code
        if r.status_code == 200:
            data = r.json()
            stats, rep, when = parse_stats(kind, data)
        elif r.status_code in (404, 400):
            err = f"VT status {r.status_code}"
        elif r.status_code == 429:
            err = "Rate limited (429). Increase SLEEP_SEC or lower RATE_LIMIT_QPM."
        else:
            err = f"HTTP {r.status_code}"
    except Exception as e:
        err = str(e)

    row = {
        "type": kind,
        "value": value,
        "malicious": stats.get("malicious"),
        "suspicious": stats.get("suspicious"),
        "undetected": stats.get("undetected"),
        "harmless": stats.get("harmless"),
        "reputation": rep,
        "last_analysis_date": when,
        "http_status": status,
        "error": err
    }
    rows.append(row)
    seen += 1

    # periodic checkpoint
    if seen % CHUNK_CHECKPOINT == 0:
        tmp = pd.DataFrame(rows)
        out_df = pd.concat([out_df, tmp], ignore_index=True)
        out_df.drop_duplicates(subset=["type","value"], keep="last", inplace=True)
        out_df.to_csv(OUT_CSV, index=False)
        print(f"[checkpoint] where {len(out_df)} rows to {OUT_CSV}.")
        rows.clear()

# final write
if rows:
    tmp = pd.DataFrame(rows)
    out_df = pd.concat([out_df, tmp], ignore_index=True)
    out_df.drop_duplicates(subset=["type","value"], keep="last", inplace=True)
    out_df.to_csv(OUT_CSV, index=False)

print(f"Done. Total rows in {OUT_CSV}: {len(out_df)}.")

# Use filters to show only 'malicious >= 1' or negative reputation.
flagged = out_df.fillna(0)
flagged = flagged[(flagged["malicious"].astype(float) >= 1) | (flagged["reputation"].astype(float) < 0)]
flagged_csv = OUT_DIR / "vt_flagged.csv"
flagged.to_csv(flagged_csv, index=False)
print("Flagged subset ->", flagged_csv)

Planned lookups this run: 74 (from offset 0)


  out_df = pd.concat([out_df, tmp], ignore_index=True)


[checkpoint] where 25 rows to generated\vt_reputation.csv.
[checkpoint] where 50 rows to generated\vt_reputation.csv.
Done. Total rows in generated\vt_reputation.csv: 74.
Flagged subset -> generated\vt_flagged.csv


  flagged = out_df.fillna(0)


In [None]:
#6. use filter to keep only "reputation < 0" in the final IOC list
VT_REPUTATION = OUT_DIR / "vt_reputation.csv"
IOCS_MISP_CSV = OUT_DIR / "iocs_misp.csv"
MISP_EVENT_JSON = OUT_DIR / "misp_event_vtneg.json"

if not VT_REPUTATION.exists():
    raise FileNotFoundError(f"{VT_REPUTATION} not found, run the VT reputation checker first.")

df = pd.read_csv(VT_REPUTATION)
# Keep only artifacts that exist in VT that have negative reputation
df = df[(df["http_status"] == 200) & (df["reputation"].astype(float) < 0)].copy()

# Normalize IOC types for MISP
def normalize_type_and_value(row):
    t, v = str(row["type"]).lower(), str(row["value"]).strip()
    if t == "ip":
        # allow only valid IPv4
        try:
            ipaddress.IPv4Address(v)
            return "ipv4", v
        except Exception:
            return None, None
    elif t == "domain":
        return "domain", v.lower()
    elif t == "url":
        # trim trailing punctuation
        v = v.strip(').,;\'"')
        return "url", v
    elif t == "file":
        # decide hash type by length
        hv = v.lower()
        L = len(hv)
        if L == 32 and re.fullmatch(r"[0-9a-f]{32}", hv):
            return "md5", hv
        if L == 40 and re.fullmatch(r"[0-9a-f]{40}", hv):
            return "sha1", hv
        if L == 64 and re.fullmatch(r"[0-9a-f]{64}", hv):
            return "sha256", hv
        return None, None
    else:
        return None, None

norm = df.apply(lambda r: normalize_type_and_value(r), axis=1, result_type="expand")
df["norm_type"], df["norm_value"] = norm[0], norm[1]
df = df.dropna(subset=["norm_type","norm_value"]).copy()

# Final minimal list for MISP
final_iocs = df[["norm_type","norm_value"]].drop_duplicates().rename(
    columns={"norm_type":"type", "norm_value":"value"}
)
final_iocs.to_csv(IOCS_MISP_CSV, index=False)
print(f"Prepared {len(final_iocs)} IOCs for feeding MISP -> {IOCS_MISP_CSV}.")

Prepared 27 IOCs for feeding MISP -> generated\iocs_misp.csv.


In [None]:
#7. LLM enrichment for the final IOCs
#1) load the data already ingested before
if 'ioc_df' not in globals() or ioc_df.empty:
    raise RuntimeError("ioc_df is missing/empty. Run IOC extraction first.")

def safe_ready(p: Path):
    try:
        return p.read_text(errors="ignore")
    except Exception:
        return ""

texts = {}
if DATA_DIR.exists():
    for p in DATA_DIR.glob("**/*"):
        if p.is_file() and p.suffix.lower() in {".txt",".md",".html","htm",".log"}:
            texts[p.name] = safe_ready(p)

#2) pull VT verification results to add signals to the prompt
VT_PATH = OUT_DIR / "vt_reputation.csv"
vt_df = pd.read_csv(VT_PATH) if VT_PATH.exists() else pd.DataFrame(columns=["type","value"])
def vt_row_for(ioc_type, ioc_value):
    if vt_df.empty:
        return None
    m = vt_df[(vt_df["type"].astype(str)==ioc_type) & (vt_df["value"].astype(str)==ioc_value)]
    return m.iloc[0].to_dict() if not m.empty else None

#3) build context snippets around each IOC value from its source text
def gather_snippets(row, window=160):
    src = str(row.get("source", ""))
    val = str(row.get("value", ""))
    text = texts.get(src, "")
    if not text or not val:
        return []
    pattern = re.escape(val)
    snippets = []
    for m in re.finditer(pattern, text, flags=re.IGNORECASE):
        start = max(0, m.start() - window)
        end = min(len(text), m.end() + window)
        snippets.append(text[start:end].replace("\n", " "))
        if len(snippets) >= 3:
            break
    return snippets

#4) LLM call per IOC
USE_LLM = bool(OPENAI_API_KEY)
print("LLM available:", USE_LLM)

def build_prompt(ioc_item, snippets, vt):       # build a prompt template to query in ChatGPT for IOC content enrichment
    evidence_lines = []
    if vt:
        ev = {
            "vt_http_status": vt.get("http_status"),
            "vt_reputation": vt.get("reputation"),
            "vt_malicious": vt.get("malicious"),
            "vt_suspicious": vt.get("suspicious"),
            "vt_undetected": vt.get("undetected"),
            "vt_harmless": vt.get("harmless"),
        }
        evidence_lines.append(f"VirusTotal: {ev}")
    if snippets:
        evidence_lines.append(f"Local context: {snippets[:2]}")
    return f"""
You are a CTI analyst. Given a single IOC and evidence, infer likely context and ATT&CK techniques.
Return STRICT JSON with keys:
- "context": short one-sentence summary of what this IOC likely represents (C2, payload, phish link, scanner, etc.)
- "attack_mapping": array of ATT&CK technique IDs (e.g., ["T1071","T1105"]); include only techniques you can justify
- "confidence": integer 0-100 for your overall assessment
- "rationale": 1-2 sentence justification referencing the evidence

IOC:
  type: {ioc_item.get('type')}
  value: {ioc_item.get('value')}
Evidence:
  {os.linesep.join(evidence_lines) if evidence_lines else "No external evidence."}

Constraints:
- Base your mapping on the evidence only. If insufficient, return an empty array for "attack_mapping" and low confidence.
- Use only valid ATT&CK technique IDs (Txxxx).
- JSON only, no extra keys or text.
""".strip()

from openai import OpenAI

client = OpenAI(api_key=OPENAI_API_KEY)     # pass api key to openai client

def call_llm_json(prompt, model="gpt-4o-mini", temperature=0.2):        # call model and define temperature
    try:
        resp = client.chat.completions.create(
            model=model,
            temperature=temperature,
            response_format={"type":"json_object"},
            messages=[
                {"role":"system","content":"You output strict JSON only."},
                {"role":"user","content":prompt}
            ],
        )
        return json.loads(resp.choices[0].message.content)
    except Exception as e:
        return {"context":"Insufficient enrichment (LLM error).","attack_mapping":[],"confidence":5,"rationale":str(e)[:200]}

#5) run enrichment per IOC
records = []
if OPENAI_API_KEY:
    QPS = float(os.getenv("LLM_QPS", "2"))
    sleep_s = max(0.0, 1.0 / max(0.1, QPS))
    for _, item in ioc_df.iterrows():
        snippets = gather_snippets(item)
        vt = vt_row_for(item["type"], item["value"])
        prompt = build_prompt(item, snippets, vt)
        enriched = call_llm_json(prompt)
        records.append({**item.to_dict(), **enriched})
        time.sleep(sleep_s)
else:
    for _, item in ioc_df.iterrows():
        records.append({
            **item.to_dict(),
            "context":"(no LLM - pass-through)",
            "attack_mapping":[],
            "confidence":0,
            "rationale":"LLM enrichment disabled."
        })
#6) save enriched output
enriched_json = OUT_DIR / "iocs_enriched.json"
with open(enriched_json, "w") as file:
    json.dump(records, file, indent=2)
print("Wrote:", enriched_json)
print("Example:", json.dumps(records[48], indent=2)[:600] if records else "(no records)")

LLM available: True
Wrote: generated\iocs_enriched.json
Example: {
  "source": "group-ib-conti-threat-research-2022-en.pdf",
  "type": "sha256",
  "value": "904e0855772f56721cc157641a26bb7963651e5a45c3bb90764328b17081abd5",
  "context": "This IOC likely represents a malicious payload or file associated with an attack.",
  "attack_mapping": [],
  "confidence": 20,
  "rationale": "There is no external evidence to provide context or specific techniques associated with this SHA256 hash, leading to a low confidence assessment."
}


In [None]:
#8. feed final iocs to MISP
OUT_DIR = Path("./generated"); OUT_DIR.mkdir(exist_ok=True, parents=True)
FINAL_IOCS = OUT_DIR / "iocs_misp.csv"               # produced by VT 200-only gate
MISP_EVENT_JSON = OUT_DIR / "misp_event_from_final_iocs.json"
SKIPPED_CSV = OUT_DIR / "misp_skipped.csv"
FAILED_CSV  = OUT_DIR / "misp_failed.csv"

if not FINAL_IOCS.exists():
    raise FileNotFoundError(f"{FINAL_IOCS} not found. Run the VT 200-only gate cell first.")

df = pd.read_csv(FINAL_IOCS).dropna().drop_duplicates()
print(f"Loaded final_iocs: {len(df)} rows")

def normalize_type_and_value(t, v):
    t = str(t).lower().strip()
    v = str(v).strip()

    # URLs
    if t == "url":
        return {"type": "url", "value": v.strip(').,;\'"'), "category": "Network activity"}

    # Domains
    if t == "domain":
        return {"type": "domain", "value": v.lower(), "category": "Network activity"}

    # IPs
    if t in ("ip","ipv4","ip-dst","ip-src"):
        try:
            ipaddress.IPv4Address(v)
            # Default to destination IP observable for blocklists; adjust if you track src instead
            return {"type": "ip-dst", "value": v, "category": "Network activity"}
        except Exception:
            return None

    # File hashes
    if t in ("file","md5","sha1","sha256"):
        hv = v.lower()
        if t == "md5" or (len(hv) == 32 and re.fullmatch(r"[0-9a-f]{32}", hv)):
            return {"type": "md5", "value": hv, "category": "Artifacts dropped"}
        if t == "sha1" or (len(hv) == 40 and re.fullmatch(r"[0-9a-f]{40}", hv)):
            return {"type": "sha1", "value": hv, "category": "Artifacts dropped"}
        if t == "sha256" or (len(hv) == 64 and re.fullmatch(r"[0-9a-f]{64}", hv)):
            return {"type": "sha256", "value": hv, "category": "Artifacts dropped"}
        return None

    # skip anything else (e.g., email) for this push
    return None

norm_rows, skipped = [], []
for _, r in df.iterrows():
    norm = normalize_type_and_value(r["type"], r["value"])
    if norm:
        norm_rows.append(norm)
    else:
        skipped.append({"orig_type": r["type"], "orig_value": r["value"], "reason": "failed normalization"})

norm_df = pd.DataFrame(norm_rows).drop_duplicates()
if skipped:
    pd.DataFrame(skipped).to_csv(SKIPPED_CSV, index=False)
    print(f"Skipped during normalization: {len(skipped)} → {SKIPPED_CSV}")

print("Prepared for MISP (by type):")
print(norm_df.groupby("type").size().to_string())

# Build event (no attributes yet)
if not (MISP_URL and MISP_KEY):
    raise RuntimeError("Set MISP_URL and MISP_KEY env vars to push to MISP.")

session = requests.Session()
headers = {"Authorization": MISP_KEY, "Accept": "application/json", "Content-Type": "application/json"}

event_body = {
    "Event": {
        "info": "Home-lab: final_iocs (VT 200-only) pushed to MISP",
        "analysis": 2,             # 0=initial,1=ongoing,2=completed
        "threat_level_id": 2,      # 1=high,2=medium,3=low,4=undefined
        "date": datetime.utcnow().strftime("%Y-%m-%d"),
        "distribution": 0          # 0=Your org only (safe default)
    }
}

resp = session.post(f"{MISP_URL}/events/add", headers=headers, json=event_body, timeout=30, verify=False)
if resp.status_code != 200:
    raise RuntimeError(f"Event creation failed: {resp.status_code} {resp.text}")

event = resp.json().get("Event") or resp.json().get("event") or {}
event_id = event.get("id")
if not event_id:
    raise RuntimeError(f"Could not obtain event_id from response: {resp.text}")

with open(MISP_EVENT_JSON, "w") as f:
    json.dump(resp.json(), f, indent=2)
print("Created event id:", event_id, "→", MISP_EVENT_JSON)

# Add attributes one-by-one so we can see which ones fail
failed = []
added = 0
for _, row in norm_df.iterrows():
    attr = {
        "type": row["type"],
        "category": row["category"],
        "value": row["value"],
        "to_ids": True,
        "distribution": 0
    }
    r = session.post(f"{MISP_URL}/attributes/add/{event_id}", headers=headers, json={"Attribute": attr}, timeout=30, verify=False)
    if r.status_code == 200:
        added += 1
    else:
        failed.append({
            "type": row["type"],
            "value": row["value"],
            "status": r.status_code,
            "body": r.text[:300]
        })

print(f"Attributes added: {added} / {len(norm_df)}")
if failed:
    pd.DataFrame(failed).to_csv(FAILED_CSV, index=False)
    print(f"Some attributes failed to add: {len(failed)} → {FAILED_CSV}")

In [16]:
# final outputs
print("Outputs in:", OUT_DIR.resolve())
for p in sorted(OUT_DIR.glob("*")):
    print("-", p.name)

Outputs in: C:\Users\ronal\Desktop\LLM Labs\generated
- iocs_enriched.json
- iocs_extracted.csv
- iocs_misp.csv
- parsed
- parsed_docs.parquet
- vt_flagged.csv
- vt_reputation.csv
