In [None]:
pip install gym




In [None]:
!pip install -q swig
!pip install -q gymnasium[box2d]

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.8/953.8 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone


In [None]:
import os
import cv2 as cv
import numpy as np

class capture_video():
    """Buffer frames and write them to disk as rotating video files.

    The caller appends frames to ``self.images``.  ``store_video`` encodes the
    buffered frames with OpenCV to ``./video/<model_name>/video-<slot>.avi``
    and then advances the slot counter for that model name, so at most
    ``video_lim`` files are kept per name before the oldest slot is reused.

    Args:
        video_lim: Number of video slots kept per model name.
        dim: ``(width, height)`` of the written video.
    """

    def __init__(self,video_lim=4,dim=(600,600)):
        self.video_num = {}  # model name -> slot index the next video is written to
        self.images = []  # frames waiting to be encoded

        self.backup_limit = video_lim  # how many slots are rotated through per model name
        self.dim = dim  # frame size handed to the video writer

    def store_video(self,fps=30,model_name='PPO1'):
        """Encode the buffered frames into the next video slot of ``model_name``.

        Nothing is written (and no file or directory is created) when the
        frame buffer is empty; a message is printed instead.

        Args:
            fps: Frame rate of the written video.
            model_name: Sub-folder of ``./video`` the file is stored in.
        """
        self.model_name = model_name
        # First use of a name starts its rotation at slot 0.
        self.video_num.setdefault(model_name, 0)

        # Check before opening a writer so the empty case leaves no stray,
        # unreleased writer or zero-length file behind.
        if len(self.images) == 0:
            print(f" can't store videos as no images has been stored")
            return

        # makedirs also creates the parent ``./video`` folder, which a plain
        # mkdir would fail on when it does not exist yet.
        out_dir = f'./video/{self.model_name}/'
        os.makedirs(out_dir, exist_ok=True)

        slot = self.video_num[model_name]
        frame_size = tuple(self.dim)
        fourcc = cv.VideoWriter_fourcc(*'mp4v')
        video = cv.VideoWriter(f'{out_dir}video-{slot}.avi', fourcc, fps, frame_size)
        try:
            for image in self.images:
                frame = np.array(image)
                # The writer requires every frame to match the size it was
                # opened with, so bring differently sized frames to ``dim``.
                if (frame.shape[1], frame.shape[0]) != frame_size:
                    frame = cv.resize(frame, frame_size)
                video.write(frame)
        finally:
            # Always finalise the file, even if a frame could not be written.
            video.release()

        print(f' video n=in folder {model_name} and number {slot} has been stored')
        # Rotate through exactly ``backup_limit`` slots (0 .. limit-1); the
        # limit is clamped to 1 so a zero limit cannot divide by zero.
        self.video_num[model_name] = (slot + 1) % max(int(self.backup_limit), 1)

    def clear_images(self):
        """Drop the buffered frames but keep the per-model slot counters."""
        self.images = []

    def clear_memory(self):
        """Drop the buffered frames and forget all per-model slot counters."""
        self.video_num = {}
        self.images = []

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class ActorCritic(nn.Module):
    """Two independent MLP heads over the same observation.

    ``actor`` maps an observation to a probability vector over
    ``output_dim`` actions (softmax over the last dimension); ``critic`` maps
    the same observation to a single value estimate.  Each head has its own
    two hidden layers of width ``hidden_dim`` with ReLU activations.
    """

    def __init__(self, input_dim, output_dim, hidden_dim=64):
        super().__init__()
        # The actor is built before the critic so parameters are created in
        # the same order (and under the same names) as a hand-written stack.
        self.actor = nn.Sequential(
            *self._hidden_stack(input_dim, hidden_dim),
            nn.Linear(hidden_dim, output_dim),
            nn.Softmax(dim=-1),
        )
        self.critic = nn.Sequential(
            *self._hidden_stack(input_dim, hidden_dim),
            nn.Linear(hidden_dim, 1),
        )

    @staticmethod
    def _hidden_stack(in_features, width):
        """Return the Linear/ReLU/Linear/ReLU layers used by both heads."""
        return [
            nn.Linear(in_features, width),
            nn.ReLU(),
            nn.Linear(width, width),
            nn.ReLU(),
        ]

    def forward(self, x):
        """Return ``(action_probabilities, state_value)`` for observation ``x``."""
        return self.actor(x), self.critic(x)


In [None]:
import gym

def compute_returns(rewards, gamma=0.9):
    """Return the discounted return for every step of one episode.

    Entry ``t`` of the result is ``rewards[t] + gamma * rewards[t+1] +
    gamma**2 * rewards[t+2] + ...`` up to the end of the sequence.

    Args:
        rewards: Sequence of per-step rewards in time order.
        gamma: Discount factor applied per step.

    Returns:
        A list with one discounted return per reward, in the same order.
        An empty sequence yields an empty list.
    """
    returns = []
    running = 0
    # Accumulate from the last step backwards, appending as we go and
    # reversing once at the end; inserting at index 0 on every step would make
    # the loop quadratic in the episode length.
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()
    return returns

