In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
# =========================
# 1. Import libraries
# =========================
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import pickle
# =========================
# 2. Load Data
# =========================
# from google.colab import drive
# drive.mount('/content/drive')

print("Loading data...")
df = pd.read_csv('../rpc.csv')  

# =========================
# 3. Drop unnecessary columns
# =========================
X_raw = df.drop(columns=['index', 'timestamp', 'label', 'subject'])  # only sensor readings
y = df['label']

# =========================
# 4. Normalize (Scaling)
# =========================
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_raw)

# Convert back to DataFrame for easier handling
sensor_columns = X_raw.columns
X_scaled_df = pd.DataFrame(X_scaled, columns=sensor_columns)

pickle.dump(scaler, open("scaler.pkl", "wb"))

Loading data...


In [2]:
# 5. Feature Extraction (MAV, RMS, Variance, WL, ZC)
# =========================
print("Extracting features...")

def extract_features(df):
    features = {}
    for col in df.columns:
        signal = df[col].values

        # Temporal features
        mav = np.mean(np.abs(signal))
        rms = np.sqrt(np.mean(signal**2))
        var = np.var(signal)
        wl = np.sum(np.abs(np.diff(signal)))
        zc = ((np.diff(np.sign(signal))) != 0).sum()

        features[f"{col}_MAV"] = mav
        features[f"{col}_RMS"] = rms
        features[f"{col}_VAR"] = var
        features[f"{col}_WL"] = wl
        features[f"{col}_ZC"] = zc

    return features

# Apply feature extraction to the whole dataset (row-wise)
X_features_list = []
window_size = 200  # assuming 200 samples window
for i in range(0, len(X_scaled_df), window_size):
    window = X_scaled_df.iloc[i:i+window_size]
    if len(window) == window_size:
        feats = extract_features(window)
        X_features_list.append(feats)

# Create feature DataFrame
X_features_df = pd.DataFrame(X_features_list)

# Adjust labels (majority voting for window label)
y_window = []
for i in range(0, len(y), window_size):
    window_labels = y.iloc[i:i+window_size]
    if len(window_labels) == window_size:
        most_common = window_labels.mode()[0]
        y_window.append(most_common)

y_window = pd.Series(y_window)

print("✅ Feature extraction done. Feature shape:", X_features_df.shape)


Extracting features...
✅ Feature extraction done. Feature shape: (2537, 40)


In [3]:
# =========================
# 6. Spatial Reduction via LDA per Temporal Feature
# =========================
print("Reducing sensor dimension using LDA per temporal feature...")

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA

temporal_features = ["MAV", "RMS", "VAR", "WL", "ZC"]
X_lda_features = []

for feat in temporal_features:
    # Extract all sensor columns for this temporal feature
    feat_cols = [col for col in X_features_df.columns if col.endswith(f"_{feat}")]
    X_group = X_features_df[feat_cols]  # shape: (n_samples, 8)

    lda = LDA(n_components=1)
    X_lda_feat = lda.fit_transform(X_group, y_window)
    X_lda_features.append(X_lda_feat)

# Final dataset: 5D vector (1D from each of 5 temporal features)
X_lda = np.hstack(X_lda_features)
print("✅ Final 5D spatial-LDA feature shape:", X_lda.shape)
np.savetxt("./5D_spatial_lda.csv", X_lda, delimiter=",")

Reducing sensor dimension using LDA per temporal feature...
✅ Final 5D spatial-LDA feature shape: (2537, 5)


In [75]:
# 8. Train-Test Split
# =========================
X_train, X_test, y_train, y_test = train_test_split(X_lda, y_window, test_size=0.2, random_state=42)

# 9. Define Base Neural Network
# =========================
class BaseNN(nn.Module):
    def __init__(self):
        super(BaseNN, self).__init__()
        self.fc1 = nn.Linear(5, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 2)  # 2 classes: rock, paper

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Initialize model
model = BaseNN()

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


