In [None]:
!pip install libpysal
!pip install dgl
import numpy as np
import pandas as pd

In [None]:
# import building data
# Shuffle the rows once with a fixed seed so the positional 70/30 split used
# later is random but reproducible, then reset the index so that row position
# and index label agree. Later cells join per-node predictions back onto this
# frame with pd.concat(axis=1), which aligns on index labels.
SEED = 42
df_building = (
    pd.read_csv('Boston_building.csv')
    .sample(frac=1, random_state=SEED)
    .reset_index(drop=True)
)
df_building.head()

In [None]:
# construct and save edges
import libpysal

# Node coordinates in row order: row i of `loc` is node i of the graph.
loc = df_building[['X', 'Y']].to_numpy()

# Hyperparameter: each building of interest is connected to its N_NEIGHBORS
# nearest buildings; can be customised.
N_NEIGHBORS = 10

kd = libpysal.cg.KDTree(loc)
wnn2 = libpysal.weights.KNN(kd, N_NEIGHBORS)

# Build the edge list from the full neighbour sets. W.asymmetry() only lists the
# (i, j) pairs whose weights differ from (j, i), i.e. the NON-reciprocal k-NN
# links, so using it as the edge list drops every pair of mutual nearest
# neighbours. Each link is stored in both directions (and de-duplicated) so the
# graph stays symmetric, as the asymmetry-based list was.
edge_pairs = set()
for src, neighbours in wnn2.neighbors.items():
    for dst in neighbours:
        edge_pairs.add((int(src), int(dst)))
        edge_pairs.add((int(dst), int(src)))

df_edge = pd.DataFrame(sorted(edge_pairs), columns=['Src', 'Dst'])
df_edge.to_csv('edge.csv')

In [None]:
# create a dgl dataset
import dgl
from dgl.data import DGLDataset
import torch
import os

class MyDataset(DGLDataset):
    """Single-graph dataset assembled from the notebook's building and edge tables.

    ``process`` reads the notebook-level ``df_building`` (one row per node) and
    ``df_edge`` (``Src``/``Dst`` columns) and stores the resulting graph on
    ``self.graph``.
    """

    def __init__(self):
        super().__init__(name='graph_data')

    def process(self):
        """Build ``self.graph`` with features, labels and train/test masks."""
        buildings = df_building
        edges = df_edge
        num_buildings = buildings.shape[0]

        # Columns 15..23 (by position) are used as node features; customise the index.
        feats = torch.from_numpy(buildings.iloc[:, 15:24].to_numpy()).float()

        # Regression task: float targets. Replace the column name to predict
        # a different attribute.
        targets = torch.from_numpy(buildings['storey'].to_numpy()).float()
        # Classification task: integer class codes instead.
        # targets = torch.from_numpy(buildings['type'].astype('category').cat.codes.to_numpy()).long()

        src_ids = torch.from_numpy(edges['Src'].to_numpy())
        dst_ids = torch.from_numpy(edges['Dst'].to_numpy())
        print(src_ids)

        g = dgl.graph((src_ids, dst_ids), num_nodes=num_buildings)
        g.ndata['feat'] = feats
        g.ndata['label'] = targets

        # The first 70% of the rows (by position) train; the remainder test.
        cutoff = int(num_buildings * 0.7)
        in_train = torch.arange(num_buildings) < cutoff
        g.ndata['train_mask'] = in_train
        g.ndata['test_mask'] = ~in_train

        self.graph = g

    def __getitem__(self, i):
        """Return the single graph; the index ``i`` is not used."""
        return self.graph

    def __len__(self):
        """The dataset always holds exactly one graph."""
        return 1

# Build the dataset, take its single graph and add a self-loop to every node.
dataset = MyDataset()
graph = dgl.add_self_loop(dataset[0])
graph

