In [328]:
import gym
from gym_chess import ChessEnvV2
import torch
import random
import numpy as np
from torch import nn
import torch.nn.functional as F
from torch.distributions import Categorical

import matplotlib.pyplot as plt
from IPython import display
from tqdm.notebook import tqdm

In [327]:
class ChessPolicyNet(nn.Module):
    """Convolutional policy network that scores a chess move as (start square, end square).

    The input is a float tensor of shape ``(batch, 12, 8, 8)``: twelve planes
    over the 8x8 board. Three padding-preserving convolutions feed a fully
    connected trunk with two heads:

    * ``fc4_start`` -- 64 logits, one per start square.
    * ``fc4_end``   -- 64 * 64 logits, read as a ``(start, end)`` table.
    """

    def __init__(self):
        super(ChessPolicyNet, self).__init__()
        self.conv1 = nn.Conv2d(12, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * 8 * 8, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 64)
        self.fc4_start = nn.Linear(64, 64)
        self.fc4_end = nn.Linear(64, 64*64)

    def forward(self, x):
        """Return move probabilities as NumPy arrays.

        Args:
            x: float tensor of shape ``(batch, 12, 8, 8)``.

        Returns:
            ``(start_probs, end_probs)``. For a batch of one, ``start_probs``
            has shape ``(64,)`` and ``end_probs`` has shape ``(64, 64)``;
            ``end_probs[s]`` is the distribution over end squares given start
            square ``s`` and sums to 1. Larger batches keep a leading batch
            axis. The arrays are detached from the autograd graph, so they
            cannot be used to back-propagate a loss.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x_start = self.fc4_start(x)
        # -1 keeps the batch axis instead of hard-coding a batch size of one.
        x_end = self.fc4_end(x).view(-1, 64, 64)

        # Normalise over the LAST axis: for the end table that is the end
        # square, so every row indexed by a start square is a proper
        # distribution. (Normalising over axis 1 would normalise across start
        # squares instead.)
        start_probs = F.softmax(x_start, dim=-1).squeeze().detach().cpu().numpy()
        end_probs = F.softmax(x_end, dim=-1).squeeze().detach().cpu().numpy()

        return start_probs, end_probs

In [329]:
from collections import namedtuple

# Lightweight record with two fields, in order: ``log_prob`` and ``value``.
SavedAction = namedtuple('SavedAction', 'log_prob value')

class ChessAI:
    """REINFORCE agent that picks chess moves with a ``ChessPolicyNet``.

    Usage per episode: call ``init_game`` once, then alternate ``agent(obs)``
    (choose a move) and ``update(...)`` (report the reward). When ``update``
    is told the episode ended, one policy-gradient step is taken.

    ``ChessPolicyNet.forward`` returns detached NumPy arrays, which carry no
    gradient. To be able to train, the raw outputs of the two head layers
    (``fc4_start`` and ``fc4_end``) are captured with forward hooks and the
    action distributions are built from those tensors instead.

    The network has no value head, so the update is plain REINFORCE on the
    discounted returns (no critic / value loss).
    """

    def __init__(self):
        self.net = ChessPolicyNet()

        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=5e-3)
        self.mean_reward = None  # running average of episode returns
        self.games = 0           # number of finished episodes
        self.gamma = 0.99        # discount factor
        self.eps = np.finfo(np.float32).eps.item()

        self._attach_logit_hooks()
        # Make the episode buffers exist even before the first init_game().
        self.init_game(None, None)

    def _attach_logit_hooks(self):
        """Register forward hooks that keep the latest head outputs (with graph)."""
        self._head_logits = {}

        def remember(key):
            def hook(module, inputs, output):
                self._head_logits[key] = output
            return hook

        self.net.fc4_start.register_forward_hook(remember('start'))
        self.net.fc4_end.register_forward_hook(remember('end'))

    @staticmethod
    def _encode_board(observation):
        """One-hot encode ``observation['board']`` as a ``(1, 12, 8, 8)`` float tensor.

        A positive piece code ``p`` sets plane ``p - 1``; a negative code sets
        plane ``abs(p) + 5``; zero (empty square) sets nothing.
        """
        board = np.array(observation['board'])
        planes = np.zeros((12, 8, 8), dtype=np.float32)
        rows, cols = np.nonzero(board)
        pieces = board[rows, cols]
        channels = np.where(pieces > 0, pieces - 1, -pieces + 5)
        planes[channels, rows, cols] = 1.0
        return torch.from_numpy(planes).unsqueeze(0)

    def __call__(self, observation):
        """Sample a move for ``observation`` and remember its log-probability.

        Returns:
            ``(start_pos, end_pos)``, each a ``(row, col)`` tuple of ints.
        """
        x = self._encode_board(observation)

        self._head_logits.clear()
        # The returned NumPy arrays are detached; the hooks captured the
        # differentiable logits during this call.
        self.net(x)
        start_logits = self._head_logits['start'].reshape(-1)
        end_logits = self._head_logits['end'].reshape(64, 64)

        m_start = Categorical(logits=start_logits)
        start_choice = m_start.sample()
        start_item = start_choice.item()

        # Row `start_item` holds the end-square logits for the chosen start square.
        m_end = Categorical(logits=end_logits[start_item])
        end_choice = m_end.sample()
        end_item = end_choice.item()

        # log pi(start, end) = log pi(start) + log pi(end | start)
        self.memory.append(m_start.log_prob(start_choice) + m_end.log_prob(end_choice))

        start_pos = (start_item // 8, start_item % 8)
        end_pos = (end_item // 8, end_item % 8)
        return start_pos, end_pos

    def init_game(self, observation, possible_moves):
        """Reset the per-episode buffers. Both arguments are currently unused."""
        self.memory = []    # log-probabilities of the moves chosen this episode
        self.rewards = []   # rewards reported this episode
        self.total_reward = 0

    def update(self, observation, reward, terminated, truncated, info, status):
        """Record ``reward``; when the episode is over, take one learning step.

        The episode counts as over when ``terminated`` or ``truncated`` is
        true. ``observation``, ``info`` and ``status`` are accepted for
        interface compatibility and not used.
        """
        self.total_reward += reward
        self.rewards.append(reward)
        if not (terminated or truncated):
            return

        self.games += 1
        if self.mean_reward is None:
            self.mean_reward = self.total_reward
        else:
            self.mean_reward = self.mean_reward * 0.95 + self.total_reward * (1.0 - 0.95)

        self._learn_from_episode()
        # Drop the used trajectory so a stray extra update cannot reuse it.
        self.memory = []
        self.rewards = []

        if self.games % 1000 == 0:
            self.save(f"model_{self.games}.pt")

    def _learn_from_episode(self):
        """Run one REINFORCE step on the stored log-probabilities and rewards."""
        steps = min(len(self.memory), len(self.rewards))
        if steps == 0:
            # Nothing was sampled this episode; there is nothing to learn from.
            return

        # Discounted return-to-go for every step, computed back to front.
        discounted = []
        R = 0.0
        for r in reversed(self.rewards):
            R = r + self.gamma * R
            discounted.append(R)
        discounted.reverse()
        discounted = torch.tensor(discounted[:steps], dtype=torch.float32)

        log_probs = torch.stack(self.memory[:steps])
        loss = -(log_probs * discounted).sum()
        if not loss.requires_grad:
            # Detached log-probabilities cannot be back-propagated.
            return

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def load(self, PATH):
        """Restore network, optimizer and counters from a checkpoint written by ``save``."""
        checkpoint = torch.load(PATH)
        self.net.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.games = checkpoint['games']
        self.mean_reward = checkpoint['mean_reward']

    def save(self, PATH):
        """Write network, optimizer and counters to ``PATH``."""
        torch.save({
            'games': self.games,
            'model_state_dict': self.net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'mean_reward': self.mean_reward}, PATH)

In [330]:
from IPython.core.display_functions import clear_output


# Play-game example, partly merged with Gawron's code
def play_game_AI_against_random(episodes=2, steps=50, policy=None):
    """Let an agent play ``episodes`` games against the random bot and print a summary.

    Args:
        episodes: number of games to play.
        steps: maximum number of agent moves per game.
        policy: agent to use (and keep training across calls). It must be
            callable on an observation and provide ``init_game`` and
            ``update``. A fresh ``ChessAI`` is created when omitted.

    Returns:
        None. Per-game results and the final statistics are printed.
    """
    env = gym.make("ChessVsRandomBot-v2", log=False)
    env.moves_max = steps
    if policy is None:
        policy = ChessAI()

    total_rewards = 0
    average_rewards = 0
    did_legal_move = 0

    try:
        for i in range(episodes):
            observation = env.reset()
            policy.init_game(observation, env.possible_moves)

            print("\n", "=" * 10, "NEW GAME", "=" * 10)
            episode_reward = 0

            for j in range(steps):
                move = policy(observation)
                action = env.move_to_action(move)
                # Check legality before stepping: the set of possible moves
                # belongs to the position the move was chosen for.
                if move in env.possible_moves:
                    did_legal_move += 1

                # Hand our own action to the game.
                observation, step_reward, done, info = env.step(action)
                episode_reward += step_reward

                # Running out of steps also ends the episode; tell the agent,
                # otherwise it never learns from games that hit the limit.
                truncated = (not done) and j == steps - 1
                policy.update(observation, step_reward, done, truncated, None, None)

                if done:
                    env.render()
                    print(">" * 5, "GAME", i, "REWARD:", episode_reward)
                    break

            # Episode finished.
            total_rewards += episode_reward
            average_rewards = 0.05 * episode_reward + (1 - 0.05) * average_rewards
    finally:
        env.close()

    # Guard the average against episodes == 0.
    legal_moves_average = did_legal_move / episodes if episodes else 0.0

    print("\n")
    print("#" * 40)
    print("#" * 40)
    print("#" * 40)
    print("\nAVERAGE SCORE: ", average_rewards)
    print("\nTOTAL REWARD: ", total_rewards)
    print("\nLEGAL MOVES AVERAGE: ", legal_moves_average)

In [334]:
# Play 20 games against the random bot, at most 100 agent moves per game.
play_game_AI_against_random(episodes=20, steps=100)























########################################
########################################
########################################

AVERAGE SCORE:  -641.5140775914574

TOTAL REWARD:  -20000

LEGAL MOVES AVERAGE:  0.35
