In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import SGD
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv, Linear, GATConv

import torch_geometric.transforms as T
from torch_geometric.utils import to_undirected
from torchviz import make_dot

import networkx as nx
from networkx.classes.function import density, degree
from scipy.stats import pearsonr
import optuna

import csv
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['figure.dpi'] = 360

In [None]:
def graph_r(r, data_dir='./data'):
    """Load the node and edge tables of one rosette from CSV.

    Reads ``{data_dir}/rosette{r}_nodes.csv`` and ``{data_dir}/rosette{r}_edges.csv``.
    Both files need a header row; columns are interpreted by position, not by name.

    Node columns, by position:
      * first           -> node id (kept as the first entry of each node row)
      * second          -> regression target, returned separately in ``mass``
      * third .. last-1 -> fluxes, converted with ``22.5 - 2.5*log10(flux)``
      * last            -> copied through unchanged (presumably the redshift ``z`` --
                           TODO confirm against the CSV header)

    NOTE(review): 22.5 looks like the AB zero point for fluxes given in nanomaggies --
    confirm the flux units. Non-positive fluxes produce ``nan``/``-inf`` here (numpy
    only emits a RuntimeWarning); they are not filtered.

    Parameters
    ----------
    r : int or str
        Rosette identifier interpolated into the file names.
    data_dir : str, optional
        Directory containing the CSV files. Defaults to ``'./data'``.

    Returns
    -------
    tuple
        ``(nodes, edges, mass)`` where ``nodes`` is a list of rows
        ``[id, converted fluxes..., last column]``, ``edges`` is a list of rows with
        every column parsed as ``float``, and ``mass`` is a list of floats aligned
        with ``nodes``.

    Raises
    ------
    FileNotFoundError
        If either CSV file does not exist.
    ValueError
        If a cell cannot be parsed as a float.
    """
    nodes, edges, mass = [], [], []

    # newline='' lets the csv module handle line endings itself.
    with open(f'{data_dir}/rosette{r}_nodes.csv', mode='r', newline='') as csv_file:
        for row in csv.DictReader(csv_file):
            values = list(row.values())
            fluxes = [float(value) for value in values[2:-1]]

            node = [float(values[0])]
            node.extend(22.5 - 2.5*np.log10(fluxes))
            node.append(float(values[-1]))

            nodes.append(node)
            mass.append(float(values[1]))

    with open(f'{data_dir}/rosette{r}_edges.csv', mode='r', newline='') as csv_file:
        for row in csv.DictReader(csv_file):
            edges.append([float(value) for value in row.values()])

    return nodes, edges, mass

In [None]:
# Rosette numbers; graph_r() is called with entries of this list to pick the CSV files.
rosettes = [
    3, 6, 7, 11, 12,
    13, 14, 15, 18, 19,
]

# Labels for the node features -- presumably in the same order as the per-node feature
# list built by graph_r() (five converted fluxes followed by z); TODO confirm.
props = [
    'log flux_g',
    'log flux_r',
    'log flux_z',
    'log flux_w1',
    'log flux_w2',
    'z',
]

In [None]:
# Build a networkx graph for the first rosette in the list.
nodes, edges, mass = graph_r(rosettes[0])

graph = nx.Graph()

# Nodes are keyed by their row position (0..N-1). The id stored in column 0 is dropped
# from the feature list kept under 'attr'.
for position, node in enumerate(nodes):
    graph.add_node(position, attr=node[1:])

# Edge rows refer to nodes by the id in column 0 of the node table, so translate those
# ids into row positions. An id that is missing from the node table raises KeyError.
id_to_position = {node[0]: position for position, node in enumerate(nodes)}
mapped_edges = [[id_to_position[edge[0]], id_to_position[edge[1]], edge[2]] for edge in edges]
for source, target, weight in mapped_edges:
    graph.add_edge(source, target, weight=weight)

# nx.Graph is undirected by construction, so the former `graph.to_undirected()` call
# (which only produced a deep copy of the same graph) is not needed.

In [None]:
# Convert the networkx graph into the tensors expected by PyTorch Geometric.
#
# NOTE: this cell rebinds `graph` (networkx graph -> PyG Data) and `mass` (list -> tensor),
# so it can only be re-run after re-running the cell that builds the networkx graph.
num_nodes = len(nodes)

