In [None]:
"""
Author: Risshab Srinivas Ramesh
This python notebook illustrates how a learning agent can solve an optimisation problem.
"""

In [None]:
#Libraries
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
from collections import namedtuple, deque
import gymnasium, random

#For CUDA, use "cuda", For CPU, use "CPU"
device = torch.device("mps")
print(device)

In [None]:
PENALTY = 100.0
REPLAY_BUFFER = 50000
LR = 5e-4

In [None]:
SEED = 42 
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# For CUDA: torch.cuda.manual_seed_all(SEED)

print(f"Random seeds set to {SEED}")

In [None]:
class DQN(nn.Module):
    def __init__(self, state_size, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_size, 64*2),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )
    def forward(self, x):
        return self.net(x)

In [None]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayBuffer:
    def __init__(self, cap=10000):
        self.buffer = deque([], maxlen=cap)
    
    def push(self, *args):
        self.buffer.append(Transition(*args))
    
    def sample(self, batch_size):
       return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

In [None]:
# Objective: max z = 4*x + 3*y
def objective(x, y):
    return 4 * x + 3 * y

# Constraints:
# g1(x,y) = x + y - 40 <= 0
def constraint1(x, y):
    return x + y - 40

# g2(x,y) = 2*x + y - 60 <= 0
def constraint2(x, y):
    return 2 * x + y - 60

In [None]:
# CUSTOM ENV
class LinearOptimizationEnv:
    def __init__(self, max_steps=250):
        self.step_size = 1
        self.max_steps = max_steps
        self.penalty = PENALTY
        self.x_bounds = (0, 100)
        self.y_bounds = (0, 100)
        self.action_space_size = 4
        self.state_space_size = 2
        self.x = 0.0
        self.y = 0.0
        self.current_step = 0

    def _get_state(self):
        return np.array([self.x, self.y], dtype=np.float32)

    def _calculate_violation(self, x, y):
        c1_violation = max(0, constraint1(x, y))
        c2_violation = max(0, constraint2(x, y))
        return c1_violation + c2_violation

    def reset(self, seed=None, options=None):
        self.current_step = 0
        self.x = 10.0
        self.y = 10.0
        return self._get_state() / 100.0, {}

    def step(self, action):
        self.current_step += 1
        
        if action == 0: self.y += self.step_size
        elif action == 1: self.y -= self.step_size
        elif action == 2: self.x -= self.step_size
        elif action == 3: self.x += self.step_size

        self.x = np.clip(self.x, self.x_bounds[0], self.x_bounds[1])
        self.y = np.clip(self.y, self.y_bounds[0], self.y_bounds[1])

        current_z = objective(self.x, self.y)
        violation = self._calculate_violation(self.x, self.y)
        
        done = False
        
        if violation > 0:
            reward = -self.penalty
        else:
            reward = current_z
            if current_z >= 139.99:
                done = True

        if self.current_step >= self.max_steps:
            done = True

        info = {'objective': current_z, 'violation': violation, 'x': self.x, 'y': self.y}
        normalized_state = self._get_state() / 100.0
        
        return normalized_state, reward, done, info

In [None]:
# RUN THIS CELL TO TEST IF THE CUSTOM ENV HAS BEEN LOADED
env = LinearOptimizationEnv()
state, _ = env.reset()
n_obs = len(state)
n_act = env.n_actions

In [None]:
#RUN THIS CELL TO CHECK WETHER PYTORCH HAS BEEN LOADED, DELETE OTHERWISE
policy_net = DQN(n_obs, n_act).to(device)
target_net = DQN(n_obs, n_act).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
replay_buffer = ReplayBuffer(REPLAY_BUFFER)
steps_done = 0