# 10. Train the Base Neural Net
# =========================
print("Training base neural network...")

# Convert to torch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.long)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.long)

# Training loop
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()

    if (epoch+1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")


Training base neural network...
Epoch [10/50], Loss: 0.6439
Epoch [20/50], Loss: 0.6098
Epoch [30/50], Loss: 0.5752
Epoch [40/50], Loss: 0.5417
Epoch [50/50], Loss: 0.5081


In [76]:
# 11. Evaluate
# =========================
print("Evaluating model...")

model.eval()
with torch.no_grad():
    outputs = model(X_test_tensor)
    _, predicted = torch.max(outputs.data, 1)
    acc = (predicted == y_test_tensor).sum().item() / y_test_tensor.size(0)

print(f"✅ Accuracy on test set: {acc:.4f}")


Evaluating model...
✅ Accuracy on test set: 0.7913


In [77]:
import torch
import gym
import numpy as np
from gym import spaces
from stable_baselines3 import DQN
from stable_baselines3.dqn.policies import DQNPolicy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.type_aliases import Schedule
from sklearn.model_selection import train_test_split
import torch.nn as nn

# === 1. Save Trained Base Model ===
torch.save(model.state_dict(), "base_model.pth")

# === 2. Custom Feature Extractor Using BaseNN ===
class CustomFeatureExtractor(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.Space, features_dim: int = 32):
        super().__init__(observation_space, features_dim)

        self.base_nn = BaseNN()
        self.base_nn.load_state_dict(torch.load("base_model.pth"))
        self.base_nn.eval()

        # Enhanced Feature Extractor Network
        self.enhanced_layers = nn.Sequential(
            nn.Linear(self.base_nn.fc2.out_features, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, features_dim),
            nn.ReLU()
        )

        self._features_dim = features_dim

    def forward(self, observations):
        with torch.no_grad():
            x = torch.relu(self.base_nn.fc1(observations))
            x = torch.relu(self.base_nn.fc2(x))

        # Enhanced representation
        features = self.enhanced_layers(x)
        return features

# === 3. Custom DQN Policy ===
class CustomDQNPolicy(DQNPolicy):
    def __init__(self, observation_space, action_space, lr_schedule: Schedule, **kwargs):
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            net_arch=[],
            features_extractor_class=CustomFeatureExtractor,
            features_extractor_kwargs=dict(features_dim=32),
            **kwargs,
        )

# === 4. Custom EMG Environment with Refined Reward ===
class EMGEnv(gym.Env):
    def __init__(self, base_model, data_iter, feature_dim=5, episode_length=10):
        super().__init__()
        self.base_model = base_model
        self.data_iter = data_iter
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(feature_dim,), dtype=np.float32)
        self.action_space = spaces.Discrete(2)
        self.episode_length = episode_length
        self.current_step = 0

    def reset(self):
        self.current_step = 0
        self.feat, self.label = next(self.data_iter)
        self.state = self.feat.astype(np.float32)
        return self.state

    def step(self, action):
        with torch.no_grad():
            input_tensor = torch.tensor(self.state.reshape(1, -1), dtype=torch.float32)
            output = self.base_model(input_tensor)
            proba = torch.softmax(output, dim=1)[0, 1].item()

        # Reward based on agreement with base model and label
        # reward = proba if action == self.label else -(1.0 - proba)
        # Exponential reward/penalty based on confidence
        if action == self.label:
            reward = (np.exp(proba) - 1)
        else:
            reward = -(np.exp(1.0 - proba) - 1)


        # Next state (from next EMG sample)
        self.feat, self.label = next(self.data_iter)
        self.state = self.feat.astype(np.float32)
        self.current_step += 1

        done = self.current_step >= self.episode_length
        return self.state, reward, done, {}


# === 5. Data Iterator ===
def data_iterator(X, y):
    while True:
        for xi, yi in zip(X, y):
            yield xi, yi

