In [15]:
import tensorflow as tf
import keras
import numpy as np
import random
import pickle
import sys
from keras.models import Sequential
from keras.layers.convolutional import Conv2D
from keras.layers import Dense
from keras.optimizers import Adam
from collections import deque
from keras.layers import Reshape

In [4]:
BOARD_ROWS = 7
BOARD_COLS = 7

In [5]:
# 참고 https://www.secmem.org/blog/2020/02/08/snake-dqn/

In [6]:
class Germ:
  def __init__(self, alpha = 0.002, gamma = 0.95, epsilon = 0.1):
    self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
    self.board[0, 0] = self.board[BOARD_COLS-1, BOARD_COLS-1] = -1
    self.board[BOARD_ROWS-1, 0] = self.board[0, BOARD_COLS-1] = 1
    # 우리가 1, 선공일 때를 나타냄
    self.isEnd = False
    self.playerSymbol = 1
    self.alpha = alpha
    self.gamma = gamma
    self.epsilon = epsilon
    self.batch_size = 64
    self.min_replay_memory_size = 1000 # 얼마가 적당할지 잘 모르겠음.
    self.target_update_freq = 100

    self.model = self.build_model()
    self.target_model = self.build_model()
    self.target_model.set_weights(self.model.get_weights())
    self.model.summary()

    self.replay_memory_size = 5000
    self.replay_memory = deque(maxlen=self.replay_memory_size)
    self.target_update_counter = 0

  def cantmove(self): # 더 이상 움직일 수 없을 때 남은 곳을 상대 말로 채운다.
    self.isEnd = True
    for i in range(BOARD_ROWS):
      for j in range(BOARD_COLS):
        if self.board[i, j] == 0:
          self.board[i, j] = -self.playerSymbol
    return None
  
  def winner(self): # 맵이 다 찼다면 점수를 반환한다.
    if sum(map(sum, map(abs, self.board))) == BOARD_ROWS*BOARD_COLS:
      self.isEnd = True
      return sum(map(sum, self.board))
    return None

  def availableActions(self): # 가능한 행동들을 반환한다.
    Actions = []
    for i in range(BOARD_ROWS):
        for j in range(BOARD_COLS):
            if self.board[i, j] == self.playerSymbol:
                for ii in range(-2,3):
                    for jj in range(-2,3):
                        if ii == 0 and jj == 0:
                          continue
                        if i + ii < 0 or i + ii >= BOARD_ROWS or j + jj < 0 or j + jj >= BOARD_COLS:
                          continue
                        if self.board[i + ii, j + jj] == 0:
                          act = i
                          act = act*BOARD_COLS + j
                          act = act*BOARD_COLS + i + ii
                          act = act*BOARD_COLS + j + jj
                          Actions.append(act)
    return Actions

  def isAvailableAction(self, Action): # 가능한 행동인지?
    position = np.zeros(4)
    Action = int(Action)
    for i in range(4):
      position[3-i] = Action % BOARD_COLS
      Action = Action // BOARD_COLS
    position = [int(i) for i in position]
    return self.board[position[0]][position[1]]==self.playerSymbol and self.board[position[2]][position[3]]==0

  def updateState(self, Action): # 현재 상태에서 특정 행동을 한 다음 상태로 업데이트 한다.
      position = np.zeros(4)
      for i in range(4):
        position[3-i] = Action % BOARD_COLS
        Action = Action // BOARD_COLS
      position = [int(i) for i in position]
      #print(Action, position)
      ii = position[2] - position[0]
      jj = position[3] - position[1]
      if max(abs(ii), abs(jj)) == 2:
          self.board[position[0:2]] = 0
      
      dx1 = [-1, -1, -1, 0, 0, 1, 1, 1]
      dy1 = [-1, 0, 1, -1, 1, -1, 0, 1]
      i, j = position[2:4]
      self.board[i, j] = self.playerSymbol
      for ii, jj in zip(dx1, dy1):
          if i + ii < 0 or i + ii >= BOARD_ROWS or j + jj < 0 or j + jj >= BOARD_COLS:
              continue
          if self.board[i + ii, j + jj] == -self.playerSymbol:
              self.board[i + ii, j + jj] = self.playerSymbol
          
      # switch to another player
      self.playerSymbol = -self.playerSymbol

  def reset(self): # 리셋.
      self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
      self.board[0, 0] = self.board[BOARD_ROWS-1, BOARD_COLS-1] = 1
      self.board[BOARD_ROWS-1, 0] = self.board[0, BOARD_COLS-1] = -1
      self.boardHash = None
      self.isEnd = False
      self.playerSymbol = 1
    
  def test(self):
    while not self.isEnd:
      avac = availableActions()
      if not avac:
        self.cantmove()
      else:
        state = self.board
        state = np.float32(state)
        q_vals = self.p1.model.predict(state)[0,0,:]
        opt_action = 0
        for action in self.availableActions:
          if opt_action == 0:
            opt_action = action
          elif q_vals[opt_action]<q_vals[action]:
            opt_action = action
        self.updateState(opt_action)
      self.showBoard()
      win = self.winner()
      if win is not None:
        if win > 0:
          print(self.p1.name, "wins!")
        else:
          print(self.p2.name, "wins!")
        self.reset()
        break
      
      else:
        avac = availableActions()
        if not avac:
          self.cantmove()
        else:
          state = self.board
          state = np.float32(state)
          q_vals = self.p2.model.predict(state)[-1]
          opt_action == 0
          for action in self.availableActions:
            if opt_action == 0:
              opt_action = action
            elif q_vals[opt_action]<q_vals[action]:
              opt_action = action
          self.updateState(opt_action)
        self.showBoard()
        win = self.winner()
        if win is not None:
          if win > 0:
            print(self.p2.name, "wins!")
          else:
            print(self.p1.name, "wins!")
          print()
          self.reset()
          break
        
  def showBoard(self):
    # p1: o  p2: x
        for i in range(0, BOARD_ROWS):
            print('------------------------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                if self.board[i, j] == 1:
                    token = 'o'
                if self.board[i, j] == -1:
                    token = 'x'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('------------------------------')  

  def build_model(self): # DQN 모델을 생성한다.
    model = Sequential()
    model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(7, 7, 1), activation='relu'))
    model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(5, 5, 1), activation='relu'))
    model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(3, 3, 1), activation='relu'))
    model.add(Dense(64 * BOARD_COLS * BOARD_COLS, activation='relu'))
    model.add(Dense(BOARD_COLS**4, activation='relu'))
    model.add(Reshape((BOARD_ROWS**4,)))
    model.compile(loss='mse', optimizer=Adam(lr=self.alpha))
    return model

  def update_replay_memory(self, current_state, action, reward, next_state, done): # 리플레이 메모리에 상황을 저장한다.
    self.replay_memory.append((current_state, action, reward, next_state, done))

  def get_q_values(self, x): # 현재 상태에서 할 행동들의 q_value를 반환, x는 board에 대응됨.
    return self.model.predict(x.reshape(1,BOARD_ROWS, BOARD_COLS, 1))

  def getAction(self, state, epsilon):
    if np.random.rand() <= epsilon:
       # 무작위 행동 반환
      avac_size = len(self.availableActions())
      return self.availableActions()[random.randrange(avac_size)]
    else:
       # 모델로부터 행동 산출
      state = np.float32(state*self.playerSymbol)
      q_values = self.model.predict(state.reshape(1,BOARD_ROWS,BOARD_COLS,1))
      return np.argmax(q_values)

  def epsbyepi(self, episode):
    return max(self.epsilon, 1 - 1/(1+np.exp(-episode/2500+6)))

  def play(self, episode):
    prev_state = np.zeros((BOARD_ROWS,BOARD_COLS))
    self.reset()
    while not self.isEnd:
      #self.showBoard()
      avac = self.availableActions()
      dora = False
      if not avac:
        self.cantmove()
      else:
        action = self.getAction(self.board, self.epsbyepi(episode))
        state = self.board * self.playerSymbol
        if self.isAvailableAction(action):
          self.updateState(action)
          win = self.winner()
          if win is None:
            reward = 0
          else:
            reward = win * self.playerSymbol
            dora = True
        else:
          self.isEnd = True
          reward = -100
      if self.isEnd and dora:
        self.update_replay_memory(prev_state[0], prev_state[1], -reward, prev_state[2], False)
      prev_state = (state, action, self.board)
      self.update_replay_memory(state, action, reward, self.board, self.isEnd)
      #print(action)
      #print(state)
      #print(win, reward, self.isEnd, dora, self.playerSymbol)
    

  def train(self):
    if len(self.replay_memory)<self.min_replay_memory_size: # 충분히 모이지 않으면 학습하지 않는다.
      return
    
    samples = random.sample(self.replay_memory, self.batch_size)
    current_input = np.stack([sample[0] for sample in samples]) # current_state들의 array
    current_q_values = self.model.predict(current_input.reshape(len(current_input),BOARD_ROWS, BOARD_COLS,1))
    next_input = np.stack([sample[3] for sample in samples])
    next_q_values = self.target_model.predict(next_input.reshape(len(next_input),BOARD_ROWS, BOARD_COLS,1))
    
    for i, (current_state, action, reward, _, done) in enumerate(samples):
      if done:
        next_q_value = reward
      else:
        next_q_value = reward + self.gamma * np.max(next_q_values[i])
      current_q_values[i, action] = next_q_value
    current_input = current_input.reshape((len(current_input),BOARD_ROWS,BOARD_COLS,1))
    hist = self.model.fit(current_input, current_q_values, batch_size=self.batch_size, verbose=1, shuffle=False)
    loss = hist.history['loss'][0]
    return loss

  def increase_target_update_counter(self): # target_model에 model을 업데이트한다. 그걸 세는 함수.
    self.target_update_counter += 1
    if self.target_update_counter >= self.target_update_freq:
      self.target_model.set_weights(self.model.get_weights())
      self.target_update_counter = 0

  def save(self, model_filepath, target_model_filepath):
    self.model.save(model_filepath)
    self.target_model.save(target_model_filepath)

  def load(self, model_filepath, target_model_filepath):
    self.model = keras.models.load_model(model_filepath)
    self.target_model = keras.models.load_model(target_model_filepath)


