# Production ML Model Training for Malicious APK Detection
Supports: Drebin, CICAndMal2017, AndroZoo, and custom datasets

## 1. Import Dependencies

In [6]:
import numpy as np
import pandas as pd
import pickle
import logging
import os
import sys
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.metrics import (classification_report, confusion_matrix, 
                            accuracy_score, precision_score, recall_score, 
                            f1_score, roc_auc_score, roc_curve)
from sklearn.preprocessing import StandardScaler
import joblib

## 2. Setup Logging

In [7]:
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/model_training.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

## 3. DatasetLoader Class
Load various malware datasets

In [None]:
class DatasetLoader:
    """Load various malware datasets"""
    
    @staticmethod
    def load_drebin(csv_path):
        """
        Load Drebin dataset from CSV
        Expected format: features + 'malware' label column
        """
        logger.info(f"Loading Drebin dataset from {csv_path}")
        try:
            # Load with low_memory=False to handle mixed types
            df = pd.read_csv(csv_path, low_memory=False)
            
            # Separate features and labels
            if 'malware' in df.columns:
                y_raw = df['malware']
                X_df = df.drop('malware', axis=1)
            elif 'class' in df.columns:
                y_raw = df['class']
                X_df = df.drop('class', axis=1)
            else:
                logger.error("No label column found. Expected 'malware' or 'class'")
                return None, None
            
            # Convert labels to integers (handle string/mixed types)
            # Map common string labels to 0/1
            if y_raw.dtype == 'object' or y_raw.dtype == 'str':
                y_raw = y_raw.astype(str).str.lower().str.strip()
                # Map various formats to binary
                y_raw = y_raw.replace({
                    'benign': 0, 'malware': 1, 'malicious': 1,
                    'b': 0, 'm': 1, 's': 1, 
                    '0': 0, '1': 1,
                    'false': 0, 'true': 1
                })
            
            # Convert to numeric, coercing errors to NaN
            y = pd.to_numeric(y_raw, errors='coerce')
            
            # Drop rows with invalid labels
            valid_indices = ~y.isna()
            if not valid_indices.all():
                dropped = (~valid_indices).sum()
                logger.warning(f"Dropping {dropped} samples with invalid labels")
                y = y[valid_indices]
                X_df = X_df[valid_indices]
            
            # Convert labels to int
            y = y.astype(int).values
            
            # Convert features to numeric, handling mixed types
            X_numeric = X_df.copy()
            for col in X_numeric.columns:
                if X_numeric[col].dtype == 'object':
                    # Try to convert string columns to numeric
                    X_numeric[col] = pd.to_numeric(X_numeric[col], errors='coerce')
            
            # Replace any remaining NaN values with 0
            X_numeric = X_numeric.fillna(0)
            X = X_numeric.values
            
            logger.info(f"Loaded {len(X)} samples with {X.shape[1]} features")
            logger.info(f"Malware samples: {sum(y)} ({sum(y)/len(y)*100:.1f}%)")
            logger.info(f"Benign samples: {len(y) - sum(y)} ({(len(y) - sum(y))/len(y)*100:.1f}%)")
            return X, y
            
        except Exception as e:
            logger.error(f"Failed to load Drebin dataset: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return None, None
    
    @staticmethod
    def load_cicandmal2017(data_dir):
        """
        Load CICAndMal2017 dataset
        Expected structure: data_dir/benign/*.csv and data_dir/malware/*.csv
        """
        logger.info(f"Loading CICAndMal2017 dataset from {data_dir}")
        try:
            benign_files = list(Path(data_dir).glob('benign/*.csv'))
            malware_files = list(Path(data_dir).glob('malware/*.csv'))
            
            benign_data = []
            malware_data = []
            
            for file in benign_files:
                df = pd.read_csv(file, low_memory=False)
                benign_data.append(df)
            
            for file in malware_files:
                df = pd.read_csv(file, low_memory=False)
                malware_data.append(df)
            
            benign_df = pd.concat(benign_data, ignore_index=True)
            malware_df = pd.concat(malware_data, ignore_index=True)
            
            # Create labels
            benign_df['malware'] = 0
            malware_df['malware'] = 1
            
            # Combine
            full_df = pd.concat([benign_df, malware_df], ignore_index=True)
            
            y = full_df['malware'].values
            X = full_df.drop('malware', axis=1).values
            
            logger.info(f"Loaded {len(X)} samples with {X.shape[1]} features")
            logger.info(f"Malware samples: {sum(y)} ({sum(y)/len(y)*100:.1f}%)")
            return X, y
            
        except Exception as e:
            logger.error(f"Failed to load CICAndMal2017 dataset: {e}")
            return None, None
    
    @staticmethod
    def load_custom_csv(csv_path, label_column='label'):
        """
        Load custom CSV dataset
        Args:
            csv_path: Path to CSV file
            label_column: Name of label column (0=benign, 1=malware)
        """
        logger.info(f"Loading custom dataset from {csv_path}")
        try:
            df = pd.read_csv(csv_path, low_memory=False)
            
            if label_column not in df.columns:
                logger.error(f"Label column '{label_column}' not found")
                return None, None
            
            y = df[label_column].values
            X = df.drop(label_column, axis=1).values
            
            logger.info(f"Loaded {len(X)} samples with {X.shape[1]} features")
            logger.info(f"Malware samples: {sum(y)} ({sum(y)/len(y)*100:.1f}%)")
            return X, y
            
        except Exception as e:
            logger.error(f"Failed to load custom dataset: {e}")
            return None, None
    
    @staticmethod
    def generate_synthetic_data(n_samples=5000):
        """Generate synthetic data (fallback for testing)"""
        logger.info(f"Generating {n_samples} synthetic samples...")
        
        np.random.seed(42)
        features = []
        labels = []
        
        for i in range(n_samples):
            is_malicious = np.random.random() < 0.4
            feature_vector = []
            
            # Permission features (40 features)
            for j in range(40):
                if is_malicious:
                    prob = 0.6 if j < 20 else 0.3
                else:
                    prob = 0.2 if j < 20 else 0.1
                feature_vector.append(1 if np.random.random() < prob else 0)
            
            # Component counts (4 features)
            if is_malicious:
                feature_vector.extend([
                    np.random.uniform(0.3, 1.0),
                    np.random.uniform(0.4, 1.0),
                    np.random.uniform(0.5, 1.0),
                    np.random.uniform(0.2, 0.8)
                ])
            else:
                feature_vector.extend([
                    np.random.uniform(0.1, 0.5),
                    np.random.uniform(0.1, 0.4),
                    np.random.uniform(0.1, 0.3),
                    np.random.uniform(0.0, 0.3)
                ])
            
            # Suspicious features (6 features)
            for j in range(6):
                prob = 0.7 if is_malicious else 0.1
                feature_vector.append(1 if np.random.random() < prob else 0)
            
            features.append(feature_vector)
            labels.append(1 if is_malicious else 0)
        
        logger.info(f"Generated {n_samples} samples with {len(features[0])} features")
        logger.info(f"Malware samples: {sum(labels)} ({sum(labels)/len(labels)*100:.1f}%)")
        return np.array(features), np.array(labels)

## 4. ProductionModelTrainer Class
Train production-grade malware detection model

In [9]:
class ProductionModelTrainer:
    """Train production-grade malware detection model"""
    
    def __init__(self, model_type='random_forest'):
        self.model_type = model_type
        self.model = None
        self.scaler = None
        
    def preprocess_data(self, X, y):
        """Preprocess and validate data"""
        logger.info("Preprocessing data...")
        
        # Handle missing values
        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
        
        # Feature scaling for tree-based models (optional but can help)
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)
        
        logger.info("✓ Preprocessing completed")
        return X_scaled, y
    
    def train_model(self, X, y, hyperparameter_tuning=False):
        """Train the model with optional hyperparameter tuning"""
        logger.info(f"Training {self.model_type} model...")
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        logger.info(f"Training set: {len(X_train)} samples")
        logger.info(f"Test set: {len(X_test)} samples")
        logger.info(f"Malicious: {sum(y_train)} ({sum(y_train)/len(y_train)*100:.1f}%)")
        
        if hyperparameter_tuning:
            self.model = self._train_with_tuning(X_train, y_train)
        else:
            self.model = self._train_default(X_train, y_train)
        
        # Evaluate
        self._evaluate_model(X_test, y_test)
        
        # Cross-validation
        self._cross_validate(X_train, y_train)
        
        return self.model
    
    def _train_default(self, X_train, y_train):
        """Train with default hyperparameters"""
        if self.model_type == 'random_forest':
            model = RandomForestClassifier(
                n_estimators=200,
                max_depth=30,
                min_samples_split=5,
                min_samples_leaf=2,
                max_features='sqrt',
                random_state=42,
                n_jobs=-1,
                class_weight='balanced'
            )
        elif self.model_type == 'gradient_boosting':
            model = GradientBoostingClassifier(
                n_estimators=200,
                max_depth=10,
                learning_rate=0.1,
                random_state=42
            )
        else:
            raise ValueError(f"Unknown model type: {self.model_type}")
        
        model.fit(X_train, y_train)
        logger.info("✓ Model training completed")
        return model
    
    def _train_with_tuning(self, X_train, y_train):
        """Train with hyperparameter tuning (takes longer)"""
        logger.info("Starting hyperparameter tuning (this may take a while)...")
        
        if self.model_type == 'random_forest':
            param_grid = {
                'n_estimators': [100, 200, 300],
                'max_depth': [20, 30, 40],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
            base_model = RandomForestClassifier(random_state=42, n_jobs=-1)
        else:
            param_grid = {
                'n_estimators': [100, 200],
                'max_depth': [5, 10, 15],
                'learning_rate': [0.01, 0.1, 0.2]
            }
            base_model = GradientBoostingClassifier(random_state=42)
        
        grid_search = GridSearchCV(
            base_model, param_grid, cv=3, scoring='f1', n_jobs=-1, verbose=2
        )
        grid_search.fit(X_train, y_train)
        
        logger.info(f"Best parameters: {grid_search.best_params_}")
        logger.info(f"Best CV score: {grid_search.best_score_:.4f}")
        
        return grid_search.best_estimator_
    
    def _evaluate_model(self, X_test, y_test):
        """Comprehensive model evaluation"""
        y_pred = self.model.predict(X_test)
        y_proba = self.model.predict_proba(X_test)[:, 1]
        
        logger.info("\n" + "="*70)
        logger.info("MODEL EVALUATION RESULTS")
        logger.info("="*70)
        
        # Basic metrics
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred)
        recall = recall_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        auc = roc_auc_score(y_test, y_proba)
        
        logger.info(f"\nAccuracy:  {accuracy*100:.2f}%")
        logger.info(f"Precision: {precision*100:.2f}%")
        logger.info(f"Recall:    {recall*100:.2f}%")
        logger.info(f"F1-Score:  {f1*100:.2f}%")
        logger.info(f"AUC-ROC:   {auc*100:.2f}%")
        
        # Classification report
        logger.info(f"\nClassification Report:")
        logger.info("\n" + classification_report(y_test, y_pred, 
                                                 target_names=['Benign', 'Malicious']))
        
        # Confusion matrix
        cm = confusion_matrix(y_test, y_pred)
        logger.info(f"\nConfusion Matrix:")
        logger.info(f"                Predicted")
        logger.info(f"              Benign  Malicious")
        logger.info(f"Actual Benign   {cm[0][0]:5d}    {cm[0][1]:5d}")
        logger.info(f"     Malicious  {cm[1][0]:5d}    {cm[1][1]:5d}")
        
        # Feature importance (for tree-based models)
        if hasattr(self.model, 'feature_importances_'):
            self._log_feature_importance()
    
    def _cross_validate(self, X_train, y_train):
        """Perform cross-validation"""
        logger.info("\nPerforming 5-fold cross-validation...")
        cv_scores = cross_val_score(self.model, X_train, y_train, cv=5, 
                                    scoring='f1', n_jobs=-1)
        logger.info(f"CV F1-Scores: {cv_scores}")
        logger.info(f"Average: {cv_scores.mean()*100:.2f}% (+/- {cv_scores.std()*2*100:.2f}%)")
    
    def _log_feature_importance(self):
        """Log top important features"""
        feature_importance = self.model.feature_importances_
        top_20_idx = np.argsort(feature_importance)[-20:]
        
        logger.info(f"\nTop 20 Important Features:")
        for idx in reversed(top_20_idx):
            logger.info(f"  Feature {idx:3d}: {feature_importance[idx]:.4f}")
    
    def save_model(self, model_path='models/malware_model.pkl'):
        """Save trained model and scaler"""
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        
        # Save model
        with open(model_path, 'wb') as f:
            pickle.dump(self.model, f)
        logger.info(f"✓ Model saved to {model_path}")
        
        # Save scaler
        scaler_path = model_path.replace('.pkl', '_scaler.pkl')
        with open(scaler_path, 'wb') as f:
            pickle.dump(self.scaler, f)
        logger.info(f"✓ Scaler saved to {scaler_path}")
        
        # Save metadata
        metadata = {
            'model_type': self.model_type,
            'n_features': self.model.n_features_in_ if hasattr(self.model, 'n_features_in_') else None,
            'training_date': pd.Timestamp.now().isoformat()
        }
        metadata_path = model_path.replace('.pkl', '_metadata.pkl')
        with open(metadata_path, 'wb') as f:
            pickle.dump(metadata, f)
        logger.info(f"✓ Metadata saved to {metadata_path}")

