In [37]:
import numpy as np
from random import Random
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
from seeds import known_seeds
import evaluation
from action import ActionSpace

In [38]:
# Scratch check of the observation layout: a 6x7 grid of int32 values
# bounded by 0 and 1,000,000.
space = spaces.Box(
    low=0,
    high=1_000_000,
    shape=(6, 7),
    dtype=np.int32,
)
# Draw one random member of the space; as the last expression of the cell
# it is what gets displayed.
space.sample()

array([[302172, 871533, 117545, 362815, 182033, 397766, 516639],
       [322787, 161778, 468511, 664169, 906386, 545880, 574501],
       [660425, 447849, 953794, 296499, 213196, 844293, 100144],
       [565387, 589308, 741326, 678758, 553518, 873053, 353618],
       [429161, 540517, 533817, 429609, 296815, 782600, 494890],
       [227065, 603034, 593544, 984822,  66116, 610823,  74984]],
      dtype=int32)

In [None]:
    #old fleet function
    def init_fleet(self):
        """Build the empty nested fleet ledger.

        Returns:
            dict: ``{datacenter: {server_generation: {"servers": [],
            "timestep_bought": [], "total_owned": 0}}}`` for the datacenters
            DC1-DC4 and every entry of ``self.server_generations``.
        """
        # dict.update() accepts a mapping (or keyword arguments), not a
        # (key, value) pair, so the entries are built with literals instead.
        # The comprehension creates fresh lists for every entry, so no two
        # (datacenter, generation) slots share state.
        return {
            datacenter: {
                generation: {"servers": [], "timestep_bought": [], "total_owned": 0}
                for generation in self.server_generations
            }
            for datacenter in ("DC1", "DC2", "DC3", "DC4")
        }

