# Imports

In [None]:
import os
import socket
import sys
import random
import math
import copy
from datetime import datetime, timedelta
from pathlib import Path
from time import time
from urllib.request import urlretrieve
from collections import OrderedDict

import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt


import torch
#from torch import nn

import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data

import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable

from torchinfo import summary

from tensorflow.keras.preprocessing.sequence import pad_sequences
from torch.nn.utils.rnn import pad_sequence

import dgl
from dgl.nn.pytorch import GraphConv
from dgl.nn.pytorch import GATConv


from sklearn.model_selection import train_test_split
from sklearn.manifold import TSNE

from sklearn.metrics import *


import fiona
import statistics
import shapely
from shapely.geometry import *
from shapely import affinity

# Import tensorboard logger from PyTorch
from torch.utils.tensorboard import SummaryWriter

# Load tensorboard extension for Jupyter Notebook, only need to start TB in the notebook
%load_ext tensorboard

# Device configuration
# Use the GPU when torch reports CUDA as available; otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Functions

## Preprocessing

In [None]:
def extend_coords(coordlist, length):
    """Densify a coordinate sequence until it holds ``length`` points.

    One point is added per round: the midpoint of the currently longest
    segment (the first one when several share the maximum length). Existing
    points are never moved or removed.

    Args:
        coordlist: sequence of coordinate tuples, e.g. the exterior ring of a
            polygon. Segment lengths are measured on the first two
            components; any further components are interpolated as well.
        length: number of points the result should have.

    Returns:
        ``coordlist`` itself when it already holds ``length`` or more points,
        otherwise a new list with exactly ``length`` points.

    Raises:
        ValueError: if points have to be added but fewer than two are given,
            so there is no segment to split.
    """
    missing = length - len(coordlist)
    if missing <= 0:
        return coordlist
    if len(coordlist) < 2:
        raise ValueError("extend_coords needs at least two coordinates to insert midpoints")

    coords = list(coordlist)
    for _ in range(missing):
        seg_lengths = [math.hypot(end[0] - start[0], end[1] - start[1])
                       for start, end in zip(coords, coords[1:])]
        # max() keeps the first maximum, i.e. the earliest of equally long segments.
        longest = max(range(len(seg_lengths)), key=seg_lengths.__getitem__)
        start, end = coords[longest], coords[longest + 1]
        midpoint = tuple(a + 0.5 * (b - a) for a, b in zip(start, end))
        coords.insert(longest + 1, midpoint)
    return coords

In [None]:
# adapted from Yan et al. 2021

def normalize_shape_geometry(buildings, scaling = 'x', scale_type = 'minmax'):
    """Translate every building by its normalisation centroid and rescale it.

    Args:
        buildings: frame with a ``geometry`` column of polygons and a
            ``centroid_norm`` column of points. It is deep-copied, not
            modified.
        scaling: 'xy' scales both axes with their own factor, 'x' scales both
            axes with the x factor, 'y' scales both axes with the y factor.
            Any other value leaves the translated geometry unscaled.
        scale_type: 'minmax' uses the x/y extent of each exterior ring as the
            scale; 'stdev' uses one value for all rows, the standard
            deviation of the distance between ``centroid_norm`` and the lower
            left corner of each geometry's bounds.

    Returns:
        The copy with ``geometry`` replaced, ``scale_x``/``scale_y`` added
        (when ``scale_type`` is one of the two known values) and ``centroid``
        set to ``Point(0, 0)``.
    """
    df_use = copy.deepcopy(buildings)

    # Every helper Series is built on the frame's own index. A Series with a
    # fresh 0..n-1 index is aligned by label on assignment, which shifts
    # values or yields NaN as soon as rows were dropped without a reset_index.
    idx = df_use.index

    # Shift each geometry by its normalisation centroid.
    df_use['mu_x'] = pd.Series([geo.x for geo in df_use['centroid_norm']], index=idx)
    df_use['mu_y'] = pd.Series([geo.y for geo in df_use['centroid_norm']], index=idx)
    df_use['geometry'] = pd.Series(
        [affinity.translate(geo, -mx, -my) for mx, my, geo in df_use[['mu_x', 'mu_y', 'geometry']].values],
        index=idx)

    if scale_type == 'minmax':
        # Largest & smallest x value of the exterior ring
        df_use['x_max'] = pd.Series([max(geo.exterior.xy[0]) for geo in df_use['geometry']], index=idx)
        df_use['x_min'] = pd.Series([min(geo.exterior.xy[0]) for geo in df_use['geometry']], index=idx)
        df_use['scale_x'] = (df_use['x_max'] - df_use['x_min'])

        # Largest & smallest y value of the exterior ring
        df_use['y_max'] = pd.Series([max(geo.exterior.xy[1]) for geo in df_use['geometry']], index=idx)
        df_use['y_min'] = pd.Series([min(geo.exterior.xy[1]) for geo in df_use['geometry']], index=idx)
        df_use['scale_y'] = (df_use['y_max'] - df_use['y_min'])

        df_use.drop(['x_max', 'x_min', 'y_max', 'y_min'], axis=1, inplace=True)

    elif scale_type == 'stdev':
        df_use['bds'] = pd.Series([list(geo.bounds) for geo in df_use['geometry']], index=idx)
        df_use['dist'] = pd.Series(
            [centroid.distance(Point(bbox[0], bbox[1])) for centroid, bbox in df_use[['centroid_norm', 'bds']].values],
            index=idx)
        df_use['scale_x'] = df_use['dist'].std()
        df_use['scale_y'] = df_use['dist'].std()

        df_use.drop(['bds', 'dist'], axis=1, inplace=True)

    # Rescale the geometry about its own centroid
    # (original note: scaled to values of about -1~1).
    if scaling == 'xy': # each axis scaled with its own factor
        df_use['geometry'] = pd.Series(
            [affinity.scale(geo, 1 / del_x, 1 / del_y, origin='centroid') for del_x, del_y, geo in
             df_use[['scale_x', 'scale_y', 'geometry']].values],
            index=idx)

    elif scaling == 'x': # both axes scaled with the x factor
        df_use['geometry'] = pd.Series(
            [affinity.scale(geo, 1 / del_x, 1 / del_x, origin='centroid') for del_x, geo in
             df_use[['scale_x', 'geometry']].values],
            index=idx)

    elif scaling == 'y': # both axes scaled with the y factor
        df_use['geometry'] = pd.Series(
            [affinity.scale(geo, 1 / del_y, 1 / del_y, origin='centroid') for del_y, geo in
             df_use[['scale_y', 'geometry']].values],
            index=idx)

    df_use['centroid'] = Point(0,0)

    df_use.drop(['mu_x', 'mu_y'], axis=1, inplace=True)

    return df_use


In [None]:
# alternative normalization method

def normalize_shape_map(buildings, scaling = 'x', scale_type = 'minmax'):
    """Normalise all buildings with one common, map-wide transformation.

    The geometries are shifted so that the lower left corner of the bounds of
    their union lies at the origin, divided by half of the envelope side
    length(s) and finally shifted by (-1, -1).

    Args:
        buildings: frame with a ``geometry`` column. It is deep-copied, not
            modified.
        scaling: 'xy' divides x by half the first envelope side and y by half
            the second envelope side; 'x' and 'y' both divide the two axes by
            half of the longer envelope side. Any other value leaves the
            shifted geometry unscaled.
        scale_type: only 'minmax' is implemented.

    Returns:
        The copy with ``geometry`` replaced and ``centroid`` set to
        ``Point(0, 0)``. With 'minmax' the helper columns ``bbox`` and
        ``scale_max`` are kept; otherwise ``min_x``/``min_y``/``max_x``/
        ``max_y`` are kept.

    Raises:
        NotImplementedError: if scaling is requested with a ``scale_type``
            other than 'minmax' (this previously ended in a NameError or
            KeyError).
    """
    df_use = copy.deepcopy(buildings)
    # Helper Series carry the frame's own index so that assignment stays
    # positional even when the index has gaps.
    idx = df_use.index

    # The union of all geometries is expensive; take its bounds once.
    min_x, min_y, max_x, max_y = df_use.unary_union.bounds[0:4]
    df_use['min_x'] = min_x
    df_use['min_y'] = min_y
    df_use['max_x'] = max_x
    df_use['max_y'] = max_y

    df_use['geometry'] = pd.Series(
        [affinity.translate(geo, -min_x, -min_y) for geo in df_use['geometry']], index=idx)

    l1_length = l2_length = max_length = None
    if scale_type == 'minmax':
        # Envelope of the already shifted geometries.
        bbox = df_use.unary_union.envelope
        df_use['bbox'] = bbox

        p1 = Point(bbox.exterior.coords[0])
        p2 = Point(bbox.exterior.coords[1])
        p3 = Point(bbox.exterior.coords[2])
        l1_length = LineString([p1, p2]).length
        l2_length = LineString([p2, p3]).length
        max_length = l1_length if l1_length > l2_length else l2_length
        df_use['scale_max'] = max_length/2

        df_use.drop(['max_x', 'min_x', 'max_y', 'min_y'], axis=1, inplace=True)

    elif scale_type == 'stdev':
        pass

    if scaling in ('xy', 'x', 'y'):
        if max_length is None:
            raise NotImplementedError(
                "normalize_shape_map can only scale with scale_type='minmax'")

        if scaling == 'xy': # each axis scaled with its own factor
            # The y factor really is the second envelope side now; the former
            # code selected the x column twice.
            x_fact = 1 / (l1_length/2)
            y_fact = 1 / (l2_length/2)
        else: # 'x' and 'y' share the longer envelope side, as before
            x_fact = y_fact = 1 / (max_length/2)

        df_use['geometry'] = pd.Series(
            [affinity.translate(affinity.scale(geo, xfact = x_fact, yfact = y_fact, origin=(0,0)), -1, -1)
             for geo in df_use['geometry']],
            index=idx)

    df_use['centroid'] = Point(0,0)

    return df_use



In [None]:
def preprocess_geometries(buildings, shape_dict, max_len, centroid_type = 'geometry', label_col = 'name', mode = 'iterativ'):
    '''
        Add label, centroid and length columns and bring the exterior rings
        to a common number of points.

        buildings: gdf of buildings (deep-copied, not modified)
        shape_dict: dict for class names, looked up with the values of label_col
        max_len: length of geometries (=number of points). 0 leaves the
                 geometries untouched. Geometries that originally have more
                 than max_len points are dropped from the result.
        centroid_type: 'geometry' stores each geometry's own centroid in
                 'centroid_norm', 'map' stores the centroid of the union of
                 all geometries
        label_col: column holding the class name of each building
        mode: 'iterativ' inserts midpoints with extend_coords,
              'interpolate' resamples the exterior ring at equal fractions of
              its length, 'padding' keeps the geometry as it is

        returns the processed copy; its index is NOT reset after dropping rows
     '''

    df_buildings = copy.deepcopy(buildings)
    # Helper Series are created on the frame's own index; a fresh 0..n-1
    # index would be aligned by label and misplace values when the incoming
    # index is not a plain range.
    idx = df_buildings.index

    df_buildings['shape_char'] = pd.Series([shape_dict[n] for n in df_buildings[label_col]], index=idx)

    centroids = [geo.centroid for geo in df_buildings['geometry']]
    df_buildings['centroid'] = gpd.GeoSeries(centroids, index=idx)
    df_buildings['centroid_orig'] = gpd.GeoSeries(centroids, index=idx)

    if centroid_type == 'geometry':
        df_buildings['centroid_norm'] = pd.Series(centroids, index=idx)
    elif centroid_type == 'map':
        df_buildings['centroid_norm'] = df_buildings.unary_union.centroid

    ring_lengths = [len(geo.exterior.coords) for geo in df_buildings['geometry']]
    df_buildings['orig_len'] = pd.Series(ring_lengths, index=idx)
    df_buildings['encod_len'] = pd.Series([int(n) for n in ring_lengths], index=idx)

    if max_len != 0:

        if mode == 'iterativ':
            df_buildings['geometry'] = pd.Series(
                [Polygon(extend_coords(list(geo.exterior.coords), max_len))
                 for geo in df_buildings['geometry']],
                index=idx)
        elif mode == 'interpolate':
            # max_len-1 sample points; Polygon closes the ring with one more.
            df_buildings['geometry'] = pd.Series(
                [Polygon([Point(LineString(geo.exterior.coords).interpolate(i/(max_len-1), normalized=True))
                          for i in range(max_len-1)])
                 for geo in df_buildings['geometry']],
                index=idx)
        elif mode == 'padding':
            pass

        df_buildings['encod_len'] = max_len

        long_geoms = df_buildings[ df_buildings['orig_len'] > max_len].index
        df_buildings.drop(long_geoms, inplace = True)
        #df_buildings.reset_index(inplace=True, drop=True)

    return df_buildings

