In [None]:
import math
import random
from typing import Any, Dict, List, Tuple

from scipy.stats import norm

import constants
from species import Species
from world import LogItem


In [None]:
class SpeedEmulator:
    """
    A class representing the speed-only emulator.

    Individuals are grouped by species: each entry of ``species_list`` pairs a
    ``Species`` object with the number of individuals of that species.

    Parameters
    ----------
    parameters : dict, optional
        The trainable parameters. This class reads ``"speed_factor"`` (energy
        gain multiplier) and ``"scale"`` (spread used when computing deaths).
        Defaults to ``{"scale": 1, "speed_factor": 1}``; a fresh dict is created
        per instance.
    num_initial_species : int
        Number of individuals of the initial species. Required: it only has a
        ``None`` placeholder default so the original parameter order is kept.

    Attributes
    ----------
    num_initial_species : int
        Number of initial individuals
    day : int
        Days elapsed since the start of the simulation (starts at 0)
    species_list : list([species, number])
        Stores the current state of the world. A list of mutable
        ``[species, number]`` pairs (the number is updated in place)
    parameters : dict
        The trainable parameters
    speed_mutation_rate : number
        Set by ``run``; the speed mutation rate used by ``species_reproduce``
    """

    def __init__(self, parameters=None, num_initial_species=None) -> None:
        """
        Initialise the emulator with a single default species.

        Raises
        ------
        TypeError
            If ``num_initial_species`` is not supplied.
        """

        if num_initial_species is None:
            raise TypeError("SpeedEmulator requires num_initial_species")

        self.day = 0
        self.num_initial_species = num_initial_species
        # Lists rather than tuples: the counts are reassigned in place by
        # species_die and species_reproduce.
        self.species_list = [[Species(), num_initial_species]]
        # Avoid sharing one default dict between instances.
        self.parameters = (
            {"scale": 1, "speed_factor": 1} if parameters is None else parameters
        )

    def run(self, mutation_rates, max_days=None) -> "Tuple[int, List[LogItem]]":
        """
        Run the emulator with given mutation rates until the species goes extinct.

        Parameters
        ----------
        mutation_rates : dict(string, number)
            Must contain the key ``"speed"``; its value is stored as
            ``self.speed_mutation_rate``.
        max_days : optional(int)
            If not None, this is the maximum number of days the simulation can
            run for before being automatically terminated (default is None).
            ``0`` means no day is simulated.

        Returns
        -------
        days_survived : int
            The value of ``self.day`` when the loop stops
        log : list(LogItem)
            One log item per simulated day
        """

        self.speed_mutation_rate = mutation_rates["speed"]

        log = []
        is_extinct = False

        while not is_extinct:
            # Explicit None check so that max_days=0 is honoured as a limit.
            if max_days is not None and self.day >= max_days:
                break

            self.day += 1
            is_extinct, log_item = self.compute_timestep()
            log.append(log_item)

        return self.day, log

    def compute_timestep(self) -> "Tuple[bool, LogItem]":
        """
        Perform a timestep (that is process 1 day) of the emulator.

        Returns
        -------
        is_extinct : bool
            This is true if, and only if, no individuals are left alive
        log_item : LogItem
            The log item entry for this timestep
        """

        temperature, probability_of_food = self.calculate_environment_conditions()

        self.species_energy_change(probability_of_food)
        # NOTE(review): species_reproduce is not invoked here (it was not in the
        # original either) - confirm whether reproduction belongs in the
        # daily step.
        num_species_alive = self.species_die()

        traits_dict = self.get_traits_of_living_species()
        log_item = LogItem(self.day, num_species_alive,
                           temperature, probability_of_food, traits_dict)

        is_extinct = num_species_alive == 0

        return is_extinct, log_item

    def get_traits_of_living_species(self) -> Dict[str, List[float]]:
        """
        Returns each trait of all living species in list form, accessible through a dictionary.

        Species whose individual count has dropped to 0 are skipped.

        Returns
        -------
        traits_dict : Dict[str, List[float]]
            One value per living species, in the form:
            {
              "size" : [size_species_1, ..., size_species_n],
              "speed" : [speed_species_1, ..., speed_species_n],
              "vision" : [vision_species_1, ..., vision_species_n],
              "aggression" : [aggression_species_1, ..., aggression_species_n],
            }
        """

        traits_dict = {"size": [], "speed": [], "vision": [], "aggression": []}

        for species_entry in self.species_list:
            if species_entry[1] <= 0:
                continue
            species = species_entry[0]
            traits_dict["size"].append(species.size)
            traits_dict["speed"].append(species.speed)
            traits_dict["vision"].append(species.vision)
            traits_dict["aggression"].append(species.aggression)

        return traits_dict

    def compute_temperature(self) -> float:
        """
        Compute the temperature depending on the day.

        Only the (periodic) seasonal variance is modelled:
        8 - 18cos(2π * days / 365), as seen in Chapter 4.2 of
        https://link.springer.com/article/10.1007/s11538-008-9389-z.

        TODO: implement climate change

        Returns
        -------
        temperature : float
            Temperature on the current day
        """

        return 8 - 18 * math.cos(2 * math.pi * self.day / 365)

    def calculate_environment_conditions(self) -> "Tuple[float, float]":
        """
        Calculate food probability based on temperature

        We model the probability of a food appearing in a location as a scaled Gaussian distribution:
            probability_of_food = scalar * exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        TODO: find a scientific backing behind how food availability depends on temperature

        Returns
        -------
        temperature : float
            Current temperature on this day
        probability_of_food : float
            Probability of a food being generated in any location on this day
        """

        optimal_temperature = constants.OPTIMAL_TEMPERATURE
        scalar = constants.FOOD_PROBABILITY_SCALAR
        sigma = constants.FOOD_PROBABILITY_STD

        temperature = self.compute_temperature()

        probability_of_food = scalar * \
            math.exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        return temperature, probability_of_food

    def species_energy_change(self, probability_of_food) -> None:
        """
        Each species acquires an amount of food based on speed and a trainable scaling factor.
        Each species then loses ``constants.ENERGY_LOSS`` energy for the day.
        """

        energy_loss = constants.ENERGY_LOSS
        speed_factor = self.parameters['speed_factor']

        for species_entry in self.species_list:
            species = species_entry[0]
            species.energy += probability_of_food * species.speed * speed_factor
            species.energy -= energy_loss

    def species_reproduce(self) -> None:
        """
        If a species has at least ``constants.REPRODUCTION_THRESHOLD`` energy,
        each of its individuals reproduces asexually with mutated speed.

        A child whose speed equals that of an existing species joins it;
        otherwise a new species with one individual is added. Requires
        ``run`` to have set ``self.speed_mutation_rate``.
        """

        reproduction_threshold = constants.REPRODUCTION_THRESHOLD

        # Work from a snapshot so that species created during this call do not
        # reproduce in the same call.
        parents = [(entry[0], entry[1]) for entry in self.species_list]

        for parent, parent_count in parents:
            if parent.energy < reproduction_threshold:
                continue
            for _ in range(parent_count):
                child_speed = random.gauss(parent.speed, self.speed_mutation_rate)
                for existing_entry in self.species_list:
                    if existing_entry[0].speed == child_speed:
                        existing_entry[1] += 1
                        break
                else:
                    self.species_list.append([Species(speed=child_speed), 1])

    def species_die(self) -> int:
        """
        Reduce each species' count by its death probability.

        A species with energy <= 0 dies out entirely. Otherwise the surviving
        fraction is P(X > 0) for X ~ Normal(energy, energy * scale), where
        ``scale`` is a trainable parameter; the new count is truncated to int.

        Returns
        -------
        num_species_alive : int
            The number of individuals that are still alive in the emulator
        """

        scale = self.parameters['scale']
        num_alive_species = 0  # Calculated like this for the logs

        for species_entry in self.species_list:
            energy = species_entry[0].energy
            if energy <= 0:
                # norm.cdf would yield NaN for a non-positive spread.
                survival = 0.0
            else:
                spread = energy * scale
                # A non-positive spread is a point mass at energy > 0.
                survival = 1 - norm.cdf(0, energy, spread) if spread > 0 else 1.0
            species_entry[1] = int(species_entry[1] * survival)
            num_alive_species += species_entry[1]

        return num_alive_species

