# Malware Detection Model Evaluation Framework

**Purpose:** Standardized evaluation of malware detection models across multiple industry-grade datasets to measure generalizability and zero-day detection capability.

## Evaluation Datasets

| Dataset | Type | Samples | Time Period | Purpose |
|---------|------|---------|-------------|--------|
| EMBER 2018 Test | Benchmark | 200k | 2018 | In-distribution baseline |
| theZoo | Research | ~200 | 2010-2022 | APT/nation-state malware |
| MalwareBazaar Recent | Wild | Variable | Last 30 days | Zero-day/emerging threats |
| VirusShare (subset) | Wild | Variable | Mixed | Volume testing |
| Benign (Sysinternals) | Legitimate | ~15 | Current | False positive baseline |
| Benign (System32) | Legitimate | Variable | Current | System file testing |

## Metrics Tracked
- Accuracy, Precision, Recall, F1, AUC-ROC
- Per-dataset breakdown
- Confidence distribution analysis
- Zero-day detection rate (samples at most 30 days old)

In [None]:
import os
import json
import time
import hashlib
import requests
import zipfile
import io
from pathlib import Path
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import joblib

# EMBER for feature extraction
import ember
from sklearn.feature_extraction import FeatureHasher
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, roc_auc_score, roc_curve, classification_report
)

# Monkey patch for ember compatibility
def fixed_section_info_process_raw_features(self, raw_obj):
    """Build the SectionInfo feature vector from a raw feature dict.

    Intended as a drop-in replacement for
    ``ember.features.SectionInfo.process_raw_features``.

    Args:
        raw_obj: Mapping with a ``'sections'`` list (each item providing
            ``name``, ``size``, ``entropy``, ``vsize`` and ``props``) and an
            ``'entry'`` value naming the entry section.

    Returns:
        A float32 array: five section counts followed by five hashed blocks
        (sizes, entropies, virtual sizes, entry name, entry-section props),
        each produced by ``FeatureHasher(50, ...)``.
    """
    sections = raw_obj['sections']

    def hash_by_section(field):
        # Hash (section name, field value) pairs into one fixed-width row.
        pairs = [(sec['name'], sec[field]) for sec in sections]
        return FeatureHasher(50, input_type="pair").transform([pairs]).toarray()[0]

    def hash_tokens(tokens):
        # Hash a list of string tokens into one fixed-width row.
        return FeatureHasher(50, input_type="string").transform([tokens]).toarray()[0]

    counts = [
        len(sections),
        len([sec for sec in sections if sec['size'] == 0]),
        len([sec for sec in sections if sec['name'] == ""]),
        len([sec for sec in sections if 'MEM_READ' in sec['props'] and 'MEM_EXECUTE' in sec['props']]),
        len([sec for sec in sections if 'MEM_WRITE' in sec['props']]),
    ]

    sizes_hashed = hash_by_section('size')
    entropy_hashed = hash_by_section('entropy')
    vsize_hashed = hash_by_section('vsize')

    # The entry name is passed as a one-element token list.
    # NOTE(review): presumably this is the compatibility fix the patch exists
    # for (hashing the bare string would treat it as a sequence) - confirm.
    entry_hashed = hash_tokens([raw_obj['entry']])

    entry_props = [
        prop
        for sec in sections
        if sec['name'] == raw_obj['entry']
        for prop in sec['props']
    ]
    entry_props_hashed = hash_tokens(entry_props)

    blocks = [counts, sizes_hashed, entropy_hashed, vsize_hashed, entry_hashed, entry_props_hashed]
    return np.hstack(blocks).astype(np.float32)

# Install the replacement only when this ember build exposes SectionInfo;
# otherwise the library is left untouched.
if hasattr(ember.features, 'SectionInfo'):
    ember.features.SectionInfo.process_raw_features = fixed_section_info_process_raw_features

# Use seaborn's "whitegrid" theme for the plots produced later in the notebook.
sns.set_style("whitegrid")
print("Evaluation framework loaded.")

In [None]:
# ============== CONFIGURATION ==============

# Paths
# PROJECT_ROOT is the parent of the current working directory, so this
# notebook is expected to run from a subdirectory of the project.
PROJECT_ROOT = Path(".").resolve().parent
MODELS_DIR = PROJECT_ROOT / "models"
SAMPLES_DIR = PROJECT_ROOT / "evaluation_samples"
RESULTS_DIR = PROJECT_ROOT / "evaluation_results"

