<a href="https://colab.research.google.com/github/hariniiy/RL_1796/blob/main/lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
def value_iteration(states, actions, transition_probs, rewards, gamma=0.9, theta=1e-6):
    """Solve a finite MDP with value iteration and extract a greedy policy.

    Args:
        states: Re-iterable collection of hashable state identifiers.
        actions: Re-iterable collection of hashable action identifiers.
        transition_probs: Nested mapping ``P[s][a][s']``; a missing ``s'``
            is treated as probability 0.
        rewards: Nested mapping ``R[s][a][s']``; a missing ``s'`` is treated
            as reward 0.
        gamma: Discount factor applied to successor-state values.
        theta: Sweeps stop once the largest value change in a full sweep
            drops below this threshold.

    Returns:
        A ``(V, policy)`` tuple: ``V`` maps each state to its converged
        value, ``policy`` maps each state to the first action that attains
        the highest one-step look-ahead value under ``V``.
    """
    V = {s: 0.0 for s in states}

    while True:
        delta = 0
        for s in states:
            previous = V[s]
            # In-place update: later states in the same sweep already see
            # the refreshed value of ``s``.
            V[s] = max(
                [_expected_return(s, a, states, transition_probs, rewards, gamma, V)
                 for a in actions]
            )
            delta = max(delta, abs(previous - V[s]))
        if delta < theta:
            break

    # Extract the greedy policy; the strict ``>`` keeps the earliest action
    # on ties.
    policy = {}
    for s in states:
        best_action = None
        best_value = float('-inf')
        for a in actions:
            value = _expected_return(s, a, states, transition_probs, rewards, gamma, V)
            if value > best_value:
                best_value = value
                best_action = a
        policy[s] = best_action

    return V, policy


def _expected_return(s, a, states, transition_probs, rewards, gamma, V):
    """Return the one-step look-ahead value of taking action ``a`` in ``s``."""
    probs = transition_probs[s][a]
    payoffs = rewards[s][a]
    value = 0
    for s_prime in states:
        value += probs.get(s_prime, 0) * (payoffs.get(s_prime, 0) + gamma * V[s_prime])
    return value



# Toy MDP with two states and two actions, solved with value_iteration.
states = ['s1', 's2']
actions = ['a1', 'a2']

# Transition model, indexed as P[s][a][s'].
transition_probs = {
    's1': {'a1': {'s1': 0.5, 's2': 0.5}, 'a2': {'s1': 0.7, 's2': 0.3}},
    's2': {'a1': {'s1': 0.4, 's2': 0.6}, 'a2': {'s1': 0.6, 's2': 0.4}},
}

# Reward model, indexed as R[s][a][s'].
rewards = {
    's1': {'a1': {'s1': 5, 's2': 10}, 'a2': {'s1': -1, 's2': 2}},
    's2': {'a1': {'s1': 0, 's2': 4}, 'a2': {'s1': 1, 's2': 1}},
}

V, policy = value_iteration(states, actions, transition_probs, rewards, gamma=0.9)

# Report the converged values, then the greedy action for every state.
print("\nüîç Optimal State Values:")
for s, state_value in V.items():
    print(f"V({s}) = {state_value:.4f}")

print("\nü§ñ Optimal Policy:")
for s, chosen_action in policy.items():
    print(f"œÄ({s}) = {chosen_action}")



üîç Optimal State Values:
V(s1) = 49.7802
V(s2) = 44.1758

ü§ñ Optimal Policy:
œÄ(s1) = a1
œÄ(s2) = a1


In [1]:
import random
from collections import defaultdict



class DummyEnv:
    """Deterministic chain 0 -> 1 -> 2 -> 3 in which state 3 is terminal.

    The action passed to ``step`` is ignored: every step moves one state to
    the right. A reward of 1 is paid on the transition that enters the
    terminal state; every other transition pays 0.
    """

    def __init__(self):
        self.terminal = 3
        self.states = [0, 1, 2, 3]
        self.nA = 2  # number of available actions
        self.reset()

    def reset(self):
        """Move the agent back to state 0 and return that state."""
        self.state = 0
        return self.state

    def step(self, action):
        """Advance one state and return ``(state, reward, done, info)``."""
        # Stepping from the terminal state is a no-op with zero reward.
        if self.state == self.terminal:
            return self.state, 0, True, {}

        self.state = self.state + 1
        reached_terminal = self.state == self.terminal
        payoff = 1 if reached_terminal else 0
        return self.state, payoff, reached_terminal, {}



