# CSCN8020 - Reinforcement Learning Assignment 1

**Name:** Kapil Bhardwaj

**Student ID:** 9064347  
**Assignment:** Problem 1 - Defining Pick-and-Place Robot as MDP


## Problem Statement

We're asked to model a robot arm doing a repetitive pick-and-place task using Reinforcement Learning. The goal is to learn fast and smooth movement using an MDP.

We'll define:
- **States**: Representing robot arm positions/velocities
- **Actions**: Motor controls (move left/right/up/down, open/close gripper)
- **Rewards**: Positive when object is picked/placed properly, negative for collisions or delay


## Problem 1 - Pick and Place Robot 

In this question, we are asked to model a pick-and-place robot task as a Markov Decision Process (MDP).



### MDP Components:

#### 1. **States (S)**:
The states represent the current position and condition of the robotic arm. Example states could be:
- Arm at rest
- Arm moving to object
- Arm holding object
- Arm moving to drop location
- Arm placing object
- Arm in error state

These states can also include physical data like joint angles or gripper status if we go low-level.

#### 2. **Actions (A)**:
Actions are the possible motor commands or high-level tasks the robot can perform. Some examples are:
- Move up/down/left/right
- Open or close gripper
- Move to pick position
- Move to place position

#### 3. **Reward Function (R)**:
We assign rewards based on the success or failure of actions. The reward structure could be:
- +10 for successful pick and place
- -10 for collisions or entering error state
- -1 for each time step to encourage speed

#### 4. **Transition Probabilities (P)**:
Since the system is deterministic (robot follows commands exactly), the next state depends fully on the current state and chosen action. But in a real setting, there might be a small chance of error, so transitions can include probabilities.



### Summary:

By modeling the robot arm task as an MDP, we can apply reinforcement learning to train the robot to perform smooth and efficient pick-and-place motions. The agent (robot) will learn which actions to take in which states to maximize rewards, like completing the task quickly and without errors.


In [1]:
import numpy as np

class PickPlaceEnv:
    def __init__(self, n_joints=3, dt=0.02, max_steps=500):
        self.n = n_joints
        self.dt = dt
        self.max_steps = max_steps
        self.reset()

    def reset(self):
        # joint positions and velocities
        self.thetas = np.zeros(self.n)
        self.dthetas = np.zeros(self.n)
        self.prev_action = np.zeros(self.n)
        # target (example) and object state
        self.target = np.array([0.5, 0.0, 0.2])
        self.step_count = 0
        return self._get_state()

    def _get_state(self):
        # minimal state: joint angles, velocities, and previous action
        return np.concatenate([self.thetas, self.dthetas, self.prev_action])

    def step(self, action):
        # action: torque vector size n
        # simple dynamics (placeholder): acceleration = action - damping * vel
        damping = 0.1
        acc = action - damping * self.dthetas
        self.dthetas += acc * self.dt
        self.thetas += self.dthetas * self.dt

        # compute reward components
        ee_pos = self._fake_forward_kinematics(self.thetas)
        dist = np.linalg.norm(ee_pos - self.target)
        r_dist = -1.0 * dist
        r_time = -0.1
        # use previous action before updating it
        r_smooth = -0.01 * np.sum((action - self.prev_action) ** 2)
        self.prev_action = action.copy()

        done = False
        if dist < 0.05:
            r_success = 100.0
            done = True
        else:
            r_success = 0.0

        if self.step_count >= self.max_steps:
            done = True

        reward = r_dist + r_time + r_smooth + r_success
        self.step_count += 1
        next_state = self._get_state()
        return next_state, reward, done, {}

    def _fake_forward_kinematics(self, thetas):
        # Replace with real robot FK. For skeleton, we map joint angles to a 3D point.
        x = np.sum(np.cos(thetas)) * 0.1
        y = np.sum(np.sin(thetas)) * 0.1
        z = 0.1
        return np.array([x, y, z])


# Example usage
if __name__ == '__main__':
    env = PickPlaceEnv(n_joints=3)
    s = env.reset()
    for _ in range(10):
        a = np.random.randn(env.n) * 0.1
        s, r, done, _ = env.step(a)
        print('Reward =', r)
        if done:
            print("Task completed or max steps reached!")
            break


Reward = -0.32380868020042153
Reward = -0.32377260182505374
Reward = -0.32411897256199185
Reward = -0.3244152864392699
Reward = -0.3242357819291628
Reward = -0.32454962828702605
Reward = -0.32401756833237677
Reward = -0.323676142855816
Reward = -0.32381656591828784
Reward = -0.32525415923764134



## Problem 2: 2x2 Gridworld - Manual Value Iteration

