In [1]:
#C2D2E.py
import random
import numpy as np
import pandas as pd
from enum import Enum, auto
import matplotlib.pyplot as plt


from mesa import Model, Agent
from mesa.datacollection import DataCollector
from mesa.time import RandomActivation
from mesa.space import MultiGrid
from mesa.visualization.modules import CanvasGrid
from mesa.visualization.ModularVisualization import ModularServer
from mesa.visualization.modules import ChartModule

from pathfinding.core.diagonal_movement import DiagonalMovement
from pathfinding.core.grid import Grid
from pathfinding.finder.a_star import AStarFinder
#Pathfinding functions used from https://github.com/brean/python-pathfinding. 
#Agent & Environment Maps must have positive values for agents to move.  
#In this API, map (i.e. array) values <= 0 equate to obstacles

#GLOBAL 'CONSTANTS' - not wrapped in Enums so that they can be manipulated by Batch Runs
MAX_STEPS = 500                           #model stops itself (mission not accomplished) once step count exceeds this

MAP_HEIGHT = 50                           #map arrays and the MESA grid are shaped (MAP_WIDTH, MAP_HEIGHT)
MAP_WIDTH = 50

SENSOR_RANGE = 15                         #radius within which an agent copies the environment map into its own map each step

RED_TARGETING_EFFECTIVENESS = .5          #Red Side's base chance of detecting and targeting a Blue Side com
RED_RANGE = 8                             #Red-For ranged combat range (radius)
RED_RANGE_THREAT = 10                     #Red-For ranged combat threat magnitude
RED_CLOSE_RANGE = 2                       #Red-For close combat range (radius)
RED_CLOSE_THREAT = 30                     #Red-For close combat threat magnitude
RED_CFF_THREAT = 30                       #magnitude of the indirect-fire effect applied to a Blue unit whose com was targeted

MSG_MODIFIER_POSREP = -.2                 #per-message-type modifiers added to RED_TARGETING_EFFECTIVENESS
MSG_MODIFIER_SITREP = .1                  #based IRL on message duration, required RF Freq / Output, etc.
MSG_MODIFIER_CSS = 0                      
MSG_MODIFIER_CFF = 0
MSG_MODIFIER_INTEL_UPDATE = .2

BLUE_COM_RISK_THRESHOLD = .5              #Blue Side's risk acceptance for establishing coms 
BLUE_COM_WINDOW = 10                      #mandated frequency of Blue Side communication with higher (in Steps)
BLUE_COM_EFFECTIVENESS_THRESHOLD = 70     #effectiveness threshold below which Blue Side will establish com regardless of risk
BLUE_RANGE = 8                             #Blue-For ranged combat range (radius)
BLUE_RANGE_THREAT = 10                     #Blue-For ranged combat threat magnitude
BLUE_CLOSE_RANGE = 2                       #Blue-For close combat range (radius)
BLUE_CLOSE_THREAT = 30                     #Blue-For close combat threat magnitude
BLUE_CFF_THREAT = 30                       #magnitude of the Blue-For call-for-fire effect applied to Red units
BLUE_CSS = 30                             #amount Blue Side effectiveness raised when requesting support

#GLOBAL Enums 
class AgentColor(Enum):
    """Side an agent fights for; the member name doubles as its display color."""
    BLUE = 1
    RED = 2
    UNKNOWN = 3

class AgentType(Enum):
    """Kind of unit an agent represents."""
    GROUND = 1
    AIR = 2
    HEADQUARTERS = 3
    
class EffectType(Enum):
    """Category tag of an effect queued on an agent for the current step."""
    CSS = 1      #combat service support
    CFF = 2      #call for fire
    GROUND = 3
    AIR = 4
    IDF = 5      #indirect fire
    
class MessageType(Enum):
    """Kinds of communication a unit can exchange with its higher headquarters."""
    POSREP = 1         #position report
    SITREP = 2         #situation report
    CSS = 3            #combat service support request
    CFF = 4            #call for fire request
    INTEL_UPDATE = 5   #intelligence update request
    
    
class PhysicalObstacle(Enum):
    """Map-cell codes for physical obstacles.

    All values are <= 0, which the pathfinding API used by this file treats
    as obstacles.
    """
    #not currently in use
    #future use in map definition
    UNPASSABLE = 0
    RIVER = - 1
    MOUNTAIN = -2
    
