
# **BLADEI: Bitstream-Level Abnormality Detection for Embedded Inference**
### *Copyright (c) 2025, Rye Stahle-Smith* 
---
#### ***Description:***
#### `train_model.py` — Model Training and Export (CPU Only)

This script trains a supervised machine learning model to detect malicious FPGA bitstreams using byte-level and structural features.

> ⚠️ **Note:** This program must be executed on a general-purpose CPU (e.g., your laptop or workstation).  
> It is **not compatible with the PYNQ-Z1 board** due to:
> - High computational demands during training  
> - No supported `scikit-learn` wheel for the PYNQ-Z1's ARM architecture

#### ***Features:***
- Byte-level and structural feature extraction from `.bit` files  
- Dimensionality reduction via TSVD  
- Class balancing with SMOTE  
- Training multiple classifiers (e.g., Random Forest, SVM)  
- Evaluation using k-Fold Cross-Validation  
- Model and TSVD components exported as a `.tar.gz` archive for use on PYNQ

---

In [1]:
import numpy as np
import glob
import tarfile
import os
import sys
import json
from joblib import dump, load
from collections import Counter
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
from sklearn.model_selection import train_test_split, cross_validate, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, make_scorer, f1_score, precision_score, recall_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from imblearn.over_sampling import SMOTE

# --------------------------
# Step 1: Collect Bitstreams
# This portion of the program organizes the bitstreams respectively.
# --------------------------
EMPTY_FILES = glob.glob("trusthub_bitstreams/Empty/*.bit")
BENIGN_AES_FILES = glob.glob("trusthub_bitstreams/Benign/AES*.bit")
BENIGN_RS232_FILES = glob.glob("trusthub_bitstreams/Benign/RS232*.bit")
MAL_AES_FILES = glob.glob("trusthub_bitstreams/Malicious/AES*.bit")
MAL_RS232_FILES = glob.glob("trusthub_bitstreams/Malicious/RS232*.bit")
ALL_FILES = EMPTY_FILES + BENIGN_AES_FILES + BENIGN_RS232_FILES + MAL_AES_FILES + MAL_RS232_FILES
print("=== Organizing bitstreams... ===")

=== Organizing bitstreams... ===


In [2]:
# --------------------------
# Step 2: Feature Extraction
# This portion of the program sets the features and labels for the dataset.
# --------------------------
def extract_sparse_features(filepath):
    with open(filepath, 'rb') as f:
        data = f.read()
    size = len(data)
    if size == 0:
        return np.zeros(256)
    counts = Counter(data)
    dense_vec = np.zeros(256)
    for byte_val, count in counts.items():
        dense_vec[byte_val] = count / size
    return dense_vec

def display_progress(current, total):
    bar_length = 20
    percent = int((current / total) * 100)
    blocks = int((current / total) * bar_length)
    bar = '█' * blocks + '-' * (bar_length - blocks)
    sys.stdout.write(f'\rProgress: |{bar}| {percent}% ({current}/{total})')
    sys.stdout.flush()

print("=== Extracting sparse features... ===")
feature_matrix = []
for i, f in enumerate(ALL_FILES, 1):
    feature_matrix.append(extract_sparse_features(f))
    display_progress(i, len(ALL_FILES))

print("\n=== Defining labels... ===")
y = [0]*len(EMPTY_FILES) + [1]*len(BENIGN_AES_FILES) + [2]*len(BENIGN_RS232_FILES) + [3]*len(MAL_AES_FILES) + [4]*len(MAL_RS232_FILES)
X = np.array(feature_matrix)

=== Extracting sparse features... ===
Progress: |████████████████████| 100% (122/122)
=== Defining labels... ===


In [3]:
# --------------------------
# Step 3: Apply Truncated Singular Value Decomposition (TSVD)
# This portion of the program uses TSVD to reduce dataset dimensionality and preserve its most important characteristics.
# --------------------------
print("=== Applying Truncated Singular Value Decomposition (TSVD)... ===")
sparse_X = csr_matrix(X)
tsvd = TruncatedSVD(n_components=30, random_state=42)
X_reduced = tsvd.fit_transform(sparse_X)

=== Applying Truncated Singular Value Decomposition (TSVD)... ===


