In [1]:
import math
import numpy as np
from pysc2.agents import base_agent
from pysc2.lib import actions
from pysc2.lib import features
from pysc2.env import sc2_env, run_loop, available_actions_printer
from pysc2 import maps
from absl import flags


pygame 1.9.4
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
from collections import deque
import keras
from keras.models import Sequential
from keras.layers import Dense,Conv1D,Dropout,Flatten,Activation,MaxPool1D,MaxPool2D
from keras.optimizers import Adam


Using TensorFlow backend.


We define the feature-layer indices, action ids and other constants used by the environment and the agent.

In [3]:
# Indices of the screen feature layers that get_state reads from each observation.
_AI_RELATIVE = features.SCREEN_FEATURES.player_relative.index
_AI_SELECTED = features.SCREEN_FEATURES.selected.index
# pysc2 function ids of the game actions the agent can issue.
_NO_OP = actions.FUNCTIONS.no_op.id
# NOTE(review): the name says "move" but the id is Attack_screen's — confirm this is intended.
_MOVE_SCREEN = actions.FUNCTIONS.Attack_screen.id
_SELECT_ARMY = actions.FUNCTIONS.select_army.id
_SELECT_POINT = actions.FUNCTIONS.select_point.id
# Agent-side action ids; get_action turns both into _MOVE_SCREEN calls.
# Presumably chosen so they cannot collide with real pysc2 function ids — TODO confirm.
_MOVE_RAND = 1000
_MOVE_MIDDLE = 2000
# Values looked up in the player_relative layer (the visible cells compare
# against _BACKGROUND, _AI_SELF and _AI_NEUTRAL only).
_BACKGROUND = 0
_AI_SELF = 1
_AI_ALLIES = 2
_AI_NEUTRAL = 3
_AI_HOSTILE = 4
# Argument values passed along with pysc2 function calls in get_action.
_SELECT_ALL = [0]
_NOT_QUEUED = [0]
# NOTE(review): these three are not referenced in the visible cells; DQNAgent
# keeps its own epsilon settings.
EPS_START = 0.9
EPS_END = 0.025
EPS_DECAY = 2500

In [81]:
# Action space of the agent. The position of an id in this list is the index
# of the matching network output: do nothing, select the army, select a point,
# move to the beacon, move to a random point, move to the middle of the screen.
possible_actions = [_NO_OP, _SELECT_ARMY, _SELECT_POINT, _MOVE_SCREEN, _MOVE_RAND, _MOVE_MIDDLE]

# Reverse lookup: action id -> its position in possible_actions.
id_from_actions = dict(zip(possible_actions, range(len(possible_actions))))


In [62]:
def get_state(obs):
    """Build the two model inputs from a single pysc2 timestep.

    Args:
        obs: timestep whose ``observation['feature_screen']`` can be indexed
            with ``_AI_RELATIVE`` and ``_AI_SELECTED``.

    Returns:
        A two-element list ``[view, selected]``. ``view`` is the
        player-relative screen layer wrapped in a leading axis of length 1;
        ``selected`` is a length-1 array holding 1 when any pixel of the
        selected layer equals 1 and 0 otherwise.
    """
    screen = obs.observation['feature_screen']
    ai_view = screen[_AI_RELATIVE]
    # The state needs nothing else from the screen, so frames without any
    # beacon or marine pixels are handled without min/mean over empty arrays.
    marine_selected = int((screen[_AI_SELECTED] == 1).any())
    return [np.array([ai_view]), np.array([marine_selected])]

In [108]:
# Screen branch: the 64x64 player-relative layer goes through two
# Conv1D -> ReLU -> MaxPool1D stages and is then flattened.
input1 = keras.layers.Input(shape=(64, 64))
model_view = Conv1D(16, kernel_size=8)(input1)
model_view = Activation('relu')(model_view)
model_view = MaxPool1D(pool_size=2, strides=2, padding='valid')(model_view)
model_view = Conv1D(32, kernel_size=4)(model_view)
model_view = Activation('relu')(model_view)
# The stride (4) is larger than the pool size (2), so positions are skipped.
model_view = MaxPool1D(pool_size=2, strides=4, padding='valid')(model_view)
model_view = Flatten()(model_view)

# Second input: the single "marine selected" flag produced by get_state.
input2 = keras.layers.Input(shape=(1,))
# input2 has a single feature, so Add broadcasts the flag onto every flattened
# screen feature.
# NOTE(review): a Concatenate may have been intended here; kept as Add so the
# layer sizes (and any saved weights) stay the same.
added = keras.layers.Add()([model_view, input2])