## 5. Configuration
Set dataset type, model type, and other parameters

In [24]:
# Configuration
DATASET_TYPE = 'drebin'  # Options: 'drebin', 'cicandmal2017', 'custom', 'synthetic'
DATASET_PATH = 'D:\\Projects\\Malicious_APK_Detection_System\\datasets\\drebin.csv'  # Update with your dataset path
MODEL_TYPE = 'random_forest'  # Options: 'random_forest', 'gradient_boosting'
HYPERPARAMETER_TUNING = False  # Set True for production (takes longer)

print("="*70)
print("PRODUCTION MALWARE DETECTION MODEL TRAINING")
print("="*70)
print(f"\nConfiguration:")
print(f"  Dataset Type: {DATASET_TYPE}")
print(f"  Dataset Path: {DATASET_PATH}")
print(f"  Model Type: {MODEL_TYPE}")
print(f"  Hyperparameter Tuning: {HYPERPARAMETER_TUNING}")

# Check if dataset file exists
import os
if not os.path.exists(DATASET_PATH):
    print(f"\n⚠️  WARNING: Dataset file not found at: {DATASET_PATH}")
    print(f"   Absolute path: {os.path.abspath(DATASET_PATH)}")
    print(f"   Will attempt to load anyway...")
else:
    print(f"\n✓ Dataset file found!")