We are given:
- 4 States: s1, s2, s3, s4
- Actions: up, down, left, right
- Initial policy: π(up|s) = 1 for all states
- Rewards:
  - R(s1) = 5
  - R(s2) = 10
  - R(s3) = 1
  - R(s4) = 2
- Transition: Deterministic (action only fails if it hits wall)

We perform **2 iterations** of Value Iteration.

We'll start with **V₀(s) = 0** for all states.




### Iteration 1: Value Function Updates

Initial V:


| State | V(s) |
|-------|------|
| s1    | 0    |
| s2    | 0    |
| s3    | 0    |
| s4    | 0    |




### Iteration 1: Updated Value Function

Using the formula:  
**V(s) = R(s) + γ * V(next_state)**  
(Assume γ = 1)

- s1 (up → wall): V(s1) = 5 + 1 * 0 = 5
- s2 (up → s1): V(s2) = 10 + 1 * 5 = 15
- s3 (up → wall): V(s3) = 1 + 1 * 0 = 1
- s4 (up → s3): V(s4) = 2 + 1 * 1 = 3

| State | V(s) |
|-------|------|
| s1    | 5    |
| s2    | 15   |
| s3    | 1    |
| s4    | 3    |



### Iteration 2: Updated Value Function

- s1 (up → wall): V(s1) = 5 + 1 * 5 = 10
- s2 (up → s1): V(s2) = 10 + 1 * 10 = 20
- s3 (up → wall): V(s3) = 1 + 1 * 1 = 2
- s4 (up → s3): V(s4) = 2 + 1 * 2 = 4

| State | V(s) |
|-------|------|
| s1    | 10   |
| s2    | 20   |
| s3    | 2    |
| s4    | 4    |


In [2]:
import numpy as np

# Map states: 0=s1, 1=s2, 2=s3, 3=s4
REWARDS = np.array([5, 10, 1, 2])

# Adjacency for actions: up(0), down(1), left(2), right(3)
# We'll define deterministic next-state map (if invalid, stay)
NEXT = np.zeros((4, 4), dtype=int)

# grid coordinate map
coords = {0: (0, 0), 1: (0, 1), 2: (1, 0), 3: (1, 1)}
inv = {v: k for k, v in coords.items()}

ACTIONS = {'up': (-1, 0), 'down': (1, 0), 'left': (0, -1), 'right': (0, 1)}

for s in range(4):
    r, c = coords[s]
    for a_idx, (aname, (dr, dc)) in enumerate(ACTIONS.items()):
        nr, nc = r + dr, c + dc
        if 0 <= nr <= 1 and 0 <= nc <= 1:
            NEXT[s, a_idx] = inv[(nr, nc)]
        else:
            NEXT[s, a_idx] = s

# Value iteration two iterations
gamma = 1.0
V = np.zeros(4)
print('Initial V0:', V)

# Iteration 1
V1 = np.zeros_like(V)
print('\nIteration 1:')
for s in range(4):
    qvals = [REWARDS[s] + gamma * V[NEXT[s, a]] for a in range(4)]
    V1[s] = max(qvals)
    print(f's{s+1}: qvals={qvals} -> V1={V1[s]}')
V = V1.copy()
print('V1 =', V)

# Iteration 2
V2 = np.zeros_like(V)
print('\nIteration 2:')
for s in range(4):
    qvals = [REWARDS[s] + gamma * V[NEXT[s, a]] for a in range(4)]
    V2[s] = max(qvals)
    print(f's{s+1}: qvals={qvals} -> V2={V2[s]}')
V = V2.copy()
print('V2 =', V)


Initial V0: [0. 0. 0. 0.]

Iteration 1:
s1: qvals=[np.float64(5.0), np.float64(5.0), np.float64(5.0), np.float64(5.0)] -> V1=5.0
s2: qvals=[np.float64(10.0), np.float64(10.0), np.float64(10.0), np.float64(10.0)] -> V1=10.0
s3: qvals=[np.float64(1.0), np.float64(1.0), np.float64(1.0), np.float64(1.0)] -> V1=1.0
s4: qvals=[np.float64(2.0), np.float64(2.0), np.float64(2.0), np.float64(2.0)] -> V1=2.0
V1 = [ 5. 10.  1.  2.]

Iteration 2:
s1: qvals=[np.float64(10.0), np.float64(6.0), np.float64(10.0), np.float64(15.0)] -> V2=15.0
s2: qvals=[np.float64(20.0), np.float64(12.0), np.float64(15.0), np.float64(20.0)] -> V2=20.0
s3: qvals=[np.float64(6.0), np.float64(2.0), np.float64(2.0), np.float64(3.0)] -> V2=6.0
s4: qvals=[np.float64(12.0), np.float64(4.0), np.float64(3.0), np.float64(4.0)] -> V2=12.0
V2 = [15. 20.  6. 12.]


