<a href="https://colab.research.google.com/github/karaage0703/mario-ai-challenge/blob/main/colab_notebooks/large_language_mario.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Large Language Mario

LLM으로 슈퍼마리아 플레이하 도전

## 필ㅛ한 라이브러리 설치 및 가져오기
AI 마리오에 필요한 라이러리 설치 및 가져오기 실행합니다 

[OpenAI Gym 슈마리오용 패키지](https://github.com/Kautenja/gym-super-mario-bros)을 설치합니다. 강화학습 환경으로 사용합니다. 자한 사양은 [GitHub](https://github.com/Kautenja/gym-super-mario-bros)에서 확인할 수  있습니다.


In [None]:
!pip install -qqq gym-super-mario-bros==7.3.0

기타 필요한 라이브러리를  설치합니다.

In [None]:
!apt -qq update
!apt install -y -qq xvfb
!pip install -qqq pyvirtualdisplay

필요한 라이브러리를 가져옵니다.

In [None]:
# Import the game
import gym_super_mario_bros
# Import the Joypad wrapper
from nes_py.wrappers import JoypadSpace
# Import the SIMPLIFIED controls
from gym.spaces import Box
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT
from gym_super_mario_bros.actions import COMPLEX_MOVEMENT
from gym_super_mario_bros.actions import RIGHT_ONLY

import gym
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import imshow
import cv2
from PIL import Image
import requests

import os
from google.colab import userdata
import base64
import json

## AI 슈퍼 마리오 환경구축

AI 슈퍼 마리오 환경을 구축합니다.



### AI 슈퍼마리오 환경 초기화 및 동작 확인 
환경 확인합니다.

In [None]:
STAGE_NAME = 'SuperMarioBros-1-1-v0'

In [None]:
# 마리오1-1 환경을 초기화합니다. 
env = gym_super_mario_bros.make(STAGE_NAME)

# 슈퍼마리오의 행동공간을 설정합니다. 
env = JoypadSpace(env, SIMPLE_MOVEMENT)

# MOVEMENT = [["right"], ["right", "A"]]
# env = JoypadSpace(env, MOVEMENT)
# env = JoypadSpace(env, COMPLEX_MOVEMENT)
# env = JoypadSpace(env, RIGHT_ONLY)

스테이지 초기화와 행동공간 설정을 하고 있습니다. 

PyTorch 튜토리얼에서는 아래와 같이 2가지 패턴으로 단순화된 행동을 하는 마리오로 제한되어 있습니다. 

```
# 행동 공간은 다음과 같이 제한합니다.
#   0. 오른쪽으로 걷기 
#   1. 오른쪽 방향으로 점프 
env = JoypadSpace(env, [["right"], ["right", "A"]])
```

직접 자유롭게 설정하는 것외에도 `gym_super_mario_bros`에는  `SIMPLE_MOVEMENT`, `COMPLEX_MOVEMENT`등 여러가지 패턴이 등록되어 있습니다. 어떤 패턴이  등록되어 있습니다. 어떤 패턴이 있는지는 [gym_super_mario_bros 소스코드](https://github.com/Kautenja/gym-super-mario-bros/blob/master/gym_super_mario_bros/actions.py)를 읽거나 아래와 같이  내용을 `print`하 확인하는 방법이 있습니다.

L이번에는 `SIMPLE_MOVEMENT`를 사용하였습니다.

In [None]:
print(SIMPLE_MOVEMENT)
print(COMPLEX_MOVEMENT)

환경을 재설정하고 환경을 확인합니다.

`env.step(action=0)`과 에이전트(슈퍼마리오)가 액션을 취하면 환경으로부터 상태(`state`)와 보상(`reward`), 완료 여부의 플래그(`done`), 정보(`info`)를 돌려받습니다. 

In [None]:
env.reset()
state, reward, done, info = env.step(action=0)

print('state:', state.shape)
print('reward:', reward)
print('done:', done)
print('info:', info)

행동  `action`의 범위나 상태 `state`의 범위를 확인하려면 다음을 수행하십시오. 

```python
from gym.spaces import Box, Discrete
def print_spaces(space):
    print(space)

    if isinstance(space, Box):
        print('min:', space.low)
        print('max:', space.high)
    if isinstance(space, Discrete):
        print('min:', 0)
        print('max:', space.n-1)

print('action space------------------')
print_spaces(env.action_space)
print('observation space------------------')
print_spaces(env.observation_space)
```

마지막으로 `state`를 이미지로 시각화해 버립니다. 

슈퍼마리오의 플레이 화면임을 확인할 수 있습니다. 

In [None]:
plt.imshow(state)

`state`는 `numpy.ndarray'입니다.

In [None]:
print(type(state))

`state`를 이미지로 변환합니다. 

In [None]:
image = Image.fromarray(state)
image.save('state.png')

이미지도 표시를 확인합니다.

In [None]:
img = Image.open('state.png')
imshow(img)

## LLM 테스트 

LLM이 이미지를 올바르게 불러와 재생할 수 있는지 테스트합니다.
OpenAI 라이브러리를 설치합니다.

In [None]:
!pip install -qqq openai

OpenAI API KEY를 불러옵니다.

미리 Google Colab의 스크릿키로 `OPENAI_API_KEY`를 설정해 두어야 합니다. 시크릿키 설정방법은 따로 찾아보세요. 




LLM에서 이미지를 읽을 수 있도록 인코딩 함수를 정의합니다.

In [None]:
def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

이미지에서 슈퍼마리오 움직임을 추론하는 함수를 정의합니다.

In [None]:
def predict(state):
    # 현재 슈퍼마리오 상태를 PNG 이미지로 저장
    image = Image.fromarray(state)
    image.save('state.png')

    # 이미지 파일 인코딩 
    image_path = "./state.png"
    base64_image = encode_image(image_path)

    api_key = userdata.get("OPENAI_API_KEY")

    # 프롬프트 
    prompt = """
        이 이미지는 게임 슈퍼마리오의 플레이 화면입니다.
    　　 화면에 따라 아래 7가지 버튼조작이 가능합니다. 버튼 조작은 아래 7가지 중 하나를 선택해 주세요.
    　　 A는 점프, B는 대시, NOOP는 조작하지 않습니다. 

        0 = 'NOOP'
        1 = 'right'
        2 = 'right', 'A'
        3 = 'right', 'B'
        4 = 'right', 'A', 'B'
        5 = 'A'
        6 = 'left'

        아래와 같이 json으로 출력해 주세요. 한국어로 부탁드립니다.

    　　 explanation: 화면설정
        reason: 버튼조작의 이유
        action: 버튼조작의 종류 
    """

    headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}"
    }

    payload = {
    "model": "gpt-4o-2024-08-06",
    "messages": [
        {
        "role": "user",
        "content": [
            {
            "type": "text",
            "text": f"{prompt}"
            },
            {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{base64_image}"
            }
            }
        ]
        }
    ],
    "max_tokens": 300,
    "temperature": 1,
    "response_format": {
        "type": "json_schema",
        "json_schema": {
        "name": "mario_action",
        "schema": {
            "type": "object",
            "properties": {
            "explanation": {
                "type": "string",
            },
            "reason": {
                "type": "string",
            },
            "action": {
                "type": "integer"
            }
            },
            "required": ["explanation", "reason", "action"],
        }
        }
    }
    }

    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)

    content_dict = json.loads(response.json().get('choices')[0].get('message').get('content'))

    action = content_dict.get('action')
    explanation = content_dict.get('explanation')
    reason = content_dict.get('reason')

    if action == None:
        action = 0


    return action, explanation, reason

추론하는 함수를 테스트합니다. 

In [None]:
action, explanation, reason = predict(state)

액션을 확인합니다. 오른쪽으로 이동（`1`） 또는 대시하여 오른쪽으로 이동（`2`）되어 있는지 확인합니다. 

In [None]:
action

슈퍼마리오의 상황파악(이미지 설명)을 확인합니다.

In [None]:
explanation

행동 이유를 확인합니다.

In [None]:
reason

## AI 슈퍼마리오 테스트

AI모델을 테스트합니다.

테스트 횟수등을 설정합니다.

In [None]:
EPISODE_NUMBERS = 1
MAX_TIMESTEP_TEST = 1000
SKIP_RATE = 10

In [None]:
import copy

total_reward = [0] * EPISODE_NUMBERS
total_time = [0] * EPISODE_NUMBERS
best_reward = 0
frames_best = []

for i in range(EPISODE_NUMBERS):
    state = env.reset()  # reset for each new trial
    done = False
    total_reward[i] = 0.0
    total_time[i] = 0
    skip_numb = SKIP_RATE
    frames = []

    while not done and total_time[i] < MAX_TIMESTEP_TEST:
        if skip_numb < SKIP_RATE:
            skip_numb += 1
        else:
            skip_numb = 0
            action, explanation, reason = predict(state)

            plt.imshow(state)
            plt.pause(0.001)

            print(f'action: {action}')
            print(f'explanation: {explanation}')
            print(f'reason: {reason}')

        state, reward, done, info = env.step(action)
        total_reward[i] += reward
        total_time[i] += 1
        frames.append(copy.deepcopy(env.render(mode = 'rgb_array')))

    if total_reward[i] > best_reward:
        best_reward = total_reward[i]
        frames_best = copy.deepcopy(frames)

    print('test episode:', i, 'reward:', total_reward[i], 'time:', total_time[i])

    # Judgement Mario goal or not
    if info['flag_get']:
        print(f"<<< Mario get the flag. GOOOOOOOOOOOOOOOOOOOOOAL! >>>")
        break

print('average reward:', (sum(total_reward) / EPISODE_NUMBERS),
      'average time:', (sum(total_time) / EPISODE_NUMBERS),
      'best_reward:', best_reward)

`<<< Mario get the flag. GOOOOOOOOOOOOOOOOOOOOOAL! >>>`라는 표시가 나오면, 클리어한 것입니다.

클리어하지 못하면, 여러번 플레이하여 가장 좋은 플레이 영상이 저장됩니다.

실행할 때마다 결과가 달라집니다. 여러번 시도해 봅시다.

## AI 슈퍼마리오 플레이 영상저장 및 재생 

저장된 이미지 로그에서 동영상을 저장하고 재생할 수 있습니다.

이미지가 많으면 동영상 생성 및 확인에 시간이 오래 걸리므로 이미지를 간소화합니다. 

이미지를 가볍게 만드는 비율을 입력합니다. (클수록 가볍게 만들어짐)

In [None]:
SKIP_RATE = 4

이미지 간소화 처리를 합니다.

In [None]:
if SKIP_RATE == 1:
  frames_new = np.array(frames_best)
else:
  np_frames = np.array(frames_best)
  frames_new = np_frames[::SKIP_RATE].tolist()

Google Colab에서 확인할 수 있는 동영상을 생성합니다. 생성 후 재생버튼을 누르면 AI 슈퍼마리오의 플레이 영상을 볼 수 있습니다.

In [None]:
import matplotlib.animation
from IPython.display import HTML

matplotlib.rcParams['animation.embed_limit'] = 2**128
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi = 72)
patch = plt.imshow(frames_new[0])
plt.axis('off')
animate = lambda i: patch.set_data(frames_new[i])
ani = matplotlib.animation.FuncAnimation(plt.gcf(), animate, frames=len(frames_new), interval = 50)
HTML(ani.to_jshtml())

동영상을 저장합니다.

In [None]:
ani.save('ai_mario.mp4')

동영상 파일을 다운로드합니다.

In [None]:
from google.colab import files
files.download('ai_mario.mp4')

GIF 애니메이션 형식으로 저장합니다.

In [None]:
import imageio
imageio.mimsave('ai_mario.gif', [np.array(img) for i, img in enumerate(frames_best) if i%SKIP_RATE == 0], fps=15)

GIF애니메이션을 다운로드합니다.

In [None]:
from google.colab import files
files.download('ai_mario.gif')

##  참고자료 

- https://platform.openai.com/docs/guides/vision
- https://platform.openai.com/docs/guides/structured-outputs/introduction
- https://openai.com/index/introducing-structured-outputs-in-the-api/
- https://platform.openai.com/settings/organization/billing/overview