### Importing the libraries

In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import deque,namedtuple

### Creating the architecture of the neural network

In [2]:
class Network(nn.Module):
    """
    Fully connected Q-network: turns a batch of state vectors into one value per action.

    Layout: state_size -> 64 -> 64 -> action_size, with a ReLU after each of the
    two 64-unit layers and no activation on the output layer.

    Parameters:
    - state_size (int): number of features in one state vector.
    - action_size (int): number of discrete actions (width of the output layer).
    - seed (int): handed to torch.manual_seed before the layers are created, so the
      randomly initialised weights are the same for the same seed.

    NOTE(review): the notebook feeds this network LunarLander states (8 features,
    4 discrete actions according to the environment cell). The exact meaning of
    each feature and action index should be confirmed against the Gymnasium
    LunarLander documentation.
    """

    def __init__(self, state_size, action_size, seed=42):
        super().__init__()

        # torch.manual_seed reseeds torch's global RNG and returns the generator,
        # which is kept on the instance.
        self.seed = torch.manual_seed(seed)

        hidden_units = 64
        self.fc1 = nn.Linear(state_size, hidden_units)    # state features -> hidden
        self.fc2 = nn.Linear(hidden_units, hidden_units)  # hidden -> hidden
        self.fc3 = nn.Linear(hidden_units, action_size)   # hidden -> one value per action

    def forward(self, state):
        """
        Run the forward pass.

        `state` is passed straight into fc1, so its last dimension must equal
        state_size; the result has action_size entries in its last dimension.
        """
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        # Raw, unbounded action values: no activation on the final layer.
        return self.fc3(hidden)


## Training the AI

### Setting up environment

In [3]:
import gymnasium as gym

In [4]:
# Discrete-action LunarLander with the physics options spelled out explicitly.
# Wind is switched off via enable_wind=False.
env = gym.make(
    "LunarLander-v3",
    continuous=False,
    gravity=-10.0,
    enable_wind=False,
    wind_power=15.0,
    turbulence_power=1.5,
)

# Sizes read from the environment; they parameterise the network and the agent.
state_shape = env.observation_space.shape   # shape tuple of one observation, e.g. (8,)
state_size = state_shape[0]                 # length of the state vector, e.g. 8
number_actions = env.action_space.n         # count of discrete actions, e.g. 4


### Initializing the hyperparameters

In [5]:
# Step size handed to the Adam optimizer that trains the local Q-network.
# A smaller value (like 0.0005) means slower but more stable learning.
# Too high: risk of overshooting good solutions; too low: slow convergence.
learning_rate = 5e-4

# Number of experiences sampled from the replay buffer for each training step.
# A larger batch (like 100) gives more stable updates but needs more memory;
# smaller batches give noisier updates but cheaper training steps.
batch_size = 100

# How much the agent values future rewards compared to immediate rewards.
# 0.99 makes the agent strongly future-focused; lower values (e.g. 0.9) shift
# the focus towards short-term rewards.
discount_factor = 0.99

# Maximum number of experiences kept in the replay buffer.
# A larger buffer (like 100,000) gives more diverse training data but needs more
# memory. The oldest experiences are dropped once the buffer is full.
replay_buffer_size = 100000

# Soft-update rate for the target network: after every learning step the target
# weights move this fraction of the way towards the local network's weights.
# A small value (like 0.001) keeps the target network slow-moving and stable.
interpolation_parameter = 0.001




### Implementing Experience Replay

