# UJIINDOORLOC - Heterogeneous GNN

Dataset: UJIINDOORLOC

Modelo: GNN con grafo heterogéneo

## Importar Datos

In [None]:
import pandas as pd
import numpy as np
import scipy
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report
from itertools import combinations
from copy import deepcopy
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.model_selection import KFold
from sklearn.model_selection import ParameterGrid
import torch
import networkx as nx
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder

from torch_geometric.data import Data, HeteroData
from torch_geometric.nn.conv.dna_conv import Linear
from torch_geometric.utils import to_networkx, is_undirected, to_undirected

import torch.nn.functional as F
from torch_geometric.nn import GCNConv, ChebConv, SAGEConv, TAGConv, GraphConv, to_hetero, GATConv, Linear, BatchNorm, HeteroConv
from torch_geometric.loader import DataLoader
import torch_geometric.transforms as T

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from zipfile import ZipFile

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report


In [None]:
# Download the dataset archive from Kaggle (IPython shell escape).
# NOTE(review): presumably requires the Kaggle CLI to be installed and
# authenticated on this machine - confirm.
!kaggle datasets download -d giantuji/UjiIndoorLoc

In [None]:
# Path of the downloaded archive; later cells reuse `dataset`.
dataset = 'UjiIndoorLoc.zip'
# Use context managers so the archive and the member stream are closed
# as soon as the CSV has been parsed.
with ZipFile(dataset) as zip_file:
    with zip_file.open('TrainingData.csv') as csv_file:
        df = pd.read_csv(csv_file)
df.head()

## Preprocesamiento

In [None]:
def preprocess_dataset(dataset_path, filter_std, dataset_percentage=None):
    """Load ``TrainingData.csv`` from the archive and build a train/test split.

    Steps performed:

    1. Build a ``CLASS`` label by concatenating ``BUILDINGID`` and ``FLOOR``.
    2. Take the first 520 columns as the AP readings and replace the value
       ``100`` by ``-105``.
    3. Keep only the APs whose standard deviation is greater than
       ``filter_std``.
    4. Shift the readings by ``+105``.
    5. Optionally keep a random fraction ``dataset_percentage`` of the rows
       (this sampling is not seeded, so it differs between calls).
    6. Ordinal-encode the labels, shuffle with a fixed seed and split 80/20.

    Args:
        dataset_path: Path of the zip archive containing ``TrainingData.csv``.
        filter_std: APs with a standard deviation <= this value are dropped.
        dataset_percentage: Optional fraction of rows to keep; falsy values
            keep every row.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test, number_aps, num_classes,
        filtered_aps, enc)`` where ``y_*`` have shape ``(n, 1)``,
        ``filtered_aps`` is the list of kept column names and ``enc`` is the
        fitted ``OrdinalEncoder``.
    """
    # Close the archive and the member stream once the CSV is parsed.
    with ZipFile(dataset_path) as zip_file:
        with zip_file.open('TrainingData.csv') as csv_file:
            df = pd.read_csv(csv_file)

    df['CLASS'] = df['BUILDINGID'].astype(str) + df['FLOOR'].astype(str)
    df_y = df['CLASS']

    # ``replace`` returns a new frame; the previous code wrote through
    # ``df.values``, which only works while pandas hands back a writable view.
    df_X = df.iloc[:, :520].replace(100, -105)

    # keep those APs where std > filter_std
    ap_std = df_X.std()
    filtered_aps = ap_std.index[ap_std > filter_std].tolist()

    # shift the readings by +105 (so the -105 placeholder becomes 0)
    df_X = df_X[filtered_aps] + 105
    df_X['CLASS'] = df_y.values

    if dataset_percentage:
        df_X = df_X.sample(frac=dataset_percentage)

    # apply ordinal encoder to the classes and split X, y
    enc = OrdinalEncoder(dtype=int)
    y = enc.fit_transform(df_X['CLASS'].values.reshape(-1, 1))
    X = df_X.iloc[:, :-1].values

    # Shuffle features and labels together with a fixed seed.
    dfaux = pd.DataFrame(X)
    number_aps = len(dfaux.columns)
    dfaux[str(number_aps)] = y.ravel()
    shuffled = dfaux.sample(frac=1, random_state=99)
    y = shuffled.iloc[:, -1].values.reshape(-1, 1)
    X = shuffled.iloc[:, :-1].values

    # split 80-20
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=123)

    print("X_train shape: ", X_train.shape)
    num_classes = df_X['CLASS'].nunique()
    return X_train, X_test, y_train, y_test, number_aps, num_classes, filtered_aps, enc

## Grafo

En las siguientes celdas se describe un poco el dataset y se muestran las distribuciones de potencia por AP.

In [None]:
def ap_graph_creator(X_G, th=10, prune_th=0):
    """Build an AP-to-AP graph from RSSI measurements.

    For every AP (column), the rows whose reading for that AP is within
    ``th`` of the column maximum are selected, as an estimate of the
    instances closest to that AP. The mean reading those rows have for every
    *other* AP becomes the weight of an edge ``ap -> other``; edges whose
    weight is not greater than ``prune_th`` are dropped. The result is then
    made undirected, averaging the weights of opposite edges.

    Args:
        X_G: 2-D array-like (rows = samples, columns = APs).
        th: Width of the window below each column maximum.
        prune_th: Minimum (exclusive) mean value for an edge to be kept.

    Returns:
        ``(edge_index, edge_attr)`` as returned by ``to_undirected``;
        ``edge_index`` is a ``torch.long`` tensor and ``edge_attr`` a
        ``torch.float`` tensor built with one weight per edge.
    """
    df_data_train = pd.DataFrame(X_G)
    columns = df_data_train.columns.to_list()
    # Column label -> node position, so lookups are O(1) instead of list.index.
    position = {col: idx for idx, col in enumerate(columns)}

    # Collect edges in plain lists; DataFrame.append no longer exists in
    # pandas >= 2.0 and re-allocated the whole frame on every call.
    sources = []
    targets = []
    weights = []
    for ap in columns:
        max_val = df_data_train[ap].max()
        near_ap = df_data_train[df_data_train[ap] > (max_val - th)].drop(ap, axis=1)

        for other_ap, mean_rssi in near_ap.mean().items():
            if mean_rssi > prune_th:
                sources.append(position[ap])
                targets.append(position[other_ap])
                weights.append(float(mean_rssi))

    edge_index = torch.tensor([sources, targets], dtype=torch.long)
    # reshape keeps the (num_edges, 1) layout even when no edge survives.
    edge_attr = torch.tensor(weights, dtype=torch.float).reshape(-1, 1)
    edge_index, edge_attr = to_undirected(edge_index, edge_attr, reduce="mean")
    return edge_index, edge_attr

In [None]:
def zone_ap_graph_creator(X_G, y, prune_th=0, zone_to_remove=None):
    """Build the zone-to-AP edges from labelled RSSI measurements.

    For every zone (distinct label, in ascending order) the mean reading of
    each AP over the samples of that zone is computed. An edge
    ``zone -> ap`` is created when that mean is greater than ``prune_th``,
    with the mean as its weight. Edges of ``zone_to_remove`` are kept but
    their weight is set to 0.

    Args:
        X_G: 2-D array-like (rows = samples, columns = APs).
        y: Labels, one per row of ``X_G``; shape ``(n,)`` or ``(n, 1)``.
        prune_th: Minimum (exclusive) mean value for an edge to be created.
        zone_to_remove: Optional zone label whose edge weights are zeroed.

    Returns:
        ``(edge_index, edge_attr)``: a ``torch.long`` tensor of shape
        ``(2, num_edges)`` (row 0 = zone label, row 1 = AP column position)
        and a ``torch.float`` tensor of shape ``(num_edges, 1)``.
    """
    features = pd.DataFrame(X_G)
    labels = np.asarray(y).ravel()

    # Collect edges in plain lists; DataFrame.append no longer exists in
    # pandas >= 2.0 and was quadratic when called in a loop.
    sources = []
    targets = []
    weights = []
    for zone in np.sort(np.unique(labels)):
        means = features.loc[labels == zone].mean().to_numpy()

        for ap, mean in enumerate(means):
            if mean > prune_th:
                sources.append(int(zone))
                targets.append(ap)
                weights.append(0.0 if zone_to_remove == zone else float(mean))

    edge_index = torch.tensor([sources, targets], dtype=torch.long)
    # reshape keeps the (num_edges, 1) layout even when no edge survives.
    edge_attr = torch.tensor(weights, dtype=torch.float).reshape(-1, 1)

    return edge_index, edge_attr

