Code for implementing the distributed Finite-Time consensus experiments.

Parts of this code are taken from Fernando Gama, available at: https://github.com/alelab-upenn/graph-neural-networks.

## Imports

In [None]:
#Standard libraries:
import os
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pickle
import datetime
from copy import deepcopy

import torch; torch.set_default_dtype(torch.float64)
import torch.nn as nn
import torch.optim as optim

#GNNs libraries:
import Utils.graphTools as graphTools
import Utils.graphAdaptiveActivations as gaActivations
import Utils.dataTools
import Utils.graphML as gml
import Utils.graphML as gml
import Modules.architectures as archit
import Modules.model as model
import Modules.train as train
import Modules.loss as loss
from Utils.miscTools import writeVarValues
from Utils.miscTools import saveSeed

# Generate and Save Graphs - Only Once

These are the parameters for generating the graphs. Below I generate SBM graphs and save them, so that the same graphs can always be reused later.

In [None]:
# Graph model and size
graphType = 'SBM'
nNodes = 20  # Number of nodes
nClasses = 2  # Number of classes (i.e. number of communities)

# Dictionary of options handed to graphTools.Graph together with graphType
# and nNodes
graphOptions = {
    'nCommunities': nClasses,  # Number of communities
    'probIntra': 0.8,  # Probability of drawing edges intra communities
    'probInter': 0.1,  # Probability of drawing edges inter communities
}

I create 10 different graphs.

In [None]:
# Number of independent graph realizations to draw. The notebook text
# ("10 different graphs") and the final averaging step (which expects
# 10 graphs x 10 permutations = 100 runs) both assume 10 graphs.
nGraphs = 10

allGraphs = []  # Graph objects returned by graphTools.Graph
allAdjMatrix = []  # The W attribute of each graph

for _ in range(nGraphs):
    G = graphTools.Graph(graphType, nNodes, graphOptions)
    allGraphs.append(G)
    allAdjMatrix.append(G.W)

And I save them.

In [None]:
# Persist only the adjacency matrices (not the Graph objects) into the file
# 'graphs', resolved relative to the current working directory.
with open('graphs', 'wb') as fp:
    pickle.dump(allAdjMatrix, fp)

# Setting Parameters

Create directories for saving the results.

In [None]:
# Where results go: <saveDirRoot>/<thisFilename>
saveDirRoot = 'experiments'  # Relative location
thisFilename = 'Finite_Time_Consensus'  # General name of all related files
saveDir = os.path.join(saveDirRoot, thisFilename)  # Results from each run

# Tag identifying the graph family (SBM with probIntra 0.8 / probInter 0.1);
# note that this rebinds graphType, which earlier held 'SBM'.
graphType = 'SBM_08_01'

# NOTE(review): dataDir points at a movielens folder and is not referenced in
# the visible part of this notebook -- presumably a leftover; confirm.
dataDir = os.path.join('datasets', 'movielens')

We create a `.txt` file in which the chosen parameters are saved.

In [None]:
# Take one timestamp so that the directory suffix and the header line written
# to the hyperparameter file refer to the same instant.
runStart = datetime.datetime.now()
today = runStart.strftime("%Y%m%d%H%M%S")
saveDir = saveDir + '-' + graphType + '-' + today

# Create directory (exist_ok avoids the separate check-then-create step)
os.makedirs(saveDir, exist_ok=True)

# Create the file where all the (hyper)parameters and results will be saved.
# The file is only written here, so plain 'w' mode is enough.
varsFile = os.path.join(saveDir, 'hyperparameters.txt')
with open(varsFile, 'w') as file:
    file.write('%s\n\n' % runStart.strftime("%Y/%m/%d %H:%M:%S"))

We save the seeds for reproducibility.

In [None]:
#   PyTorch seeds
torchState = torch.get_rng_state()
torchSeed = torch.initial_seed()

