## Comments on NeighbourhoodGraph.py

In [None]:
# -*- coding: utf-8 -*-
"""Neighbourhood Graph Visualization Tool

This module is used to create a Neighbourhood Graph based on a trained SOM.
Therefore the Class NeighbourhoodGraph should be used to initialize the Neighbourhood graph.

After initialization the instance can be used to generate a so called trace which can be displayed over any other visualization (eg. U-Matrix)

@author Lukas Lehner
@author Fabian Traxler
"""
import numpy as np
from scipy import spatial
from typing import List
import plotly.graph_objects as go
import networkx as nx

Instead of calculating distances between each input data sample every time we look for samples in a neighbourhood,
we calculate the distance matrix of the input data once, and store it in the main class.
We chose to write a separate class for the distance matrix, s.t. it can be stored and loaded into
the main class more easily. It has two functions to calculate knn- and radius-based neighbourhoods.

In [None]:
class DistanceMatrix:
    """ Class to calculate the (euclidean) distance between each input data sample
     and derive the knn- or radius-based neighbourhood of each sample in input space."""

    def __init__(self, input_data: np.ndarray = []):
        """
        Initialize the Distance Matrix with the input data.

        Args:
            input_data (np.ndarray): The input data used for training the SOM (also possible to use different data in same dimension)
        """
        self.distance_matrix = spatial.distance_matrix(input_data, input_data)

The two functions for calculating the neighbourhoods are rather straight forward:
for knn we sort the distances and for radius we use np.where() to find close samples.

In [None]:
    def get_knn(self, k: int, index: int = None) -> List:
        """
        Get the indices of the k-nearest neighbours.
        If index is none return knn for all input data samples (matrix).

        Args
            k (int): Number of nearest neighbours to look for
            index (int): Index of the sample in focus

        Return
            List[int]: Integer indices of the neighbouring samples
        """
        if index is not None:
            neighbours = self.distance_matrix[index].argsort()[
                         :k + 1]  # add 1 because distance to self is 0 and therefore always included
            return neighbours[neighbours != index]
        else:
            neighbours = self.distance_matrix.argsort()[:, :k + 1]
            return [row[row != idx] for idx, row in enumerate(neighbours)]

    def get_samples_in_radius(self, radius: float, index: int = None) -> List:
        """
        Get the indices of the samples which lie within a specified radius of the index-sample.
        If index is none return samples in radius for all input data samples (matrix).

        Args
            radius (float): Radius of the hypersphere around the index-sample
            index (int): Index of the sample in focus

        Return
            List[int]: Integer indices of samples in the hypersphere
        """
        if index is not None:
            close_samples = np.where(self.distance_matrix[index] < radius)[0]
            return close_samples[close_samples != index]
        else:
            close_samples = [np.where(row < radius)[0] for row in self.distance_matrix]
            return [row[row != idx] for idx, row in enumerate(close_samples)]

The main Neighbourhood Graph (NG) class stores the weights of a map, the maps width and height,
a set of input data (which could be omitted) and the corresponding distance matrix,
as well an array of best matching unit indices, i.e. a mapping from input_data indices to unit_weights indices.

We again decided to do preprocessing, i.e. pre-calculate the best matching units ("clac_bmu_arrray"), rather than calculate BMUs on the fly.



In [None]:
class NeighbourhoodGraph:
    """Class representing the Neighbourhood Graph, including functions for visualization."""

    def __init__(self, unit_weights: np.ndarray, m: int, n: int, input_data: np.ndarray,
                 distance_mat: DistanceMatrix = None, bmu_array: np.ndarray = None):
        """
        Initialize the NeighbourhoodGraph Instance with the trained SOM and the used input data.

        Args:
            unit_weights (np.ndarray): The Weights of the Units in Input space
            m (int): Height of the SOM
            n (int): Width of the SOM
            input_data (np.ndarray): The Input data used for training the SOM (also possible to use different data in same dimension)
            distance_mat (DistanceMatrix): Optional: A Matrix representing the distance between each input data sample. If none is provided, the Distance Matrix is calculated using input_data.
            bmu_array (np.ndarray): Optional: Best Matching Unit for each input data sample. If none is provided, the array is calculated using unit_weights and input_data.
        """

        assert m * n == len(unit_weights), \
            "NeighbourhoodGraph __init__: Weights and specified input dimensions m & n do not match"

        self.unit_weights = unit_weights
        self.m = m
        self.n = n
        self.input_data = input_data

        self.distance_mat = DistanceMatrix(input_data) if distance_mat is None else distance_mat
        self.bmu_array = self.calc_bmu_array(unit_weights, input_data) if bmu_array is None else bmu_array

    def calc_bmu_array(self, _unit_weights: np.ndarray, _input_data: np.ndarray) -> np.ndarray:
        """
        Calculate Best Matching Unit for every sample.
        Use the Euclidean Distance (L2) to find BMUs.
        If multiple units have the same distance to a sample, the unit with the lowest index is chosen.
        Args:
            _unit_weights (ndarray): Trained unit weights
            _input_data (ndarray): Input data
        Returns
            np.ndarray: Array of indices of the Best Matching Units
        """
        return np.array(
            [np.argmin(np.sqrt(np.sum(np.power(_unit_weights - _input_data[index], 2), axis=1)))
             for index in np.arange(self.input_data.shape[0])])

