# Genetic optimisation to reduce rate of infection in crowded spaces
 Plymouth University Comp Sci Bacherlor Showcase


In [None]:
from platypus import NSGAII, NSGAIII, Problem, Integer, Real, nondominated
from platypus.core import nondominated_sort
from IPython.display import display, Markdown, HTML
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
import cellpylib as cpl
import random
from collections import Counter
import itertools
from PIL import Image
import cellpylibhack.functions2d as cplhack

plt.rcParams["animation.html"] = "jshtml"


## Contents
* [Problem](#problem)
* [Objectives](#objectives)
* [Evaluation](#evaluation)
* [Testing](#testing)

## Problem
Genetic code to optimise allows GA to control a number of environmental factors (windows, air conditioning) and social factors (face masks, social distance, vaccinated).

* Windows open / closed: influence air flow.
* Air conditioning: influence air flow in entire room.
* Face masks: agents wear face masks.
* Social distance: distance agents should keep between each other.
* Vaccinated: % of agents who are vaccinated.

Each factor can be toggled on / off or be hard set to challenge optimiser.

## Objectives
In order to explore compromising options (avoid simply maximise use of all measures), certain objectives need to be optimised in conflict with each other.

_Minimising_
* Infection count: minimise number of infected agents.
* Inverse social distance punishment: encourage minimal required social distancing.
* Inverse tempurature punishment: use of windows and air conditioning reduces room tempurature. Encourage minimal required use to prevent solution with uninhabitable room tempurature.
* Inverse vaccination punishment: find minimal required % of vaccinated agents to achieve positive impacts.

Inverse punishments encourage the GA to locate a balance between social measures and impact on score (simulating social pushback), if a balance is possible.

## Evaluation
* Use solution code to construct environment for CA.
* Simulate a random population for N epochs.
* Construct objective scores from simulation results.

In [None]:
class AutomataOptimisation(Problem):
    def __init__(
        self,
        environment,
        pathogen_info,
        n_objectives=3,
        init_infected_ratio=0.1,
        agentPopSize=10,
        mask_effectiveness=0.5,
        epochs=50,
        verbose=0,
        evaluating=1,
    ):
        """Optimise social and physical factors to minimise infection rate in enclosed environment.

        NOTE: '#' commented decision variables / objectices are not implemented at present. When implementing, adjust all indexing for accesses of solutions.

        Decision variables (indexed):
            [0]: face mask toggle (boolean).
            [1]: social distancing (agents are disuaded from moving into spaces that violate social distancing, and will move away fron others if made to violate) (integer range 0-5 metres).
            [2-N]: window open/closed (boolean).

        Objectives:
            [0]: Infection count (integer).
            [1]: Inverse social distance (integer).
            [2]: Inverse tempurature (float).

        """

        self.environment = environment

        # map solution features to map coordinates (for physical feature toggles)
        self.windowMap = np.empty((0, 2), dtype=int)
        for (x, y), value in np.ndenumerate(self.environment):
            if value == 2:  # if window, add coords to window list
                self.windowMap = np.append(self.windowMap, [[x, y]], axis=0)

        # decision variables = number of windows (physical) + social distance (social)
        self.physical_nvars = self.windowMap.shape[0]
        self.social_nvars = 2   # masking, social distancing
        decision_variables = self.physical_nvars + self.social_nvars
        objectives = n_objectives

        super(AutomataOptimisation, self).__init__(decision_variables, objectives)

        # type format: [<social vars>, <physical vars (windows)>]
        self.types = [Integer(0, 1), Integer(0, 3),] # masking, social distancing, ...
        for _ in range(self.windowMap.shape[0]):
            self.types.append(Integer(0, 1))    # open/closed window

        self.dimensions = self.environment.shape
        self.evaluating = evaluating
        self.verbose = verbose
        self.agentPopSize = agentPopSize  # number of agents in simulation
        self.init_infected_ratio = init_infected_ratio
        self.epochs = (
            epochs  # number of simulation epochs for testing effectiveness of solution
        )
        self.max_airflow_epochs = 1000  # maximum epochs to allow airflow to run for before compromising with current best approximation
        self.pathogen_info = pathogen_info
        self.mask_effectiveness = mask_effectiveness    # efficiency of masking

    def evaluate(self, solution):
        """Evaluate using lazy or expensive methods."""

        self.full_evaluate(solution)

        #if self.evaluating == 0:
        #    self.lazy_evaluate(solution)
        #elif self.evaluating == 1:
        #    self.full_evaluate(solution)

    def full_evaluate(self, solution):
        """Expensive evaluation. Run full simulation with CA."""

        x = solution.variables[:]

        params = {
            'masking': x[0],
            'social_distance': x[1],
            'physical_vars': x[self.social_nvars:]
        }

        matrix, airflow = self.get_environment(params['physical_vars'])
        agents = self.get_agents(matrix)

        solution.objectives[:] = self.simulate_environment(params, matrix, agents, airflow)

    #def lazy_evaluate(self, solution):
    #    """Lazy evaluation. Estimate objective values."""
    #    x = solution.variables[:]
    #    infection_count, inverse_tempurature, = (
    #        int(),
    #        int(),
    #        int(),
    #    )
    #    # temp score is inverse to windows opened.
    #    inverse_tempurature = Counter(solution[: self.windowMap.shape[0]])[1]
    #    # infection count is projected infection count
    #    """ Infection rate for single epoch, multiplied by total epochs.
    #    * Contact infection is proportional to available space in room, number of agents, number of infected agents, and chance of infection from contact.
    #    * Airborne infection is proportional to number of open windows, number of agents, number of infected agents, airborne infection release rate, airborne decay of infection, and chance of infection from airborne infection.
    #    """
    #    free_space = Counter(i for i in list(itertools.chain.from_iterable(self.environment)))[0]
    #    init_infected = int(self.agentPopSize * self.init_infected_ratio)
    #    init_healthy = self.agentPopSize - init_infected
    #    chance_of_contact_infection = self.pathogen_info.infection_from_contact
    #    chance_of_airborne_infection = self.pathogen_info.infection_from_max_pollutants
    #    airborne_infection_release_rate = self.pathogen_info.pathogen_release_payload
    #    max_pollutants = self.pathogen_info.max_pollutants
    #    airborne_decay = self.pathogen_info.airborne_decay
    #    chance_of_contact = (self.agentPopSize / free_space) 
    #    contact_infections = ((free_space / self.agentPopSize) * chance_of_contact_infection * init_infected) * self.epochs
    #    airborne_infections = ((free_space / self.agentPopSize) * chance_of_airborne_infection * init_infected / inverse_tempurature) * self.epochs
    #    infection_count = contact_infections + airborne_infections + init_infected
    #    # update solution objectives
    #    solution.objectives[:] = infection_count, inverse_tempurature

    def get_environment(self, physical_vars):
        """Modify environment matrix by options.

        Inputs:
            solution: uses physical factors influencing environment.

        Outputs:
            matrix: map of environment, adjusted for physical factors.
            airflow: superimposed map of airflow in room.

        """

        matrix = self.environment[:]  # copy environment map

        # toggle for windows
        for idx, toggle in enumerate(
            physical_vars
        ):  # indexes 0-9
            x, y = self.windowMap[idx]

            if toggle == 1:
                matrix[x][y] = 1  # set window to wall if not open

        # recalculate airflow map
        airflow = self.get_airflow(matrix)

        return matrix, airflow

    def get_airflow(self, matrix):
        """Calculate airflow map for environment by appling CA ruleset.

        Inputs:
            matrix: map of environment, with windows toggled open/closed.
            #toggle_ac: indicator of wether air conditioning is enabled in room or not.

        Outputs:
            airflow: map of airflow cell objects, indicating airflow velocity at given cell.

        """

        cellular_automaton = cpl.init_simple2d(*self.dimensions)

        # set positive and negative air sources
        for x, y in self.windowMap:
            if matrix[x][y] == 2:
                if y < 5:  # positive incoming pressure for windows in top half
                    pressure = 10000
                else:  # negative incoming pressure for windows in bottom half
                    pressure = -10000

                cellular_automaton[:, x, y] = pressure

        # hard set for testing
        # cellular_automaton[:, [0, 0, 0, 4, 5], [1, 5, 8, 9, 0]] = 10000
        # cellular_automaton[:, [8, 8, 9, 9, 9], [0, 9, 2, 5, 7]] = -10000

        airflow_ca = cpl.evolve2d(
            cellular_automaton,
            timesteps=self.until_fixed_point_or_max(),
            apply_rule=AirflowRule(matrix),
            neighbourhood="Moore",
        )

        if self.verbose > 0:
            print("Number of timesteps to reach fixed point: %s" % len(airflow_ca))
            airflow_ani = cpl.plot2d_animate(
                airflow_ca,
                show_grid=True,
                title="Airflow",
                interval=50,
                colormap="Blues",
            )
            display(HTML(airflow_ani.to_jshtml()))

        airflow = airflow_ca[-1]  # final state == fixed state and correct airflow

        return airflow

    def until_fixed_point_or_max(self):
        """Run evolution till stable or reached max epochs.

        Outputs:
            _timesteps: callable function to determine if evolution has halted. Exits if previous and current generations are same state, or a maximum number of generations has been reached.

        """

        def _timesteps(ca, t):
            if len(ca) > 1:
                return (
                    False
                    if (ca[-2] == ca[-1]).all() or len(ca) >= self.max_airflow_epochs
                    else True
                )

            return True

        return _timesteps

    def get_agents(self, matrix):
        """Create population of agents.

        Inputs:
            solution: uses social factors influencing environment.

        Outputs:
            agents: population.

        """

        cellular_automaton = cpl.init_simple2d(*self.dimensions)

        # get list of valid non-wall cells to spawn agents in
        spawnpoints = np.empty((0, 2), dtype=int)
        for (x, y), value in np.ndenumerate(matrix):
            if value == 0:  # if air, then free for spawning
                spawnpoints = np.append(spawnpoints, [[x, y]], axis=0)

        # random shuffle coords and extract popSize list of spawnpoints
        random.shuffle(spawnpoints)

        init_infected = int(self.agentPopSize * self.init_infected_ratio)
        init_healthy = self.agentPopSize - init_infected

        for x, y in [coord for coord in spawnpoints[:init_infected]]:
            cellular_automaton[:, x, y] = 2

        for x, y in [
            coord for coord in spawnpoints[init_infected + 1 : init_healthy + 1]
        ]:
            cellular_automaton[:, x, y] = 1

        return cellular_automaton

    def simulate_environment(self, params, matrix, agents, airflow):
        """Simulate environment for N epochs.

        Outputs:
            infection count: final number of infected agents.
            inverse social distance: social distance parameter fed from solution.
            inverse tempurature: number of windows open.

        """

        infection_count, inverse_social_distance, inverse_tempurature, = (
            int(),
            int(),
            int(),
        )

        inverse_tempurature = Counter(params['physical_vars'])[1]  # tempurature punishment == number of open windows
        inverse_social_distance = social_distance = params['social_distance']  # social distance punishment == social distance policy, to discourage large distancing
        masking = params['masking']

        infection = cpl.init_simple2d(
            *self.dimensions
        )  # initialise empty space for airborne infection layer

        agent_ca, infection_ca = evolveParallel(
            agents,
            infection,
            timesteps=self.epochs,
            apply_rule1=AgentRule(matrix, social_distance, self.pathogen_info, masking, self.mask_effectiveness),
            apply_rule2=InfectionRule(matrix, airflow, pathogen_info),
            neighbourhood="Moore"
        )

        if self.verbose > 0:
            agent_ani = cpl.plot2d_animate(
                agent_ca,
                show_grid=True,
                title="Agents",
                interval=100,
                colormap="Accent",
            )
            infection_ani = cpl.plot2d_animate(
                infection_ca,
                show_grid=True,
                title="Infection airflow",
                interval=100,
                colormap="YlGn",
            )

            # render as js animations
            display(HTML(agent_ani.to_jshtml()))
            display(HTML(infection_ani.to_jshtml()))

        final_state = agent_ca[-1]

        infection_count = Counter(i for i in list(itertools.chain.from_iterable(final_state)))[2]

        #return infection_count, inverse_tempurature
        return infection_count, inverse_social_distance, inverse_tempurature


In [None]:
class PathogenInfo:
    def __init__(
        self,
        infection_from_contact,
        infection_from_max_pollutants,
        max_pollutants,
        airborne_decay,
        pathogen_release_payload,
    ):
        """
        Attributes:
            infection from contact: chance of infection from contact with infected agent.
            infection from max pollutants: chance of infection from being exposed to maximum airborne pollutant concentration.
            max pollutants: airborne pollutants required to have full chance of infection_from_max_pollutants.
            airborne decay: rate of decay of pollutants from air.
            pathogen release payload: number of pollutants released by agent per epoch.

        """

        self.infection_from_contact = infection_from_contact
        self.infection_from_max_pollutants = infection_from_max_pollutants
        self.max_pollutants = max_pollutants
        self.airborne_decay = airborne_decay
        self.pathogen_release_payload = pathogen_release_payload


In [None]:
class AirflowRule(cpl.BaseRule):
    """Simulate positive airflow from windows, updating velocities of air in cells.

    Assumptions:
        * Windows are either negative or positive sources of pressure

    """

    def __init__(self, matrix):
        """
        Attributes:
            matrix: map environment indicating walls, to prevent air flow from passing impassable boundaries.
        """

        self.matrix = matrix

    def __call__(self, neighbourhood, coords, timestamp):
        """

        Inputs:
            neighbourhood: the neighbourhood.

            coords: the index of the current cell.

            timestamp: the current timestep.

        Outputs:
            airflow_value: the new cell state.

        """

        x, y = coords

        if not self.matrix[x][y] == 0:  # if not air, pass
            return neighbourhood[1][
                1
            ]  # return centre neighbour value == this cell value

        # remove neighbours if illegal (wraparound) or are walls
        valid_neighbour_values = np.array([])
        for (i, j), neighbour in np.ndenumerate(neighbourhood):
            if not (
                (x == 0 and i == 0)
                or (x == 9 and i == 2)
                or (y == 0 and j == 0)
                or (y == 9 and j == 2)
            ):
                valid_neighbour_values = np.append(valid_neighbour_values, neighbour)

        # evaluate neighbourhood to calculate pressure level at current position
        airflow_value = sum(
            [val / len(valid_neighbour_values) for val in valid_neighbour_values]
        )  # mean of filtered list, removing None (rejected) neighbours

        # update cell to be mean of neighbourhood
        return airflow_value


In [None]:
class AirflowRule(cpl.BaseRule):
    """Simulate positive airflow from windows, updating velocities of air in cells.

    Assumptions:
        * Windows are either negative or positive sources of pressure

    """

    def __init__(self, matrix):
        """
        Attributes:
            matrix: map environment indicating walls, to prevent air flow from passing impassable boundaries.
        """

        self.matrix = matrix

    def __call__(self, neighbourhood, coords, timestamp):
        """

        Inputs:
            neighbourhood: the neighbourhood.

            coords: the index of the current cell.

            timestamp: the current timestep.

        Outputs:
            airflow_value: the new cell state.

        """

        x, y = coords

        if not self.matrix[x][y] == 0:  # is window or is wall
            return neighbourhood[1][
                1
            ]  # return centre neighbour value == this cell value

        # remove neighbours if illegal (wraparound) or are walls
        valid_neighbour_values = np.array([])
        for (i, j), neighbour in np.ndenumerate(neighbourhood):
            if not (
                (x == 0 and i == 0)
                or (x == 9 and i == 2)
                or (y == 0 and j == 0)
                or (y == 9 and j == 2)
            ):
                valid_neighbour_values = np.append(valid_neighbour_values, neighbour)

        # evaluate neighbourhood to calculate pressure level at current position
        airflow_value = sum(
            [val / len(valid_neighbour_values) for val in valid_neighbour_values]
        )  # mean of filtered list, removing None (rejected) neighbours

        # update cell to be mean of neighbourhood
        return airflow_value


In [None]:
class AgentRule(cpl.BaseRule):
    """Simulate positive airflow from windows, updating velocities of air in cells.

    Assumptions:
        * Windows are either negative or positive sources of pressure

    """

    def __init__(self, matrix, social_distance, pathogen_info, masking, mask_effectiveness):
        """
        Attributes:
            matrix: map environment indicating walls, to prevent air flow from passing impassable boundaries.
            infection: map indicating airborne pathogens from air layer.
            social distance: target distance to encourage agents to stay away from each other.
            pathogen: infection object describing e.g.: chance of infection from contact.
            masking: if policy is to wear masks.
            mask_effectiveness: efficiency of masks to filter pathogen from entering environment.
        """

        self.matrix = matrix
        self.social_distance = social_distance
        self.pathogen_info = pathogen_info
        self.masking = True if masking == 1 else False
        self.mask_effectiveness = mask_effectiveness

        self.shape_x = self.matrix.shape[0]
        self.shape_y = self.matrix.shape[1]

    def __call__(
        self,
        neighbourhood,
        infection_neighbourhood,
        coords,
        timestamp,
        preview,
        infection_layer,
        agent_neighbours,
    ):
        """If called for an agent cell, update.

        Agent behaviour:
            1. Check surrounding cells for direct contact infected agent.
            2. Check infection air layer for pathogen on current cell.
            3. If infected, increase pathogen density level for current cell in infection air layer.
            4. Stay in same position, or select a neighbour cell that isn't a wall, doesn't have an agent occupying it, and won't have an agent occupying it next generation (via preview).

        Inputs:
            neighbourhood: the neighbourhood.

            infection neighbourhood: neighbourhood of agent in infection layer.

            coords: the index of the current cell.

            timestamp: the current timestep.

            preview: preview of currently constructed next generation.

            infection: current infection layer object.

        Outputs:
            r: the new cell state.

        """

        x, y = coords
        #agent_value = neighbourhood[self.social_distance][self.social_distance]
        agent_value = neighbourhood[1][1]

        # ISSUE: air cells do not take action, but they will attempt to propogate their 0 value to the next generation.
        # This can cause agents to vanish, by propogating their 0 value to a cell an agent wants to occupy, removing the agent.
        # Returning None can flag the evolution function to ignore this cell output.
        if agent_value == 0:  # if air, then skip
            return (
                x,
                y,
            ), None  # return None value, indicating should keep existing cell value

        # remove neighbours if illegal (wraparound), edge walls or interior walls
        valid_neighbours = np.array([])
        valid_neighbourhood = neighbourhood[:]
        for (i, j), neighbour in np.ndenumerate(neighbourhood):
            a, b = (x - 1 + i), (y - 1 + j)
            if (
                (
                    a >= 0 and a < self.shape_x and
                    b >= 0 and b < self.shape_y
                ) and   # bounds checks
                not (
                    (x == 0 and i == 0)
                    or (x == 9 and i == 2)
                    or (y == 0 and j == 0)
                    or (y == 9 and j == 2)
                )   # not edge wall or illegal wrap-around
                #and (abs(a-self.social_distance-1) + abs(b-self.social_distance-1)) == 1    # new square is 1 unit from current position
                and self.matrix[a][b] == 0  # not a wall
            ):
                valid_neighbours = np.append(valid_neighbours, neighbour)
            else:
                valid_neighbourhood[i][j] = -1


        if agent_value != 2:  # no use checking for infection if already infected
            # check for contact infection
            for neighbour in valid_neighbours:
                if neighbour == 2:  # if infected neighbour
                    agent_value = (
                        2
                        if random.random() < self.pathogen_info.infection_from_contact
                        else agent_value
                    )  # infected if pass random chance check

            infection_in_cell = infection_neighbourhood[1][1]
            if infection_in_cell > 0:
                # infected if pass random chance, modified for % infection relative to max pollutants requried for max airborne infection chance
                agent_value = (
                    2
                    if random.random()
                    < (
                        self.pathogen_info.infection_from_max_pollutants
                        * (infection_in_cell / self.pathogen_info.max_pollutants)
                    )
                    else agent_value
                )

        else:  # agent is infected
            # pollute infection layer at current cell
            if self.masking:
                payload = self.pathogen_info.pathogen_release_payload * (1 - self.mask_effectiveness)
            else:
                payload = self.pathogen_info.pathogen_release_payload
            
            infection_layer[x][y] += payload

        # assemble potential new movement coordinates
        movement_coords = np.empty((0, 2), dtype=int)

        # accept neighbour cell for movement if not occupied and not wall (== is air)
        for (i, j), neighbour in np.ndenumerate(valid_neighbourhood):
            a, b = (x - 1 + i), (y - 1 + j)
            if (
                (
                    a >= 0 and a < self.shape_x and
                    b >= 0 and b < self.shape_y
                ) and   # bounds checks
                #(abs(a-self.social_distance-1) + abs(b-self.social_distance-1)) == 1 and    # if in range
                self.matrix[a][b] == 0 and neighbour == 0 and preview[a][b] == 0    # square is not a wall, not occupied by another agent, won't be occupied by an agent in the current future.
            ):  # if non-air now and in future
                # adjust coordinates from local to global
                movement_coords = np.append(movement_coords, [[a, b]], axis=0)

        # also add current cell as option
        movement_coords = np.append(movement_coords, [[x, y]], axis=0)

        #x_, y_ = random.choice(movement_coords)

        if agent_neighbours.shape[0] == 0:  # if no agents nearby
            x_, y_ = random.choice(movement_coords)
        else:
            # calculate distances between agent and other agents for each movement option
            movement_options = np.empty((0,3), dtype=int)
            for i, j in movement_coords:
                curr_score = np.Infinity
                # calculate this cell distance to all other agents, score is smallest value
                for i_, j_ in agent_neighbours:
                    dist = abs(i_-self.social_distance-1) + abs(j_-self.social_distance-1)
                    if curr_score > dist and dist > 0:
                        curr_score = dist
                
                movement_options = np.append(movement_options, [[i, j, curr_score]], axis=0)
            # filter for movement that conforms to social distancing
            conforming_movement = [(i, j) for i, j, score in movement_options if score >= self.social_distance]
            if len(conforming_movement) == 0:
                # find best worst movement
                best, best_score = None, 0
                for i, j, score in movement_options:
                    # if closest agent to current cell is further than current best, is dominating
                    if best_score < score:
                        best = (i, j)
                x_, y_ = best
            else:
                # select random conforming movement
                x_, y_ = random.choice(conforming_movement)
    
        x_, y_, agent_value = int(x_), int(y_), int(agent_value)

        return (x_, y_), agent_value


In [None]:
class InfectionRule(cpl.BaseRule):
    """Simulate positive airflow from windows, updating velocities of air in cells.

    Assumptions:
        * Windows are either negative or positive sources of pressure

    """

    def __init__(self, matrix, airflow, pathogen_info):
        """
        Attributes:
            matrix: map environment indicating walls, to prevent air flow from passing impassable boundaries.
        """

        self.matrix = matrix
        self.airflow = airflow
        self.pathogen_info = pathogen_info

        self.shape_x = self.matrix.shape[0]
        self.shape_y = self.matrix.shape[1]

    def __call__(self, neighbourhood, coords, timestamp, r):
        """

        Inputs:
            n: the neighbourhood.

            c: the index of the current cell.

            t: the current timestep.

        Outputs:
            r: the new cell state.

        """

        x, y = coords
        #air_value = neighbourhood[r][r]
        air_value = neighbourhood[1][1]

        if not self.matrix[x][y] == 0:  # is window or is wall
            return (x, y), None  # skip this value

        # remove neighbours if illegal (wraparound) or are walls
        valid_neighbour_coords = np.empty((0, 2), dtype=int)
        for (i, j), neighbour in np.ndenumerate(neighbourhood):
            a, b = (x - 1 + i), (y - 1 + j)
            if (
                (
                    a >= 0 and a < self.shape_x and
                    b >= 0 and b < self.shape_y
                ) and   # bounds checks
                not (
                    (x == 0 and i == 0)
                    or (x == 9 and i == 2)
                    or (y == 0 and j == 0)
                    or (y == 9 and j == 2)
                )
                #and (abs(a-r-1) + abs(b-r-1)) == 1 
                and self.matrix[a][b] == 0
            ):
                valid_neighbour_coords = np.append(
                    valid_neighbour_coords, [[a, b]], axis=0
                )

        # get coords of cell with greatest negative pressure
        x_, y_ = x, y

        # dominating search valid neighbours
        for a, b in valid_neighbour_coords:
            if (
                self.airflow[a][b] < self.airflow[x_][y_]
            ):  # if air pressure dominates (is lower) than current best, set as new best
                x_, y_ = a, b

        # reduce pathogen by decay rate
        air_value -= self.pathogen_info.airborne_decay

        if air_value < 0:
            air_value = 0

        # move airborne contents from current cell to new cell, following path of lowest pressure
        # subtracts air_value from current coords, adds air_value to new target coords.
        return (x_, y_), air_value


## CellPyLib fork
A hack / fork of CellPyLib to support setting state of different cell from cell update. Runs two layers in parallel, a ground and an air layer.

Fork notes:
* No support for memoization.
* No support for dynamic evolution, only fixed length.
* Rules can update cell state of other cells in layer, view future layer (at current progress of generation) and update cell states of other layers.

In [None]:
def evolveParallel(
    layer1, layer2, timesteps, apply_rule1, apply_rule2, neighbourhood="Moore"
):
    """Evolves two cellular automaton in parallel. Generates new generation for first layer then uses the output to generate 2nd layer."""
    assert (
        layer1.shape == layer2.shape,
        "Cannot evolve different layer shapes in parallel.",
    )

    r = 1   # only give vision of 3x3 centered around player - otherwise breaks movement

    von_neumann_mask = np.zeros((2 * r + 1, 2 * r + 1), dtype=bool)
    for i in range(len(von_neumann_mask)):
        mask_size = np.absolute(r - i)
        von_neumann_mask[i][:mask_size] = 1
        if mask_size != 0:
            von_neumann_mask[i][-mask_size:] = 1

    _, rows, cols = layer1.shape
    neighbourhood_indices = cplhack._get_neighbourhood_indices(rows, cols, r)

    cell_indices = cplhack._get_cell_indices((rows, cols))
    cell_idx_to_neigh_idx = cplhack._get_cell_indices_to_neighbourhood_indices(
        cell_indices, rows, cols, r
    )

    # NOTE: to simplify total copied code, can only run this as fixed number of epochs.
    # TODO: add dynamic evolution.
    return _evolveParallel_fixed(
        layer1,
        layer2,
        timesteps,
        apply_rule1,
        apply_rule2,
        neighbourhood,
        rows,
        cols,
        neighbourhood_indices,
        von_neumann_mask,
        cell_indices,
        cell_idx_to_neigh_idx,
        r,
    )


In [None]:
def _evolveParallel_fixed(
    layer1,
    layer2,
    timesteps,
    apply_rule1,
    apply_rule2,
    neighbourhood,
    rows,
    cols,
    neighbourhood_indices,
    von_neumann_mask,
    cell_indices,
    cell_idx_to_neigh_idx,
    r
):
    """
    Evolves the 2 layers of cellular automaton in parallel for a fixed of timesteps.


    """
    initial_conditions1 = layer1[-1]
    array1 = np.zeros((timesteps, rows, cols), dtype=layer1.dtype)
    array1[0] = initial_conditions1

    initial_conditions2 = layer2[-1]
    array2 = np.zeros((timesteps, rows, cols), dtype=layer2.dtype)
    array2[0] = initial_conditions2

    for t in range(1, timesteps):
        cell_layer1 = array1[t - 1]
        cell_layer2 = array2[t - 1]

        # get coords of all agents in matrix
        agent_neighbours = np.empty((0, 2), dtype=int)
        for (i, j), cell in np.ndenumerate(array1[t]):
            if (
                cell > 1  # if healthy or infected agent
            ):
                agent_neighbours = np.append(agent_neighbours, [[i, j]], axis=0)

        for row, cell_row1 in enumerate(cell_layer1):
            cell_row2 = cell_layer2[row]

            for col, cell1 in enumerate(cell_row1):
                cell2 = cell_row2[col]

                n1 = cplhack._get_neighbourhood(
                    cell_layer1,
                    neighbourhood_indices,
                    row,
                    col,
                    neighbourhood,
                    von_neumann_mask,
                )

                n2 = cplhack._get_neighbourhood(
                    cell_layer2,
                    neighbourhood_indices,
                    row,
                    col,
                    neighbourhood,
                    von_neumann_mask,
                )

                # NOTE: allow rule to return set of coordinates and new state.
                # This allows agents to move in environment by triggering rule from current position then setting next self position in different place - movement.

                # array1 is agent layer, so needs infection layer neighbours for agent location
                coords, value = apply_rule1(n1, n2, (row, col), t, array1[t], array2[t], agent_neighbours)

                if value:  # if non-None value
                    x, y = coords
                    array1[t][x][y] = value

                # array2 is infection layer, so doesn't need infection layer neighbours for agent location, nor preview of next layer
                coords, value = apply_rule2(n2, (row, col), t, r)

                if value:  # if non-None value
                    x, y = coords
                    # add to new position, allow multiple cells to merge into a single point
                    array2[t][x][y] += value

    return np.concatenate((layer1, array1[1:]), axis=0), np.concatenate(
        (layer2, array2[1:]), axis=0
    )


## Testing
* 


### Loading matrix from image
In order to accelerate testing, new matrix environments can be loaded from image data. 

*NOTE: it is important the image is saved as an 8-bit JPG image, so each pixel is uint8 decodeable.*

Current matrix options are:
* 0: air.
* 1: wall.
* 2: window.

In [None]:
def parse_matrix_from_image(filepath):
    """ Parse an environment matrix from image file.

        Arguments:
            filepath (str): absolute filepath to target map.

        Returns:
            matrix (numpy.array): formatted array of environment matrix.
    """

    # open file
    image = Image.open(filepath)
    image.load()

    # convert image into numpy array
    image_data = np.asarray(image)

    # convert into heatmap data
    shape = np.shape(image_data)
    matrix = np.zeros(shape, dtype='uint8')

    for (x, y), pixel in np.ndenumerate(image_data):
        pixel = int(pixel)
        matrix[x, y] = pixel

    print(Counter(i for i in list(itertools.chain.from_iterable(matrix))))

    plt.imshow(matrix, cmap=plt.get_cmap("Accent"),)
    plt.plot()
    
    return matrix

In [None]:
# environment = np.array(
#    [
#        [1, 2, 1, 1, 1, 2, 1, 1, 2, 1],
#        [1, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [1, 0, 1, 1, 1, 1, 1, 1, 0, 2],
#        [2, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [2, 0, 0, 0, 1, 0, 0, 0, 0, 2],
#        [1, 1, 2, 1, 1, 2, 1, 2, 1, 1],
#    ]
# )  # encoded environment map (walls and windows) (air=0, walls=1, windows=2)

#environment = np.array(
#    [
#        [1, 2, 1, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 2, 1],
#        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [1, 0, 1, 1, 1, 1, 1, 1, 0, 2, 1, 0, 1, 1, 1, 1, 1, 1, 0, 2],
#        [2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1],
#        [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
#        [2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2],
#        [1, 1, 2, 1, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 2, 1, 2, 1, 1],
#    ]
#)  # encoded environment map (walls and windows) (air=0, walls=1, windows=2)

filepath = "testing/smb.png"
environment = parse_matrix_from_image(filepath)

pathogen_info = PathogenInfo(
    infection_from_contact=0.8,
    infection_from_max_pollutants=0.4,
    max_pollutants=50,
    airborne_decay=-2,
    pathogen_release_payload=10,
)

problem = AutomataOptimisation(environment, pathogen_info, n_objectives=3, verbose=0, evaluating=1)


In [None]:
algorithm = NSGAII(problem)
algorithm.run(10)


In [None]:
Y = np.array([s.objectives for s in algorithm.result])

fig = plt.figure(figsize = (10, 7))
ax = plt.axes(projection ="3d")
ax.scatter3D(Y[:, 0], Y[:, 1], Y[:, 2])
ax.set_xlabel("infection count")
ax.set_ylabel("social distancing")
ax.set_zlabel("tempurature")
plt.show()

nondominated_solutions = nondominated(algorithm.result)
Y = np.array([s.objectives for s in nondominated_solutions])

fig = plt.figure(figsize = (10, 7))
ax = plt.axes(projection ="3d")
ax.scatter3D(Y[:, 0], Y[:, 1], Y[:, 2])
ax.set_xlabel("infection count")
ax.set_ylabel("social distancing")
ax.set_zlabel("tempurature")
plt.show()


In [None]:
best = nondominated_solutions[
    int(np.sum(Y, axis=1).argmin(axis=0))
]  # find most minimising solution

decision_vars = []
int_ = Integer(0, 19)
for var in best.variables:
    decision_vars += [
        int_.decode(var),
    ]

print(f"Solution: {decision_vars}\nEvaluation: {best.objectives}")

environment = np.array(
    [
        [1, 2, 1, 1, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1, 1, 2, 1, 1, 2, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1],
        [1, 0, 1, 1, 1, 1, 1, 1, 0, 2, 1, 0, 1, 1, 1, 1, 1, 1, 0, 2],
        [2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
        [2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2],
        [1, 1, 2, 1, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 2, 1, 2, 1, 1],
    ]
)  # encoded environment map (walls and windows) (air=0, walls=1, windows=2)

pathogen_info = PathogenInfo(
    infection_from_contact=0.8,
    infection_from_max_pollutants=0.4,
    max_pollutants=50,
    airborne_decay=-2,
    pathogen_release_payload=10,
)

class Dummy:
    def __init__(self, variables, n_objectives):
        self.variables = variables
        self.objectives = [0,]*n_objectives

# show solution simulation
problem = AutomataOptimisation(environment, pathogen_info, n_objectives=3, verbose=2, evaluating=1)
dummy = Dummy(variables=decision_vars, n_objectives=3)
problem.evaluate(dummy)