# One output per action. The head is linear because DQNAgent.replay regresses
# these outputs with MSE onto reward + gamma * max Q, which a softmax (values
# in [0, 1] summing to 1) cannot represent.
out = keras.layers.Dense(len(possible_actions), activation='linear')(added)
model = keras.models.Model(inputs=[input1, input2], outputs=out)
model.summary()
# NOTE(review): lr=0.1 is far above Adam's usual default and is kept unchanged.
model.compile(loss='mse', optimizer=Adam(lr=0.1))

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_13 (InputLayer)           (None, 64, 64)       0                                            
__________________________________________________________________________________________________
conv1d_13 (Conv1D)              (None, 57, 16)       8208        input_13[0][0]                   
__________________________________________________________________________________________________
activation_13 (Activation)      (None, 57, 16)       0           conv1d_13[0][0]                  
__________________________________________________________________________________________________
max_pooling1d_13 (MaxPooling1D) (None, 28, 16)       0           activation_13[0][0]              
__________________________________________________________________________________________________
conv1d_14 

In [109]:
import random

# Number of training episodes run by the training loop.
EPISODES = 500


class DQNAgent:
    """Epsilon-greedy Q-learning agent with an experience-replay buffer.

    The agent relies on the module-level ``possible_actions`` list and
    ``id_from_actions`` dict to map between action ids and model outputs.

    Args:
        model: object exposing ``predict``, ``fit``, ``load_weights`` and
            ``save_weights``; ``predict(state)[0]`` must hold one value per
            entry of ``possible_actions``.
        gamma: discount rate applied to the best predicted next-state value.
        epsilon: initial probability of choosing a random action.
        epsilon_min: lower bound for ``epsilon``.
        epsilon_decay: factor applied to ``epsilon`` after each replay.
        memory_size: maximum number of transitions kept; older ones are dropped.
    """

    def __init__(self, model, gamma=0.95, epsilon=1.0, epsilon_min=0.01,
                 epsilon_decay=0.995, memory_size=2000):
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma    # discount rate
        self.epsilon = epsilon  # exploration rate
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.model = model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return an action id: random with probability epsilon, else the argmax of the model output."""
        if np.random.rand() <= self.epsilon:
            return np.random.choice(possible_actions)
        act_values = self.model.predict(state)
        return possible_actions[np.argmax(act_values[0])]

    def replay(self, batch_size):
        """Fit the model on ``batch_size`` remembered transitions, then decay epsilon.

        Does nothing when fewer than ``batch_size`` transitions are stored,
        instead of letting ``random.sample`` raise ``ValueError``.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            # Terminal transitions have no future value to bootstrap from.
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)))
            # Only the output of the action actually taken is moved towards
            # the target; the other outputs keep their current predictions.
            target_f = self.model.predict(state)
            target_f[0][id_from_actions[action]] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            # Clamp so the decay step never takes epsilon below its floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def load(self, name):
        """Load model weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save model weights to the file ``name``."""
        self.model.save_weights(name)




In [110]:
# Parse the absl flags from a list holding only a program name, so no real
# command-line arguments are read.
# Presumably needed before the SC2 environment can be created — TODO confirm.
FLAGS = flags.FLAGS
FLAGS(['run_sc2'])

# Run settings. `viz` is passed to SC2Env as `visualize`.
# NOTE(review): save_replay, steps_per_episode, MAX_EPISODES, MAX_STEPS and
# steps are not referenced in the visible cells; the training loop uses
# EPISODES and a literal step limit instead.
viz = False
save_replay = False
steps_per_episode = 0 # 0 actually means unlimited
MAX_EPISODES =35
MAX_STEPS = 400
steps = 0

# Look up the MoveToBeacon map; the result is passed to SC2Env as `map_name`.
beacon_map = maps.get('MoveToBeacon')