# Create directories
# SAMPLES_DIR and RESULTS_DIR are created without parents=True, so
# PROJECT_ROOT must already exist. The per-dataset folders are created
# recursively: malware/<dataset> and benign/<dataset>.
SAMPLES_DIR.mkdir(exist_ok=True)
(SAMPLES_DIR / "malware" / "thezoo").mkdir(parents=True, exist_ok=True)
(SAMPLES_DIR / "malware" / "malwarebazaar_recent").mkdir(parents=True, exist_ok=True)
(SAMPLES_DIR / "malware" / "virusshare").mkdir(parents=True, exist_ok=True)
(SAMPLES_DIR / "benign" / "sysinternals").mkdir(parents=True, exist_ok=True)
(SAMPLES_DIR / "benign" / "system").mkdir(parents=True, exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Classification threshold
# A sample is labeled "malicious" when its malicious score is strictly
# greater than this value; the run and report cells below pass it through.
THRESHOLD = 0.35

print(f"Project root: {PROJECT_ROOT}")
print(f"Models dir: {MODELS_DIR}")
print(f"Samples dir: {SAMPLES_DIR}")
print(f"Results dir: {RESULTS_DIR}")

In [None]:
@dataclass
class SampleResult:
    """Result for a single sample evaluation."""
    # Path of the evaluated file, stored as a string.
    filepath: str
    # Hex SHA-256 digest of the file contents ("" if it was never computed).
    sha256: str
    # Name of the dataset the sample belongs to (e.g. "thezoo").
    dataset: str
    # Ground-truth label: "malicious" or "benign".
    expected_label: str
    # Model verdict: "malicious", "benign", or "error" when scoring failed.
    predicted_label: str
    # Malicious score returned by the model (0.0 for failed samples).
    confidence: float
    # True when predicted_label equals expected_label.
    correct: bool
    # Exception text when evaluation raised; None otherwise.
    error: Optional[str] = None
    # Whole days between evaluation time and the sample's date, when supplied.
    sample_age_days: Optional[int] = None  # For zero-day analysis

@dataclass
class DatasetMetrics:
    """Aggregated metrics for a dataset."""
    # Dataset name these metrics describe.
    name: str
    # Number of results submitted for the dataset, including failures.
    total_samples: int
    # Number of results that were scored without an error.
    processed: int
    # total_samples minus processed.
    errors: int
    # Classification quality over the processed results; "malicious" is the
    # positive class. All four are 0 when nothing was processed.
    accuracy: float
    precision: float
    recall: float
    f1: float
    # Confusion-matrix counts over the processed results.
    true_positives: int
    false_positives: int
    true_negatives: int
    false_negatives: int
    # Mean malicious score over processed samples whose expected label is
    # "malicious" / "benign" respectively (0 when there are none).
    avg_confidence_malware: float
    avg_confidence_benign: float

@dataclass
class EvaluationReport:
    """Complete evaluation report."""
    # Model identifier (the model file's stem) and the path it was loaded from.
    model_name: str
    model_path: str
    # ISO-8601 timestamp of when the report was generated.
    evaluation_date: str
    # Decision threshold used to turn scores into labels.
    threshold: float
    # Totals across all datasets (sample counts, error count, accuracy).
    overall_metrics: Dict
    # One entry per evaluated dataset.
    dataset_metrics: List[DatasetMetrics]
    # Summary for recent samples; None when no result carries an age.
    zero_day_metrics: Optional[Dict] = None

In [None]:
class ModelEvaluator:
    """Evaluates malware detection models across multiple datasets."""
    
    def __init__(self, model_path: Path, scaler_path: Path, pca_path: Optional[Path] = None):
        """Load model, scaler, and optionally PCA."""
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.pca = joblib.load(pca_path) if pca_path and pca_path.exists() else None
        self.extractor = ember.PEFeatureExtractor(2)
        self.model_name = model_path.stem
        self.model_path = str(model_path)
        
        print(f"Loaded model: {self.model_name}")
        print(f"  - PCA: {'Yes' if self.pca else 'No (full features)'}")
    
    def predict_file(self, file_bytes: bytes, threshold: float = 0.35) -> Tuple[str, float]:
        """Predict if file is malicious."""
        try:
            features = np.array(self.extractor.feature_vector(file_bytes), dtype=np.float32)
            features = features.reshape(1, -1)
            features_scaled = self.scaler.transform(features)
            
            if self.pca:
                features_final = self.pca.transform(features_scaled)
            else:
                features_final = features_scaled
            
            if hasattr(self.model, "predict_proba"):
                probs = self.model.predict_proba(features_final)
                malicious_prob = float(probs[0][1])
            else:
                malicious_prob = float(self.model.predict(features_final)[0])
            
            label = "malicious" if malicious_prob > threshold else "benign"
            return label, malicious_prob
        except Exception as e:
            return "error", 0.0
    
    def evaluate_sample(self, filepath: Path, expected_label: str, dataset: str, 
                       threshold: float = 0.35, sample_date: Optional[datetime] = None) -> SampleResult:
        """Evaluate a single sample."""
        try:
            file_bytes = filepath.read_bytes()
            sha256 = hashlib.sha256(file_bytes).hexdigest()
            predicted, confidence = self.predict_file(file_bytes, threshold)
            
            age_days = None
            if sample_date:
                age_days = (datetime.now() - sample_date).days
            
            return SampleResult(
                filepath=str(filepath),
                sha256=sha256,
                dataset=dataset,
                expected_label=expected_label,
                predicted_label=predicted,
                confidence=confidence,
                correct=(predicted == expected_label),
                sample_age_days=age_days
            )
        except Exception as e:
            return SampleResult(
                filepath=str(filepath),
                sha256="",
                dataset=dataset,
                expected_label=expected_label,
                predicted_label="error",
                confidence=0.0,
                correct=False,
                error=str(e)
            )

In [None]:
# ============== SAMPLE DOWNLOADERS ==============

def download_sysinternals(output_dir: Path, tools: Optional[List[str]] = None) -> int:
    """Download Microsoft Sysinternals tools as benign samples.

    Files that already exist in ``output_dir`` are left untouched. Downloading
    is best-effort: a tool that cannot be fetched or saved is reported and
    skipped rather than aborting the whole run.

    Args:
        output_dir: Directory to store the executables in (created if needed).
        tools: File names to fetch from live.sysinternals.com. Defaults to a
            built-in list of 15 tools.

    Returns:
        The number of files newly written by this call.
    """
    if tools is None:
        tools = [
            "procexp.exe", "procmon.exe", "autoruns.exe", "tcpview.exe",
            "pslist.exe", "listdlls.exe", "handle.exe", "Dbgview.exe",
            "strings.exe", "du.exe", "accesschk.exe", "psexec.exe",
            "logonsessions.exe", "psinfo.exe", "diskext.exe"
        ]

    output_dir.mkdir(parents=True, exist_ok=True)

    downloaded = 0
    for tool in tools:
        filepath = output_dir / tool
        if filepath.exists():
            continue

        url = f"https://live.sysinternals.com/{tool}"
        try:
            response = requests.get(url, timeout=30)
        except requests.RequestException as exc:
            print(f"  Skipping {tool}: download failed ({exc})")
            continue

        # Responses of 1000 bytes or fewer are treated as failed downloads.
        if response.status_code != 200 or len(response.content) <= 1000:
            print(f"  Skipping {tool}: HTTP {response.status_code}, {len(response.content)} bytes")
            continue

        try:
            filepath.write_bytes(response.content)
        except OSError as exc:
            print(f"  Skipping {tool}: could not write file ({exc})")
            continue
        downloaded += 1

    return downloaded


def _fetch_malwarebazaar_sample(sha256: str, filepath: Path) -> bool:
    """Download one MalwareBazaar sample and unpack it to ``filepath``.

    Returns True when the file was written, False (after printing the reason)
    when the download or extraction failed.
    """
    try:
        dl_response = requests.post(
            "https://mb-api.abuse.ch/api/v1/",
            data={"query": "get_file", "sha256_hash": sha256},
            timeout=60
        )
    except requests.RequestException as exc:
        print(f"  Skipping {sha256[:16]}: download failed ({exc})")
        return False

    if dl_response.status_code != 200 or len(dl_response.content) <= 100:
        print(f"  Skipping {sha256[:16]}: HTTP {dl_response.status_code}, "
              f"{len(dl_response.content)} bytes")
        return False

    # Extract from password-protected zip; only the first member is used.
    # NOTE(review): if the archive is AES-encrypted, the standard-library
    # zipfile module cannot decrypt it and every sample will be skipped here -
    # confirm against the MalwareBazaar API documentation.
    try:
        with zipfile.ZipFile(io.BytesIO(dl_response.content)) as zf:
            names = zf.namelist()
            if not names:
                print(f"  Skipping {sha256[:16]}: archive is empty")
                return False
            content = zf.read(names[0], pwd=b"infected")
        filepath.write_bytes(content)
    except Exception as exc:
        # zipfile reports problems through several unrelated exception types
        # (bad archive, wrong password, unsupported method); keep the download
        # best-effort per sample, but say why it was skipped.
        print(f"  Skipping {sha256[:16]}: could not extract archive ({exc})")
        return False

    return True


def download_malwarebazaar_recent(output_dir: Path, limit: int = 50) -> List[Dict]:
    """
    Download recent malware samples from MalwareBazaar.
    Returns metadata including first_seen date for zero-day analysis.

    Samples already present in ``output_dir`` are not downloaded again but are
    still included in the returned metadata.

    Args:
        output_dir: Directory to store samples in (created if needed). Files
            are named after the first 16 hex characters of their SHA-256.
        limit: Maximum number of samples to request and process.

    Returns:
        One dict per available sample with keys ``sha256``, ``filepath`` and
        ``first_seen``. Empty when the query fails.
    """
    metadata = []

    # Query for recent PE samples
    try:
        output_dir.mkdir(parents=True, exist_ok=True)

        response = requests.post(
            "https://mb-api.abuse.ch/api/v1/",
            data={"query": "get_file_type", "file_type": "exe", "limit": limit},
            timeout=30
        )
        if response.status_code != 200:
            print(f"MalwareBazaar API error: {response.status_code}")
            return metadata

        data = response.json()
        if data.get("query_status") != "ok":
            print(f"Query failed: {data.get('query_status')}")
            return metadata

        samples = data.get("data", [])
        print(f"Found {len(samples)} recent samples from MalwareBazaar")

        for sample in samples[:limit]:
            sha256 = sample.get("sha256_hash")
            first_seen = sample.get("first_seen", "")

            if not sha256:
                continue

            filepath = output_dir / f"{sha256[:16]}.exe"
            if filepath.exists() or _fetch_malwarebazaar_sample(sha256, filepath):
                metadata.append({
                    "sha256": sha256,
                    "filepath": str(filepath),
                    "first_seen": first_seen
                })
    except Exception as e:
        # Top-level boundary: report and return whatever was collected so far.
        print(f"Error downloading from MalwareBazaar: {e}")

    return metadata


def collect_existing_samples(samples_dir: Path) -> Dict[str, List[Path]]:
    """Collect all existing samples organized by dataset.

    Expects the layout ``<samples_dir>/malware/<dataset>/<file>`` and
    ``<samples_dir>/benign/<dataset>/<file>``. Only files directly inside a
    dataset folder are collected.

    A folder is matched against the known dataset names of its own category
    only. An unrecognized folder under ``malware`` goes to "other_malware"
    and one under ``benign`` goes to "other_benign". This prevents, for
    example, ``malware/sysinternals`` from being filed under a benign dataset
    and evaluated with the wrong expected label.

    Args:
        samples_dir: Root directory of the evaluation samples.

    Returns:
        Mapping of dataset name to a sorted list of sample paths. Every known
        dataset key is present, with an empty list when nothing was found.
    """
    samples = {
        "thezoo": [],
        "malwarebazaar_recent": [],
        "virusshare": [],
        "sysinternals": [],
        "system": [],
        "other_malware": [],
        "other_benign": []
    }

    # category folder -> (known dataset names, fallback bucket)
    layout = {
        "malware": (("thezoo", "malwarebazaar_recent", "virusshare"), "other_malware"),
        "benign": (("sysinternals", "system"), "other_benign"),
    }

    for category, (known_names, fallback) in layout.items():
        category_dir = samples_dir / category
        if not category_dir.is_dir():
            continue
        # Sorted so repeated runs evaluate samples in the same order.
        for subdir in sorted(category_dir.iterdir()):
            if not subdir.is_dir():
                continue
            key = subdir.name if subdir.name in known_names else fallback
            samples[key].extend(sorted(f for f in subdir.iterdir() if f.is_file()))

    return samples


print("Sample downloaders defined.")

In [None]:
def calculate_metrics(results: "List[SampleResult]", dataset_name: str) -> "DatasetMetrics":
    """Calculate metrics for a set of results.

    "malicious" is the positive class. A result is excluded from the metrics
    (and counted as an error) when it carries an error message or when its
    predicted label is "error"; failed predictions are never counted as
    benign verdicts.

    Args:
        results: Per-sample results, typically for a single dataset.
        dataset_name: Name stored on the returned metrics.

    Returns:
        A DatasetMetrics instance. Ratios whose denominator is zero (e.g.
        precision when nothing was flagged) are reported as 0.0.
    """
    valid_results = [
        r for r in results
        if r.error is None and r.predicted_label != "error"
    ]

    if not valid_results:
        return DatasetMetrics(
            name=dataset_name, total_samples=len(results), processed=0, errors=len(results),
            accuracy=0, precision=0, recall=0, f1=0,
            true_positives=0, false_positives=0, true_negatives=0, false_negatives=0,
            avg_confidence_malware=0, avg_confidence_benign=0
        )

    # Count the confusion matrix directly. This works for single-class
    # datasets too (every dataset here has one expected label), so no
    # special-casing is needed.
    tp = fp = tn = fn = 0
    for r in valid_results:
        is_malware = r.expected_label == "malicious"
        flagged = r.predicted_label == "malicious"
        if is_malware and flagged:
            tp += 1
        elif is_malware:
            fn += 1
        elif flagged:
            fp += 1
        else:
            tn += 1

    accuracy = (tp + tn) / len(valid_results)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

    # Confidence analysis
    mal_confs = [r.confidence for r in valid_results if r.expected_label == "malicious"]
    ben_confs = [r.confidence for r in valid_results if r.expected_label == "benign"]

    return DatasetMetrics(
        name=dataset_name,
        total_samples=len(results),
        processed=len(valid_results),
        errors=len(results) - len(valid_results),
        accuracy=accuracy,
        precision=precision,
        recall=recall,
        f1=f1,
        true_positives=tp,
        false_positives=fp,
        true_negatives=tn,
        false_negatives=fn,
        # Plain floats keep the metrics JSON-friendly.
        avg_confidence_malware=float(np.mean(mal_confs)) if mal_confs else 0.0,
        avg_confidence_benign=float(np.mean(ben_confs)) if ben_confs else 0.0
    )


def run_evaluation(evaluator: ModelEvaluator, samples: Dict[str, List[Path]],
                   threshold: float = 0.35, max_workers: int = 4,
                   sample_dates: Optional[Dict[str, datetime]] = None
                   ) -> Tuple[List[SampleResult], List[DatasetMetrics]]:
    """Run full evaluation across all datasets.

    Each dataset's files are evaluated in a thread pool and summarized with
    ``calculate_metrics``. Datasets with no files are skipped.

    Args:
        evaluator: Loaded model wrapper used to score each file.
        samples: Mapping of dataset name to the files belonging to it.
        threshold: Scores strictly above this value are "malicious".
        max_workers: Size of the thread pool used per dataset.
        sample_dates: Optional mapping of ``str(filepath)`` to the sample's
            date. When given, the date is forwarded to the evaluator so each
            result carries a ``sample_age_days`` value; without it no result
            has an age and the report has no zero-day section.

    Returns:
        ``(all_results, dataset_metrics)``: every per-sample result, and one
        metrics entry per non-empty dataset in iteration order of ``samples``.
    """
    all_results = []
    dataset_metrics = []
    dates = sample_dates or {}

    # Define which datasets are malware vs benign
    malware_datasets = ["thezoo", "malwarebazaar_recent", "virusshare", "other_malware"]
    benign_datasets = ["sysinternals", "system", "other_benign"]

    for dataset_name, files in samples.items():
        if not files:
            continue

        if dataset_name not in malware_datasets and dataset_name not in benign_datasets:
            # Unrecognized names fall back to "benign"; say so, because a
            # mislabeled dataset silently corrupts every metric.
            print(f"\nWarning: unknown dataset '{dataset_name}' is treated as benign.")

        expected_label = "malicious" if dataset_name in malware_datasets else "benign"
        print(f"\nEvaluating {dataset_name} ({len(files)} samples, expected: {expected_label})...")

        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(
                    evaluator.evaluate_sample, f, expected_label, dataset_name, threshold,
                    sample_date=dates.get(str(f))
                ): f
                for f in files
            }

            # Results are gathered in completion order, not submission order.
            for done, future in enumerate(as_completed(futures), start=1):
                results.append(future.result())
                if done % 20 == 0:
                    print(f"  Processed {done}/{len(files)}...")

        all_results.extend(results)
        metrics = calculate_metrics(results, dataset_name)
        dataset_metrics.append(metrics)

        print(f"  Accuracy: {metrics.accuracy:.1%} | Recall: {metrics.recall:.1%} | Precision: {metrics.precision:.1%}")

    return all_results, dataset_metrics


