In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions.categorical import Categorical

import numpy as np

import pygame, sys, pygame.locals
import game

pygame 2.1.0 (SDL 2.0.16, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
def reward_function(x1, y1, x2, y2, d1, d2, l1, l2):
    distance = (x2 - x1) ** 2
    distance += (y2 - y1) ** 2
    distance = min((distance ** 0.5) / 25, 5)
    
    cdistance1 = min(((((x1 - 400) ** 2) + ((y1 - 225) ** 2)) ** 0.5) / 50, 2)
    cdistance2 = min(((((x2 - 400) ** 2) + ((y2 - 225) ** 2)) ** 0.5) / 50, 2)
    
    lit1 = -5 if l1 > 90 else 0
    lit2 = -5 if l2 > 90 else 0
    facing1 = -5
    
    facing2 = -5
    if d1 == 0 and x1 < x2:
        facing1 = 10
    elif d1 == 1 and x1 > x2:
        facing1 = 10
    elif d1 == 2 and y1 < y2:
        facing1 = 5
    elif d1 == 3 and y1 > y2:
        facing1 = 5
    
    if d2 == 0 and x2 < x1:
        facing2 = 10
    elif d2 == 1 and x2 > x1:
        facing2 = 10
    elif d2 == 2 and y2 < y1:
        facing2 = 5
    elif d2 == 3 and y2 > y1:
        facing2 = 5
    return [
        (30 - distance - cdistance1 + lit1 + facing1) / 10,
        (30 - distance - cdistance2 + lit2 + facing2) / 10
    ]

In [3]:
def collision(player, spell):
    if ((spell.x > player.x and spell.x < player.x + game.PS * 2) or (spell.x + spell.dx > player.x and spell.x + spell.dx < player.x + game.PS * 2)) and ((spell.y > player.y and spell.y < player.y + game.PS * 2) or (spell.y + spell.dy > player.y and spell.y + spell.dy < player.y + game.PS * 2)):
        return True
    if ((player.x > spell.x and player.x < spell.x + spell.dx) or (player.x + player.x + game.PS * 2 > spell.x and player.x + player.x + game.PS * 2 < spell.x + spell.dx)) and ((player.y > spell.y and player.y < spell.y + spell.dy) or (player.y + game.PS * 2 > spell.y and player.y + game.PS * 2 < spell.y + spell.dy)):
        return True
    return False


class KCEnv():
    metadata = {'render_modes' : ['human'], 'render_fps' : 30}
    
    def __init__(self, conv, moveset):
        self.conv = conv
        self.DISPLAY = pygame.display.set_mode((800, 450), flags = pygame.SCALED)
        pygame.display.set_caption('Kinetic Chance')
        self.players = [game.Player(200, 4, moveset, (0, 0, 255), 2, 80, 280), game.Player(200, 4, moveset, (255, 0, 0), 2, 720, 280)]
        self.spells = []
        pygame.init()
        self.font = pygame.font.SysFont('Arial', 20)
        self.moveset = moveset
    def step(self, action):
        rewards = reward_function(
            self.players[0].x, self.players[0].y, 
            self.players[1].x, self.players[1].y,
            self.players[0].dir, self.players[1].dir,
            self.players[0].lit, self.players[1].lit
        )
        #
        #
        #
        self.DISPLAY.fill((40, 40, 40))
        self.render_text()
        for i in range(len(self.players)):
            if not self.players[i].alive:
                continue
            self.players[i].tick()
            newspell = self.players[i].autocast()
            if newspell != None:
                self.spells.append(newspell)
                self.spells[-1].owner = i
            self.players[i].render(self.DISPLAY)
            self.players[i].control(action[i][0])
            self.players[i].control(action[i][1] + 5)
            self.players[i].control(action[i][3] + 10)
            newspell = self.players[i].control(action[i][2] + 8)
            if newspell != None:
                self.spells.append(newspell)
                self.spells[-1].owner = i
            self.players[i].move()
            for j in range(len(self.spells)):
                if self.spells[j] == -1:
                    continue
                if i != self.spells[j].owner:
                    if collision(self.players[i], self.spells[j]):
                        self.players[i].trigger(self.spells[j])
                        o = self.spells[j].owner
                        self.spells[j] = -1
                        rewards[i] -= 1000
                        rewards[o] += 1000
                elif self.spells[j].movetype == 1:
                    self.spells[j].init(self.players[i].dir, self.players[i].x, self.players[i].y)
        for i in range(len(self.spells)):
            if self.spells[i] == -1:
                continue
            self.spells[i].render(self.DISPLAY)
            if self.spells[i].tick():
                self.spells[i] = -1
        while -1 in self.spells:
            self.spells.remove(-1)
        #
        #
        #
        pygame.display.flip()
        if self.conv:
            self.observation = np.array(pygame.surfarray.array3d(self.DISPLAY), dtype = np.float32) / 255
        else:
            self.observation = np.array([
                self.players[0].x / 800,
                self.players[0].y / 450,
                self.players[0].dir,
                self.players[0].lit / 30,
                self.players[0].streak / 2,
                self.players[0].selmove / 5,
                self.players[1].x / 800,
                self.players[1].y / 450,
                self.players[1].dir,
                self.players[1].lit / 30,
                self.players[1].streak / 2,
                self.players[1].selmove / 5],
                dtype = np.float32)
        return self.observation, rewards, not (self.players[0].alive and self.players[1].alive), False, {}
    def reset(self):
        self.players = [
            game.Player(200, 4, self.moveset, (0, 0, 255), 2, 80, 280), 
            game.Player(200, 4, self.moveset, (255, 0, 0), 2, 720, 280)]
        self.players[0].render(self.DISPLAY)
        self.players[1].render(self.DISPLAY)
        self.spells = []
        if self.conv:
            self.observation = np.array(pygame.surfarray.array3d(self.DISPLAY), dtype = np.float32) / 255
        else:
            self.observation = np.array([
                self.players[0].x / 800,
                self.players[0].y / 450,
                self.players[0].dir,
                self.players[0].lit / 30,
                self.players[0].streak / 2,
                self.players[0].selmove / 5,
                self.players[1].x / 800,
                self.players[1].y / 450,
                self.players[1].dir,
                self.players[1].lit / 30,
                self.players[1].streak / 2,
                self.players[1].selmove / 5],
                dtype = np.float32)
        return self.observation, {}
    def render_text(self):
        self.DISPLAY.blit(self.font.render(
        'Lit: {}, {} Moves: {}, {}'.format(
            str(round(self.players[0].lit)), 
            str(round(self.players[1].lit)),
            self.players[0].moveset[self.players[0].selmove],
            self.players[1].moveset[self.players[1].selmove]),
        True, (255, 255, 255)), (50, 50))
    def close(self):
        pygame.quit()

In [4]:
class VecActor(nn.Module):
    def __init__(self, actions):
        super(VecActor, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(12, 8),
            nn.ReLU(),
            nn.Linear(8, actions)
        )
    def forward(self, x):
        return self.network(x)
    
class VecCritic(nn.Module):
    def __init__(self):
        super(VecCritic, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(12, 8),
            nn.ReLU(),
            nn.Linear(8, 1)
        )
    def forward(self, x):
        return self.network(x)

In [5]:
class ConvActor(nn.Module):
    def __init__(self, actions):
        super(ConvActor, self).__init__()
        self.network = nn.Sequential(
            nn.MaxPool2d(4),
            nn.Conv2d(3, 4, 3),
            nn.ReLU(),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.MaxPool2d(4),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.MaxPool2d(4),
            nn.Flatten(),
            nn.Linear(220, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, actions)
        )
    def forward(self, x):
        return self.network(x)

In [6]:
class ConvCritic(nn.Module):
    def __init__(self):
        super(ConvCritic, self).__init__()
        self.network = nn.Sequential(
            nn.MaxPool2d(4),
            nn.Conv2d(3, 4, 3),
            nn.ReLU(),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.MaxPool2d(4),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.Conv2d(4, 4, 3),
            nn.ReLU(),
            nn.MaxPool2d(4),
            nn.Flatten(),
            nn.Linear(220, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
    def forward(self, x):
        return self.network(x)

In [7]:
class Agent:
    def __init__(self, actions, idims, alpha, gamma, eps, l, epochs, bsize, liters, conv):
        self.actor = ConvActor(actions) if conv else VecActor(actions)
        self.critic = ConvCritic() if conv else VecCritic()
        self.asize = actions
        self.idims = idims
        self.actor.opt = torch.optim.Adam(self.actor.parameters(), lr = alpha)
        self.critic.opt = torch.optim.Adam(self.critic.parameters(), lr = alpha)
        self.gamma = gamma
        self.eps = eps
        self.l = l
        self.epochs = epochs
        self.bsize = bsize
        self.liters = liters
        self.states = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.dones = []
        
    def reset_mem(self):
        self.states = []
        self.probs = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.dones = []
    
    def store_mem(self, state, prob, action, reward, value, done):
        self.states.append(state.tolist())
        self.probs.append(prob)
        self.actions.append(action)
        self.rewards.append(reward)
        self.values.append(value)
        self.dones.append(done)
        
    def batching(self):
        bsindices = np.arange(0, len(self.states), self.bsize)
        bindices = np.arange(0, self.bsize, dtype = np.int64)
        np.random.shuffle(bindices)
        bindices = bindices.tolist()
        rvalue = []
        for i in bsindices:
            rvalue += bindices[i:i + self.bsize]
        return np.array(rvalue, dtype = np.int64)
        
    def act(self, obs):
        obs = obs.unsqueeze(0)
        policy = self.actor(obs).squeeze()
        policy1 = Categorical(F.softmax(policy[0:5], dim = -1))
        policy2 = Categorical(F.softmax(policy[5:8], dim = -1))
        policy3 = Categorical(F.softmax(policy[8:10], dim = -1))
        policy4 = Categorical(F.softmax(policy[10:12], dim = -1))
        value = self.critic(obs).squeeze().item()
        #action and log probs will be of size 3
        action1 = policy1.sample()
        action2 = policy2.sample()
        action3 = policy3.sample()
        action4 = policy4.sample()
        #since these log probs are passed directly into store mem,
        #and the same is done with the new probs, only the sum is returned
        prob1 = policy1.log_prob(action1).item()
        prob2 = policy2.log_prob(action2).item()
        prob3 = policy3.log_prob(action3).item()
        prob4 = policy3.log_prob(action3).item()
        
        return [action1.item(), action2.item(), action3.item(), action4.item()], prob1 + prob2 + prob3 + prob4, value
        
    def learn(self):
        adv = np.zeros(len(self.rewards) - 1, dtype = np.float32)
        self.states = np.array(self.states, dtype = np.float32)
        
        for i in range(self.epochs):
            bindices = np.array(self.batching(), dtype = np.int64)
            #gae
            #summation of memory
            for j in range(len(self.rewards) - 1):
                #delta coefficient
                disc = 1
                #advantage
                a = 0
                for k in range(j, len(self.rewards) - 1):
                    #delta of timestep = (done coefficient * gamma * next state value) + reward - current state value
                    #basically new value + reward - cur value
                    a += disc * (((1 - self.dones[k]) * self.gamma * self.values[k + 1]) + self.rewards[k] - self.values[k])
                    #gae lamba^n * gamma^n
                    disc *= self.gamma * self.l
                #adv at each timestep
                adv[j] = a
            adv = torch.Tensor(adv).float()
            sb = []
            p1 = []
            ab = []
            vb = []
            advb = []
            #sampling of random memory
            for i in bindices:
                sb.append(self.states[i])
                p1.append(self.probs[i])
                ab.append(self.actions[i])
                vb.append(self.values[i])
                advb.append(adv[i])
            sb = torch.Tensor(np.array(sb)).float()
            #these 2 are size 4 for the multi discrete implementation
            p1 = torch.Tensor(np.array(p1)).float()
            ab = torch.Tensor(np.array(ab)).long()
            vb = torch.Tensor(np.array(vb)).float()
            advb = torch.Tensor(advb).float()
            #predictions
            apred = self.actor(sb)
            apred1 = Categorical(F.softmax(apred[0, 0:5], dim = -1))
            apred2 = Categorical(F.softmax(apred[0, 5:8], dim = -1))
            apred3 = Categorical(F.softmax(apred[0, 8:10], dim = -1))
            apred4 = Categorical(F.softmax(apred[0, 10:12], dim = -1))
            cpred = self.critic(sb)
            #get new log probs corresponding to past actions from memory
            #there are 3 of these now
            #in the 37 implementation details thingy, they multiplied the probs for each distribution
            #since these are logits, they shall be added
            p2 = apred1.log_prob(ab[:, 0]) + apred2.log_prob(ab[:, 1]) + apred3.log_prob(ab[:, 2]) + apred4.log_prob(ab[:, 3])
            #actor loss calculation: this is the same now that the probs are combined
            pratio = p2.exp() / p1.exp()
            wpratio = pratio * advb
            cwpratio = torch.clamp(pratio, 1 - self.eps, 1 + self.eps) * advb
            aloss = (-torch.min(wpratio, cwpratio)).mean()
            #critic loss: gae + state value MSE'd with raw network prediction
            #gae + state value = new state + reward
            #in other words, optimize state value to become new state + reward
            ctarget = advb + vb
            criterion = torch.nn.MSELoss()
            #closs = ((ctarget - cpred) ** 2).mean()
            closs = criterion(ctarget.unsqueeze(-1), cpred)
            #now includes entropy term
            entropy = (0.1 * apred1.entropy()) + (0.8 * apred2.entropy()) + (0.2 * apred3.entropy()) + (0.1 * apred4.entropy())
            loss = aloss + (0.5 * closs) - (0.4 * entropy)
            self.actor.opt.zero_grad()
            self.critic.opt.zero_grad()
            loss.backward()
            self.actor.opt.step()
            self.critic.opt.step()
        self.reset_mem()

In [11]:
ACTIONS = 12
MOVESET = ['flare', 'multisword', 'heal', 'intoxicate', 'cannon']
INPUT_DIMS = 4
LR = 5e-4
DISCOUNT_FACTOR = 0.99
POLICY_CLIP = 0.1
SMOOTHING = 0.95
EPOCHS = 4
BATCH_SIZE = 5
LEARN_ITERS = 20
CONV = False

env = KCEnv(CONV, MOVESET)

agent1 = Agent(ACTIONS, INPUT_DIMS, LR, DISCOUNT_FACTOR, POLICY_CLIP, SMOOTHING, EPOCHS, BATCH_SIZE, LEARN_ITERS, CONV)
agent2 = Agent(ACTIONS, INPUT_DIMS, LR, DISCOUNT_FACTOR, POLICY_CLIP, SMOOTHING, EPOCHS, BATCH_SIZE, LEARN_ITERS, CONV)

agent1 = torch.load('vecagent1A.pt')
agent2 = torch.load('vecagent2A.pt')

EPISODES = 5

steps = 0
nobs = 0
rewards = [0, 0]

for i in range(EPISODES):
    
    for event in pygame.event.get():
        if event.type == pygame.locals.QUIT:
          pygame.quit()
          sys.exit()
    
    obs = torch.tensor(env.reset()[0])
    if CONV:
        obs = obs.transpose(0, -1)
    done = False
    score1 = 0
    score2 = 0
    while not done:
        action1, prob1, value1 = agent1.act(obs)
        action2, prob2, value2 = agent2.act(obs)
        nobs, rewards, done, _, _ = env.step([action1, action2])
        if CONV:
            nobs = torch.tensor(nobs).transpose(0, -1)
        else:
            nobs = torch.tensor(nobs)
        agent1.store_mem(obs, prob1, action1, rewards[0], value1, done)
        agent2.store_mem(obs, prob2, action2, rewards[1], value2, done)
        score1 += rewards[0] - 3
        score2 += rewards[1] - 3
        steps += 1
        obs = nobs
        if steps % agent1.liters == 0:
            agent1.learn()
            agent2.learn()
        pygame.time.Clock().tick(144)
    print("Episode: {} Scores: {}, {}".format(i + 1, score1, score2))
env.close()

AttributeError: 'numpy.ndarray' object has no attribute 'append'

In [9]:
pygame.quit()

In [10]:
torch.save(agent1, 'vecagent1A.pt')
torch.save(agent2, 'vecagent2A.pt')