#   Numpy seeds
# Capture the state of NumPy's *global* generator: that is the one consumed by
# the np.random.normal and np.random.permutation calls in this notebook.
# A freshly constructed np.random.RandomState() is seeded independently of the
# global generator, so saving its state would not allow the run to be
# reproduced.
numpyState = np.random.get_state()

#   Collect all random states (one dictionary per module)
randomStates = [
    {'module': 'numpy', 'state': numpyState},
    {'module': 'torch', 'state': torchState, 'seed': torchSeed},
]

saveSeed(randomStates, saveDir)

If available, we use GPU.

In [None]:
# Request GPU execution. The device-selection cell only picks 'cuda:0' when
# torch.cuda.is_available() is also true; otherwise it uses 'cpu'.
useGPU = True

# Training Parameters

Optimizer:

In [None]:
# Optimizer choice; initialize_model handles 'ADAM', 'SGD' and 'RMSprop'
trainer = 'ADAM'
learningRate = 0.001  # Passed to the optimizer as lr
# beta1 and beta2 form the Adam betas pair; beta1 is also used as the
# RMSprop alpha
beta1, beta2 = 0.9, 0.999

Loss function:

In [None]:
# The loss is stored as a class (not an instance); it is handed to
# loss.adaptExtraDimensionLoss when a model is initialized.
lossFunction = torch.nn.MSELoss

General training parameters:

In [None]:
# Training schedule
nEpochs = 500  # Number of epochs
batchSize = 20  # Batch size
validationInterval = 5  # How many training steps to do the validation

# Learning rate decay (the rate and period are only forwarded to the trainer
# when doLearningRateDecay is True)
doLearningRateDecay = False  # Learning rate decay
learningRateDecayRate = 0.9  # Rate
learningRateDecayPeriod = 1  # How many epochs after which update the learning rate

We can save the values:

In [None]:
# Record the optimizer, loss and training settings in the hyperparameter file.
# Keyword order matches the order in which the values should be listed.
writeVarValues(
    varsFile,
    dict(
        trainer=trainer,
        learningRate=learningRate,
        beta1=beta1,
        beta2=beta2,
        lossFunction=lossFunction,
        nEpochs=nEpochs,
        batchSize=batchSize,
        doLearningRateDecay=doLearningRateDecay,
        learningRateDecayRate=learningRateDecayRate,
        learningRateDecayPeriod=learningRateDecayPeriod,
        validationInterval=validationInterval,
    ),
)

# Architecture

I am using the functionality for training multiple models provided in Fernando Gama's code, in case we want to add more models later. But for now I am only using a local GNN with 2 layers.

In [None]:
# Names of the models to train; each name is appended once its hyperparameter
# dictionary has been defined.
modelList = []

In [None]:
# Hyperparameters for the Local GNN (LclGNN). The key order is kept unchanged
# because the dictionary is also written to the hyperparameter file.
hParamsLclGNN = {
    'name': 'LclGNN',
    'archit': archit.LocalGNN,  # Architecture

    # Graph convolutional parameters
    'dimNodeSignals': [1, 32, 32],  # Features per layer
    'nFilterTaps': [10, 10],  # Number of filter taps per layer
    'bias': True,  # Decide whether to include a bias term

    # Nonlinearity
    'nonlinearity': gaActivations.MaxGAActivation,

    # Pooling (note that in this code no pooling is performed)
    'poolingFunction': gml.NoPool,  # Summarizing function -> no pooling
    'nSelectedNodes': None,  # To be determined later on
    'poolingSize': [1, 1],  # poolingSize-hop neighborhood affected by the summary

    # Readout layer: local linear combination of features
    'dimReadout': [1],  # Dimension of the fully connected layers after the GCN layers

    # Graph structure
    'GSO': None,  # To be determined later on, based on data
    'order': None,  # Not used because there is no pooling
}

