In [21]:
import gym # pip install gym
import numpy as np
import matplotlib.pyplot as plt
import imageio
from skimage.transform import resize
from PIL import Image, ImageDraw, ImageFont
# import seaborn as sns
# sns.set_theme()

from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras import layers

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.callbacks import Callback

from rl.agents import DQNAgent  # pip install keras-rl2
from rl.policy import BoltzmannQPolicy  # important to have gym==0.25.2
from rl.memory import SequentialMemory
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Build the environment without a render mode so nothing is displayed while training
env = gym.make("LunarLander-v2")

# First dimension of the observation space and size of the action space;
# both are used below to shape the network's input and output layers
states = env.observation_space.shape[0]
actions = env.action_space.n

# One value per line: states first, then actions
print(states, actions, sep="\n")

8
4


In [3]:
# Network handed to the agent: flatten the (1, states) observation window,
# pass it through two 64-unit ReLU layers, and emit one linear output per action
model = Sequential([
    Flatten(input_shape=(1, states)),
    Dense(64, activation="relu"),
    Dense(64, activation="relu"),
    Dense(actions, activation="linear"),
])
# Compile the model
model.compile(optimizer=Adam(), loss='mse')

In [9]:
# Wrap the network in a DQN agent with a replay memory and a Boltzmann exploration policy
agent = DQNAgent(
    model=model,
    # window_length=1 matches the (1, states) input shape the model was built with
    memory=SequentialMemory(limit=50000, window_length=1),
    policy=BoltzmannQPolicy(),
    nb_actions=actions,
    nb_steps_warmup=100,
    target_model_update=0.01
)

# `lr` is a deprecated alias in Keras optimizers; `learning_rate` is the supported name
agent.compile(Adam(learning_rate=0.001), metrics=["mae"])

In [10]:
# Define a custom callback to record loss and reward values during training
class LossHistory(Callback):
    """Record one loss value and one reward value per training episode.

    keras-rl's episode-end logs carry the episode reward but no loss entry, so
    reading ``logs['loss']`` there always yields ``None``. The training loss is
    reported per step under ``logs['metrics']`` (loss is the first entry), so it
    is collected in ``on_step_end`` and averaged when the episode finishes.

    Attributes are plain lists, one entry per finished episode:
      loss_values   -- mean step loss of the episode (NaN if no loss was reported)
      reward_values -- value of ``logs['episode_reward']`` (None if absent)
    """

    def __init__(self):
        super().__init__()  # let the Keras base class set up its own attributes
        self.loss_values = []
        self.reward_values = []
        self._step_losses = []  # losses seen so far in the episode in progress

    def on_step_end(self, step, logs=None):
        """Remember this step's loss, skipping steps that report none (NaN)."""
        metrics = (logs or {}).get('metrics')
        if metrics is None:
            return
        metrics = np.atleast_1d(np.asarray(metrics, dtype=float))
        # Steps without a training update (e.g. warm-up) report NaN metrics
        if metrics.size and not np.isnan(metrics[0]):
            self._step_losses.append(float(metrics[0]))

    def on_episode_end(self, episode, logs=None):
        """Store the episode's mean loss and its total reward."""
        logs = logs or {}
        if self._step_losses:
            episode_loss = float(np.mean(self._step_losses))
        else:
            # Honour a 'loss' entry if one is ever supplied; otherwise mark as missing
            episode_loss = logs.get('loss', float('nan'))
        self.loss_values.append(episode_loss)
        self.reward_values.append(logs.get('episode_reward'))
        self._step_losses = []  # start the next episode with a clean slate

In [11]:
# Create the callback instance that is passed to agent.fit(); its
# loss_values / reward_values lists are read back after training
loss_history = LossHistory()


In [12]:
# Training budget, counted in environment steps
total_steps = 100000

# Options for the training run: no rendering, interval-style progress logging,
# and the recording callback attached
fit_options = dict(
    nb_steps=total_steps,
    visualize=False,
    verbose=1,
    callbacks=[loss_history],
)

# Train the agent on the environment
history = agent.fit(env, **fit_options)

Training for 100000 steps ...
Interval 1 (0 steps performed)
25 episodes - episode_reward: -154.071 [-352.353, 31.170] - loss: 10.338 - mae: 17.561 - mean_q: 8.944

Interval 2 (10000 steps performed)
10 episodes - episode_reward: -14.473 [-243.270, 58.696] - loss: 6.516 - mae: 29.720 - mean_q: 32.894

Interval 3 (20000 steps performed)
12 episodes - episode_reward: -64.931 [-322.399, 52.351] - loss: 7.495 - mae: 33.819 - mean_q: 43.339

Interval 4 (30000 steps performed)
12 episodes - episode_reward: 2.558 [-206.494, 46.107] - loss: 7.173 - mae: 35.081 - mean_q: 45.948

Interval 5 (40000 steps performed)
10 episodes - episode_reward: -21.511 [-341.396, 67.049] - loss: 6.463 - mae: 33.472 - mean_q: 44.197

Interval 6 (50000 steps performed)
10 episodes - episode_reward: -15.468 [-142.057, 53.093] - loss: 4.048 - mae: 27.889 - mean_q: 37.102

Interval 7 (60000 steps performed)
13 episodes - episode_reward: 13.336 [-347.640, 158.845] - loss: 3.583 - mae: 26.194 - mean_q: 35.044

Interval 

In [13]:
# Pull the per-episode series recorded by the callback during training
losses, rewards = loss_history.loss_values, loss_history.reward_values