In [None]:
def build_dataset(X, y, graph, num_classes, zone_to_remove=None):
    """Create one graph per sample by filling a copy of ``graph`` with data.

    Every kept sample gets a deep copy of ``graph`` where:

    * ``data['aps'].x`` holds the sample's AP features (``X[i]``),
    * ``data['zones'].x`` is an all-zero ``(num_zone_nodes, 1)`` tensor,
    * ``data['zones'].y`` holds the sample's label as a 1-element tensor,
    * ``train_mask`` / ``val_mask`` / ``test_mask`` are all-``True`` boolean
      masks with one entry per zone node.

    Args:
        X: Indexable collection of per-sample AP feature arrays.
        y: Indexable collection of per-sample labels (scalars or 1-element
            arrays).
        graph: Template graph; it is deep-copied and never modified.
        num_classes: Kept for interface compatibility; not used here.
        zone_to_remove: Optional label; samples with this label are skipped.

    Returns:
        List with one populated graph per kept sample.
    """
    dataset = []

    for i in range(len(y)):
        label = np.atleast_1d(y[i])
        if zone_to_remove is not None and np.all(label == zone_to_remove):
            continue

        data = deepcopy(graph)
        num_zones = data['zones'].num_nodes

        data['aps'].x = torch.as_tensor(X[i], dtype=torch.float)
        data['zones'].x = torch.zeros(num_zones, 1)
        # as_tensor on an array avoids torch.Tensor(int), which would
        # allocate an uninitialised tensor of that size for scalar labels.
        data['zones'].y = torch.as_tensor(label, dtype=torch.float)

        # One boolean entry per zone node. The previous masks were float
        # tensors of length len(y), i.e. O(n^2) memory over the dataset.
        data['zones'].train_mask = torch.ones(num_zones, dtype=torch.bool)
        data['zones'].val_mask = torch.ones(num_zones, dtype=torch.bool)
        data['zones'].test_mask = torch.ones(num_zones, dtype=torch.bool)

        dataset.append(data)
    return dataset

In [None]:
def build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps):
    """Assemble the heterogeneous graph with 'aps' and 'zones' node types.

    Two edge types are stored: ('aps', 'ap_ap', 'aps') and
    ('zones', 'zone_ap', 'aps'). The attributes of each edge type are
    standardised as (value - mean) / std over that edge type before being
    stored. The node counts are set to ``num_aps`` and ``num_classes``.

    Returns:
        The populated ``HeteroData`` object.
    """
    hetero = HeteroData()

    # AP <-> AP relation
    ap_centered = ap_edge_attr - ap_edge_attr.mean()
    hetero['aps', 'ap_ap', 'aps'].edge_index = ap_edge_index
    hetero['aps', 'ap_ap', 'aps'].edge_attr = ap_centered / ap_edge_attr.std()
    hetero['aps'].num_nodes = num_aps

    # zone -> AP relation
    zone_centered = zone_ap_edge_attr - zone_ap_edge_attr.mean()
    hetero['zones', 'zone_ap', 'aps'].edge_index = zone_ap_edge_index
    hetero['zones', 'zone_ap', 'aps'].edge_attr = zone_centered / zone_ap_edge_attr.std()
    hetero['zones'].num_nodes = num_classes

    return hetero

## Modelo

In [None]:
class HeteroGNN(torch.nn.Module):
    """Heterogeneous GNN over the 'aps' / 'zones' graph.

    ``hidden_layers`` HeteroConv layers of width ``hidden_channels`` (each
    followed by ReLU) are applied, then one HeteroConv layer of width
    ``out_channels`` without activation. Every layer holds one GraphConv per
    edge type and combines the per-relation results with 'mean'.
    """

    def __init__(self, hidden_channels, out_channels, hidden_layers):
        super().__init__()

        # Edge types handled by every layer (order kept as in the graph setup).
        relations = (
            ('aps', 'ap_ap', 'aps'),
            ('zones', 'zone_ap', 'aps'),
            ('aps', 'rev_zone_ap', 'zones'),
        )

        self.hidden_convs = torch.nn.ModuleList([
            HeteroConv(
                {rel: GraphConv((-1, -1), hidden_channels) for rel in relations},
                aggr='mean',
            )
            for _ in range(hidden_layers)
        ])

        self.out_convs = torch.nn.ModuleList([
            HeteroConv(
                {rel: GraphConv((-1, -1), out_channels) for rel in relations},
                aggr='mean',
            )
        ])

    def forward(self, x_dict, edge_index_dict, edge_attr_dict):
        """Run all layers and return the dict of per-node-type outputs."""
        features = x_dict
        for layer in self.hidden_convs:
            features = layer(features, edge_index_dict, edge_attr_dict)
            features = {node_type: feat.relu() for node_type, feat in features.items()}

        # Output layer: no activation afterwards.
        for layer in self.out_convs:
            features = layer(features, edge_index_dict, edge_attr_dict)

        return features

In [None]:
class HeteroGNN_simplified(torch.nn.Module):
    """Variant of the hetero GNN whose hidden layers only use AP-AP edges.

    The hidden HeteroConv layers contain a single GraphConv for
    ('aps', 'ap_ap', 'aps'). The original 'zones' input is saved before the
    hidden layers and put back into the feature dict before the output
    layer, which uses all three edge types.
    """

    def __init__(self, hidden_channels, out_channels, hidden_layers):
        super().__init__()

        ap_relation = ('aps', 'ap_ap', 'aps')
        output_relations = (
            ap_relation,
            ('zones', 'zone_ap', 'aps'),
            ('aps', 'rev_zone_ap', 'zones'),
        )

        self.hidden_convs = torch.nn.ModuleList([
            HeteroConv({ap_relation: GraphConv((-1, -1), hidden_channels)}, aggr='mean')
            for _ in range(hidden_layers)
        ])

        self.out_convs = torch.nn.ModuleList([
            HeteroConv(
                {rel: GraphConv((-1, -1), out_channels) for rel in output_relations},
                aggr='mean',
            )
        ])

    def forward(self, x_dict, edge_index_dict, edge_attr_dict):
        """Run the AP-only hidden layers, restore 'zones', run the output layer."""
        zones_input = x_dict["zones"]

        features = x_dict
        for layer in self.hidden_convs:
            features = layer(features, edge_index_dict, edge_attr_dict)
            features = {node_type: feat.relu() for node_type, feat in features.items()}

        # Put the untouched zone signal back before the output layer.
        features["zones"] = zones_input

        for layer in self.out_convs:
            features = layer(features, edge_index_dict, edge_attr_dict)

        return features

## Entrenamiento

In [None]:
# Split the data and build the heterogeneous graph object

X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3, dataset_percentage=0.3)
# NOTE(review): preprocess_dataset already returns X_train without the class
# column, so `[:, :-1]` appears to drop the last AP from graph construction
# while `num_aps` still counts it - confirm whether this is intended.
ap_edge_index, ap_edge_attr = ap_graph_creator(X_train[:,:-1], th=10, prune_th=20)
zone_ap_edge_index, zone_ap_edge_attr = zone_ap_graph_creator(X_train[:,:-1], y_train, prune_th=10)
heterodata = build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps)
# Keep the transform's return value: recent PyTorch Geometric releases apply
# transforms to a shallow copy, so relying on in-place modification can leave
# `heterodata` without the reverse ('aps', 'rev_zone_ap', 'zones') edges.
heterodata = T.ToUndirected()(heterodata)
print(f"Undirected: {heterodata.is_undirected()}")

In [None]:
# Display whether the heterogeneous graph is undirected.
heterodata.is_undirected()

In [None]:
# Dataset assembly

# Reshape to (samples, num_aps, 1): one scalar feature per AP node.
x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_training_data = y_train
y_test_data = y_test

# normalize (x-mean)/std
# Statistics are taken per AP over the training samples only and are then
# reused for the test data.
mean = x_training_data.mean(axis=0)
std = x_training_data.std(axis=0)

x_training_data = x_training_data - mean
x_training_data /= std
x_test_data = x_test_data - mean
x_test_data /= std

In [None]:
# One graph per sample, all sharing the structure of `heterodata`.
train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes)
test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

In [None]:
# Display the first training graph.
train_dataset[0]

In [None]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device

In [None]:
# Prefer the second GPU (the previously hard-coded 'cuda:1') when it exists,
# otherwise fall back to the first GPU or to the CPU so the notebook still
# runs on machines with fewer devices.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')
device

In [None]:
# Mini-batch size used by the DataLoaders and initial Adam learning rate.
batch_size = 32
learning_rate = 0.003

### HeteroGNN

In [None]:
# HeteroGNN(hidden_channels=10, out_channels=1, hidden_layers=2): with
# out_channels=1 every node gets a single output value.
model = HeteroGNN(10, 1, 2).to(device)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
loss = torch.nn.CrossEntropyLoss()
# Each scheduler.step() multiplies the learning rate by 0.8.
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)


In [None]:
# Per-epoch history, plotted once training has finished.
train_loss = []
train_accuracy = []
test_loss = []
test_accuracy = []
best_test_accuracy = 0

m = torch.nn.Softmax(dim=1)

