In [143]:
import os

# Locate the project root from the current working directory and switch to it,
# so that the project-relative imports and paths used by later cells resolve.
PROJECT_DIR_NAME = "MarioBros"

current_path = os.getcwd()
print("Current Directory:", current_path)

# Keep everything up to and including the first occurrence of the project
# directory name (same result as the previous split-based expression).
project_name_index = current_path.find(PROJECT_DIR_NAME)
if project_name_index < 0:
    # Previously this case produced a bogus "<cwd>MarioBros" path and an
    # opaque error from os.chdir; fail with an explicit message instead.
    raise FileNotFoundError(
        f"Cannot locate the {PROJECT_DIR_NAME!r} root: the current directory "
        f"{current_path!r} is not inside it."
    )
mario_root_path = current_path[:project_name_index + len(PROJECT_DIR_NAME)]
print("Changing to MarioBros root directory:", mario_root_path)

# Use os.chdir() to change the directory
os.chdir(mario_root_path)


Current Directory: /Users/gaston/Code/MarioBros
Changing to MarioBros root directory: /Users/gaston/Code/MarioBros


In [168]:
from src.config import load_configuration

# Load the experiment configuration for the agent to visualise.
config = load_configuration("agents/experiments_1/lander_prev_action.yaml")
# Force a single environment: run_episode only reads index 0 of the per-env
# reward and frame arrays, so extra environments would be ignored.
config['env']['num_envs'] = 1

Configuration loaded from: agents/experiments_1/lander_prev_action.yaml


In [169]:
import numpy as np
from tqdm import tqdm

from src.environment import create_environment
from src.agent_utils import execute_agent_step, create_agent
from src.tb_logging import DummySummaryWriter



def run_episodes(config, episodes=5, frame_downsampling=5):
  """Run several episodes and collect downsampled activations, frames and returns.

  Args:
    config: Configuration mapping; `config['env']` is passed to
      `create_environment` and `config['agent']['gamma']` is used as the
      discount factor.
    episodes: Number of episodes to play.
    frame_downsampling: Keep one step out of every `frame_downsampling`.

  Returns:
    A tuple `(last_layer_inputs, frames, cum_rewards, episode_nums, step_nums)`:
      - last_layer_inputs: array of the activations recorded by `run_episode`.
      - frames: array of the frames recorded by `run_episode`.
      - cum_rewards: array of discounted returns, rescaled over the whole
        dataset to the range [1, 5] (all 1.0 when every return is identical).
      - episode_nums: list with the 1-based episode index of every kept step.
      - step_nums: list with the position of every kept step inside its
        episode, as a fraction in [0, 1).
  """
  # Create environment, summary writer, and agent
  env = create_environment(config['env'], 'synchronous')
  summary_writer = DummySummaryWriter()
  agent = create_agent(config, env, summary_writer)
  gamma = config['agent']['gamma']

  last_layer_inputs, frames, cum_rewards, episode_nums, step_nums = [], [], [], [], []

  for i in range(episodes):
    last_layer_input_, frames_, cum_reward_ = run_episode(env, agent)
    # Turn per-step rewards into discounted returns with a backward pass:
    # G[j] = r[j] + gamma * G[j+1].
    for j in range(len(cum_reward_)-2, -1, -1):
      cum_reward_[j] += gamma * cum_reward_[j+1]
    # Keep one step every `frame_downsampling` (reducing amount of data)
    last_layer_input_ = last_layer_input_[::frame_downsampling]
    frames_ = frames_[::frame_downsampling]
    cum_reward_ = cum_reward_[::frame_downsampling]

    kept_steps = len(last_layer_input_)
    last_layer_inputs.extend(last_layer_input_)
    frames.extend(frames_)
    cum_rewards.extend(cum_reward_)
    episode_nums.extend([i+1] * kept_steps)
    # Relative position of each kept step within its episode.
    step_nums.extend((np.arange(kept_steps, dtype=float) / kept_steps).tolist())

  last_layer_inputs = np.array(last_layer_inputs)
  frames = np.array(frames)
  cum_rewards = np.array(cum_rewards)

  # Nothing was collected (e.g. episodes=0): min()/max() would raise on an
  # empty array, so return the empty containers as they are.
  if cum_rewards.size == 0:
    return last_layer_inputs, frames, cum_rewards, episode_nums, step_nums

  # Normalize reward (between 1 and 5)
  cum_rewards = cum_rewards - cum_rewards.min()
  reward_span = cum_rewards.max()
  if reward_span > 0:
    cum_rewards = cum_rewards / reward_span * 4 + 1
  else:
    # Every return is identical: avoid 0/0 -> NaN and use the lower bound.
    cum_rewards = np.ones_like(cum_rewards, dtype=float)
  return last_layer_inputs, frames, cum_rewards, episode_nums, step_nums