def mc_policy_evaluation(env, policy, episodes=1000, gamma=0.9):
    """Estimate the state-value function of ``policy`` with first-visit Monte Carlo.

    Args:
        env: Environment exposing ``reset() -> state`` and
            ``step(action) -> (next_state, reward, done, info)``.
        policy: Callable mapping a state to the action to take.
        episodes: Number of complete episodes to sample.
        gamma: Discount factor used when accumulating returns.

    Returns:
        A ``defaultdict(float)`` mapping each visited state to the average
        of the returns that followed its first occurrence in each episode.
    """
    returns_sum = defaultdict(float)
    returns_count = defaultdict(int)
    V = defaultdict(float)

    for _ in range(episodes):
        # Roll out one full episode under the given policy.
        episode = []
        state = env.reset()
        done = False

        while not done:
            action = policy(state)
            next_state, reward, done, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state

        # Earliest time step at which each state occurs. The returns are
        # accumulated backwards, so tracking "already seen" states during
        # that backward scan would select the LAST visit instead.
        first_visit = {}
        for t, (visited_state, _, _) in enumerate(episode):
            first_visit.setdefault(visited_state, t)

        G = 0
        for t in reversed(range(len(episode))):
            state, _, reward = episode[t]
            G = gamma * G + reward
            if first_visit[state] == t:
                returns_sum[state] += G
                returns_count[state] += 1
                V[state] = returns_sum[state] / returns_count[state]

    return V


def mc_control_epsilon_greedy(env, episodes=10000, gamma=0.9, epsilon=0.1):
    """Learn action values with first-visit Monte Carlo control (epsilon-greedy).

    Args:
        env: Environment exposing ``nA`` (number of actions, indexed
            ``0..nA-1``), ``reset() -> state`` and
            ``step(action) -> (next_state, reward, done, info)``.
        episodes: Number of episodes to sample.
        gamma: Discount factor used when accumulating returns.
        epsilon: Probability of picking a uniformly random action.

    Returns:
        A ``(Q, policy)`` tuple. ``Q[state][action]`` is the average return
        following the first occurrence of that pair in each episode;
        ``policy`` maps every state present in ``Q`` to its greedy action.
    """
    Q = defaultdict(lambda: defaultdict(float))
    returns_sum = defaultdict(lambda: defaultdict(float))
    returns_count = defaultdict(lambda: defaultdict(int))
    all_actions = range(env.nA)

    def get_action(state):
        if random.random() < epsilon:
            return random.choice(all_actions)
        # Compare every action, counting untried ones at their default
        # value of 0.0, and break ties at random.
        q_values = Q[state]
        best = max(q_values.get(a, 0.0) for a in all_actions)
        return random.choice([a for a in all_actions if q_values.get(a, 0.0) == best])

    for _ in range(episodes):
        episode = []
        state = env.reset()
        done = False

        while not done:
            action = get_action(state)
            next_state, reward, done, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state

        # Earliest time step of each (state, action) pair; the backward
        # scan below must update only at that step to be first-visit MC.
        first_visit = {}
        for t, (s, a, _) in enumerate(episode):
            first_visit.setdefault((s, a), t)

        G = 0
        for t in reversed(range(len(episode))):
            state, action, reward = episode[t]
            G = gamma * G + reward
            if first_visit[(state, action)] == t:
                returns_sum[state][action] += G
                returns_count[state][action] += 1
                Q[state][action] = returns_sum[state][action] / returns_count[state][action]

    # Extract the greedy policy over all actions (lowest index wins ties);
    # ``.get`` avoids inserting untried actions into the returned table.
    policy = {}
    for state, q_values in Q.items():
        policy[state] = max(all_actions, key=lambda a, q=q_values: q.get(a, 0.0))

    return Q, policy


