In [None]:
import numpy as np
import matplotlib.pyplot as plt
import re

from gym_microrts import microrts_ai
from gym_microrts.envs.vec_env import MicroRTSGridModeVecEnv

In [None]:
# Side length of the square map; it has to agree with the map file chosen below.
map_size = 8
# One bot environment (num_bot_envs=1) and no self-play environments.
# NOTE(review): ai2s presumably selects the opponent AI for each bot env - confirm.
envs = MicroRTSGridModeVecEnv(
    num_selfplay_envs=0,
    num_bot_envs=1,
    max_steps=2000,
    render_theme=2,
    ai2s=[microrts_ai.naiveMCTSAI for _ in range(1)],
    # Alternative AIs that were tried:
    #ai2s=[microrts_ai.coacAI for _ in range(1)],
    #ai2s=[microrts_ai.vulcanMCTSAI for _ in range(1)],
    # Alternative 4x4 map (map_size above would have to be changed to 4):
    #map_paths=["maps/4x4/base4x4.xml"],
    map_paths=["maps/8x8/basesWorkers8x8.xml"],
    reward_weight=np.array([10.0, 1.0, 1.0, 0.2, 1.0, 4.0]),
)
# All-zero action array with 7 entries per tile; action type 0 is NOOP.
no_action = np.zeros((map_size, map_size, 7))
# The initial observation returned by reset() is discarded here.
_ = envs.reset()

In [None]:
# Display the shape of the no-op action array: (map_size, map_size, 7).
no_action.shape

#### For i in range 1000

In [None]:
# Fetch the current action mask from the environment.
action_mask = envs.get_action_mask()
# Flatten every leading axis so that each row holds the mask entries of one tile.
action_mask = action_mask.reshape(-1, action_mask.shape[-1])
action_mask.shape

In [None]:
# Re-arrange the flat per-tile mask into a (map_size, map_size, n_entries) grid.
# This only works while a single environment is in use (rows == map_size * map_size).
action_mask_tiles = action_mask.reshape(map_size, map_size, -1)
action_mask_tiles.shape

In [None]:
# Heat-map of how many action-mask entries are enabled on each tile.
fig, ax = plt.subplots(figsize=(5, 5))
ax.imshow(action_mask_tiles.sum(-1))

# Minor ticks sit on the tile borders so the grid outlines every cell.
tile_borders = np.arange(-.5, map_size, 1)
ax.set_xticks(tile_borders, minor=True)
ax.set_yticks(tile_borders, minor=True)
ax.grid(which="minor", axis="both", linestyle="-", color="k", linewidth=1)

# Major ticks carry the tile indices.
tile_indices = np.arange(0, map_size, 1)
ax.set_xticks(tile_indices)
ax.set_yticks(tile_indices)
ax.set_xlabel("x")
ax.set_ylabel("y")

ax.set_title("Action Mask")
plt.show()

In [None]:
def print_action_space(action_mask_tile):
    """Print a human-readable breakdown of the action mask of a single tile.

    The mask is read as consecutive groups:

    * ``[0:6]``   action type: NOOP, move, harvest, return, produce, attack
    * ``[6:10]``  move direction: north, east, south, west
    * ``[10:14]`` harvest direction
    * ``[14:18]`` return direction
    * ``[18:22]`` produce direction
    * ``[22:29]`` produce unit type: resource, base, barracks, worker, light,
      heavy, ranged
    * ``[29:sum(envs.action_space.nvec[1:])]`` attack target, printed as a
      7x7 grid

    For every group except the attack target, two rows are printed: the raw
    mask values and the matching labels. Entries whose mask value is 1 are
    emphasised by ``print_masked``.
    """
    directions = ["north", "east", "south", "west"]
    labelled_groups = [
        ("Action Type:", slice(0, 6),
         ["NOOP", "move", "harvest", "return", "produce", "attack"]),
        ("Move Parameter:", slice(6, 10), directions),
        ("Harvest Parameter:", slice(10, 14), directions),
        ("Return Parameter:", slice(14, 18), directions),
        ("Produce Direction Parameter:", slice(18, 22), directions),
        ("Produce Unit Type Parameter:", slice(22, 29),
         ["resource", "base", "barracks", "worker", "light", "heavy", "ranged"]),
    ]

    for heading, span, labels in labelled_groups:
        print(heading)
        group = action_mask_tile[span]
        print_masked(group, group)
        print_masked(group, labels)
        print()

    # The attack-target group has no labels; it is shown as a 7x7 grid.
    print("Attack Target Parameter:")
    attack_stop = sum(envs.action_space.nvec[1:])
    print(action_mask_tile[29:attack_stop].reshape(7, 7))
    print()


