In [None]:
# Build the raw alert record for the incoming phishing report
phishing_alert = dict(
    alert_id="PHISH-1002",
    source_ip="192.0.2.1",
    sender_domain="secure-portal-updates.com",
    sender_email="noreply@secure-portal-updates.com",
    subject="ACTION REQUIRED: Your company email password has expired.",
    body="Please click the link below to reset your password and avoid account suspension.",
)
print("New incident alert received:")
print(phishing_alert)

In [None]:
# Run the triage routine against the alert created above
triage_result = initial_phishing_triage(phishing_alert)

# Translate the numeric score into a severity band plus the ATT&CK ID reported with it.
# Bands: below 30 -> Low, 30-49 -> Medium, 50 and above -> High.
# NOTE(review): in MITRE ATT&CK, T1566.001 is "Spearphishing Attachment" (links are
# T1566.002) and T1071.001 is "Application Layer Protocol: Web Protocols" - confirm
# these are the IDs this score-only mapping is meant to report.
if triage_result['triage_score'] < 30:
    severity, mitre_techniques = "Low", "N/A"
elif triage_result['triage_score'] < 50:
    severity, mitre_techniques = "Medium", "T1071.001"
else:
    severity, mitre_techniques = "High", "T1566.001"

# Report header: one line per field, emitted in a single call
print(
    "\n--- Triage Results ---",
    f"Incident ID: {triage_result['alert_id']}",
    f"Calculated Score: {triage_result['triage_score']}",
    f"Severity Level: {severity}",
    f"MITRE ATT&CK Mapping: {mitre_techniques}",
    "Triage Summary:",
    sep="\n",
)
for entry in triage_result['triage_summary']:
    print(f"- {entry}")

# Only a High result produces escalation guidance for the analyst
if severity == "High":
    print("\nNext steps: This is a high-priority incident. An analyst should immediately investigate the sender, block the sender domain, and check for any compromised accounts.")

In [None]:
# Standard shape of an incident alert record
def create_incident_alert(alert_id: str, source_ip: str, sender_domain: str, sender_email: str, subject: str, body: str) -> dict:
    """Build an incident alert dictionary with empty triage fields.

    The six arguments are stored under keys of the same name. Two further
    keys are initialised for later triage: ``triage_score`` (0) and
    ``triage_summary`` (a new empty list on every call).
    """
    alert = dict(
        alert_id=alert_id,
        source_ip=source_ip,
        sender_domain=sender_domain,
        sender_email=sender_email,
        subject=subject,
        body=body,
    )
    # Triage placeholders, filled in by whichever routine scores the alert
    alert["triage_score"] = 0
    alert["triage_summary"] = []
    return alert

# Sample alert used to demonstrate the record layout
mock_incident = create_incident_alert(
    "PHISH-1001",                                    # alert_id
    "192.0.2.1",                                     # source_ip
    "fakewebsite-update.net",                        # sender_domain
    "support@fakewebsite-update.net",                # sender_email
    "URGENT: Your account has been suspended",       # subject
    "Click here to verify your account immediately. This is a critical security alert.",  # body
)

In [None]:
from typing import Set

# Indicator lists consulted by the triage function below.
# Assigning them here rebinds the names, replacing whatever they held before this cell ran.
KNOWN_BAD_DOMAINS: Set[str] = {
    'fakewebsite-update.net',
    'malware-c2.xyz',
    'phishingsite.co',
}
KNOWN_BAD_IPS: Set[str] = {
    '1.2.3.4',
    '192.0.2.1',
    '203.0.113.5',
}
SUSPICIOUS_KEYWORDS: Set[str] = {
    'action required',
    'click here',
    'invoice',
    'password',
    'payment',
    'suspended',
    'urgent',
    'verify',
}