# === 6. Use your train-test split ===
X_train, X_test, y_train, y_test = train_test_split(X_lda, y_window, test_size=0.2, random_state=42)

# === 7. Train the DQN Agent ===
env = EMGEnv(base_model=model, data_iter=data_iterator(X_train, y_train), feature_dim=X_train.shape[1], episode_length=10)
dqn_model = DQN(CustomDQNPolicy, env, verbose=1)
dqn_model.learn(total_timesteps=100000)

# === 8. Evaluate Final Model on Test Set ===
def evaluate_model(model, X_eval, y_eval):
    correct = 0
    for xi, yi in zip(X_eval, y_eval):
        obs = xi.astype(np.float32)
        action, _ = model.predict(obs, deterministic=True)
        if action == yi:
            correct += 1
    return correct / len(y_eval)

print("=== Test Accuracy ===")
acc = evaluate_model(dqn_model, X_test, y_test)
print(f"DQN Accuracy: {acc * 100:.2f}%")
dqn_model.save("dqn_model.zip")

2025-05-15 04:32:57.691652: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-05-15 04:32:58.075202: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 10       |
|    ep_rew_mean      | -0.111   |
|    exploration_rate | 0.996    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 5839     |
|    time_elapsed     | 0        |
|    total_timesteps  | 40       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 10       |
|    ep_rew_mean      | -1.62    |
|    exploration_rate | 0.992    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 5421     |
|    time_elapsed     | 0        |
|    total_timesteps  | 80       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 10       |
|    ep_rew_mean      | -0.943 

  self.base_nn.load_state_dict(torch.load("base_model.pth"))


----------------------------------
| rollout/            |          |
|    ep_len_mean      | 10       |
|    ep_rew_mean      | -1.52    |
|    exploration_rate | 0.977    |
| time/               |          |
|    episodes         | 24       |
|    fps              | 1104     |
|    time_elapsed     | 0        |
|    total_timesteps  | 240      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.14     |
|    n_updates        | 34       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 10       |
|    ep_rew_mean      | -1.57    |
|    exploration_rate | 0.973    |
| time/               |          |
|    episodes         | 28       |
|    fps              | 1084     |
|    time_elapsed     | 0        |
|    total_timesteps  | 280      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.188    |
|    n_updates      

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from stable_baselines3 import DQN
import pickle
import os

# === Helper Function for Feature Extraction ===
def extract_features(signal):
    """Extract 5 statistical features from a 1D EMG signal."""
    signal = np.array(signal)
    return {
        'MAV': np.mean(np.abs(signal)),
        'RMS': np.sqrt(np.mean(signal**2)),
        'VAR': np.var(signal),
        'WL': np.sum(np.abs(np.diff(signal))),
        'ZC': np.sum(np.diff(np.sign(signal)) != 0)
    }

# === Step 1: Combine Datasets into CSV ===
def combine_datasets(data_dir="/kaggle/input/freshd/data/"):
    all_data = []
    n_subjects = 13
    samples_per_subject = 1000

    for i in range(n_subjects):
        # Load X_i and Y_i
        x_file = os.path.join(data_dir, f"X{i}.npy")
        y_file = os.path.join(data_dir, f"Y{i}.npy")
        if not (os.path.exists(x_file) and os.path.exists(y_file)):
            print(f"Warning: Missing files for subject {i}")
            continue

        X_raw = np.load(x_file)
        y = np.load(y_file)
        print(f"Subject {i}: X shape {X_raw.shape}, Y shape {y.shape}")

        # Validate shape
        if len(X_raw.shape) == 2:  # (1000, 256)
            n_samples, signal_length = X_raw.shape
        elif len(X_raw.shape) == 3 and X_raw.shape[2] == 1:  # (1000, 256, 1)
            X_raw = X_raw.squeeze(-1)
            n_samples, signal_length = X_raw.shape
        else:
            raise ValueError(f"Unexpected shape for X{i}.npy: {X_raw.shape}")

        if y.shape != (n_samples,):
            raise ValueError(f"Unexpected shape for Y{i}.npy: {y.shape}")

        # Extract features
        feature_list = []
        for j in range(n_samples):
            features = extract_features(X_raw[j])
            features['label'] = y[j]
            features['subject'] = i
            # Add raw signals
            for t in range(signal_length):
                features[f"signal_t{t}"] = X_raw[j, t]
            feature_list.append(features)

        subject_df = pd.DataFrame(feature_list)
        all_data.append(subject_df)

    # Combine and save
    combined_df = pd.concat(all_data, ignore_index=True)
    combined_df.to_csv("combined_emg_data.csv", index=False)
    print(f"\nCombined CSV saved: combined_emg_data.csv")
    print(f"Shape: {combined_df.shape}")
    print(f"Columns: {combined_df.columns.tolist()}")
    return combined_df, all_data

