In [1]:
import random
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from tqdm import tqdm
from gymnasium import Env
from gymnasium.spaces import Box
import copy
import json
import pyarrow as pa
import pyarrow.parquet as pq
import ast

In [2]:
class Architecture:
    def __init__(self):
        self.space_flag = 0
        self.c_capacity, self.s_capacity = 1000, 100                                         # Capacity of the devices
        # self.a1, self.a2, self.a3, self.mc = 0.000125, 0.000010, 0.000200, 0.001           # Weight of every type of data
        self.weights = [0.0125, 0.0010, 0.0200, 0.1000]                                      # a1, a2, a3, mc
        self.lifetime = {'a1': 10000, 'a2': 20000, 'a3': 10000, 'mc': 1000}                  # Max lifetime of the different data types
        self.total_c = 2
        self.total_s = 6
        self.max_actions = 20

        self.devices = {}
        self.data = {
            'a1': {},
            'a2': {},
            'a3': {},
            'mc': {},
        }
        self.latencies = {}

        for i in range(self.total_c):
            for j in (self.data.keys()):
                self.data[j]['c'+str(i)] = 0
            self.devices['c'+str(i)] = {'a1': 0, 'a2': 0, 'a3': 0, 'mc': 0}
            self.latencies['c'+str(i)] = {}
        for i in range(self.total_s):
            for j in (self.data.keys()):
                self.data[j]['s'+str(i)] = 0
            self.devices['s'+str(i)] = {'a1': 0, 'a2': 0, 'a3': 0, 'mc': 0}
            self.latencies['s'+str(i)] = {}

        for i in self.latencies.keys():
            for j in self.latencies.keys():
                self.latencies[i][j] = 0

        self.data_types = []
        self.data_allocation = []
        self.data_times = []

        self.data_allocation_dict = []

        self.action_type = []
        self.action_device_dev = []
        self.action_device = []

        randomized = list(self.devices.keys())
        random.shuffle(randomized)

        bound = round(len(self.devices)/4)

        self.clusters = {
            'a1': randomized[:bound],
            'a2': randomized[bound:bound*2],
            'a3': randomized[bound*2:bound*3],
            'mc': randomized[bound*3:]
        }

        self.visualization()

    def to_int(self, device):
        return list(self.devices.keys()).index(device)

    def to_device(self, position):
        return list(self.devices.keys())[position]

    def construct_dictionaries(self):
        for i in self.data:
            for j in self.data[i]:
                self.data[i][j] = 0
        for i in self.devices:
            for j in self.devices[i]:
                self.devices[i][j] = 0
        for device, d_type in zip(self.data_allocation_dict,self.data_types):
            self.data[d_type][device] += 1
            self.devices[device][d_type] += 1

    def update(self, data_type, device):
        self.data_types.append(data_type)
        self.data_allocation_dict.append(device)
        self.data_allocation.append(self.to_int(device))
        self.data_times.append(0)
        self.devices[device][data_type] += 1
        self.data[data_type][device] += 1

    def heart_beat(self):
        deads = []
        for i in range(len(self.data_times)):
            self.data_times[i] += 1
            if self.data_times[i] == self.lifetime[self.data_types[i]]:
                deads.append(i)

        removed = 0
        for i in deads:
            self.devices[self.data_allocation_dict[i-removed]][self.data_types[i-removed]] -= 1
            self.data[self.data_types[i-removed]][self.data_allocation_dict[i-removed]] -= 1
            self.data_types.pop(i - removed)
            self.data_allocation_dict.pop(i - removed)
            self.data_allocation.pop(i - removed)
            self.data_times.pop(i - removed)
            removed += 1

    def free_space(self):
        load = {}
        for i in self.devices.keys():
            if list(i)[0] == 'c':
                load[i] = (self.c_capacity - sum([a*b for a,b in zip(list(self.devices[i].values()), self.weights)]))/self.c_capacity
            else:
                load[i] = (self.s_capacity - sum([a*b for a,b in zip(list(self.devices[i].values()), self.weights)]))/self.s_capacity
            if load[i] <= 0.2:
                self.space_flag = i
        return load

    def visualization(self):
        keys = list(self.devices.keys())
        cs = keys[:self.total_c]
        ss = keys[self.total_c:]

        colors = []
        sizes = []

        graph = nx.Graph()

        colors.append('lightblue')
        sizes.append(1000)
        index = 0
        for i in range(len(cs)):
            if i != len(cs)-1:
                colors.append('lightblue')
                sizes.append(1000)
                graph.add_edge(cs[i],cs[i+1])
                graph.add_weighted_edges_from([(cs[i],cs[i+1],10)])
                for j in range(int(len(ss)/self.total_c)):
                    colors.append('orange')
                    sizes.append(100)
                    graph.add_edge(cs[i],ss[index])
                    graph.add_weighted_edges_from([(cs[i],ss[index],4)])
                    index += 1
            else:
                graph.add_edge(cs[i],cs[0])
                graph.add_weighted_edges_from([(cs[i],cs[0],10)])
                for j in range(int(len(ss)/self.total_c)+len(ss)%self.total_c):
                    graph.add_edge(cs[i],ss[index])
                    graph.add_weighted_edges_from([(cs[i],ss[index],4)])
                    colors.append('orange')
                    sizes.append(100)
                    index += 1

        pos = nx.spring_layout(graph)  # Position nodes using a spring layout algorithm
        nx.draw(graph, pos, with_labels=True, node_size=sizes, node_color=colors, font_weight=12, font_color='black', edge_color='gray')
        edge_labels = nx.get_edge_attributes(graph, 'weight')
        nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)

        plt.title("Architecture:")
        plt.axis('off')
        plt.show()

        for u in graph.nodes:
            shortest_paths = nx.shortest_path_length(graph, source=u, weight='weight')
            for v, weight in shortest_paths.items():
                self.latencies[u][v] = weight

    def compute_total_latency(self):
        total_latency = 0
        for i in range(len(self.action_type)):
            total_latency += sum([a*b for a,b in zip(self.data[self.action_type[i]].values(), self.latencies[self.action_device_dev[i]].values())])
        return total_latency

    def generate(self, operation):
        if operation == 'a1' or operation == 'a2' or operation == 'a3':
            for i in self.devices.keys():
                self.update(operation,i)
        elif operation == 'mc':
            for i in self.devices.keys():
                if list(i)[0] == 's':
                    self.update('mc',i)
        elif operation.split('_')[0] == 'ai':
            device=random.sample(list(self.clusters[operation.split('_')[1]]),1)
            self.action_device_dev.append(device[0])
            self.action_device.append(self.to_int(device[0]))
            self.action_type.append(operation.split('_')[1])
            difference = len(self.action_device_dev) - self.max_actions
            if difference >= 0:
                for i in range(difference):
                    self.action_device_dev.pop(0)
                    self.action_device.pop(0)
                    self.action_type.pop(0)

    def greedy_algorithm(self): # baseline?
        if self.space_flag == 0:
            allocation = []
            allocation_dev = []
            for i in self.data_types:
                allocation.append(self.to_int(random.sample(list(self.clusters[i]),1)[0]))
                allocation_dev.append(random.sample(list(self.clusters[i]),1)[0])
            self.data_allocation = allocation
            self.data_allocation_dict = allocation_dev
            self.construct_dictionaries()
            result = self.free_space()
        else:
            problematic_key = None
            for key, value in self.clusters.items():
                if self.space_flag in value:
                    problematic_key = key
                    break
            self.clusters[problematic_key].extend(element for element in self.devices if element[0]==('c'))
            self.space_flag = 0
            result = self.greedy_algorithm()

        return result