In [None]:
def train_dqn(episodes=2000):
    """
    Train DQN for LinearOptimizationEnv with all bugs fixed.
    """
    # Hyperparameters
    GAMMA = 0.99
    TAU = 0.01
    EPSILON_START = 1.0
    EPSILON_END = 0.001
    EPSILON_DECAY = 20000
    BATCH_SIZE = 128
    LR = 5e-4
    REPLAY_BUFFER_CAP = 50000

    # Initialize environment and networks
    env = LinearOptimizationEnv(max_steps=250)
    state_size = env.state_space_size
    action_size = env.action_space_size
    replay_buffer = ReplayBuffer(REPLAY_BUFFER_CAP)

    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    
    policy_net = DQN(state_size, action_size).to(device)
    target_net = DQN(state_size, action_size).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    # Initialize the optimizer with the correct network
    optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)

    # Logging lists
    all_rewards = []
    final_states_info = []
    avg_violations = []
    avg_objectives = []
    rew = []
    max_obj = []
    steps_done = 0

    for episode in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        episode_violations = []
        episode_objectives = []

        for t in range(env.max_steps):
            steps_done += 1
            epsilon = EPSILON_END + (EPSILON_START - EPSILON_END) * np.exp(-steps_done / EPSILON_DECAY)
            
            if random.random() < epsilon:
                action = random.randrange(action_size)
            else:
                with torch.no_grad():
                    state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
                    action = policy_net(state_tensor).argmax().item()
            
            next_state, reward, done, info = env.step(action)
            replay_buffer.push(state, action, next_state, reward)
            state = next_state
            
            episode_reward += reward
            rew.append(reward)
            episode_violations.append(info['violation'])
            episode_objectives.append(info['objective'])
            
            if len(replay_buffer) >= BATCH_SIZE:
                transitions = replay_buffer.sample(BATCH_SIZE)
                batch = Transition(*zip(*transitions))
                
                states_b = torch.FloatTensor(np.array(batch.state)).to(device)
                actions_b = torch.LongTensor(batch.action).unsqueeze(1).to(device)
                rewards_b = torch.FloatTensor(batch.reward).to(device)
                next_states_b = torch.FloatTensor(np.array(batch.next_state)).to(device)
                
                q_values = policy_net(states_b).gather(1, actions_b).squeeze(1)
                next_q_values = target_net(next_states_b).max(1)[0]
                target_q = rewards_b + (GAMMA * next_q_values)
                
                loss = nn.MSELoss()(q_values, target_q.detach())
                
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
                optimizer.step()
            
            if done:
                break
        
        # Soft update target network
        with torch.no_grad():
            for target_param, policy_param in zip(target_net.parameters(), policy_net.parameters()):
                target_param.data.copy_(TAU * policy_param.data + (1.0 - TAU) * target_param.data)
        
        # Log metrics
        all_rewards.append(episode_reward)
        final_states_info.append(info)
        avg_violations.append(np.mean(episode_violations))
        avg_objectives.append(np.mean(episode_objectives))
        max_obj.append(max(episode_objectives))
        
        if episode % 100 == 0:
            final_x = info['x']
            final_y = info['y']
            print(f"Episode {episode} | Avg Reward: {np.mean(all_rewards[-100:]):.2f} | Epsilon: {epsilon:.3f} | "
                  f"Final State: ({final_x:.2f}, {final_y:.2f}) | "
                  f"Avg Objective: {np.mean(avg_objectives[-100:]):.2f} | "
                  f"Avg Violation: {np.mean(avg_violations[-100:]):.2f}")
    
    return all_rewards, final_states_info, avg_violations, avg_objectives, rew, max_obj

In [None]:
a,b,c,d, rew, max_obj = train_dqn()
avg_violations = c
avg_objectives = d
final_states = b

In [None]:
all_rewards = a
smoothed_rewards = np.convolve(all_rewards, np.ones(100)/100, mode='valid')
plt.plot(smoothed_rewards)
plt.xlabel('Episode')
plt.ylabel('Smoothed Reward (100-episode avg)')
plt.title('DQN Training: Rewards vs. Episodes')
plt.show()
print(a,b,c,d,sep='\n')

In [None]:
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(np.convolve(avg_violations, np.ones(100)/100, mode='valid'))
plt.xlabel('Episode')
plt.ylabel('Avg Violation')
plt.title('Constraint Violations vs. Episodes')
plt.subplot(1, 2, 2)
plt.plot(np.convolve(avg_objectives, np.ones(100)/100, mode='valid'))
plt.xlabel('Episode')
plt.ylabel('Avg Objective (z)')
plt.title('Objective Value vs. Episodes')
plt.tight_layout()
plt.show()

In [None]:

def plot_constraints_and_states(final_states):
    final_states = np.array(final_states)

    plt.figure(figsize=(8, 8))
    x = np.linspace(0, 100, 400)
    y1 = 40 - x
    plt.plot(x, y1, 'r-', label='Constraint 1: x + y = 40')
    y2 = 60 - 2 * x
    plt.plot(x, y2, 'b-', label='Constraint 2: 2x + y = 60')
    
    vertices = [(0, 0), (0, 40), (20, 20), (30, 0)]  # Feasible region boundary
    

    x_fill = np.linspace(0, 100, 400)
    y_fill1 = 40 - x_fill
    y_fill2 = 60 - 2 * x_fill
 
    plt.fill_between(x_fill, 0, np.minimum(y_fill1, y_fill2), 
                     where=(y_fill2 >= 0) & (y_fill1 >= 0), 
                     color='green', alpha=0.1, label='Feasible region')
    

    if len(final_states) > 0:
        plt.scatter(final_states[:, 0], final_states[:, 1], 
                    c='orange', marker='o', s=20, label='Final States')
    

    plt.xlim(0, 70)  
    plt.ylim(0, 70)
    plt.xlabel('x')
    plt.ylabel('y')
    plt.title('Constraints and Final States')
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend()
    plt.show()
print(final_states)
l=[]
for i in final_states:
    l.append([i['x'], i['y']])
print(l)
plot_constraints_and_states(l)