def RedTargetingEffects(MsgType):
    """Roll whether Red-For detects and targets a single Blue-For message.

    The base chance RED_TARGETING_EFFECTIVENESS is shifted by the modifier
    belonging to `MsgType`; an unrecognised message type leaves it unchanged.

    Returns True if the message was detected, else False.
    """
    modifier_table = (
        (MessageType.POSREP, MSG_MODIFIER_POSREP),
        (MessageType.SITREP, MSG_MODIFIER_SITREP),
        (MessageType.CSS, MSG_MODIFIER_CSS),
        (MessageType.CFF, MSG_MODIFIER_CFF),
        (MessageType.INTEL_UPDATE, MSG_MODIFIER_INTEL_UPDATE),
    )

    detection_chance = RED_TARGETING_EFFECTIVENESS
    for known_type, modifier in modifier_table:
        if MsgType == known_type:
            detection_chance += modifier
            break

    #exactly one uniform draw per message
    return bool(random.uniform(0, 1) < detection_chance)
    
    
def sector_mask(shape,centre,radius,angle_range):
    """
    Return a boolean mask for a circular sector of a 2D array.

    Re-used from StackExchange:
    https://stackoverflow.com/questions/18352973/mask-a-circular-sector-in-a-numpy-array?noredirect=1
    Used to mask areas of a 2D array to reflect threat rings (circles).

    shape       -- (size along axis 0, size along axis 1) of the array to mask
    centre      -- (index along axis 0, index along axis 1) of the sector centre
    radius      -- sector radius, in cells
    angle_range -- (start, stop) angles in degrees, given in clockwise order

    The result has shape `shape` and is True for cells inside the sector.
    """
    axis0, axis1 = np.ogrid[:shape[0], :shape[1]]
    centre0, centre1 = centre
    start, stop = np.deg2rad(angle_range)

    # a stop angle below the start angle means the sector wraps past 360 degrees
    if stop < start:
        stop += 2 * np.pi

    # offsets of every cell from the centre (broadcast to the full grid when combined)
    d0 = axis0 - centre0
    d1 = axis1 - centre1

    # radial test on squared distances
    inside_circle = (d0 * d0 + d1 * d1) <= radius * radius

    # angular test: cell angle relative to the start angle, wrapped into [0, 2*pi)
    relative_angle = (np.arctan2(d0, d1) - start) % (2 * np.pi)
    inside_angle = relative_angle <= (stop - start)

    return inside_circle * inside_angle

def generate_sector_mask(coord, radius, angle_range=(0,360)):
    """Boolean mask over a (MAP_WIDTH, MAP_HEIGHT) map array.

    With the default `angle_range` the mask is a full circle of radius `radius`
    centred on `coord`; a narrower (start, stop) range in degrees yields a sector.
    """
    centre_x, centre_y = coord
    return sector_mask((MAP_WIDTH, MAP_HEIGHT), (centre_x, centre_y), radius, angle_range)

def generate_threat_ring(coord, radius, threat) :
    """Build a (MAP_WIDTH, MAP_HEIGHT) weight layer holding one threat ring.

    Cells within `radius` of `coord` carry the weight `threat`, every other cell
    is 0, so the result can be added onto an existing environment map.
    """
    ring_layer = np.zeros((MAP_WIDTH, MAP_HEIGHT))
    ring_layer[generate_sector_mask(coord, radius)] = threat
    return ring_layer


In [2]:
#EventList.py
def printme(st = "ASDFASDFASDFASDFASD"):
    """Demo action for EventList: echo `st` to stdout and hand it back."""
    print(st)
    return st

def addme(x=0, y=0):
    """Demo action for EventList: print the sum of `x` and `y` and return it."""
    total = x + y
    print(total)
    return total

class EventList:
    """Minimal step-indexed event scheduler.

    Events are stored as (step, action, params) tuples in insertion order and
    fired by exec_events() when their step matches.
    """

    def __init__(self):
        self.events = []    #list of (step, action, params) tuples
    
    def add_event(self, step, action, params=()):
        """Schedule `action(*params)` to run when exec_events(step) is called.

        step   -- step number the event fires on
        action -- callable; for a bound method pass it via its object,
                  e.g. add_event(1, some_object.some_action, (arg,))
        params -- tuple or list of positional arguments for `action`.
                  Any other value (a bare string, number, object, ...) is
                  treated as ONE argument and wrapped in a tuple, so
                  add_event(1, f, 'asdf') calls f('asdf') rather than
                  unpacking the string character by character.
        """
        if not isinstance(params, (tuple, list)):
            params = (params,)
        self.events.append((step, action, params))
        
    def show_events(self):
        """Print the raw list of scheduled events."""
        print(self.events)
    
    def exec_events(self, step):
        """Run, in insertion order, every event scheduled for `step`."""
        for event_step, action, params in self.events:
            if event_step == step:
                action(*params)
        