In [4]:
# --------------------------
# Step 4: Train/Test Split
# This portion of the program splits and scales the dataset into a complete training and testing set.
# --------------------------
print("=== Splitting the dataset for training/ testing... ===")
X_train, X_test, y_train, y_test = train_test_split(X_reduced, y, test_size=0.25, stratify=y, random_state=42)

=== Splitting the dataset for training/ testing... ===


In [5]:
# --------------------------
# Step 5: Apply SMOTE on Training Set
# This portion of the program uses SMOTE to balance class distributions by generating synthetic samples for minority classes.
# Thus, improving model performance on imbalanced datasets.
# --------------------------
print("=== Comparing k_neighbors values for Synthetic Minority Oversampling Technique (SMOTE)... ===\n")
k_values = [2, 5, 7, 9, 11]
smote_results = {}
best_k = None
best_score = 0

for k in k_values:
    try:
        smote = SMOTE(k_neighbors=k, random_state=42)
        X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)

        model = RandomForestClassifier(n_estimators=100, random_state=42)
        scores = cross_val_score(model, X_train_smote, y_train_smote, cv=5, scoring='f1_macro')
        mean_score = scores.mean()
        
        smote_results[k] = mean_score
        print(f"SMOTE (k={k}): F1 Macro = {mean_score:.4f} ± {scores.std():.4f}")

        if mean_score > best_score:
            best_score = mean_score
            best_k = k

    except ValueError as e:
        print(f"SMOTE (k={k}) failed: {e}")

print(f"\n=== Applying SMOTE with k_neighbors={best_k}... ===")
smote = SMOTE(k_neighbors=best_k, random_state=42)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)

=== Comparing k_neighbors values for Synthetic Minority Oversampling Technique (SMOTE)... ===

SMOTE (k=2): F1 Macro = 0.9810 ± 0.0234
SMOTE (k=5): F1 Macro = 0.9810 ± 0.0234
SMOTE (k=7): F1 Macro = 0.9729 ± 0.0224
SMOTE (k=9): F1 Macro = 0.9830 ± 0.0208
SMOTE (k=11): F1 Macro = 0.9830 ± 0.0208

=== Applying SMOTE with k_neighbors=9... ===


In [6]:
# --------------------------
# Step 6: Compare Classifiers
# This portion of the program uses K-Fold Cross-Validation to perform an initial test phase on each model.
# --------------------------
classifiers = {
    "Random Forest": RandomForestClassifier(n_estimators=100),
    "Gradient Boosting": GradientBoostingClassifier(n_estimators=100),
    "AdaBoost": GradientBoostingClassifier(n_estimators=100),
    "Logistic Regression": LogisticRegression(max_iter=1000),
    "Naive Bayes": GaussianNB(),
    "SVM (RBF)": SVC(kernel='rbf'),
    "KNN": KNeighborsClassifier(),
    "Decision Tree": DecisionTreeClassifier()
}

scoring = {
    'accuracy': 'accuracy',
    'precision': make_scorer(precision_score, average='macro', zero_division=0),
    'recall': make_scorer(recall_score, average='macro', zero_division=0),
    'f1': make_scorer(f1_score, average='macro', zero_division=0)
}

print("=== Comparing classifiers using k-Fold Cross-Validation (kFCV)... ===")
cv_results = {}
k = 5
for name, model in classifiers.items():
    results = cross_validate(model, X_train_smote, y_train_smote, cv=k, scoring=scoring)
    print(f"\n{name}")
    for metric in scoring:
        mean = results[f'test_{metric}'].mean()
        std = results[f'test_{metric}'].std()
        print(f"  {metric.capitalize()}: {mean:.2f} ± {std:.2f}")
    cv_results[name] = results['test_f1'].mean()

=== Comparing classifiers using k-Fold Cross-Validation (kFCV)... ===

Random Forest
  Accuracy: 0.98 ± 0.02
  Precision: 0.99 ± 0.02
  Recall: 0.98 ± 0.02
  F1: 0.98 ± 0.02

Gradient Boosting
  Accuracy: 0.89 ± 0.05
  Precision: 0.91 ± 0.04
  Recall: 0.89 ± 0.06
  F1: 0.88 ± 0.06

AdaBoost
  Accuracy: 0.90 ± 0.05
  Precision: 0.91 ± 0.06
  Recall: 0.90 ± 0.06
  F1: 0.89 ± 0.06

