In [2]:
import gymnasium as gym
from gymnasium import spaces

import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import statistics

In [10]:
class CustomTargetEnv(gym.Env):
    """
    Discrete "reach the target vector" environment.

    The state is an integer vector with one entry per feature. Each action
    increments or decrements exactly one feature by 1 (clipped to that
    feature's range). An episode ends when the state equals a randomly drawn
    target vector or when ``max_steps`` steps have been taken.

    Actions are encoded as ``2 * feature_index`` (increment) and
    ``2 * feature_index + 1`` (decrement).

    NOTE(review): the target is re-drawn on every ``reset()`` but is not part
    of the observation returned to the agent, so a policy that only sees the
    observation cannot tell which target it is chasing. Consider exposing the
    target if the agent is expected to learn to reach it.
    """

    def __init__(self, max_steps=5000):
        """
        Initialize the environment with feature ranges, action space, and observation space.

        Args:
            max_steps: Maximum number of steps per episode before the episode
                is forcibly ended (default 5000).
        """
        super().__init__()

        # Feature ranges: inclusive (min, max) bounds for x1, x2, x3
        self.feature_ranges = [(0, 5), (0, 1), (0, 5)]

        # Two actions per feature (increment, decrement)
        self.action_space = spaces.Discrete(len(self.feature_ranges) * 2)

        # One discrete dimension per feature, sized to the number of allowed values
        self.observation_space = spaces.MultiDiscrete(
            [high - low + 1 for low, high in self.feature_ranges]
        )

        # State and target state; both are populated by reset()
        self.state = None
        self.target_state = None

        # Episode parameters
        self.steps = 0
        self.max_steps = max_steps  # Prevent infinite episodes
        self.cumulative_reward = 0.0  # Track cumulative reward per episode

    def _generate_target_state(self):
        """
        Draw a random target state within the feature ranges.

        Uses the environment's own generator (``self.np_random``) so that
        ``reset(seed=...)`` makes the sequence of targets reproducible.
        """
        return np.array(
            # Generator.integers has an exclusive upper bound, hence high + 1
            [self.np_random.integers(low=low, high=high + 1) for low, high in self.feature_ranges],
            dtype=np.int32,
        )

    def reset(self, seed=None, options=None):
        """
        Reset the environment for a new episode.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset`` to (re)seed
                ``self.np_random``.
            options: Unused; accepted for API compatibility.

        Returns:
            ``(observation, info)`` where ``observation`` is a copy of the
            initial state (every feature at its minimum) and ``info`` is an
            empty dict.
        """
        # Let the base class seed self.np_random when a seed is supplied
        super().reset(seed=seed)

        # Reset state to the minimum values for each feature
        self.state = np.array([low for low, _ in self.feature_ranges], dtype=np.int32)

        # Generate a new target state
        self.target_state = self._generate_target_state()

        # Reset episode parameters
        self.steps = 0
        self.cumulative_reward = 0.0

        # Return a copy: step() mutates self.state in place, and callers must
        # not see observations they already hold change underneath them.
        return self.state.copy(), {}

    def step(self, action):
        """
        Execute an action and update the environment's state.

        Args:
            action: Integer in ``[0, action_space.n)``.

        Returns:
            ``(observation, reward, done, truncated, info)``. ``reward`` is
            50.0 when the state matches the target and -0.5 otherwise.
            ``done`` is True when the target is reached or the step limit is
            hit; ``truncated`` is always False and ``info`` is empty.

        Raises:
            RuntimeError: If called before ``reset()``.
            ValueError: If ``action`` is outside the action space.
        """
        if self.state is None:
            raise RuntimeError("reset() must be called before step()")
        # Reject out-of-range actions explicitly; a negative action would
        # otherwise wrap around through negative list indexing.
        if not 0 <= action < self.action_space.n:
            raise ValueError(
                f"Invalid action {action!r}; expected an integer in [0, {self.action_space.n})"
            )

        self.steps += 1

        # Map action to a feature index and operation (increment or decrement)
        feature_index = action // 2
        increment = 1 if action % 2 == 0 else -1

        # Update the feature value within its range
        feature_min, feature_max = self.feature_ranges[feature_index]
        self.state[feature_index] = np.clip(
            self.state[feature_index] + increment, feature_min, feature_max
        )

        # Compute reward and check for success
        if np.array_equal(self.state, self.target_state):
            reward = 50.0  # Large reward for reaching the target state
            done = True
        else:
            reward = -0.5  # Small penalty for each step
            done = False

        # The step limit is reported through `done` (not `truncated`) so that
        # callers which only inspect the third return value still stop.
        if self.steps >= self.max_steps:
            done = True

        # Update cumulative reward
        self.cumulative_reward += reward

        return self.state.copy(), reward, done, False, {}

    def render(self):
        """
        Display the current state and target state.
        """
        print(f"State: {self.state}, Target: {self.target_state}, Cumulative Reward: {self.cumulative_reward}")

    def close(self):
        """
        Cleanup the environment (nothing to release).
        """
        pass