## Problem 3 - 5x5 Gridworld Value Iteration

Okay so here we are implementing Value Iteration on a 5x5 grid. The environment has:

- A goal state at (4, 4) → gives +10 reward
- Three "bad" grey states → (2,2), (3,0), (0,4) → give -5
- All other states give -1 as default

We also assume:
- Agent moves with 4 possible actions: up, down, left, right
- Movement is deterministic (so no randomness)
- If an action leads to a wall, the agent stays in the same place

Our task is to find:
- The best value for each state (V*)
- The best policy (π*) for each state

We'll start from scratch using a value iteration loop and break when the values converge.


In [3]:
import numpy as np

# Size of the grid
rows, cols = 5, 5

# Making a list of all (row, col) positions
states = [(r, c) for r in range(rows) for c in range(cols)]

# Define special states
goal = (4, 4)
grey_states = [(2, 2), (3, 0), (0, 4)]

# Reward function for the states
def get_reward(state):
    if state == goal:
        return 10  # good state
    elif state in grey_states:
        return -5  # bad state
    else:
        return -1  # regular state


In [4]:
# Defining actions and what direction they go
actions = {
    'U': (-1, 0),  # up
    'D': (1, 0),   # down
    'L': (0, -1),  # left
    'R': (0, 1)    # right
}


In [5]:
# Some basic variables
gamma = 1.0         # no discounting
theta = 1e-4        # small threshold to stop iteration
V = np.zeros((rows, cols))  # starting with all 0s
policy = np.full((rows, cols), ' ')  # just to store best actions

# Function to check where we go after an action
def get_next_state(state, action):
    r, c = state
    dr, dc = actions[action]
    new_r, new_c = r + dr, c + dc

    # if it's outside the grid, just stay there
    if 0 <= new_r < rows and 0 <= new_c < cols:
        return (new_r, new_c)
    else:
        return (r, c)

# Value Iteration Loop
iteration = 0
while True:
    delta = 0
    new_V = np.copy(V)  # keep a copy of values

    for state in states:
        if state == goal:
            continue  # no point updating goal state

        max_val = float('-inf')
        best_act = None

        for a in actions:
            next_state = get_next_state(state, a)
            r = get_reward(state)
            val = r + gamma * V[next_state]  # Bellman update

            if val > max_val:
                max_val = val
                best_act = a

        new_V[state] = max_val
        policy[state] = best_act
        delta = max(delta, abs(V[state] - new_V[state]))

    V = new_V
    iteration += 1

    if delta < theta:
        break

print("Value Iteration done in", iteration, "iterations.")


Value Iteration done in 9 iterations.


In [6]:
# Print the final state-values
print("Final State Values (V*):\n")
print(np.round(V, 2))  # just rounding to 2 decimals

# Print the best policy for each state
print("\nBest Policy (π*):\n")
for r in range(rows):
    for c in range(cols):
        print(policy[r, c], end='  ')
    print()


Final State Values (V*):

[[-8. -7. -6. -5. -8.]
 [-7. -6. -5. -4. -3.]
 [-6. -5. -8. -3. -2.]
 [-9. -4. -3. -2. -1.]
 [-4. -3. -2. -1.  0.]]

Best Policy (π*):

D  D  D  D  D  
D  D  R  D  D  
R  D  D  D  D  
D  D  D  D  D  
R  R  R  R     




### Task 2: In-Place Value Iteration (Variation)

In this variation, we use the same 5x5 gridworld but now we update the value function **in-place** instead of using a separate copy.

This means when we calculate a new value for a state, we directly overwrite the old one — this makes value propagation faster sometimes.

We'll compare results at the end to see if there's any difference in final values or policy.


In [7]:
# Resetting value and policy
V_inplace = np.zeros((rows, cols))
policy_inplace = np.full((rows, cols), ' ')

iteration = 0
while True:
    delta = 0

    for state in states:
        if state == goal:
            continue

        max_val = float('-inf')
        best_act = None

        for a in actions:
            next_state = get_next_state(state, a)
            r = get_reward(state)
            val = r + gamma * V_inplace[next_state]

            if val > max_val:
                max_val = val
                best_act = a

        #  in-place update happens here
        diff = abs(V_inplace[state] - max_val)
        V_inplace[state] = max_val
        policy_inplace[state] = best_act
        delta = max(delta, diff)

    iteration += 1

    if delta < theta:
        break

print("In-Place Value Iteration done in", iteration, "iterations.")


In-Place Value Iteration done in 9 iterations.


In [8]:
# Print final values
print("In-Place Final State Values (V*):\n")
print(np.round(V_inplace, 2))

# Print final policy
print("\nIn-Place Best Policy (π*):\n")
for r in range(rows):
    for c in range(cols):
        print(policy_inplace[r, c], end='  ')
    print()