#myevents = EventList()
#myevents.add_event(1, printme, ("ASDF",))
#myevents.add_event(1, addme, (1, 5))
#myevents.add_event(2, myevents.show_events)
#myevents.show_events()
#myevents.exec_events(2)


In [3]:
#agent.py
from C2D2E import *
from eventlist import *

def damage_mod():
    """Draw a random damage multiplier in (0, 1) from a Beta(5, 1.5) distribution."""
    shape_a, shape_b = 5, 1.5
    return np.random.beta(shape_a, shape_b)

class UnitAgent(Agent):
    """A military unit agent.

    Each step an agent zeroes its per-step stats, senses the environment map,
    communicates with its higher headquarters, moves one cell toward its next
    objective and finally resolves the effects gathered during the step.
    """
    def __init__(self, unique_id, model,  agent_type=AgentType.GROUND, color=AgentColor.BLUE, should_move=False, objective=None, higher=None):
        """
        unique_id   -- identifier handed to the MESA Agent base class
        model       -- owning model
        agent_type  -- AgentType member
        color       -- AgentColor member (side)
        should_move -- if False, move() is a no-op for this agent
        objective   -- (x, y) of the first objective, or None for no objective
        higher      -- higher headquarters agent, or None
        """
        super().__init__(unique_id, model)
        
        self.agent_type = agent_type
        self.color = color
        self.should_move = should_move
        #LIFO list of objectives, next = objective[-1], final = objective[0]
        #no objective is stored as an EMPTY list (not [None]) so that the
        #"objective list is empty" guard in move() actually triggers instead of
        #trying to path-find toward None
        self.objective = [] if objective is None else [objective]
        self.higher = higher            
        self.pos = (-1,-1)
        self.last_com=0              #last turn had com w higher
        self.effectiveness = 100.0   #hybrid of statistic of unit health/supply/combat effectiveness
        self.agent_map = np.ones((MAP_WIDTH, MAP_HEIGHT))  #what each agent knows about the world
                                                #Blue-For agents do not explictly know positions of Red-For agents
                                                #Red-For agents do not currently move and can be identified by map weights
        
        #per-step stats for the data collector share a single definition with step()
        self.zero_stats()
        
        #NOTE: agent.pos not defined until model.grid.add_agent is called. agent.pos cant be accessed in agent.__init
    
    def zero_stats(self):
        """Reset every per-step statistic; called on construction and at the start of each step."""
        self.stat_cff = 0                 #Blue-For call for fire effects
        self.stat_attrition_self = 0      #Blue-For 'damage' taken
        self.stat_attrition_opfor= 0      #Blue-For 'damage' dealt
        self.stat_css = 0                 #Blue-For 'health' restored
        self.stat_com_made = False        #Blue-For established communications with higher
        self.stat_com_targeted = False    #Blue-For communications detected and targeted by Red-For
        self.stat_map_update_sent = False #Blue-For sent intel update
        self.stat_map_update_received = False #Blue-For received intel update
        self.stat_path_length = 0         #Number of steps required to reach objective
        self.curr_effects = []            #FIFO list of effects each turn, gathered throghout turn and applied by self.effects()
        self.stat_cff_targets = []        
        self.stat_range_targets = []
        self.stat_close_targets = []
        
    def print_stats(self):
        """Debug helper: dump this agent's per-step stats to stdout."""
        print("")
        print("")
        print(self.model.step_count, self.unique_id, self.pos, self.agent_map[self.pos])
        print("CURR EFFECTS ", self.curr_effects)            #FIFO list of effects each turn, gathered throghout turn and applied by self.effects()
        print("CFF TARGETS ", self.stat_cff_targets)
        print("RANGE TARGETS ", self.stat_range_targets)
        print("CLOSE TARGETS ", self.stat_close_targets)
        print("CFF ", self.stat_cff)                 #Blue-For call for fire effects
        print("ATTR SELF ", self.stat_attrition_self)      #Blue-For 'damage' taken
        print("ATTR OTHER ", self.stat_attrition_opfor)      #Blue-For 'damage' dealt
        print("CSS ", self.stat_css)                 #Blue-For 'health' restored
        print("COM MADE ", self.stat_com_made)        #Blue-For established communications with higher
        print("COM TARGETED ", self.stat_com_targeted)    #Blue-For communications detected and targeted by Red-For
        print("UPDATE SENT ", self.stat_map_update_sent) #Blue-For sent intel update
        print("UPDATE RECEIVED ", self.stat_map_update_received) #Blue-For received intel update
        print("PATH LENGTH ", self.stat_path_length)         #Number of steps required to reach objective
        print("CURR EFFECTIVENESS", self.effectiveness)
        
    def sense(self):
        """Overwrite the part of agent_map within SENSOR_RANGE of the agent with the model's environment map."""
        #occurs every step/turn with radius = SENSOR_RANGE 
        mask = generate_sector_mask(self.pos, SENSOR_RANGE)
        self.agent_map[mask] = self.model.environment_map[mask]
        return
    
    def communicate(self):
        """Decide which messages to exchange with higher this step and queue their effects.

        Every message sent can be detected by Red-For (see RedTargetingEffects);
        if any message of the step is detected, one IDF effect of magnitude
        RED_CFF_THREAT is queued on this agent.
        Headquarters agents and Blue agents without a higher headquarters do not
        communicate.
        """
        #drawn first and unconditionally so the random stream does not depend on the agent kind
        rnd = random.uniform(0, 1)       #chance to decide to send comms this step
        
        if self.agent_type == AgentType.HEADQUARTERS : return
        
        if self.color == AgentColor.BLUE :
            
            #a unit created with higher=None has nobody to report to; without this
            #guard the map/position exchanges below raise AttributeError on None
            if self.higher is None : return
            
            if self.agent_type == AgentType.AIR :
                #single air unit is used primarily to demonstrate functionality of agent/event scheduling
                #only exists for one step/turn, makes sensor sweep, reports map to higher
                mask = self.agent_map > 1     # portion of map that agent has sensed
                self.higher.agent_map[mask] = self.agent_map[mask]
                return

            #POSREP
            if rnd < BLUE_COM_RISK_THRESHOLD or self.model.step_count - self.last_com > BLUE_COM_WINDOW :
                #send Position Report to higher if risk allows or BLUE_COM_WINDOW has passed
                self.higher.subordinates_lastpos[self.unique_id] = self.pos
                if RedTargetingEffects(MessageType.POSREP) : self.stat_com_targeted = True
                self.last_com = self.model.step_count
                self.stat_com_made = True
            
            #SITREP
            if rnd < BLUE_COM_RISK_THRESHOLD:
                #send Situation Report (i.e. push map updates to higher)
                mask = self.agent_map > 1     # portion of map that agent has sensed
                self.higher.agent_map[mask] = self.agent_map[mask]
                if RedTargetingEffects(MessageType.SITREP) : self.stat_com_targeted = True
                self.last_com = self.model.step_count
                self.stat_map_update_sent = True
                self.stat_com_made = True
            
            #CSS
            if rnd < BLUE_COM_RISK_THRESHOLD or self.effectiveness < BLUE_COM_EFFECTIVENESS_THRESHOLD:
                #send Combat Service Support request to higher if risk allows or effectiveness below threshold
                #CSS message is a catch all for MEDEVAC, Resupply, Combat Replacements, etc.
                #will increase effectiveness
                self.curr_effects.append((EffectType.CSS, BLUE_CSS))
                if RedTargetingEffects(MessageType.CSS) : self.stat_com_targeted = True
                self.last_com = self.model.step_count
                self.stat_com_made = True

            #CFF
            if (rnd < BLUE_COM_RISK_THRESHOLD and self.agent_map[self.pos] > 1) or (self.agent_map[self.pos] > RED_RANGE_THREAT) :
                #request Call For Fire if near Red-For and risk allows OR if in close combat with Red-For unit
                #proximity to Red-For unit currenlty implied from map_weights
                #will need to be updated if Red-For updated to maneuver
                self.curr_effects.append((EffectType.CFF, BLUE_CFF_THREAT))
                if RedTargetingEffects(MessageType.CFF) : self.stat_com_targeted = True
                self.last_com = self.model.step_count
                self.stat_com_made = True
            
            #INTEL UPDATE
            if rnd < BLUE_COM_RISK_THRESHOLD :
                #request Intelligence Update (i.e. pull map updates to higher)
                mask = self.higher.agent_map > 1     # portion of map that has been sensed
                self.agent_map[mask] = self.higher.agent_map[mask] 
                if RedTargetingEffects(MessageType.INTEL_UPDATE) : self.stat_com_targeted = True
                self.last_com = self.model.step_count
                self.stat_map_update_received = True
                self.stat_com_made = True
                
                
        if self.color == AgentColor.RED:
            #future modification should allow for Blue-For targeting of RedFor coms
            pass
                
        if self.stat_com_targeted : 
            #add effect if Red-For successfully targets Blue-For communication
            self.curr_effects.append((EffectType.IDF, RED_CFF_THREAT))
                    
    def move(self):
        """Move one cell along the A* path from the current position to the next objective.

        The full path is recomputed every step from the agent's current map.
        Does nothing when the agent has no objectives left or should not move.
        """
        #this method relies on external pathfinding API 
        #https://github.com/brean/python-pathfinding
        #not optimized for models with large maps or many agents
        
        #In future - optimize to not calculate full path
        
        if not self.objective or not self.should_move: return  #if objective list is empty or unit shouldn't move: return
            #(should raise handled error in future)
        
        if self.pos != self.objective[-1]: #if current location not equal to next objective (i.e. last element of list)
            
            #create pathfinding grid from MESA grid
            #MESA grid and pathfinding grid are oriented differently, requires transposition of array => map.T
            grid = Grid(matrix=self.agent_map.T) 
            x, y = self.pos 
            start = grid.node(x, y)
            x, y = self.objective[-1]
            end = grid.node(x, y)

            finder = AStarFinder(diagonal_movement=DiagonalMovement.always)
            path, runs = finder.find_path(start, end, grid)
            self.stat_path_length = len(path)
            
            if len(path) > 1: #if there is a solution path, then take first move
                x, y = path[1]
                self.model.grid.move_agent(self, (x, y)) #move
                if self.pos == self.objective[-1] : self.objective.pop() # if have arrived at objective, set next objective
  

    def effects(self):
        """Apply this step's queued effects and ground combat, then remove the agent if destroyed.

        Only Blue non-AIR agents resolve effects and combat; for every agent the
        destruction check and the model-level stat roll-up run afterwards.
        """
        if self.color == AgentColor.BLUE:
            
            if self.agent_type == AgentType.AIR : return
            
            cff_effect = 0
            
            #com effects
            for effect in self.curr_effects: #each entry is a tuple (EffectType, scalar magnitude)
                
                if effect[0] == EffectType.CFF: #Blue-For Call for Fire
                    cff_effect += effect[1]
                elif effect[0] ==  EffectType.CSS: #Blue-For request for Combat Service Support
                    self.effectiveness += effect[1]
                    self.effectiveness = 100 if self.effectiveness > 100 else self.effectiveness #max effectiveness = 100
                    self.stat_css += effect[1]
                else: #everything else is damage
                    damage = damage_mod() * effect[1] #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                    self.effectiveness -= damage
                    self.stat_attrition_self += damage
                    #NOTE: Red-For CFF effects included in effects list as EffectType.IDF
            
            
            #ground combat effects -< CFF, Ranged, Close >- Effects all Stack
            #for simplicity BLUE/RED effect ranges are currently equal
            
            #CFF Effects
            for RedAgent in self.model.grid.get_neighbors(self.pos, True, True, BLUE_RANGE + 1):
                #+1 range accounts for Blue-For CFF request made in agent.Communicate() which occurs before agent.Move()
                if RedAgent.color != AgentColor.RED : continue
                self.stat_cff_targets.append((RedAgent.unique_id, RedAgent.pos))
                    
                #Blue-For only, Red-For CFF effects applied above in Com Effects
                damage = damage_mod() * cff_effect #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                RedAgent.effectiveness -= damage
                self.stat_cff += damage
                self.stat_attrition_opfor += damage
            
            #Ranged Effects
            for RedAgent in self.model.grid.get_neighbors(self.pos, True, True, BLUE_RANGE):
                if RedAgent.color != AgentColor.RED : continue
                self.stat_range_targets.append((RedAgent.unique_id, RedAgent.pos))
                    
                #Red-For effects
                damage = damage_mod() * RED_RANGE_THREAT #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                self.effectiveness -= damage
                self.stat_attrition_self += damage
                
                #Blue-For effects
                damage = damage_mod() * BLUE_RANGE_THREAT #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                RedAgent.effectiveness -= damage
                self.stat_attrition_opfor += damage
            
            #Close Effects
            for RedAgent in self.model.grid.get_neighbors(self.pos, True, True, BLUE_CLOSE_RANGE):
                if RedAgent.color != AgentColor.RED : continue
                self.stat_close_targets.append((RedAgent.unique_id, RedAgent.pos))

                #Red-For effects
                damage = damage_mod() * RED_CLOSE_THREAT #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                self.effectiveness -= damage
                self.stat_attrition_self += damage
                
                #Blue-For effects
                damage = damage_mod() * BLUE_CLOSE_THREAT #'damage' is stochastic, % of THREAT/EFFECT, skewed toward .8
                RedAgent.effectiveness -= damage
                self.stat_attrition_opfor += damage                

                
        if self.effectiveness < 0:        #agent dies at end of turn if effectiveness falls below 0
            self.model.grid.remove_agent(self)
            self.model.schedule.remove(self)
            if self.color == AgentColor.BLUE : self.model.stat_blue_units_destroyed += 1
            if self.color == AgentColor.RED : self.model.stat_red_units_destroyed += 1
            if self.unique_id == "Goal" : self.model.stop_sim(True)   #destroying the "Goal" unit ends the run as a success
                
        #update remaining stats
        self.model.stat_red_total_attrition += self.stat_attrition_opfor
        self.model.stat_blue_total_attrition += self.stat_attrition_self
        self.model.stat_blue_total_css += self.stat_css


    def step(self):
        """One scheduler activation: zero stats, sense, communicate, move, resolve effects."""
        self.zero_stats()
        self.sense()
        self.communicate()
        self.move()
        self.effects()
        #debug
        #if self.color == AgentColor.BLUE : self.print_stats()
    
    
