In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation, FFMpegWriter
import random
from collections import defaultdict

# ---------------------- Environment Setup ---------------------- #
GRID_SIZE = 8
ACTIONS = ['U', 'D', 'L', 'R']
ACTION_MAP = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}
GOAL = (7, 7)
OBSTACLES = {(3, 3), (4, 4), (2, 5), (6, 2)}
REWARD_GOAL = 100
REWARD_STEP = -1
REWARD_OBSTACLE = -10
EPISODES = 300
MAX_STEPS = 50
EPSILON = 0.1
ALPHA = 0.1
GAMMA = 0.95
PLANNING_STEPS = 10

Q = defaultdict(lambda: {a: 0 for a in ACTIONS})
model = {}

def is_valid(pos):
    return 0 <= pos[0] < GRID_SIZE and 0 <= pos[1] < GRID_SIZE and pos not in OBSTACLES

def step(state, action):
    dx, dy = ACTION_MAP[action]
    next_state = (state[0] + dx, state[1] + dy)
    if not is_valid(next_state):
        return state, REWARD_OBSTACLE
    if next_state == GOAL:
        return next_state, REWARD_GOAL
    return next_state, REWARD_STEP

def select_action(state):
    if np.random.rand() < EPSILON:
        return random.choice(ACTIONS)
    return max(Q[state], key=Q[state].get)

# ---------------------- Dyna-Q Training ---------------------- #
for ep in range(EPISODES):
    state = (0, 0)
    for _ in range(MAX_STEPS):
        action = select_action(state)
        next_state, reward = step(state, action)

        # Q-learning update
        best_next = max(Q[next_state], key=Q[next_state].get)
        Q[state][action] += ALPHA * (reward + GAMMA * Q[next_state][best_next] - Q[state][action])

        # Model learning
        model[(state, action)] = (next_state, reward)

        # Planning updates
        for _ in range(PLANNING_STEPS):
            s, a = random.choice(list(model.keys()))
            s_, r = model[(s, a)]
            best_s_ = max(Q[s_], key=Q[s_].get)
            Q[s][a] += ALPHA * (r + GAMMA * Q[s_][best_s_] - Q[s][a])

        if next_state == GOAL:
            break
        state = next_state

# ---------------------- Extract Path ---------------------- #
path = [(0, 0)]
state = (0, 0)
for _ in range(30):
    action = select_action(state)
    state, _ = step(state, action)
    path.append(state)
    if state == GOAL:
        break

# ---------------------- Visualization Setup ---------------------- #
fig, ax = plt.subplots(figsize=(6, 6))
drone_patch = patches.Circle((0.5, 0.5), 0.3, color='blue')

def init():
    ax.clear()
    ax.set_xlim(0, GRID_SIZE)
    ax.set_ylim(0, GRID_SIZE)
    ax.set_xticks(np.arange(GRID_SIZE + 1))
    ax.set_yticks(np.arange(GRID_SIZE + 1))
    ax.grid(True)

    for i in range(GRID_SIZE):
        for j in range(GRID_SIZE):
            cell = (i, j)
            color = 'white'
            if cell in OBSTACLES:
                color = 'black'
            elif cell == GOAL:
                color = 'gold'
            rect = patches.Rectangle((j, GRID_SIZE - i - 1), 1, 1, facecolor=color)
            ax.add_patch(rect)
    return []

def update(frame):
    init()
    rx, ry = path[frame]
    drone_patch.center = (ry + 0.5, GRID_SIZE - rx - 0.5)
    ax.add_patch(drone_patch)
    return [drone_patch]

anim = FuncAnimation(fig, update, frames=len(path), init_func=init, blit=True)

# ---------------------- Save to Video ---------------------- #
save_path = "lab7_dyna_q_drone_navigation.mp4"
writer = FFMpegWriter(fps=2, metadata=dict(artist='RL Lab 7'), bitrate=1800)
anim.save(save_path, writer=writer)
print(f"✅ Drone navigation video saved as {save_path}")

Lab 7 Task

In [None]:
##7 code

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation, FFMpegWriter
import random
import torch
import torch.nn as nn
import torch.optim as optim

# ---------------------- Parameters ---------------------- #
GRID_SIZE = 10
ACTIONS = ['U', 'D', 'L', 'R']
ACTION_IDX = {a: i for i, a in enumerate(ACTIONS)}
ACTION_MAP = {'U': (-1, 0), 'D': (1, 0), 'L': (0, -1), 'R': (0, 1)}

PICKUP_ZONE = (0, 4)
DELIVERY_ZONES = [(3, 9), (8, 4), (9, 9)]

OBSTACLES = {(3, 3), (4, 4), (2, 5), (6, 2), (5, 5), (6, 6)}
REWARD_GOAL = 100
REWARD_STEP = -1
REWARD_OBSTACLE = -10
EPISODES = 400
MAX_STEPS = 70
EPSILON = 0.1
ALPHA = 0.01
GAMMA = 0.95
PLANNING_STEPS = 10

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------------------- Neural Network for Q-value Approximation ---------------------- #
class QNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(3 + len(ACTIONS), 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, state, action):
        action_onehot = torch.zeros((len(action), len(ACTIONS)), device=device)
        action_onehot[range(len(action)), action] = 1
        x = torch.cat([state, action_onehot], dim=1)
        return self.fc(x)

