<a href="https://colab.research.google.com/github/andreigann/Correlate/blob/master/Connect4Runner.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [142]:
!pip install pygame



In [0]:
from google.colab import drive

In [144]:
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [0]:
import sys
sys.path.append('/content/gdrive/My Drive/Colab Notebooks/')

In [0]:
from gym_connect_four import RandomPlayer, ConnectFourEnv, Player, SavedPlayer
import random

In [0]:
class MyRandomPlayer(Player):
    def __init__(self, env, name='MyRandomPlayer', id=None):
        super(MyRandomPlayer, self).__init__(env, name)
        self.id=id
        self.wins=0
        self.draws=0
        self.losses=0

        self.main_score_wins = 0
        self.main_score_draws = 0
        self.main_score_losses = 0

    def get_next_action(self, state: np.ndarray) -> int:
      am = self.env.available_moves()

      for _ in range(100):
        action = np.random.randint(self.env.action_space.n)
        if self.env.is_valid_action(action):
            return action
      raise Exception('Unable to determine a valid move! Maybe invoke at the wrong time?')


In [0]:
class DQNSolver:
  """
  Vanilla Multi Layer Perceptron version
  """

  def __init__(self, observation_space, action_space):
    self.GAMMA = 0.95
    self.LEARNING_RATE = 0.001

    self.MEMORY_SIZE = 1000000
    self.BATCH_SIZE = 20

    self.EXPLORATION_MAX = 1.0
    self.EXPLORATION_MIN = 0.01
    self.EXPLORATION_DECAY = 0.995

    self.exploration_rate = self.EXPLORATION_MAX

    self.action_space = action_space
    self.memory = deque(maxlen=self.MEMORY_SIZE)

    self.model = Sequential()
    self.model.add(Flatten(input_shape=observation_space))
    self.model.add(Dense(24, activation="relu"))
    self.model.add(Dense(24, activation="relu"))
    self.model.add(Dense(self.action_space, activation="linear"))
    self.model.compile(loss="mse", optimizer=Adam(lr=self.LEARNING_RATE))

  def remember(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))

  def act(self, state, available_moves=[]):
    if np.random.rand() < self.exploration_rate:
      return random.randrange(self.action_space)
    q_values = self.model.predict(state)
    q_values = np.array([[x if idx in available_moves else -100 for idx, x in enumerate(q_values[0])]])
    return np.argmax(q_values[0])

  def experience_replay(self):
    if len(self.memory) < self.BATCH_SIZE:
      return
    batch = random.sample(self.memory, self.BATCH_SIZE)
    for state, action, reward, state_next, terminal in batch:
      q_update = reward
      if not terminal:
        q_update = (reward + self.GAMMA * np.amax(self.model.predict(state_next)[0]))
      q_values = self.model.predict(state)
      q_values[0][action] = q_update
      self.model.fit(state, q_values, verbose=0)
    self.exploration_rate *= self.EXPLORATION_DECAY
    self.exploration_rate = max(self.EXPLORATION_MIN, self.exploration_rate)

  def save_model(self, file_prefix: str):
    self.model.save(f"{file_prefix}.h5")



In [0]:
class MyNNPlayer(Player):
  def __init__(self, env, name='MyNNPlayer', id=None):
    super(MyNNPlayer, self).__init__(env, name)
    self.observation_space = env.observation_space.shape
    self.action_space = env.action_space.n

    self.dqn_solver = DQNSolver(self.observation_space, self.action_space)

    self.id=id
    self.wins=0
    self.draws=0
    self.losses=0

    self.main_score_wins = 0
    self.main_score_draws = 0
    self.main_score_losses = 0

  def get_next_action(self, state: np.ndarray) -> int:
    state = np.reshape(state, [1] + list(self.observation_space))
    action = self.dqn_solver.act(state, self.env.available_moves())
    return action

  def learn(self, state, action, reward, state_next, done) -> None:
    state = np.reshape(state, [1] + list(self.observation_space))
    state_next = np.reshape(state_next, [1] + list(self.observation_space))

    # reward = reward if not done else -reward
    self.dqn_solver.remember(state, action, reward, state_next, done)

    if not done:
      self.dqn_solver.experience_replay()

  def save_model(self):
    self.dqn_solver.save_model(self.name)


