# GEOEventFusion — Developer Sandbox

Experimental scratch notebook for iterating on individual pipeline components  
without running the full pipeline. Use this to:

- Test individual agent logic in isolation
- Inspect raw GDELT API responses
- Prototype new analysis functions
- Debug specific pipeline phases

**This notebook is NOT the canonical entry point.** See `quickstart.ipynb` for production use.

## Setup

In [1]:
!git clone https://github.com/dshipley71/GEOEventFusion.git

Cloning into 'GEOEventFusion'...
remote: Enumerating objects: 203, done.[K
remote: Counting objects: 100% (203/203), done.[K
remote: Compressing objects: 100% (184/184), done.[K
remote: Total 203 (delta 36), reused 140 (delta 10), pack-reused 0 (from 0)[K
Receiving objects: 100% (203/203), 253.75 KiB | 3.13 MiB/s, done.
Resolving deltas: 100% (36/36), done.


In [2]:
%cd GEOEventFusion/
%ls

/content/GEOEventFusion
AGENTS.md     DIRECTORY_STRUCTURE.md  [0m[01;34moutputs[0m/              [01;34mscripts[0m/
CHANGELOG.md  [01;34mdocs[0m/                   pyproject.toml        skills.md
CLAUDE.md     [01;34mgeoeventfusion[0m/         README.md             [01;34mtests[0m/
[01;34mconfig[0m/       LICENSE                 requirements-dev.txt
[01;34mdata[0m/         [01;34mnotebooks[0m/              requirements.txt


In [4]:
!pip install -r requirements-dev.txt --quiet

Collecting feedparser>=6.0.11 (from -r requirements.txt (line 10))
  Downloading feedparser-6.0.12-py3-none-any.whl.metadata (2.7 kB)
Collecting trafilatura>=1.9.0 (from -r requirements.txt (line 11))
  Downloading trafilatura-2.0.0-py3-none-any.whl.metadata (12 kB)
Collecting newspaper3k>=0.2.8 (from -r requirements.txt (line 12))
  Downloading newspaper3k-0.2.8-py3-none-any.whl.metadata (11 kB)
Collecting anthropic>=0.34.0 (from -r requirements.txt (line 18))
  Downloading anthropic-0.84.0-py3-none-any.whl.metadata (3.0 kB)
Collecting ollama>=0.3.0 (from -r requirements.txt (line 19))
  Downloading ollama-0.6.1-py3-none-any.whl.metadata (4.3 kB)
Collecting Levenshtein>=0.25.0 (from -r requirements.txt (line 37))
  Downloading levenshtein-0.27.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (3.7 kB)
Collecting pytest-cov>=5.0.0 (from -r requirements-dev.txt (line 9))
  Downloading pytest_cov-7.0.0-py3-none-any.whl.metadata (31 kB)
Collecting pytest-mock>=3.14.0 

In [6]:
%pwd

'/content/GEOEventFusion'

In [5]:
import sys
import json
import logging
from pathlib import Path

# Ensure project root is on path when running from notebooks/
_ROOT = Path().resolve().parent
if str(_ROOT) not in sys.path:
    sys.path.insert(0, str(_ROOT))

logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)s — %(message)s')
print(f'Project root: {_ROOT}')

Project root: /content


In [7]:
from dotenv import load_dotenv
load_dotenv()

from config.settings import PipelineConfig

# Minimal test config — no real API calls
config = PipelineConfig(
    query='Houthi Red Sea attacks',
    days_back=30,
    llm_backend='ollama',
    max_records=50,
    test_mode=True,
    log_level='DEBUG',
)
print('Config loaded:', config.query)

Config loaded: Houthi Red Sea attacks


In [9]:
import rich

# TODO Fix default ollama host to ue cloud instead of local

config.ollama_host = "http://ollama.com"
rich.print(config)

## Test GDELT Client

In [33]:
# Direct GDELT API call — inspect raw response
from geoeventfusion.clients.gdelt_client import GDELTClient

client = GDELTClient(
    max_retries=config.gdelt_max_retries,
    backoff_base=config.gdelt_backoff_base,
    request_timeout=config.gdelt_request_timeout,
)

# TODO: Fix fetch to allow for timespan option vs start/end date

# Fetch a small article list
response = client.fetch(
    query='ICE protests in Minneapolis',
    mode='ArtList',
    max_records=100,
    sort='DateDesc',
    start_date="20260101000000", # YYYYMMDDHHMMSS or YYYY-MM-DD format
    end_date="20260227000000", # YYYYMMDDHHMMSS or YYYY-MM-DD format,
    # timespan='7d',
)

print('GDELTClient fetch call to run a live query COMPLETED)')



GDELTClient fetch call to run a live query COMPLETED)


In [34]:
print(f"Number of Articles: {len(response["articles"])}\n")

print(json.dumps(response, indent=2))

TypeError: 'NoneType' object is not subscriptable

## Test Spike Detector

In [29]:
from geoeventfusion.analysis.spike_detector import detect_spikes
from geoeventfusion.models.events import TimelineStep
from collections import Counter

articles = response.get("articles", [])

# ── Parse seendate → YYYY-MM-DD ──────────────────────────────────────────────
def _parse_date(raw: str) -> str:
    """Normalise GDELT seendate variants to YYYY-MM-DD.
    Handles: '20260220T120000Z', '2026-02-20', '20260220120000'.
    """
    s = raw.replace("-", "").replace("T", "").replace("Z", "").strip()
    # s is now digits only, at least 8
    return f"{s[:4]}-{s[4:6]}-{s[6:8]}"

# ── Aggregate article count per calendar day ──────────────────────────────────
date_counts: Counter = Counter()
for art in articles:
    raw_date = art.get("seendate", "")
    if raw_date:
        try:
            date_counts[_parse_date(raw_date)] += 1
        except (ValueError, IndexError):
            pass  # skip malformed dates

if not date_counts:
    print("No date data found in response — check that Cell 10 ran successfully.")
else:
    # Build sorted TimelineStep list (one entry per day)
    steps = [
        TimelineStep(date=d, value=float(c))
        for d, c in sorted(date_counts.items())
    ]

    print(f"Timeline built from {len(articles)} articles across {len(steps)} day(s):")
    for s in steps:
        print(f"  {s.date}  articles={int(s.value)}")

    if len(steps) >= 3:
        spikes = detect_spikes(steps, z_threshold=1.5)
        print(f"\nDetected {len(spikes)} spike(s):")
        for s in spikes:
            print(f"  [{s.rank}] {s.date}  Z={s.z_score:.2f}  vol={s.volume}")
    else:
        print("\n(Need ≥3 data points for spike detection — fetch more articles or widen the date range.)")

# from geoeventfusion.analysis.spike_detector import detect_spikes
# from geoeventfusion.models.events import TimelineStep

# # Build a synthetic timeline with one clear spike
# steps = [
#     TimelineStep(date=f'2024-01-{i:02d}', value=2.0)
#     for i in range(1, 28)
# ]
# steps[14] = TimelineStep(date='2024-01-15', value=9.5)  # Spike
# steps[24] = TimelineStep(date='2024-01-25', value=8.0)  # Second spike

# spikes = detect_spikes(steps, z_threshold=1.5)
# print(f'Detected {len(spikes)} spikes:')
# for s in spikes:
#     print(f'  [{s.rank}] {s.date}  Z={s.z_score:.2f}  vol={s.volume}')

Timeline built from 100 articles across 1 day(s):
  2026-02-27  articles=100

(Need ≥3 data points for spike detection — fetch more articles or widen the date range.)


## Test Actor Graph

In [14]:
from geoeventfusion.analysis.actor_graph import build_actor_graph

triples = [
    ('Houthi', 'United States', '2024-01-15'),
    ('Houthi', 'Yemen', '2024-01-15'),
    ('United States', 'United Kingdom', '2024-01-16'),
    ('Iran', 'Houthi', '2024-01-17'),
    ('Houthi', 'United States', '2024-01-18'),
    ('Iran', 'United States', '2024-01-19'),
]

graph = build_actor_graph(triples, hub_top_n=3, broker_ratio_threshold=0.5)
print(f'Nodes: {len(graph.nodes)}  Edges: {len(graph.edges)}')
print()
for node in sorted(graph.nodes, key=lambda n: n.pagerank, reverse=True)[:5]:
    print(f'  {node.name:<25} role={node.role:<12} pagerank={node.pagerank:.4f}')

Nodes: 5  Edges: 5

  Houthi                    role=Hub          pagerank=0.3193
  United States             role=Hub          pagerank=0.3193
  Iran                      role=Hub          pagerank=0.1657
  Yemen                     role=Peripheral   pagerank=0.0979
  United Kingdom            role=Peripheral   pagerank=0.0979


## Test Query Builder

In [None]:
from geoeventfusion.analysis.query_builder import QueryBuilder

qb = QueryBuilder(
    base_query='Houthi Red Sea attacks',
    repeat_threshold=3,
    near_window=15,
    near_min_term_length=5,
    tone_negative_threshold=-5.0,
    toneabs_threshold=8.0,
)

# Build query variants
print('Repeat query:    ', qb.build_repeat_query())
print('Tone-neg query:  ', qb.build_tone_negative_query())
print('High-emotion:    ', qb.build_high_emotion_query())

## Test LLM Client

In [None]:
from geoeventfusion.clients.llm_client import LLMClient

# Instantiate (no call made yet)
llm = LLMClient(
    backend=config.llm_backend,
    anthropic_model=config.anthropic_model,
    ollama_model=config.ollama_model,
    ollama_host=config.ollama_host,
    anthropic_api_key=config.anthropic_api_key,
    max_confidence=config.max_confidence,
)
print(f'LLMClient backend: {llm.backend}')
print(f'Max confidence cap: {llm.max_confidence}')

# Uncomment to make a live test call:
# response = llm.call(
#     system='You are a geopolitical analyst.',
#     prompt='In one sentence, what is the Houthi movement?',
#     max_tokens=100,
# )
# print('LLM response:', response)

## Inspect Fixture Data

In [None]:
# Load and inspect the test fixtures
fixtures_dir = _ROOT / 'tests' / 'fixtures'

with open(fixtures_dir / 'sample_artlist.json', encoding='utf-8') as f:
    artlist = json.load(f)

articles = artlist.get('articles', [])
print(f'Fixture articles: {len(articles)}')
for a in articles[:3]:
    print(f'  [{a.get("seendate", "")}] {a.get("title", "")[:70]}')

## Run Full Pipeline (optional)

In [None]:
# Uncomment to run the full pipeline with test fixtures (no real API calls)
# from geoeventfusion.pipeline import run_pipeline
# context = run_pipeline(config)
# print(f'Run ID: {context.run_id}')
# print(f'Warnings: {context.warnings}')
print('Uncomment the block above to run the full pipeline in test mode.')