# OSINT Incident Mapping with a Multi-Agent Pipeline (Telegram → SQLite → Maps)

This notebook builds an **end-to-end, fully automated OSINT incident analysis pipeline**.

The goal is simple:

➡️ **Transform raw Telegram messages from OSINT channels into geocoded, categorized global incidents and visualize them on an interactive map.**

To achieve this, I designed a **6-agent architecture** in which each agent performs one specialized task: cleaning text, filtering for incident relevance, deduplicating repeated reports, extracting categories and locations, geocoding them, and finally plotting everything in a global interactive viewer.

This project demonstrates how **agentic workflows + geospatial intelligence** can scale real-time monitoring of conflict, disasters, law enforcement activity, and emerging threats.

It also shows how real OSINT pipelines operate:  
structured, modular, resilient, and explainable.

This notebook includes:

- 🧹 **Agent 1:** Text cleaning
- 🗺️ **Agent 2:** Geo-significance (incident or not?)
- 🔁 **Agent 3:** Deduplication (remove repeated reports)
- 🏷️ **Agent 4:** Category & location extraction
- 📌 **Agent 5:** LLM-guided geocoding
- 🗺️ **Agent 6:** Mapping + automated natural-language summary

By the end, you will see:

- a fully populated **incidents.db**  
- categorized, deduplicated, and geocoded incidents  
- a **global interactive map** of all incidents  
- an LLM-generated **narrative intelligence summary**

Let's begin by setting up the environment and configuration.


In [None]:
# Parameters (overwritten by Streamlit via Papermill)
start_date = "2024-01-01"  # placeholder dates; the real range is injected at run time
end_date = "2024-01-07"

In [None]:
print("[PARAMS]", start_date, end_date)
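
Because the two date strings are injected from outside the notebook, it can help to validate them before any Telegram call is made. The sketch below is one way to do that, assuming both values are `YYYY-MM-DD` strings; `parse_date_range` is a hypothetical helper and not part of the notebook.

```python
from datetime import date, timedelta


def parse_date_range(start: str, end: str) -> list[str]:
    """Validate two YYYY-MM-DD strings and return every day in the range (inclusive)."""
    first = date.fromisoformat(start)   # raises ValueError on malformed input
    last = date.fromisoformat(end)
    if first > last:
        raise ValueError(f"start_date {start} is after end_date {end}")
    n_days = (last - first).days + 1
    return [(first + timedelta(days=i)).isoformat() for i in range(n_days)]


days = parse_date_range("2024-01-01", "2024-01-07")
print(len(days), days[0], days[-1])
```

Failing early on a reversed or malformed range avoids an empty `DAYS` set silently producing zero kept messages later on.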


In [None]:
import os
import sqlite3
from datetime import date, datetime, timedelta
from dotenv import load_dotenv
from pyrogram import Client
from pathlib import Path
import asyncio

load_dotenv()



# ---- Telegram client session ---

from telegram_client import get_telegram_client
app = get_telegram_client()


# ---- DB path ----
DB_PATH = Path("incidents.db")
try:
    if DB_PATH.exists():
        DB_PATH.unlink()
except Exception as e:
    print(f"[DB] Cleanup failed: {e}")

# ---- Channels & days ----
CHANNELS = [-1001554189930, -1001964457167, -1001150168882, -1001407087072]

#user_days = input("Enter comma-separated dates (YYYY-MM-DD): ").split(",")
#user_days = [d.strip() for d in user_days if d.strip()]
#DAYS = set(user_days)


#from datetime import date, timedelta

START_DATE = date.fromisoformat(start_date)
END_DATE = date.fromisoformat(end_date)

DAYS = set()
current = START_DATE

while current <= END_DATE:
    DAYS.add(current.isoformat())   
    current += timedelta(days=1)



LIMIT_PER_CHANNEL = 2000  # Too much data will crowd up the map

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- Raw Telegram metadata
    chat_id INTEGER,
    message_id INTEGER,
    chat_title TEXT,
    date TEXT,           -- ISO timestamp
    text_raw TEXT,       -- original message text

    -- Cleaner Agent output
    text_clean TEXT,

    -- Geo-significance Agent
    checked INTEGER DEFAULT 0,   -- 0 = not checked, 1 = mappable, -1 = irrelevant

    -- Contextual Deduplicator Agent
    processed INTEGER DEFAULT 0, -- 0 = pending, 1 = canonical event, -1 = duplicate, -2 = bad_geo

    -- Category + Location Extractor
    category TEXT,
    location_text TEXT,

    -- Geocoding Agent
    lat REAL,
    lon REAL,

    UNIQUE(chat_id, message_id)
);
CREATE INDEX IF NOT EXISTS idx_chat_msg ON events(chat_id, message_id);
CREATE INDEX IF NOT EXISTS idx_date ON events(date);
CREATE INDEX IF NOT EXISTS idx_checked ON events(checked);
CREATE INDEX IF NOT EXISTS idx_processed ON events(processed);
"""

def init_db(path: str = DB_PATH):
    """Create incidents.db and events table if not present."""
    created = not os.path.exists(path)
    conn = sqlite3.connect(path, timeout=30)
    cur = conn.cursor()
    cur.executescript(SCHEMA_SQL)
    conn.commit()
    return conn, created

print("Config + schema ready. DB_PATH =", DB_PATH)


Config + schema ready. DB_PATH = incidents.db


# 🗄️ Database Schema: `incidents.db`

All agents write to a single SQLite file named `incidents.db`.

The core table is:

### **`events`**

This table tracks every transformation as messages flow through the pipeline.  
It contains raw fields, cleaned fields, LLM outputs, geocoding outputs, and pipeline control flags.

The schema is:

| Column | Type | Meaning |
|--------|------|---------|
| **id** | INTEGER | Primary key |
| **chat_id** | INTEGER | Telegram channel ID |
| **message_id** | INTEGER | Telegram message ID (unique together with `chat_id`, which prevents duplicate rows) |
| **chat_title** | TEXT | Channel name |
| **date** | TEXT | ISO timestamp |
| **text_raw** | TEXT | Original Telegram message |
| **text_clean** | TEXT | Cleaner Agent output |
| **checked** | INTEGER | Agent 2 output (0 = not checked, 1 = incident, -1 = ignore) |
| **processed** | INTEGER | Agent 3 output (0 = pending, 1 = canonical, -1 = duplicate, -2 = ungeocodable) |
| **category** | TEXT | Agent 4 assigned category |
| **location_text** | TEXT | Extracted geocodable location |
| **lat** | REAL | Final geocoded latitude |
| **lon** | REAL | Final geocoded longitude |
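
The two control flags are what let each agent pick up exactly the rows the previous agent released. A minimal sketch of that hand-off follows, using an in-memory database and a trimmed-down version of the table; the sample rows and the `ids` helper are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        text_clean TEXT,
        checked INTEGER DEFAULT 0,
        processed INTEGER DEFAULT 0
    )
    """
)
# Invented rows: (text_clean, checked, processed)
conn.executemany(
    "INSERT INTO events (text_clean, checked, processed) VALUES (?, ?, ?)",
    [
        (None, 0, 0),                 # not cleaned yet: waiting for Agent 1
        ("Opinion piece", -1, 0),     # rejected by Agent 2
        ("Blast in town X", 1, 0),    # accepted by Agent 2: waiting for Agent 3
        ("Fire at depot Y", 1, 1),    # canonical incident
        ("Depot Y on fire", 1, -1),   # duplicate of the row above
    ],
)


def ids(sql: str) -> list[int]:
    """Run a query and return the first column as a list."""
    return [row[0] for row in conn.execute(sql)]


needs_cleaning = ids("SELECT id FROM events WHERE text_clean IS NULL ORDER BY id")
needs_dedup = ids("SELECT id FROM events WHERE checked = 1 AND processed = 0 ORDER BY id")
canonical = ids("SELECT id FROM events WHERE checked = 1 AND processed = 1 ORDER BY id")
print(needs_cleaning, needs_dedup, canonical)
```

Each stage only needs a `WHERE` clause on the flags, so a stage can be re-run after a crash without touching rows that are already finished.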




## Fetch data from Telegram Channels 

Messages are fetched with Pyrogram from the following public channels. Their channel IDs were defined in `CHANNELS` in the configuration cell above:
- ElitePredators
- War and Gore
- OsintTv
- Resonant News
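
Per its documentation, Pyrogram's `get_chat_history` returns messages in reverse chronological order, so a fixed `limit` can be used up before an older date window is ever reached. One possible refinement is to stop iterating as soon as a message is older than the first requested day. The sketch below shows the idea on plain `(message_id, timestamp)` tuples rather than real Pyrogram objects; `keep_in_window` and the sample history are hypothetical.

```python
from datetime import date, datetime


def keep_in_window(history, first_day: date, last_day: date):
    """Yield items dated within [first_day, last_day] from a newest-first stream."""
    for message_id, ts in history:
        day = ts.date()
        if day > last_day:
            continue      # too recent: keep scanning toward older messages
        if day < first_day:
            break         # everything after this point is older still
        yield message_id, ts


# Invented newest-first history
history = [
    (5, datetime(2024, 1, 9, 8, 0)),
    (4, datetime(2024, 1, 7, 23, 59)),
    (3, datetime(2024, 1, 3, 12, 0)),
    (2, datetime(2023, 12, 31, 18, 0)),
    (1, datetime(2023, 12, 30, 9, 0)),
]
kept = [mid for mid, _ in keep_in_window(history, date(2024, 1, 1), date(2024, 1, 7))]
print(kept)
```

The early `break` relies entirely on the newest-first ordering; with an unordered source it would drop valid messages.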

In [2]:
def row_in_days(date_str: str) -> bool:
    """Check if ISO timestamp like 2025-11-27T14:22:10 is in days list"""
    if not date_str:
        return False
    try:
        day = date_str[:10]
        return day in DAYS
    except Exception:
        return False

async def fetch_history_from_telegram_async() -> list[dict]:
    """
    Async version: use async with Client and async for history.
    Filters to the DAYS and returns a list of dicts.
    """
    messages: list[dict] = []

    async with app:
        for ch in CHANNELS:
            print(f"\n[Pyrogram] Fetching up to {LIMIT_PER_CHANNEL} messages from channel: {ch}")
            count_for_ch = 0

            try:
                # async generator
                async for msg in app.get_chat_history(ch, limit=LIMIT_PER_CHANNEL):
                    # date → ISO string
                    try:
                        iso_ts = msg.date.isoformat()
                    except Exception:
                        iso_ts = str(msg.date)

                    # only keep our target days
                    if not row_in_days(iso_ts):
                        continue

                    # text content
                    text = msg.text or msg.caption or ""
                    if not text.strip():
                        # skip media-only messages
                        continue

                    chat_title = msg.chat.title if msg.chat else None

                    messages.append(
                        {
                            "chat_id": ch,
                            "message_id": msg.id,
                            "chat_title": chat_title,
                            "date": iso_ts,
                            "text_raw": text.strip(),
                        }
                    )
                    count_for_ch += 1

                print(f"[Pyrogram] Kept {count_for_ch} messages for {ch} within {DAYS}")

            except Exception as e:
                print(f"[Pyrogram] ERROR fetching channel {ch}: {e}")

    print(f"\n[Pyrogram] TOTAL messages in DAYS {DAYS}: {len(messages)}")
    return messages


def insert_messages_into_db(messages: list[dict], db_path: str = DB_PATH):
    """
    Insert fetched messages into incidents.db/events
    using INSERT OR IGNORE on (chat_id, message_id).
    """
    conn, created = init_db(db_path)
    cur = conn.cursor()
    inserted = 0
    skipped = 0

    for m in messages:
        try:
            cur.execute(
                """
                INSERT OR IGNORE INTO events (
                    chat_id, message_id, chat_title, date, text_raw,
                    text_clean, checked, processed, category, location_text,
                    lat, lon
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    m["chat_id"],
                    m["message_id"],
                    m["chat_title"],
                    m["date"],
                    m["text_raw"],
                    None,   # text_clean
                    0,      # checked
                    0,      # processed
                    None,   # category
                    None,   # location_text
                    None,   # lat
                    None,   # lon
                ),
            )
            if cur.rowcount > 0:
                inserted += 1
            else:
                skipped += 1
        except Exception as e:
            print("ERROR on insert:", e)
            skipped += 1

    conn.commit()

    # quick summary
    cur.execute("SELECT COUNT(1) FROM events")
    total = cur.fetchone()[0]
    conn.close()

    print(f"[DB] Inserted: {inserted}, skipped (duplicates/errors): {skipped}")
    print(f"[DB] Total rows in events: {total}")
    return inserted, skipped

print("Fetch + insert helpers ready.")


Fetch + insert helpers ready.
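
Storage-level deduplication rests on the `UNIQUE(chat_id, message_id)` constraint combined with `INSERT OR IGNORE`. The following sketch, run against an in-memory table with invented rows, shows why re-running the fetch does not create duplicate rows and how `cursor.rowcount` separates new rows from ignored ones. `insert_all` is a simplified stand-in for the notebook's insert helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        chat_id INTEGER,
        message_id INTEGER,
        text_raw TEXT,
        UNIQUE(chat_id, message_id)
    )
    """
)


def insert_all(rows) -> tuple[int, int]:
    """Insert rows and return (inserted, skipped)."""
    inserted = skipped = 0
    cur = conn.cursor()
    for row in rows:
        cur.execute(
            "INSERT OR IGNORE INTO events (chat_id, message_id, text_raw) VALUES (?, ?, ?)",
            row,
        )
        if cur.rowcount > 0:   # 0 means the UNIQUE constraint caused the row to be ignored
            inserted += 1
        else:
            skipped += 1
    conn.commit()
    return inserted, skipped


batch = [(-100, 1, "first"), (-100, 2, "second")]
first_run = insert_all(batch)
second_run = insert_all(batch + [(-100, 3, "third")])
total = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(first_run, second_run, total)
```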


In [3]:
messages = await fetch_history_from_telegram_async()
inserted, skipped = insert_messages_into_db(messages)




[Pyrogram] Fetching up to 2000 messages from channel: -1001554189930
[Pyrogram] Kept 0 messages for -1001554189930 within {datetime.date(2024, 1, 7), datetime.date(2024, 1, 2), datetime.date(2024, 1, 5), datetime.date(2024, 1, 4), datetime.date(2024, 1, 3), datetime.date(2024, 1, 1), datetime.date(2024, 1, 6)}

[Pyrogram] Fetching up to 2000 messages from channel: -1001964457167
[Pyrogram] ERROR fetching channel -1001964457167: Telegram says: [420 Flood] - [420 FROZEN_METHOD_INVALID] (caused by "messages.GetStickerSet")

[Pyrogram] Fetching up to 2000 messages from channel: -1001150168882
[Pyrogram] ERROR fetching channel -1001150168882: Telegram says: [420 Flood] - [420 FROZEN_METHOD_INVALID] (caused by "messages.GetStickerSet")

[Pyrogram] Fetching up to 2000 messages from channel: -1001407087072
[Pyrogram] Kept 0 messages for -1001407087072 within {datetime.date(2024, 1, 7), datetime.date(2024, 1, 2), datetime.date(2024, 1, 5), datetime.date(2024, 1, 4), datetime.date(2024, 1, 3), 

In [4]:
import pandas as pd
conn = sqlite3.connect(DB_PATH)
pd.read_sql_query("SELECT chat_title, COUNT(*) as n FROM events GROUP BY chat_title", conn)


Unnamed: 0,chat_title,n
0,ElitePredators,21
1,OsintTV 📺️,13
2,Resonant News 📰,13
3,War & Gore,26


---

# 🧹 Agent 1: Text Cleaner
### *Normalizing Real-World OSINT Into Machine-Readable Inputs*

Telegram OSINT messages are messy. They contain emojis, hashtags, random spacing, repost headers, and channel-specific noise.  
Before running any intelligence workflow, we need **clean, predictable text**.

Agent 1 performs:

- Emoji + symbol removal  
- Newline normalization  
- Whitespace collapsing  
- Removal of channel signatures  
- Canonicalization of message format  

This stage is foundational:  
**Every downstream agent depends on high-quality cleaned text to make accurate decisions.**
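
Several of the steps listed above are mechanical and can be approximated without an LLM. The sketch below is a deterministic pre-pass written under that assumption. It is not the notebook's Agent 1, which delegates cleaning to Gemini; the regular expressions and the `preclean` name are illustrative, and the emoji ranges are deliberately incomplete.

```python
import re

URL_RE = re.compile(r"(?:https?://|t\.me/)\S+")
MENTION_RE = re.compile(r"@\w+")   # note: would also hit the domain part of e-mail addresses
# A few common emoji/symbol blocks plus joiner characters; not exhaustive.
EMOJI_RE = re.compile(
    "[\U0001F300-\U0001FAFF\U00002600-\U000027BF\U0001F1E6-\U0001F1FF\uFE0F\u200d]"
)


def preclean(text: str) -> str:
    """Strip links, @mentions and common emoji, then normalize whitespace."""
    text = URL_RE.sub("", text)
    text = MENTION_RE.sub("", text)
    text = EMOJI_RE.sub("", text)
    text = re.sub(r"[ \t]+", " ", text)      # collapse runs of spaces/tabs
    text = re.sub(r" *\n *", "\n", text)     # trim spaces around newlines
    text = re.sub(r"\n{3,}", "\n\n", text)   # keep at most one blank line
    return text.strip()


raw = (
    "\U0001F6A8 REPORT:   drone crash near the border  \n\n\n\n"
    "https://t.me/some_channel/123 @some_channel"
)
print(repr(preclean(raw)))
```

A pass like this could run before the LLM call to shorten prompts, leaving judgment calls such as channel signatures and repost headers to the model.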

Below is the implementation of Agent 1.


In [5]:
'''
Agent 1 - Text cleaner
'''

import asyncio
from google.genai import types
from google.adk.agents import LlmAgent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504]
)

CLEANING_SYSTEM_INSTRUCTIONS = """
You are an OSINT text cleaning assistant.

Your job:
- Remove ALL emojis, URLs, t.me links, @usernames, noisy hashtags, stickers, decorative icons,
  'Forwarded from ...' headers, channel signatures, random unicode junk, and 'HEADLINES' banners that do not add info.
- Normalize whitespace.
- Do NOT translate, summarize, or add info.
Return ONLY the cleaned text.
""".strip()

def get_connection(db_path: str = DB_PATH) -> sqlite3.Connection:
    return sqlite3.connect(db_path)

def fetch_rows_to_clean(conn: sqlite3.Connection, limit: int = 32):
    cur = conn.cursor()
    cur.execute(
        """
        SELECT id, text_raw
        FROM events
        WHERE text_raw IS NOT NULL
          AND TRIM(text_raw) <> ''
          AND (text_clean IS NULL OR TRIM(text_clean) = '')
        ORDER BY id
        LIMIT ?
        """,
        (limit,)
    )
    return cur.fetchall()

def update_clean_text(conn: sqlite3.Connection, row_id: int, text_clean: str):
    cur = conn.cursor()
    cur.execute(
        "UPDATE events SET text_clean = ? WHERE id = ?",
        (text_clean, row_id)
    )

def build_cleaning_prompt(text_raw: str) -> str:
    return f"""
Clean the following Telegram OSINT message according to the rules.

RAW MESSAGE:
{text_raw}
""".strip()

def create_cleaner_agent() -> LlmAgent:
    agent = LlmAgent(
        name="Agent1_TextCleaner",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            retry_options=retry_config,
        ),
        description="Cleans Telegram OSINT messages for a geospatial OSINT pipeline.",
        instruction=CLEANING_SYSTEM_INSTRUCTIONS,
        tools=[],  # no tools needed here
    )
    return agent

class CleanerLLM:
    def __init__(self, runner: InMemoryRunner):
        self.runner = runner

    async def call(self, prompt: str) -> str:
        events = await self.runner.run_debug(prompt)
        # find last assistant-like event
        for e in reversed(events):
            if getattr(e, "author", "") == "user":
                continue
            if getattr(e, "content", None) is None:
                continue
            parts = getattr(e.content, "parts", [])
            texts = [p.text for p in parts if hasattr(p, "text") and p.text]
            if texts:
                return "\n".join(texts).strip()
        return ""

async def run_cleaner_until_done(batch_size: int = 32):
    conn = get_connection(DB_PATH)
    agent = create_cleaner_agent()
    runner = InMemoryRunner(agent=agent)
    llm = CleanerLLM(runner)

    total = 0
    try:
        while True:
            rows = fetch_rows_to_clean(conn, limit=batch_size)
            if not rows:
                break
            for row_id, text_raw in rows:
                if not text_raw or not text_raw.strip():
                    continue
                prompt = build_cleaning_prompt(text_raw)
                try:
                    cleaned = await llm.call(prompt)
                except Exception as e:
                    print(f"[Cleaner] LLM error for id={row_id}: {e}")
                    cleaned = text_raw.strip()
                if not cleaned:
                    cleaned = text_raw.strip()
                update_clean_text(conn, row_id, cleaned)
            conn.commit()
            total += len(rows)
            print(f"[Cleaner] Processed batch of {len(rows)} rows (total={total}).")
    finally:
        conn.close()

    print(f"[Cleaner] Done. Total rows cleaned: {total}")

print("Agent 1 (Cleaner) ready.")


Agent 1 (Cleaner) ready.
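
The `retry_config` defined in the cell above uses a steep `exp_base=7`. Assuming the textbook exponential backoff formula, where the wait before retry `n` is `initial_delay * exp_base ** n` and may be capped by a maximum delay, the nominal waits grow as sketched below. The formula, the cap, the absence of jitter, and the reading of `attempts` as including the first request are all assumptions here; the actual schedule is determined by the google-genai client. `nominal_backoff` is a hypothetical helper.

```python
from typing import List, Optional


def nominal_backoff(
    attempts: int,
    initial_delay: float,
    exp_base: float,
    max_delay: Optional[float] = None,
) -> List[float]:
    """Nominal waits (seconds) between attempts, with `attempts` counting the first try."""
    waits = []
    for n in range(attempts - 1):
        wait = initial_delay * exp_base ** n
        if max_delay is not None:
            wait = min(wait, max_delay)   # hypothetical cap
        waits.append(wait)
    return waits


uncapped = nominal_backoff(attempts=5, initial_delay=1, exp_base=7)
capped = nominal_backoff(attempts=5, initial_delay=1, exp_base=7, max_delay=60)
print(uncapped, capped)
```

Under these assumptions the last uncapped wait is several minutes long, which is worth keeping in mind when a batch of 32 rows is cleaned one request at a time.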


In [6]:
await run_cleaner_until_done()



 ### Created new session: debug_session_id

User > Clean the following Telegram OSINT message according to the rules.

RAW MESSAGE:
In the picture, the books placed behind the alleged Pakistani terrorist Naveed Akram clearly reflect Salafi ideology. This detail matters because ideology shapes action.

Note : Jaish e Mohammad follows a Deobandi Salafi jihadist ideology.

(This is only an open source analysis. It is not a confirmation)
Agent1_TextCleaner > In the picture, the books placed behind the alleged Pakistani terrorist Naveed Akram clearly reflect Salafi ideology. This detail matters because ideology shapes action.

Note : Jaish e Mohammad follows a Deobandi Salafi jihadist ideology.

(This is only an open source analysis. It is not a confirmation)

 ### Continue session: debug_session_id

User > Clean the following Telegram OSINT message according to the rules.

RAW MESSAGE:
Here we go.

According to PaK media, the Sydney Bondi beach terrorist Naveed Akram is of Afghan origin. 

In [7]:
conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query("SELECT id, text_raw, text_clean FROM events;", conn)
df.head()

Unnamed: 0,id,text_raw,text_clean
0,1,"In the picture, the books placed behind the al...","In the picture, the books placed behind the al..."
1,2,"Here we go.\n\nAccording to PaK media, the Syd...","Here we go.\n\nAccording to PaK media, the Syd..."
2,3,"🚨 REPORT:\n\nAccording to qualified sources, t...","REPORT:\n\nAccording to qualified sources, the..."
3,4,None of the top Pakistan media houses reported...,None of the top Pakistan media houses reported...
4,5,An unidentified drone has crashed into remote ...,An unidentified drone has crashed into remote ...


---

# 🗺️ Agent 2: Geo-Significance Classifier
### *Filtering which messages represent real, mappable incidents*

Not every OSINT message describes a localized event.  
Some are commentary, propaganda, or broad political statements.

Agent 2 uses an LLM to distinguish real incidents and update the 'checked' column of the database:

- **checked=1**: a concrete event at a specific time and location
- **checked=-1**: non-actionable contextual noise

This step dramatically reduces false positives and ensures that only **spatially meaningful events** continue through the pipeline.
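
Because the classifier is asked to reply with only `1` or `-1`, the reply can be parsed strictly. The sketch below is one way to do that with a regular expression that accepts a standalone `1` or `-1` and treats everything else as "not an incident", the same conservative default the notebook's implementation uses. `parse_label` and `LABEL_RE` are hypothetical names.

```python
import re

# A standalone 1 or -1: not part of a longer token such as 10, 2021 or x-1.
LABEL_RE = re.compile(r"(?<![\w-])(-?1)(?!\w)")


def parse_label(reply: str, default: int = -1) -> int:
    """Return 1 or -1 from an LLM reply, or `default` if neither is found."""
    match = LABEL_RE.search(reply.strip())
    return int(match.group(1)) if match else default


for reply in ["1", " -1\n", "Label: 1.", "10", "unsure"]:
    print(repr(reply), "->", parse_label(reply))
```

Compared with a plain substring check, this avoids reading replies such as `10` or a year like `2021` as a positive label.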

Below is the implementation of Agent 2.


In [8]:
"""
Agent 2 - Geo-Significance Checker

Definition of "incident" (map-worthy):
- Specific event in time and space
- That can be reasonably plotted on a map
"""

import sqlite3
from typing import List, Tuple

from google.genai import types
from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.runners import InMemoryRunner

DB_PATH = "incidents.db"
BATCH_SIZE = 32  # number of rows to classify per batch

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

GEO_SYSTEM_INSTRUCTIONS = """
You are an OSINT incident filter assistant for a geospatial mapping pipeline.

Your job:
- Read a cleaned Telegram OSINT message.
- Decide if it describes a CONCRETE, LOCALIZED INCIDENT that can be placed on a map.

DEFINITION OF INCIDENT (MAP-WORTHY):
- A specific event that happened (or is happening) at a particular place.
- Examples:
  - Attack, skirmish, firefight, encounter, ambush, bombing, shelling,
    missile strike, drone strike, IED blast, sniper incident.
  - Security/military operations tied to a locality: raid, search operation,
    cordon, troop movement, convoy movement, checkpoint clash, border
    infiltration attempt, drone spotting near a town.
  - Crime/policing: arrest, detention, seizure of weapons or contraband,
    kidnapping, smuggling interdiction, violent riot at a specific location.
  - Disaster/emergency: fire, building collapse, accident, bus/train crash,
    flood/landslide in a named area.
  - Infrastructure incidents: damage to bridges, roads, power stations,
    pipelines, airports, if localized.

NOT INCIDENT (NOT MAP-WORTHY):
- Pure political or diplomatic statements, border claims, map changes,
  without a concrete violent/security event.
- General tension or fear: "situation is tense in the region", "people
  are worried everywhere".
- Whole-country or broad-region statements with no specific locality:
  "Russia attacks Ukraine's anti-drone nets" (no specific city/site).
- Historical summaries or hypotheticals.
- Generic commentary, propaganda, analysis.

RULE:
- If the message clearly describes a concrete event at a city/area/facility
  that could be plotted on a map, label it as INCIDENT.
- If not, label it as NOT INCIDENT.

OUTPUT FORMAT:
- If INCIDENT (mappable)     → output: 1
- If NOT INCIDENT (not used) → output: -1

Return ONLY 1 or -1 as plain text, nothing else.
""".strip()


# ----------------- DB HELPERS -----------------

def get_connection(db_path: str = DB_PATH) -> sqlite3.Connection:
    """Return a sqlite3 connection to the DB file."""
    return sqlite3.connect(db_path)


def fetch_rows_to_check(
    conn: sqlite3.Connection,
    limit: int = BATCH_SIZE
) -> List[Tuple[int, str]]:
    """
    Fetch a batch of rows that still need geo-significance checking.

    Conditions:
    - text_clean is not NULL/empty
    - checked = 0 (default, not yet classified)
    """
    cur = conn.cursor()
    cur.execute(
        """
        SELECT id, text_clean
        FROM events
        WHERE text_clean IS NOT NULL
          AND TRIM(text_clean) <> ''
          AND checked = 0
        ORDER BY id
        LIMIT ?
        """,
        (limit,),
    )
    return cur.fetchall()


def update_checked(
    conn: sqlite3.Connection,
    row_id: int,
    checked_value: int
) -> None:
    """Update the checked field (1 or -1) for a single row."""
    cur = conn.cursor()
    cur.execute(
        """
        UPDATE events
        SET checked = ?
        WHERE id = ?
        """,
        (checked_value, row_id),
    )


# ----------------- PROMPT BUILDER -----------------

def build_geo_prompt(text_clean: str) -> str:
    """
    Build the user prompt that will be sent to the geo-significance agent.
    """
    user_prompt = f"""
Using your incident definition and rules, decide if the following OSINT message
is an INCIDENT or NOT INCIDENT, and return ONLY 1 or -1.

MESSAGE:
{text_clean}
""".strip()
    return user_prompt


# ----------------- ADK AGENT SETUP -----------------

def create_geo_agent() -> LlmAgent:
    """
    Create and return the ADK Agent dedicated to geo-significance checking.
    """
    agent = LlmAgent(
        name="Agent2_GeoChecker",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            retry_options=retry_config,
        ),
        description="Decides if a cleaned OSINT message is a localized, map-worthy incident.",
        instruction=GEO_SYSTEM_INSTRUCTIONS,
        tools=[],  # no tools needed
    )
    return agent


# ----------------- LLM CALL WRAPPER -----------------

class GeoCheckerLLM:
    """
    Wrapper around an InMemoryRunner for the geo-significance agent.

    Provides an async call(prompt: str) -> int that returns 1 or -1.
    """

    def __init__(self, runner: InMemoryRunner):
        self.runner = runner

    async def call(self, prompt: str) -> int:
        events = await self.runner.run_debug(prompt)

        # Find the last non-user event with content
        assistant_event = None
        for e in reversed(events):
            if getattr(e, "author", "") == "user":
                continue
            if getattr(e, "content", None) is None:
                continue
            assistant_event = e
            break

        if assistant_event is None:
            # Fallback: default to NOT INCIDENT
            return -1

        # Extract text parts
        parts = getattr(assistant_event.content, "parts", [])
        texts = []
        for p in parts:
            if hasattr(p, "text") and p.text:
                texts.append(p.text)

        raw_text = "\n".join(texts).strip()
        cleaned = raw_text.strip()

        # Direct parse attempt
        try:
            val = int(cleaned)
            if val in (1, -1):
                return val
        except ValueError:
            pass

        # Fallback: check for "-1" before "1", because "1" is a substring of "-1"
        if "-1" in cleaned:
            return -1
        if "1" in cleaned:
            return 1

        # If nothing sensible found, default to NOT INCIDENT
        return -1


# ----------------- BATCH LOGIC (ASYNC) -----------------

async def run_geo_batch(
    llm: GeoCheckerLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE,
) -> int:
    """
    Process a single batch of rows that need geo-significance checking.

    Returns:
        Number of rows updated in this batch.
    """
    conn = get_connection(db_path)
    try:
        rows = fetch_rows_to_check(conn, limit=batch_size)
        if not rows:
            return 0

        for row_id, text_clean in rows:
            if not text_clean or not text_clean.strip():
                update_checked(conn, row_id, -1)
                continue

            prompt = build_geo_prompt(text_clean)

            try:
                checked_val = await llm.call(prompt)
            except Exception as e:
                print(f"[GeoChecker] LLM error for id={row_id}: {e}")
                checked_val = -1  # Fail-safe: NOT INCIDENT

            if checked_val not in (1, -1):
                checked_val = -1

            update_checked(conn, row_id, checked_val)

        conn.commit()
        return len(rows)
    finally:
        conn.close()


async def run_geo_until_done(
    llm: GeoCheckerLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE,
) -> None:
    """
    Keep running batches until there are no more rows with checked = 0.
    """
    total = 0
    while True:
        processed = await run_geo_batch(
            llm=llm,
            db_path=db_path,
            batch_size=batch_size,
        )
        if processed == 0:
            break
        total += processed
        print(f"[GeoChecker] Processed batch of {processed} rows (total={total}).")

    print(f"[GeoChecker] Done. Total rows labeled: {total}")


# ----------------- NOTEBOOK ENTRYPOINT -----------------

async def run_agent2_geo_checker(batch_size: int = BATCH_SIZE):

    geo_agent = create_geo_agent()
    print("[GeoChecker] Agent created.")

    geo_runner = InMemoryRunner(agent=geo_agent)
    print("[GeoChecker] Runner created.")

    llm = GeoCheckerLLM(runner=geo_runner)
    await run_geo_until_done(llm=llm, db_path=DB_PATH, batch_size=batch_size)

    print("[GeoChecker] Completed.")


In [9]:
await run_agent2_geo_checker()


[GeoChecker] Agent created.
[GeoChecker] Runner created.

 ### Created new session: debug_session_id

User > Using your incident definition and rules, decide if the following OSINT message
is an INCIDENT or NOT INCIDENT, and return ONLY 1 or -1.

MESSAGE:
In the picture, the books placed behind the alleged Pakistani terrorist Naveed Akram clearly reflect Salafi ideology. This detail matters because ideology shapes action.

Note : Jaish e Mohammad follows a Deobandi Salafi jihadist ideology.

(This is only an open source analysis. It is not a confirmation)
Agent2_GeoChecker > -1

 ### Continue session: debug_session_id

User > Using your incident definition and rules, decide if the following OSINT message
is an INCIDENT or NOT INCIDENT, and return ONLY 1 or -1.

MESSAGE:
Here we go.

According to PaK media, the Sydney Bondi beach terrorist Naveed Akram is of Afghan origin. The same outlet claims that Afghan nationals were hired by Indian spy agency R&AW to create social unrest (spoil Pa

In [10]:
conn = sqlite3.connect("incidents.db")
pd.read_sql_query(
    "SELECT checked, COUNT(*) as n FROM events GROUP BY checked",
    conn
)

Unnamed: 0,checked,n
0,-1,50
1,1,23


23 of the 73 messages were classified as mappable incidents (`checked = 1`); the remaining 50 were filtered out.

---

# 🔁 Agent 3: Contextual Deduplication
### *Ensuring each real-world incident appears only once*

The same incident may be reported by:

- multiple channels  
- multiple message styles  
- multiple updates  

To prevent overcounting, Agent 3 compares **same-day** incident candidates and determines whether two messages describe:

- **SAME incident** → Keep one canonical record  
    The canonical row is set to processed=1; every other row in the group is set to processed=-1
- **DIFFERENT incidents** → Preserve both  
    Each row is set to processed=1
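
The bookkeeping behind "keep one canonical record" can be sketched independently of the LLM. Given pairwise SAME verdicts, the rows fall into groups, and here the earliest row of each group is treated as canonical. Both the union-find grouping and the earliest-row rule are assumptions made for illustration; `assign_processed` is a hypothetical helper and the notebook's own selection rule may differ.

```python
def assign_processed(row_ids: list[int], same_pairs: list[tuple[int, int]]) -> dict[int, int]:
    """Map each row id to 1 (canonical) or -1 (duplicate).

    row_ids must be ordered by timestamp; same_pairs lists the pairs judged SAME.
    """
    parent = {rid: rid for rid in row_ids}
    order = {rid: i for i, rid in enumerate(row_ids)}

    def find(x: int) -> int:
        while parent[x] != x:
            parent[x] = parent[parent[x]]   # path halving
            x = parent[x]
        return x

    for a, b in same_pairs:
        ra, rb = find(a), find(b)
        if ra != rb:
            # Keep the earlier row as the group's representative.
            if order[ra] > order[rb]:
                ra, rb = rb, ra
            parent[rb] = ra

    return {rid: (1 if find(rid) == rid else -1) for rid in row_ids}


# Invented day: rows 11 and 14 report one incident, 12 and 13 another, 15 stands alone.
flags = assign_processed([11, 12, 13, 14, 15], [(11, 14), (13, 12)])
print(flags)
```

The resulting dictionary maps directly onto the `processed` column: one `UPDATE` per row id with the value shown.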

The deduplication ground rules combine time-based context with general heuristics that I developed through iterative analysis.
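
To make the idea of cheap heuristics concrete, the sketch below filters candidate pairs before any LLM comparison, using the thresholds the implementation defines: a ±4 hour window, a maximum 2x length ratio, and at least one shared location-like token. How a "location-like" token is detected here (a capitalized word outside a short generic-word list) is an assumption, as are the helper names and the abbreviated word list.

```python
import re
from datetime import datetime

TIME_WINDOW_HOURS = 4.0
LEN_RATIO_MAX = 2.0
LOCATION_MIN_COMMON = 1
WORD_RE = re.compile(r"[A-Za-z][A-Za-z\-]+")
# Abbreviated stand-in for the notebook's generic-word list.
GENERIC_WORDS = {"attack", "blast", "explosion", "police", "army", "town", "today"}


def location_like_tokens(text: str) -> set[str]:
    """Capitalized words that are not generic event vocabulary (a rough proxy)."""
    return {
        w.lower()
        for w in WORD_RE.findall(text)
        if w[0].isupper() and w.lower() not in GENERIC_WORDS
    }


def is_candidate_pair(text_a: str, ts_a: str, text_b: str, ts_b: str) -> bool:
    """Cheap checks deciding whether a pair is worth an LLM comparison."""
    delta = datetime.fromisoformat(ts_a) - datetime.fromisoformat(ts_b)
    if abs(delta.total_seconds()) / 3600 > TIME_WINDOW_HOURS:
        return False
    longer = max(len(text_a), len(text_b))
    shorter = min(len(text_a), len(text_b))
    if shorter == 0 or longer / shorter > LEN_RATIO_MAX:
        return False
    common = location_like_tokens(text_a) & location_like_tokens(text_b)
    return len(common) >= LOCATION_MIN_COMMON


a = "IED blast in Pulwama town"
b = "Explosion reported in Pulwama today"
print(is_candidate_pair(a, "2024-01-03T10:00:00", b, "2024-01-03T12:30:00"))
```

Only pairs that pass all three checks would be sent to the model, which keeps the number of LLM comparisons per message small.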

This stage is crucial for creating a **trustworthy event dataset** and producing accurate maps.


Below is the implementation of Agent 3.


In [11]:
"""
Agent 3 - Contextual Deduplicator

Role in pipeline:
- Input: rows in `events` table where:
    checked = 1        (incident-like, from Agent 2)
    processed = 0      (not yet deduplicated)
- Work per CALENDAR DAY (YYYY-MM-DD).
- Within each day, group messages that describe the SAME real-world incident.
- For each group:
    - One canonical row gets: processed = 1
    - All other rows in that group get: processed = -1
"""

import sqlite3
import re
from datetime import datetime
from typing import List, Tuple, Dict, Any, Optional

from google.genai import types
from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.runners import InMemoryRunner

# Use the same DB as the rest of the notebook
DB_PATH = "incidents.db"

# Heuristic thresholds
TIME_WINDOW_HOURS = 4.0      # only compare events within ±4 hours
LEN_RATIO_MAX = 2.0          # ignore pairs where length ratio > 2x
MAX_CANDIDATES_PER_MSG = 3   # limit LLM comparisons per message
LOCATION_MIN_COMMON = 1      # need at least 1 shared "location-like" token

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

DEDUP_SYSTEM_INSTRUCTIONS = """
You are an OSINT incident deduplication assistant.

Your job:
Given TWO cleaned OSINT messages from the SAME calendar day, decide whether they
describe the SAME real-world incident, or TWO DISTINCT incidents.

DEFINITION OF "SAME INCIDENT":
- One single event in time and place, described by different channels or wording.
- Examples:
  - "RPG attack on FC post in Khyber" vs
    "Militants attacked FC checkpoint with RPG in Khyber".
  - "IED blast in Pulwama town" vs
    "Explosion reported in Pulwama today".

DEFINITION OF "DIFFERENT INCIDENTS":
- Different locations, even if the event type is similar.
- Different targets (e.g., attack on police vs attack on civilians).
- Clearly separate times (e.g., morning vs late night, or hours apart)
  indicating different attacks or explosions.
- Follow-up consequences (e.g., later death of injured soldier, funerals,
  remembrance posts, documentaries about an attack) are NOT the same incident
  as the original attack.
- General commentary on a past incident is NOT the same as the incident report.

You are given:
- CLEANED_MESSAGE_A
- TIMESTAMP_A (ISO format)
- CLEANED_MESSAGE_B
- TIMESTAMP_B (ISO format)
All from the SAME calendar day.

CRITICAL RULES:
- If one message is clearly a follow-up, consequence, or commentary
  about an earlier event (e.g., "injured soldier succumbed", "documentary released",
  "funeral held", "statement about the attack"), treat it as a DIFFERENT incident.
- If there is any reasonable doubt whether they are the same event, choose DIFFERENT.
- Do NOT merge two incidents just because they share the same district or region name.
- Only treat them as SAME if they are clearly the same moment/event being described.

OUTPUT FORMAT (IMPORTANT):
- If they describe the SAME incident, return exactly: SAME
- If they describe DIFFERENT incidents, return exactly: DIFFERENT

Return ONLY one word: SAME or DIFFERENT.
""".strip()


# ----------------- DB HELPERS -----------------

def get_connection(db_path: str = DB_PATH) -> sqlite3.Connection:
    """Return a sqlite3 connection to the DB file."""
    return sqlite3.connect(db_path)


def get_distinct_calendar_days_with_incidents(conn: sqlite3.Connection) -> List[str]:
    """
    Get distinct calendar days (YYYY-MM-DD) which have checked=1 and processed=0.
    Deduplication will be done per day.

    Assumes date is an ISO timestamp string like '2025-11-27T14:22:10'.
    """
    cur = conn.cursor()
    cur.execute(
        """
        SELECT DISTINCT SUBSTR(date, 1, 10) AS day
        FROM events
        WHERE checked = 1
          AND processed = 0
          AND date IS NOT NULL
        ORDER BY day
        """
    )
    rows = cur.fetchall()
    return [r[0] for r in rows]


def fetch_incident_rows_for_day(
    conn: sqlite3.Connection,
    day_str: str,
) -> List[Tuple[int, str, str, int, int, str]]:
    """
    Fetch incident-like rows for a given calendar day (YYYY-MM-DD)
    that have not yet been deduplicated.

    Returns:
        List of tuples:
            (id, text_clean, date_iso, chat_id, message_id, chat_title)
    """
    cur = conn.cursor()
    cur.execute(
        """
        SELECT id, text_clean, date, chat_id, message_id, chat_title
        FROM events
        WHERE date LIKE ?
          AND checked = 1
          AND processed = 0
        ORDER BY date, chat_id, message_id, id
        """,
        (f"{day_str}%",),
    )
    return cur.fetchall()


def update_processed(conn: sqlite3.Connection, row_id: int, value: int) -> None:
    """
    Update the processed field for a single row.

    Values:
      1  -> canonical incident
     -1  -> duplicate of another incident
    """
    cur = conn.cursor()
    cur.execute(
        """
        UPDATE events
        SET processed = ?
        WHERE id = ?
        """,
        (value, row_id),
    )


# ----------------- HEURISTICS -----------------

WORD_RE = re.compile(r"[A-Za-z][A-Za-z\-]+")

GENERIC_WORDS = {
    # very generic words we don't want as "location-like"
    "attack", "attacks", "ambush", "blast", "blasts", "explosion", "explosions",
    "bomb", "bombing", "raid", "raids", "search", "operation", "operations",
    "forces", "security", "police", "army", "militants", "terrorists",
    "soldier", "soldiers", "troops", "people", "civilians", "checkpoint",
    "post", "base", "camp", "bridge", "road", "highway", "line", "front",
    "area", "region", "district", "province", "state", "country",
    "city", "town", "village", "border",
    "today", "yesterday", "tonight", "morning", "evening", "afternoon", "night",
}


def extract_location_like_tokens(text: str) -> set[str]:
    """
    Very rough heuristic: extract "location-like" tokens from the cleaned text.

    - Extract alphabetic words.
    - Lowercase.
    - Keep words with length >= 5 that are not in the GENERIC_WORDS set.

    This tends to keep items like: 'pulwama', 'peshawar', 'kulgam', 'bannu'.
    Shorter place names such as 'gaza' (4 letters) are dropped by the length filter.
    """
    words = [w.lower() for w in WORD_RE.findall(text)]
    tokens = {w for w in words if len(w) >= 5 and w not in GENERIC_WORDS}
    return tokens


def infer_event_type(text: str) -> str:
    """
    Very rough event-type heuristic from keywords in text_clean.
    This is only used to avoid comparing obviously different incident types.
    """
    t = text.lower()

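    # Match "ied" as a whole word so words like "died" or "carried" do not trigger it.
    if re.search(r"\bied\b", t) or any(
        k in t for k in ["bomb blast", "car bomb", "suicide attack"]
    ):
        return "explosion"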
    if any(k in t for k in ["explosion", "blast", "detonation"]):
        return "explosion"
    if any(k in t for k in ["shelling", "mortar", "artillery", "rocket fire"]):
        return "shelling"
    if any(k in t for k in ["drone strike", "drone attack"]):
        return "drone_strike"
    if any(k in t for k in ["airstrike", "air strike", "air raid"]):
        return "air_strike"
    if any(k in t for k in ["gunfire", "firing", "firefight", "exchange of fire"]):
        return "firefight"
    if any(k in t for k in ["raid", "search operation", "cordon and search"]):
        return "raid_search"
    if any(k in t for k in ["arrested", "arrest", "detained", "detention"]):
        return "arrest"
    if any(k in t for k in ["seized", "seizure", "recovered weapons", "contraband"]):
        return "seizure"
    if any(k in t for k in ["killed", "shot dead", "martyred", "dead body"]):
        return "casualty"
    if any(k in t for k in ["injured", "wounded"]):
        return "injury"

    return "unknown"


def parse_iso_datetime(ts: str) -> Optional[datetime]:
    """Safely parse ISO timestamp to datetime, or return None on failure."""
    if not ts:
        return None
    try:
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))  # tolerate 'Z'
    except Exception:
        return None


def hours_diff(dt1: Optional[datetime], dt2: Optional[datetime]) -> Optional[float]:
    """Return abs time difference in hours, or None if either dt is None."""
    if dt1 is None or dt2 is None:
        return None
    return abs((dt1 - dt2).total_seconds()) / 3600.0


# ----------------- PROMPT BUILDER -----------------

def build_dedup_prompt(
    text_a: str,
    text_b: str,
    ts_a: str,
    ts_b: str,
    day_str: str,
) -> str:
    """
    Build the user prompt that will be sent to the dedup agent.

    We pass both timestamps explicitly so the model can reason about
    follow-ups vs same-moment reports.
    """
    user_prompt = f"""
You are comparing two OSINT messages from the SAME calendar day: {day_str}.

You are given:
- CLEANED_MESSAGE_A
- TIMESTAMP_A: {ts_a}
- CLEANED_MESSAGE_B
- TIMESTAMP_B: {ts_b}

Using your deduplication rules, decide whether they describe the SAME incident
or DIFFERENT incidents.

Respond with ONLY ONE WORD: SAME or DIFFERENT.

CLEANED_MESSAGE_A:
{text_a}

---

CLEANED_MESSAGE_B:
{text_b}
""".strip()
    return user_prompt


# ----------------- ADK AGENT SETUP -----------------

def create_dedup_agent() -> LlmAgent:
    """
    Create and return the LlmAgent dedicated to contextual deduplication.
    """
    agent = LlmAgent(
        name="Agent3_Deduplicator",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            retry_options=retry_config,
        ),
        description="Groups OSINT messages that describe the same incident on a given day.",
        instruction=DEDUP_SYSTEM_INSTRUCTIONS,
        tools=[],  # no external tools needed
    )
    return agent


# ----------------- LLM CALL WRAPPER -----------------

class DedupLLM:
    """
    Wrapper around an InMemoryRunner for the dedup agent.

    Provides an async call(...) -> bool that returns True if SAME, False otherwise.
    """

    def __init__(self, runner: InMemoryRunner):
        self.runner = runner

    async def call(
        self,
        text_a: str,
        text_b: str,
        ts_a: str,
        ts_b: str,
        day_str: str,
    ) -> bool:
        prompt = build_dedup_prompt(text_a, text_b, ts_a, ts_b, day_str)
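        # Note: run_debug prints each exchange and, as the cell output shows,
        # continues a single debug session, so earlier pairs may stay in context.
        events = await self.runner.run_debug(prompt)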

        # Find the last non-user event with content
        assistant_event = None
        for e in reversed(events):
            if getattr(e, "author", "") == "user":
                continue
            if getattr(e, "content", None) is None:
                continue
            assistant_event = e
            break

        if assistant_event is None:
            # Be conservative -> treat as DIFFERENT
            return False

        parts = getattr(assistant_event.content, "parts", [])
        texts = []
        for p in parts:
            if hasattr(p, "text") and p.text:
                texts.append(p.text)

        raw_text = "\n".join(texts).strip().upper()

        # Interpret output: SAME or DIFFERENT
        if "SAME" in raw_text and "DIFFERENT" not in raw_text:
            return True
        if "DIFFERENT" in raw_text and "SAME" not in raw_text:
            return False

        # If the model misbehaves, default to DIFFERENT (do not over-merge)
        return False


# ----------------- DEDUP LOGIC (ASYNC, WITH HEURISTICS) -----------------

async def dedup_for_day(
    conn: sqlite3.Connection,
    llm: DedupLLM,
    day_str: str,
) -> None:
    """
    Perform deduplication for a single calendar day (YYYY-MM-DD).

    Algorithm:
    - Fetch all checked=1, processed=0 rows for that day.
    - Maintain a list of canonical incidents with precomputed heuristics:
        id, text_clean, date_iso, dt, loc_tokens, event_type, length
    - For each message in order:
        - Build candidate canonicals based on:
            * time window (±TIME_WINDOW_HOURS)
            * overlapping location-like tokens
            * similar event type (if known)
            * length ratio <= LEN_RATIO_MAX
        - If no candidates -> mark as canonical (processed=1)
        - Else:
            - Compare against up to MAX_CANDIDATES_PER_MSG via LLM
            - If ANY returns SAME -> mark as duplicate (processed=-1)
            - Else mark as canonical (processed=1)
    """
    rows = fetch_incident_rows_for_day(conn, day_str)
    if not rows:
        print(f"[Dedup] No rows to process for day {day_str}.")
        return

    print(f"[Dedup] Processing day {day_str} with {len(rows)} incident-like messages.")

    canonical_incidents: List[Dict[str, Any]] = []

    for row_id, text_clean, date_iso, chat_id, message_id, chat_title in rows:
        if not text_clean or not text_clean.strip():
            update_processed(conn, row_id, -1)
            continue

        text_clean = text_clean.strip()
        dt = parse_iso_datetime(date_iso)
        loc_tokens = extract_location_like_tokens(text_clean)
        event_type = infer_event_type(text_clean)
        length = len(text_clean)

        # If there are no canonicals yet, this becomes the first canonical
        if not canonical_incidents:
            update_processed(conn, row_id, 1)
            canonical_incidents.append(
                {
                    "id": row_id,
                    "text_clean": text_clean,
                    "date_iso": date_iso,
                    "dt": dt,
                    "loc_tokens": loc_tokens,
                    "event_type": event_type,
                    "length": length,
                }
            )
            continue

        # ---- Candidate selection (heuristic pre-filter) ----
        candidates: List[Dict[str, Any]] = []

        for canon in canonical_incidents:
            # Time window filter
            dt_c = canon["dt"]
            diff_hours = hours_diff(dt, dt_c)
            if diff_hours is not None and diff_hours > TIME_WINDOW_HOURS:
                continue

            # Event type filter (if both known and different -> skip)
            et_c = canon["event_type"]
            if et_c != "unknown" and event_type != "unknown" and et_c != event_type:
                continue

            # Length ratio filter
            len_c = canon["length"]
            if len_c == 0 or length == 0:
                len_ratio = 1.0
            else:
                bigger = max(len_c, length)
                smaller = min(len_c, length)
                len_ratio = bigger / smaller
            if len_ratio > LEN_RATIO_MAX:
                continue

            # Location-like tokens overlap
            lt_c = canon["loc_tokens"]
            if loc_tokens and lt_c:
                common = loc_tokens.intersection(lt_c)
                if len(common) < LOCATION_MIN_COMMON:
                    continue

            # If all checks passed, accept as candidate
            candidates.append(canon)

            # Limit number of candidates to keep LLM calls bounded
            if len(candidates) >= MAX_CANDIDATES_PER_MSG:
                break

        # If no candidates, treat as new canonical
        if not candidates:
            update_processed(conn, row_id, 1)
            canonical_incidents.append(
                {
                    "id": row_id,
                    "text_clean": text_clean,
                    "date_iso": date_iso,
                    "dt": dt,
                    "loc_tokens": loc_tokens,
                    "event_type": event_type,
                    "length": length,
                }
            )
            continue

        # ---- LLM comparison with candidates ----
        is_duplicate = False
        for canon in candidates:
            same = await llm.call(
                text_a=text_clean,
                text_b=canon["text_clean"],
                ts_a=date_iso,
                ts_b=canon["date_iso"],
                day_str=day_str,
            )
            if same:
                update_processed(conn, row_id, -1)
                is_duplicate = True
                break

        if not is_duplicate:
            # New canonical incident
            update_processed(conn, row_id, 1)
            canonical_incidents.append(
                {
                    "id": row_id,
                    "text_clean": text_clean,
                    "date_iso": date_iso,
                    "dt": dt,
                    "loc_tokens": loc_tokens,
                    "event_type": event_type,
                    "length": length,
                }
            )

    conn.commit()
    print(
        f"[Dedup] Finished day {day_str}. "
        f"Canonical count: {len(canonical_incidents)}, "
        f"Duplicates: {len(rows) - len(canonical_incidents)}"
    )


async def run_dedup_all_days(
    llm: DedupLLM,
    db_path: str = DB_PATH,
) -> None:
    """
    Run deduplication for all calendar days that have checked=1 and processed=0.
    """
    conn = get_connection(db_path)
    try:
        days = get_distinct_calendar_days_with_incidents(conn)
        if not days:
            print("[Dedup] No days with pending incidents (checked=1, processed=0).")
            return

        print(f"[Dedup] Found {len(days)} day(s) to process: {days}")

        for day_str in days:
            await dedup_for_day(conn, llm, day_str)

        # Report completion only when the loop actually finished.
        print("[Dedup] All days processed.")
    finally:
        conn.close()


# ----------------- NOTEBOOK ENTRYPOINT -----------------

async def run_agent3_dedup_fast():

    dedup_agent = create_dedup_agent()
    print("[Dedup] Agent created.")

    dedup_runner = InMemoryRunner(agent=dedup_agent)
    print("[Dedup] Runner created.")

    llm = DedupLLM(runner=dedup_runner)
    await run_dedup_all_days(llm=llm, db_path=DB_PATH)

    print("[Dedup] Completed.")
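
The candidate pre-filter in `dedup_for_day` can be exercised without the database or the LLM. The sketch below re-implements the four checks on plain dictionaries. The threshold values are assumptions chosen for illustration (the notebook's real `TIME_WINDOW_HOURS`, `LEN_RATIO_MAX`, and `LOCATION_MIN_COMMON` are set in its config section), `passes_prefilter` and `make_record` are hypothetical helper names, and the sample messages are made up.

```python
from datetime import datetime
from typing import Any, Dict, Set

# Assumed thresholds for illustration only; not the notebook's configured values.
TIME_WINDOW_HOURS = 6.0
LEN_RATIO_MAX = 4.0
LOCATION_MIN_COMMON = 1


def passes_prefilter(msg: Dict[str, Any], canon: Dict[str, Any]) -> bool:
    """Return True if `canon` is worth sending to the LLM alongside `msg`."""
    # 1. Time window: reject pairs that are too far apart (unknown times pass).
    if msg["dt"] is not None and canon["dt"] is not None:
        diff_hours = abs((msg["dt"] - canon["dt"]).total_seconds()) / 3600.0
        if diff_hours > TIME_WINDOW_HOURS:
            return False

    # 2. Event type: reject only when both types are known and differ.
    event_types = {msg["event_type"], canon["event_type"]}
    if "unknown" not in event_types and len(event_types) > 1:
        return False

    # 3. Length ratio: a one-line alert and a long report rarely match.
    longer = max(msg["length"], canon["length"])
    shorter = max(1, min(msg["length"], canon["length"]))
    if longer / shorter > LEN_RATIO_MAX:
        return False

    # 4. Location overlap: applied only when both sides have tokens.
    if msg["loc_tokens"] and canon["loc_tokens"]:
        if len(msg["loc_tokens"] & canon["loc_tokens"]) < LOCATION_MIN_COMMON:
            return False

    return True


def make_record(text: str, ts: str, event_type: str, loc_tokens: Set[str]) -> Dict[str, Any]:
    """Build the same kind of dictionary that dedup_for_day keeps per canonical."""
    return {
        "dt": datetime.fromisoformat(ts),
        "event_type": event_type,
        "loc_tokens": loc_tokens,
        "length": len(text),
    }


a = make_record("Explosion reported near a market in Peshawar.",
                "2025-01-01T10:00:00", "explosion", {"peshawar", "market"})
b = make_record("Blast in Peshawar, several people hurt.",
                "2025-01-01T10:20:00", "explosion", {"peshawar", "several"})
c = make_record("Suspects detained during a search in Kupwara.",
                "2025-01-01T10:30:00", "arrest", {"kupwara", "suspects"})
d = make_record("Blast in Peshawar, several people hurt.",
                "2025-01-01T20:00:00", "explosion", {"peshawar", "several"})

print(passes_prefilter(a, b))  # same type, shared token, 20 minutes apart
print(passes_prefilter(a, c))  # both types known and different
print(passes_prefilter(a, d))  # ten hours apart, outside the assumed window
```

Only pairs that survive all four checks reach the LLM, which is what keeps the number of model calls per message bounded.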



In [12]:
await run_agent3_dedup_fast()


[Dedup] Agent created.
[Dedup] Runner created.
[Dedup] Found 1 day(s) to process: ['2025-12-14']
[Dedup] Processing day 2025-12-14 with 23 incident-like messages.

 ### Created new session: debug_session_id

User > You are comparing two OSINT messages from the SAME calendar day: 2025-12-14.

You are given:
- CLEANED_MESSAGE_A
- TIMESTAMP_A: 2025-12-14T14:19:43
- CLEANED_MESSAGE_B
- TIMESTAMP_B: 2025-12-14T14:11:48

Using your deduplication rules, decide whether they describe the SAME incident
or DIFFERENT incidents.

Respond with ONLY ONE WORD: SAME or DIFFERENT.

CLEANED_MESSAGE_A:
Reports of 60 injured and 10 dead so far in Sydney

---

CLEANED_MESSAGE_B:
Reports of a mass shooting at Bondi Beach in Sydney, Australia.
Agent3_Deduplicator > SAME

 ### Continue session: debug_session_id

User > You are comparing two OSINT messages from the SAME calendar day: 2025-12-14.

You are given:
- CLEANED_MESSAGE_A
- TIMESTAMP_A: 2025-12-14T14:29:19
- CLEANED_MESSAGE_B
- TIMESTAMP_B: 2025-12-14T
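
The transcript shows the agent answering with a single word, which is the case `DedupLLM.call` relies on. A minimal sketch of that conservative interpretation, pulled out as a pure function so it can be tested without a model call (`interpret_verdict` is a hypothetical name, not part of the notebook):

```python
def interpret_verdict(raw_text: str) -> bool:
    """Return True only for an unambiguous SAME; anything else counts as DIFFERENT."""
    text = raw_text.strip().upper()
    has_same = "SAME" in text
    has_different = "DIFFERENT" in text
    # Mixed or empty replies fall through to False, so the pipeline never over-merges.
    return has_same and not has_different


for reply in ["SAME", " same\n", "DIFFERENT", "SAME or DIFFERENT", ""]:
    print(repr(reply), "->", interpret_verdict(reply))
```

Because the check is substring-based, a reply such as "not the same" would still count as SAME. Comparing the stripped reply for exact equality with `"SAME"` would be a stricter alternative.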

In [13]:
conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query(
    "SELECT id, text_clean FROM events WHERE checked=1 AND processed=1;",
    conn
)
print(len(df))
df.head()

16


Unnamed: 0,id,text_clean
0,5,An unidentified drone has crashed into remote ...
1,6,CTD officer Khalil Ahmad was abducted and kill...
2,7,Unidentified gunmen ambushed a Pakistani milit...
3,8,Pakistan accuses Afghan Taliban for killing 3 ...
4,12,A Pakistani soldier was captured by the Rebels...


Duplicates caught


In [None]:

df_dups = pd.read_sql_query("""
    SELECT id, date, chat_title, text_clean
    FROM events
    WHERE processed = -1
    ORDER BY date, id;
""", conn)

df_dups

Unnamed: 0,id,date,chat_title,text_clean
0,100,2025-11-27T05:25:05,ElitePredators,"Kash Patel, FBI Director:\n\nGuardsmen are ali..."
1,98,2025-11-27T07:28:01,ElitePredators,The suspect in the DC National Guard shooting ...
2,46,2025-11-27T11:00:25,War & Gore,Yesterday two national guards were shot dead b...
3,35,2025-11-27T18:30:16,War & Gore,Massive explosion in Wana South Waziristan
4,54,2025-11-28T22:20:32,ElitePredators,Maoists responsible for the IED blast in Jhark...
5,192,2025-11-30T13:11:47,OsintTV 📺,South Waziristan:\n\nHeavy clashes ongoing bet...
6,247,2025-11-30T17:20:06,War & Gore,"2nd Ambush, \n\nKolachi, Dera Ismail Kha distr..."
7,347,2025-11-30T22:11:30,ElitePredators,"In Balochistan's Chagai district, intense clas..."
8,240,2025-12-01T01:36:54,War & Gore,"Reportedly, official Pakistani sources have co..."
9,176,2025-12-01T09:00:42,OsintTV 📺,Update:\n\nBLF released the pic of woman rebel...
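
A quick way to audit the dedup stage is to count rows per `processed` state (0 = pending, 1 = canonical, -1 = duplicate). The sketch below runs against an in-memory database with made-up rows, so the counts are illustrative only; `conn_demo`, `STATE_LABELS`, and `summary` are names introduced here. Against the real pipeline the same query would run on `incidents.db`.

```python
import sqlite3

# In-memory stand-in for incidents.db with only the columns this query needs.
conn_demo = sqlite3.connect(":memory:")
conn_demo.execute(
    "CREATE TABLE events ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "checked INTEGER DEFAULT 0, "
    "processed INTEGER DEFAULT 0)"
)
conn_demo.executemany(
    "INSERT INTO events (checked, processed) VALUES (?, ?)",
    [(1, 1), (1, 1), (1, -1), (1, 0), (0, 0)],  # made-up sample rows
)

STATE_LABELS = {0: "pending", 1: "canonical", -1: "duplicate"}

rows = conn_demo.execute(
    "SELECT processed, COUNT(*) FROM events "
    "WHERE checked = 1 GROUP BY processed ORDER BY processed"
).fetchall()

# Map the numeric flag to a readable label for each group.
summary = {STATE_LABELS.get(state, str(state)): n for state, n in rows}
print(summary)

conn_demo.close()
```

A non-zero `pending` count after Agent 3 has run would suggest that some incident-like rows were skipped, for example rows with a NULL `date`.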


---

# 🏷️ Agent 4: Category & Location Extractor  
### *Assigning semantic meaning and extracting geocodable places*

Once we know an event is real and unique, we must interpret it.

Agent 4 uses an LLM to extract two key intelligence fields:

- **Category:** One label from a closed, 16-class taxonomy (each class is described in the prompt by several example phrases)
- **Location Text:** A clean, geocodable place string  

This step enriches events with structure, enabling downstream geocoding, visualization, and analysis.

Below is the implementation of Agent 4.


In [14]:
"""
Agent 4 - Category + Location Extractor

Role in pipeline:
- Input: rows in `events` table where:
    checked   = 1  (incident-like, from Agent 2)
    processed = 1  (canonical after Agent 3 dedup)
- For each such row, extract:
    - category       (short incident type label)
    - location_text  (geocodable string)

Schema (must match existing):
    CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        chat_id INTEGER,
        message_id INTEGER,
        chat_title TEXT,
        date TEXT,
        text_raw TEXT,
        text_clean TEXT,
        checked INTEGER DEFAULT 0,
        processed INTEGER DEFAULT 0,
        category TEXT,
        location_text TEXT,
        lat REAL,
        lon REAL,
        UNIQUE(chat_id, message_id)
    );
"""

import sqlite3
import asyncio
import json
from typing import List, Tuple, Optional

import os
from dotenv import load_dotenv

from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.runners import InMemoryRunner
from google.genai import types

# ----------------- CONFIG -----------------

load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY not set in environment / .env")

DB_PATH = "incidents.db"   
BATCH_SIZE = 32            # how many canonical incidents to process per batch

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

CATLOC_SYSTEM_INSTRUCTIONS = """
You are an OSINT incident classification and location extraction assistant
for a geospatial mapping pipeline.

YOUR JOB (STRICT):
Given a CLEANED OSINT message that is already known to describe a concrete,
localized incident, you must:
1. Identify ONE incident category.
2. Extract ONE geocodable location string if present.

CATEGORY SEMANTIC EXAMPLES (FOR INTERPRETATION ONLY):
These examples illustrate the meaning of categories. They are NEVER valid outputs.
- "attack": armed attack, shooting, skirmish, clash, firefight
- "explosion": bomb blast, IED, grenade explosion, rocket impact
- "shelling": artillery fire, mortar fire, rocket shelling
- "drone_strike": drone-delivered munition, drone strike
- "air_strike": jet or aircraft bombing, air raid
- "air_activity": military jet/helicopter overflight, unusual air activity
- "raid_search": raid, cordon & search operation
- "arrest_detention": arrest, capture, detention, chargesheets
- "weapons_seizure": seizure of weapons, explosives, contraband
- "casualty_death": confirmed death of soldier/civilian caused by an incident
- "injury": injuries reported without confirmed death
- "protest_riot": violent protest, riot, unrest
- "disaster": fire, flood, landslide, accident, building collapse, infrastructure damage
- "movement": troop/convoy movement, mobilization, infiltration attempt
- "kidnapping": abduction or kidnapping
- "smuggling": smuggling interdiction, trafficking crackdown

ALLOWED OUTPUT CATEGORIES (CLOSED LIST - USE EXACTLY ONE):
["attack", "explosion", "shelling", "drone_strike", "air_strike",
 "air_activity", "raid_search", "arrest_detention", "weapons_seizure",
 "casualty_death", "injury", "protest_riot",
 "movement", "disaster", "kidnapping", "smuggling"]

CATEGORY RULES:
- Choose EXACTLY ONE label from the CLOSED LIST above.
- NEVER output any example phrases or synonyms.
- The category value MUST be exactly one of the allowed labels.
- If uncertain, choose the closest valid category.
- Do NOT invent new categories.

DEATH VS INJURY RULES:
- "injury" is for cases where injuries are mentioned and NO death is confirmed.
- "casualty_death" is for messages whose PRIMARY focus is that someone has died
  as a result of an incident (for example, an injured person later
  "succumbed to injuries", "was martyred", or "was found dead").
- If the message mainly describes an action-type incident (attack, explosion,
  shelling, drone_strike, air_strike, etc.) and mentions that people were
  "killed" or "died" as part of that event, you should choose the ACTION
  category (e.g., "attack", "explosion", "shelling", "drone_strike",
  "air_strike") rather than "casualty_death".
- Only choose "casualty_death" when the death itself is the main subject of
  the message (for example, a follow-up report about a specific person dying).

LOCATION EXTRACTION RULES:
- Extract ONE location string that a geocoding service (e.g., Google Maps) can interpret.
- Use a comma-separated hierarchy when possible, for example:
  "Bannu, Khyber Pakhtunkhwa, Pakistan"
  "Kupwara, Jammu and Kashmir, India"
  "Peshawar, Khyber Pakhtunkhwa, Pakistan"
- Prefer the most specific place mentioned (town > district > region > country).
- Include the country when it is explicitly mentioned or clearly implied.
- Do NOT guess or invent a country if it is not clearly indicated.
- When multiple locations or countries are mentioned, choose the ONE location
  where the main incident physically occurred or where casualties/damage happened,
  NOT the origin of an attack or drone. For example, if a drone is launched
  from Country A and kills people in Country B, choose Country B.
- NEVER return lists, arrays, multiple places, or separators like ";", "/", " and ".
- Do NOT add descriptive words such as "near", "around", "border area of".
  Use only the place names, e.g. "Bannu, Khyber Pakhtunkhwa, Pakistan".
- If only a country is given, return just that country, e.g. "Pakistan".
- If no concrete location exists, return "" (empty string).

OUTPUT FORMAT (STRICT):
Return a SINGLE JSON object with EXACTLY two keys:
{
  "category": "<one_of_the_allowed_labels>",
  "location_text": "<string_or_empty>"
}

Do NOT include any extra keys, comments, or explanations.
The output MUST be valid JSON.
""".strip()


# ----------------- DB HELPERS -----------------

def get_connection(db_path: str = DB_PATH) -> sqlite3.Connection:
    """Return a sqlite3 connection to the DB file."""
    return sqlite3.connect(db_path)


def fetch_rows_for_catloc(
    conn: sqlite3.Connection,
    limit: int = BATCH_SIZE,
) -> List[Tuple[int, str, Optional[str], Optional[str]]]:
    """
    Fetch a batch of canonical incidents that need category/location extraction.

    Conditions:
    - checked = 1              (incident-like)
    - processed = 1            (canonical, not duplicate)
    - text_clean not NULL/empty
    - category is NULL/empty   (we only run once per row)

    Returns:
        (id, text_clean, date, chat_title)
    """
    cur = conn.cursor()
    cur.execute(
        """
        SELECT id, text_clean, date, chat_title
        FROM events
        WHERE checked = 1
          AND processed = 1
          AND text_clean IS NOT NULL
          AND TRIM(text_clean) <> ''
          AND (category IS NULL OR TRIM(category) = '')
        ORDER BY id
        LIMIT ?
        """,
        (limit,),
    )
    return cur.fetchall()


def update_catloc(
    conn: sqlite3.Connection,
    row_id: int,
    category: str,
    location_text: str,
) -> None:
    """Update the category and location_text for a single row."""
    cur = conn.cursor()
    cur.execute(
        """
        UPDATE events
        SET category = ?, location_text = ?
        WHERE id = ?
        """,
        (category, location_text, row_id),
    )


# ----------------- PROMPT BUILDER -----------------

def build_catloc_prompt(
    text_clean: str,
    date_str: Optional[str],
    chat_title: Optional[str],
) -> str:
    """
    Build the user prompt that will be sent to the category+location agent.

    We provide some extra context (date, chat_title) but text_clean
    is the primary source of truth.
    """
    context_lines = []
    if date_str:
        context_lines.append(f"Date: {date_str}")
    if chat_title:
        context_lines.append(f"Channel: {chat_title}")
    context_block = "\n".join(context_lines) if context_lines else "Date/Channel: (not specified)"

    user_prompt = f"""
You are given a CLEANED OSINT incident message and some optional context.

CONTEXT:
{context_block}

CLEANED MESSAGE:
{text_clean}

Using your incident classification and location extraction rules,
return a JSON object with two keys: "category" and "location_text".

Remember:
- "category" is a short incident type label.
- "location_text" is the most specific geocodable place, or "" if none.

Output ONLY the JSON object.
""".strip()
    return user_prompt


# ----------------- ADK AGENT SETUP -----------------

def create_catloc_agent() -> LlmAgent:
    """
    Create and return the LlmAgent dedicated to category + location extraction.
    """
    agent = LlmAgent(
        name="Agent4_CategoryLocation",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            api_key=GOOGLE_API_KEY,
            retry_options=retry_config,
        ),
        description="Extracts incident category and geocodable location_text from cleaned OSINT messages.",
        instruction=CATLOC_SYSTEM_INSTRUCTIONS,
        tools=[],  # no external tools needed
    )
    return agent


# ----------------- LLM CALL WRAPPER -----------------

class CatLocLLM:
    """
    Wrapper around an InMemoryRunner for the category+location agent.

    Provides:
        async call(...) -> (category: str, location_text: str)
    """

    def __init__(self, runner: InMemoryRunner):
        self.runner = runner

    async def call(
        self,
        text_clean: str,
        date_str: Optional[str],
        chat_title: Optional[str],
    ) -> Tuple[str, str]:
        prompt = build_catloc_prompt(text_clean, date_str, chat_title)
        events = await self.runner.run_debug(prompt)

        # Find the last non-user event with content
        assistant_event = None
        for e in reversed(events):
            if getattr(e, "author", "") == "user":
                continue
            if getattr(e, "content", None) is None:
                continue
            assistant_event = e
            break

        if assistant_event is None:
            # Fallback: choose a valid default category with no location
            return "attack", ""

        parts = getattr(assistant_event.content, "parts", [])
        texts = []
        for p in parts:
            if hasattr(p, "text") and p.text:
                texts.append(p.text)

        raw_text = "\n".join(texts).strip()

        # Strip Markdown code fences if present (```json ... ```)
        cleaned = raw_text
        if cleaned.startswith("```"):
            lines = cleaned.splitlines()
            # Drop leading ``` or ```json line
            if lines:
                lines = lines[1:]
            # Drop trailing ``` line if present
            if lines and lines[-1].strip().startswith("```"):
                lines = lines[:-1]
            cleaned = "\n".join(lines).strip()

        category = ""
        location_text = ""

        try:
            obj = json.loads(cleaned)
            if isinstance(obj, dict):
                cat_val = obj.get("category", "")
                loc_val = obj.get("location_text", "")
                if isinstance(cat_val, str) and cat_val.strip():
                    category = cat_val.strip()
                if isinstance(loc_val, str):
                    location_text = loc_val.strip()
        except Exception as e:
            print(f"[CatLoc] JSON parse error: {e}")

        # Basic cleanup
        category = category[:100] if category else ""
        location_text = location_text[:255] if location_text else ""

        # Final safety: ensure category is one of the allowed labels
        ALLOWED = {
            "attack", "explosion", "shelling", "drone_strike", "air_strike",
            "air_activity", "raid_search", "arrest_detention", "weapons_seizure",
            "casualty_death", "injury", "protest_riot",
            "movement", "disaster", "kidnapping", "smuggling",
        }
        if category not in ALLOWED:
            category = "attack"

        return category, location_text


# ----------------- BATCH LOGIC (ASYNC) -----------------

async def run_catloc_batch(
    llm: CatLocLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE,
) -> int:
    """
    Process a single batch of canonical incidents that need category/location.

    Returns:
        Number of rows updated in this batch.
    """
    conn = get_connection(db_path)
    try:
        rows = fetch_rows_for_catloc(conn, limit=batch_size)
        if not rows:
            return 0

        for row_id, text_clean, date_str, chat_title in rows:
            if not text_clean or not text_clean.strip():
                # Shouldn't happen, but be defensive
                update_catloc(conn, row_id, "attack", "")
                continue

            try:
                category, location_text = await llm.call(
                    text_clean=text_clean,
                    date_str=date_str,
                    chat_title=chat_title,
                )
            except Exception as e:
                print(f"[CatLoc] LLM error for id={row_id}: {e}")
                category, location_text = "attack", ""

            update_catloc(conn, row_id, category, location_text)

        conn.commit()
        return len(rows)

    finally:
        conn.close()


async def run_catloc_until_done(
    llm: CatLocLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE,
) -> None:
    """
    Keep running batches until there are no more canonical incidents
    needing category/location extraction.
    """
    total = 0
    while True:
        processed = await run_catloc_batch(
            llm=llm,
            db_path=db_path,
            batch_size=batch_size,
        )
        if processed == 0:
            break
        total += processed
        print(f"[CatLoc] Processed batch of {processed} rows (total={total}).")

    print(f"[CatLoc] Done. Total rows updated with category/location_text: {total}")


# ----------------- NOTEBOOK ENTRYPOINT -----------------

async def run_agent4_catloc():

    catloc_agent = create_catloc_agent()
    print("[CatLoc] Agent created.")

    catloc_runner = InMemoryRunner(agent=catloc_agent)
    print("[CatLoc] Runner created.")

    llm = CatLocLLM(runner=catloc_runner)
    await run_catloc_until_done(llm=llm, db_path=DB_PATH, batch_size=BATCH_SIZE)

    print("[CatLoc] Completed.")
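
The reply handling inside `CatLocLLM.call` can be checked offline. The sketch below isolates the code-fence stripping and closed-list validation as a pure function. `parse_catloc_reply` is a hypothetical helper, and unlike the notebook it returns `None` for an invalid category instead of defaulting to `"attack"`, so the caller decides on the fallback.

```python
import json
from typing import Optional, Tuple

FENCE = "`" * 3  # Markdown code-fence marker that the model sometimes wraps JSON in

ALLOWED_CATEGORIES = {
    "attack", "explosion", "shelling", "drone_strike", "air_strike",
    "air_activity", "raid_search", "arrest_detention", "weapons_seizure",
    "casualty_death", "injury", "protest_riot",
    "movement", "disaster", "kidnapping", "smuggling",
}


def parse_catloc_reply(raw_text: str) -> Tuple[Optional[str], str]:
    """Return (category or None, location_text) from a raw model reply."""
    cleaned = raw_text.strip()
    if cleaned.startswith(FENCE):
        lines = cleaned.splitlines()[1:]  # drop the opening fence line
        if lines and lines[-1].strip().startswith(FENCE):
            lines = lines[:-1]            # drop the closing fence line
        cleaned = "\n".join(lines).strip()

    try:
        obj = json.loads(cleaned)
    except json.JSONDecodeError:
        return None, ""
    if not isinstance(obj, dict):
        return None, ""

    category = obj.get("category")
    if isinstance(category, str) and category.strip() in ALLOWED_CATEGORIES:
        category = category.strip()
    else:
        category = None  # not in the closed list

    location = obj.get("location_text")
    location = location.strip()[:255] if isinstance(location, str) else ""
    return category, location


fenced_reply = (
    FENCE + "json\n"
    + '{"category": "disaster", "location_text": "Kandahar, Afghanistan"}\n'
    + FENCE
)
print(parse_catloc_reply(fenced_reply))
print(parse_catloc_reply('{"category": "bomb blast", "location_text": "Peshawar"}'))
print(parse_catloc_reply("not json at all"))
```

Returning `None` keeps parse failures distinguishable from genuine `attack` classifications, which matters if those rows should be retried or reviewed later.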


In [15]:
await run_agent4_catloc()


[CatLoc] Agent created.
[CatLoc] Runner created.

 ### Created new session: debug_session_id

User > You are given a CLEANED OSINT incident message and some optional context.

CONTEXT:
Date: 2025-12-14T21:12:46
Channel: OsintTV 📺

CLEANED MESSAGE:
An unidentified drone has crashed into remote areas of Kandahar,Afghanistan reports TOLO news.

While some Afghans claim it was Pakistani drone brough down by Afghan forces.

Using your incident classification and location extraction rules,
return a JSON object with two keys: "category" and "location_text".

Remember:
- "category" is a short incident type label.
- "location_text" is the most specific geocodable place, or "" if none.

Output ONLY the JSON object.
Agent4_CategoryLocation > ```json
{
  "category": "disaster",
  "location_text": "Kandahar, Afghanistan"
}
```

 ### Continue session: debug_session_id

User > You are given a CLEANED OSINT incident message and some optional context.

CONTEXT:
Date: 2025-12-14T21:12:35
Channel: 

In [16]:
pd.read_sql_query(
    "SELECT category, location_text, COUNT(*) AS n "
    "FROM events WHERE checked = 1 AND processed = 1 "
    "GROUP BY category, location_text ORDER BY n DESC;",
    conn
)

| | category | location_text | n |
|---|---|---|---|
| 0 | attack | Bondi Beach, Sydney | 2 |
| 1 | drone_strike | Sudan | 2 |
| 2 | arrest_detention | Bannu, Khyber Pakhtunkhwa, Pakistan | 1 |
| 3 | arrest_detention | Manipur | 1 |
| 4 | attack | Bondi Beach, Sydney, Australia | 1 |
| 5 | attack | Khurram, Khyber Pakhtunkhwa, Pakistan | 1 |
| 6 | attack | Kurram, Pakistan | 1 |
| 7 | attack | Sydney | 1 |
| 8 | disaster | Bondi Beach | 1 |
| 9 | disaster | Kandahar, Afghanistan | 1 |


---

# 📌 Agent 5: LLM-Guided Geocoder  
### *Converting ambiguous or partial locations into precise coordinates*

Traditional geocoding APIs struggle with:

- incomplete place names  
- ambiguous regional references  
- informal OSINT phrasing  

Agent 5 solves this using a hybrid approach:

1. **LLM-based reasoning** → interprets the location context and generates a refined, geocodable phrase  
2. **Deterministic Google Maps lookup** → returns exact lat/lon  
3. **Fallback handling** → marks ungeocodable events safely

This approach aims for **more accurate geocoding** than a direct lookup of the raw location string, especially for conflict-zone OSINT.
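
The three steps above can be sketched without any API keys. This is a minimal illustration, not the notebook's implementation: `geocode_first_hit`, `fake_lookup`, and `FAKE_GAZETTEER` are hypothetical names, and the coordinates are rounded placeholders standing in for a real geocoding response.

```python
from typing import Callable, Dict, List, Optional, Tuple

Coords = Tuple[float, float]


def geocode_first_hit(
    variants: List[str],
    lookup: Callable[[str], Optional[Coords]],
    cache: Dict[str, Optional[Coords]],
) -> Optional[Coords]:
    """Return coordinates for the first variant that resolves, else None."""
    for variant in variants:
        query = variant.strip()
        if not query:
            continue
        if query not in cache:
            # Cache hits and misses alike, so a repeated string costs one lookup.
            cache[query] = lookup(query)
        if cache[query] is not None:
            return cache[query]
    return None  # the caller would mark the row as ungeocodable


# Hypothetical stand-in for a real geocoding API (illustrative coordinates).
FAKE_GAZETTEER: Dict[str, Coords] = {"Kandahar, Afghanistan": (31.62, 65.72)}
calls: List[str] = []


def fake_lookup(query: str) -> Optional[Coords]:
    calls.append(query)
    return FAKE_GAZETTEER.get(query)


cache: Dict[str, Optional[Coords]] = {}
hit = geocode_first_hit(
    ["Kandahar region", "Kandahar, Afghanistan"], fake_lookup, cache
)
miss = geocode_first_hit(["Kandahar region"], fake_lookup, cache)
print(hit, miss, calls)
```

Passing the lookup function in as a parameter keeps the fallback logic testable offline; in the real pipeline that role is played by the Google Maps client.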

Below is the implementation of Agent 5.


In [None]:
"""
Agent 5: Geocoder (Google Maps + LlmAgent variants)

Role in pipeline:
- Input: rows in `events` table where:
    checked = 1         (incident-like)
    processed = 1       (canonical after dedup, NOT duplicate)
    location_text != '' (has some location from Agent 4)
    lat/lon are NULL    (not geocoded yet)

- For each such row:
    1) Use a small LlmAgent to generate 1-3 refined query variants
       for the location_text (e.g. add country, normalize, pick a
       more precise sub-region for broad phrases).
    2) Try Google Maps geocoding on those variants (with caching).
    3) On success → fill lat, lon.
       On total failure → set processed = -2 (ungeocodable).
"""

import sqlite3
from typing import Optional, Tuple, List, Dict

import time
import os
from dotenv import load_dotenv

import googlemaps

import json
import asyncio

from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.runners import InMemoryRunner
from google.genai import types

# ----------------- CONFIG -----------------

DB_PATH = "incidents.db"   
BATCH_SIZE = 32            # number of rows to geocode per batch
GEOCODE_SLEEP = 0.1        # small delay between API calls

load_dotenv()
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

if not GOOGLE_MAPS_API_KEY:
    raise RuntimeError("GOOGLE_MAPS_API_KEY is not set in environment / .env file.")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY is not set in environment / .env file.")

# Google Maps client (deterministic)
gmaps = googlemaps.Client(key=GOOGLE_MAPS_API_KEY)

# Cache: query_string -> (lat, lon, formatted_address or None, status_str)
# status_str is "ok" or "fail"
GEOCODE_CACHE: Dict[str, Tuple[Optional[float], Optional[float], Optional[str], str]] = {}

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

# ----------------- LLM VARIANT GENERATOR (AGENT) -----------------

VARIANT_SYSTEM_INSTRUCTIONS = """
You are a location query refinement assistant for a geocoding service.

Your job:
- Take a single input location_text from an OSINT pipeline.
- Return a small list (1 to 3) of query strings that are suitable for geocoding.

Key distinction: BROAD REGION vs SPECIFIC PLACE
------------------------------------------------
1) SPECIFIC PLACE:
   - Examples (conceptually): named town, city, village, neighbourhood, checkpoint,
     base, bridge, road segment, or a clearly identified district name.
   - When the input is specific, you MUST preserve that specificity.
   - You may:
       - Normalize punctuation and spacing.
       - Expand abbreviations (& -> "and").
       - Add the country or state that is clearly implied by context.
   - You MUST NOT:
       - Drop or replace specific locality names with a broader area.
       - Replace a village/town/area with only its district, province, or country.
       - Generalize "X town in Y district" to just "Y district" or "Y country".

2) BROAD REGION:
   - Examples in structure (NOT specific names): phrases like "southwestern part of <country>",
     "northern region of <country>", or generic macro-regions without a single clear locality.
   - When the input is a broad region phrase:
       - You may choose a SINGLE best-guess sub-region (such as a state or province)
         that lies inside that broad region, based on your world knowledge and textual hints.
       - You may also include the original broad phrase as a fallback variant.
       - Do NOT invent an arbitrary city that is not clearly suggested by the phrase.
       - Your refined sub-region should still be realistic and geographically consistent
         with the described broad area.

General rules:
- All variants must refer to the SAME underlying general area described by the input.
- You may go from BROAD → more specific (e.g., macro-region to one constituent state/province).
- You must NOT go from SPECIFIC ‚Üí broader.
- Never switch to a completely different region or country.
- Avoid purely generalized queries that lose important locality detail.

Output format (STRICT):
Return a SINGLE JSON object:
{
  "variants": ["<string1>", "<string2>", ...]
}

- The "variants" array must have between 1 and 3 strings.
- Do NOT include explanations or any other fields.
""".strip()


def create_variant_agent() -> LlmAgent:
    """
    Create the LlmAgent used only to refine location_text into
    a small list of geocoding query variants.
    """
    agent = LlmAgent(
        name="Agent5_LocationVariantGenerator",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            api_key=GOOGLE_API_KEY,
            retry_options=retry_config,
        ),
        description="Generates refined geocoding query variants for a location_text.",
        instruction=VARIANT_SYSTEM_INSTRUCTIONS,
        tools=[],  # no tools needed
    )
    return agent


class VariantLLM:
    """
    Wrapper around InMemoryRunner for the variant generator agent.

    async get_variants(location_text: str) -> List[str]
    """

    def __init__(self, runner: InMemoryRunner):
        self.runner = runner

    async def get_variants(self, location_text: str) -> List[str]:
        """
        Ask the LLM for up to 3 refined variants.

        Fallback: if anything goes wrong, return [location_text.strip()].
        """
        loc = (location_text or "").strip()
        if not loc:
            return []

        user_prompt = f"""
Given the following location_text from an OSINT message:

"{loc}"

Return a JSON object of the form:
{{
  "variants": ["<string1>", "<string2>", ...]
}}

Follow your system instructions carefully:
- Distinguish between BROAD REGION and SPECIFIC PLACE.
- Preserve specificity when the place is already concrete.
- Only choose a more specific sub-region when the phrase is inherently broad.
Output ONLY the JSON.
""".strip()

        events = await self.runner.run_debug(user_prompt)

        # Find last assistant message with text
        assistant_event = None
        for e in reversed(events):
            if getattr(e, "author", "") == "user":
                continue
            if getattr(e, "content", None) is None:
                continue
            assistant_event = e
            break

        if assistant_event is None:
            return [loc]

        parts = getattr(assistant_event.content, "parts", [])
        texts: List[str] = []
        for p in parts:
            if hasattr(p, "text") and p.text:
                texts.append(p.text)

        raw_text = "\n".join(texts).strip()

        # Handle ```json ... ``` wrappers
        cleaned = raw_text
        if cleaned.startswith("```"):
            lines = cleaned.splitlines()
            # drop first line (``` or ```json)
            if lines:
                lines = lines[1:]
            # drop last ``` if present
            if lines and lines[-1].strip().startswith("```"):
                lines = lines[:-1]
            cleaned = "\n".join(lines).strip()

        variants: List[str] = [loc]
        try:
            obj = json.loads(cleaned)
            if isinstance(obj, dict) and "variants" in obj:
                arr = obj["variants"]
                if isinstance(arr, list):
                    extracted = []
                    for v in arr:
                        if isinstance(v, str) and v.strip():
                            extracted.append(v.strip())
                    if extracted:
                        variants = extracted
        except Exception as e:
            print(f"[VariantLLM] JSON parse error for '{loc}': {e}")

        # Remove duplicates, keep at most 3
        seen = set()
        uniq: List[str] = []
        for v in variants:
            if v not in seen:
                seen.add(v)
                uniq.append(v)
        if not uniq:
            uniq = [loc]

        return uniq[:3]


# ----------------- DB HELPERS -----------------

def get_connection(db_path: str = DB_PATH) -> sqlite3.Connection:
    """Return a sqlite3 connection to the incidents.db file."""
    return sqlite3.connect(db_path)


def fetch_rows_for_geocoding(
    conn: sqlite3.Connection,
    limit: int = BATCH_SIZE
) -> List[Tuple[int, str]]:
    """
    Fetch a batch of canonical incidents that need geocoding.

    Conditions:
    - checked = 1
    - processed = 1
    - location_text not NULL/empty
    - lat IS NULL OR lon IS NULL

    Returns:
        list of (id, location_text)
    """
    cur = conn.cursor()
    cur.execute(
        """
        SELECT id, location_text
        FROM events
        WHERE checked = 1
          AND processed = 1
          AND location_text IS NOT NULL
          AND TRIM(location_text) <> ''
          AND (lat IS NULL OR lon IS NULL)
        ORDER BY id
        LIMIT ?
        """,
        (limit,)
    )
    return cur.fetchall()


def update_geocode_success(
    conn: sqlite3.Connection,
    row_id: int,
    lat: float,
    lon: float
) -> None:
    """Update lat/lon for a successfully geocoded row."""
    cur = conn.cursor()
    cur.execute(
        """
        UPDATE events
        SET lat = ?, lon = ?
        WHERE id = ?
        """,
        (lat, lon, row_id)
    )


def update_geocode_failure(
    conn: sqlite3.Connection,
    row_id: int
) -> None:
    """
    Mark a row as ungeocodable.

    We do NOT change `checked`, but we set processed = -2 so that
    mapping and analysis can filter these out.
    """
    cur = conn.cursor()
    cur.execute(
        """
        UPDATE events
        SET processed = -2
        WHERE id = ?
        """,
        (row_id,)
    )


# ----------------- GEOCODING HELPERS -----------------

def geocode_once(query: str) -> Tuple[Optional[float], Optional[float], Optional[str]]:
    """
    Call Google Maps geocoding once for a given query string.

    Returns:
        (lat, lon, formatted_address) or (None, None, None) if no result.
    """
    try:
        results = gmaps.geocode(query)
    except Exception as e:
        print(f"[Geocode] Error for '{query}': {e}")
        return None, None, None

    if not results:
        return None, None, None

    loc = results[0].get("geometry", {}).get("location", {})
    lat = loc.get("lat")
    lon = loc.get("lng")
    formatted_address = results[0].get("formatted_address")

    if lat is None or lon is None:
        return None, None, None

    return float(lat), float(lon), formatted_address


def geocode_with_variants(variants: List[str]) -> Tuple[Optional[float], Optional[float], Optional[str], str]:
    """
    Try geocoding each variant in order, using cache.

    Returns:
        (lat, lon, formatted_address, status)  where status is "ok" or "fail".
    """
    lat = lon = None
    formatted = None

    for variant in variants:
        v = variant.strip()
        if not v:
            continue

        # Check cache first
        if v in GEOCODE_CACHE:
            lat, lon, formatted, status = GEOCODE_CACHE[v]
            if status == "ok" and lat is not None and lon is not None:
                print(f"[Geocode] (cached) '{v}' -> ({lat:.4f}, {lon:.4f})")
                return lat, lon, formatted, status
            else:
                # Cached failure → don't retry
                continue

        # Small delay
        time.sleep(GEOCODE_SLEEP)

        lat, lon, formatted = geocode_once(v)
        if lat is not None and lon is not None:
            print(f"[Geocode] '{v}' -> ({lat:.4f}, {lon:.4f})")
            status = "ok"
            GEOCODE_CACHE[v] = (lat, lon, formatted, status)
            return lat, lon, formatted, status
        else:
            print(f"[Geocode] No result for '{v}'")
            GEOCODE_CACHE[v] = (None, None, None, "fail")

    print(f"[Geocode] FAILED for variants: {variants}")
    return None, None, None, "fail"


# ----------------- BATCH GEOCODING (ASYNC) -----------------

async def geocode_batch_async(
    variant_llm: VariantLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE
) -> int:
    """
    Process a single batch of events that need geocoding.

    Returns:
        Number of rows processed in this batch.
    """
    conn = get_connection(db_path)
    try:
        rows = fetch_rows_for_geocoding(conn, limit=batch_size)
        if not rows:
            return 0

        for row_id, location_text in rows:
            loc = (location_text or "").strip()
            if not loc:
                update_geocode_failure(conn, row_id)
                continue

            # 1) Get LLM-generated variants (broad→specific or specific→normalized)
            try:
                variants = await variant_llm.get_variants(loc)
            except Exception as e:
                print(f"[Agent5] Variant LLM error for id={row_id}, loc='{loc}': {e}")
                variants = [loc]

            if not variants:
                variants = [loc]

            # 2) Geocode using those variants
            lat, lon, formatted, status = geocode_with_variants(variants)

            if status == "ok" and lat is not None and lon is not None:
                update_geocode_success(conn, row_id, lat, lon)
            else:
                update_geocode_failure(conn, row_id)

        conn.commit()
        return len(rows)

    finally:
        conn.close()


async def geocode_until_done_async(
    variant_llm: VariantLLM,
    db_path: str = DB_PATH,
    batch_size: int = BATCH_SIZE
) -> None:
    """
    Keep running geocoding batches until there are no more rows
    needing geocoding.
    """
    total = 0
    while True:
        processed = await geocode_batch_async(
            variant_llm=variant_llm,
            db_path=db_path,
            batch_size=batch_size,
        )
        if processed == 0:
            break
        total += processed
        print(f"[Agent5] Processed batch of {processed} rows (total={total}).")

    print(f"[Agent5] Done. Total rows geocoded or marked as ungeocodable: {total}")


# ----------------- NOTEBOOK ENTRYPOINT -----------------

async def run_agent5_geocoder():

    variant_agent = create_variant_agent()
    print("[Agent5] Variant LlmAgent created.")

    runner = InMemoryRunner(agent=variant_agent)
    print("[Agent5] Runner created.")

    variant_llm = VariantLLM(runner=runner)
    await geocode_until_done_async(
        variant_llm=variant_llm,
        db_path=DB_PATH,
        batch_size=BATCH_SIZE
    )

    print("[Agent5] Geocoding pipeline completed.")


In [18]:
await run_agent5_geocoder()

[Agent5] Variant LlmAgent created.
[Agent5] Runner created.

 ### Created new session: debug_session_id

User > Given the following location_text from an OSINT message:

"Kandahar, Afghanistan"

Return a JSON object of the form:
{
  "variants": ["<string1>", "<string2>", ...]
}

Follow your system instructions carefully:
- Distinguish between BROAD REGION and SPECIFIC PLACE.
- Preserve specificity when the place is already concrete.
- Only choose a more specific sub-region when the phrase is inherently broad.
Output ONLY the JSON.
Agent5_LocationVariantGenerator > ```json
{
  "variants": [
    "Kandahar, Afghanistan"
  ]
}
```
[Geocode] 'Kandahar, Afghanistan' -> (31.6205, 65.7158)

 ### Continue session: debug_session_id

User > Given the following location_text from an OSINT message:

"Panjgur, Balochistan, Pakistan"

Return a JSON object of the form:
{
  "variants": ["<string1>", "<string2>", ...]
}

Follow your system instructions carefully:
- Distinguish between BROAD REGION and S

In [19]:
df = pd.read_sql_query(
    "SELECT category, location_text, lat, lon FROM events WHERE checked=1 AND processed=1;",
    conn
)
df.head()

| | category | location_text | lat | lon |
|---|---|---|---|---|
| 0 | disaster | Kandahar, Afghanistan | 31.620508 | 65.715819 |
| 1 | kidnapping | Panjgur, Balochistan, Pakistan | 26.970599 | 64.08868 |
| 2 | attack | Khurram, Khyber Pakhtunkhwa, Pakistan | 34.151517 | 71.572395 |
| 3 | attack | Kurram, Pakistan | 33.695975 | 70.336069 |
| 4 | arrest_detention | Bannu, Khyber Pakhtunkhwa, Pakistan | 32.990994 | 70.645473 |


---

# 🗺️ Agent 6: Reporter & Map Orchestrator  
### *Generating the final interactive map + LLM intelligence summary*

Agent 6 is the "presentation" layer of the pipeline.

It has two responsibilities:

### 1. Mapping  
Using a function tool, it renders a **global interactive incident map** incorporating:  
- category-based colors  
- shape-based families  
- toggleable category layers  
- dark-matter basemap  

### 2. Narrative Intelligence Summary  
Using the full incident context, Agent 6 produces a **human-readable markdown report** explaining:  
- per-day activity  
- category distribution  
- spatial clusters  
- notable out-of-region events  

This is the final deliverable showing the pipeline's analytical value.
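
The per-day and per-category figures that feed the summary can be sketched with the standard library alone. This is only an illustration of the aggregation idea: `per_day_category_counts`, `render_context`, and the sample rows are hypothetical, and the notebook's own implementation uses pandas.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Tuple

# Hypothetical rows: (ISO timestamp, category), shaped like the events table.
rows: List[Tuple[str, str]] = [
    ("2025-12-14T21:12:46", "disaster"),
    ("2025-12-14T20:05:10", "attack"),
    ("2025-12-14T18:30:00", "attack"),
    ("2025-12-13T09:00:00", "arrest_detention"),
]


def per_day_category_counts(rows: List[Tuple[str, str]]) -> Dict[str, Counter]:
    """Group incidents by calendar day and count each category."""
    counts: Dict[str, Counter] = defaultdict(Counter)
    for timestamp, category in rows:
        day = timestamp[:10]  # keep only YYYY-MM-DD
        counts[day][category] += 1
    return counts


def render_context(counts: Dict[str, Counter]) -> str:
    """Render the counts as compact text suitable for an LLM prompt."""
    lines = ["PER-DAY CATEGORY BREAKDOWN:"]
    for day in sorted(counts):
        lines.append(f"{day}: {sum(counts[day].values())} incidents")
        for category, n in counts[day].most_common():
            lines.append(f"  - {category}: {n}")
    return "\n".join(lines)


counts = per_day_category_counts(rows)
context_text = render_context(counts)
print(context_text)
```

Handing the model pre-computed counts, rather than raw rows, is one way to keep its quantitative statements grounded in the data.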

Below is the implementation of Agent 6.


In [None]:

# === Agent 6: Reporter & Interactive Map Orchestrator ===
#
# Role:
# - Input: incidents.db / events table with:
#       checked = 1
#       processed = 1
#       category, location_text, lat, lon
# - Tool:
#       make_interactive_map() -> dict
#       → plots ALL incidents with lat/lon on a global interactive map
#         (Folium, DarkMatter tiles)
# - LlmAgent:
#       1) Calls make_interactive_map ONCE
#       2) Reads a textual context describing per-day / per-category stats
#       3) Produces a GRAND MARKDOWN SUMMARY for the respective days

import os
import sqlite3
import asyncio
from collections import defaultdict
from typing import List, Dict, Tuple

from dotenv import load_dotenv

import pandas as pd
import matplotlib.pyplot as plt

from shapely.geometry import Point
import folium
from folium.features import RegularPolygonMarker

from google.genai import types
from google.adk.agents import LlmAgent
from google.adk.models import Gemini
from google.adk.runners import InMemoryRunner
from google.adk.tools.function_tool import FunctionTool

# ----------------- CONFIG -----------------

DB_PATH = "incidents.db" 
LAST_GENERATED_MAP = None

load_dotenv()
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY is not set in environment / .env file.")

retry_config = types.HttpRetryOptions(
    attempts=5,
    exp_base=7,
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],
)

'''
# Optional: consistent category colors (used as palette in .explore)
CATEGORY_COLORS: Dict[str, str] = {
    "attack":           "#ff6b6b",
    "explosion":        "#ffd93b",
    "shelling":         "#ff9f1c",
    "drone_strike":     "#f15bb5",
    "air_strike":       "#d7263d",
    "air_activity":     "#4cc9f0",
    "raid_search":      "#7209b7",
    "arrest_detention": "#4361ee",
    "weapons_seizure":  "#4cd5b5",
    "casualty_death":   "#f72585",
    "injury":           "#ff8fab",
    "protest_riot":     "#80ed99",
    "movement":         "#8d99ae",
    "disaster":         "#b56576",
    "kidnapping":       "#ff7f51",
    "smuggling":        "#b08968",
}
'''

# ----------------- DB → CONTEXT HELPERS -----------------

def fetch_incidents_for_context() -> pd.DataFrame:
    """
    Fetch all canonical, mappable incidents from incidents.db/events.

    Conditions:
        checked = 1
        processed = 1
        lat/lon not null
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        df = pd.read_sql_query(
            """
            SELECT date, category, location_text, lat, lon
            FROM events
            WHERE checked = 1
              AND processed = 1
              AND lat IS NOT NULL
              AND lon IS NOT NULL
            """,
            conn,
        )
    finally:
        conn.close()
    return df


def build_context_for_agent6() -> str:
    """
    Build a compact textual context for Agent 6.

    Includes:
        - Overall count of incidents across all days covered
        - Per-day counts
        - Per-day category breakdown
        - A small sample list of incidents (date | category | location_text)
    """
    df = fetch_incidents_for_context()
    if df.empty:
        return "No incidents available."

    # Extract YYYY-MM-DD for convenience
    df = df.copy()
    df["day"] = df["date"].astype(str).str.slice(0, 10)

    # Overall stats
    total_incidents = len(df)
    days_present = sorted(df["day"].unique())

    # Per-day counts
    day_counts = df.groupby("day").size().to_dict()

    # Per-day category counts
    day_cat_counts: Dict[str, Dict[str, int]] = {}
    grouped = df.groupby(["day", "category"]).size()
    for (day, cat), cnt in grouped.items():
        day_cat_counts.setdefault(day, {})[cat] = int(cnt)

    # Sample incidents (cap to avoid giant prompt)
    samples = []
    for _, row in df.head(30).iterrows():
        samples.append(
            f"- {row['day']} | {row['category']} | {row['location_text']}"
        )

    lines: List[str] = []
    lines.append("INCIDENT CONTEXT FOR AGENT 6 (GLOBAL VIEW)")
    lines.append("")
    lines.append(f"Total incidents with coordinates: {total_incidents}")
    lines.append(f"Days covered: {', '.join(days_present)}")
    lines.append("")

    lines.append("PER-DAY INCIDENT COUNTS:")
    for day in sorted(day_counts.keys()):
        lines.append(f"- {day}: {day_counts[day]} incidents")

    lines.append("")
    lines.append("PER-DAY CATEGORY BREAKDOWN:")
    for day in sorted(day_cat_counts.keys()):
        lines.append(f"{day}:")
        cat_map = day_cat_counts[day]
        # Sort categories by frequency descending
        for cat, cnt in sorted(cat_map.items(), key=lambda x: -x[1]):
            lines.append(f"  - {cat}: {cnt}")

    lines.append("")
    lines.append("SAMPLE INCIDENTS (for qualitative flavor):")
    if samples:
        lines.extend(samples)
    else:
        lines.append("- (no samples)")

    return "\n".join(lines)


# ----------------- MAP TOOL (INTERACTIVE, GLOBAL) -----------------

from typing import Dict

import folium
from folium import FeatureGroup, LayerControl
from folium.features import RegularPolygonMarker
from IPython.display import display


# High-level families of incident types
KINETIC = {
    "attack", "explosion", "shelling",
    "drone_strike", "air_strike", "kidnapping",
}

SECURITY = {
    "raid_search", "arrest_detention", "weapons_seizure",
    "movement", "smuggling", "air_activity",
}

CIVILIAN_IMPACT = {
    "disaster", "protest_riot", "casualty_death", "injury",
}

FAMILY_COLORS = {
    "KINETIC": "#ff6b6b",        # red
    "SECURITY": "#4cc9f0",       # blue
    "CIVILIAN_IMPACT": "#ffd93b" # yellow
}

def classify_family(category: str) -> str:
    """Map a fine-grained category into one of the three visual families."""
    if category in KINETIC:
        return "KINETIC"
    if category in SECURITY:
        return "SECURITY"
    if category in CIVILIAN_IMPACT:
        return "CIVILIAN_IMPACT"
    return "CIVILIAN_IMPACT"  # safe fallback


def add_shape_family_legend(m: folium.Map) -> None:
    """Add a compact HTML legend explaining color + shape."""
    html = """
    <div style="
        position: fixed;
        bottom: 40px;
        right: 40px;
        z-index: 9999;
        background-color: rgba(0,0,0,0.7);
        padding: 8px 12px;
        border-radius: 6px;
        color: #ffffff;
        font-size: 12px;
        line-height: 1.4;
    ">
      <b>Map key</b><br>
      <div style="margin-top:4px;">
        <span style="color:#ff6b6b;">●</span> Kinetic violence<br>
        <span style="color:#4cc9f0;">■</span> Security / law enforcement<br>
        <span style="color:#ffd93b;">▲</span> Civilian impact / disasters<br>
      </div>
    </div>
    """
    m.get_root().html.add_child(folium.Element(html))



def make_interactive_map():
    """
    Create an INTERACTIVE global map of incidents using folium.

    Scope:
      - plots all incidents with coordinates
        (per-day filtering is currently disabled)
    LayerControl:
      - one toggle per CATEGORY
    Legend:
      - only 3 families (kinetic / security / civilian) with shape + color.
    """

    global LAST_GENERATED_MAP
    df = fetch_incidents_for_context()
    if df.empty:
        print("[MapTool] No incidents with coordinates to plot.")
        return {"status": "no_data"}
    
    '''
    if mode != "all":
        df = df[df["date"].astype(str).str.startswith(mode)]
        if df.empty:
            print(f"[MapTool] No incidents for day={mode}.")
            return {"status": "no_data"}
    '''

    df = df.dropna(subset=["lat", "lon"])
    if df.empty:
        print("[MapTool] No incidents with valid coordinates after filtering.")
        return {"status": "no_data"}

    center_lat = df["lat"].mean()
    center_lon = df["lon"].mean()

    m = folium.Map(
        location=[center_lat, center_lon],
        tiles="CartoDB dark_matter",
        zoom_start=4,
        control_scale=True,
    )

    # --- one FeatureGroup per CATEGORY ---
    categories = sorted(df["category"].dropna().unique())
    fg_by_category: Dict[str, FeatureGroup] = {}
    for cat in categories:
        fg = FeatureGroup(name=cat, show=True)
        fg.add_to(m)
        fg_by_category[cat] = fg

    # --- drop rows with missing category (optional) ---
    df = df[df["category"].notna()].copy()

    for _, row in df.iterrows():
        cat = row["category"]
        loc_text = row.get("location_text") or "Unknown location"
        family = classify_family(cat)
        color = FAMILY_COLORS.get(family, "#ffd93b")
        popup_txt = f"{cat} - {loc_text}"

        fg = fg_by_category.get(cat)
        if fg is None:
            continue  # should not happen

        if family == "KINETIC":
            # circle = kinetic violence
            folium.CircleMarker(
                location=[row["lat"], row["lon"]],
                radius=6,
                color=None,
                fill=True,
                fill_opacity=0.9,
                fill_color=color,
                popup=popup_txt,
            ).add_to(fg)

        elif family == "SECURITY":
            # diamond (rotated square) = security / law enforcement
            RegularPolygonMarker(
                location=[row["lat"], row["lon"]],
                number_of_sides=4,
                radius=7,
                rotation=0,
                weight=1.5,
                color=color,
                fill=True,
                fill_opacity=0.9,
                popup=popup_txt,
            ).add_to(fg)

        else:
            # triangle = civilian impact / disasters
            RegularPolygonMarker(
                location=[row["lat"], row["lon"]],
                number_of_sides=3,
                radius=7,
                rotation=30,
                weight=1.5,
                color=color,
                fill=True,
                fill_opacity=0.9,
                popup=popup_txt,
            ).add_to(fg)

    # LayerControl now shows *all categories*
    LayerControl(collapsed=False).add_to(m)

    # Shape + color legend for the 3 families
    add_shape_family_legend(m)
    m.save("final_map.html")
    LAST_GENERATED_MAP = m
    display(m)
    print("[MapTool] Interactive map rendered")
    return {"status": "ok"}





# ----------------- AGENT 6: REPORTER + MAP ORCHESTRATOR -----------------

AGENT6_SYSTEM_INSTRUCTIONS = """
You are Agent6_Reporter in a multi-agent OSINT pipeline.

You have two things:
1. A map-generation tool: make_interactive_map().
   - When you call it, it renders an interactive global map
     of all incidents with coordinates in the notebook output.
2. A textual context describing counts and samples of incidents.

Your tasks (STRICT):

1. Call make_interactive_map once,
   so that the notebook shows an interactive global incident map
   for the days involved.

2. After the map tool has been called, write a GRAND MARKDOWN SUMMARY
   of the incident data based on the context you are given. The summary
   should be intended for a human analyst reading the notebook.

   The report MUST include:
   - A section "## Overview" that describes:
       * scale of activity across the days covered
       * which categories seem most common
       * any high-level spatial intuition (e.g., cluster in certain regions)
         based on the context (do not hallucinate exact coordinates).
   - One section per day listed in the context, titled with that date
     (for example "## 27 November 2025"), summarizing:
       * key categories and notable patterns for that day
   - A section "## Analyst Notes & Caveats" that briefly mentions:
       * potential biases in the source channels
       * limitations of geocoding and categorization
       * that this is a day-based slice, not a full operational picture.

Important:
- ALWAYS call the map tool exactly once before finalizing the report.
- Do NOT attempt to describe the visual style of the map in great detail.
  It is enough to refer to it as an interactive incident map.
- Base all quantitative statements ONLY on the context text you receive.
- Stay under ~500 words total.
""".strip()


def create_agent6_reporter() -> LlmAgent:
    """
    Create Agent 6 (LlmAgent) with the interactive map function as a tool.
    This is the ONLY agent in the pipeline that uses a FunctionTool.
    """
    agent = LlmAgent(
        name="Agent6_Reporter",
        model=Gemini(
            model="gemini-2.5-flash-lite",
            api_key=GOOGLE_API_KEY,
            retry_options=retry_config,
        ),
        description="Final reporter agent that renders an interactive global map and writes a grand summary.",
        instruction=AGENT6_SYSTEM_INSTRUCTIONS,
        tools=[FunctionTool(func=make_interactive_map)],  # map tool
    )
    return agent


# ----------------- RUNNER FOR AGENT 6 (NOTEBOOK ENTRYPOINT) -----------------


async def run_agent6_once():
    """
    Build context, run Agent 6 with the interactive-map tool,
    and print the final markdown report.
    """
    context_text = build_context_for_agent6()  

    user_prompt = f"""
You are provided with the following incident context text:

{context_text}

Remember your tasks:
1. You MUST call the `make_interactive_map` tool exactly once.
2. AFTER the tool has been called and you have seen its result,
   you MUST send a final assistant message that is a concise markdown report.

Output rules:
- Call the tool first.
- Then send ONE final assistant message containing ONLY the markdown report
  (no code, no tool calls, no extra commentary).
""".strip()

    agent = create_agent6_reporter()      # Agent6_Reporter constructor
    runner = InMemoryRunner(agent=agent)
    print("[Agent6] Reporter agent created. Running with debug trace...\n")

    events = await runner.run_debug(user_prompt)

    # --- collect the agent's text parts, skipping user and pure tool-call events ---
    all_assistant_text = []

    for e in events:
        # The debug trace labels replies with the agent's name
        # ("Agent6_Reporter > ..."), so accept that as well as "assistant".
        if getattr(e, "author", "") not in ("assistant", agent.name):
            continue

        content = getattr(e, "content", None)
        if not content:
            continue

        # `parts` can be None on some events, so fall back to an empty list.
        for p in getattr(content, "parts", None) or []:
            text = getattr(p, "text", None)
            if text:
                all_assistant_text.append(text.strip())

    summary_md = "\n\n".join(t for t in all_assistant_text if t) or None

    if summary_md is None:
        print("[Agent6] DEBUG: No assistant markdown found. Events dump:")
        for e in events:
            print(e)

    # ---- CAPTURE MAP FROM TOOL ----
    # Runs on every call, not only when the report text is missing.
    if LAST_GENERATED_MAP is None:
        raise RuntimeError("Agent 6 did not generate an interactive map")

    print("\n===== Agent 6 Final Report =====\n")
    print(summary_md or "[no report text returned]")
    return {
        "map_html_path": "final_map.html",
        "markdown": summary_md,
    }
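
The Agent 6 instructions ask for specific sections and a soft length limit, but nothing in the runner checks that the model complied. Below is a minimal sketch of such a check. `check_report`, `REQUIRED_HEADINGS`, and `MAX_WORDS` are hypothetical names introduced here for illustration and are not part of the notebook. The check accepts any heading level, since the recorded runs show the model emitting `###` where `##` was requested.

```python
import re

# Headings the Agent 6 instructions ask for (hypothetical constant, not in the notebook).
REQUIRED_HEADINGS = ("Overview", "Analyst Notes & Caveats")
# The instructions say "under ~500 words", so treat this as a soft limit.
MAX_WORDS = 500


def check_report(markdown: str, day_headings: tuple[str, ...] = ()) -> list[str]:
    """Return a list of problems; an empty list means the report looks fine."""
    problems = []
    # Collect heading titles at any level (# through ######), lowercased.
    headings = [
        m.group(1).strip().lower()
        for m in re.finditer(r"^#{1,6}\s+(.+?)\s*$", markdown, flags=re.MULTILINE)
    ]
    for wanted in REQUIRED_HEADINGS + tuple(day_headings):
        if wanted.lower() not in headings:
            problems.append(f"missing section: {wanted}")
    word_count = len(markdown.split())
    if word_count > MAX_WORDS:
        problems.append(f"too long: {word_count} words (limit {MAX_WORDS})")
    return problems


sample = """## Grand Summary
### Overview
Sixteen incidents were recorded.
### 14 December 2025
Attacks dominated.
"""
print(check_report(sample, day_headings=("14 December 2025",)))
# ['missing section: Analyst Notes & Caveats']
```

Assuming the runner returns the report text, a caller could log these problems or re-run the agent when the list is non-empty. Whether a retry is worth the extra model call is a judgment call.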





In [None]:
#Result for 2025-11-27, 2025-11-28
#await run_agent6_once()


[Agent6] Reporter agent created. Running with debug trace...


 ### Created new session: debug_session_id

User > You are provided with the following incident context text:

INCIDENT CONTEXT FOR AGENT 6 (GLOBAL VIEW)

Total incidents with coordinates: 45
Days covered: 2025-11-27, 2025-11-28

PER-DAY INCIDENT COUNTS:
- 2025-11-27: 25 incidents
- 2025-11-28: 20 incidents

PER-DAY CATEGORY BREAKDOWN:
2025-11-27:
  - attack: 7
  - disaster: 5
  - arrest_detention: 3
  - drone_strike: 3
  - casualty_death: 1
  - explosion: 1
  - kidnapping: 1
  - movement: 1
  - raid_search: 1
  - shelling: 1
  - smuggling: 1
2025-11-28:
  - attack: 12
  - disaster: 2
  - casualty_death: 1
  - drone_strike: 1
  - explosion: 1
  - movement: 1
  - raid_search: 1
  - weapons_seizure: 1

SAMPLE INCIDENTS (for qualitative flavor):
- 2025-11-28 | attack | Bannu
- 2025-11-28 | attack | Peshawar, Khyber Pakhtunkhwa, Pakistan
- 2025-11-28 | movement | Khyber Pakhtunkhwa
- 2025-11-28 | attack | Bannu, Khyber Pakhtunk



[MapTool] Interactive map rendered for mode='all'.
Agent6_Reporter > ## Grand Summary of Incidents (27-28 November 2025)

### Overview

The provided data covers a two-day period with a total of 45 incidents that have been geocoded. The activity appears to be most concentrated on **27 November 2025**, with 25 incidents, followed by **28 November 2025** with 20 incidents. The most frequently reported incident categories across both days are **attacks**, with a notable presence of **disasters** and **drone strikes**. Preliminary spatial observation suggests activity clusters in South Asia and potentially the Middle East, though exact locations are not specified here.

### 27 November 2025

This day saw 25 incidents. The dominant category was **attack** (7 incidents), followed by **disaster** (5 incidents). Other reported incidents included arrests, drone strikes, and casualties.

### 28 November 2025

A total of 20 incidents were recorded on this day. **Attacks** remained the most common 

In [None]:
#Results for 2025-12-14

# Run Agent 6: the map tool writes final_map.html and the report is printed
agent6_result = await run_agent6_once()

from pathlib import Path
import pickle

# ---- Persist artifacts to disk ----
MAP_HTML_PATH = Path("final_map.html")
RESULT_PATH = Path("final_result.pkl")

# Safety check: map should already be written by the tool
if not MAP_HTML_PATH.exists():
    raise RuntimeError("Expected map HTML file was not created by Agent 6.")

# ---- Serialize only artifact paths ----
final_result = {
    "map_html_path": str(MAP_HTML_PATH),
}

with RESULT_PATH.open("wb") as f:
    pickle.dump(final_result, f)



[Agent6] Reporter agent created. Running with debug trace...


 ### Created new session: debug_session_id

User > You are provided with the following incident context text:

INCIDENT CONTEXT FOR AGENT 6 (GLOBAL VIEW)

Total incidents with coordinates: 16
Days covered: 2025-12-14

PER-DAY INCIDENT COUNTS:
- 2025-12-14: 16 incidents

PER-DAY CATEGORY BREAKDOWN:
2025-12-14:
  - attack: 6
  - drone_strike: 4
  - arrest_detention: 2
  - disaster: 2
  - kidnapping: 1
  - weapons_seizure: 1

SAMPLE INCIDENTS (for qualitative flavor):
- 2025-12-14 | disaster | Kandahar, Afghanistan
- 2025-12-14 | kidnapping | Panjgur, Balochistan, Pakistan
- 2025-12-14 | attack | Khurram, Khyber Pakhtunkhwa, Pakistan
- 2025-12-14 | attack | Kurram, Pakistan
- 2025-12-14 | arrest_detention | Bannu, Khyber Pakhtunkhwa, Pakistan
- 2025-12-14 | attack | Sydney
- 2025-12-14 | attack | Bondi Beach, Sydney, Australia
- 2025-12-14 | drone_strike | Sudan
- 2025-12-14 | drone_strike | Abyei, Sudan
- 2025-12-14 | disaste



[MapTool] Interactive map rendered for mode='all'.
Agent6_Reporter > ## Grand Summary of Incidents (2025-12-14)

### Overview

This report covers a single day of activity, 2025-12-14, which recorded a total of 16 incidents with reported coordinates. The most common incident categories observed were 'attack' (6 incidents) and 'drone_strike' (4 incidents). Preliminary spatial analysis of the incident data suggests activity clusters in South Asia, specifically Pakistan and Afghanistan, as well as notable incidents in Australia and Sudan.

### 14 December 2025

On 2025-12-14, the reported incidents were predominantly categorized as 'attack' (6) and 'drone_strike' (4). Other categories included 'arrest_detention' (2), 'disaster' (2), 'kidnapping' (1), and 'weapons_seizure' (1). Several 'attack' and 'disaster' incidents were noted in the Sydney area of Australia, including Bondi Beach. Pakistan also featured prominently with multiple 'attack', 'drone_strike', and 'arrest_detention' incidents
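
The persisted `final_result.pkl` holds only the path to the map HTML, so whatever consumes it has to load the pickle and locate that file. Below is a minimal sketch of the reading side, assuming the consumer (for example the Streamlit app mentioned in the parameters cell) runs this after the notebook finishes. `load_final_result` is a hypothetical helper, and resolving a relative path against the pickle's directory is my assumption, not something the notebook specifies.

```python
import pickle
import tempfile
from pathlib import Path


def load_final_result(result_path: Path) -> dict:
    """Load the pickled artifact dict and confirm the map file it points to exists."""
    with result_path.open("rb") as f:
        # Only unpickle files this pipeline wrote itself; pickle is unsafe on untrusted input.
        result = pickle.load(f)
    map_path = Path(result["map_html_path"])
    # Assumption: a relative path is relative to the directory holding the pickle.
    if not map_path.is_absolute():
        map_path = result_path.parent / map_path
    if not map_path.exists():
        raise FileNotFoundError(f"Map HTML not found: {map_path}")
    return {"map_html_path": str(map_path)}


# Round-trip demo in a temporary directory with a stand-in map file.
with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    (tmp_dir / "final_map.html").write_text("<html></html>", encoding="utf-8")
    with (tmp_dir / "final_result.pkl").open("wb") as f:
        pickle.dump({"map_html_path": "final_map.html"}, f)
    loaded = load_final_result(tmp_dir / "final_result.pkl")
    print(Path(loaded["map_html_path"]).name)  # final_map.html
```

Since the artifact is a single string, a JSON file would carry the same information without pickle's trust requirements. The sketch keeps pickle only to match the cell above.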

In [None]:
try:
    if DB_PATH.exists():
        DB_PATH.unlink()
except Exception as e:
    print(f"[DB] Cleanup failed: {e}")

---

# 🎯 Final Remarks

This concludes the full OSINT incident pipeline:

Raw Telegram messages → Cleaned, structured events → Filtered to real incidents → Deduped → Categorized → Geocoded → Mapped + summarized automatically

The entire workflow is reproducible, transparent, and modular, making it easy to improve or scale.  
It showcases how **agentic AI + geospatial intelligence** can transform unstructured data streams into actionable situational awareness.

Feedback is welcome; the pipeline is designed to evolve.

---
