# PPO LSTM Feature Attribution: Input Perturbation Analysis

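This notebook demonstrates how to analyze which observation features your RL agent relies on. Using input perturbation, it zeroes out each observation group in turn (`price`, `features`, `volume_profile_week`, `volume_profile_prev_day`, `volume_profile_daily`, `account`) and measures how much the agent's action changes relative to the unperturbed baseline.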

In [None]:
# ================================================
# 🔧 SETUP - Add src to Python Path
# ================================================

import sys
import os

# Register <cwd>/src on sys.path so packages that live under src/ can be
# imported by their bare name. The insert is skipped when the entry is already
# present, which keeps the cell idempotent on re-runs.
project_root = os.getcwd()
src_path = os.path.join(project_root, 'src')

if src_path in sys.path:
    print(f"✅ Already in path: {src_path}")
else:
    # Position 0 gives this directory priority over every other search location.
    sys.path.insert(0, src_path)
    print(f"✅ Added to Python path: {src_path}")

# Verify: echo the resolved locations, followed by a separator rule.
print(
    f"📂 Working directory: {project_root}",
    f"🔍 Python will search for modules in: {src_path}",
    "=" * 50,
    sep="\n",
)

In [None]:
# --- Setup: Import Required Libraries ---
import matplotlib.pyplot as plt
import numpy as np
import torch
from sb3_contrib import RecurrentPPO

from src.environments.trading_environment import TradingEnvironment
from src.training.data_loader import DataLoader

In [None]:
# --- Load Data and Environment ---
# Symbol passed to the data loader.
symbol = 'BTCUSDT'
loader = DataLoader()
# The result is indexed by a string key below.
# NOTE(review): presumably one entry per timeframe — confirm against DataLoader.
dfs = loader.load_data_with_indicators(symbol)
# The '15m' entry is the data the environment is built on.
train_df = dfs['15m']
# Set window_size to match the model's expected input shape (e.g., 96)
env = TradingEnvironment(train_df, window_size=96)  # <-- adjust if your model used a different window_size
# reset() returns a pair; keep the initial observation and discard the second element.
obs, _ = env.reset()

In [None]:
# --- Load Trained PPO LSTM Model ---
# Relative path, so it is resolved against the current working directory.
model_path = 'models/rl_optimized/ppo_trading.zip'  # Adjust if needed
# Restore the saved RecurrentPPO model from the archive at model_path.
model = RecurrentPPO.load(model_path)

In [None]:
# --- Helper: Convert Dict Observation to Tensor ---
def obs_to_tensor(obs_dict, keys=None):
    """Flatten a dict observation into a single ``(1, N)`` float32 tensor.

    Each selected entry is converted to a 1-D array and the pieces are
    concatenated in ``keys`` order, then a leading batch axis is added.

    Args:
        obs_dict: Mapping from observation group name to its values. Entries
            may be arrays of any rank, (nested) sequences, or bare scalars.
        keys: Group names to include, in concatenation order. Defaults to the
            six groups used throughout this notebook.

    Returns:
        ``torch.Tensor`` of dtype ``float32`` and shape ``(1, N)``, where ``N``
        is the total number of elements across the selected entries.

    Raises:
        KeyError: If a requested key is missing from ``obs_dict``.
        ValueError: If ``keys`` is empty (nothing to concatenate).
    """
    if keys is None:
        keys = ['price', 'features', 'volume_profile_week', 'volume_profile_prev_day', 'volume_profile_daily', 'account']
    # asarray + ravel normalises every entry to 1-D. Nested lists and scalar /
    # 0-d values would otherwise make np.concatenate raise.
    pieces = [np.asarray(obs_dict[k]).ravel() for k in keys]
    flat = np.concatenate(pieces)
    return torch.tensor(flat[None, :], dtype=torch.float32)

# Flattened (1, N) float32 tensor built from the initial observation.
# NOTE(review): `obs_tensor` is not referenced by any later cell visible here — confirm it is still needed.
obs_tensor = obs_to_tensor(obs)

In [None]:
# --- Prepare observation for model (handles 1D/2D/3D cases) ---
def prepare_obs_for_model(obs, window=96):
    """Return a copy of a dict observation in which every entry is a 3-D array.

    Per entry, after conversion with ``np.asarray``:
      * rank 1, shape ``(F,)``   -> the vector is repeated ``window`` times,
        giving shape ``(1, window, F)``;
      * rank 2, shape ``(T, F)`` -> a leading axis is added, giving
        ``(1, T, F)`` (``window`` is not consulted);
      * rank 3                   -> passed through unchanged.

    Args:
        obs: Dict mapping observation group names to array-likes.
        window: Number of repetitions used for rank-1 entries.

    Returns:
        A new dict with the same keys, in the same order, holding the arrays
        described above.

    Raises:
        ValueError: If ``obs`` is not a dict, or an entry has a rank other
            than 1, 2 or 3.
    """
    # Only dict observations are supported; reject everything else up front.
    if not isinstance(obs, dict):
        raise ValueError(f"Cannot prepare observation of type {type(obs)}")

    batched = {}
    for name, value in obs.items():
        data = np.asarray(value)
        rank = data.ndim

        if rank == 3:
            # Already three-dimensional: hand it through as-is.
            batched[name] = data
            continue

        if rank == 2:
            # (rows, cols) -> (1, rows, cols)
            n_rows, n_cols = data.shape
            batched[name] = data.reshape(1, n_rows, n_cols)
            continue

        if rank == 1:
            # Stack the same vector `window` times, then prepend the batch axis.
            width = data.shape[0]
            stacked = np.tile(data.reshape(1, -1), (window, 1))
            batched[name] = stacked.reshape(1, window, width)
            continue

        raise ValueError(f"Unsupported dict observation array shape for key '{name}': {data.shape}")

    return batched

# Usage: build the 3-D-per-key observation dict that is passed to the policy's predict call.
obs_batched = prepare_obs_for_model(obs, window=96)

In [None]:
# --- Baseline Action (with batch dimension and LSTM state) ---
# No recurrent state is supplied for the first call; predict returns the state
# to carry forward, which is stored back into `last_state`.
last_state = None
# Length-1 boolean array (one flag for the single batched observation), set to True.
episode_start = np.ones((1,), dtype=bool)  # True at the start of an episode
# Deterministic prediction on the unperturbed observation. `action` is the
# reference that the perturbed actions are compared against.
with torch.no_grad():
    action, last_state = model.policy.predict(obs_batched, state=last_state, episode_start=episode_start, deterministic=True)
print('Baseline action:', action)

In [None]:
# --- Feature Attribution Report: Action Sensitivity to Each Feature Group ---
# For each observation group: replace it with zeros, re-run the policy, and
# record the L2 distance between the baseline action and the perturbed action.
feature_keys = ['price', 'features', 'volume_profile_week', 'volume_profile_prev_day', 'volume_profile_daily', 'account']
action_changes = {}
for key in feature_keys:
    # A shallow copy suffices: the entry is rebound to a fresh zero array, so
    # the arrays held by `obs` are never modified.
    obs_perturbed = obs.copy()
    obs_perturbed[key] = np.zeros_like(obs_perturbed[key])
    obs_perturbed_batched = prepare_obs_for_model(obs_perturbed, window=96)
    with torch.no_grad():
        # Every group is evaluated with the same state / episode_start arguments.
        # NOTE(review): `last_state` here is the state returned by the baseline
        # call, whereas the baseline itself started from None. This relies on
        # episode_start=True resetting the recurrent state — confirm against
        # the sb3_contrib RecurrentPPO documentation.
        perturbed_action, _ = model.policy.predict(obs_perturbed_batched, state=last_state, episode_start=episode_start, deterministic=True)
    # Store a plain Python float so the dict is easy to print, plot or serialise.
    change = float(np.linalg.norm(np.asarray(action) - np.asarray(perturbed_action)))
    action_changes[key] = change
    print(f"Action change when zeroing '{key}': {change:.6f}")
# --- Simple Report ---
print("\nFeature Attribution Report:")
# Size the label column from the longest key so the values line up; a fixed
# width of 15 is shorter than names such as 'volume_profile_prev_day'.
label_width = max((len(k) for k in action_changes), default=0)
for k, v in action_changes.items():
    print(f"{k:>{label_width}}: Action change = {v:.6f}")

In [None]:
# --- Visualize Feature Attribution as Bar Chart ---
# One bar per observation group; bar height is the L2 action change recorded
# in `action_changes`. `plt` comes from the notebook's import cell.
fig, ax = plt.subplots(figsize=(14, 8))
# Materialise the dict views as lists so labels and heights are plain sequences.
group_names = list(action_changes.keys())
group_changes = list(action_changes.values())
ax.bar(group_names, group_changes, color='skyblue')
ax.set_ylabel('Action Change (L2 Norm)')
ax.set_title('Feature Attribution: Action Sensitivity to Each Feature Group')
ax.grid(axis='y', linestyle='--', alpha=0.6)
plt.show()