In [None]:
import math
import random

import gym
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import cv2
import matplotlib.pyplot as plt


%matplotlib inline

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

In [None]:
# Show (as the cell output) whether PyTorch can see a CUDA device.
torch.cuda.is_available()

In [None]:
class PolicyNetwork(nn.Module):
    """Gaussian policy whose samples are squashed through ``tanh``.

    A two-hidden-layer MLP maps a state to the mean and log standard
    deviation of a diagonal Gaussian. Sampled values are passed through
    ``tanh`` so every action component lies in [-1, 1].

    Args:
        num_inputs: Size of the state vector fed to the first linear layer.
        num_actions: Number of action components produced by each head.
        hidden_size: Width of both hidden layers.
        init_w: Half-width of the uniform range used to initialise the
            weights and biases of the two output heads.
        log_std_min: Lower clamp applied to the predicted log std.
        log_std_max: Upper clamp applied to the predicted log std.
    """

    def __init__(self, num_inputs, num_actions, hidden_size, init_w=3e-3, log_std_min=-20, log_std_max=2):
        super().__init__()

        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)

        # Both output heads start with small uniform weights and biases.
        self.mean_linear = nn.Linear(hidden_size, num_actions)
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)

        self.log_std_linear = nn.Linear(hidden_size, num_actions)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)

    def forward(self, state):
        """Return ``(mean, log_std)`` for ``state``; ``log_std`` is clamped
        to ``[log_std_min, log_std_max]``."""
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))

        mean    = self.mean_linear(x)
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)

        return mean, log_std

    def evaluate(self, state, epsilon=1e-6):
        """Sample a squashed action and its per-component log-probability.

        Args:
            state: Tensor accepted by ``forward``.
            epsilon: Small constant added inside the log of the tanh
                correction term to avoid ``log(0)``.

        Returns:
            Tuple ``(action, log_prob, z, mean, log_std)``. ``z`` is the
            standard-normal noise used for the sample and has the same
            shape as ``mean``.
        """
        mean, log_std = self.forward(state)
        std = log_std.exp()

        # Draw independent N(0, 1) noise for every batch row and action
        # component. A single shared scalar would make all components
        # perfectly correlated, contradicting the element-wise Normal
        # log-probability computed below.
        z = torch.randn_like(mean)
        pre_tanh = mean + std * z
        action = torch.tanh(pre_tanh)
        # Gaussian log-density minus the log-derivative of tanh.
        log_prob = Normal(mean, std).log_prob(pre_tanh) - torch.log(1 - action.pow(2) + epsilon)
        return action, log_prob, z, mean, log_std

    def get_action(self, state):
        """Sample one action for a single (unbatched) state.

        Args:
            state: Array-like state vector; it is converted to a float32
                tensor and given a leading batch dimension of 1.

        Returns:
            CPU tensor holding the sampled action with the batch dimension
            removed. The tensor is not detached; callers that need a plain
            array should call ``.detach()`` / ``.numpy()`` themselves.
        """
        # Use the device the parameters actually live on instead of relying
        # on a notebook-level global.
        param_device = self.linear1.weight.device
        state = torch.as_tensor(state, dtype=torch.float32, device=param_device).unsqueeze(0)
        mean, log_std = self.forward(state)
        std = log_std.exp()

        # Independent noise per action component (see ``evaluate``).
        z = torch.randn_like(mean)
        action = torch.tanh(mean + std * z)

        action = action.cpu()
        return action[0]

In [None]:
env = gym.make("Walker2d-v2")

action_dim = env.action_space.shape[0]
state_dim  = env.observation_space.shape[0]
hidden_dim = 256
# Rebuild the network with the environment's dimensions so the saved
# parameters can be loaded into it.
policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)

# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
# (and vice versa) by remapping its tensors onto the active device.
policy_net.load_state_dict(torch.load('policy_net.pth', map_location=device))