PRODUCTION MALWARE DETECTION MODEL TRAINING

Configuration:
  Dataset Type: drebin
  Dataset Path: D:\Projects\Malicious_APK_Detection_System\datasets\drebin.csv
  Model Type: random_forest
  Hyperparameter Tuning: False

✓ Dataset file found!


## 6. Load Dataset

In [25]:
# Load dataset
loader = DatasetLoader()

if DATASET_TYPE == 'drebin':
    X, y = loader.load_drebin(DATASET_PATH)
elif DATASET_TYPE == 'cicandmal2017':
    X, y = loader.load_cicandmal2017(DATASET_PATH)
elif DATASET_TYPE == 'custom':
    X, y = loader.load_custom_csv(DATASET_PATH)
else:  # synthetic
    X, y = loader.generate_synthetic_data(n_samples=10000)

if X is None or y is None:
    logger.error("Failed to load dataset.")
    print("\n❌ Dataset loading failed!")
    print(f"   Attempted path: {DATASET_PATH}")
    print(f"   Absolute path: {os.path.abspath(DATASET_PATH)}")
    print(f"\nPossible solutions:")
    print(f"   1. Check if the file exists at the specified location")
    print(f"   2. Verify the path is correct (use '../datasets/drebin.csv' from models directory)")
    print(f"   3. Try using synthetic data by setting: DATASET_TYPE = 'synthetic'")
    raise Exception("Dataset loading failed - check the path and try again")