if __name__ == "__main__":
    env = DummyEnv()

    def random_policy(state):
        """Pick one of the two actions uniformly at random."""
        return random.choice([0, 1])

    # Policy evaluation: estimate state values under the random policy.
    V = mc_policy_evaluation(env, random_policy, episodes=5000)
    print("\nüîç Monte Carlo Policy Evaluation (Random Policy):")
    for state, state_value in sorted(V.items()):
        print(f"V({state}) = {state_value:.4f}")

    # Control: learn action values and the greedy policy derived from them.
    Q, learned_policy = mc_control_epsilon_greedy(env, episodes=10000, epsilon=0.1)
    print("\nü§ñ Monte Carlo Control (Œµ-greedy):")
    print("Learned Policy:")
    for state, greedy_action in sorted(learned_policy.items()):
        print(f"œÄ({state}) = {greedy_action}")

    print("\nState-Action Values:")
    for state in sorted(Q):
        for action, action_value in Q[state].items():
            print(f"Q({state}, {action}) = {action_value:.4f}")



üîç Monte Carlo Policy Evaluation (Random Policy):
V(0) = 0.8100
V(1) = 0.9000
V(2) = 1.0000

ü§ñ Monte Carlo Control (Œµ-greedy):
Learned Policy:
œÄ(0) = 1
œÄ(1) = 0
œÄ(2) = 0

State-Action Values:
Q(0, 0) = 0.8100
Q(0, 1) = 0.8100
Q(1, 1) = 0.9000
Q(1, 0) = 0.9000
Q(2, 0) = 1.0000
Q(2, 1) = 1.0000


In [6]:
import random
from collections import defaultdict

# =====================================
# Simple GridWorld Environment
# =====================================

class GridWorld:
    """Deterministic grid world whose states are ``(row, column)`` tuples.

    Row 0 is the top row. Moves that would leave the grid keep the agent on
    the border. Entering a terminal state pays reward 1 and ends the
    episode; every other transition pays 0.

    Args:
        width: Number of columns.
        height: Number of rows.
        terminal_states: Collection of terminal ``(row, column)`` cells.
            Defaults to ``[(0, 0), (3, 3)]``; note that this default is not
            adapted to other grid sizes.
    """

    def __init__(self, width=4, height=4, terminal_states=None):
        self.width = width
        self.height = height
        # Build the default per instance so instances never share (and
        # cannot accidentally mutate) one default list.
        if terminal_states is None:
            terminal_states = [(0, 0), (3, 3)]
        self.terminal_states = terminal_states
        self.actions = ['up', 'down', 'left', 'right']
        self.reset()

    def reset(self):
        """Place the agent in the bottom-left cell and return that state."""
        # Derived from the height so the start is on the grid for any size
        # (this is (3, 0) for the default 4x4 grid).
        self.state = (self.height - 1, 0)
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(next_state, reward, done)``.

        Stepping from a terminal state is a no-op with zero reward. An
        action name outside ``self.actions`` leaves the agent in place.
        """
        if self.state in self.terminal_states:
            return self.state, 0, True

        moves = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}
        d_row, d_col = moves.get(action, (0, 0))
        row, col = self.state
        # Clamp to the grid so border moves leave the agent where it is.
        row = min(max(row + d_row, 0), self.height - 1)
        col = min(max(col + d_col, 0), self.width - 1)

        next_state = (row, col)
        done = next_state in self.terminal_states
        reward = 1 if done else 0
        self.state = next_state
        return next_state, reward, done

    def get_all_states(self):
        """Return every ``(row, column)`` cell in row-major order."""
        return [(i, j) for i in range(self.height) for j in range(self.width)]

# =====================================
# TD(0) Policy Evaluation
# =====================================

def td_0(env, policy, episodes=1000, alpha=0.1, gamma=0.9):
    """Evaluate ``policy`` with one-step temporal-difference learning.

    Args:
        env: Environment exposing ``reset() -> state`` and
            ``step(action) -> (next_state, reward, done)``.
        policy: Callable mapping a state to the action to take.
        episodes: Number of episodes to run.
        alpha: Step size of each update.
        gamma: Discount factor applied to the successor's value.

    Returns:
        A ``defaultdict(float)`` of state-value estimates.
    """
    V = defaultdict(float)

    for _episode in range(episodes):
        current = env.reset()
        finished = False

        while not finished:
            successor, reward, finished = env.step(policy(current))
            # Move the estimate a fraction ``alpha`` toward the one-step
            # bootstrapped target.
            estimate = V[current]
            td_error = reward + gamma * V[successor] - estimate
            V[current] = estimate + alpha * td_error
            current = successor

    return V

# =====================================
# SARSA Algorithm (ε-greedy)
# =====================================

def sarsa(env, episodes=1000, alpha=0.1, gamma=0.9, epsilon=0.1):
    """Learn action values with on-policy SARSA and an epsilon-greedy policy.

    Args:
        env: Environment exposing ``actions`` (sequence of action labels),
            ``reset() -> state``, ``step(action) -> (next_state, reward, done)``
            and ``get_all_states()``.
        episodes: Number of episodes to run.
        alpha: Step size of each update.
        gamma: Discount factor applied to the successor pair's value.
        epsilon: Probability of picking a uniformly random action.

    Returns:
        A ``(Q, policy)`` tuple. ``Q[state][action]`` holds the learned
        action values; ``policy`` maps every state from
        ``env.get_all_states()`` to its greedy action, or to a random
        action for states that never received an update.
    """
    Q = defaultdict(lambda: defaultdict(float))

    def choose_action(state):
        if random.random() < epsilon:
            return random.choice(env.actions)
        # Compare every action, counting untried ones at their default
        # value of 0.0, and break ties at random.
        q_vals = Q[state]
        best = max(q_vals.get(a, 0.0) for a in env.actions)
        return random.choice([a for a in env.actions if q_vals.get(a, 0.0) == best])

    for _ in range(episodes):
        state = env.reset()
        action = choose_action(state)
        done = False

        while not done:
            next_state, reward, done = env.step(action)

            if done:
                # No action follows a terminal state, so nothing to bootstrap.
                next_action = None
                td_target = reward
            else:
                next_action = choose_action(next_state)
                td_target = reward + gamma * Q[next_state][next_action]
            Q[state][action] += alpha * (td_target - Q[state][action])

            state = next_state
            action = next_action

    # Derive the greedy policy over all actions (earliest action wins ties);
    # ``.get`` avoids inserting untried actions into the returned table.
    policy = {}
    for state in env.get_all_states():
        if state in Q:
            policy[state] = max(env.actions, key=lambda a, q=Q[state]: q.get(a, 0.0))
        else:
            policy[state] = random.choice(env.actions)

    return Q, policy

# =====================================
# Run & Display Results
# =====================================

if __name__ == "__main__":
    env = GridWorld()

    def random_policy(state):
        """Pick any of the environment's actions uniformly at random."""
        return random.choice(env.actions)

    # Evaluate the random policy with TD(0) and print the value grid.
    V = td_0(env, random_policy, episodes=5000)
    print("\nüîç TD(0) Value Function (Random Policy):")
    for i in range(env.height):
        print(" ".join(f"{V[(i, j)]:.2f}" for j in range(env.width)))

    # Learn a policy with SARSA and print the first letter of each action.
    Q, learned_policy = sarsa(env, episodes=10000)

    print("\nü§ñ SARSA-Learned Policy:")
    for i in range(env.height):
        symbols = []
        for j in range(env.width):
            cell = (i, j)
            symbols.append(learned_policy[cell][0].upper() if cell in learned_policy else '.')
        print(" ".join(symbols))