class HeadquartersAgent(UnitAgent):
    """Headquarters unit that tracks its subordinate agents.

    Headquarters units must be instantiated prior to their subordinates so the
    command relationship can be created.
    """

    def __init__(self, unique_id, model, color=AgentColor.BLUE, should_move=False, objective=None, higher=None):
        super().__init__(
            unique_id,
            model,
            agent_type=AgentType.HEADQUARTERS,
            color=color,
            should_move=should_move,
            objective=objective,
            higher=higher,
        )

        self.subordinates_lastpos = {}            #unique_id -> last reported position
        self.subordinates = []                    #in future, this will be generic to UnitAgent

    def add_subordinate(self, agent):
        """Establish the 'command' relationship between `agent` and this HQ.

        Must be called after `agent` has been added to the grid, because the
        agent's position is recorded as its last known position.
        """
        #reciprocal link first, then bookkeeping on the HQ side
        agent.higher = self
        self.subordinates.append(agent)
        self.subordinates_lastpos[agent.unique_id] = agent.pos

    def command(self):
        """Placeholder; future: coordinate movement and effects of agents."""
        return None
    
    

In [4]:
#model.py
from C2D2E import *
from agent import *
from eventlist import *


    
class C2D2EModel(Model):
    """MESA model of Blue-For units maneuvering against static Red-For units.

    N is stored as `num_agents` but the agents actually created are the fixed
    set defined in __initialize_agents.
    """
    def __init__(self, N=1):
        #let the MESA base class set up its own bookkeeping before anything else;
        #must come first so the attributes assigned below are not overwritten by it
        super().__init__()
        
        #init vars / maps
        self.num_agents = N
        self.grid = MultiGrid(MAP_WIDTH, MAP_HEIGHT, False) #MESA grid object
        self.schedule = RandomActivation(self) #MESA scheduling object
        self.environment_map = np.zeros((MAP_WIDTH, MAP_HEIGHT)) #secondary internal map for storing map weights
        self.step_count = 0
        self.model_events = EventList()
        self.waiting_agents = []
    
        #init stats
        self.stat_red_total_attrition = 0
        self.stat_red_units_destroyed = 0
        self.stat_blue_total_attrition = 0
        self.stat_blue_total_css = 0
        self.stat_blue_units_destroyed = 0
        self.stat_blue_final_effectiveness = 0
        self.stat_mission_accomplished = False
        self.stat_total_steps = 0

        #add starting agents
        self.__initialize_agents()
        
        #add data collector
        self.datacollector = DataCollector(
            model_reporters={"Red Total Attrition" : "stat_red_total_attrition",
                             "Red Units Destroyed" : "stat_red_units_destroyed",
                             "Blue Total Attrition" :"stat_blue_total_attrition",
                             "Blue Total CSS" : "stat_blue_total_css",
                             "Blue Units Destroyed" : "stat_blue_units_destroyed",
                             "Blue Final Effectiveness" : "stat_blue_final_effectiveness",
                             "Mission Accomplished" : "stat_mission_accomplished",
                             "Total Steps" : "stat_total_steps"},
            agent_reporters={"POS" : "pos",
                             "Effectiveness": "effectiveness", 
                             "CFF Damage" : "stat_cff",
                             "Attrition Self" : "stat_attrition_self",
                             "Attrition OpFor" : "stat_attrition_opfor",
                             "CSS" : "stat_css",
                             "COM Made" : "stat_com_made",
                             "COM Targeted" : "stat_com_targeted",
                             "Update Sent" : "stat_map_update_sent",
                             "Update Received" : "stat_map_update_received",
                             "Path Length" : "stat_path_length",
                             "Current Effects" : "curr_effects",
                             "CFF Targets" : "stat_cff_targets",
                             "Range Targets" : "stat_range_targets",
                             "Close Targets" : "stat_close_targets"})
         
        #for MESA visualization
        self.running = True
      
        #schedule future events
        #Future AIR agent and events scheduled in self.Initialize_Agents

    def place_red_unit(self, unique_id, coord):
        """Create a stationary Red ground unit, schedule it and place it at `coord`; returns the agent."""
        should_move = False
        a = UnitAgent(unique_id, self, AgentType.GROUND, AgentColor.RED, should_move)
        self.schedule.add(a)
        self.grid.place_agent(a, coord)
        return a
    
    def place_blue_unit(self, unique_id, coord, objective, higher = None):
        """Create a moving Blue ground unit heading for `objective`, schedule it and place it at `coord`.

        If `higher` is given, the unit is registered as its subordinate.
        Returns the agent.
        """
        should_move = True
        a = UnitAgent(unique_id, self, AgentType.GROUND, AgentColor.BLUE, should_move, objective, higher)        
        self.schedule.add(a)
        self.grid.place_agent(a, coord)
        if a.higher: a.higher.add_subordinate(a) #if higher != to None establish command relationship
        return a
        
    def __initialize_agents(self):
        """Create the fixed scenario: one Blue HQ, three Blue units, nine Red units and a scheduled AIR sweep."""
        #debug stuff
        #self.place_blue_unit("v12", (0,0), (19,19))
        #self.place_red_unit("red", (10, 19))
    
        HQ = HeadquartersAgent("2d Marines", self, AgentColor.BLUE, False, (25,45))
        self.schedule.add(HQ)
        self.grid.place_agent(HQ, (0,0))
        
        self.place_blue_unit("V12", (10, 1), (25,45), HQ)
        self.place_blue_unit("V22", (25, 1), (25,45), HQ)
        self.place_blue_unit("V32", (40, 1), (25,45), HQ)
        
        self.place_red_unit("red1", (5, 20))
        self.place_red_unit("red2", (20, 20))
        self.place_red_unit("red3", (25, 20))
        self.place_red_unit("red4", (35, 20))
        self.place_red_unit("red5", (45, 20))
        self.place_red_unit("red6", (15, 30))
        self.place_red_unit("red7", (25, 30))
        self.place_red_unit("red8", (40, 30))
        self.place_red_unit("Goal", (25, 45))   #destroying this unit ends the run as a success (see UnitAgent.effects)
    
        #schedule future agent / events
        #currently only used to demonstrate capability
        
        #interarrrival = max(int(np.random.exponential(10)), 1) 
        interarrrival = 0
        agt = UnitAgent("Black Knight 1-1", self, AgentType.AIR, AgentColor.BLUE, False, None, HQ)
        #the AIR unit joins the schedule/grid at step `interarrrival` and leaves one step later
        self.model_events.add_event(interarrrival, self.schedule.add, (agt, ))
        self.model_events.add_event(interarrrival, self.grid.place_agent, (agt, (10,10)))
        self.model_events.add_event(interarrrival + 1, self.schedule.remove, (agt, ))
        self.model_events.add_event(interarrrival + 1, self.grid.remove_agent, (agt,))

    def __update_map_weights(self):
        """Rebuild environment_map from the current positions of all Red units.

        Map weights = (potential) damage taken by blue units when moving through
        the map. Every cell starts at 1 and each Red unit adds its close-combat
        and ranged threat rings on top.
        """
        #all map weights are reset and re-calculated each step according to current location of units
        self.environment_map = np.ones((MAP_WIDTH, MAP_HEIGHT)) #secondary internal map for storing map weights

        #generate (potential) threat to blue units based on location of red units
        for agent in self.schedule.agents:

            #for all red agents, create threat rings centered on red units
            #reflect threat rings as map weights
            if agent.color == AgentColor.RED:
                x, y = agent.pos

                #close combat threat ring
                self.environment_map += generate_threat_ring((x, y), RED_CLOSE_RANGE, RED_CLOSE_THREAT)

                #ranged threat ring
                self.environment_map += generate_threat_ring((x, y), RED_RANGE, RED_RANGE_THREAT)
           
    def stop_sim(self, success=False):
        """Stop the run and record the final model-level statistics.

        success -- whether the mission was accomplished
        """
        self.running = False
        self.stat_mission_accomplished = success
        self.stat_total_steps = self.step_count
        #recomputed from scratch (not accumulated) so a repeated call cannot double count
        self.stat_blue_final_effectiveness = sum(
            agent.effectiveness for agent in self.schedule.agents if agent.color == AgentColor.BLUE)
                
    def step(self):
        """Advance the model one step and collect data for it."""
        self.__update_map_weights()  
        self.model_events.exec_events(self.step_count)
        self.schedule.step()
        self.step_count += 1
        #the step limit is checked BEFORE collecting so that a timed-out run still gets its
        #final stats (total steps, final effectiveness) into the last collected row;
        #the `running` test keeps a success recorded during this step from being overwritten
        if self.running and self.step_count > MAX_STEPS : self.stop_sim(False)
        self.datacollector.collect(self)

    def run_model(self, n):
        """Run `n` steps unconditionally."""
        for i in range(n):
            self.step()    

                
    

