In [140]:
from google.colab import drive
# Mount Google Drive at /content/drive; the next cell changes into a folder below this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [141]:
cd "/content/drive/Othercomputers/My Laptop/_Notebooks/"

/content/drive/Othercomputers/My Laptop/_Notebooks


In [142]:
!pip install spektral
!pip install keras-tuner



In [143]:
import random
import numpy as np
import os
import tensorflow as tf

# Single seed shared by every pseudo-random generator used in this notebook
_GLOBAL_SEED = 42

# 1. Export `PYTHONHASHSEED`.
#    NOTE: hash randomization is decided when the interpreter starts, so this
#    assignment cannot change the already-running kernel; it only reaches
#    child processes that inherit the environment.
os.environ['PYTHONHASHSEED'] = str(_GLOBAL_SEED)

# 2. Seed Python's built-in pseudo-random generator (once is enough)
random.seed(_GLOBAL_SEED)

# 3. Seed NumPy's global pseudo-random generator
np.random.seed(_GLOBAL_SEED)

# 4. Seed TensorFlow's global pseudo-random generator
tf.random.set_seed(_GLOBAL_SEED)


In [144]:
import pandas as pd

# Load the training table and the test table (in that order) from the dataset folder
train_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in ('../_Dataset/train_dataset.csv', '../_Dataset/test_dataset.csv')
)


In [145]:
# This notebook predicts "disorder_subclass", so the "genetic_disorder" column
# is removed from both tables and never reaches the feature matrix.
train_disorder_subclass_df = train_df.drop(columns="genetic_disorder")
test_disorder_subclass_df = test_df.drop(columns="genetic_disorder")

In [146]:
# Separate features (every column except the target) from the target column, per table
train_disorder_subclass_x, train_disorder_subclass_y = (
    train_disorder_subclass_df.drop(columns="disorder_subclass"),
    train_disorder_subclass_df["disorder_subclass"],
)

test_disorder_subclass_x, test_disorder_subclass_y = (
    test_disorder_subclass_df.drop(columns="disorder_subclass"),
    test_disorder_subclass_df["disorder_subclass"],
)

In [147]:
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

# One-hot width shared by all splits. It is derived from the largest integer
# label seen in the train and test targets *before* splitting, so a split that
# happens to miss the highest class still gets the same number of columns.
# (Assumes the labels are already integer-encoded starting at 0, which
# to_categorical requires.)
n_disorder_subclasses = int(max(train_disorder_subclass_y.max(), test_disorder_subclass_y.max())) + 1

# Split data into training and validation sets (10% held out for validation).
# NOTE: the inputs are overwritten here, so re-run the previous cell before
# re-running this one.
(train_disorder_subclass_x, val_disorder_subclass_x,
 train_disorder_subclass_y, val_disorder_subclass_y) = train_test_split(
    train_disorder_subclass_x,
    train_disorder_subclass_y,
    test_size=0.1,
    random_state=_GLOBAL_SEED,
)

# One-hot encode every target vector with the same number of columns
train_disorder_subclass_y = to_categorical(train_disorder_subclass_y, num_classes=n_disorder_subclasses)
val_disorder_subclass_y = to_categorical(val_disorder_subclass_y, num_classes=n_disorder_subclasses)
test_disorder_subclass_y = to_categorical(test_disorder_subclass_y, num_classes=n_disorder_subclasses)

In [148]:
from sklearn.preprocessing import StandardScaler

# Fit the standardization on the training split only, so no statistics from
# the validation or test data leak into the preprocessing
scaler = StandardScaler().fit(train_disorder_subclass_x)

# Apply that single fitted scaler to every split
train_disorder_subclass_x = scaler.transform(train_disorder_subclass_x)
val_disorder_subclass_x = scaler.transform(val_disorder_subclass_x)
test_disorder_subclass_x = scaler.transform(test_disorder_subclass_x)

In [149]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import networkx as nx
from scipy.sparse import csr_matrix

def _similarity_graph(features, threshold):
    """Build a thresholded cosine-similarity graph over the rows of ``features``.

    Parameters
    ----------
    features : array-like of shape (n_samples, n_features)
        One row per node.
    threshold : float
        Two rows are connected when their cosine similarity is strictly
        greater than this value.

    Returns
    -------
    tuple
        ``(similarity, adjacency, graph)``: the dense pairwise similarity
        matrix, the binary adjacency as a SciPy CSR matrix, and the
        corresponding NetworkX graph.
    """
    similarity = cosine_similarity(features)
    # Binary 0/1 adjacency, same values as np.where(similarity > threshold, 1, 0)
    adjacency = csr_matrix((similarity > threshold).astype(np.int64))
    # The adjacency is a SciPy sparse matrix, so it must go through the sparse
    # constructor; nx.from_numpy_array is meant for dense NumPy arrays.
    graph = nx.from_scipy_sparse_array(adjacency)
    return similarity, adjacency, graph


