## 강화학습 프로젝트 (Swipe 벽돌깨기)
### 20101584 진민찬

# 환경구성 Swipe 게임

In [None]:
# 게임 로직
import pygame, random, math, numpy as np

# Constants (pixel units unless noted otherwise)
BLOCK_SIZE = 48                                   # side length of one grid cell
COLS = 6                                          # grid columns
PLAY_ROWS = 7                                     # grid rows above the launch line
UI_HEIGHT = 88                                    # strip below the launch line
WIDTH = BLOCK_SIZE * COLS
HEIGHT = PLAY_ROWS * BLOCK_SIZE + UI_HEIGHT
FPS = 60
BALL_SPEED = 8                                    # distance moved per frame
R = 6                                             # ball radius
A_MIN = math.radians(-170)
A_MAX = math.radians(-10)

# Colours (RGB)
BEIGE = (250, 240, 225)
WHITE = (255, 255, 255)
ORANGE = (255, 165, 0)
RED = (255, 0, 0)
GREEN = (0, 255, 0)
BALL_C = (0, 200, 255)
TXT = (50, 50, 50)

# pygame bootstrap: the window is created hidden here.
pygame.init()
pygame.display.set_caption("Swipe RL")
screen = pygame.display.set_mode((WIDTH, HEIGHT), flags=pygame.HIDDEN)
clock = pygame.time.Clock()
font = pygame.font.SysFont("malgungothic", 20, bold=True)

#  유틸 
def grad(hp, mx):
    """Return an RGB tuple blended from ORANGE towards RED by the ratio hp / mx.

    Each channel is computed as ``ORANGE + (RED - ORANGE) * hp / mx`` and
    truncated with ``int``; ``hp == mx`` therefore yields RED.
    """
    return tuple(
        int(start + (end - start) * hp / mx)
        for start, end in zip(ORANGE, RED)
    )
def circle_rect_overlap(cx, cy, r, rect):
    """Return True when a circle touches or overlaps an axis-aligned rectangle.

    Args:
        cx, cy: circle centre.
        r: circle radius.
        rect: object exposing ``left``, ``right``, ``top`` and ``bottom``.

    The circle centre is clamped onto the rectangle to find the nearest
    rectangle point; the shapes overlap when that point lies within ``r``
    of the centre (touching counts as overlap).
    """
    nearest_x = max(rect.left, min(cx, rect.right))
    nearest_y = max(rect.top, min(cy, rect.bottom))
    gap_x = cx - nearest_x
    gap_y = cy - nearest_y
    return gap_x ** 2 + gap_y ** 2 <= r ** 2

#  엔티티 
class Ball:
    """A ball moving at constant speed ``BALL_SPEED`` inside the play field.

    Attributes:
        x, y: current centre position.
        prev_x, prev_y: centre position before the most recent ``update``
            (initialised to the spawn position so ``reflect`` is always safe).
        dx, dy: per-frame velocity components.
    """

    def __init__(self, pos, ang):
        """Create a ball at ``pos`` (x, y) heading along angle ``ang`` (radians)."""
        self.x, self.y = pos
        self.prev_x, self.prev_y = self.x, self.y
        self.dx = BALL_SPEED * math.cos(ang)
        self.dy = BALL_SPEED * math.sin(ang)

    def update(self):
        """Advance one frame and bounce off the left, right and top walls."""
        self.prev_x, self.prev_y = self.x, self.y
        self.x += self.dx
        self.y += self.dy
        # Force the velocity to point back into the field instead of blindly
        # negating it: a ball that is still past a wall on the next frame
        # would otherwise flip again and end up stuck on (or beyond) the wall.
        if self.x - R <= 0:
            self.dx = abs(self.dx)
        elif self.x + R >= WIDTH:
            self.dx = -abs(self.dx)
        if self.y - R <= 0:
            self.dy = abs(self.dy)

    def reflect(self, rect):
        """Bounce off ``rect`` and step the ball out of it.

        The bounce axis is chosen from the pre-move position: when the ball
        was farther from the rectangle horizontally than vertically, the
        horizontal velocity is flipped, otherwise the vertical one.
        """
        near_x = max(rect.left, min(self.prev_x, rect.right))
        near_y = max(rect.top, min(self.prev_y, rect.bottom))
        if abs(self.prev_x - near_x) > abs(self.prev_y - near_y):
            self.dx *= -1
        else:
            self.dy *= -1
        # Rewind to the pre-collision position, then advance in half-steps
        # (at most 12) along the new direction until the overlap is gone.
        self.x, self.y = self.prev_x, self.prev_y
        for _ in range(12):
            self.x += self.dx * 0.5
            self.y += self.dy * 0.5
            if not circle_rect_overlap(self.x, self.y, R, rect):
                break

