In [1]:
import numpy as np
from scipy.stats import entropy

class ReinforcementEnvironment:
    def __init__(self, num_bands, energy_cost=2, reward_factor=5, weight=5, max_timestep=180):
        self.num_bands = num_bands
        self.energy_cost = energy_cost
        self.reward_factor = reward_factor
        self.max_timestep = max_timestep
        self.weight = weight
        self.signal_band = {band: [] for band in range(self.num_bands)}
        self.current_timestep = 0
        self.transition_matrixes = {band: {} for band in range(self.num_bands)}
        self.init_bands()
        self.current_state = self.get_current_state()
    
    def init_bands(self):
        """Initialize each band with two initial signal values (0 or 1)"""
        for band in range(self.num_bands):
            # First signal chosen with equal probability
            t1 = np.random.choice([0, 1])
            
            # Second signal chosen with random probability distribution
            t_m1 = np.random.rand(2,2)
            t_m1 /= t_m1.sum(axis=1,keepdims=True)  # Normalize to create valid probability distribution
            t2 = np.random.choice([0, 1], p=t_m1[t1])
            # t_m2 = {
            #     (0, 0): np.random.rand(2),
            #     (0, 1): np.random.rand(2),
            #     (1, 0): np.random.rand(2),
            #     (1, 1): np.random.rand(2)
            # }
            # for k in t_m2:
            #     t_m2[k] /= t_m2[k].sum()
            t_m2 = {
            (0, 0): np.random.dirichlet([1, 1]),  # Generates a valid probability distribution over {0,1}
            (0, 1): np.random.dirichlet([1, 1]),
            (1, 0): np.random.dirichlet([1, 1]),
            (1, 1): np.random.dirichlet([1, 1])
            }
            self.transition_matrixes[band] = t_m2
            self.signal_band[band] = [t1, t2]  
    
    def step(self, action):
        """
        Execute one time step within the environment
        
        Args:
            action: tuple (band, prediction) where band is the selected frequency band
                   and prediction is the predicted signal value (0 or 1)
        
        Returns:
            tuple: (observation, reward, done, info)
        """
        self.current_timestep += 1
        
        band = action[0]
        prediction = action[1]
        
        reward = self._calculate_reward(self.current_state[band], prediction)
        
        self.generate_state()
        
        observation = self.construct_observation_space()
        
        done = self.current_timestep >= self.max_timestep
        
        info = {
            "timestep": self.current_timestep,
            "correct_prediction": self.current_state[band] == prediction,
            "state": self.current_state
        }
        
        return observation, reward, done, info
    
    def _calculate_reward(self, actual_signal, prediction):
        """Calculate reward based on prediction accuracy and signal value"""
        if actual_signal == prediction:
            # Correct prediction
            reward = self.reward_factor * self.weight - self.energy_cost
        elif actual_signal == 0:
            # Incorrect prediction when signal is 0
            reward = self.reward_factor - self.energy_cost
        else:  # actual_signal == 1
            # Incorrect prediction when signal is 1
            reward = self.reward_factor - self.energy_cost * self.weight
        
        return reward
    
    def generate_state(self):
        """Generate next state for all bands based on transition probabilities"""
        for band in range(self.num_bands):
            # Get last two signals for this band
            p_2 = tuple(self.signal_band[band][-2:])
            
            t_m2 = self.transition_matrixes[band]
            
            next_signal = np.random.choice([0, 1], p=t_m2[p_2])
            
            self.signal_band[band].append(next_signal)
            self.signal_band[band].pop(0)
        
        # Update current state
        self.current_state = self.get_current_state()
        
        return self.current_state
    
    def get_current_state(self):
        """Return the current state as a list of the most recent signal for each band"""
        return [self.signal_band[band][-1] for band in range(self.num_bands)]
    
    def reset(self):
        """Reset the environment to initial state and return initial observation"""
        self.signal_band = {band: [] for band in range(self.num_bands)}
        self.current_timestep = 0
        self.init_bands()
        self.current_state = self.get_current_state()
        return self.construct_observation_space()
    
    def construct_observation_space(self, window_size=10):
        """
        Construct observation space with entropy calculations for each band
        
        Args:
            window_size: Number of recent signals to consider for entropy calculation
            
        Returns:
            list: Entropy values for each band
        """
        observation = []
        for band in range(self.num_bands):
            signal_values = np.array(self.signal_band[band][-window_size:])
            
            if len(signal_values) <= window_size:
                entropy_value = 0
            else:
                value_counts = np.bincount(signal_values, minlength=2)
                
                probability_distribution = value_counts / len(signal_values)
                
                # Handle edge cases
                if np.all(probability_distribution == 0):
                    entropy_value = 0
                else:
                    # Calculate entropy using scipy function
                    entropy_value = entropy(probability_distribution, base=2)
            
            observation.append(entropy_value)
        
        return observation
    
    def soft_reset(self):
        self.signal_band = {band: self.signal_band[band][-2:] for band in range(self.num_bands)}
        self.current_timestep = 0
        self.generate_state()
        return self.construct_observation_space()
        

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import deque

class AdvancedDQNLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=1, device=None):
        """
        Advanced LSTM-based Deep Q-Network with input dimensionality handling
        
        Args:
        - input_dim: Dimension of input state
        - hidden_dim: Number of LSTM hidden units
        - output_dim: Number of possible actions
        - num_layers: Number of LSTM layers
        - device: Torch device (cuda/cpu)
        """
        super(AdvancedDQNLSTM, self).__init__()
        
        # Explicitly set device
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # LSTM Layer
        self.lstm = nn.LSTM(
            input_size=input_dim, 
            hidden_size=hidden_dim, 
            num_layers=num_layers, 
            batch_first=True,
            dropout=0.2 if num_layers > 1 else 0
        ).to(self.device)
        
        # Fully connected layers with dropout
        self.fc1 = nn.Linear(hidden_dim, 128).to(self.device)
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 256).to(self.device)
        self.dropout2 = nn.Dropout(0.2)
        self.fc3 = nn.Linear(256, 128).to(self.device)
        self.fc_out = nn.Linear(128, output_dim).to(self.device)
        
        # Exploration parameters
        self.temperature = 1.0
        self.min_temperature = 0.1
        self.temperature_decay = 0.9995
        
        # Store input dimensions
        self.input_dim = input_dim
    
    def _prepare_input(self, state):
        """
        Prepare input tensor for LSTM with proper dimensionality
        
        Args:
        - state: Input state (can be 1D, 2D, or 3D)
        
        Returns:
        - Tensor with dimensions (batch_size, sequence_length, input_dim)
        """
        # Ensure input is a torch tensor
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(state, dtype=torch.float32)
        
        # Move to correct device
        state = state.to(self.device)
        
        # Handle different input dimensionalities
        if state.dim() == 1:
            # Single 1D state: convert to 3D (batch_size=1, sequence_length=1, input_dim)
            state = state.unsqueeze(0).unsqueeze(0)
        elif state.dim() == 2:
            # 2D input: assume (batch_size, input_dim) - convert to (batch_size, sequence_length=1, input_dim)
            state = state.unsqueeze(1)
        
        return state
    
    def forward(self, x, prev_hidden=None):
        """
        Forward pass through the network with input dimensionality handling
        """
        # Prepare input tensor
        x = self._prepare_input(x)
        
        # If no previous hidden state, initialize on the correct device
        if prev_hidden is None:
            h0 = torch.zeros(
                self.lstm.num_layers, 
                x.size(0), 
                self.lstm.hidden_size, 
                device=self.device
            )
            c0 = torch.zeros(
                self.lstm.num_layers, 
                x.size(0), 
                self.lstm.hidden_size, 
                device=self.device
            )
            prev_hidden = (h0, c0)
        else:
            # Move previous hidden state to correct device if needed
            prev_hidden = (
                prev_hidden[0].to(self.device), 
                prev_hidden[1].to(self.device)
            )
        
        # LSTM layer
        lstm_out, (hidden, cell) = self.lstm(x, prev_hidden)
        
        # Take the last time step
        last_time_step = lstm_out[:, -1, :]
        
        # Fully connected layers with dropout
        x = F.relu(self.fc1(last_time_step))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = F.relu(self.fc3(x))
        
        # Output layer
        q_values = self.fc_out(x)
        
        return q_values, (hidden, cell)
    
    def select_action(self, state):
        """
        Softmax action selection with temperature decay
        
        Returns an action in the format expected by the environment
        """
        # Decay temperature
        self.temperature = max(
            self.min_temperature, 
            self.temperature * (self.temperature_decay ** (1 / 500))
        )
        
        with torch.no_grad():
            # Prepare input and get Q-values
            state_tensor = self._prepare_input(state)
            q_values, _ = self(state_tensor)
            
            # Ensure q_values is 1D numpy array
            q_values = q_values.squeeze().cpu().numpy()
            
            # Softmax action selection
            scaled_q_values = q_values / self.temperature
            scaled_q_values -= np.max(scaled_q_values)
            
            exp_q = np.exp(scaled_q_values)
            action_probs = exp_q / np.sum(exp_q)
            
            # Select action index
            return np.random.choice(len(q_values), p=action_probs)