# Minimum cosine similarity for two samples to be linked by an edge
threshold = 0.1

# Similarity matrix, sparse adjacency matrix and graph for each split
similarity_matrix_train, sparse_adjacency_matrix_train, G_train = _similarity_graph(train_disorder_subclass_x, threshold)
similarity_matrix_val, sparse_adjacency_matrix_val, G_val = _similarity_graph(val_disorder_subclass_x, threshold)
similarity_matrix_test, sparse_adjacency_matrix_test, G_test = _similarity_graph(test_disorder_subclass_x, threshold)

KeyboardInterrupt: 

In [None]:
import networkx as nx

# Save the training graph in GraphML format to the current working directory.
nx.write_graphml(G_train, "G_train.graphml")

In [None]:
# # Get a list of connected components (subgraphs)
# connected_components = list(nx.connected_components(G_train))

# # Iterate over the connected components and remove the ones with less than 3 nodes
# for component in connected_components:
#     if len(component) < 3:
#         G_train.remove_nodes_from(component)

In [None]:
# import networkx as nx

# nx.write_graphml(G_train, "G_train_deleted_nodes.graphml")

In [None]:
import spektral

# Dense adjacency matrix of each graph, in train / val / test order
A_train, A_val, A_test = (
    nx.to_numpy_array(graph) for graph in (G_train, G_val, G_test)
)

# Normalized version of each adjacency matrix, later passed as `a` to the Graph objects
A_train_tensor, A_val_tensor, A_test_tensor = (
    spektral.utils.normalized_adjacency(adjacency)
    for adjacency in (A_train, A_val, A_test)
)

Define the GNN

In [None]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from spektral.layers import GCNConv

class GNN(Model):
    """Graph convolutional classifier: stacked ``GCNConv`` layers plus a softmax head.

    Parameters
    ----------
    input_dim : int
        Number of node features; forwarded to the first convolution.
    conv_sizes : sequence of int
        Output size of each ``GCNConv`` layer, in order. Must not be empty.
    n_classes : int
        Number of output classes (units of the final softmax layer).
    learning_rate : float
        Learning rate of the Adam optimizer stored on ``self.optimizer``.
    activation : str or callable, optional
        Activation applied by every ``GCNConv`` layer. Defaults to ``'relu'``;
        without a non-linearity the stacked convolutions collapse into a
        single linear transformation.

    Raises
    ------
    ValueError
        If ``conv_sizes`` is empty.
    """

    def __init__(self, input_dim, conv_sizes, n_classes, learning_rate, activation='relu'):
        super().__init__()
        conv_sizes = list(conv_sizes)
        if not conv_sizes:
            raise ValueError("conv_sizes must contain at least one layer size")

        # Only the first convolution is told the input feature dimension
        convs = [GCNConv(conv_sizes[0], activation=activation, input_dim=input_dim)]
        convs.extend(GCNConv(size, activation=activation) for size in conv_sizes[1:])
        self.convs = convs

        self.dense = Dense(n_classes, activation='softmax')
        # Read back by the code that compiles the model (model.optimizer)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate)

    def call(self, inputs, training=False):
        """Run the forward pass.

        ``inputs`` is a pair ``(x, a)`` of node features and adjacency matrix;
        the same ``a`` is fed to every convolution. ``training`` is accepted
        for Keras API compatibility but is not used.
        """
        x, a = inputs
        for conv in self.convs:
            x = conv([x, a])
        return self.dense(x)


In [None]:
from spektral.data import Graph, Dataset, SingleLoader

class MyDataset(Dataset):
    """Spektral ``Dataset`` that serves a list of graphs already held in memory."""

    def __init__(self, graphs, **kwargs):
        """Store ``graphs`` and forward any remaining keyword arguments to ``Dataset``."""
        # The list is stored before the parent initializer runs; presumably the
        # parent calls read() during construction — TODO confirm in the Spektral docs.
        self.graphs = graphs
        super().__init__(**kwargs)

    def read(self):
        """Return the stored ``self.graphs`` unchanged."""
        return self.graphs

In [None]:
from kerastuner import HyperModel

