In [None]:
%%writefile src/backend/ml_models/ransomware_detector.ipynb
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Ransomware Detection System\n",
    "## Hybrid Deep Learning Model with Behavioral Analysis"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "import tensorflow as tf\n",
    "from tensorflow.keras import layers, models\n",
    "from sklearn.ensemble import IsolationForest\n",
    "from sklearn.model_selection import train_test_split\n",
    "from sklearn.metrics import classification_report, confusion_matrix\n",
    "import matplotlib.pyplot as plt\n",
    "import seaborn as sns\n",
    "import joblib\n",
    "from tqdm import tqdm\n",
    "import hashlib\n",
    "import lief  # For PE file analysis\n",
    "\n",
    "# Configuration\n",
    "RANDOM_SEED = 42\n",
    "np.random.seed(RANDOM_SEED)\n",
    "tf.random.set_seed(RANDOM_SEED)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Data Loading & Feature Engineering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_and_process_data(file_path):\n",
    "    \"\"\"Load the ransomware dataset from CSV and add engineered features.\n",
    "\n",
    "    The CSV must provide the columns read below: ``file_content``,\n",
    "    ``api_call_count``, ``process_duration``, ``process_id`` and\n",
    "    ``created_files``.\n",
    "\n",
    "    NOTE(review): ``detect_encryption_patterns`` and\n",
    "    ``find_ransom_note_keywords`` are not defined anywhere in this notebook;\n",
    "    they must be defined or imported before this function is called.\n",
    "\n",
    "    Args:\n",
    "        file_path: Path of the CSV file passed to ``pd.read_csv``.\n",
    "\n",
    "    Returns:\n",
    "        The loaded DataFrame with the columns ``entropy``,\n",
    "        ``api_calls_per_sec``, ``file_entropy_variance``,\n",
    "        ``encryption_pattern`` and ``ransom_note_keywords`` added.\n",
    "    \"\"\"\n",
    "    df = pd.read_csv(file_path)\n",
    "\n",
    "    # Feature engineering\n",
    "    df['entropy'] = df['file_content'].apply(calculate_entropy)\n",
    "\n",
    "    # A zero duration would produce inf (or NaN for 0/0). Treat such rates as\n",
    "    # missing and record 0 so no non-finite value reaches the estimators.\n",
    "    safe_duration = df['process_duration'].replace(0, np.nan)\n",
    "    df['api_calls_per_sec'] = (df['api_call_count'] / safe_duration).fillna(0.0)\n",
    "\n",
    "    # The sample variance of a single-row group is NaN; one observation has\n",
    "    # no measurable spread, so record 0 instead.\n",
    "    df['file_entropy_variance'] = (\n",
    "        df.groupby('process_id')['entropy'].transform('var').fillna(0.0)\n",
    "    )\n",
    "\n",
    "    # Behavioral features\n",
    "    df['encryption_pattern'] = df.apply(detect_encryption_patterns, axis=1)\n",
    "    df['ransom_note_keywords'] = df['created_files'].apply(find_ransom_note_keywords)\n",
    "\n",
    "    return df\n",
    "\n",
    "def calculate_entropy(data):\n",
    "    \"\"\"Calculate the Shannon entropy of file content in bits per byte.\n",
    "\n",
    "    Args:\n",
    "        data: ``str`` (encoded with the default UTF-8 codec first) or a\n",
    "            bytes-like object. ``None`` and float NaN, which pandas uses for\n",
    "            missing CSV cells, are treated as empty content.\n",
    "\n",
    "    Returns:\n",
    "        Entropy as a float between 0.0 and 8.0; 0.0 for empty content.\n",
    "    \"\"\"\n",
    "    if data is None or (isinstance(data, float) and np.isnan(data)):\n",
    "        return 0.0\n",
    "    if isinstance(data, str):\n",
    "        data = data.encode()\n",
    "    byte_values = np.frombuffer(data, dtype=np.uint8)\n",
    "    if byte_values.size == 0:\n",
    "        return 0.0\n",
    "    counts = np.bincount(byte_values, minlength=256)\n",
    "    # Drop unseen byte values instead of adding an epsilon inside the log: the\n",
    "    # epsilon biased every term and made constant content slightly negative.\n",
    "    probabilities = counts[counts > 0] / byte_values.size\n",
    "    return float(np.sum(probabilities * np.log2(1.0 / probabilities)))\n",
    "\n",
    "# Load dataset\n",
    "data = load_and_process_data(\"data/raw/ransomware_dataset.csv\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Hybrid Model Architecture"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def build_hybrid_model(input_shape):\n",
    "    \"\"\"Build and compile the hybrid LSTM + Conv1D binary classifier.\n",
    "\n",
    "    One input tensor feeds two parallel branches: two stacked LSTM layers,\n",
    "    and a Conv1D / MaxPooling1D / Flatten stack. The branch outputs are\n",
    "    concatenated, passed through a 64-unit ReLU layer and reduced to a\n",
    "    single sigmoid unit.\n",
    "\n",
    "    Args:\n",
    "        input_shape: Per-sample input shape handed to ``layers.Input``.\n",
    "\n",
    "    Returns:\n",
    "        The model, compiled with the Adam optimizer, binary cross-entropy\n",
    "        loss, and precision / recall / AUC metrics.\n",
    "    \"\"\"\n",
    "    sequence_in = layers.Input(shape=input_shape)\n",
    "\n",
    "    # Recurrent branch: the first LSTM keeps the full sequence for the second.\n",
    "    recurrent = layers.LSTM(128, return_sequences=True)(sequence_in)\n",
    "    recurrent = layers.LSTM(64)(recurrent)\n",
    "\n",
    "    # Convolutional branch over the same input.\n",
    "    conv = layers.Conv1D(64, 3, activation='relu')(sequence_in)\n",
    "    conv = layers.MaxPooling1D(2)(conv)\n",
    "    conv = layers.Flatten()(conv)\n",
    "\n",
    "    merged = layers.concatenate([recurrent, conv])\n",
    "    hidden = layers.Dense(64, activation='relu')(merged)\n",
    "    probability = layers.Dense(1, activation='sigmoid')(hidden)\n",
    "\n",
    "    hybrid = models.Model(inputs=sequence_in, outputs=probability)\n",
    "    tracked_metrics = [\n",
    "        tf.keras.metrics.Precision(name='precision'),\n",
    "        tf.keras.metrics.Recall(name='recall'),\n",
    "        tf.keras.metrics.AUC(name='auc'),\n",
    "    ]\n",
    "    hybrid.compile(\n",
    "        optimizer='adam',\n",
    "        loss='binary_crossentropy',\n",
    "        metrics=tracked_metrics,\n",
    "    )\n",
    "    return hybrid"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Anomaly Detection Layer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class AnomalyAwareLayer(layers.Layer):\n",
    "    \"\"\"Custom layer combining ML model output with anomaly scores.\n",
    "\n",
    "    The layer has no weights of its own. It scores the tabular features with\n",
    "    the estimator given at construction and returns\n",
    "    ``ml_score * (1 - anomaly_score)``.\n",
    "\n",
    "    NOTE(review): the wrapped estimator is a plain Python object, so Keras\n",
    "    may be unable to serialize this layer when the model is saved - confirm.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, isolation_forest, **kwargs):\n",
    "        \"\"\"Store the fitted estimator whose ``decision_function`` is used.\"\"\"\n",
    "        super().__init__(**kwargs)\n",
    "        self.isolation_forest = isolation_forest\n",
    "\n",
    "    def _decision_scores(self, features):\n",
    "        \"\"\"Score one NumPy batch of features and return float32 values.\"\"\"\n",
    "        scores = self.isolation_forest.decision_function(features)\n",
    "        return scores.astype(np.float32)\n",
    "\n",
    "    def call(self, inputs):\n",
    "        \"\"\"Combine the classifier score with the anomaly score.\n",
    "\n",
    "        Args:\n",
    "            inputs: Pair ``(ml_score, features)``. ``ml_score`` is assumed to\n",
    "                have shape (batch, 1) - TODO confirm against the upstream model.\n",
    "\n",
    "        Returns:\n",
    "            Tensor ``ml_score * (1 - anomaly_score)`` shaped like ``ml_score``.\n",
    "        \"\"\"\n",
    "        ml_score, features = inputs\n",
    "        # scikit-learn cannot consume symbolic tensors, so the estimator runs\n",
    "        # through tf.numpy_function on concrete NumPy batches.\n",
    "        # NOTE(review): Python callbacks are not XLA-compilable - confirm the\n",
    "        # model is not JIT-compiled.\n",
    "        anomaly_score = tf.numpy_function(\n",
    "            self._decision_scores, [features], tf.float32\n",
    "        )\n",
    "        # decision_function yields one score per sample, shape (batch,). Make\n",
    "        # it a column so the product is element-wise rather than broadcasting\n",
    "        # to (batch, batch).\n",
    "        anomaly_score = tf.reshape(anomaly_score, (-1, 1))\n",
    "        return ml_score * (1.0 - anomaly_score)\n",
    "\n",
    "def create_ensemble_model(hybrid_model, isolation_forest, feature_columns):\n",
    "    \"\"\"Create the final two-input ensemble model.\n",
    "\n",
    "    Args:\n",
    "        hybrid_model: Model applied to the sequence input; its\n",
    "            ``input_shape`` (minus the batch axis) defines that input.\n",
    "        isolation_forest: Fitted estimator handed to ``AnomalyAwareLayer``.\n",
    "        feature_columns: Sized collection of feature names; only its length\n",
    "            is used, to size the feature input.\n",
    "\n",
    "    Returns:\n",
    "        A model taking ``[sequence_input, feature_input]`` and returning the\n",
    "        output of ``AnomalyAwareLayer``.\n",
    "    \"\"\"\n",
    "    # The original line was missing its closing parenthesis (SyntaxError).\n",
    "    feature_input = layers.Input(shape=(len(feature_columns),))\n",
    "    sequence_input = layers.Input(shape=hybrid_model.input_shape[1:])\n",
    "\n",
    "    ml_output = hybrid_model(sequence_input)\n",
    "    combined_output = AnomalyAwareLayer(isolation_forest)([ml_output, feature_input])\n",
    "\n",
    "    model = models.Model(\n",
    "        inputs=[sequence_input, feature_input],\n",
    "        outputs=combined_output\n",
    "    )\n",
    "    return model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4. Training Pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Data preparation\n",
    "X_sequence = np.array(data['behavior_sequence'].tolist())\n",
    "X_features = data[['entropy', 'api_calls_per_sec', 'file_entropy_variance']]\n",
    "y = data['is_ransomware']\n",
    "\n",
    "# Train-test split. The explicit random_state keeps the split identical on\n",
    "# every run, independent of how much global NumPy RNG state was consumed\n",
    "# by earlier cells.\n",
    "X_train_seq, X_test_seq, X_train_feat, X_test_feat, y_train, y_test = train_test_split(\n",
    "    X_sequence, X_features, y,\n",
    "    test_size=0.2, stratify=y, random_state=RANDOM_SEED\n",
    ")\n",
    "\n",
    "# Train Isolation Forest on the tabular training features only\n",
    "iso_forest = IsolationForest(contamination=0.1, random_state=RANDOM_SEED)\n",
    "iso_forest.fit(X_train_feat)\n",
    "\n",
    "# Build and train hybrid model on the behavior sequences\n",
    "model = build_hybrid_model(X_train_seq.shape[1:])\n",
    "history = model.fit(\n",
    "    X_train_seq,\n",
    "    y_train,\n",
    "    validation_split=0.1,\n",
    "    epochs=20,\n",
    "    batch_size=64,\n",
    "    class_weight={0: 1, 1: 5}  # Handle class imbalance\n",
    ")\n",
    "\n",
    "# Create ensemble model from the trained classifier and the fitted forest\n",
    "final_model = create_ensemble_model(model, iso_forest, X_features.columns)\n",
    "final_model.compile(loss='binary_crossentropy', optimizer='adam')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5. Evaluation & Explainability"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Model evaluation\n",
    "DECISION_THRESHOLD = 0.7  # scores strictly above this are labelled ransomware\n",
    "\n",
    "y_pred = final_model.predict([X_test_seq, X_test_feat])\n",
    "# Flatten to 1-D so scikit-learn receives plain label vectors\n",
    "# (y_pred is expected to be a single column - TODO confirm).\n",
    "y_pred_binary = (y_pred > DECISION_THRESHOLD).astype(int).ravel()\n",
    "\n",
    "print(\"Classification Report:\")\n",
    "print(classification_report(y_test, y_pred_binary))\n",
    "\n",
    "# Confusion matrix\n",
    "cm = confusion_matrix(y_test, y_pred_binary)\n",
    "fig, ax = plt.subplots()\n",
    "sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)\n",
    "ax.set_xlabel('Predicted')\n",
    "ax.set_ylabel('Actual')\n",
    "ax.set_title('Confusion Matrix')\n",
    "plt.show()\n",
    "\n",
    "# SHAP explainability (shap is imported with the other imports in the first code cell)\n",
    "explainer = shap.DeepExplainer(model, X_train_seq[:100])\n",
    "shap_values = explainer.shap_values(X_test_seq[:10])\n",
    "shap.initjs()\n",
    "shap.force_plot(explainer.expected_value[0], shap_values[0][0], X_test_seq[0])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6. Real-time Detection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def real_time_detection(process_data, alert_threshold=0.85):\n",
    "    \"\"\"Real-time ransomware detection pipeline.\n",
    "\n",
    "    Extracts features and a behavior sequence from ``process_data``, scores\n",
    "    them with the module-level ``final_model`` and triggers incident\n",
    "    response when the score exceeds ``alert_threshold``.\n",
    "\n",
    "    NOTE(review): ``extract_live_features``, ``preprocess_sequence`` and\n",
    "    ``trigger_incident_response`` are not defined anywhere in this notebook;\n",
    "    they must be defined or imported before this function is called.\n",
    "\n",
    "    Args:\n",
    "        process_data: Mapping describing one process; must contain the key\n",
    "            ``'behavior_log'``.\n",
    "        alert_threshold: Score strictly above which\n",
    "            ``trigger_incident_response`` is called. Defaults to 0.85.\n",
    "\n",
    "    Returns:\n",
    "        dict with ``'score'`` (float), ``'features'`` (as returned by\n",
    "        ``extract_live_features``) and ``'timestamp'`` (ISO-8601 string).\n",
    "    \"\"\"\n",
    "    # Feature extraction\n",
    "    features = extract_live_features(process_data)\n",
    "    sequence = preprocess_sequence(process_data['behavior_log'])\n",
    "\n",
    "    # Prediction: wrap both inputs in a batch of one, read the single score back\n",
    "    batch = [np.array([sequence]), np.array([features])]\n",
    "    score = final_model.predict(batch)[0][0]\n",
    "\n",
    "    # Alerting\n",
    "    if score > alert_threshold:\n",
    "        trigger_incident_response(process_data)\n",
    "\n",
    "    return {\n",
    "        'score': float(score),\n",
    "        'features': features,\n",
    "        'timestamp': pd.Timestamp.now().isoformat()\n",
    "    }\n",
    "\n",
    "# Example usage\n",
    "# NOTE(review): [...] is a placeholder (a list holding Ellipsis); replace it\n",
    "# with real behavioral data before running this cell.\n",
    "sample_process = {\n",
    "    'behavior_log': [...]  # Raw behavioral data\n",
    "}\n",
    "print(real_time_detection(sample_process))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7. Model Persistence"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make sure the output directory exists; nothing else in this notebook creates it.\n",
    "Path(\"models\").mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "# Save models\n",
    "# NOTE(review): final_model contains the custom AnomalyAwareLayer, which wraps\n",
    "# a scikit-learn estimator - confirm Keras can serialize it in this format.\n",
    "final_model.save(\"models/ransomware_detector.keras\")\n",
    "joblib.dump(iso_forest, \"models/anomaly_detector.pkl\")\n",
    "\n",
    "# Save feature engineering pipeline\n",
    "# No preprocessor object is created anywhere in this notebook (the original\n",
    "# reference raised NameError). The key is kept, set to None, so the layout of\n",
    "# the saved dictionary is unchanged.\n",
    "joblib.dump({\n",
    "    'feature_columns': X_features.columns.tolist(),\n",
    "    'preprocessor': None\n",
    "}, \"models/feature_pipeline.pkl\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}

Key Features:

Hybrid LSTM + CNN architecture for temporal/spatial pattern detection

Integrated Isolation Forest for anomaly scoring

Custom TensorFlow layer combining ML and anomaly scores

Real-time behavioral analysis pipeline

SHAP explainability for model decisions

Automated incident response triggering

Feature engineering for encryption patterns/entropy analysis

Class imbalance handling with weighted loss

Model persistence with full pipeline saving

Comprehensive evaluation metrics