<a href="https://colab.research.google.com/github/ahmadspm/Human-AI-Collaboration/blob/main/supplementary.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Google Colab notebook cell: Red Team log processing for supplementary materials
# Paste the whole block into a single Colab cell and run.

import pandas as pd
import numpy as np
import re
from pathlib import Path
import matplotlib.pyplot as plt
from pandas.api import types as ptypes
import zipfile

# --- Config: change the two workbook paths below if your uploads are named differently ---
# Both workbooks are expected to be uploaded to the Colab session before running.
file_event3 = "/content/ACDC Red Team Activity Logs (4).xlsx"
file_event4 = "/content/ACDC Red Team -TACTICS MITRE-2 (2).xlsx"

# Every generated artefact is written under this directory (created when absent).
out_dir = Path("/content/supplementary_redteam")
out_dir.mkdir(exist_ok=True, parents=True)

# --- Helper functions ---
def anonymize_text(s, ip_map, host_map, domain_map):
    """Redact IPs, host-like tokens and domain names found in ``s``.

    Each distinct match is replaced by a numbered placeholder
    (``REDACTED_IP_n``, ``REDACTED_HOST_n``, ``REDACTED_DOMAIN_n``). The three
    mapping dicts are updated in place, so passing the same dicts across calls
    keeps the placeholder for a given original value stable.

    Args:
        s: Any cell value. Missing values (per ``pd.isna``) are returned as-is;
            everything else is converted with ``str`` before redaction.
        ip_map: Mutable dict mapping original IP text to its placeholder.
        host_map: Mutable dict mapping original host token to its placeholder.
        domain_map: Mutable dict mapping original domain to its placeholder.

    Returns:
        The redacted string, or ``s`` itself when it is missing.
    """
    if pd.isna(s):
        return s

    def _placeholder_for(lookup, prefix):
        # Build a re.sub callback that numbers placeholders in first-seen order.
        def _swap(found):
            original = found.group(0)
            if original not in lookup:
                lookup[original] = f"{prefix}{len(lookup)+1}"
            return lookup[original]
        return _swap

    # Order matters: IPs first, then the heuristic host tokens, then domains.
    passes = (
        (r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
         _placeholder_for(ip_map, "REDACTED_IP_"), 0),
        (r'\b[A-Za-z0-9\-_]*?(?:host|PLC|workstation|server|pc|vm|node)[A-Za-z0-9\-_]*\b',
         _placeholder_for(host_map, "REDACTED_HOST_"), re.IGNORECASE),
        (r'\b[a-zA-Z0-9.-]+\.(?:com|net|org|local|corp|lan|io|dev|gov|edu)\b',
         _placeholder_for(domain_map, "REDACTED_DOMAIN_"), 0),
    )

    redacted = str(s)
    for pattern, callback, flags in passes:
        redacted = re.sub(pattern, callback, redacted, flags=flags)
    return redacted

# Canonical column name -> accepted source spellings (lower-case), in priority order.
_COLUMN_ALIASES = (
    ('start_time', ('start_time', 'start', 'timestamp', 'time', 'ts')),
    ('end_time', ('end_time', 'end', 'finish')),
    ('phase', ('phase', 'stage', 'attack_phase')),
    ('action', ('action', 'command', 'activity', 'description')),
    ('actor', ('actor', 'operator', 'user')),
    ('tactic', ('tactic', 'mitre_tactic')),
    ('technique', ('technique', 'mitre_technique')),
)


def standardize_df(df):
    """Return a copy of ``df`` with trimmed, canonical column names.

    Column labels are converted to ``str`` and stripped of surrounding
    whitespace. For each canonical name in ``_COLUMN_ALIASES`` the first alias
    that matches a column (case-insensitively) is renamed to the canonical
    name; other columns keep their stripped label.

    Unlike the earlier implementation, the caller's DataFrame is left
    untouched; only the returned copy carries the new labels.

    Args:
        df: Input DataFrame with arbitrary column labels.

    Returns:
        A new DataFrame with the same data and standardized column labels.
    """
    stripped = [str(c).strip() for c in df.columns]
    # When two labels differ only by case, the last one wins the lookup slot.
    by_lower = {name.lower(): name for name in stripped}

    renames = {}
    for canonical, aliases in _COLUMN_ALIASES:
        source = next((by_lower[alias] for alias in aliases if alias in by_lower), None)
        if source is not None:
            renames[source] = canonical

    result = df.copy()
    result.columns = [renames.get(name, name) for name in stripped]
    return result

