In [1]:
%matplotlib inline
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

# Set up display

In [2]:
is_ipython = 'inline' in matplotlib.get_backend() # check if the backend is ipython
if is_ipython:
    from IPython import display

In [3]:
#deep Q network
class DQN(nn.Module):
    def __init__(self,img_height,img_width):
        super().__init__() #call the constructor of the parent class
        
        self.fc1 = nn.Linear(in_features=img_height*img_width*3,out_features=24)
        self.fc2 = nn.Linear(in_features=24,out_features=32)
        self.out = nn.Linear(in_features=32,out_features=2)
    
    def forward(self,t): #forward pass of the neural network 
        t  = t.flatten(start_dim=1) 
        t = F.relu(self.fc1(t)) #pass the tensor through the first fully connected layer
        t = F.relu(self.fc2(t)) #pass the tensor through the second fully connected layer
        t = self.out(t)
        return t

In [4]:
#experience class
Experience = namedtuple('Experience',('state','action','next_state','reward'))

In [5]:
e = Experience(2,3,1,4)

In [6]:
e

Experience(state=2, action=3, next_state=1, reward=4)

# Replay Memory

In [7]:
class ReplayMemory():
    def __init__(self,capacity):#capacity is the maximum number of experiences that can be stored
        self.capacity = capacity #how much experience can be stored
        self.memory = []  #store the experience
        self.push_count = 0 #keep track of past experiences
        
    def push(self,experience): #
        if len(self.memory)< self.capacity: #if memory is not full
            self.memory.append(experience)
        else: # if memory is full ,it will overwrite the past experience
            self.memory[self.push_count % self.capacity] = experience
        self.push_count += 1 #increment the push count by 1
        
    def sample(self, batch_size): #randomly sample the experience
        return random.sample(self.memory,batch_size)
    def can_provide_sample(self, batch_size): #training is already started before the memeory is full
        return len(self.memory) >= batch_size

# Epsilon Greedy Strategy

In [8]:
class EpsilonGreedyStreategy():
    def __init__(self, start , end, decay):
        self.start = start
        self.end = end
        self.decay = decay
        
    def get_exploration_rate(self,current_step):
        return self.end + (self.start - self.end) *\
            math.exp(-1. * current_step * self.decay) #formula for epsilon greedy strategy

# Reinforcement Learning Agent

In [9]:
class Agent():
    def __init__(self,strategy,num_actions,device):
        self.current_step = 0
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device #device is the hardware on which the model is running
    
    def select_action(self,state,policy_net):
        rate = strategy.get_exploration_rate(self.current_step) 
        self.current_step += 1
        
        if rate > random.random():  #explore
            action = random.randrange(self.num_actions)
            return torch.tensor([action]).to(device) 
        else:  #exploit
            with torch.no_grad(): # to turn off gradient tracking
                return policy_net(state).argmax(dim=1).to(device) #return the action with the highest Q value

In [None]:
import gym
import numpy as np
import torch
import torchvision.transforms as T

class CartPoleEnvManager:
    def __init__(self, device):
        self.device = device
        self.env = gym.make('CartPole-v1', render_mode="rgb_array")  # ✅ Correct render mode
        self.current_screen = None
        self.done = False
        self.reset()  # ✅ Ensure reset initializes the environment correctly

    def reset(self):
        observation, _ = self.env.reset()  # ✅ Use [0] to get state in newer Gym versions
        self.current_screen = None
        return observation

    def close(self):
        self.env.close()

    def render(self):
        return self.env.render()

    def num_actions_available(self):
        return self.env.action_space.n

    def take_action(self, action):
        _, reward, self.done, _, _ = self.env.step(action.item())
        return torch.tensor([reward], device=self.device)

    def just_starting(self):
        return self.current_screen is None

    def get_state(self):
        if self.just_starting() or self.done:
            self.current_screen = self.get_processed_screen()
            black_screen = torch.zeros_like(self.current_screen)
            return black_screen
        else:
            s1 = self.current_screen
            s2 = self.get_processed_screen()
            self.current_screen = s2
            return s2 - s1

    def get_screen_height(self):
        screen = self.get_processed_screen()
        return screen.shape[2]  # ✅ Height is at index 2

    def get_screen_width(self):
        screen = self.get_processed_screen()
        return screen.shape[3]  # ✅ Width is at index 3

    def get_processed_screen(self):
        try:
            screen = self.env.render()
            if screen is None:
                print("⚠️ `render()` returned None. Trying `render_frame()`...")
                screen = self.env.render_frame()  # ✅ Use this if `render()` fails
        except AttributeError:
            print("🚨 `render_frame()` not available. Check Gym version.")

        if screen is None:
            raise ValueError("🚨 Still getting None for screen rendering. Check Gym settings.")

        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        screen = torch.from_numpy(screen).permute(2, 0, 1)
        
        resize = T.Compose([
            T.ToPILImage(),
            T.Resize((40, 90)),
            T.ToTensor()
        ])
        
        return resize(screen).unsqueeze(0).to(self.device)




# === Hyperparameters ===
batch_size = 256
gamma = 0.999
eps_start = 1
eps_end = 0.01
eps_decay = 0.001
target_update = 10
memory_size = 100000
lt = 0.001
num_episodes = 1000

# === Initialize Environment & Components ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
em = CartPoleEnvManager(device)

# ✅ Fix typo: Correct `EpsilonGreedyStrategy`
strategy = EpsilonGreedyStreategy(eps_start, eps_end, eps_decay)

# ✅ Ensure `Agent` class is initialized correctly
agent = Agent(strategy, em.num_actions_available(), device)
memory = ReplayMemory(memory_size)

# ✅ Ensure `DQN` is initialized with correct input sizes
policy_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)
target_net = DQN(em.get_screen_height(), em.get_screen_width()).to(device)

# ✅ Sync weights & set evaluation mode
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# ✅ Fix learning rate parameter name (`lr=lt`)
optimizer = torch.optim.Adam(params=policy_net.parameters(), lr=lt)


In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import deque
import random

# Define the Deep Q-Network (DQN)
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 24)
        self.fc2 = nn.Linear(24, 32)
        self.out = nn.Linear(32, action_dim)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)

# Set up the CartPole environment
env = gym.make('CartPole-v1', render_mode='human')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

dqn = DQN(state_dim, action_dim)
optimizer = optim.Adam(dqn.parameters(), lr=0.001)
memory = deque(maxlen=10000)

def select_action(state, epsilon):
    if random.random() < epsilon:
        return torch.tensor([[env.action_space.sample()]], dtype=torch.long)
    else:
        with torch.no_grad():
            return dqn(state).argmax(dim=1).view(1, 1)

# Run the environment with visualization
state, _ = env.reset()  # Updated unpacking
state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

done = False
while not done:
    env.render()
    action = select_action(state, epsilon=0.1)
    next_state, reward, done, _, _ = env.step(action.item())  # Updated unpacking
    next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
    state = next_state

env.close()


  deprecation(
  deprecation(


ValueError: too many values to unpack (expected 2)

: 