In [None]:
class SizeEmulator:
    """
    A class representing the size-only emulator.

    Individuals are grouped by species: each entry of ``species_list`` pairs a
    ``Species`` object with the number of individuals of that species.

    Parameters
    ----------
    parameters : dict, optional
        The trainable parameters. This class reads ``"scale"`` (spread used
        when computing deaths). Defaults to ``{"scale": 1, "speed_factor": 1}``;
        a fresh dict is created per instance.
    num_initial_species : int
        Number of individuals of the initial species. Required: it only has a
        ``None`` placeholder default so the original parameter order is kept.

    Attributes
    ----------
    num_initial_species : int
        Number of initial individuals
    day : int
        Days elapsed since the start of the simulation (starts at 0)
    species_list : list([species, number])
        Stores the current state of the world. A list of mutable
        ``[species, number]`` pairs (the number is updated in place)
    parameters : dict
        The trainable parameters
    size_mutation_rate : number
        Set by ``run``; the size mutation rate used by ``species_reproduce``
    """

    def __init__(self, parameters=None, num_initial_species=None) -> None:
        """
        Initialise the emulator with a single default species.

        Raises
        ------
        TypeError
            If ``num_initial_species`` is not supplied.
        """

        if num_initial_species is None:
            raise TypeError("SizeEmulator requires num_initial_species")

        self.day = 0
        self.num_initial_species = num_initial_species
        # Lists rather than tuples: the counts are reassigned in place by
        # species_die and species_reproduce.
        self.species_list = [[Species(), num_initial_species]]
        # Avoid sharing one default dict between instances.
        self.parameters = (
            {"scale": 1, "speed_factor": 1} if parameters is None else parameters
        )

    def run(self, mutation_rates, max_days=None) -> "Tuple[int, List[LogItem]]":
        """
        Run the emulator with given mutation rates until the species goes extinct.

        Parameters
        ----------
        mutation_rates : dict(string, number)
            Must contain the key ``"size"``; its value is stored as
            ``self.size_mutation_rate``.
        max_days : optional(int)
            If not None, this is the maximum number of days the simulation can
            run for before being automatically terminated (default is None).
            ``0`` means no day is simulated.

        Returns
        -------
        days_survived : int
            The value of ``self.day`` when the loop stops
        log : list(LogItem)
            One log item per simulated day
        """

        self.size_mutation_rate = mutation_rates["size"]

        log = []
        is_extinct = False

        while not is_extinct:
            # Explicit None check so that max_days=0 is honoured as a limit.
            if max_days is not None and self.day >= max_days:
                break

            self.day += 1
            is_extinct, log_item = self.compute_timestep()
            log.append(log_item)

        return self.day, log

    def compute_timestep(self) -> "Tuple[bool, LogItem]":
        """
        Perform a timestep (that is process 1 day) of the emulator.

        Returns
        -------
        is_extinct : bool
            This is true if, and only if, no individuals are left alive
        log_item : LogItem
            The log item entry for this timestep
        """

        temperature, probability_of_food = self.calculate_environment_conditions()

        self.species_energy_change(probability_of_food)
        # NOTE(review): species_reproduce is not invoked here (it was not in the
        # original either) - confirm whether reproduction belongs in the
        # daily step.
        num_species_alive = self.species_die()

        traits_dict = self.get_traits_of_living_species()
        log_item = LogItem(self.day, num_species_alive,
                           temperature, probability_of_food, traits_dict)

        is_extinct = num_species_alive == 0

        return is_extinct, log_item

    def get_traits_of_living_species(self) -> Dict[str, List[float]]:
        """
        Returns each trait of all living species in list form, accessible through a dictionary.

        Species whose individual count has dropped to 0 are skipped.

        Returns
        -------
        traits_dict : Dict[str, List[float]]
            One value per living species, in the form:
            {
              "size" : [size_species_1, ..., size_species_n],
              "speed" : [speed_species_1, ..., speed_species_n],
              "vision" : [vision_species_1, ..., vision_species_n],
              "aggression" : [aggression_species_1, ..., aggression_species_n],
            }
        """

        traits_dict = {"size": [], "speed": [], "vision": [], "aggression": []}

        for species_entry in self.species_list:
            if species_entry[1] <= 0:
                continue
            species = species_entry[0]
            traits_dict["size"].append(species.size)
            traits_dict["speed"].append(species.speed)
            traits_dict["vision"].append(species.vision)
            traits_dict["aggression"].append(species.aggression)

        return traits_dict

    def compute_temperature(self) -> float:
        """
        Compute the temperature depending on the day.

        Only the (periodic) seasonal variance is modelled:
        8 - 18cos(2π * days / 365), as seen in Chapter 4.2 of
        https://link.springer.com/article/10.1007/s11538-008-9389-z.

        TODO: implement climate change

        Returns
        -------
        temperature : float
            Temperature on the current day
        """

        return 8 - 18 * math.cos(2 * math.pi * self.day / 365)

    def calculate_environment_conditions(self) -> "Tuple[float, float]":
        """
        Calculate food probability based on temperature

        We model the probability of a food appearing in a location as a scaled Gaussian distribution:
            probability_of_food = scalar * exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        TODO: find a scientific backing behind how food availability depends on temperature

        Returns
        -------
        temperature : float
            Current temperature on this day
        probability_of_food : float
            Probability of a food being generated in any location on this day
        """

        optimal_temperature = constants.OPTIMAL_TEMPERATURE
        scalar = constants.FOOD_PROBABILITY_SCALAR
        sigma = constants.FOOD_PROBABILITY_STD

        temperature = self.compute_temperature()

        probability_of_food = scalar * \
            math.exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        return temperature, probability_of_food

    def species_energy_change(self, probability_of_food) -> None:
        """
        Each species acquires ``probability_of_food * speed`` energy, then
        loses ``constants.ENERGY_LOSS`` energy for the day.
        """

        energy_loss = constants.ENERGY_LOSS

        for species_entry in self.species_list:
            species = species_entry[0]
            # NOTE(review): the gain depends on speed, not size, and uses no
            # trainable factor - kept as written; confirm the intended
            # formula for the size emulator.
            species.energy += probability_of_food * species.speed
            species.energy -= energy_loss

    def species_reproduce(self) -> None:
        """
        If a species has at least ``constants.REPRODUCTION_THRESHOLD`` energy,
        each of its individuals reproduces asexually with mutated size.

        A child whose size equals that of an existing species joins it;
        otherwise a new species with one individual is added. Requires
        ``run`` to have set ``self.size_mutation_rate``.
        """

        reproduction_threshold = constants.REPRODUCTION_THRESHOLD

        # Work from a snapshot so that species created during this call do not
        # reproduce in the same call.
        parents = [(entry[0], entry[1]) for entry in self.species_list]

        for parent, parent_count in parents:
            if parent.energy < reproduction_threshold:
                continue
            for _ in range(parent_count):
                child_size = random.gauss(parent.size, self.size_mutation_rate)
                for existing_entry in self.species_list:
                    if existing_entry[0].size == child_size:
                        existing_entry[1] += 1
                        break
                else:
                    # The mutated trait is size, so it is passed as size.
                    self.species_list.append([Species(size=child_size), 1])

    def species_die(self) -> int:
        """
        Reduce each species' count by its death probability.

        A species with energy <= 0 dies out entirely. Otherwise the surviving
        fraction is P(X > 0) for X ~ Normal(energy, energy * scale), where
        ``scale`` is a trainable parameter; the new count is truncated to int.

        Returns
        -------
        num_species_alive : int
            The number of individuals that are still alive in the emulator
        """

        scale = self.parameters['scale']
        num_alive_species = 0  # Calculated like this for the logs

        for species_entry in self.species_list:
            energy = species_entry[0].energy
            if energy <= 0:
                # norm.cdf would yield NaN for a non-positive spread.
                survival = 0.0
            else:
                spread = energy * scale
                # A non-positive spread is a point mass at energy > 0.
                survival = 1 - norm.cdf(0, energy, spread) if spread > 0 else 1.0
            species_entry[1] = int(species_entry[1] * survival)
            num_alive_species += species_entry[1]

        return num_alive_species