q_network = QNetwork().to(device)
optimizer = optim.Adam(q_network.parameters(), lr=ALPHA)
loss_fn = nn.MSELoss()

model = {}  # Dyna-Q model (s,a) -> (s',r)

# ---------------------- Helper Functions ---------------------- #
def is_valid(pos):
    return 0 <= pos[0] < GRID_SIZE and 0 <= pos[1] < GRID_SIZE and pos not in OBSTACLES

def step(state, action):
    x, y, has_payload = state
    dx, dy = ACTION_MAP[action]
    nx, ny = x + dx, y + dy
    if not is_valid((nx, ny)):
        return (x, y, has_payload), REWARD_OBSTACLE

    if not has_payload and (nx, ny) == PICKUP_ZONE:
        return (nx, ny, True), REWARD_STEP
    elif has_payload and (nx, ny) in DELIVERY_ZONES:
        return (nx, ny, True), REWARD_GOAL
    else:
        return (nx, ny, has_payload), REWARD_STEP

def encode_state(state):
    x, y, has_payload = state
    return torch.tensor([[x / GRID_SIZE, y / GRID_SIZE, int(has_payload)]], dtype=torch.float32, device=device)

def select_action(state):
    if np.random.rand() < EPSILON:
        return random.choice(ACTIONS)
    with torch.no_grad():
        s = encode_state(state).repeat(len(ACTIONS), 1)
        a = torch.tensor([ACTION_IDX[a] for a in ACTIONS], device=device).view(-1)
        q_vals = q_network(s, a)
        best_idx = torch.argmax(q_vals).item()
        return ACTIONS[best_idx]

def train_step(state, action, reward, next_state):
    state_enc = encode_state(state)
    next_state_enc = encode_state(next_state)

    action_idx = torch.tensor([ACTION_IDX[action]], device=device)
    q_val = q_network(state_enc, action_idx)

    with torch.no_grad():
        next_qs = q_network(next_state_enc.repeat(len(ACTIONS), 1),
                            torch.tensor(list(ACTION_IDX.values()), device=device))
        target = reward + GAMMA * torch.max(next_qs).item()

    loss = loss_fn(q_val, torch.tensor([[target]], dtype=torch.float32, device=device))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# ---------------------- Training Loop ---------------------- #
for ep in range(EPISODES):
    state = (0, 0, False)
    for _ in range(MAX_STEPS):
        action = select_action(state)
        next_state, reward = step(state, action)

        train_step(state, action, reward, next_state)

        model[(state, action)] = (next_state, reward)

        # Dyna-Q Planning
        for _ in range(PLANNING_STEPS):
            if not model:
                continue
            s, a = random.choice(list(model.keys()))
            s_, r = model[(s, a)]
            train_step(s, a, r, s_)

        # Check for successful delivery
        if next_state[2] and (next_state[0], next_state[1]) in DELIVERY_ZONES:
            break
        state = next_state

# ---------------------- Extract Path ---------------------- #
path = [(0, 0, False)]
state = (0, 0, False)
for _ in range(50):
    action = select_action(state)
    next_state, _ = step(state, action)
    path.append(next_state)
    if next_state[2] and (next_state[0], next_state[1]) in DELIVERY_ZONES:
        break
    state = next_state

# ---------------------- Visualization ---------------------- #
fig, ax = plt.subplots(figsize=(6, 6))
drone_patch = patches.Circle((0.5, 0.5), 0.3, color='blue')

def init():
    ax.clear()
    ax.set_xlim(0, GRID_SIZE)
    ax.set_ylim(0, GRID_SIZE)
    ax.set_xticks(np.arange(GRID_SIZE + 1))
    ax.set_yticks(np.arange(GRID_SIZE + 1))
    ax.grid(True)
    for i in range(GRID_SIZE):
        for j in range(GRID_SIZE):
            cell = (i, j)
            color = 'white'
            if cell in OBSTACLES:
                color = 'black'
            elif cell == PICKUP_ZONE:
                color = 'green'
            elif cell in DELIVERY_ZONES:
                color = 'gold'
            rect = patches.Rectangle((j, GRID_SIZE - i - 1), 1, 1, facecolor=color)
            ax.add_patch(rect)
    return []

def update(frame):
    init()
    rx, ry, _ = path[frame]
    drone_patch.center = (ry + 0.5, GRID_SIZE - rx - 0.5)
    ax.add_patch(drone_patch)
    return [drone_patch]

anim = FuncAnimation(fig, update, frames=len(path), init_func=init, blit=True)

# ---------------------- Save to Video ---------------------- #
save_path = "lab7_dyna_q_drone_navigation_payload.mp4"
writer = FFMpegWriter(fps=2, metadata=dict(artist='RL Lab 7'), bitrate=1800)
anim.save(save_path, writer=writer)
print(f"✅ Drone navigation video saved as {save_path}")