In [43]:
from gymnasium.wrappers import FlattenObservation # type: ignore
class CustomEnv(gym.Env):
    """Gymnasium environment for managing a fleet of servers across datacenters.

    Observation: a (6, 7) int32 grid whose columns follow
    ``self.server_generations``. Rows 0-2 hold the current time step's demand
    in ``self.latencies`` order; rows 3-5 hold the fleet supply for DC1, DC2
    and DC3+DC4 combined.
    """

    # Last time step of an episode; ``done`` is set once it has been processed.
    FINAL_TIMESTEP = 168
    # Bounds of every observation cell.
    OBSERVATION_LOW = 0
    OBSERVATION_HIGH = 1000000

    def __init__(self):
        super().__init__()

        # TODO: define self.action_space with a gymnasium.spaces type once the
        # agent's action encoding is settled.
        ##self.action_space =

        # NOTE(review): load_problem_data is not imported in the visible
        # cells; it has to be in scope before this class is instantiated.
        self.default_demand, self.datacenters, self.servers, self.selling_prices = load_problem_data()

        self.fleet_columns = ['datacenter_id', 'server_generation', 'server_id', 'action',
       'server_type', 'release_time', 'purchase_price', 'slots_size', 'energy_consumption', 
       'capacity', 'life_expectancy', 'cost_of_moving','average_maintenance_fee', 'cost_of_energy',
       'latency_sensitivity', 'slots_capacity', 'selling_price', 'lifespan', 'moved']

        self.server_generations = ['CPU.S1', 'CPU.S2', 'CPU.S3', 'CPU.S4', 'GPU.S1', 'GPU.S2', 'GPU.S3']
        self.latencies = ['low', 'medium', 'high']
        self.data_centers = ['DC1', 'DC2', 'DC3', 'DC4']

        # Agent observation space: a (latency x server_gen) demand grid
        # stacked on top of a same-shaped supply grid, i.e. (6, 7).
        self.observation_space = spaces.Box(
            low=self.OBSERVATION_LOW,
            high=self.OBSERVATION_HIGH,
            shape=(2 * len(self.latencies), len(self.server_generations)),
            dtype=np.int32,
        )

        self.seeds_array = known_seeds("training")
        self.seed_counter = 0
        # Running sum of the per-step objective for the current episode.
        self.objective = 0

    #might need func below to convert agent action into a relevant action
    #def conv_agent_action_to_move(self, action):

    #returns mask for the action space based on possible plays
    #def valid_action_mask(self):

    def convert_demand_to_observation(self, demand):
        """Turn one time step of demand into a (latency x server_gen) array.

        Two layouts are accepted:
        * a ``latency_sensitivity`` column with one column per server
          generation (the layout the original query assumed);
        * a ``server_generation`` column with one column per latency
          (presumably what ``evaluation.get_actual_demand`` produces -
          TODO confirm).

        Args:
            demand: DataFrame already filtered to a single time step.

        Returns:
            np.ndarray of shape (len(self.latencies), len(self.server_generations));
            cells with no matching rows are 0.
        """
        demand_observation = np.zeros((len(self.latencies), len(self.server_generations)))
        rows_are_generations = "server_generation" in demand.columns
        for i, latency in enumerate(self.latencies):
            for j, generation in enumerate(self.server_generations):
                if rows_are_generations:
                    if latency not in demand.columns:
                        continue
                    values = demand.loc[demand["server_generation"] == generation, latency]
                else:
                    if generation not in demand.columns:
                        continue
                    values = demand.loc[demand["latency_sensitivity"] == latency, generation]
                # .sum() collapses the selection to a scalar and yields 0 when
                # nothing matches (e.g. after the final time step).
                demand_observation[i, j] = values.sum()
        return demand_observation

    def convert_fleet_to_observation(self, fleet):
        """Summarise the fleet as a (3 x server_gen) supply array.

        Rows are DC1, DC2 and DC3+DC4 combined (the original comment asked for
        DC4 to be added onto DC3's total). Supply is the summed ``capacity``
        column so that it is comparable with the demand rows.
        NOTE(review): the original ``.sum()`` did not name a column - switch
        to a row count if server counts were intended.

        Args:
            fleet: DataFrame with ``datacenter_id``, ``server_generation`` and
                ``capacity`` columns; may be empty or None.

        Returns:
            np.ndarray of shape (3, len(self.server_generations)).
        """
        datacenter_groups = [
            self.data_centers[0:1],
            self.data_centers[1:2],
            self.data_centers[2:4],
        ]
        observed_fleet = np.zeros((len(datacenter_groups), len(self.server_generations)))
        if fleet is None or fleet.empty:
            return observed_fleet

        capacity = pd.to_numeric(fleet["capacity"], errors="coerce").fillna(0)
        for i, group in enumerate(datacenter_groups):
            in_group = fleet["datacenter_id"].isin(group)
            for j, generation in enumerate(self.server_generations):
                selected = in_group & (fleet["server_generation"] == generation)
                observed_fleet[i, j] = capacity[selected].sum()
        return observed_fleet

    def init_fleet(self):
        """Return an empty fleet DataFrame with the ``self.fleet_columns`` schema."""
        return pd.DataFrame(columns=self.fleet_columns)

    def _build_observation(self):
        """Select the current time step's demand and assemble the observation.

        Stores the selected rows in ``self.timestep_demand`` and returns the
        demand grid stacked on the supply grid, clipped and cast so that the
        result is a member of ``self.observation_space``.
        """
        self.timestep_demand = self.demand[self.demand["time_step"] == self.timestep]
        observation_demand = self.convert_demand_to_observation(self.timestep_demand)
        observation_fleet = self.convert_fleet_to_observation(self.fleet)
        observation = np.concatenate((observation_demand, observation_fleet))
        observation = np.nan_to_num(observation)
        observation = np.clip(observation, self.OBSERVATION_LOW, self.OBSERVATION_HIGH)
        return observation.astype(np.int32)

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Resets the time step, fleet and running objective, advances to the
        next training seed (cycling through ``self.seeds_array``), seeds
        numpy's global RNG with it and regenerates the demand.
        """
        super().reset(seed=seed)
        self.timestep = 1
        # Cycle over however many seeds were provided rather than assuming 10.
        self.seed_counter = (self.seed_counter + 1) % len(self.seeds_array)
        np.random.seed(self.seeds_array[self.seed_counter])

        self.fleet = self.init_fleet()
        self.objective = 0
        self.demand = evaluation.get_actual_demand(self.default_demand)
        self.done = False
        return self._build_observation(), {}

    # def buy_demand(self, datacenter, server_gen, timestep_demand):
    #     self.fleet

    def buy(self, datacenter, server_gen, number=10):
        """Append ``number`` newly bought servers to ``self.fleet``.

        NO VALIDITY CHECKS RIGHT NOW (slot capacity, release time, ...).

        Args:
            datacenter: value for the ``datacenter_id`` column.
            server_gen: value for the ``server_generation`` column.
            number: how many servers to buy; nothing happens if <= 0.
        """
        if number <= 0:
            return

        # One record per server. Only the key columns are set here so the
        # merges below do not collide with pre-existing columns and produce
        # ``_x`` / ``_y`` suffixes.
        records = [
            {
                "datacenter_id": datacenter,
                "server_generation": server_gen,
                "server_id": ActionSpace.generate_server_id(),
                "action": "buy",
            }
            for _ in range(number)
        ]
        ts_fleet = pd.DataFrame(records)
        ts_fleet = ts_fleet.merge(self.servers, on='server_generation', how='left')
        ts_fleet = ts_fleet.merge(self.datacenters, on='datacenter_id', how='left')
        ts_fleet = ts_fleet.merge(self.selling_prices,
                                  on=['server_generation', 'latency_sensitivity'],
                                  how='left')

        # Columns the problem data does not supply (e.g. lifespan, moved)
        # start at 0 for a freshly bought server.
        for column in self.fleet_columns:
            if column not in ts_fleet.columns:
                ts_fleet[column] = 0
        ts_fleet = ts_fleet.fillna(0)

        if self.fleet is None or self.fleet.empty:
            self.fleet = ts_fleet
        else:
            self.fleet = pd.concat([self.fleet, ts_fleet], ignore_index=True)

    def step(self, action):
        """Apply the agent's action and advance one time step.

        Buy actions are executed, the objective ``U * L * P`` is evaluated on
        the resulting fleet (0 while the fleet is empty) and returned as the
        reward, and the observation for the next time step is built.

        Returns:
            ``(observation, reward, done, truncated, info)`` where
            ``info["objective"]`` is the episode's running objective sum.
        """
        actions = ActionSpace.convert_actionspace_to_action(action)
        for agent_action in actions:
            if agent_action[0] == "buy":
                self.buy(datacenter=agent_action[4], server_gen=agent_action[1])

        reward = 0
        # The evaluation helpers are only called on a non-empty fleet.
        if self.fleet.shape[0] > 0:
            D = evaluation.get_time_step_demand(self.demand, self.timestep)
            # server capacity at this time step
            Zf = evaluation.get_capacity_by_server_generation_latency_sensitivity(self.fleet)

            # objective function at the current time step
            U = evaluation.get_utilization(D, Zf)
            L = evaluation.get_normalized_lifespan(self.fleet)
            P = evaluation.get_profit(D,
                                      Zf,
                                      self.selling_prices,
                                      self.fleet)
            reward = float(U * L * P)
            self.objective += reward

        #reached final timestep
        if self.timestep >= self.FINAL_TIMESTEP:
            self.done = True
        self.timestep += 1

        #extra info on the game if wanted for yourself
        info = {"objective": self.objective}
        truncated = False

        observation = self._build_observation()

        return (observation, reward, self.done, truncated, info)