# The two-layer variant is an independent copy whose name carries a '2Ly'
# suffix; initialize_model looks for that suffix when it sets nSelectedNodes.
hParamsLclGNN2Ly = deepcopy(hParamsLclGNN)
hParamsLclGNN2Ly['name'] = hParamsLclGNN2Ly['name'] + '2Ly'  # Name of the architecture

# Save values and register the model name
writeVarValues(varsFile, hParamsLclGNN2Ly)
modelList.append(hParamsLclGNN2Ly['name'])

# Logging

In [None]:
# Output switches
doPrint = True  # Decide whether to print while running
doLogging = True  # Log into tensorboard
doSaveVars = True  # Save (pickle) useful variables
doFigs = True  # Plot some figures (this only works if doSaveVars is True)

# Printing: number of training steps between partial-result printouts;
# 0 means to never print partial results while training
printInterval = 0

# Plot sampling: one training (validation) step out of every
# xAxisMultiplierTrain (xAxisMultiplierValid) is shown in the plot
xAxisMultiplierTrain = 10
xAxisMultiplierValid = 1

# Plot appearance
figSize = 5  # Overall size of the figure that contains the plot
lineWidth = 2  # Width of the plot lines
markerShape = 'o'  # Shape of the markers
markerSize = 3  # Size of the markers

# Save values:
writeVarValues(
    varsFile,
    dict(
        doPrint=doPrint,
        doLogging=doLogging,
        doSaveVars=doSaveVars,
        doFigs=doFigs,
        saveDir=saveDir,
        printInterval=printInterval,
        figSize=figSize,
        lineWidth=lineWidth,
        markerShape=markerShape,
        markerSize=markerSize,
    ),
)

# Setup

Determine the device.

In [None]:
# Use the first CUDA device only when it was requested AND is available;
# in every other case run on the CPU.
cudaSelected = useGPU and torch.cuda.is_available()
device = 'cuda:0' if cudaSelected else 'cpu'
if cudaSelected:
    torch.cuda.empty_cache()

if doPrint:
    print("Device selected: %s" % device)

In [None]:
# The visualizer is imported lazily, so Utils.visualTools is only loaded when
# logging is actually enabled.
if doLogging:
    # If logging is on, load the tensorboard visualizer and initialize it
    from Utils.visualTools import Visualizer
    # The tensorboard logs go into a 'logsTB' subfolder of the run directory
    logsTB = os.path.join(saveDir, 'logsTB')
    logger = Visualizer(logsTB, name='visualResults')

Next, we set the training options.

In [None]:
# Optional keyword arguments for the trainer; each group is only added when
# the corresponding switch is on, so disabled features are simply absent.
trainingOptions = {}

if doLogging:
    trainingOptions.update(logger=logger)
if doSaveVars:
    trainingOptions.update(saveDir=saveDir)
if doPrint:
    trainingOptions.update(printInterval=printInterval)
if doLearningRateDecay:
    trainingOptions.update(
        learningRateDecayRate=learningRateDecayRate,
        learningRateDecayPeriod=learningRateDecayPeriod,
    )
# The validation interval is always passed
trainingOptions.update(validationInterval=validationInterval)

# Data Generation

We can generate the data, for example, by drawing from a Gaussian distribution. When it comes to the format of the data, we can choose how to define the labels. We can either assign one label per data point (i.e. we are interested in predicting the value of the signal at only one node) or multiple labels per data point (i.e. we are interested in predicting the missing signal at multiple nodes).

NOTE: If multiple labels are defined per data point, the size of the labels list should be the same as the batch size.