def load_best_sheet(path):
    """Load the worksheet with the most rows from the workbook at ``path``.

    Every sheet is read with ``pd.read_excel`` using default options. On a tie
    the earliest sheet in workbook order is kept. The workbook handle is closed
    before returning.

    Args:
        path: Path (or path-like) of the Excel workbook.

    Returns:
        The DataFrame of the sheet with the highest row count, or ``None`` if
        the workbook reports no sheets.
    """
    best = None
    best_count = -1
    # Context manager guarantees the underlying file handle is released.
    with pd.ExcelFile(path) as xls:
        for sheet_name in xls.sheet_names:
            frame = pd.read_excel(xls, sheet_name=sheet_name)
            if frame.shape[0] > best_count:
                best_count = frame.shape[0]
                best = frame
    return best

def try_parse_datetime(col):
    """Parse a column of mixed timestamp representations into datetimes.

    Handling, in order:

    * datetime-typed input is passed through ``pd.to_datetime`` unchanged;
    * real numbers (ints/floats, not bools) are treated as Excel serial day
      counts relative to 1899-12-30;
    * everything else is parsed with ``pd.to_datetime``; if some values fail,
      a day-first parse is tried and kept only when it recovers strictly more
      values.

    Values that cannot be interpreted become ``NaT``. The result keeps the
    index of ``col``.

    Args:
        col: A pandas Series (other sequences are wrapped in a Series).

    Returns:
        A Series of parsed timestamps aligned with ``col``.
    """
    series = col if isinstance(col, pd.Series) else pd.Series(col)

    if ptypes.is_datetime64_any_dtype(series):
        return pd.to_datetime(series, errors='coerce', utc=False)

    # Locate genuine numbers: pd.to_datetime would read them as nanoseconds
    # since 1970 instead of spreadsheet serial days.
    if ptypes.is_numeric_dtype(series) and not ptypes.is_bool_dtype(series):
        is_number = series.notna()
    elif ptypes.is_object_dtype(series):
        is_number = series.map(
            lambda v: isinstance(v, (int, float, np.integer, np.floating))
            and not isinstance(v, bool)
        ).astype(bool)
    else:
        is_number = pd.Series(False, index=series.index)

    text = series.where(~is_number)
    parsed = pd.to_datetime(text, errors='coerce', utc=False)
    if parsed.isna().sum() > text.isna().sum():
        # Some non-missing values failed: see whether day-first does better.
        day_first = pd.to_datetime(text, errors='coerce', dayfirst=True, utc=False)
        if day_first.notna().sum() > parsed.notna().sum():
            parsed = day_first

    if not is_number.any():
        return parsed

    try:
        serials = pd.to_numeric(series.where(is_number), errors='coerce').astype('float64')
        from_serial = pd.to_timedelta(serials, unit='D') + pd.Timestamp('1899-12-30')
    except (OverflowError, ValueError):
        # Out-of-range serials: keep whatever the text parse produced.
        return parsed
    return parsed.where(~is_number, from_serial)

def to_utc_elementwise(ts):
    """Convert a single timestamp-like value to a UTC ``pd.Timestamp``.

    Timezone-naive values are localized to ``Australia/Perth`` before
    conversion; timezone-aware values are converted directly. If that attempt
    raises, the value is re-parsed and localized with the fixed-offset zone
    ``Etc/GMT-8`` instead. Missing or unparseable input yields ``pd.NaT``.

    Args:
        ts: A ``pd.Timestamp`` or anything ``pd.to_datetime`` accepts.

    Returns:
        A UTC-aware ``pd.Timestamp``, or ``pd.NaT``.
    """
    if pd.isna(ts):
        return pd.NaT
    try:
        # Timestamps are used directly; anything else is parsed first.
        stamp = ts if isinstance(ts, pd.Timestamp) else pd.to_datetime(ts, errors='coerce')
        if pd.isna(stamp):
            return pd.NaT
        if stamp.tzinfo is not None:
            return stamp.tz_convert('UTC')
        return stamp.tz_localize('Australia/Perth').tz_convert('UTC')
    except Exception:
        # Second attempt with a fixed offset zone for the localization step.
        try:
            retry = pd.to_datetime(ts, errors='coerce')
            return pd.NaT if pd.isna(retry) else retry.tz_localize('Etc/GMT-8').tz_convert('UTC')
        except Exception:
            return pd.NaT