# Roll out the loaded policy and record the undiscounted return per episode.
num_episodes = 100  # Number of evaluation episodes
results = []
for episode in range(num_episodes):
    state = env.reset()
    done = False
    cumulative_reward = 0

    while not done:
        # No gradients are needed for evaluation, and the environment is
        # given a NumPy action rather than a torch.Tensor.
        with torch.no_grad():
            action = policy_net.get_action(state).detach().cpu().numpy()
        next_state, reward, done, _ = env.step(action)
        state = next_state
        cumulative_reward += reward

    results.append(cumulative_reward)




In [None]:
# Summary statistics of the per-episode returns collected during evaluation.
mean_return = np.mean(results)
std_return = np.std(results)
var_return = np.var(results)

print(f"Average Return for sac: {mean_return}")
print(f"Standard Deviation of Return for sac: {std_return}")
print(f"Variance of Return for sac: {var_return}")

In [None]:
# Re-run the evaluation with zero-mean Gaussian noise added to every action.
perturbation_scale = 0.1  # standard deviation of the action noise
perturbed_rewards = []

for episode in range(num_episodes):
    state = env.reset()
    done = False
    cumulative_reward = 0

    while not done:
        # Convert to NumPy before perturbing: adding an ndarray in place to a
        # torch.Tensor mixes array types and is not a reliable operation.
        with torch.no_grad():
            action = policy_net.get_action(state).detach().cpu().numpy()
        action = action + np.random.normal(0, perturbation_scale, size=action.shape)  # Perturb the action
        next_state, reward, done, _ = env.step(action)
        state = next_state
        cumulative_reward += reward

    perturbed_rewards.append(cumulative_reward)

perturbed_rewards = np.array(perturbed_rewards)
print(f"Average Return under Perturbation for sac: {np.mean(perturbed_rewards)}")
print(f"Standard Deviation under Perturbation for sac: {np.std(perturbed_rewards)}")


In [None]:
# Load the saved reward and timing arrays from disk.
rewards, times = (np.load(log_path) for log_path in ('sac_reward.npy', 'sac_time.npy'))

In [None]:
# Size of the "first 10%" / "last 10%" windows. It is kept at least 1 because
# a zero-length window would make rewards[:0] empty (NaN mean) and
# rewards[-0:] select the entire array instead of the tail.
window = max(1, int(len(rewards) * 0.1))

early_performance = np.mean(rewards[:window])
print(f"Early Performance for sac: {early_performance}")


late_performance = rewards[-window:]
variance_late_performance = np.var(late_performance)
print(f"Variance of Late Performance for sac: {variance_late_performance}")


# np.sum reduces the array in one vectorised call instead of a Python loop.
total_time = np.sum(times)
average_time_per_episode = np.mean(times)
print(f"Total Training Time for sac: {total_time} seconds")
print(f"Average Time per Episode for sac: {average_time_per_episode} seconds")


In [None]:
# Plot the first 1000 entries of the reward log.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(rewards[:1000])
ax.set_xlabel('Episodes')
ax.set_ylabel('Rewards')
ax.set_title('Episode vs Reward')
plt.show()

In [None]:
# Roll out one episode, capturing an RGB frame before every step.
env = gym.make('Walker2d-v2')
frames = []

try:
    state = env.reset()
    done = False
    cumulative_reward = 0

    while not done:
        frame = env.render(mode='rgb_array')
        frames.append(frame)

        # Evaluation only: no gradients, and a NumPy action for the env.
        with torch.no_grad():
            action = policy_net.get_action(state).detach().cpu().numpy()
        next_state, reward, done, _ = env.step(action)
        state = next_state
        cumulative_reward += reward
finally:
    # Close the environment even if the rollout raises.
    env.close()

# Encode the captured frames as a 30 fps XVID .avi file.
output_video_path = 'sac_output_video.avi'
frame_height, frame_width, _ = frames[0].shape
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(output_video_path, fourcc, 30, (frame_width, frame_height))
if not video_writer.isOpened():
    # Without this check a missing codec or unwritable path would go
    # unnoticed and the success message below would still be printed.
    raise IOError(f"Could not open video writer for {output_video_path}")

try:
    for frame in frames:
        # Frames are converted from RGB to BGR before being written.
        video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
finally:
    # Always release the writer so the file is finalised and the handle freed.
    video_writer.release()
print(f"Video saved to {output_video_path}")