class ImprovedDQNLSTMAgent:
    def __init__(self, env, input_dim, output_dim, hidden_dim=64):
        """
        Improved DQN Agent with LSTM and advanced training techniques
        """
        self.env = env
        self.input_dim = input_dim
        self.output_dim = output_dim
        
        # Centralized device selection
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")
        
        # Twin networks with explicit device management
        self.q_network = AdvancedDQNLSTM(
            input_dim, 
            hidden_dim, 
            output_dim, 
            device=self.device
        )
        self.target_network = AdvancedDQNLSTM(
            input_dim, 
            hidden_dim, 
            output_dim, 
            device=self.device
        )
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        # Hyperparameters
        self.learning_rate = 0.0005
        self.gamma = 0.99
        
        # Experience Replay
        self.replay_memory = deque(maxlen=20000)
        self.batch_size = 128
        
        # Optimizer with L2 regularization
        self.optimizer = optim.Adam(
            self.q_network.parameters(), 
            lr=self.learning_rate, 
            weight_decay=1e-5
        )
        
        # Loss function
        self.loss_fn = F.smooth_l1_loss
    
    def store_transition(self, state, action, reward, next_state, done):
        """
        Store transition in replay memory
        """
        experience = (state, action, reward, next_state, done)
        self.replay_memory.append(experience)
    
    def experience_replay(self):
        """
        Enhanced experience replay with double DQN
        """
        if len(self.replay_memory) < self.batch_size:
            return
        
        # Sample batch
        batch = random.sample(self.replay_memory, self.batch_size)
        
        # Prepare batch tensors
        states = torch.tensor(
            np.array([b[0] for b in batch]), 
            dtype=torch.float32
        ).to(self.device)
        
        actions = torch.tensor(
            [b[1] for b in batch], 
            dtype=torch.long
        ).to(self.device)
        
        rewards = torch.tensor(
            np.array([b[2] for b in batch]), 
            dtype=torch.float32
        ).to(self.device)
        
        next_states = torch.tensor(
            np.array([b[3] for b in batch]), 
            dtype=torch.float32
        ).to(self.device)
        
        dones = torch.tensor(
            np.array([b[4] for b in batch]), 
            dtype=torch.float32
        ).to(self.device)
        
        # Compute current Q values
        current_q_values, _ = self.q_network(states)
        current_q_values = current_q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Compute target Q values with double DQN
        with torch.no_grad():
            next_q_values_main, _ = self.q_network(next_states)
            next_q_values_target, _ = self.target_network(next_states)
            
            # Double DQN: select actions from main network, evaluate from target
            max_next_actions = next_q_values_main.argmax(1)
            max_next_q_values = next_q_values_target.gather(1, max_next_actions.unsqueeze(1)).squeeze(1)
            
            # Compute target Q values
            target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values
        
        # Compute loss
        loss = self.loss_fn(current_q_values, target_q_values)
        
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), max_norm=1)
        
        self.optimizer.step()
    
    def train(self, episodes=1000):
        """
        Training loop with comprehensive metrics
        """
        total_rewards = []
        
        for episode in range(episodes):
            # Reset environment
            state = self.env.soft_reset()
            total_reward = 0
            done = False
            
            while not done:
                action = self.q_network.select_action(state)
                next_state, reward, done, info = self.env.step(action)
                
                self.store_transition(state, action, reward, next_state, done)
                
                state = next_state
                total_reward += reward
                
                self.experience_replay()
            
            # Periodic target network update
            if episode % 100 == 0:
                self.target_network.load_state_dict(self.q_network.state_dict())
            
            total_rewards.append(total_reward)
            
            # Logging
            if episode % 50 == 0:
                print(f"Episode {episode}, "
                      f"Total Reward: {total_reward:.2f}, "
                      f"Temperature: {self.q_network.temperature:.4f}")
        
        return total_rewards

# Utility function for plotting rewards
def plot_rewards(rewards, window_size=10):
    import matplotlib.pyplot as plt
    import numpy as np
    
    plt.figure(figsize=(12, 6), facecolor='white')
    
    plt.plot(rewards, alpha=0.5, color='lightblue', label='Episode Reward')
    
    moving_average = np.convolve(rewards, np.ones(window_size)/window_size, mode='valid')
    
    plt.plot(
        np.arange(window_size-1, len(rewards)), 
        moving_average, 
        color='blue', 
        linewidth=2, 
        label=f'{window_size}-Episode Moving Avg'
    )
    
    plt.title('Training Reward over Episodes', fontweight='bold')
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()



In [3]:
def main():
    """
    Main function to run the LSTM-based DQN
    """
    # Set random seeds for reproducibility
    np.random.seed(42)
    torch.manual_seed(42)
    random.seed(42)
    
    # Environment configuration
    num_bands = 10
    
    # Create environment
    env = ReinforcementEnvironment(num_bands)
    
    # Network configuration
    input_dim = num_bands  # Input dimension (state size)
    output_dim = num_bands * 2  # Output dimension (number of possible actions)
    hidden_dim = 64  # LSTM hidden dimension
    
    # Create agent
    agent = ImprovedDQNLSTMAgent(
        env=env, 
        input_dim=input_dim, 
        output_dim=output_dim, 
        hidden_dim=hidden_dim
    )
    
    # Train the agent
    rewards = agent.train(episodes=500)
    
    # Plot training rewards
    plot_rewards(rewards)

In [4]:
main()

Using device: cuda


TypeError: 'int' object is not subscriptable