## Setup & Configuration


In [None]:
!uv pip install google-genai pandas tqdm

In [None]:
import json
import os
import time
from pathlib import Path

import pandas as pd
from google import genai
from google.genai import types

In [None]:
# Configuration
# Filesystem locations used by the rest of the notebook. All are relative to
# the current working directory.
DATA_DIR = Path("./edgartools-data")  # Root scanned recursively for *.md filings
OUTPUT_CSV = Path("./llm_energy_scores.csv")  # Results file; appended to per batch
SYSTEM_PROMPT_FILE = Path("./system_prompt.txt")  # Prompt prepended to each filing
BATCH_REQUESTS_DIR = Path("./batch_requests")  # Holds the generated JSONL request files
# Created eagerly; only the leaf directory is made (no `parents=True`).
BATCH_REQUESTS_DIR.mkdir(exist_ok=True)

MODEL_NAME = "gemini-2.5-pro"
BATCH_SIZE = 100  # Number of filings per batch job
POLL_INTERVAL = 60  # Seconds between status checks

# Gemini API setup
# The key is read from the environment only; fail fast when it is missing so
# no batch work is started without credentials.
API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set")

# Shared client used by the upload, batch-submission, polling and download steps.
client = genai.Client(api_key=API_KEY)

## Load System Prompt


In [None]:
# Read the prompt with an explicit encoding so the text sent to the model does
# not depend on the platform's default locale encoding.
SYSTEM_PROMPT = SYSTEM_PROMPT_FILE.read_text(encoding="utf-8")

# Quick sanity check that the expected prompt was picked up.
print(f"Loaded system prompt ({len(SYSTEM_PROMPT)} chars)")
print(f"First 200 chars: {SYSTEM_PROMPT[:200]}...")

## Discover All Filings


In [None]:
def discover_filings(data_dir):
    """Discover all .md filings in the data directory.

    Each filing is identified from its location: the parent directory name is
    taken as the form, the grandparent directory name as the ticker, and the
    file stem as the date.

    Args:
        data_dir: ``pathlib.Path`` of the root directory to scan recursively.

    Returns:
        DataFrame with columns ``ticker``, ``form``, ``date`` and ``filepath``
        (one row per filing, ordered by path). The columns are present even
        when no filing is found, so callers can index them unconditionally.
    """
    columns = ["ticker", "form", "date", "filepath"]
    filings = []

    # Sort so batch composition is reproducible between runs; the traversal
    # order of rglob is not guaranteed.
    for filepath in sorted(data_dir.rglob("*.md")):
        if not filepath.is_file():
            continue

        # Judge depth relative to the root: a file shallower than
        # <ticker>/<form>/<date>.md would otherwise borrow its ticker/form
        # from directory names outside the data tree.
        if len(filepath.relative_to(data_dir).parts) < 3:
            continue

        parts = filepath.parts
        filings.append(
            {
                "ticker": parts[-3],
                "form": parts[-2],
                "date": filepath.stem,
                "filepath": str(filepath),
            }
        )

    return pd.DataFrame(filings, columns=columns)


# Scan DATA_DIR once and keep the inventory in a notebook-level DataFrame that
# the later cells filter and batch.
filings_df = discover_filings(DATA_DIR)
print(f"Discovered {len(filings_df)} filings")
print(f"Tickers: {filings_df['ticker'].nunique()}")
print(f"Forms: {filings_df['form'].value_counts().to_dict()}")
# Last expression of the cell: rendered by the notebook as a preview table.
filings_df.head()

## Load Previous Results (for Resumability)


In [None]:
def load_existing_results(output_csv):
    """Load already processed results so they can be skipped on a re-run.

    Args:
        output_csv: ``pathlib.Path`` of the results CSV.

    Returns:
        DataFrame of previously saved results, or an empty DataFrame with the
        result columns when the file does not exist yet.
    """
    if output_csv.exists():
        # Read the identifying columns as text. Left to type inference, an
        # all-digit date such as "20230101" would come back as an integer and
        # break the string key built from ticker/form/date.
        df = pd.read_csv(
            output_csv, dtype={"ticker": str, "form": str, "date": str}
        )
        print(f"Loaded {len(df)} existing results")
        return df

    print("No existing results found, starting fresh")
    return pd.DataFrame(
        columns=[
            "ticker",
            "form",
            "date",
            "filepath",
            "chain_of_thought",
            "estimated_mw",
            "score",
            "confidence",
            "batch_job_name",
            "timestamp",
        ]
    )


# Results saved by earlier runs; used below to skip filings already scored.
existing_results = load_existing_results(OUTPUT_CSV)

## Filter Pending Filings