def print_masked(mask, params):
    """Print ``params`` on one line, emphasising the entries enabled in ``mask``.

    Every entry is centred in a 7-character column and followed by a space.
    Entries whose mask value equals 1 are wrapped in the ANSI bold escape
    sequence. The line is terminated with a newline.
    """
    for position, label in enumerate(params):
        enabled = mask[position] == 1
        column = f"{label: ^7}"
        print(f"\033[1m{column}\033[0m" if enabled else column, end=" ")
    print()


#print_action_space(action_mask_tiles[0][1])
    

In [None]:
# Label vocabularies shared by the encoding / decoding helpers in this notebook.
rows_letters = list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
action_types = ["NOOP", "move", "harvest", "return", "produce", "attack"]
location_parameters = ["north", "east", "south", "west"]
unit_types = ["resource", "base", "barracks", "worker", "light", "heavy", "ranged"]


def action_mask_to_string(action_mask_tiles, row, col):
    """List the actions that the mask of one tile allows, as text.

    Args:
        action_mask_tiles: mask of a single tile (despite the plural name);
            entries 0-5 are the action-type flags, followed by the
            direction / unit-type groups.
        row: first grid index of the tile, rendered as a letter (0 -> "A").
        col: second grid index of the tile, rendered as a number.

    Returns:
        A list of strings such as ``"(A1, move, east)"`` or
        ``"(B1, produce, west, worker)"``, in the order NOOP, move, harvest,
        return, produce. Attack actions are not listed.
    """
    tile_label = f"{rows_letters[row]}{col}"
    described = []

    if action_mask_tiles[0] == 1:
        described.append(f"({tile_label}, NOOP)")

    # (action-type flag index, verb, first index of its direction group)
    directional_actions = ((1, "move", 6), (2, "harvest", 10), (3, "return", 14))
    for flag_index, verb, group_start in directional_actions:
        if action_mask_tiles[flag_index] != 1:
            continue
        direction_bits = action_mask_tiles[group_start:group_start + 4]
        described.extend(
            f"({tile_label}, {verb}, {direction})"
            for bit, direction in zip(direction_bits, location_parameters)
            if bit == 1
        )

    # produce: every enabled direction is combined with every enabled unit type
    if action_mask_tiles[4] == 1:
        enabled_directions = [
            direction
            for bit, direction in zip(action_mask_tiles[18:22], location_parameters)
            if bit == 1
        ]
        enabled_units = [
            unit_name
            for bit, unit_name in zip(action_mask_tiles[22:29], unit_types)
            if bit == 1
        ]
        described.extend(
            f"({tile_label}, produce, {direction}, {unit_name})"
            for direction in enabled_directions
            for unit_name in enabled_units
        )

    return described

In [None]:
def decode_action_mask(action_mask):
    """Render every action allowed by ``action_mask`` as newline-separated text.

    The mask is flattened to one row per tile and then viewed as a
    ``(map_size, map_size, n_entries)`` grid, using the notebook-level
    ``map_size``. Tiles are visited row by row; the strings produced by
    ``action_mask_to_string`` for each tile are concatenated in that order.

    Returns:
        One string with one action per line (empty if nothing is allowed).
    """
    per_tile = action_mask.reshape(-1, action_mask.shape[-1])
    grid = per_tile.reshape(map_size, map_size, -1)

    action_lines = [
        text
        for r in range(map_size)
        for c in range(map_size)
        for text in action_mask_to_string(grid[r][c], r, c)
    ]
    return "\n".join(action_lines)