In [None]:
class VisionEmulator:
    """
    A class representing the vision emulator.

    NOTE(review): as written, this class models energy gain and mutation
    through speed, exactly like the speed-only emulator; no vision-specific
    behaviour is implemented yet.

    Individuals are grouped by species: each entry of ``species_list`` pairs a
    ``Species`` object with the number of individuals of that species.

    Parameters
    ----------
    num_initial_species : int
        Number of individuals of the initial species.

    Attributes
    ----------
    num_initial_species : int
        Number of initial individuals
    day : int
        Days elapsed since the start of the simulation (starts at 0)
    species_list : list([species, number])
        Stores the current state of the world. A list of mutable
        ``[species, number]`` pairs (the number is updated in place)
    parameters : dict
        The trainable parameters (``"scale"`` and ``"speed_factor"``)
    speed_mutation_rate : number
        Set by ``run``; the speed mutation rate used by ``species_reproduce``
    """

    def __init__(self, num_initial_species) -> None:
        """
        Initialise the emulator with a single default species.
        """

        self.day = 0
        self.num_initial_species = num_initial_species
        # Lists rather than tuples: the counts are reassigned in place by
        # species_die and species_reproduce.
        self.species_list = [[Species(), num_initial_species]]
        self.parameters = {"scale": 1, "speed_factor": 1}

    def run(self, mutation_rates, max_days=None) -> "Tuple[int, List[LogItem]]":
        """
        Run the emulator with given mutation rates until the species goes extinct.

        Parameters
        ----------
        mutation_rates : dict(string, number)
            Must contain the key ``"speed"``; its value is stored as
            ``self.speed_mutation_rate``.
        max_days : optional(int)
            If not None, this is the maximum number of days the simulation can
            run for before being automatically terminated (default is None).
            ``0`` means no day is simulated.

        Returns
        -------
        days_survived : int
            The value of ``self.day`` when the loop stops
        log : list(LogItem)
            One log item per simulated day
        """

        self.speed_mutation_rate = mutation_rates["speed"]

        log = []
        is_extinct = False

        while not is_extinct:
            # Explicit None check so that max_days=0 is honoured as a limit.
            if max_days is not None and self.day >= max_days:
                break

            self.day += 1
            is_extinct, log_item = self.compute_timestep()
            log.append(log_item)

        return self.day, log

    def compute_timestep(self) -> "Tuple[bool, LogItem]":
        """
        Perform a timestep (that is process 1 day) of the emulator.

        Returns
        -------
        is_extinct : bool
            This is true if, and only if, no individuals are left alive
        log_item : LogItem
            The log item entry for this timestep
        """

        temperature, probability_of_food = self.calculate_environment_conditions()

        self.species_energy_change(probability_of_food)
        # NOTE(review): species_reproduce is not invoked here (it was not in the
        # original either) - confirm whether reproduction belongs in the
        # daily step.
        num_species_alive = self.species_die()

        traits_dict = self.get_traits_of_living_species()
        log_item = LogItem(self.day, num_species_alive,
                           temperature, probability_of_food, traits_dict)

        is_extinct = num_species_alive == 0

        return is_extinct, log_item

    def get_traits_of_living_species(self) -> Dict[str, List[float]]:
        """
        Returns each trait of all living species in list form, accessible through a dictionary.

        Species whose individual count has dropped to 0 are skipped.

        Returns
        -------
        traits_dict : Dict[str, List[float]]
            One value per living species, in the form:
            {
              "size" : [size_species_1, ..., size_species_n],
              "speed" : [speed_species_1, ..., speed_species_n],
              "vision" : [vision_species_1, ..., vision_species_n],
              "aggression" : [aggression_species_1, ..., aggression_species_n],
            }
        """

        traits_dict = {"size": [], "speed": [], "vision": [], "aggression": []}

        for species_entry in self.species_list:
            if species_entry[1] <= 0:
                continue
            species = species_entry[0]
            traits_dict["size"].append(species.size)
            traits_dict["speed"].append(species.speed)
            traits_dict["vision"].append(species.vision)
            traits_dict["aggression"].append(species.aggression)

        return traits_dict

    def compute_temperature(self) -> float:
        """
        Compute the temperature depending on the day.

        Only the (periodic) seasonal variance is modelled:
        8 - 18cos(2π * days / 365), as seen in Chapter 4.2 of
        https://link.springer.com/article/10.1007/s11538-008-9389-z.

        TODO: implement climate change

        Returns
        -------
        temperature : float
            Temperature on the current day
        """

        return 8 - 18 * math.cos(2 * math.pi * self.day / 365)

    def calculate_environment_conditions(self) -> "Tuple[float, float]":
        """
        Calculate food probability based on temperature

        We model the probability of a food appearing in a location as a scaled Gaussian distribution:
            probability_of_food = scalar * exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        TODO: find a scientific backing behind how food availability depends on temperature

        Returns
        -------
        temperature : float
            Current temperature on this day
        probability_of_food : float
            Probability of a food being generated in any location on this day
        """

        optimal_temperature = constants.OPTIMAL_TEMPERATURE
        scalar = constants.FOOD_PROBABILITY_SCALAR
        sigma = constants.FOOD_PROBABILITY_STD

        temperature = self.compute_temperature()

        probability_of_food = scalar * \
            math.exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        return temperature, probability_of_food

    def species_energy_change(self, probability_of_food) -> None:
        """
        Each species acquires an amount of food based on speed and a trainable scaling factor.
        Each species then loses ``constants.ENERGY_LOSS`` energy for the day.
        """

        energy_loss = constants.ENERGY_LOSS
        speed_factor = self.parameters['speed_factor']

        for species_entry in self.species_list:
            species = species_entry[0]
            species.energy += probability_of_food * species.speed * speed_factor
            species.energy -= energy_loss

    def species_reproduce(self) -> None:
        """
        If a species has at least ``constants.REPRODUCTION_THRESHOLD`` energy,
        each of its individuals reproduces asexually with mutated speed.

        A child whose speed equals that of an existing species joins it;
        otherwise a new species with one individual is added. Requires
        ``run`` to have set ``self.speed_mutation_rate``.
        """

        reproduction_threshold = constants.REPRODUCTION_THRESHOLD

        # Work from a snapshot so that species created during this call do not
        # reproduce in the same call.
        parents = [(entry[0], entry[1]) for entry in self.species_list]

        for parent, parent_count in parents:
            if parent.energy < reproduction_threshold:
                continue
            for _ in range(parent_count):
                child_speed = random.gauss(parent.speed, self.speed_mutation_rate)
                for existing_entry in self.species_list:
                    if existing_entry[0].speed == child_speed:
                        existing_entry[1] += 1
                        break
                else:
                    self.species_list.append([Species(speed=child_speed), 1])

    def species_die(self) -> int:
        """
        Reduce each species' count by its death probability.

        A species with energy <= 0 dies out entirely. Otherwise the surviving
        fraction is P(X > 0) for X ~ Normal(energy, energy * scale), where
        ``scale`` is a trainable parameter; the new count is truncated to int.

        Returns
        -------
        num_species_alive : int
            The number of individuals that are still alive in the emulator
        """

        scale = self.parameters['scale']
        num_alive_species = 0  # Calculated like this for the logs

        for species_entry in self.species_list:
            energy = species_entry[0].energy
            if energy <= 0:
                # norm.cdf would yield NaN for a non-positive spread.
                survival = 0.0
            else:
                spread = energy * scale
                # A non-positive spread is a point mass at energy > 0.
                survival = 1 - norm.cdf(0, energy, spread) if spread > 0 else 1.0
            species_entry[1] = int(species_entry[1] * survival)
            num_alive_species += species_entry[1]

        return num_alive_species