In [14]:
# Save the trained agent's weights to trained_model_weights.h5;
# overwrite=True replaces an existing file of the same name
agent.save_weights('trained_model_weights.h5', overwrite=True)

In [15]:
# Plot the learning progress.
# Loss and reward live on very different scales, so each series gets its own
# y-axis instead of sharing one generic 'Value' axis that flattens the loss curve.
fig, loss_ax = plt.subplots(figsize=(10, 5))
reward_ax = loss_ax.twinx()

# Converting to float arrays turns missing entries (None) into NaN, shown as gaps
loss_series = np.array(losses, dtype=float)
reward_series = np.array(rewards, dtype=float)

loss_line, = loss_ax.plot(
    range(1, len(loss_series) + 1), loss_series, color='tab:blue', label='Loss'
)
# The callback stores one total reward per episode, so label it as such
reward_line, = reward_ax.plot(
    range(1, len(reward_series) + 1), reward_series, color='tab:orange', label='Episode Reward'
)

loss_ax.set_xlabel('Episode')
loss_ax.set_ylabel('Loss')
reward_ax.set_ylabel('Episode Reward')
loss_ax.set_title('Learning Progress')
# One legend covering the lines from both axes
loss_ax.legend(handles=[loss_line, reward_line], loc='upper left')
loss_ax.grid(True)
plt.show()

In [16]:
# Training loss, one point per episode
fig, ax = plt.subplots()
ax.plot(losses)
ax.set_title('Training Loss')
ax.set_xlabel('Episode')
ax.set_ylabel('Loss')
plt.show()

In [17]:
# Reload the weights written by the save cell, so evaluation can start from the file on disk
agent.load_weights("trained_model_weights.h5")

In [19]:
# Evaluate the agent for 100 episodes with rendering switched on (visualize=True);
# the value returned by agent.test is kept in `results`
results = agent.test(env, nb_episodes=100, visualize=True)

Testing for 100 episodes ...
Episode 1: reward: 212.334, steps: 403
Episode 2: reward: 211.187, steps: 481
Episode 3: reward: 225.190, steps: 399
Episode 4: reward: 252.268, steps: 378
Episode 5: reward: 199.001, steps: 414
Episode 6: reward: 225.323, steps: 421
Episode 7: reward: 231.380, steps: 398
Episode 8: reward: 113.804, steps: 1000
Episode 9: reward: 117.668, steps: 1000
Episode 10: reward: 245.953, steps: 412
Episode 11: reward: 228.611, steps: 409
Episode 12: reward: 224.253, steps: 386
Episode 13: reward: 149.531, steps: 1000
Episode 14: reward: 237.134, steps: 642
Episode 15: reward: 167.427, steps: 1000
Episode 16: reward: 268.726, steps: 404
Episode 17: reward: 213.177, steps: 400
Episode 18: reward: 220.715, steps: 546
Episode 19: reward: 247.059, steps: 816
Episode 20: reward: 122.088, steps: 1000
Episode 21: reward: 251.361, steps: 338
Episode 22: reward: 272.423, steps: 429
Episode 23: reward: 228.474, steps: 387
Episode 24: reward: 253.713, steps: 514
Episode 25: rew

In [20]:
# Define a function to resize the frame
def resize_frame(frame):
    """Resize `frame` so that its height and width are multiples of 16.

    Each of the first two dimensions is rounded up to the next multiple of 16
    (a dimension that already is one stays unchanged) and the frame is passed
    to `resize` with that target shape.

    Returns whatever `resize` produces; the caller scales it by 255 before
    casting to uint8, so it is presumably a float image in [0, 1] -- TODO confirm.
    """
    rows, cols = frame.shape[:2]
    # Ceiling division by 16, then back to pixels
    target_shape = tuple(-(-side // 16) * 16 for side in (rows, cols))
    return resize(frame, target_shape)

In [22]:
# Recording settings
NUM_EPISODES = 1                  # number of episodes captured in the video
LABEL_EVERY = 10                  # overlay the episode number on every LABEL_EVERY-th episode
VIDEO_PATH = "episode_video.mp4"
VIDEO_FPS = 30

# Load the overlay font once, not once per frame. "arial.ttf" is missing on
# many systems, so fall back to Pillow's built-in font instead of crashing.
try:
    font = ImageFont.truetype("arial.ttf", 20)
except OSError:
    font = ImageFont.load_default()

# Initialize the list to store frames
frames = []

# The episode counter is driven by an outer loop, so the "every N episodes"
# overlay condition can actually become true when several episodes are recorded.
for episode_number in range(1, NUM_EPISODES + 1):
    # Reset the environment and get the initial state
    state = env.reset()
    done = False

    # Run the episode until it's done
    while not done:
        # Get the current frame from the environment
        frame = env.render(mode='rgb_array')

        # Resize the frame to have dimensions divisible by 16
        frame_resized = resize_frame(frame)

        # Scale the resized frame by 255 and convert it to an 8-bit PIL image
        img = Image.fromarray((frame_resized * 255).astype(np.uint8))

        # Add text overlay for the episode number on every LABEL_EVERY-th episode
        if episode_number % LABEL_EVERY == 0:
            ImageDraw.Draw(img).text(
                (10, 10), f"Episode: {episode_number}", fill=(255, 255, 255), font=font
            )

        # Store the frame (with overlay, if any) as a numpy array
        frames.append(np.array(img))

        # Let the agent choose an action and advance the environment
        action = agent.forward(state)
        state, _, done, _ = env.step(action)

# Save the video of the recorded episode(s)
imageio.mimsave(VIDEO_PATH, frames, fps=VIDEO_FPS)

In [23]:
# Close the environment once it is no longer needed
env.close()