In [14]:
#@title CTC (arXiv:2109.02473) — Colab One-Cell Runner (GitHub ZIP fix)
#@markdown This version downloads the **entire CTC repo zip** and reads validation data from its folders.
#@markdown <br>Keep **Tiny demo** on for a quick recording; switch off + enable **Zenodo** for fuller replication later.

# ==== OPTIONS (Colab form) ====
TINY_DEMO: bool = True  # @param {type:"boolean"}
TINY_N: int = 100000     # @param {type:"integer"}
TRAIN_MODELS: bool = True  # @param {type:"boolean"}
RUN_PREDICT: bool = True   # @param {type:"boolean"}
USE_DNN: bool = True       # @param {type:"boolean"}
EPOCHS: int = 3            # @param {type:"integer"}
BATCH_SIZE: int = 512      # @param {type:"integer"}
DOWNLOAD_ZENODO: bool = False  # @param {type:"boolean"}
WORKDIR: str = "work_colab"    # @param {type:"string"}

# ==== Minimal deps (TensorFlow is preinstalled on Colab) ====
import sys, subprocess, importlib.util
def pip_install(pkgs):
    subprocess.run([sys.executable, "-m", "pip", "install", "-q"] + pkgs, check=True)

need = []
for pkg in ["scikit-learn", "joblib", "tqdm", "beautifulsoup4", "lxml", "html5lib", "requests", "scipy", "pandas", "numpy"]:
    if importlib.util.find_spec(pkg) is None:
        need.append(pkg)
if need:
    pip_install(need)

# ==== Imports ====
import os, re, tarfile, zipfile, json, io, shutil, time, random
from pathlib import Path
from typing import List, Tuple

import requests
from tqdm import tqdm

import numpy as np
import pandas as pd

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
import joblib

# TensorFlow (use CPU if no GPU)
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    TF_OK = True
except Exception as e:
    print("TensorFlow not available; continuing without DNN. Error:", e)
    TF_OK = False
    USE_DNN = False

# ==== Constants (authors' repo + Zenodo) ====
# Repo with validation folders & dictionary:
CTC_REPO_ZIP = "https://codeload.github.com/epelofske-student/CTC/zip/refs/heads/main"  # full repo as zip
# Files we will read from the unzipped repo:
DICT_REPO_PATH = "English_word_dictionary.txt" # Corrected path
VAL_DIR_CYB = "validation_data_cybersecurity" # Corrected path
VAL_DIR_NON = "validation_data_non_cybersecurity" # Corrected path
# Big training set (optional):
ZENODO_URL = "https://zenodo.org/records/10655913/files/CTC_training_data.tar.gz?download=1"  # ~3.1 GB

# ==== FS helpers ====
W = Path(WORKDIR)
DATA = W / "data"
MODELS = W / "models"
for p in (W, DATA, MODELS):
    p.mkdir(parents=True, exist_ok=True)

def stream_download(url: str, out_path: Path, desc: str = None):
    if out_path.exists() and out_path.stat().st_size > 0:
        return
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        total = int(r.headers.get("content-length", 0))
        with open(out_path, "wb") as f, tqdm(total=total, unit="B", unit_scale=True, desc=desc or out_path.name) as pbar:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))

def extract_tar_gz(tar_path: Path, out_dir: Path):
    out_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tar_path, "r:gz") as tar:
        tar.extractall(path=out_dir)

def extract_zip(zip_path: Path, out_dir: Path):
    out_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(out_dir)

# ==== Cleaning ====
CLEAN_HTML_RE = re.compile(r"<[^>]+>")
URL_RE = re.compile(r"http[s]?://\S+|www\.\S+")
CODE_RE = re.compile(r"`{1,3}.*?`{1,3}", re.DOTALL)
NON_ASCII_RE = re.compile(r"[^\x00-\x7F]+")
WHITESPACE_RE = re.compile(r"\s+")

def clean_text(text: str) -> str:
    if not isinstance(text, str):
        return ""
    text = URL_RE.sub(" ", text)
    text = CODE_RE.sub(" ", text)
    text = CLEAN_HTML_RE.sub(" ", text)
    text = NON_ASCII_RE.sub(" ", text)
    text = WHITESPACE_RE.sub(" ", text).strip()
    return text

def read_text_dir(dir_path: Path, max_files=None) -> List[str]:
    files = sorted([p for p in dir_path.rglob("*") if p.is_file()])
    if max_files is not None:
        files = files[:max_files]
    texts = []
    for p in files:
        try:
            txt = p.read_text("utf-8", errors="ignore")
            texts.append(clean_text(txt))
        except Exception:
            continue
    return texts