In [None]:
# using GraphSAGE algorithm to build convolution layers
from torch.nn.modules import activation
import dgl.nn as dglnn
import torch.nn as nn
import torch.nn.functional as F
# Module-level ReLU instance.
# NOTE(review): no other cell visible in this notebook references `m` - confirm before relying on it.
m = nn.ReLU()
class SAGE(nn.Module):
    """Three GraphSAGE (pool aggregator) layers followed by a fully connected head.

    Parameters
    ----------
    in_feats : int
        Number of input features per node.
    hid_feats : int
        Output width of the first GraphSAGE layer.
    out_feats : int
        Output width of the third GraphSAGE layer, i.e. the input width of the
        fully connected head.
    """

    def __init__(self, in_feats, hid_feats, out_feats):
        super().__init__()
        self.conv1 = dglnn.SAGEConv(
            in_feats=in_feats, out_feats=hid_feats, aggregator_type='pool')
        self.conv2 = dglnn.SAGEConv(
            in_feats=hid_feats, out_feats=256, aggregator_type='pool')
        self.conv3 = dglnn.SAGEConv(
            in_feats=256, out_feats=out_feats, aggregator_type='pool')
        self.fc1 = nn.Linear(out_feats, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        # for regression task (i.e. predicting building heights, storeys, etc.)
        self.output = nn.Linear(128, 1)
        # # for classification task (i.e. predicting building type, age, etc.)
        # self.output = nn.Linear(128,k) # # k corresponds to the number of classified building characteristics

    def forward(self, graph, inputs):
        """Return one row of raw outputs per node of ``graph``.

        ReLU is applied after every hidden layer. Without it, fc1 -> fc2 -> fc3
        -> output is a composition of affine maps and collapses to a single
        linear layer. No activation follows conv3: for the regression setup it
        is one unit wide, and a ReLU there could zero the only channel for
        every node.
        """
        h = F.relu(self.conv1(graph, inputs))
        h = F.relu(self.conv2(graph, h))
        h = self.conv3(graph, h)
        h = F.relu(self.fc1(h))
        h = F.relu(self.fc2(h))
        h = F.relu(self.fc3(h))
        h = self.output(h)
        # For classification the raw logits are returned as well. Do not apply
        # softmax here when training with F.cross_entropy: that loss already
        # applies log-softmax, so an extra softmax would be applied twice.
        return h

In [None]:
# Pull the tensors used for training out of the graph.
node_features = graph.ndata['feat']
node_labels = graph.ndata['label']
train_mask = graph.ndata['train_mask']
test_mask = graph.ndata['test_mask']
n_features = node_features.shape[1]

# Output width of the model. This used to be node_labels.shape[0] for
# regression, which is the number of nodes rather than an output dimension.
if node_labels.is_floating_point():
    # regression task: a single continuous target
    n_labels = 1
else:
    # classification task: class codes run from 0 to max
    n_labels = int(node_labels.max().item()) + 1

In [None]:
# evaluation metrics for regression task

from sklearn.metrics import r2_score,mean_squared_error
from sklearn.metrics import mean_absolute_error


def evaluate(model, graph, features, labels, mask):
    """Return the RMSE of the model's predictions on the nodes selected by ``mask``.

    The model is switched to eval mode and run without gradient tracking. Each
    node's prediction is the row-wise maximum of the model output, which for a
    single-column output is simply that column.

    The metric is computed with torch rather than
    ``sklearn.metrics.mean_squared_error(..., squared=False)``, because the
    ``squared`` keyword is deprecated and later removed in newer scikit-learn
    releases.

    Returns
    -------
    float
        Root-mean-square error between predictions and ``labels[mask]``.
    """
    model.eval()
    with torch.no_grad():
        logits = model(graph, features)[mask]
        targets = labels[mask]
        prediction, _ = torch.max(logits, dim=1)
        # Accumulate in float64 so the result does not depend on the model dtype.
        errors = (prediction - targets).double()
        # RMSE
        rmse = torch.sqrt(torch.mean(errors ** 2)).item()
        # MAE
        # mae = torch.mean(torch.abs(errors)).item()

        return rmse
        # return mae

In [None]:
# evaluation metrics for classification task

def evaluate(model, graph, features, labels, mask):
    """Score the model on the nodes selected by ``mask``.

    Integer labels (classification) return accuracy: the fraction of masked
    nodes whose arg-max output equals the label.

    Floating-point labels (regression) return RMSE between the row-wise maximum
    of the output and the label. This branch exists because this definition is
    the last ``evaluate`` in the notebook and so replaces any earlier one, while
    the regression training cell calls ``evaluate`` and reports the result as
    RMSE. Comparing arg-max indices with continuous targets would be meaningless.

    Returns
    -------
    float
        Accuracy in [0, 1] for integer labels, RMSE for floating-point labels.
    """
    model.eval()
    with torch.no_grad():
        logits = model(graph, features)
        logits = logits[mask]
        labels = labels[mask]
        values, indices = torch.max(logits, dim=1)
        if labels.is_floating_point():
            errors = (values - labels).double()
            return torch.sqrt(torch.mean(errors ** 2)).item()
        correct = torch.sum(indices == labels)
        return correct.item() * 1.0 / len(labels)

In [None]:
# Pass False to DGL's libxsmm switch.
# NOTE(review): presumably this disables the libxsmm-backed kernels - confirm against the DGL docs.
dgl.use_libxsmm(False)

import warnings

# Blanket-suppress every warning category from here on.
warnings.simplefilter("ignore")

In [None]:
# Regression task: training and model evaluation

model = SAGE(in_feats=n_features, hid_feats=256, out_feats=1)
opt = torch.optim.Adam(model.parameters(), lr=0.001) # learning rate can be customised in future studies

PATIENCE = 20  # stop once the test metric has not improved for this many epochs


def _masked_rmse(pred, target, mask):
    """Root-mean-square error over the nodes selected by ``mask``."""
    return torch.sqrt(F.mse_loss(pred[mask], target[mask])).item()


def _snapshot(module):
    """Copy the module's weights. state_dict() returns live references that the
    optimizer keeps updating in place, so they must be cloned to be kept."""
    return {key: value.detach().clone() for key, value in module.state_dict().items()}


best_test_rmse = float('inf')
best_epoch = 0  # defined up front so the patience check never hits an unbound name
best_model = _snapshot(model)

for epoch in range(100):
    model.train()
    # SAGE was built with out_feats=1 above; drop the trailing dimension to get
    # one prediction per node (assumes the model emits a single output column).
    logits = model(graph, node_features).squeeze(1)
    # # RMSE
    loss = F.mse_loss(logits[train_mask], node_labels[train_mask])
    # Metrics are computed here from the same forward pass rather than through
    # `evaluate`: the notebook defines `evaluate` twice and the later
    # (classification accuracy) definition is the one in scope under Run All.
    with torch.no_grad():
        train_rmse = _masked_rmse(logits, node_labels, train_mask)
        test_rmse = _masked_rmse(logits, node_labels, test_mask)
    # # MAE: swap F.mse_loss for F.l1_loss above and report the mean absolute
    # # error of the masked predictions instead.

    # Early stopping based on test RMSE/MAE.
    # NOTE(review): model selection uses the test split, so the reported test
    # score is optimistic; a separate validation mask would avoid that.
    if test_rmse < best_test_rmse:
        best_test_rmse = test_rmse
        best_epoch = epoch
        best_model = _snapshot(model)
    elif best_epoch + PATIENCE < epoch:  # No improvement for PATIENCE epochs
        print(f"Early stopping at epoch {best_epoch} with best test RMSE/MAE: {best_test_rmse}")
        break
    opt.zero_grad()
    loss.backward()
    opt.step()

    print(f'Epoch {epoch}, Loss: {loss.item()}, Train RMSE: {train_rmse}, Test RMSE: {test_rmse}')

# Leave `model` holding the best weights whether or not early stopping fired.
model.load_state_dict(best_model)

In [None]:
# save predicted building storeys

# Inference only: eval mode, no gradient tracking.
model.eval()
with torch.no_grad():
    logits = model(graph, node_features)
prediction, indices = torch.max(logits, dim=1)
prediction_sage = prediction.detach().cpu().numpy()
df_prediction = pd.DataFrame(prediction_sage, columns=['predicted_storey'])
# Predictions are in node (row-position) order. pd.concat(axis=1) aligns on
# index labels, and df_building may carry a shuffled index from .sample(), so
# reset it here to make prediction i land on building row i.
df_result = pd.concat([df_building.reset_index(drop=True), df_prediction], axis=1)
df_result.head()

In [None]:
# Classification task: training and model evaluation
# NOTE(review): F.cross_entropy with class-index targets needs integer labels,
# so this cell expects the classification label line in MyDataset.process to be
# the active one - confirm before running.

model = SAGE(in_feats=n_features, hid_feats=256, out_feats=n_labels)
opt = torch.optim.Adam(model.parameters(), lr=0.001)

best_test_acc = 0   # to keep track of the best test accuracy
patience = 50       # number of epochs to wait for improvement before stopping
epochs_without_improvement = 0


def _snapshot(module):
    """Copy the module's weights. state_dict() returns live references that the
    optimizer keeps updating in place, so they must be cloned to be kept."""
    return {key: value.detach().clone() for key, value in module.state_dict().items()}


# Defined before the loop so restoring never hits an unbound name, even if the
# test accuracy never rises above 0.
best_model_state = _snapshot(model)

for epoch in range(100):
    model.train()
    logits = model(graph, node_features)
    pred = logits.argmax(1)
    loss = F.cross_entropy(logits[train_mask], node_labels[train_mask])
    train_acc = (pred[train_mask] == node_labels[train_mask]).float().mean().item()
    test_acc = (pred[test_mask] == node_labels[test_mask]).float().mean().item()

    # NOTE(review): model selection uses the test split, so the reported test
    # accuracy is optimistic; a separate validation mask would avoid that.
    if test_acc > best_test_acc: # if there's an improvement in accuracy
        best_test_acc = test_acc
        best_model_state = _snapshot(model)  # save the best model
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1

    if epochs_without_improvement == patience:
        print(f"Early stopping on epoch {epoch}. Best test accuracy was {best_test_acc}.")
        break
    opt.zero_grad()
    loss.backward()
    opt.step()
    print('In epoch {}, loss: {:.5f}, train_acc: {:.5f}, test_acc: {:.5f}'.format(
                epoch, loss.item(), train_acc, test_acc))

# Leave `model` holding the best weights, and recompute `pred` from them so the
# following cells score and export the selected model, not the last epoch.
model.load_state_dict(best_model_state)
model.eval()
with torch.no_grad():
    pred = model(graph, node_features).argmax(1)

In [None]:
# Evaluating classification tasks with F1-score, Precision, Recall

from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

# Support-weighted F1, precision and recall on the held-out (test) nodes.
y_true = node_labels[test_mask]
y_pred = pred[test_mask]
F1_score, Precision, Recall = (
    metric(y_true, y_pred, average="weighted")
    for metric in (f1_score, precision_score, recall_score)
)

In [None]:
# save predicted classes of building characteristics

node_labels_df = node_labels.detach().numpy()
# Ground-truth labels as a frame; not joined into df_type below.
df_node_labels = pd.DataFrame(node_labels_df, columns=['truth_type'])
prediction_sage = pred.detach().numpy()
df_prediction_sage = pd.DataFrame(prediction_sage, columns=['predicted_type'])
# Predictions are in node (row-position) order. pd.concat(axis=1) aligns on
# index labels, and df_building may carry a shuffled index from .sample(), so
# reset it here to make prediction i land on building row i.
df_type = pd.concat([df_building.reset_index(drop=True), df_prediction_sage], axis=1)
df_type.head()