In [None]:
class AggressionEmulator:
    """
    A class representing the aggression-only emulator.

    Individuals are grouped by species: each entry of ``species_list`` pairs a
    ``Species`` object with the number of individuals of that species.
    Species with aggression below 1 gain energy with the ``"dove_factor"``
    multiplier, all others with ``"hawk_factor"``.

    Parameters
    ----------
    num_initial_species : int
        Number of individuals of the initial species.

    Attributes
    ----------
    num_initial_species : int
        Number of initial individuals
    day : int
        Days elapsed since the start of the simulation (starts at 0)
    species_list : list([species, number])
        Stores the current state of the world. A list of mutable
        ``[species, number]`` pairs (the number is updated in place)
    parameters : dict
        The trainable parameters (``"scale"``, ``"speed_factor"``,
        ``"dove_factor"`` and ``"hawk_factor"``)
    speed_mutation_rate : number
        Set by ``run`` from ``mutation_rates["speed"]``
    aggression_mutation_rate : number
        Set by ``run``; the aggression mutation rate used by ``species_reproduce``
    """

    def __init__(self, num_initial_species) -> None:
        """
        Initialise the emulator with a single default species.
        """

        self.day = 0
        self.num_initial_species = num_initial_species
        # Lists rather than tuples: the counts are reassigned in place by
        # species_die and species_reproduce.
        self.species_list = [[Species(), num_initial_species]]
        # species_energy_change reads dove_factor/hawk_factor, so they need
        # initial values. NOTE(review): 1 mirrors the other factors' initial
        # value - confirm the intended starting point.
        self.parameters = {"scale": 1, "speed_factor": 1,
                           "dove_factor": 1, "hawk_factor": 1}

    def run(self, mutation_rates, max_days=None) -> "Tuple[int, List[LogItem]]":
        """
        Run the emulator with given mutation rates until the species goes extinct.

        Parameters
        ----------
        mutation_rates : dict(string, number)
            Must contain the keys ``"speed"`` and ``"aggression"``; their
            values are stored as ``self.speed_mutation_rate`` and
            ``self.aggression_mutation_rate``.
        max_days : optional(int)
            If not None, this is the maximum number of days the simulation can
            run for before being automatically terminated (default is None).
            ``0`` means no day is simulated.

        Returns
        -------
        days_survived : int
            The value of ``self.day`` when the loop stops
        log : list(LogItem)
            One log item per simulated day
        """

        self.speed_mutation_rate = mutation_rates["speed"]
        self.aggression_mutation_rate = mutation_rates["aggression"]

        log = []
        is_extinct = False

        while not is_extinct:
            # Explicit None check so that max_days=0 is honoured as a limit.
            if max_days is not None and self.day >= max_days:
                break

            self.day += 1
            is_extinct, log_item = self.compute_timestep()
            log.append(log_item)

        return self.day, log

    def compute_timestep(self) -> "Tuple[bool, LogItem]":
        """
        Perform a timestep (that is process 1 day) of the emulator.

        Returns
        -------
        is_extinct : bool
            This is true if, and only if, no individuals are left alive
        log_item : LogItem
            The log item entry for this timestep
        """

        temperature, probability_of_food = self.calculate_environment_conditions()

        self.species_energy_change(probability_of_food)
        # NOTE(review): species_reproduce is not invoked here (it was not in the
        # original either) - confirm whether reproduction belongs in the
        # daily step.
        num_species_alive = self.species_die()

        traits_dict = self.get_traits_of_living_species()
        log_item = LogItem(self.day, num_species_alive,
                           temperature, probability_of_food, traits_dict)

        is_extinct = num_species_alive == 0

        return is_extinct, log_item

    def get_traits_of_living_species(self) -> Dict[str, List[float]]:
        """
        Returns each trait of all living species in list form, accessible through a dictionary.

        Species whose individual count has dropped to 0 are skipped.

        Returns
        -------
        traits_dict : Dict[str, List[float]]
            One value per living species, in the form:
            {
              "size" : [size_species_1, ..., size_species_n],
              "speed" : [speed_species_1, ..., speed_species_n],
              "vision" : [vision_species_1, ..., vision_species_n],
              "aggression" : [aggression_species_1, ..., aggression_species_n],
            }
        """

        traits_dict = {"size": [], "speed": [], "vision": [], "aggression": []}

        for species_entry in self.species_list:
            if species_entry[1] <= 0:
                continue
            species = species_entry[0]
            traits_dict["size"].append(species.size)
            traits_dict["speed"].append(species.speed)
            traits_dict["vision"].append(species.vision)
            traits_dict["aggression"].append(species.aggression)

        return traits_dict

    def compute_temperature(self) -> float:
        """
        Compute the temperature depending on the day.

        Only the (periodic) seasonal variance is modelled:
        8 - 18cos(2π * days / 365), as seen in Chapter 4.2 of
        https://link.springer.com/article/10.1007/s11538-008-9389-z.

        TODO: implement climate change

        Returns
        -------
        temperature : float
            Temperature on the current day
        """

        return 8 - 18 * math.cos(2 * math.pi * self.day / 365)

    def calculate_environment_conditions(self) -> "Tuple[float, float]":
        """
        Calculate food probability based on temperature

        We model the probability of a food appearing in a location as a scaled Gaussian distribution:
            probability_of_food = scalar * exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        TODO: find a scientific backing behind how food availability depends on temperature

        Returns
        -------
        temperature : float
            Current temperature on this day
        probability_of_food : float
            Probability of a food being generated in any location on this day
        """

        optimal_temperature = constants.OPTIMAL_TEMPERATURE
        scalar = constants.FOOD_PROBABILITY_SCALAR
        sigma = constants.FOOD_PROBABILITY_STD

        temperature = self.compute_temperature()

        probability_of_food = scalar * \
            math.exp(-0.5 * ((temperature - optimal_temperature) / sigma) ** 2)

        return temperature, probability_of_food

    def species_energy_change(self, probability_of_food) -> None:
        """
        Each species acquires an amount of food based on speed and a trainable
        factor chosen by its aggression (below 1: ``dove_factor``, otherwise
        ``hawk_factor``).
        Each species then loses ``constants.ENERGY_LOSS`` energy for the day.
        """

        energy_loss = constants.ENERGY_LOSS

        for species_entry in self.species_list:
            species = species_entry[0]
            if species.aggression < 1:
                factor = self.parameters['dove_factor']
            else:
                factor = self.parameters['hawk_factor']
            species.energy += probability_of_food * species.speed * factor
            species.energy -= energy_loss

    def species_reproduce(self) -> None:
        """
        If a species has at least ``constants.REPRODUCTION_THRESHOLD`` energy,
        each of its individuals reproduces asexually with mutated aggression.

        A child whose aggression equals that of an existing species joins it;
        otherwise a new species with one individual is added. Requires
        ``run`` to have set ``self.aggression_mutation_rate``.
        """

        reproduction_threshold = constants.REPRODUCTION_THRESHOLD

        # Work from a snapshot so that species created during this call do not
        # reproduce in the same call.
        parents = [(entry[0], entry[1]) for entry in self.species_list]

        for parent, parent_count in parents:
            if parent.energy < reproduction_threshold:
                continue
            for _ in range(parent_count):
                child_aggression = random.gauss(
                    parent.aggression, self.aggression_mutation_rate)
                for existing_entry in self.species_list:
                    if existing_entry[0].aggression == child_aggression:
                        existing_entry[1] += 1
                        break
                else:
                    self.species_list.append(
                        [Species(aggression=child_aggression), 1])

    def species_die(self) -> int:
        """
        Reduce each species' count by its death probability.

        A species with energy <= 0 dies out entirely. Otherwise the surviving
        fraction is P(X > 0) for X ~ Normal(energy, energy * scale), where
        ``scale`` is a trainable parameter; the new count is truncated to int.

        Returns
        -------
        num_species_alive : int
            The number of individuals that are still alive in the emulator
        """

        scale = self.parameters['scale']
        num_alive_species = 0  # Calculated like this for the logs

        for species_entry in self.species_list:
            energy = species_entry[0].energy
            if energy <= 0:
                # norm.cdf would yield NaN for a non-positive spread.
                survival = 0.0
            else:
                spread = energy * scale
                # A non-positive spread is a point mass at energy > 0.
                survival = 1 - norm.cdf(0, energy, spread) if spread > 0 else 1.0
            species_entry[1] = int(species_entry[1] * survival)
            num_alive_species += species_entry[1]

        return num_alive_species