def load_training_json(json_path: Path, tiny=False, tiny_n=50000) -> Tuple[List[str], List[int]]:
    data = json.loads(json_path.read_text("utf-8"))
    if tiny:
        data = random.sample(data, min(tiny_n, len(data)))
    X = [clean_text(d.get("text","")) for d in data]
    y = [int(d.get("label", 0)) for d in data]
    return X, y

def make_vectorizer_from_dictionary(dict_path: Path) -> TfidfVectorizer:
    # Read vocabulary, remove duplicates, and filter out empty lines
    vocab_lines = dict_path.read_text("utf-8").splitlines()
    vocab = sorted(list(set(w.strip() for w in vocab_lines if w.strip())))

    vec = TfidfVectorizer(
        vocabulary=vocab,
        lowercase=True,
        dtype=np.float32,
        token_pattern=r"(?u)\b\w+\b",
        max_df=1.0,
        min_df=1
    )
    return vec

def build_models(random_state=42):
    return {
        "DecisionTree": DecisionTreeClassifier(max_depth=100, random_state=random_state),
        "RandomForest": RandomForestClassifier(n_estimators=200, n_jobs=-1, random_state=random_state),
        "Logistic":     LogisticRegression(max_iter=300, n_jobs=-1, solver="saga", penalty="l2"),
        "LinearSVC":    LinearSVC(),
        "MLP":          MLPClassifier(hidden_layer_sizes=(256,), activation="relu", max_iter=15, random_state=random_state)
    }

def build_dnn(input_dim: int):
    model = keras.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(256, activation="relu"),
        layers.Dropout(0.2),
        layers.Dense(2, activation="softmax"),
    ])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model

def majority_vote(preds_bin: List[np.ndarray]) -> np.ndarray:
    stacked = np.vstack(preds_bin)   # (M, N)
    votes = stacked.sum(axis=0)
    return (votes >= (stacked.shape[0] / 2.0)).astype(int)

# ==== Get the authors' repo as a ZIP and set paths ====
repo_zip = DATA / "CTC-main.zip"
repo_root = DATA / "CTC-main"
if not repo_root.exists():
    print("Downloading CTC repo (validation folders + dictionary) ...")
    stream_download(CTC_REPO_ZIP, repo_zip, desc="CTC-main.zip")
    extract_zip(repo_zip, DATA)

# Dictionary path from repo
dict_path = repo_root / DICT_REPO_PATH
if not dict_path.exists():
    raise FileNotFoundError(f"Dictionary not found inside repo ZIP at {repo_root / DICT_REPO_PATH}")

# Validation dirs from repo
val_dir_cyb = repo_root / VAL_DIR_CYB
val_dir_non = repo_root / VAL_DIR_NON
for p in (val_dir_cyb, val_dir_non):
    if not p.exists():
        raise FileNotFoundError(f"Validation folder missing: {p}")

# ==== Optional: download big Zenodo training set ====
train_json = DATA / "CTC_training_data.json"
train_tgz  = DATA / "CTC_training_data.tar.gz"
if DOWNLOAD_ZENODO and not train_json.exists():
    print("Downloading Zenodo training set (~3.1 GB).")
    stream_download(ZENODO_URL, train_tgz, desc="CTC_training_data.tar.gz")
    with tarfile.open(train_tgz, "r:gz") as tar:
        members = [m for m in tar.getmembers() if m.name.endswith(".json")]
        if members:
            f = tar.extractfile(members[0])
            train_json.write_bytes(f.read())

# ==== Build TF-IDF vectorizer with authors' dictionary ====
vec = make_vectorizer_from_dictionary(dict_path)

# ==== Prepare training data ====
if train_json.exists():
    X_text_all, y_all = load_training_json(train_json, tiny=TINY_DEMO, tiny_n=TINY_N)
else:
    # Tiny fallback: sample from validation sets just to get you running
    print("Using tiny fallback sample from validation folders (demo).")
    X_c = read_text_dir(val_dir_cyb, max_files=min(5000, TINY_N))
    X_n = read_text_dir(val_dir_non, max_files=min(5000, TINY_N))
    y_all = [1]*len(X_c) + [0]*len(X_n)
    X_text_all = X_c + X_n