def run_episode(env, agent):
  """Play one episode with `agent` in `env`, recording per-step data.

  While the episode runs, a forward hook registered on the last entry of
  `agent.model.hidden_layers` stores the input that layer receives.

  Args:
    env: Environment exposing `reset()`, `step(action)` and `close()`.
    agent: Agent exposing `act(observation)`, `model.hidden_layers` and
      `summary_writer`.

  Returns:
    A tuple `(last_layer_input, frames, cum_reward)` of lists:
      - last_layer_input: one numpy array per hooked forward call
        (presumably one per step -- TODO confirm `agent.act` runs exactly one
        forward pass).
      - frames: `info['observation_frame'][0]` for every step.
      - cum_reward: the raw reward of environment 0 for every step (the
        caller turns these into discounted returns).
  """
  observation, _ = env.reset()
  steps = 0
  last_layer_input = []
  frames = []
  cum_reward = []

  def hook_fn(module, input, output):
    # `input` is a tuple, so we access the first element, and keep only one
    # exemplar (the first row of the batch).
    last_layer_input.append(input[0][0].detach().cpu().numpy())

  handle = agent.model.hidden_layers[-1].register_forward_hook(hook_fn)
  try:
    while True:
      action, _q_values = agent.act(observation)
      steps += 1

      experience = execute_agent_step(action, lambda action: env.step(action),
                                      observation, agent.summary_writer)
      (observation, action, reward, next_observation, done, info) = experience
      cum_reward.append(reward[0])
      frames.append(info['observation_frame'][0])

      # Stop as soon as any environment reports the end of its episode.
      for _obs, _act, _rew, _neo, don in zip(
          observation.as_list_input('cpu'), action, reward,
          next_observation.as_list_input('cpu'), done):
        if don:
          # NOTE(review): the environment is created by the caller and reused
          # for the next episode, yet it is closed here -- confirm that
          # reset() after close() is supported by the environment.
          env.close()
          print(f"completed {steps=}")
          return last_layer_input, frames, cum_reward
      observation = next_observation
  finally:
    # Always detach the hook, even if the episode aborts with an exception;
    # otherwise it stays attached to the model and keeps appending to this
    # call's list on later forward passes.
    handle.remove()


In [170]:
## UMAP solution
import numpy as np
import plotly.express as px
from PIL import Image
import io
import base64
import pandas as pd
from dash import Dash, dcc, html, Input, Output, no_update, callback

# Play 10 episodes, keeping one step out of every 5, and collect the recorded
# activations, frames, normalised returns, episode indices and step positions.
last_layer_inputs, frames, cum_rewards, episode_num, step_num = run_episodes(config, episodes=10, frame_downsampling=5)
print(" - Done running episodes")

Using device: mps
Resuming from checkpoint. Episode 0, global step 500000, trained experiences 15992064
Using uniform replay buffer with size 100000
Model summary: DQN(
  (convolutions): ModuleList()
  (activation): LeakyReLU(negative_slope=0.01)
  (hidden_layers): ModuleList(
    (0): Linear(in_features=33, out_features=64, bias=True)
    (1): Linear(in_features=64, out_features=32, bias=True)
    (2): Linear(in_features=32, out_features=16, bias=True)
    (3): Linear(in_features=16, out_features=4, bias=True)
  )
)
Parameters: 4852
completed steps=295
completed steps=246
completed steps=218
completed steps=190
completed steps=193
completed steps=240
completed steps=323
completed steps=239
completed steps=251
completed steps=250
 - Done running episodes


In [171]:
import umap
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

def umap_projection(last_layer_inputs, n_components=3, random_state=0):
  """Fit UMAP on `last_layer_inputs` and return the low-dimensional embedding.

  Args:
    last_layer_inputs: Samples to project, one row per sample.
    n_components: Number of output dimensions.
    random_state: Seed forwarded to `umap.UMAP`.

  Returns:
    The result of `fit_transform` on the input.
  """
  embedded = umap.UMAP(
      n_components=n_components,
      random_state=random_state,
  ).fit_transform(last_layer_inputs)
  print(" - Done fitting UMAP")
  return embedded

