# **TETRIS:**

# **Team NEURON BANK**

## Link to the dueling DQN method we used:
https://www.youtube.com/watch?v=3ILECq5qxSk

## **installing all the required packages:**

In [None]:
pip install ale_py

Collecting ale_py
  Downloading ale_py-0.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Downloading ale_py-0.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1/2.1 MB 25.3 MB/s eta 0:00:00
Installing collected packages: ale_py
Successfully installed ale_py-0.10.1


In [None]:
pip install gymnasium


Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 958.1/958.1 kB 12.4 MB/s eta 0:00:00
Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [None]:
import gymnasium as gym
import ale_py
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from IPython import get_ipython
from IPython.display import display, clear_output
from collections import namedtuple, deque
from itertools import count

## **Creating an environment:**

In [None]:
from gymnasium import RewardWrapper

class TetrisRewardWrapper(RewardWrapper):
    """Reward shaping for Tetris: penalise tall stacks and covered gaps.

    On every step the wrapped environment is rendered, the frame is binarised
    and cropped to a board mask, and the environment reward is reduced by
    ``height_weight * max_height + gap_weight * gap_penalty``.

    Both penalties are measured on the cropped *pixel* mask, so they are in
    pixel units (rows / pixels), not Tetris cells.

    Args:
        env: Environment to wrap. It must return an image array from
            ``render()`` (e.g. created with ``render_mode="rgb_array"``).
        height_weight: Multiplier applied to the stack height penalty.
        gap_weight: Multiplier applied to the gap penalty.
    """

    def __init__(self, env, height_weight=1.0, gap_weight=0.5):
        super().__init__(env)
        self.height_weight = height_weight
        self.gap_weight = gap_weight

    def reward(self, reward):
        """Return the environment reward minus the height and gap penalties."""
        frame = self.env.render()
        if frame is None:
            # Without a rendered frame there is nothing to measure; fail with
            # a clear message instead of an obscure NumPy error further down.
            raise RuntimeError(
                "TetrisRewardWrapper needs env.render() to return an image; "
                "create the environment with render_mode='rgb_array'."
            )

        board = self.get_board_from_observation(frame)
        max_height = self.get_max_height(board)
        gap_penalty = self.get_gap_penalty(board)

        # The original (line-clearing) reward is kept and the penalties are
        # subtracted in the same order as before.
        modified_reward = reward
        modified_reward -= self.height_weight * max_height  # Penalize tall stacks
        modified_reward -= self.gap_weight * gap_penalty    # Penalize gaps
        return modified_reward

    def get_board_from_observation(self, observation):
        """Convert a rendered frame into a cropped 0/1 board mask.

        Args:
            observation: Image array with the colour channel on axis 2.

        Returns:
            2-D integer array; 1 where the channel-averaged pixel value is
            above 128, 0 elsewhere, cropped to rows 50:210 and columns 30:130.
        """
        grayscale = np.mean(observation, axis=2)  # Average the colour channels
        board = (grayscale > 128).astype(int)     # Binarize (adjust threshold as needed)

        # NOTE(review): the crop is the notebook's example region for the ALE
        # Tetris screen layout -- confirm it matches the actual playfield.
        return board[50:210, 30:130]

    def get_max_height(self, board):
        """Return the stack height: rows from the first occupied row to the bottom.

        Returns 0 when the board contains no set entries.
        """
        board = np.asarray(board)
        if board.size == 0:
            return 0
        occupied_rows = np.flatnonzero(board.any(axis=1))
        if occupied_rows.size == 0:
            return 0  # No stack
        # int() keeps the result a plain Python number so the shaped reward
        # does not silently become a NumPy scalar.
        return int(board.shape[0] - occupied_rows[0])

    def get_gap_penalty(self, board):
        """Count empty entries that have at least one set entry above them
        in the same column.
        """
        filled = np.asarray(board).astype(bool)
        if filled.size == 0:
            return 0
        # Running OR down each column marks everything at or below the first
        # block; the gaps are the marked entries that are themselves empty.
        covered = np.logical_or.accumulate(filled, axis=0)
        return int(np.count_nonzero(covered & ~filled))

In [None]:
# Build the RAM-observation Tetris environment (frames are still rendered as
# RGB arrays, which the reward wrapper reads) and apply the shaped reward.
env = TetrisRewardWrapper(
    gym.make("ALE/Tetris-ram-v5", render_mode="rgb_array")
)