Logistic Regression
  Accuracy: 0.33 ± 0.07
  Precision: 0.14 ± 0.03
  Recall: 0.36 ± 0.08
  F1: 0.20 ± 0.05

Naive Bayes
  Accuracy: 0.95 ± 0.06
  Precision: 0.96 ± 0.05
  Recall: 0.96 ± 0.05
  F1: 0.96 ± 0.05

SVM (RBF)
  Accuracy: 0.48 ± 0.08
  Precision: 0.35 ± 0.10
  Recall: 0.53 ± 0.09
  F1: 0.38 ± 0.10

KNN
  Accuracy: 0.87 ± 0.07
  Precision: 0.89 ± 0.07
  Recall: 0.88 ± 0.06
  F1: 0.87 ± 0.06

Decision Tree
  Accuracy: 0.92 ± 0.03
  Precision: 0.94 ± 0.02
  Recall: 0.92 ± 0.03
  F1: 0.92 ± 0.03


In [7]:
# --------------------------
# Step 7: Final Evaluation on Test Set
# This portion of the program evaluates the RandomForestClassifier on the dataset and generates a confusion matrix for visualization.
# --------------------------
best_model_name = max(cv_results, key=cv_results.get)
best_model = classifiers[best_model_name]
best_model.fit(X_train_smote, y_train_smote)

y_pred = best_model.predict(X_test)
print(f"\n*** Final Evaluation on Hold-out Test Set using {best_model_name} ***\n")
print(classification_report(y_test, y_pred, zero_division=0))

print(f"\n*** Confusion Matrix on Hold-out Test Set using {best_model_name} ***")
cm = confusion_matrix(y_test, y_pred)
print("\n\t\tPredicted\n\t\t0\t1\t2\t3\t4")
for i, row in enumerate(cm):
    print(f"Actual {i} |\t" + "\t".join(str(val) for val in row))


*** Final Evaluation on Hold-out Test Set using Random Forest ***

              precision    recall  f1-score   support

           0       1.00      1.00      1.00         8
           1       0.86      1.00      0.92         6
           2       1.00      1.00      1.00         5
           3       1.00      0.86      0.92         7
           4       1.00      1.00      1.00         5

    accuracy                           0.97        31
   macro avg       0.97      0.97      0.97        31
weighted avg       0.97      0.97      0.97        31


*** Confusion Matrix on Hold-out Test Set using Random Forest ***

		Predicted
		0	1	2	3	4
Actual 0 |	8	0	0	0	0
Actual 1 |	0	6	0	0	0
Actual 2 |	0	0	5	0	0
Actual 3 |	0	1	0	6	0
Actual 4 |	0	0	0	0	5


In [None]:
# --------------------------
# Step 8: Export the Trained Model for Deployment
# This portion of the program exports the trained RandomForestClassifier for deployment in a target embedded environment (PYNQ-Z1).
# --------------------------
dump(best_model, './model_components/random_forest_model.joblib')
dump(tsvd, './model_components/tsvd.joblib')

tsvd = load("./model_components/tsvd.joblib")
np.save("./model_components/tsvd_components.npy", tsvd.components_)

rf = load("./model_components/random_forest_model.joblib")

def extract_tree(tree):
    return {
        "children_left": tree.children_left.tolist(),
        "children_right": tree.children_right.tolist(),
        "feature": tree.feature.tolist(),
        "threshold": tree.threshold.tolist(),
        "value": tree.value.squeeze(1).tolist()
    }

forest_json = [extract_tree(estimator.tree_) for estimator in rf.estimators_]

with open("./model_components/rf_forest.json", "w") as f:
    json.dump(forest_json, f)

def compress_to_tar_gz(output_filename, items_to_compress):
    with tarfile.open(output_filename, "w:gz") as tar:
        for item in items_to_compress:
            if os.path.exists(item):
                tar.add(item, arcname=os.path.basename(item))
                print(f"Added '{item}' to archive.")
            else:
                print(f"Warning: '{item}' not found and will be skipped.")

print(f"=== Compressing {best_model_name} for PYNQ Deployment... ===\n")

targets = ["trusthub_bitstreams", "model_components", "VirtualEnv", "deploy_model.ipynb", "requirements.txt"]
output_file = "PYNQ_BLADEI.tar.gz"

compress_to_tar_gz(output_file, targets)
print(f"\nCompression complete. Archive saved as '{output_file}'.")