# === Step 2: Evaluate RL Model ===
def evaluate_rl_model(data_dir="/kaggle/input/freshd/data/"):
    # Load BaseNN
    class BaseNN(nn.Module):
        def __init__(self):
            super(BaseNN, self).__init__()
            self.fc1 = nn.Linear(5, 16)
            self.fc2 = nn.Linear(16, 8)
            self.fc3 = nn.Linear(8, 2)
        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = torch.relu(self.fc2(x))
            x = self.fc3(x)
            return x

    base_model = BaseNN()
    base_model.load_state_dict(torch.load("base_model.pth"))
    base_model.eval()

    # Load DQN
    dqn_model = DQN.load("dqn_model.zip")

    # Combine data to fit scaler
    combined_df, subject_dfs = combine_datasets(data_dir)
    feature_names = ['MAV', 'RMS', 'VAR', 'WL', 'ZC']
    X_features_all = combined_df[feature_names]

    # Fit new scaler on 1D data
    scaler = StandardScaler()
    scaler.fit(X_features_all)
    print(f"New scaler fitted on features: {feature_names}")

    # Evaluate per subject
    n_subjects = 13
    all_predictions = []
    all_labels = []
    subject_accuracies = []

    for i in range(n_subjects):
        # Load data
        x_file = os.path.join(data_dir, f"X{i}.npy")
        y_file = os.path.join(data_dir, f"Y{i}.npy")
        if not (os.path.exists(x_file) and os.path.exists(y_file)):
            print(f"Warning: Skipping subject {i} due to missing files")
            continue

        X_raw = np.load(x_file)
        y = np.load(y_file)

        # Handle shape
        if len(X_raw.shape) == 2:  # (1000, 256)
            n_samples, signal_length = X_raw.shape
        elif len(X_raw.shape) == 3 and X_raw.shape[2] == 1:  # (1000, 256, 1)
            X_raw = X_raw.squeeze(-1)
            n_samples, signal_length = X_raw.shape
        else:
            raise ValueError(f"Unexpected shape for X{i}.npy: {X_raw.shape}")

        # Extract features
        feature_list = [extract_features(X_raw[j]) for j in range(n_samples)]
        X_features_df = pd.DataFrame(feature_list)
        feature_names = ['MAV', 'RMS', 'VAR', 'WL', 'ZC']

        # Scale features with new scaler
        X_scaled = scaler.transform(X_features_df[feature_names])

        # Use scaled features directly
        X_eval = X_scaled  # Shape: (1000, 5)

        # Evaluate DQN
        predictions = []
        for xi in X_eval:
            obs = xi.astype(np.float32)
            action, _ = dqn_model.predict(obs, deterministic=True)
            predictions.append(action)

        # Compute accuracy
        acc = np.mean(predictions == y)
        print(f"Subject {i} Accuracy: {acc*100:.2f}%")
        subject_accuracies.append(acc)
        all_predictions.extend(predictions)
        all_labels.extend(y)

    # Overall accuracy
    overall_acc = np.mean(np.array(all_predictions) == np.array(all_labels))
    print(f"\n=== Overall Results ===")
    print(f"Overall Accuracy: {overall_acc*100:.2f}%")
    print(f"Subject-wise Accuracies: {[f'{acc*100:.2f}%' for acc in subject_accuracies]}")

    # Save predictions
    np.save("predictions.npy", np.array(all_predictions))
    print("Predictions saved to 'predictions.npy'")