In [None]:
# Sanity check: play up to 100 random steps and report the shaped reward.
state, _ = env.reset()

total_reward = 0
for _ in range(100):  # Play 100 steps
    action = env.action_space.sample()  # Random action
    observations, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward
    if terminated or truncated:
        break

print(observations)
print(terminated)
print(truncated)
print(f"Total reward for random actions: {total_reward}")

# NOTE: `env` is deliberately NOT closed here. The same environment object is
# reset and stepped again by the later cells (network sizing and training),
# and an environment must not be used after close().

[  1   1   0   0   0  12  12   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   1   1   0   0   0   0   0   0   0   0   0   0   0   0
   0   0   0   0   0   0   0   0 238 170 170 170 238   0   0   0   0   0
   0   0   0   0   0   0   0   0   0   0   0   0 119  85  85  85 119   0
   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0   0
 126  21   5   5   6   6   4   8   4   8   0   0   0   0   7   6   5   4
   4   6   1   2  16   0   0   0  75 255  75 255  15 240   0   0   0   0
 207 252]
False
False
Total reward for random actions: -227127.0


## **Creating a replay memory class for storing the transitions (state, action, next state, reward) collected during the episodes:**

In [None]:
# One stored experience. `next_state` is None for a transition that ended the
# episode (see the training loop).
transition = namedtuple('transition', ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
  """Fixed-capacity buffer of `transition` tuples.

  Backed by a deque with `maxlen=capacity`, so once the buffer is full every
  push discards the oldest stored transition.
  """

  def __init__(self, capacity):
    """Create an empty buffer holding at most `capacity` transitions."""
    self.memory = deque([], maxlen=capacity)

  def push(self, *arguments):
    """Store one transition; pass (state, action, next_state, reward) positionally."""
    self.memory.append(transition(*arguments))

  def sample(self, batch_size):
    """Return `batch_size` distinct stored transitions chosen at random.

    Raises ValueError (from `random.sample`) if fewer than `batch_size`
    transitions are stored.
    """
    return random.sample(self.memory, batch_size)

  def __len__(self):
    """Number of transitions currently stored (enables `len(memory)`)."""
    return len(self.memory)

  def length(self):
    """Number of transitions currently stored; kept for existing callers."""
    return len(self)


## **Neural Network class:**

In [None]:
class DQN(nn.Module):
  """Q-network with an optional dueling head.

  A single shared hidden layer feeds either
    * a dueling head: a value stream V(s) and an advantage stream A(s, a),
      combined as Q = V + A - mean_a(A); or
    * a plain linear layer producing one Q-value per action.

  Args:
    n_obv: size of the observation vector.
    n_hidden: width of the shared hidden layer.
    n_act: number of actions (size of the output).
    enable_dueling_dqn: build the dueling head when True (default).
  """

  def __init__(self, n_obv, n_hidden, n_act, enable_dueling_dqn=True):
    super().__init__()

    self.enable_dueling_dqn = enable_dueling_dqn

    self.hidden = nn.Linear(n_obv, n_hidden)

    if self.enable_dueling_dqn:
      # value stream
      self.fc_value = nn.Linear(n_hidden, 128)
      self.value = nn.Linear(128, 1)

      # advantages stream
      self.fc_advantages = nn.Linear(n_hidden, 128)
      self.advantages = nn.Linear(128, n_act)
    else:
      self.output = nn.Linear(n_hidden, n_act)

  def forward(self, x):
    """Return Q-values with the action dimension last.

    `x` may be a batch of shape (batch, n_obv) or a single unbatched
    observation of shape (n_obv,).
    """
    x = F.relu(self.hidden(x))

    if not self.enable_dueling_dqn:
      return self.output(x)

    V = self.value(F.relu(self.fc_value(x)))
    A = self.advantages(F.relu(self.fc_advantages(x)))

    # Average over the action (last) dimension rather than a hard-coded dim=1
    # so unbatched inputs work too; identical for 2-D batched input.
    return V + A - A.mean(dim=-1, keepdim=True)

## **Initializing all the hyper-parameters:**

In [None]:
# --- replay / optimisation ---
BATCH_SIZE = 64       # transitions sampled from the replay memory per update
LEARN_RATE = 0.001    # learning rate handed to the optimizer
GAMMA = 0.995         # discount applied to the next-state value
TAU = 0.005           # blend factor of the soft target-network update

# --- epsilon-greedy exploration schedule ---
EPS_START = 0.99      # epsilon at step 0
EPS_END = 0.05        # value epsilon decays towards
EPS_DECAY = 1000      # decay constant in steps (larger = slower decay)

In [None]:
# Reset once and read the sizes the networks need from the environment.
state, info = env.reset()
n_obv = state.shape[0]        # length of the RAM observation vector
n_act = env.action_space.n    # number of discrete actions

In [None]:
# Online network: the one that is trained and used to pick actions.
online_net = DQN(n_obv, 128, n_act, enable_dueling_dqn=True)

# Target network: same architecture, starts as an exact copy of the online
# network and supplies the bootstrap targets during optimisation.
target_net = DQN(n_obv, 128, n_act, enable_dueling_dqn=True)
target_net.load_state_dict(online_net.state_dict())

# Only the online network's parameters are optimised.
optimizer = optim.AdamW(
    online_net.parameters(),
    lr=LEARN_RATE,
    amsgrad=True,
)

## **Creating the action selection function using the epsilon-greedy method:**

In [None]:
replay_memory = ReplayMemory(101010)      # replay buffer holding at most 101010 transitions
import math


steps_done = 0       # number of actions selected so far; drives the epsilon decay
def action_selection(state):
  """Pick an action for `state` with an epsilon-greedy policy.

  Epsilon starts at EPS_START and decays exponentially towards EPS_END as
  `steps_done` grows. Every call increments the global `steps_done`.

  Returns a 1x1 long tensor holding the chosen action index.
  """
  global steps_done

  draw = random.random()
  decay = math.exp(-1 * steps_done / EPS_DECAY)
  eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
  steps_done += 1

  # Exploration: the draw fell at or below the current epsilon.
  if draw <= eps_threshold:
    return torch.tensor([[env.action_space.sample()]], dtype = torch.long)

  # Exploitation: action with the highest predicted Q-value.
  with torch.no_grad():
    return online_net(state).max(1).indices.view(1,1)


## **Creating the plot function:**

In [None]:
episode_rewards = []   #store the rewards for each episode to plot

def plot_rewards(show_result=False):
  """Plot the per-episode rewards and, once available, a 100-episode mean.

  Args:
    show_result: False while training (title "training . . ."); True for the
      final plot (title 'Result', and the figure is displayed when running
      under IPython).
  """
  # Always draw on figure 1: reuse it, resize it and clear it. Calling
  # plt.figure(figsize=...) without a number would open a brand-new figure on
  # every call and leave the old ones alive.
  fig = plt.figure(1)
  fig.set_size_inches(14, 10)
  plt.clf()

  rewards_t = torch.tensor(episode_rewards, dtype=torch.float)

  if show_result:
    plt.title('Result')
  else:
    plt.title("training . . .")
  plt.xlabel("Episode")
  plt.ylabel("Reward")
  plt.plot(rewards_t.numpy())

  if len(rewards_t) >= 100:
    # Mean over a sliding window of 100 episodes, left-padded with zeros so
    # the curve lines up with the episode axis.
    means = rewards_t.unfold(0, 100, 1).mean(1).view(-1)
    means = torch.cat((torch.zeros(99), means))
    plt.plot(means.numpy())

  plt.pause(0.01)   # give the GUI event loop a moment to redraw

  if get_ipython() is not None:
    if not show_result:
      print("Training . . .")
      clear_output(wait=True)
    else:
      display(plt.gcf())



## **Creating the Optimizing function:**

In [None]:
def optimize_model():
  """Run one optimisation step of the online network on a replay batch.

  Does nothing until the replay memory holds at least BATCH_SIZE transitions.
  Otherwise it samples BATCH_SIZE transitions, builds the targets
  `reward + GAMMA * max_a target_net(next_state)` (0 for transitions whose
  next_state is None), and minimises the Huber loss between those targets and
  the online network's Q-values for the actions that were taken.
  """
  if replay_memory.length() < BATCH_SIZE:            #maintaining a minimum batch size to be used for optimizing function
    return

  transitions = replay_memory.sample(BATCH_SIZE)
  # Turn a list of transitions into one transition of tuples (batch-of-fields).
  batch = transition(*zip(*transitions))

  # True where the transition has a next state (i.e. it was not terminal).
  mask = torch.tensor(tuple(s is not None for s in batch.next_state), dtype=torch.bool)

  state_batch = torch.cat(batch.state)
  action_batch = torch.cat(batch.action)
  reward_batch = torch.cat(batch.reward)

  # Q(s, a) of the online network for the actions actually taken.
  state_action_values = online_net(state_batch).gather(1, action_batch)

  next_state_values = torch.zeros(BATCH_SIZE)
  # Guard the (rare) batch in which every sampled transition is terminal:
  # torch.cat() on an empty list raises.
  if mask.any():
    next_states = torch.cat([s for s in batch.next_state if s is not None])
    with torch.no_grad():
      next_state_values[mask] = target_net(next_states).max(1).values

  expected_state_action_values = (next_state_values * GAMMA) + reward_batch

  criterion = nn.SmoothL1Loss()                      #huber loss
  loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

  optimizer.zero_grad()
  loss.backward()
  torch.nn.utils.clip_grad_value_(online_net.parameters(), 100)   #clip each gradient element to [-100, 100]
  optimizer.step()



## **training the Online_Network:**

In [None]:
from types import new_class
iterations_epi = 3000      #number of training episodes


for i in range(iterations_epi):
  state, info = env.reset()
  state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)              #convert the state into a state tensor

  total_reward = 0

  for t in count():
    action = action_selection(state)                                         #select an action based on the state
    observation, reward, terminated, truncated, _ = env.step(action.item())
    # Pin the dtype so the reward always matches the float32 Q-values,
    # whatever numeric type the environment hands back.
    reward = torch.tensor([reward], dtype=torch.float32)

    total_reward += reward.item()

    if terminated:
      next_state = None                                                      #terminal transition: no bootstrap value
    else:
      next_state = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)   #the next_state is the observation after the action is completed

    replay_memory.push(state, action, next_state, reward)

    state = next_state

    optimize_model()

    #soft updating the weights in target net using the weights from the online_net:
    #   target <- TAU * online + (1 - TAU) * target
    target_net_state_dict = target_net.state_dict()
    online_net_state_dict = online_net.state_dict()
    for key in online_net_state_dict:
      target_net_state_dict[key] = online_net_state_dict[key] * TAU + target_net_state_dict[key] * (1 - TAU)
    # The blended tensors only live in the dict above; they have to be loaded
    # back, otherwise the target network never changes.
    target_net.load_state_dict(target_net_state_dict)

    if (terminated or truncated):
      episode_rewards.append(total_reward)
      break