for epoch in range(100):
    print(f"Epoch: {epoch+1}")

    # TRAIN
    model.train()
    train_accuracy_epoch = []
    train_loss_epoch = []
    for d in train_loader:

        d = d.to(device)
        out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
        # The model emits one value per zone node; regroup them into one row
        # of `num_classes` logits per graph in the batch.
        out_zones = out["zones"].cpu().reshape(-1, num_classes)
        targets = d["zones"].y.cpu().reshape(-1).type(torch.long)

        loss_result = loss(out_zones, targets)
        loss_result.backward()
        train_loss_epoch.append(loss_result.item())

        optimizer.step()
        optimizer.zero_grad()

        output = m(out_zones.detach())
        train_accuracy_epoch.append(accuracy_score(targets, np.array(torch.argmax(output, axis=1))))

    # Decay the learning rate once every 10 epochs.
    if (epoch+1)%10 == 0:
        scheduler.step()

    train_accuracy.append(np.mean(train_accuracy_epoch))
    train_loss.append(np.mean(train_loss_epoch))

    # VALIDATION
    model.eval()
    test_accuracy_epoch = []
    test_loss_epoch = []
    # No gradients are needed here; without no_grad() every forward pass
    # would still record an autograd graph.
    with torch.no_grad():
        for d in test_loader:

            d = d.to(device)
            out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
            out_zones = out["zones"].cpu().reshape(-1, num_classes)
            targets = d["zones"].y.cpu().reshape(-1).type(torch.long)

            loss_result = loss(out_zones, targets)
            test_loss_epoch.append(loss_result.item())

            output = m(out_zones)
            test_accuracy_epoch.append(accuracy_score(targets, np.array(torch.argmax(output, axis=1))))

    test_accuracy.append(np.mean(test_accuracy_epoch))
    # Checkpoint whenever the validation accuracy improves.
    if test_accuracy[-1] > best_test_accuracy:
        best_test_accuracy = test_accuracy[-1]
        torch.save(model.state_dict(), "UJI_HeteroGNN_best_model.pth")

    test_loss.append(np.mean(test_loss_epoch))

    print(f"    Train Loss {np.mean(train_loss_epoch)}, Val Loss {np.mean(test_loss_epoch)}")


print(f"Last LR: {scheduler.get_last_lr()}")
print(f"Best Accuracy: Train {np.max(train_accuracy)}, Val {np.max(test_accuracy)}")
plt.figure()
plt.plot(train_loss, label="Train loss")
plt.plot(test_loss, label="Validation loss")
plt.legend()

plt.figure()
plt.plot(train_accuracy, label="Train accuracy")
plt.plot(test_accuracy, label="Validation accuracy")
plt.legend()

plt.show()


In [None]:
# Report over the whole validation loader. The previous version reused the
# `d` / `out_zones` left over from the loop, i.e. only the last batch.
m = torch.nn.Softmax(dim=1)

model.eval()
report_true = []
report_pred = []
with torch.no_grad():
    for d in test_loader:
        d = d.to(device)
        out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
        out_zones = out["zones"].cpu().reshape(-1, num_classes)
        output = m(out_zones)
        report_pred.append(torch.argmax(output, dim=1))
        report_true.append(d["zones"].y.cpu().reshape(-1).type(torch.long))

report_true = torch.cat(report_true).numpy()
report_pred = torch.cat(report_pred).numpy()
accuracy = accuracy_score(report_true, report_pred)

print(accuracy)
print(classification_report(report_true, report_pred))

## TEST

In [None]:
dataset = 'UjiIndoorLoc.zip'
# Close the archive and the member stream once the CSV is parsed.
with ZipFile(dataset) as zip_file:
    with zip_file.open('ValidationData.csv') as csv_file:
        df_test = pd.read_csv(csv_file)

df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Same transformation as the training data: keep the APs selected during
# training, map the value 100 to -105 and shift everything by +105.
# `replace` avoids writing through `.values`, which only works while pandas
# returns a writable view.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values

# Encode the labels with the encoder fitted on the training data.
y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# normalize (x-mean)/std with the statistics of the training data

x_test_data = x_test_data - mean
x_test_data /= std

In [None]:
# One graph per held-out sample, sharing the structure of `heterodata`.
test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

In [None]:
# Unshuffled loader over the held-out graphs.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Display the first held-out graph.
test_dataset[0]

Load best model

In [None]:
# Rebuild the architecture used for training and restore the best checkpoint.
model = HeteroGNN(10, 1, 2)
# map_location lets a checkpoint saved on one device (e.g. a specific GPU)
# be loaded when that device is not available.
model.load_state_dict(torch.load("UJI_HeteroGNN_best_model.pth", map_location=device))
model.to(device)

In [None]:
test_loss = []
test_accuracy = []  # per-batch accuracies, kept for inspection

m = torch.nn.Softmax(dim=1)

# TEST
model.eval()
correct_predictions = 0
total_predictions = 0
# Inference only: skip autograd bookkeeping.
with torch.no_grad():
    for d in test_loader:

        d = d.to(device)
        out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
        out_zones = out["zones"].cpu().reshape(-1, num_classes)

        output = m(out_zones)
        targets = d["zones"].y.cpu().reshape(-1).type(torch.long)
        predictions = torch.argmax(output, dim=1)
        test_accuracy.append(accuracy_score(targets, np.array(predictions)))

        correct_predictions += int((predictions == targets).sum())
        total_predictions += int(targets.numel())

# Accuracy over all samples. Averaging the per-batch accuracies would give
# the (usually smaller) last batch the same weight as a full batch.
total_test_accuracy = correct_predictions / total_predictions
print(f"TEST ACCURACY: {total_test_accuracy}")

In [None]:
# Report over the whole test loader. The previous version reused the
# `d` / `out_zones` left over from the loop, i.e. only the last batch.
m = torch.nn.Softmax(dim=1)

model.eval()
report_true = []
report_pred = []
with torch.no_grad():
    for d in test_loader:
        d = d.to(device)
        out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
        out_zones = out["zones"].cpu().reshape(-1, num_classes)
        output = m(out_zones)
        report_pred.append(torch.argmax(output, dim=1))
        report_true.append(d["zones"].y.cpu().reshape(-1).type(torch.long))

report_true = torch.cat(report_true).numpy()
report_pred = torch.cat(report_pred).numpy()
accuracy = accuracy_score(report_true, report_pred)

print(accuracy)
print(classification_report(report_true, report_pred))

### KNN 

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report

# k-NN baseline (k=5) on the normalized features; `[:, :, 0]` drops the
# trailing singleton axis added for the graph datasets.
neigh = KNeighborsClassifier(n_neighbors=5)
neigh.fit(x_training_data[:,:,0], y_training_data.ravel())
y_pred_knn = neigh.predict(x_test_data[:,:,0])

print(accuracy_score(y_test_data, y_pred_knn))
print(classification_report(y_test_data, y_pred_knn))

### FCNN

In [None]:
from sklearn.neural_network import MLPClassifier

# Fully connected baseline: MLPClassifier with its default settings on the
# same normalized features used by the k-NN baseline.
clf = MLPClassifier().fit(x_training_data[:,:,0], y_training_data.ravel())
y_pred_fcnn = clf.predict(x_test_data[:,:,0])
print(accuracy_score(y_test_data, y_pred_fcnn))
print(classification_report(y_test_data, y_pred_fcnn))

## Bracco et al

Import auxiliary functions

### Functions

In [None]:
##
# Data manipulation imports
##
import csv
import time
import numpy
import copy
import json
import numpy
import math
###
# Machine Learning imports
##

from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from fastprogress import master_bar, progress_bar
from sklearn.metrics import precision_score

##
# Data viz libraries
##

import seaborn as sns
import matplotlib.pyplot as plt

def get_data(file, filter_macs=False, allowed_macs=(), to_zero=False, non_zero_macs=()):
    """Read a CSV of Wi-Fi readings into a feature matrix and a label vector.

    The first CSV row is a header whose columns (after the first) are named
    ``...wifi-<mac>``. In every following row the first column is a location
    name of the form ``<prefix>_<number>`` and the remaining columns are
    readings; empty readings become 0.

    Args:
        file: Path of the CSV file.
        filter_macs: When true, only the columns whose MAC is in
            ``allowed_macs`` are returned (in header order).
        allowed_macs: MACs kept when ``filter_macs`` is true.
        to_zero: When true, every reading whose MAC is not in
            ``non_zero_macs`` is replaced by 0.
        non_zero_macs: MACs whose readings are preserved when ``to_zero``
            is true.

    Returns:
        ``(X, y)`` where ``X`` is a float array with one row per data row and
        ``y`` holds the location number minus 1.
    """
    allowed = frozenset(allowed_macs)
    keep_value = frozenset(non_zero_macs)
    row_length = len(allowed_macs) + 1

    rows = []
    new_indexes = {}   # CSV column -> position in the filtered row (or None)
    preserved = {}     # CSV column -> True when its MAC is in non_zero_macs
    with open(file, 'r') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')

        for i, row in enumerate(reader):
            if i == 0:
                # Header: decide where each column goes in the filtered row.
                next_index = 1
                for j, column_name in enumerate(row):
                    if j == 0:
                        continue
                    mac = column_name.split('wifi-')[1]
                    if mac in allowed:
                        new_indexes[j] = next_index
                        next_index += 1
                    else:
                        new_indexes[j] = None
                    preserved[j] = mac in keep_value
                continue

            new_row = numpy.zeros(row_length)
            for j, val in enumerate(row):
                if j == 0:
                    # this is a name of the location
                    label = int(val.split('_')[1])
                    new_row[0] = label
                    row[0] = label
                    continue

                index_to_use = new_indexes[j]
                keeps_reading = preserved[j]
                if index_to_use is None and filter_macs:
                    continue
                if val == '':
                    if index_to_use is not None:
                        new_row[index_to_use] = 0
                    row[j] = 0
                    continue
                if to_zero and not keeps_reading:
                    row[j] = 0
                    continue
                try:
                    float_value = float(val)
                    # Guard against indexing with None, which numpy treats
                    # as "add an axis" and would overwrite the whole row.
                    if index_to_use is not None:
                        new_row[index_to_use] = float_value
                    row[j] = float_value
                except ValueError:
                    print("problem parsing value " + str(val))

            rows.append(new_row if filter_macs else row)

    y = numpy.zeros(len(rows))
    X = numpy.zeros((len(rows), len(rows[0]) - 1))

    for i, record in enumerate(rows):
        y[i] = int(record[0])
        X[i, :] = numpy.array(record[1:])
    return X, y - 1


