In [1]:
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random

class CustomEnv(gym.Env):
    """Single-step toy environment with a three-part discrete action.

    The action is three integers, each in 0..9 (``MultiDiscrete([10, 10, 10])``).
    The observation is a constant one-element vector, and every episode ends
    after the first ``step`` call. The reward is the sum of the action entries.
    """

    def __init__(self):
        super().__init__()
        # Action: 3 integers, each from 0 to 9.
        self.action_space = spaces.MultiDiscrete([10, 10, 10])
        # Observation: a single float in [0, 1].
        self.observation_space = spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)

    def _observation(self):
        """Return the constant observation, matching the declared shape and dtype."""
        # Built from the space itself so the array is float32 like
        # ``observation_space`` declares (a bare ``np.array([0.0])`` is float64).
        return np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)

    def reset(self):
        """Start a new episode and return the initial observation."""
        return self._observation()

    def step(self, action):
        """Score ``action`` and end the episode.

        Args:
            action: iterable of numbers; its entries are summed to form the reward.

        Returns:
            ``(observation, reward, done, info)`` where ``done`` is always
            ``True`` and ``info`` is an empty dict.
        """
        # Dummy scoring function based on the action.
        reward = sum(action)
        done = True  # every episode is a single step
        info = {}
        return self._observation(), reward, done, info

    def render(self, mode='human'):
        """No-op: this environment has nothing to display."""
        pass

env = CustomEnv()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

class DQN(nn.Module):
    """Fully connected network with two ReLU hidden layers.

    Args:
        input_dim: size of the input feature vector.
        output_dim: number of output values produced per input.
        hidden_dim: width of both hidden layers (defaults to 128, the value
            that was previously hard-coded).
    """

    def __init__(self, input_dim, output_dim, hidden_dim=128):
        super().__init__()
        # Callers may pass NumPy integers (e.g. the result of ``nvec.sum()``);
        # normalise to plain ints so layer metadata stays simple Python values.
        input_dim, output_dim, hidden_dim = int(input_dim), int(output_dim), int(hidden_dim)
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map ``x`` (last dimension ``input_dim``) to ``output_dim`` raw outputs."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        # No activation on the final layer: outputs are unbounded.
        return self.fc3(x)

# Reproducibility: seed every random source used below so that
# Restart Kernel -> Run All yields the same exploration and weight init.
SEED = 42
random.seed(SEED)            # epsilon-greedy coin flips and replay sampling
np.random.seed(SEED)
torch.manual_seed(SEED)      # network weight initialisation
env.action_space.seed(SEED)  # env.action_space.sample()

# Hyperparameters
gamma = 0.99            # discount factor applied to the bootstrapped next-state value
epsilon = 1.0           # initial exploration probability
epsilon_min = 0.01      # exploration stops decaying once epsilon is at or below this
epsilon_decay = 0.995   # multiplicative decay applied to epsilon after each episode
learning_rate = 0.001
batch_size = 32         # transitions sampled per optimisation step
memory_size = 10000     # maximum number of transitions kept in replay memory

# Initialize replay memory
memory = []

# Initialize policy and target networks.
# The output layer has one unit per (action dimension, value) pair, i.e.
# sum(nvec) units in total.
policy_net = DQN(env.observation_space.shape[0], env.action_space.nvec.sum())
target_net = DQN(env.observation_space.shape[0], env.action_space.nvec.sum())
# The target network starts as an exact copy of the policy network and is
# only ever used for inference.
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

def select_action(state, epsilon):
    """Choose an action epsilon-greedily.

    Args:
        state: current observation (array-like accepted by ``policy_net``).
        epsilon: probability of taking a uniformly random action.

    Returns:
        A NumPy integer array with one entry per dimension of
        ``env.action_space`` — the same form ``env.action_space.sample()``
        produces.
    """
    if random.random() < epsilon:
        return env.action_space.sample()

    with torch.no_grad():
        q_values = policy_net(torch.as_tensor(state, dtype=torch.float32)).reshape(-1)

    # The network emits sum(nvec) values laid out branch after branch. The
    # greedy action is the argmax inside each branch, not the raw Q-value
    # vector itself.
    branch_sizes = env.action_space.nvec.tolist()
    branches = torch.split(q_values, branch_sizes, dim=-1)
    return np.array([int(branch.argmax(dim=-1)) for branch in branches], dtype=np.int64)