In [None]:
def rotate_geometries(buildings, sampling = 'none', samples = 1,
                id_name = 'OBJECTID'):
    '''
        Augment the training buildings with rotated copies.

        buildings: gdf of buildings with the columns id_name, 'geometry',
                   'centroid' and 'status' (not modified)
        sampling: 'none' if no sampling, 'rotate' if regular rotating
                  (multiples of 360//samples degrees), 'randomrotate' if
                  random rotating (samples-1 distinct whole-degree angles),
                  default = 'none'
        samples: number of objects per building, default = 1
        id_name: id attribute in buildings properties

        returns a new frame sorted by id with a fresh index (the former index
        is kept as a column). Every id is multiplied by 1000; a rotated copy
        gets id*1000 + angle. Only rows with status 'train' are rotated, each
        about its own 'centroid'.
     '''

    assert samples > 0, "samples must be > 0"

    if sampling == 'rotate':
        angles = list(range(360//samples, 360, 360//samples))
    elif sampling == 'randomrotate':
        # NOTE(review): 0 can be drawn here, which would add an unrotated
        # copy carrying the same id as its original - confirm this is wanted.
        angles = random.sample(range(360), samples-1)
    else:
        angles = []

    df_buildings = copy.deepcopy(buildings)

    df_buildings[id_name] = df_buildings[id_name]*1000

    # Collect all rotated rows first and concatenate once. DataFrame.append
    # was removed in pandas 2.0 and copied the whole frame on every call.
    rotated_rows = []
    for index, row in buildings.iterrows():
        id_ = row[id_name]*1000
        geom = row['geometry']
        center = row['centroid']
        if row['status'] == 'train':
            for angle in angles:
                new_row = row.copy()
                new_row[id_name] = id_ + angle
                new_row['geometry'] = affinity.rotate(geom, angle, origin=center)
                rotated_rows.append(new_row)

    if rotated_rows:
        # Same construction the former append() used internally: a frame of
        # the new rows with inferred dtypes, concatenated behind the copy.
        df_buildings = pd.concat([df_buildings, pd.DataFrame(rotated_rows).infer_objects()])

    df_buildings.sort_values([id_name], axis = 0, inplace=True)
    df_buildings.reset_index(drop=False, inplace=True)

    return df_buildings

In [None]:
def get_angle(a, b, c):
    """Return the angle in degrees between the longer of the segments a-b / b-c and a y-axis direction.

    The segment b-c is used unless a-b is strictly longer. The direction
    vector runs from the second to the first coordinate of the chosen
    segment. It is compared with (0, -1) when it points downwards and not to
    the left, and with (0, 1) in every other case.
    """
    edge_ab = LineString([a, b])
    edge_bc = LineString([b, c])
    longer_edge = edge_ab if edge_ab.length > edge_bc.length else edge_bc

    tail = np.array(longer_edge.coords[1])
    head = np.array(longer_edge.coords[0])
    direction = head - tail
    dx, dy = direction[0], direction[1]

    # Equivalent to the two-step selection: start from the sign of dy, then
    # force (0, 1) whenever dx is negative and dy is non-zero.
    if dy < 0 and not dx < 0:
        reference_axis = np.array([0, -1])
    else:
        reference_axis = np.array([0, 1])

    unit_direction = direction / np.linalg.norm(direction)
    cosine = np.dot(unit_direction, reference_axis)
    return np.arccos(cosine) * 180 /math.pi

In [None]:
def rotate_to_y_axis(geom):
    """Return the signed rotation angle (degrees) derived from the minimum rotated rectangle of ``geom``.

    The angle of the rectangle is measured with ``get_angle``. The rectangle
    is then rotated by that angle and measured again: if the second
    measurement is larger than the first, the negated angle is returned,
    otherwise the angle itself.
    """
    rectangle = geom.minimum_rotated_rectangle
    a, b, c = rectangle.exterior.coords[0:3]
    angle = get_angle(a, b, c)

    turned = affinity.rotate(rectangle, angle)
    a1, b1, c1 = turned.exterior.coords[0:3]
    angle_after = get_angle(a1, b1, c1)

    return -angle if angle_after > angle else angle

In [None]:
def rotate_geom_to_y_axis(df, which_data = 'all'):
    """Rotate geometries by the angle computed with ``rotate_to_y_axis``.

    The frame is modified in place and also returned. Two columns are added:
    ``rotate_angle`` (angle computed for every row) and ``orig_geometry``
    (the geometry before rotation).

    Args:
        df: frame with a ``geometry`` column; a ``status`` column is needed
            when ``which_data`` is 'train' or 'test'.
        which_data: 'train' or 'test' rotates only rows whose ``status``
            equals that value; any other value rotates every row.

    Returns:
        ``df`` with ``geometry`` replaced by the rotated geometries.
    """
    # Build the new columns on df's own index so the assignment stays
    # positional even when the index is not a plain 0..n-1 range.
    idx = df.index

    df['rotate_angle'] = pd.Series([rotate_to_y_axis(geo) for geo in df['geometry']], index=idx)
    df['orig_geometry'] = df['geometry']

    if which_data in ('train', 'test'):
        rotated = [affinity.rotate(geo, angle) if (status == which_data) else geo
                   for geo, angle, status in zip(df['orig_geometry'], df['rotate_angle'], df['status'])]
    else:
        rotated = [affinity.rotate(geo, angle)
                   for geo, angle in zip(df['orig_geometry'], df['rotate_angle'])]

    df['geometry'] = gpd.GeoSeries(rotated, index=idx)
    return df

In [None]:
def collate(samples):
    """Merge a list of (graph, label) samples into one batched graph and a long tensor of labels."""
    graphs, labels = (list(column) for column in zip(*samples))
    batched_graph = dgl.batch(graphs)
    label_tensor = torch.tensor(labels, dtype=torch.long)
    return batched_graph, label_tensor

## Datasets

In [None]:
class OrderedSeqs_Dataset(data.Dataset):
    """Dataset of (geometry tensor, label tensor) pairs kept in input order."""

    def __init__(self, input_geoms, target_labels, padding = False):
        """
        Inputs:
            input_geoms - input geometries, one tensor-convertible item per sample
            target_labels - target labels, paired with input_geoms by position
            padding - if padding when preprocessing (accepted but not read here)
        """
        super().__init__()
        self.input_geoms = input_geoms
        self.target_labels = target_labels
        self.generate_dataset()

    def generate_dataset(self):
        """Convert every geometry/label pair to tensors and store them with the sample count."""
        # zip stops at the shorter of the two inputs.
        pairs = list(zip(self.input_geoms, self.target_labels))

        self.data = [torch.tensor(geometry) for geometry, _ in pairs]
        self.label = [torch.tensor(target) for _, target in pairs]
        self.size = len(self.data)

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]

In [None]:
class Graph_Dataset(data.Dataset):
    """Dataset of (DGL graph, label) pairs; every graph is a closed ring of nodes."""

    def __init__(self, input_encoding, target_labels, seq_lens, padding = False):
        """
        Inputs:
            input_encoding: list of input encodings - input geometries
            target_labels: list of target labels
            seq_lens: list of sequence lengths of encodings (= number of nodes per graph)
            padding - if padding when preprocessing (accepted but not read here)
        """
        super().__init__()
        self.input_encoding = input_encoding
        self.seq_lens = seq_lens
        self.target_labels = target_labels
        self.generate_dataset()

    def generate_dataset(self):
        """Build one bidirected ring graph per encoding and pair it with its label.

        Node j is connected to node j+1 and the last node back to node 0. The
        encoding is stored as float64 node data under the key 'x'.
        """
        graphs = []
        for i in range(len(self.input_encoding)):
            n_points = self.seq_lens[i]
            src = list(range(n_points))
            # Successor of every node, wrapping the last node around to 0.
            dst = src[1:] + src[:1]

            g = dgl.graph((torch.tensor(src), torch.tensor(dst)))
            g = dgl.to_bidirected(g)
            g.ndata['x'] = torch.tensor(self.input_encoding[i], dtype=torch.float64)
            graphs.append(g)

        self.data = [(graphs[i], self.target_labels[i]) for i in range(len(graphs))]
        self.size = len(self.data)

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        return self.data[idx]

## Encoding

In [None]:
def encoding_2(building, centroid, scale):
    """Encode the exterior ring as scaled [x, y] offsets from the centroid.

    Every coordinate except the last one is encoded in order; the first
    coordinate is then encoded once more as the final entry.
    """
    ring = list(building.exterior.coords)
    center = centroid.coords[0]

    def offset(point):
        return [(center[0]-point[0])/scale, (center[1]-point[1])/scale]

    encoded = [offset(point) for point in ring[:-1]]
    encoded.append(offset(ring[0]))
    return encoded

In [None]:
def encoding_3(building, centroid, scale, intermediate = True):
    """Encode the exterior ring as [x, y, flag] with scaled offsets from the centroid.

    Every coordinate except the last one carries ``int(intermediate)`` as
    flag; the first coordinate is repeated as the final entry with
    ``int(not intermediate)``.
    """
    ring = list(building.exterior.coords)
    center = centroid.coords[0]
    inner_flag = int(intermediate)
    end_flag = int(not intermediate)

    def offset(point):
        return [(center[0]-point[0])/scale, (center[1]-point[1])/scale]

    encoded = [offset(point) + [inner_flag] for point in ring[:-1]]
    encoded.append(offset(ring[0]) + [end_flag])
    return encoded

In [None]:
def encoding_4(building, centroid, scale, intermediate = True):
    """Encode the exterior ring as [x, y, f1, f2] with scaled offsets from the centroid.

    Every coordinate except the last one carries the flags
    ``[int(intermediate), int(not intermediate)]``; the first coordinate is
    repeated as the final entry with the two flags swapped.
    """
    ring = list(building.exterior.coords)
    center = centroid.coords[0]
    inner_flag = int(intermediate)
    end_flag = int(not intermediate)

    def offset(point):
        return [(center[0]-point[0])/scale, (center[1]-point[1])/scale]

    encoded = [offset(point) + [inner_flag, end_flag] for point in ring[:-1]]
    encoded.append(offset(ring[0]) + [end_flag, inner_flag])
    return encoded

In [None]:
def encoding_5(building, centroid, scale, intermediate = True):
    """Encode the exterior ring as [x, y, f1, f2, f3] with scaled offsets from the centroid.

    Every coordinate except the last one carries the flags
    ``[int(intermediate), int(not intermediate), int(not intermediate)]``;
    the first coordinate is repeated as the final entry with
    ``[int(not intermediate), int(not intermediate), int(intermediate)]``.
    """
    ring = list(building.exterior.coords)
    center = centroid.coords[0]
    inner_flag = int(intermediate)
    end_flag = int(not intermediate)

    def offset(point):
        return [(center[0]-point[0])/scale, (center[1]-point[1])/scale]

    encoded = [offset(point) + [inner_flag, end_flag, end_flag] for point in ring[:-1]]
    encoded.append(offset(ring[0]) + [end_flag, end_flag, inner_flag])
    return encoded

In [None]:
# adapted from Yan et al. 2021

def encoding_graph_features(uid, geo, local_features, regional_features, global_features_polygon, 
                            klst = [2, 4], normalize = 'minmax'):    
    """Build the per-point feature matrix of one polygon.

    For every step size in ``klst`` the per-point features computed by
    ``get_single_features_final_new`` are merged in; the selected feature
    columns are prefixed with ``k<step>_``. Descriptors of the whole polygon
    that are selected in ``global_features_polygon`` are added as columns
    holding the same value in every row.

    Args:
        uid: id of the polygon; combined with the point number as
            ``uid*1000 + pid`` to form the merge key ``OID_UID``.
        geo: polygon; its ``exterior``, ``area``, ``convex_hull`` and
            ``minimum_rotated_rectangle`` are read.
        local_features: container of feature names, recognised are
            'triangle_area', 'len_c', 'len_b', 'angle_a', 'len_a', 'height'.
        regional_features: container of feature names, recognised are
            'reg_triangle_area', 'reg_len_c', 'reg_len_b', 'reg_angle_o',
            'reg_len_o', 'reg_height', 'semiperimeter', 'radius'.
        global_features_polygon: container of feature names, recognised are
            'area', 'mabr', 'elongation', 'circularity', 'rectangularity',
            'squareness', 'convexity', 'orientation', 'mean_dist_o';
            'fractality' is accepted but adds nothing.
        klst: step sizes handed to ``get_single_features_final_new``. The
            default is a list literal shared between calls; it is only
            iterated and indexed here, never modified.
        normalize: handed to ``get_normalize_features_final`` unless it is
            'none'.

    Returns:
        2-D numpy array built from the remaining feature columns; the rows
        come from the exterior coordinates of ``geo``, closing coordinate
        included.
    """
    
    
    # Translate the requested feature names into the column names used by
    # get_single_features_final_new.
    # NOTE(review): 'len_a' and 'reg_len_o' both append 'l_bc', so requesting
    # both puts the same column name into cols twice - confirm intended.
    cols = []
    if 'triangle_area' in local_features:
        cols.append('s_abc')
    if 'len_c' in local_features:
        cols.append('l_ab')
    if 'len_b' in local_features:
        cols.append('l_ac')
    if 'angle_a' in local_features:
        cols.append('angle_bac')
    if 'len_a' in local_features:
        cols.append('l_bc')
    if 'height' in local_features:
        cols.append('height_a')    
 
    if 'reg_triangle_area' in regional_features:
        cols.append('s_obc')
    if 'reg_len_c' in regional_features:
        cols.append('l_ob')
    if 'reg_len_b' in regional_features:
        cols.append('l_oc')
    if 'reg_angle_o' in regional_features:
        cols.append('angle_boc')
    if 'reg_len_o' in regional_features:
        cols.append('l_bc')
    if 'reg_height' in regional_features:
        cols.append('height_o')
    if 'semiperimeter' in regional_features:
        cols.append('c_obc')    
    if 'radius' in regional_features:
        cols.append('r_obc') 
        
        
        
    # Per-step feature frames, keyed by step size k.
    pDic = {}
    
    # Base frame: one row per exterior coordinate; the two column names
    # assume 2-D coordinates.
    geom_points = list(geo.exterior.coords)#[:-1]
    geom_points_gdf = gpd.GeoDataFrame(geom_points)
    geom_points_gdf.columns = ['a_x', 'a_y']
    geom_points_gdf['o_x'] = 0
    geom_points_gdf['o_y'] = 0
    geom_points_gdf['UID'] = uid
    geom_points_gdf['PID'] = pd.Series([i for i in range(len(geom_points_gdf))])
    geom_points_gdf['OID_UID'] = pd.Series([uid*1000+ pid for uid, pid in geom_points_gdf[['UID', 'PID']].values])
    
    #geom_points_gdf = gpd.GeoDataFrame()
    # Compute the per-point features for every k and prefix the selected
    # columns with 'k<k>_'.
    for k in klst:
        dft = get_single_features_final_new(uid, geo, k)
        dft.columns = ['k{0}_{1}'.format(k, x) if x in cols else x for x in dft.columns]
        pDic[k] = dft
    columns_all = []
    for k in klst:
        newcols = ['k{0}_{1}'.format(k, x) for x in cols if 'k{0}_{1}'.format(k, x) in pDic[k].columns]
        # The unprefixed distance to the origin is taken from the last k
        # only; it is needed for the 'MeanRadius' aggregation further down.
        if 'mean_dist_o' in global_features_polygon:
            if k == klst[-1]:
                newcols.append('l_oa')
        geom_points_gdf = pd.merge(geom_points_gdf, pDic[k][['OID_UID'] + newcols], how='left', on='OID_UID')
        for n in newcols:
            columns_all.append(n)
    #df_detail=df_detail.dropna(axis=1)
    
    
    # MBR (single-row helper frame)
    df_mbr = gpd.GeoDataFrame()
    df_mbr['geometry'] = pd.Series(geo.minimum_rotated_rectangle) # min-rotated rectangle
    df_mbr['xy'] = pd.Series([list(geo.exterior.coords) for geo in df_mbr['geometry']]) # coords of mrr
    
    # Coordinates for angles & side lengths
    df_mbr['x0'] = pd.Series([xy[0][0] for xy in df_mbr['xy']])
    df_mbr['x1'] = pd.Series([xy[1][0] for xy in df_mbr['xy']])
    df_mbr['x2'] = pd.Series([xy[2][0] for xy in df_mbr['xy']])

    df_mbr['y0'] = pd.Series([xy[0][1] for xy in df_mbr['xy']])
    df_mbr['y1'] = pd.Series([xy[1][1] for xy in df_mbr['xy']])
    df_mbr['y2'] = pd.Series([xy[2][1] for xy in df_mbr['xy']])
    
    # Compute angles & side lengths
    # (a1 and a2 are not read again in this function)
    df_mbr['l1'] = pd.Series(
        [cal_euclidean([x0, y0], [x1, y1]) for x0, y0, x1, y1 in df_mbr[['x0', 'y0', 'x1', 'y1']].values])
    df_mbr['l2'] = pd.Series(
        [cal_euclidean([x0, y0], [x1, y1]) for x0, y0, x1, y1 in df_mbr[['x1', 'y1', 'x2', 'y2']].values])

    df_mbr['a1'] = pd.Series(
        [cal_arc([x0, y0], [x1, y1], True) for x0, y0, x1, y1 in df_mbr[['x0', 'y0', 'x1', 'y1']].values])
    df_mbr['a2'] = pd.Series(
        [cal_arc([x0, y0], [x1, y1], True) for x0, y0, x1, y1 in df_mbr[['x1', 'y1', 'x2', 'y2']].values])
    #
    df_mbr['longer'] = df_mbr['l1'] >= df_mbr['l2']
    #
    
    
    # Longer and shorter side of the rectangle, written to every row.
    geom_points_gdf['lon_len'] = pd.Series([l1 if longer else l2 for l1, l2, longer in df_mbr[['l1', 'l2', 'longer']].values]).iloc[0]
    geom_points_gdf['short_len'] = pd.Series([l2 if longer else l1 for l1, l2, longer in df_mbr[['l1', 'l2', 'longer']].values]).iloc[0]
    
    # Further attributes
    
    geom_points_gdf['Area'] = geo.area
    geom_points_gdf['Perimeter'] = geo.exterior.length
    geom_points_gdf['Area_convex'] = geo.convex_hull.area
    
    # Whole-polygon descriptors; each one is constant over all rows.
    if 'area' in global_features_polygon:
        geom_points_gdf['A'] = geom_points_gdf['Area']
    if 'mabr' in global_features_polygon:
        geom_points_gdf['MABR'] = geo.minimum_rotated_rectangle.area
    if 'elongation' in global_features_polygon:
        geom_points_gdf['Elongation'] = pd.Series([short_len / lon_len for short_len, lon_len in geom_points_gdf[['short_len','lon_len']].values])
    if 'circularity' in global_features_polygon:
        geom_points_gdf['Circularity'] = 4 * np.pi * geom_points_gdf['Area'] / geom_points_gdf['Perimeter'] / geom_points_gdf['Perimeter']
    if 'rectangularity' in global_features_polygon:
        geom_points_gdf['Rectangularity'] = pd.Series([area / geo.minimum_rotated_rectangle.area for area in geom_points_gdf['Area'].values])
    if 'squareness' in global_features_polygon:
        geom_points_gdf['Squareness'] = pd.Series([(4*np.sqrt(area) / perimeter)**2 for area, perimeter in geom_points_gdf[['Area', 'Perimeter']].values])    
    if 'convexity' in global_features_polygon:
        geom_points_gdf['Convexity'] = geom_points_gdf['Area'] / geom_points_gdf['Area_convex']
    if 'fractality' in global_features_polygon:
        #geom_points_gdf['Fractality'] = 0 # not implemented yet
        pass
    if 'orientation' in global_features_polygon:
        geom_points_gdf['Orientation'] = rotate_to_y_axis(geo)
          
    #
    # Mean of the l_oa column over all points of the polygon, stored as
    # 'MeanRadius'; the per-point l_oa column is dropped afterwards.
    if 'mean_dist_o' in global_features_polygon:
        dfg = geom_points_gdf[['UID', 'l_oa']].groupby(['UID'], as_index=False)['l_oa'].agg({'MeanRadius': 'mean'})
        #
        #df_features = pd.merge(df_features, dft, how='left', on='OBJECTID')
        geom_points_gdf = pd.merge(geom_points_gdf, dfg, how='left', on='UID')
        geom_points_gdf.drop(['l_oa'], axis=1, inplace=True)    
    
    # Remove the helper columns so that only feature columns remain.
    geom_points_gdf.drop(['a_x', 'a_y', 'o_x', 'o_y', 'UID', 'PID', 'OID_UID',
                         'lon_len', 'short_len', 'Area', 'Perimeter', 'Area_convex'], axis=1, inplace=True)

    if normalize != 'none':
        geom_points_gdf = get_normalize_features_final(geom_points_gdf, normalize)
    
    return np.array(geom_points_gdf.values.tolist())

In [None]:
# Encoding using the angle at each point (derived from the point coordinates; only the angle column is returned)

def encoding_graph2_features(uid, geo, normalize = 'minmax'):
    """Compute the angle at every exterior coordinate of ``geo``.

    For each point a, the point one step back (b) and one step ahead (c) are
    looked up and ``angle_bac`` is derived from the directions b->a and a->c.

    Args:
        uid: id of the polygon; only used to fill the helper columns ``UID``
            and ``OID_UID``, which are not part of the result.
        geo: polygon; its exterior coordinates are read (closing coordinate
            included; the two column names assume 2-D coordinates).
        normalize: 'minmax' or 'zscore' normalises the angle column over the
            points of this polygon; 'none' skips normalisation; any other
            value leaves the values unchanged.

    Returns:
        2-D numpy array with one row per exterior coordinate and a single
        column, ``angle_bac``.
    """
    
    # Step size between a point and its neighbours b and c.
    k = 1
    geom_points = list(geo.exterior.coords)#[:-1]
    geom_points_gdf = gpd.GeoDataFrame(geom_points)
    geom_points_gdf.columns = ['a_x', 'a_y']
    geom_points_gdf['UID'] = uid
    geom_points_gdf['PID'] = pd.Series([i for i in range(len(geom_points_gdf))])
    geom_points_gdf['OID_UID'] = pd.Series([uid*1000+ pid for uid, pid in geom_points_gdf[['UID', 'PID']].values])
    # Row number of the previous point b.
    # NOTE(review): the list includes the closing coordinate, so for point 0
    # b is the last row, which repeats point 0 - confirm this is intended.
    geom_points_gdf['b_key'] = pd.Series(
        [(id_ - k) if id_ > (k-1) else id_+(len(geom_points_gdf)-k) for id_ in geom_points_gdf['PID']])
    geom_points_gdf['b_x'] = pd.Series([geom_points_gdf['a_x'][i] for i in geom_points_gdf['b_key']])
    geom_points_gdf['b_y'] = pd.Series([geom_points_gdf['a_y'][i] for i in geom_points_gdf['b_key']])
    # Row number of the next point c.
    # NOTE(review): for the last row the else-branch yields id_-k-1, i.e. a
    # row further back rather than a wrap to the start - verify against the
    # intended neighbourhood definition.
    geom_points_gdf['c_key'] = pd.Series(
        [(id_ + k) if id_ < len(geom_points_gdf)-1-(k-1) else id_-k-1 for id_ in geom_points_gdf['PID']])
    geom_points_gdf['c_x'] = pd.Series([geom_points_gdf['a_x'][i] for i in geom_points_gdf['c_key']])
    geom_points_gdf['c_y'] = pd.Series([geom_points_gdf['a_y'][i] for i in geom_points_gdf['c_key']])

    # Directions b->a and a->c as 1 - arctan2(dy, dx)/pi, rounded to 6 decimals.
    geom_points_gdf['arc_ba'] = round(
        1 - np.arctan2(geom_points_gdf['a_y'] - geom_points_gdf['b_y'], geom_points_gdf['a_x'] - geom_points_gdf['b_x']) / np.pi, 6)
    geom_points_gdf['arc_ac'] = round(
        1 - np.arctan2(geom_points_gdf['c_y'] - geom_points_gdf['a_y'], geom_points_gdf['c_x'] - geom_points_gdf['a_x']) / np.pi, 6)

    # Difference of the two directions, wrapped into [0, 2).
    geom_points_gdf['angle_bac'] = pd.Series([(ac - ba - 1) % 2 for ba, ac in geom_points_gdf[['arc_ba', 'arc_ac']].values])

    # Keep only the angle column.
    cols = ['angle_bac']
    geom_points_gdf = geom_points_gdf[[x for x in cols if x in geom_points_gdf.columns]]

    
    if normalize != 'none':

        df_features = copy.deepcopy(geom_points_gdf)

        # Statistics are taken over the points of this polygon only.
        df_stat = df_features[cols].describe().transpose()
        for col in cols:
            col_min, col_max, col_std, col_mean = df_stat.loc[col][['min', 'max', 'std', 'mean']].values
            if normalize == 'zscore':
                df_features[col] = (df_features[col] - col_mean) / col_std
            elif normalize == 'minmax':
                df_features[col] = (df_features[col] - col_min) / (col_max - col_min)

        geom_points_gdf = df_features

    return np.array(geom_points_gdf.values.tolist())

In [None]:
# adapted from Yan et al. 2021

def get_single_features_final_new(uid, geo, k):
    """Compute per-point triangle features of one polygon for step size ``k``.

    For every exterior coordinate a, a point b (``k`` rows back) and a point
    c (``k`` rows ahead) are looked up; o is the origin (0, 0). The function
    derives side lengths, directions, angles, heights and triangle areas of
    the triangles a-b-c and o-b-c and divides several of them by the polygon
    area or its square root.

    Args:
        uid: id of the polygon; stored in ``UID`` and combined with the point
            number as ``uid*1000 + pid`` in ``OID_UID``.
        geo: polygon; its exterior coordinates (closing coordinate included;
            the two column names assume 2-D coordinates) and its area are
            read.
        k: number of rows between a point and its neighbours b and c.

    Returns:
        Frame with one row per exterior coordinate, holding those names of
        the final ``cols`` list that exist as columns.
    """
    geom_points = list(geo.exterior.coords)
    geom_points_gdf = gpd.GeoDataFrame(geom_points)
    geom_points_gdf.columns = ['a_x', 'a_y']
    geom_points_gdf['o_x'] = 0
    geom_points_gdf['o_y'] = 0
    geom_points_gdf['UID'] = uid
    geom_points_gdf['PID'] = pd.Series([i for i in range(len(geom_points_gdf))])
    geom_points_gdf['OID_UID'] = pd.Series([uid*1000+ pid for uid, pid in geom_points_gdf[['UID', 'PID']].values])
    # Row number of b: k rows back, the first k rows take rows from the end.
    # NOTE(review): the list includes the closing coordinate, which repeats
    # point 0 - confirm the neighbours near the ring start are as intended.
    geom_points_gdf['b_key'] = pd.Series(
        [(id_ - k) if id_ > (k-1) else id_+(len(geom_points_gdf)-k) for id_ in geom_points_gdf['PID']])
    geom_points_gdf['b_x'] = pd.Series([geom_points_gdf['a_x'][i] for i in geom_points_gdf['b_key']])
    geom_points_gdf['b_y'] = pd.Series([geom_points_gdf['a_y'][i] for i in geom_points_gdf['b_key']])
    # Row number of c: k rows ahead.
    # NOTE(review): for the last k rows the else-branch yields id_-k-1, i.e.
    # a row further back rather than a wrap to the start of the ring - verify.
    geom_points_gdf['c_key'] = pd.Series(
        [(id_ + k) if id_ < len(geom_points_gdf)-1-(k-1) else id_-k-1 for id_ in geom_points_gdf['PID']])
    geom_points_gdf['c_x'] = pd.Series([geom_points_gdf['a_x'][i] for i in geom_points_gdf['c_key']])
    geom_points_gdf['c_y'] = pd.Series([geom_points_gdf['a_y'][i] for i in geom_points_gdf['c_key']])
    geom_points_gdf['Area'] = geo.area

    # Side lengths AB,AC,BC,OA,OB,OC
    geom_points_gdf['l_ab'] = pd.Series(
        [cal_euclidean([ax, ay], [bx, by]) for ax, ay, bx, by in geom_points_gdf[['a_x', 'a_y', 'b_x', 'b_y']].values])
    geom_points_gdf['l_ac'] = pd.Series(
        [cal_euclidean([ax, ay], [cx, cy]) for ax, ay, cx, cy in geom_points_gdf[['a_x', 'a_y', 'c_x', 'c_y']].values])
    geom_points_gdf['l_bc'] = pd.Series(
        [cal_euclidean([cx, cy], [bx, by]) for cx, cy, bx, by in geom_points_gdf[['c_x', 'c_y', 'b_x', 'b_y']].values])
    geom_points_gdf['l_oa'] = pd.Series(
        [cal_euclidean([ax, ay], [ox, oy]) for ax, ay, ox, oy in geom_points_gdf[['a_x', 'a_y', 'o_x', 'o_y']].values])
    geom_points_gdf['l_ob'] = pd.Series(
        [cal_euclidean([bx, by], [ox, oy]) for bx, by, ox, oy in geom_points_gdf[['b_x', 'b_y', 'o_x', 'o_y']].values])
    geom_points_gdf['l_oc'] = pd.Series(
        [cal_euclidean([cx, cy], [ox, oy]) for cx, cy, ox, oy in geom_points_gdf[['c_x', 'c_y', 'o_x', 'o_y']].values])
    #

    # Directions as 1 - arctan2(dy, dx)/pi, rounded to 6 decimals; the
    # values are therefore expressed in units of pi.
    geom_points_gdf['arc_ba'] = round(
        1 - np.arctan2(geom_points_gdf['a_y'] - geom_points_gdf['b_y'], geom_points_gdf['a_x'] - geom_points_gdf['b_x']) / np.pi, 6)
    geom_points_gdf['arc_ac'] = round(
        1 - np.arctan2(geom_points_gdf['c_y'] - geom_points_gdf['a_y'], geom_points_gdf['c_x'] - geom_points_gdf['a_x']) / np.pi, 6)
    geom_points_gdf['arc_bc'] = round(
        1 - np.arctan2(geom_points_gdf['b_y'] - geom_points_gdf['c_y'], geom_points_gdf['b_x'] - geom_points_gdf['c_x']) / np.pi, 6)
    geom_points_gdf['arc_ob'] = round(
        1 - np.arctan2(geom_points_gdf['b_y'] - geom_points_gdf['o_y'], geom_points_gdf['b_x'] - geom_points_gdf['o_x']) / np.pi, 6)
    geom_points_gdf['arc_oc'] = round(
        1 - np.arctan2(geom_points_gdf['c_y'] - geom_points_gdf['o_y'], geom_points_gdf['c_x'] - geom_points_gdf['o_x']) / np.pi, 6)
    # Angles as differences of directions, wrapped into [0, 2)
    geom_points_gdf['angle_bac'] = pd.Series([(ac - ba - 1) % 2 for ba, ac in geom_points_gdf[['arc_ba', 'arc_ac']].values])
    geom_points_gdf['angle_cba'] = pd.Series([(ba - bc - 1) % 2 for ba, bc in geom_points_gdf[['arc_ba', 'arc_bc']].values])
    geom_points_gdf['angle_boc'] = pd.Series([(oc - ob) % 2 for ob, oc in geom_points_gdf[['arc_ob', 'arc_oc']].values])
    geom_points_gdf['angle_cbo'] = pd.Series([(bo - bc - 1) % 2 for bo, bc in geom_points_gdf[['arc_ob', 'arc_bc']].values])
    # Change of direction at a, mapped from [0, 2) into (-1, 1]
    geom_points_gdf['angle_bac_change'] = pd.Series(
        [(ac - ba) % 2 for ba, ac in geom_points_gdf[['arc_ba', 'arc_ac']].values])
    geom_points_gdf['angle_bac_change'] = pd.Series(
        [change if change <= 1 else change - 2 for change in geom_points_gdf['angle_bac_change']])

    # Angles folded into [0, 1]
    geom_points_gdf['rotate_bac'] = pd.Series([angle if angle < 1 else 2 - angle for angle in geom_points_gdf['angle_bac']])
    geom_points_gdf['rotate_boc'] = pd.Series([angle if angle < 1 else 2 - angle for angle in geom_points_gdf['angle_boc']])
    #
    
    # NOTE(review): the angle columns are in units of pi, yet np.sin receives
    # them without a factor of pi - confirm whether radians were intended.
    geom_points_gdf['height_a'] = pd.Series([a*np.sin(angle_b) for a, angle_b in geom_points_gdf[['l_bc', 'angle_cba']].values])
    geom_points_gdf['height_o'] = pd.Series([o*np.sin(angle_b) for o, angle_b in geom_points_gdf[['l_bc', 'angle_cbo']].values])
    
    # Area of Tri_ABC, negated where angle_bac < 1
    geom_points_gdf['s_abc'] = pd.Series([(-1 if angle < 1 else 1) * cal_area(l1, l2, l3) for l1, l2, l3, angle in
                                    geom_points_gdf[['l_ab', 'l_bc', 'l_ac', 'angle_bac']].values])
    # Area of Tri_OBC
    geom_points_gdf['s_obc'] = pd.Series([cal_area(l1, l2, l3) for l1, l2, l3 in geom_points_gdf[['l_ob', 'l_bc', 'l_oc']].values])
    # c_obc and r_obc of Tri_OBC
    # NOTE(review): c_obc divides the side sum by 3; callers select it under
    # the name 'semiperimeter', which would be a division by 2 - verify.
    geom_points_gdf['c_obc'] = (geom_points_gdf['l_ob'] + geom_points_gdf['l_oc'] + geom_points_gdf['l_bc']) / 3
    geom_points_gdf['r_obc'] = geom_points_gdf['s_obc'] / geom_points_gdf['c_obc']

    # Division by the polygon area (areas) or its square root (lengths).
    # NOTE(review): s_abc is multiplied by the same sign factor a second time
    # here, which cancels the sign applied above - confirm intended.
    geom_points_gdf['s_abc'] = pd.Series([(-1 if angle < 1 else 1) * feat / area for feat, area, angle in
                                    geom_points_gdf[['s_abc', 'Area', 'angle_bac']].values])
    geom_points_gdf['l_bc'] = pd.Series([(-1 if angle < 1 else 1) * feat / np.sqrt(area) for feat, area, angle in
                                   geom_points_gdf[['l_bc', 'Area', 'angle_bac']].values])
    geom_points_gdf['s_obc'] = pd.Series([feat / area for feat, area in geom_points_gdf[['s_obc', 'Area']].values])
    geom_points_gdf['c_obc'] = pd.Series([feat / np.sqrt(area) for feat, area in geom_points_gdf[['c_obc', 'Area']].values])
    geom_points_gdf['r_obc'] = pd.Series([feat / np.sqrt(area) for feat, area in geom_points_gdf[['r_obc', 'Area']].values])
    geom_points_gdf['l_oa'] = pd.Series([feat / np.sqrt(area) for feat, area in geom_points_gdf[['l_oa', 'Area']].values])

    # Output columns in their final order; names that were never created in
    # this function ('OBJECTID', 'isBegin', 'isStart') are skipped by the
    # filter below.
    cols = ['OBJECTID', 'PID', 'UID', 'OID_UID', 'isBegin', 'isStart', 'Area'
        , 'l_bc', 'l_oa', 'l_ab', 'l_ac', 'l_ob', 'l_oc'
        , 'rotate_bac', 'rotate_boc'
        , 'angle_bac_change', 'angle_bac', 'angle_boc'
        , 's_abc', 's_obc', 'c_obc', 'r_obc'
        , 'height_a', 'height_o'
            ]
    geom_points_gdf = geom_points_gdf[[x for x in cols if x in geom_points_gdf.columns]]
    return geom_points_gdf

In [None]:
# adapted from Yan et al. 2021

def get_normalize_features_final(df_features, norm_type):
    """Return a normalised deep copy of ``df_features``.

    Only columns whose name contains ``'k'`` but not ``'rotate'`` are
    rescaled; every other column is passed through unchanged.

    norm_type:
        'zscore' -> (value - mean) / std
        'minmax' -> (value - min) / (max - min)
        any other value leaves the selected columns untouched.
    """
    normalized = copy.deepcopy(df_features)
    selected = [name for name in normalized.columns
                if 'k' in name and 'rotate' not in name]

    # One row of summary statistics per selected column.
    stats = normalized[selected].describe().transpose()

    for name in selected:
        lo, hi, spread, centre = stats.loc[name][['min', 'max', 'std', 'mean']].values
        if norm_type == 'zscore':
            normalized[name] = (normalized[name] - centre) / spread
        elif norm_type == 'minmax':
            normalized[name] = (normalized[name] - lo) / (hi - lo)

    return normalized

In [None]:
# adapted from Yan et al. 2021

def cal_euclidean(p1, p2):
    """Return the Euclidean distance between the 2D points ``p1`` and ``p2``.

    Only the first two components (index 0 and 1) of each point are used.
    """
    dx = p1[0] - p2[0]
    dy = p1[1] - p2[1]
    return np.linalg.norm([dx, dy])

def cal_area(l1, l2, l3):
    """Return the area of a triangle with side lengths ``l1``, ``l2``, ``l3``.

    Uses Heron's formula. Side lengths that cannot form a proper triangle
    (non-positive radicand) give an area of 0.
    """
    half = (l1 + l2 + l3) / 2
    radicand = half * (half - l1) * (half - l2) * (half - l3)
    if radicand <= 0:
        return 0
    return np.sqrt(radicand)

def cal_arc(p1, p2, degree=False):
    """Return ``pi - atan2(dy, dx)`` for the vector from ``p1`` to ``p2``.

    The result is in radians unless ``degree`` is truthy, in which case it is
    converted to degrees.
    """
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    arc = np.pi - np.arctan2(delta_y, delta_x)
    if degree:
        return arc / np.pi * 180
    return arc

In [None]:
def write_tensors(buildings, encoding_schemes, k_list,
                  local_features, regional_features, global_features_polygon,
                  intermediate = True, id_name = 'OBJECTID'):
    '''
    Add one ``enc_<scheme>`` column per requested encoding scheme to a deep
    copy of ``buildings`` and return that copy (row index reset to 0..n-1).

        buildings: (Geo)DataFrame with at least the columns 'geometry' and
                   'centroid' (for the coordinate encodings) and the id column
        encoding_schemes: iterable of scheme names, processed in order
                                            '2': (x, y)  = default
                                            '3': (x,y, 1) (single one-hot-vector for intermediate/end point)
                                            '4': (x,y, 1, 0) (double one-hot-vector for intermediate/end point)
                                            '5': (x,y, 1, 0, 0) (triple one-hot-vector for intermediate/end point)
                                            'f': feature encoding as of Yan et al. 2021
                                            'graph_2': 2D graph features
                                            '2f': combining 2 + f
                                            '5f': combining 5 + f
                          '2f' / '5f' concatenate already computed columns, so
                          '2' (resp. '5') and 'f' must appear earlier in the list.
        k_list: forwarded to encoding_graph_features as ``klst``
        local_features, regional_features, global_features_polygon: selected features
        intermediate: accepted for backward compatibility; not used by this function
        id_name: id attribute in buildings properties; when it is not
                 'OBJECTID' an integer 'OBJECTID' column is derived from it
    '''

    print('Start encoding features at ' + str(datetime.now()))

    print(encoding_schemes)
    df_buildings = copy.deepcopy(buildings)

    # Reset the index BEFORE assigning any freshly built pd.Series: those
    # Series carry a 0..n-1 index and pandas aligns on index labels, so
    # assigning them to a frame with any other index would misalign rows / NaN.
    df_buildings.reset_index(inplace=True, drop=True)
    if id_name != 'OBJECTID':
        df_buildings['OBJECTID'] = pd.Series([int(oid) for oid in df_buildings[id_name]], dtype=int)

    scale = 1

    cols = ['enc_{}'.format(x) for x in encoding_schemes]

    for c in cols:

        if c == 'enc_2':
            df_buildings[c] = pd.Series([encoding_2(geo, centroid, scale)
                                         for geo, centroid in df_buildings[['geometry', 'centroid']].values])
            print('       Done encoding 2D-features at ' + str(datetime.now()))

        elif c == 'enc_3':
            df_buildings[c] = pd.Series([encoding_3(geo, centroid, scale)
                                         for geo, centroid in df_buildings[['geometry', 'centroid']].values])
            print('       Done encoding 3D-features at ' + str(datetime.now()))

        elif c == 'enc_4':
            df_buildings[c] = pd.Series([encoding_4(geo, centroid, scale)
                                         for geo, centroid in df_buildings[['geometry', 'centroid']].values])
            print('       Done encoding 4D-features at ' + str(datetime.now()))

        elif c == 'enc_5':
            df_buildings[c] = pd.Series([encoding_5(geo, centroid, scale)
                                         for geo, centroid in df_buildings[['geometry', 'centroid']].values])
            print('       Done encoding 5D-features at ' + str(datetime.now()))

        elif c == 'enc_f':
            df_buildings[c] = pd.Series([encoding_graph_features(uid, geo, local_features, regional_features, global_features_polygon, klst = k_list)
                                         for uid, geo in df_buildings[['OBJECTID', 'geometry']].values])
            print('       Done encoding graph features at ' + str(datetime.now()))

        elif c == 'enc_graph_2':
            df_buildings[c] = pd.Series([encoding_graph2_features(uid, geo)
                                         for uid, geo in df_buildings[['OBJECTID', 'geometry']].values])
            print('       Done encoding 2D-graph features at ' + str(datetime.now()))

        elif c == 'enc_2f':
            # Column-wise concatenation of the per-point coordinate and feature encodings.
            df_buildings[c] = pd.Series([np.concatenate((a, b), axis=1)
                                         for a, b in df_buildings[['enc_2','enc_f']].values])
            print('       Done encoding 2f-graph features at ' + str(datetime.now()))

        elif c == 'enc_5f':
            df_buildings[c] = pd.Series([np.concatenate((a, b), axis=1)
                                         for a, b in df_buildings[['enc_5','enc_f']].values])
            print('       Done encoding 5f-graph features at ' + str(datetime.now()))

    print('Done encoding features at ' + str(datetime.now()))

    return df_buildings

## Reports

In [None]:
def model_test_predict(model, test_data_loader, target_dict, transpose = False, graph = False):
    """
    Run ``model`` over every batch of ``test_data_loader`` without gradient
    tracking, print the overall accuracy and collect the predictions.

    input:
    model: callable returning (outputs, embedding); with transpose=True a
           3-tuple (outputs, <ignored>, embedding) is accepted as well
    test_data_loader: iterable of (input, targets) batches
    target_dict: dict mapping str(class index) -> class name
    transpose: swap axes 1 and 2 of the input (and cast to float) before the forward pass
    graph: pass the input to the model unchanged (no float cast)

    output:
    targets_list: list of target values (int)
    targets_shape: list of translated target values
    predictions: list of predicted class indices (int)
    shape_predictions: list of translated predictions
    embeds: one [targets.float(), embedding] pair per batch

    NOTE(review): the model is not switched to eval mode here; callers are
    expected to have done so if dropout etc. should be disabled.
    """

    with torch.no_grad():
        correct = 0
        total = 0

        targets_list = []
        targets_shape = []
        predictions = []
        shape_predictions = []

        embeds = []

        for batch in test_data_loader:
            input_, targets = batch

            if transpose == True:
                # Single forward pass; take the first and last element so both
                # 2-tuple and 3-tuple model outputs are handled without
                # re-running the model or masking unrelated errors.
                result = model(torch.transpose(input_, 1, 2).float())
                outputs, embedding = result[0], result[-1]
            elif graph == True:
                outputs, embedding = model(input_)
            else:
                outputs, embedding = model(input_.float())
            embeds.append([targets.float(), embedding])

            _, predicted = torch.max(outputs.data, 1)

            total += targets.size(0)
            correct += (predicted == targets).sum().item()

            # Translate with the dictionary that was passed in rather than a
            # notebook-level global.
            for t in targets:
                targets_list.append(int(t))
                targets_shape.append(target_dict[str(int(t))])
            for p in predicted:
                predictions.append(int(p))
                shape_predictions.append(target_dict[str(int(p))])

        print('Test Accuracy of the model on the test polygons: {} %'.format(100 * correct / total))

    return targets_list, targets_shape, predictions, shape_predictions, embeds
    

In [None]:
# Embedding Visualization
def embedding_viz(embeddings, folder_filename, n_components=2, perplexity=30.0, learning_rate=200, init='random'):
    """
    Project batched embeddings with t-SNE and save a scatter plot.

    input:
    embeddings: iterable of (targets, embedding) pairs, one per batch; both
                entries must support ``.detach().numpy()`` and each embedding
                must be 2D (samples x features) so batches can be stacked
    folder_filename: folder + filename the figure is saved to
    TSNE parameters:
        n_components (the first two components are plotted, so must be >= 2)
        perplexity
        learning_rate
        init

    Draws onto the current pyplot figure and saves it under folder_filename.

    raises:
    ValueError: if ``embeddings`` contains no batch
    """
    cols = []
    chunks = []
    for batch in embeddings:
        cols.extend(batch[0].detach().numpy())
        chunks.append(batch[1].detach().numpy())

    if not chunks:
        raise ValueError('embeddings must contain at least one batch')

    # Stack once instead of growing an array batch by batch; the result is a
    # fresh array, so the caller's tensors are never modified.
    embedding = np.concatenate(chunks, axis=0)

    X_embedded = TSNE(n_components=n_components, perplexity=perplexity, learning_rate=learning_rate, init=init).fit_transform(embedding)

    # Points are coloured by their target value.
    plt.scatter(X_embedded[:, 0], X_embedded[:, 1], s=3, c = cols, cmap = 'tab10')
    plt.savefig(folder_filename)

In [None]:
def report_statistics(targets_list, predictions_list, target_selection, target_dict):
    """
    Plot and return per-class precision, recall, f1-score and support.

    input:
    targets_list: true class indices
    predictions_list: predicted class indices
    target_selection: class indices to report, in plotting order
    target_dict: dict mapping str(class index) -> class name (x-axis labels)

    output:
    precision, recall, f1_score, support: lists aligned with target_selection
    """
    keys = [target_dict[str(label)] for label in target_selection]

    report = classification_report(targets_list, predictions_list, output_dict=True)
    per_class = [report[str(label)] for label in target_selection]

    precision = [row['precision'] for row in per_class]
    recall = [row['recall'] for row in per_class]
    f1_score = [row['f1-score'] for row in per_class]
    support = [row['support'] for row in per_class]

    fig, ax = plt.subplots(1, 2, figsize = (15,5))
    summary_ax, support_ax = ax[0], ax[1]

    # Left panel: the three quality metrics per class.
    for values, name in ((precision, 'precision'), (recall, 'recall'), (f1_score, 'f1-score')):
        summary_ax.plot(keys, values, label = name)
    summary_ax.set_xlabel('shape')
    summary_ax.legend()
    summary_ax.set_title('Summary')

    # Right panel: number of samples per class.
    support_ax.plot(keys, support, label = 'support')
    support_ax.set_xlabel('shape')
    support_ax.set_ylabel('occurences')
    support_ax.legend()
    support_ax.set_title('Support')

    return precision, recall, f1_score, support

In [None]:
def matrix(predictions_df, target_selection, labels=None):
    """
    Plot a row-normalised confusion matrix.

    input:
    predictions_df: DataFrame with the columns 'target' and 'clas_prediction'
    target_selection: class indices to show (rows = targets, columns = predictions)
    labels: optional tick labels, one per entry of target_selection; by
            default the notebook's class letters are used (class index ->
            letter), falling back to str(index) for unknown indices

    Each cell holds the share of samples of the row's target class that were
    predicted as the column's class; a target class without samples gives a
    row of zeros.
    """
    if labels is None:
        class_letters = dict(enumerate(('E', 'F', 'H', 'I', 'L', 'O', 'T', 'U', 'Y', 'Z')))
        labels = [class_letters.get(s, str(s)) for s in target_selection]

    rows = []
    for target in target_selection:
        subset = predictions_df[predictions_df['target'] == target]
        n_samples = len(subset)
        # Build the mask from the subset itself so it is aligned with it.
        rows.append([(subset['clas_prediction'] == pred).sum() / n_samples if n_samples else 0.0
                     for pred in target_selection])

    f, ax = plt.subplots(figsize = (8,8))

    ax.matshow(np.array(rows),cmap=plt.cm.Blues)
    # One tick per displayed class instead of a hard-coded count.
    ticks = list(range(len(target_selection)))
    ax.set_xticks(ticks)
    ax.set_xticklabels(list(labels))
    ax.set_yticks(ticks)
    ax.set_yticklabels(list(labels))

# Model

In [None]:
# Root output directory (placeholder, set before running). Later cells build
# paths by plain string concatenation (e.g. folder + 'img/'), so the value
# must end with a path separator.
folder = 'path to folder for results'

In [None]:
# Create the result directory and its image sub-directory. exist_ok avoids
# the check-then-create race of a separate os.path.exists() test.
os.makedirs(folder, exist_ok=True)
imagefolder = folder+ 'img/'
os.makedirs(imagefolder, exist_ok=True)

# Hyperparameter

In [None]:
# Hyper-parameters shared by the data pipeline and the models below.
hp = {
    'train_test_split': 0.8,   # passed as train_size to train_test_split
    'shuffled_train': True,    # shuffle flag handed to the DataLoaders
    'batch_size': 16,
    'num_epochs': 100,         # number of iterations of the training loops
    'learning_rate': 0.001,    # Adam learning rate
    'dropout': 0.3,            # dropout probability passed to the models
    'rec_dropout': 0.3,        # not used in this part of the notebook; presumably for recurrent models - TODO confirm
    'input_size': 64,          # passed to preprocess_geometries and used as Conv1d in_channels
    'num_classes': 10,         # size of the models' output layer
}

# Data processing

In [None]:
# Experiment configuration read by the preprocessing, sampling and encoding
# cells below. The string values are compared literally, so keep the exact
# spellings listed in the inline comments.
exp = {
    'normalize': True,
    'mode': 'iterativ', # 'iterativ', 'interpolate' or 'padding'
    'k_list': [1,2],
    'scaling': 'x', # 'none', 'x', 'y' or 'xy'
    'scale_method': 'minmax', # 'minmax' or 'stdev' 
    'centroid': 'geometry', # 'geometry' or 'map'
    'sampling': 'none', # 'none', 'rotate' or 'random'
    'samples': 1, # assert >0
    'rotate_to_y_axis': 'none', # 'none', 'all', 'train' or 'test'
    }


In [None]:
# Encodings to compute, in order. write_tensors builds '2f' / '5f' from the
# already computed 'enc_2' / 'enc_5' and 'enc_f' columns, so those must come first.
encodings_schemes = ['2', '5', 'f', '2f', '5f']
# Class indices included in the reports and confusion matrices.
test_selection = [0,1,2,3,4,5,6,7,8,9]

# Feature selections handed to write_tensors; commented-out entries are
# available features that are currently switched off.

# Per-vertex features of the triangle ABC.
local_features = [#'triangle_area', # Area of the triangle ABC
                   'len_c', # Length of adjacent line BA
                   'len_b', # Length of adjacent line AC
                   'angle_a', # Turning angle at point A
                   'len_a', # Length of opposite line from point A (BC)
                   #'height', # Height of the triangle ABC
                  ]

# Per-vertex features of the triangle OBC.
# NOTE(review): 'semiperimeter' is documented as a sum divided by 3, whereas a
# semi-perimeter is normally perimeter / 2 - confirm which is intended.
regional_features = [#'reg_triangle_area', # Area of the triangle OBC
                   'reg_len_c', # Length of adjacent line BO
                   'reg_len_b', # Length of adjacent line OC
                   'reg_angle_o', # Turning angle at point O
                   #'reg_height', # Height of the triangle OBC
                   'semiperimeter', # semi_perimeter (BO+OC+BC)/3
                   'radius', # 'reg_triangle_area' / 'semiperimeter'
                  ]


# Whole-polygon descriptors.
global_features_polygon = [#'area', # Area of the polygon
                           #'mabr', # Minimum area bounding rectangle
                           'elongation', # Length to width ratio of the MABR
                           'circularity', # Deviation between polygon and its equal-perimeter circle.
                           'rectangularity', # Deviation between polygon and its MABR
                           'squareness', # Deviation between polygon and its equal-area square
                           'convexity', # Deviation between polygon and its convex hull
                           #'fractality', # Edge roughness or smoothness
                           #'orientation', # Angle between major axis of the MABR and the horizontal direction
                           'mean_dist_o', # Mean distance from vertices to centroid
                          ]

# Whole-line descriptors; all disabled, and the list is not passed to
# write_tensors in this part of the notebook.
global_features_line = [#'l_mabr', # The minimum area bounding rectangle of the line
                        #'l_mabr_area', # The area of the MABR of the line
                        #'len_anchor_line', # The length of the MABR
                        #'bandwidth', # The width of the MABR
                        #'segmentation', # Distance from the beginning to the location on the anchor line where the maximum
                        #'width_pos', # Maximum deviation on one side of the anchor line
                        #'width_neg', # Maximum deviation on other side of the anchor line
                        #'concurrence', # Number of times line crosses anchor line
                        #'error_variance', # Discrete approximation of total discrepancy between line and anchor line
                        #'bendings', # Number of bends
                        #'sinuosity', # Number of inflection points
                        #'directionality', # Deviation between line length and anchor line length
                       ]

In [None]:
# Record the run configuration next to the results. The context manager
# closes the file even if one of the writes fails.
with open(folder+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    for key, value in hp.items():
        text_file.write(key + ': ' + str(value) + '\n')

    text_file.write('\n')
    text_file.write('experiment parameters:' + '\n')
    for key, value in exp.items():
        text_file.write(key + ': ' + str(value) + '\n')

    text_file.write('\n')
    text_file.write('Encoding_schemes: ' + str(encodings_schemes))

## Loading data

In [None]:
# Load the building footprints into a GeoDataFrame (the path is a placeholder).
buildings_shapes = gpd.read_file('path to file') # File from Yan et al. 2021

## Classes

In [None]:
# Class letter -> class index, and the inverse keyed by the index as a string.
shape_dict = {letter: index for index, letter in enumerate('EFHILOTUYZ')}
shape_dict_reverse = {str(index): letter for letter, index in shape_dict.items()}

## Preprocessing

In [None]:
print('Start preprocessing at ' + str(datetime.now()))

# Bring every geometry to hp['input_size'] points using the strategy in exp['mode'].
buildings_shapes = preprocess_geometries(buildings_shapes, shape_dict, hp['input_size'], centroid_type = exp['centroid'], mode = exp['mode'])

# Optional normalisation; the routine depends on which centroid is used.
if exp['normalize']:
    if exp['centroid'] == 'geometry':
        buildings_shapes = normalize_shape_geometry(buildings_shapes, scaling = exp['scaling'], scale_type = exp['scale_method'])
    elif exp['centroid'] == 'map':
        buildings_shapes = normalize_shape_map(buildings_shapes, scaling = exp['scaling'], scale_type = exp['scale_method'])

# Train-Test-Split
# NOTE(review): no random_state is passed, so the split differs between runs.
train_buildings, test_buildings = train_test_split(buildings_shapes, train_size = hp['train_test_split'])
# Tag each row, then merge both parts back into one frame in the original
# order so sampling and encoding run once over all buildings.
train_buildings['status'] = 'train'
test_buildings['status'] = 'test'
building_data = pd.concat([train_buildings, test_buildings])
building_data.sort_index(inplace=True)

print('End preprocessing at ' + str(datetime.now()))

In [None]:
# Notebook display of the combined, status-tagged frame.
building_data

### Sampling

In [None]:
# Optional rotated copies of the geometries, controlled by exp['sampling'] / exp['samples'].
if exp['sampling'] != 'none':
    building_data = rotate_geometries(building_data, sampling = exp['sampling'], samples = exp['samples'])

# Optional rotation to the y axis; exp['rotate_to_y_axis'] selects which rows are affected.
if exp['rotate_to_y_axis'] != 'none':
    building_data = rotate_geom_to_y_axis(building_data, which_data = exp['rotate_to_y_axis'])

### Encoding

In [None]:
# Add one 'enc_<scheme>' column per entry of encodings_schemes.
building_data = write_tensors(building_data, encodings_schemes, exp['k_list'],
                              local_features, regional_features, global_features_polygon)

In [None]:
if exp['mode'] == 'padding':
    # In padding mode every encoding gets (encod_len - orig_len) rows of zeros
    # appended. The coordinate encodings use a fixed pad width; None means the
    # pad takes the column count of the encoding itself.
    pad_widths = {'enc_2': 2, 'enc_5': 5, 'enc_f': None, 'enc_2f': None, 'enc_5f': None}
    for enc_col, fixed_width in pad_widths.items():
        building_data[enc_col] = pd.Series([
            np.concatenate(
                (enc, np.zeros([encod_len-orig_len, enc.shape[1] if fixed_width is None else fixed_width])),
                axis=0)
            for enc, encod_len, orig_len in building_data[[enc_col, 'encod_len', 'orig_len']].values])

In [None]:
# Split the encoded frame back into its train and test parts via the status tag.
train_buildings = building_data[building_data['status'] == 'train']
test_buildings = building_data[building_data['status'] == 'test']

In [None]:
# Sanity check: shape of the first training sample's combined 5f encoding.
train_buildings.iloc[0]['enc_5f'].shape

### Histogram

In [None]:
# Distribution of the 'orig_len' column over the training buildings.
train_buildings.hist(column=["orig_len"], bins= 80, figsize=(10, 8))

In [None]:
#---

## Data Loader

In [None]:
# Timestamp marking the start of dataset / loader construction.
print('Start Data Loader processing at ' + str(datetime.now()))

In [None]:
# Pull the encodings, targets and lengths out of the frames as plain lists.
# Suffix key: (none) = enc_2, _5 = enc_5, _G = enc_f, _G2 = enc_2f, _G5 = enc_5f.
data_geoms_train = train_buildings['enc_2'].tolist()
data_targets_train = train_buildings['shape_char'].tolist()
data_len_train = train_buildings['encod_len'].tolist()

data_geoms_test = test_buildings['enc_2'].tolist()
data_targets_test = test_buildings['shape_char'].tolist()
data_len_test = test_buildings['encod_len'].tolist()

data_geoms_train_5 = train_buildings['enc_5'].tolist()
data_geoms_test_5 = test_buildings['enc_5'].tolist()

data_geoms_train_G = train_buildings['enc_f'].tolist()
data_geoms_test_G = test_buildings['enc_f'].tolist()

data_geoms_train_G2 = train_buildings['enc_2f'].tolist()
data_geoms_test_G2 = test_buildings['enc_2f'].tolist()

data_geoms_train_G5 = train_buildings['enc_5f'].tolist()
data_geoms_test_G5 = test_buildings['enc_5f'].tolist()

### Ordered Sequence

In [None]:
# One train/test dataset pair per encoding; all pairs share the same targets.
# Suffix key: _c = enc_2, _s = enc_5, _f = enc_f, _2f = enc_2f, _5f = enc_5f.
train_dataset_c = OrderedSeqs_Dataset(data_geoms_train, data_targets_train)
test_dataset_c = OrderedSeqs_Dataset(data_geoms_test, data_targets_test)

train_dataset_s = OrderedSeqs_Dataset(data_geoms_train_5, data_targets_train)
test_dataset_s = OrderedSeqs_Dataset(data_geoms_test_5, data_targets_test)

train_dataset_f = OrderedSeqs_Dataset(data_geoms_train_G, data_targets_train)
test_dataset_f = OrderedSeqs_Dataset(data_geoms_test_G, data_targets_test)

train_dataset_2f = OrderedSeqs_Dataset(data_geoms_train_G2, data_targets_train)
test_dataset_2f = OrderedSeqs_Dataset(data_geoms_test_G2, data_targets_test)

train_dataset_5f = OrderedSeqs_Dataset(data_geoms_train_G5, data_targets_train)
test_dataset_5f = OrderedSeqs_Dataset(data_geoms_test_G5, data_targets_test)

In [None]:
# All encodings share the same rows, so the sizes of the _c pair apply to every pair.
print("Size of Train Dataset:", len(train_dataset_c))
print("Size of Test Dataset:", len(test_dataset_c))

In [None]:
# Training loaders, one per encoding. drop_last=True discards a final
# incomplete batch. After each loader one batch is drawn and kept in
# data_inputs_* / data_labels_* for the shape checks further down.
train_data_loader_c = data.DataLoader(train_dataset_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)
data_inputs_c, data_labels_c = next(iter(train_data_loader_c))

train_data_loader_s = data.DataLoader(train_dataset_s, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)
data_inputs_s, data_labels_s = next(iter(train_data_loader_s))

train_data_loader_f = data.DataLoader(train_dataset_f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)
data_inputs_f, data_labels_f = next(iter(train_data_loader_f))

train_data_loader_2f = data.DataLoader(train_dataset_2f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)
data_inputs_2f, data_labels_2f = next(iter(train_data_loader_2f))

train_data_loader_5f = data.DataLoader(train_dataset_5f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)
data_inputs_5f, data_labels_5f = next(iter(train_data_loader_5f))

In [None]:
# Evaluation loaders, one per encoding. They are never shuffled, so that
# predictions come back in dataset order on every pass; the training shuffle
# flag must not leak into evaluation.
# NOTE(review): drop_last=True still excludes a final incomplete batch from
# all test metrics; kept because later models may rely on a fixed batch
# size - confirm before changing.
test_data_loader_c = data.DataLoader(test_dataset_c, batch_size=hp['batch_size'], shuffle=False, drop_last=True)
test_data_loader_s = data.DataLoader(test_dataset_s, batch_size=hp['batch_size'], shuffle=False, drop_last=True)
test_data_loader_f = data.DataLoader(test_dataset_f, batch_size=hp['batch_size'], shuffle=False, drop_last=True)
test_data_loader_2f = data.DataLoader(test_dataset_2f, batch_size=hp['batch_size'], shuffle=False, drop_last=True)
test_data_loader_5f = data.DataLoader(test_dataset_5f, batch_size=hp['batch_size'], shuffle=False, drop_last=True)

In [None]:
# Shape of one training batch of the 5f encoding.
data_inputs_5f.shape

In [None]:
# Size of the last axis of one enc_2 training batch.
data_inputs_c.shape[2]

### Graph

In [None]:
# Graph datasets, one train/test pair per encoding (same suffix key as the
# ordered-sequence datasets); in addition to encodings and targets they
# receive the per-building 'encod_len' values.
train_dataset_graph_c = Graph_Dataset(data_geoms_train, data_targets_train, data_len_train)
test_dataset_graph_c = Graph_Dataset(data_geoms_test, data_targets_test, data_len_test)

train_dataset_graph_s = Graph_Dataset(data_geoms_train_5, data_targets_train, data_len_train)
test_dataset_graph_s = Graph_Dataset(data_geoms_test_5, data_targets_test, data_len_test)

train_dataset_graph_f = Graph_Dataset(data_geoms_train_G, data_targets_train, data_len_train)
test_dataset_graph_f = Graph_Dataset(data_geoms_test_G, data_targets_test, data_len_test)

train_dataset_graph_2f = Graph_Dataset(data_geoms_train_G2, data_targets_train, data_len_train)
test_dataset_graph_2f = Graph_Dataset(data_geoms_test_G2, data_targets_test, data_len_test)

train_dataset_graph_5f = Graph_Dataset(data_geoms_train_G5, data_targets_train, data_len_train)
test_dataset_graph_5f = Graph_Dataset(data_geoms_test_G5, data_targets_test, data_len_test)

In [None]:
# Sizes of the graph datasets (reported for the enc_2 pair).
print("Size of Train Dataset:", len(train_dataset_graph_c))
print("Size of Test Dataset:", len(test_dataset_graph_c))

In [None]:
# Training loaders for the graph datasets; batches are assembled by the
# custom collate function. One batch per encoding is drawn and kept for the
# inspection cells below.
train_data_loader_graph_c = data.DataLoader(train_dataset_graph_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)
data_inputs_graph_c, data_labels_graph_c = next(iter(train_data_loader_graph_c))

train_data_loader_graph_s = data.DataLoader(train_dataset_graph_s, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)
data_inputs_graph_s, data_labels_graph_s = next(iter(train_data_loader_graph_s))

train_data_loader_graph_f = data.DataLoader(train_dataset_graph_f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)
data_inputs_graph_f, data_labels_graph_f = next(iter(train_data_loader_graph_f))

train_data_loader_graph_2f = data.DataLoader(train_dataset_graph_2f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)
data_inputs_graph_2f, data_labels_graph_2f = next(iter(train_data_loader_graph_2f))

train_data_loader_graph_5f = data.DataLoader(train_dataset_graph_5f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)
data_inputs_graph_5f, data_labels_graph_5f = next(iter(train_data_loader_graph_5f))

In [None]:
# Evaluation loaders for the graph datasets. They are never shuffled, so that
# predictions come back in dataset order on every pass; the training shuffle
# flag must not leak into evaluation.
# NOTE(review): drop_last=True still excludes a final incomplete batch from
# all test metrics - confirm whether that is intended.
test_data_loader_graph_c = data.DataLoader(test_dataset_graph_c, batch_size=hp['batch_size'], shuffle=False, drop_last=True, collate_fn=collate)
test_data_loader_graph_s = data.DataLoader(test_dataset_graph_s, batch_size=hp['batch_size'], shuffle=False, drop_last=True, collate_fn=collate)
test_data_loader_graph_f = data.DataLoader(test_dataset_graph_f, batch_size=hp['batch_size'], shuffle=False, drop_last=True, collate_fn=collate)
test_data_loader_graph_2f = data.DataLoader(test_dataset_graph_2f, batch_size=hp['batch_size'], shuffle=False, drop_last=True, collate_fn=collate)
test_data_loader_graph_5f = data.DataLoader(test_dataset_graph_5f, batch_size=hp['batch_size'], shuffle=False, drop_last=True, collate_fn=collate)

In [None]:
# Notebook display of one collated graph batch (enc_2 encoding).
data_inputs_graph_c

In [None]:
# Number of columns of the node data stored under 'x' in that batch.
data_inputs_graph_c.ndata['x'].shape[1]

In [None]:
# Timestamp marking the end of dataset / loader construction.
print('End Data Loader processing at ' + str(datetime.now()))

# Models

## tVeerCNN+c

In [None]:
# Output directory for the tVeerCNNc run. exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
folder_tVeerCNNc = folder+'tVeerCNNc/'
os.makedirs(folder_tVeerCNNc, exist_ok=True)

In [None]:
# Key into tVeerCNNc_dims and last dimension of the model input (see the
# summary() call). The model classes read this global when they are
# constructed, and a later cell rebinds it for tVeerCNNs, so re-run this cell
# before re-creating tVeerCNNc.
sequence_length = 2

In [None]:
# Layer settings per sequence length: 'sequence_length' is used as the kernel
# size of the first convolution and of the average pooling, the two padding
# entries are the paddings of those same two layers.
tVeerCNNc_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2}}