class GNNHyperModel(HyperModel):
    """KerasTuner hypermodel that builds and compiles a ``GNN`` for each trial.

    Parameters
    ----------
    input_dim : int
        Number of node features passed to ``GNN``.
    output_dim : int
        Number of classes passed to ``GNN`` as ``n_classes``.
    """

    def __init__(self, input_dim, output_dim):
        # Let the HyperModel base class perform its own initialization too
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

    def build(self, hp):
        """Sample hyperparameters from ``hp`` and return a compiled ``GNN``.

        Search space: 1-3 convolution layers (``num_convs``), each with
        32-512 units in steps of 32 (``conv_size_<i>``), and a log-sampled
        learning rate between 1e-4 and 1e-2 (``learning_rate``).
        """
        num_convs = hp.Int('num_convs', 1, 3)
        conv_sizes = [
            hp.Int('conv_size_' + str(i), min_value=32, max_value=512, step=32)
            for i in range(num_convs)
        ]
        learning_rate = hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')

        model = GNN(
            input_dim=self.input_dim,
            conv_sizes=conv_sizes,
            n_classes=self.output_dim,
            learning_rate=learning_rate,
        )

        # The GNN creates its Adam optimizer from the sampled learning rate
        model.compile(
            optimizer=model.optimizer,
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

Perform hyperparameter tuning and train the final model

In [None]:
from kerastuner.tuners import RandomSearch
from kerastuner.engine.hyperparameters import Float

_NUM_EPOCHS = 10
# Taken from the width of the one-hot training labels so the softmax layer
# always matches the targets (instead of a hard-coded class count)
_NUM_CLASSES = train_disorder_subclass_y.shape[1]

n_features = train_disorder_subclass_x.shape[1]  # Number of features

hypermodel = GNNHyperModel(n_features, _NUM_CLASSES)

# Random search: 5 hyperparameter sets, each trained 3 times, ranked by validation accuracy
tuner = RandomSearch(
    hypermodel,
    objective='val_accuracy',
    max_trials=5,
    executions_per_trial=3,
    directory='gnn_models',
    project_name='genetic_disorder'
)

tuner.search_space_summary()

# Create Graph objects (one graph per split: node features, normalized adjacency, one-hot labels)
graph_train = Graph(x=train_disorder_subclass_x, a=A_train_tensor, y=train_disorder_subclass_y)
graph_val = Graph(x=val_disorder_subclass_x, a=A_val_tensor, y=val_disorder_subclass_y)
graph_test = Graph(x=test_disorder_subclass_x, a=A_test_tensor, y=test_disorder_subclass_y)

# Create a list of Graph objects
graphs_train = [graph_train]
graphs_val = [graph_val]
graphs_test = [graph_test]

# Create Dataset
dataset_train = MyDataset(graphs_train)
dataset_val = MyDataset(graphs_val)
dataset_test = MyDataset(graphs_test)

# Create SingleLoader
loader_train = SingleLoader(dataset_train)
loader_val = SingleLoader(dataset_val)
loader_test = SingleLoader(dataset_test)


# Search on the training graph, scoring each trial on the validation graph
tuner.search(x=loader_train.load(),
             steps_per_epoch=loader_train.steps_per_epoch,
             validation_data=loader_val.load(),
             validation_steps=loader_val.steps_per_epoch,
             epochs=_NUM_EPOCHS
)

# Get the optimal hyperparameters
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]

# Build the model with the optimal hyperparameters
model = tuner.hypermodel.build(best_hp)

# Train the model. Validation uses the validation graph, as in the search;
# the test graph is kept untouched for the final evaluation cell.
history = model.fit(
    loader_train.load(),
    steps_per_epoch=loader_train.steps_per_epoch,
    validation_data=loader_val.load(),
    validation_steps=loader_val.steps_per_epoch,
    epochs=_NUM_EPOCHS
)

In [None]:
import numpy as np
from sklearn.metrics import recall_score, mean_squared_error

# Evaluate the model on the testing dataset
test_loss, test_accuracy = model.evaluate(loader_test.load(), steps=loader_test.steps_per_epoch)

# Class probabilities predicted for the testing dataset
test_probabilities = model.predict(loader_test.load(), steps=loader_test.steps_per_epoch)

# Convert the probabilities to predicted class indices
test_predictions = np.argmax(test_probabilities, axis=1)

# True class indices. The one-hot `test_disorder_subclass_y` is deliberately
# left untouched so this cell can be re-run without failing.
test_true_labels = np.argmax(test_disorder_subclass_y, axis=1)

# Calculate the macro-averaged recall
test_recall = recall_score(test_true_labels, test_predictions, average='macro')

# Calculate the mean squared error.
# NOTE(review): this treats class indices as numeric values; confirm that the
# ordering of the class ids is meaningful before relying on this number.
test_mse = mean_squared_error(test_true_labels, test_predictions)

# Print the results
print("Test Accuracy:", test_accuracy)
print("Test Recall:", test_recall)
print("Test Mean Squared Error:", test_mse)
