In [10]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report
import lightgbm as lgb

# Paths to your files
NPZ_PATH = 'bodmas.npz'  # e.g., './bodmas_data/bodmas.npz'
METADATA_PATH = 'bodmas_metadata.csv'
CATEGORY_PATH = 'bodmas_malware_category.csv'  # New: Your category file

def load_bodmas(npz_path, metadata_path, category_path, max_train=20000, max_test=5000):
    """Load BODMAS data, merge category for accurate labels, and split temporally."""
    # Load features
    data = np.load(npz_path)
    if 'X' not in data:
        raise KeyError(f"Expected 'X' in {npz_path}. Found: {data.files}")
    X = data['X']  # Shape: (134580, 2381)
    
    # Load metadata
    metadata = pd.read_csv(metadata_path)
    if 'sha256' not in metadata.columns or 'timestamp' not in metadata.columns or 'family' not in metadata.columns:
        raise KeyError(f"Expected 'sha256', 'timestamp', 'family' in {metadata_path}. Found: {metadata.columns}")
    
    # Load category CSV
    category = pd.read_csv(category_path)
    if 'sha256' not in category.columns or 'category' not in category.columns:
        raise KeyError(f"Expected 'sha256', 'category' in {category_path}. Found: {category.columns}")
    
    # Merge on 'sha256' (left join: metadata as base)
    merged = metadata.merge(category, on='sha256', how='left')
    
    # Create binary labels: Malware if 'family' non-NaN or 'category' present; else benign
    y = np.where(~merged['family'].isna() | ~merged['category'].isna(), 1, 0)
    y = y.astype(np.int32)  # Ensure numeric
    
    timestamps = merged['timestamp'].values

    # Verify alignment
    if len(X) != len(y):
        raise ValueError(f"Mismatch: {len(X)} features, {len(y)} labels")
    
    # Temporal split
    sorted_indices = np.argsort(timestamps)
    X_sorted = X[sorted_indices]
    y_sorted = y[sorted_indices]

    # Subsample
    train_indices = sorted_indices[:max_train]
    test_indices = sorted_indices[-max_test:]
    X_train = X_sorted[train_indices]
    y_train = y_sorted[train_indices]
    X_test = X_sorted[test_indices]
    y_test = y_sorted[test_indices]

    print(f"Loaded {X_train.shape[0]} train, {X_test.shape[0]} test samples")
    print(f"Train labels: Benign={np.sum(y_train == 0)}, Malware={np.sum(y_train == 1)}")
    print(f"Test labels: Benign={np.sum(y_test == 0)}, Malware={np.sum(y_test == 1)}")
    return X_train, y_train, X_test, y_test

# Load data
X_train, y_train, X_test, y_test = load_bodmas(NPZ_PATH, METADATA_PATH, CATEGORY_PATH)

# Train LightGBM
params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'boosting_type': 'gbdt',
    'num_leaves': 31,
    'learning_rate': 0.05,
    'feature_fraction': 0.9,
    'verbose': -1
}

train_data = lgb.Dataset(X_train, label=y_train)
detector = lgb.train(params, train_data, num_boost_round=500)  # Increased for better learning