In [None]:
# Record the model configuration next to its results. The context manager
# closes the file even if one of the writes fails.
with open(folder_tVeerCNNc+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('tVeerCNNc_dims:' + '\n')
    for key, value in tVeerCNNc_dims[sequence_length].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:

class tVeerCNNc(nn.Module):
    """Two-stage 1D-CNN classifier with a two-layer dense head.

    Kernel size and paddings are looked up in the module-level
    ``tVeerCNNc_dims`` table under the value the global ``sequence_length``
    has at construction time.

    forward(x) returns ``(logits, embeddings)`` where ``embeddings`` is the
    output of the second convolution stage before flattening.
    """

    def __init__(self, input_size, dropout, num_classes):
        super().__init__()
        self.num_classes = num_classes

        dims = tVeerCNNc_dims[sequence_length]
        kernel = dims['sequence_length']

        # Stage 1: convolution over the last axis, then max pooling.
        self.conv1d_1 = nn.Sequential(
            nn.Conv1d(in_channels=input_size, out_channels=32,
                      kernel_size=kernel, padding=dims['conv1d_1_padding']),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=3),
        )

        # Stage 2: pointwise convolution, then average pooling.
        self.conv1d_2 = nn.Sequential(
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=1, stride=2),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=kernel, padding=dims['conv1d_2_padding']),
        )

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # The dense head expects exactly 64 flattened features per sample.
        self.dense_1 = nn.Linear(64, 32)
        self.dense_2 = nn.Linear(32, num_classes)

    def forward(self, x):
        embeddings = self.conv1d_2(self.conv1d_1(x))

        # Flatten everything but the batch axis.
        batch, channels, steps = embeddings.shape
        flat = embeddings.reshape([batch, channels * steps])

        hidden = self.dropout(self.relu(self.dense_1(flat)))
        return self.dense_2(hidden), embeddings
        
        