In [3]:
class SimulatedArchitecture(Env):
    def __init__(self, architecture, num_devices):
        self.arch = architecture
        self.num_devices = num_devices
        self.weights = arch.weights
        self.free_space = list(arch.free_space().values())
        self.reward = 0
        self.executions = ['a1','a2','mc','a1','ai_a2','mc','a3','ai_a1','mc','a1','ai_a3','mc','a2','a3','ai_a2','mc','ai_a1','a1','mc','ai_a3','ai_mc']
        self.len_executions = len(self.executions)
        self.index = 0
    def get_obs(self):
        return {"weights": self.weights,
                "free_space": self.free_space,
                "allocation": self.allocation,
                "last_calls": self.last_calls
                }
    def step(self, action):
        latency = self.arch.compute_total_latency()
        self.free_space = list(self.arch.free_space().values())
        self.reward = 0

        for i in self.free_space:
            if i <= 0.2:
                self.reward = -1
                break

        if self.reward != -1:
            self.reward = 4000000*(1/(latency+1)) + 0.5*(1/(np.std(self.free_space)+0.00001))              # With those weights, the scale of both latency and std is the same for high values while the latency is the only one taken into account in initial states (when there is plenty of space in the system and distribution is not that important)
        info = {}
        done = True

        return self.get_obs(), self.reward, done, info

    def render(self):
        pass
    def reset(self):
        if self.reward != -1:
            self.arch.generate(self.executions[self.index])
            self.arch.heart_beat()
            self.index = self.index + 1 if self.index < self.len_executions-1 else 0
        self.allocation = [self.arch.data_types,self.arch.data_allocation]
        self.last_calls = [self.arch.action_type,self.arch.action_device]
        self.action_space = Box(low=0, high=self.num_devices-1, shape=(len(self.allocation[1]),), dtype=np.int16)
    def close(self):
        pass

