In [None]:
pip install torch torchvision numpy matplotlib gym

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import matplotlib.pyplot as plt
import random
from collections import deque
import os

# Set seeds for reproducibility
def set_seed(seed):
    """Seed the NumPy, PyTorch and Python ``random`` generators.

    Args:
        seed: Value handed unchanged to each of the three seeding functions.
    """
    # Same order as before: NumPy, then PyTorch, then the stdlib generator.
    for seed_fn in (np.random.seed, torch.manual_seed, random.seed):
        seed_fn(seed)

class OrnsteinUhlenbeckNoise:
    """Temporally correlated noise from a discretised Ornstein-Uhlenbeck process.

    Every call to ``sample`` advances the internal state by
    ``theta * (mu - state) + sigma * N(0, I)``, so consecutive samples are
    correlated and drift back towards ``mu``.

    Args:
        size: Shape of the noise vector (an int or a tuple of ints).
        mu: Long-run mean the process reverts to.
        theta: Mean-reversion rate.
        sigma: Scale of the Gaussian perturbation added at each step.
    """

    def __init__(self, size, mu=0., theta=0.15, sigma=0.2):
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        """Restart the process at its mean ``mu``."""
        self.state = np.copy(self.mu)

    def sample(self):
        """Advance the process one step and return the new noise value.

        Returns:
            A fresh array holding the updated state. It is a copy, so callers
            may store or modify it without disturbing the process, and later
            calls never change a previously returned sample.
        """
        drift = self.theta * (self.mu - self.state)
        # Draw with the state's full shape so multi-dimensional sizes work too.
        diffusion = self.sigma * np.random.randn(*self.state.shape)
        self.state = self.state + drift + diffusion
        return self.state.copy()

class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops the oldest entry once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and stack them field by field.

        Returns:
            Five arrays (states, actions, rewards, next states, done flags),
            each with the batch along its first axis.
        """
        transitions = random.sample(self.buffer, batch_size)
        # zip(*...) regroups the per-transition tuples into per-field columns.
        states, actions, rewards, next_states, dones = (
            np.stack(column) for column in zip(*transitions)
        )
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

class Actor(nn.Module):
    """Deterministic policy network: state in, bounded action out.

    Two 64-unit ReLU hidden layers feed a tanh output layer whose result is
    multiplied by ``max_action``.
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        hidden_units = 64
        self.layer_1 = nn.Linear(state_dim, hidden_units)
        self.layer_2 = nn.Linear(hidden_units, hidden_units)
        self.layer_3 = nn.Linear(hidden_units, action_dim)
        self.max_action = max_action

    def forward(self, x):
        """Return the action for state batch ``x``, scaled by ``max_action``."""
        hidden = self.layer_1(x).relu()
        hidden = self.layer_2(hidden).relu()
        # tanh keeps the raw output in [-1, 1] before scaling.
        squashed = self.layer_3(hidden).tanh()
        return squashed * self.max_action