In [7]:
dora = Germ()
dora.build_model()
# dora.load(뭐시기)
episode = 1


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 5, 5, 16)          160       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 3, 3, 16)          2320      
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 1, 1, 16)          2320      
_________________________________________________________________
dense (Dense)                (None, 1, 1, 3136)        53312     
_________________________________________________________________
dense_1 (Dense)              (None, 1, 1, 2401)        7531937   
_________________________________________________________________
reshape (Reshape)            (None, 2401)              0         
Total params: 7,590,049
Trainable params: 7,590,049
Non-trainable params: 0
______________________________________________

In [8]:
for i in range(10000):
  dora.play(episode)
  episode += 1
  dora.increase_target_update_counter()
  if i%100==0 and i>0:
    print("round",i)
    dora.train()

round 100
round 200
round 300
round 400
round 500
round 600
round 700
round 800
round 900
round 1000
round 1100
round 1200
round 1300
round 1400
round 1500
round 1600
round 1700
round 1800
round 1900
round 2000
round 2100
round 2200
round 2300
round 2400
round 2500
round 2600
round 2700
round 2800
round 2900
round 3000
round 3100
round 3200
round 3300
round 3400
round 3500
round 3600
round 3700
round 3800
round 3900
round 4000
round 4100
round 4200
round 4300
round 4400
round 4500
round 4600
round 4700
round 4800
round 4900
round 5000
round 5100
round 5200
round 5300
round 5400
round 5500
round 5600
round 5700
round 5800
round 5900
round 6000
round 6100
round 6200
round 6300
round 6400
round 6500
round 6600
round 6700
round 6800
round 6900
round 7000
round 7100
round 7200
round 7300
round 7400
round 7500
round 7600
round 7700
round 7800
round 7900
round 8000
round 8100
round 8200
round 8300
round 8400
round 8500
round 8600
round 8700
round 8800
round 8900
round 9000
round 9100
round 92

