# Mobile Malware Detection using Apache Spark (MapReduce & Batch)

**Course Project:** Malware Classification
**Dataset:** CIC-MalDroid-2020

This notebook implements a complete pipeline:
1. **Ingestion:** Loading the CSV with Spark's distributed DataFrame API (MapReduce-style execution).
2. **Preprocessing:** Cleaning (deduplication and null removal), feature vectorization, and scaling.
3. **Batch Processing:** Training multiple ML models (Random Forest, Logistic Regression, Decision Tree).
4. **Evaluation:** Comparing models using weighted F1-score and accuracy.
5. **Testing:** Simulating live malware detection by classifying individual held-out samples.

In [1]:
# 1. Install necessary libraries (if not present)
!pip install pyspark pandas seaborn matplotlib scikit-learn numpy

Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
     ---------------------------------------- 0.0/434.2 MB ? eta -:--:--
     -------------------------------------- 0.0/434.2 MB 640.0 kB/s eta 0:11:19
     -------------------------------------- 0.0/434.2 MB 495.5 kB/s eta 0:14:37
     ---------------------------------------- 0.1/434.2 MB 1.0 MB/s eta 0:07:01
     ---------------------------------------- 0.4/434.2 MB 2.4 MB/s eta 0:02:59
     ---------------------------------------- 0.8/434.2 MB 4.0 MB/s eta 0:01:48
     ---------------------------------------- 1.8/434.2 MB 7.1 MB/s eta 0:01:02
     --------------------------------------- 3.2/434.2 MB 11.3 MB/s eta 0:00:39
     --------------------------------------- 4.7/434.2 MB 14.3 MB/s eta 0:00:31
      -------------------------------------- 7.2/434.2 MB 19.3 MB/s eta 0:00:23
      -------------------------------------- 7.2/434.2 MB 19.3 MB/s eta 0:00:23
      ------------------------------------- 11.4/434.2 MB 43.5