In [None]:
def get_pending_filings(all_filings, existing_results):
    """Return filings that haven't been processed yet.

    A filing counts as processed when its (ticker, form, date) triple appears
    in ``existing_results``. Values are compared as strings so that a date
    column loaded as integers still matches. Neither input DataFrame is
    modified.

    Args:
        all_filings: DataFrame with ``ticker``, ``form`` and ``date`` columns.
        existing_results: DataFrame of saved results with the same three
            columns; may be empty.

    Returns:
        ``all_filings`` itself when there are no existing results, otherwise a
        copy of the rows whose key is not among the existing results.
    """
    if len(existing_results) == 0:
        return all_filings

    key_columns = ["ticker", "form", "date"]

    # Tuples rather than "_"-joined strings: no ambiguity if a value itself
    # contains an underscore.
    processed_keys = set(
        zip(*(existing_results[col].astype(str) for col in key_columns))
    )
    candidate_keys = zip(*(all_filings[col].astype(str) for col in key_columns))

    # An explicit boolean Series keeps this a row mask even when it is empty.
    is_pending = pd.Series(
        [key not in processed_keys for key in candidate_keys],
        index=all_filings.index,
        dtype=bool,
    )
    return all_filings.loc[is_pending].copy()


# Work list for this run: discovered filings minus those already in the CSV.
pending_filings = get_pending_filings(filings_df, existing_results)
print(f"Pending filings to process: {len(pending_filings)}")
print(f"Already processed: {len(existing_results)}")

## Create Batch Request Files


In [None]:
def read_filing_text(filepath, max_chars=100000):
    """Return a filing's text, capped at ``max_chars`` characters.

    The file is decoded as UTF-8 with undecodable bytes dropped. Text longer
    than ``max_chars`` is cut and suffixed with a truncation marker to keep
    token costs bounded.

    Returns:
        The (possibly truncated) text, or ``None`` if anything goes wrong; the
        error is printed rather than raised.
    """
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as handle:
            text = handle.read()

        exceeds_limit = len(text) > max_chars
        return (
            text[:max_chars] + "\n\n[TRUNCATED FOR LENGTH]"
            if exceeds_limit
            else text
        )
    except Exception as exc:
        print(f"Error reading {filepath}: {exc}")
        return None


def create_batch_request_file(filings_batch, batch_idx, system_prompt):
    """Write the batch's requests to a JSONL file and return its path.

    One JSON line is written per filing whose text could be read; filings
    that fail to load are left out. Each line carries a ``key`` of the form
    ``<ticker>_<form>_<date>`` and a single user message made of the system
    prompt followed by the filing text.

    Args:
        filings_batch: DataFrame with ``ticker``, ``form``, ``date`` and
            ``filepath`` columns.
        batch_idx: Batch index, zero-padded into the file name.
        system_prompt: Instruction text placed ahead of every filing.

    Returns:
        Path of the JSONL file inside ``BATCH_REQUESTS_DIR``.
    """
    jsonl_path = BATCH_REQUESTS_DIR / f"batch_{batch_idx:04d}.jsonl"

    # Identical for every request, so build it once.
    generation_config = {
        "temperature": 0.1,  # Low temperature for consistency
        "response_mime_type": "application/json",
    }

    with open(jsonl_path, "w") as out:
        for _, filing in filings_batch.iterrows():
            filing_text = read_filing_text(filing["filepath"])
            if filing_text is not None:
                prompt = f"{system_prompt}\n\n---\n\nFILING TO ANALYZE:\n\n{filing_text}"
                payload = {
                    # Unique key used to match the response back to the filing
                    "key": f"{filing['ticker']}_{filing['form']}_{filing['date']}",
                    "request": {
                        "contents": [{"parts": [{"text": prompt}], "role": "user"}],
                        "generation_config": generation_config,
                    },
                }
                out.write(json.dumps(payload) + "\n")

    return jsonl_path

## Submit Batch Jobs & Monitor


In [None]:
def submit_batch_job(jsonl_path, batch_idx):
    """Upload a JSONL request file and start a batch job for it.

    Args:
        jsonl_path: ``pathlib.Path`` of the request file to upload.
        batch_idx: Batch index, used in display names and log messages.

    Returns:
        The created batch job's name, or ``None`` if any step failed (the
        error is printed, not raised).
    """
    try:
        size_mb = jsonl_path.stat().st_size / 1024 / 1024
        print(f"Uploading batch {batch_idx} ({size_mb:.2f} MB)...")

        # Step 1: upload the request file
        upload_config = types.UploadFileConfig(
            display_name=f"batch_{batch_idx:04d}", mime_type="application/jsonl"
        )
        uploaded = client.files.upload(file=str(jsonl_path), config=upload_config)
        print(f"Uploaded file: {uploaded.name}")

        # Step 2: create the batch job that consumes the uploaded file
        job = client.batches.create(
            model=f"models/{MODEL_NAME}",
            src=uploaded.name,
            config={"display_name": f"energy-scoring-batch-{batch_idx:04d}"},
        )
        print(f"Created batch job: {job.name}")
        return job.name

    except Exception as exc:
        print(f"Error submitting batch {batch_idx}: {exc}")
        return None