In [None]:
# Show the textual list of valid actions for the mask fetched above.
decode_action_mask(action_mask_tiles)

In [None]:
# Reference only: the groups that make up one tile's action encoding, shown
# here as one-hot style vectors (the cell merely displays this literal).
# NOTE(review): the last group is sized with sum(envs.action_space.nvec[1:]),
# whereas print_action_space reshapes the attack-target group to 7x7 -
# confirm the intended length.
[
    [1, 0, 0, 0, 0, 0],  # action type: {NOOP, move, harvest, return, produce, attack}
    [0, 0, 0, 0],  # move parameter: {north, east, south, west}
    [0, 0, 0, 0],  # harvest parameter: {north, east, south, west}
    [0, 0, 0, 0],  # return parameter: {north, east, south, west}
    [0, 0, 0, 0],  # produce_direction parameter: {north, east, south, west}
    [0, 0, 0, 0, 0, 0, 0],  # produce_unit_type parameter: {resource, base, barracks, worker, light, heavy, ranged}
    np.zeros(sum(envs.action_space.nvec[1:]))  # attack_target parameter: relative position of the unit that will be attacked
]

In [None]:
# Hand-written per-tile action vectors. Each vector has seven slots, in order:
#   0 action type        {NOOP, move, harvest, return, produce, attack}
#   1 move direction     {north, east, south, west}
#   2 harvest direction  {north, east, south, west}
#   3 return direction   {north, east, south, west}
#   4 produce direction  {north, east, south, west}
#   5 produce unit type  {resource, base, barracks, worker, light, heavy, ranged}
#   6 attack target      relative position of the unit that will be attacked
# Every slot holds the index of the chosen option within its set.

# move (type 1) towards east (direction 1)
ac_move_east = np.array([1, 1, 0, 0, 0, 0, 0])

# produce (type 4) a worker (unit type 3) towards south (direction 2)
ac_produce_worker_south = np.array([
    4,
    0, 0, 0,
    2,
    3,
    0,
])

# produce (type 4) a worker (unit type 3) towards west (direction 3)
ac_produce_worker_west = np.array([
    4,
    0, 0, 0,
    3,
    3,
    0,
])

In [None]:
# Start from an all-zero action grid: 7 slots per tile, action type 0 = NOOP.
action = np.zeros((map_size, map_size, 7))
action.shape

In [None]:
# Place the hand-written actions on two tiles, indexed as action[row][col].
# NOTE(review): assumes the player's units occupy these tiles on the loaded map - confirm.
action[0][1] = ac_move_east
action[1][1] = ac_produce_worker_west

In [None]:
# Restart the episode (the returned observation is discarded) and show the board.
_ = envs.reset()
img = envs.render(mode='rgb_array')
plt.imshow(img)

In [None]:
# Apply the same hand-written action grid for up to 10 steps.
for i in range(10):
    # `response` is kept at notebook level; later cells index into it.
    response = envs.step(action)

    # Each call draws onto the current axes, so only the last frame stays visible.
    img = envs.render(mode='rgb_array')
    plt.imshow(img)

    # response[2][0] is treated as the "episode finished" flag of the first environment.
    if response[2][0]:
        print("done")
        break

In [None]:
# Vocabularies used to decode the one-hot groups of a tile's observation.
owners = ["NEUTRAL", "PLAYER", "ENEMY"]
obs_unit_types = ["NO_UNIT"] + unit_types

# positions[row][col] holds the one-element label list of that tile.
# The letter comes from the FIRST index (row) and the number from the SECOND
# index (col). This matches action_mask_to_string and string_to_action, so a
# tile carries the same label in the observation, in the valid-action list
# and in the parsed LLM answer. The letter and number were previously swapped.
positions = np.array([
    [
        [f"{rows_letters[i]}{j}"] for j in range(map_size)
    ] for i in range(map_size)
])

