In [None]:
import gym
import torch
import numpy as np

from gym.wrappers import GrayScaleObservation
from gym.wrappers import ResizeObservation
from gym.wrappers import FrameStack
from gym.wrappers import TransformObservation
import ale_py

import torch
from torch import nn
from torch.functional import F
import torch.optim as optim 

from matplotlib import pyplot as plt
from collections import deque
import sys
import importlib

from collections import deque
from qnetwork_genertic import QNetwork
from genertic import GeneticAlgorithm
from exp_replay import ExperienceReplay

In [None]:
# Sanity check: drive the raw Pong environment with random actions for a fixed
# number of steps and report the accumulated reward.
env = gym.make('ALE/Pong-v5', render_mode='rgb_array')
done = True  # start "done" so the first iteration resets the environment
print('Press interrupt to stop execution')
rewards = 0.0
try:
    for step in range(5000):
        if done:
            state, info = env.reset()
        # Step on every iteration; previously the step lived inside the
        # `if done:` branch, so only the very first iteration ever acted.
        state, reward, terminated, truncated, info = env.step(env.action_space.sample())
        rewards += reward
        done = terminated or truncated
except KeyboardInterrupt:
    print('Execution Interrupted.')
finally:
    # NOTE(review): later cells wrap and reuse this same `env` after it has
    # been closed here — confirm that is intended.
    env.close()
print('Total Reward:', rewards)

In [None]:

# Build the observation-preprocessing pipeline one wrapper at a time.
gray_env = GrayScaleObservation(env)  # grayscale conversion
scaled_env = TransformObservation(gray_env, lambda frame: frame / 255.0)  # pixel values -> [0, 1]
resized_env = ResizeObservation(scaled_env, 84)  # 84x84 frames
env = FrameStack(resized_env, 4)  # stack 4 frames per observation



In [None]:
# Reset the wrapped environment and report the shape of the first observation.
first_frames, _ = env.reset()
obs = torch.Tensor(first_frames).squeeze()  # drop any size-1 dimensions
print(obs.shape)

In [None]:
 # Plot the observation
# One panel per leading-index slice of `obs`, laid out on a 2x2 grid.
fig, axs = plt.subplots(nrows=2, ncols=2)
for frame_idx, panel in enumerate(axs.ravel()):
    panel.imshow(obs[frame_idx])

In [None]:
def play_game(env, q_network):
    """Play one episode greedily with ``q_network`` and return the total reward.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        q_network: Callable that maps a batched observation tensor to action
            values; the index of the largest value is used as the action.

    Returns:
        The sum of rewards collected until the episode terminates or is
        truncated.
    """
    obs, _ = env.reset()
    obs = torch.Tensor(obs).squeeze()
    total_reward = 0
    done = False

    while not done:
        with torch.no_grad():
            q_values = q_network(obs.unsqueeze(0))  # add a batch dimension
            action = torch.argmax(q_values).item()

        # step() yields five values here (as the random-play cell also
        # unpacks); the episode is over when either end flag is set.
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        obs = torch.Tensor(obs).squeeze()
        total_reward += reward

    return total_reward


In [None]:
def process_observation(obs):
    """Apply the grayscale, rescale, resize and frame-stack wrappers to ``obs``.

    NOTE(review): despite its name, ``obs`` is passed straight to the gym
    wrapper constructors, so it presumably has to be an environment rather
    than a single observation — confirm against callers.

    Args:
        obs: Object to wrap (see note above).

    Returns:
        The ``FrameStack`` wrapper at the end of the chain.
    """
    grayscale = GrayScaleObservation(obs)  # grayscale conversion
    rescaled = TransformObservation(grayscale, lambda frame: frame / 255.0)  # pixel values -> [0, 1]
    resized = ResizeObservation(rescaled, 84)  # 84x84 frames
    return FrameStack(resized, 4)  # stack 4 frames

In [None]:
# Define hyperparameters
population_size = 20
mutation_rate = 0.01
num_generations = 10
num_params = 768
batch_size = 32
gamma = 0.99  # discount applied to the bootstrapped next-state value
num_eval_episodes = 3
epsilon = 0.1  # probability of taking a random action
replay_buffer_size = 10000
tau = 0.001  # blend factor for the soft target-network update

# Experience Replay, and Genetic Algorithm
experience_replay = ExperienceReplay(buffer_size=replay_buffer_size)
# NOTE(review): this instance is not used anywhere below and its settings differ
# from the hyperparameters above; it is kept only so the name stays defined.
genetic_algorithm = GeneticAlgorithm(population_size=100, mutation_rate=0.1)

# Initialize genetic algorithm
gen_alg = GeneticAlgorithm(population_size, mutation_rate)
gen_alg.initialize_population(num_params)

