In [22]:
# SEAS 8414_DC8 – Assignment 8: Cognitive SOAR Implementation
# Author: Anetta Nichols
# Date: 22 August 2025
# Environment: Google Colab with Miniconda (Python 3.10)
# Purpose: Full pipeline for synthetic threat actor simulation, model training, and Streamlit-based attribution interface.

In [23]:
# Step 1: Download and install Miniconda with Python 3.10
!wget -O miniconda.sh https://repo.anaconda.com/miniconda/Miniconda3-py310_24.1.2-0-Linux-x86_64.sh
!chmod +x miniconda.sh
!bash ./miniconda.sh -b -f -p /usr/local

# Step 2: Update environment variables so Python 3.10 packages are accessible
import sys
sys.path.append("/usr/local/lib/python3.10/site-packages")

# Step 3: Confirm Python 3.10 is installed
!/usr/local/bin/python3.10 --version

--2025-08-23 03:51:43--  https://repo.anaconda.com/miniconda/Miniconda3-py310_24.1.2-0-Linux-x86_64.sh
Resolving repo.anaconda.com (repo.anaconda.com)... 104.16.32.241, 104.16.191.158, 2606:4700::6810:bf9e, ...
Connecting to repo.anaconda.com (repo.anaconda.com)|104.16.32.241|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 134948792 (129M) [application/octet-stream]
Saving to: ‘miniconda.sh’


2025-08-23 03:51:44 (141 MB/s) - ‘miniconda.sh’ saved [134948792/134948792]

PREFIX=/usr/local
Unpacking payload ...
                                                                                 
Installing base environment...


Downloading and Extracting Packages:

Preparing transaction: - done
Executing transaction: | done
installation finished.
    You currently have a PYTHONPATH environment variable set. This may cause
    unexpected behavior when running the Python interpreter in Miniconda3.
    For best results, please verify that your PYTHONPATH only points

In [None]:
# Step 4: Install PyCaret (latest stable version)
!/usr/local/bin/python3.10 -m pip install pycaret