#####
# Funcion Auxiliar para poder hacer un print lindo de los reportes de clasificacion
#
###

def show_values(pc, fmt="%.2f", **kw):
    '''
    Write each cell's value as centred text on top of a pcolor plot.

    The text is black when all three colour channels of the cell's face
    colour are above 0.5 and white otherwise. Extra keyword arguments are
    forwarded to ``Axes.text``.

    Source: https://stackoverflow.com/a/25074150/395857
    By HYRY
    '''
    pc.update_scalarmappable()
    axes = pc.axes
    black = (0.0, 0.0, 0.0)
    white = (1.0, 1.0, 1.0)
    cells = zip(pc.get_paths(), pc.get_facecolors(), pc.get_array())
    for path, face_color, value in cells:
        x, y = path.vertices[:-2, :].mean(0)
        text_color = black if numpy.all(face_color[:3] > 0.5) else white
        axes.text(x, y, fmt % value, ha="center", va="center", color=text_color, **kw)


def cm2inch(*tupl):
    '''
    Convert centimetres to inches (e.g. for matplotlib figure sizes).

    Accepts either separate numbers, ``cm2inch(w, h)``, or a single tuple,
    ``cm2inch((w, h))``, and returns a tuple of the values divided by 2.54.

    Source: https://stackoverflow.com/a/22787457/395857
    By gns-ank
    '''
    inch = 2.54
    # isinstance also accepts tuple subclasses such as namedtuples, which the
    # previous exact type comparison treated as a single number.
    values = tupl[0] if isinstance(tupl[0], tuple) else tupl
    return tuple(i/inch for i in values)


def heatmap(AUC, title, xlabel, ylabel, xticklabels, yticklabels, figure_width=40, figure_height=20, correct_orientation=False, cmap='RdBu'):
    '''
    Draw an annotated heat map of the 2-D array ``AUC`` and save it as
    ``<title>.svg``.

    ``xticklabels`` / ``yticklabels`` label the columns / rows,
    ``figure_width`` / ``figure_height`` are given in centimetres, and
    ``correct_orientation`` puts the origin at the top left.

    Inspired by:
    - https://stackoverflow.com/a/16124677/395857
    - https://stackoverflow.com/a/25074150/395857
    '''

    # Plot it out
    fig, ax = plt.subplots()
    c = ax.pcolor(AUC, edgecolors='k', linestyle= 'dashed', linewidths=0.2, cmap=cmap)

    # put the major ticks at the middle of each cell
    ax.set_yticks(numpy.arange(AUC.shape[0]) + 0.5, minor=False)
    ax.set_xticks(numpy.arange(AUC.shape[1]) + 0.5, minor=False)

    # set tick labels
    ax.set_xticklabels(xticklabels, minor=False)
    ax.set_yticklabels(yticklabels, minor=False)

    # set x/y labels
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

    # Remove last blank column
    plt.xlim( (0, AUC.shape[1]) )

    # Turn off all the tick marks (labels stay). The `tick1On` / `tick2On`
    # attributes no longer exist in current matplotlib, so assigning them had
    # no effect; hiding the tick line artists is the supported equivalent.
    ax = plt.gca()
    for axis in (ax.xaxis, ax.yaxis):
        for t in axis.get_major_ticks():
            t.tick1line.set_visible(False)
            t.tick2line.set_visible(False)

    # Add color bar
    plt.colorbar(c)
    plt.rcParams.update({'font.size': 14})

    # Add text in each cell
    show_values(c)

    # Proper orientation (origin at the top left instead of bottom left)
    if correct_orientation:
        ax.invert_yaxis()
        ax.xaxis.tick_top()

    # resize
    fig = plt.gcf()
    fig.set_size_inches(cm2inch(figure_width, figure_height))
    fig.savefig(f'{title}.svg', format="svg")



def plot_classification_report(classification_report, title, cmap='RdBu'):
    '''
    Plot a scikit-learn classification report (given as text) as a heat map.

    The first two lines and the last four lines of the report are ignored.
    From every remaining non-trivial line the class name, the values between
    the name and the last two fields, and the support (last field) are read.
    Extension based on https://stackoverflow.com/a/31689645/395857
    '''
    report_lines = classification_report.split('\n')

    class_names = []
    supports = []
    metric_rows = []
    for raw_line in report_lines[2:len(report_lines) - 4]:
        fields = raw_line.strip().split()
        if len(fields) < 2:
            continue
        if fields[1] != 'avg':
            metrics = [float(field) for field in fields[1:len(fields) - 2]]
        else:
            metrics = []
        supports.append(int(fields[-1]))
        class_names.append(fields[0])
        metric_rows.append(metrics)

    yticklabels = [
        '{0} ({1})'.format(class_names[idx], sup)
        for idx, sup in enumerate(supports)
    ]
    heatmap(
        numpy.array(metric_rows),
        title,
        'Métricas',
        'Zonas',
        ['Precisión', 'Exhaustividad'],
        yticklabels,
        25,
        len(class_names) + 7,
        False,
        cmap=cmap,
    )

## Funcion Auxiliar
def youden_statistic(y_actual, y_hat):
    """Return Youden's J (sensitivity + specificity - 1) for 0/1 labels.

    Only predictions equal to 1 or 0 are counted; a rate whose denominator
    is zero is taken as 0.
    """
    true_pos = false_pos = true_neg = false_neg = 0
    for idx, predicted in enumerate(y_hat):
        actual = y_actual[idx]
        matches = actual == predicted
        if predicted == 1:
            if matches:
                true_pos += 1
            else:
                false_pos += 1
        elif predicted == 0:
            if matches:
                true_neg += 1
            else:
                false_neg += 1

    positives = true_pos + false_neg
    negatives = true_neg + false_pos
    sensitivity = true_pos / positives if positives != 0 else 0
    specificity = true_neg / negatives if negatives != 0 else 0
    return specificity + sensitivity - 1



def split_train_test_val(X,y):
    """Split into train / test / validation sets with fixed seeds.

    20% of the data is held out as the test set; 20% of the remainder is
    then held out as the validation set.
    """
    X_rest, X_test, y_rest, y_test = train_test_split(
        X, y, test_size=0.2, random_state=123
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=0.2, random_state=1
    )
    return X_train, X_test, X_val, y_train, y_test, y_val

def train_algorithms(X_train, y_train, X_test, y_test, names, classifiers):
    """Fit every classifier and score it with ``youden_statistic``.

    Classifiers whose ``fit`` raises are reported with ``print`` and left
    out. Returns ``(algorithms, youden)``: the fitted estimators and their
    Youden statistic on the test data, both keyed by name.
    """
    fitted = {}
    for label, estimator in zip(names, classifiers):
        try:
            fitted[label] = estimator.fit(X_train,y_train)
        except Exception as err:
            print(str(err))

    predictions = {label: model.predict(X_test) for label, model in fitted.items()}
    scores = {
        label: youden_statistic(y_test, predicted)
        for label, predicted in predictions.items()
    }

    return fitted, scores

def classify(algorithms, youden, X_val):
    """Combine the classifiers' probabilities, weighted by their Youden score.

    Args:
        algorithms: Mapping name -> fitted estimator exposing
            ``predict_proba``.
        youden: Mapping name -> weight applied to that estimator's
            probabilities.
        X_val: Samples to classify.

    Returns:
        1-D array with, for every sample, the index of the class with the
        highest weighted probability sum.

    Raises:
        ValueError: If ``algorithms`` is empty.
    """
    if not algorithms:
        raise ValueError("classify() needs at least one fitted algorithm")

    # Accumulate with plain arrays: no dependency on a classifier literally
    # named 'Nearest Neighbors' and no use of the deprecated numpy.matrix.
    probs = None
    for name, model in algorithms.items():
        weighted = numpy.asarray(model.predict_proba(X_val)) * youden[name]
        probs = weighted if probs is None else probs + weighted

    return numpy.argmax(probs, axis=1).reshape(-1)