def initial_phishing_triage(incident: dict, known_bad_domains=None, known_bad_ips=None, suspicious_keywords=None) -> dict:
    """
    Performs initial triage on a phishing-related incident using known IoCs.

    Scoring: +50 if the sender domain is on the domain blocklist, +30 if the
    source IP is on the IP blocklist, and +10 for every suspicious keyword
    found in the subject or body (case-insensitive substring match).

    Args:
        incident (dict): The incident alert to analyze. The keys
            'sender_domain', 'source_ip', 'subject' and 'body' are read;
            a missing or None value is treated as an empty string.
        known_bad_domains (set, optional): Domain blocklist to use. Defaults
            to the notebook-level KNOWN_BAD_DOMAINS.
        known_bad_ips (set, optional): IP blocklist to use. Defaults to the
            notebook-level KNOWN_BAD_IPS.
        suspicious_keywords (set, optional): Keywords to look for. Defaults
            to the notebook-level SUSPICIOUS_KEYWORDS.

    Returns:
        dict: The same incident dictionary, updated in place with
        'triage_score' and 'triage_summary'.
    """
    # The globals are only looked up when no explicit list is passed, so the result
    # does not depend on which notebook cell last rebound them.
    bad_domains = KNOWN_BAD_DOMAINS if known_bad_domains is None else known_bad_domains
    bad_ips = KNOWN_BAD_IPS if known_bad_ips is None else known_bad_ips
    keywords = SUSPICIOUS_KEYWORDS if suspicious_keywords is None else suspicious_keywords

    triage_score = 0
    triage_summary = []

    # 1. Check against Known Bad Domains.
    # Domain names are case-insensitive, so also compare a trimmed, lower-cased form;
    # the raw value is still checked so lists with mixed-case entries keep matching.
    raw_domain = incident.get('sender_domain') or ""
    normalized_domain = raw_domain.strip().lower()
    if normalized_domain and (raw_domain in bad_domains or normalized_domain in bad_domains):
        triage_score += 50
        triage_summary.append("MATCH: Sender domain is a known bad domain.")

    # 2. Check against Known Bad IPs
    source_ip = (incident.get('source_ip') or "").strip()
    if source_ip and source_ip in bad_ips:
        triage_score += 30
        triage_summary.append("MATCH: Source IP is a known malicious IP address.")

    # 3. Check for suspicious keywords in subject and body.
    # Lower-case the text once; sort the hits so the summary is stable between runs
    # (set iteration order is not).
    subject = incident.get('subject') or ""
    body = incident.get('body') or ""
    combined_text = f"{subject} {body}".lower()
    found_keywords = sorted(kw for kw in keywords if kw.lower() in combined_text)

    if found_keywords:
        triage_score += 10 * len(found_keywords)
        triage_summary.append(f"MATCH: Found suspicious keywords: {', '.join(found_keywords)}.")

    # Set the final score and summary
    incident['triage_score'] = triage_score
    incident['triage_summary'] = triage_summary

    return incident

In [1]:
import requests
from typing import Set

def fetch_threat_intel_list(url: str, comment_prefix: str = "#") -> Set[str]:
    """
    Fetches a list of IoCs from a URL, ignoring blank lines and comments.

    Each line of the response is stripped of surrounding whitespace before it
    is examined, so indented comment lines are ignored as well.

    Args:
        url (str): The URL of the threat intelligence feed.
        comment_prefix (str): The prefix that marks a comment line. Pass an
            empty string to disable comment filtering.

    Returns:
        Set[str]: A set of unique IoCs (domains, IPs, etc.). An empty set is
        returned when the request fails; the error is printed, not raised.
    """
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()  # Raise an exception for bad status codes

        ioc_list = set()
        for raw_line in response.text.splitlines():
            entry = raw_line.strip()
            if not entry:
                continue
            # Test the stripped text so "   # note" counts as a comment. The explicit
            # emptiness check matters because every string starts with "".
            if comment_prefix and entry.startswith(comment_prefix):
                continue
            ioc_list.add(entry)

        print(f"Successfully fetched {len(ioc_list)} IoCs from {url}")
        return ioc_list
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from {url}: {e}")
        return set()

# --- Usage Example in a Jupyter Notebook Cell ---
# Start from empty blocklists; they are filled from the feeds below.
KNOWN_BAD_DOMAINS = set()
KNOWN_BAD_IPS = set()
SUSPICIOUS_KEYWORDS = {
    'urgent', 'password', 'invoice', 'payment', 'expired', 'verify', 
    'action required', 'suspicious activity', 'account suspended', 'click here'
}

# Example of populating KNOWN_BAD_DOMAINS and KNOWN_BAD_IPS
print("Populating KNOWN_BAD_DOMAINS...")
# NOTE(review): the recorded run of this cell got HTTP 404 from this URL - verify the
# current feed path before relying on it.
urlhaus_url = "https://urlhaus.abuse.ch/downloads/hostnames/"
KNOWN_BAD_DOMAINS.update(fetch_threat_intel_list(urlhaus_url))
# fetch_threat_intel_list returns an empty set on failure, so say explicitly when the
# blocklist ended up empty instead of letting later checks quietly match nothing.
if not KNOWN_BAD_DOMAINS:
    print("WARNING: KNOWN_BAD_DOMAINS is empty; domain checks cannot match until a working feed is configured.")