def a2c(env_name='LunarLander-v2', num_episodes=10000, max_steps=1000, learning_rate=1e-3, gamma=0.9):
    """Train an ``ActorCritic`` network with one gradient step per episode.

    Each episode is rolled out by sampling actions from the actor's
    probabilities.  After the episode, discounted returns are computed with
    ``compute_returns`` and the network is updated on
    ``actor_loss + critic_loss`` where the advantage is
    ``return - predicted value``.

    Args:
        env_name: Environment id passed to ``gym.make``.
        num_episodes: Number of episodes (and optimizer steps) to run.
        max_steps: Upper bound on steps per episode.
        learning_rate: Adam learning rate.
        gamma: Discount factor for the returns.

    Returns:
        The trained ``ActorCritic`` model.
    """
    env = gym.make(env_name)
    try:
        input_dim = env.observation_space.shape[0]
        output_dim = env.action_space.n

        model = ActorCritic(input_dim, output_dim)
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        for episode in range(num_episodes):
            # Older gym releases return just the observation from reset();
            # newer gym/gymnasium return (observation, info).
            reset_out = env.reset()
            state = reset_out[0] if isinstance(reset_out, tuple) else reset_out

            log_probs = []
            values = []
            rewards = []
            episode_reward = 0  # not named `sum`, to keep the builtin usable

            for _ in range(max_steps):
                state = torch.FloatTensor(state)
                policy, value = model(state)
                action = torch.multinomial(policy, 1).item()

                # Older gym: (obs, reward, done, info).
                # Newer gym/gymnasium: (obs, reward, terminated, truncated, info).
                step_out = env.step(action)
                if len(step_out) == 5:
                    next_state, reward, terminated, truncated, _ = step_out
                    done = terminated or truncated
                else:
                    next_state, reward, done, _ = step_out

                log_probs.append(torch.log(policy[action]))
                values.append(value)
                rewards.append(reward)
                episode_reward += reward
                state = next_state

                if done:
                    break

            # Nothing was collected (e.g. max_steps == 0): there is nothing to
            # stack or learn from.
            if not rewards:
                continue

            returns = torch.FloatTensor(compute_returns(rewards, gamma))
            log_probs = torch.stack(log_probs)
            # The critic emits shape (1,) per step, so stacking gives (T, 1).
            # Without the squeeze, `returns - values` broadcasts (T,) against
            # (T, 1) into a (T, T) matrix and both losses are averaged over
            # mismatched time-step pairs.
            values = torch.stack(values).squeeze(-1)

            advantage = returns - values
            # The advantage is detached so the policy gradient does not flow
            # into the critic.
            actor_loss = -(log_probs * advantage.detach()).mean()
            critic_loss = advantage.pow(2).mean()

            loss = actor_loss + critic_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if episode % 10 == 0:
                print(f"Episode {episode}, Loss: {loss.item()}, Rewards = {episode_reward}")
    finally:
        # Release the environment even if training is interrupted or fails.
        env.close()
    return model

# NOTE: inside a notebook kernel ``__name__`` is "__main__", so this guard is
# true and running the cell starts a training run with the default arguments.
# The model returned by a2c() is not assigned to anything here.
if __name__ == "__main__":
    a2c()


Episode 0, Loss: 445.5770263671875, Rewards = -183.17197112170751
Episode 10, Loss: 454.6297607421875, Rewards = -132.56917525154938
Episode 20, Loss: 562.0738525390625, Rewards = -65.39091641964544
Episode 30, Loss: 402.86053466796875, Rewards = -119.1914665885368
Episode 40, Loss: 322.295654296875, Rewards = -88.08740283490634
Episode 50, Loss: 307.3476867675781, Rewards = -102.21511549103609
Episode 60, Loss: 384.0231628417969, Rewards = -90.30188716861196
Episode 70, Loss: 1112.3878173828125, Rewards = -316.4326684118374
Episode 80, Loss: 275.2765808105469, Rewards = 40.89660670941653
Episode 90, Loss: 296.0479431152344, Rewards = -199.16278771784863
Episode 100, Loss: 1199.486572265625, Rewards = -350.7816162063899
Episode 110, Loss: 668.658203125, Rewards = -46.1979379570313
Episode 120, Loss: 331.2369384765625, Rewards = -101.83241285041444
Episode 130, Loss: 657.548583984375, Rewards = -269.80027313582195
Episode 140, Loss: 415.19158935546875, Rewards = -354.17901640725427
Epis

In [None]:
# Train with the default arguments and keep the returned network for the
# evaluation cell.  `final_model` is only bound if a2c() runs to completion;
# interrupting the cell leaves the name unassigned (or stale from an earlier run).
final_model= a2c()