def optimize_model():
    """Run one gradient step on a random minibatch from replay memory.

    Does nothing until ``memory`` holds at least ``batch_size`` transitions.
    Transitions are ``(state, action, reward, next_state)`` tuples; if a fifth
    ``done`` element is present it is used to disable bootstrapping from
    terminal transitions.

    The network output is treated as one block of values per action dimension
    (block sizes given by ``env.action_space.nvec``). The value of a
    multi-dimensional action is the sum of the selected entry from each block.

    Raises:
        ValueError: if the sampled actions do not have one entry per action
            dimension.
    """
    if len(memory) < batch_size:
        return

    transitions = random.sample(memory, batch_size)
    columns = list(zip(*transitions))
    batch_state, batch_action, batch_reward, batch_next_state = columns[:4]
    batch_done = columns[4] if len(columns) > 4 else None

    # Stack through NumPy first: building tensors from tuples of arrays is slow.
    states = torch.as_tensor(np.asarray(batch_state), dtype=torch.float32)
    actions = torch.as_tensor(np.asarray(batch_action), dtype=torch.long)
    rewards = torch.as_tensor(np.asarray(batch_reward), dtype=torch.float32)
    next_states = torch.as_tensor(np.asarray(batch_next_state), dtype=torch.float32)

    branch_sizes = env.action_space.nvec.tolist()
    if actions.dim() != 2 or actions.shape[1] != len(branch_sizes):
        raise ValueError(
            f"expected actions of shape (batch, {len(branch_sizes)}), got {tuple(actions.shape)}"
        )

    # Start index of each branch inside the flat output, e.g. [0, 10, 20].
    offsets = torch.tensor([sum(branch_sizes[:i]) for i in range(len(branch_sizes))])

    # gather needs a 2-D index the same rank as the (batch, sum(nvec)) output:
    # shift each per-branch action into its branch's slice of the flat output.
    flat_index = actions + offsets
    q_values = policy_net(states).gather(1, flat_index).sum(dim=1)

    # Targets are constants; do not build a graph through the target network.
    with torch.no_grad():
        next_branches = torch.split(target_net(next_states), branch_sizes, dim=1)
        next_q_values = torch.stack(
            [branch.max(dim=1)[0] for branch in next_branches], dim=1
        ).sum(dim=1)
        if batch_done is not None:
            not_done = 1.0 - torch.as_tensor(np.asarray(batch_done), dtype=torch.float32)
            next_q_values = next_q_values * not_done
        expected_q_values = rewards + (gamma * next_q_values)

    loss = criterion(q_values, expected_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


In [None]:
num_episodes = 1000

for episode in range(num_episodes):
    # Every episode begins from a freshly reset environment.
    state, total_reward = env.reset(), 0

    for t in range(100):  # limit the number of steps per episode
        action = select_action(state, epsilon)
        next_state, reward, done, _ = env.step(action)

        # Record the transition, evicting the oldest one when the buffer is full.
        memory.append((state, action, reward, next_state))
        if len(memory) > memory_size:
            del memory[0]

        total_reward += reward
        state = next_state

        # One learning step per environment step.
        optimize_model()

        if done:
            break

    # Decay exploration once per episode while it is still above the floor.
    if epsilon > epsilon_min:
        epsilon = epsilon * epsilon_decay

    print(f"Episode {episode} - Total Reward: {total_reward}")

    # Update the target network every few episodes
    if not episode % 10:
        target_net.load_state_dict(policy_net.state_dict())


In [None]:
# Roll out one episode with exploration switched off and report its return.
state, total_reward, done = env.reset(), 0, False

while not done:
    # epsilon=0.0 makes select_action always take the greedy branch.
    action = select_action(state, 0.0)
    state, reward, done, _ = env.step(action)
    total_reward = total_reward + reward

print(f"Total Reward during testing: {total_reward}")