print("\nPopulating KNOWN_BAD_IPS...")
# Note: You'll need to find a public feed URL for IPs, e.g., from Spamhaus or SANS.
# This is a placeholder URL for demonstration.
dummy_ip_feed = "http://example.com/ip_feed.txt"
KNOWN_BAD_IPS.update(fetch_threat_intel_list(dummy_ip_feed))
if not KNOWN_BAD_IPS:
    print("WARNING: KNOWN_BAD_IPS is empty; IP checks cannot match until a working feed is configured.")

# To view a sample of the populated lists
print("\nSample of KNOWN_BAD_DOMAINS:")
print(list(KNOWN_BAD_DOMAINS)[:5])
print("\nSample of SUSPICIOUS_KEYWORDS:")
print(list(SUSPICIOUS_KEYWORDS))

Populating KNOWN_BAD_DOMAINS...
Error fetching data from https://urlhaus.abuse.ch/downloads/hostnames/: 404 Client Error: Not Found for url: https://urlhaus.abuse.ch/downloads/hostnames/

Populating KNOWN_BAD_IPS...
Error fetching data from http://example.com/ip_feed.txt: 404 Client Error: Not Found for url: http://example.com/ip_feed.txt

Sample of KNOWN_BAD_DOMAINS:
[]

Sample of SUSPICIOUS_KEYWORDS:
['expired', 'invoice', 'urgent', 'password', 'verify', 'action required', 'suspicious activity', 'account suspended', 'click here', 'payment']


# Playbook: Phishing Incident Triage

**Timestamp:** 2025-08-04 11:00 AM EDT
**Analyst:** Darren Shady
**Incident ID:** (Will be generated by the script)

**Objective:** To perform initial analysis on a reported phishing email to determine its maliciousness, identify key indicators, and recommend initial containment steps.

**CISA Alignment:** Detection & Analysis, Containment.
**MITRE ATT&CK Alignment:**

| Tactic | Technique ID | Technique Name |
|---|---|---|
| Initial Access | T1566 | Phishing |
| Initial Access | T1566.001 | Spearphishing Attachment |
| Initial Access | T1566.002 | Spearphishing Link |

In [None]:
import json
import re
from datetime import datetime, timezone
import pandas as pd

# Show full cell contents (e.g. long URLs) instead of truncating them
pd.options.display.max_colwidth = None

# --- Configuration (User to customize) ---
KNOWN_BAD_DOMAINS = {
    "evil-domain.com",
    "phish-corp.net",
}
KNOWN_BAD_IPS = {
    "198.51.100.55",
}
SUSPICIOUS_KEYWORDS = {
    "account",
    "login",
    "password",
    "update",
    "urgent",
    "verify",
}

# --- Mock Threat Intelligence Function ---
def query_threat_intel_mock(indicator_type: str, value: str) -> dict:
    """Mocks a threat intelligence query. Replace with actual API calls.

    Args:
        indicator_type: One of "url", "ip" or "hash"; any other value yields
            an "unknown" verdict.
        value: The indicator to look up.

    Returns:
        A dict with "status" ("malicious" or "unknown") and "source"; a
        malicious hash also carries "malware_family".

    Verdict rules:
        * url  - malicious when the URL text contains any domain listed in
                 KNOWN_BAD_DOMAINS (case-insensitive substring match).
        * ip   - malicious when the value is in KNOWN_BAD_IPS.
        * hash - malicious when the value starts with "8f".
    """
    print(f"[Intel] Querying for {indicator_type}: {value}...")
    if indicator_type == "url":
        # Use the configured blocklist rather than one hard-coded domain, so edits to
        # KNOWN_BAD_DOMAINS take effect for URLs just as KNOWN_BAD_IPS does for IPs.
        url_text = value.lower()
        if any(domain.lower() in url_text for domain in KNOWN_BAD_DOMAINS):
            return {"status": "malicious", "source": "Mock Intel Service"}
    if indicator_type == "ip" and value in KNOWN_BAD_IPS:
        return {"status": "malicious", "source": "Mock Intel Service"}
    if indicator_type == "hash" and value.startswith("8f"):
        return {"status": "malicious", "source": "Mock Intel Service", "malware_family": "GenericDownloader.exe"}
    return {"status": "unknown", "source": "Mock Intel Service"}

print("✅ Playbook setup and configuration loaded.")

In [None]:
import json
import re
from datetime import datetime, timezone
import pandas as pd

# NOTE(review): this cell repeats the setup cell directly above it; the values are the same.
# Show full cell contents (e.g. long URLs) instead of truncating them
pd.options.display.max_colwidth = None