def tsne_projection(last_layer_inputs, n_components=3, random_state=0):
  """Fit t-SNE on `last_layer_inputs` and return the low-dimensional embedding.

  Args:
    last_layer_inputs: Samples to project, one row per sample.
    n_components: Number of output dimensions.
    random_state: Seed forwarded to `TSNE`.

  Returns:
    The result of `fit_transform` on the input.
  """
  embedded = TSNE(
      n_components=n_components,
      random_state=random_state,
  ).fit_transform(last_layer_inputs)
  print(" - Done fitting t-SNE")
  return embedded

def pca_projection(last_layer_inputs, n_components=3, random_state=0):
  """Fit PCA on `last_layer_inputs` and return the projected samples.

  Args:
    last_layer_inputs: Samples to project, one row per sample.
    n_components: Number of principal components to keep.
    random_state: Seed forwarded to `PCA`. It was previously accepted but
      ignored; it only matters for the randomised SVD solvers, so results
      with the exact solver are unchanged.

  Returns:
    The result of `fit_transform` on the input.
  """
  reducer = PCA(n_components=n_components, random_state=random_state)
  projections = reducer.fit_transform(last_layer_inputs)
  print(" - Done fitting PCA")
  return projections


def dataframe_with_images(projections, embeddings, frames, cum_rewards, episode_num, step_num):
  """Assemble one dataframe row per sample with its projection and metadata.

  Columns, in order: `dim 1..dim K` (one per projection column), `reward`,
  `episode_num`, `embedding_distance`, `step_num`, `projection_distance`,
  `base64_frame`.

  The two distance columns hold the Euclidean distance between each row and
  the previous row (0 for the first row), measured in the original embedding
  space and in the projected space respectively. Rows are compared in the
  order given, regardless of `episode_num`.

  Args:
    projections: 2-D array of projected samples.
    embeddings: 2-D array of the original samples, same row order.
    frames: One image array per row (assumes values convertible to uint8 in a
      layout PIL can save as JPEG -- TODO confirm).
    cum_rewards, episode_num, step_num: Per-row values stored as-is.

  Returns:
    The assembled `pandas.DataFrame`.
  """
  def _step_distances(points):
    # Distance from each row to its predecessor; the first row has none.
    deltas = points[1:] - points[:-1]
    return np.concatenate([[0], np.linalg.norm(deltas, axis=1)])

  def _image_to_base64_string(img_array):
    # Encode the frame as a JPEG data URI.
    buffer = io.BytesIO()
    Image.fromarray(img_array.astype('uint8')).save(buffer, format="JPEG")
    encoded = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return "data:image/jpeg;base64," + encoded

  dim_columns = [f'dim {axis}' for axis in range(1, projections.shape[1] + 1)]
  dataframe = pd.DataFrame(projections, columns=dim_columns)
  dataframe['reward'] = cum_rewards
  dataframe['episode_num'] = episode_num
  dataframe['embedding_distance'] = _step_distances(embeddings)
  dataframe['step_num'] = step_num
  dataframe['projection_distance'] = _step_distances(projections)
  dataframe['base64_frame'] = np.array(
      [_image_to_base64_string(frame) for frame in frames])
  print(" - Done creating dataframe")
  return dataframe

In [172]:
# Project the recorded activations with PCA, UMAP and t-SNE (in that order)
# and build one dataframe per method from the same per-step metadata.
projection_options = dict(n_components=3, random_state=0)
per_step_columns = (frames, cum_rewards, episode_num, step_num)

projections = pca_projection(last_layer_inputs, **projection_options)
pca_df = dataframe_with_images(projections, last_layer_inputs, *per_step_columns)

projections = umap_projection(last_layer_inputs, **projection_options)
umap_df = dataframe_with_images(projections, last_layer_inputs, *per_step_columns)

projections = tsne_projection(last_layer_inputs, **projection_options)
tsne_df = dataframe_with_images(projections, last_layer_inputs, *per_step_columns)

 - Done fitting PCA
 - Done creating dataframe



n_jobs value 1 overridden to 1 by setting random_state. Use no seed for parallelism.



 - Done fitting UMAP
 - Done creating dataframe
 - Done fitting t-SNE
 - Done creating dataframe