def save_cm(y_val, y_final, matrix_img_name, target_names):
    """Plot the confusion matrix of ``y_val`` vs ``y_final`` and save it as
    ``<matrix_img_name>.svg``."""
    matrix = confusion_matrix(y_val, y_final)
    figure, axis = plt.subplots(figsize=(15,15))
    heat_map = sns.heatmap(
        matrix,
        annot=True,
        annot_kws={"size": 12},
        fmt="d",
        linewidths=.2,
        ax=axis,
        xticklabels=target_names,
        yticklabels=target_names,
    )
    figure = heat_map.get_figure()
    plt.rcParams.update({'font.size': 18})
    figure.savefig(f'{matrix_img_name}.svg', format="svg")

def report_classify(y_val, y_final, matrix_img_name, target_names):
    """Plot the classification report (saved under the name
    ``<matrix_img_name>_class``) and return the macro-averaged precision."""
    macro_precision = precision_score(y_val, y_final, average='macro')
    report_text = classification_report(y_val, y_final, target_names=target_names)
    plot_classification_report(report_text, matrix_img_name + '_class')
    return macro_precision


def subsample(X, y, percentage):
    """Keep the first ``percentage`` fraction of the samples of every class.

    For each distinct label (in ascending order) the first
    ``floor(count * percentage)`` rows with that label are kept, in their
    original order.

    Args:
        X: 2-D numpy array of features.
        y: 1-D numpy array of labels, aligned with the rows of ``X``.
        percentage: Fraction of every class to keep.

    Returns:
        ``(X_subsample, y_subsample)`` as float arrays, grouped by class.
    """
    # Derive the feature width and the class list from the data instead of
    # assuming 188 features and classes 0..15.
    classes, counts = numpy.unique(y, return_counts=True)

    feature_parts = [numpy.zeros((0, X.shape[1]))]
    label_parts = [numpy.zeros(0)]
    for cls, total in zip(classes, counts):
        keep = math.floor(total * percentage)
        selected = y == cls
        feature_parts.append(X[selected][:keep])
        label_parts.append(y[selected][:keep])

    X_subsample = numpy.vstack(feature_parts)
    y_subsample = numpy.concatenate(label_parts, axis=None)
    return X_subsample, y_subsample

### Train

In [None]:
names= ["Nearest Neighbors","Decision Tree","Linear SVM","Random Forest","Neural Net","AdaBoost"]

X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3)


# Dataset assembly

x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_training_data = y_train
y_test_data = y_test

# normalize (x-mean)/std using the training statistics
mean = x_training_data.mean(axis=0)
std = x_training_data.std(axis=0)

x_training_data = x_training_data - mean
x_training_data /= std
x_test_data = x_test_data - mean
x_test_data /= std


# One classifier per entry of `names`, in the same order. The list used to
# contain a second DecisionTreeClassifier, which shifted every later
# classifier under the wrong name and silently dropped AdaBoost in zip().
classifiers = [
    KNeighborsClassifier(3),
    DecisionTreeClassifier(max_depth=5),
    SVC(kernel="linear", C=0.025, probability=True),
    RandomForestClassifier(max_depth=5, n_estimators=10, max_features=1),
    MLPClassifier(alpha=1, max_iter=1000),
    AdaBoostClassifier(),
]

algorithms, youden = train_algorithms(x_training_data[:,:,0], y_training_data.ravel(), x_test_data[:,:,0], y_test_data.ravel(), names, classifiers)

y_final = classify(algorithms, youden, x_test_data[:,:,0])

ACC_test = accuracy_score(y_test_data.ravel(), y_final)
print(ACC_test)

### Test

In [None]:
dataset = 'UjiIndoorLoc.zip'
# Close the archive and the member stream once the CSV is parsed.
with ZipFile(dataset) as zip_file:
    with zip_file.open('ValidationData.csv') as csv_file:
        df_test = pd.read_csv(csv_file)

df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Same transformation as the training data: keep the APs selected during
# training, map the value 100 to -105 and shift everything by +105.
# `replace` avoids writing through `.values`, which only works while pandas
# returns a writable view.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values

# Encode the labels with the encoder fitted on the training data.
y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# normalize (x-mean)/std with the statistics of the training data

x_test_data = x_test_data - mean
x_test_data /= std

test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

### TEST BRACCO et al ####
# The classifiers were fitted on normalized features, so they must be
# evaluated on the normalized test features rather than on the raw X_test.
y_final = classify(algorithms, youden, x_test_data[:,:,0])
ACC_test = accuracy_score(y_test.ravel(), y_final)
print(ACC_test)

## Análisis variando cantidad de muestras

### HeteroGNN

In [None]:
# Sample-size study for HeteroGNN: for every training fraction in `porcentajes`
# train 5 independent models and checkpoint, per run, the weights with the best
# hold-out accuracy.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 32
learning_rate = 0.003
print_every = 5

porcentajes = [0.9, 1] # full sweep: [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
# Best hold-out accuracy of every run, keyed by str(fraction)
accuracy = {"0.3":[], "0.4":[], "0.5":[], "0.6":[], "0.7":[], "0.8":[], "0.9":[], "1":[]}

for porc in porcentajes:
    print('Porcentaje de datos: ', porc)

    for i in range(5):

        # Split the data and build the heterogeneous graph object
        X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3, dataset_percentage=porc)
        ap_edge_index, ap_edge_attr = ap_graph_creator(X_train[:,:-1], th=10)
        zone_ap_edge_index, zone_ap_edge_attr = zone_ap_graph_creator(X_train[:,:-1], y_train)
        heterodata = build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps)
        # Re-bind the result: transforms return the transformed object and
        # recent PyG releases operate on a copy, leaving the input untouched.
        heterodata = T.ToUndirected()(heterodata)

        # Dataset assembly: reshape to (samples, num_aps, 1)
        x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
        x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
        y_training_data = y_train
        y_test_data = y_test

        # Normalise as (x - mean) / std with training-split statistics
        mean = x_training_data.mean(axis=0)
        std = x_training_data.std(axis=0)
        x_training_data = (x_training_data - mean) / std
        x_test_data = (x_test_data - mean) / std

        train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes)
        test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

        model = HeteroGNN(10, 1, 2).to(device)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
        loss = torch.nn.CrossEntropyLoss()
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

        train_loss = []
        train_accuracy = []
        test_loss = []
        test_accuracy = []
        best_test_accuracy = 0

        m = torch.nn.Softmax(dim=1)

        for epoch in range(100):

            # TRAIN
            model.train()
            train_accuracy_epoch = []
            train_loss_epoch = []
            for d in train_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                target = d["zones"].y.cpu().type(torch.long)

                loss_result = loss(out_zones, target)
                loss_result.backward()
                train_loss_epoch.append(loss_result.item())

                optimizer.step()
                optimizer.zero_grad()

                output = m(out_zones.detach())
                train_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            # Decay the learning rate by gamma once every 10 epochs
            if (epoch+1)%10 == 0:
                scheduler.step()

            train_accuracy.append(np.mean(train_accuracy_epoch))
            train_loss.append(np.mean(train_loss_epoch))

            # VALIDATION: inference only, so autograd bookkeeping is switched off
            model.eval()
            test_accuracy_epoch = []
            test_loss_epoch = []
            with torch.no_grad():
                for d in test_loader:

                    d = d.to(device)
                    out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                    out_zones = out["zones"].cpu()
                    out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                    target = d["zones"].y.cpu().type(torch.long)

                    loss_result = loss(out_zones, target)
                    test_loss_epoch.append(loss_result.item())

                    output = m(out_zones)
                    test_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            test_accuracy.append(np.mean(test_accuracy_epoch))
            test_loss.append(np.mean(test_loss_epoch))

            # Checkpoint whenever the hold-out accuracy improves
            if test_accuracy[-1] > best_test_accuracy:
                best_test_accuracy = test_accuracy[-1]
                torch.save(model.state_dict(), f"UJI_HeteroGNN_porc{porc}_{i}_best_model.pth")

        print(f"Best Accuracy: Train {np.max(train_accuracy)}, Val {np.max(test_accuracy)}")
        accuracy[str(porc)].append(np.max(test_accuracy))

In [None]:
# Report the mean of the accuracies stored for each evaluated fraction
for porc in porcentajes:
    mean_accuracy = np.mean(accuracy[str(porc)])
    print(f"{porc}: {mean_accuracy}")

In [None]:
# Dump the raw per-fraction accuracy lists collected so far
print(accuracy)

#### TEST