# --- Configuration (User to customize) ---
KNOWN_BAD_DOMAINS = {
    "evil-domain.com",
    "phish-corp.net",
}
KNOWN_BAD_IPS = {
    "198.51.100.55",
}
SUSPICIOUS_KEYWORDS = {
    "account",
    "login",
    "password",
    "update",
    "urgent",
    "verify",
}

# --- Mock Threat Intelligence Function ---
def query_threat_intel_mock(indicator_type: str, value: str) -> dict:
    """Mocks a threat intelligence query. Replace with actual API calls.

    Args:
        indicator_type: One of "url", "ip" or "hash"; any other value yields
            an "unknown" verdict.
        value: The indicator to look up.

    Returns:
        A dict with "status" ("malicious" or "unknown") and "source"; a
        malicious hash also carries "malware_family".

    Verdict rules:
        * url  - malicious when the URL text contains any domain listed in
                 KNOWN_BAD_DOMAINS (case-insensitive substring match).
        * ip   - malicious when the value is in KNOWN_BAD_IPS.
        * hash - malicious when the value starts with "8f".
    """
    print(f"[Intel] Querying for {indicator_type}: {value}...")
    if indicator_type == "url":
        # Use the configured blocklist rather than one hard-coded domain, so edits to
        # KNOWN_BAD_DOMAINS take effect for URLs just as KNOWN_BAD_IPS does for IPs.
        url_text = value.lower()
        if any(domain.lower() in url_text for domain in KNOWN_BAD_DOMAINS):
            return {"status": "malicious", "source": "Mock Intel Service"}
    if indicator_type == "ip" and value in KNOWN_BAD_IPS:
        return {"status": "malicious", "source": "Mock Intel Service"}
    if indicator_type == "hash" and value.startswith("8f"):
        return {"status": "malicious", "source": "Mock Intel Service", "malware_family": "GenericDownloader.exe"}
    return {"status": "unknown", "source": "Mock Intel Service"}

print("✅ Playbook setup and configuration loaded.")

In [None]:
def triage_phishing_alert(alert: dict) -> dict:
    """Performs triage, enrichment, and severity scoring on a phishing alert.

    Args:
        alert: Raw alert. Keys read (all optional; None is treated as absent):
            "sender" (or "sender_email"), "sender_ip" (or "source_ip"),
            "subject", "urls" (list of str) and "attachments" (list of dicts
            with "sha256" and "filename").

    Returns:
        A new incident dict with "incident_id", "type", "status" ("Triaged"),
        "severity", "summary", "triage_notes" and "indicators_df", a DataFrame
        that always has the columns type / value / intel / notes.

    Scoring:
        +50 sender domain on KNOWN_BAD_DOMAINS, +25 malicious sender IP,
        +50 per malicious URL, +10 per URL containing a suspicious keyword,
        +75 per malicious attachment hash, +10 suspicious subject line.
        Severity: >=100 Critical, >=75 High, >=40 Medium, otherwise Low.
    """
    # Local import keeps this cell runnable without touching the notebook's import cell.
    from email.utils import parseaddr

    incident = {
        "incident_id": f"INC-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}",
        "type": "Phishing",
        "status": "New",
        "severity": "Low",
        "summary": alert.get("subject"),
        "triage_notes": []
    }
    severity_score = 0
    indicators_list = []

    # --- Sender domain ---
    # Accept both alert schemas used in this notebook ("sender" and "sender_email").
    sender_email = alert.get("sender") or alert.get("sender_email") or ""
    # parseaddr copes with 'Display Name <user@host>'; fall back to the raw text if it
    # cannot extract an address.
    address = parseaddr(sender_email)[1] or sender_email
    # Take the part after the LAST '@' and lower-case it: domains are case-insensitive.
    sender_domain = address.rsplit('@', 1)[1].strip().strip('>').lower() if '@' in address else ""
    if sender_domain:
        intel = "On Blocklist" if sender_domain in KNOWN_BAD_DOMAINS else "Unknown"
        indicators_list.append({"type": "domain", "value": sender_domain, "intel": intel, "notes": ""})
        if intel == "On Blocklist":
            severity_score += 50

    # --- Sender IP --- ("source_ip" is the key name used by the other alert schema)
    sender_ip = alert.get("sender_ip") or alert.get("source_ip") or ""
    if sender_ip:
        intel_result = query_threat_intel_mock("ip", sender_ip)
        indicators_list.append({"type": "ip", "value": sender_ip, "intel": intel_result.get("status"), "notes": ""})
        if intel_result.get("status") == "malicious":
            severity_score += 25

    # --- URLs ---
    for url in alert.get("urls") or []:
        intel_result = query_threat_intel_mock("url", url)
        note = "URL contains suspicious keywords." if any(keyword in url.lower() for keyword in SUSPICIOUS_KEYWORDS) else ""
        indicators_list.append({"type": "url", "value": url, "intel": intel_result.get("status"), "notes": note})
        if intel_result.get("status") == "malicious":
            severity_score += 50
        if note:
            severity_score += 10

    # --- Attachments ---
    for att in alert.get("attachments") or []:
        sha256 = att.get("sha256") or ""
        filename = att.get("filename", "unknown")
        if not sha256:
            # Nothing to look up; record the gap instead of failing the whole triage.
            incident["triage_notes"].append(f"Attachment '{filename}' has no SHA-256 hash; intel lookup skipped.")
            continue
        intel_result = query_threat_intel_mock("hash", sha256)
        note = f"Filename: {filename}"
        indicators_list.append({"type": "file_hash_sha256", "value": sha256, "intel": intel_result.get("status"), "notes": note})
        if intel_result.get("status") == "malicious":
            severity_score += 75

    # --- Subject line ---
    subject_text = (alert.get("subject") or "").lower()
    if any(keyword in subject_text for keyword in SUSPICIOUS_KEYWORDS):
        severity_score += 10
        incident["triage_notes"].append("Subject line contains suspicious keywords.")

    # Finalize Severity
    if severity_score >= 100:
        incident["severity"] = "Critical"
    elif severity_score >= 75:
        incident["severity"] = "High"
    elif severity_score >= 40:
        incident["severity"] = "Medium"

    # Fix the column set so the frame has the same shape even with zero indicators.
    incident['indicators_df'] = pd.DataFrame(
        indicators_list, columns=["type", "value", "intel", "notes"]
    ).fillna('')
    incident['status'] = "Triaged"
    return incident