def extract_observation_features(features):
    """Decode the feature vector of one tile into readable fields.

    The vector is read as consecutive groups, each reduced with ``argmax``:
    ``[0:5]`` hit points, ``[5:10]`` resources, ``[10:13]`` owner,
    ``[13:21]`` unit type and ``[21:27]`` current action type.

    Returns:
        A 5-tuple ``(unit_type, "hp:<n>", resources, owner, action_type)``.
        ``resources`` is ``None`` when the resource index is 0 and the string
        ``"resources: <n>"`` otherwise.
    """
    def hot_index(start, stop):
        # Index of the largest entry inside features[start:stop].
        return np.argmax(features[start:stop])

    hit_points = hot_index(0, 5)

    resource_level = hot_index(5, 10)
    resources = f"resources: {resource_level}" if resource_level != 0 else None

    str_owner = owners[hot_index(10, 13)]
    str_unit_type = obs_unit_types[hot_index(13, 21)]
    str_action_type = action_types[hot_index(21, 27)]

    return (str_unit_type, f"hp:{hit_points}", resources, str_owner, str_action_type)

In [None]:
def decode_observation(observation):
    """Render the occupied tiles of an observation as newline-separated text.

    Only the first environment (``observation[0]``) is decoded. Each tile that
    holds a unit becomes one line of the form
    ``(<label>, <unit_type>, hp:<n>, <resources>, <owner>, <action_type>)``;
    tiles decoded as ``NO_UNIT`` are skipped. Tiles are visited row by row.

    Tile labels are derived from the observation itself: the letter comes from
    the row index and the number from the column index, the same convention as
    ``action_mask_to_string`` and ``string_to_action``. Because the labels do
    not depend on a pre-built global array, the function keeps working after
    ``map_size`` is changed to load another map.

    The fields are joined with plain Python strings rather than
    ``np.apply_along_axis``, which fixes the output dtype from the first result
    and can therefore truncate later, longer strings or fail on a ``None``
    resources field. A missing resources value is rendered as ``None``.

    Args:
        observation: array indexed as ``[env, row, col, feature]``.

    Returns:
        One string with one unit per line (empty if the board has no units).
    """
    board = observation[0]

    unit_lines = []
    for row_index, board_row in enumerate(board):
        for col_index, tile_features in enumerate(board_row):
            fields = extract_observation_features(tile_features)
            if fields[0] == 'NO_UNIT':
                continue
            label = f"{rows_letters[row_index]}{col_index}"
            # str() turns the optional resources field (None) into text so join() cannot fail.
            rendered = [label] + [str(field) for field in fields]
            unit_lines.append("(" + ", ".join(rendered) + ")")

    return "\n".join(unit_lines)

In [None]:
# Display the grid of tile labels built above.
positions

# LLAMA