def monitor_batch_job(job_name, poll_interval=60):
    """Poll a batch job until it reaches a finished state.

    Args:
        job_name: Name of the batch job to watch.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        The batch job object as last fetched, once its state is one of
        succeeded, failed, cancelled or expired.
    """
    terminal_states = (
        "JOB_STATE_SUCCEEDED",
        "JOB_STATE_FAILED",
        "JOB_STATE_CANCELLED",
        "JOB_STATE_EXPIRED",
    )

    print(f"Monitoring job: {job_name}")
    started_at = time.time()

    while True:
        job = client.batches.get(name=job_name)
        current_state = job.state.name
        minutes_elapsed = (time.time() - started_at) / 60

        print(f"[{minutes_elapsed:.1f}m] State: {current_state}")

        # Still running: wait and check again.
        if current_state not in terminal_states:
            time.sleep(poll_interval)
            continue

        print(f"Job finished with state: {current_state}")
        if current_state == "JOB_STATE_FAILED":
            print(f"Error: {job.error}")
        return job

## Parse Results & Save to CSV


In [None]:
def parse_and_save_results(batch_job, filings_batch, batch_idx):
    """Download a finished batch's output, parse it, and append rows to the CSV.

    Each output line is matched back to its filing through the request key
    (``<ticker>_<form>_<date>``) that was written when the batch was created.
    Lines that cannot be matched or parsed are reported and skipped, so one
    bad line never discards the rest of the batch.

    Args:
        batch_job: Finished batch job object; its ``state.name``,
            ``dest.file_name`` and ``name`` attributes are read.
        filings_batch: DataFrame (``ticker``, ``form``, ``date``, ``filepath``
            columns) of the filings submitted in this job.
        batch_idx: Batch index, used only in log messages.

    Returns:
        DataFrame of the rows appended to ``OUTPUT_CSV``, or ``None`` when the
        job did not succeed or no line yielded a valid result.
    """
    if batch_job.state.name != "JOB_STATE_SUCCEEDED":
        print(f"Batch {batch_idx} did not succeed, skipping")
        return None

    # Download result file
    result_file_name = batch_job.dest.file_name
    print(f"Downloading results from: {result_file_name}")
    file_content = client.files.download(file=result_file_name).decode("utf-8")

    # Index the batch by request key once. Looking the key up directly avoids
    # splitting it on "_" (which mis-parses tickers containing an underscore)
    # and avoids rescanning the whole batch for every result line.
    filings_by_key = {}
    for _, row in filings_batch.iterrows():
        # setdefault keeps the first row when a key is duplicated
        filings_by_key.setdefault(
            f"{row['ticker']}_{row['form']}_{row['date']}", row
        )

    # Parse JSONL results
    results = []
    for line in file_content.splitlines():
        if not line.strip():
            continue

        try:
            parsed = json.loads(line)

            if "key" not in parsed:
                continue

            request_key = parsed["key"]
            filing = filings_by_key.get(request_key)
            if filing is None:
                print(f"Warning: Could not find filing for key {request_key}")
                continue

            response = parsed.get("response")
            if response:
                response_text = response["candidates"][0]["content"]["parts"][0][
                    "text"
                ]

                # The model was asked for JSON; anything else is reported and skipped
                try:
                    llm_output = json.loads(response_text)
                except json.JSONDecodeError as e:
                    print(f"Error parsing LLM JSON for {request_key}: {e}")
                    print(f"Response text: {response_text[:200]}...")
                    continue

                if not isinstance(llm_output, dict):
                    print(
                        f"Error parsing LLM JSON for {request_key}: "
                        f"expected an object, got {type(llm_output).__name__}"
                    )
                    continue

                results.append(
                    {
                        "ticker": filing["ticker"],
                        "form": filing["form"],
                        "date": filing["date"],
                        "filepath": filing["filepath"],
                        "chain_of_thought": llm_output.get("chain_of_thought", ""),
                        # -1 / 0 mark fields the model left out
                        "estimated_mw": llm_output.get("estimated_mw", -1),
                        "score": llm_output.get("score", -1),
                        "confidence": llm_output.get("confidence", 0),
                        "batch_job_name": batch_job.name,
                        "timestamp": pd.Timestamp.now(),
                    }
                )

            elif "error" in parsed:
                print(f"Error for {request_key}: {parsed['error']}")

        except Exception as e:
            # Best effort per line: report it and keep going with the rest.
            print(f"Error processing line: {e}")
            print(f"Line: {line[:200]}...")

    if not results:
        print("No valid results to save")
        return None

    results_df = pd.DataFrame(results)

    # Append to the existing CSV, writing the header only when creating it
    if OUTPUT_CSV.exists():
        results_df.to_csv(OUTPUT_CSV, mode="a", header=False, index=False)
    else:
        results_df.to_csv(OUTPUT_CSV, index=False)

    print(f"Saved {len(results)} results to {OUTPUT_CSV}")
    return results_df

