In [13]:
"""
Agent for Move to Beacon minimap of Spacecraft II 
"""
from pysc2.agents import base_agent
from pysc2.lib import actions
from pysc2.env import sc2_env
from pysc2.lib import actions, features
import time
from collections import deque
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers


PLAYER_SELF = features.PlayerRelative.SELF
PLAYER_NEUTRAL = features.PlayerRelative.NEUTRAL  # beacon/minerals
PLAYER_ENEMY = features.PlayerRelative.ENEMY
ACTION_STR = '_act'
BATCH_SIZE = 100
EXPLORATION_DECAY = 0.99
FUNCTIONS = actions.Functions
LEARNING_SIZE = 100
LOCATION_STR = '_loc'
MEMORY_SIZE = 1000000
RAW_FUNCTIONS = actions.Functions
FUNCTIONS = actions.FUNCTIONS

class MoveToBeacon(base_agent.BaseAgent):
    def __init__(self):
        '''
        The constructor method
        '''
        super(MoveToBeacon, self).__init__()

        self.memory = deque(maxlen = MEMORY_SIZE)
        self.exploration_rate = 0.9

        self.observation_space = 7640
        self.action_space = 3
        self.location_space = 2
        
        # build the NN model
        initializer = tf.keras.initializers.GlorotNormal()
        self.action_model = tf.keras.models.Sequential(
            [
                layers.Dense(
                    self.observation_space,
                    input_shape=(self.observation_space,),
                    activation="linear",
                    kernel_initializer = initializer,
                ),
                layers.Dense(
                    1024, activation= "linear", kernel_initializer = initializer
                ),
                layers.Dense(
                    1024, activation="linear", kernel_initializer = initializer
                ),
                layers.Dense(
                    self.action_space, activation="linear"
                    ),
            ]
        )

        self.action_model.compile(
            loss="categorical_crossentropy",
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        )

        self.action_model.summary()

        self.location_model = tf.keras.models.Sequential(
            [
                layers.Dense(
                    self.observation_space,
                    input_shape=(self.observation_space,),
                    activation="linear",
                    kernel_initializer = initializer,
                ),
                layers.Dense(
                    1024, activation= "linear", kernel_initializer = initializer
                ),
                layers.Dense(
                    1024, activation="linear", kernel_initializer = initializer
                ),
                layers.Dense(
                    self.location_space, activation="linear"
                    ),
            ]
        )

        self.location_model.compile(
            loss="categorical_crossentropy",
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        )            
        
        self.location_model.summary()

    def step(self, obs):
        super(MoveToBeacon, self).step(obs)

        if (np.random.rand() <= self.exploration_rate):
            return self.explore_step(obs)
        else:
            return self.model_step(obs)

    def model_step(self, obs):
        # outputs [no_op/0, select_army/7, move_screen/331]

        action = ""
        current_state = self.get_state(obs)
        current_state = np.array(current_state).reshape(-1, self.observation_space)
        model_output = self.action_model.predict(current_state)
        current_action = np.argmax(model_output)
        print("NN output:", model_output, "    action index: ", current_action)
        if current_action == 0:
            curr_action_id = 0

        elif current_action == 1:
            action = FUNCTIONS.select_army("select")
            curr_action_id = 7
        else:
            loc = self.location_model.predict(current_state)
            NNx = int(loc[0][0])
            NNy = int(loc[0][1])

            if NNx < 0 or NNx > 83:
                NNx = 0
            if NNy < 0 or NNy > 83:
                NNy = 0
            action = FUNCTIONS.Move_screen("now", [NNx, NNy])
            curr_action_id = 331

        if curr_action_id not in obs.observation.available_actions:
            action = FUNCTIONS.no_op()
            current_action = 0
            curr_action_id = 0
        print("Taking NN action with ID:", curr_action_id)
        if curr_action_id == 331:
            print ("location: ", NNx, NNy)

        return action

    def explore_step(self, obs):
        if FUNCTIONS.Move_screen.id in obs.observation.available_actions:
            player_relative = obs.observation.feature_screen.player_relative
            #beacon = _xy_locs(player_relative == PLAYER_NEUTRAL)
            mask = player_relative == PLAYER_NEUTRAL
            y, x = mask.nonzero()
            beacon = list(zip(x, y))
            if not beacon:
                return FUNCTIONS.no_op()
            beacon_center = np.mean(beacon, axis=0).round()
            return FUNCTIONS.Move_screen("now", beacon_center)
        else:
            return FUNCTIONS.select_army("select")

    def get_state (self, obs):
        the_state = []

        action_list = obs.observation.available_actions.tolist()

        dummy = [0 for i in range(573)]
        for action in action_list:
            dummy[action] = 1

        #player relative array
        dummy2 = [
            item for sublist in obs.observation.feature_screen.player_relative.tolist()
            for item in sublist
        ]

        dummy3 = obs.observation["player"].tolist()
        the_state = dummy + dummy2 + dummy3
        return the_state

    
    def remember (self, curr_obs, action, reward, next_obs):
        state = self.get_state(curr_obs)
        next_state = self.get_state(next_obs)
        print("Length of state:", len(state))

        player_relative = curr_obs.observation.feature_screen.player_relative
        #beacon = _xy_locs(player_relative == PLAYER_NEUTRAL)
        mask = player_relative == PLAYER_NEUTRAL
        y, x = mask.nonzero()
        beacon = list(zip(x, y))
        if not beacon:
            beacon_location = [0,0]
        else:
            beacon_location = np.mean(beacon, axis=0).round()

        print("str(FUNCTIONS[action[0]]:", str(FUNCTIONS[action[0]]))
        action_code = int(str(FUNCTIONS[action[0]]).split("/")[0])
        if  action_code == 0:
            simp_action = [1.0,0.0,0.0]
        elif action_code == 7:
            simp_action = [0.0, 1.0, 0.0]
        elif action_code == 331:
            simp_action = [0.0, 0.0, 1.0]
        else:
            simp_action = [0.0, 0.0, 0.0]        

        self.memory.append([state, simp_action, beacon_location, reward, next_state])

    def train(self):
        # train the model
        # don't do anything until you have enough data
        if len(self.memory) < LEARNING_SIZE:
            return
        if r.randint(0,100) < 95:
            return
        print("Learning!")
        # pick random data from all saved data to use to improve the model
        batch = random.sample(self.memory, LEARNING_SIZE)
        states_batch = []
        action_ys_batch = []
        location_ys_batch = []

        start_time = time.time()

        for s, a, bl, r, ns in batch:
            states_batch.append(s)
            action_ys_batch.append(a)
            location_ys_batch.append(bl)

        # update the model
        states_batch = np.array(states_batch).reshape(-1, self.observation_space)
        location_ys_batch = np.array(location_ys_batch).reshape(-1, self.location_space)
        action_ys_batch = np.array(action_ys_batch).reshape(-1, self.action_space)

        print ("...prep time: --- %s seconds ---" % (time.time() - start_time))

        self.action_model.fit(
            states_batch, action_ys_batch, batch_size=BATCH_SIZE, epochs=20, verbose=0
        )
        
        self.location_model.fit(
            states_batch, location_ys_batch, batch_size=BATCH_SIZE, epochs=20, verbose=0
        )

        # update the exploration value
        self.exploration_rate *= EXPLORATION_DECAY

        print ("...total learning time: --- %s seconds ---" % (time.time() - start_time))