In [None]:
def data_generation_finite_time_consensus_multiple_labels(nSamples=2000, nNodes=20):
    """Draw Gaussian graph signals labelled with their mean at every node.

    Each sample is a vector of nNodes i.i.d. standard-normal values drawn from
    NumPy's global generator. Its label is the mean of that vector, and its
    label IDs are all node indices 0 .. nNodes - 1.

    The previous version always listed node IDs 0..99 even though the signals
    have only 20 entries, so most IDs pointed at nodes that do not exist. The
    IDs are now tied to nNodes.

    Parameters
    ----------
    nSamples : int, optional
        Number of signals to draw (default 2000, the original value).
    nNodes : int, optional
        Number of entries per signal (default 20, the original value).

    Returns
    -------
    signals : list of numpy.ndarray
        nSamples arrays of shape (nNodes,).
    labels : list of float
        Mean of each signal.
    labelIDs : list of list of int
        For each sample, the node indices the label applies to.
    """
    signals = []
    labels = []
    labelIDs = []

    for _ in range(nSamples):
        sg = np.random.normal(0, 1, nNodes)

        signals.append(sg)
        labels.append(np.mean(sg))
        # A fresh list per sample, so that editing one entry cannot silently
        # change all the others.
        labelIDs.append(list(range(nNodes)))

    return signals, labels, labelIDs

In [None]:
def data_generation_finite_time_consensus_one_label(nSignals=100, nNodes=20):
    """Draw Gaussian graph signals with one (node, mean) label per data point.

    nSignals vectors of nNodes i.i.d. standard-normal values are drawn from
    NumPy's global generator. Every vector is emitted nNodes times, once per
    node index, always with the vector's mean as the label. The result thus
    holds nSignals * nNodes data points (2000 with the defaults).

    NOTE(review): since each signal appears nNodes times, a random split of
    the data points can place copies of the same signal in the training,
    validation and test sets -- confirm this is intended.

    Parameters
    ----------
    nSignals : int, optional
        Number of distinct signals to draw (default 100, the original value).
    nNodes : int, optional
        Number of entries per signal (default 20, the original value).

    Returns
    -------
    signals : list of numpy.ndarray
        Arrays of shape (nNodes,); consecutive runs of nNodes entries refer to
        the same array.
    labels : list of float
        Mean of the corresponding signal.
    labelIDs : list of int
        Node index associated with each data point.
    """
    signals = []
    labels = []
    labelIDs = []

    for _ in range(nSignals):
        sg = np.random.normal(0, 1, nNodes)
        mean_sg = np.mean(sg)

        # One data point per node, all sharing the same signal and label
        for labelNode in range(nNodes):
            labels.append(mean_sg)
            signals.append(sg)
            labelIDs.append(labelNode)

    return signals, labels, labelIDs

Next, we define a function that wraps the data in the `Finite_time_consensus` class, so that we can further use the associated data methods during training.

In [None]:
def getData(signals, labels, labelIDs, train_indexes, validation_indexes, test_indexes):
    """Split the samples by index and wrap them in a Finite_time_consensus object.

    For each of the three index collections (train, validation, test) the
    matching entries of signals, labels and labelIDs are gathered and converted
    with np.asarray. The nine resulting arrays are passed, in the order
    train / validation / test and data / labels / labelIDs within each split,
    to Utils.dataTools.Finite_time_consensus.

    The created object then has astype(torch.float64) and to(device) called on
    it (device is the module-level device string) before it is returned.
    """
    sources = (signals, labels, labelIDs)
    splits = (train_indexes, validation_indexes, test_indexes)

    # Flat list ordered split by split, and data / labels / labelIDs within
    # each split, which is the positional order the constructor is called with.
    splitArrays = [
        np.asarray([source[index] for index in split_indexes])
        for split_indexes in splits
        for source in sources
    ]

    # We define the finite time consensus data based on the given splits
    data = Utils.dataTools.Finite_time_consensus(*splitArrays)

    data.astype(torch.float64)
    data.to(device)

    return data
    

# Read the graphs

We always use the same graphs for training -> the ones we have defined and saved before.

In [None]:
# Reload the adjacency matrices from the 'graphs' file. This rebinds allGraphs
# to the unpickled list (the matrices that were dumped, not Graph objects).
# NOTE: pickle.load can execute arbitrary code stored in the file, so only
# load a 'graphs' file that was produced by this notebook.
with open ('graphs', 'rb') as fp:
    allGraphs = pickle.load(fp)