Episode 0, Loss: 2979.094970703125, Rewards = -237.82106632534857
Episode 10, Loss: 1337.2562255859375, Rewards = -340.77330986182324
Episode 20, Loss: 880.9132080078125, Rewards = -150.5912389576531
Episode 30, Loss: 662.7376708984375, Rewards = -228.0742497603149
Episode 40, Loss: 465.0077819824219, Rewards = -119.56937298375226
Episode 50, Loss: 394.7176513671875, Rewards = -103.64194377715857
Episode 60, Loss: 520.2841796875, Rewards = -128.41160493037933
Episode 70, Loss: 554.1443481445312, Rewards = -129.75239259876426
Episode 80, Loss: 664.8717041015625, Rewards = -278.88160411032493
Episode 90, Loss: 1806.6627197265625, Rewards = -431.18713562378
Episode 100, Loss: 517.2161865234375, Rewards = -69.90422107496009
Episode 110, Loss: 2276.50048828125, Rewards = -440.8977235918449
Episode 120, Loss: 394.373779296875, Rewards = -125.82979776890664
Episode 130, Loss: 370.3176574707031, Rewards = -86.25501834801736
Episode 140, Loss: 389.21978759765625, Rewards = -118.43193787873294
E

KeyboardInterrupt: ignored

In [None]:
 import matplotlib.pyplot as plt

# Roll out one episode with the trained network, always taking the most
# probable action (greedy evaluation, no sampling).
images=[]  # frame buffer read by the video-export cell; only filled if frame capture below is enabled
total_reward = 0  # running episode reward; not named `sum` so the builtin stays usable in later cells
env = gym.make('LunarLander-v2')
try:
    # Older gym releases return just the observation from reset(); newer
    # gym/gymnasium return (observation, info).
    reset_out = env.reset()
    state = reset_out[0] if isinstance(reset_out, tuple) else reset_out
    state = torch.FloatTensor(state)

    # Inference only: no autograd graph is needed for the forward passes.
    with torch.no_grad():
        policy, value = final_model(state)

        for steps in range(1000):
            # Frame capture is disabled, so `images` stays empty.
            # NOTE(review): the call below is the older gym render signature —
            # confirm against the installed version before enabling.
            #img = env.render(mode='rgb_array')
            #images.append(img)

            # Find the index of the action with the highest probability
            best_action_index = torch.argmax(policy)

            # Convert the index tensor to a Python integer
            best_action = best_action_index.item()

            print("Best Action Index:", best_action_index)
            print("Best Action:", best_action)

            # Older gym: (obs, reward, done, info).
            # Newer gym/gymnasium: (obs, reward, terminated, truncated, info).
            step_out = env.step(best_action)
            if len(step_out) == 5:
                next_state, reward, terminated, truncated, _ = step_out
                done = terminated or truncated
            else:
                next_state, reward, done, _ = step_out

            total_reward = total_reward + reward
            state = torch.FloatTensor(next_state)
            policy, value = final_model(state)
            print(policy, state)

            # Print before the termination check so the total of the final
            # step is reported as well.
            print(total_reward)

            if done:
                break
finally:
    # Release the environment even if the roll-out fails part-way.
    env.close()

In [None]:
from matplotlib import animation
def save_frames_as_gif(frames, path='./', filename='gym_animation.gif', fps=30, dpi=72):
    """Render a sequence of image frames into an animation file.

    The figure is sized from the first frame so that one image pixel maps to
    one figure pixel at the given ``dpi``.  The file is written to
    ``path + filename`` with matplotlib's ``ffmpeg`` writer.

    Args:
        frames: Sequence of image arrays (height x width [x channels]).
        path: Prefix prepended to ``filename`` (include the trailing slash).
        filename: Name of the output file.
        fps: Frame rate of the saved animation.
        dpi: Resolution used to size the figure.

    Returns:
        None.  When ``frames`` is empty nothing is written and a message is
        printed instead.
    """
    # An empty buffer has no first frame to size the figure from; report it
    # instead of failing with an IndexError.
    if len(frames) == 0:
        print(" can't save animation as no frames were given")
        return

    first = frames[0]
    #Mess with this to change frame size
    fig = plt.figure(figsize=(first.shape[1] / dpi, first.shape[0] / dpi), dpi=dpi)
    try:
        patch = plt.imshow(first)
        plt.axis('off')

        def animate(i):
            patch.set_data(frames[i])

        anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)
        anim.save(path + filename, writer='ffmpeg', fps=fps)
    finally:
        # Free the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

In [None]:
# Write the frames held in `images` to 'Lunar_lander_video.mp4' at 2 frames
# per second.  `images` only contains frames if frame capture was enabled in
# the roll-out cell that defines it.
save_frames_as_gif(images, filename='Lunar_lander_video.mp4', fps=2)