In [None]:
!pip install -q pydantic google-genai


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
import os
import json
from pydantic import BaseModel, Field
from typing import Literal, Optional, List, Tuple
from google import genai


# Gemini client; the API key is taken from the GEMINI_API_KEY environment
# variable (None is passed through when the variable is unset).
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

# Raw incidents dataset that the classification cell reads.
incidents_file = "../data/incidents-dataset-20250906.json"

In [None]:
class Incident(BaseModel):
    """Represents a single security incident."""

    # This model is passed as `response_schema` to the Gemini request in this
    # notebook. The class docstring and every `description` below may end up in
    # the generated schema, so treat them as prompt text and edit with care.
    # Required fields are marked with an explicit `...`; optional ones default
    # to None.

    # --- What happened -------------------------------------------------------
    proto: Literal["eth", "sol", "bnb", "btc", "other"] = Field(
        ...,
        description="The blockchain protocol involved, like 'eth', 'sol', or 'btc'.",
    )
    type: str = Field(
        ...,
        description="The general category of the incident. Either: scam, exploit, rugpull, or other (name).",
    )
    exploit_type: str = Field(
        ...,
        description="The specific type of exploit (e.g., 'Re-entrancy', 'Price Oracle Manipulation', 'Access Control').",
    )
    total_value_extracted: str = Field(
        ...,
        description="The total value extracted in USD or equivalent (e.g., '$3M', '5 BTC'). If multiple currencies, list them separated by commas.",
    )
    hack_date: str = Field(
        ...,
        description="The date of the hack in YYYY-MM-DD format.",
    )

    # --- Target --------------------------------------------------------------
    hacked_app_name: str = Field(
        ...,
        description="The name of the hacked application or protocol.",
    )
    hacked_app_website: Optional[str] = Field(
        None,
        description="The website of the hacked application, if known.",
    )
    vuln_contract: Optional[str] = Field(
        None,
        description="The address of the vulnerable smart contract, if known.",
    )

    # --- Attacker ------------------------------------------------------------
    attacker_addr: Optional[str] = Field(
        None,
        description="The address of the attacker, if known.",
    )
    attack_complexity: Literal["low", "medium", "high"] = Field(
        ...,
        description="The complexity of the attack.",
    )
    actor_name: Optional[str] = Field(
        None,
        description="The name of the attacker or group, if known.",
    )
    actor_type: Optional[str] = Field(
        None,
        description="The type of actor involved in the incident. Either: individual, group, other (name).",
    )

    # --- Free-form labels ----------------------------------------------------
    tags: List[str] = Field(
        ...,
        description="A list of 4-10 short, lower-kebab tags (2-4 words each) describing the incident.",
    )


# Prompt fragment with the tagging rules; it is interpolated verbatim into the
# prompt built by classify_incident().
# NOTE(review): the second and third rules ask for "long tags" and "keyphrases",
# but the Incident schema in this notebook only has a single `tags` field
# described as 4-10 short lower-kebab tags. Confirm whether those extra outputs
# are still wanted or whether the rules should be trimmed to match the schema.
tagging_instructions = """
Tagging rules (free-form; not from a fixed list):
- Produce 4-10 short tags; 2-4 words each; lower-kebab; no stopwords; no symbols.
- Produce 3-6 long tags; 5-12 words each; grammatical mini-phrases.
- Produce 4-10 keyphrases (noun phrases); plain text.
- Avoid duplicates and banalities (e.g., 'hack', 'crypto').

Examples: 'proxy-upgrade-takeover', 'flashloan-oracle-drift'
"""


def classify_incident(incident_description: str) -> "genai.types.Part":
    """Classify one incident description with Gemini using structured output.

    The request asks for JSON constrained by the ``Incident`` schema.

    Args:
        incident_description: Free-text description of the incident.

    Returns:
        The first content part of the first response candidate. This is NOT a
        validated ``Incident`` instance: callers in this notebook read the JSON
        document from its ``.text`` attribute and call ``.model_dump()`` on it.

    Raises:
        ValueError: If the response contains no candidate, no content, no
            parts, or an empty text payload (for example when the request was
            blocked), instead of failing later with an opaque
            ``TypeError``/``IndexError``.
    """

    prompt = f"""
You extract a structured Incident. Use free-form semantic tags (no fixed vocab).
Follow these rules strictly.

{tagging_instructions}

Description:
{incident_description}
"""

    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config={
            "response_mime_type": "application/json",
            "response_schema": Incident,
            # Budget of 0 presumably disables the model's "thinking" phase;
            # confirm against the SDK documentation.
            "thinking_config": {"thinking_budget": 0},
        },
    )

    # Every level of the response may be missing, so walk it defensively.
    candidates = response.candidates or []
    content = candidates[0].content if candidates else None
    parts = (content.parts if content is not None else None) or []
    if not parts or not parts[0].text:
        raise ValueError("Gemini returned no structured output for this incident")

    return parts[0]

