## Useful resources

### Documentation

- [OpenAI Gym](https://gym.openai.com/docs/#environments)
- [Retro](https://retro.readthedocs.io/en/latest/python.html)

### Reinforcement learning implementations
- https://bruceoutdoors.wordpress.com/2017/09/01/deep-q-learning-101-part-3-deep-q-learning/
- https://keon.github.io/deep-q-learning/

- https://github.com/h3nnn4n/Reverse-Engineering-the-GameBoy-Tetris/blob/master/mem_locations.md
- https://adventuresinmachinelearning.com/reinforcement-learning-tutorial-python-keras/
- https://yanpanlau.github.io/2016/07/10/FlappyBird-Keras.html
- https://github.com/keras-rl/keras-rl
- https://bruceoutdoors.wordpress.com/2017/08/30/deep-q-learning-101-part-1-convolutional-neural-networks/

In [1]:
# Standard
from pathlib import Path
import threading
import time
import random

# Extra
import retro
import numpy as np
from PIL import Image
import ipywidgets as widgets
from io import BytesIO

In [2]:
# Directory holding the notebook's assets (custom retro integration, test
# image), relative to the notebook's working directory.
TETRIS_PATH = Path(".") / ".." / "assets"

# Target (height, width) of preprocessed frames. The raw frame is 144 x 160;
# switch to (36, 40) for a 4x downscaled frame.
IMG_SHAPE = (144, 160)

In [3]:
# Register the (absolute) assets directory as an extra retro integration search
# path. Presumably this is where the custom "Tetris-GameBoy" integration used
# later lives - verify against the repository layout.
retro.data.Integrations.add_custom_path(TETRIS_PATH.resolve())

In [4]:
# Template of the fields shown in the dashboard: create_widgets() builds one
# widget per key and update_widgets() copies only these keys from the info
# dict it is given. The values themselves are not read by the code in this
# notebook (NOTE(review): they look like a captured sample of the emulator's
# info dict - confirm).
info_obj = {
    "steps": 0,
    "lines_cleared": 0,
    "frame_delay_remaining": 4,
    "level": 0,
    "frame_delay": 34,
    "lines_cleared_d1": 0,
    "preview_piece": 4,
    "screen_state": 0,
    "score": 0,
    "lines_cleared_d3": 25,
    "lines_cleared_d2": 25,
    # read_bytes() opens, reads and closes the file; a bare open(...).read()
    # leaves the handle open until it is garbage collected.
    "image": (TETRIS_PATH / "test.png").read_bytes(),
    "action": 0,
}

# Named actions as 9-slot button vectors: a 1 marks the pressed button, and
# "nothing" presses none. The slot indices presumably follow the emulator's
# button order - TODO confirm against the retro integration.
actions = {
    name: [int(slot == pressed) for slot in range(9)]
    for name, pressed in (
        ("left", 6),
        ("right", 7),
        ("down", 5),
        ("rotate_clock", 8),
        ("rotate_counterclock", 0),
        ("nothing", None),
    )
}

In [5]:
def create_widgets():
    """Build the dashboard widgets and return them laid out in a grid.

    One disabled ``IntText`` is created for every key of ``info_obj`` (an
    ``Image`` widget for the ``"image"`` key), followed by a "Stop" button.
    Each widget is stored in the module-level ``all_widgets`` dict under its
    key so it can be looked up again later.

    Returns:
        A ``widgets.GridBox`` with two 300px columns containing every widget
        currently held in ``all_widgets``.
    """
    def _request_stop(_):
        # Raise the module-level stop flag when the button is clicked.
        global stop_threads
        stop_threads = True

    # Info outputs: numeric fields get an IntText, the frame gets an Image.
    for field in info_obj:
        if field != "image":
            all_widgets[field] = widgets.IntText(
                description=f"{field}:",
                disabled=True,
                layout=widgets.Layout(width="260px"),
                style={"description_width": "140px"},
            )
            continue
        frame_height, frame_width = IMG_SHAPE[0], IMG_SHAPE[1]
        all_widgets[field] = widgets.Image(
            format='png',
            width=frame_width,
            height=frame_height,
        )

    # Stop button.
    stop_button = widgets.Button(description='Stop')
    stop_button.on_click(_request_stop)
    all_widgets["Button"] = stop_button

    # Arrange everything in insertion order.
    two_columns = widgets.Layout(grid_template_columns="repeat(2, 300px)")
    return widgets.GridBox(list(all_widgets.values()), layout=two_columns)

In [6]:
def update_widgets(info):
    """Copy values from ``info`` into the matching dashboard widgets.

    Only keys listed in the module-level ``info_obj`` are considered. Keys of
    ``info`` outside that set are ignored, and fields missing from ``info``
    leave their widget unchanged.

    Args:
        info: mapping of field name to the value to display.
    """
    shown_fields = [name for name in info_obj if name in info]
    for name in shown_fields:
        all_widgets[name].value = info[name]

In [7]:
def preprocess_image(ary, shape=None):
    """Binarize a frame's first channel and downscale it to ``shape``.

    Values >= 220 in channel 0 become 255 and everything else becomes 0. The
    result is then reduced with a block-wise minimum, so an output pixel is
    255 only if every pixel of its block is 255.

    The input array is not modified. (Thresholding ``ary[:, :, 0]`` in place
    would write through the view and overwrite channel 0 of the caller's
    frame.)

    Args:
        ary: array indexed as ``[row, column, channel]``.
        shape: target ``(height, width)``; defaults to the module-level
            ``IMG_SHAPE``. Both frame dimensions must be integer multiples
            of the corresponding target dimension.

    Returns:
        A new 2-D array of size ``shape`` with the dtype of ``ary``,
        containing only the values 0 and 255.

    Raises:
        ValueError: if the frame size is not an integer multiple of ``shape``.
    """
    if shape is None:
        shape = IMG_SHAPE
    target_h, target_w = shape[0], shape[1]

    channel = ary[:, :, 0]  # drop the colour channels (this is a view)
    # np.where allocates a fresh array, so the caller's frame stays intact.
    binary = np.where(channel >= 220, 255, 0).astype(channel.dtype)

    rows, cols = binary.shape
    if target_h <= 0 or target_w <= 0 or rows % target_h or cols % target_w:
        raise ValueError(
            f"frame of shape {(rows, cols)} cannot be evenly downscaled "
            f"to {(target_h, target_w)}"
        )

    # Split each axis into (target, factor) and take the minimum per block.
    blocks = binary.reshape(
        (target_h, rows // target_h, target_w, cols // target_w)
    )
    return blocks.min(3).min(1)

In [8]:
def ary_to_png(ary):
    """Encode an array as PNG and return the encoded bytes.

    Args:
        ary: array accepted by ``PIL.Image.fromarray``.

    Returns:
        bytes: the PNG file contents, built in memory.
    """
    with BytesIO() as png_buffer:
        Image.fromarray(ary).save(png_buffer, format='png')
        return png_buffer.getvalue()

In [9]:
def _publish_snapshot(info, state, action, steps):
    """Add frame, step count and action code to ``info`` and refresh the widgets."""
    info["image"] = ary_to_png(state)
    info["steps"] = steps
    # Encode the button vector as the integer spelled by its digits so it fits
    # an integer widget (leading zeros vanish, e.g. [0, ..., 0, 1, 0] -> 10).
    info["action"] = int("".join(str(bit) for bit in action))
    update_widgets(info)


def run_game(id, stop):
    """Play Tetris with random actions until stopped, updating the dashboard.

    A random action is taken on every even step and no button is pressed on
    odd steps. The widgets are refreshed every 200 steps and once more when
    the loop ends. The loop ends 150 steps after the first step on which
    ``info["lines_cleared"]`` is non-zero, or as soon as ``stop()`` returns a
    truthy value. When the environment reports ``done`` it is reset and the
    step counter restarts at 0.

    The emulator is closed in a ``finally`` block, so it is released even if
    the loop raises.

    Args:
        id: unused; kept so existing callers keep working (note that the name
            shadows the builtin ``id`` inside this function).
        stop: zero-argument callable polled once per step; a truthy return
            value ends the loop.
    """
    print("Starting Tetris...")
    env = retro.make(game="Tetris-GameBoy", inttype=retro.data.Integrations.ALL)
    steps = 0
    first_clear_step = None  # step at which a cleared line was first seen
    try:
        env.reset()
        while True:
            steps += 1
            time.sleep(0.0001)
            if steps % 2 == 0:
                action = random.choice(list(actions.values()))
            else:
                action = actions["nothing"]

            state_raw, _, done, info = env.step(action)
            state = preprocess_image(state_raw)

            # Shaped reward: squared line count plus a small survival bonus.
            # NOTE(review): computed but not consumed anywhere yet.
            reward = info["lines_cleared"]**2 + steps / 10000

            if steps % 200 == 0:
                _publish_snapshot(info, state, action, steps)

            if info["lines_cleared"] != 0 and first_clear_step is None:
                print(f"line cleared: {steps}")
                first_clear_step = steps
            # NOTE(review): the step counter restarts on `done` while
            # first_clear_step does not, so after a reset this fires only when
            # the new count reaches the old value + 150 - confirm intent.
            if first_clear_step is not None and first_clear_step == steps - 150:
                break
            if done:
                steps = 0
                env.reset()
            if stop():
                print("Exiting loop.")
                break

        _publish_snapshot(info, state, action, steps)
    finally:
        # Always release the emulator; a leaked instance may prevent creating
        # a new environment in the same kernel.
        env.close()
    print(f"Execution Interrupted after {steps} steps.")

In [10]:
# Registry of dashboard widgets: filled by create_widgets(), read by
# update_widgets().
all_widgets = {}
# Set to True by the "Stop" button; the worker polls it through the lambda.
stop_threads = False
# run_game never reads its first argument. Pass an explicit worker number
# rather than the builtin function `id`, which was previously handed over
# (presumably unintentionally).
worker = threading.Thread(target=run_game, args=(0, lambda: stop_threads))
display(create_widgets())

GridBox(children=(IntText(value=0, description='steps:', disabled=True, layout=Layout(width='260px'), style=De…

In [11]:
# Start the game loop in the background thread created above; the "Stop"
# button sets the flag that the loop polls to exit.
worker.start()

Starting Tetris...
line cleared: 1075
Execution Interrupted after 1225 steps.