In [None]:
from mlai.deepgp_tutorial import initialize, staged_optimize, posterior_sample, visualize, visualize_pinball

import deepgp
# Attach the tutorial helper functions to the DeepGP class as attributes,
# so they can be called as methods on model instances.
for _method_name, _helper in (
    ("initialize", initialize),
    ("staged_optimize", staged_optimize),
    ("posterior_sample", posterior_sample),
    ("visualize", visualize),
    ("visualize_pinball", visualize_pinball),
):
    setattr(deepgp.DeepGP, _method_name, _helper)
del _method_name, _helper  # keep the loop variables out of the notebook namespace

In [None]:
# Build a DeepGP with a single hidden layer of width `hidden`, initialise it and
# optimise it. The model is referred to as `model` throughout (the optimisation
# steps previously used a different, undefined name).
# NOTE(review): layer sizes are read from `y` while the training targets passed
# in are `yhat` - presumably a normalised copy of `y`; confirm they share a shape.
hidden = 1
model = deepgp.DeepGP(
    [y.shape[1], hidden, x.shape[1]],
    Y=yhat,
    X=x,
    inits=['PCA', 'PCA'],
    kernels=[
        GPy.kern.RBF(hidden, ARD=True),
        GPy.kern.RBF(x.shape[1], ARD=True),
    ],  # the kernels for each layer
    num_inducing=50,
    back_constraint=False,
)
model.initialize()
# Constrain every layer's likelihood variance to be positive before optimising.
for layer in model.layers:
    layer.likelihood.variance.constrain_positive(warning=False)