In [4]:
class DQN(nn.Module):
    """
    Fully connected Q-network: two 128-unit ReLU hidden layers followed by a
    linear head that outputs one value per action.
    """

    def __init__(self, state_dim, action_dim):
        """
        Args:
            state_dim: Size of the input feature vector.
            action_dim: Number of outputs (one Q-value per action).
        """
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(state_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_dim)

    def forward(self, x):
        """Return the Q-values for input ``x`` (last dimension ``state_dim``)."""
        activations = x
        # Both hidden layers share the same linear -> ReLU pattern
        for hidden_layer in (self.fc1, self.fc2):
            activations = torch.relu(hidden_layer(activations))
        return self.fc3(activations)

In [5]:
class ReplayBuffer:
    """
    Bounded FIFO store of ``(state, action, reward, next_state, done)``
    transitions; once ``max_size`` is reached the oldest entry is dropped.
    """

    def __init__(self, max_size):
        """Create an empty buffer holding at most ``max_size`` transitions."""
        self.buffer = deque(maxlen=max_size)

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """
        Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of numpy arrays ``(states, actions, rewards,
            next_states, dones)``, each with ``batch_size`` entries along the
            first axis.
        """
        # Sample positions without replacement so no transition repeats in a batch
        chosen = np.random.choice(len(self.buffer), batch_size, replace=False)
        # Transpose the list of transitions into one sequence per field
        states, actions, rewards, next_states, dones = zip(
            *(self.buffer[position] for position in chosen)
        )
        fields = (states, actions, rewards, next_states, dones)
        return tuple(np.array(field) for field in fields)

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

In [6]:
def _dqn_update(batch, gamma, q_network, target_network, optimizer, loss_fn):
    """Run one gradient step of the Q-network on a sampled batch of transitions."""
    states, actions, rewards, next_states, dones = batch

    # Convert to tensors with explicit dtypes
    states = torch.as_tensor(states, dtype=torch.float32)
    actions = torch.as_tensor(actions, dtype=torch.int64)
    rewards = torch.as_tensor(rewards, dtype=torch.float32)
    next_states = torch.as_tensor(next_states, dtype=torch.float32)
    dones = torch.as_tensor(dones, dtype=torch.float32)

    # Bootstrapped targets from the (frozen) target network; (1 - dones)
    # removes the bootstrap term for transitions flagged as terminal.
    with torch.no_grad():
        max_next_q = target_network(next_states).max(1)[0]
        targets = rewards + gamma * max_next_q * (1 - dones)

    # Q-values of the actions that were actually taken
    chosen_q = q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    loss = loss_fn(chosen_q, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


def train_dqn(env, episodes, gamma, epsilon, epsilon_decay, min_epsilon, batch_size, replay_buffer, q_network, target_network, optimizer, sync_freq):
    """
    Train a DQN with epsilon-greedy exploration and a periodically synced target network.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` (returning
            ``(obs, reward, terminated, truncated, info)``) and
            ``action_space.sample()``.
        episodes: Number of episodes to run.
        gamma: Discount factor.
        epsilon: Initial exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after each episode.
        min_epsilon: Lower bound for epsilon.
        batch_size: Number of transitions per gradient step; training starts
            once the replay buffer holds at least this many.
        replay_buffer: Object with ``add(...)``, ``size()`` and ``sample(batch_size)``.
        q_network: Online network being optimized.
        target_network: Network used to compute bootstrap targets.
        optimizer: Optimizer over ``q_network``'s parameters.
        sync_freq: Copy the online weights into the target network every
            ``sync_freq`` episodes.

    Returns:
        List with the total (undiscounted) reward of each episode.
    """
    loss_fn = nn.MSELoss()  # built once instead of on every gradient step
    total_rewards = []
    for episode in range(episodes):
        observation, _ = env.reset()
        # torch.tensor always copies, so the stored transition cannot alias
        # an array the environment keeps mutating.
        state = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)
        episode_reward = 0
        episode_over = False

        while not episode_over:
            # Epsilon-greedy action selection
            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    action = torch.argmax(q_network(state)).item()

            # Take action in the environment
            next_observation, reward, terminated, truncated, _ = env.step(action)
            next_state = torch.tensor(next_observation, dtype=torch.float32).unsqueeze(0)

            # Only `terminated` disables bootstrapping; a truncated episode
            # stops the rollout but is not stored as a terminal transition.
            replay_buffer.add(
                state.squeeze(0).numpy(),
                action,
                reward,
                next_state.squeeze(0).numpy(),
                bool(terminated),
            )
            # End the episode on either signal so time-limited environments
            # cannot keep the loop running forever.
            episode_over = bool(terminated or truncated)

            # Update state and cumulative reward
            state = next_state
            episode_reward += reward

            # Train the Q-network once enough transitions are available
            if replay_buffer.size() >= batch_size:
                _dqn_update(
                    replay_buffer.sample(batch_size),
                    gamma,
                    q_network,
                    target_network,
                    optimizer,
                    loss_fn,
                )

        # Update target network periodically
        if episode % sync_freq == 0:
            target_network.load_state_dict(q_network.state_dict())

        # Decay epsilon
        epsilon = max(min_epsilon, epsilon * epsilon_decay)

        # Log results (epsilon shown is the value after this episode's decay)
        total_rewards.append(episode_reward)
        print(f"Episode {episode + 1}, Reward: {episode_reward:.2f}, Epsilon: {epsilon:.2f}")

    return total_rewards