# Evaluate
y_pred = detector.predict(X_test) > 0.5
print(f"Detector Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print(classification_report(y_test, y_pred, target_names=['Benign', 'Malware'], zero_division=0))

# Top 10 features for RL
feature_importance = detector.feature_importance(importance_type='gain')
top_features_idx = np.argsort(feature_importance)[-10:]
print(f"Top 10 feature indices: {top_features_idx}")

detector.save_model('bodmas_detector.txt')

Loaded 20000 train, 5000 test samples
Train labels: Benign=16324, Malware=3676
Test labels: Benign=2518, Malware=2482
Detector Accuracy: 0.9846
              precision    recall  f1-score   support

      Benign       0.97      1.00      0.98      2518
     Malware       1.00      0.97      0.98      2482

    accuracy                           0.98      5000
   macro avg       0.99      0.98      0.98      5000
weighted avg       0.99      0.98      0.98      5000

Top 10 feature indices: [ 655 2375 2360  930   95   55 2355  613 2359  637]


<lightgbm.basic.Booster at 0x22bdd524830>

In [29]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from collections import defaultdict

# Assume these come from your detector code
# X_train, y_train, X_test, y_test, detector, top_features_idx are defined

# Fit scaler on top features from X_train
scaler = MinMaxScaler()
scaler.fit(X_train[:, top_features_idx])

# Discretize function (moved outside q_learning)
def discretize(state, bins=5):
    bins_edges = np.linspace(0, 1, bins + 1)[1:-1]
    return tuple(np.digitize(s, bins_edges, right=True) for s in state)

# Custom Gym Environment
class MalwareEvasionEnv(gym.Env):
    def __init__(self, sample, model, top_features, scaler, delta=0.005, norm_threshold=0.15, max_steps=50):
        super(MalwareEvasionEnv, self).__init__()
        self.model = model
        self.top_features = top_features
        self.delta = delta
        self.norm_threshold = norm_threshold
        self.max_steps = max_steps
        
        # Normalize initial sample's top features
        self.initial_sample = sample.copy()
        self.initial_top = scaler.transform(sample[:, top_features]).flatten()  # Shape (10,)
        self.state = self.initial_top.copy()
        
        # Action space: 21 actions (10 increase, 10 decrease, 1 no-op)
        self.action_space = spaces.Discrete(21)
        self.observation_space = spaces.Box(low=0, high=1, shape=(10,), dtype=np.float32)
        
        self.current_step = 0
        self.initial_prob = self._get_prob()

    def _get_prob(self):
        full_sample = self.initial_sample.copy()
        full_sample[0, self.top_features] = scaler.inverse_transform(self.state.reshape(1, -1)).flatten()
        return self.model.predict(full_sample)[0]  # Malware prob (>0.5 = detected)

    def step(self, action):
        self.current_step += 1
        prev_state = self.state.copy()
        
        if action < 10:  # Increase feature
            self.state[action] = min(1.0, self.state[action] + self.delta)
        elif action < 20:  # Decrease feature
            self.state[action - 10] = max(0.0, self.state[action - 10] - self.delta)
        # else: no-op
        
        norm = np.linalg.norm(self.state - self.initial_top, ord=2)
        prob = self._get_prob()
        reward = 1 - prob if prob < 0.5 else -0.1
        done = (prob < 0.5) or (norm > self.norm_threshold) or (self.current_step >= self.max_steps)
        
        info = {'norm': norm, 'prob': prob, 'steps': self.current_step, 'state_change': np.linalg.norm(self.state - prev_state)}
        return self.state.copy(), reward, done, info

    def reset(self):
        self.state = self.initial_top.copy()
        self.current_step = 0
        return self.state.copy()

# Q-Learning with sparse Q-table
def q_learning(env, episodes=2000, alpha=0.1, gamma=0.9, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01, bins=5):
    q_table = defaultdict(lambda: np.zeros(env.action_space.n))  # Sparse Q-table
    
    logs = []
    for episode in range(episodes):
        state = discretize(env.reset(), bins)
        done = False
        total_reward = 0
        steps = 0
        
        while not done:
            steps += 1
            if np.random.rand() < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(q_table[state])
            
            next_state, reward, done, info = env.step(action)
            next_state_disc = discretize(next_state, bins)
            
            q_table[state][action] += alpha * (reward + gamma * np.max(q_table[next_state_disc]) - q_table[state][action])
            state = next_state_disc
            total_reward += reward
        
        epsilon = max(min_epsilon, epsilon * epsilon_decay)
        if episode % 100 == 0:
            logs.append(f"Episode {episode}: Steps={info['steps']}, Norm={info['norm']:.4f}, Prob={info['prob']:.4f}, Reward={total_reward:.2f}, Q-Table States={len(q_table)}")
    
    return q_table, logs

# Test on 100 detected malware samples
malware_indices = np.where((y_test == 1) & (detector.predict(X_test) > 0.5))[0]
np.random.shuffle(malware_indices)
test_samples = X_test[malware_indices[:100]]

evasion_count = 0
avg_steps = []
avg_norm = []
evaded_features = []
original_features = []

for idx, sample in enumerate(test_samples):
    env = MalwareEvasionEnv(sample.reshape(1, -1), detector, top_features_idx, scaler, delta=0.005, norm_threshold=0.15)
    q_table, logs = q_learning(env)
    
    # Test policy
    state = env.reset()
    state_disc = discretize(state, bins=5)
    done = False
    while not done:
        action = np.argmax(q_table[state_disc])
        next_state, _, done, info = env.step(action)
        state_disc = discretize(next_state, bins=5)
    
    if idx == 0:  # Save for plotting
        original_features = env.initial_top.copy()
        evaded_features = env.state.copy()
    
    if info['prob'] < 0.5:
        evasion_count += 1
    avg_steps.append(info['steps'])
    avg_norm.append(info['norm'])
    
    print(f"Sample {idx+1}: Evaded={info['prob'] < 0.5}, Steps={info['steps']}, Norm={info['norm']:.4f}, State Change={info['state_change']:.4f}")
    if idx == 0:
        for log in logs[-5:]:
            print(log)

evasion_rate = (evasion_count / len(test_samples)) * 100
print(f"Evasion Rate: {evasion_rate:.1f}%, Avg Steps: {np.mean(avg_steps):.1f}, Avg Norm: {np.mean(avg_norm):.4f}")

# Plot for thesis
plt.figure(figsize=(10, 6))
plt.bar(np.arange(10) - 0.2, original_features, 0.4, label='Original', color='blue')
plt.bar(np.arange(10) + 0.2, evaded_features, 0.4, label='Evaded', color='red')
plt.xlabel('Top Feature Index')
plt.ylabel('Normalized Value')
plt.title('Original vs. Evaded Feature Values (Sample 1)')
plt.xticks(np.arange(10), top_features_idx)
plt.legend()
plt.savefig('feature_comparison.png')
plt.close()

Sample 1: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Episode 1500: Steps=37, Norm=0.1291, Prob=0.1082, Reward=-2.71, Q-Table States=2
Episode 1600: Steps=31, Norm=0.1550, Prob=0.9784, Reward=-3.10, Q-Table States=2
Episode 1700: Steps=31, Norm=0.1550, Prob=0.9784, Reward=-3.10, Q-Table States=2
Episode 1800: Steps=12, Norm=0.0552, Prob=0.4012, Reward=-0.50, Q-Table States=2
Episode 1900: Steps=25, Norm=0.1201, Prob=0.4012, Reward=-1.80, Q-Table States=2
Sample 2: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Sample 3: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Sample 4: Evaded=True, Steps=2, Norm=0.0100, State Change=0.0050
Sample 5: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Sample 6: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Sample 7: Evaded=False, Steps=50, Norm=0.0193, State Change=0.0000
Sample 8: Evaded=False, Steps=31, Norm=0.1550, State Change=0.0050
Sample 9: Evaded=False, Steps=50, Norm=0.0000, State Change=0