In [1]:
import numpy as np
import socket
import cv2
import matplotlib.pyplot as plt

## Install Stable Baseline3 version >= 2.0.0a5
#### Note some SB3 versions are not compatible with Gymnasium interface.

In [2]:
!pip install "stable-baselines3[extra] >= 2.0.0a5"

Collecting pygame (from stable-baselines3[extra]>=2.0.0a5)
  Using cached pygame-2.5.2-cp312-cp312-win_amd64.whl.metadata (13 kB)
Collecting tqdm (from stable-baselines3[extra]>=2.0.0a5)
  Using cached tqdm-4.66.2-py3-none-any.whl.metadata (57 kB)
Collecting rich (from stable-baselines3[extra]>=2.0.0a5)
  Using cached rich-13.7.1-py3-none-any.whl.metadata (18 kB)
Collecting shimmy~=1.3.0 (from shimmy[atari]~=1.3.0; extra == "extra"->stable-baselines3[extra]>=2.0.0a5)
  Using cached Shimmy-1.3.0-py3-none-any.whl.metadata (3.7 kB)
Collecting autorom~=0.6.1 (from autorom[accept-rom-license]~=0.6.1; extra == "extra"->stable-baselines3[extra]>=2.0.0a5)
  Using cached AutoROM-0.6.1-py3-none-any.whl.metadata (2.4 kB)
Collecting click (from autorom~=0.6.1->autorom[accept-rom-license]~=0.6.1; extra == "extra"->stable-baselines3[extra]>=2.0.0a5)
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.6.1; extra == "

ERROR: Could not find a version that satisfies the requirement ale-py~=0.8.1; extra == "atari" (from shimmy[atari]) (from versions: none)
ERROR: No matching distribution found for ale-py~=0.8.1; extra == "atari"


## Run the Java Tetris Server using subprocess

In [3]:
# Download v0.6 server from AIoTLab website
!wget http://www.aiotlab.org/teaching/oop/tetris/TetrisTCPserver_v0.6.jar

'wget' 不是内部或外部命令，也不是可运行的程序
或批处理文件。


In [4]:
import subprocess
subprocess.Popen(["java","-jar","TetrisTCPserver_v0.6.jar"])

FileNotFoundError: [WinError 2] 系统找不到指定的文件。

## Create our own Tetris Test environment by inheriting Gym class

In [None]:
import gymnasium as gym
from gymnasium import spaces

In [None]:
class TetrisEnv(gym.Env):

    '''
        The supported actions are
        0: move -1
        1: move 1
        2: rotate 0 // counter-clockwise
        3: rotate 1 // clockwise
        4: drop down
    '''
    N_DISCRETE_ACTIONS = 5

    IMG_HEIGHT = 200
    IMG_WIDTH = 100
    IMG_CHANNELS = 3


    def __init__(self, host_ip="127.0.0.1", host_port=10612):
        super().__init__()

        self.action_space = spaces.Discrete(self.N_DISCRETE_ACTIONS)
        # Example for using image as input (channel-first; channel-last also works):
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(self.IMG_HEIGHT, self.IMG_WIDTH, self.IMG_CHANNELS), dtype=np.uint8)
        self.server_ip = host_ip
        self.server_port = host_port

        self.client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_sock.connect((self.server_ip, self.server_port))

    def step(self, action):
        if action == 0:
            self.client_sock.sendall(b"move -1\n")
        elif action == 1:
            self.client_sock.sendall(b"move 1\n")
        elif action == 2:
            self.client_sock.sendall(b"rotate 0\n")
        elif action == 3:
            self.client_sock.sendall(b"rotate 1\n")
        elif action == 4:
            self.client_sock.sendall(b"drop\n")

        terminated, lines, height, holes, observation = self.get_tetris_server_response(self.client_sock)
        self.observation = observation

        reward = 0
        if action == 4: # Drop reward
            reward += 5

        # Negative height reward
        if height > self.height:
            reward -= (height - self.height)*5

        # Positive hole reduction reward
        if holes < self.holes:
            reward += (self.holes - holes)*10

        if lines > self.lines_removed:
            reward = reward + (lines - self.lines_removed)*1000
            self.lines_removed = lines

        self.reward = self.reward + reward
        self.holes = holes
        self.height = height
        self.lifetime += 1
        truncated = False
        info = {'removed_lines':self.lines_removed, 'lifetime':self.lifetime}
        return (observation, reward, terminated, truncated, info)

    def reset(self, seed=None, options=None):
        self.client_sock.sendall(b"start\n")
        terminated, lines, height, holes, observation = self.get_tetris_server_response(self.client_sock)
        self.observation = observation
        self.reward = 0
        self.lines_removed = 0
        self.holes = 0
        self.height = 0
        self.lifetime = 0
        info = {}
        return observation, info

    def render(self):
        ''''''
        #if self.render_mode == "console":
        #    print('Total reward ' + str(self.reward))
        '''
        if self.render_mode == "human":
            cv2.imshow("Image", self.observation)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        '''

    def close(self):
        self.client_sock.close()

    def get_tetris_server_response(self, sock):
        is_game_over = (sock.recv(1) == b'\x01')
        removed_lines = int.from_bytes(sock.recv(4), 'big')
        height = int.from_bytes(sock.recv(4), 'big')
        holes = int.from_bytes(sock.recv(4), 'big')
        img_size = int.from_bytes(sock.recv(4), 'big')
        img_png = sock.recv(img_size)

        nparr = np.frombuffer(img_png, np.uint8)
        np_image = cv2.imdecode(nparr, -1)

        return is_game_over, removed_lines, height, holes, np_image

