# CarRacing-v1

## Description
The easiest continuous control task to learn from pixels: a top-down
racing environment. Discrete control is reasonable in this environment as
well; on/off discretisation is fine.

The game is solved when the agent consistently gets 900+ points.
The generated track is random every episode.

Some indicators are shown at the bottom of the window along with the
state RGB buffer. From left to right: true speed, four ABS sensors,
steering wheel position, gyroscope.
To play yourself (it's rather fast for humans), type:
```
python gym/envs/box2d/car_racing.py
```
Remember it's a powerful rear-wheel drive car - don't press the accelerator
and turn at the same time.

## Action Space
There are 3 actions: steering (-1 is full left, +1 is full right), gas,
and braking.

## Observation Space
State consists of 96x96 pixels.

## Rewards
The reward is -0.1 every frame and +1000/N for every track tile visited,
where N is the total number of tiles in the track. For example,
if you have finished in 732 frames, your reward is
1000 - 0.1*732 = 926.8 points.

## Starting State
The car starts stopped at the center of the road.

## Episode Termination
The episode finishes when all the tiles are visited. The car can also go
outside of the playfield - that is, far off the track - in which case it
gets -100 and dies.

## Arguments
There are no arguments supported in constructing the environment.

## Version History
- v0: Current version

## References
- Chris Campbell (2014), http://www.iforce2d.net/b2dtut/top-down-car.

## Credits
Created by Oleg Klimov

In [None]:
# Install the Box2D package (the environment described above lives under gym/envs/box2d).
!pip install Box2D

In [None]:
import gym
import time
import numpy as np
import imageio
import matplotlib
import PIL
import pyvirtualdisplay

from PIL import ImageDraw, ImageFont

from absl import logging
from tqdm import tqdm

from IPython.display import clear_output
from gym.envs.toy_text.taxi import MAP as taxi_map

In [None]:
# Set absl logging verbosity to INFO; the video helper further down reports progress via logging.info.
logging.set_verbosity(logging.INFO)

In [None]:
# Start an invisible 1400x900 virtual display.
# Presumably needed so env.render() works on a headless machine — TODO confirm.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

In [None]:
# Create the notebook-level environment that the cells below read as `env`.
env = gym.make('CarRacing-v1')

## Random Action

In [None]:
def play_env(policy=lambda s: env.action_space.sample(), sleep_time=0.1, env_seed=None):
    """Run one episode in the notebook-level ``env`` and record the trajectory.

    Each step is rendered with ``env.render()`` and a progress line is printed
    after clearing the previous cell output.

    Args:
        policy: Callable mapping the current state to an action. The default
            ignores the state and samples from ``env.action_space``.
        sleep_time: Seconds to pause after rendering each step.
        env_seed: If not ``None``, passed to ``env.seed`` before the reset.

    Returns:
        A ``(states, rewards, actions)`` tuple of lists. ``states`` holds the
        reset state plus one state per step, so it is one element longer than
        ``rewards`` and ``actions``.
    """
    if env_seed is not None:
        env.seed(env_seed)

    states = []
    rewards = []
    actions = []
    state = env.reset()
    states.append(state)

    # The spec may declare no step limit, in which case this is None.
    max_steps = env.spec.max_episode_steps
    total_reward = 0
    current_step = 0
    is_done = False

    while not is_done:
        # Ask the policy (random by default) for the next action.
        action = policy(state)

        state, reward, is_done, _ = env.step(action)
        states.append(state)
        rewards.append(reward)
        actions.append(action)

        total_reward += reward
        current_step += 1

        # Replace the previous step's output instead of scrolling.
        clear_output(wait=True)

        print('Step: {:03d}/{}, Reward: {}\n'.format(
            current_step,
            max_steps,
            total_reward,
        ))
        env.render()

        time.sleep(sleep_time)

    # Without a step limit there is nothing to compare against, so an episode
    # that ended on its own is reported as done rather than raising TypeError.
    hit_step_limit = max_steps is not None and current_step >= max_steps
    if hit_step_limit:
        print('\nResult: Unsolved')
    else:
        print('\nResult: Done with {} steps and total reward is {}.'.format(
            current_step,
            total_reward,
        ))

    return states, rewards, actions

In [None]:
# Roll out one episode with the default (random) policy on seed 1 and keep the trajectory.
states, rewards, actions = play_env(sleep_time=0.01, env_seed=1)

In [None]:
# Show the first recorded state (the one returned by env.reset()) as an image.
# The Pillow constructor is `fromarray`; indexing `states` directly avoids
# depending on a variable that is only assigned in a later cell.
PIL.Image.fromarray(states[0])

In [None]:
# Keep the first recorded state and inspect its array shape.
a_frame = states[0]
a_frame.shape

In [None]:
# Convert every recorded state to a PIL image and call show() on it.
# NOTE(review): this calls show() once per recorded state; for a long episode
# that may open a very large number of viewer windows — confirm this is intended.
for frame in states:
    img = PIL.Image.fromarray(frame)
    img.show()

## Q-Learning

Credit: https://towardsdatascience.com/reinforcement-learning-teach-a-taxi-cab-to-drive-around-with-q-learning-9913e611028f

In [None]:
# Q-table dimensions: one row per state index, one column per action index.
# NOTE(review): `.n` is read from both spaces, which presumably only exists on
# discrete spaces. The `env` created earlier is CarRacing-v1 (pixel
# observations, continuous actions), while this section credits a Taxi
# tutorial — confirm which environment this cell is meant to run against.
q_table_shape = [env.observation_space.n, env.action_space.n]
learning_rate = 0.1  # Learning rate, i.e. alpha
discount_factor = 0.99  # Discount factor, i.e. gamma
epsilon = 0.1  # Exploring vs exploiting
training_episodes = 100_000
# training_episodes = 1000
env_seed = 1

In [None]:
# Initialize the q-table with zero values; its shape comes from q_table_shape
q_table = np.zeros(q_table_shape)

In [None]:
# Random generator (created without a seed); the training loop draws from it
# to decide between exploring and exploiting
rng = np.random.default_rng()

### Train

In [None]:
for i in tqdm(range(training_episodes)):
    # Every episode starts from a fresh reset.
    state = env.reset()
    done = False

    while not done:
        # Epsilon-greedy: with probability epsilon take a random action,
        # otherwise take the action with the highest learned value.
        explore = rng.random() < epsilon
        action = env.action_space.sample() if explore else np.argmax(q_table[state])

        # Apply the action and observe the transition.
        next_state, reward, done, info = env.step(action)

        # Blend the current estimate with the one-step bootstrapped target.
        current_value = q_table[state, action]
        next_max = np.max(q_table[next_state])
        td_target = reward + discount_factor * next_max
        q_table[state, action] = (1 - learning_rate) * current_value + learning_rate * td_target

        state = next_state

### Eval

In [None]:
# Roll out the greedy policy (argmax over the learned Q-values) with the environment seeded to 1.
play_env(
    policy=lambda s: np.argmax(q_table[s]),
    sleep_time=0.5,
    env_seed=1,
)

In [None]:
# Roll out the greedy policy again without seeding and keep the trajectory;
# the video cell at the end of the notebook consumes `states` and `rewards`.
states, rewards, actions = play_env(
    policy=lambda s: np.argmax(q_table[s]),
    sleep_time=0.25,
    env_seed=None,
)

In [None]:
# Display the recorded trajectory as the cell output.
states, rewards, actions

In [None]:
def get_char_txt(char_row, char_col, char='█'):
    """Build a block of spaces with ``char`` in its bottom-right cell.

    The block has ``char_row + 2`` lines of ``char_col * 2 + 2`` cells each,
    every line ends with a newline, and only the very last cell holds ``char``.
    """
    marker_row = char_row + 1
    marker_col = char_col * 2 + 1

    def render_row(row):
        cells = (
            char if (row == marker_row and col == marker_col) else ' '
            for col in range(marker_col + 1)
        )
        return ''.join(cells) + '\n'

    return ''.join(render_row(row) for row in range(marker_row + 1))

def get_char_by_index(idx: int):
    """Return the letter drawn for a location index.

    Indices 0-3 give 'R', 'G', 'Y', 'B', index 4 gives '_', and any other
    index gives a single space.
    """
    symbols = {0: 'R', 1: 'G', 2: 'Y', 3: 'B', 4: '_'}
    return symbols.get(idx, ' ')

def get_char_pos_by_index(idx: int, taxi_row: int, taxi_col: int):
    """Return the ``[row, col]`` cell for a location index.

    Indices 0-3 refer to fixed cells; any other index resolves to the taxi's
    own position.
    """
    fixed_cells = {0: (0, 0), 1: (0, 4), 2: (4, 0), 3: (4, 3)}
    if idx in fixed_cells:
        # Hand back a fresh list on every call, never the shared tuple.
        return list(fixed_cells[idx])
    return [taxi_row, taxi_col]

def get_char_color_by_index(idx: int):
    """Return the RGB tuple for location indices 0-3, or ``None`` for any other index."""
    palette = {
        0: (255, 0, 0),    # Red
        1: (0, 255, 0),    # Green
        2: (255, 255, 0),  # Yellow
        3: (0, 0, 255),    # Blue
    }
    return palette.get(idx)

In [None]:
def enhance_frame(frame: np.ndarray, main_text=None, state=None, side_text=None, done=False) -> np.ndarray:
    """Draw the map text, optional state markers and a side panel onto ``frame``.

    Args:
        frame: Array converted to an RGB PIL image before drawing.
        main_text: Map text drawn at the grid origin. When ``None`` the frame
            is returned untouched.
        state: Optional encoded state, decoded with the notebook-level
            ``env.decode`` into taxi row/column, passenger location and
            destination, each of which is drawn as its own text layer.
        side_text: Optional text drawn to the right of the map.
        done: When true, the taxi keeps its default colour even if the
            passenger location is 4.

    Returns:
        ``frame`` itself when ``main_text`` is ``None``; otherwise a new array
        built from the annotated image.
    """
    if main_text is None:
        return frame

    font_path = '/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf'
    black = (0, 0, 0)
    white = (255, 255, 255)
    grid_origin = (30, 30)
    panel_origin = (220, 30)

    # Array -> RGB image, then a drawing context and the two font sizes.
    canvas = PIL.Image.fromarray(frame).convert('RGB')
    pen = ImageDraw.Draw(canvas, 'RGB')
    grid_font = ImageFont.truetype(font_path, 24)
    panel_font = ImageFont.truetype(font_path, 20)

    def outlined(origin, text, face, fill, outline=white):
        # All text in this function is drawn with a 1px stroke around it.
        pen.text(origin, text, font=face, fill=fill, stroke_width=1, stroke_fill=outline)

    if state is None:
        # No state to visualise: only the map text is drawn.
        outlined(grid_origin, main_text, grid_font, black)
    else:
        taxi_row, taxi_col, passenger_location, destination = list(env.decode(state))
        print([taxi_row, taxi_col, passenger_location, destination])

        # Layer 1: taxi marker, recoloured while passenger location is 4 and
        # the episode is not done.
        carrying = passenger_location == 4 and not done
        taxi_fill = (0, 255, 0) if carrying else (255, 255, 0)
        outlined(grid_origin, get_char_txt(taxi_row, taxi_col), grid_font, taxi_fill, outline=(100, 100, 100))

        # Layer 2: the map text on top of the taxi marker.
        outlined(grid_origin, main_text, grid_font, black)

        # Layers 3 and 4: passenger first, then destination.
        overlays = (
            (passenger_location, (0, 0, 255)),
            (destination, (255, 0, 255)),
        )
        for location, fill in overlays:
            glyph = get_char_by_index(location)
            row, col = get_char_pos_by_index(location, taxi_row, taxi_col)
            outlined(grid_origin, get_char_txt(row, col, char=glyph), grid_font, fill)

    if side_text is not None:
        outlined(panel_origin, side_text, panel_font, black)

    return np.array(canvas)

In [None]:
env_name = 'Taxi-v3'
def create_states_video(
    states, rewards, filename=None, fps=30,
    env_name=env_name, freeze_seconds=0, freeze_begin_seconds=0, step=None,
    actions=None):
    """Render a recorded episode to an MP4 file and return the file name.

    One frame is written per ``(state, reward, action)`` triple; iteration
    stops at the shortest of the three sequences.

    Args:
        states: Sequence of environment states.
        rewards: Sequence of per-step rewards.
        filename: Output name without extension; ``.mp4`` is appended. When
            ``None`` the name comes from ``get_timestamp()``.
        fps: Frames per second of the written video.
        env_name: Label shown in the side panel and logged.
        freeze_seconds: Seconds to hold the last written frame.
        freeze_begin_seconds: Seconds to hold an opening frame before the
            first step.
        step: Optional value shown as ``Stp`` in the side panel.
        actions: Sequence of per-step actions. When ``None`` the notebook-level
            ``actions`` variable is used, which is what this function always
            did before the parameter existed.

    Returns:
        The name of the written file, including the ``.mp4`` suffix.

    Raises:
        NameError: If ``actions`` is ``None`` and no notebook-level ``actions``
            variable exists.
    """
    if filename is None:
        # NOTE(review): get_timestamp is not defined in the visible part of
        # this notebook — confirm it exists before relying on this default.
        filename = str(get_timestamp())

    filename = filename + '.mp4'

    if actions is None:
        # Backward compatibility: fall back to the notebook-level trajectory.
        try:
            actions = globals()['actions']
        except KeyError:
            raise NameError("name 'actions' is not defined") from None

    logging.info('Env: %s', env_name)
    logging.info('Filename: %s', filename)
    map_txt = '\n'.join(taxi_map)

    def panel_header(frame_idx, total_reward):
        # Side-panel lines shared by the opening frame and the step frames.
        text = f'Env: {env_name}'
        if step is not None:
            text += f'\nStp: {step}'
        text += f'\nFrm: {frame_idx}'
        text += f'\nRw:  {total_reward:.2f}'
        return text

    def blank_frame():
        # Light-grey background that enhance_frame draws onto.
        return np.full((270, 480), 240.0)

    steps = list(zip(states, rewards, actions))
    last_idx = len(steps) - 1
    opening_repeats = int(fps * freeze_begin_seconds)
    closing_repeats = int(fps * freeze_seconds)

    with imageio.get_writer(filename, fps=fps) as video:
        logging.info('Begin')
        total_reward = 0.0

        for frame_idx, (state, reward, action) in enumerate(steps):
            # Presumably +20 is the Taxi-v3 successful drop-off reward — TODO confirm.
            done = reward == 20

            # Hold an opening frame (no action line, reward still 0) before the first step.
            if frame_idx == 0 and opening_repeats > 0:
                opening = enhance_frame(
                    blank_frame(), map_txt,
                    side_text=panel_header(frame_idx, total_reward), state=state)
                for _ in range(opening_repeats):
                    video.append_data(opening)

            # NOTE(review): action_names is not defined in the visible part of
            # this notebook — confirm it maps action values to labels.
            action_name = action_names[action]

            total_reward += reward

            text = panel_header(frame_idx, total_reward)
            text += f'\nAct: {action_name}'
            if done:
                text += '\n\nDone!\nFeb 13, 2022'

            frame = enhance_frame(blank_frame(), map_txt, side_text=text, state=state, done=done)
            video.append_data(frame)

            # Hold the last frame actually written, whatever the length of `states`.
            if frame_idx == last_idx:
                for _ in range(closing_repeats):
                    video.append_data(frame)

    logging.info('All done')
    return filename

In [None]:
# Write the recorded trajectory to 'taxi.mp4' at 2 fps, holding the opening
# frame for 2 seconds and the final frame for 3 seconds.
create_states_video(states, rewards, filename='taxi', fps=2, env_name='Taxi-v3', freeze_seconds=3, freeze_begin_seconds=2)