print('Done')
plot_rewards(show_result=True)
plt.ioff()
plt.show()


In [None]:
pip install swig


In [None]:
pip install gymnasium[box2D]

In [None]:
pip install imageio-ffmpeg

## **Rendering the game:**

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
import gymnasium as gym
import torch

def show_video_of_model(agent, env_name):
    """Play one episode with `agent` acting greedily and save it as 'video.mp4'.

    Args:
        agent: network mapping a (1, n_obv) float tensor to per-action values;
            the highest-valued action is played at every step.
        env_name: Gymnasium environment id; the environment is created with
            render_mode='rgb_array' so every step can be captured as a frame.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            # Use the agent that was passed in, greedily and without touching
            # the training-time exploration schedule.
            with torch.no_grad():
                action = agent(state_tensor).max(1).indices.item()
            state, reward, terminated, truncated, _ = env.step(action)
            # Stop on truncation as well, otherwise a time-limited episode
            # would loop forever.
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained online network on the RAM Tetris environment.
show_video_of_model(agent=online_net, env_name="ALE/Tetris-ram-v5")

def show_video():
    """Embed the first '*.mp4' file of the working directory in the notebook.

    The file is base64-encoded into an HTML <video> element. Prints
    "Could not find video" when no .mp4 file exists.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only binary mode inside a context manager so the handle is closed.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(data=f'''<video alt="test" autoplay loop controls style="height: 400px;">
                              <source src="data:video/mp4;base64,{encoded.decode('ascii')}" type="video/mp4" />
                             </video>'''))

# Display the first *.mp4 found in the working directory inline in the notebook.
show_video()