# === Main ===
if __name__ == "__main__":
    evaluate_rl_model(data_dir="/kaggle/input/freshd/data/")

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Load combined data and predictions
combined_df = pd.read_csv("combined_emg_data.csv")
predictions = np.load("predictions.npy")

# Add predictions to dataframe
combined_df['prediction'] = predictions

# === Label Distribution per Subject ===
print("=== Label Distribution per Subject ===")
label_dist = combined_df.groupby('subject')['label'].value_counts().unstack(fill_value=0)
print(label_dist)

# === Feature Statistics per Subject ===
feature_names = ['MAV', 'RMS', 'VAR', 'WL', 'ZC']
print("\n=== Feature Means per Subject ===")
feature_means = combined_df.groupby('subject')[feature_names].mean()
print(feature_means)

# === Accuracy per Subject ===
print("\n=== Accuracy per Subject ===")
accuracies = combined_df.groupby('subject').apply(lambda x: np.mean(x['prediction'] == x['label']))
print(accuracies * 100)

# === Visualize Feature Distributions ===
plt.figure(figsize=(15, 10))
for i, feature in enumerate(feature_names):
    plt.subplot(2, 3, i+1)
    sns.boxplot(x='subject', y=feature, hue='label', data=combined_df)
    plt.title(f"{feature} by Subject and Label")
    plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("feature_distributions_by_subject.png")
plt.close()

# === Confusion Matrix ===
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(combined_df['label'], combined_df['prediction'])
print("\n=== Confusion Matrix ===")
print(cm)

# Plot confusion matrix
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Open (0)', 'Closed (1)'], yticklabels=['Open (0)', 'Closed (1)'])
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.savefig("confusion_matrix.png")
plt.close()

print("\nCheck 'feature_distributions_by_subject.png' and 'confusion_matrix.png' for visualizations.")

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix
import torch
import torch.nn as nn
from stable_baselines3 import DQN
import os

# Define your BaseNN class here (must match the saved model architecture)
class BaseNN(nn.Module):
    def __init__(self):
        super(BaseNN, self).__init__()
        self.fc1 = nn.Linear(5, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 2)
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def extract_features(signal):
    signal = np.array(signal)
    return {
        'MAV': np.mean(np.abs(signal)),
        'RMS': np.sqrt(np.mean(signal**2)),
        'VAR': np.var(signal),
        'WL': np.sum(np.abs(np.diff(signal))),
        'ZC': np.sum(np.diff(np.sign(signal)) != 0)
    }

