In [1]:
import os 
import pickle 
from natsort import natsorted

def load_data_as_dict(directory_path):
    """
    Load every pickle file found directly inside ``directory_path``.

    Regular files are visited in natural sort order (``natsorted``);
    sub-directories are skipped. Each file is unpickled and the resulting
    object is expected to be a dict holding a ``'task_dag'`` entry whose
    ``.id`` attribute identifies the task.

    Why a unique id? A single task can be randomly mapped many times
    (e.g. 100 mappings). All of those records share the same id, so the list
    of unique ids makes it easy to retrieve every mapping of one task.

    Args:
        directory_path: Directory containing the pickle files.

    Returns:
        A tuple ``(list_of_dicts, list_of_uuids)``: the unpickled records in
        natural file order, and the distinct task ids in order of first
        appearance.

    Note:
        ``pickle.load`` can execute arbitrary code while loading. Only point
        this function at directories whose contents you trust.
    """
    entries = os.listdir(directory_path)
    files = natsorted([entry for entry in entries if os.path.isfile(os.path.join(directory_path, entry))])

    list_of_dicts = []
    list_of_uuids = []
    # Companion set gives constant-time membership checks; the list alone keeps
    # first-seen order for the caller. Assumes the ids are hashable.
    seen_uuids = set()

    for file_name in files:
        file_path = os.path.join(directory_path, file_name)

        # Keep the file open only for as long as it takes to unpickle it.
        with open(file_path, 'rb') as file:
            data_dict = pickle.load(file)

        list_of_dicts.append(data_dict)

        uuid = data_dict['task_dag'].id
        if uuid not in seen_uuids:
            seen_uuids.add(uuid)
            list_of_uuids.append(uuid)

    return list_of_dicts, list_of_uuids

In [2]:
# Load every pickled record; the second return value (unique task ids) is discarded here.
dataset, _ = load_data_as_dict('data/task_from_graph')

In [3]:
import networkx as nx
# NOTE(review): this bare expression is not the last statement of the cell, so its value is never displayed.
dataset[0]['task_dag'].graph
def get_node_attribute_types(G):
    """Map each node-attribute name found in ``G`` to the Python type of its value.

    ``G.nodes(data=True)`` is iterated as ``(node, attribute_dict)`` pairs.
    When several nodes carry the same attribute name, the type seen on the
    node visited last is the one reported.
    """
    return {
        name: type(value)
        for _, node_attrs in G.nodes(data=True)
        for name, value in node_attrs.items()
    }

# Show which attribute names (and value types) the nodes of the first record's task graph carry.
get_node_attribute_types(dataset[0]['task_dag'].graph)

{'delay': int}

In [4]:
import pandas as pd 
# One row per record; coerce the processing time to a numeric dtype before summarising it.
df = pd.DataFrame(dataset).assign(
    network_processing_time=lambda frame: pd.to_numeric(frame['network_processing_time'])
)
processing_time_summary = df['network_processing_time'].describe()
processing_time_summary

count    81200.000000
mean      2040.257993
std        354.290683
min       1027.000000
25%       1738.000000
50%       2033.000000
75%       2322.000000
max       3672.000000
Name: network_processing_time, dtype: float64

In [5]:
import os 
from torch_geometric.utils import from_networkx
from torch_geometric.data import Data, Batch
import torch

def convert_graph_to_tensor(graph, latency):
    """Convert ``graph`` with ``from_networkx`` and attach ``latency`` as its label.

    The label is stored on the converted object as ``y``, a one-element
    tensor built from ``latency``.
    """
    # NOTE(review): the dtype of ``y`` is inferred by torch from ``latency``.
    pyg_graph = from_networkx(graph)
    pyg_graph.y = torch.tensor([latency])
    return pyg_graph

def convert_edge_index(edge_index, num_of_tasks):
    """Translate an edge list that uses 'Start'/'Exit' labels into integer pairs.

    A source equal to 'Start' becomes 0 and a destination equal to 'Exit'
    becomes ``num_of_tasks``; every other endpoint is passed through ``int``.

    Returns:
        A list of ``(int, int)`` tuples in the same order as ``edge_index``.
    """
    start_id, exit_id = 0, num_of_tasks

    return [
        (
            int(start_id if tail == 'Start' else tail),
            int(exit_id if head == 'Exit' else head),
        )
        for tail, head in edge_index
    ]

directory_path = 'data/task_from_graph_tensor'

# Make sure the output directory is there and report which case applied.
if os.path.exists(directory_path):
    print(f"Directory '{directory_path}' already exists.")
else:
    os.makedirs(directory_path)
    print(f"Directory '{directory_path}' created successfully.")

# Convert every record into a PyG ``Data`` object and save it as graph_<idx>.pt.
# The loop variable is called ``record`` so it is not rebound to the ``Data``
# object built inside the body.
for idx, record in enumerate(dataset):
    task_dag = record['task_dag']
    task_processing_time = float(record['network_processing_time'])
    # Regression target: a one-element float tensor.
    target_value = torch.tensor([task_processing_time]).float()

    task_graph = task_dag.graph
    edge_index = list(task_graph.edges)

    total_tasks = len(task_graph.nodes)
    # 'Exit' is mapped to the highest node index.
    last_task = total_tasks - 1

    converted_edge_index = convert_edge_index(edge_index, last_task)
    # reshape(-1, 2) is a no-op for a non-empty list of pairs, but makes an
    # edgeless graph yield shape (2, 0) after the transpose instead of (0,).
    converted_edge_index_torch = (
        torch.tensor(converted_edge_index, dtype=torch.long)
        .reshape(-1, 2)
        .t()
        .contiguous()
    )

    # Placeholder node features: a single constant 1 per node.
    dummy_input = torch.ones(total_tasks).view(-1, 1)
    data = Data(x=dummy_input, edge_index=converted_edge_index_torch, y=target_value)

    torch.save(data, f'{directory_path}/graph_{idx}.pt')

Directory 'data/task_from_graph_tensor' already exists.
