In [1]:
# One-time environment setup; uncomment to install the notebook's dependencies.
# NOTE(review): the import cell also uses cv2 and matplotlib, which are not
# listed here -- confirm they are installed separately.
# ! pip install tf_agents-nightly gymnasium tensorflow numpy

In [2]:
import os
import cv2
import matplotlib.pyplot as plt
from tf_agents.environments import wrappers
import tensorflow as tf
import numpy as np
from tf_agents.environments import suite_gym

# Request the legacy Keras implementation.
# NOTE(review): this assignment runs AFTER tensorflow and tf_agents have been
# imported above; the flag is presumably meant to be set before those imports
# to take effect -- confirm which Keras version is actually active before
# relying on it (moving it may change how saved weights load).
os.environ['TF_USE_LEGACY_KERAS'] = '1'

In [3]:
def get_q_model(layers_shapes):
    """Build a fully connected Q-network from a list of layer widths.

    Args:
        layers_shapes: Sequence of integers. The first entry is the size of
            the input vector, the last entry is the number of output units,
            and every entry in between is the width of one hidden layer.

    Returns:
        An uncompiled ``tf.keras.Model`` whose hidden layers are Dense layers
        with ReLU activation and whose output layer is a Dense layer created
        without an explicit activation.
    """
    net_in = tf.keras.Input(shape=(layers_shapes[0],))

    # Thread the tensor through every hidden layer in order.
    features = net_in
    hidden_widths = layers_shapes[1:-1]
    for units in hidden_widths:
        hidden_layer = tf.keras.layers.Dense(units, activation='relu')
        features = hidden_layer(features)

    # Output head: one unit per entry requested by the last width.
    q_head = tf.keras.layers.Dense(layers_shapes[-1])
    q_values = q_head(features)

    return tf.keras.Model(net_in, q_values)

In [4]:
# Load the Pendulum environment and show its native (continuous) action spec.
env = suite_gym.load('Pendulum-v1')
print('Action Spec:', env.action_spec())

# Wrap the environment so the agent chooses among 5 discrete actions instead
# of a continuous value; the printed spec becomes an int32 scalar action.
discrete_action_env = wrappers.ActionDiscretizeWrapper(env, num_actions=5)
print('Discretized Action Spec:', discrete_action_env.action_spec())

Action Spec: BoundedArraySpec(shape=(1,), dtype=dtype('float32'), name='action', minimum=-2.0, maximum=2.0)
Discretized Action Spec: BoundedArraySpec(shape=(), dtype=dtype('int32'), name='action', minimum=0, maximum=4)


  from pkg_resources import resource_stream, resource_exists


In [5]:
# From this cell on, `env` refers to the discretized-action wrapper rather
# than the raw continuous-action environment.
env = discrete_action_env

In [6]:
# Display the observation spec of the wrapped environment.
env.observation_spec()

BoundedArraySpec(shape=(3,), dtype=dtype('float32'), name='observation', minimum=[-1. -1. -8.], maximum=[1. 1. 8.])

In [7]:
# Display the (discretized) action spec of the wrapped environment.
env.action_spec()

BoundedArraySpec(shape=(), dtype=dtype('int32'), name='action', minimum=0, maximum=4)

In [8]:
# Derive the network's input/output sizes from the environment specs instead
# of hard-coding them, so they stay in sync if the environment or the number
# of discretization bins changes.
num_states = env.observation_spec().shape[0]
action_range = (env.action_spec().minimum, env.action_spec().maximum)

# The discretized action spec is an inclusive integer range, so it contains
# (maximum - minimum + 1) distinct actions.
num_actions = int(action_range[1] - action_range[0]) + 1

# One entry per discrete action, evenly spaced over the inclusive range.
action_space = np.linspace(action_range[0], action_range[1], num_actions)

In [9]:
# Layer widths: observation inputs, two hidden layers, one output per action.
shapes = [num_states, 32, 24, len(action_space)]

# Online network and target network share the same architecture.
q_model = get_q_model(shapes)
target_model = get_q_model(shapes)

# Each network is compiled with its own Adam optimizer instance and MSE loss.
q_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss=tf.keras.losses.MeanSquaredError(),
)
target_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss=tf.keras.losses.MeanSquaredError(),
)

Importing weights for training

In [10]:
# # weights_path = "pendulum_best.weights.h5"
# weights_path = "most_trained.weights.h5"
# q_model.load_weights(weights_path)
# target_model.load_weights(weights_path)

Hyperparameters

In [11]:
# Settings consumed by the (commented-out) training loop further down.
episodes = 1000  # number of training episodes the loop iterates over
buffer_size = 200  # replay-buffer size threshold; also sets the minibatch size (buffer_size / 5)
gamma = 0.98  # discount factor applied to the target network's max Q-value
buffer = []  # replay buffer; the training loop appends transition tuples to it
epsilon = 0.3  # initial probability of picking a random action
min_eps = 0.1  # bound used by the training loop's epsilon decay step

Uncomment the cell below to run the training loop

