In [1]:
import matplotlib.pyplot as plt
import matplotlib
import math
import random
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [17]:
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

In [2]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """transition 저장"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [8]:
class DQN(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.timing = 0 #0 ~ 150
        self.swing = None #Boolean
        self.dim = 6
        self.hidden_dim = 32
        self.out_dim = 16 #0~150가능 -> 각 10당 할당, 나머지 하나는 no swing
        self.linear1 = nn.Linear(self.dim, self.hidden_dim)
        self.linear2 = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.linear3 = nn.Linear(self.hidden_dim, self.out_dim)
        #이후 각 구간으로 나눈 다음 약간의 random성을 넣으면 좋을 듯
        #만약 0~15이면 0~10까지 할당 +-5값은 random하게 줌. 나머지 하나는 no swing
        self.gelu = nn.GELU()


    #forward에서 Standard Normal Distribution이후 해당 값을 변환
    def forward(self, x): #x에는 env, 현재 투수
        x = self.gelu(self.linear1(x))
        x = self.gelu(self.linear2(x))
        x = self.gelu(self.linear3(x))

        return x #output의 경우 최종으로 변환하여 -50~200으로 하는 것이 좋을 듯. 
    #Monte Carlo 방식으로 학습하면 좋을 듯?

In [4]:
class Environment():
    def __init__(self) -> None:
        self.total_score = 0
        self.strike = 0
        self.ball = 0
        self.out = 0
        self.pitcher = None
        self.curr_base = [0, 0, 0] #각 1,2,3 루
    
    def get_env(self):
        return [self.strike,self.ball, self.out,
                self.pitcher, self.curr_base[0], self.curr_base[1], self.curr_base[2]]
    
    def strike_plus(self):
        if self.strike == 2:
            if self.out == 2:
                self.strike, self.ball, self.out, self.curr_base = 0, 0, 0, [0, 0, 0]
            else:
                self.out += 1
        else:
            self.strike += 1

    def ball_plus(self):
        if self.ball == 3:
            if self.curr_base[0] == 0: #1루가 비어있는 경우
                self.curr_base[0] = 1
            elif self.curr_base[0] == 1 and self.curr_base[1] == 1: #1, 2루가 채워 있는 경우
                self.curr_base[2] = 1
            elif self.curr_base[0] == 1 and self.curr_base[1] == 1 and self.curr_base[2] == 1:
                self.total_score += 1
            elif self.curr_base[0] == 1 and self.curr_base[2] == 1:
                self.curr_base[1] = 1
            elif self.curr_base[0] == 1:
                self.curr_base[1] = 1
        else:
            self.ball += 1

    def out_plus(self):
        if self.out == 2:
            self.strike, self.ball, self.out, self.curr_base = 0, 0, 0, [0, 0, 0]
        else:
            self.out += 1

    def single(self):
        if self.curr_base[2] == 1:
            self.total_score += 1
            self.curr_base[2] = 0
        
        if self.curr_base[1] == 1:
            self.curr_base[2] = 1
            self.curr_base[1] = 0
        
        if self.curr_base[0] == 1:
            self.curr_base[1] = 1
            #self.curr_base[0] = 1 --> 할 필요없음, 어차피 안타친 타자가 1루에 들어감

    def double(self):
        if self.curr_base[2] == 1:
            self.total_score += 1
            self.curr_base[2] = 0
        
        if self.curr_base[1] == 1:
            self.total_score += 1
            self.curr_base[1] = 0
        
        if self.curr_base[0] == 1:
            self.curr_base[2] = 1
            self.curr_base[1] = 1
            self.curr_base[0] = 0

    def triple(self):
        if self.curr_base[2] == 1:
            self.total_score += 1
            self.curr_base[2] = 0
        
        if self.curr_base[1] == 1:
            self.total_score += 1
            self.curr_base[1] = 0
        
        if self.curr_base[0] == 1:
            self.total_score += 1
            self.curr_base[0] = 0
            self.curr_base[2] = 1

    def homerun(self):
        if self.curr_base[2] == 1:
            self.total_score += 1
            self.curr_base[2] = 0
        
        if self.curr_base[1] == 1:
            self.total_score += 1
            self.curr_base[1] = 0
        
        if self.curr_base[0] == 1:
            self.total_score += 1
            self.curr_base[0] = 0

        self.total_score += 1




In [6]:
class Pitcher():
    def __init__(self, conditional) -> None:
        self.fast_conditional = conditional
        #예시 : [50.7, 41.8, 36.1, 67.4, 50.2]

    def return_trajectory(self, env : Environment): #여기서 상황에 따라 자세히 구현해야 함.
        random_number = random.random()

        if env.strike == 0 and env.ball == 0:
            if self.fast_conditional[0] < random_number:
                return "fastball"
            else:
                return "curve"
        
        if env.strike > env.ball:
            if self.fast_conditional[2] < random_number:
                return "fastball"
            else:
                return "curve"
            
        if env.strike < env.ball:
            if self.fast_conditional[3] < random_number:
                return "fastball"
            else:
                return "curve"
            
        if env.strike == env.ball:
            if self.fast_conditional[4] < random_number:
                return "fastball"
            else:
                return "curve"

In [10]:
# BATCH_SIZE는 리플레이 버퍼에서 샘플링된 트랜지션의 수입니다.
# GAMMA는 이전 섹션에서 언급한 할인 계수입니다.
# EPS_START는 엡실론의 시작 값입니다.
# EPS_END는 엡실론의 최종 값입니다.
# EPS_DECAY는 엡실론의 지수 감쇠(exponential decay) 속도 제어하며, 높을수록 감쇠 속도가 느립니다.
# TAU는 목표 네트워크의 업데이트 속도입니다.
# LR은 ``AdamW`` 옵티마이저의 학습율(learning rate)입니다.
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

In [11]:
policy_net = DQN()
target_net = DQN()
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), amsgrad=True)
memory = ReplayMemory(10000)

In [12]:
def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max (1)은 각 행의 가장 큰 열 값을 반환합니다.
            # 최대 결과의 두번째 열은 최대 요소의 주소값이므로,
            # 기대 보상이 더 큰 행동을 선택할 수 있습니다.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randint(0, 16)]], dtype=torch.long)

In [13]:
episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # 100개의 에피소드 평균을 가져 와서 도표 그리기
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # 도표가 업데이트되도록 잠시 멈춤
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

In [None]:
def env_step(action, env : Environment):
    if action[0][0] == 16:
        timing = -1 #no swing
    else:
        timing = (action[0][0] * 10 + 5) + 5 * random.uniform(-1, 1)
         

In [14]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). 이것은 batch-array의 Transitions을 Transition의 batch-arrays로
    # 전환합니다.
    batch = Transition(*zip(*transitions))

    # 최종이 아닌 상태의 마스크를 계산하고 배치 요소를 연결합니다
    # (최종 상태는 시뮬레이션이 종료 된 이후의 상태)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s_t, a) 계산 - 모델이 Q(s_t)를 계산하고, 취한 행동의 열을 선택합니다.
    # 이들은 policy_net에 따라 각 배치 상태에 대해 선택된 행동입니다.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # 모든 다음 상태를 위한 V(s_{t+1}) 계산
    # non_final_next_states의 행동들에 대한 기대값은 "이전" target_net을 기반으로 계산됩니다.
    # max(1)[0]으로 최고의 보상을 선택하십시오.
    # 이것은 마스크를 기반으로 병합되어 기대 상태 값을 갖거나 상태가 최종인 경우 0을 갖습니다.
    next_state_values = torch.zeros(BATCH_SIZE)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # 기대 Q 값 계산
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Huber 손실 계산
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # 모델 최적화
    optimizer.zero_grad()
    loss.backward()
    # 변화도 클리핑 바꿔치기
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [None]:
if torch.cuda.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # 환경과 상태 초기화
    state, info = Environment()
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward])
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)

        # 메모리에 변이 저장
        memory.push(state, action, next_state, reward)

        # 다음 상태로 이동
        state = next_state

        # (정책 네트워크에서) 최적화 한단계 수행
        optimize_model()

        # 목표 네트워크의 가중치를 소프트 업데이트
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()