---
## Replay Buffer

In [153]:
import numpy as np
import collections
import random
import matplotlib.pyplot as plt 
import csv
import time
import os

class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions.

    The storage is a ``collections.deque`` created with ``maxlen=capacity``, so
    once the buffer is full every new transition evicts the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer that holds at most ``capacity`` transitions."""
        self.buffer = collections.deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition; the oldest entry is dropped if the buffer is full."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``.
            ``states`` and ``next_states`` are stacked into NumPy arrays;
            ``actions``, ``rewards`` and ``dones`` are plain tuples in the same
            order as the sampled states.

        Raises:
            ValueError: propagated from ``random.sample`` when ``batch_size``
                exceeds the number of stored transitions.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        stacked_states = np.array(states)
        stacked_next_states = np.array(next_states)
        return stacked_states, actions, rewards, stacked_next_states, dones

    def size(self):
        """Return how many transitions are currently stored."""
        stored = len(self.buffer)
        return stored

---
## 1D CNN Architecture

In [154]:
import torch
import torch.nn as nn # Neural network module (contains layers)
import torch.nn.functional as F # Functional interface (includes activation functions)

# Inherits from the PyTorch base class nn.Module
class QNetwork(nn.Module):
    """Small 1D-CNN that maps a window of multi-channel signal to Q-values.

    Pipeline: Conv1d(num_channels -> 32) -> ReLU -> Conv1d(32 -> 64) -> ReLU
    -> global max over time -> Linear(64 -> 128) -> ReLU -> Linear(128 -> num_actions).
    Both convolutions use kernel_size=3 with padding=1, so the time length is
    preserved until the pooling step collapses it.
    """

    def __init__(self, num_channels, window_size, num_actions):
        """Build the layers.

        Args:
            num_channels: Number of input channels fed to the first convolution.
            window_size: Length of the input window. Accepted for interface
                symmetry but not used: the global max pooling in ``forward``
                removes the time dimension, so no layer size depends on it.
            num_actions: Number of outputs, i.e. one Q-value per action.
        """
        super().__init__()

        # Feature extractor over the time axis.
        self.conv1 = nn.Conv1d(num_channels, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)

        # Head: the pooled feature vector has one entry per conv2 feature map (64).
        self.fc1 = nn.Linear(64, 128)
        self.fc2 = nn.Linear(128, num_actions)

    def forward(self, x):
        """Compute Q-values for a batch of windows.

        Args:
            x: Tensor laid out as (batch, window, channels); it is permuted to
               (batch, channels, window) because that is what Conv1d consumes.

        Returns:
            Tensor of shape (batch, num_actions).
        """
        features = x.permute(0, 2, 1)

        # Two conv + ReLU stages.
        for conv in (self.conv1, self.conv2):
            features = torch.relu(conv(features))

        # Global max pooling over time: (batch, 64, window) -> (batch, 64).
        pooled = features.max(dim=2).values

        hidden = torch.relu(self.fc1(pooled))
        q_values = self.fc2(hidden)
        return q_values

---
## Environment Wrapper

In [155]:
import scipy.io
import numpy as np # Added numpy import as it is used in the code

class EMGEnvironment:
    """Sliding-window classification environment over a recorded EMG signal.

    The signal is read from the ``'emg'`` entry of a MATLAB file and the
    per-sample labels from its ``'restimulus'`` entry. A state is a window of
    ``window_size`` consecutive rows of the (normalised) signal; an action is a
    class index in ``range(num_actions)``; the reward is +1.0 when the action
    matches the class of the last sample in the current window and -1.0 otherwise.

    Attributes:
        emg: Normalised signal, indexed as (time_steps, num_channels).
        labels: Raw labels as loaded (and truncated), untouched.
        class_labels: Sorted unique raw label values; the position of a value in
            this array is the action index that represents it.
        window_size, idx, n_samples, num_actions, num_channels: see ``_setup``.
    """

    def __init__(self, file_path, window_size=50, max_samples=None, channels=None):
        """Load the recording and prepare the environment.

        Args:
            file_path: Path to a ``.mat`` file containing ``'emg'`` and ``'restimulus'``.
            window_size: Number of time steps per state window.
            max_samples: If given, keep only the first ``max_samples`` rows.
            channels: If given, column indices of the signal to keep.
        """
        data = scipy.io.loadmat(file_path)
        self._setup(data['emg'], data['restimulus'], window_size, max_samples, channels)

    def _setup(self, emg, labels, window_size, max_samples, channels):
        """Truncate, select channels, normalise and index the labels (no file I/O)."""
        self.emg = emg
        self.labels = labels

        if max_samples is not None:
            self.emg = self.emg[:max_samples]        # Take only the first N rows
            self.labels = self.labels[:max_samples]  # Truncate labels correspondingly
            print(f"⚠️ Data limited to first {max_samples} samples only (Input size)!")

        if channels is not None:
            # Keep every time row but only the requested sensor columns.
            self.emg = self.emg[:, channels]
            print(f"⚠️ Data limited to {channels} channels (Input dimensions)!")

        # Per-channel z-score, computed on the truncated data only so that
        # samples beyond `max_samples` do not influence the statistics.
        mean = np.mean(self.emg, axis=0)
        std = np.std(self.emg, axis=0) + 1e-8  # epsilon avoids division by zero
        self.emg = (self.emg - mean) / std

        self.window_size = window_size
        self.idx = 0
        self.n_samples = self.emg.shape[0]
        self.num_channels = self.emg.shape[1]

        # Map raw label values to contiguous class indices 0..num_actions-1.
        # The agent emits an index in range(num_actions); comparing that index
        # with the raw label value is only correct when the labels present
        # happen to be exactly 0..K-1, which is not guaranteed after truncation.
        raw = np.asarray(self.labels)
        label_column = raw.reshape(raw.shape[0], -1)[:, 0]  # first column, as (N,)
        self.class_labels, inverse = np.unique(label_column, return_inverse=True)
        self._label_indices = np.asarray(inverse).reshape(-1)
        self.num_actions = len(self.class_labels)

        print(f"⚠️ Data limited to {self.num_actions} labels (Output dimensions)!")

    def _window(self, start):
        """Return the window beginning at ``start``, zero-padded to ``window_size`` rows.

        Padding only happens when the slice runs past the end of the data
        (e.g. the stride does not evenly divide the recording). Because the
        signal is z-scored, zero rows correspond to the per-channel mean.
        """
        window = self.emg[start:start + self.window_size]
        missing = self.window_size - window.shape[0]
        if missing > 0:
            padding = np.zeros((missing, self.emg.shape[1]), dtype=self.emg.dtype)
            window = np.concatenate([window, padding], axis=0)
        return window

    def reset(self):
        """Rewind to the start of the recording and return the first window."""
        self.idx = 0
        return self._window(self.idx)

    def step(self, action, step=1):
        """Score ``action`` against the current window and advance by ``step`` rows.

        Args:
            action: Class index chosen by the agent (position in ``class_labels``).
            step: Stride, i.e. how many rows the window moves forward.

        Returns:
            ``(next_state, reward, done)``. ``next_state`` always has
            ``window_size`` rows. If the environment is stepped when the
            current window already reaches the end of the data, an all-zero
            window, a reward of 0.0 and ``done=True`` are returned.
        """
        # The target is the label of the last time step inside the current window.
        current_label_idx = self.idx + self.window_size - 1

        # Already at (or past) the end of the recording: nothing left to classify.
        if current_label_idx >= self.n_samples - 1:
            blank = np.zeros((self.window_size, self.emg.shape[1]), dtype=self.emg.dtype)
            return blank, 0.0, True

        true_class = self._label_indices[current_label_idx]
        reward = 1.0 if action == true_class else -1.0

        self.idx += step
        next_state = self._window(self.idx)

        # The episode ends once the new window touches the end of the data.
        done = (self.idx + self.window_size >= self.n_samples)

        return next_state, reward, done

---
## Training Loop

In [156]:
import torch.optim as optim
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt # Library for plotting graphs
import csv
import time
import os

# --- Hyperparameters & Configuration ---
# Stored in a dictionary so the whole configuration can be written to the CSV log later.
PARAMS = {
    'FILE_PATH': 's1/S1_E1_A1.mat',   # Path to the EMG data file
    'WINDOW_SIZE': 50,                # Length of the time series window (state size)
    'BATCH_SIZE': 64,                 # Number of transitions per gradient step
    'LR': 0.001,                      # Learning rate for Adam
    'GAMMA': 0.995,                   # Discount factor applied to the bootstrapped next-state value
    'EPSILON_START': 1.0,             # Initial exploration rate (100% random actions at start)
    'EPSILON_DECAY': 0.995,           # Multiplicative decay applied once per episode
    'EPSILON_MIN': 0.01,              # Floor for the exploration rate
    'BUFFER_CAPACITY': 10000,         # Max size of the replay buffer (First-In-First-Out)
    'NUM_EPISODES': 500,              # Total number of training episodes
    'MAX_SAMPLES': 5000,              # Limit input data size (truncation)
    'CHANNELS': list(range(2)),       # Active EMG sensor channels to use
    'STEP': 50,                       # Stride: how many time steps the window moves per action
    'Output_dimensions': 2,           # Placeholder; overwritten below with env.num_actions
    'SEED': 42,                       # Seed for Python, NumPy and PyTorch RNGs
}

# --- Reproducibility ---
# Exploration uses np.random, replay sampling uses the `random` module (imported
# in the Replay Buffer cell) and weight initialisation uses torch, so all three
# generators are seeded. GPU kernels may still introduce small run-to-run differences.
random.seed(PARAMS['SEED'])
np.random.seed(PARAMS['SEED'])
torch.manual_seed(PARAMS['SEED'])

# Mutable exploration rate, decayed at the end of every episode
EPSILON = PARAMS['EPSILON_START']

# --- Initialization ---
# Select GPU if available, otherwise fall back to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Environment: wrapper that exposes the EMG recording as an RL environment
env = EMGEnvironment(PARAMS['FILE_PATH'], PARAMS['WINDOW_SIZE'], PARAMS['MAX_SAMPLES'], PARAMS['CHANNELS'])

# Record the actual number of classes once; it does not change during training.
PARAMS['Output_dimensions'] = env.num_actions

# 2. Q-Network: neural network approximating Q(s, a)
q_net = QNetwork(env.num_channels, PARAMS['WINDOW_SIZE'], env.num_actions).to(device)

# 3. Optimizer
optimizer = optim.Adam(q_net.parameters(), lr=PARAMS['LR'])

# 4. Loss: mean squared error between predicted and target Q-values
loss_fn = nn.MSELoss()

# 5. Replay buffer: stores past transitions so batches are not temporally correlated
buffer = ReplayBuffer(PARAMS['BUFFER_CAPACITY'])

# --- History Lists (For Logging) ---
history_rewards = []
history_accuracy = []
history_epsilon = []
history_episodes = []

# --- Main Training Loop ---
print(f"Starting training on device: {device}")
start_time = time.time()

for episode in range(PARAMS['NUM_EPISODES']):
    # Each episode is one pass over the (truncated) recording
    state = env.reset()
    step_count = 0
    total_reward = 0
    done = False

    # Loop until the episode ends (dataset exhausted)
    while not done:
        # -------------------------------------------------------
        # 1. Action Selection (Epsilon-Greedy Strategy)
        # -------------------------------------------------------
        if np.random.rand() < EPSILON:
            # Explore: uniformly random action
            action = np.random.randint(0, env.num_actions)
        else:
            # Exploit: greedy action under the current Q-network
            with torch.no_grad():
                # The environment returns windows as (Window, Channels); add a
                # batch dimension -> (1, Window, Channels). QNetwork.forward
                # permutes to the (Batch, Channels, Window) layout itself.
                state_tensor = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                q_values = q_net(state_tensor)
                action = torch.argmax(q_values).item()

        # -------------------------------------------------------
        # 2. Environment Interaction
        # -------------------------------------------------------
        next_state, reward, done = env.step(action, PARAMS['STEP'])

        step_count += 1

        # Store the transition in the replay buffer
        buffer.push(state, action, reward, next_state, done)

        state = next_state
        total_reward += reward

        # -------------------------------------------------------
        # 3. Network Training (Experience Replay)
        # -------------------------------------------------------
        # Start training only once the buffer holds more than one batch
        if buffer.size() > PARAMS['BATCH_SIZE']:
            b_states, b_actions, b_rewards, b_next_states, b_dones = buffer.sample(PARAMS['BATCH_SIZE'])

            # Convert the sampled batch to tensors on the training device
            b_states = torch.as_tensor(b_states, dtype=torch.float32, device=device)
            b_actions = torch.as_tensor(b_actions, dtype=torch.long, device=device).unsqueeze(1)  # (batch, 1)
            b_rewards = torch.as_tensor(b_rewards, dtype=torch.float32, device=device)
            b_next_states = torch.as_tensor(b_next_states, dtype=torch.float32, device=device)
            b_dones = torch.as_tensor(b_dones, dtype=torch.float32, device=device)

            # --- Q_current: Q-value of the action that was actually taken ---
            q_current = q_net(b_states).gather(1, b_actions).squeeze(1)

            # --- Q_target (Bellman backup), computed without gradients ---
            # NOTE: the same network is used for the bootstrap value; there is
            # no separate target network in this notebook.
            with torch.no_grad():
                q_next = q_net(b_next_states).max(1)[0]
                # Target = reward + gamma * max_a' Q(s', a'); the bootstrap term
                # is zeroed for terminal transitions (done == 1).
                q_target = b_rewards + (PARAMS['GAMMA'] * q_next * (1 - b_dones))

            # --- Optimization Step ---
            loss = loss_fn(q_current, q_target)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # -------------------------------------------------------
    # 4. End of Episode Updates & Logging
    # -------------------------------------------------------
    # Decay epsilon exponentially, but never below EPSILON_MIN
    EPSILON = max(PARAMS['EPSILON_MIN'], EPSILON * PARAMS['EPSILON_DECAY'])

    # Accuracy derived from rewards (assumes every step's reward is +1 or -1):
    # maps total_reward in [-step_count, +step_count] to [0, 100].
    accuracy = (total_reward + step_count) / (2 * step_count) * 100

    # --- Store data for this episode ---
    history_episodes.append(episode + 1)
    history_rewards.append(total_reward)
    history_accuracy.append(accuracy)
    history_epsilon.append(EPSILON)

    print(f"Episode {episode+1}/{PARAMS['NUM_EPISODES']}, Reward: {total_reward:.1f}, Acc: {accuracy:.2f}%, Eps: {EPSILON:.3f}")

print("Training Finished. Generating reports...")



⚠️ Data limited to first 5000 samples only (Input size)!
⚠️ Data limited to [0, 1] channels (Input dimensions)!
⚠️ Data limited to 2 labels (Output dimensions)!
Starting training on device: cuda
Episode 1/500, Reward: 5.0, Acc: 52.53%, Eps: 0.995
Episode 2/500, Reward: -3.0, Acc: 48.48%, Eps: 0.990
Episode 3/500, Reward: -1.0, Acc: 49.49%, Eps: 0.985
Episode 4/500, Reward: -13.0, Acc: 43.43%, Eps: 0.980
Episode 5/500, Reward: -7.0, Acc: 46.46%, Eps: 0.975
Episode 6/500, Reward: 9.0, Acc: 54.55%, Eps: 0.970
Episode 7/500, Reward: -3.0, Acc: 48.48%, Eps: 0.966
Episode 8/500, Reward: 15.0, Acc: 57.58%, Eps: 0.961
Episode 9/500, Reward: 1.0, Acc: 50.51%, Eps: 0.956
Episode 10/500, Reward: -1.0, Acc: 49.49%, Eps: 0.951
Episode 11/500, Reward: 3.0, Acc: 51.52%, Eps: 0.946
Episode 12/500, Reward: 5.0, Acc: 52.53%, Eps: 0.942
Episode 13/500, Reward: 1.0, Acc: 50.51%, Eps: 0.937
Episode 14/500, Reward: 7.0, Acc: 53.54%, Eps: 0.932
Episode 15/500, Reward: -1.0, Acc: 49.49%, Eps: 0.928
Episode 16

---
## Saving Results (CSV Log and PDF Plot)

In [157]:
# --- Save Results Logic ---
# Writes the hyperparameters and per-episode history to a CSV file and the
# accuracy/reward curves to a PDF. Both filenames carry a timestamp so that
# repeated runs do not overwrite each other.
timestamp = time.strftime("%Y%m%d-%H%M%S")
csv_filename = f"data/training_log_{timestamp}.csv"
pdf_filename = f"graph/training_plot_{timestamp}.pdf"

# Make sure the output folders exist; without this, open()/savefig() raise
# FileNotFoundError on a fresh checkout.
for output_path in (csv_filename, pdf_filename):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

# 1. Save CSV (Hyperparameters + Training Data)
with open(csv_filename, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)

    # Section 1: Hyperparameters, one "key, value" row each
    writer.writerow(["--- Hyperparameters ---"])
    for key, value in PARAMS.items():
        writer.writerow([key, value])

    writer.writerow([])  # Empty line for separation

    # Section 2: Training Data, one row per episode
    writer.writerow(["--- Training Data ---"])
    writer.writerow(["Episode", "Total Reward", "Accuracy (%)", "Epsilon"])
    writer.writerows(zip(history_episodes, history_rewards, history_accuracy, history_epsilon))

print(f"CSV Log saved to: {csv_filename}")

# 2. Save PDF Plot (two stacked panels: accuracy on top, total reward below)
fig, (ax_accuracy, ax_reward) = plt.subplots(2, 1, figsize=(10, 8))

ax_accuracy.plot(history_episodes, history_accuracy, label='Accuracy', color='blue', linewidth=2)
ax_accuracy.set_title('Training Accuracy per Episode')
ax_accuracy.set_ylabel('Accuracy (%)')
ax_accuracy.grid(True, linestyle='--', alpha=0.7)
ax_accuracy.legend()

ax_reward.plot(history_episodes, history_rewards, label='Total Reward', color='green', linewidth=2)
ax_reward.set_title('Total Reward per Episode')
ax_reward.set_xlabel('Episode')
ax_reward.set_ylabel('Reward')
ax_reward.grid(True, linestyle='--', alpha=0.7)
ax_reward.legend()

# Adjust layout to prevent overlap between the two panels
fig.tight_layout()

# Save, then close the figure so it is not rendered inline or kept in memory
fig.savefig(pdf_filename)
plt.close(fig)

print(f"PDF Report saved to: {pdf_filename}")

CSV Log saved to: data/training_log_20260112-085544.csv
PDF Report saved to: graph/training_plot_20260112-085544.pdf