In [None]:
# Load the official validation file and turn it into a graph-dataset loader.
dataset = 'UjiIndoorLoc.zip'
# Context managers close both the archive and the member once the CSV is read
with ZipFile(dataset) as zip_file, zip_file.open('ValidationData.csv') as csv_file:
    df_test = pd.read_csv(csv_file)

# Class label = building id followed by floor, both as strings
df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Map the value 100 to -105 and shift everything by 105 (so those entries end
# up at 0). replace() + arithmetic build a new frame, instead of writing through
# `.values` of a column slice, which is not guaranteed to reach the DataFrame.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values


y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
# Every column except the trailing CLASS label
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly: reshape to (samples, num_aps, 1)

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# Normalise as (x - mean) / std.
# NOTE(review): mean, std and heterodata are not defined here; they are the
# values left by the most recently executed training run - confirm that is the
# run whose checkpoints are evaluated afterwards.
x_test_data = (x_test_data - mean) / std

test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Score every saved HeteroGNN checkpoint on the validation loader and store the
# mean per-batch accuracy of each run under its training fraction.
porcentajes = [0.9, 1] # full sweep: [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for porc in porcentajes:
    # Start from an empty list: the training cell stored hold-out accuracies
    # under the same key, and appending would mix both kinds of score.
    accuracy[str(porc)] = []

    for i in range(5):
        model = HeteroGNN(10, 1, 2)
        # map_location lets checkpoints written on a GPU load on any device
        model.load_state_dict(torch.load(f"UJI_HeteroGNN_porc{porc}_{i}_best_model.pth", map_location=device))
        model.to(device)

        test_loss = []
        test_accuracy = []

        m = torch.nn.Softmax(dim=1)

        # TEST: inference only, so autograd bookkeeping is switched off
        model.eval()
        with torch.no_grad():
            for d in test_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)

                output = m(out_zones)
                test_accuracy.append(accuracy_score(d["zones"].y.cpu().reshape(-1).type(torch.long), np.array(torch.argmax(output, dim=1))))

        total_test_accuracy = np.mean(test_accuracy)
        accuracy[str(porc)].append(total_test_accuracy)
        


In [None]:
# Display the accuracy dictionary as the cell output
accuracy

In [None]:
# Collect the accuracy lists in ascending sample-size order, one per box
aux = [accuracy[key] for key in ('0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '1')]

In [None]:
# Box plot of accuracy against training-set size, saved as a PDF
fig, ax = plt.subplots(figsize=[9,7])
ax.boxplot(aux, showfliers=False)
ax.set_xticks([1, 2, 3, 4, 5, 6, 7, 8])
ax.set_xticklabels([30, 40, 50, 60, 70, 80, 90, 100])
ax.set_ylabel('Accuracy')
ax.set_xlabel('Fingerprints sample size (%)')
ax.grid()
fig.savefig('UJIINDOORLOC_hetero_cant_muestras.pdf')

### HeteroGNNSimplified

In [None]:
# Sample-size study for HeteroGNN_simplified: for every training fraction in
# `porcentajes` train 5 independent models and checkpoint, per run, the weights
# with the best hold-out accuracy.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 32
learning_rate = 0.003
print_every = 5

porcentajes = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
# Best hold-out accuracy of every run, keyed by str(fraction)
accuracy = {"0.3":[], "0.4":[], "0.5":[], "0.6":[], "0.7":[], "0.8":[], "0.9":[], "1":[]}

for porc in porcentajes:
    print('Porcentaje de datos: ', porc)

    for i in range(5):

        # Split the data and build the heterogeneous graph object
        X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3, dataset_percentage=porc)
        ap_edge_index, ap_edge_attr = ap_graph_creator(X_train[:,:-1], th=10)
        zone_ap_edge_index, zone_ap_edge_attr = zone_ap_graph_creator(X_train[:,:-1], y_train)
        heterodata = build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps)
        # Re-bind the result: transforms return the transformed object and
        # recent PyG releases operate on a copy, leaving the input untouched.
        heterodata = T.ToUndirected()(heterodata)

        # Dataset assembly: reshape to (samples, num_aps, 1)
        x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
        x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
        y_training_data = y_train
        y_test_data = y_test

        # Normalise as (x - mean) / std with training-split statistics
        mean = x_training_data.mean(axis=0)
        std = x_training_data.std(axis=0)
        x_training_data = (x_training_data - mean) / std
        x_test_data = (x_test_data - mean) / std

        train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes)
        test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

        model = HeteroGNN_simplified(10, 1, 2).to(device)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
        loss = torch.nn.CrossEntropyLoss()
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

        train_loss = []
        train_accuracy = []
        test_loss = []
        test_accuracy = []
        best_test_accuracy = 0

        m = torch.nn.Softmax(dim=1)

        for epoch in range(100):

            # TRAIN
            model.train()
            train_accuracy_epoch = []
            train_loss_epoch = []
            for d in train_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                target = d["zones"].y.cpu().type(torch.long)

                loss_result = loss(out_zones, target)
                loss_result.backward()
                train_loss_epoch.append(loss_result.item())

                optimizer.step()
                optimizer.zero_grad()

                output = m(out_zones.detach())
                train_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            # Decay the learning rate by gamma once every 10 epochs
            if (epoch+1)%10 == 0:
                scheduler.step()

            train_accuracy.append(np.mean(train_accuracy_epoch))
            train_loss.append(np.mean(train_loss_epoch))

            # VALIDATION: inference only, so autograd bookkeeping is switched off
            model.eval()
            test_accuracy_epoch = []
            test_loss_epoch = []
            with torch.no_grad():
                for d in test_loader:

                    d = d.to(device)
                    out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                    out_zones = out["zones"].cpu()
                    out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                    target = d["zones"].y.cpu().type(torch.long)

                    loss_result = loss(out_zones, target)
                    test_loss_epoch.append(loss_result.item())

                    output = m(out_zones)
                    test_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            test_accuracy.append(np.mean(test_accuracy_epoch))
            test_loss.append(np.mean(test_loss_epoch))

            # Checkpoint whenever the hold-out accuracy improves
            if test_accuracy[-1] > best_test_accuracy:
                best_test_accuracy = test_accuracy[-1]
                torch.save(model.state_dict(), f"UJI_HeteroGNNSimplified_porc{porc}_{i}_best_model.pth")

        print(f"Best Accuracy: Train {np.max(train_accuracy)}, Val {np.max(test_accuracy)}")
        accuracy[str(porc)].append(np.max(test_accuracy))

#### TEST

In [None]:
# Load the official validation file and turn it into a graph-dataset loader.
dataset = 'UjiIndoorLoc.zip'
# Context managers close both the archive and the member once the CSV is read
with ZipFile(dataset) as zip_file, zip_file.open('ValidationData.csv') as csv_file:
    df_test = pd.read_csv(csv_file)

# Class label = building id followed by floor, both as strings
df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Map the value 100 to -105 and shift everything by 105 (so those entries end
# up at 0). replace() + arithmetic build a new frame, instead of writing through
# `.values` of a column slice, which is not guaranteed to reach the DataFrame.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values


y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
# Every column except the trailing CLASS label
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly: reshape to (samples, num_aps, 1)

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# Normalise as (x - mean) / std.
# NOTE(review): mean, std and heterodata are not defined here; they are the
# values left by the most recently executed training run - confirm that is the
# run whose checkpoints are evaluated afterwards.
x_test_data = (x_test_data - mean) / std

test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Score every saved HeteroGNN_simplified checkpoint on the validation loader
# and store the mean per-batch accuracy of each run under its training fraction.
porcentajes = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for porc in porcentajes:
    # Start from an empty list: the training cell stored hold-out accuracies
    # under the same key, and appending would mix both kinds of score.
    accuracy[str(porc)] = []

    for i in range(5):
        model = HeteroGNN_simplified(10, 1, 2)
        # map_location lets checkpoints written on a GPU load on any device
        model.load_state_dict(torch.load(f"UJI_HeteroGNNSimplified_porc{porc}_{i}_best_model.pth", map_location=device))
        model.to(device)

        test_loss = []
        test_accuracy = []

        m = torch.nn.Softmax(dim=1)

        # TEST: inference only, so autograd bookkeeping is switched off
        model.eval()
        with torch.no_grad():
            for d in test_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)

                output = m(out_zones)
                test_accuracy.append(accuracy_score(d["zones"].y.cpu().reshape(-1).type(torch.long), np.array(torch.argmax(output, dim=1))))

        total_test_accuracy = np.mean(test_accuracy)
        accuracy[str(porc)].append(total_test_accuracy)

print(accuracy)