In [16]:
# Step 5: Write the full script to train_model.py
# The script is stored as a string and written to disk so that it can be executed
# by the Miniconda Python 3.10 interpreter rather than by the notebook kernel.
train_model_code = """
import pandas as pd
import numpy as np
from datetime import datetime
from pycaret.classification import ClassificationExperiment
from pycaret.clustering import ClusteringExperiment

def generate_synthetic_data(num_samples=600, seed=42):
    '''Create, shuffle and save a balanced synthetic URL-feature dataset.

    Args:
        num_samples: Total number of rows requested. The rows are split evenly
            between the three threat-actor profiles and the benign profile, so
            the result holds 4 * (num_samples // 4) rows.
        seed: Seed for the random generator. The default makes repeated runs
            produce the same dataset; pass None for a fresh draw on every call.

    Returns:
        pandas.DataFrame with 13 feature columns plus the 'actor_profile' label.

    Side effects:
        Writes the dataset to 'synthetic_threat_data_<YYYYmmdd_HHMM>.csv' in the
        current working directory and prints a per-class row count.
    '''
    print("Generating synthetic dataset...")

    # One dedicated generator drives every draw, so a given seed fully
    # determines the dataset (the module-level np.random state is not used).
    rng = np.random.default_rng(seed)

    # 'p' weights the having_IP_Address values, 'p_ssl' the SSLfinal_State values.
    profiles = {
        'state_sponsored': {
            'having_IP_Address': [1, -1], 'p': [0.6, 0.4],
            'SSLfinal_State': [-1, 0, 1], 'p_ssl': [0.7, 0.2, 0.1],
            'has_political_keyword': [0]
        },
        'cybercrime': {
            'having_IP_Address': [1, -1], 'p': [0.3, 0.7],
            'SSLfinal_State': [-1, 0, 1], 'p_ssl': [0.5, 0.3, 0.2],
            'has_political_keyword': [0]
        },
        'hacktivist': {
            'having_IP_Address': [1, -1], 'p': [0.4, 0.6],
            'SSLfinal_State': [-1, 0, 1], 'p_ssl': [0.6, 0.3, 0.1],
            'has_political_keyword': [1]
        }
    }

    benign_profile = {
        'having_IP_Address': [1, -1], 'p': [0.05, 0.95],
        'SSLfinal_State': [-1, 0, 1], 'p_ssl': [0.05, 0.15, 0.8],
        'has_political_keyword': [0]
    }

    def create_samples(profile, count, label):
        '''Draw `count` rows for one profile and tag them with `label`.'''
        # Only having_IP_Address and SSLfinal_State are profile-weighted;
        # the remaining features are drawn uniformly from their value sets.
        data = {
            'having_IP_Address': rng.choice(profile['having_IP_Address'], count, p=profile['p']),
            'URL_Length': rng.choice([1, 0, -1], count),
            'Shortining_Service': rng.choice([1, -1], count),
            'having_At_Symbol': rng.choice([1, -1], count),
            'double_slash_redirecting': rng.choice([1, -1], count),
            'Prefix_Suffix': rng.choice([1, -1], count),
            'having_Sub_Domain': rng.choice([1, 0, -1], count),
            'SSLfinal_State': rng.choice(profile['SSLfinal_State'], count, p=profile['p_ssl']),
            'URL_of_Anchor': rng.choice([-1, 0, 1], count),
            'Links_in_tags': rng.choice([-1, 0, 1], count),
            'SFH': rng.choice([-1, 0, 1], count),
            'Abnormal_URL': rng.choice([1, -1], count),
            'has_political_keyword': rng.choice(profile['has_political_keyword'], count)
        }
        df = pd.DataFrame(data)
        df['actor_profile'] = label
        return df

    # Equal share for each threat profile plus the benign class.
    per_class = num_samples // (len(profiles) + 1)

    dfs = []
    for profile_name, profile_data in profiles.items():
        dfs.append(create_samples(profile_data, per_class, profile_name))
    dfs.append(create_samples(benign_profile, per_class, 'benign'))

    # The shuffle takes its seed from the same generator so the row order is
    # reproducible too; an int is passed because every pandas version accepts it.
    shuffle_seed = int(rng.integers(0, 2**32 - 1))
    final_df = (
        pd.concat(dfs, ignore_index=True)
        .sample(frac=1, random_state=shuffle_seed)
        .reset_index(drop=True)
    )

    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    filename = f"synthetic_threat_data_{timestamp}.csv"
    final_df.to_csv(filename, index=False)
    print(f"Synthetic dataset saved to '{filename}'")
    print("Sample counts per actor profile:")
    print(final_df['actor_profile'].value_counts())

    return final_df

def train_models():
    '''Generate the dataset, then fit and save both models.

    Saves a random-forest classifier as 'phishing_url_detector.pkl' and a
    3-cluster k-means model as 'threat_actor_profiler.pkl' in the current
    working directory. The clustering model is fitted on the non-benign rows
    only, with the label column removed.
    '''
    df = generate_synthetic_data()

    print("Training classification model...")
    clf_exp = ClassificationExperiment()
    clf_exp.setup(data=df, target='actor_profile', session_id=42)
    clf_model = clf_exp.create_model('rf')
    clf_exp.save_model(clf_model, 'phishing_url_detector')
    print("Classification model saved as 'phishing_url_detector.pkl'")

    print("Training clustering model...")
    clustering_df = df[df['actor_profile'] != 'benign'].drop(columns=['actor_profile'])
    clust_exp = ClusteringExperiment()
    clust_exp.setup(data=clustering_df, session_id=42)
    clust_model = clust_exp.create_model('kmeans', num_clusters=3)
    clust_exp.save_model(clust_model, 'threat_actor_profiler')
    print("Clustering model saved as 'threat_actor_profiler.pkl'")

if __name__ == "__main__":
    train_models()
"""

with open("train_model.py", "w") as f:
    f.write(train_model_code)


In [24]:
# Step 6: Execute training script to generate synthetic data and save classification/clustering models
# The script is run with the Miniconda Python 3.10 interpreter (the one PyCaret was
# installed into), not with the notebook kernel. It writes a timestamped
# synthetic_threat_data_*.csv plus phishing_url_detector.pkl and
# threat_actor_profiler.pkl into the current working directory.
!/usr/local/bin/python3.10 train_model.py