print("Evaluation engine defined.")

In [None]:
def plot_evaluation_results(results: List[SampleResult], metrics: List[DatasetMetrics],
                           model_name: str, output_dir: Path, threshold: float = 0.35):
    """Generate visualization plots for evaluation results.

    Draws a 2x3 grid: accuracy by dataset, precision vs recall, score
    distribution, overall confusion matrix, sample counts, and F1 by dataset.
    The figure is saved as a timestamped PNG in ``output_dir`` and shown.

    Args:
        results: Per-sample results; failed samples are left out of the score
            distribution and the confusion matrix.
        metrics: Per-dataset metrics.
        model_name: Used in the figure title and the output file name.
        output_dir: Directory for the PNG (created if needed).
        threshold: Decision threshold to mark on the score distribution.
    """
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle(f"Evaluation Results: {model_name}", fontsize=14, fontweight='bold')

    # Samples that were actually scored.
    valid_results = [r for r in results if r.error is None and r.predicted_label != "error"]

    # 1. Accuracy by dataset
    ax = axes[0, 0]
    names = [m.name for m in metrics if m.processed > 0]
    accuracies = [m.accuracy for m in metrics if m.processed > 0]
    colors = ['#2ecc71' if a >= 0.8 else '#f39c12' if a >= 0.6 else '#e74c3c' for a in accuracies]
    bars = ax.barh(names, accuracies, color=colors)
    ax.set_xlabel('Accuracy')
    ax.set_title('Accuracy by Dataset')
    ax.set_xlim(0, 1)
    for bar, acc in zip(bars, accuracies):
        ax.text(acc + 0.02, bar.get_y() + bar.get_height()/2, f'{acc:.1%}', va='center')

    # 2. Precision vs Recall (marker size scales with processed sample count)
    ax = axes[0, 1]
    for m in metrics:
        if m.processed > 0:
            ax.scatter(m.recall, m.precision, s=m.processed*2, alpha=0.7, label=m.name)
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision vs Recall by Dataset')
    ax.set_xlim(0, 1.05)
    ax.set_ylim(0, 1.05)
    ax.legend(loc='lower left', fontsize=8)
    ax.grid(True, alpha=0.3)

    # 3. Confidence distribution
    ax = axes[0, 2]
    malware_confs = [r.confidence for r in valid_results if r.expected_label == "malicious"]
    benign_confs = [r.confidence for r in valid_results if r.expected_label == "benign"]
    if malware_confs:
        ax.hist(malware_confs, bins=30, alpha=0.7, label=f'Malware (n={len(malware_confs)})', color='red')
    if benign_confs:
        ax.hist(benign_confs, bins=30, alpha=0.7, label=f'Benign (n={len(benign_confs)})', color='green')
    # Mark the threshold that was actually used, not a fixed value.
    ax.axvline(x=threshold, color='black', linestyle='--', label=f'Threshold ({threshold})')
    ax.set_xlabel('Confidence (P(Malicious))')
    ax.set_ylabel('Count')
    ax.set_title('Confidence Distribution')
    ax.legend()

    # 4. Confusion matrix (overall)
    ax = axes[1, 0]
    y_true = [1 if r.expected_label == "malicious" else 0 for r in valid_results]
    y_pred = [1 if r.predicted_label == "malicious" else 0 for r in valid_results]
    if valid_results:
        # labels=[0, 1] forces a 2x2 matrix even when only one class occurs,
        # so the panel is never left blank for single-class runs.
        cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax,
                    xticklabels=['Benign', 'Malicious'], yticklabels=['Benign', 'Malicious'])
    ax.set_title('Overall Confusion Matrix')
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')

    # 5. Sample counts by dataset (errors stacked on top of processed)
    ax = axes[1, 1]
    names = [m.name for m in metrics]
    counts = [m.processed for m in metrics]
    errors = [m.errors for m in metrics]
    x = np.arange(len(names))
    ax.bar(x, counts, label='Processed', color='steelblue')
    ax.bar(x, errors, bottom=counts, label='Errors', color='salmon')
    ax.set_xticks(x)
    ax.set_xticklabels(names, rotation=45, ha='right')
    ax.set_ylabel('Sample Count')
    ax.set_title('Samples by Dataset')
    ax.legend()

    # 6. F1 scores
    ax = axes[1, 2]
    f1_scores = [m.f1 for m in metrics if m.processed > 0]
    names = [m.name for m in metrics if m.processed > 0]
    colors = ['#2ecc71' if f >= 0.8 else '#f39c12' if f >= 0.6 else '#e74c3c' for f in f1_scores]
    bars = ax.barh(names, f1_scores, color=colors)
    ax.set_xlabel('F1 Score')
    ax.set_title('F1 Score by Dataset')
    ax.set_xlim(0, 1)
    for bar, f1 in zip(bars, f1_scores):
        ax.text(f1 + 0.02, bar.get_y() + bar.get_height()/2, f'{f1:.2f}', va='center')

    plt.tight_layout()
    output_dir.mkdir(parents=True, exist_ok=True)
    plt.savefig(output_dir / f"evaluation_{model_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png", dpi=150)
    plt.show()