In [173]:
import base64
import io
from PIL import Image

def create_gif_from_base64(base64_frames, output_path='animation.gif', duration=1):
    """
    Creates a GIF from a sequence of base64-encoded image strings.

    Args:
        base64_frames (iterable of str): Base64 image strings, either bare or
            prefixed with a data-URI header such as "data:image/jpeg;base64,".
            Any iterable is accepted (list, tuple, numpy array, generator).
        output_path (str): The name of the output GIF file.
        duration (int): The duration of each frame in milliseconds.

    Raises:
        ValueError: If `base64_frames` contains no frames.
    """
    # Materialise first: works for generators, and avoids the ambiguous
    # truth value of a numpy array when checking for emptiness.
    encoded_frames = list(base64_frames)
    if not encoded_frames:
        raise ValueError(
            "base64_frames is empty; at least one frame is required to build a GIF"
        )

    # Decode every base64 string into a PIL Image
    frames = []
    for b64_str in encoded_frames:
        # Drop the "data:image/jpeg;base64," prefix when present. Base64
        # payloads never contain a comma, so the first comma ends the header.
        _header, separator, payload = b64_str.partition(',')
        img_data = payload if separator else b64_str

        # Decode the base64 string
        img_bytes = base64.b64decode(img_data)

        # Open the image from the bytes and add it to our list
        frames.append(Image.open(io.BytesIO(img_bytes)))

    # Save the list of images as a GIF
    frames[0].save(
        output_path,
        save_all=True,
        append_images=frames[1:],
        duration=duration,
        loop=0  # Loop forever
    )
    print(f"GIF saved to {output_path}")

# Every dataframe carries the same frames in the same order, so any of them
# can be used as the source; this writes 'animation.gif' into the current
# working directory.
create_gif_from_base64(umap_df['base64_frame'].values, output_path='animation.gif')

GIF saved to animation.gif


In [None]:
from IPython.display import HTML

# Show the generated GIF inline.
# NOTE(review): the GIF was written to 'animation.gif' in the working
# directory, while this URL points one level up ('../') -- presumably relative
# to where the notebook file lives; confirm the two agree.
# The '?a=3' query string presumably only defeats browser caching -- TODO confirm.
HTML('<img src="../animation.gif?a=3">')

In [175]:
import plotly.graph_objects as go


def render_dataframe(dataframe, color_selector='episode_num'):
    """Build an animated 3-D scatter figure from a projection dataframe.

    The base trace plots every row at (`dim 1`, `dim 2`, `dim 3`), coloured by
    `dataframe[color_selector]`, and attaches six per-point `customdata`
    fields in this order: episode_num, step_num, reward, projection_distance,
    embedding_distance, base64_frame. One animation frame is created per row;
    frame `i` shows the first `i + 1` rows. A "Play" button starts the
    animation.

    Args:
        dataframe: DataFrame with the columns listed above.
        color_selector: Name of the column used for marker colour and as the
            colour-bar title.

    Returns:
        The assembled `go.Figure`.
    """
    xs = dataframe['dim 1']
    ys = dataframe['dim 2']
    zs = dataframe['dim 3']
    colors = dataframe[color_selector]

    def _marker(values):
        # Marker style shared by the base trace and every animation frame.
        return dict(
            size=5,
            color=values,
            colorscale='darkmint',
            colorbar=dict(title=color_selector),
        )

    def _axis():
        # Axis settings shared by the three scene axes.
        return dict(range=[-10, 10], autorange=True, zeroline=True)

    hover_columns = [
        'episode_num',
        'step_num',
        'reward',
        'projection_distance',
        'embedding_distance',
        'base64_frame',
    ]
    base_trace = go.Scatter3d(
        x=xs,
        y=ys,
        z=zs,
        mode='markers',
        marker=_marker(colors),
        customdata=np.stack([dataframe[column] for column in hover_columns], axis=-1),
    )

    # Frame number `count` reveals the first `count` points.
    animation_frames = []
    for count in range(1, len(dataframe) + 1):
        partial_trace = go.Scatter3d(
            x=xs[:count],
            y=ys[:count],
            z=zs[:count],
            mode='markers',
            marker=_marker(colors[:count]),
        )
        animation_frames.append(go.Frame(data=[partial_trace]))

    # The update menu exposes the "Play" button that runs the animation.
    play_button = dict(label="Play", method="animate", args=[None])
    layout = go.Layout(
        width=600, height=600,
        scene=dict(xaxis=_axis(), yaxis=_axis(), zaxis=_axis()),
        updatemenus=[dict(type="buttons", buttons=[play_button])],
    )

    return go.Figure(data=base_trace, frames=animation_frames, layout=layout)