class Critic(nn.Module):
    """Action-value network: scores a (state, action) pair with one scalar.

    The state and action are concatenated and passed through two 64-unit
    ReLU hidden layers followed by a linear output of width 1.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        width = 64
        self.layer_1 = nn.Linear(state_dim + action_dim, width)
        self.layer_2 = nn.Linear(width, width)
        self.layer_3 = nn.Linear(width, 1)

    def forward(self, x, u):
        """Return Q-values for state batch ``x`` and action batch ``u``."""
        # Join along the feature axis so each row is one state-action pair.
        features = torch.cat((x, u), dim=1)
        for hidden_layer in (self.layer_1, self.layer_2):
            features = torch.relu(hidden_layer(features))
        return self.layer_3(features)

class DDPG:
    """Deep Deterministic Policy Gradient agent.

    Holds an actor and a critic, a slowly tracking target copy of each, a
    replay buffer and an Ornstein-Uhlenbeck exploration noise process.

    Args:
        state_dim: Size of the observation vector.
        action_dim: Size of the action vector.
        max_action: Symmetric action bound; the actor's output is scaled to
            it and selected actions are clipped to ``[-max_action, max_action]``.
        gamma: Discount factor used in the TD target.
        tau: Interpolation weight for the soft target-network update.
        batch_size: Number of transitions sampled per training step.
        actor_lr: Adam learning rate for the actor.
        critic_lr: Adam learning rate for the critic.
        buffer_capacity: Maximum number of transitions kept for replay.
    """

    def __init__(self, state_dim, action_dim, max_action, gamma=0.99,
                 tau=0.005, batch_size=64, actor_lr=1e-3, critic_lr=1e-3,
                 buffer_capacity=int(1e6)):
        # Kept on the agent so select_action can clip to the real bound.
        self.max_action = max_action

        self.actor = Actor(state_dim, action_dim, max_action)
        self.actor_target = Actor(state_dim, action_dim, max_action)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)

        self.critic = Critic(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.replay_buffer = ReplayBuffer(buffer_capacity)
        self.noise = OrnsteinUhlenbeckNoise(action_dim)

        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size

    def select_action(self, state, add_noise=True):
        """Return the actor's action for a single state.

        Args:
            state: One observation (array-like); it is flattened to one row.
            add_noise: When true, add a sample of the exploration noise.

        Returns:
            A 1-D NumPy array clipped to ``[-max_action, max_action]``.
        """
        state_row = np.asarray(state, dtype=np.float32).reshape(1, -1)
        # Acting needs no gradients, so skip building the autograd graph.
        with torch.no_grad():
            action = self.actor(torch.as_tensor(state_row))
        action = action.detach().cpu().numpy().flatten()
        if add_noise:
            action += self.noise.sample()
        # Clip to the environment's bound rather than a fixed [-1, 1]; the
        # actor's output is already scaled by max_action.
        return np.clip(action, -self.max_action, self.max_action)

    def train(self):
        """Run one critic update, one actor update and a soft target update.

        Returns immediately, doing nothing, while the replay buffer holds
        fewer than ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)

        # as_tensor with an explicit dtype also accepts boolean done flags.
        state = torch.as_tensor(state, dtype=torch.float32)
        action = torch.as_tensor(action, dtype=torch.float32)
        reward = torch.as_tensor(reward, dtype=torch.float32).reshape(-1, 1)
        next_state = torch.as_tensor(next_state, dtype=torch.float32)
        done = torch.as_tensor(done, dtype=torch.float32).reshape(-1, 1)

        # The TD target is a fixed regression target, so no graph is needed.
        with torch.no_grad():
            next_action = self.actor_target(next_state)
            next_q = self.critic_target(next_state, next_action)
            # (1 - done) removes the bootstrap term for terminal transitions.
            target_q = reward + (1.0 - done) * self.gamma * next_q

        current_q = self.critic(state, action)
        critic_loss = nn.functional.mse_loss(current_q, target_q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # The actor is trained to maximise the critic's value of its own
        # actions. The gradients this leaves on the critic are discarded by
        # the critic optimizer's zero_grad() at the next step.
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self._soft_update(self.critic, self.critic_target)
        self._soft_update(self.actor, self.actor_target)

    def _soft_update(self, online, target):
        """Blend ``online`` parameters into ``target`` in place with weight ``tau``."""
        with torch.no_grad():
            for param, target_param in zip(online.parameters(), target.parameters()):
                target_param.copy_(self.tau * param + (1.0 - self.tau) * target_param)

def evaluate_policy(agent, env, eval_episodes=10):
    """Return the mean undiscounted return of the noise-free policy.

    Works with both Gym step/reset conventions: the older one
    (``reset() -> obs``, ``step() -> (obs, reward, done, info)``) and the
    newer one (``reset() -> (obs, info)``,
    ``step() -> (obs, reward, terminated, truncated, info)``).

    Args:
        agent: Object exposing ``select_action(state, add_noise=False)``.
        env: Environment exposing ``reset()`` and ``step(action)``.
        eval_episodes: Number of episodes to average over; must be >= 1.

    Returns:
        The sum of rewards per episode, averaged over ``eval_episodes``.

    Raises:
        ValueError: If ``eval_episodes`` is less than 1.
    """
    if eval_episodes < 1:
        raise ValueError("eval_episodes must be at least 1")

    total_reward = 0
    for _ in range(eval_episodes):
        reset_result = env.reset()
        # Newer Gym returns (observation, info); keep only the observation.
        if (isinstance(reset_result, tuple) and len(reset_result) == 2
                and isinstance(reset_result[1], dict)):
            state = reset_result[0]
        else:
            state = reset_result

        done = False
        episode_reward = 0
        while not done:
            action = agent.select_action(state, add_noise=False)
            step_result = env.step(action)
            if len(step_result) == 5:
                state, reward, terminated, truncated, _ = step_result
                # The episode ends on a true terminal state or a time limit.
                done = bool(terminated or truncated)
            else:
                state, reward, done, _ = step_result
            episode_reward += reward
        total_reward += episode_reward

    return total_reward / eval_episodes

def _reset_env(env, seed=None):
    """Reset ``env`` and return only the observation, for old and new Gym APIs."""
    if seed is None:
        result = env.reset()
    else:
        try:
            result = env.reset(seed=seed)
        except TypeError:
            # Older Gym releases seed through env.seed(), not reset(seed=...).
            env.seed(seed)
            result = env.reset()
    # Newer Gym returns (observation, info); keep only the observation.
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


def _step_env(env, action):
    """Step ``env``; return ``(next_state, reward, terminal, episode_over)``.

    ``terminal`` is true only for a genuine terminal state, while
    ``episode_over`` is also true when the episode was cut off by a time limit.
    """
    result = env.step(action)
    if len(result) == 5:
        next_state, reward, terminated, truncated, _ = result
        return next_state, reward, bool(terminated), bool(terminated or truncated)
    next_state, reward, done, info = result
    # The old API reports a time-limit cut-off through this info key.
    truncated = bool(info.get("TimeLimit.truncated", False))
    return next_state, reward, bool(done) and not truncated, bool(done)


def train_ddpg(env_name="CartPole-v1", gamma=0.99, max_episodes=500, seed=42):
    """Train a DDPG agent on a Gym environment with a continuous action space.

    Args:
        env_name: Gym environment id. It must expose a bounded, non-scalar
            action space. The stock ``CartPole-v1`` is discrete and is
            rejected, so pass a continuous variant.
        gamma: Discount factor handed to the agent.
        max_episodes: Number of training episodes.
        seed: Seed for the global RNGs and for the environment's first reset.

    Returns:
        ``(evaluation_rewards, episode_rewards)``: the noise-free evaluation
        score recorded every 10 episodes, and the training return of every
        episode.

    Raises:
        ValueError: If the environment's action space is not continuous.
    """
    set_seed(seed)

    env = gym.make(env_name)
    try:
        action_space = env.action_space
        action_shape = getattr(action_space, "shape", None)
        # A Discrete space has shape () and no `high`; DDPG cannot act on it.
        if not action_shape or not hasattr(action_space, "high"):
            raise ValueError(
                f"DDPG needs a continuous (Box) action space, but {env_name!r} "
                f"provides {action_space!r}"
            )

        state_dim = env.observation_space.shape[0]
        action_dim = action_shape[0]
        max_action = float(action_space.high[0])

        agent = DDPG(state_dim, action_dim, max_action, gamma=gamma)

        episode_rewards = []
        evaluation_rewards = []

        for episode in range(max_episodes):
            # Seed the environment once, on the first reset, for reproducibility.
            state = _reset_env(env, seed=seed if episode == 0 else None)
            agent.noise.reset()
            episode_reward = 0
            episode_over = False

            while not episode_over:
                action = agent.select_action(state)
                next_state, reward, terminal, episode_over = _step_env(env, action)

                # Store only true termination as `done`, so the critic still
                # bootstraps through a time-limit cut-off.
                agent.replay_buffer.push(
                    state, action, float(reward), next_state, float(terminal)
                )
                state = next_state
                episode_reward += reward

                agent.train()

            episode_rewards.append(episode_reward)

            # Evaluate the noise-free policy every 10 episodes.
            if episode % 10 == 0:
                eval_reward = evaluate_policy(agent, env)
                evaluation_rewards.append(eval_reward)
                print(f"Episode {episode}, Evaluation Reward: {eval_reward:.2f}")
    finally:
        # Release the environment's resources even if training fails.
        env.close()

    return evaluation_rewards, episode_rewards

def run_gamma_experiments(gamma_values=(0.5, 0.7, 0.9, 0.99), num_runs=10, **train_kwargs):
    """Train DDPG repeatedly for each discount factor and collect the results.

    Args:
        gamma_values: Discount factors to compare.
        num_runs: Independent runs per discount factor; run ``i`` uses seed ``i``.
        **train_kwargs: Extra keyword arguments forwarded to ``train_ddpg``,
            such as ``env_name`` or ``max_episodes``. Do not pass ``gamma`` or
            ``seed``; they are set here.

    Returns:
        A dict mapping each gamma to a list with one evaluation-reward curve
        per run.
    """
    all_results = {}

    for gamma in gamma_values:
        print(f"\nRunning experiments with gamma = {gamma}")
        gamma_results = []

        for seed in range(num_runs):
            print(f"  Run {seed + 1}/{num_runs}")
            eval_rewards, _ = train_ddpg(gamma=gamma, seed=seed, **train_kwargs)
            gamma_results.append(eval_rewards)

        all_results[gamma] = gamma_results

    return all_results

def plot_results(all_results):
    """Draw the mean evaluation curve, with a one-std band, for every gamma.

    Args:
        all_results: Dict mapping a gamma value to a list of runs, each run
            being a list of evaluation rewards.

    The figure is written to ``ddpg_gamma_comparison.png`` and then shown.
    """
    plt.figure(figsize=(12, 8))

    for gamma, results in all_results.items():
        longest = max(len(run) for run in results)
        # Extend shorter runs by repeating their last value so all runs align.
        padded_results = [
            run + [run[-1]] * (longest - len(run))
            for run in results
        ]

        avg_rewards = np.mean(padded_results, axis=0)
        std_rewards = np.std(padded_results, axis=0)
        # Consecutive points are placed 10 episodes apart on the x axis.
        episodes = np.arange(0, len(avg_rewards) * 10, 10)

        lower_band = avg_rewards - std_rewards
        upper_band = avg_rewards + std_rewards
        plt.plot(episodes, avg_rewards, label=f'γ = {gamma}', linewidth=2)
        plt.fill_between(episodes, lower_band, upper_band, alpha=0.2)

    plt.xlabel('Training Episodes')
    plt.ylabel('Average Return (Evaluation)')
    plt.title('DDPG Learning Performance on CartPole with Different Discount Factors')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('ddpg_gamma_comparison.png', dpi=300, bbox_inches='tight')
    plt.show()

def main():
    """Run the gamma sweep, plot the learning curves and return the results."""
    print("Starting DDPG experiments with different gamma values...")
    sweep_results = run_gamma_experiments()

    plot_results(sweep_results)

    print("\nExperiment completed! Results saved as 'ddpg_gamma_comparison.png'")
    return sweep_results


if __name__ == "__main__":
    # Keep `results` bound at module level, as it was before.
    results = main()