# Define permutations

We generate indexes up to the length of the dataset we are using (in this example, we use 2000 data points). Then we randomly permute these indexes multiple times (e.g. 10 times) such that we generate different permutations to account for the randomization involved when we split the data in train, validation and test.

In [None]:
# One index per data point (the dataset used here has 2000 of them)
indexes = np.arange(0, 2000)

# Ten independent random reorderings of those indexes; each one later yields
# a different train / validation / test split.
allPermutations = [np.random.permutation(indexes) for _ in range(10)]

# Model Initialization

We define a function for initializing the model with the parameters defined above.

In [None]:
def initialize_model(thisModel, nNodes, S, order):
    """Create the architecture, optimizer and loss for one model and wrap them.

    Parameters
    ----------
    thisModel : str
        Model name. The hyperparameters are taken from the module-level
        dictionary named 'hParams' + thisModel (a deep copy is used, so the
        original dictionary is left untouched).
    nNodes : int
        Number of nodes; used to fill 'nSelectedNodes' since no pooling is done.
    S
        Graph shift operator, stored under the 'GSO' hyperparameter.
    order
        Passed through unchanged to model.Model.

    Returns
    -------
    The object returned by model.Model(...).

    Raises
    ------
    NameError
        If no module-level dictionary 'hParams' + thisModel exists.
    ValueError
        If the module-level `trainer` is not 'ADAM', 'SGD' or 'RMSprop'.
    """
    # If a new model is to be created, it should be called for here.

    if doPrint:
        print("Model initialization...", flush = True)

    # Get the corresponding parameter dictionary. A plain namespace lookup is
    # used instead of eval(), which would execute whatever text it is given.
    hParamsName = 'hParams' + thisModel
    try:
        hParamsDict = deepcopy(globals()[hParamsName])
    except KeyError:
        raise NameError("name '%s' is not defined" % hParamsName) from None

    # Remove the 'name' and 'archit' fields from the dictionary,
    # as these do not need to be passed to the architecture.
    thisName = hParamsDict.pop('name')
    callArchit = hParamsDict.pop('archit')

    # Optimizer options
    thisTrainer = trainer
    thisLearningRate = learningRate
    thisBeta1 = beta1
    thisBeta2 = beta2

    # Graph Shift Operator
    hParamsDict['GSO'] = S

    # Add the number of nodes for the no-pooling part (one entry per layer);
    # names without a '1Ly' / '2Ly' tag keep the value from the dictionary.
    if '1Ly' in thisName:
        hParamsDict['nSelectedNodes'] = [nNodes]
    elif '2Ly' in thisName:
        hParamsDict['nSelectedNodes'] = [nNodes, nNodes]

    # Architecture
    thisArchit = callArchit(**hParamsDict)
    thisArchit.to(device)

    # Optimizer
    if thisTrainer == 'ADAM':
        thisOptim = optim.Adam(thisArchit.parameters(),
                               lr = thisLearningRate,
                               betas = (thisBeta1, thisBeta2))
    elif thisTrainer == 'SGD':
        thisOptim = optim.SGD(thisArchit.parameters(),
                              lr = thisLearningRate)
    elif thisTrainer == 'RMSprop':
        thisOptim = optim.RMSprop(thisArchit.parameters(),
                                  lr = thisLearningRate, alpha = thisBeta1)
    else:
        # Without this branch an unknown trainer would only surface later,
        # as an unbound-variable error on thisOptim.
        raise ValueError("Unknown trainer '%s'; expected 'ADAM', 'SGD' or "
                         "'RMSprop'." % thisTrainer)

    # Loss
    thisLossFunction = loss.adaptExtraDimensionLoss(lossFunction)

    # Model
    modelCreated = model.Model(thisArchit,
                               thisLossFunction,
                               thisOptim,
                               thisName, saveDir, order)

    # Record what was actually used for this model
    writeVarValues(varsFile,
                   {'name': thisName,
                    'thisTrainer': thisTrainer,
                    'thisLearningRate': thisLearningRate,
                    'thisBeta1': thisBeta1,
                    'thisBeta2': thisBeta2})

    return modelCreated