In [5]:
def getData():
    observations_file_path = 'TFM/observations.csv'
    next_observations_file_path = 'TFM/next_observations.csv'
    actions_file_path = 'TFM/actions.parquet'
    rewards_file_path = 'TFM/rewards.parquet'

    observations_pd = pd.read_csv(observations_file_path)
    observations = observations_pd.to_dict(orient='records')
    next_observations_pd = pd.read_csv(next_observations_file_path)
    next_observations = next_observations_pd.to_dict(orient='records')
    actions_table = pq.read_table(actions_file_path)
    rewards_table = pq.read_table(rewards_file_path)
    actions = np.array(actions_table.to_pandas())
    rewards = np.array(rewards_table.to_pandas())

    return {"observations": observations, "actions":actions, "rewards": rewards, "next_observations":next_observations}

def processing(dataset):
    for i in range(len(dataset)):
        offline_dataset['observations'][i]["weights"] = ast.literal_eval(offline_dataset['observations'][i]["weights"])
        offline_dataset['observations'][i]["free_space"] = ast.literal_eval(offline_dataset['observations'][i]["free_space"])
        offline_dataset['observations'][i]['allocation'] = ast.literal_eval(offline_dataset['observations'][i]['allocation'])
        offline_dataset['observations'][i]["last_calls"] = ast.literal_eval(offline_dataset['observations'][i]["last_calls"])
        offline_dataset['next_observations'][i]["weights"] = ast.literal_eval(offline_dataset['next_observations'][i]["weights"])
        offline_dataset['next_observations'][i]["free_space"] = ast.literal_eval(offline_dataset['next_observations'][i]["free_space"])
        offline_dataset['next_observations'][i]['allocation'] = ast.literal_eval(offline_dataset['next_observations'][i]['allocation'])
        offline_dataset['next_observations'][i]["last_calls"] = ast.literal_eval(offline_dataset['next_observations'][i]["last_calls"])

In [None]:
offline_dataset = getData()
len(offline_dataset)

In [104]:
processing(offline_dataset)

In [115]:
free_space = pd.DataFrame(columns=['c0','c1','s0','s1','s2','s3','s4','s5'])
for i in tqdm(offline_dataset["observations"]):
    free_space.loc[len(free_space)] = i["free_space"]
free_space

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 63000/63000 [02:46<00:00, 378.11it/s]


Unnamed: 0,c0,c1,s0,s1,s2,s3,s4,s5
0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
1,1.0,1.0,1.0,1.0,1.0,0.9995,1.0,0.9995
2,0.999995,0.999997,1.0,1.0,1.0,0.999375,1.0,0.999625
3,0.999996,0.999996,1.0,1.0,0.997,0.999375,0.997,0.999625
4,"[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124...","[0.999994, 0.999998, 1.0, 1.0, 0.996, 0.999124..."
...,...,...,...,...,...,...,...,...
62995,"[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,...","[0.901356, 0.8685, 0.2242, 0.5173749999999999,..."
62996,"[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5...","[0.903366, 0.8644375, 0.23579999999999998, 0.5..."
62997,"[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000...","[0.8984985000000001, 0.86765, 0.24060000000000..."
62998,"[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156...","[0.904021, 0.8651, 0.23200000000000004, 0.5156..."


In [122]:
offline_dataset["observations"][4]['last_calls']

'[[], []]'