# Instantiate the model on the selected device and show a layer summary for
# an input of shape (batch_size, input_size, sequence_length).
model = tVeerCNNc(hp['input_size'], hp['dropout'], hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], hp['input_size'], sequence_length), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy on the raw model outputs; Adam over all model parameters.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerCNNc model

print('Start training of tVeerCNNc at ' + str(datetime.now()))

# Per-epoch history. The losses are those of the LAST batch of the epoch,
# not averages over the epoch.
training_loss_tVeerCNNc = []
test_loss_tVeerCNNc = []
test_accuracy_tVeerCNNc = []

# NOTE: despite its name, best_epoch holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        train_data_loader_c = data.DataLoader(train_dataset_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_c:
        data_inputs, data_labels = batch
        data_inputs = data_inputs.float()

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_tVeerCNNc.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no autograd graph: no_grad saves memory and keeps the
    # stored loss tensors free of graph references.
    with torch.no_grad():
        for batch in test_data_loader_c:
            input_, targets = batch
            input_ = input_.float()

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Computed once and reused for the history and the log line.
        test_loss = criterion(outputs, targets)

    test_loss_tVeerCNNc.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerCNNc.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss, test_accuracy))

    # Checkpoints are only written from the 12th epoch on (epoch index > 10);
    # ties overwrite the earlier checkpoint.
    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerCNNc+"model_tVeerCNNc_best.mod")

print('End training of tVeerCNNc at ' + str(datetime.now()))

# Requires that at least one checkpoint was written, i.e. num_epochs > 11.
best_epoch_printer = torch.load(folder_tVeerCNNc+"model_tVeerCNNc_best.mod")

print('Best epoch of tVeerCNNc: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are when this cell runs
# (after the training cell: the last epoch), separate from the "_best" checkpoint.
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()},
           folder_tVeerCNNc+"model_tVeerCNNc.mod")

In [None]:
# Restore the best checkpoint so the analysis below uses those weights.
checkpoint = torch.load(folder_tVeerCNNc+"model_tVeerCNNc_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Predict on the test loader; `embeds` feeds the embedding plot in the next cell.
targets_list, targets_shape, predictions_tVeerCNNc, shape_predictions_tVeerCNNc, embeds = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

In [None]:
# Embedding encoding
# Keep only the last position along the final axis of every batch embedding,
# which turns each sample into a flat vector for t-SNE.
emb = [(batch_targets, batch_states[:, :, -1]) for batch_targets, batch_states in embeds]
embeddings = emb

embedding_viz(embeddings, imagefolder+'tVeerCNNc.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:
# Test data
# Predict on the test loader again and persist the per-sample results.
(targets_list,
 targets_shape,
 predictions_tVeerCNNc,
 shape_predictions_tVeerCNNc,
 embeds) = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

pred_data_tVeerCNNc = {
    'index': list(range(len(predictions_tVeerCNNc))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_tVeerCNNc,
    'shape_prediction': shape_predictions_tVeerCNNc,
}
predictionsDF_tVeerCNNc = pd.DataFrame(pred_data_tVeerCNNc)
predictionsDF_tVeerCNNc.to_csv(folder_tVeerCNNc+'predictions_tVeerCNNc.csv')

# Per-class precision / recall / f1 / support, plotted and returned.
(precision_tVeerCNNc,
 recall_tVeerCNNc,
 f1_score_tVeerCNNc,
 support_tVeerCNNc) = report_statistics(targets_list, predictions_tVeerCNNc, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: share of each target class that was predicted correctly.
clas_accuracy_tVeerCNNc = []
for shapes in test_selection:
    subset = predictionsDF_tVeerCNNc[predictionsDF_tVeerCNNc['target'] == shapes]
    # Count hits directly: looking the class up in value_counts() raises a
    # KeyError when a class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples has no defined accuracy.
    clas_accuracy_tVeerCNNc.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerCNNc, test_selection)

## tVeerCNN+s

In [None]:
# Output directory for the tVeerCNNs run. exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
folder_tVeerCNNs = folder+'tVeerCNNs/'
os.makedirs(folder_tVeerCNNs, exist_ok=True)

In [None]:
# Rebinds the global sequence_length that the model classes read at
# construction time; from here on it selects the 5-wide settings.
sequence_length = 5
# Width of the hidden dense layer of tVeerCNNs.
dense_size = 32

In [None]:
# Layer settings per sequence length: 'sequence_length' is used as the kernel
# size of the first convolution and of the average pooling, the two padding
# entries are the paddings of those same two layers.
tVeerCNNs_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2}}

In [None]:
# Record the model configuration next to its results. The context manager
# closes the file even if one of the writes fails.
with open(folder_tVeerCNNs+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('dense_size: ' + str(dense_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('tVeerCNNs_dims:' + '\n')
    for key, value in tVeerCNNs_dims[sequence_length].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:
class tVeerCNNs(nn.Module):
    """1-D CNN classifier: two convolution stages followed by two dense layers.

    Kernel sizes and paddings are read at construction time from the
    module-level ``tVeerCNNs_dims`` table, indexed by the global
    ``sequence_length``.

    Args:
        input_size: number of input channels of the first Conv1d.
        dropout: dropout probability applied after the first dense layer.
        dense_size: width of the hidden dense layer.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, dropout, dense_size, num_classes):
        super().__init__()
        self.num_classes = num_classes
        dims = tVeerCNNs_dims[sequence_length]

        self.conv1d_1 = nn.Sequential(
            nn.Conv1d(input_size, 32,
                      kernel_size=dims['sequence_length'],
                      padding=dims['conv1d_1_padding']),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=3),
        )

        self.conv1d_2 = nn.Sequential(
            nn.Conv1d(32, 64, kernel_size=1, stride=2),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=dims['sequence_length'],
                         padding=dims['conv1d_2_padding']),
        )

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # dense_1 takes 64 features, so the flattened conv output must have
        # exactly 64 values per sample.
        self.dense_1 = nn.Linear(64, dense_size)
        self.dense_2 = nn.Linear(dense_size, num_classes)

    def forward(self, x):
        """Return ``(logits, embeddings)``; embeddings is the un-flattened conv output."""
        features = self.conv1d_2(self.conv1d_1(x))

        batch, channels, steps = features.shape
        hidden = self.dense_1(features.reshape([batch, channels * steps]))
        hidden = self.dropout(self.relu(hidden))
        return self.dense_2(hidden), features


model = tVeerCNNs(hp['input_size'], hp['dropout'], dense_size, hp['num_classes']).to(device)
summary(model,
        input_size=(hp['batch_size'], hp['input_size'], sequence_length),
        verbose=0,
        col_names=("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy over the model's logits; Adam over all parameters of the
# current `model`, with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerCNNs model
#
# Each epoch runs one optimisation pass over the training loader followed by an
# evaluation pass over the test loader. Once epoch > 10, the model/optimizer
# state is checkpointed whenever the test accuracy matches or beats the best
# value seen so far.

print('Start training of tVeerCNNs at ' + str(datetime.now()))

training_loss_tVeerCNNs = []
test_loss_tVeerCNNs = []
test_accuracy_tVeerCNNs = []

# NOTE: despite its name, this holds the best test *accuracy* seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        train_data_loader_s = data.DataLoader(train_dataset_s, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_s:
        data_inputs, data_labels = batch
        # Put the batch on the same device as the model (no-op if already there).
        data_inputs = data_inputs.float().to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Loss of the last training batch of this epoch.
    training_loss_tVeerCNNs.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no autograd graph; no_grad saves memory and keeps the
    # stored test loss from holding on to a graph.
    with torch.no_grad():
        for batch in test_data_loader_s:
            input_, targets = batch
            input_ = input_.float().to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch, computed once and reused for logging.
        # Kept as a (graph-free, CPU) 0-dim tensor so the history has the same
        # element type as before.
        test_loss = criterion(outputs, targets).cpu()

    test_loss_tVeerCNNs.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerCNNs.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerCNNs+"model_tVeerCNNs_best.mod")

print('End training of tVeerCNNs at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch > 10; with fewer epochs this
# load fails or picks up a file left by an earlier run.
# map_location lets a checkpoint saved on GPU be read on a CPU-only machine.
best_epoch_printer = torch.load(folder_tVeerCNNs+"model_tVeerCNNs_best.mod", map_location=device)

print('Best epoch of tVeerCNNs: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Persist the model and optimizer state as they are at this point; this file is
# separate from the "_best" checkpoint written during training.
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, 
           folder_tVeerCNNs+"model_tVeerCNNs.mod")

In [None]:
# Restore model and optimizer from the best checkpoint written during training.
# map_location=device allows a checkpoint saved on GPU to be loaded on a
# CPU-only machine (and vice versa).
checkpoint = torch.load(folder_tVeerCNNs+"model_tVeerCNNs_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the test loader; model_test_predict returns five
# values, unpacked here as targets, target shapes, class predictions, shape
# predictions and per-batch embeddings.
targets_list, targets_shape, predictions_tVeerCNNs, shape_predictions_tVeerCNNs, embeds = model_test_predict(model, test_data_loader_s, shape_dict_reverse)

In [None]:
# Embedding encoding
# For every entry keep its first element unchanged and reduce the 3-D second
# element to the last position along its final axis.
emb = [(entry[0], entry[1][:, :, -1]) for entry in embeds]
embeddings = emb

embedding_viz(
    embeddings,
    imagefolder+'tVeerCNNs.png',
    n_components=2,
    perplexity=30.0,
    learning_rate=200,
    init='random',
)

In [None]:
# Test data
# Collect targets and predictions of the test set, store them as CSV and
# report the per-class statistics.
(targets_list,
 targets_shape,
 predictions_tVeerCNNs,
 shape_predictions_tVeerCNNs,
 embeds) = model_test_predict(model, test_data_loader_s, shape_dict_reverse)

pred_data_tVeerCNNs = dict(
    index=list(range(len(predictions_tVeerCNNs))),
    target=targets_list,
    target_shape=targets_shape,
    clas_prediction=predictions_tVeerCNNs,
    shape_prediction=shape_predictions_tVeerCNNs,
)
predictionsDF_tVeerCNNs = pd.DataFrame(pred_data_tVeerCNNs)
predictionsDF_tVeerCNNs.to_csv(f"{folder_tVeerCNNs}predictions_tVeerCNNs.csv")

(precision_tVeerCNNs,
 recall_tVeerCNNs,
 f1_score_tVeerCNNs,
 support_tVeerCNNs) = report_statistics(targets_list, predictions_tVeerCNNs, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every target class in test_selection, the share of
# its samples whose class prediction equals the target.
clas_accuracy_tVeerCNNs = []
for shapes in test_selection:
    subset = predictionsDF_tVeerCNNs[predictionsDF_tVeerCNNs['target'] == shapes]
    # Counting matches directly avoids the KeyError that value_counts()[shapes]
    # raises when a class has no correct prediction at all; a class without
    # any test sample is recorded as NaN instead of failing.
    n_correct = (subset['clas_prediction'] == shapes).sum()
    clas_accuracy_tVeerCNNs.append(n_correct / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerCNNs, test_selection)

## tVeerCNN+f

In [None]:
# Output directory for all tVeerCNNf artefacts (params, checkpoints, predictions).
folder_tVeerCNNf = folder+'tVeerCNNf/'
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(folder_tVeerCNNf, exist_ok=True)

In [None]:
# The sequence length is taken from the third axis of data_inputs_5f; it must
# be one of the keys of tVeerCNNf_dims, which is indexed with it.
sequence_length = data_inputs_5f.shape[2]
print(sequence_length)
# Passed to the tVeerCNNf constructor and logged in params.txt.
dense_size = 32

In [None]:
# Layer geometry of tVeerCNNf, keyed by input sequence length.
#   'sequence_length'  -> kernel size of the first Conv1d
#   'conv1d_1_padding' -> padding of the first Conv1d
#   'conv1d_2_padding' -> not read by the tVeerCNNf class (its AvgPool1d uses
#                         kernel_size=1 without padding)
# Only the sequence lengths listed here are supported; any other value is a
# missing key.
tVeerCNNf_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                 20: {'sequence_length': 20,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0},
                 22: {'sequence_length': 22,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0},
                 24: {'sequence_length': 24,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0},
                 25: {'sequence_length': 25,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0},
                 26: {'sequence_length': 26,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0},
                 29: {'sequence_length': 29,
                    'conv1d_1_padding': 1,
                    'conv1d_2_padding': 0}}

In [None]:
# Record the hyperparameters of this run next to the model artefacts.
# The context manager closes the file even if a lookup or write fails.
with open(folder_tVeerCNNf+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('dense_size: ' + str(dense_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('tVeerCNNf_dims:' + '\n')
    # Only the geometry entry actually used for this sequence length is logged.
    for key, value in tVeerCNNf_dims[sequence_length].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:
class tVeerCNNf(nn.Module):
    """1-D CNN classifier: two convolution stages followed by two dense layers.

    Kernel size and padding of the first convolution are read at construction
    time from the module-level ``tVeerCNNf_dims`` table, indexed by the global
    ``sequence_length``.

    Args:
        input_size: number of input channels of the first Conv1d.
        dropout: dropout probability applied after the first dense layer.
        dense_size: width of the hidden dense layer.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, dropout, dense_size, num_classes):
        super(tVeerCNNf, self).__init__()
        self.num_classes = num_classes
        dims = tVeerCNNf_dims[sequence_length]

        self.conv1d_1 = nn.Sequential(
            nn.Conv1d(in_channels = input_size,
                      out_channels=32,
                      kernel_size = dims['sequence_length'],
                      padding=dims['conv1d_1_padding']),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=3)
        )

        self.conv1d_2 = nn.Sequential(
            nn.Conv1d(in_channels = 32, out_channels=64, kernel_size = 1, stride=2),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size = 1)
        )

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # Honour the dense_size argument (previously hard-coded to 32, which
        # silently ignored the parameter); identical for dense_size == 32.
        self.dense_1 = nn.Linear(64, dense_size)
        self.dense_2 = nn.Linear(dense_size, num_classes)

    def forward(self, x):
        """Return ``(logits, embeddings)``; embeddings is the un-flattened conv output."""
        out = self.conv1d_1(x)
        out = self.conv1d_2(out)
        embeddings = out

        # Flatten everything but the batch axis; dense_1 expects 64 values per sample.
        batch, channels, steps = out.shape
        out = out.reshape([batch, channels * steps])

        out = self.dense_1(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.dense_2(out)
        return out, embeddings


model = tVeerCNNf(hp['input_size'], hp['dropout'], dense_size, hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], hp['input_size'], sequence_length), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy over the model's logits; Adam over all parameters of the
# current `model`, with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerCNNf model
#
# Each epoch runs one optimisation pass over the training loader followed by an
# evaluation pass over the test loader. Once epoch > 10, the model/optimizer
# state is checkpointed whenever the test accuracy matches or beats the best
# value seen so far.

print('Start training of tVeerCNNf at ' + str(datetime.now()))

training_loss_tVeerCNNf = []
test_loss_tVeerCNNf = []
test_accuracy_tVeerCNNf = []

# NOTE: despite its name, this holds the best test *accuracy* seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        train_data_loader_5f = data.DataLoader(train_dataset_5f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_5f:
        data_inputs, data_labels = batch
        # Put the batch on the same device as the model (no-op if already there).
        data_inputs = data_inputs.float().to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Loss of the last training batch of this epoch.
    training_loss_tVeerCNNf.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no autograd graph; no_grad saves memory and keeps the
    # stored test loss from holding on to a graph.
    with torch.no_grad():
        for batch in test_data_loader_5f:
            input_, targets = batch
            input_ = input_.float().to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch, computed once and reused for logging.
        # Kept as a (graph-free, CPU) 0-dim tensor so the history has the same
        # element type as before.
        test_loss = criterion(outputs, targets).cpu()

    test_loss_tVeerCNNf.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerCNNf.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerCNNf+"model_tVeerCNNf_best.mod")