## Main Processing Loop


In [None]:
def process_all_batches(pending_filings, batch_size=100):
    """Run the full pipeline over the pending filings, one batch at a time.

    For each consecutive slice of ``batch_size`` filings: write the request
    file, submit the job, wait for it to finish, then parse and save its
    results. A batch whose submission fails is skipped. Batches run
    sequentially with a 10-second pause between them.

    Args:
        pending_filings: DataFrame of filings still to be scored.
        batch_size: Maximum number of filings per batch job.
    """
    n_filings = len(pending_filings)
    total_batches = (n_filings + batch_size - 1) // batch_size  # ceiling division
    banner = "=" * 80
    print(f"Processing {n_filings} filings in {total_batches} batches")

    for batch_idx in range(total_batches):
        batch_number = batch_idx + 1  # 1-based, for display only
        print(f"\n{banner}")
        print(f"BATCH {batch_number}/{total_batches}")
        print(f"{banner}\n")

        # Rows belonging to this batch
        first = batch_idx * batch_size
        last = min(first + batch_size, n_filings)
        filings_batch = pending_filings.iloc[first:last]

        print(f"Creating batch request file for {len(filings_batch)} filings...")
        jsonl_path = create_batch_request_file(filings_batch, batch_idx, SYSTEM_PROMPT)

        print("Submitting batch job...")
        job_name = submit_batch_job(jsonl_path, batch_idx)
        if job_name is None:
            print(f"Failed to submit batch {batch_idx}, skipping")
            continue

        print("Monitoring batch job...")
        finished_job = monitor_batch_job(job_name, POLL_INTERVAL)

        print("Parsing and saving results...")
        parse_and_save_results(finished_job, filings_batch, batch_idx)

        print(f"\nBatch {batch_number} complete!")

        # Brief pause before the next submission, except after the last batch
        if batch_number < total_batches:
            print("Waiting 10 seconds before next batch...")
            time.sleep(10)

    print(f"\n{banner}")
    print("ALL BATCHES COMPLETE!")
    print(f"{banner}")

## Execute Pipeline


In [None]:
# Kick off the pipeline only when there is something left to score.
if len(pending_filings) == 0:
    print("No pending filings to process!")
else:
    process_all_batches(pending_filings, BATCH_SIZE)

## View Results


In [None]:
# Load final results
if OUTPUT_CSV.exists():
    final_results = pd.read_csv(OUTPUT_CSV)
    print(f"Total results: {len(final_results)}")
    print("\nScore distribution:")
    print(final_results["score"].describe())
    print("\nTickers with highest average scores:")
    print(
        final_results.groupby("ticker")["score"]
        .mean()
        .sort_values(ascending=False)
        .head(10)
    )
    # A bare expression nested inside an `if` is not rendered as cell output
    # (only a cell's last top-level expression is), so print the preview.
    print(final_results.head(10))
else:
    print("No results file found yet")

## Summary Statistics


In [None]:
# Summarise the saved scores overall and per ticker, and export the per-ticker table.
if OUTPUT_CSV.exists():
    results = pd.read_csv(OUTPUT_CSV)

    # Filter out error cases
    valid_results = results[results["score"] >= 0]

    print(f"Total filings processed: {len(results)}")
    print(f"Valid scores: {len(valid_results)}")
    print(f"Error cases (score=-1): {len(results[results['score'] == -1])}")
    print("\nConfidence distribution:")
    print(valid_results["confidence"].describe())
    print("\nEstimated MW distribution:")
    print(valid_results[valid_results["estimated_mw"] >= 0]["estimated_mw"].describe())

    # -1 is the placeholder stored when the model gave no MW estimate. Blank it
    # out so it is not averaged into the per-ticker mean; the distribution
    # printed just above already leaves it out.
    summary_input = valid_results.assign(
        estimated_mw=valid_results["estimated_mw"].where(
            valid_results["estimated_mw"] >= 0
        )
    )

    # Export summary by ticker
    ticker_summary = (
        summary_input.groupby("ticker")
        .agg(
            score_mean=("score", "mean"),
            score_std=("score", "std"),
            score_count=("score", "count"),
            estimated_mw_mean=("estimated_mw", "mean"),
            confidence_mean=("confidence", "mean"),
        )
        .round(2)
        .sort_values("score_mean", ascending=False)
    )

    print("\nTicker summary:")
    print(ticker_summary)

    ticker_summary.to_csv("./ticker_energy_summary.csv")
    print("\nSaved ticker summary to ticker_energy_summary.csv")