In [None]:
# Collect the accuracy lists in ascending sample-size order, one per box
aux = [accuracy[key] for key in ('0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '1')]

In [None]:
# Box plot of accuracy against training-set size (figure is not saved here)
fig, ax = plt.subplots(figsize=[9,7])
ax.boxplot(aux, showfliers=False)
ax.set_xticks([1, 2, 3, 4, 5, 6, 7, 8])
ax.set_xticklabels([30, 40, 50, 60, 70, 80, 90, 100])
ax.set_yticks([0.95, 0.90, 0.85, 0.80, 0.75])
ax.set_ylabel('Accuracy')
ax.set_xlabel('Fingerprints sample size (%)')
ax.grid()
# To export: fig.savefig('UJIINDOORLOC_hetero_cant_muestras.pdf')

### KNN

In [None]:
# KNN baseline for the sample-size study: for every fraction in `porcentajes`
# fit a 5-nearest-neighbour classifier on 10 independent splits and score each
# one on the official validation file.

# Use the second GPU when there is one, otherwise fall back to the default
# device; a bare 'cuda:1' breaks later `.to(device)` calls on smaller machines.
if torch.cuda.device_count() > 1:
    device = torch.device('cuda:1')
else:
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 32
learning_rate = 0.003
print_every = 5

porcentajes = [0.8, 0.9, 1] # full sweep: [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

# The validation file does not change between runs, so it is read once, before
# the loops (this also defines `dataset` ahead of the first preprocess call).
dataset = 'UjiIndoorLoc.zip'
with ZipFile(dataset) as zip_file, zip_file.open('ValidationData.csv') as csv_file:
    df_test = pd.read_csv(csv_file)

# Class label = building id followed by floor, both as strings
df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)
df_y_test = df_test['CLASS']

for porc in porcentajes:
    print('Porcentaje de datos: ', porc)
    # Start from an empty list so KNN scores are not appended to whatever a
    # previous experiment stored under the same key. `accuracy` itself must
    # already exist, as in the original cell.
    accuracy[str(porc)] = []

    for i in range(10):

        # Split the data
        X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3, dataset_percentage=porc)

        # Dataset assembly: reshape to (samples, num_aps, 1)
        x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
        x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
        y_training_data = y_train
        y_test_data = y_test

        # Normalise as (x - mean) / std with training-split statistics
        mean = x_training_data.mean(axis=0)
        std = x_training_data.std(axis=0)
        x_training_data = (x_training_data - mean) / std
        x_test_data = (x_test_data - mean) / std

        # NOTE(review): heterodata is not built in this cell; these datasets
        # reuse the graph left by an earlier cell and KNN never reads them.
        train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes)
        test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

        #### FIT KNN ####
        neigh = KNeighborsClassifier(n_neighbors=5)
        neigh.fit(x_training_data[:,:,0], y_training_data.ravel())

        # Map the value 100 to -105 and shift everything by 105. replace() +
        # arithmetic build a new frame instead of writing through `.values` of
        # a column slice, which is not guaranteed to reach the DataFrame.
        df_X_test = df_test[filtered_aps].replace(100, -105) + 105
        print(df_X_test.shape)
        df_X_test['CLASS'] = df_y_test.values

        y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
        # Every column except the trailing CLASS label
        X_test = df_X_test.iloc[:,:-1].values

        # Dataset assembly for the validation file
        x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
        y_test_data = y_test

        # Same normalisation as the training split of this run
        x_test_data = (x_test_data - mean) / std

        test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        #### KNN PREDICT ####
        y_pred_knn = neigh.predict(x_test_data[:,:,0])
        acc = accuracy_score(y_test_data, y_pred_knn)
        accuracy[str(porc)].append(acc)
        print(acc)
        


In [None]:
# Display the accuracy dictionary as the cell output
accuracy

## Pruning

In [None]:
# Pruning experiment: same training protocol as the HeteroGNN sample-size
# study, but both graph builders receive prune_th=10. For every fraction in
# `porcentajes` train 5 models and checkpoint the best hold-out weights.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 32
learning_rate = 0.003
print_every = 5

porcentajes = [0.3] # full sweep: [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
# Best hold-out accuracy of every run, keyed by str(fraction)
accuracy = {"0.3":[], "0.4":[], "0.5":[], "0.6":[], "0.7":[], "0.8":[], "0.9":[], "1":[]}

for porc in porcentajes:
    print('Porcentaje de datos: ', porc)

    for i in range(5):
        # Split the data and build the (pruned) heterogeneous graph object
        X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3, dataset_percentage=porc)
        ap_edge_index, ap_edge_attr = ap_graph_creator(X_train[:,:-1], th=10, prune_th=10)
        zone_ap_edge_index, zone_ap_edge_attr = zone_ap_graph_creator(X_train[:,:-1], y_train, prune_th=10)
        heterodata = build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps)
        # Re-bind the result: transforms return the transformed object and
        # recent PyG releases operate on a copy, leaving the input untouched.
        heterodata = T.ToUndirected()(heterodata)

        # Dataset assembly: reshape to (samples, num_aps, 1)
        x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
        x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
        y_training_data = y_train
        y_test_data = y_test

        # Normalise as (x - mean) / std with training-split statistics
        mean = x_training_data.mean(axis=0)
        std = x_training_data.std(axis=0)
        x_training_data = (x_training_data - mean) / std
        x_test_data = (x_test_data - mean) / std

        train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes)
        test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)

        model = HeteroGNN(10, 1, 2).to(device)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
        loss = torch.nn.CrossEntropyLoss()
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

        train_loss = []
        train_accuracy = []
        test_loss = []
        test_accuracy = []
        best_test_accuracy = 0

        m = torch.nn.Softmax(dim=1)

        for epoch in range(100):

            # TRAIN
            model.train()
            train_accuracy_epoch = []
            train_loss_epoch = []
            for d in train_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                target = d["zones"].y.cpu().type(torch.long)

                loss_result = loss(out_zones, target)
                loss_result.backward()
                train_loss_epoch.append(loss_result.item())

                optimizer.step()
                optimizer.zero_grad()

                output = m(out_zones.detach())
                train_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            # Decay the learning rate by gamma once every 10 epochs
            if (epoch+1)%10 == 0:
                scheduler.step()

            train_accuracy.append(np.mean(train_accuracy_epoch))
            train_loss.append(np.mean(train_loss_epoch))

            # VALIDATION: inference only, so autograd bookkeeping is switched off
            model.eval()
            test_accuracy_epoch = []
            test_loss_epoch = []
            with torch.no_grad():
                for d in test_loader:

                    d = d.to(device)
                    out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                    out_zones = out["zones"].cpu()
                    out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                    target = d["zones"].y.cpu().type(torch.long)

                    loss_result = loss(out_zones, target)
                    test_loss_epoch.append(loss_result.item())

                    output = m(out_zones)
                    test_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

            test_accuracy.append(np.mean(test_accuracy_epoch))
            test_loss.append(np.mean(test_loss_epoch))

            # Checkpoint whenever the hold-out accuracy improves
            if test_accuracy[-1] > best_test_accuracy:
                best_test_accuracy = test_accuracy[-1]
                torch.save(model.state_dict(), f"UJI_HeteroGNN_porc{porc}_{i}_best_model_prunning.pth")

        print(f"Best Accuracy: Train {np.max(train_accuracy)}, Val {np.max(test_accuracy)}")
        accuracy[str(porc)].append(np.max(test_accuracy))

In [None]:
# Load the official validation file and turn it into a graph-dataset loader.
dataset = 'UjiIndoorLoc.zip'
# Context managers close both the archive and the member once the CSV is read
with ZipFile(dataset) as zip_file, zip_file.open('ValidationData.csv') as csv_file:
    df_test = pd.read_csv(csv_file)

# Class label = building id followed by floor, both as strings
df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Map the value 100 to -105 and shift everything by 105 (so those entries end
# up at 0). replace() + arithmetic build a new frame, instead of writing through
# `.values` of a column slice, which is not guaranteed to reach the DataFrame.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values


y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
# Every column except the trailing CLASS label
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly: reshape to (samples, num_aps, 1)

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# Normalise as (x - mean) / std.
# NOTE(review): mean, std and heterodata are not defined here; they are the
# values left by the most recently executed training run - confirm that is the
# run whose checkpoints are evaluated afterwards.
x_test_data = (x_test_data - mean) / std

test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Score every saved pruned-graph HeteroGNN checkpoint on the validation loader
# and store the mean per-batch accuracy of each run under its training fraction.
porcentajes = [0.3] # full sweep: [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]

for porc in porcentajes:
    # Start from an empty list: the training cell stored hold-out accuracies
    # under the same key, and appending would mix both kinds of score.
    accuracy[str(porc)] = []

    for i in range(5):
        model = HeteroGNN(10, 1, 2)
        # map_location lets checkpoints written on a GPU load on any device
        model.load_state_dict(torch.load(f"UJI_HeteroGNN_porc{porc}_{i}_best_model_prunning.pth", map_location=device))
        model.to(device)

        test_loss = []
        test_accuracy = []

        m = torch.nn.Softmax(dim=1)

        # TEST: inference only, so autograd bookkeeping is switched off
        model.eval()
        with torch.no_grad():
            for d in test_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                # Regroup the zone outputs into rows of num_classes scores
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)

                output = m(out_zones)
                test_accuracy.append(accuracy_score(d["zones"].y.cpu().reshape(-1).type(torch.long), np.array(torch.argmax(output, dim=1))))

        total_test_accuracy = np.mean(test_accuracy)
        accuracy[str(porc)].append(total_test_accuracy)