üîç TD(0) Value Function (Random Policy):
0.00 0.51 0.32 0.26
0.55 0.35 0.29 0.29
0.41 0.31 0.35 0.38
0.30 0.31 0.47 0.00

ü§ñ SARSA-Learned Policy:
D R D R
U D L L
U R D U
R R R D


In [5]:
import gymnasium as gym
import numpy as np
import random

def q_learning(env_name, episodes=10000, alpha=0.1, gamma=0.99, epsilon=1.0, epsilon_decay=0.999, min_epsilon=0.01):
    """
    Tabular Q-Learning for environments with discrete states and actions.

    Parameters:
        env_name: Gym environment name string
        episodes: number of episodes to train
        alpha: learning rate
        gamma: discount factor
        epsilon: initial exploration rate (epsilon-greedy)
        epsilon_decay: multiplicative decay applied to epsilon after each episode
        min_epsilon: lower bound for epsilon

    Returns:
        Q-table (array of shape [n_states, n_actions]) and the environment
        instance, which is left open for the caller to reuse and close.
    """
    env = gym.make(env_name)
    state_space_size = env.observation_space.n
    action_space_size = env.action_space.n

    Q = np.zeros((state_space_size, action_space_size))

    # ``episodes // 10`` is 0 for fewer than 10 episodes, which would make
    # the progress check divide by zero.
    report_every = max(1, episodes // 10)

    for episode in range(episodes):
        state, _ = env.reset()
        terminated = False
        truncated = False

        # An episode ends either on a terminal state or on truncation
        # (e.g. a time limit); both must stop the loop.
        while not (terminated or truncated):
            # Epsilon-greedy action selection
            if random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                action = int(np.argmax(Q[state]))

            next_state, reward, terminated, truncated, info = env.step(action)

            # Bootstrap from the best next action unless the state is
            # terminal; a truncated episode still has a meaningful successor.
            future = 0.0 if terminated else gamma * np.max(Q[next_state])
            td_error = reward + future - Q[state][action]
            Q[state][action] += alpha * td_error

            state = next_state

        epsilon = max(min_epsilon, epsilon * epsilon_decay)

        if (episode + 1) % report_every == 0:
            print(f"Episode {episode + 1}/{episodes} completed. Epsilon: {epsilon:.4f}")

    return Q, env

if __name__ == "__main__":
    ENV_NAME = "FrozenLake-v1"  # Change to "Taxi-v3" if you want

    Q, env = q_learning(ENV_NAME, episodes=10000)

    # Greedy policy: best-valued action for every state.
    policy = np.argmax(Q, axis=1)

    print("\nLearned Q-table:")
    print(Q)

    print("\nDerived policy (best action per state):")
    print(policy)

    # Test the learned policy for one episode
    state, _ = env.reset()
    done = False
    total_reward = 0
    print("\nRunning one episode with the learned policy:")
    while not done:
        action = policy[state]
        print(f"State: {state}, Action: {action}")
        next_state, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        state = next_state
        # Stop on truncation (time limit) as well as on a terminal state.
        done = terminated or truncated

    print(f"Episode finished with total reward: {total_reward}")

    # Release the environment now that the demonstration is over.
    env.close()







Episode 1000/10000 completed. Epsilon: 0.3677
Episode 2000/10000 completed. Epsilon: 0.1352
Episode 3000/10000 completed. Epsilon: 0.0497
Episode 4000/10000 completed. Epsilon: 0.0183
Episode 5000/10000 completed. Epsilon: 0.0100
Episode 6000/10000 completed. Epsilon: 0.0100
Episode 7000/10000 completed. Epsilon: 0.0100
Episode 8000/10000 completed. Epsilon: 0.0100
Episode 9000/10000 completed. Epsilon: 0.0100
Episode 10000/10000 completed. Epsilon: 0.0100

Learned Q-table:
[[0.53309112 0.48297452 0.48348996 0.47015764]
 [0.19383052 0.30900923 0.33957066 0.44431172]
 [0.37210476 0.24961985 0.24086319 0.24536764]
 [0.16504562 0.05263167 0.02733097 0.03461584]
 [0.54770963 0.38957774 0.3444187  0.41251059]
 [0.         0.         0.         0.        ]
 [0.29881392 0.08295876 0.15724911 0.09740837]
 [0.         0.         0.         0.        ]
 [0.36174316 0.37638384 0.35604983 0.57343738]
 [0.40347927 0.61974423 0.36467705 0.51059006]
 [0.61095118 0.36089928 0.29440335 0.28517969]
 [0.

In [7]:
import gymnasium as gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import cv2
import time

# =======================
# Hyperparameters
# =======================
# Environment id handed to make_env() by train().
# NOTE(review): the recorded run of this cell failed with NameNotFound for
# this id — presumably the Atari (ALE) environments were not installed or
# registered in that runtime; confirm before running.
ENV_NAME = "PongNoFrameskip-v4"
# Number of episodes train() runs.
EPISODES = 1000
# Learning rate of the Adam optimizer that updates the online network.
LEARNING_RATE = 1e-4
# Discount factor used in the bootstrapped Q-learning target.
GAMMA = 0.99
# Number of transitions sampled from the replay buffer per gradient step.
BATCH_SIZE = 32
# Maximum number of transitions kept in the replay buffer.
REPLAY_BUFFER_SIZE = 100_000
# Gradient updates start only once the buffer holds this many transitions.
MIN_REPLAY_SIZE = 10_000
# Exploration schedule used by train():
#   epsilon = max(EPSILON_END, EPSILON_START - total_steps / EPSILON_DECAY)
# i.e. epsilon drops linearly by 1 / EPSILON_DECAY per environment step
# until it reaches EPSILON_END.
EPSILON_START = 1.0
EPSILON_END = 0.1
EPSILON_DECAY = 1_000_000
# The target network is synced with the online network whenever the global
# step counter is a multiple of this value (checked only on training steps).
TARGET_UPDATE_FREQ = 10_000
# Run on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =======================
# Preprocessing Wrappers
# =======================

class PreprocessFrame(gym.ObservationWrapper):
    """Observation wrapper that emits 84x84 single-channel grayscale frames."""

    def __init__(self, env):
        super().__init__(env)
        # Advertise the shape and dtype produced by ``observation``.
        frame_shape = (84, 84, 1)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=frame_shape, dtype=np.uint8
        )

    def observation(self, obs):
        """Convert an RGB frame to grayscale, resize it and add a channel axis."""
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
        # Append a trailing channel axis: (84, 84) -> (84, 84, 1).
        return resized[:, :, None]


class FrameStack(gym.Wrapper):
    """Wrapper that concatenates the last ``k`` observations along the last axis.

    The advertised observation space is ``(shp[0], shp[1], k)``, where
    ``shp`` is the wrapped environment's observation shape.

    Args:
        env: Environment to wrap.
        k: Number of most recent frames to keep and stack.
    """

    def __init__(self, env, k):
        super().__init__(env)
        self.k = k
        # Bounded deque: appending a new frame drops the oldest one.
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(shp[0], shp[1], k), dtype=np.uint8
        )

    def reset(self, **kwargs):
        """Reset the wrapped env and fill the stack with the first frame.

        Returns:
            ``(stacked_observation, info)`` where ``info`` is the dict
            returned by the wrapped environment's ``reset``.
        """
        obs, info = self.env.reset(**kwargs)
        self.frames.clear()
        for _ in range(self.k):
            self.frames.append(obs)
        # Pass the wrapped env's info through instead of discarding it.
        return np.concatenate(self.frames, axis=-1), info

    def step(self, action):
        """Step the wrapped env and return the updated frame stack."""
        obs, reward, done, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return np.concatenate(self.frames, axis=-1), reward, done, truncated, info

def make_env(env_name):
    """Build the Atari environment with preprocessing and 4-frame stacking.

    Observations are produced channel-last, i.e. ``(84, 84, 4)``, which is
    the layout ``train()`` indexes and transposes to ``(C, H, W)``.

    Args:
        env_name: Gymnasium id of an Atari environment without built-in
            frame skipping (frame skipping is applied by the wrapper here).

    Returns:
        The wrapped environment.
    """
    env = gym.make(env_name, render_mode=None)
    # ``grayscale_newaxis=True`` keeps a trailing channel axis, (84, 84, 1),
    # which is the per-frame layout the FrameStack wrapper in this file
    # concatenates along.
    env = gym.wrappers.AtariPreprocessing(
        env,
        grayscale_obs=True,
        grayscale_newaxis=True,
        scale_obs=False,
        frame_skip=4,
    )
    # Use this file's channel-last FrameStack: the library frame-stacking
    # wrapper stacks along the leading axis, which does not match the
    # (H, W, C) layout the training loop assumes.
    env = FrameStack(env, 4)
    return env

# =======================
# Q-Network
# =======================

class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: ``(channels, height, width)`` of one observation. If
            only the channel count is given (fewer than three entries), the
            flattened feature size falls back to 3136, which corresponds to
            84x84 inputs.
        num_actions: Number of discrete actions, i.e. the output width.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        conv_layers = [
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]

        if len(input_shape) == 3:
            # Derive the flattened feature size from a dummy forward pass so
            # the network is not tied to 84x84 inputs.
            with torch.no_grad():
                probe = torch.zeros(1, *input_shape)
                flat_features = nn.Sequential(*conv_layers)(probe).shape[1]
        else:
            flat_features = 3136  # 64 * 7 * 7 for 84x84 inputs

        # Same layer order as before, so state-dict keys are unchanged.
        self.net = nn.Sequential(
            *conv_layers,
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def forward(self, x):
        """Return one Q-value per action for a batch of observations."""
        # Scale pixel intensities from [0, 255] down to [0, 1].
        return self.net(x / 255.0)

# =======================
# Experience Replay Buffer
# =======================

class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Once ``capacity`` transitions are stored, each new one replaces the
    oldest.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, transition):
        """Store one ``(state, action, reward, done, next_state)`` tuple."""
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them as arrays.

        Returns:
            ``(states, actions, rewards, dones, next_states)``, each a NumPy
            array whose first axis indexes the sampled transitions.
        """
        drawn = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, dones, next_states = zip(*drawn)
        state_batch = np.stack(states)
        action_batch = np.array(actions)
        reward_batch = np.array(rewards)
        done_batch = np.array(dones)
        next_state_batch = np.stack(next_states)
        return state_batch, action_batch, reward_batch, done_batch, next_state_batch

    def __len__(self):
        return len(self.buffer)

# =======================
# Training
# =======================

def train():
    """Train a DQN agent on ``ENV_NAME`` and print per-episode results.

    Uses an online network and a periodically synced target network, an
    experience replay buffer, and a linearly decaying epsilon-greedy
    exploration schedule, all configured by the module-level
    hyperparameters. The environment is closed when training ends or fails.
    """
    env = make_env(ENV_NAME)
    try:
        obs_shape = env.observation_space.shape
        n_actions = env.action_space.n

        # Observations are treated as (H, W, C); the networks take (C, H, W).
        input_shape = (obs_shape[2], obs_shape[0], obs_shape[1])
        online_net = DQN(input_shape, n_actions).to(DEVICE)
        target_net = DQN(input_shape, n_actions).to(DEVICE)
        target_net.load_state_dict(online_net.state_dict())

        optimizer = optim.Adam(online_net.parameters(), lr=LEARNING_RATE)
        replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

        epsilon = EPSILON_START
        total_steps = 0

        for episode in range(EPISODES):
            state, _ = env.reset()
            state = np.transpose(state, (2, 0, 1))  # CHW
            done = False
            truncated = False
            episode_reward = 0

            while not done and not truncated:
                total_steps += 1
                # Linear decay per environment step, floored at EPSILON_END.
                epsilon = max(EPSILON_END, EPSILON_START - total_steps / EPSILON_DECAY)

                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        # Convert the array directly and add the batch axis;
                        # wrapping it in a Python list first is much slower.
                        state_tensor = torch.as_tensor(
                            state, dtype=torch.float32, device=DEVICE
                        ).unsqueeze(0)
                        q_values = online_net(state_tensor)
                        action = q_values.argmax().item()

                next_state, reward, done, truncated, _ = env.step(action)
                next_state_transposed = np.transpose(next_state, (2, 0, 1))  # CHW

                # Only ``done`` is stored: a truncated episode must still
                # bootstrap from its successor state.
                replay_buffer.push((state, action, reward, done, next_state_transposed))
                state = next_state_transposed
                episode_reward += reward

                # Train once enough experience has been collected.
                if len(replay_buffer) >= MIN_REPLAY_SIZE:
                    states, actions, rewards, dones, next_states = replay_buffer.sample(BATCH_SIZE)

                    states_t = torch.tensor(states, dtype=torch.float32, device=DEVICE)
                    actions_t = torch.tensor(actions, dtype=torch.int64, device=DEVICE)
                    rewards_t = torch.tensor(rewards, dtype=torch.float32, device=DEVICE)
                    dones_t = torch.tensor(dones, dtype=torch.float32, device=DEVICE)
                    next_states_t = torch.tensor(next_states, dtype=torch.float32, device=DEVICE)

                    # Q-values of the actions actually taken.
                    q_values = online_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)
                    with torch.no_grad():
                        next_q_values = target_net(next_states_t).max(1)[0]
                        # ``(1 - dones_t)`` zeroes the bootstrap term for
                        # terminal transitions.
                        target_q = rewards_t + GAMMA * next_q_values * (1 - dones_t)

                    loss = nn.functional.mse_loss(q_values, target_q)

                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                    # Update target network
                    if total_steps % TARGET_UPDATE_FREQ == 0:
                        target_net.load_state_dict(online_net.state_dict())

            print(f"Episode {episode + 1}, Reward: {episode_reward}, Epsilon: {epsilon:.3f}")
    finally:
        # Release the emulator even if training is interrupted or fails.
        env.close()

# =======================
# Run Training
# =======================

if __name__ == "__main__":
    # Time the whole training run with the wall clock.
    start_time = time.time()
    train()
    elapsed = time.time() - start_time
    print(f"\nTraining completed in {elapsed:.2f} seconds")


NameNotFound: Environment `PongNoFrameskip` doesn't exist.