# Split 70% train, 30% temp
X_train_text, X_temp_text, y_train, y_temp = train_test_split(
    X_text_all, y_all, test_size=0.3, random_state=42, stratify=y_all
)

# Split 30% temp into 20% test and 10% validation (relative to original data size)
# 20% of original is (20/30) of temp, 10% of original is (10/30) of temp = 1/3
X_test_text, X_val_text, y_test, y_val = train_test_split(
    X_temp_text, y_temp, test_size=1/3, random_state=42, stratify=y_temp
)

# Vectorize
X_train = vec.fit_transform(X_train_text)
X_val   = vec.transform(X_val_text)
X_test  = vec.transform(X_test_text) # Vectorize the test set as well
joblib.dump(vec, MODELS / "tfidf_vectorizer.joblib")

# ==== Train ====
if TRAIN_MODELS:
    classes = np.array([0,1])
    cw_vals = compute_class_weight("balanced", classes=classes, y=np.array(y_train))
    class_weights = {int(i): float(w) for i, w in zip(classes, cw_vals)}

    # scikit models
    sk_models = build_models()
    for name, model in sk_models.items():
        print(f"\nTraining {name} ...")
        try:
            if hasattr(model, "class_weight"):
                model.set_params(class_weight="balanced")
        except Exception:
            pass
        # Train on X_train, evaluate on X_val
        model.fit(X_train, y_train)
        preds_val = model.predict(X_val)
        acc_val = accuracy_score(y_val, preds_val)
        print(f"{name} val acc: {acc_val:.4f}")
        joblib.dump(model, MODELS / f"{name}.joblib")

    # DNN
    if USE_DNN and TF_OK:
        print("\nTraining DNN ...")
        dnn = build_dnn(X_train.shape[1])
        # Train on X_train, evaluate on X_val during training
        dnn.fit(
            X_train.toarray(), np.array(y_train),
            validation_data=(X_val.toarray(), np.array(y_val)),
            epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=2
        )
        dnn.save(MODELS / "DNN.keras")

# ==== Evaluate on the new Test set (20% split) ====
if TRAIN_MODELS or RUN_PREDICT: # Evaluate on test if models were trained or prediction is requested
    print("\n=== Evaluating models on the new Test set (20% split) ===")
    vec_loaded, models_loaded, dnn_loaded = load_models() # Ensure models are loaded
    X_test_vec = vec_loaded.transform(X_test_text) # Vectorize test text using the loaded vectorizer

    # Evaluate individual models on test set
    for name, model in models_loaded.items():
        try:
            preds_test = model.predict(X_test_vec)
        except Exception:
            preds_test = model.decision_function(X_test_vec)
            preds_test = (preds_test > 0).astype(int)
        acc_test = accuracy_score(y_test, preds_test)
        print(f"{name} test acc: {acc_test:.4f}")

    # Evaluate DNN on test set
    if dnn_loaded is not None:
        loss, acc_test_dnn = dnn_loaded.evaluate(X_test_vec.toarray(), np.array(y_test), verbose=0)
        print(f"DNN test acc: {acc_test_dnn:.4f}")

    # Evaluate Ensemble (Majority Vote) on test set
    preds_bin_test = []
    for name, m in models_loaded.items():
        try:
            preds = m.predict(X_test_vec)
        except Exception:
             preds = m.decision_function(X_test_vec)
             preds = (preds > 0).astype(int)
        preds_bin_test.append(preds.astype(int))

    if dnn_loaded is not None:
         p = dnn_loaded.predict(X_test_vec.toarray(), verbose=0)
         preds_bin_test.append(np.argmax(p, axis=1).astype(int))

    if preds_bin_test: # Ensure there's at least one model prediction
        maj_test = majority_vote(preds_bin_test)
        acc_maj_test = accuracy_score(y_test, maj_test)
        report_maj_test = classification_report(y_test, maj_test, target_names=["non-cybersecurity", "cybersecurity"], output_dict=True)
        print(f"\nEnsemble (Majority Vote) test acc: {acc_maj_test:.4f}")
        print("Ensemble Test Classification Report:")
        print(json.dumps(report_maj_test, indent=2))
    else:
        print("\nNo models available to evaluate ensemble on test set.")