This function takes a neighbourhood parameter and returns the adjacency matrix of the corresponding neighbourhood graph.
As mentioned before, the calculations are based on the preprocessed distance_mat and bmu_array.


In [None]:
    def calc_adjacency_matrix(self, radius: float = None, knn: int = None) -> np.ndarray:
        """
        Calculate the adjacency matrix of the Neighbourhood Graph corresponding to the specified neighbourhood.
        Use KNN or Radius Approach to define neighbourhood (depends on which argument is specified - if both are specified KNN is used)

        Args:
            radius (float): Radius size (if specified, use radius method)
            knn (int): Number of neighbours (if specified, use KNN)

        Returns:
            np.ndarray: The Adjacency Matrix of the Neighbourhood Graph
        """
        assert (radius is not None or knn is not None), \
            "NeighbourhoodGraph create_graph: radius or knn must be specified"
        assert (radius is None or radius > 0), "NeighbourhoodGraph create_graph: radius must be > 0"
        assert (knn is None or knn > 0), "NeighbourhoodGraph create_graph: knn must be > 0"

        adjacency_mat = np.zeros((self.n * self.m, self.n * self.m))

        if knn is not None:
            neighbours = self.distance_mat.get_knn(knn)
        else:
            neighbours = self.distance_mat.get_samples_in_radius(radius)

        for index, neighbour_idxs in enumerate(neighbours):
            bmu = self.bmu_array[index]
            for neighbour_idx in neighbour_idxs:
                neighbour_bmu = self.bmu_array[neighbour_idx]
                if bmu != neighbour_bmu:
                    adjacency_mat[bmu, neighbour_bmu] = 1

        return adjacency_mat

With the help of the *networkx* package we can easily turn an adjacency matrix into a graph (i.e. nodes & edges).
The position of each node can be passed as a parameter to the function,
but by default the nodes are arranged on an m x n equidistant grid with cell size 1.

The nx graph and the node positions are then converted into x and y data readable by a plotly graph object.
A Scatter plot of that data is returned and can be used as a trace in any existing FigureWidget using *add_trace*.


In [None]:
    def get_trace(self, radius: float = None, knn: int = None, adjacency_matrix: np.ndarray = None,
                  line_width: float = 3, line_color:str='#fff', pos: dict = None) -> go.Scatter:
        """
        Build a plotly Scatter Plot of the Neighbourhood Graph (can be used as trace in a FigureWidget).

        Args:
            radius (float): Radius size (if specified use radius method)
            knn (int): Number of neighbours (if specified use KNN)
            adjacency_matrix (np.ndarray): Optional: An adjacency matrix of a Neighbourhood Graph. If none is provided, an adjacency matrix is calculated based on the specified neighbourhood parameter (radius or knn).
            line_width (float): The width of the edges in the plot
            line_color (str): The color of the edges (in HEX Hash)
            pos (dict): Optional: The position of the Units in the 2D plot. If none is provided, the units are laid out on an m x n grid with cell size=1.
        Returns:
            plotly.graph_objects.Scatter: A Scatter Plot Object
        """
        if adjacency_matrix is None:
            adjacency_matrix = self.calc_adjacency_matrix(radius, knn)

        # Make networkx graph from input adjacency matrix
        G = nx.from_numpy_matrix(adjacency_matrix, parallel_edges=True)

        # # If no unit positions are provided, units are arranged on an m x n grid with grid cell size = 1
        if pos is None:
            pos = {}
            counter = 0
            for i in range(self.m):
                for j in range(self.n):
                    pos.update({counter: (j, i)})
                    counter += 1
        nx.set_node_attributes(G, pos, 'pos')

        # Convert nx graph to plotly trace data
        edge_x = []
        edge_y = []
        for edge in G.edges():
            x0, y0 = G.nodes[edge[0]]['pos']
            x1, y1 = G.nodes[edge[1]]['pos']
            edge_x.append(x0)
            edge_x.append(x1)
            edge_x.append(None)
            edge_y.append(y0)
            edge_y.append(y1)
            edge_y.append(None)

        return go.Scatter(
            x=edge_x, y=edge_y,
            line=dict(width=line_width, color=line_color),
            hoverinfo='skip',
            mode='lines')


