In [31]:
import os
import random
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
import torch.optim.lr_scheduler as lr_scheduler
from torch.autograd import Variable
from collections import deque, namedtuple

In [32]:
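# One-time environment setup (uncomment to run; %pip installs into the active kernel's environment):
# %pip install swig
# %pip install gymnasium
# %pip install "gymnasium[box2d]"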

In [33]:
# Build the discrete-action Lunar Lander environment (wind disabled) and
# show the spaces the agent will interact with.
env = gym.make(
    "LunarLander-v3",
    gravity=-10,
    continuous=False,
    enable_wind=False,
    wind_power=15.0,
    turbulence_power=1.5,
)

print("Observation space:", env.observation_space)
print("Action space:", env.action_space)



Observation space: Box([ -2.5        -2.5       -10.        -10.         -6.2831855 -10.
  -0.         -0.       ], [ 2.5        2.5       10.        10.         6.2831855 10.
  1.         1.       ], (8,), float32)
Action space: Discrete(4)


In [34]:
# Pull the problem dimensions out of the environment's spaces:
# the observation shape tuple, its first (only) dimension, and the
# number of discrete actions.
state_shape = env.observation_space.shape
state_size = state_shape[0]
number_actions = env.action_space.n

In [35]:
# Report the dimensions read from the environment's observation and action spaces.
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


In [36]:

def random_policy(env, state):
    """Return an action drawn from ``env.action_space.sample()``.

    ``state`` is accepted so the function has the usual policy signature,
    but it is not used when choosing the action.
    """
    sampled_action = env.action_space.sample()
    return sampled_action

# Roll out the random policy for a fixed number of steps to get a feel for
# the observations and rewards the environment produces.
state = env.reset()[0]

for counter in range(201):
    # NOTE: no env.render() here -- this env was created without a
    # render_mode, so render() would only emit a warning.

    # select the action according to the given policy
    action = random_policy(env, state)

    # perform the action and store the next state information
    next_state, reward, done, truncated, info = env.step(action)

    if counter % 10 == 0:
        print("Step:", counter, next_state, reward, done, info)

    # An environment must be reset once an episode has ended; otherwise
    # further step() calls act on a finished episode.
    if done or truncated:
        next_state = env.reset()[0]

    # Advance the state so the policy always sees the current observation.
    state = next_state

env.close()

Step: 0 [-0.00477858  1.4176927  -0.23491082  0.13790031  0.00331049  0.00965768
  0.          0.        ] 1.9370168284079352 False {}
Step: 10 [-0.02903976  1.4460834  -0.2313669   0.11906596 -0.01922764 -0.11525144
  0.          0.        ] 1.1602144880147318 False {}
Step: 20 [-0.05325594  1.4628901  -0.25605923 -0.00361128 -0.04051466  0.05782334
  0.          0.        ] -0.4027076212859402 False {}
Step: 30 [-7.8324124e-02  1.4633204e+00 -2.6399714e-01  5.1563990e-04
 -5.3999264e-02  1.8473180e-02  0.0000000e+00  0.0000000e+00] -0.6990777263956136 False {}
Step: 40 [-0.10413218  1.4427974  -0.25605658 -0.1644567  -0.0668797  -0.08375414
  0.          0.        ] -1.4230373123607762 False {}
Step: 50 [-0.12919465  1.3734735  -0.24982949 -0.40441695 -0.1339107  -0.12467974
  0.          0.        ] -0.19841705168325346 False {}
Step: 60 [-0.15450153  1.261566   -0.23339967 -0.5677779  -0.13064271  0.03435066
  0.          0.        ] -0.7459743547640858 False {}
Step: 70 [-0.177905

  gym.logger.warn(


In [48]:
seed_value = 42

# Seed every random number generator the notebook draws from, not only the
# environments: `random` drives replay-buffer sampling, NumPy drives the
# exploration noise, and torch drives network weight initialisation.
random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)

# Headless environment used for training (continuous action space).
env = gym.make("LunarLander-v3", gravity=-10, continuous=True,
               enable_wind=False, wind_power=15.0, turbulence_power=1.5)
env.reset(seed=seed_value)

# Identically configured environment with on-screen rendering, used for evaluation.
render_env = gym.make("LunarLander-v3", gravity=-10, continuous=True,
                      enable_wind=False, wind_power=15.0, turbulence_power=1.5, render_mode='human')
render_env.reset(seed=seed_value)

# Problem dimensions and action bounds read from the training environment.
# Only the first component's bounds are read and later applied to every
# action component.
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
action_low = float(env.action_space.low[0])
action_high = float(env.action_space.high[0])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [38]:
class Actor(nn.Module):
    """Deterministic policy network: maps a batch of states to actions.

    Two ReLU hidden layers followed by a tanh output layer, so every
    action component lies in [-1, 1].

    Args:
        state_dim: Number of features in a state vector.
        action_dim: Number of components in an action vector.
        hidden_dim: Width of both hidden layers (default 256).
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.out = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Return the action for each state in the batch, squashed by tanh."""
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return torch.tanh(self.out(x))  # Output in [-1, 1]

In [39]:
class Critic(nn.Module):
    """Action-value network: scores a (state, action) pair with one scalar.

    The state and action are concatenated along the feature dimension and
    passed through two ReLU hidden layers and a linear output layer.

    Args:
        state_dim: Number of features in a state vector.
        action_dim: Number of components in an action vector.
        hidden_dim: Width of both hidden layers (default 256).
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.out = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        """Return a (batch, 1) tensor of values for the given batches.

        Both inputs must be 2-D with matching batch size, because they are
        concatenated along dim=1.
        """
        x = torch.cat([state, action], dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)

In [40]:
class ReplayBuffer:
    """Bounded first-in-first-out store of transitions with uniform sampling."""

    def __init__(self, capacity=100000):
        # A deque with maxlen drops its oldest entry once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, *args):
        """Append one transition, converting every field to a NumPy array."""
        transition = tuple(np.array(field) for field in args)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` stored transitions without replacement.

        Returns a lazy iterator yielding one float tensor per transition
        field, each built by vertically stacking that field across the
        sampled transitions and moved to the notebook-level ``device``.
        """
        def to_tensor(column):
            stacked = np.vstack(column)
            return torch.FloatTensor(stacked).to(device)

        batch = random.sample(self.buffer, batch_size)
        return map(to_tensor, zip(*batch))

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [41]:
class DDPG:
    """Deep Deterministic Policy Gradient agent.

    Holds online and target copies of the Actor and Critic networks, their
    Adam optimizers and a replay buffer. Network sizes, the torch device and
    the action bounds are read from the notebook-level globals ``state_dim``,
    ``action_dim``, ``device``, ``action_low`` and ``action_high``.

    Args:
        actor_lr: Adam learning rate for the actor (default 1e-3).
        critic_lr: Adam learning rate for the critic (default 1e-2).
        gamma: Discount factor used in the bootstrapped target (default 0.99).
        tau: Interpolation factor of the soft target update (default 0.005).
        buffer_capacity: Maximum number of stored transitions (default 100000).
    """

    def __init__(self, actor_lr=1e-3, critic_lr=1e-2, gamma=0.99, tau=0.005,
                 buffer_capacity=100000):
        self.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        # Target networks start as exact copies of the online networks.
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.memory = ReplayBuffer(capacity=buffer_capacity)
        self.gamma = gamma
        self.tau = tau

    @staticmethod
    def _soft_update(target_net, online_net, tau):
        """Blend ``online_net``'s parameters into ``target_net`` in place.

        Each target parameter becomes ``tau * online + (1 - tau) * target``.
        """
        # no_grad keeps the in-place copy out of autograd without using `.data`.
        with torch.no_grad():
            for target_param, param in zip(target_net.parameters(), online_net.parameters()):
                target_param.copy_(tau * param + (1 - tau) * target_param)

    def select_action(self, state, noise=0.1):
        """Return the actor's action for ``state`` plus Gaussian exploration noise.

        Args:
            state: A single (unbatched) state vector.
            noise: Standard deviation of the zero-mean Gaussian noise added
                to each action component.

        Returns:
            NumPy array of ``action_dim`` components clipped to
            ``[action_low, action_high]``.
        """
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        # Inference only: skip building an autograd graph.
        with torch.no_grad():
            action = self.actor(state).cpu().numpy()[0]
        action = action + noise * np.random.randn(action_dim)
        return np.clip(action, action_low, action_high)

    def train(self, batch_size=64):
        """Run one critic update, one actor update and a soft target update.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.memory) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)

        # Critic loss: regress Q(s, a) onto the one-step bootstrapped target.
        # (1 - dones) removes the bootstrap term for terminal transitions.
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_Q = self.critic_target(next_states, next_actions)
            target_Q = rewards + self.gamma * (1 - dones) * target_Q
        current_Q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_Q, target_Q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor loss: maximise the critic's value of the actor's own actions.
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft update of both target networks toward the online networks.
        self._soft_update(self.critic_target, self.critic, self.tau)
        self._soft_update(self.actor_target, self.actor, self.tau)

In [42]:
# Training schedule: number of episodes and the per-episode step cap.
episodes = 1000
timesteps = 300

# Exploration-noise schedule: start value, lower bound, and the per-episode
# multiplicative decay factor.
initial_noise = 0.5
final_noise = 0.025
noise_decay = 0.995

# Agent with its default hyperparameters.
agent = DDPG()


In [43]:
# Main training loop: one gradient update per environment step.
for ep in range(episodes):
    state = env.reset()[0]
    episode_reward = 0
    # Exploration noise decays geometrically with the episode index and is
    # floored at final_noise.
    noise = max(final_noise, initial_noise * (noise_decay ** ep))
    for t in range(timesteps):
        action = agent.select_action(state, noise=noise)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only genuine termination is stored as the "done" flag, so the
        # bootstrapped value is not dropped when an episode is merely truncated.
        agent.memory.push(state, action, reward, next_state, float(terminated))
        agent.train()
        state = next_state
        episode_reward += reward
        # Stop on truncation too: a finished episode must not be stepped further.
        if terminated or truncated:
            break
    print(f"Episode {ep+1}, Reward: {episode_reward:.2f}, Noise: {noise:.3f}")

env.close()

Episode 1, Reward: -291.89, Noise: 0.500
Episode 2, Reward: -351.94, Noise: 0.497
Episode 3, Reward: -580.06, Noise: 0.495
Episode 4, Reward: -1271.77, Noise: 0.493
Episode 5, Reward: -684.11, Noise: 0.490
Episode 6, Reward: -578.00, Noise: 0.488
Episode 7, Reward: -564.58, Noise: 0.485
Episode 8, Reward: -372.41, Noise: 0.483
Episode 9, Reward: -533.33, Noise: 0.480
Episode 10, Reward: -306.94, Noise: 0.478
Episode 11, Reward: -24.56, Noise: 0.476
Episode 12, Reward: -318.31, Noise: 0.473
Episode 13, Reward: 41.90, Noise: 0.471
Episode 14, Reward: -61.94, Noise: 0.468
Episode 15, Reward: 15.41, Noise: 0.466
Episode 16, Reward: -84.34, Noise: 0.464
Episode 17, Reward: -43.47, Noise: 0.461
Episode 18, Reward: 6.83, Noise: 0.459
Episode 19, Reward: -117.41, Noise: 0.457
Episode 20, Reward: -47.83, Noise: 0.455
Episode 21, Reward: 89.05, Noise: 0.452
Episode 22, Reward: -22.60, Noise: 0.450
Episode 23, Reward: 38.84, Noise: 0.448
Episode 24, Reward: 62.43, Noise: 0.446
Episode 25, Reward:

In [44]:
# Release the training environment. The training cell above also calls
# env.close() after its loop, so this is a repeat call.
env.close()

In [49]:
def test_agent(agent, env, episodes=5, max_steps=300):
    for ep in range(episodes):
        state = env.reset()[0]
        done = False
        episode_reward = 0
        step_count = 0
        while not done and step_count < max_steps:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
            with torch.no_grad():
                action = agent.actor(state_tensor).cpu().numpy()[0]
            state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            step_count += 1
        print(f"Test Episode {ep+1}, Reward: {episode_reward:.2f}")
    render_env.close()


# Evaluate the trained agent on the human-rendered environment, using
# test_agent's default episode count and step cap.
test_agent(agent, render_env)

Test Episode 1, Reward: 275.15
Test Episode 2, Reward: 276.20
Test Episode 3, Reward: 284.89
Test Episode 4, Reward: 242.69
Test Episode 5, Reward: 283.53


In [46]:
# Close the rendering environment. test_agent already closes it when
# evaluation finishes, so this is a repeat call.
render_env.close()