In [10]:
dora.save("./model/model10000","./model/target10000")

INFO:tensorflow:Assets written to: ./model/model10000/assets
INFO:tensorflow:Assets written to: ./model/target10000/assets


In [11]:
"""
from google.colab import files
files.download('model10000')
files.download('target10000')
"""

"\nfrom google.colab import files\nfiles.download('model10000')\nfiles.download('target10000')\n"

In [12]:
# testing

In [16]:
dora = Germ()
dora.load("./model/model10000","./model/target10000")

Model: "sequential_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_15 (Conv2D)           (None, 5, 5, 16)          160       
_________________________________________________________________
conv2d_16 (Conv2D)           (None, 3, 3, 16)          2320      
_________________________________________________________________
conv2d_17 (Conv2D)           (None, 1, 1, 16)          2320      
_________________________________________________________________
dense_10 (Dense)             (None, 1, 1, 3136)        53312     
_________________________________________________________________
dense_11 (Dense)             (None, 1, 1, 2401)        7531937   
_________________________________________________________________
reshape_5 (Reshape)          (None, 2401)              0         
Total params: 7,590,049
Trainable params: 7,590,049
Non-trainable params: 0
____________________________________________

In [19]:
class Player:
    def __init__(self,isHuman):
        self.isHuman=isHuman
        self.model = self.build_model
    
    def build_model(self): # DQN 모델을 생성한다.
        model = Sequential()
        model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(7, 7, 1), activation='relu'))
        model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(5, 5, 1), activation='relu'))
        model.add(Conv2D(16, (3, 3), padding = 'valid', input_shape=(3, 3, 1), activation='relu'))
        model.add(Dense(64 * BOARD_COLS * BOARD_COLS, activation='relu'))
        model.add(Dense(BOARD_COLS**4, activation='relu'))
        model.add(Reshape((BOARD_ROWS**4,)))
        model.compile(loss='mse', optimizer=Adam(lr=self.alpha))
        return model

    def getAction(self):
        if self.isHuman:
            print("original row, col, target row, col")
            ro = int(input())
            co = int(input())
            rt = int(input())
            ct = int(input())
            if not ro>=0 and ro<7 and co>=0 and co<7 and rt>=0 and rt<7 and ct>=0 and ct<7:
                sys.exit(1)
            return ro*7*7*7+co*7*7+rt*7+ct
        else:
            return self.model.predict(x.reshape(1,BOARD_ROWS, BOARD_COLS, 1))

    def load(self, model_filepath):
        self.model = keras.models.load_model(model_filepath)

In [None]:
class contest:
    def __init__(self,p1,p2):
        self.board = np.zeros((7,7))
        self.p1=p1
        self.p2=p2
        
    def start(self,p1,p2): # 게임을 시작한다.
        
        