In [None]:
def fill_prompt(observation, valid_actions, board_size=4):
    """Build the detailed LLM prompt (rules, observation, valid actions).

    Note: a later cell of this notebook defines another ``fill_prompt`` with a
    shorter prompt; after "Run All" that later definition is the one bound to
    the name.

    Args:
        observation: text block describing the units on the board, one per line.
        valid_actions: text block listing the allowed actions, one per line.
        board_size: side length of the square board described in the rules
            section. Defaults to 4, which reproduces the original wording.

    Returns:
        The prompt as a single string.

    Raises:
        ValueError: if ``board_size`` is not between 1 and 26 (rows are
            labelled with single letters).
    """
    if not 1 <= board_size <= 26:
        raise ValueError("board_size must be between 1 and 26")

    def enumerate_in_words(items):
        # "A" / "A and B" / "A, B, and C"
        if len(items) == 1:
            return items[0]
        if len(items) == 2:
            return f"{items[0]} and {items[1]}"
        return ", ".join(items[:-1]) + f", and {items[-1]}"

    row_labels = [chr(ord("A") + k) for k in range(board_size)]
    col_labels = [str(k) for k in range(board_size)]
    row_names = enumerate_in_words(row_labels)
    col_names = enumerate_in_words(col_labels)
    bottom_right = f"{row_labels[-1]}{col_labels[-1]}"

    prompt = f"""
You are a professional MicroRTS player.
You know all the dependencies between units, buildings, attack system and the rules of the game.
The primary goal is to defeat the opponent by either destroying their bases or eliminating all their units.

## RULES

Gridworld Board Description:

The game is played on a grid-based map, with a size of {board_size}x{board_size} grid.
Each cell represents a specific location. 
Rows are labeled {row_names}, while columns are numbered {col_names}.
Example: A0 represents the top-left cell, and {bottom_right} represents the bottom-right cell.

Game Elements:

(A) Resources:
- Mineral Resources: Used to create new units and buildings, can be harvested by workers. Once harvested, the worker needs to bring the mineral to a base.

(B) Buildings:
- Base: Accumulates resources and trains workers. Bases are buildings.
- Barrack: Creates new attack units. Barracks are buildings.

(C) Units:
- Worker: Can harvest minerals and construct buildings. Is able to move one tile at a timestep, can only harvest adjacent tiles, can only construct buildings in adjacent tiles.
- Light: Attack unit with low power but fast melee capabilities. Is able to move one tile at a timestep, can attack enemies in adjacent tiles.
- Heavy: Attack unit with high power but slow melee capabilities. Is able to move one tile at a timestep, can attack enemies in adjacent tiles.
- Ranged: Attack unit with long-range capabilities. Is able to move one tile at a timestep, can attack enemies that are within a 3 tile radius of the current position.

## OBSERVATION

Observation:
(position, unit_type, hp, resources, owner, action_type)
[
{observation}
]

Set of valid actions:
(position, action_type, direction, production_type)
[
{valid_actions}
]

## ACTION

Based on the current observation of the game state, Units and Buildings from both sides.
You can only issue micro actions to the units, meaning that before a certain worker is able to harvest resources at A1, it needs first to be in an adjacent tile, for example A2 or B1. 
Also, you can only issue actions for the current timestep.
Each unit can execute only one action, which means that in the response dictionary, you can only include one action per unit.
You don't need to issue actions to all units, but you can. 
You must respect the set of valid actions.
Use your best judgement and strategy to select the actions. Provide the next set of micro immediate actions. 

You should only respond in the format as described below:

RESPONSE FORMAT:
(tile, action, direction, produce_unit_type)

Select the actions in the set of valid actions and provide the response in the format above.
"""
    return prompt

In [None]:
def fill_prompt(observation, valid_actions):
    """Build the short LLM prompt from the observation and the valid actions.

    This definition replaces the longer ``fill_prompt`` defined in the
    preceding cell, so it is the one used by the game loop when the notebook
    is run top to bottom.

    Args:
        observation: text block describing the units on the board, one per line.
        valid_actions: text block listing the allowed actions, one per line.

    Returns:
        The prompt as a single string.
    """
    prompt = f"""
You are a professional MicroRTS player.
You know all the dependencies between units, buildings, attack system and the rules of the game.
The primary goal is to defeat the opponent by either destroying their bases or eliminating all their units.

Observation:
(position, unit_type, hp, resources, owner, action_type)
[
{observation}
]


Based on the current observation of the game state, evaluate what is the best action to execute.
Select the desired actions in the set of valid actions below:

Set of valid actions:
(position, action_type, direction, production_type)
[
{valid_actions}
]

RESPONSE FORMAT:
(tile, action, direction, produce_unit_type)

Select the actions in the set of valid actions and provide the response in the format above.
"""
    return prompt

# Game Lifecycle