print(accuracy)


In [None]:
# Hard-coded accuracy values: ten entries for the 30 % fraction, every other
# fraction left empty.
accuracy = {key: [] for key in ('0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '1')}
accuracy['0.3'] = [
    0.820786943319838,
    0.8063638663967612,
    0.8150303643724697,
    0.817877024291498,
    0.8158527327935223,
    0.7353649068322982,
    0.6814440993788821,
    0.7112577639751553,
    0.6951863354037267,
    0.7273291925465838,
]

In [None]:
# Collect the accuracy lists in ascending sample-size order, one per box
aux = [accuracy[key] for key in ('0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '1')]

In [None]:
# Box plot for the pruning experiment; only the first position gets a tick label
fig, ax = plt.subplots(figsize=[9,7])
ax.boxplot(aux, showfliers=False)
ax.set_xticks([1])
ax.set_xticklabels([30])
ax.set_yticks([0.95, 0.90, 0.85, 0.80, 0.75])
ax.set_ylabel('Accuracy')
ax.set_xlabel('Fingerprints sample size (%)')
ax.set_title('HeteroGNN prune_th=10')
ax.grid()

## Agregado de zona

In [None]:
# Zone experiment: for every entry of `zones_to_remove`, train one
# HeteroGNN_simplified model with `zone_to_remove` forwarded to the zone-AP
# graph builder and to build_dataset, and checkpoint the best hold-out weights.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 32
learning_rate = 0.003
print_every = 50

zones_to_remove = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] # not run: 13, 14, 15
# Best hold-out accuracy of every run, keyed by str(zone_to_remove)
accuracy = {"0":[],"1":[],"2":[],"3":[],"4":[],"5":[],"6":[],"7":[],"8":[],"9":[],"10":[],"11":[],"12":[]}

for zone_to_remove in zones_to_remove:
    print('Zone removed: ', zone_to_remove)

    # Split the data and build the heterogeneous graph object
    X_train, X_test, y_train, y_test, num_aps, num_classes, filtered_aps, enc_train = preprocess_dataset(dataset, filter_std=3)
    ap_edge_index, ap_edge_attr = ap_graph_creator(X_train[:,:-1], th=10)
    zone_ap_edge_index, zone_ap_edge_attr = zone_ap_graph_creator(X_train[:,:-1], y_train, zone_to_remove=zone_to_remove)
    heterodata = build_heterodata(ap_edge_index, ap_edge_attr, zone_ap_edge_index, zone_ap_edge_attr, num_classes, num_aps)
    # Re-bind the result: transforms return the transformed object and recent
    # PyG releases operate on a copy, leaving the input untouched.
    heterodata = T.ToUndirected()(heterodata)

    # Dataset assembly: reshape to (samples, num_aps, 1)
    x_training_data = np.reshape(X_train,(X_train.shape[0],num_aps,1))
    x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
    y_training_data = y_train
    y_test_data = y_test

    # Normalise as (x - mean) / std with training-split statistics
    mean = x_training_data.mean(axis=0)
    std = x_training_data.std(axis=0)
    x_training_data = (x_training_data - mean) / std
    x_test_data = (x_test_data - mean) / std

    train_dataset = build_dataset(x_training_data, y_training_data, heterodata, num_classes, zone_to_remove=zone_to_remove)
    test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes, zone_to_remove=zone_to_remove)

    model = HeteroGNN_simplified(10, 1, 2).to(device)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
    loss = torch.nn.CrossEntropyLoss()
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

    train_loss = []
    train_accuracy = []
    test_loss = []
    test_accuracy = []
    best_test_accuracy = 0

    m = torch.nn.Softmax(dim=1)

    for epoch in range(100):

        # TRAIN
        model.train()
        train_accuracy_epoch = []
        train_loss_epoch = []
        for d in train_loader:

            d = d.to(device)
            out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
            # Regroup the zone outputs into rows of num_classes scores
            out_zones = out["zones"].cpu()
            out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
            target = d["zones"].y.cpu().type(torch.long)

            loss_result = loss(out_zones, target)
            loss_result.backward()
            train_loss_epoch.append(loss_result.item())

            optimizer.step()
            optimizer.zero_grad()

            output = m(out_zones.detach())
            train_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

        # Decay the learning rate by gamma once every 10 epochs
        if (epoch+1)%10 == 0:
            scheduler.step()

        train_accuracy.append(np.mean(train_accuracy_epoch))
        train_loss.append(np.mean(train_loss_epoch))

        # VALIDATION: inference only, so autograd bookkeeping is switched off
        model.eval()
        test_accuracy_epoch = []
        test_loss_epoch = []
        with torch.no_grad():
            for d in test_loader:

                d = d.to(device)
                out = model(d.x_dict, d.edge_index_dict, d.edge_attr_dict)
                out_zones = out["zones"].cpu()
                out_zones = out_zones.reshape(out_zones.shape[0]//num_classes, num_classes)
                target = d["zones"].y.cpu().type(torch.long)

                loss_result = loss(out_zones, target)
                test_loss_epoch.append(loss_result.item())

                output = m(out_zones)
                test_accuracy_epoch.append(accuracy_score(target.reshape(-1), np.array(torch.argmax(output, dim=1))))

        test_accuracy.append(np.mean(test_accuracy_epoch))
        test_loss.append(np.mean(test_loss_epoch))

        # Checkpoint whenever the hold-out accuracy improves
        if test_accuracy[-1] > best_test_accuracy:
            best_test_accuracy = test_accuracy[-1]
            torch.save(model.state_dict(), f"UJI_HeteroGNNSimplified_without_{zone_to_remove}_best_model.pth")

    print(f"Best Accuracy: Train {np.max(train_accuracy)}, Val {np.max(test_accuracy)}")
    accuracy[str(zone_to_remove)].append(np.max(test_accuracy))
print(accuracy)
    

In [None]:
# Load the official validation file and normalise it for the zone experiment.
dataset = 'UjiIndoorLoc.zip'
# Context managers close both the archive and the member once the CSV is read
with ZipFile(dataset) as zip_file, zip_file.open('ValidationData.csv') as csv_file:
    df_test = pd.read_csv(csv_file)

# Class label = building id followed by floor, both as strings
df_test['CLASS'] = df_test['BUILDINGID'].astype(str) + df_test['FLOOR'].astype(str)

df_y_test = df_test['CLASS']
# Map the value 100 to -105 and shift everything by 105 (so those entries end
# up at 0). replace() + arithmetic build a new frame, instead of writing through
# `.values` of a column slice, which is not guaranteed to reach the DataFrame.
df_X_test = df_test[filtered_aps].replace(100, -105) + 105
print(df_X_test.shape)
df_X_test['CLASS'] = df_y_test.values


y_test = enc_train.transform(df_X_test['CLASS'].values.reshape(-1,1))
# Every column except the trailing CLASS label
X_test = df_X_test.iloc[:,:-1].values


# Dataset assembly: reshape to (samples, num_aps, 1)

x_test_data = np.reshape(X_test,(X_test.shape[0],num_aps,1))
y_test_data = y_test

# Normalise as (x - mean) / std.
# NOTE(review): mean and std are not defined here; they are the values left by
# the most recently executed training run.
x_test_data = (x_test_data - mean) / std

In [None]:
# Wrap the normalised validation fingerprints with build_dataset.
# NOTE(review): heterodata is whatever graph the last executed cell left in the
# namespace, and no zone_to_remove is passed here - confirm both are intended.
test_dataset = build_dataset(x_test_data, y_test_data, heterodata, num_classes)
# The loader is created later, so the following line stays disabled:
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
for zone_to_remove in zones_to_remove:
    print('Zone removed: ', zone_to_remove)
    
    model = HeteroGNN_simplified(10, 1, 2)
    model.load_state_dict(torch.load(f"UJI_HeteroGNNSimplified_without_{zone_to_remove}_best_model.pth"))
    model.to(device)

    test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
    
    # VALIDATION
    model.eval()

    for data in test_loader:
        data = data.to(device)
        out = model(data.x_dict, data.edge_index_dict, data.edge_attr_dict) 
        out_zones = out["zones"].cpu().reshape(out["zones"].cpu().shape[0]//num_classes,num_classes)

        output = m(out_zones)
        print(classification_report(data["zones"].y.cpu().reshape(-1).type(torch.long), np.array(torch.argmax(output.cpu(), axis=1))))
        plt.figure(figsize=[9,7])
        cf_matrix = confusion_matrix(data["zones"].y.cpu().reshape(-1).type(torch.long), np.array(torch.argmax(output.cpu(), axis=1)), normalize="true")
        sns.heatmap(cf_matrix, annot=True, fmt=".0%", cmap="YlGnBu", vmin=0, vmax=0.2, cbar=False)