print('End training of tVeerCNNf at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch > 10; with fewer epochs this
# load fails or picks up a file left by an earlier run.
# map_location lets a checkpoint saved on GPU be read on a CPU-only machine.
best_epoch_printer = torch.load(folder_tVeerCNNf+"model_tVeerCNNf_best.mod", map_location=device)

print('Best epoch of tVeerCNNf: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Persist the model and optimizer state as they are at this point; this file is
# separate from the "_best" checkpoint written during training.
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, 
           folder_tVeerCNNf+"model_tVeerCNNf.mod")

In [None]:
# Restore model and optimizer from the best checkpoint written during training.
# map_location=device allows a checkpoint saved on GPU to be loaded on a
# CPU-only machine (and vice versa).
checkpoint = torch.load(folder_tVeerCNNf+"model_tVeerCNNf_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the test loader; model_test_predict returns five
# values, unpacked here as targets, target shapes, class predictions, shape
# predictions and per-batch embeddings.
targets_list, targets_shape, predictions_tVeerCNNf, shape_predictions_tVeerCNNf, embeds = model_test_predict(model, test_data_loader_5f, shape_dict_reverse)

In [None]:
# Embedding encoding
# For every entry keep its first element unchanged and reduce the 3-D second
# element to the last position along its final axis.
emb = [(entry[0], entry[1][:, :, -1]) for entry in embeds]
embeddings = emb

embedding_viz(
    embeddings,
    imagefolder+'tVeerCNNf.png',
    n_components=2,
    perplexity=30.0,
    learning_rate=200,
    init='random',
)

In [None]:

# Tabulate targets and predictions of the test set, store them as CSV and
# report the per-class statistics.
pred_data_tVeerCNNf = dict(
    index=list(range(len(predictions_tVeerCNNf))),
    target=targets_list,
    target_shape=targets_shape,
    clas_prediction=predictions_tVeerCNNf,
    shape_prediction=shape_predictions_tVeerCNNf,
)
predictionsDF_tVeerCNNf = pd.DataFrame(pred_data_tVeerCNNf)
predictionsDF_tVeerCNNf.to_csv(f"{folder_tVeerCNNf}predictions_tVeerCNNf.csv")

(precision_tVeerCNNf,
 recall_tVeerCNNf,
 f1_score_tVeerCNNf,
 support_tVeerCNNf) = report_statistics(targets_list, predictions_tVeerCNNf, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every target class in test_selection, the share of
# its samples whose class prediction equals the target.
clas_accuracy_tVeerCNNf = []
for shapes in test_selection:
    subset = predictionsDF_tVeerCNNf[predictionsDF_tVeerCNNf['target'] == shapes]
    # Counting matches directly avoids the KeyError that value_counts()[shapes]
    # raises when a class has no correct prediction at all; a class without
    # any test sample is recorded as NaN instead of failing.
    n_correct = (subset['clas_prediction'] == shapes).sum()
    clas_accuracy_tVeerCNNf.append(n_correct / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerCNNf, test_selection)

## dCNN+c

Architecture from van 't Veer et al. 2019; depth adopted from Liu et al. 2021

In [None]:
# Output directory for all dCNNc artefacts (params, checkpoints, predictions).
folder_dCNNc = folder+'dCNNc/'
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(folder_dCNNc, exist_ok=True)

In [None]:
# sequence_length selects the layer geometry entry in dCNNc_dims and is the
# last dimension of the input passed to summary().
sequence_length = 2
# Output channels of the first and second Conv1d of dCNNc.
hidden_size_1 = 64
hidden_size_2 = 1024
# Width of the hidden dense layer.
dense_size = 256

In [None]:
# Layer geometry of dCNNc, keyed by input sequence length.
#   'sequence_length'  -> kernel size of the first Conv1d and of the AvgPool1d
#   'conv1d_1_padding' -> padding of the first Conv1d
#   'conv1d_2_padding' -> padding of the AvgPool1d in the second stage
dCNNc_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2}}

In [None]:
# Record the hyperparameters of this run next to the model artefacts.
# The context manager closes the file even if a lookup or write fails.
with open(folder_dCNNc+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    # NOTE(review): the dCNNc model is instantiated with hp['rec_dropout'],
    # while hp['dropout'] is logged here - confirm which one is intended.
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('hidden_size_1: ' + str(hidden_size_1) + '\n')
    text_file.write('hidden_size_2: ' + str(hidden_size_2) + '\n')
    text_file.write('dense_size: ' + str(dense_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('dCNNc_dims:' + '\n')
    # Only the geometry entry actually used for this sequence length is logged.
    for key, value in dCNNc_dims[sequence_length].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:
# Paper Version
class dCNNc(nn.Module):
    """Wider 1-D CNN classifier: two convolution stages followed by two dense layers.

    Kernel sizes and paddings are read at construction time from the
    module-level ``dCNNc_dims`` table, indexed by the global
    ``sequence_length``.

    Args:
        input_size: number of input channels of the first Conv1d.
        dropout: dropout probability applied after the first dense layer.
        hidden_size_1: output channels of the first Conv1d.
        hidden_size_2: output channels of the second Conv1d.
        dense_size: width of the hidden dense layer.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, dropout, hidden_size_1, hidden_size_2, dense_size, num_classes):
        super().__init__()
        self.num_classes = num_classes
        dims = dCNNc_dims[sequence_length]

        self.conv1d_1 = nn.Sequential(
            nn.Conv1d(input_size, hidden_size_1,
                      kernel_size=dims['sequence_length'],
                      padding=dims['conv1d_1_padding']),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=3),
        )

        self.conv1d_2 = nn.Sequential(
            nn.Conv1d(hidden_size_1, hidden_size_2, kernel_size=1, stride=2),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=dims['sequence_length'],
                         padding=dims['conv1d_2_padding']),
        )

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # dense_1 takes hidden_size_2 features, so the flattened conv output
        # must have exactly hidden_size_2 values per sample.
        self.dense_1 = nn.Linear(hidden_size_2, dense_size)
        self.dense_2 = nn.Linear(dense_size, num_classes)

    def forward(self, x):
        """Return ``(logits, embedding)``; embedding is the flattened conv output."""
        features = self.conv1d_2(self.conv1d_1(x))

        batch, channels, steps = features.shape
        embedding = features.reshape([batch, channels * steps])

        hidden = self.dropout(self.relu(self.dense_1(embedding)))
        return self.dense_2(hidden), embedding


model = dCNNc(hp['input_size'], hp['rec_dropout'], hidden_size_1, hidden_size_2, dense_size, hp['num_classes']).to(device)
summary(model,
        input_size=(hp['batch_size'], hp['input_size'], sequence_length),
        verbose=0,
        col_names=("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy over the model's logits; Adam over all parameters of the
# current `model`, with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the dCNNc model
#
# Each epoch runs one optimisation pass over the training loader followed by an
# evaluation pass over the test loader. Once epoch > 10, the model/optimizer
# state is checkpointed whenever the test accuracy matches or beats the best
# value seen so far.

print('Start training of dCNNc at ' + str(datetime.now()))

training_loss_dCNNc = []
test_loss_dCNNc = []
test_accuracy_dCNNc = []

# NOTE: despite its name, this holds the best test *accuracy* seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        train_data_loader_c = data.DataLoader(train_dataset_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_c:
        data_inputs, data_labels = batch
        # Put the batch on the same device as the model (no-op if already there).
        data_inputs = data_inputs.float().to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Loss of the last training batch of this epoch.
    training_loss_dCNNc.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no autograd graph; no_grad saves memory and keeps the
    # stored test loss from holding on to a graph.
    with torch.no_grad():
        for batch in test_data_loader_c:
            input_, targets = batch
            input_ = input_.float().to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch, computed once and reused for logging.
        # Kept as a (graph-free, CPU) 0-dim tensor so the history has the same
        # element type as before.
        test_loss = criterion(outputs, targets).cpu()

    test_loss_dCNNc.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_dCNNc.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_dCNNc+"model_dCNNc_best.mod")

print('End training of dCNNc at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch > 10; with fewer epochs this
# load fails or picks up a file left by an earlier run.
# map_location lets a checkpoint saved on GPU be read on a CPU-only machine.
best_epoch_printer = torch.load(folder_dCNNc+"model_dCNNc_best.mod", map_location=device)

print('Best epoch of dCNNc: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Persist the model and optimizer state as they are at this point; this file is
# separate from the "_best" checkpoint written during training.
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_dCNNc+"model_dCNNc.mod")

In [None]:
# Restore model and optimizer from the best checkpoint written during training.
# map_location=device allows a checkpoint saved on GPU to be loaded on a
# CPU-only machine (and vice versa).
checkpoint = torch.load(folder_dCNNc+"model_dCNNc_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the test loader; the fifth return value of
# model_test_predict is bound directly to `embeddings` for the visualisation.
targets_list, targets_shape, predictions_dCNNc, shape_predictions_dCNNc, embeddings = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

In [None]:
# Visualise the dCNNc embeddings with a 2-component projection; the image path
# is imagefolder + 'dCNNc.png'.
embedding_viz(embeddings, imagefolder+'dCNNc.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:
# Test data
# Collect targets and predictions of the test set, store them as CSV and
# report the per-class statistics.
(targets_list,
 targets_shape,
 predictions_dCNNc,
 shape_predictions_dCNNc,
 embeds) = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

pred_data_dCNNc = dict(
    index=list(range(len(predictions_dCNNc))),
    target=targets_list,
    target_shape=targets_shape,
    clas_prediction=predictions_dCNNc,
    shape_prediction=shape_predictions_dCNNc,
)
predictionsDF_dCNNc = pd.DataFrame(pred_data_dCNNc)
predictionsDF_dCNNc.to_csv(f"{folder_dCNNc}predictions_dCNNc.csv")

(precision_dCNNc,
 recall_dCNNc,
 f1_score_dCNNc,
 support_dCNNc) = report_statistics(targets_list, predictions_dCNNc, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every target class in test_selection, the share of
# its samples whose class prediction equals the target.
clas_accuracy_dCNNc = []
for shapes in test_selection:
    subset = predictionsDF_dCNNc[predictionsDF_dCNNc['target'] == shapes]
    # Counting matches directly avoids the KeyError that value_counts()[shapes]
    # raises when a class has no correct prediction at all; a class without
    # any test sample is recorded as NaN instead of failing.
    n_correct = (subset['clas_prediction'] == shapes).sum()
    clas_accuracy_dCNNc.append(n_correct / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_dCNNc, test_selection)

## dCNN+s

Architecture from van 't Veer et al. 2019; depth adopted from Liu et al. 2021

In [None]:
# Output directory for all dCNNs artefacts (params, checkpoints, predictions).
folder_dCNNs = folder+'dCNNs/'
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(folder_dCNNs, exist_ok=True)

In [None]:
# sequence_length selects the layer geometry entry in dCNNs_dims and is the
# last dimension of the input passed to summary().
sequence_length = 5
# Output channels of the first and second Conv1d of dCNNs.
hidden_size_1 = 64
hidden_size_2 = 1024
# Width of the hidden dense layer.
dense_size = 256

In [None]:
# Layer geometry of dCNNs, keyed by input sequence length.
#   'sequence_length'  -> kernel size of the first Conv1d and of the AvgPool1d
#   'conv1d_1_padding' -> padding of the first Conv1d
#   'conv1d_2_padding' -> padding of the AvgPool1d in the second stage
dCNNs_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2}}

In [None]:
# Record the hyperparameters of this run next to the model artefacts.
# The context manager closes the file even if a lookup or write fails.
with open(folder_dCNNs+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    # NOTE(review): the dCNNs model is instantiated with hp['rec_dropout'],
    # while hp['dropout'] is logged here - confirm which one is intended.
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('hidden_size_1: ' + str(hidden_size_1) + '\n')
    text_file.write('hidden_size_2: ' + str(hidden_size_2) + '\n')
    text_file.write('dense_size: ' + str(dense_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('dCNNs_dims:' + '\n')
    # Only the geometry entry actually used for this sequence length is logged.
    for key, value in dCNNs_dims[sequence_length].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:
# Paper Version
class dCNNs(nn.Module):
    """Wider 1-D CNN classifier: two convolution stages followed by two dense layers.

    Kernel sizes and paddings are read at construction time from the
    module-level ``dCNNs_dims`` table, indexed by the global
    ``sequence_length``.

    Args:
        input_size: number of input channels of the first Conv1d.
        dropout: dropout probability applied after the first dense layer.
        hidden_size_1: output channels of the first Conv1d.
        hidden_size_2: output channels of the second Conv1d.
        dense_size: width of the hidden dense layer.
        num_classes: number of output logits.
    """

    def __init__(self, input_size, dropout, hidden_size_1, hidden_size_2, dense_size, num_classes):
        super().__init__()
        self.num_classes = num_classes
        dims = dCNNs_dims[sequence_length]

        self.conv1d_1 = nn.Sequential(
            nn.Conv1d(input_size, hidden_size_1,
                      kernel_size=dims['sequence_length'],
                      padding=dims['conv1d_1_padding']),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=3, stride=3),
        )

        self.conv1d_2 = nn.Sequential(
            nn.Conv1d(hidden_size_1, hidden_size_2, kernel_size=1, stride=2),
            nn.ReLU(),
            nn.AvgPool1d(kernel_size=dims['sequence_length'],
                         padding=dims['conv1d_2_padding']),
        )

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        # dense_1 takes hidden_size_2 features, so the flattened conv output
        # must have exactly hidden_size_2 values per sample.
        self.dense_1 = nn.Linear(hidden_size_2, dense_size)
        self.dense_2 = nn.Linear(dense_size, num_classes)

    def forward(self, x):
        """Return ``(logits, embedding)``; embedding is the flattened conv output."""
        features = self.conv1d_2(self.conv1d_1(x))

        batch, channels, steps = features.shape
        embedding = features.reshape([batch, channels * steps])

        hidden = self.dropout(self.relu(self.dense_1(embedding)))
        return self.dense_2(hidden), embedding


model = dCNNs(hp['input_size'], hp['rec_dropout'], hidden_size_1, hidden_size_2, dense_size, hp['num_classes']).to(device)
summary(model,
        input_size=(hp['batch_size'], hp['input_size'], sequence_length),
        verbose=0,
        col_names=("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy over the model's logits; Adam over all parameters of the
# current `model`, with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the dCNNs model
#
# Each epoch runs one optimisation pass over the training loader followed by an
# evaluation pass over the test loader. Once epoch > 10, the model/optimizer
# state is checkpointed whenever the test accuracy matches or beats the best
# value seen so far.

print('Start training of dCNNs at ' + str(datetime.now()))

training_loss_dCNNs = []
test_loss_dCNNs = []
test_accuracy_dCNNs = []

# NOTE: despite its name, this holds the best test *accuracy* seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        train_data_loader_s = data.DataLoader(train_dataset_s, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_s:
        data_inputs, data_labels = batch
        # Put the batch on the same device as the model (no-op if already there).
        data_inputs = data_inputs.float().to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Loss of the last training batch of this epoch.
    training_loss_dCNNs.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no autograd graph; no_grad saves memory and keeps the
    # stored test loss from holding on to a graph.
    with torch.no_grad():
        for batch in test_data_loader_s:
            input_, targets = batch
            input_ = input_.float().to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch, computed once and reused for logging.
        # Kept as a (graph-free, CPU) 0-dim tensor so the history has the same
        # element type as before.
        test_loss = criterion(outputs, targets).cpu()

    test_loss_dCNNs.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_dCNNs.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_dCNNs+"model_dCNNs_best.mod")

print('End training of dCNNs at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch > 10; with fewer epochs this
# load fails or picks up a file left by an earlier run.
# map_location lets a checkpoint saved on GPU be read on a CPU-only machine.
best_epoch_printer = torch.load(folder_dCNNs+"model_dCNNs_best.mod", map_location=device)

print('Best epoch of dCNNs: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Persist the model and optimizer state as they are at this point; this file is
# separate from the "_best" checkpoint written during training.
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_dCNNs+"model_dCNNs.mod")

In [None]:
# Restore model and optimizer from the best checkpoint written during training.
# map_location=device allows a checkpoint saved on GPU to be loaded on a
# CPU-only machine (and vice versa).
checkpoint = torch.load(folder_dCNNs+"model_dCNNs_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the test loader; the fifth return value of
# model_test_predict is bound directly to `embeddings` for the visualisation.
targets_list, targets_shape, predictions_dCNNs, shape_predictions_dCNNs, embeddings = model_test_predict(model, test_data_loader_s, shape_dict_reverse)

In [None]:
# Visualise the dCNNs embeddings with a 2-component projection; the image path
# is imagefolder + 'dCNNs.png'.
embedding_viz(embeddings, imagefolder+'dCNNs.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:
# Test data
# Collect targets and predictions of the test set, store them as CSV and
# report the per-class statistics.
(targets_list,
 targets_shape,
 predictions_dCNNs,
 shape_predictions_dCNNs,
 embeds) = model_test_predict(model, test_data_loader_s, shape_dict_reverse)

pred_data_dCNNs = dict(
    index=list(range(len(predictions_dCNNs))),
    target=targets_list,
    target_shape=targets_shape,
    clas_prediction=predictions_dCNNs,
    shape_prediction=shape_predictions_dCNNs,
)
predictionsDF_dCNNs = pd.DataFrame(pred_data_dCNNs)
predictionsDF_dCNNs.to_csv(f"{folder_dCNNs}predictions_dCNNs.csv")

(precision_dCNNs,
 recall_dCNNs,
 f1_score_dCNNs,
 support_dCNNs) = report_statistics(targets_list, predictions_dCNNs, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every target class in test_selection, the share of
# its samples whose class prediction equals the target.
clas_accuracy_dCNNs = []
for shapes in test_selection:
    subset = predictionsDF_dCNNs[predictionsDF_dCNNs['target'] == shapes]
    # Counting matches directly avoids the KeyError that value_counts()[shapes]
    # raises when a class has no correct prediction at all; a class without
    # any test sample is recorded as NaN instead of failing.
    n_correct = (subset['clas_prediction'] == shapes).sum()
    clas_accuracy_dCNNs.append(n_correct / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_dCNNs, test_selection)

## dCNN+f

In [None]:
# Output directory for all dCNNf artefacts (params, checkpoints, predictions).
folder_dCNNf = folder+'dCNNf/'
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(folder_dCNNf, exist_ok=True)

In [None]:
# Sequence length taken from the third axis of data_inputs_5f. Despite the
# "_20" suffix the value is whatever that axis holds; it is used as the key
# into dCNNf_dims when the dCNNf model is built.
sequence_length_20 = data_inputs_5f.shape[2]
# Output channels of the first and second Conv1d of dCNNf.
hidden_size_1 = 64
hidden_size_2 = 1024
# Width of the hidden dense layer.
dense_size = 256

In [None]:
# Layer geometry of dCNNf, keyed by input sequence length.
#   'sequence_length'   -> kernel size of the first Conv1d
#   'conv1d_1_padding'  -> padding of the first Conv1d
#   'sequence_length_2' -> kernel size of the AvgPool1d in the second stage
#   'conv1d_2_padding'  -> padding of that AvgPool1d
# Entries 2-5 define no 'sequence_length_2'.
dCNNf_dims = {2: {'sequence_length': 2,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                3: {'sequence_length': 3,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 1},
                4: {'sequence_length': 4,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                5: {'sequence_length': 5,
                    'conv1d_1_padding': 2,
                    'conv1d_2_padding': 2},
                 20: {'sequence_length': 20,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2},
             22: {'sequence_length': 22,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2},
              24: {'sequence_length': 24,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2},
             25: {'sequence_length': 25,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2},
             26: {'sequence_length': 26,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2},
             29: {'sequence_length': 29,
                    'conv1d_1_padding': 2,
                    'sequence_length_2': 4,
                    'conv1d_2_padding': 2}}

In [None]:
# Record the hyperparameters of this run next to the model artefacts.
# The context manager closes the file even if a lookup or write fails.
with open(folder_dCNNf+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length_20) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    # NOTE(review): the dCNNf model is instantiated with hp['rec_dropout'],
    # while hp['dropout'] is logged here - confirm which one is intended.
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('hidden_size_1: ' + str(hidden_size_1) + '\n')
    text_file.write('hidden_size_2: ' + str(hidden_size_2) + '\n')
    text_file.write('dense_size: ' + str(dense_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')
    text_file.write('dCNNf_dims:' + '\n')
    # Index with sequence_length_20, the value the dCNNf model is built with.
    # The stale `sequence_length` left over from the previous section logged
    # the geometry of a different table entry.
    for key, value in dCNNf_dims[sequence_length_20].items():
        text_file.write(key + ': ' + str(value) + '\n')

In [None]:
# Paper Version
class dCNNf(nn.Module):
    """1-D convolutional classifier: two conv/pool stages and two dense layers.

    The convolution kernel size, the paddings and the average-pooling kernel
    are read at construction time from the module-level
    ``dCNNf_dims[sequence_length_20]`` entry.

    ``forward`` returns ``(logits, embedding)``, where ``embedding`` is the
    flattened output of the second stage (the input of the first dense layer).
    """

    def __init__(self, input_size, dropout, hidden_size_1, hidden_size_2, dense_size, num_classes):
        super().__init__()
        self.num_classes = num_classes

        dims = dCNNf_dims[sequence_length_20]

        # Stage 1: convolution -> ReLU -> max pooling (window 3, stride 3).
        first_conv = nn.Conv1d(in_channels=input_size,
                               out_channels=hidden_size_1,
                               kernel_size=dims['sequence_length'],
                               padding=dims['conv1d_1_padding'])
        self.conv1d_1 = nn.Sequential(first_conv,
                                      nn.ReLU(),
                                      nn.MaxPool1d(kernel_size=3, stride=3))

        # Stage 2: strided 1x1 convolution -> ReLU -> average pooling.
        second_conv = nn.Conv1d(in_channels=hidden_size_1,
                                out_channels=hidden_size_2,
                                kernel_size=1,
                                stride=2)
        second_pool = nn.AvgPool1d(kernel_size=dims['sequence_length_2'],
                                   padding=dims['conv1d_2_padding'])
        self.conv1d_2 = nn.Sequential(second_conv, nn.ReLU(), second_pool)

        # Classification head.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.dense_1 = nn.Linear(hidden_size_2, dense_size)
        self.dense_2 = nn.Linear(dense_size, num_classes)

    def forward(self, x):
        features = self.conv1d_2(self.conv1d_1(x))

        # Flatten the two trailing axes into one feature vector per sample.
        batch, channels, steps = features.shape
        embedding = features.reshape([batch, channels * steps])

        hidden = self.dropout(self.relu(self.dense_1(embedding)))
        logits = self.dense_2(hidden)
        return logits, embedding
        
        
# Build the dCNNf classifier on the configured device and display a layer summary
# for an input of shape (batch_size, input_size, sequence_length_20).
# NOTE(review): the dropout argument is hp['rec_dropout'], while params.txt for
# this model records hp['dropout'] - confirm which value is intended.
model = dCNNf(hp['input_size'], hp['rec_dropout'], hidden_size_1, hidden_size_2, dense_size, hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], hp['input_size'], sequence_length_20), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy loss on the model outputs; Adam updates all parameters of
# `model` with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the dCNNf model
# Every epoch runs one optimisation pass over the training loader followed by
# one evaluation pass over the test loader. After the first 11 epochs the
# weights are checkpointed whenever the test accuracy matches or beats the
# best value seen so far.

print('Start training of dCNNf at ' + str(datetime.now()))

training_loss_dCNNf = []   # loss of the last training batch, one entry per epoch
test_loss_dCNNf = []       # loss of the last test batch, one 0-dim tensor per epoch
test_accuracy_dCNNf = []   # test accuracy in percent, one entry per epoch

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        # Rebuild the shuffled training loader for this epoch.
        train_data_loader_5f = data.DataLoader(train_dataset_5f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_5f:
        data_inputs, data_labels = batch
        data_inputs = data_inputs.float()

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_dCNNf.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients. Without no_grad() every test batch builds
    # an autograd graph, and the stored test loss keeps one alive per epoch.
    with torch.no_grad():
        for batch in test_data_loader_5f:
            input_, targets = batch
            input_ = input_.float()

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch; computed once and reused for the log line.
        epoch_test_loss = criterion(outputs, targets)

    test_loss_dCNNf.append(epoch_test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_dCNNf.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), epoch_test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_dCNNf+"model_dCNNf_best.mod")

print('End training of dCNNf at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch indices above 10, so with
# hp['num_epochs'] <= 11 this load does not find a file written by this run.
best_epoch_printer = torch.load(folder_dCNNf+"model_dCNNf_best.mod")

print('Best epoch of dCNNf: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are after the last epoch
# (the best-accuracy checkpoint is written separately by the training cell).
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_dCNNf+"model_dCNNf.mod")

In [None]:
# Restore the best-accuracy checkpoint into `model` and `optimizer`, so the
# cells that follow work with those weights instead of the last-epoch ones.
checkpoint = torch.load(folder_dCNNf+"model_dCNNf_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the model over the test loader via model_test_predict (defined elsewhere
# in this notebook). Presumably returns the targets, their shape names, the
# predicted classes, the predicted shape names and the embeddings - confirm
# against that helper.
targets_list, targets_shape, predictions_dCNNf, shape_predictions_dCNNf, embeddings = model_test_predict(model, test_data_loader_5f, shape_dict_reverse)

In [None]:
# Draw a 2-component projection of the embeddings and write it to
# imagefolder + 'dCNNf.png' (presumably t-SNE, given the perplexity /
# learning_rate / init arguments - confirm in embedding_viz).
embedding_viz(embeddings, imagefolder+'dCNNf.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# Collect the test targets and predictions in one table, export it as CSV and
# compute the per-class report.
pred_data_dCNNf = {
    'index': list(range(len(predictions_dCNNf))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_dCNNf,
    'shape_prediction': shape_predictions_dCNNf,
}
predictionsDF_dCNNf = pd.DataFrame(pred_data_dCNNf)
predictionsDF_dCNNf.to_csv(folder_dCNNf+'predictions_dCNNf.csv')

(precision_dCNNf,
 recall_dCNNf,
 f1_score_dCNNf,
 support_dCNNf) = report_statistics(targets_list, predictions_dCNNf, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the share of test
# samples with that target that were predicted as that class.
clas_accuracy_dCNNf = []
for shapes in test_selection:
    subset = predictionsDF_dCNNf[predictionsDF_dCNNf['target'] == shapes]
    # Count the hits directly: value_counts()[shapes] raises KeyError when a
    # class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples gets NaN instead of a division by zero.
    clas_accuracy_dCNNf.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_dCNNf, test_selection)

## TriangleCNN

Source: Liu et al. 2021: TriangleConv: A Deep Point Convolutional Network for Recognizing Building Shapes in Map Space

In [None]:
# Output directory for everything the triangleCNN section writes.
folder_triangleCNN = folder+'triangleCNN/'
# exist_ok=True replaces the separate exists() check, which could race with
# another process creating the directory in between.
os.makedirs(folder_triangleCNN, exist_ok=True)

In [None]:
# Written to params.txt and used as the last dimension of the summary() input
# shape in the model cell; triangleCNN.forward asserts that this last
# dimension equals 2.
sequence_length = 2

In [None]:
# Record the hyperparameters of this run next to the model outputs.
# The file is opened in a `with` block so it is closed even if a write fails.
with open(folder_triangleCNN + 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
def conv_bn_block(input, output, kernel_size):
    """Return a Conv1d -> BatchNorm1d -> in-place ReLU stack.

    Args:
        input: number of input channels of the convolution.
        output: number of output channels (and batch-norm features).
        kernel_size: kernel size of the convolution.
    """
    stages = [
        nn.Conv1d(input, output, kernel_size),
        nn.BatchNorm1d(output),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*stages)


def fc_bn_block(input, output):
    """Return a Linear -> BatchNorm1d -> in-place ReLU stack.

    Args:
        input: number of input features of the linear layer.
        output: number of output features (and batch-norm features).
    """
    stages = [
        nn.Linear(input, output),
        nn.BatchNorm1d(output),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*stages)

class TriangleConv(nn.Module):
    """Point-wise MLP over "triangle" features of a closed point sequence.

    For every point the features are built from the point itself and its two
    cyclic neighbours (previous and next, wrapping around at both ends):
    ``[p, p - prev, p - next, next - prev]``, i.e. four times the input
    feature width. A stack of 1x1 convolution blocks is then applied per point.

    Args:
        layers: channel widths; ``layers[0]`` is the per-point input width,
            every further entry is the output width of one conv block.

    ``forward`` takes a tensor of shape (B, N, layers[0]) and returns a
    tensor of shape (B, N, layers[-1]).
    """

    def __init__(self, layers):
        super(TriangleConv, self).__init__()
        self.layers = layers
        mlp_layers = OrderedDict()
        for i in range(len(self.layers) - 1):
            # Only the first block sees the 4x wide triangle features.
            in_channels = 4 * self.layers[i] if i == 0 else self.layers[i]
            mlp_layers['conv_bn_block_{}'.format(i + 1)] = conv_bn_block(in_channels, self.layers[i + 1], 1)
        self.mlp = nn.Sequential(mlp_layers)

    def forward(self, X):
        B, N, C = X.shape
        X = X.float().to(device)

        # Indices of the previous and next point of every point, cyclic: (N, 2).
        idx = torch.arange(N, device=X.device)
        neighbours = torch.stack([(idx - 1) % N, (idx + 1) % N], dim=1)

        centre = X.unsqueeze(2)                        # (B, N, 1, C)
        edges = centre - X[:, neighbours]              # (B, N, 2, C): p - prev, p - next
        third = edges[:, :, 0:1, :] - edges[:, :, 1:2, :]   # (B, N, 1, C): next - prev

        x_triangle = torch.cat([centre, edges.reshape(B, N, 1, 2 * C), third], dim=3)  # (B, N, 1, 4C)
        # Squeeze only the singleton axis: a bare squeeze() would also drop the
        # batch axis for a batch of size 1.
        x_triangle = x_triangle.squeeze(2).permute(0, 2, 1)   # (B, 4C, N)

        # The features stay part of the autograd graph here. Re-wrapping them
        # with torch.tensor(...) would copy and detach them, so no gradient
        # would reach any layer that produced X.
        out = self.mlp(x_triangle)
        out = out.permute(0, 2, 1)
        return out


class triangleCNN(nn.Module):
    """Classifier built from two TriangleConv stages and a dense head.

    ``forward`` expects a tensor of shape (B, N, 2). It returns
    ``(log_probabilities, embedding)``, where ``embedding`` is the
    1024-wide vector obtained by max-pooling the second stage over the N points.
    """

    def __init__(self, dropout, num_classes):
        super().__init__()

        self.num_classes = num_classes
        self.triangleconv_1 = TriangleConv(layers=[2, 64, 64, 64])
        self.triangleconv_2 = TriangleConv(layers=[64, 512, 1024])
        self.fc_block_4 = fc_bn_block(1024, 512)
        self.drop_4 = nn.Dropout(dropout)
        self.fc_block_5 = fc_bn_block(512, 256)
        self.drop_5 = nn.Dropout(dropout)
        self.fc_6 = nn.Linear(256, self.num_classes)
        self._initialize_weights()

    def _initialize_weights(self):
        """Xavier-normal weights and zero biases for conv/linear layers; unit scale and zero shift for batch norm."""
        for module in self.modules():
            if isinstance(module, (nn.Conv1d, nn.Linear)):
                nn.init.xavier_normal_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.BatchNorm1d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        batch, n_points, n_coords = x.shape
        assert n_coords == 2, 'dimension of x does not match'

        point_features = self.triangleconv_2(self.triangleconv_1(x))

        # Max over all points: move the point axis last and pool with a window of N.
        pooled = F.max_pool1d(point_features.permute(0, 2, 1), n_points)
        embedding = pooled.reshape([batch, 1024])

        hidden = self.drop_4(self.fc_block_4(embedding))
        hidden = self.drop_5(self.fc_block_5(hidden))
        scores = self.fc_6(hidden)

        return F.log_softmax(scores, dim=-1), embedding

# Build the triangleCNN classifier on the configured device and display a
# layer summary. The summary input is (batch_size, hp['input_size'],
# sequence_length); triangleCNN.forward asserts that the last dimension is 2,
# so sequence_length has to be 2 here.
model = triangleCNN(hp['dropout'], hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], hp['input_size'], sequence_length), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# NOTE(review): triangleCNN.forward already returns log_softmax outputs, and
# nn.CrossEntropyLoss applies log-softmax itself. The loss value is the same
# (log-softmax of log-probabilities leaves them unchanged), but nn.NLLLoss
# would be the direct counterpart - confirm whether the duplication is wanted.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the triangleCNN model
# Every epoch runs one optimisation pass over the training loader followed by
# one evaluation pass over the test loader. After the first 11 epochs the
# weights are checkpointed whenever the test accuracy matches or beats the
# best value seen so far.

print('Start training of TriangleCNN at ' + str(datetime.now()))

training_loss_triangleCNN = []   # loss of the last training batch, one entry per epoch
test_loss_triangleCNN = []       # loss of the last test batch, one 0-dim tensor per epoch
test_accuracy_triangleCNN = []   # test accuracy in percent, one entry per epoch

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        # Rebuild the shuffled training loader for this epoch.
        train_data_loader_c = data.DataLoader(train_dataset_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_c:
        data_inputs, data_labels = batch
        data_inputs = data_inputs.float()

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_triangleCNN.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients. Without no_grad() every test batch builds
    # an autograd graph, and the stored test loss keeps one alive per epoch.
    with torch.no_grad():
        for batch in test_data_loader_c:
            input_, targets = batch
            input_ = input_.float()

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch; computed once and reused for the log line.
        epoch_test_loss = criterion(outputs, targets)

    test_loss_triangleCNN.append(epoch_test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_triangleCNN.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), epoch_test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_triangleCNN+"model_triangleCNN_best.mod")


print('End training of TriangleCNN at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch indices above 10, so with
# hp['num_epochs'] <= 11 this load does not find a file written by this run.
best_epoch_printer = torch.load(folder_triangleCNN+"model_triangleCNN_best.mod")

print('Best epoch of triangleCNN: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are after the last epoch
# (the best-accuracy checkpoint is written separately by the training cell).
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_triangleCNN+"model_triangleCNN.mod")

In [None]:
# Restore the best-accuracy checkpoint into `model` and `optimizer`, so the
# cells that follow work with those weights instead of the last-epoch ones.
checkpoint = torch.load(folder_triangleCNN+"model_triangleCNN_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the model over the test loader via model_test_predict (defined elsewhere
# in this notebook). Presumably returns the targets, their shape names, the
# predicted classes, the predicted shape names and the embeddings - confirm
# against that helper.
targets_list, targets_shape, predictions_triangleCNN, shape_predictions_triangleCNN, embeddings = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

In [None]:
# Draw a 2-component projection of the embeddings and write it to
# imagefolder + 'triangleCNN.png' (presumably t-SNE, given the perplexity /
# learning_rate / init arguments - confirm in embedding_viz).
embedding_viz(embeddings, imagefolder+'triangleCNN.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:
# Test data
# Predict on the test loader again (the embeddings returned by this call are
# stored in `embeds`), collect targets and predictions in one table, export it
# as CSV and compute the per-class report.
(targets_list,
 targets_shape,
 predictions_triangleCNN,
 shape_predictions_triangleCNN,
 embeds) = model_test_predict(model, test_data_loader_c, shape_dict_reverse)

pred_data_triangleCNN = {
    'index': list(range(len(predictions_triangleCNN))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_triangleCNN,
    'shape_prediction': shape_predictions_triangleCNN,
}
predictionsDF_triangleCNN = pd.DataFrame(pred_data_triangleCNN)
predictionsDF_triangleCNN.to_csv(folder_triangleCNN+'predictions_triangleCNN.csv')

(precision_triangleCNN,
 recall_triangleCNN,
 f1_score_triangleCNN,
 support_triangleCNN) = report_statistics(targets_list, predictions_triangleCNN, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the share of test
# samples with that target that were predicted as that class.
clas_accuracy_triangleCNN = []
for shapes in test_selection:
    subset = predictionsDF_triangleCNN[predictionsDF_triangleCNN['target'] == shapes]
    # Count the hits directly: value_counts()[shapes] raises KeyError when a
    # class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples gets NaN instead of a division by zero.
    clas_accuracy_triangleCNN.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_triangleCNN, test_selection)

## tVeerRNN+c

In [None]:
# Output directory for everything the tVeerRNNc section writes.
folder_tVeerRNNc = folder+'tVeerRNNc/'
# exist_ok=True replaces the separate exists() check, which could race with
# another process creating the directory in between.
os.makedirs(folder_tVeerRNNc, exist_ok=True)

In [None]:
# Settings for the tVeerRNNc model cell below.
sequence_length = 2   # second dimension of the summary() input shape; also logged to params.txt
hidden_size = 256     # hidden size passed to the LSTM
num_layers = 2        # number of stacked LSTM layers

In [None]:
# Max-pool kernel sizes keyed by an integer (presumably the sequence length).
# NOTE(review): the tVeerRNNc class defined below does not reference this
# dict - possibly a leftover; confirm before relying on it.
tVeerRNNc_dims = {2: {'maxpool_kernel': 2},
                  3: {'maxpool_kernel': 3},
                  4: {'maxpool_kernel': 4},
                  5: {'maxpool_kernel': 2},
                 }

In [None]:
# Record the hyperparameters of this run next to the model outputs.
# The file is opened in a `with` block so it is closed even if a write fails.
with open(folder_tVeerRNNc + 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('rec_dropout: ' + str(hp['rec_dropout']) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_layers: ' + str(num_layers) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Bidirectional recurrent neural network (many-to-one)
class tVeerRNNc(nn.Module):
    """Bidirectional LSTM classifier with max pooling over the sequence.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        rec_dropout: dropout between the stacked LSTM layers.
        dropout: dropout applied to the LSTM outputs.
        num_classes: number of output classes.

    ``forward`` takes a tensor of shape (batch, seq_length, input_size) and
    returns ``(logits, embedding)``; ``embedding`` has shape
    (batch, hidden_size*2) and holds, per feature, the maximum over all time steps.
    """

    def __init__(self, input_size, hidden_size, num_layers, rec_dropout, dropout, num_classes):
        super(tVeerRNNc, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=rec_dropout, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.dense = nn.Linear(hidden_size*2, num_classes)  # 2 for bidirection

    def forward(self, x):
        # No explicit (h0, c0): nn.LSTM starts from zero states on the input's
        # own device, so the forward pass does not depend on a global `device`.
        out, _ = self.lstm(x)  # out: tensor of shape (batch_size, seq_length, hidden_size*2)

        out = self.dropout(out)

        # Pool over the sequence axis (dim 1). nn.MaxPool1d(N) applied to this
        # (batch, seq, feature) tensor would pool groups of N neighbouring
        # *features* instead, and the following reshape would only succeed
        # when N divides hidden_size*2.
        out = torch.max(out, dim=1)[0]  # (batch_size, hidden_size*2)

        embedding = out

        out = self.dense(out)

        return out, embedding

# Build the tVeerRNNc classifier on the configured device and display a layer
# summary for an input of shape (batch_size, sequence_length, input_size).
model = tVeerRNNc(hp['input_size'], hidden_size, num_layers, hp['rec_dropout'], hp['dropout'], hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], sequence_length, hp['input_size']), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy loss on the model outputs; Adam updates all parameters of
# `model` with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerRNNc model
# Every epoch runs one optimisation pass over the training loader followed by
# one evaluation pass over the test loader. After the first 11 epochs the
# weights are checkpointed whenever the test accuracy matches or beats the
# best value seen so far.

print('Start training of tVeerRNNc at ' + str(datetime.now()))

training_loss_tVeerRNNc = []   # loss of the last training batch, one entry per epoch
test_loss_tVeerRNNc = []       # loss of the last test batch, one 0-dim tensor per epoch
test_accuracy_tVeerRNNc = []   # test accuracy in percent, one entry per epoch

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        # Rebuild the shuffled training loader for this epoch.
        train_data_loader_c = data.DataLoader(train_dataset_c, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_c:
        data_inputs, data_labels = batch

        # The model is batch_first: swap axes 1 and 2 of the loader's batches.
        outputs, _ = model(torch.transpose(data_inputs, 1, 2).float())

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_tVeerRNNc.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients. Without no_grad() every test batch builds
    # an autograd graph, and the stored test loss keeps one alive per epoch.
    with torch.no_grad():
        for batch in test_data_loader_c:
            input_, targets = batch

            outputs, _ = model(torch.transpose(input_, 1, 2).float())

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch; computed once and reused for the log line.
        epoch_test_loss = criterion(outputs, targets)

    test_loss_tVeerRNNc.append(epoch_test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerRNNc.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), epoch_test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerRNNc+"model_tVeerRNNc_best.mod")

print('End training of tVeerRNNc at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch indices above 10, so with
# hp['num_epochs'] <= 11 this load does not find a file written by this run.
best_epoch_printer = torch.load(folder_tVeerRNNc+"model_tVeerRNNc_best.mod")

print('Best epoch of tVeerRNNc: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are after the last epoch
# (the best-accuracy checkpoint is written separately by the training cell).
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_tVeerRNNc+"model_tVeerRNNc.mod")

In [None]:
# Restore the best-accuracy checkpoint into `model` and `optimizer`, so the
# cells that follow work with those weights instead of the last-epoch ones.
checkpoint = torch.load(folder_tVeerRNNc+"model_tVeerRNNc_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the model over the test loader via model_test_predict (defined elsewhere
# in this notebook); transpose=True presumably swaps axes 1 and 2 of each
# batch as the training loop does - confirm against that helper.
targets_list, targets_shape, predictions_tVeerRNNc, shape_predictions_tVeerRNNc, embeddings = model_test_predict(model, test_data_loader_c, shape_dict_reverse, transpose = True)

In [None]:
# Draw a 2-component projection of the embeddings and write it to
# imagefolder + 'tVeerRNNc.png' (presumably t-SNE, given the perplexity /
# learning_rate / init arguments - confirm in embedding_viz).
embedding_viz(embeddings, imagefolder+'tVeerRNNc.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# Collect the test targets and predictions in one table, export it as CSV and
# compute the per-class report.
pred_data_tVeerRNNc = {
    'index': list(range(len(predictions_tVeerRNNc))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_tVeerRNNc,
    'shape_prediction': shape_predictions_tVeerRNNc,
}
predictionsDF_tVeerRNNc = pd.DataFrame(pred_data_tVeerRNNc)
predictionsDF_tVeerRNNc.to_csv(folder_tVeerRNNc+'predictions_tVeerRNNc.csv')

(precision_tVeerRNNc,
 recall_tVeerRNNc,
 f1_score_tVeerRNNc,
 support_tVeerRNNc) = report_statistics(targets_list, predictions_tVeerRNNc, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the share of test
# samples with that target that were predicted as that class.
clas_accuracy_tVeerRNNc = []
for shapes in test_selection:
    subset = predictionsDF_tVeerRNNc[predictionsDF_tVeerRNNc['target'] == shapes]
    # Count the hits directly: value_counts()[shapes] raises KeyError when a
    # class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples gets NaN instead of a division by zero.
    clas_accuracy_tVeerRNNc.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerRNNc, test_selection)

## tVeerRNN+s

In [None]:
# Output directory for everything the tVeerRNNs section writes.
folder_tVeerRNNs = folder+'tVeerRNNs/'
# exist_ok=True replaces the separate exists() check, which could race with
# another process creating the directory in between.
os.makedirs(folder_tVeerRNNs, exist_ok=True)

In [None]:
# Settings for the tVeerRNNs model cell below.
sequence_length = 5   # second dimension of the summary() input shape; also logged to params.txt
hidden_size = 256     # hidden size passed to the LSTM
num_layers = 2        # number of stacked LSTM layers

In [None]:
# Max-pool kernel sizes keyed by an integer (presumably the sequence length).
# NOTE(review): the tVeerRNNs class defined below does not reference this
# dict - possibly a leftover; confirm before relying on it.
tVeerRNNs_dims = {2: {'maxpool_kernel': 2},
                  3: {'maxpool_kernel': 3},
                  4: {'maxpool_kernel': 4},
                  5: {'maxpool_kernel': 2},
                 }

In [None]:
# Record the hyperparameters of this run next to the model outputs.
# The file is opened in a `with` block so it is closed even if a write fails.
with open(folder_tVeerRNNs + 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('rec_dropout: ' + str(hp['rec_dropout']) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_layers: ' + str(num_layers) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Bidirectional recurrent neural network (many-to-one)
class tVeerRNNs(nn.Module):
    """Bidirectional LSTM classifier that reads out the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        rec_dropout: dropout between the stacked LSTM layers.
        dropout: dropout applied to the LSTM outputs.
        num_classes: number of output classes.

    ``forward`` takes a tensor of shape (batch, seq_length, input_size) and
    returns ``(logits, embedding)``; ``embedding`` is the (dropout-processed)
    LSTM output at the last time step, shape (batch, hidden_size*2).
    """

    def __init__(self, input_size, hidden_size, num_layers, rec_dropout, dropout, num_classes):
        super(tVeerRNNs, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=rec_dropout, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.dense = nn.Linear(hidden_size*2, num_classes)  # 2 for bidirection

    def forward(self, x):
        # No explicit (h0, c0): nn.LSTM starts from zero states on the input's
        # own device, so the forward pass does not depend on a global `device`.
        out, _ = self.lstm(x)  # out: tensor of shape (batch_size, seq_length, hidden_size*2)

        out = self.dropout(out)

        # Decode the output of the last time step
        embedding = out[:, -1, :]
        out = self.dense(embedding)

        return out, embedding

# Build the tVeerRNNs classifier on the configured device and display a layer
# summary for an input of shape (batch_size, sequence_length, input_size).
model = tVeerRNNs(hp['input_size'], hidden_size, num_layers, hp['rec_dropout'], hp['dropout'], hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], sequence_length, hp['input_size']), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy loss on the model outputs; Adam updates all parameters of
# `model` with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerRNNs model
# Every epoch runs one optimisation pass over the training loader followed by
# one evaluation pass over the test loader. After the first 11 epochs the
# weights are checkpointed whenever the test accuracy matches or beats the
# best value seen so far.

print('Start training of tVeerRNNs at ' + str(datetime.now()))

training_loss_tVeerRNNs = []   # loss of the last training batch, one entry per epoch
test_loss_tVeerRNNs = []       # loss of the last test batch, one 0-dim tensor per epoch
test_accuracy_tVeerRNNs = []   # test accuracy in percent, one entry per epoch

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        # Rebuild the shuffled training loader for this epoch.
        train_data_loader_s = data.DataLoader(train_dataset_s, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_s:
        data_inputs, data_labels = batch

        # The model is batch_first: swap axes 1 and 2 of the loader's batches.
        outputs, _ = model(torch.transpose(data_inputs, 1, 2).float())

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_tVeerRNNs.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients. Without no_grad() every test batch builds
    # an autograd graph, and the stored test loss keeps one alive per epoch.
    with torch.no_grad():
        for batch in test_data_loader_s:
            input_, targets = batch

            outputs, _ = model(torch.transpose(input_, 1, 2).float())

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch; computed once and reused for the log line.
        epoch_test_loss = criterion(outputs, targets)

    test_loss_tVeerRNNs.append(epoch_test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerRNNs.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), epoch_test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerRNNs+"model_tVeerRNNs_best.mod")

print('End training of tVeerRNNs at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch indices above 10, so with
# hp['num_epochs'] <= 11 this load does not find a file written by this run.
best_epoch_printer = torch.load(folder_tVeerRNNs+"model_tVeerRNNs_best.mod")

print('Best epoch of tVeerRNNs: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are after the last epoch
# (the best-accuracy checkpoint is written separately by the training cell).
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_tVeerRNNs+"model_tVeerRNNs.mod")

In [None]:
# Restore the best-accuracy checkpoint into `model` and `optimizer`, so the
# cells that follow work with those weights instead of the last-epoch ones.
checkpoint = torch.load(folder_tVeerRNNs+"model_tVeerRNNs_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the model over the test loader via model_test_predict (defined elsewhere
# in this notebook); transpose=True presumably swaps axes 1 and 2 of each
# batch as the training loop does - confirm against that helper.
targets_list, targets_shape, predictions_tVeerRNNs, shape_predictions_tVeerRNNs, embeddings = model_test_predict(model, test_data_loader_s, shape_dict_reverse, transpose = True)

In [None]:
# Draw a 2-component projection of the embeddings and write it to
# imagefolder + 'tVeerRNNs.png' (presumably t-SNE, given the perplexity /
# learning_rate / init arguments - confirm in embedding_viz).
embedding_viz(embeddings, imagefolder+'tVeerRNNs.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# Collect the test targets and predictions in one table, export it as CSV and
# compute the per-class report.
pred_data_tVeerRNNs = {
    'index': list(range(len(predictions_tVeerRNNs))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_tVeerRNNs,
    'shape_prediction': shape_predictions_tVeerRNNs,
}
predictionsDF_tVeerRNNs = pd.DataFrame(pred_data_tVeerRNNs)
predictionsDF_tVeerRNNs.to_csv(folder_tVeerRNNs+'predictions_tVeerRNNs.csv')

(precision_tVeerRNNs,
 recall_tVeerRNNs,
 f1_score_tVeerRNNs,
 support_tVeerRNNs) = report_statistics(targets_list, predictions_tVeerRNNs, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the share of test
# samples with that target that were predicted as that class.
clas_accuracy_tVeerRNNs = []
for shapes in test_selection:
    subset = predictionsDF_tVeerRNNs[predictionsDF_tVeerRNNs['target'] == shapes]
    # Count the hits directly: value_counts()[shapes] raises KeyError when a
    # class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples gets NaN instead of a division by zero.
    clas_accuracy_tVeerRNNs.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerRNNs, test_selection)

## tVeerRNN+f

In [None]:
# Output directory for everything the tVeerRNNf section writes.
folder_tVeerRNNf = folder+'tVeerRNNf/'
# exist_ok=True replaces the separate exists() check, which could race with
# another process creating the directory in between.
os.makedirs(folder_tVeerRNNf, exist_ok=True)

In [None]:
# Settings for the tVeerRNNf model cell below.
# The sequence length is the size of axis 2 of data_inputs_5f; the tVeerRNNf
# class reads this module-level value to size its dense layers.
sequence_length = data_inputs_5f.shape[2]
hidden_size = 256     # hidden size passed to the LSTM
num_layers = 2        # number of stacked LSTM layers
print(sequence_length)

In [None]:
# Record the hyperparameters of this run next to the model outputs.
# The file is opened in a `with` block so it is closed even if a write fails.
with open(folder_tVeerRNNf + 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('sequence_length: ' + str(sequence_length) + '\n')
    text_file.write('input_size: ' + str(hp['input_size']) + '\n')
    text_file.write('dropout: ' + str(hp['dropout']) + '\n')
    text_file.write('rec_dropout: ' + str(hp['rec_dropout']) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_layers: ' + str(num_layers) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Bidirectional recurrent neural network (many-to-one)
class tVeerRNNf(nn.Module):
    """Bidirectional LSTM classifier over the flattened outputs of all time steps.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        rec_dropout: dropout between the stacked LSTM layers.
        dropout: dropout applied to the LSTM outputs.
        num_classes: number of output classes.
        seq_len: number of time steps the dense layers are sized for. When
            omitted, the module-level ``sequence_length`` is used, as before.

    ``forward`` takes a tensor of shape (batch, seq_len, input_size) and
    returns ``(logits, embedding)``; ``embedding`` is the flattened
    (dropout-processed) LSTM output, shape (batch, seq_len*hidden_size*2).
    """

    def __init__(self, input_size, hidden_size, num_layers, rec_dropout, dropout, num_classes, seq_len=None):
        super(tVeerRNNf, self).__init__()
        if seq_len is None:
            # Backward-compatible default: the value set in the configuration cell.
            seq_len = sequence_length
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=rec_dropout, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        # Not used in forward(); kept so that the parameter set (and therefore
        # the state_dict keys of saved checkpoints) stays unchanged.
        self.dense = nn.Linear(hidden_size*2*seq_len, num_classes)  # 2 for bidirection

        # NOTE(review): forward() applies dense_1 and dense_2 back to back
        # without an activation in between - confirm that this is intended.
        self.dense_1 = nn.Linear(hidden_size*2*seq_len, hidden_size*2)  # 2 for bidirection
        self.dense_2 = nn.Linear(hidden_size*2, num_classes)  # 2 for bidirection

    def forward(self, x):
        # No explicit (h0, c0): nn.LSTM starts from zero states on the input's
        # own device, so the forward pass does not depend on a global `device`.
        out, _ = self.lstm(x)  # out: tensor of shape (batch_size, seq_length, hidden_size*2)

        out = self.dropout(out)

        # Flatten all time steps into one feature vector per sample.
        B, N, C = out.shape
        out = out.reshape([B, N*C])

        embedding = out

        out = self.dense_1(out)
        out = self.dense_2(out)

        return out, embedding

# Build the tVeerRNNf classifier on the configured device and display a layer
# summary for an input of shape (batch_size, sequence_length, input_size).
model = tVeerRNNf(hp['input_size'], hidden_size, num_layers, hp['rec_dropout'], hp['dropout'], hp['num_classes']).to(device)
summary(model, input_size=(hp['batch_size'], sequence_length, hp['input_size']), verbose = 0, col_names = ("input_size", "output_size", "num_params"))

In [None]:
# Loss and optimizer
# Cross-entropy loss on the model outputs; Adam updates all parameters of
# `model` with the learning rate taken from hp['learning_rate'].
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the tVeerRNNf model
# Every epoch runs one optimisation pass over the training loader followed by
# one evaluation pass over the test loader. After the first 11 epochs the
# weights are checkpointed whenever the test accuracy matches or beats the
# best value seen so far.

print('Start training of tVeerRNNf at ' + str(datetime.now()))

training_loss_tVeerRNNf = []   # loss of the last training batch, one entry per epoch
test_loss_tVeerRNNf = []       # loss of the last test batch, one 0-dim tensor per epoch
test_accuracy_tVeerRNNf = []   # test accuracy in percent, one entry per epoch

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

for epoch in range(hp['num_epochs']):

    # training
    model.train()
    if hp['shuffled_train']:
        # Rebuild the shuffled training loader for this epoch.
        train_data_loader_5f = data.DataLoader(train_dataset_5f, batch_size=hp['batch_size'], shuffle=hp['shuffled_train'], drop_last=True)

    for batch in train_data_loader_5f:
        data_inputs, data_labels = batch

        # The model is batch_first: swap axes 1 and 2 of the loader's batches.
        outputs, _ = model(torch.transpose(data_inputs, 1, 2).float())

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_tVeerRNNf.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients. Without no_grad() every test batch builds
    # an autograd graph, and the stored test loss keeps one alive per epoch.
    with torch.no_grad():
        for batch in test_data_loader_5f:
            input_, targets = batch

            outputs, _ = model(torch.transpose(input_, 1, 2).float())

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Loss of the last test batch; computed once and reused for the log line.
        epoch_test_loss = criterion(outputs, targets)

    test_loss_tVeerRNNf.append(epoch_test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_tVeerRNNf.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), epoch_test_loss.item(), test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_tVeerRNNf+"model_tVeerRNNf_best.mod")

print('End training of tVeerRNNf at ' + str(datetime.now()))

# NOTE: a checkpoint is only written for epoch indices above 10, so with
# hp['num_epochs'] <= 11 this load does not find a file written by this run.
best_epoch_printer = torch.load(folder_tVeerRNNf+"model_tVeerRNNf_best.mod")

print('Best epoch of tVeerRNNf: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Save the weights and optimizer state as they are after the last epoch
# (the best-accuracy checkpoint is written separately by the training cell).
torch.save({'model_state_dict': model.state_dict(), 'optimizer_state_dict': optimizer.state_dict()}, folder_tVeerRNNf+"model_tVeerRNNf.mod")

In [None]:
# Restore the best-accuracy checkpoint into `model` and `optimizer`, so the
# cells that follow work with those weights instead of the last-epoch ones.
checkpoint = torch.load(folder_tVeerRNNf+"model_tVeerRNNf_best.mod")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the model over the test loader via model_test_predict (defined elsewhere
# in this notebook); transpose=True presumably swaps axes 1 and 2 of each
# batch as the training loop does - confirm against that helper.
targets_list, targets_shape, predictions_tVeerRNNf, shape_predictions_tVeerRNNf, embeddings = model_test_predict(model, test_data_loader_5f, shape_dict_reverse, transpose = True)

In [None]:
# Draw a 2-component projection of the embeddings and write it to
# imagefolder + 'tVeerRNNf.png' (presumably t-SNE, given the perplexity /
# learning_rate / init arguments - confirm in embedding_viz).
embedding_viz(embeddings, imagefolder+'tVeerRNNf.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

<img style="float: left;" src="colors.png">

In [None]:

# Collect the test targets and predictions in one table, export it as CSV and
# compute the per-class report.
pred_data_tVeerRNNf = {
    'index': list(range(len(predictions_tVeerRNNf))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_tVeerRNNf,
    'shape_prediction': shape_predictions_tVeerRNNf,
}
predictionsDF_tVeerRNNf = pd.DataFrame(pred_data_tVeerRNNf)
predictionsDF_tVeerRNNf.to_csv(folder_tVeerRNNf+'predictions_tVeerRNNf.csv')

(precision_tVeerRNNf,
 recall_tVeerRNNf,
 f1_score_tVeerRNNf,
 support_tVeerRNNf) = report_statistics(targets_list, predictions_tVeerRNNf, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the share of test
# samples with that target that were predicted as that class.
clas_accuracy_tVeerRNNf = []
for shapes in test_selection:
    subset = predictionsDF_tVeerRNNf[predictionsDF_tVeerRNNf['target'] == shapes]
    # Count the hits directly: value_counts()[shapes] raises KeyError when a
    # class has no correct prediction at all.
    hits = (subset['clas_prediction'] == shapes).sum()
    # A class without test samples gets NaN instead of a division by zero.
    clas_accuracy_tVeerRNNf.append(hits / len(subset) if len(subset) else float('nan'))

matrix(predictionsDF_tVeerRNNf, test_selection)

## GCNN+c

In [None]:
# Output directory for everything the GCNNc section writes.
folder_GCNNc = folder+'GCNNc/'
# exist_ok=True replaces the separate exists() check, which could race with
# another process creating the directory in between.
os.makedirs(folder_GCNNc, exist_ok=True)

In [None]:
# Width of the node features, read from the 'x' node data of data_inputs_graph_c.
input_size_G = data_inputs_graph_c.ndata['x'].shape[1]
# Output width of the first graph convolution of GCNNc.
hidden_size = 512

In [None]:
# Record the hyperparameters of this run next to the model outputs.
# The file is opened in a `with` block so it is closed even if a write fails.
with open(folder_GCNNc + 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('input_size_G: ' + str(input_size_G) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Simple Version
class GCNNc(nn.Module):
    """Graph classifier: two graph convolutions, mean readout and a dense layer.

    Args:
        in_dim: width of the node features stored under ``ndata['x']``.
        hidden_dim: output width of the first graph convolution.
        n_classes: number of output classes.

    ``forward`` takes a (batched) DGL graph and returns
    ``(logits, embeddings)``; ``embeddings`` is the per-graph mean of the node
    features after the second convolution (256 values per graph).
    """

    def __init__(self, in_dim, hidden_dim, n_classes):
        super(GCNNc, self).__init__()
        self.conv1 = GraphConv(in_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, 256)
        self.dense = nn.Linear(256, n_classes)

    def forward(self, g):

        h = g.ndata['x'].float()
        h = F.leaky_relu(self.conv1(g, h))
        h = F.leaky_relu(self.conv2(g, h))

        # Store the hidden features only inside a local scope. Writing them to
        # g.ndata['x'] directly would overwrite the caller's input features, so
        # a second forward pass on the same graph would see 256-wide inputs.
        with g.local_scope():
            g.ndata['h'] = h
            hg = dgl.mean_nodes(g, 'h')

        embeddings = hg

        hg = self.dense(hg)
        return hg, embeddings
        
model = GCNNc(input_size_G, hidden_size, hp['num_classes']).to(device)


In [None]:
# Loss and optimizer
# Cross-entropy on the logits returned by the model; Adam over all model
# parameters with the learning rate taken from the hyperparameter dict.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the GCNNc model
# Every epoch runs one optimisation pass over the training loader and one
# evaluation pass over the test loader. From the 12th epoch on (epoch index
# > 10) the model is checkpointed whenever the test accuracy matches or beats
# the best value seen so far.

print('Start training of GCNNc at ' + str(datetime.now()))

# Per-epoch histories; both losses are those of the last batch of the epoch.
training_loss_GCNNc = []
test_loss_GCNNc = []
test_accuracy_GCNNc = []

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

# A shuffling DataLoader draws a fresh permutation every time it is iterated,
# so it is built once here rather than once per epoch.
if hp['shuffled_train']:
    train_data_loader_graph_c = data.DataLoader(train_dataset_graph_c, batch_size=hp['batch_size'],
                                                shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)

for epoch in range(hp['num_epochs']):

    # training
    model.train()

    for data_inputs, data_labels in train_data_loader_graph_c:
        # The model was moved to `device`, so every batch has to follow it.
        data_inputs = data_inputs.to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_GCNNc.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients; without no_grad every stored test loss
    # would keep its autograd graph alive.
    with torch.no_grad():
        for input_, targets in test_data_loader_graph_c:
            input_ = input_.to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Computed once and stored as a plain float so the history can be
        # plotted directly.
        test_loss = criterion(outputs, targets).item()

    test_loss_GCNNc.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_GCNNc.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss, test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_GCNNc+"model_GCNNc_best.mod")

print('End training of GCNNc at ' + str(datetime.now()))

# This file is only written by the loop above when hp['num_epochs'] > 11.
best_epoch_printer = torch.load(folder_GCNNc+"model_GCNNc_best.mod", map_location=device)

print('Best epoch of GCNNc: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Store the current model weights and optimizer state as model_GCNNc.mod.
torch.save(
    {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    },
    f"{folder_GCNNc}model_GCNNc.mod",
)

In [None]:
# Restore the model and optimizer from the best-accuracy checkpoint written
# during training. map_location lets a checkpoint that was written on a GPU
# be loaded on a CPU-only machine as well.
checkpoint = torch.load(folder_GCNNc+"model_GCNNc_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the graph test loader. model_test_predict is
# defined elsewhere in the notebook; its five return values are unpacked as
# targets, target shapes, class predictions, shape predictions and embeddings.
# NOTE(review): the effect of graph=True is decided inside the helper.
targets_list, targets_shape, predictions_GCNNc, shape_predictions_GCNNc, embeddings = model_test_predict(model, test_data_loader_graph_c, shape_dict_reverse, graph = True)

In [None]:
# Visualise the embeddings in two components and write the figure to
# imagefolder. Presumably a t-SNE projection, judging by the perplexity /
# learning_rate / init arguments -- confirm in embedding_viz.
embedding_viz(embeddings, imagefolder+'GCNNc.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# One row per test sample: running index, target, target shape and both
# kinds of prediction. The table is kept as a DataFrame and stored as CSV.
pred_data_GCNNc = {
    'index': list(range(len(predictions_GCNNc))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_GCNNc,
    'shape_prediction': shape_predictions_GCNNc,
}
predictionsDF_GCNNc = pd.DataFrame(pred_data_GCNNc)
predictionsDF_GCNNc.to_csv(f"{folder_GCNNc}predictions_GCNNc.csv")

# The four values returned by report_statistics are kept for the results file.
(precision_GCNNc,
 recall_GCNNc,
 f1_score_GCNNc,
 support_GCNNc) = report_statistics(
    targets_list, predictions_GCNNc, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the fraction of the
# test samples with that target whose class prediction is correct.
clas_accuracy_GCNNc = []
for shapes in test_selection:
    subset = predictionsDF_GCNNc[predictionsDF_GCNNc['target'] == shapes]
    # A boolean mean gives 0.0 for a class that was never predicted correctly,
    # where value_counts()[shapes] raised a KeyError; an empty subset gives NaN.
    clas_accuracy_GCNNc.append((subset['clas_prediction'] == shapes).mean())

matrix(predictionsDF_GCNNc, test_selection)

## GCNN+s

In [None]:
# Output folder for the files written by the GCNN+s run.
folder_GCNNs = folder+'GCNNs/'
# exist_ok replaces the separate existence check and so cannot fail when the
# folder appears between the check and the creation.
os.makedirs(folder_GCNNs, exist_ok=True)

In [None]:
# Size of the second dimension of the node feature tensor 'x'; passed to the
# model below as the input width of its first graph convolution.
# NOTE(review): data_inputs_graph_s is presumably a DGL graph -- defined elsewhere.
input_size_s = data_inputs_graph_s.ndata['x'].shape[1]
# Output width of the first graph convolution.
hidden_size = 512

In [None]:
# Record the hyperparameters of this run next to the model files. The with
# block closes the file even if one of the writes raises.
with open(folder_GCNNs+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    # The GCNN+s model is built with input_size_s; the file previously
    # recorded input_size_G, the input size of the GCNN+c run.
    text_file.write('input_size_G: ' + str(input_size_s) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:

class GCNNs(nn.Module):
    """Two-layer graph convolutional classifier with a mean-node readout.

    The node features stored under ``g.ndata['x']`` pass through two
    ``GraphConv`` layers with leaky-ReLU activations, are averaged per graph
    and classified by one linear layer.

    Args:
        in_dim: Number of input features per node.
        hidden_dim: Output width of the first graph convolution.
        n_classes: Number of output logits.
        embedding_dim: Output width of the second graph convolution, i.e. the
            size of the returned graph embedding (default 256).
    """

    def __init__(self, in_dim, hidden_dim, n_classes, embedding_dim=256):
        super().__init__()
        self.conv1 = GraphConv(in_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, embedding_dim)
        self.dense = nn.Linear(embedding_dim, n_classes)

    def forward(self, g):
        """Return ``(logits, embeddings)`` for the graph ``g``."""
        # Inside local_scope the hidden features are stored only temporarily,
        # so the input features under g.ndata['x'] are no longer overwritten
        # and the same graph can be passed through the model again.
        with g.local_scope():
            h = g.ndata['x'].float()
            h = F.leaky_relu(self.conv1(g, h))
            h = F.leaky_relu(self.conv2(g, h))

            g.ndata['h'] = h
            embeddings = dgl.mean_nodes(g, 'h')

        return self.dense(embeddings), embeddings


model = GCNNs(input_size_s, hidden_size, hp['num_classes']).to(device)


In [None]:
# Loss and optimizer
# Cross-entropy on the logits returned by the model; Adam over all model
# parameters with the learning rate taken from the hyperparameter dict.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the GCNNs model
# Every epoch runs one optimisation pass over the training loader and one
# evaluation pass over the test loader. From the 12th epoch on (epoch index
# > 10) the model is checkpointed whenever the test accuracy matches or beats
# the best value seen so far.

print('Start training of GCNNs at ' + str(datetime.now()))

# Per-epoch histories; both losses are those of the last batch of the epoch.
training_loss_GCNNs = []
test_loss_GCNNs = []
test_accuracy_GCNNs = []

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

# A shuffling DataLoader draws a fresh permutation every time it is iterated,
# so it is built once here rather than once per epoch.
if hp['shuffled_train']:
    train_data_loader_graph_s = data.DataLoader(train_dataset_graph_s, batch_size=hp['batch_size'],
                                                shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)

for epoch in range(hp['num_epochs']):

    # training
    model.train()

    for data_inputs, data_labels in train_data_loader_graph_s:
        # The model was moved to `device`, so every batch has to follow it.
        data_inputs = data_inputs.to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_GCNNs.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients; without no_grad every stored test loss
    # would keep its autograd graph alive.
    with torch.no_grad():
        for input_, targets in test_data_loader_graph_s:
            input_ = input_.to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Computed once and stored as a plain float so the history can be
        # plotted directly.
        test_loss = criterion(outputs, targets).item()

    test_loss_GCNNs.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_GCNNs.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss, test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_GCNNs+"model_GCNNs_best.mod")

print('End training of GCNNs at ' + str(datetime.now()))

# This file is only written by the loop above when hp['num_epochs'] > 11.
best_epoch_printer = torch.load(folder_GCNNs+"model_GCNNs_best.mod", map_location=device)

print('Best epoch of GCNNs: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Store the current model weights and optimizer state as model_GCNNs.mod.
torch.save(
    {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    },
    f"{folder_GCNNs}model_GCNNs.mod",
)

In [None]:
# Restore the model and optimizer from the best-accuracy checkpoint written
# during training. map_location lets a checkpoint that was written on a GPU
# be loaded on a CPU-only machine as well.
checkpoint = torch.load(folder_GCNNs+"model_GCNNs_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the graph test loader. model_test_predict is
# defined elsewhere in the notebook; its five return values are unpacked as
# targets, target shapes, class predictions, shape predictions and embeddings.
# NOTE(review): the effect of graph=True is decided inside the helper.
targets_list, targets_shape, predictions_GCNNs, shape_predictions_GCNNs, embeddings = model_test_predict(model, test_data_loader_graph_s, shape_dict_reverse, graph = True)

In [None]:
# Visualise the embeddings in two components and write the figure to
# imagefolder. Presumably a t-SNE projection, judging by the perplexity /
# learning_rate / init arguments -- confirm in embedding_viz.
embedding_viz(embeddings, imagefolder+'GCNNs.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# One row per test sample: running index, target, target shape and both
# kinds of prediction. The table is kept as a DataFrame and stored as CSV.
pred_data_GCNNs = {
    'index': list(range(len(predictions_GCNNs))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_GCNNs,
    'shape_prediction': shape_predictions_GCNNs,
}
predictionsDF_GCNNs = pd.DataFrame(pred_data_GCNNs)
predictionsDF_GCNNs.to_csv(f"{folder_GCNNs}predictions_GCNNs.csv")

# The four values returned by report_statistics are kept for the results file.
(precision_GCNNs,
 recall_GCNNs,
 f1_score_GCNNs,
 support_GCNNs) = report_statistics(
    targets_list, predictions_GCNNs, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the fraction of the
# test samples with that target whose class prediction is correct.
clas_accuracy_GCNNs = []
for shapes in test_selection:
    subset = predictionsDF_GCNNs[predictionsDF_GCNNs['target'] == shapes]
    # A boolean mean gives 0.0 for a class that was never predicted correctly,
    # where value_counts()[shapes] raised a KeyError; an empty subset gives NaN.
    clas_accuracy_GCNNs.append((subset['clas_prediction'] == shapes).mean())

matrix(predictionsDF_GCNNs, test_selection)

## YanGCNN+f

In [None]:
# Output folder for the files written by the YanGCNN+f run.
folder_YanGCNNf = folder+'YanGCNNf/'
# exist_ok replaces the separate existence check and so cannot fail when the
# folder appears between the check and the creation.
os.makedirs(folder_YanGCNNf, exist_ok=True)

In [None]:
# Size of the second dimension of the node feature tensor 'x'; passed to the
# model below as the input width of its first graph convolution.
# NOTE(review): data_inputs_graph_f is presumably a DGL graph -- defined elsewhere.
input_size_Gf = data_inputs_graph_f.ndata['x'].shape[1]
print(input_size_Gf)
# Output width of the first graph convolution.
hidden_size = 512

In [None]:
# Record the hyperparameters of this run next to the model files. The with
# block closes the file even if one of the writes raises.
with open(folder_YanGCNNf+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('input_size_G: ' + str(input_size_Gf) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Simple Version
class YanGCNNf(nn.Module):
    """Two-layer graph convolutional classifier with a mean-node readout.

    The node features stored under ``g.ndata['x']`` pass through two
    ``GraphConv`` layers with leaky-ReLU activations, are averaged per graph
    and classified by one linear layer.

    Args:
        in_dim: Number of input features per node.
        hidden_dim: Output width of the first graph convolution.
        n_classes: Number of output logits.
        embedding_dim: Output width of the second graph convolution, i.e. the
            size of the returned graph embedding (default 256).
    """

    def __init__(self, in_dim, hidden_dim, n_classes, embedding_dim=256):
        super().__init__()
        self.conv1 = GraphConv(in_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, embedding_dim)
        self.dense = nn.Linear(embedding_dim, n_classes)

    def forward(self, g):
        """Return ``(logits, embeddings)`` for the graph ``g``."""
        # Inside local_scope the hidden features are stored only temporarily,
        # so the input features under g.ndata['x'] are no longer overwritten
        # and the same graph can be passed through the model again.
        with g.local_scope():
            h = g.ndata['x'].float()
            h = F.leaky_relu(self.conv1(g, h))
            h = F.leaky_relu(self.conv2(g, h))

            g.ndata['h'] = h
            embeddings = dgl.mean_nodes(g, 'h')

        return self.dense(embeddings), embeddings


model = YanGCNNf(input_size_Gf, hidden_size, hp['num_classes']).to(device)


In [None]:
# Loss and optimizer
# Cross-entropy on the logits returned by the model; Adam over all model
# parameters with the learning rate taken from the hyperparameter dict.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the YanGCNNf model
# Every epoch runs one optimisation pass over the training loader and one
# evaluation pass over the test loader. From the 12th epoch on (epoch index
# > 10) the model is checkpointed whenever the test accuracy matches or beats
# the best value seen so far.

print('Start training of YanGCNNf at ' + str(datetime.now()))

# Per-epoch histories; both losses are those of the last batch of the epoch.
training_loss_YanGCNNf = []
test_loss_YanGCNNf = []
test_accuracy_YanGCNNf = []

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

# A shuffling DataLoader draws a fresh permutation every time it is iterated,
# so it is built once here rather than once per epoch.
if hp['shuffled_train']:
    train_data_loader_graph_f = data.DataLoader(train_dataset_graph_f, batch_size=hp['batch_size'],
                                                shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)

for epoch in range(hp['num_epochs']):

    # training
    model.train()

    for data_inputs, data_labels in train_data_loader_graph_f:
        # The model was moved to `device`, so every batch has to follow it.
        data_inputs = data_inputs.to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_YanGCNNf.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients; without no_grad every stored test loss
    # would keep its autograd graph alive.
    with torch.no_grad():
        for input_, targets in test_data_loader_graph_f:
            input_ = input_.to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Computed once and stored as a plain float so the history can be
        # plotted directly.
        test_loss = criterion(outputs, targets).item()

    test_loss_YanGCNNf.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_YanGCNNf.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss, test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_YanGCNNf+"model_YanGCNNf_best.mod")

print('End training of YanGCNNf at ' + str(datetime.now()))

# This file is only written by the loop above when hp['num_epochs'] > 11.
best_epoch_printer = torch.load(folder_YanGCNNf+"model_YanGCNNf_best.mod", map_location=device)

print('Best epoch of YanGCNNf: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Store the current model weights and optimizer state as model_YanGCNNf.mod.
torch.save(
    {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    },
    f"{folder_YanGCNNf}model_YanGCNNf.mod",
)

In [None]:
# Restore the model and optimizer from the best-accuracy checkpoint written
# during training. map_location lets a checkpoint that was written on a GPU
# be loaded on a CPU-only machine as well.
checkpoint = torch.load(folder_YanGCNNf+"model_YanGCNNf_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the graph test loader. model_test_predict is
# defined elsewhere in the notebook; its five return values are unpacked as
# targets, target shapes, class predictions, shape predictions and embeddings.
# NOTE(review): the effect of graph=True is decided inside the helper.
targets_list, targets_shape, predictions_YanGCNNf, shape_predictions_YanGCNNf, embeddings = model_test_predict(model, test_data_loader_graph_f, shape_dict_reverse, graph = True)

In [None]:
# Visualise the embeddings in two components and write the figure to
# imagefolder. Presumably a t-SNE projection, judging by the perplexity /
# learning_rate / init arguments -- confirm in embedding_viz.
embedding_viz(embeddings, imagefolder+'YanGCNNf.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

<img style="float: left;" src="colors.png">

In [None]:

# One row per test sample: running index, target, target shape and both
# kinds of prediction. The table is kept as a DataFrame and stored as CSV.
pred_data_YanGCNNf = {
    'index': list(range(len(predictions_YanGCNNf))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_YanGCNNf,
    'shape_prediction': shape_predictions_YanGCNNf,
}
predictionsDF_YanGCNNf = pd.DataFrame(pred_data_YanGCNNf)
predictionsDF_YanGCNNf.to_csv(f"{folder_YanGCNNf}predictions_YanGCNNf.csv")

# The four values returned by report_statistics are kept for the results file.
(precision_YanGCNNf,
 recall_YanGCNNf,
 f1_score_YanGCNNf,
 support_YanGCNNf) = report_statistics(
    targets_list, predictions_YanGCNNf, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the fraction of the
# test samples with that target whose class prediction is correct.
clas_accuracy_YanGCNNf = []
for shapes in test_selection:
    subset = predictionsDF_YanGCNNf[predictionsDF_YanGCNNf['target'] == shapes]
    # A boolean mean gives 0.0 for a class that was never predicted correctly,
    # where value_counts()[shapes] raised a KeyError; an empty subset gives NaN.
    clas_accuracy_YanGCNNf.append((subset['clas_prediction'] == shapes).mean())

matrix(predictionsDF_YanGCNNf, test_selection)

## GCNN+f

In [None]:
# Output folder for the files written by the GCNN+f run.
folder_GCNNf = folder+'GCNNf/'
# exist_ok replaces the separate existence check and so cannot fail when the
# folder appears between the check and the creation.
os.makedirs(folder_GCNNf, exist_ok=True)

In [None]:
# Size of the second dimension of the node feature tensor 'x'; passed to the
# model below as the input width of its first graph convolution.
# NOTE(review): data_inputs_graph_5f is presumably a DGL graph -- defined elsewhere.
input_size_5f = data_inputs_graph_5f.ndata['x'].shape[1]
print(input_size_5f)
# Output width of the first graph convolution.
hidden_size = 512

In [None]:
# Record the hyperparameters of this run next to the model files. The with
# block closes the file even if one of the writes raises.
with open(folder_GCNNf+ 'params.txt', 'w') as text_file:
    text_file.write('hyperparameters:' + '\n')
    text_file.write('input_size_G: ' + str(input_size_5f) + '\n')
    text_file.write('hidden_size: ' + str(hidden_size) + '\n')
    text_file.write('num_classes: ' + str(hp['num_classes']) + '\n')
    text_file.write('\n')

In [None]:
# Simple Version
class GCNNf(nn.Module):
    """Two-layer graph convolutional classifier with a mean-node readout.

    The node features stored under ``g.ndata['x']`` pass through two
    ``GraphConv`` layers with leaky-ReLU activations, are averaged per graph
    and classified by one linear layer.

    Args:
        in_dim: Number of input features per node.
        hidden_dim: Output width of the first graph convolution.
        n_classes: Number of output logits.
        embedding_dim: Output width of the second graph convolution, i.e. the
            size of the returned graph embedding (default 256).
    """

    def __init__(self, in_dim, hidden_dim, n_classes, embedding_dim=256):
        super().__init__()
        self.conv1 = GraphConv(in_dim, hidden_dim)
        self.conv2 = GraphConv(hidden_dim, embedding_dim)
        self.dense = nn.Linear(embedding_dim, n_classes)

    def forward(self, g):
        """Return ``(logits, embeddings)`` for the graph ``g``."""
        # Inside local_scope the hidden features are stored only temporarily,
        # so the input features under g.ndata['x'] are no longer overwritten
        # and the same graph can be passed through the model again.
        with g.local_scope():
            h = g.ndata['x'].float()
            h = F.leaky_relu(self.conv1(g, h))
            h = F.leaky_relu(self.conv2(g, h))

            g.ndata['h'] = h
            embeddings = dgl.mean_nodes(g, 'h')

        return self.dense(embeddings), embeddings


model = GCNNf(input_size_5f, hidden_size, hp['num_classes']).to(device)


In [None]:
# Loss and optimizer
# Cross-entropy on the logits returned by the model; Adam over all model
# parameters with the learning rate taken from the hyperparameter dict.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hp['learning_rate'])

In [None]:
## Train the GCNNf model
# Every epoch runs one optimisation pass over the training loader and one
# evaluation pass over the test loader. From the 12th epoch on (epoch index
# > 10) the model is checkpointed whenever the test accuracy matches or beats
# the best value seen so far.

print('Start training of GCNNf at ' + str(datetime.now()))

# Per-epoch histories; both losses are those of the last batch of the epoch.
training_loss_GCNNf = []
test_loss_GCNNf = []
test_accuracy_GCNNf = []

# NOTE: despite its name this holds the best test accuracy seen so far.
best_epoch = 0

# A shuffling DataLoader draws a fresh permutation every time it is iterated,
# so it is built once here rather than once per epoch.
if hp['shuffled_train']:
    train_data_loader_graph_5f = data.DataLoader(train_dataset_graph_5f, batch_size=hp['batch_size'],
                                                 shuffle=hp['shuffled_train'], drop_last=True, collate_fn=collate)

for epoch in range(hp['num_epochs']):

    # training
    model.train()

    for data_inputs, data_labels in train_data_loader_graph_5f:
        # The model was moved to `device`, so every batch has to follow it.
        data_inputs = data_inputs.to(device)
        data_labels = data_labels.to(device)

        outputs, _ = model(data_inputs)

        loss = criterion(outputs, data_labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    training_loss_GCNNf.append(loss.item())

    # testing
    model.eval()

    correct = 0
    total = 0

    # Evaluation needs no gradients; without no_grad every stored test loss
    # would keep its autograd graph alive.
    with torch.no_grad():
        for input_, targets in test_data_loader_graph_5f:
            input_ = input_.to(device)
            targets = targets.to(device)

            outputs, _ = model(input_)

            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()

        # Computed once and stored as a plain float so the history can be
        # plotted directly.
        test_loss = criterion(outputs, targets).item()

    test_loss_GCNNf.append(test_loss)

    test_accuracy = 100 * correct / total
    test_accuracy_GCNNf.append(test_accuracy)

    print('Epoch [{}/{}], Training Loss: {:.5f}, Test Loss: {:.5f}, Test Accuracy: {:.5f}'.format(
                epoch+1, hp['num_epochs'], loss.item(), test_loss, test_accuracy))

    if epoch > 10 and test_accuracy >= best_epoch:
        best_epoch = test_accuracy
        torch.save({'epoch': epoch+1,
                    'accuracy': test_accuracy,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict()},
               folder_GCNNf+"model_GCNNf_best.mod")

print('End training of GCNNf at ' + str(datetime.now()))

# This file is only written by the loop above when hp['num_epochs'] > 11.
best_epoch_printer = torch.load(folder_GCNNf+"model_GCNNf_best.mod", map_location=device)

print('Best epoch of GCNNf: Epoch {} with {}'.format(best_epoch_printer['epoch'], best_epoch_printer['accuracy']))

In [None]:
# Store the current model weights and optimizer state as model_GCNNf.mod.
torch.save(
    {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    },
    f"{folder_GCNNf}model_GCNNf.mod",
)

In [None]:
# Restore the model and optimizer from the best-accuracy checkpoint written
# during training. map_location lets a checkpoint that was written on a GPU
# be loaded on a CPU-only machine as well.
checkpoint = torch.load(folder_GCNNf+"model_GCNNf_best.mod", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

In [None]:
# Analysis
# Run the current model over the graph test loader. model_test_predict is
# defined elsewhere in the notebook; its five return values are unpacked as
# targets, target shapes, class predictions, shape predictions and embeddings.
# NOTE(review): the effect of graph=True is decided inside the helper.
targets_list, targets_shape, predictions_GCNNf, shape_predictions_GCNNf, embeddings = model_test_predict(model, test_data_loader_graph_5f, shape_dict_reverse, graph = True)

In [None]:
# Visualise the embeddings in two components and write the figure to
# imagefolder. Presumably a t-SNE projection, judging by the perplexity /
# learning_rate / init arguments -- confirm in embedding_viz.
embedding_viz(embeddings, imagefolder+'GCNNf.png', n_components=2, perplexity=30.0, learning_rate=200, init='random')

In [None]:

# One row per test sample: running index, target, target shape and both
# kinds of prediction. The table is kept as a DataFrame and stored as CSV.
pred_data_GCNNf = {
    'index': list(range(len(predictions_GCNNf))),
    'target': targets_list,
    'target_shape': targets_shape,
    'clas_prediction': predictions_GCNNf,
    'shape_prediction': shape_predictions_GCNNf,
}
predictionsDF_GCNNf = pd.DataFrame(pred_data_GCNNf)
predictionsDF_GCNNf.to_csv(f"{folder_GCNNf}predictions_GCNNf.csv")

# The four values returned by report_statistics are kept for the results file.
(precision_GCNNf,
 recall_GCNNf,
 f1_score_GCNNf,
 support_GCNNf) = report_statistics(
    targets_list, predictions_GCNNf, test_selection, shape_dict_reverse)

In [None]:
# Per-class accuracy: for every class in test_selection, the fraction of the
# test samples with that target whose class prediction is correct.
clas_accuracy_GCNNf = []
for shapes in test_selection:
    subset = predictionsDF_GCNNf[predictionsDF_GCNNf['target'] == shapes]
    # A boolean mean gives 0.0 for a class that was never predicted correctly,
    # where value_counts()[shapes] raised a KeyError; an empty subset gives NaN.
    clas_accuracy_GCNNf.append((subset['clas_prediction'] == shapes).mean())

matrix(predictionsDF_GCNNf, test_selection)

# Results

## Original test data

In [None]:
# Epoch numbers (1-based) shared by all curves.
x = list(range(1, len(training_loss_tVeerCNNc) + 1))

# creating subplot and figure
fig, ax = plt.subplots()

# One training-loss curve per model, drawn in legend order.
for label, losses in (
    ('tVeerCNNc', training_loss_tVeerCNNc),
    ('tVeerCNNs', training_loss_tVeerCNNs),
    ('tVeerCNNf', training_loss_tVeerCNNf),
    ('dCNNc', training_loss_dCNNc),
    ('dCNNs', training_loss_dCNNs),
    ('dCNNf', training_loss_dCNNf),
    ('TriangleCNN', training_loss_triangleCNN),
    ('tVeerRNNc', training_loss_tVeerRNNc),
    ('tVeerRNNs', training_loss_tVeerRNNs),
    ('tVeerRNNf', training_loss_tVeerRNNf),
    ('GCNNc', training_loss_GCNNc),
    ('GCNNs', training_loss_GCNNs),
    ('GCNNf', training_loss_GCNNf),
    ('YanGCNNf', training_loss_YanGCNNf),
):
    ax.plot(x, losses, label=label)

# setting labels
plt.legend(bbox_to_anchor=(1,0), loc="lower left")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training_Loss")

In [None]:
# Epoch numbers (1-based) shared by all curves.
x = list(range(1, len(test_loss_tVeerCNNc) + 1))

# creating subplot and figure
fig, ax = plt.subplots()

# One test-loss curve per model, drawn in legend order. The histories may
# hold 0-d loss tensors that still require grad (or live on the GPU), which
# matplotlib cannot convert itself; float() handles tensors and plain
# numbers alike.
for label, losses in (
    ('tVeerCNNc', test_loss_tVeerCNNc),
    ('tVeerCNNs', test_loss_tVeerCNNs),
    ('tVeerCNNf', test_loss_tVeerCNNf),
    ('dCNNc', test_loss_dCNNc),
    ('dCNNs', test_loss_dCNNs),
    ('dCNNf', test_loss_dCNNf),
    ('TriangleCNN', test_loss_triangleCNN),
    ('tVeerRNNc', test_loss_tVeerRNNc),
    ('tVeerRNNs', test_loss_tVeerRNNs),
    ('tVeerRNNf', test_loss_tVeerRNNf),
    ('GCNNc', test_loss_GCNNc),
    ('GCNNs', test_loss_GCNNs),
    ('GCNNf', test_loss_GCNNf),
    ('YanGCNNf', test_loss_YanGCNNf),
):
    ax.plot(x, [float(value) for value in losses], label=label)

# setting labels
plt.legend(bbox_to_anchor=(1,0), loc="lower left")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Test_Loss")

In [None]:
# Epoch numbers (1-based) shared by all curves.
x = list(range(1, len(test_accuracy_tVeerRNNf) + 1))

# creating subplot and figure
fig, ax = plt.subplots()

# One accuracy curve per model: the colour marks the model family, the line
# style marks the feature set (dotted +c, dashed +s, solid +f).
for accuracies, label, line_style in (
    (test_accuracy_tVeerCNNc, 'tVeerCNN+c', {'c': '#ff7f0e', 'ls': ':'}),
    (test_accuracy_tVeerCNNs, 'tVeerCNN+s', {'c': '#ff7f0e', 'ls': '--'}),
    (test_accuracy_tVeerCNNf, 'tVeerCNN+f', {'c': '#ff7f0e', 'ls': '-'}),
    (test_accuracy_dCNNc, 'dCNN+c', {'c': 'green', 'ls': ':'}),
    (test_accuracy_dCNNs, 'dCNN+s', {'c': 'green', 'ls': '--'}),
    (test_accuracy_dCNNf, 'dCNN+f', {'c': 'green', 'ls': '-'}),
    (test_accuracy_tVeerRNNc, 'tVeerRNN+c', {'c': '#1f77b4', 'ls': ':'}),
    (test_accuracy_tVeerRNNs, 'tVeerRNN+s', {'c': '#1f77b4', 'ls': '--'}),
    (test_accuracy_tVeerRNNf, 'tVeerRNN+f', {'c': '#1f77b4', 'ls': '-'}),
    (test_accuracy_GCNNc, 'GCNN+c', {'c': 'red', 'ls': ':'}),
    (test_accuracy_GCNNs, 'GCNN+s', {'c': 'red', 'ls': '--'}),
    (test_accuracy_GCNNf, 'GCNN+f', {'c': 'red', 'ls': '-'}),
    (test_accuracy_triangleCNN, 'LiuDPCN', {'c': '#7f7f7f', 'ls': ':'}),
    # No explicit line style here: the matplotlib default applies.
    (test_accuracy_YanGCNNf, 'YanGCNN', {'c': '#7f7f7f'}),
):
    ax.plot(x, accuracies, label=label, **line_style)


# setting labels
plt.legend(bbox_to_anchor=(1,0), loc="lower left")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
# The same figure is written as a raster and as a vector file.
plt.savefig(imagefolder+'model_performances_new.png', bbox_inches='tight', dpi=300)
plt.savefig(imagefolder+'model_performances_new.pdf', bbox_inches='tight', dpi=300)

In [None]:
# Write one semicolon-separated line per model with the four statistics
# collected in the analysis cells. The with block closes the file even if a
# write raises.
# NOTE(review): the header ends with a ';' that the data rows do not have;
# kept unchanged so that existing readers of results.txt are not affected.
with open(folder+ 'results.txt', 'w') as text_file:
    text_file.write('net;precision;recall;f1_score;support;' + '\n')
    for row in (
        ('tVeerCNNc', precision_tVeerCNNc, recall_tVeerCNNc, f1_score_tVeerCNNc, support_tVeerCNNc),
        ('tVeerCNNs', precision_tVeerCNNs, recall_tVeerCNNs, f1_score_tVeerCNNs, support_tVeerCNNs),
        ('tVeerCNNf', precision_tVeerCNNf, recall_tVeerCNNf, f1_score_tVeerCNNf, support_tVeerCNNf),
        ('dCNNc', precision_dCNNc, recall_dCNNc, f1_score_dCNNc, support_dCNNc),
        ('dCNNs', precision_dCNNs, recall_dCNNs, f1_score_dCNNs, support_dCNNs),
        # This row holds the dCNNf statistics; it was previously labelled
        # 'dCNNc', which duplicated the label of the dCNNc row.
        ('dCNNf', precision_dCNNf, recall_dCNNf, f1_score_dCNNf, support_dCNNf),
        ('triangleCNN', precision_triangleCNN, recall_triangleCNN, f1_score_triangleCNN, support_triangleCNN),
        ('tVeerRNNc', precision_tVeerRNNc, recall_tVeerRNNc, f1_score_tVeerRNNc, support_tVeerRNNc),
        ('tVeerRNNs', precision_tVeerRNNs, recall_tVeerRNNs, f1_score_tVeerRNNs, support_tVeerRNNs),
        ('tVeerRNNf', precision_tVeerRNNf, recall_tVeerRNNf, f1_score_tVeerRNNf, support_tVeerRNNf),
        ('GCNNc', precision_GCNNc, recall_GCNNc, f1_score_GCNNc, support_GCNNc),
        ('GCNNs', precision_GCNNs, recall_GCNNs, f1_score_GCNNs, support_GCNNs),
        ('YanGCNNf', precision_YanGCNNf, recall_YanGCNNf, f1_score_YanGCNNf, support_YanGCNNf),
        ('GCNNf', precision_GCNNf, recall_GCNNf, f1_score_GCNNf, support_GCNNf),
    ):
        text_file.write(';'.join(str(value) for value in row) + '\n')