print("Visualization functions defined.")

In [None]:
def generate_report(results: List[SampleResult], metrics: List[DatasetMetrics],
                   evaluator: ModelEvaluator, threshold: float, output_dir: Path) -> EvaluationReport:
    """Generate and save comprehensive evaluation report.

    Writes two files to ``output_dir`` that share one timestamp so they can be
    matched up later: ``report_<model>_<timestamp>.json`` (summary) and
    ``details_<model>_<timestamp>.csv`` (one row per sample, including
    failures).

    Args:
        results: Per-sample results from the evaluation run.
        metrics: Per-dataset metrics from the evaluation run.
        evaluator: Supplies the model name and path recorded in the report.
        threshold: Decision threshold that was used for the run.
        output_dir: Directory for the output files (created if needed).

    Returns:
        The EvaluationReport that was written to disk.
    """
    # One clock reading for the report date and both file names.
    now = datetime.now()
    stamp = now.strftime('%Y%m%d_%H%M%S')

    # Overall metrics; failed samples count as errors, not as predictions.
    valid_results = [r for r in results if r.error is None and r.predicted_label != "error"]
    overall_correct = sum(1 for r in valid_results if r.correct)

    overall = {
        "total_samples": len(results),
        "processed": len(valid_results),
        "errors": len(results) - len(valid_results),
        "accuracy": overall_correct / len(valid_results) if valid_results else 0,
        "total_malware_samples": sum(1 for r in valid_results if r.expected_label == "malicious"),
        "total_benign_samples": sum(1 for r in valid_results if r.expected_label == "benign"),
    }

    # Zero-day analysis (samples seen in last 30 days). Only results that
    # carry an age can take part; the section stays None otherwise.
    zero_day_results = [r for r in valid_results if r.sample_age_days is not None and r.sample_age_days <= 30]
    zero_day_metrics = None
    if zero_day_results:
        zd_correct = sum(1 for r in zero_day_results if r.correct)
        zero_day_metrics = {
            "samples": len(zero_day_results),
            "accuracy": zd_correct / len(zero_day_results),
            "avg_age_days": float(np.mean([r.sample_age_days for r in zero_day_results]))
        }

    report = EvaluationReport(
        model_name=evaluator.model_name,
        model_path=evaluator.model_path,
        evaluation_date=now.isoformat(),
        threshold=threshold,
        overall_metrics=overall,
        dataset_metrics=metrics,
        zero_day_metrics=zero_day_metrics
    )

    output_dir.mkdir(parents=True, exist_ok=True)

    # Save report as JSON
    report_path = output_dir / f"report_{evaluator.model_name}_{stamp}.json"

    report_dict = {
        "model_name": report.model_name,
        "model_path": report.model_path,
        "evaluation_date": report.evaluation_date,
        "threshold": report.threshold,
        "overall_metrics": report.overall_metrics,
        "dataset_metrics": [asdict(m) for m in report.dataset_metrics],
        "zero_day_metrics": report.zero_day_metrics
    }

    with open(report_path, 'w') as f:
        json.dump(report_dict, f, indent=2)

    print(f"\nReport saved to: {report_path}")

    # Save detailed results CSV
    results_df = pd.DataFrame([asdict(r) for r in results])
    csv_path = output_dir / f"details_{evaluator.model_name}_{stamp}.csv"
    results_df.to_csv(csv_path, index=False)
    print(f"Detailed results saved to: {csv_path}")

    return report


