<a href="https://colab.research.google.com/github/Loki-33/RL-Algos/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from collections import defaultdict
import random
import torch

In [None]:
# Seven-cell corridor, cells numbered 0..6.
states = list(range(7))
# An action is the signed displacement applied to the current cell index.
actions = [-1, 1]
start_state = 3
goal_state = 6

In [None]:
# Hyperparameters for the tabular Q-learning run.
epsilon = 0.1             # probability of taking a random action
# The discount must be < 1 for this task: the only reward is +1 at the goal
# and the left wall is not terminal, so with gamma = 1.0 every Q(s, a)
# converges to 1 and the greedy policy degenerates into an arbitrary
# tie-break. Discounting makes shorter paths to the goal strictly better.
gamma = 0.9
alpha = 0.1               # step size of the Q-value update
episodes = 1000           # number of training episodes
max_steps = 20            # step cap per episode
buffer_size = 1000        # maximum number of stored transitions
batch_size = 4            # transitions replayed per environment step
target_update_freq = 20   # target table is synced when ep % this == 0

In [None]:
# Online and target action-value tables. A state's row is created lazily on
# first access, with one 0.0 entry per action.
Q = defaultdict(lambda: dict.fromkeys(actions, 0.0))
Q_target = defaultdict(lambda: dict.fromkeys(actions, 0.0))
# Replay storage, initially empty.
buffer = list()


In [None]:
# Tabular Q-learning with experience replay and a separate target table.
for ep in range(episodes):
    state = start_state
    for step in range(max_steps):
        # Epsilon-greedy action selection on the online table.
        if random.random() < epsilon:
            action = random.choice(actions)
        else:
            action = max(actions, key=lambda a: Q[state][a])

        # Apply the move and clamp it to the corridor.
        next_state = max(min(states), min(state + action, max(states)))
        done = next_state == goal_state
        reward = 1 if done else 0

        # Store the transition; drop the oldest one once the buffer is full.
        buffer.append((state, action, reward, next_state))
        if len(buffer) > buffer_size:
            buffer.pop(0)

        # Replay a random minibatch against the (frozen) target table.
        if len(buffer) >= batch_size:
            for s, a, r, s2 in random.sample(buffer, batch_size):
                # The goal is terminal, so it contributes no future value.
                # This is stated explicitly instead of relying on the goal
                # never having been inserted into Q_target.
                if s2 == goal_state:
                    max_q_next = 0.0
                else:
                    max_q_next = max(Q_target[s2].values())
                target = r + gamma * max_q_next

                # Q-learning update
                Q[s][a] += alpha * (target - Q[s][a])

        state = next_state
        if done:
            break

    # Sync the target table once per matching episode. Doing this inside the
    # step loop would re-copy on every step of those episodes, which removes
    # the stabilising lag the target table is there to provide.
    if ep % target_update_freq == 0:
        for s, action_values in Q.items():
            Q_target[s].update(action_values)

    if ep % 100 == 0:
        print(f"Episode {ep}: reached state {state}")


In [None]:
# Report the greedy action and the raw Q-values for every non-goal state.
print("\nLearned Policy:")
for s in states:
    if s != goal_state:
        action_values = Q[s]
        best_a = max(action_values, key=action_values.get)
        print(f"State {s}: Best Action {best_a}, Q-values: {action_values}")
        continue
    print(f"State {s}: Goal 🎯")


In [None]:
# The method above does not use gradient descent; let's redo it with gradient descent now.

In [None]:
from collections import deque
import torch.optim as optim
import torch.nn as nn