def evaluate_rl_combined(data_dir="/kaggle/input/freshd/data/"):
    # Load base model correctly
    base_model = BaseNN()
    base_model.load_state_dict(torch.load("base_model.pth", map_location='cpu'))
    base_model.eval()

    # Load DQN
    dqn_model = DQN.load("dqn_model.zip")

    # Combine all data
    all_X, all_y = [], []
    for i in range(13):
        x_file = os.path.join(data_dir, f"X{i}.npy")
        y_file = os.path.join(data_dir, f"Y{i}.npy")
        if not (os.path.exists(x_file) and os.path.exists(y_file)):
            continue
        X_raw = np.load(x_file)
        y = np.load(y_file)
        if len(X_raw.shape) == 3 and X_raw.shape[2] == 1:
            X_raw = X_raw.squeeze(-1)
        features = [extract_features(X_raw[j]) for j in range(X_raw.shape[0])]
        df = pd.DataFrame(features)[['MAV', 'RMS', 'VAR', 'WL', 'ZC']]
        all_X.append(df)
        all_y.append(y)
    X_combined = pd.concat(all_X, axis=0)
    y_combined = np.concatenate(all_y)

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_combined)

    # Get base features using the base model
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
    with torch.no_grad():
        base_features = base_model(X_tensor).numpy()

    # Predict using DQN
    predictions = []
    # for feat in base_features:
    #     action, _ = dqn_model.predict(feat, deterministic=True)
    #     predictions.append(action)
    for feat in X_scaled:  # X_scaled has shape (n_samples, 5)
        action, _ = dqn_model.predict(feat, deterministic=True)
        predictions.append(action)
    predictions = np.array(predictions)

    # Metrics
    acc = accuracy_score(y_combined, predictions)
    print(f"Combined Dataset Accuracy: {acc*100:.2f}%")
    print("Confusion Matrix:")
    print(confusion_matrix(y_combined, predictions))

if __name__ == "__main__":
    evaluate_rl_combined(data_dir="/kaggle/input/freshd/data/")


In [None]:
import numpy as np
import pandas as pd
import os
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# --- Feature Extraction ---
def extract_features(signal):
    signal = np.array(signal)
    return {
        'MAV': np.mean(np.abs(signal)),
        'RMS': np.sqrt(np.mean(signal**2)),
        'VAR': np.var(signal),
        'WL': np.sum(np.abs(np.diff(signal))),
        'ZC': np.sum(np.diff(np.sign(signal)) != 0)
    }

# --- Data Preparation ---
def load_and_prepare_data(data_dir):
    all_X, all_y = [], []
    for i in range(13):
        x_file = f"{data_dir}/X{i}.npy"
        y_file = f"{data_dir}/Y{i}.npy"
        if not (os.path.exists(x_file) and os.path.exists(y_file)):
            continue
        X_raw = np.load(x_file)
        y = np.load(y_file)
        if len(X_raw.shape) == 3 and X_raw.shape[2] == 1:
            X_raw = X_raw.squeeze(-1)
        feature_list = [extract_features(X_raw[j]) for j in range(X_raw.shape[0])]
        df = pd.DataFrame(feature_list)[['MAV', 'RMS', 'VAR', 'WL', 'ZC']]
        all_X.append(df)
        all_y.append(y)
    X_combined = pd.concat(all_X, axis=0)
    y_combined = np.concatenate(all_y)
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_combined)
    return X_scaled, y_combined, scaler

# --- Base Model ---
class BaseNN(nn.Module):
    def __init__(self):
        super(BaseNN, self).__init__()
        self.fc1 = nn.Linear(5, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 2)
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# --- Adaptation Head ---
class AdaptationHead(nn.Module):
    def __init__(self):
        super(AdaptationHead, self).__init__()
        self.fc = nn.Linear(2, 2)
    def forward(self, x):
        return self.fc(x)