Generating synthetic dataset...
Synthetic dataset saved to 'synthetic_threat_data_20250823_0354.csv'
Sample counts per actor profile:
actor_profile
state_sponsored    150
benign             150
hacktivist         150
cybercrime         150
Name: count, dtype: int64
Training classification model...
                    Description                                              Value
0                    Session id                                                 42
1                        Target                                      actor_profile
2                   Target type                                         Multiclass
3                Target mapping  benign: 0, cybercrime: 1, hacktivist: 2, state...
4           Original data shape                                          (600, 14)
5        Transformed data shape                                          (600, 14)
6   Transformed train set shape                                          (420, 14)
7    Transformed test set shape      

In [26]:
# Step 7: Write the full script to app.py
# The Streamlit app is stored as a string and written to disk in Step 8 below.
app_code = '''
import os

import pandas as pd
import streamlit as st
from pycaret.classification import load_model as load_classification_model, predict_model as predict_classification
from pycaret.clustering import load_model as load_clustering_model, predict_model as predict_clustering

# Step 1: Configure Streamlit Page
st.set_page_config(page_title="GenAI-Powered Phishing SOAR", layout="wide")

# Saved artefacts are looked up in "models/" first and then in the working
# directory, because the training script saves its .pkl files next to itself.
ASSET_DIRS = ("models", ".")
OUTPUT_DIR = "outputs"


def find_asset(stem, suffix):
    """Return the path (without suffix) of the first existing stem+suffix file, else None."""
    for directory in ASSET_DIRS:
        candidate = os.path.join(directory, stem)
        if os.path.exists(candidate + suffix):
            return candidate
    return None


def encode_features(values):
    """Convert the sidebar answers into the numeric feature row used for training.

    Args:
        values: dict holding the sidebar keys (url_length, ssl_state, sub_domain,
            prefix_suffix, has_ip, short_service, at_symbol, double_slash, anchor,
            links_in_tags, sfh, abnormal_url, political_keyword).

    Returns:
        One-row DataFrame whose columns match the training data, in the same order.

    Raises:
        KeyError: if a key is missing or a categorical answer is not recognised.
    """
    def flag(key):
        # Training data encodes a present indicator as 1 and an absent one as -1.
        return 1 if values[key] else -1

    # SSL follows the training profiles, where benign rows are mostly 1 (trusted).
    ssl_codes = {"None": -1, "Suspicious": 0, "Trusted": 1}
    # NOTE(review): the features below are uniformly random in the synthetic
    # training set, so these encodings follow the same -1/0/1 scale by
    # convention only - confirm against the intended feature semantics.
    length_codes = {"Short": -1, "Normal": 0, "Long": 1}
    subdomain_codes = {"None": -1, "One": 0, "Many": 1}
    trust_codes = {"Suspicious": -1, "Neutral": 0, "Trusted": 1}

    return pd.DataFrame([{
        "having_IP_Address": flag("has_ip"),
        "URL_Length": length_codes[values["url_length"]],
        "Shortining_Service": flag("short_service"),
        "having_At_Symbol": flag("at_symbol"),
        "double_slash_redirecting": flag("double_slash"),
        "Prefix_Suffix": flag("prefix_suffix"),
        "having_Sub_Domain": subdomain_codes[values["sub_domain"]],
        "SSLfinal_State": ssl_codes[values["ssl_state"]],
        "URL_of_Anchor": trust_codes[values["anchor"]],
        "Links_in_tags": trust_codes[values["links_in_tags"]],
        "SFH": trust_codes[values["sfh"]],
        "Abnormal_URL": flag("abnormal_url"),
        "has_political_keyword": 1 if values["political_keyword"] else 0,
    }])


def cluster_index(label):
    """Return the integer id contained in a cluster label (e.g. Cluster 2 -> 2), or None."""
    digits = "".join(ch for ch in str(label) if ch.isdigit())
    return int(digits) if digits else None


# Step 2: Load Models and Optional Feature Plot
@st.cache_resource
def load_assets():
    """Load the classifier, the optional feature-importance plot path and the clustering model.

    Returns:
        (classification model or None, plot path or None, clustering model or None)
    """
    clf_path = find_asset("phishing_url_detector", ".pkl")
    cluster_path = find_asset("threat_actor_profiler", ".pkl")
    plot_stem = find_asset("feature_importance", ".png")

    # load_model unpickles the file, so only point this at models you trained yourself.
    clf_model = load_classification_model(clf_path) if clf_path else None
    cluster_model = load_clustering_model(cluster_path) if cluster_path else None
    plot = plot_stem + ".png" if plot_stem else None

    return clf_model, plot, cluster_model

model, feature_plot, cluster_model = load_assets()

if model is None:
    st.error("Classification model not found. Please run training first or check logs.")
    st.stop()

# Step 3: Collect User Inputs via Sidebar
with st.sidebar:
    st.title("URL Feature Input")
    st.write("Describe the characteristics of a suspicious URL below.")

    test_case = st.selectbox("Load a Test Case", options=["None", "Benign", "Cybercrime", "State-Sponsored", "Hacktivist"])

    preset_inputs = {
        "Benign": {
            'url_length': 'Normal', 'ssl_state': 'Trusted', 'sub_domain': 'One',
            'prefix_suffix': False, 'has_ip': False, 'short_service': False,
            'at_symbol': False, 'double_slash': False, 'anchor': 'Trusted',
            'links_in_tags': 'Trusted', 'sfh': 'Trusted', 'abnormal_url': False,
            'political_keyword': False
        },
        "Cybercrime": {
            'url_length': 'Long', 'ssl_state': 'None', 'sub_domain': 'Many',
            'prefix_suffix': True, 'has_ip': True, 'short_service': True,
            'at_symbol': True, 'double_slash': True, 'anchor': 'Suspicious',
            'links_in_tags': 'Suspicious', 'sfh': 'Suspicious', 'abnormal_url': True,
            'political_keyword': False
        },
        "State-Sponsored": {
            'url_length': 'Normal', 'ssl_state': 'Trusted', 'sub_domain': 'One',
            'prefix_suffix': True, 'has_ip': False, 'short_service': False,
            'at_symbol': False, 'double_slash': False, 'anchor': 'Neutral',
            'links_in_tags': 'Neutral', 'sfh': 'Neutral', 'abnormal_url': False,
            'political_keyword': False
        },
        "Hacktivist": {
            'url_length': 'Long', 'ssl_state': 'Suspicious', 'sub_domain': 'Many',
            'prefix_suffix': True, 'has_ip': True, 'short_service': False,
            'at_symbol': True, 'double_slash': True, 'anchor': 'Suspicious',
            'links_in_tags': 'Neutral', 'sfh': 'Suspicious', 'abnormal_url': True,
            'political_keyword': True
        }
    }

    if test_case != "None":
        form_values = preset_inputs[test_case]
    else:
        form_values = {
            'url_length': st.select_slider("URL Length", options=['Short', 'Normal', 'Long']),
            'ssl_state': st.select_slider("SSL Certificate Status", options=['Trusted', 'Suspicious', 'None']),
            'sub_domain': st.select_slider("Sub-domain Complexity", options=['None', 'One', 'Many']),
            'prefix_suffix': st.checkbox("URL has a Prefix/Suffix (e.g., '-')"),
            'has_ip': st.checkbox("URL uses an IP Address"),
            'short_service': st.checkbox("Is it a shortened URL"),
            'at_symbol': st.checkbox("URL contains '@' symbol"),
            'double_slash': st.checkbox("URL contains '//' after protocol"),
            'anchor': st.select_slider("Anchor Tag Behavior", options=['Trusted', 'Neutral', 'Suspicious']),
            'links_in_tags': st.select_slider("Links in Tags", options=['Trusted', 'Neutral', 'Suspicious']),
            'sfh': st.select_slider("Server Form Handler (SFH)", options=['Trusted', 'Neutral', 'Suspicious']),
            'abnormal_url': st.checkbox("URL is Abnormal (e.g., doesn't match domain)"),
            'political_keyword': st.checkbox("Contains Political Keywords")
        }

# Step 4: Display Feature Importance Plot
if feature_plot:
    st.image(feature_plot, caption="Feature Importance", use_column_width=True)

# Step 5: Run Predictions
# The export toggle is read before the button: a button is only True on the
# rerun triggered by its own click, so a checkbox nested under it could never
# be ticked and acted upon in the same run.
save_to_csv = st.checkbox("Save Prediction to CSV")

if st.button("Run Attribution"):
    input_df = encode_features(form_values)

    # Classification
    clf_result = predict_classification(model, data=input_df)
    # PyCaret 3.x names the output columns prediction_label / prediction_score;
    # the 2.x names are kept as a fallback.
    label_col = 'prediction_label' if 'prediction_label' in clf_result.columns else 'Label'
    score_col = 'prediction_score' if 'prediction_score' in clf_result.columns else 'Score'
    first_row = clf_result.iloc[0]
    predicted_label = first_row[label_col]
    prediction_score = first_row[score_col]

    st.subheader(" Phishing Classification Result")
    st.markdown(f"- **Predicted Label:** `{predicted_label}`")
    st.markdown(f"- **Confidence Score:** `{prediction_score:.2f}`")

    # Clustering
    # Defaults keep the export below valid when no clustering model is available.
    cluster_label = None
    actor_type = "Unavailable"
    if cluster_model is not None:
        cluster_result = predict_clustering(cluster_model, data=input_df)
        cluster_label = cluster_result.iloc[0]['Cluster']

        st.subheader(" Threat Actor Attribution")
        st.markdown(f"- **Assigned Cluster:** `{cluster_label}`")

        # NOTE(review): k-means cluster ids are arbitrary, and the training
        # script fits three clusters on non-benign rows only - confirm which
        # id corresponds to which actor type before relying on this mapping.
        cluster_map = {
            0: "Benign",
            1: "Cybercrime",
            2: "State-Sponsored",
            3: "Hacktivist"
        }
        # Labels arrive as text such as "Cluster 1", so the id is extracted first.
        actor_type = cluster_map.get(cluster_index(cluster_label), "Unknown")
        st.markdown(f"- **Likely Actor Type:** `{actor_type}`")

        st.info("This attribution is based on clustering of behavioral URL features. Please validate against known threat actor profiles and campaign metadata.")

    # Step 6: Optional Export
    if save_to_csv:
        output_df = pd.DataFrame({
            "Predicted Label": [predicted_label],
            "Confidence Score": [prediction_score],
            "Assigned Cluster": [cluster_label],
            "Actor Type": [actor_type]
        })
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        timestamp = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(OUTPUT_DIR, f"prediction_{timestamp}.csv")
        output_df.to_csv(filename, index=False)
        st.success(f"Prediction saved as {filename}")

    # Step 7: Justification Block
    with st.expander(" Attribution Justification"):
        st.markdown("""
        The classification model predicts phishing likelihood based on behavioral URL features.
        The clustering model assigns threat actor types using unsupervised profiling.

        - **Benign**: Low-risk, trusted indicators
        - **Cybercrime**: High-risk, evasive patterns
        - **State-Sponsored**: Neutral but strategic indicators
        - **Hacktivist**: Politically charged and disruptive traits

        Please validate results against campaign metadata and known actor profiles.
        """)
'''

# Step 8: Write the full Streamlit interface to app.py for later deployment or local execution
# This file wraps the trained models into an interactive attribution tool using PyCaret and Streamlit.
with open("app.py", "w") as f:
    f.write(app_code)

print(" Streamlit app saved successfully as 'app.py'")


 Streamlit app saved successfully as 'app.py'


In [27]:
# Step 9: Confirm that the training run left its model files behind.
# Every entry of the current working directory whose name ends in '.pkl' is printed;
# both 'phishing_url_detector.pkl' and 'threat_actor_profiler.pkl' are expected to
# appear, since the Streamlit app needs them.
import os

print("Files in current directory:")
pickle_names = [entry for entry in os.listdir() if entry.endswith('.pkl')]
for pickle_name in pickle_names:
    print(" ", pickle_name)


Files in current directory:
  phishing_url_detector.pkl
  threat_actor_profiler.pkl