model.optimize(messages=True, max_iters=100)
model.staged_optimize(messages=(True, True, True))

In [None]:
from emukit.examples.multi_fidelity_dgp.multi_fidelity_deep_gp import DGP_Base, init_layers_mf
from gpflow.kernels import RBF, White, Linear
from gpflow.likelihoods import Gaussian
from gpflow.actions import Loop, Action
from gpflow.mean_functions import Zero
from gpflow.training import AdamOptimizer
import gpflow.training.monitor as mon

In [None]:
def make_dgpMF_model(X, Y, Z):
    """
    Assemble a multi-fidelity deep GP (DGP_Base) with one kernel per fidelity level.

    Parameters
    ----------
    X : sequence of arrays
        One input array per fidelity; the input dimension is read from X[0].shape[1]
    Y : sequence of arrays
        One output array per fidelity; the output dimension is read from Y[0].shape[1]
    Z : sequence of arrays
        One array per fidelity, passed on to init_layers_mf
        (presumably inducing inputs - confirm against init_layers_mf)

    Returns
    -------
    model : DGP_Base
        The model built from the per-fidelity kernels with a Gaussian likelihood
    """
    n_fidelities = len(X)
    input_dim = X[0].shape[1]
    output_dim = Y[0].shape[1]

    # First fidelity: a single ARD RBF kernel over the inputs only.
    kernels = [
        RBF(input_dim, active_dims=list(range(input_dim)), variance=1., lengthscales=10., ARD=True)
    ]

    # Every further fidelity works on the inputs followed by the previous outputs.
    joint_dims = list(range(input_dim + output_dim))
    for _ in range(1, n_fidelities):
        k_corr = RBF(input_dim, active_dims=joint_dims[:input_dim], lengthscales=0.1, variance=1.5, ARD=True)
        k_prev = RBF(output_dim, active_dims=joint_dims[input_dim:], variance=1., lengthscales=0.1, ARD=True)
        k_in = RBF(input_dim, active_dims=joint_dims[:input_dim], variance=0.1, lengthscales=1., ARD=True)
        k_bias = Linear(output_dim, active_dims=joint_dims[input_dim:], variance=1e-6)
        k_in.variance = 1e-6  # overrides the variance given to the constructor above
        kernels.append(k_corr * (k_prev + k_bias) + k_in)

    # A White noise kernel is currently expected by Mf-DGP at all layers except the last.
    # In cases where no noise is desired, this should be set to 0 and fixed, as follows:
    #
    #     white = White(1, variance=0.)
    #     white.variance.trainable = False
    #     kernels[i] += white
    #
    # Here the variance starts at 0 but is not explicitly fixed.
    for idx in range(len(kernels) - 1):
        kernels[idx] += White(1, variance=0.)

    # Report the data shapes available at each fidelity level.
    for level, x_level in enumerate(X, start=1):
        print('\nData at Fidelity ', level)
        print('X - ', x_level.shape)
        print('Y - ', Y[level - 1].shape)
        print('Z - ', Z[level - 1].shape)

    layers = init_layers_mf(Y, Z, kernels, num_outputs=1)

    return DGP_Base(X, Y, Gaussian(), layers, num_samples=10, minibatch_size=1000)

def run(model, lr, iterations, callback=None):
    """
    Optimise `model` with Adam for a fixed number of iterations.

    Parameters
    ----------
    model :
        The model to optimise; it is anchored to its session afterwards
    lr :
        Value passed to AdamOptimizer
    iterations :
        Value passed to Loop as `stop`
    callback : optional
        If not None, an extra action run in the loop after the Adam action (default is None)
    """
    optimise_step = AdamOptimizer(lr).make_optimize_action(model)
    actions = [optimise_step]
    if callback is not None:
        actions.append(callback)

    # Build the loop and run it straight away.
    Loop(actions, stop=iterations)()
    model.anchor(model.enquire_session())
    
# Mark selected parameters as non-trainable, then train with Adam.
# NOTE(review): `dgp` is not defined in this section - presumably a model built
# with make_dgpMF_model; confirm it exists (and has at least two layers) before
# this cell runs.
dgp.layers[0].feature.Z.trainable = False
dgp.layers[1].feature.Z.trainable = False
dgp.layers[0].q_sqrt.trainable = False
dgp.likelihood.likelihood.variance.trainable = False
# presumably (learning rate, iterations), mirroring run(model, lr, iterations) - TODO confirm
dgp.run_adam(0.01, 1500)