def process_event(path, event_label):
    """Load, normalize and anonymize one event's red-team log.

    Steps: pick the largest sheet of the workbook, standardize its column
    names, parse start/end timestamps to UTC, derive ``duration_s``, tag rows
    with ``event_label``, infer ``phase`` when the sheet has none, and redact
    IPs/hosts/domains in every text column.

    Args:
        path: Path of the Excel workbook for the event.
        event_label: Value stored in the ``event_id`` column.

    Returns:
        Tuple ``(df, ip_map, host_map, domain_map)`` where the three dicts map
        each original sensitive value to the placeholder that replaced it.
    """
    df = standardize_df(load_best_sheet(path))

    # Parsed UTC timestamps; a missing source column yields an all-NaT column.
    for source, target in (('start_time', 'start_time_parsed'),
                           ('end_time', 'end_time_parsed')):
        if source in df.columns:
            df[target] = try_parse_datetime(df[source]).apply(to_utc_elementwise)
        else:
            df[target] = pd.NaT

    # Vectorized duration. Coercing both sides to UTC-aware keeps the
    # subtraction valid when one column is entirely NaT (tz-naive) and also
    # works for sheets with zero rows. Missing timestamps give 0.0 seconds.
    start_utc = pd.to_datetime(df['start_time_parsed'], utc=True, errors='coerce')
    end_utc = pd.to_datetime(df['end_time_parsed'], utc=True, errors='coerce')
    df['duration_s'] = (end_utc - start_utc).dt.total_seconds().fillna(0.0)

    df['event_id'] = event_label

    if 'phase' not in df.columns:
        if 'action' in df.columns:
            is_recon = df['action'].astype(str).str.contains(
                'recon|scan|enumerate', case=False, na=False)
            df['phase'] = np.where(is_recon, 'Reconnaissance', 'Execution')
        else:
            # No action text to classify: use the same default the keyword
            # test falls back to instead of raising KeyError.
            df['phase'] = 'Execution'

    ip_map, host_map, domain_map = {}, {}, {}
    for col in df.columns:
        # Cover both classic object columns and pandas string dtypes so text
        # is never skipped by the redaction pass.
        if ptypes.is_object_dtype(df[col]) or ptypes.is_string_dtype(df[col]):
            df[col] = df[col].apply(
                lambda x: anonymize_text(x, ip_map, host_map, domain_map))
    return df, ip_map, host_map, domain_map

# --- Process both event workbooks (cleaned frame + redaction maps for each) ---
df3, ip_map3, host_map3, domain_map3 = process_event(file_event3, 'Event3')
df4, ip_map4, host_map4, domain_map4 = process_event(file_event4, 'Event4')

# Write the cleaned, anonymized logs as CSV (row index omitted)
for _frame, _csv_name in ((df3, "event3_redteam.csv"), (df4, "event4_redteam.csv")):
    _frame.to_csv(out_dir / _csv_name, index=False)

# Create simple phase summaries
def phase_summary(df):
    """Aggregate action counts and durations per attack phase.

    Args:
        df: DataFrame with ``phase`` and ``duration_s`` (seconds) columns and,
            optionally, an ``action`` column.

    Returns:
        DataFrame with columns ``phase``, ``n_actions``,
        ``total_duration_min`` and ``mean_duration_min``, sorted by total
        duration in descending order. ``n_actions`` counts non-null ``action``
        values; when the frame has no ``action`` column it is the number of
        rows in the phase.
    """
    # Fall back to the group size so a log without an action column can still
    # be summarized instead of raising KeyError.
    count_spec = ('action', 'count') if 'action' in df.columns else ('duration_s', 'size')
    grp = df.groupby('phase').agg(
        n_actions=count_spec,
        total_duration_min=('duration_s', lambda x: x.sum() / 60.0),
        mean_duration_min=('duration_s', lambda x: x.mean() / 60.0),
    ).reset_index().sort_values('total_duration_min', ascending=False)
    return grp

# Build both per-phase tables first, then persist them next to the cleaned logs.
summary3, summary4 = phase_summary(df3), phase_summary(df4)
for _table, _csv_name in ((summary3, "event3_phase_summary.csv"),
                          (summary4, "event4_phase_summary.csv")):
    _table.to_csv(out_dir / _csv_name, index=False)