def print_summary(report: EvaluationReport):
    """Write a human-readable digest of ``report`` to stdout.

    Prints the model header, overall accuracy with sample/error counts, one
    table row per dataset that has processed samples, and - when the report
    has zero-day metrics - a short zero-day section.
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70

    # Header
    print("\n" + heavy_rule)
    print(f"EVALUATION SUMMARY: {report.model_name}")
    print(heavy_rule)
    print(f"Date: {report.evaluation_date}")
    print(f"Threshold: {report.threshold}")

    # Overall totals
    totals = report.overall_metrics
    print(f"\nOverall: {totals['accuracy']:.1%} accuracy ")
    print(f"         ({totals['processed']} samples, {totals['errors']} errors)")

    # Per-dataset table; datasets with nothing processed are omitted
    print("\nPer-Dataset Results:")
    print(light_rule)
    print(f"{'Dataset':<25} {'Samples':>8} {'Accuracy':>10} {'Precision':>10} {'Recall':>10} {'F1':>8}")
    print(light_rule)
    for entry in report.dataset_metrics:
        if entry.processed <= 0:
            continue
        print(
            f"{entry.name:<25} {entry.processed:>8} {entry.accuracy:>10.1%} "
            f"{entry.precision:>10.1%} {entry.recall:>10.1%} {entry.f1:>8.2f}"
        )
    print(light_rule)

    # Optional zero-day section
    zero_day = report.zero_day_metrics
    if not zero_day:
        return
    print("\nZero-Day Detection (samples < 30 days old):")
    print(f"  Samples: {zero_day['samples']}")
    print(f"  Accuracy: {zero_day['accuracy']:.1%}")
    print(f"  Avg Age: {zero_day['avg_age_days']:.1f} days")


print("Report generator defined.")

---
## Run Evaluation

Execute the cells below to run a full evaluation.

In [None]:
# ============== STEP 1: Download/Collect Samples ==============

# Imported at the top of the cell: both copy steps below use it, and either
# source directory can be missing independently of the other.
import shutil

print("Setting up evaluation samples...\n")

# Download Sysinternals (benign)
print("Downloading Sysinternals tools...")
n_sysinternals = download_sysinternals(SAMPLES_DIR / "benign" / "sysinternals")
print(f"  Downloaded {n_sysinternals} new tools")

# Copy existing malware samples if they exist (files already present in the
# destination are not overwritten)
existing_malware = PROJECT_ROOT / "malware_samples"
if existing_malware.exists():
    dest = SAMPLES_DIR / "malware" / "thezoo"
    for f in existing_malware.iterdir():
        if f.is_file() and not (dest / f.name).exists():
            shutil.copy(f, dest / f.name)
    print(f"Copied existing malware samples to thezoo directory")

# Copy existing benign samples
existing_benign = PROJECT_ROOT / "benign_samples"
if existing_benign.exists():
    dest = SAMPLES_DIR / "benign" / "sysinternals"
    for f in existing_benign.iterdir():
        if f.is_file() and not (dest / f.name).exists():
            shutil.copy(f, dest / f.name)
    print(f"Copied existing benign samples")

# Optional: Download recent MalwareBazaar samples (uncomment to enable)
# print("\nDownloading recent samples from MalwareBazaar...")
# mb_metadata = download_malwarebazaar_recent(SAMPLES_DIR / "malware" / "malwarebazaar_recent", limit=30)
# print(f"  Downloaded {len(mb_metadata)} recent samples")

# Collect all samples
samples = collect_existing_samples(SAMPLES_DIR)

print("\nSample counts:")
for name, files in samples.items():
    if files:
        print(f"  {name}: {len(files)}")

In [None]:
# ============== STEP 2: Load Model ==============

# Choose which model to evaluate:
# Exactly one of the options below should be active; the resulting
# `evaluator` is used by the run and report cells that follow.

# Option A: Improved model (no PCA, 97.5% EMBER accuracy)
evaluator = ModelEvaluator(
    model_path=MODELS_DIR / "model_improved.pkl",
    scaler_path=MODELS_DIR / "scaler_improved.pkl",
    pca_path=None  # No PCA for improved model
)

# Option B: Original model (with PCA, 93% EMBER accuracy)
# evaluator = ModelEvaluator(
#     model_path=MODELS_DIR / "xgboost_pca_model.pkl",
#     scaler_path=MODELS_DIR / "scaler.pkl",
#     pca_path=MODELS_DIR / "pca_transform.pkl"
# )

In [None]:
# ============== STEP 3: Run Evaluation ==============

print("Running evaluation...")
# Wall-clock timing of the whole run, reported below.
start_time = time.time()

# `results` holds one entry per sample, `metrics` one entry per non-empty
# dataset; both are consumed by the report cell.
results, metrics = run_evaluation(
    evaluator=evaluator,
    samples=samples,
    threshold=THRESHOLD,
    max_workers=4
)

elapsed = time.time() - start_time
print(f"\nEvaluation completed in {elapsed:.1f} seconds")

In [None]:
# ============== STEP 4: Generate Report & Visualizations ==============

# Generate report (writes a JSON summary and a per-sample CSV to RESULTS_DIR)
report = generate_report(results, metrics, evaluator, THRESHOLD, RESULTS_DIR)

# Print summary
print_summary(report)

# Generate plots (saves a PNG to RESULTS_DIR and displays the figure)
plot_evaluation_results(results, metrics, evaluator.model_name, RESULTS_DIR)

---
## Compare Multiple Models

Use this section to compare different model versions side-by-side.

In [None]:
def compare_models(model_configs: List[Dict], samples: Dict[str, List[Path]], 
                   threshold: float = 0.35) -> pd.DataFrame:
    """
    Evaluate several models against one shared sample set and tabulate them.

    model_configs: List of dicts with keys: name, model_path, scaler_path, pca_path (optional)
    samples: Mapping of dataset name to sample files, shared by every model.
    threshold: Decision threshold handed to each evaluation run.

    Returns a DataFrame with one row per model: its name, overall accuracy,
    number of scored samples, and ``<dataset>_acc`` / ``<dataset>_f1`` columns
    for each dataset that had processed samples.
    """
    banner = '=' * 50
    rows = []

    for cfg in model_configs:
        print(f"\n{banner}")
        print(f"Evaluating: {cfg['name']}")
        print(f"{banner}")

        # Build the evaluator; PCA is only used when the config names a path.
        model_file = Path(cfg['model_path'])
        scaler_file = Path(cfg['scaler_path'])
        pca_setting = cfg.get('pca_path')
        pca_file = Path(pca_setting) if pca_setting else None
        model_evaluator = ModelEvaluator(
            model_path=model_file,
            scaler_path=scaler_file,
            pca_path=pca_file
        )

        sample_results, per_dataset = run_evaluation(model_evaluator, samples, threshold, max_workers=4)

        # Overall accuracy over the samples that produced no error.
        scored = [res for res in sample_results if res.error is None]
        if scored:
            overall_accuracy = sum(1 for res in scored if res.correct) / len(scored)
        else:
            overall_accuracy = 0

        summary = {
            'Model': cfg['name'],
            'Overall Accuracy': overall_accuracy,
            'Total Samples': len(scored)
        }

        # Per-dataset columns; datasets with nothing processed are left out.
        for dataset in per_dataset:
            if dataset.processed <= 0:
                continue
            summary[f'{dataset.name}_acc'] = dataset.accuracy
            summary[f'{dataset.name}_f1'] = dataset.f1

        rows.append(summary)

    return pd.DataFrame(rows)


# Example usage (uncomment to compare models):
# model_configs = [
#     {
#         "name": "v1_pca",
#         "model_path": MODELS_DIR / "xgboost_pca_model.pkl",
#         "scaler_path": MODELS_DIR / "scaler.pkl",
#         "pca_path": MODELS_DIR / "pca_transform.pkl"
#     },
#     {
#         "name": "v2_improved",
#         "model_path": MODELS_DIR / "model_improved.pkl",
#         "scaler_path": MODELS_DIR / "scaler_improved.pkl",
#         "pca_path": None
#     }
# ]
# comparison_df = compare_models(model_configs, samples)
# display(comparison_df)