def generate_next_steps(incident: dict):
    """Generates recommended next steps based on incident details.

    Prints an analyst briefing (incident id, severity, triage notes) followed
    by a numbered list of recommended actions. Returns None.

    Args:
        incident: Triaged incident. Reads "incident_id", "severity",
            "indicators_df" (DataFrame with "type" and "value" columns) and,
            if present, "triage_notes".

    For High/Critical incidents, block and investigation steps are listed
    only for indicator types actually present in the indicators table. Any
    other severity yields a single "continue to monitor" step.
    """
    print("\n--- 🗣️ Analyst Briefing ---")
    print(f"Incident {incident['incident_id']} has been triaged with a severity of: {incident['severity']}")
    for note in incident.get('triage_notes', []):
        print(f"- {note}")

    print("\n--- 💡 Recommended Next Steps (Containment & Investigation) ---")
    steps = []
    df = incident['indicators_df']

    def indicator_values(kind: str) -> list:
        """Return the values recorded for one indicator type ([] if none)."""
        # A frame built from zero indicators may have no columns at all; use explicit
        # column lookups rather than attribute access, which would raise in that case.
        if 'type' not in df.columns or 'value' not in df.columns:
            return []
        return list(df.loc[df['type'] == kind, 'value'])

    if incident["severity"] in ["High", "Critical"]:
        domains = indicator_values('domain')
        ips = indicator_values('ip')
        urls = indicator_values('url')
        hashes = indicator_values('file_hash_sha256')

        steps.append("[IMMEDIATE] Search mail logs for other recipients and delete emails.")
        if domains:
            steps.append(f"[BLOCK] Sender domain(s): {domains}")
        if ips:
            steps.append(f"[BLOCK] Sender IP(s): {ips}")
        if urls:
            steps.append(f"[BLOCK] URL(s): {urls}")
        if hashes:
            steps.append(f"[BLOCK/HUNT] File Hash(es): {hashes}")
        # Only recommend an investigation when there is something to investigate.
        if urls:
            steps.append("[INVESTIGATE] Check proxy/firewall logs for any user access to the URL(s).")
        if hashes:
            steps.append("[INVESTIGATE] Use EDR to hunt for file hash(es) on endpoints.")
    else:
        steps.append("Severity is below High. Continue to monitor. No immediate blocking action required.")

    for i, step in enumerate(steps, 1):
        print(f"{i}. {step}")

# --- Run the Triage and Generate Recommendations ---
# NOTE(review): `raw_alert` is not defined in any cell visible here - presumably an
# earlier cell creates it; confirm before running this on a fresh kernel.
triaged_incident = triage_phishing_alert(raw_alert)

print("--- 📊 Triage Summary ---")
# Show the indicator table collected during triage
display(triaged_incident['indicators_df'])
# Print the analyst briefing and the numbered list of recommended actions
generate_next_steps(triaged_incident)