class Block:
    """A brick in grid cell (column ``c``, row ``r``) with ``hp`` hit points."""

    def __init__(self, c, r, hp):
        # The rectangle is 2 px smaller than the cell in each dimension.
        side = BLOCK_SIZE - 2
        left = c * BLOCK_SIZE
        top = r * BLOCK_SIZE
        self.rect = pygame.Rect(left, top, side, side)
        self.hp = hp
class Bonus:
    """A 20x20 pickup centred in the top-row cell of column ``c``."""

    def __init__(self, c):
        half_cell = BLOCK_SIZE // 2
        self.rect = pygame.Rect(0, 0, 20, 20)
        self.rect.center = (c * BLOCK_SIZE + half_cell, half_cell)

#  게임 
class SwipeGame:
    def __init__(self):
        """Set the launch-line height and start a fresh game."""
        # y coordinate of the launch line, directly below the last play row.
        self.y_line = PLAY_ROWS * BLOCK_SIZE
        self.reset()

    # 초기화
    def reset(self):
        """Clear the board, spawn the first row and return the state vector."""
        self.game_over = False
        self.blocks, self.bonuses, self.balls = [], [], []
        self.balls_tot = 1
        self.turn = 1
        # Launch point: horizontally centred, resting on the launch line.
        self.shoot = [WIDTH // 2, self.y_line - R]
        self.spawn_row()
        return self.state_vec()

    # 상태 86‑dim (Block 상태 42, 보너스 상태 42, 발사 위치 x 1, 보유 공 개수 1)
    def state_vec(self):
        """Encode the board as a flat float32 observation vector.

        Layout: block grid (COLS * PLAY_ROWS values, HP divided by turn + 1),
        bonus grid (COLS * PLAY_ROWS values, 1.0 where a bonus sits), then the
        launch x divided by WIDTH and the ball count divided by turn + 1.
        Objects in rows at or beyond PLAY_ROWS are left out of the grids.
        """
        scale = self.turn + 1
        block_grid = np.zeros((COLS, PLAY_ROWS), np.float32)
        bonus_grid = np.zeros_like(block_grid)

        for block in self.blocks:
            col = block.rect.x // BLOCK_SIZE
            row = block.rect.y // BLOCK_SIZE
            if row >= PLAY_ROWS:
                continue
            block_grid[col, row] = block.hp / scale

        for bonus in self.bonuses:
            col = bonus.rect.centerx // BLOCK_SIZE
            row = bonus.rect.centery // BLOCK_SIZE
            if row >= PLAY_ROWS:
                continue
            bonus_grid[col, row] = 1.0

        scalars = [self.shoot[0] / WIDTH, self.balls_tot / scale]
        parts = [block_grid.flatten(), bonus_grid.flatten(), scalars]
        return np.concatenate(parts, dtype=np.float32)


    # 새 행 생성
    def spawn_row(self):
        """Shift the board down one row and populate a new top row.

        Every existing block and bonus moves down by BLOCK_SIZE. One randomly
        chosen column of the new row receives a bonus; each other column
        receives a block (HP equal to the current turn) with probability 0.7.
        """
        for obj in (*self.blocks, *self.bonuses):
            obj.rect.y += BLOCK_SIZE

        # Drop bonuses that slid to or below the launch line. They were never
        # removed before, so the list grew for the whole game; state_vec
        # already ignores them and _render would draw them over the UI strip.
        self.bonuses[:] = [
            bonus for bonus in self.bonuses if bonus.rect.top < self.y_line
        ]

        bonus_col = random.randrange(COLS)
        for c in range(COLS):
            if c == bonus_col:
                self.bonuses.append(Bonus(c))
            elif random.random() < 0.7:
                self.blocks.append(Block(c, 0, self.turn))


    # 동시 발사 (줄지어 발사, 꼬리 전체 이동 보정)
    def spawn_balls(self, ang):
        """Build all balls for a simultaneous, single-file launch.

        1) Starting at the shoot point, balls are laid out against the launch
           direction with a spacing of 2R.
        2) If the line of balls leaves the field (left/right walls or the
           top), the whole line is translated by one common vector so that
           every ball is inside again.

        Args:
            ang: launch angle in radians.

        Returns:
            list of ``Ball`` objects, one per owned ball, all heading along
            ``ang``.
        """
        # Spacing vector between consecutive balls (length 2R).
        step_x = math.cos(ang) * R * 2
        step_y = math.sin(ang) * R * 2

        # Initial positions, trailing behind the shoot point.
        positions = []
        for i in range(self.balls_tot):
            positions.append((self.shoot[0] - step_x * i,
                              self.shoot[1] - step_y * i))

        xs = [p[0] for p in positions]
        ys = [p[1] for p in positions]
        lowest_x, highest_x, lowest_y = min(xs), max(xs), min(ys)

        # Horizontal correction: the left wall takes precedence over the right.
        shift_x = 0.0
        if lowest_x < R:
            shift_x = R - lowest_x
        elif highest_x > WIDTH - R:
            shift_x = (WIDTH - R) - highest_x

        # Vertical correction: only the top edge is checked.
        shift_y = R - lowest_y if lowest_y < R else 0.0

        # Translate the whole line by the same vector when needed.
        if shift_x or shift_y:
            positions = [(px + shift_x, py + shift_y) for px, py in positions]

        return [Ball(position, ang) for position in positions]

    # 한 턴 플레이 (학습, 로직용)
    def play_turn(self, angle_rad):
        """Simulate one complete turn without rendering (training / game logic).

        Args:
            angle_rad: launch angle in radians, applied to every ball fired
                this turn.

        Returns:
            tuple ``(reward, done)``. ``reward`` is the fraction of the total
            block HP present at the start of the turn that was removed (0.0
            when the board had no blocks). 1 is subtracted and ``done`` is
            True when the frame limit is hit or when, after the new row has
            been spawned, a block sits at or below the launch line. Calling
            this on a board that is already lost returns ``(-1.0, True)``.
        """
        return self._run_turn(angle_rad, gap_frames=4)

    # Visualised variant (60 FPS)
    def play_turn_visual(self, angle_rad, surf, clock, fps=60, gap_frames=4):
        """Same turn as ``play_turn``, rendered to ``surf`` once per frame.

        Shares the simulation loop with ``play_turn`` so that the physics seen
        during inference are identical to the physics used for training.

        Args:
            angle_rad: launch angle in radians.
            surf: surface handed to ``_render``.
            clock: object whose ``tick(fps)`` paces the animation.
            fps: frame-rate cap passed to ``clock.tick``.
            gap_frames: frames between consecutive ball launches
                (values below 1 are treated as 1).

        Returns:
            tuple ``(reward, done)`` with the same meaning as in ``play_turn``.
        """
        def draw():
            self._render(surf)
            pygame.display.flip()

        def show_frame():
            # Let pygame process OS messages so the window stays responsive
            # during long turns; queued events remain available to the caller.
            pygame.event.pump()
            draw()
            clock.tick(fps)

        if self._blocks_past_line():
            draw()
            return -1.0, True

        reward, done = self._run_turn(angle_rad, gap_frames, on_frame=show_frame)
        if done:
            # Show the final board (including a freshly spawned row).
            draw()
        return reward, done

    def _blocks_past_line(self):
        """Return True when any block's top edge is at or below the launch line."""
        return any(blk.rect.top >= self.y_line for blk in self.blocks)

    def _resolve_hits(self, ball):
        """Apply at most one block hit and one bonus pickup for ``ball``."""
        for blk in self.blocks:
            if circle_rect_overlap(ball.x, ball.y, R, blk.rect):
                blk.hp -= 1
                ball.reflect(blk.rect)
                if blk.hp <= 0:
                    self.blocks.remove(blk)
                break  # one block per ball per frame; also safe after remove
        for bon in self.bonuses:
            if circle_rect_overlap(ball.x, ball.y, R, bon.rect):
                self.balls_tot += 1
                self.bonuses.remove(bon)
                break

    def _run_turn(self, angle_rad, gap_frames, on_frame=None):
        """Shared turn simulation behind ``play_turn`` and ``play_turn_visual``.

        ``on_frame``, when given, is called once per simulated frame after all
        balls have been updated. Returns ``(reward, done)``.
        """
        # Guard: the board is already lost.
        if self._blocks_past_line():
            return -1.0, True

        MAX_FRAMES = 10_000
        gap = max(1, gap_frames)          # avoid modulo-by-zero
        to_launch = self.balls_tot        # bonuses picked up mid-turn apply next turn
        balls = []
        self.balls = balls                # _render draws the balls in self.balls
        frame = 0
        pre_hp = sum(blk.hp for blk in self.blocks)
        first_landed = None

        # Balls are released one every `gap` frames so they travel as a trail.
        while (to_launch > 0 or balls) and frame < MAX_FRAMES:
            if to_launch > 0 and frame % gap == 0:
                balls.append(Ball(tuple(self.shoot), angle_rad))
                to_launch -= 1

            for ball in balls[:]:
                ball.update()
                self._resolve_hits(ball)
                # Reached the launch line: the ball leaves play.
                if ball.y + R >= self.y_line:
                    if first_landed is None:
                        first_landed = (ball.x, ball.y)
                    balls.remove(ball)
                    continue
                # Zero-velocity guard so a ball can never stall forever.
                if abs(ball.dx) < 1e-6 and abs(ball.dy) < 1e-6:
                    ball.dx = BALL_SPEED * 0.1
                    ball.dy = -BALL_SPEED * 0.1

            if on_frame is not None:
                on_frame()
            frame += 1

        # Reward: share of the starting HP that was removed this turn.
        post_hp = sum(blk.hp for blk in self.blocks)
        reward = 0.0 if pre_hp == 0 else (pre_hp - post_hp) / pre_hp

        # Frame limit hit with balls still pending or in flight: abnormal end
        # (keeps training from hanging on a physics glitch).
        if to_launch > 0 or balls:
            return reward - 1, True

        # The next launch point is where the first ball landed, kept clear of
        # the side walls.
        if first_landed is not None:
            x = first_landed[0]
            # Kept from the original: an x of exactly 0 is mapped to the
            # right edge before clamping.
            if x == 0:
                x = WIDTH
            pad = R + 2
            self.shoot[0] = int(round(max(pad, min(WIDTH - pad, x))))

        self.turn += 1
        self.spawn_row()

        # Blocks only move inside spawn_row, so the loss check must come after
        # it; checking before (as the code used to) could never fire and
        # delayed the penalty to a following, action-independent step.
        if self._blocks_past_line():
            return reward - 1, True

        return reward, False

    # 렌더링
    def _render(self, surf):
        """Draw the whole scene (field, blocks, bonuses, balls, HUD) on ``surf``."""
        surf.fill(BEIGE)
        line_start = (0, self.y_line)
        line_end = (WIDTH, self.y_line)
        pygame.draw.line(surf, TXT, line_start, line_end, 3)

        # Block colour is graded against the highest HP currently on the board.
        top_hp = max((blk.hp for blk in self.blocks), default=1)
        for blk in self.blocks:
            pygame.draw.rect(surf, grad(blk.hp, top_hp), blk.rect)
            label = font.render(str(blk.hp), True, WHITE)
            surf.blit(label, label.get_rect(center=blk.rect.center))

        for bon in self.bonuses:
            pygame.draw.circle(surf, GREEN, bon.rect.center, 10)

        # `balls` may not exist yet, hence the getattr fallback.
        for ball in getattr(self, 'balls', []):
            centre = (int(ball.x), int(ball.y))
            pygame.draw.circle(surf, BALL_C, centre, R)

        # Launch-point marker and status text below the launch line.
        pygame.draw.circle(surf, BALL_C, self.shoot, 7)
        hud = font.render(f"Level:{self.turn}  Balls:{self.balls_tot}", True, TXT)
        surf.blit(hud, (10, self.y_line + 12))


# 게임 환경을 학습이 가능하도록 Gym 환경으로 정의

In [None]:
# 변수 및 Gym 환경 정의
import gym
import numpy as np
from stable_baselines3 import DQN

# Lookup table of the 21 discrete launch angles (degrees -> radians),
# evenly spaced from -170 to -10 degrees inclusive.
_ANGLE_DEGREES = np.linspace(-170, -10, num=21)
ANGLE_LUT = np.radians(_ANGLE_DEGREES)

class SwipeEnv(gym.Env):
    """Gym wrapper around ``SwipeGame``.

    Action: index into ``ANGLE_LUT`` (discrete launch angle).
    Observation: the vector returned by ``SwipeGame.state_vec``.
    One environment step plays one full turn of the game.
    """

    metadata = {"render_modes": []}

    def __init__(self):
        super().__init__()
        self.game = SwipeGame()
        self.action_space = gym.spaces.Discrete(len(ANGLE_LUT))
        # Take the shape from the game itself so it cannot drift from
        # state_vec (86 with the current grid constants).
        self.observation_space = gym.spaces.Box(
            low=0.0,
            high=1.0,
            shape=self.game.state_vec().shape,
            dtype=np.float32,
        )

    def reset(self, *, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``."""
        import random

        super().reset(seed=seed)
        # SwipeGame draws its rows from Python's global `random` module, so
        # that generator has to be seeded for `seed` to have any effect.
        if seed is not None:
            random.seed(seed)
        return self.game.reset(), {}

    def step(self, action):
        """Play one turn; return ``(obs, reward, terminated, truncated, info)``."""
        angle = ANGLE_LUT[int(action)]
        reward, done = self.game.play_turn(angle)
        obs = self.game.state_vec()
        info = {"turns": self.game.turn}
        return obs, float(reward), bool(done), False, info

    def seed(self, seed=None):
        """Seed the environment RNG and the game's RNG; return ``[seed]``."""
        import random

        self.np_random, seed = gym.utils.seeding.np_random(seed)
        # Seed the generator the game actually uses (see reset).
        random.seed(seed)
        return [seed]

# DQN 학습 코드
## Inference만 수행 시 실행 X

In [None]:
#DQN 학습 + 그래프 저장
import os, warnings, pandas as pd, matplotlib.pyplot as plt
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.logger import configure

# Training and evaluation environments
train_env = SwipeEnv()
eval_env  = SwipeEnv()
eval_env.seed(42)

# Model definition
model = DQN(
    policy                 = "MlpPolicy",
    env                    = train_env,
    learning_rate          = 3e-4,
    buffer_size            = 100_000,
    batch_size             = 256,
    gamma                  = 0.99,
    target_update_interval = 1,
    exploration_fraction   = 0.40,
    verbose                = 1,
    exploration_final_eps  = 0.1,
    tau = 0.005,
)

# Write the training log to stdout and to CSV
log_dir = "result/logs"
os.makedirs(log_dir, exist_ok=True)
model.set_logger(configure(log_dir, ["stdout", "csv"]))

# Callback that evaluates periodically and keeps the best model
eval_cb = EvalCallback(
    eval_env,
    n_eval_episodes      = 5,
    eval_freq            = 1000,              # evaluate every 1000 steps
    best_model_save_path = "result/best",
    deterministic        = True,
)

# Training
TOTAL_STEPS = 50000
model.learn(total_timesteps=TOTAL_STEPS, callback=eval_cb)
model.save("result/last_model")

# Plots built from the CSV log
progress_csv = os.path.join(log_dir, "progress.csv")
if not os.path.isfile(progress_csv):
    warnings.warn("그래프 생성 실패패")
else:
    df = pd.read_csv(progress_csv)

    def _save_curve(keys, title, fname, ylabel):
        """Plot the first column of ``keys`` found in ``df`` and save it.

        Args:
            keys: candidate column names, in priority order.
            title: figure title.
            fname: output file name inside ``result/``.
            ylabel: y-axis label.
        """
        key = next((k for k in keys if k in df.columns), None)
        if key is None:
            warnings.warn(f"{keys[0]} 열 없음음 → {title} 그래프 건너뜀.")
            return
        x = df["time/total_timesteps"]
        y = df[key]
        plt.figure()
        valid = y.notna()          # rows where this metric was not logged are skipped
        plt.plot(x[valid], y[valid])
        # The x data is time/total_timesteps (environment steps), not
        # gradient updates, so the axis is labelled accordingly.
        plt.title(title); plt.xlabel("timesteps"); plt.ylabel(ylabel)
        plt.tight_layout(); plt.savefig(os.path.join("result", fname)); plt.close()

    _save_curve(["rollout/ep_rew_mean"],  "Mean Episode Reward",  "reward_curve.png",  "reward")
    _save_curve(["rollout/ep_len_mean"],  "Mean Episode Length",  "length_curve.png",  "steps")
    _save_curve(["train/loss", "train/td_loss"], "TD-Loss (Q-network)", "loss_curve.png", "loss")
    _save_curve(
        ["train/exploration_rate", "train/epsilon", "rollout/exploration_rate"],
        "Epsilon (exploration)", "epsilon_curve.png", "ε"
    )

    print("그래프 저장 완료")


# Inference 코드
## Result의 Best model로 수행

In [None]:
# 시각화 Inference
import pygame, time

# Initialise pygame
pygame.init()
pygame.display.set_caption("Swipe RL Inference")

try:
    # Visible window, plus the module-level font that SwipeGame._render uses
    screen_vis = pygame.display.set_mode((WIDTH, HEIGHT))
    font       = pygame.font.SysFont("malgungothic", 20, bold=True)
    # One clock for the whole session instead of a new one every turn
    clock_vis  = pygame.time.Clock()

    # Game and trained model
    game_vis = SwipeGame()
    obs      = game_vis.reset()
    model    = DQN.load("result/best/best_model.zip")

    done = False
    while not done:
        # Quit on window close or ESC
        quit_requested = any(
            event.type == pygame.QUIT
            or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE)
            for event in pygame.event.get()
        )
        if quit_requested:
            break

        # Greedy policy action, then play the turn with rendering
        action, _ = model.predict(obs, deterministic=True)
        angle     = ANGLE_LUT[int(action)]
        reward, done = game_vis.play_turn_visual(
            angle, surf=screen_vis, clock=clock_vis, fps=60
        )
        obs = game_vis.state_vec()
        time.sleep(0.01)
finally:
    # Always release the window, even if loading or inference raises
    pygame.quit()