In [5]:
#run.py
# Interactive Visualization
# Shows the map grid with all agents, plus a chart of Blue/Red total attrition, in the MESA browser UI

from C2D2E import *
from model import *
from agent import *


def agent_portrayal(agent):
    """Portrayal dict for one agent: a filled circle in the agent's side color."""
    return dict(
        Shape="circle",
        Filled="true",
        Layer=0,
        Color=agent.color.name,   #AgentColor member name, e.g. "BLUE"
        r=0.5,
    )

# Grid view: one cell per map position, agents drawn via agent_portrayal
grid = CanvasGrid(agent_portrayal, MAP_WIDTH, MAP_HEIGHT)
# Line chart of two model-level reporters read from the model's `datacollector` attribute;
# the labels must match the model_reporters keys defined in C2D2EModel
chart = ChartModule([{"Label": "Blue Total Attrition",
                      "Color": "Blue"},
                    {"Label": "Red Total Attrition",
                      "Color": "Red"}],
                    data_collector_name='datacollector')
# The last argument holds the keyword arguments for C2D2EModel; N is stored by the
# model as num_agents but does not change the fixed set of agents it creates
server = ModularServer(C2D2EModel,
                       [grid, chart],
                       "Unit Model",
                       {"N": 7})

server.port = 8521 # The default
# NOTE(review): launch() presumably blocks while serving the UI - confirm before adding code below it
server.launch()