In [11]:
# Reproducibility: seed every RNG this cell and the training loop draw from
# (Python `random` for epsilon-greedy, NumPy for replay sampling, PyTorch for
# weight initialisation).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Instantiate the custom environment
env = CustomTargetEnv()
env.action_space.seed(SEED)  # exploration actions come from action_space.sample()
env.reset(seed=SEED)  # seeds the env's own RNG when the env honours `seed`

# Parameters
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
episodes = 500
gamma = 0.99
epsilon = 1.0
epsilon_decay = 0.995
min_epsilon = 0.01
batch_size = 64
sync_freq = 10
learning_rate = 1e-3

# Initialize DQN and target network (target starts as an exact copy)
q_network = DQN(state_dim, action_dim)
target_network = DQN(state_dim, action_dim)
target_network.load_state_dict(q_network.state_dict())

# Optimizer and replay buffer
optimizer = optim.Adam(q_network.parameters(), lr=learning_rate)
replay_buffer = ReplayBuffer(max_size=10000)

# Train the DQN
rewards = train_dqn(
    env, episodes, gamma, epsilon, epsilon_decay, min_epsilon, batch_size,
    replay_buffer, q_network, target_network, optimizer, sync_freq
)


Episode 1, Reward: -33.00, Epsilon: 0.99
Episode 2, Reward: -29.50, Epsilon: 0.99
Episode 3, Reward: 41.00, Epsilon: 0.99
Episode 4, Reward: 38.50, Epsilon: 0.98
Episode 5, Reward: 14.00, Epsilon: 0.98
Episode 6, Reward: -6.00, Epsilon: 0.97
Episode 7, Reward: 18.50, Epsilon: 0.97
Episode 8, Reward: -99.00, Epsilon: 0.96
Episode 9, Reward: 4.50, Epsilon: 0.96
Episode 10, Reward: 26.50, Epsilon: 0.95
Episode 11, Reward: 44.50, Epsilon: 0.95
Episode 12, Reward: -47.50, Epsilon: 0.94
Episode 13, Reward: -207.00, Epsilon: 0.94
Episode 14, Reward: 13.00, Epsilon: 0.93
Episode 15, Reward: -20.50, Epsilon: 0.93
Episode 16, Reward: -59.50, Epsilon: 0.92
Episode 17, Reward: 39.00, Epsilon: 0.92
Episode 18, Reward: -142.00, Epsilon: 0.91
Episode 19, Reward: -86.00, Epsilon: 0.91
Episode 20, Reward: 40.50, Epsilon: 0.90
Episode 21, Reward: -29.00, Epsilon: 0.90
Episode 22, Reward: -47.50, Epsilon: 0.90
Episode 23, Reward: 25.00, Epsilon: 0.89
Episode 24, Reward: -544.50, Epsilon: 0.89
Episode 25,

KeyboardInterrupt: 