In [14]:
from absl import flags
import sys
num_episodes = 5
def main():
    # Parse flags before creating the environment
    FLAGS = flags.FLAGS
    FLAGS(sys.argv[:1])  # Only parse the first argument (the script name)

    # Set up the environment
    env = sc2_env.SC2Env(
        map_name="MoveToBeacon",
        players=[sc2_env.Agent(sc2_env.Race.terran)],
        agent_interface_format=features.AgentInterfaceFormat(
            feature_dimensions=features.Dimensions(screen=84, minimap=64),
            use_feature_units=True
        ),
        step_mul=16, # about 150 APM
        game_steps_per_episode=0, # no time limit
        visualize=True
    )

    # Create an instance of your agent
    agent = MoveToBeacon()

    try:
        for episode in range(num_episodes):  
            agent.setup(env.observation_spec(), env.action_spec())
            obs = env.reset()
            while True:
                #print("timesteps[0]", timesteps[0])
                step_actions = [agent.step(obs[0])]
                acts = step_actions[0]
                #print(f"Action ID : {str(FUNCTIONS[acts[0]])}")
                
                if obs[0].last():
                    print(f"final score: {obs[0].observation.score_cumulative[0]}")
                    break

                print(f"step_actions : {step_actions}")
                if actions.FUNCTIONS.Move_screen.id in obs[0].observation["available_actions"]:
                    print("Yes")
                next_obs = env.step(step_actions) # next observation
                agent.remember(obs[0], acts, 0, next_obs[0]) # pass S, A, R, next S
                print(f"after remember ")
                if int(str(FUNCTIONS[acts[0]]).split("/")[0]) == 7: #rare action
                    agent.remember(obs[0], acts, 0, next_obs[0])
                    agent.remember(obs[0], acts, 0, next_obs[0])
                    agent.remember(obs[0], acts, 0, next_obs[0])
                    # agent.remember(obs, actions, 0, obs_next)
                    # agent.remember(obs, actions, 0, obs_next)
                    # agent.remember(obs, actions, 0, obs_next)
                    # agent.remember(obs, actions, 0, obs_next)
                    obs = next_obs
                    agent.train()


            print(f"Episode {episode + 1} finished.")
    finally:
        env.close()