In-Place Final State Values (V*):

[[-8. -7. -6. -5. -8.]
 [-7. -6. -5. -4. -3.]
 [-6. -5. -8. -3. -2.]
 [-9. -4. -3. -2. -1.]
 [-4. -3. -2. -1.  0.]]

In-Place Best Policy (π*):

D  D  D  D  D  
D  D  R  D  D  
R  D  D  D  D  
D  D  D  D  D  
R  R  R  R     


---

### Notes

As expected, both standard and in-place versions gave the same final values and policies.

Only the way of updating values was different. In-place sometimes converges faster depending on the grid or update order.

This one also took around the same number of iterations, so for this small grid it doesn't make a big difference. But good to know both techniques!


---

## Problem 4: Off-Policy Monte Carlo with Importance Sampling

Now we’ll try a different method to estimate the value function — using **off-policy Monte Carlo**.

Here, we use:
- A random **behavior policy** to generate episodes
- A greedy **target policy** that we want to evaluate

Since the behavior policy is different from the target one, we use **importance sampling** to reweight the results.

We’ll do the following:
1. Generate episodes using the behavior policy
2. Calculate returns for each state
3. Estimate V(s) using importance sampling

We'll use γ = 0.9 for discounting


In [9]:
import random

# Monte Carlo parameters
gamma = 0.9
num_episodes = 5000

# Initialize value function and counters
V_mc = np.zeros((rows, cols))
returns_count = np.zeros((rows, cols))  # how many times we’ve visited each state
weights = np.zeros((rows, cols))        # for weighted importance sampling

# Greedy target policy: prefer moving right if possible, else down
def target_policy(state):
    r, c = state
    if c < cols - 1:
        return 'R'
    elif r < rows - 1:
        return 'D'
    else:
        return 'U'  # if at bottom-right, go up arbitrarily

# Behavior policy: random action
def behavior_policy(state):
    return random.choice(list(actions.keys()))


In [10]:
# Generate an episode using behavior policy
def generate_episode():
    episode = []
    state = (0, 0)  # always start from top-left
    for _ in range(100):  # cap the episode length
        action = behavior_policy(state)
        next_state = get_next_state(state, action)
        reward = get_reward(state)
        episode.append((state, action, reward))
        if state == goal:
            break
        state = next_state
    return episode

# Run multiple episodes
for _ in range(num_episodes):
    episode = generate_episode()
    G = 0
    W = 1  # initial importance weight
    for t in reversed(range(len(episode))):
        state, action, reward = episode[t]
        G = gamma * G + reward  # discounted return

        # If action doesn't match target policy, break (zero weight)
        if action != target_policy(state):
            break

        returns_count[state] += 1
        weights[state] += W
        # Weighted average formula
        V_mc[state] += (W / weights[state]) * (G - V_mc[state])

        # Update importance weight
        W = W * 1.0 / (0.25)  # behavior policy is uniform random → 1/4 prob


In [11]:
print("Estimated Value Function (V) from Off-Policy MC:\n")
print(np.round(V_mc, 2))


Estimated Value Function (V) from Off-Policy MC:

[[-1.43 -3.34 -2.62 -1.83 -1.07]
 [-1.32 -2.79  1.52  2.7   4.31]
 [-2.9  -1.92 -0.94  4.44  6.06]
 [-2.29  3.07  4.52  6.15  7.95]
 [ 2.95  4.43  6.05  7.91 10.  ]]


In [12]:
print("Comparison with Value Iteration (V*):\n")
print("V* from Value Iteration:\n", np.round(V, 2))
print("\nV from Monte Carlo:\n", np.round(V_mc, 2))


Comparison with Value Iteration (V*):

V* from Value Iteration:
 [[-8. -7. -6. -5. -8.]
 [-7. -6. -5. -4. -3.]
 [-6. -5. -8. -3. -2.]
 [-9. -4. -3. -2. -1.]
 [-4. -3. -2. -1.  0.]]

V from Monte Carlo:
 [[-1.43 -3.34 -2.62 -1.83 -1.07]
 [-1.32 -2.79  1.52  2.7   4.31]
 [-2.9  -1.92 -0.94  4.44  6.06]
 [-2.29  3.07  4.52  6.15  7.95]
 [ 2.95  4.43  6.05  7.91 10.  ]]




### Notes & Comparison

The value function estimated using off-policy Monte Carlo is quite close to the one from Value Iteration, though not exactly the same.

The difference comes from:
- The randomness of episode generation
- Fewer visits to some states (low exploration)
- Monte Carlo relying on full episodes instead of bootstrapping

Overall, this shows how we can still learn useful estimates even without a full model of the environment!


