In [11]:
import nest_asyncio
nest_asyncio.apply()

from sc2 import maps
from sc2.bot_ai import BotAI
from sc2.data import Difficulty, Race
from sc2.ids.ability_id import AbilityId
from sc2.ids.buff_id import BuffId
from sc2.ids.unit_typeid import UnitTypeId
from sc2.main import run_game
from sc2.player import Bot, Computer
import random
import math
import numpy as np; from numpy import linspace, pi

# TODO: restructure the bot's decision logic as an explicit state machine.

class BBBot(BotAI):
    """Terran bot that masses barracks and marines and steers idle marines with boid-style rules.

    Each game step the bot balances workers, supply, barracks and marine
    production, then either attacks with each marine or moves it using a
    flocking calculation (alignment, separation, cohesion) biased towards
    the enemy.
    """

    # Tunable limits; the values match the previously hard-coded numbers.
    MAX_BASES = 5
    MARINES_PER_BASE = 30
    # Maximum supply a player can have; building depots beyond it is wasted.
    SUPPLY_CAP_LIMIT = 200

    def __init__(self):
        super().__init__()
        # Per-instance history of logged locations (at most 20 entries).
        # Kept on the instance so several bots never share one list.
        self.action = []
        # True on every fifth game step; refreshed in on_step.
        self.tick5 = False

    async def on_step(self, iteration):
        """Run one decision cycle.

        :param iteration: step counter passed in by the game loop.
        """
        self.tick5 = iteration % 5 == 0

        num_bases = self.townhalls.amount
        # Workers per base; 0 when every base is gone so we never divide by zero.
        worker_ratio = self.workers.amount / num_bases if num_bases > 0 else 0

        await self.distribute_workers()
        #await self.gas()
        await self.build_supply()
        await self.build_rax()
        await self.build_marines()
        await self.marine_patrol()
        await self.upgrade_marines()

        if worker_ratio < 22:
            await self.build_workers()
        elif worker_ratio > 26:
            # TODO: send extra workers to their deaths
            pass
        else:
            await self.expand()

    async def build_workers(self):
        """Queue an SCV at every ready, idle townhall while one is affordable."""
        for cc in self.townhalls.ready.idle:
            if self.can_afford(UnitTypeId.SCV):
                cc.train(UnitTypeId.SCV)

    async def expand(self):
        """Take a new base when below MAX_BASES, affordable, and none is already pending."""
        if (
            self.townhalls.amount < self.MAX_BASES
            and self.can_afford(UnitTypeId.COMMANDCENTER)
            and not self.already_pending(UnitTypeId.COMMANDCENTER)
        ):
            await self.expand_now()

    async def build_supply(self):
        """Build a supply depot when fewer than 4 supply remain and none is pending.

        The depot is placed 10 units from the first ready command centre
        towards the map centre. Nothing is built once the supply cap has
        reached SUPPLY_CAP_LIMIT, because further depots cannot raise it.
        """
        ccs = self.townhalls(UnitTypeId.COMMANDCENTER).ready
        if not ccs.exists:
            return
        if self.supply_cap >= self.SUPPLY_CAP_LIMIT:
            return
        if self.supply_left >= 4 or self.already_pending(UnitTypeId.SUPPLYDEPOT):
            return
        if self.can_afford(UnitTypeId.SUPPLYDEPOT):
            anchor = ccs.first.position.towards(self.game_info.map_center, 10)
            await self.build(UnitTypeId.SUPPLYDEPOT, near=anchor)

    async def build_rax(self):
        """Build barracks next to each ready townhall that has fewer than two within 5 units.

        At most two barracks may be pending at any time.
        """
        # NOTE(review): this counts barracks within 5 of the townhall, while
        # build_marines uses a radius of 20. If placement puts barracks further
        # than 5 away the count never reaches 2 and construction continues
        # indefinitely -- confirm whether that is intended.
        for cc in self.townhalls.ready:
            nearby = self.structures(UnitTypeId.BARRACKS).closer_than(5, cc).amount
            if nearby < 2 and self.already_pending(UnitTypeId.BARRACKS) < 2:
                if self.can_afford(UnitTypeId.BARRACKS):
                    await self.build(UnitTypeId.BARRACKS, near=cc.position)

    async def build_marines(self):
        """Train marines from idle barracks near a ready command centre.

        Production stops once the army reaches MARINES_PER_BASE marines per
        townhall.
        """
        if self.units(UnitTypeId.MARINE).amount >= self.townhalls.amount * self.MARINES_PER_BASE:
            return
        ccs = self.townhalls(UnitTypeId.COMMANDCENTER).ready
        if not ccs.exists:
            return
        # Walk the barracks once so one that sits within range of two command
        # centres does not receive two train orders in the same step.
        for rax in self.structures(UnitTypeId.BARRACKS).ready.idle:
            if ccs.closer_than(20, rax).exists and self.can_afford(UnitTypeId.MARINE):
                rax.train(UnitTypeId.MARINE)

    async def marine_patrol(self):
        """Give every marine an order: attack something nearby, or flock towards the enemy.

        Target priority is enemy units, then enemy structures, then
        destructible objects. Marines with no target move to a point
        computed by the flocking rules and pulled towards the centre of all
        known enemy units (or the enemy start location when none are known).
        """
        marines = self.units(UnitTypeId.MARINE)
        if not marines.exists:
            return

        # The long-range goal is identical for every marine this step, so
        # compute it once instead of once per marine.
        if self.all_enemy_units.amount > 0:
            goal, pull = self.all_enemy_units.center, 5
        else:
            goal, pull = self.enemy_start_locations[0], 2

        for mr in marines:
            target = self._closest_target(mr)
            if target is not None:
                mr.attack(target)
                continue

            pos = mr.position
            dest = self._flock_destination(mr, marines)
            dest = dest.towards_with_random_angle(goal, pull, math.pi / 8)
            # Check map bounds first: the computed point can fall outside the
            # map, where a pathing-grid lookup is not meaningful.
            if self.in_map_bounds(dest) and self.in_pathing_grid(dest):
                mr.move(dest)
            else:
                # Unreachable point: fall back towards the marine's current
                # position with a wide random spread.
                back = dest.distance_to(pos)
                mr.move(dest.towards_with_random_angle(pos, back, math.pi / 2))

    def _closest_target(self, mr):
        """Return the closest thing *mr* should attack, or None when nothing is nearby."""
        enemies = self.enemy_units.in_attack_range_of(mr, 20)
        if enemies.amount > 0:
            return enemies.closest_to(mr)
        structures = self.enemy_structures.in_attack_range_of(mr, 20)
        if structures.amount > 0:
            return structures.closest_to(mr)
        rocks = self.destructables.closer_than(5, mr.position)
        if rocks.amount > 0:
            return rocks.closest_to(mr)
        return None

    def _flock_destination(self, mr, marines):
        """Return the point *mr* should head for according to the boid rules.

        :param mr: the marine being steered.
        :param marines: all friendly marines (may include *mr*).
        """
        pos = mr.position
        facing = mr.facing  # float, 0 to 2pi

        # Exclude the marine itself; otherwise it is always its own nearest
        # neighbour and the separation step degenerates.
        neighbors = marines.tags_not_in({mr.tag}).closest_n_units(pos, 7).closer_than(20, pos)
        if not neighbors.exists:
            # No flock nearby: wander roughly straight ahead.
            return pos.towards_with_random_angle(pos + [math.cos(facing), math.sin(facing)])

        # Alignment: step along the flock's mean heading.
        heading = self._mean_heading([facing] + [n.facing for n in neighbors])
        dest = pos.towards(pos + [math.cos(heading), math.sin(heading)], 8)

        # Separation: nudge against the nearest neighbour's heading, weaker
        # the further away that neighbour is (+1 avoids division by zero).
        nearest = neighbors.closest_to(pos)
        nn_heading = nearest.facing
        d_factor = nearest.distance_to_squared(pos) + 1
        dest = dest.towards(pos - [math.cos(nn_heading), math.sin(nn_heading)], 4 / d_factor)

        # Cohesion: drift towards the centre of the neighbours.
        return dest.towards(neighbors.center, 2)

    @staticmethod
    def _mean_heading(headings):
        """Return the circular mean of *headings* (radians).

        Angles are averaged as unit vectors so that values on either side of
        the 0/2pi wrap-around average to a direction between them rather
        than the opposite direction. Falls back to the first heading when
        the vectors cancel out.
        """
        x = sum(math.cos(h) for h in headings)
        y = sum(math.sin(h) for h in headings)
        if math.isclose(x, 0.0, abs_tol=1e-9) and math.isclose(y, 0.0, abs_tol=1e-9):
            return headings[0]
        return math.atan2(y, x)

    async def upgrade_marines(self):
        """Add a tech lab to the first ready barracks if it has no add-on yet.

        The add-on is ordered from the barracks itself; it is only requested
        while the barracks is idle and the tech lab is affordable.
        """
        barracks = self.structures(UnitTypeId.BARRACKS).ready
        if barracks.amount == 0:
            return
        rax = barracks.first
        if rax.has_add_on or not rax.is_idle:
            return
        if self.can_afford(UnitTypeId.BARRACKSTECHLAB):
            rax.build(UnitTypeId.BARRACKSTECHLAB)

    def log_action(self, location):
        """Remember *location* in the action history, keeping at most 20 unique recent entries.

        :param location: value to record; ignored if already present.
        """
        if location not in self.action:
            self.action.append(location)
        if len(self.action) > 20:
            # Drop the oldest entry.
            self.action.pop(0)

    async def gas(self):
        """Order at most one refinery per call on a geyser near a ready townhall.

        Only runs when at least one barracks is ready or in construction and
        no refinery is already pending.
        """
        barracks_count = (
            self.structures(UnitTypeId.BARRACKS).ready.amount + self.already_pending(UnitTypeId.BARRACKS)
        )
        if barracks_count <= 0 or self.already_pending(UnitTypeId.REFINERY) >= 1:
            return

        # Loop over all townhalls that are 100% complete
        for th in self.townhalls.ready:
            # Find all vespene geysers that are closer than range 10 to this townhall
            for vg in self.vespene_geyser.closer_than(10, th):
                placeable = await self.can_place_single(UnitTypeId.REFINERY, vg.position)
                if not (placeable and self.can_afford(UnitTypeId.REFINERY)):
                    continue
                workers = self.workers.gathering
                if workers:
                    worker = workers.closest_to(vg)
                    # Caution: the target for the refinery has to be the vespene geyser, not its position!
                    worker.build_gas(vg)
                    # Don't build more than one each frame: leave both loops,
                    # not just the geyser loop.
                    return