## Use SB3 env_checker to check our environment

In [None]:
from stable_baselines3.common.env_checker import check_env

env = TetrisEnv()
# It will check your custom environment and output additional warnings if needed
# No response may be caused by mismatched action state definition and implementation
check_env(env)

## Randomly test the environment

In [None]:
obs, info = env.reset()
n_steps = 20
for _ in range(n_steps):
    # Random action
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    env.render() # We render nothing now

    if terminated:
        break

In [None]:
# Show the final screen
%matplotlib inline
plt.imshow(obs)

In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

## Create an environment with 30 client threads

In [None]:
# Let's try DQN by creating 30 environments
vec_env = make_vec_env(TetrisEnv, n_envs=30)

## We choose DQN with CNN policy, and train 3,000,000 steps

In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# Train the agent
model = DQN("CnnPolicy", vec_env, verbose=1, tensorboard_log=f'/content/drive/MyDrive/CS5446/log')
model.learn(3000000, tb_log_name="dqn")
model.save(f'/content/drive/MyDrive/CS5446/models/dqn')

## Test our model with 1000 steps and record all plays.

In [None]:
import os
import shutil

# Test the trained agent
# using the vecenv
obs = vec_env.reset()
test_steps = 1000

replay_folder = './replay'
if os.path.exists(replay_folder):
    shutil.rmtree(replay_folder)

n_env = obs.shape[0] # Number of environments. DQN will play all envs
ep_id = np.zeros(n_env, int)
ep_steps = np.zeros(n_env, int)
cum_reward = np.zeros(n_env)
max_reward = -1e10
max_game_id = 0
max_ep_id = 0
max_rm_lines = 0
max_lifetime = 0

for step in range(test_steps):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)

    if step % 20 == 0:
        print(f"Step {step}")
        print("Action: ", action)
        print("reward=", reward, " done=", done)

    for eID in range(n_env):
        cum_reward[eID] += reward[eID]
        folder = f'{replay_folder}/{eID}/{ep_id[eID]}'
        if not os.path.exists(folder):
            os.makedirs(folder)
        fname = folder + '/' + '{:06d}'.format(ep_steps[eID]) + '.png'
        cv2.imwrite(fname, obs[eID])
        #cv2.imshow("Image" + str(eID), obs[eID])
        #cv2.waitKey(10)
        ep_steps[eID] += 1

        if done[eID]:
            if cum_reward[eID] > max_reward:
                max_reward = cum_reward[eID]
                max_game_id = eID
                max_ep_id = ep_id[eID]
                max_rm_lines = info[eID]['removed_lines']
                max_lifetime = info[eID]['lifetime']

            ep_id[eID] += 1
            cum_reward[eID] = 0
            ep_steps[eID] = 0

#cv2.destroyAllWindows()

In [None]:
best_replay_path = replay_folder + '/' + str(max_game_id) + '/' + str(max_ep_id)

print("After playing 30 envs each for ", test_steps, " steps:")
print(" Max reward=", max_reward, " Best video: " + best_replay_path)
print(" Removed lines=", max_rm_lines, " lifetime=", max_lifetime)

## Make a gif image to visualize the best play

In [None]:
import glob
import imageio

filenames = sorted(glob.glob(best_replay_path + '/*.png'))

images = []
for filename in filenames:
    images.append(imageio.imread(filename))
imageio.mimsave('/content/drive/MyDrive/CS5446/models/dqn/replay.gif', images, loop=0)

In [None]:
from IPython.display import Image
Image(filename='/kaggle/working/replay.gif')

In [None]:
model.save('/content/drive/MyDrive/CS5446/models/dqn/your_studentID_dqn_30env_3M.zip')

In [None]:
with open('/content/drive/MyDrive/CS5446/models/dqn/tetris_best_score.csv', 'w') as fs:
    fs.write('Id,Predicted\n')
    fs.write(f'game_score,{max_reward}\n')

In [None]:
# Upload your results to Kaggle
from IPython.display import FileLink
FileLink('tetris_best_score.csv')