## Comments on util.py

In [None]:
"""
This file define a utility function that is used to create multiple visualizations at once
"""
import numpy as np
from IPython.core.display import display
from ipywidgets import HBox

from NeighbourhoodGraph import NeighbourhoodGraph
from som_vis import SomViz

We wanted a single function that could produce meaningful visualizations given weights and input data.
Thus we wrote this. By default it plots a Neighbourhood Graph as overlay over a U-Matrix
for every parameter in k_list (knn approach) and r_list (radius approach).
It can also be told to show the hit histogram as underlying visualization.

In [None]:
def create_visualizations(sweights: np.ndarray, smap_y: int, smap_x: int, idata,
                          color='viridis', interp=False, data_title='Chainlink',
                          k_list=[1, 2, 3, 4],
                          r_list=[0.03, 0.09, 0.15, 0.21],
                          width=700, height=None,
                          scale_to_mean=False,
                          show_hithist=False,
                          save_dir=None, save_file_type="svg",
                          display_mode=False):
    """
    Create multiple Visualizations with different Parameters at once
    Displays a plot for every parameter in k_list, and r_list
    Also Scales the plots to width

    Args:
        sweights (np.ndarray): The unit weights of the SOM in input space
        smap_y (int): The y-size of the SOM
        smap_x (int): The x-size of the SOM
        color (str): The color palette to be used,
        interp (bool): Use interpolation or not
        data_title (str): The Title of the dataset for the plots
        k_list (list): KNNs to be visualized
        r_list (list): Radii to be visualized
        width (int): Width of the plots
        height (int): Height of the plots
        scale_to_mean (bool): Specify if data should be scaled (only relevant for Radius method)
        show_hithist (bool): Specify if also a HitHistogram should be used
        save_dir (str): If specified, all plots are stored in this directory
        save_file_type (str): String of file type plots should be saves as. E.g.: "png", "svg"
        display_mode (bool): If true, IPython.core.display.display is used to display the figures. fig.show() is used otherwise
    """
    # Visualization
    viz = SomViz(sweights, smap_y, smap_x)

    if scale_to_mean:  # scale radii to input data
        r_list = np.around(np.mean(idata) * np.array(r_list), decimals=8)

    if height is None: height = width * (smap_y / smap_x)

    save_file_title = "{}_umatrix_".format(data_title)
    if show_hithist: save_file_title += "hithist_"
    save_file_title += "neighbourhoodgraph-"

    # Prepare Neighbourhood Graph
    ng = NeighbourhoodGraph(viz.weights, viz.m, viz.n, input_data=idata)
    traces = []
    for val in k_list:
        traces.append(ng.get_trace(knn=val))
    for val in r_list:
        traces.append(ng.get_trace(radius=val))

    # Prepare underlying visualizations
    umatrix_title = '{}: U-Matrix'.format(data_title)
    hithist_title = '{}: Hit hist'.format(data_title)

    # Show underlying visualizations with Neighbourhood Graph overlay
    for i, trace in enumerate(traces):
        if i < len(k_list):
            title_suffix = " + Neighbourhood Graph ({}-NN)".format(k_list[i])
            save_file_suffix = "-knn-{}".format(k_list[i])
        else:
            title_suffix = " + Neighbourhood Graph (radius: {})".format(r_list[i - len(k_list)])
            save_file_suffix = "-rad-{}".format(r_list[i - len(k_list)])

        umatrix = viz.umatrix(color=color, interp=interp, title=umatrix_title + title_suffix)
        umatrix.layout.width = width
        umatrix.layout.height = height
        umatrix.add_trace(trace)

        fig = umatrix

        if show_hithist:
            hithist = viz.hithist(idata=idata, color=color, interp=interp, title=hithist_title + title_suffix)
            hithist.layout.width = width
            hithist.layout.height = height
            hithist.add_trace(trace)

            fig = HBox([umatrix, hithist])

        if save_dir is not None:
            fig.write_image(save_dir + save_file_title + save_file_suffix + "." + save_file_type)

        if display_mode:
            display(fig)
        else:
            fig.show()
