In [None]:
# --- Cell 1: Imports and Setup --- ---
import json
import os
import re
from collections import defaultdict

import openai
import pandas as pd
from sklearn.cluster import DBSCAN

# Setup OpenAI API key.
# The key is read from the OPENAI_API_KEY environment variable so that no
# credential is ever stored in the notebook source or in its saved outputs.
openai.api_key = os.environ.get('OPENAI_API_KEY')


In [None]:
# --- Cell 2: Load JSON Alerts --- ---
def load_alerts(json_path):
    """Read a JSON file of alerts and return its parsed content.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file. The rest of the
        notebook iterates over the result, one alert per item.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # JSON text is UTF-8 by specification; pinning the encoding keeps the
    # result independent of the platform's locale default.
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# Example usage: parse 'alerts.json' (path is relative to the working directory)
alerts_json = load_alerts(json_path='alerts.json')


In [None]:
# --- LangChain LLM Support ---
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

# Chat model shared by the LangChain-based helper functions in this notebook.
llm = ChatOpenAI(
    temperature=0,
    model_name="gpt-4-turbo",
)

def langchain_llm_extract(json_alert):
    """Ask the LangChain chat model to map a raw alert onto the common schema.

    The alert is serialized with ``json.dumps`` and embedded in a prompt that
    requests five ``key:value`` lines. The reply is parsed leniently: blank
    lines and lines without a colon (preambles, code fences) are skipped
    instead of raising, unrecognized keys are ignored, and any requested
    field missing from the reply is returned as an empty string so callers
    can index all five keys unconditionally.

    Args:
        json_alert: JSON-serializable alert object.

    Returns:
        dict with exactly the keys ``id``, ``time``, ``desc``, ``entities``
        and ``mitretechnique``, each mapped to a stripped string.
    """
    prompt = f"""
    Given this alert JSON:
    {json.dumps(json_alert)}

    Extract the following fields precisely:
    id, time, description (desc), entities (comma-separated list), mitretechnique (MITRE ATT&CK ID, leave blank if unknown).

    Output Format:
    id:<value>
    time:<value>
    desc:<value>
    entities:<value>
    mitretechnique:<value>
    """

    response = llm([HumanMessage(content=prompt)]).content.strip()

    expected_fields = ('id', 'time', 'desc', 'entities', 'mitretechnique')
    data = dict.fromkeys(expected_fields, '')
    for line in response.splitlines():
        # partition() never raises; an empty separator means the line has no
        # colon. Only the first colon splits, so values such as timestamps
        # keep their own colons.
        key, separator, value = line.partition(':')
        key = key.strip().lower()
        if separator and key in expected_fields:
            data[key] = value.strip()

    return data

def langchain_fill_missing_technique(alert):
    """Return the alert's MITRE ATT&CK technique ID, asking the LLM if absent.

    Args:
        alert: Mapping with a ``desc`` entry and, optionally, a
            ``mitretechnique`` entry.

    Returns:
        The existing ``mitretechnique`` value when it is non-empty. Otherwise
        the first technique-shaped token (``T`` + four digits, optionally
        ``.`` + three digits) found in the model's reply, or the stripped
        reply itself when no such token is present.

    Raises:
        KeyError: If the technique is missing and ``alert`` has no ``desc``.
    """
    # .get() treats an absent key the same way as an empty value.
    existing = alert.get('mitretechnique')
    if existing:
        return existing

    prompt = f"""
    Given this cybersecurity alert description:
    {alert['desc']}

    Identify the most likely MITRE ATT&CK technique ID.

    Output only the technique ID:
    """

    response = llm([HumanMessage(content=prompt)]).content.strip()

    # The model may wrap the ID in extra words or punctuation; keep only the
    # ID so downstream numeric parsing sees a clean value.
    match = re.search(r'\bT\d{4}(?:\.\d{3})?\b', response)
    return match.group(0) if match else response


In [None]:
# --- Cell 3: Automap JSON using LLM ---
def llm_extract(json_alert):
    """Ask the OpenAI chat API to map a raw alert onto the common schema.

    The alert is serialized with ``json.dumps`` and embedded in a prompt that
    requests five ``key:value`` lines. The reply is parsed leniently: blank
    lines and lines without a colon (preambles, code fences) are skipped
    instead of raising, unrecognized keys are ignored, and any requested
    field missing from the reply is returned as an empty string so callers
    can index all five keys unconditionally.

    Args:
        json_alert: JSON-serializable alert object.

    Returns:
        dict with exactly the keys ``id``, ``time``, ``desc``, ``entities``
        and ``mitretechnique``, each mapped to a stripped string.
    """
    prompt = f"""
    Given this alert JSON:
    {json.dumps(json_alert)}

    Extract the following fields precisely:
    id, time, description (desc), entities (comma-separated list), mitretechnique (MITRE ATT&CK ID, leave blank if unknown).

    Output Format:
    id:<value>
    time:<value>
    desc:<value>
    entities:<value>
    mitretechnique:<value>
    """

    response = openai.ChatCompletion.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        max_tokens=200
    )
    # Guard against an empty completion so parsing never sees None.
    reply_text = (response.choices[0].message.content or '').strip()

    expected_fields = ('id', 'time', 'desc', 'entities', 'mitretechnique')
    data = dict.fromkeys(expected_fields, '')
    for line in reply_text.splitlines():
        # partition() never raises; an empty separator means the line has no
        # colon. Only the first colon splits, so values such as timestamps
        # keep their own colons.
        key, separator, value = line.partition(':')
        key = key.strip().lower()
        if separator and key in expected_fields:
            data[key] = value.strip()

    return data

# Run every raw alert through the LLM mapper, collecting results in order
processed_alerts = []
processed_alerts.extend(map(llm_extract, alerts_json))


In [None]:
# --- Cell 4: Fill Missing MITRE Techniques --- ---
def fill_missing_technique(alert):
    """Return the alert's MITRE ATT&CK technique ID, asking OpenAI if absent.

    Args:
        alert: Mapping with a ``desc`` entry and, optionally, a
            ``mitretechnique`` entry.

    Returns:
        The existing ``mitretechnique`` value when it is non-empty. Otherwise
        the first technique-shaped token (``T`` + four digits, optionally
        ``.`` + three digits) found in the model's reply, or the stripped
        reply itself when no such token is present.

    Raises:
        KeyError: If the technique is missing and ``alert`` has no ``desc``.
    """
    # .get() treats an absent key the same way as an empty value.
    existing = alert.get('mitretechnique')
    if existing:
        return existing

    prompt = f"""
    Given this cybersecurity alert description:
    {alert['desc']}

    Identify the most likely MITRE ATT&CK technique ID.

    Output only the technique ID:
    """

    response = openai.ChatCompletion.create(
        model="gpt-4-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        max_tokens=10
    )
    reply_text = (response.choices[0].message.content or '').strip()

    # The model may wrap the ID in extra words or punctuation; keep only the
    # ID so downstream numeric parsing sees a clean value.
    match = re.search(r'\bT\d{4}(?:\.\d{3})?\b', reply_text)
    return match.group(0) if match else reply_text

# Backfill the technique on each alert whose extracted value is empty
for alert in processed_alerts:
    if alert['mitretechnique']:
        continue  # already has a technique; nothing to ask the model
    alert['mitretechnique'] = fill_missing_technique(alert)


In [None]:
# --- Cell 5: Prepare Data for Clustering --- ---
def _split_entities(raw_entities):
    """Turn a comma-separated entity string into a set of trimmed names.

    Whitespace around names is removed and empty items are dropped, so
    'host1, user1' and 'user1' share the entity 'user1', and an empty string
    yields an empty set rather than a set containing ''.
    """
    if not isinstance(raw_entities, str):
        return set()
    return {name.strip() for name in raw_entities.split(',') if name.strip()}


def _technique_to_number(technique_id):
    """Encode a technique ID as a number, e.g. 'T1059.001' -> 1059.001.

    The sub-technique becomes the fractional part so it stays numerically
    next to its parent technique. Values containing no digits map to 0.0
    instead of raising.
    """
    match = re.search(r'(\d+)(?:\.(\d+))?', str(technique_id))
    if match is None:
        return 0.0
    base, sub = match.groups()
    return float(f"{base}.{sub}") if sub else float(base)


df_alerts = pd.DataFrame(processed_alerts)

df_alerts['timestamp'] = pd.to_datetime(df_alerts['time'])
df_alerts['entities_set'] = df_alerts['entities'].apply(_split_entities)

# Create a numeric technique ID for clustering
df_alerts['technique_numeric'] = df_alerts['mitretechnique'].apply(_technique_to_number)


In [None]:
# --- Cell 6: Clustering Alerts --- ---
features = df_alerts[['timestamp', 'technique_numeric']].copy()
# Convert the timestamp to epoch seconds. 'int64' is requested explicitly
# because plain `int` can resolve to a 32-bit type on some platforms, where
# the datetime conversion fails.
# NOTE(review): the division assumes nanosecond-resolution datetimes - confirm
# against the pandas version in use.
features['timestamp'] = features['timestamp'].astype('int64') / 1e9

# eps is compared against the combined distance over both feature columns
# (epoch seconds and technique number); min_samples=1 lets a single alert
# form its own cluster.
clustering_model = DBSCAN(eps=3600, min_samples=1)
df_alerts['cluster'] = clustering_model.fit_predict(features)


In [None]:
# --- Cell 7: Sequencing Based on MITRE Tactics Causality --- ---
# Rank of each tactic, numbered from 1 in the listed order (simplified example)
mitre_tactics_order = {
    tactic_name: position
    for position, tactic_name in enumerate(
        (
            "Initial Access",
            "Execution",
            "Persistence",
            "Privilege Escalation",
            "Defense Evasion",
            "Credential Access",
            "Discovery",
            "Lateral Movement",
            "Collection",
            "Command and Control",
            "Exfiltration",
            "Impact",
        ),
        start=1,
    )
}

# Simplified function to get tactic from MITRE technique
def technique_to_tactic(tech):
    """Placeholder lookup: returns "Execution" for every technique."""
    # Replace this logic with actual mapping from MITRE ATT&CK
    return "Execution"

# Derive each alert's tactic name, then translate that name into its rank
tactic_per_alert = df_alerts['mitretechnique'].apply(technique_to_tactic)
df_alerts['tactic'] = tactic_per_alert
df_alerts['tactic_order'] = tactic_per_alert.map(mitre_tactics_order)

# Sequence alerts within clusters by tactic causality and entities overlap.
# Result: cluster id -> list of sequences, each sequence a list of alert ids.
sequences = defaultdict(list)

for cluster_id in df_alerts['cluster'].unique():
    cluster_alerts = df_alerts[df_alerts['cluster'] == cluster_id].sort_values(by=['tactic_order', 'timestamp'])

    current_sequence = []
    seen_entities = set()

    for _, alert in cluster_alerts.iterrows():
        alert_entities = alert['entities_set']
        # An alert extends the running sequence when it shares at least one
        # entity with it (or when nothing has been seen yet).
        if not seen_entities or seen_entities.intersection(alert_entities):
            current_sequence.append(alert['id'])
            seen_entities.update(alert_entities)
        else:
            # No shared entity: close the running sequence and start a new one.
            if current_sequence:
                sequences[cluster_id].append(current_sequence)
            current_sequence = [alert['id']]
            # Copy the set: binding the row's own set here would let the
            # update() above mutate df_alerts['entities_set'] in place and
            # make re-running this cell produce different sequences.
            seen_entities = set(alert_entities)

    if current_sequence:
        sequences[cluster_id].append(current_sequence)


In [None]:
# --- Cell 8: Output Sequences --- ---
# Print each cluster followed by its sequences, numbered from 1
for cluster_id in sequences:
    print(f"\nCluster {cluster_id}:")
    seq_list = sequences[cluster_id]
    for idx, seq in zip(range(1, len(seq_list) + 1), seq_list):
        print(f"  Sequence {idx}: {seq}")