Interface starting at http://127.0.0.1:8521


In [6]:
#THIS EXPERIMENT WAS RUN IN SPYDER
#PRESENTED HERE FOR DISPLAY ONLY

from C2D2E import * 
from model import *
from agent import *

#Experiment 1: vary Red-For targeting effectiveness against Blue-For risk acceptance for com
#
#NOTE(review): the two constants below are rebound in THIS namespace only. If the model code
#lives in separate modules (C2D2E.py / agent.py, as the imports suggest), the functions there
#presumably keep reading their own module-level copies and the experiment would not vary
#anything - confirm, and set the values on those modules instead if so.

#Guard so that the (long-running, file-writing) experiment does not execute when the
#notebook is run or rendered; set to True to actually run it
RUN_EXPERIMENT = False

Experiment1 = None       #accumulated summary frame, one row per run
n = 0                    #total number of completed runs
#degree of RED_TARGETING_EFFECTIVENESS = independent variable
for i in range(1, 11, 1):

    if not RUN_EXPERIMENT:
        continue

    #Sim will look at 10 values of Red-For targeting [.1, .2, .3, ... 1]
    #i / 10 (rather than i * .1) avoids values such as 0.30000000000000004 in data and file names
    RED_TARGETING_EFFECTIVENESS = i / 10

    for j in range(500):
        #at each value of RED_TARGETING, 500 samples / runs will be measured for Blue Com
        BLUE_COM_RISK_THRESHOLD = np.random.uniform()

        c2model = C2D2EModel()
        while c2model.running :
            c2model.step()
        filename = "Ex1RedTgt" + str(RED_TARGETING_EFFECTIVENESS) + "BlueCom" + str(BLUE_COM_RISK_THRESHOLD)

        #full per-agent, per-step history of this run
        agent_stats = c2model.datacollector.get_agent_vars_dataframe()
        agent_stats.to_excel(filename + "agents.xlsx")

        #only the last collected row of the model stats is kept as the run summary
        model_stats = c2model.datacollector.get_model_vars_dataframe()
        lastrow = model_stats.tail(1).copy()   #copy: adding columns to a slice triggers SettingWithCopyWarning
        lastrow["RedTgt"] = RED_TARGETING_EFFECTIVENESS
        lastrow["BlueCom"] = BLUE_COM_RISK_THRESHOLD
        #DataFrame.append was removed in pandas 2.0; pd.concat is the supported replacement
        Experiment1 = lastrow if Experiment1 is None else pd.concat([Experiment1, lastrow])
        print(n, i, j)
        n += 1
        #rewritten after every run so partial results survive an interrupted experiment
        Experiment1.to_excel("Experiment1.xlsx")
print("ALL DONE")

ALL DONE
Socket opened!
{"type":"get_params"}
{"type":"reset"}