# reshape() keeps the shapes valid ([2, 0] and [0, 1]) even if the graph has no edges.
edge_index = torch.tensor(list(graph.edges), dtype=torch.long).reshape(-1, 2).t().contiguous()
edge_attr = torch.tensor(
    [[graph[u][v]['weight']] for u, v in graph.edges], dtype=torch.float
).reshape(-1, 1)

# networkx lists every undirected edge once, but PyG treats edge_index as directed:
# without the mirrored copy, messages would only flow from the first to the second
# endpoint of each edge. reduce='mean' leaves ordinary weights untouched and prevents
# a self-loop (which is its own mirror image) from having its weight doubled.
edge_index, edge_attr = to_undirected(
    edge_index, edge_attr=edge_attr, num_nodes=num_nodes, reduce='mean'
)

x = torch.tensor([graph.nodes[i]['attr'] for i in range(len(graph.nodes))], dtype=torch.float)
# Scale each feature column to unit L2 norm across all nodes.
x = F.normalize(x, dim=0)
mass = torch.tensor(mass, dtype=torch.float)

# Deterministic split: the first 80% of the nodes (in file order) are used for training,
# the rest for testing. NOTE(review): if the CSV rows are sorted by any property this
# split is not random -- confirm that this is intended.
train_percentage = 0.8
train_mask = torch.zeros(num_nodes, dtype=torch.bool)
num_train_nodes = int(num_nodes*train_percentage)
train_mask[:num_train_nodes] = True
test_mask = ~train_mask

graph = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=mass, train_mask=train_mask, test_mask=test_mask)

In [None]:
# Run PyG's built-in consistency check on the assembled Data object before it is fed to
# a model (see the Data.validate documentation for the exact checks performed);
# raise_on_error=True requests an exception rather than a warning when a check fails.
graph.validate(raise_on_error=True)