# Set up the app: one panel per projection method, each pairing a 3-D graph
# (coloured by reward) with the tooltip its hover callback fills in.
app = Dash()

panel_specs = [
    ("graph-2-dcc", "graph-tooltip-2", pca_df),
    ("graph-3-dcc", "graph-tooltip-3", umap_df),
    ("graph-4-dcc", "graph-tooltip-4", tsne_df),
]

app.layout = html.Div(
    children=[
        html.Div(
            children=[
                dcc.Graph(
                    id=graph_id,
                    figure=render_dataframe(panel_df, color_selector='reward'),
                    clear_on_unhover=True,
                ),
                dcc.Tooltip(id=tooltip_id),
            ],
            style={'display': 'inline-block', 'width': '45%'}
        )
        for graph_id, tooltip_id, panel_df in panel_specs
    ],
)

def _build_hover_tooltip(hoverData):
    """Compute the tooltip outputs shared by all three hover callbacks.

    Args:
        hoverData: The graph's `hoverData` property, or None when nothing is
            hovered.

    Returns:
        A tuple `(show, bbox, children, direction)` for the tooltip component.
        When nothing is hovered the tooltip is hidden and the other three
        outputs are left untouched.
    """
    if hoverData is None:
        return False, no_update, no_update, no_update

    # Only the first hovered point is shown, but other points may also be available
    hover_data = hoverData["points"][0]
    bbox = hover_data["bbox"]

    # customdata layout is set in render_dataframe: index 0 is the episode
    # number and index 5 the frame as a base64 data URI.
    im_url = hover_data['customdata'][5]
    # control the position of the tooltip
    y = hover_data["y"]
    direction = "bottom" if y > 1.5 else "top"

    children = [
        html.Img(
            src=im_url,
            style={"width": "250px"},
        ),
        html.P("Episode: {}".format(hover_data['customdata'][0])),
    ]

    return True, bbox, children, direction


@callback(
    Output("graph-tooltip-2", "show"),
    Output("graph-tooltip-2", "bbox"),
    Output("graph-tooltip-2", "children"),
    Output("graph-tooltip-2", "direction"),
    Input("graph-2-dcc", "hoverData"),
)
def display_hover(hoverData):
    """Show the hovered point's frame and episode for graph "graph-2-dcc"."""
    return _build_hover_tooltip(hoverData)


@callback(
    Output("graph-tooltip-3", "show"),
    Output("graph-tooltip-3", "bbox"),
    Output("graph-tooltip-3", "children"),
    Output("graph-tooltip-3", "direction"),
    Input("graph-3-dcc", "hoverData"),
)
def display_hover_3(hoverData):
    """Show the hovered point's frame and episode for graph "graph-3-dcc"."""
    return _build_hover_tooltip(hoverData)


@callback(
    Output("graph-tooltip-4", "show"),
    Output("graph-tooltip-4", "bbox"),
    Output("graph-tooltip-4", "children"),
    Output("graph-tooltip-4", "direction"),
    Input("graph-4-dcc", "hoverData"),
)
def display_hover_4(hoverData):
    """Show the hovered point's frame and episode for graph "graph-4-dcc"."""
    return _build_hover_tooltip(hoverData)



# Start the Dash server for the layout and hover callbacks defined above.
# NOTE(review): debug=True is meant for local development; turn it off if this
# is ever served beyond localhost.
app.run(debug=True)


In [176]:
# Compare, for the PCA dataframe, the step-to-step distance in the original
# embedding space (x) with the same distance after projection (y).
distance_scatter = go.Scatter(
  x=pca_df['embedding_distance'],
  y=pca_df['projection_distance'],
  mode='markers',
)
distance_layout = go.Layout(
  title="Embedding Distance vs. Projection Distance",
  xaxis=dict(title="Embedding Distance"),
  yaxis=dict(title="Projection Distance"),
  height=500,
  width=600,
)
fig = go.Figure(data=[distance_scatter], layout=distance_layout)

fig.show()