[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# 2. Imports and Spark Session Initialization
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# PySpark Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import BooleanType, NumericType
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize Spark Session (The MapReduce Engine)
# getOrCreate() returns an already-running session if this kernel has one; in that
# case JVM-level settings such as driver memory may not take effect until a restart.
spark = (
    SparkSession.builder
    .appName("MobileMalwareClassification_CIC")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Plot styling shared by every figure below.
sns.set_style("whitegrid")
print(f"Spark Session Created! Version: {spark.version}")

In [None]:
# 3. Data Ingestion & Cleaning
# Ensure 'MalDroid.csv' is in the same folder as this notebook (or edit DATASET_PATH).
DATASET_PATH = "MalDroid.csv"

print("Loading data...")
try:
    # Reading CSV (Distributed Ingestion); inferSchema costs one extra pass over the file.
    df = spark.read.csv(DATASET_PATH, header=True, inferSchema=True)
except Exception as e:
    # Fail loudly: every later cell depends on `df`, so continuing would only
    # resurface as a confusing NameError further down the notebook.
    raise RuntimeError(f"Error loading file '{DATASET_PATH}': {e}. Please check the path.") from e
print(f"Dataset loaded. Raw count: {df.count()}")

# Clean Column Names: strip stray whitespace around CSV header names.
df = df.toDF(*[c.strip() for c in df.columns])

# Remove Duplicates and Nulls.
# Cached because df_clean feeds several actions (count, distribution, indexer fit, pipeline fit).
df_clean = df.dropDuplicates().dropna(how='any').cache()
print(f"Count after cleaning: {df_clean.count()}")

# Show Class Distribution
print("\nClass Distribution:")
df_clean.groupBy("class").count().orderBy(col("count").desc()).show()

In [None]:
# 4. Feature Engineering Pipeline

# A. String Indexing (Label Encoding)
# StringIndexer's default ordering is by descending frequency, so index 0 is the most common class.
indexer = StringIndexer(inputCol="class", outputCol="label").fit(df_clean)
df_indexed = indexer.transform(df_clean)
labels_list = indexer.labels
print(f"Class Mappings: {labels_list}")

# B. Vector Assembler
# Use every numeric/boolean column except the target and its index as a feature.
# The exclusion is case-insensitive because Spark resolves column names
# case-insensitively by default: a header such as "Class" would still match "class"
# above, and the target must never be fed to the model as a feature.
ignore_cols = {'class', 'label'}
feature_cols = []
skipped_cols = []
for field in df_indexed.schema.fields:
    if field.name.lower() in ignore_cols:
        continue
    if isinstance(field.dataType, (NumericType, BooleanType)):
        feature_cols.append(field.name)
    else:
        # VectorAssembler cannot encode e.g. string columns and would fail on them.
        skipped_cols.append(field.name)
if skipped_cols:
    print(f"Skipping non-numeric columns: {skipped_cols}")
print(f"Using {len(feature_cols)} feature columns.")

assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")

# C. Scaling
# NOTE(review): the scaler is fit on the whole dataset before the train/test split in
# the next cell, so test-row statistics leak into the scaling. Tree models are
# order-invariant under this positive rescaling, but Logistic Regression scores may be
# slightly optimistic. Fitting the scaler on train_data only would remove the leak.
scaler = StandardScaler(inputCol="rawFeatures", outputCol="features", withStd=True, withMean=False)

# Build and Run Pipeline
pipeline_prep = Pipeline(stages=[assembler, scaler])
model_prep = pipeline_prep.fit(df_indexed)
final_data = model_prep.transform(df_indexed).select("features", "label", "class")

final_data.show(5, truncate=False)

In [None]:
# 5. Train/Test Split & Model Definition
# Materialize the prepared data once. Each split returned by randomSplit is a separate
# plan over final_data, and every model below reads the splits several times
# (fit, transform, two evaluators), so without caching the whole
# ingestion -> cleaning -> scaling lineage would be recomputed repeatedly.
final_data = final_data.cache()
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=2024)

print(f"Train Rows: {train_data.count()}")
print(f"Test Rows:  {test_data.count()}")

# Define Models (fixed seeds make tree construction reproducible)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=50, maxDepth=10, seed=42)
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=20)  # low iteration cap for speed
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=10, seed=42)

models_map = {
    "Random Forest": rf,
    "Logistic Regression": lr,
    "Decision Tree": dt
}

In [None]:
# 6. Batch Training Loop
# Fit every model in models_map on train_data, score it on test_data, and keep the
# fitted model with the highest weighted F1.
# NOTE(review): the "best" model is chosen on the same test split whose scores are
# reported, so the winner's metrics are slightly optimistic; a separate validation
# split (or CrossValidator) would give an unbiased comparison.
results = {}
best_model = None
# Start below any achievable score so a model is always selected, even if every F1 is 0;
# otherwise best_model_name would stay "" and the confusion-matrix cell would fail.
best_f1 = float("-inf")
best_model_name = ""

# The evaluators do not depend on the model, so build them once.
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1Weighted")

print("Starting Batch Training...")
print("-" * 50)

for name, model in models_map.items():
    print(f"Training {name}...")

    # Train
    model_fitted = model.fit(train_data)

    # Predict. Cached because it is evaluated twice here and collected again in the
    # confusion-matrix cell.
    predictions = model_fitted.transform(test_data).cache()

    # Evaluate
    acc = evaluator_acc.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)

    print(f"--> {name}: Accuracy={acc:.4f}, F1-Score={f1:.4f}")

    results[name] = {
        "Accuracy": acc,
        "F1-Score": f1,
        "Predictions": predictions
    }

    if f1 > best_f1:
        best_f1 = f1
        best_model = model_fitted
        best_model_name = name

print("-" * 50)
print(f"Best Model: {best_model_name} (F1: {best_f1:.4f})")