if __name__ == "__main__":
    main()


step_actions : [FunctionCall(function=<_Functions.select_army: 7>, arguments=[[<SelectAdd.select: 0>]])]
Length of state: 7640
str(FUNCTIONS[action[0]]: 7/select_army (7/select_add [2])
after remember 
Length of state: 7640
str(FUNCTIONS[action[0]]: 7/select_army (7/select_add [2])
Length of state: 7640
str(FUNCTIONS[action[0]]: 7/select_army (7/select_add [2])
Length of state: 7640
str(FUNCTIONS[action[0]]: 7/select_army (7/select_add [2])
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now: 0>], [64.0, 27.0]])]
Yes
Length of state: 7640
str(FUNCTIONS[action[0]]: 331/Move_screen (3/queued [2]; 0/screen [0, 0])
after remember 
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now: 0>], [64.0, 27.0]])]
Yes
Length of state: 7640
str(FUNCTIONS[action[0]]: 331/Move_screen (3/queued [2]; 0/screen [0, 0])
after remember 
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now:



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 143ms/step
NN output: [[ 0.34816357 -0.05873812  0.73158157]]     action index:  2
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 100ms/step
Taking NN action with ID: 331
location:  0 0
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now: 0>], [0, 0]])]
Yes
Length of state: 7640
str(FUNCTIONS[action[0]]: 331/Move_screen (3/queued [2]; 0/screen [0, 0])
after remember 
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now: 0>], [64.0, 27.0]])]
Yes
Length of state: 7640
str(FUNCTIONS[action[0]]: 331/Move_screen (3/queued [2]; 0/screen [0, 0])
after remember 
step_actions : [FunctionCall(function=<_Functions.Move_screen: 331>, arguments=[[<Queued.now: 0>], [64.0, 27.0]])]
Yes
Length of state: 7640
str(FUNCTIONS[action[0]]: 331/Move_screen (3/queued [2]; 0/screen [0, 0])
after remember 
step_actions : [FunctionCall(function=<_Functions.

ValueError: Function 331/Move_screen is currently not available