# Model Training

In [None]:
#List in which we will store the performances for all the graphs and permutations
# List in which we will store the performances for all the graphs and
# permutations (one entry per graph/permutation pair, in loop order)
graphs_perf = []

# Loop through all the graphs
for cnt, GW in enumerate(allGraphs):
    print(f"GRAPH {cnt}")

    # Define the data (drawn once per graph and reused by every permutation)
    signals, labels, labelIDs = data_generation_finite_time_consensus_one_label()

    # The spectrum depends only on the graph, not on the data split, so it is
    # computed once per graph instead of once per permutation.
    GS = GW
    # Get the largest eigenvalue of the weighted adjacency matrix
    EW, VW = graphTools.computeGFT(GW, order = 'totalVariation')
    eMax = np.max(EW)

    for cntp, perm_indexes in enumerate(allPermutations):
        print(f"PERM {cntp}")

        # Save the results from the runs
        thisFilename = 'finiteTimeConsensus graph ' + str(cnt) + ' perm ' + str(cntp) # This is the general name of all related files
        saveDir2 = os.path.join(saveDir, thisFilename) # Dir where to save all the results from each run
        os.makedirs(saveDir2, exist_ok=True)
        # NOTE(review): this key is forwarded to train.MultipleModels through
        # **trainingOptions; confirm that it is an accepted keyword there.
        trainingOptions['saveDir2'] = saveDir2

        # Split the data: first 80% train, next 10% validation, last 10% test
        nPoints = len(perm_indexes)
        train_indexes, validation_indexes, test_indexes = np.split(
            perm_indexes, [int(.8 * nPoints), int(.9 * nPoints)])
        data = getData(signals, labels, labelIDs, train_indexes, validation_indexes, test_indexes)

        # Initialize the models dictionary
        modelsGNN = {}
        thisName = modelList[0]

        # Shift operator for the current graph: adjacency scaled by eMax
        # Ordering
        S, order = graphTools.permIdentity(GS/eMax)

        # Initialize the local GNN model
        LocalGNN = initialize_model(modelList[0], nNodes, S, order)

        # Add it to the dictionary
        modelsGNN[thisName] = LocalGNN

        # Train the model
        train.MultipleModels(modelsGNN, data,
                             nEpochs = nEpochs, batchSize = batchSize,
                             **trainingOptions)

        ###################
        ### EVALUATION ###
        ##################

        xTest, yTest, idsTest = data.getSamples('test')
        # This data has one input feature, so we need to add that dimension
        # (make it from B x N to B x F x N)
        xTest = xTest.unsqueeze(1)
        xTest = xTest.to(device)
        yTest = yTest.to(device)

        for key in modelsGNN:
            with torch.no_grad():
                # Process the samples
                yHatTest = modelsGNN[key].archit.singleNodeForward(xTest, idsTest)

                # Evaluate the predictions against the test labels
                thisAcc = data.evaluate(yHatTest, yTest)

        # Record the performance. Only the value of the last model evaluated
        # above is kept; with a single model in modelsGNN that is that model.
        graphs_perf.append(thisAcc)
  

Compute the average performance.

In [None]:
# Average over every recorded run. graphs_perf is the flat list filled by the
# training loop (one entry per graph/permutation pair), so its length is used
# instead of a hard-coded count.
nRuns = len(graphs_perf)

sum_perf = 0
for perf in graphs_perf:
    sum_perf += perf

# With no recorded runs there is nothing to average; report NaN rather than
# failing with a division by zero.
avg_perf = sum_perf / nRuns if nRuns > 0 else float('nan')