# --- Transfer Learning Pipeline ---
def train_adaptation_head(data_dir):
    X_scaled, y_combined, scaler = load_and_prepare_data(data_dir)
    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y_combined, test_size=0.2, random_state=42, stratify=y_combined
    )

    # Load pretrained base model
    base_model = BaseNN()
    base_model.load_state_dict(torch.load("base_model.pth", map_location='cpu'))
    base_model.eval()
    for param in base_model.parameters():
        param.requires_grad = False

    adaptation_head = AdaptationHead()
    model = nn.Sequential(base_model, adaptation_head)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Prepare DataLoader
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

    train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=64, shuffle=True)
    test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=128, shuffle=False)

    # Training loop
    optimizer = torch.optim.Adam(adaptation_head.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    num_epochs = 10

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            preds = model(xb)
            loss = criterion(preds, yb)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {total_loss/len(train_loader):.4f}")

    # Evaluation
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for xb, yb in test_loader:
            preds = model(xb)
            pred_labels = preds.argmax(dim=1)
            correct += (pred_labels == yb).sum().item()
            total += yb.size(0)
    print(f"Test Accuracy after adaptation: {correct/total*100:.2f}%")

    # Save adaptation head for RL use
    torch.save(adaptation_head.state_dict(), "adaptation_head.pth")
    return scaler

# Run transfer learning
if __name__ == "__main__":
    scaler = train_adaptation_head("/kaggle/input/freshd/data/")


In [None]:
import numpy as np
import torch
from stable_baselines3 import DQN
# from stable_baselines3.common.envs import DummyVecEnv
from stable_baselines3.common.vec_env import DummyVecEnv

import gym

# --- Load models ---
class BaseNN(nn.Module):
    def __init__(self):
        super(BaseNN, self).__init__()
        self.fc1 = nn.Linear(5, 16)
        self.fc2 = nn.Linear(16, 8)
        self.fc3 = nn.Linear(8, 2)
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class AdaptationHead(nn.Module):
    def __init__(self):
        super(AdaptationHead, self).__init__()
        self.fc = nn.Linear(2, 2)
    def forward(self, x):
        return self.fc(x)

# --- Custom Gym Environment for RL ---
# class EMGAdaptedEnv(gym.Env):
#     def __init__(self, adapted_features, labels):
#         super().__init__()
#         self.X = adapted_features.astype(np.float32)
#         self.y = labels.astype(np.int64)
#         self.action_space = gym.spaces.Discrete(2)
#         self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32)
#         self.current_step = 0
#         self.max_steps = len(self.X)
#     def reset(self):
#         self.current_step = 0
#         return self.X[self.current_step]
#     def step(self, action):
#         reward = 1.0 if action == self.y[self.current_step] else -1.0
#         self.current_step += 1
#         done = self.current_step >= self.max_steps
#         obs = self.X[self.current_step % self.max_steps] if not done else self.X[0]
#         return obs, reward, done, {}
#     def render(self):
#         pass

def train_rl_on_adapted_features(data_dir):
    # Reload data and scaler
    from sklearn.preprocessing import StandardScaler
    X_scaled, y_combined, scaler = load_and_prepare_data(data_dir)

    # Load models
    base_model = BaseNN()
    base_model.load_state_dict(torch.load("base_model.pth", map_location='cpu'))
    base_model.eval()
    adaptation_head = AdaptationHead()
    adaptation_head.load_state_dict(torch.load("adaptation_head.pth", map_location='cpu'))
    adaptation_head.eval()

    # Compute adapted features
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
    with torch.no_grad():
        base_out = base_model(X_tensor)
        adapted_features = adaptation_head(base_out).numpy()

    # RL environment
    env = DummyVecEnv([lambda: EMGAdaptedEnv(adapted_features, y_combined)])

    # Train DQN
    dqn_model = DQN("MlpPolicy", env, verbose=1, learning_rate=1e-3, buffer_size=5000, learning_starts=1000)
    dqn_model.learn(total_timesteps=10000)
    dqn_model.save("dqn_adapted.zip")

if __name__ == "__main__":
    train_rl_on_adapted_features("/kaggle/input/freshd/data/")


In [None]:
from stable_baselines3 import DQN

# Load your trained RL agent
dqn_model = DQN.load("dqn_model.zip")

# Evaluate on test set
def evaluate_model(model, X_eval, y_eval):
    correct = 0
    for xi, yi in zip(X_eval, y_eval):
        obs = xi.astype(np.float32)
        action, _ = model.predict(obs, deterministic=True)
        if action == yi:
            correct += 1
    return correct / len(y_eval)

# X_test and y_test should be the same feature format as used during RL training
accuracy = evaluate_model(dqn_model, X_test, y_test)
print(f"DQN Test Accuracy: {accuracy * 100:.2f}%")