# ==== Predict on authors’ validation sets (original step) ====
# This uses the separate validation folders from the repo, not the 10% split
if RUN_PREDICT:
    print("\n=== Evaluating ensemble (CTC majority vote) on authors' original validation folders ===")
    # Ensure models are loaded - already done above if TRAIN_MODELS or RUN_PREDICT was True
    # vec_loaded, models_loaded, dnn_loaded = load_models() # Uncomment if only RUN_PREDICT is True

    cs_acc, cs_fp, cs_fn, n1 = infer_on_dir(vec_loaded, models_loaded, dnn_loaded, val_dir_cyb, 1)
    nc_acc, nc_fp, nc_fn, n0 = infer_on_dir(vec_loaded, models_loaded, dnn_loaded, val_dir_non, 0)
    print(f"Authors' Cybersecurity Val:      acc={cs_acc:.4f}, FP={cs_fp:.4f}, FN={cs_fn:.4f}, N={n1}")
    print(f"Authors' Non-cybersecurity Val:  acc={nc_acc:.4f}, FP={nc_fp:.4f}, FN={nc_fn:.4f}, N={n0}")
    print("\nDone ✅ — ready for your screen recording.")

Using tiny fallback sample from validation folders (demo).

Training DecisionTree ...
DecisionTree val acc: 1.0000

Training RandomForest ...
RandomForest val acc: 1.0000

Training Logistic ...
Logistic val acc: 0.5000

Training LinearSVC ...
LinearSVC val acc: 0.5000

Training MLP ...




MLP val acc: 0.5000

Training DNN ...
Epoch 1/3
1/1 - 1s - 1s/step - accuracy: 0.3636 - loss: 0.6940 - val_accuracy: 0.5000 - val_loss: 0.6933
Epoch 2/3
1/1 - 0s - 187ms/step - accuracy: 0.6364 - loss: 0.6854 - val_accuracy: 0.5000 - val_loss: 0.6935
Epoch 3/3
1/1 - 0s - 143ms/step - accuracy: 0.6364 - loss: 0.6809 - val_accuracy: 0.5000 - val_loss: 0.6939

=== Evaluating models on the new Test set (20% split) ===
DecisionTree test acc: 0.6667
RandomForest test acc: 0.6667
Logistic test acc: 0.6667
LinearSVC test acc: 0.6667
MLP test acc: 0.6667
DNN test acc: 0.6667

Ensemble (Majority Vote) test acc: 0.6667
Ensemble Test Classification Report:
{
  "non-cybersecurity": {
    "precision": 0.6666666666666666,
    "recall": 1.0,
    "f1-score": 0.8,
    "support": 2.0
  },
  "cybersecurity": {
    "precision": 0.0,
    "recall": 0.0,
    "f1-score": 0.0,
    "support": 1.0
  },
  "accuracy": 0.6666666666666666,
  "macro avg": {
    "precision": 0.3333333333333333,
    "recall": 0.5,
    "

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Authors' Cybersecurity Val:      acc=0.5714, FP=0.0000, FN=0.4286, N=7
Authors' Non-cybersecurity Val:  acc=1.0000, FP=0.0000, FN=0.0000, N=9

Done ✅ — ready for your screen recording.


# Task
Explain the error in the selected code, fix it, and adapt the entire Python code to run in Google Colab, including necessary modifications for argument handling and execution flow within a notebook environment.

In [12]:
import joblib
import numpy as np
from pathlib import Path

# Define the directory to save the data
save_dir = Path(WORKDIR) / "data"
save_dir.mkdir(parents=True, exist_ok=True)

# Save vectorized data and labels for train, test, and validation sets
joblib.dump(X_train, save_dir / "X_train_vectorized.joblib")
joblib.dump(X_val, save_dir / "X_val_vectorized.joblib")
joblib.dump(X_test, save_dir / "X_test_vectorized.joblib") # Save test vectorized data
joblib.dump(y_train, save_dir / "y_train_labels.joblib")
joblib.dump(y_val, save_dir / "y_val_labels.joblib")
joblib.dump(y_test, save_dir / "y_test_labels.joblib")   # Save test labels

# Save original text data for train, test, and validation sets
with open(save_dir / "X_train_text.txt", "w", encoding="utf-8") as f:
    for text in X_train_text:
        f.write(text + "\n")

with open(save_dir / "X_val_text.txt", "w", encoding="utf-8") as f:
    for text in X_val_text:
        f.write(text + "\n")

with open(save_dir / "X_test_text.txt", "w", encoding="utf-8") as f: # Save test text data
    for text in X_test_text:
        f.write(text + "\n")


print(f"Train, test, and validation data saved to {save_dir}")

Train, test, and validation data saved to work_colab/data