In [None]:
class grid:
    """One-dimensional corridor environment with one-hot observations.

    The agent starts in cell ``size // 2``. Reaching cell ``size - 1`` pays
    a reward of 1 and ends the episode; every other step pays 0.
    """

    def __init__(self, size):
        """Create a corridor with ``size`` cells."""
        self.size = size
        self.start = size // 2
        self.goal = size - 1
        # Initialise the position here so that step() and _get_state() work
        # even if the caller has not invoked reset() yet.
        self.state = self.start

    def reset(self):
        """Put the agent back on the start cell and return its observation."""
        self.state = self.start
        return self._get_state()

    def step(self, action):
        """Move the agent by one cell.

        Args:
            action: 0 moves one cell left; any other value moves one cell
                right. Moves past either end are clamped to the corridor.

        Returns:
            A ``(observation, reward, done)`` tuple, where ``observation`` is
            the one-hot encoding of the new cell, ``reward`` is 1 on the goal
            cell and 0 otherwise, and ``done`` is True on the goal cell.
        """
        move = -1 if action == 0 else 1
        self.state = min(self.size - 1, max(0, self.state + move))
        done = self.state == self.goal
        reward = 1 if done else 0
        return self._get_state(), reward, done

    def _get_state(self):
        """Return the current cell as a one-hot vector of length ``size``."""
        one_hot = np.zeros(self.size)
        one_hot[self.state] = 1.0
        return one_hot

In [None]:
class QNet(nn.Module):
    """Small fully connected network: state vector in, one value per action out."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 32
        layers = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_dim),
        ]
        # Kept under the attribute name ``net`` so parameter names in the
        # state dict stay ``net.0.*`` / ``net.2.*``.
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        action_values = self.net(x)
        return action_values

In [None]:
# --- Problem size ---
state_dim = 7    # length of the one-hot state vector
action_dim = 2   # number of discrete actions

# --- Training schedule ---
episodes = 500
batch_size = 32
buffer_size = 1000
target_update_freq = 20   # target network is refreshed when ep % this == 0

# --- Learning ---
gamma = 0.99
lr = 0.001

# --- Exploration: epsilon is multiplied by the decay factor, floored at the minimum ---
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.05

In [None]:
# Environment and the online / target network pair. The online network is
# built first, and the target network starts out as an exact copy of it.
env = grid(state_dim)
q_net = QNet(state_dim, action_dim)
target_net = QNet(state_dim, action_dim)
target_net.load_state_dict(q_net.state_dict())

# Training machinery: loss, optimiser over the online network only, and a
# bounded replay buffer that discards its oldest entries when full.
loss_fn = nn.MSELoss()
optimizer = optim.Adam(params=q_net.parameters(), lr=lr)
buffer = deque(maxlen=buffer_size)


In [None]:
import sys

In [None]:
# DQN training loop: epsilon-greedy rollouts, replay-buffer minibatches, and a
# target network that is refreshed every `target_update_freq` episodes.
for ep in range(episodes):
    state = env.reset()
    episode_reward = 0

    for step in range(20):  # hard cap on episode length
        # Epsilon-greedy action selection; the forward pass is only needed
        # on the greedy branch.
        if random.random() < epsilon:
            action = random.choice([0, 1])
        else:
            with torch.no_grad():
                state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = q_net(state_tensor)
                action = torch.argmax(q_values).item()

        next_state, reward, done = env.step(action)
        buffer.append((state, action, reward, next_state, done))
        state = next_state
        episode_reward += reward

        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)
            s, a, r, s2, d = zip(*batch)

            # Stack the per-transition arrays with NumPy first: building a
            # tensor straight from a tuple of ndarrays is slow and warns.
            s = torch.as_tensor(np.array(s), dtype=torch.float32)
            a = torch.tensor(a, dtype=torch.int64).unsqueeze(1)
            r = torch.tensor(r, dtype=torch.float32).unsqueeze(1)
            s2 = torch.as_tensor(np.array(s2), dtype=torch.float32)
            d = torch.tensor(d, dtype=torch.float32).unsqueeze(1)

            # Q(s, a) for the actions that were actually taken: (batch, 1).
            q_values = q_net(s).gather(1, a)
            with torch.no_grad():
                # keepdim=True keeps this at (batch, 1) so it lines up
                # element-wise with r and d. A (1, batch) tensor would
                # broadcast the target into a (batch, batch) matrix.
                max_next_q = target_net(s2).max(dim=1, keepdim=True).values
                # (1 - d) removes the bootstrap term on terminal transitions.
                target_q = r + gamma * max_next_q * (1 - d)

            loss = loss_fn(q_values, target_q)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if done:
            break

    if ep % target_update_freq == 0:
        target_net.load_state_dict(q_net.state_dict())

    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    if ep % 50 == 0:
        print(f"Episode {ep}, Reward: {episode_reward}, Epsilon: {epsilon:.3f}")