def main():
    """Run a single non-realtime game on (2)CatalystLE: BBBot (Terran) against a Hard Terran computer."""
    game_map = maps.get("(2)CatalystLE")
    players = [
        Bot(Race.Terran, BBBot()),
        Computer(Race.Terran, Difficulty.Hard),
    ]
    run_game(game_map, players, realtime=False)


# Start the match only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


2023-02-28 14:19:01.747 | INFO     | sc2.protocol:_execute:72 - Client status changed to Status.launched (was None)
2023-02-28 14:19:01.747 | INFO     | sc2.controller:create_game:37 - Creating new game
2023-02-28 14:19:01.748 | INFO     | sc2.controller:create_game:38 - Map:     (2)CatalystLE
2023-02-28 14:19:01.748 | INFO     | sc2.controller:create_game:39 - Players: Bot BBBot(Terran), Computer Hard(Terran, RandomBuild)
2023-02-28 14:19:01.749 | INFO     | sc2.protocol:_execute:72 - Client status changed to Status.init_game (was Status.launched)
2023-02-28 14:19:06.977 | INFO     | sc2.protocol:_execute:72 - Client status changed to Status.in_game (was None)
2023-02-28 14:19:06.978 | INFO     | sc2.main:_play_game:221 - Player 1 - Bot BBBot(Terran)
2023-02-28 14:21:38.033 | ERROR    | sc2.protocol:__request:50 - Cannot receive: Connection already closed.
2023-02-28 14:21:38.035 | INFO     | sc2.sc2process:_close_connection:231 - Closing connection at 62062...
2023-02-28 14:21:38.035

ConnectionResetError: Cannot write to closing transport