In [111]:
def get_action(id_action,feature_screen):
    """Translate an agent action id into a pysc2 ``FunctionCall``.

    Args:
        id_action: one of the ids listed in ``possible_actions``.
        feature_screen: 2-D player-relative screen layer indexed
            ``[row, column]``. Screen targets are passed to pysc2 as
            ``[column, row]``.

    Returns:
        The ``actions.FunctionCall`` for the requested action. A no_op call is
        returned instead when ``_MOVE_SCREEN`` is requested without any beacon
        pixel on screen, or ``_SELECT_POINT`` without any background pixel.

    Raises:
        ValueError: if ``id_action`` is not a known action id.
    """
    n_rows, n_cols = feature_screen.shape
    # nonzero() yields (row indices, column indices) of the beacon pixels.
    beacon_rows, beacon_cols = (feature_screen == _AI_NEUTRAL).nonzero()

    if id_action == _NO_OP:
        return actions.FunctionCall(_NO_OP, [])

    if id_action == _MOVE_SCREEN:
        if beacon_rows.size == 0:
            # No beacon to aim at; a mean over nothing would be NaN.
            return actions.FunctionCall(_NO_OP, [])
        target = [beacon_cols.mean(), beacon_rows.mean()]
        return actions.FunctionCall(_MOVE_SCREEN, [_NOT_QUEUED, target])

    if id_action == _SELECT_ARMY:
        return actions.FunctionCall(_SELECT_ARMY, [_SELECT_ALL])

    if id_action == _SELECT_POINT:
        # Selecting a random background pixel (used to deselect the marine).
        bg_rows, bg_cols = (feature_screen == _BACKGROUND).nonzero()
        if bg_rows.size == 0:
            return actions.FunctionCall(_NO_OP, [])
        pick = np.random.randint(0, bg_rows.size)
        # _NOT_QUEUED ([0]) is reused as the first select_point argument.
        # NOTE(review): presumably 0 means a plain select there — confirm.
        return actions.FunctionCall(
            _SELECT_POINT, [_NOT_QUEUED, [bg_cols[pick], bg_rows[pick]]])

    if id_action == _MOVE_RAND:
        # Random point between the beacon's largest row/column and the screen
        # edge; the whole screen when no beacon is visible.
        row_low = beacon_rows.max() if beacon_rows.size else 0
        col_low = beacon_cols.max() if beacon_cols.size else 0
        target_row = np.random.randint(row_low, n_rows)
        target_col = np.random.randint(col_low, n_cols)
        return actions.FunctionCall(
            _MOVE_SCREEN, [_NOT_QUEUED, [target_col, target_row]])

    if id_action == _MOVE_MIDDLE:
        # [32, 32] on a 64x64 screen.
        return actions.FunctionCall(
            _MOVE_SCREEN, [_NOT_QUEUED, [n_cols // 2, n_rows // 2]])

    raise ValueError("unknown action id: {!r}".format(id_action))
    

In [None]:
import os

# _MOVE_RAND and _MOVE_MIDDLE are agent-side ids that get_action turns into
# _MOVE_SCREEN calls. They never appear in available_actions themselves, so
# their availability is checked through the function they map to.
REQUIRED_FUNCTION = {_MOVE_RAND: _MOVE_SCREEN, _MOVE_MIDDLE: _MOVE_SCREEN}
SAVE_PATH = "./save/move_2_beacon-dqn.h5"

with sc2_env.SC2Env(
        agent_race=None,
        bot_race=None,
        difficulty=None,
        map_name=beacon_map,
        visualize=viz,
        agent_interface_format=sc2_env.AgentInterfaceFormat(
            feature_dimensions=sc2_env.Dimensions(screen=64, minimap=64))) as env:
    agent = DQNAgent(model)
    # To resume from earlier weights, call agent.load(SAVE_PATH) here.

    batch_size = 32

    # Saving the weights fails when the target directory is missing.
    save_dir = os.path.dirname(SAVE_PATH)
    if save_dir and not os.path.isdir(save_dir):
        os.makedirs(save_dir)

    for e in range(EPISODES):
        obs = env.reset()
        score = 0
        state = get_state(obs[0])
        for step in range(500):
            a = agent.act(state)
            # Fall back to no_op when the game does not currently allow the
            # function behind the chosen action; the fallback is also what
            # gets stored in the replay memory.
            if REQUIRED_FUNCTION.get(a, a) not in obs[0].observation.available_actions:
                a = _NO_OP
            func = get_action(a, state[0][0])
            next_obs = env.step([func])
            next_state = get_state(next_obs[0])
            reward = next_obs[0].reward
            score += reward
            done = next_obs[0].last()
            agent.remember(state, a, reward, next_state, done)
            state = next_state
            obs = next_obs
            if done:
                print("episode: {}/{}, score: {}, e: {:.2}"
                      .format(e, EPISODES, score, agent.epsilon))
                break
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
        # Checkpoint the weights after every episode.
        agent.save(SAVE_PATH)
    
            