In [0]:
class MyCNNPlayer(Player):
  def __init__(self, env, name='MyCNNPlayer', id=None):
    super(MyCNNPlayer, self).__init__(env, name)
    self.id=id
    self.wins=0
    self.draws=0
    self.losses=0

    self.main_score_wins = 0
    self.main_score_draws = 0
    self.main_score_losses = 0

  def get_next_action(self, state: np.ndarray) -> int:
    for _ in range(100):
      action = np.random.randint(self.env.action_space.n)
      if self.env.is_valid_action(action):
        return action
      raise Exception('Unable to determine a valid move! Maybe invoke at the wrong time?')


In [0]:

def create_random_player(i)-> MyRandomPlayer:
  return MyRandomPlayer(env, name=('OpponentRandomPlayer' + str(i)), id=i)

def create_nn_player(i)-> MyNNPlayer:
  return MyNNPlayer(env, name=('NNPlayer' + str(i)), id=i)

def create_cnn_player(i)-> MyCNNPlayer:
  return MyCNNPlayer(env, name=('CNNPlayer' + str(i)), id=i)


In [0]:
def play_a_game(player1, player2, rounds=ROUNDS):
  # print(f"{player1.name} vs {player2.name}")

  result = [0] * 3
  for episodes in range(rounds):
    match_result = None

    state = env.reset()
    done = False
    p2 = False
    while not done:
        action1 = player1.get_next_action(state)
        state1, reward1, done1, _ = env.step(action1)

        if p2:
            player2.learn(state1, action2, env._reverse_reward(reward1), state, done1)
        else:
            p2 = True

        if not done1:
            action2 = player2.get_next_action(state1)
            state2, reward2, done2, _ = env.step(action2)

            player1.learn(state, action1, env._reverse_reward(reward2), state2, done2)

            if done2:
                done = True
                player2.learn(state1, action2, reward2, state2, done2)
                if reward2 != env.DRAW_REWARD:
                    # player2 Won
                    player2.wins=player2.wins+1
                    player1.losses=player1.losses+1;
                else:
                    # player2 Draw
                    player1.draws=player1.draws+1
                    player2.draws=player2.draws+1

            state = state2
        else:
            done = True
            player1.learn(state, action1, reward1, state1, done1)
            if reward1 != env.DRAW_REWARD:
                # player1 Won
                  player1.wins=player1.wins+1
                  player2.losses=player2.losses+1;
            else:
                # player1 Draw
                  player1.draws=player1.draws+1
                  player2.draws=player2.draws+1



In [157]:
ROUNDS=int(10)
NN_PLAYERS=int(1)
RANDOM_PLAYERS=int(1)
CNN_PLAYERS=int(1)
ENV_NAME = "ConnectFour-v0"
TRAIN_EPISODES = 1000
env = gym.make(ENV_NAME)

def train():
  players = []
  games_to_play = []

  for i in range(1, NN_PLAYERS+1):
    players.append(create_nn_player(i))

  for i in range(1, RANDOM_PLAYERS+1):
    players.append(create_random_player(i))

  for i in range(1, CNN_PLAYERS+1):
    players.append(create_cnn_player(i))

  random.shuffle(players)

  for p1 in players:
    for p2 in players:
      if p1 == p2:
        continue
      games_to_play.append((p1, p2))

  for game in games_to_play:
    player1 = game[0]
    player2 = game[1]
    print(player1.name, player2.name)

    play_a_game(player1, player2, ROUNDS)

    print(player1.wins, player2.wins)

    if player1.wins > player2.wins:
      player1.main_score_wins = player1.main_score_wins + 1
      player2.main_score_losses = player2.main_score_losses + 1

    if (player1.wins < player2.wins):
      player2.main_score_wins = player2.main_score_wins + 1
      player1.main_score_losses = player1.main_score_losses + 1

    if (player1.wins == player2.wins):
      player1.main_score_draws = player1.main_score_draws + 1
      player2.main_score_draws = player2.main_score_draws + 1

    player1.wins = 0
    player2.wins = 0
    player1.draws = 0
    player2.draws = 0
    player1.losses = 0
    player2.losses = 0
  
  for p in players:
    print(p.name, ": ", p.main_score_wins, p.main_score_draws, p.main_score_losses)


if __name__ == "__main__":
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        train()

OpponentRandomPlayer1 NNPlayer1


Exception: ignored