print(f"\n✓ Dataset loaded successfully!")
print(f"  Samples: {len(X)}")
print(f"  Features: {X.shape[1]}")
print(f"  Malware ratio: {sum(y)/len(y)*100:.1f}%")


2026-01-14 15:58:11,271 - INFO - Loading Drebin dataset from D:\Projects\Malicious_APK_Detection_System\datasets\drebin.csv
  df = pd.read_csv(csv_path)
2026-01-14 15:58:11,452 - INFO - Loaded 15036 samples with 215 features
2026-01-14 15:58:11,452 - ERROR - Failed to load Drebin dataset: unsupported operand type(s) for +: 'int' and 'str'
2026-01-14 15:58:11,468 - ERROR - Failed to load dataset.



❌ Dataset loading failed!
   Attempted path: D:\Projects\Malicious_APK_Detection_System\datasets\drebin.csv
   Absolute path: D:\Projects\Malicious_APK_Detection_System\datasets\drebin.csv

Possible solutions:
   1. Check if the file exists at the specified location
   2. Verify the path is correct (use '../datasets/drebin.csv' from models directory)
   3. Try using synthetic data by setting: DATASET_TYPE = 'synthetic'


Exception: Dataset loading failed - check the path and try again

## 7. Preprocess Data

In [None]:
# Create trainer instance
trainer = ProductionModelTrainer(model_type=MODEL_TYPE)

