In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
import numpy as np
import ale_py
import time, os, pickle

In [2]:
# Hyperparameters
learning_rate = 0.00005
hidden_dim = 128
gamma = 0.995
epsilon = 1.0
epsilon_decay = 0.997
epsilon_min = 0.05

# Reward Parameters
line_clear_reward = 2.0
step_penalty = -0.02
game_over_penalty = -20.0

# Training Episodes
num_episodes = 1000


In [None]:
gym.register_envs(ale_py)
env = gym.make("ALE/Tetris-v5", render_mode="human")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
state, _ = env.reset()
save_dir = "training_logs2"
os.makedirs(save_dir, exist_ok=True)
print(dir(env))

class Actor(nn.Module):
    def __init__(self, action_dim, hidden_dim=128):
        super(Actor, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=5, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)

        # Use a dummy input to calculate the flattened size after conv layers
        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, state.shape[0], state.shape[1])
            conv_output = self.conv_layers(dummy_input)
            conv_output_size = conv_output.view(-1).size(0)

        # Define fully connected layers
        self.fc1 = nn.Linear(conv_output_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, action_dim)

    def conv_layers(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        return x

    def forward(self, state):
        x = self.conv_layers(state)
        x = x.reshape(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        action_probs = torch.softmax(self.fc2(x), dim=-1)
        return action_probs

class Critic(nn.Module):
    def __init__(self, hidden_dim=128):
        super(Critic, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=4, stride=2)

        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, state.shape[0], state.shape[1])
            conv_output = self.conv_layers(dummy_input)
            conv_output_size = conv_output.view(-1).size(0)

        self.fc1 = nn.Linear(conv_output_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def conv_layers(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        return x

    def forward(self, state):
        x = self.conv_layers(state)
        x = x.reshape(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        state_value = self.fc2(x)
        return state_value

def calculate_reward(lines_cleared, game_over, state):
    line_clear_reward = 1.0
    game_over_penalty = -10.0
    step_penalty = -0.01
    tower_height_penalty = -0.1  
    gaps_penalty = -0.05         

    reward = line_clear_reward * lines_cleared if lines_cleared > 0 else step_penalty

    if game_over:
        reward += game_over_penalty

    if state.shape[-1] == 3: 
        state = state.mean(axis=-1)  

    column_heights = []
    for col in range(state.shape[1]):
        column = state[:, col]
        non_zero_indices = np.where(column > 0)[0]  
        if len(non_zero_indices) > 0:
            column_heights.append(len(state) - non_zero_indices[0]) 
        else:
            column_heights.append(0)  

    tallest_column = max(column_heights)
    reward += tower_height_penalty * tallest_column

    num_gaps = 0
    for col in range(state.shape[1]):
        column = state[:, col]
        non_zero_indices = np.where(column > 0)[0]
        if len(non_zero_indices) > 0:
            highest_filled_row = non_zero_indices[0]
            gaps_below = np.sum(column[highest_filled_row + 1:] == 0)
            num_gaps += gaps_below

    reward += gaps_penalty * num_gaps

    return reward


def train_step(state, action, reward, next_state, done):
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).permute(0, 3, 1, 2) / 255.0
    next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0).permute(0, 3, 1, 2) / 255.0
    action = torch.tensor(action, dtype=torch.long)
    reward = torch.tensor(reward, dtype=torch.float32)

    # Critic update with gradient clipping
    value = critic(state)
    next_value = critic(next_state) * (1 - done)
    target_value = reward + gamma * next_value
    critic_loss = (value - target_value.detach()) ** 2

    critic_optimizer.zero_grad()
    critic_loss.backward()
    torch.nn.utils.clip_grad_norm_(critic.parameters(), max_norm=1.0)
    critic_optimizer.step()

    # Actor update with gradient clipping
    advantage = (target_value - value).detach()
    action_probs = actor(state)
    log_prob = torch.log(action_probs.squeeze(0)[action])
    actor_loss = -log_prob * advantage

    actor_optimizer.zero_grad()
    actor_loss.backward()
    torch.nn.utils.clip_grad_norm_(actor.parameters(), max_norm=1.0)
    actor_optimizer.step()

    return actor_loss.item(), critic_loss.item()

# Actor and Critic networks and optimizers
actor = Actor(action_dim=env.action_space.n)
critic = Critic()
actor_optimizer = optim.Adam(actor.parameters(), lr=learning_rate)
critic_optimizer = optim.Adam(critic.parameters(), lr=learning_rate)
print(action_dim)

# parameters and metrics
def save_parameters_and_metrics(episode, actor, critic, actor_optimizer, critic_optimizer, metrics):
    torch.save({
        'actor_state_dict': actor.state_dict(),
        'critic_state_dict': critic.state_dict(),
        'actor_optimizer_state_dict': actor_optimizer.state_dict(),
        'critic_optimizer_state_dict': critic_optimizer.state_dict()
    }, os.path.join(save_dir, f"model_checkpoint_ep{episode}.pt"))

    with open(os.path.join(save_dir, "training_metrics.pkl"), "wb") as f:
        pickle.dump(metrics, f)

metrics = {
    "episode_rewards": [],
    "step_actor_losses": [],
    "step_critic_losses": []
}

# Define a function to print the average weights and losses
def print_parameters_and_losses(actor, critic, episode, avg_actor_loss, avg_critic_loss):
    print(f"\n--- Statistics at Episode {episode} ---")
    
    # Print the average weight of specific layers
    actor_conv1_avg = actor.conv1.weight.data.mean().item()
    print(f"Actor Layer 1 Conv Weights Average: {actor_conv1_avg}")
    
    actor_fc1_avg = actor.fc1.weight.data.mean().item()
    print(f"Actor Layer 1 FC Weights Average: {actor_fc1_avg}")
    
    critic_conv1_avg = critic.conv1.weight.data.mean().item()
    print(f"Critic Layer 1 Conv Weights Average: {critic_conv1_avg}")
    
    critic_fc1_avg = critic.fc1.weight.data.mean().item()
    print(f"Critic Layer 1 FC Weights Average: {critic_fc1_avg}")

    # Print average losses for actor and critic
    print(f"Average Actor Loss: {avg_actor_loss}")
    print(f"Average Critic Loss: {avg_critic_loss}")
    
    print("--- End of Statistics ---\n")

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False
    episode_reward = 0
    actor_losses = []
    critic_losses = []

    while not done:
        # Preprocess state for model input
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).permute(0, 3, 1, 2) / 255.0

        # Get action probabilities from the actor network
        action_probs = actor(state_tensor).detach().numpy().squeeze()

        # ε-greedy action selection
        if np.random.rand() < epsilon:
            # Exploration: choose a random action
            action = np.random.choice(action_dim)
        else:
            # Exploitation: choose the action with the highest probability
            action = np.argmax(action_probs)

        # Decay epsilon after each step
        epsilon = max(epsilon * epsilon_decay, epsilon_min)

        # Take the action in the environment
        next_state, _, done, _, info = env.step(action)
        
        # Calculate reward
        lines_cleared = info.get("lines_cleared", 0)
        reward = calculate_reward(lines_cleared, done, state)
        episode_reward += reward

        # Train actor and critic networks
        actor_loss, critic_loss = train_step(state, action, reward, next_state, done)

        # Log losses for each step
        actor_losses.append(actor_loss)
        critic_losses.append(critic_loss)

        # Update the state
        state = next_state

    # Calculate average losses for this episode
    avg_actor_loss = np.mean(actor_losses)
    avg_critic_loss = np.mean(critic_losses)

    # Print the episode results with average losses
    print(f"Episode {episode + 1}: Total Reward = {episode_reward}, Avg Actor Loss = {avg_actor_loss:.4f}, Avg Critic Loss = {avg_critic_loss:.4f}")

env.close()

['__annotations__', '__class__', '__class_getitem__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__orig_bases__', '__parameters__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_action_space', '_cached_spec', '_disable_render_order_enforcing', '_has_reset', '_is_protocol', '_metadata', '_np_random', '_np_random_seed', '_observation_space', '_saved_kwargs', 'action_space', 'class_name', 'close', 'env', 'get_wrapper_attr', 'has_reset', 'has_wrapper_attr', 'metadata', 'np_random', 'np_random_seed', 'observation_space', 'render', 'render_mode', 'reset', 'set_wrapper_attr', 'spec', 'step', 'unwrapped', 'wrapper_spec']
5
Episode 1: Total Reward = -635977.089999998, Avg Actor Loss = -1815.7746, Avg 

In [4]:
# Reset the environment and get the initial observation
env = gym.make("ALE/Tetris-v5", render_mode="human")
observation, _ = env.reset()
print("Observation structure:", observation.shape)  # Add this line to inspect the observation


Observation structure: (210, 160, 3)