# Generate Gantt-style PNGs (simple visualization)
def create_gantt(df, out_png, title):
    """Render a Gantt-style timeline of the logged actions to a PNG file.

    One horizontal bar is drawn per row that has a parsed start time, ordered
    by start time from top to bottom. Bar length reflects ``duration_s``; rows
    whose duration is missing or not positive are drawn 60 seconds long so
    they remain visible.

    Args:
        df: DataFrame with ``start_time_parsed``, ``duration_s`` and ``phase``
            columns; ``action`` is used for labels when present.
        out_png: Destination path of the image.
        title: Figure title.

    Returns:
        ``out_png`` after the image is written, or ``None`` when no row has a
        parsed start time (nothing is written in that case).
    """
    plot_df = df[df['start_time_parsed'].notna()].copy()
    if plot_df.empty:
        return None
    plot_df = plot_df.sort_values('start_time_parsed').reset_index(drop=True)
    plot_df['y'] = range(len(plot_df))

    fig, ax = plt.subplots(figsize=(10, max(3, 0.25*len(plot_df))))
    try:
        for y_pos, start, seconds in zip(plot_df['y'],
                                         plot_df['start_time_parsed'],
                                         plot_df['duration_s']):
            # "not > 0" also catches NaN, which "<= 0" would let through.
            if not seconds > 0:
                seconds = 60
            # A numeric width on a date axis is measured in days, so convert
            # seconds to days (passing minutes would stretch bars 1440x).
            ax.barh(y_pos, seconds / 86400.0, left=start.to_pydatetime(), height=0.6)
        ax.set_yticks(plot_df['y'])
        labels = plot_df['phase'].astype(str) + " | " + plot_df.get('action', pd.Series(['']*len(plot_df))).astype(str)
        ax.set_yticklabels(labels)
        ax.invert_yaxis()
        ax.set_xlabel('Time (UTC)')
        ax.set_title(title)
        plt.tight_layout()
        fig.savefig(out_png, dpi=200)
    finally:
        # Release the figure even if drawing or saving fails.
        plt.close(fig)
    return out_png

# Render one timeline image per event; each result is the PNG path, or None
# when the event has no parsed start times.
gantt3 = create_gantt(
    df3,
    out_png=out_dir / "event3_gantt.png",
    title="Event 3 — Red Team Attack Timeline (UTC)",
)
gantt4 = create_gantt(
    df4,
    out_png=out_dir / "event4_gantt.png",
    title="Event 4 — Red Team Attack Timeline (UTC)",
)

# README + privacy note
# Plain string (no placeholders, so no f-prefix). The duration note states what
# the pipeline actually does: rows lacking a start or end timestamp get
# duration_s = 0 and still count towards the per-phase means.
readme_text = """
Red Team Supplementary Package
Files:
- event3_redteam.csv: cleaned, anonymized action log for Event 3 (timestamps converted from AWST to UTC).
- event4_redteam.csv: cleaned, anonymized action log for Event 4 (timestamps converted from AWST to UTC).
- event3_phase_summary.csv, event4_phase_summary.csv: per-phase statistics (n_actions, total_duration_min, mean_duration_min).
- event3_gantt.png, event4_gantt.png: Gantt-style attack timelines (UTC).

Redactions: IPs/hostnames/domains replaced with REDACTED_* tokens.

Notes: Timestamps originally AWST (UTC+8). Durations are end minus start; rows missing either timestamp are recorded with a duration of 0 and are included in the per-phase means.
"""
# Explicit encoding keeps the output independent of the runtime's locale.
(out_dir / "README_redteam_supplement.md").write_text(readme_text, encoding="utf-8")
(out_dir / "privacy_note.txt").write_text("Sensitive items redacted. See README.", encoding="utf-8")

# Zip the package for upload
zipf = out_dir / "redteam_supplementary_package.zip"
# Deflate the entries (ZipFile stores them uncompressed by default) and add
# them in sorted order so the archive layout is reproducible between runs.
with zipfile.ZipFile(zipf, 'w', compression=zipfile.ZIP_DEFLATED) as z:
    for f in sorted(out_dir.glob("*")):
        # Skip the archive itself and anything that is not a regular file.
        if f.name == zipf.name or not f.is_file():
            continue
        z.write(f, arcname=f.name)

# Console report: package location, timeline images, redaction counts.
print("Saved package to:", zipf)
for _label, _png in (("Event3", gantt3), ("Event4", gantt4)):
    print(f"{_label} Gantt:", _png)
# Counts are the number of distinct IPs, hosts and domains replaced, in that order.
for _tag, _maps in (("event3", (ip_map3, host_map3, domain_map3)),
                    ("event4", (ip_map4, host_map4, domain_map4))):
    print(f"Anonymization counts ({_tag}):", *(len(m) for m in _maps))

# Display phase summaries (display() is supplied by the notebook environment)
for _heading, _table in (("\nEvent 3 phase summary:", summary3),
                         ("\nEvent 4 phase summary:", summary4)):
    print(_heading)
    display(_table)