# Preprocess data
X_processed, y_processed = trainer.preprocess_data(X, y)

print(f"\n✓ Data preprocessing completed!")

## 8. Train Model

In [None]:
# Train model
model = trainer.train_model(
    X_processed, 
    y_processed, 
    hyperparameter_tuning=HYPERPARAMETER_TUNING
)

print(f"\n✓ Model training completed!")

## 9. Save Model

In [None]:
# Create models directory if it doesn't exist
os.makedirs('models', exist_ok=True)

# Save model
trainer.save_model('models/malwares_model.pkl')

print("\n" + "="*70)
print("✓ TRAINING COMPLETED SUCCESSFULLY!")
print("="*70)
print("\nModel is ready for production use.")
print("Deploy with: python run.py")

## 10. Test the Model (Optional)
Quick test to verify the model works

In [None]:
# Quick prediction test
if X_processed is not None and len(X_processed) > 0:
    # Test with first sample
    sample = X_processed[0:1]
    prediction = model.predict(sample)
    probability = model.predict_proba(sample)[0]
    
    print("\nQuick Test Prediction:")
    print(f"  Sample prediction: {'Malicious' if prediction[0] == 1 else 'Benign'}")
    print(f"  Probability [Benign, Malicious]: [{probability[0]:.4f}, {probability[1]:.4f}]")
    print(f"  Actual label: {'Malicious' if y_processed[0] == 1 else 'Benign'}")