# Training Model

In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from tqdm import tqdm
import numpy as np

#Actor network
class Actor(nn.Module):
    """Deterministic policy network mapping a state to a bounded action.

    Three fully connected layers (two hidden layers of 256 units with ReLU).
    The output is squashed with ``tanh`` and multiplied by ``max_action``, so
    every action component lies in ``[-max_action, max_action]``.

    Args:
        state_dim: Number of input features per state.
        action_dim: Number of output action components.
        max_action: Scale applied to the ``tanh`` output.
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, action_dim)
        self.max_action = max_action

    def forward(self, state):
        """Return the action for ``state``.

        Args:
            state: Float tensor whose last dimension is ``state_dim``.

        Returns:
            Tensor of actions with last dimension ``action_dim``, located on
            the same device as the network's parameters.
        """
        # Follow the module's own device rather than hard-coding CUDA, so the
        # network also runs on CPU-only hosts and wherever `.to(...)` put it.
        state = state.to(self.fc1.weight.device)
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return torch.tanh(self.fc3(x)) * self.max_action

#Critic network
class Critic(nn.Module):
    """Twin Q-network: two independent estimators of Q(state, action).

    Each head is a three-layer MLP (two hidden layers of 256 units with ReLU)
    over the concatenated state and action; ``fc1``-``fc3`` form the first
    head and ``fc4``-``fc6`` the second.

    Args:
        state_dim: Number of features per state.
        action_dim: Number of components per action.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        # First Q head.
        self.fc1 = nn.Linear(state_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

        # Second Q head.
        self.fc4 = nn.Linear(state_dim + action_dim, 256)
        self.fc5 = nn.Linear(256, 256)
        self.fc6 = nn.Linear(256, 1)

    def forward(self, state, action):
        """Return both Q estimates for a batch of state-action pairs.

        Args:
            state: Tensor of shape ``(batch, state_dim)``.
            action: Tensor of shape ``(batch, action_dim)``.

        Returns:
            Tuple ``(q1, q2)``, each of shape ``(batch, 1)``.
        """
        # Move both inputs to the parameters' device *before* concatenating:
        # this removes the hard-coded CUDA dependency and also tolerates
        # inputs that arrive on different devices.
        device = self.fc1.weight.device
        state_action = torch.cat([state.to(device), action.to(device)], dim=1)

        q1 = F.relu(self.fc1(state_action))
        q1 = F.relu(self.fc2(q1))
        q1 = self.fc3(q1)

        q2 = F.relu(self.fc4(state_action))
        q2 = F.relu(self.fc5(q2))
        q2 = self.fc6(q2)

        return q1, q2

# TD3 Agent
class TD3Agent:
    """Twin Delayed DDPG (TD3) agent with an in-memory replay buffer.

    Holds an actor, a twin critic, their target copies, the optimizers and a
    bounded replay buffer of ``(state, action, reward, next_state, done)``
    tuples.

    Args:
        state_dim: Number of features per observation.
        action_dim: Number of action components.
        max_action: Absolute bound of every action component.
        gamma: Discount factor.
        tau: Interpolation factor for the soft target updates.
        actor_lr: Adam learning rate for the actor.
        critic_lr: Adam learning rate for the critic.
        policy_noise: Std-dev (as a fraction of ``max_action``) of the noise
            added to target actions (target policy smoothing). ``0`` disables
            the smoothing.
        noise_clip: Clip range (as a fraction of ``max_action``) for that
            noise.
        device: Torch device (or device string) for networks and batches.
            Defaults to CUDA when available, otherwise CPU.
    """

    def __init__(self, state_dim, action_dim, max_action, gamma=0.99, tau=0.005,
                 actor_lr=3e-4, critic_lr=3e-4, policy_noise=0.2, noise_clip=0.5,
                 device=None):
        self.gamma = gamma
        self.tau = tau
        self.max_action = max_action
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device)

        self.actor = Actor(state_dim, action_dim, max_action).to(self.device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(self.device)
        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim).to(self.device)

        # Targets start as exact copies of the online networks.
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.replay_buffer = []
        self.buffer_capacity = 2000000
        # Index of the oldest transition once the buffer is full (ring buffer).
        self._buffer_pos = 0
        self.batch_size = 128
        self.policy_delay = 2
        self.policy_update_step = 0

        self.exploration_noise = 0.3

    def select_action(self, state):
        """Return a noisy, clipped action for ``state`` and decay the noise.

        With probability 0.1 a constant +0.5 bias is added to every action
        component; Gaussian exploration noise is always added. Each call
        multiplies ``exploration_noise`` by 0.999, with a floor of 0.1.

        Args:
            state: Single observation (array-like of length ``state_dim``).

        Returns:
            1-D NumPy array of actions within ``[-max_action, max_action]``.
        """
        state = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            action = self.actor(state).cpu().numpy().flatten()

        if random.random() < 0.1:
            # Scalar broadcast: works for any action dimensionality instead of
            # assuming exactly three action components.
            action += 0.5

        noise = np.random.normal(0, self.exploration_noise * self.max_action, size=action.shape)
        action = np.clip(action + noise, -self.max_action, self.max_action)

        # Decay noise over time
        self.exploration_noise = max(0.1, self.exploration_noise * 0.999)

        return action

    def store_transition(self, state, action, reward, next_state, done):
        """Append a transition, overwriting the oldest one when full.

        Args:
            state: Observation before the step.
            action: Action taken.
            reward: Reward received.
            next_state: Observation after the step.
            done: Whether bootstrapping should stop here; stored as a float.
        """
        transition = (state, action, reward, next_state, float(done))
        if len(self.replay_buffer) < self.buffer_capacity:
            self.replay_buffer.append(transition)
        else:
            # O(1) overwrite instead of list.pop(0), which shifts every
            # element of a potentially multi-million-entry list.
            self.replay_buffer[self._buffer_pos] = transition
            self._buffer_pos = (self._buffer_pos + 1) % self.buffer_capacity

    def _to_tensor(self, values):
        """Stack a sequence of arrays/scalars into one float32 tensor on ``self.device``."""
        # Going through a single NumPy array avoids the very slow
        # element-by-element path of building a tensor from a tuple of arrays.
        return torch.as_tensor(np.asarray(values, dtype=np.float32), device=self.device)

    def train(self):
        """Run one TD3 update on a random mini-batch.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. The critic is updated on every call; the actor and both
        target networks are updated on every ``policy_delay``-th call.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        batch = random.sample(self.replay_buffer, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = self._to_tensor(states)
        actions = self._to_tensor(actions)
        rewards = self._to_tensor(rewards).unsqueeze(1)
        next_states = self._to_tensor(next_states)
        dones = self._to_tensor(dones).unsqueeze(1)

        with torch.no_grad():
            # Target policy smoothing: perturb the target action with clipped
            # Gaussian noise before evaluating the target critics.
            next_actions = self.actor_target(next_states)
            noise_scale = self.policy_noise * self.max_action
            noise_limit = self.noise_clip * self.max_action
            noise = (torch.randn_like(next_actions) * noise_scale).clamp(-noise_limit, noise_limit)
            next_actions = (next_actions + noise).clamp(-self.max_action, self.max_action)

            # Clipped double-Q: bootstrap from the smaller of the two targets.
            q1_target, q2_target = self.critic_target(next_states, next_actions)
            q_target = rewards + (1 - dones) * self.gamma * torch.min(q1_target, q2_target)

        q1, q2 = self.critic(states, actions)
        critic_loss = F.mse_loss(q1, q_target) + F.mse_loss(q2, q_target)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        if self.policy_update_step % self.policy_delay == 0:
            # Delayed policy update: ascend the first critic head's estimate.
            actor_loss = -self.critic(states, self.actor(states))[0].mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            self._soft_update(self.actor, self.actor_target)
            self._soft_update(self.critic, self.critic_target)

        self.policy_update_step += 1

    def _soft_update(self, local_model, target_model):
        """Blend ``local_model`` into ``target_model``: ``t <- tau*l + (1-tau)*t``."""
        with torch.no_grad():
            for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
                target_param.copy_(self.tau * local_param + (1.0 - self.tau) * target_param)

# Custom Reward Function
def custom_reward(state, next_state, action):
    """Compute the shaped Hopper reward from the post-step observation.

    The indices below follow the Gymnasium Hopper observation layout
    (0: torso height, 1: torso angle, 2-4: thigh/leg/foot joint angles,
    5: torso x-velocity, 6: torso z-velocity, 7: torso angular velocity,
    8-10: joint angular velocities).

    Args:
        state: Observation before the step. Unused; kept so existing callers
            keep working.
        next_state: Observation after the step; indices 0-8 are read.
        action: Action that was applied (array-like); only its squared
            magnitude is used.

    Returns:
        Scalar reward: the sum of all shaping terms.
    """
    velocity_x = next_state[5]  # Forward velocity
    height = next_state[0]  # Hopper height
    torso_angle = next_state[1]  # Torso angle
    hip_angle = next_state[2]
    knee_angle = next_state[3]
    ankle_angle = next_state[4]
    # NOTE(review): in the Hopper layout index 8 is the thigh hinge's angular
    # *velocity*, not an angle. Left unchanged because the intended quantity
    # is unclear - confirm.
    hinge_angle = next_state[8]
    # Index 7 is the torso's angular velocity. Index 6 (used previously) is the
    # vertical velocity of the torso, so the old code penalised hopping itself.
    torso_angular_velocity = next_state[7]

    energy_penalty = -0.01 * np.square(action).sum()
    forward_reward = 8.0 * velocity_x
    # Linear height term plus a Gaussian bonus centred at height 1.2.
    jump_reward = 10.0 * (height - 0.5) + 5.0 * np.exp(-5 * (height - 1.2)**2)
    hinge_extension_reward = 2.5 * (30 - hinge_angle)
    height_penalty = -300.0 if height < 0.4 else -50.0 if height < 0.5 else 0.0
    torso_penalty = -100.0 * abs(torso_angle) if abs(torso_angle) > 0.4 else 0.0
    angular_velocity_penalty = -20.0 * abs(torso_angular_velocity)
    knee_movement_reward = 15.0 * abs(knee_angle - 0.6)
    # Joint penalties only apply beyond the per-joint thresholds.
    hip_penalty = -40.0 * max(0, abs(hip_angle) - 1.0)
    knee_penalty = -40.0 * max(0, abs(knee_angle) - 1.2)
    ankle_penalty = -40.0 * max(0, abs(ankle_angle) - 0.8)

    # Final Reward Calculation
    return (forward_reward + jump_reward + hinge_extension_reward + height_penalty +
            torso_penalty + angular_velocity_penalty + hip_penalty + knee_penalty +
            ankle_penalty + energy_penalty + knee_movement_reward)





# Environment Setup
# NOTE: render_mode="human" draws a window on every step, which slows training
# considerably; pass render_mode=None for headless runs.
env = gym.make(
    "Hopper-v4",
    render_mode="human",
    forward_reward_weight=10.0,  # Increase forward movement incentive
    ctrl_cost_weight=0.005,  # Reduce action penalty to encourage bigger movements
    healthy_reward=2.0,  # Small bonus for staying upright
    terminate_when_unhealthy=False,  # Do NOT auto-terminate; the loop below ends fallen episodes itself
)


state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
agent = TD3Agent(state_dim, action_dim, max_action)

# Training
num_episodes = 10000
# Save every 10% of training; never 0, which would break the modulo below.
save_interval = max(1, num_episodes // 10)
try:
    for episode in tqdm(range(num_episodes)):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            action = agent.select_action(state)
            # The environment's own reward is replaced by the shaped reward.
            next_state, _, terminated, truncated, _ = env.step(action)
            reward = custom_reward(state, next_state, action)

            # Reset if stopped or fallen
            fallen = next_state[5] < 0.1 and next_state[0] < 0.5
            terminal = bool(terminated or fallen)
            done = terminal or truncated

            # Store only real terminations as `done`: a time-limit truncation
            # is not a terminal state, so the critic must still bootstrap
            # from next_state for those transitions.
            agent.store_transition(state, action, reward, next_state, terminal)
            agent.train()
            state = next_state
            episode_reward += reward

        print(f"Episode {episode}, Reward: {episode_reward:.2f}")
        if (episode + 1) % save_interval == 0:
            torch.save(agent.actor.state_dict(), f"td3_actor_{episode+1}.pth")
            torch.save(agent.critic.state_dict(), f"td3_critic_{episode+1}.pth")
            print(f"Model saved at episode {episode+1}")
finally:
    # Release the simulator and viewer even if training is interrupted.
    env.close()

# Testing the model

In [None]:
import sys
# `import importlib` alone does not guarantee the `util` submodule is loaded.
import importlib.util

# Run on CUDA when present, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the Actor class from the training script on disk.
# NOTE(review): exec_module runs that file's top-level code; if it contains
# the training loop unguarded, training will start here - confirm.
spec = importlib.util.spec_from_file_location("Actor", r"C:\Users\______\Downloads\CODE\Custom_reward_td3.py")
module = importlib.util.module_from_spec(spec)
sys.modules["Actor"] = module
spec.loader.exec_module(module)
Actor = module.Actor

# Load environment
# NOTE(review): the training cell uses "Hopper-v4" while this cell uses
# "Hopper-v5" - confirm the version mismatch is intentional.
env = gym.make(
    "Hopper-v5",
    render_mode="human",
    forward_reward_weight=10.0,
    ctrl_cost_weight=0.005,
    healthy_reward=2.0,
    terminate_when_unhealthy=False,
)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])

# Load trained actor
actor = Actor(state_dim, action_dim, max_action).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
actor.load_state_dict(torch.load(r"C:\Users\_____\Downloads\results\td3_actor_1000.pth", map_location=device))
actor.eval()


num_test_episodes = 10
total_rewards = []

try:
    for episode in tqdm(range(num_test_episodes)):
        state, _ = env.reset()
        done = False
        episode_reward = 0

        while not done:
            state_tensor = torch.as_tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            # Pure inference: no autograd graph is needed.
            with torch.no_grad():
                action = actor(state_tensor).cpu().numpy().flatten()
            # With render_mode="human" the environment renders inside step(),
            # so no explicit env.render() call is needed.
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state
            episode_reward += reward

        total_rewards.append(episode_reward)
        print(f"Episode {episode+1}: Reward = {episode_reward:.2f}")

    # Compute and display average performance
    avg_reward = np.mean(total_rewards)
    print(f"\nAverage Reward over {num_test_episodes} episodes: {avg_reward:.2f}")
finally:
    # Always release the simulator and viewer window.
    env.close()