In [75]:
def classify_incidents(incidents: List[dict]) -> Tuple[bool, List[dict]]:
    """Classify a sequence of incident records, stopping at the first failure.

    Each record must provide the incident description under the ``"content"``
    key. On success the record is merged with the ``model_dump()`` of the
    object returned by ``classify_incident`` and appended to the result; the
    parsed JSON payload is printed as progress output.

    Any failure for a record (missing ``"content"`` key, API error, payload
    that is not valid JSON) stops the run, and everything classified so far is
    returned so the caller can still save partial results.

    Args:
        incidents: Iterable of incident records (dicts with a ``"content"`` key).

    Returns:
        ``(True, results)`` when every record was classified, otherwise
        ``(False, results_so_far)``.
    """
    classified_incidents = []
    for index, raw_incident in enumerate(incidents):
        try:
            incident = classify_incident(raw_incident["content"])
            # Parse before appending so a malformed payload is reported here
            # and never reaches the saved results.
            payload = json.loads(incident.text)
            merged = {**raw_incident, **incident.model_dump()}
        except Exception as e:
            print(f"Error classifying incident at index {index}: {e}")
            return False, classified_incidents
        classified_incidents.append(merged)
        print(payload)
    return True, classified_incidents

In [None]:
# Load the raw incidents, classify them, and write out whatever was classified.
with open(incidents_file, "r") as f:
    data = json.load(f)
# For a quick smoke test, narrow the input first, e.g.: data = [data[15]]

status, classified_incidents = classify_incidents(data)

# An interrupted run is saved under a "-part" file name so it is not mistaken
# for a complete classification.
part_suffix = "" if status else "-part"
with open(f"../data/incidents-dataset-20250906-classified{part_suffix}.json", "w") as f:
    json.dump(classified_incidents, f, indent=2)

{'proto': 'bnb', 'type': 'exploit', 'exploit_type': 'Improper access control', 'total_value_extracted': '$4.5M', 'hack_date': '2021-06-15', 'hacked_app_name': 'Eleven Finance', 'hacked_app_website': None, 'vuln_contract': '0xdb2d590aCe7cAe51DF1fB3312738038Ec032Bf33', 'attacker_addr': '0xdb2d590aCe7cAe51DF1fB3312738038Ec032Bf33', 'attack_complexity': 'medium', 'actor_name': '0xdeadf4ce', 'actor_type': 'individual', 'tags': ['emergency-burn-exploit', 'flashloan-attack', 'yield-aggregator-exploit', 'bsc-exploit', 'nrv-vault-exploit', 'dumb-logic-issue', 'intermediate-vault-vulnerability', 'binance-smart-chain-hack']}
{'proto': 'eth', 'type': 'exploit', 'exploit_type': 'Integer Underflow', 'total_value_extracted': '$5M', 'hack_date': '2025-03-05', 'hacked_app_name': '1inch Fusion v1 resolver', 'hacked_app_website': 'null', 'vuln_contract': '0xa88800cd213da5ae406ce248380802bd53b47647', 'attacker_addr': '0xa7264a43a57ca17012148c46adbc15a5f951766e', 'attack_complexity': 'medium', 'actor_name'

In [None]:
# Keep only the JSON payload stored under "text" in each classified record and
# write those decoded objects to a separate file.
newincidents = []
for incident in classified_incidents:
    newincidents.append(json.loads(incident["text"]))

# Same "-part" naming convention as the full export when the run was interrupted.
new_suffix = "" if status else "-part"
with open(f"../data/incidents-dataset-20250906-classified-new{new_suffix}.json", "w") as f:
    json.dump(newincidents, f, indent=2)