In [None]:
class GNN(nn.Module):
    """Stack of GCNConv layers producing one output vector per node.

    Layout: ``conv1`` -> ``num_layers`` hidden convolutions -> ``conv_out``, i.e.
    ``num_layers + 2`` graph convolutions in total. Every convolution except the last
    one is followed by ReLU, batch normalisation and dropout (in that order).

    Parameters
    ----------
    input_dim : int
        Number of features per node in ``data.x``.
    hidden_dim : int
        Width of all hidden representations.
    output_dim : int
        Number of values predicted per node.
    num_layers : int
        Number of *additional* hidden convolutions between ``conv1`` and ``conv_out``.
    dropout_rate : float, optional
        Dropout probability applied after each hidden block (training mode only).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout_rate=0.5):
        super().__init__()

        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.convs = nn.ModuleList([GCNConv(hidden_dim, hidden_dim) for _ in range(num_layers)])
        self.bn_layers = nn.ModuleList([nn.BatchNorm1d(hidden_dim) for _ in range(num_layers)])
        self.conv_out = GCNConv(hidden_dim, output_dim)
        self.dropout = dropout_rate

    @staticmethod
    def _as_edge_weight(edge_attr):
        """Return ``edge_attr`` in the 1-D ``[num_edges]`` layout GCNConv documents.

        A ``[num_edges, 1]`` column is flattened; ``None`` and any other shape are
        passed through unchanged.
        """
        if edge_attr is not None and edge_attr.dim() == 2 and edge_attr.size(-1) == 1:
            return edge_attr.squeeze(-1)
        return edge_attr

    def forward(self, data):
        """Compute per-node outputs for ``data``.

        ``data`` must expose ``x``, ``edge_index`` and ``edge_attr``; ``edge_attr`` is
        used as the scalar edge weight of every convolution and may be ``None``.
        Returns a tensor with one row per node and ``output_dim`` columns.
        """
        x, edge_index = data.x, data.edge_index
        edge_weight = self._as_edge_weight(data.edge_attr)

        x = F.relu(self.conv1(x, edge_index, edge_weight))
        x = self.bn1(x)
        x = F.dropout(x, p=self.dropout, training=self.training)

        for conv, bn in zip(self.convs, self.bn_layers):
            x = F.relu(conv(x, edge_index, edge_weight))
            x = bn(x)
            x = F.dropout(x, p=self.dropout, training=self.training)

        # No activation on the output layer: the raw values are the predictions.
        return self.conv_out(x, edge_index, edge_weight)

In [None]:
class ModelOptimizaton:
    """Optuna objective that tunes the GNN's hyper-parameters on a single graph.

    Each trial samples ``hidden_dim``, ``num_layers`` and ``dropout_rate``, trains a
    fresh ``GNN`` on the nodes selected by ``data.train_mask`` and returns the R^2
    score on the nodes selected by ``data.test_mask`` (higher is better).

    ``input_dim`` and ``output_dim`` are *not* tuned: they are dictated by the data
    (feature count and target width), so sampling them would build models that do not
    fit the graph.

    NOTE(review): the score is computed on ``test_mask`` because the data has no
    separate validation mask; the selected hyper-parameters are therefore tuned on
    the test nodes.

    Parameters
    ----------
    input_dim : int
        Number of node features; must match ``data.x``.
    hidden_dim, num_layers, dropout_rate
        Baseline configuration, stored as attributes. ``objective`` samples its own
        values for these three.
    output_dim : int
        Number of values predicted per node; ``data.y`` is viewed as
        ``[-1, output_dim]``.
    data : optional
        Graph object exposing ``x``, ``y``, ``train_mask``, ``test_mask`` and whatever
        ``GNN.forward`` reads. Required before ``objective`` is called.
    epochs : int, optional
        Number of full-graph gradient steps per trial.
    lr : float, optional
        SGD learning rate.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout_rate,
                 data=None, epochs=100, lr=0.01):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout_rate
        self.data = data
        self.epochs = epochs
        self.lr = lr

    def objective(self, trial):
        """Train one sampled configuration and return its R^2 on the test nodes.

        Raises ``ValueError`` if no ``data`` was supplied to the constructor. A run
        that diverges yields ``nan``, which Optuna records as a failed trial.
        """
        if self.data is None:
            raise ValueError('ModelOptimizaton needs `data` to evaluate a trial')

        hidden_dim = trial.suggest_int('hidden_dim', 8, 124)
        num_layers = trial.suggest_int('num_layers', 1, 64)
        dropout_rate = trial.suggest_float('dropout_rate', 0, 1)

        data = self.data
        target = data.y.view(-1, self.output_dim)
        gnn = GNN(self.input_dim, hidden_dim, self.output_dim, num_layers, dropout_rate)
        optimizer = SGD(gnn.parameters(), lr=self.lr)

        # Full-batch training: the whole graph is passed through the network and the
        # loss is restricted to the training nodes.
        gnn.train()
        for _ in range(self.epochs):
            optimizer.zero_grad()
            prediction = gnn(data)
            loss = F.mse_loss(prediction[data.train_mask], target[data.train_mask])
            loss.backward()
            optimizer.step()

        gnn.eval()
        with torch.no_grad():
            prediction = gnn(data)[data.test_mask]
        truth = target[data.test_mask]

        # Coefficient of determination, R^2 = 1 - SS_res / SS_tot.
        ss_res = float(((truth - prediction) ** 2).sum())
        ss_tot = float(((truth - truth.mean(dim=0)) ** 2).sum())
        if ss_tot == 0.0:
            # Constant target: perfect only if the predictions match it exactly.
            return 1.0 if ss_res == 0.0 else 0.0
        return 1.0 - ss_res / ss_tot

if __name__=='__main__':
    # `graph` is the PyG Data object assembled in the earlier cell.
    model = ModelOptimizaton(
        input_dim=graph.x.size(1),
        hidden_dim=64,
        output_dim=1,
        num_layers=2,
        dropout_rate=0.5,
        data=graph,
    )
    study = optuna.create_study(direction='maximize')
    study.optimize(model.objective, n_trials=100)

In [None]:
# Baseline architecture: 6 input features per node, 64 hidden units, one output per
# node and 2 hidden convolutions.
input_dim = 6
hidden_dim = 64
output_dim = 1
num_layers = 2
model = GNN(input_dim, hidden_dim, output_dim, num_layers, dropout_rate=0.5)

# Show the module tree and the total number of parameter elements.
print(model)
print(f'Parameters: {sum(weight.numel() for weight in model.parameters())}')

# Run one forward pass on the graph and render its autograd graph to a PNG via torchviz;
# `tcv` holds whatever render() returns.
tcv = make_dot(
    model(graph),
    params=dict(model.named_parameters()),
).render("./torchviz/gnn_torchviz", format="png")