In [None]:
# 7. Visualization of Results
# Reshape `results` into long format (one row per model x metric) for a grouped bar chart.
metrics_data = [
    {"Model": model_name, "Metric": metric_name, "Score": scores[metric_name]}
    for model_name, scores in results.items()
    for metric_name in ("Accuracy", "F1-Score")
]
metrics_df = pd.DataFrame(metrics_data)

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(data=metrics_df, x="Model", y="Score", hue="Metric", palette="viridis", ax=ax)
ax.set_title("Model Performance Comparison")
ax.set_ylim(0, 1.05)  # a little headroom above a perfect score of 1.0
plt.show()

In [None]:
# 8. Confusion Matrix (Best Model)
from sklearn.metrics import confusion_matrix, classification_report

print(f"Analyzing Best Model: {best_model_name}")
best_preds = results[best_model_name]["Predictions"]

# Convert to Pandas for plotting. Label and prediction are collected in ONE query so the
# two columns are guaranteed to stay row-aligned (two separate toPandas() calls would be
# two independent Spark jobs). Cast the float indices to int 1-D arrays for sklearn.
preds_pdf = best_preds.select("label", "prediction").toPandas()
y_true = preds_pdf["label"].astype(int).to_numpy()
y_pred = preds_pdf["prediction"].astype(int).to_numpy()

# Fix the label set to every indexed class so the matrix always has len(labels_list)
# rows/columns matching the tick labels, even if a class is absent from this split.
class_indices = list(range(len(labels_list)))
cm = confusion_matrix(y_true, y_pred, labels=class_indices)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=labels_list,
            yticklabels=labels_list)
plt.ylabel('True Class')
plt.xlabel('Predicted Class')
plt.title(f'Confusion Matrix: {best_model_name}')
plt.show()

print(classification_report(y_true, y_pred, labels=class_indices,
                            target_names=labels_list, zero_division=0))

In [None]:
# 9. Live Testing Simulation
def classify_sample(row_idx, dataset, model, label_map):
    """Classify one row of ``dataset`` as if it had just arrived, and print the outcome.

    Args:
        row_idx: 0-based position of the row in ``dataset``. Without an explicit
            ordering, "position" follows Spark's current partition order.
        dataset: DataFrame with at least ``features`` and ``label`` columns
            (e.g. ``test_data``).
        model: fitted Spark ML model whose ``transform`` adds a ``prediction`` column.
        label_map: sequence mapping a numeric label index to its class name
            (e.g. ``indexer.labels``).

    Returns:
        True if the predicted class name equals the true class name.

    Raises:
        IndexError: if ``row_idx`` is negative or not smaller than the row count.
    """
    if row_idx < 0:
        raise IndexError(f"row_idx must be non-negative, got {row_idx}")
    # offset + limit fetches exactly the requested row; an empty result means the
    # index is past the end, which must not silently fall back to another row.
    rows = dataset.offset(row_idx).limit(1).collect()
    if not rows:
        raise IndexError(f"row_idx {row_idx} is out of range for this dataset")
    sample = rows[0]
    features = sample['features']
    real = label_map[int(sample['label'])]

    # Create Single-Row DF holding only the feature vector, mimicking an incoming sample.
    df_new = dataset.sparkSession.createDataFrame([(features,)], ["features"])
    pred = model.transform(df_new).select("prediction").collect()[0][0]
    pred_str = label_map[int(pred)]

    print(f"Sample #{row_idx} -> True: {real} | Pred: {pred_str}")
    return real == pred_str

print("--- Live Inference Tests ---")
# Positions in test_data to classify; each call prints its own "True | Pred" line.
SAMPLE_INDICES = [10, 55, 102]
live_outcomes = [classify_sample(idx, test_data, best_model, labels_list) for idx in SAMPLE_INDICES]
print(f"Correct: {sum(live_outcomes)}/{len(live_outcomes)}")

In [None]:
# Stop Spark
# Shuts down the SparkSession created at the top of the notebook; re-run that cell
# before using `spark` (or any DataFrame built from it) again in this kernel.
spark.stop()