In [6]:
class ReplayMemory:
    """
    Fixed-capacity FIFO store of experience tuples with uniform random sampling.

    Each stored experience is a tuple laid out as
        (state, action, reward, next_state, done)
    and the oldest experience is discarded once `capacity` is reached.
    """

    def __init__(self, capacity):
        """
        Parameters:
        - capacity (int): maximum number of experiences kept at any time.
        """
        # Sampled batches are moved to the GPU when one is available, otherwise the CPU.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        self.capacity = capacity
        # A bounded deque evicts the oldest entry in O(1) when full, instead of
        # shifting a whole list with `del memory[0]` on every push.
        self.memory = deque(maxlen=capacity)

    def push(self, experience):
        """Append one (state, action, reward, next_state, done) tuple, evicting the oldest if full."""
        self.memory.append(experience)

    @staticmethod
    def _stack_field(experiences, index):
        """Stack field `index` of every non-None experience into a 2-D array, one row per experience."""
        return np.vstack([e[index] for e in experiences if e is not None])

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct experiences uniformly at random and batch them as tensors.

        Returns a 5-tuple of tensors on `self.device`, each with one row per experience:
            (states, next_states, actions, rewards, done)
        NOTE: this return order differs from the order inside a stored experience
        tuple (state, action, reward, next_state, done); callers must unpack in
        the return order shown here.

        states, next_states, rewards and done are float tensors (done is 0.0/1.0);
        actions is a long tensor so it can be used as a gather index.

        Raises:
        - ValueError (from random.sample) if batch_size exceeds the number of
          stored experiences.
        """
        experiences = random.sample(self.memory, k=batch_size)

        states = torch.from_numpy(self._stack_field(experiences, 0)).float().to(self.device)
        actions = torch.from_numpy(self._stack_field(experiences, 1)).long().to(self.device)
        rewards = torch.from_numpy(self._stack_field(experiences, 2)).float().to(self.device)
        next_states = torch.from_numpy(self._stack_field(experiences, 3)).float().to(self.device)

        # Booleans go through uint8 first so they become 0/1 before the float conversion.
        done_flags = self._stack_field(experiences, 4).astype(np.uint8)
        done = torch.from_numpy(done_flags).float().to(self.device)

        return states, next_states, actions, rewards, done
        

### Implementing the DQN class

In [7]:
class Agent:
    """
    DQN agent: a local Q-network that is trained, a slowly tracking target
    Q-network used to build the learning targets, and a replay buffer.

    Relies on the notebook-level hyperparameters `learning_rate`,
    `replay_buffer_size`, `batch_size`, `discount_factor` and
    `interpolation_parameter`.
    """

    # Number of calls to step() between two learning updates.
    UPDATE_EVERY = 4

    def __init__(self, state_size, action_size):
        """
        Initializes the reinforcement learning agent.

        Parameters:
        - state_size (int): The number of features in the state representation.
        - action_size (int): The number of possible actions the agent can take.
        """
        # Use the GPU when one is available, otherwise the CPU.
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

        self.state_size = state_size    # Example: 8 features in the state
        self.action_size = action_size  # Example: 4 possible actions

        # local_qnetwork picks actions and is the one being optimised;
        # target_qnetwork supplies the bootstrap values in learn().
        # Both are built with Network's default seed.
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)

        # Only the local network's parameters are handed to the optimizer.
        self.optimizer = torch.optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)

        # Replay buffer to store and sample past experiences (experience replay).
        self.memory = ReplayMemory(replay_buffer_size)

        # Position inside the current UPDATE_EVERY-step window.
        self.time_step = 0

    def step(self, state, action, reward, next_state, done):
        """
        Record one transition and, every UPDATE_EVERY calls, learn from a sampled batch.

        Learning is skipped until the buffer holds more than `batch_size` experiences.
        """
        self.memory.push((state, action, reward, next_state, done))
        self.time_step = (self.time_step + 1) % self.UPDATE_EVERY

        if self.time_step == 0 and len(self.memory.memory) > batch_size:
            # Sample exactly batch_size experiences so the sample size always
            # matches the guard above (it used to be a hard-coded 100).
            experiences = self.memory.sample(batch_size)
            self.learn(experiences, discount_factor)

    def act(self, state, epsilon=0.):
        """
        Epsilon-greedy action selection.

        With probability `epsilon` a uniformly random action index is returned
        (exploration); otherwise the index of the highest value predicted by the
        local network for `state` is returned (exploitation).

        Parameters:
        - state (np.ndarray): a single state vector (no batch dimension).
        - epsilon (float): exploration probability; the default 0 is fully greedy.

        Returns a NumPy integer scalar in [0, action_size).
        """
        # Decide first: an exploring step does not need a forward pass at all.
        if random.random() <= epsilon:
            return random.choice(np.arange(self.action_size))

        # Convert to a float tensor and add the batch dimension the network expects.
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

        # Evaluate without building an autograd graph, then restore training mode.
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()

        return np.argmax(action_values.cpu().numpy())

    def learn(self, experiences, discount_factor):
        """
        Perform one gradient step on the local network, then soft-update the target network.

        Parameters:
        - experiences: tuple (states, next_states, actions, rewards, done) of
          batched tensors, in the order returned by ReplayMemory.sample.
        - discount_factor (float): weight of the bootstrapped next-state value.
        """
        states, next_states, actions, rewards, done = experiences

        # Target: reward + discounted best next-state value from the target
        # network, zeroed for transitions flagged done. No gradient flows here.
        with torch.no_grad():
            next_q_targets = self.target_qnetwork(next_states).max(dim=1, keepdim=True)[0]
            q_target = rewards + discount_factor * next_q_targets * (1 - done)

        # Value the local network currently assigns to the action actually taken.
        q_expected = self.local_qnetwork(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """
        Move every target parameter a fraction of the way towards its local counterpart:
            target = interpolation_parameter * local + (1 - interpolation_parameter) * target
        """
        with torch.no_grad():
            for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
                target_param.copy_(
                    interpolation_parameter * local_param
                    + (1.0 - interpolation_parameter) * target_param
                )

## Initialize DQN Agent

In [8]:
# Build the DQN agent, sized to the environment's state vector and discrete action count
agent = Agent(state_size, number_actions)

## Training the DQN Agent

In [9]:
# ---- Training configuration ----

# Upper bound on the number of training episodes
number_episodes = 2000

# Hard cap on the number of environment steps taken within one episode
maximum_number_timesteps_per_episodes = 1000

# Epsilon-greedy schedule: start fully random, multiply epsilon by the decay
# factor after every episode, and never drop below the ending value
# (which keeps at least 1% exploration)
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995
epsilon = epsilon_starting_value

# Trailing-window average score at which training stops early and the weights are saved
solved_average_score = 250.0

# Sliding window with the scores of the most recent 100 episodes
score_on_100_episodes = deque(maxlen=100)


# ---- Training loop ----

for episode in range(1, number_episodes + 1):

    state, _ = env.reset()
    score = 0

    for t in range(1, maximum_number_timesteps_per_episodes + 1):

        action = agent.act(state, epsilon)

        # Gymnasium's step() reports `terminated` (a real terminal state) and
        # `truncated` (episode cut short, e.g. by a time limit) separately.
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Only a real terminal state should switch off bootstrapping in learn(),
        # so the agent is told about `terminated`, not `truncated`.
        agent.step(state, action, reward, next_state, terminated)

        state = next_state
        score = score + reward

        # Either flag ends the episode; stepping on after truncation is not valid.
        done = terminated or truncated
        if done:
            break

    score_on_100_episodes.append(score)
    epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)

    average_score = np.mean(score_on_100_episodes)

    # '\r' + end="" keeps rewriting the same output line as a live progress display
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end="")

    if episode % 100 == 0:
        # Same text but terminated with a newline, so every 100th episode stays
        # in the output as a permanent log line.
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))

    if average_score >= solved_average_score:
        # Leading newline so the message does not run into the progress line
        print(f"\nEnvironment solved in {episode} Average score : {average_score}")
        torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
        break

Episode 1037	Average Score: 255.95Environment solved in 1037 Average score : 255.94965453862307


## Visualizing the result

In [10]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name, video_path='video.mp4', fps=30):
    """
    Roll out one episode with `agent` acting greedily and save the rendered frames as a video.

    Parameters:
    - agent: object whose act(state) returns an integer-like action index
      (called without epsilon, so the Agent in this notebook acts greedily).
    - env_name (str): environment id passed to gym.make.
    - video_path (str): output file handed to imageio.mimsave.
    - fps (int): frame rate of the saved video.

    The episode ends when the environment reports either `terminated` or
    `truncated`; the environment is closed even if the rollout raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            # Capture the frame for the state the agent is about to act on.
            frames.append(env.render())
            action = agent.act(state)
            # int() accepts NumPy integer scalars as well as plain Python ints.
            state, reward, terminated, truncated, _ = env.step(int(action))
            # Honour truncation too; otherwise a policy that never reaches a
            # terminal state would keep this loop running forever.
            done = terminated or truncated
    finally:
        env.close()

    imageio.mimsave(video_path, frames, fps=fps)
    
# Record one episode played by the trained agent and write it to video.mp4
show_video_of_model(agent, 'LunarLander-v3')


def show_video():
    """
    Embed the first *.mp4 file found in the current working directory in the notebook output.

    The file is read in binary mode, base64-encoded and inlined into an HTML
    <video> element. Prints "Could not find video" when no .mp4 file exists.
    """
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        # Read-only binary mode inside a context manager: the handle is always
        # closed and no write permission on the file is required.
        with open(mp4, 'rb') as video_file:
            video = video_file.read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

# Display the first .mp4 found in the working directory inline in the notebook
show_video()