In [12]:
# NOTE(review) -- issues to check before uncommenting this loop:
#   * The decay step `epsilon = min(epsilon * 0.9, min_eps)` drops epsilon to
#     min_eps after the first episode and then keeps shrinking it below
#     min_eps; `max(...)` is presumably what was intended.
#   * Minibatch indices are drawn from range(buffer_size - 1) regardless of
#     len(buffer), so indexing fails if the buffer holds fewer entries
#     (e.g. if an episode is shorter than buffer_size steps) -- confirm.
#   * The `len(buffer) > buffer_size` check lets the buffer hold up to
#     buffer_size + 1 transitions.
# for i in range(episodes):
#     loss = 0
#     curr_state = env.reset()
#     action_sequence = []
#     total_reward = 0
#     while True:
#         if np.random.rand() < epsilon:
#             action = np.random.choice(action_space)
#         else:
#             action = action_space[np.argmax(q_model.predict(curr_state.observation.reshape(-1, curr_state.observation.shape[0]), verbose = 0)[0])]
#         action_sequence.append(action)
#         next_state = env.step(int(action))
#         reward = next_state.reward
        
#         total_reward += reward
#         if next_state.is_last():
#             end = True
#         else:
#             end = False
#         quadruple = (curr_state, action, reward, next_state, end)
#         if len(buffer) > buffer_size:
#             buffer.pop(0)
#         buffer.append(quadruple)
#         if next_state.is_last():
#             break
#         curr_state = next_state
        
#     sample_indices = np.random.choice(np.arange(0, buffer_size - 1), int(buffer_size/5), False)
#     sample = [buffer[i] for i in sample_indices]

#     for curr_state, action, reward, next_state, end in sample:
#         if end == True:
#             target_q_val = reward
#         else:
#             target_q_val = reward + gamma * np.max(target_model.predict(next_state.observation.reshape(-1, next_state.observation.shape[0]), verbose = 0)[0])
        
#         curr_q = q_model.predict(curr_state.observation.reshape(-1, next_state.observation.shape[0]), verbose = 0)
#         target_q = curr_q.copy()
#         target_q[0][np.where(action_space == action)] = target_q_val

#         history = q_model.fit(curr_state.observation.reshape(-1, curr_state.observation.shape[0]), target_q, epochs = 1, verbose = 0)
#         loss += history.history["loss"][0]
    
#     if i%5 == 0:
#         target_model.set_weights(q_model.get_weights())
#     epsilon = min(epsilon * 0.9, min_eps)
#     print(i, "Loss", loss)
#     print("Action_sequence", action_sequence)
#     print("No. of actions", len(action_sequence))
#     print("reward", total_reward)
        

Saving the trained model

In [13]:
# target_model.save_weights("most_trained.weights.h5")

Loading the trained weights

In [14]:
# Restore the same saved weights into both the online and the target network
# so the rollout below can run without retraining.
# NOTE(review): assumes the weights file is present in the working directory.
weights_path = "most_trained.weights.h5"
q_model.load_weights(weights_path)
target_model.load_weights(weights_path)

  saveable.load_own_variables(weights_store.get(inner_path))


Running the game

In [15]:
# Roll out one greedy (no exploration) episode with the loaded Q-network,
# accumulating the reward and recording every rendered frame.
env = suite_gym.load('Pendulum-v1')

# Use as many discretization bins as there are entries in `action_space`, so
# the argmax over the network's outputs always maps to a valid action.
discrete_action_env = wrappers.ActionDiscretizeWrapper(env, num_actions=len(action_space))
env = discrete_action_env

observation = env.reset()
done = False
total_reward = 0
action_history = []
frames = []

# Cap the rollout at 200 steps, but stop as soon as the environment reports
# the end of the episode so transitions from a following episode are never
# mixed into the reward total or the recorded frames.
for i in range(200):
    if done:
        break

    # Add a batch dimension, then pick the action with the highest Q-value.
    q_values = q_model.predict(observation.observation.reshape(-1, observation.observation.shape[0]), verbose=0)[0]
    action = action_space[np.argmax(q_values)]
    action_history.append(action)

    observation = env.step(int(action))
    total_reward += observation.reward
    frames.append(env.render(mode='rgb_array'))
    done = observation.is_last()

env.close()
print(f"Total reward: {total_reward}")




Total reward: -132.59016183391213


Saving the video

In [16]:
def create_video_from_frames(frames, output_path, fps, rgb_input=True):
    """Encode a sequence of image frames into an MP4 video file.

    Args:
        frames: Non-empty sequence of image arrays, all with the same height
            and width. The first frame determines the video size.
        output_path: Path of the video file to write.
        fps: Frame rate of the written video.
        rgb_input: When True (default), frames are treated as RGB and are
            converted to the BGR channel order that OpenCV's writer expects,
            so colors are not swapped in the output. Pass False if the frames
            are already BGR.

    Raises:
        ValueError: If ``frames`` is empty.
        OSError: If the video writer could not be opened for ``output_path``.
    """
    if len(frames) == 0:
        raise ValueError("frames is empty; cannot create a video")

    # Only the spatial dimensions are needed to size the video.
    height, width = frames[0].shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        if not out.isOpened():
            raise OSError(f"could not open video writer for {output_path!r}")

        for frame in frames:
            if rgb_input:
                # Convert into a new array; the caller's frame is left untouched.
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            out.write(frame)
    finally:
        # Always release the writer so the file handle is closed even if
        # opening or writing fails part-way through.
        out.release()

# Write the frames collected during the rollout to a video file.
output_path = 'new_output_video.mp4'
fps = 30  # frame rate passed to the video writer

create_video_from_frames(frames, output_path, fps)
