# Mushroom Harvester



## Simple Car Environment
---

![simple_car.gif](https://media0.giphy.com/media/v1.Y2lkPTc5MGI3NjExODU0NmVlMzU1MGU1YzJjMjA5ODE5NjM0MTg0MTU1ZmM1OTA1NzRkNCZjdD1n/VI3OuvQShK3gzENiVz/giphy.gif)

*(code for this tutorial adapted from: https://gerardmaggiolino.medium.com/creating-openai-gym-environments-with-pybullet-part-2-a1441b9a4d8e)*

---

This is a simple car environment with a continuous state space and a discrete action space. The goal is to drive towards a green marker. An episode ends when the car comes within 1.5 metres of the marker or when a certain amount of time has passed.
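
As a rough illustration of the termination rule described above, the sketch below checks whether a car position lies within 1.5 metres of a goal position. The function name `reached_goal`, its arguments, and the use of planar (x, y) coordinates are assumptions made for this example; the environment's own check may differ.

```python
import math

GOAL_RADIUS = 1.5  # metres, the threshold quoted in the description above


def reached_goal(car_xy, goal_xy, radius=GOAL_RADIUS):
    """Return True when the car is within `radius` metres of the goal (hypothetical helper)."""
    dx = car_xy[0] - goal_xy[0]
    dy = car_xy[1] - goal_xy[1]
    return math.hypot(dx, dy) < radius


print(reached_goal((0.0, 0.0), (1.0, 1.0)))  # distance is about 1.41 m
print(reached_goal((0.0, 0.0), (3.0, 4.0)))  # distance is 5 m
```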

We can instantiate the environment as follows:


```python
env = gym.make('SimpleDriving-v0', apply_api_compatibility=True, renders=False, isDiscrete=True, render_mode='tp_camera')
```


### Action Space

*   0: Reverse-Left
*   1: Reverse
*   2: Reverse-Right
*   3: Steer-Left (no throttle)
*   4: No throttle and no steering
*   5: Steer-Right (no throttle)
*   6: Forward-Right
*   7: Forward
*   8: Forward-Left
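
When logging or debugging a policy, it can help to print the action label rather than the bare index. The lookup table below is a small sketch of that idea; the labels are copied from the list above, while `ACTION_NAMES` and `describe_action` are hypothetical helpers that are not part of the environment.

```python
# Labels copied from the action list; this table is an illustrative helper only.
ACTION_NAMES = {
    0: "Reverse-Left",
    1: "Reverse",
    2: "Reverse-Right",
    3: "Steer-Left (no throttle)",
    4: "No throttle and no steering",
    5: "Steer-Right (no throttle)",
    6: "Forward-Right",
    7: "Forward",
    8: "Forward-Left",
}


def describe_action(action):
    """Return a readable label such as '7: Forward' for a discrete action index."""
    return f"{action}: {ACTION_NAMES[action]}"


print(describe_action(7))  # prints "7: Forward"
```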


Before we can execute any code we first need to install the following packages:

In [1]:
## Running on Ubuntu with the packages installed locally, so there is no need to run this code block.
## Note: the install asks for a password anyway, so running it from a code block doesn't work.
# !pip install gym==0.26.2 pyvirtualdisplay pygame torch > /dev/null 2>&1
# !pip install git+https://github.com/fredsukkar/simple-car-env-template > /dev/null 2>&1
# !apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
# !apt-get install -y xvfb

Now import the necessary packages and define the following helper function (you don't need the `display_video` function if running locally):

In [2]:
import os
os.environ['PYVIRTUALDISPLAY_DISPLAYFD'] = '0'

import gym
import simple_driving
# import pybullet_envs
import pybullet as p
import matplotlib.pyplot as plt
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
from IPython.display import HTML
import matplotlib
import matplotlib.animation as animation
import numpy as np
import math
from collections import defaultdict
import pickle
from IPython.display import clear_output
import torch
import random

display = Display(visible=0, size=(400, 300))
display.start()

def display_video(frames, framerate=30):
  """Generates video from `frames`.

  Args:
    frames (ndarray): Array of shape (n_frames, height, width, 3).
    framerate (int): Frame rate in units of Hz.

  Returns:
    Display object.
  """
  height, width, _ = frames[0].shape
  dpi = 70
  orig_backend = matplotlib.get_backend()
  matplotlib.use('Agg')  # Switch to headless 'Agg' to inhibit figure rendering.
  fig, ax = plt.subplots(1, 1, figsize=(width / dpi, height / dpi), dpi=dpi)
  matplotlib.use(orig_backend)  # Switch back to the original backend.
  ax.set_axis_off()
  ax.set_aspect('equal')
  ax.set_position([0, 0, 1, 1])
  im = ax.imshow(frames[0])
  def update(frame):
    im.set_data(frame)
    return [im]
  interval = 1000/framerate
  anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                  interval=interval, blit=True, repeat=False)
  
  #ipythondisplay.display(HTML(anim.to_html5_video()))
  return HTML(anim.to_html5_video())

pybullet build time: Nov 28 2023 23:51:11


In [3]:
######################### renders image from third person perspective for validating policy ##############################
env = gym.make("SimpleDriving-v0", apply_api_compatibility=True, renders=False, isDiscrete=True, render_mode='tp_camera')
##########################################################################################################################

######################### renders image from onboard camera ###############################################################
# env = gym.make("SimpleDriving-v0", apply_api_compatibility=True, renders=False, isDiscrete=True, render_mode='fp_camera')
##########################################################################################################################

######################### if running locally you can just render the environment in pybullet's GUI #######################
# env = gym.make("SimpleDriving-v0", apply_api_compatibility=True, renders=True, isDiscrete=True)
##########################################################################################################################

state, info = env.reset()
frames = []
frames.append(env.render())

for i in range(200):
    action = env.action_space.sample()
    state, reward, terminated, truncated, info = env.step(action)
    frames.append(env.render())  # not needed when running locally unless you want the onboard camera image
    if terminated or truncated:
        break

env.close()
display_video(frames, framerate=5)  # remove if running locally

A random mushroom has been spawned. Robot will investigate.
Is the random mushroom edible?:  True
argv[0]=
argv[0]=
A random mushroom has been spawned. Robot will investigate.
Is the random mushroom edible?:  True
argv[0]=
argv[0]=




Sticking close to mushroom
Sticking close to mushroom
Sticking close to mushroom


Here is some code to help you get started.

In [4]:
# Helper function to discretize state
def discretize_state(state):
    return tuple(int(np.digitize(s, bins)) for s, bins in zip(state, state_bins))

# Epsilon-greedy function
def epsilon_greedy(state, exploration_rate):
    if state not in q_table:
        q_table[state] = {a: 0 for a in range(env.action_space.n)}
    
    if np.random.rand() < exploration_rate:
        # Biased exploration: action 0 is drawn 70% of the time (the weights must sum to 1)
        action_probs = [0.7, 0.05, 0.05, 0.05, 0.03, 0.03, 0.03, 0.03, 0.03]
        action = np.random.choice(range(env.action_space.n), p=action_probs)
    else:
        action = max(q_table[state], key=q_table[state].get)
    
    return action

# Q-learning function
def q_learning(state, action, reward, next_state):
    if state not in q_table:
        q_table[state] = {a: 0 for a in range(env.action_space.n)}
    
    old_q_value = q_table[state][action]
    next_max = max(q_table[next_state].values()) if next_state in q_table else 0
    new_q_value = (1 - learning_rate) * old_q_value + learning_rate * (reward + discount_factor * next_max)
    q_table[state][action] = new_q_value
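
The `discretize_state` helper above maps each continuous observation component to a bin index. The sketch below shows the same idea on single values using only the standard library: for increasing bin edges, `bisect.bisect_right` returns the same index as `np.digitize` with its default arguments. The [-1, 1] range and the helper names `make_edges` and `to_bin` are made up for illustration and are not the environment's actual observation bounds.

```python
import bisect


def make_edges(low, high, num_edges):
    """Evenly spaced bin edges from low to high inclusive (like np.linspace)."""
    step = (high - low) / (num_edges - 1)
    return [low + i * step for i in range(num_edges)]


def to_bin(value, edges):
    """Index of the bin containing `value`; 0 means below the first edge."""
    return bisect.bisect_right(edges, value)


# 9 edges give 10 possible bin indices (0 to 9), matching num_bins = 10.
edges = make_edges(-1.0, 1.0, 9)
for value in (-2.0, -1.0, 0.1, 1.0, 3.0):
    print(value, "->", to_bin(value, edges))
```

Values below the lowest edge fall into bin 0 and values at or above the highest edge fall into the last bin, so out-of-range observations are still assigned a valid table key.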

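To make the update rule in `q_learning` concrete, the sketch below applies it once to a tiny hand-made table. The states `"s0"` and `"s1"`, the two actions, and the reward are invented for illustration; the learning rate and discount factor are the values used elsewhere in this notebook.

```python
learning_rate = 0.3
discount_factor = 0.95

# Toy Q-table: two made-up states with two actions each.
q_table = {
    "s0": {0: 0.0, 1: 2.0},
    "s1": {0: 1.0, 1: 4.0},
}


def q_update(state, action, reward, next_state):
    """Apply one tabular Q-learning update and return the new Q-value."""
    old_q = q_table[state][action]
    # Unseen next states contribute a value of 0, as in the helper above.
    next_max = max(q_table[next_state].values()) if next_state in q_table else 0.0
    target = reward + discount_factor * next_max
    q_table[state][action] = (1 - learning_rate) * old_q + learning_rate * target
    return q_table[state][action]


# 0.7 * 2.0 + 0.3 * (-1.0 + 0.95 * 4.0) = 2.24
new_q = q_update("s0", 1, reward=-1.0, next_state="s1")
print(round(new_q, 3))
```
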
In [5]:
# TRAINING MODEL
# Define Learning parameters
learning_rate = 0.3  # Step size for Q-value updates (0.1 - 0.3)
discount_factor = 0.95  # Weight given to future rewards (0.90 - 0.99)
exploration_rate = 1.0  # Initial exploration rate (always start at 1.0, fully random)
exploration_decay = 0.98  # Factor applied to the exploration rate after each episode (0.95 - 0.99)
num_episodes = 400  # Number of training episodes; more complex problems need longer training

# Create environment
env = gym.make("SimpleDriving-v0", apply_api_compatibility=True, renders=False, isDiscrete=True, render_mode='tp_camera')

# Define state discretization parameters
num_bins = 10
state_bins = [np.linspace(env.observation_space.low[i], env.observation_space.high[i], num_bins - 1) for i in range(len(env.observation_space.low))]

# Initialize Q-table as a nested dictionary
q_table = {}

# Training loop
print(f"Training has started.")
for episode in range(num_episodes):
    state, _ = env.reset()
    state = discretize_state(state)
    done = False
    total_reward = 0    
    while not done:
        action = epsilon_greedy(state, exploration_rate)
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated  # end the episode on termination or truncation
        next_state = discretize_state(next_state)
        q_learning(state, action, reward, next_state)
        
        state = next_state
        total_reward += reward
    
    exploration_rate *= exploration_decay
    
    # Print episode details if needed
    print(f"Training Episode: {episode + 1}, Total Reward: {total_reward}, Exploration Rate: {exploration_rate}")
print(f"Training complete.")




A random mushroom has been spawned. Robot will investigate.
Is the random mushroom edible?:  True
argv[0]=
argv[0]=
Training has started.
Training Episode: 1, Total Reward: -356.4941707648726, Exploration Rate: 0.98
Training Episode: 2, Total Reward: -446.0835953855278, Exploration Rate: 0.9603999999999999
Sticking close to mushroom
Sticking close to mushroom
Sticking close to mushroom
Sticking close to mushroom
Sticking close to mushroom
Training Episode: 3, Total Reward: -227.8087513021333, Exploration Rate: 0.9411919999999999
Training Episode: 4, Total Reward: -466.4395522749293, Exploration Rate: 0.9223681599999999
Sticking close to mushroom
Sticking close to mushroom
Training Episode: 5, Total Reward: -309.3088321048116, Exploration Rate: 0.9039207967999998
Training Episode: 6, Total Reward: -335.2672150990993, Exploration Rate: 0.8858423808639998
Sticking close to mushroom
Sticking close to mushroom
Mushroom collected!
Sticking close to mushroom
Sticking close to mushroom
Trainin
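
The exploration rates printed above follow a geometric schedule: after n episodes the rate is 0.98 raised to the power n. The sketch below reproduces that schedule without the environment and reports when the rate first drops below a threshold. The 0.05 threshold and the helper name `episodes_until_below` are arbitrary choices for illustration.

```python
exploration_decay = 0.98  # same decay factor as the training cell
num_episodes = 400


def episodes_until_below(threshold, rate=1.0, decay=exploration_decay):
    """Count the multiplicative decays needed before `rate` falls below `threshold`."""
    episodes = 0
    while rate >= threshold:
        rate *= decay
        episodes += 1
    return episodes


final_rate = exploration_decay ** num_episodes
print(f"Exploration rate after {num_episodes} episodes: {final_rate:.6f}")
print(f"Episodes until the rate drops below 0.05: {episodes_until_below(0.05)}")
```

With these settings the rate is already small well before training ends, so the later episodes are close to greedy. A slower decay would keep the agent exploring for longer.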

In [10]:
### TEST LEARNT POLICY
# Create environment
env = gym.make("SimpleDriving-v0", apply_api_compatibility=True, renders=False, isDiscrete=True, render_mode='tp_camera')

print(f"Begin testing.")
frames = []  # For video
state, _ = env.reset()
state = discretize_state(state)
done = False
total_reward = 0

while not done:
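    # Act greedily; fall back to a random action for states never seen in training
    if state in q_table:
        action = max(q_table[state], key=q_table[state].get)
    else:
        action = env.action_space.sample()
    next_state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated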
    state = discretize_state(next_state)
    total_reward += reward
    # Capture a frame at every step for the video
    frames.append(env.render())
print(f"Total reward: {total_reward}")

# Close environment
env.close()
print(f"Test complete.")

# Display the video of the test episode
display_video(frames, framerate=8)



A random mushroom has been spawned. Robot will investigate.
Is the random mushroom edible?:  True
argv[0]=
argv[0]=
Begin testing.
Sticking close to mushroom
Sticking close to mushroom
Mushroom collected!
Sticking close to mushroom
Mushroom collected!
Sticking close to mushroom
REACHED GOAL
Total reward: 99.10306894877226
Test complete.