# Initialize Q-network and target network (target starts as an exact copy)
q_network = QNetwork(num_channels=4, num_out=6, hidden_size=128)
target_network = QNetwork(num_channels=4, num_out=6, hidden_size=128)
target_network.load_state_dict(q_network.state_dict())

# Initialize optimizer and loss function
optimizer = optim.Adam(q_network.parameters(), lr=0.001)
loss_function = nn.SmoothL1Loss()


# Train Q-network using genetic algorithm and experience replay.
# Each generation runs one genetic-algorithm step followed by one training episode.
for generation in range(num_generations):
    print(f'Generation {generation + 1}')

    # Evaluate fitness of current population
    fitness_scores = gen_alg.compute_fitness_scores(q_network, env, num_eval_episodes, epsilon)

    # Select parents
    parents = gen_alg.select_parents(fitness_scores)

    # Generate children using crossover
    children = gen_alg.crossover(parents)

    # Mutate children
    mutated_children = gen_alg.mutate(children)

    # Merge parents and mutated children to form new population
    new_population = np.concatenate([parents, mutated_children], axis=0)

    # Initialize episode; reset() returns (observation, info), as the other
    # cells of this notebook already unpack it.
    state, _ = env.reset()
    done = False
    episode_reward = 0

    # Train Q-network using experience replay
    while not done:
        # Epsilon-greedy action
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            # Drop size-1 dimensions before adding the batch dimension, the
            # same way play_game prepares observations for the network.
            state_tensor = torch.as_tensor(np.asarray(state), dtype=torch.float32).squeeze().unsqueeze(0)
            with torch.no_grad():
                q_values = q_network(state_tensor)
                action = q_values.argmax().item()

        # Take action and add experience to replay buffer. step() yields five
        # values; the episode ends when either end flag is set.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        episode_reward += reward
        experience_replay.add_experience(state, action, reward, next_state, done)
        state = next_state

        # Sample batch from replay buffer and update Q-network
        if experience_replay.size() >= batch_size:
            # NOTE(review): assumes sample() returns batched tensors that the
            # networks accept directly, with `actions` as a 1-D integer
            # tensor — confirm against ExperienceReplay.
            states, actions, rewards, next_states, dones = experience_replay.sample(batch_size)

            # Force rewards/dones into (batch, 1) float columns so they line up
            # with the (batch, 1) Q-value tensors below instead of silently
            # broadcasting to (batch, batch).
            rewards = torch.as_tensor(rewards, dtype=torch.float32).reshape(-1, 1)
            dones = torch.as_tensor(dones, dtype=torch.float32).reshape(-1, 1)

            # Q-value of the action actually taken in each sampled transition
            q_values = q_network(states).gather(dim=1, index=actions.unsqueeze(1))

            # The TD target is a fixed regression target: no gradients needed.
            with torch.no_grad():
                next_q_max_values, _ = target_network(next_states).max(dim=1, keepdim=True)
                targets = rewards + gamma * next_q_max_values * (1 - dones)

            loss = loss_function(q_values, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Soft-update the target network towards the online network
            for target_param, param in zip(target_network.parameters(), q_network.parameters()):
                target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)

    print(f'Episode reward: {episode_reward}')

    # Update Q-network weights
    # NOTE(review): this pairs each parameter tensor with one row of
    # `new_population`. np.concatenate yields a NumPy array, while
    # Tensor.copy_ expects a tensor whose shape broadcasts to the parameter —
    # confirm the intended mapping from population to network weights.
    for target_param, param in zip(q_network.parameters(), new_population):
        target_param.data.copy_(param)

    # Set target network weights equal to Q-network weights
    target_network.load_state_dict(q_network.state_dict())

    # Reset epsilon to initial value (epsilon is not modified above, so this
    # currently has no effect)
    epsilon = 0.1


In [None]:
def evaluate_performance(q_network, env, num_episodes=10):
    """Run ``num_episodes`` episodes and return the average reward per episode.

    Args:
        q_network: Object exposing ``choose_action(state)``, which is called
            with the current observation to pick each action.
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        num_episodes: Number of episodes to play.

    Returns:
        Total reward over all episodes divided by ``num_episodes``.
    """
    total_reward = 0
    for _ in range(num_episodes):
        # Keep only the observation; reset() also returns an info dict.
        state, _info = env.reset()
        done = False
        while not done:
            action = q_network.choose_action(state)
            state, reward, terminated, truncated, _info = env.step(action)
            done = terminated or truncated
            total_reward += reward
    return total_reward / num_episodes

# Score the trained Q-network over 100 episodes and report the mean reward.
avg_reward = evaluate_performance(
    q_network,
    env,
    num_episodes=100,
)
print(f"Average Reward: {avg_reward}")