In [None]:
def string_to_action(response):
    """Convert textual actions chosen by the LLM into a per-tile action grid.

    Each entry of ``response`` is expected to look like
    ``"A1, move, east"``, ``"B1, produce, west, worker"`` or ``"A1, NOOP"``,
    optionally wrapped in parentheses. The tile label is a row letter followed
    by the column number. Entries that cannot be parsed, or that point outside
    the map, are reported with ``Invalid action: ...`` and skipped, so the
    corresponding tile keeps its all-zero (NOOP) action.

    Args:
        response: iterable of action strings.

    Returns:
        Array of shape ``(map_size, map_size, 7)``. For a parsed action the
        direction index is written to all four direction slots (move, harvest,
        return, produce); the attack-target slot is always 0.
    """
    actions = np.zeros((map_size, map_size, 7))

    for value in response:
        try:
            # Split on bare commas and trim, so "A1,move,east" parses like "A1, move, east".
            s_action = [token.strip() for token in value.strip().strip("()").split(",")]

            unit = s_action[0]
            actiontype_param = action_types.index(s_action[1])

            # NOOP entries carry no direction; fall back to index 0 instead of rejecting them.
            if len(s_action) > 2:
                location_param = location_parameters.index(s_action[2])
            else:
                location_param = 0

            if len(s_action) > 3:
                unit_types_param = unit_types.index(s_action[3])
            else:
                unit_types_param = 0

            row = rows_letters.index(unit[0])
            # Parse every digit after the letter, so columns >= 10 are read correctly.
            col = int(unit[1:])
            # Reject explicitly: a negative column would otherwise wrap around silently.
            if not (0 <= row < map_size and 0 <= col < map_size):
                raise IndexError(f"tile {unit} is outside the {map_size}x{map_size} map")

            actions[row][col] = [
                actiontype_param,
                location_param,
                location_param,
                location_param,
                location_param,
                unit_types_param,
                0  # attack_target parameter TBD
            ]
        except (AttributeError, IndexError, TypeError, ValueError):
            # Only parsing problems are swallowed; KeyboardInterrupt and similar still propagate.
            print(f"Invalid action: {value}")
            continue
        print(s_action)

    return actions

In [None]:
# Prerequisite: a local Ollama server must be running (start it with `ollama serve`).

from ollama import Client
# Client bound to the local server on port 11434.
client = Client(host='http://localhost:11434')

In [None]:
# Switch to the 4x4 map; the helpers above read this notebook-level map_size.
map_size = 4
# NOTE(review): this array has the shape of an action MASK (78 entries per tile),
# whereas string_to_action and the earlier cells build actions with 7 entries
# per tile - confirm that envs.step accepts it as the initial no-op action.
no_action = np.zeros((1, map_size*map_size, 78))

# One bot environment with coacAI in place of naiveMCTSAI.
envs = MicroRTSGridModeVecEnv(
    num_selfplay_envs=0,
    num_bot_envs=1,
    max_steps=2000,
    render_theme=2,
    #ai2s=[microrts_ai.naiveMCTSAI for _ in range(1)],
    ai2s=[microrts_ai.coacAI for _ in range(1)],
    map_paths=["maps/4x4/base4x4.xml"],
    reward_weight=np.array([10.0, 1.0, 1.0, 0.2, 1.0, 4.0]),
)
# Start the episode, fetch the first action mask and show the initial board.
_ = envs.reset()
action_mask = envs.get_action_mask()
img = envs.render(mode='rgb_array')
plt.imshow(img)

In [None]:
# Play one episode. The LLM is asked for new actions every LLM_QUERY_INTERVAL
# steps; in between, the most recent action grid is re-applied.
LLM_QUERY_INTERVAL = 10

images = []
action = no_action
for i in range(2000):

    # The first query happens at step 1: step 0 runs with the initial no-op
    # action so that `response` already holds an observation from this episode.
    if i % LLM_QUERY_INTERVAL == 1:
        print(f"step {i}: querying the LLM")

        prompt_obs = decode_observation(response[0])

        action_mask = envs.get_action_mask()
        prompt_valid_actions = decode_action_mask(action_mask)

        input_llm = fill_prompt(prompt_obs, prompt_valid_actions)

        llama_response = client.chat(model='mistral', messages=[{'role': 'user','content': input_llm,},])

        content = llama_response['message']['content']
        # Every parenthesised group in the answer is treated as one candidate action.
        found_actions = re.findall(r'\((.*?)\)', content)
        # If the LLM produced no usable action, string_to_action returns an
        # all-zero grid, i.e. NOOP for every tile.
        action = string_to_action(found_actions)

    response = envs.step(action)

    img = envs.render(mode='rgb_array')
    images.append(img)

    # response[2][0] is treated as the "episode finished" flag of the first environment.
    if response[2][0]:
        print("done")
        break


In [None]:
# Write the frames collected by the game loop to a video file.
# NOTE(review): `gus` is not imported anywhere else in this notebook - confirm it is installed.
from gus.utils import save_video
save_video(images, path='experiment_llm3.mp4')