# Visualize VAE latent space

In [1]:
import os
import numpy as np
from specvae.vae import BaseVAE
import specvae.dataset as dt
import specvae.utils as utils

In [2]:
# Parameters
# Default run configuration: which dataset to use and which trained model to inspect.
# NOTE: the next cell assigns the same three names again, so when both cells run
# the values below are overridden.
dataset = "MoNA"
model_name = "specgvae_100-60-5-60-100 (19-10-2021_18-17-41)"
# Model directory resolved from the project root: .model/<dataset>/vae_latent_size/<model_name>
model_dir = utils.get_project_path() / '.model' / dataset / 'vae_latent_size' / model_name

In [3]:
# Parameters
# NOTE(review): this cell reassigns every name set by the previous "Parameters" cell —
# presumably injected by a parameterized-run tool (e.g. papermill); confirm.
dataset = "MoNA"
model_name = "betavae_capacity_100-800-100-3-800-100_03 (24-12-2021_11-16-41)"
# NOTE(review): absolute, machine-specific path (a plain string, not built from
# utils.get_project_path() like the default above) — it will not resolve on another machine.
model_dir = "d:\\Workspace\\SpecVAE\\.model\\MoNA\\betavae_capacity_nextron\\factorvae_score\\betavae_capacity_100-800-100-3-800-100_03 (24-12-2021_11-16-41)"


## Load model

In [4]:
# Ask the project helper for a compute device with CUDA requested. The returned pair is
# unpacked into `device` (used below to load the model and preload tensors) and `cpu`.
device, cpu = utils.device(use_cuda=True)

GPU device count: 1
Device in use:  cuda:0


In [5]:
print("Load model: %s..." % model_name)
# The checkpoint is read from <model_dir>/model.pth.
model_path = os.path.join(model_dir, 'model.pth')
model = BaseVAE.load(model_path, device)
# Last expression of the cell, so its return value is what the notebook displays.
# Presumably the standard torch `Module.eval()` (switch to evaluation mode) — confirm BaseVAE does not override it.
model.eval()

Load model: betavae_capacity_100-800-100-3-800-100_03 (24-12-2021_11-16-41)...


SpecVEA(
  (encoder): Sequential(
    (en_lin_1): Linear(in_features=100, out_features=800, bias=True)
    (en_lin_batchnorm_1): BatchNorm1d(800, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (en_act_1): ReLU()
    (en_lin_2): Linear(in_features=800, out_features=100, bias=True)
    (en_lin_batchnorm_2): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (en_act_2): ReLU()
  )
  (fc_mean): Linear(in_features=100, out_features=3, bias=True)
  (mean_batchnorm): BatchNorm1d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc_logvar): Linear(in_features=100, out_features=3, bias=True)
  (logvar_batchnorm): BatchNorm1d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (sample): SampleZ()
  (decoder): Sequential(
    (de_lin_1): Linear(in_features=3, out_features=800, bias=True)
    (de_lin_batchnorm_1): BatchNorm1d(800, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (de_act_1

In [6]:
# Display the configuration stored on the loaded model (layer sizes, transform, training settings).
model.config

{'name': 'betavae_capacity',
 'layer_config': array([list([100, 800, 100, 3]), list([3, 800, 100])], dtype=object),
 'latent_dim': 3,
 'beta': 0.2,
 'limit': 1.0,
 'dropout': 0.0,
 'input_columns': ['spectrum'],
 'types': [torch.float32],
 'dataset': 'MoNA',
 'transform': Compose(
     <specvae.dataset.SplitSpectrum object at 0x000002286ED13188>
     <specvae.dataset.TopNPeaks object at 0x000002286ED13208>
     <specvae.dataset.FilterPeaks object at 0x000002286ED13288>
     <specvae.dataset.Normalize object at 0x000002286ED13388>
     <specvae.dataset.ToMZIntConcatAlt object at 0x000002286ED134C8>
 ),
 'max_mz': 2500.0,
 'min_intensity': 0.001,
 'max_num_peaks': 50,
 'normalize_intensity': True,
 'normalize_mass': True,
 'rescale_intensity': False,
 'n_samples': -1,
 'n_epochs': 30,
 'batch_size': 128,
 'learning_rate': 0.001}

## Load and transform data

In [7]:
# Label columns that can be used to colour the latent-space plots, per supported dataset.
_labels_by_dataset = {
    'MoNA': ['ionization_mode_id', 'collision_energy', 'total_exact_mass', 'precursor_mz',
             'instrument_type_id', 'precursor_type_id', 'superclass_id', 'class_id'],
    'HMDB': ['ionization_mode_id', 'collision_energy', 'superclass_id', 'class_id'],
}

# Fail early with a clear message: previously an unknown dataset left `labels`,
# `base_path` and `metadata_path` undefined and surfaced as a NameError below.
if dataset not in _labels_by_dataset:
    raise ValueError(
        "Unsupported dataset %r; expected one of %s" % (dataset, sorted(_labels_by_dataset)))

labels = list(_labels_by_dataset[dataset])
# Data lives in <project>/.data/<dataset>/, metadata in <dataset>_meta.npy next to it.
base_path = utils.get_project_path() / '.data' / dataset
metadata_path = base_path / ('%s_meta.npy' % dataset)

# Metadata is optional: `metadata` stays None when the file is absent.
metadata = None
if os.path.exists(metadata_path):
    # SECURITY: allow_pickle=True unpickles arbitrary Python objects and can execute code;
    # only load metadata files that come from a trusted source.
    metadata = np.load(metadata_path, allow_pickle=True).item()

In [8]:
def load_vis_data(target_column):
    """Open the visualization subset prepared for one label column.

    The file is looked up as ``visualization_<target_column>.csv`` under the
    notebook-level ``base_path``.

    Args:
        target_column: Name of the label column the subset was built for.

    Returns:
        Whatever ``dt.Spectra.open`` returns for that file.
    """
    file_name = 'visualization_%s.csv' % target_column
    return dt.Spectra.open(base_path / file_name)

def preload_data_as_tensor(df):
    """Run the model's transform over ``df`` and preload the result onto ``device``.

    Only the model's configured input columns plus ``'id'`` are handed to
    ``dt.Spectra.preload_tensor``; the tensor types also come from the model
    configuration.

    Args:
        df: Data frame containing the model's input columns and an ``'id'`` column.

    Returns:
        The value returned by ``dt.Spectra.preload_tensor``.
    """
    input_columns = model.config['input_columns']
    tensor_types = model.config['types']
    selected = df[input_columns + ['id']]
    return dt.Spectra.preload_tensor(
        device=device,
        data_frame=selected,
        transform=model.transform,
        limit=-1,
        types=tensor_types,
        do_print=False)

def evaluate_model(df, data):
    """Encode the preloaded spectra with the global ``model`` and collect the results.

    Args:
        df: Data frame the tensors were built from; its ``ionization_mode_id`` and
            ``collision_energy`` columns are copied into the result.
        data: Mapping with a ``'spectrum'`` tensor (model input) and an ``'id'``
            entry that exposes ``.shape``.

    Returns:
        dict with NumPy arrays under ``'X'`` (input), ``'Xrecon'`` (reconstruction)
        and ``'z'`` (latent codes), the untouched ``data['id']`` under ``'ids'``,
        and the two metadata columns as NumPy arrays.
    """
    # Local import so the function does not depend on the later notebook cell that imports torch.
    import torch

    print("Encode N=%d instances from %s dataset..." % (data['id'].shape[0], dataset))
    X, ids = data['spectrum'], data['id'] # TODO: handle the case for concatenated input

    # Inference only: without no_grad() the forward pass records an autograd graph
    # for the whole batch, which wastes (GPU) memory for nothing.
    with torch.no_grad():
        Xrecon, z, _ = model.forward_(X)
    print(z.shape)

    # detach() replaces the legacy `.data` accessor before moving to host memory.
    return {
        'X': X.detach().cpu().numpy(),
        'Xrecon': Xrecon.detach().cpu().numpy(),
        'z': z.detach().cpu().numpy(),
        'ids': ids,
        'ionization_mode_id': df['ionization_mode_id'].to_numpy(),
        'collision_energy': df['collision_energy'].to_numpy(),
    }


## Prepare data for visualization

In [9]:
import torch
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from umap import UMAP
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
# Global seaborn theme: white style, 'notebook' context, and a 24x20 default matplotlib figure size.
sns.set(style='white', context='notebook', rc={'figure.figsize':(24,20)})

import torchvision as tv
import specvae.vae as vae, specvae.utils as utils
import plotly.io as pio
# Make 'notebook' the default plotly renderer used by fig.show().
pio.renderers.default='notebook'
import plotly.express as px
import pandas as pd

In [10]:
def get_colors(df, data_np):
    """Build the per-label colour arrays used by the scatter plots.

    Missing ``class`` values (and, for MoNA, ``precursor_type`` and
    ``instrument_type``) are replaced by ``'Undefined'`` in the returned arrays.
    The caller's ``df`` is left unmodified (the previous version wrote the
    filled columns back into it).

    Args:
        df: Data frame with ``superclass`` and ``class`` columns, plus
            ``precursor_type``, ``instrument_type``, ``total_exact_mass`` and
            ``precursor_mz`` when the global ``dataset`` is ``'MoNA'``.
        data_np: Mapping with ``'ionization_mode_id'`` and ``'collision_energy'`` arrays.

    Returns:
        dict mapping label-column name to a NumPy array of colour values.
    """
    mode_ids = np.asarray(data_np['ionization_mode_id'])
    colors = {
        # Mode id 0 is shown as 'negative', anything else as 'positive'.
        'ionization_mode_id': np.where(mode_ids == 0, 'negative', 'positive'),
        'collision_energy': data_np['collision_energy'],
        'superclass_id': df['superclass'].to_numpy(),
        'class_id': df['class'].fillna('Undefined').to_numpy(),
    }
    if dataset == 'MoNA':
        colors['precursor_type_id'] = df['precursor_type'].fillna('Undefined').to_numpy()
        colors['instrument_type_id'] = df['instrument_type'].fillna('Undefined').to_numpy()
        colors['total_exact_mass'] = df['total_exact_mass'].to_numpy()
        colors['precursor_mz'] = df['precursor_mz'].to_numpy()
    return colors

In [11]:
def compute_pca(data, n=2):
    """Project ``data`` onto its first ``n`` principal components.

    Prints the explained variance and explained-variance ratio of the fitted
    components as a side effect.

    Args:
        data: Samples to reduce, as accepted by ``PCA.fit_transform``.
        n: Number of components to keep.

    Returns:
        The transformed data returned by ``PCA.fit_transform``.
    """
    print("Compute PCA for n_components=%d" % n)
    pca = PCA(n_components=n)
    projected = pca.fit_transform(data)
    report = (
        ("\t      explained_variance:", pca.explained_variance_),
        ("\texplained_variance_ratio:", pca.explained_variance_ratio_),
    )
    for caption, value in report:
        print(caption, value)
    return projected

def compute_tsne(data, n=2, random_state=None):
    """Embed ``data`` into ``n`` dimensions with t-SNE.

    Prints the final KL divergence of the fitted embedding as a side effect.

    Args:
        data: Samples to embed, as accepted by ``TSNE.fit_transform``.
        n: Number of embedding dimensions.
        random_state: Forwarded to ``TSNE``. t-SNE is stochastic; pass an int to
            get a reproducible embedding. ``None`` (default) keeps the previous
            unseeded behaviour.

    Returns:
        The embedding returned by ``TSNE.fit_transform``.
    """
    print("Compute tSNE for n_components=%d" % n)
    r = TSNE(n_components=n, random_state=random_state)
    rdata = r.fit_transform(data)
    print("TSNE:")
    print("\t      kl_divergence:", r.kl_divergence_)
    return rdata

def compute_umap(data, n_neighbors=15, min_dist=0.25, n=2, metric='euclidean', random_state=None):
    """Embed ``data`` into ``n`` dimensions with UMAP.

    Args:
        data: Samples to embed, as accepted by ``UMAP.fit_transform``.
        n_neighbors: Forwarded to ``UMAP(n_neighbors=...)``.
        min_dist: Forwarded to ``UMAP(min_dist=...)``.
        n: Number of embedding dimensions (``n_components``).
        metric: Distance metric name forwarded to ``UMAP``.
        random_state: Forwarded to ``UMAP``. UMAP is stochastic; pass an int to
            get a reproducible embedding. ``None`` (default) keeps the previous
            unseeded behaviour.

    Returns:
        The embedding returned by ``UMAP.fit_transform``.
    """
    print("Compute UMAP for n_components=%d" % n)
    reducer = UMAP(
        n_neighbors=n_neighbors,
        min_dist=min_dist,
        n_components=n,
        metric=metric,
        random_state=random_state)
    return reducer.fit_transform(data)

def plot_data(data, labels=None, label_name=None, plot_components=2,
        hover_data=None, width=1000, height=1000, title='Visualization of dataset'):
    """Show an interactive 2D or 3D plotly scatter plot of ``data``.

    Args:
        data: Points to plot; columns 0 and 1 (and 2 for 3D) are used as axes.
        labels: Optional per-point values used to colour the markers.
        label_name: Optional display name for the colour legend. Previously this
            argument was accepted but ignored, so the legend was always titled
            ``color``.
        plot_components: 2 for ``px.scatter`` or 3 for ``px.scatter_3d``.
        hover_data: Extra hover information forwarded to plotly express.
        width: Figure width forwarded to plotly express.
        height: Figure height forwarded to plotly express.
        title: Figure title.

    Raises:
        ValueError: If ``plot_components`` is neither 2 nor 3 (previously the
            call silently produced no figure).
    """
    if plot_components not in (2, 3):
        raise ValueError("plot_components must be 2 or 3, got %r" % (plot_components,))

    # Keyword arguments shared by the 2D and 3D variants.
    common = dict(
        color=labels,
        template='plotly_white', hover_data=hover_data,
        title=title, width=width, height=height)
    if labels is not None and label_name is not None:
        # Array-like `color` input is exposed by plotly express under the column
        # name 'color'; map it to the human-readable label name.
        common['labels'] = {'color': label_name}

    if plot_components == 2:
        fig = px.scatter(data, x=0, y=1, **common)
    else:
        fig = px.scatter_3d(data, x=0, y=1, z=2, **common)
        # Smaller markers keep dense 3D clouds readable.
        fig.update_traces(
            marker=dict(size=3),
            selector=dict(mode='markers'))
    fig.show()

## Plot 2D UMAP for different labels

In [12]:
# For every label column: load its visualization subset, encode it with the model and
# draw a 2D scatter of the latent codes coloured by that label. Latent spaces with more
# than two dimensions are first reduced with UMAP; exactly two dimensions are plotted
# directly; fewer than two are skipped.
for label in labels:
    df = load_vis_data(label)
    data = preload_data_as_tensor(df)
    data_np = evaluate_model(df, data)
    colors = get_colors(df, data_np)
    n_dim = data_np['z'].shape[1]

    if n_dim < 2:
        continue

    if n_dim > 2:
        umap2_data = compute_umap(data_np['z'], n=2)
        plot_points = umap2_data
        if label:
            plot_title = 'UMAP analysis of VAE latent space for %s dataset (%s)' % (dataset, label)
        else:
            plot_title = 'UMAP analysis of VAE latent space for %s dataset' % dataset
    else:
        plot_points = data_np['z']
        if label:
            plot_title = 'Direct representation of VAE latent space for %s dataset (%s)' % (dataset, label)
        else:
            plot_title = 'Direct representation of VAE latent space for %s dataset' % dataset

    # Per-point details shown when hovering over a marker.
    hover_info = {
        'ids': data_np['ids'],
        'mode': data_np['ionization_mode_id'],
        'energy': data_np['collision_energy']}
    plot_data(plot_points, colors[label], label, hover_data=hover_info, title=plot_title)


Encode N=5000 instances from MoNA dataset...
torch.Size([5000, 3])
Compute UMAP for n_components=2


Encode N=4986 instances from MoNA dataset...
torch.Size([4986, 3])
Compute UMAP for n_components=2


Encode N=4929 instances from MoNA dataset...
torch.Size([4929, 3])
Compute UMAP for n_components=2


Encode N=5000 instances from MoNA dataset...
torch.Size([5000, 3])
Compute UMAP for n_components=2


Encode N=4994 instances from MoNA dataset...
torch.Size([4994, 3])
Compute UMAP for n_components=2


Encode N=4994 instances from MoNA dataset...
torch.Size([4994, 3])
Compute UMAP for n_components=2


Encode N=5000 instances from MoNA dataset...
torch.Size([5000, 3])
Compute UMAP for n_components=2


Encode N=4992 instances from MoNA dataset...
torch.Size([4992, 3])
Compute UMAP for n_components=2


KeyboardInterrupt: 

## Plot 3D UMAP for different labels

In [None]:
# For every label column: load its visualization subset, encode it with the model and
# draw a 3D scatter of the latent codes coloured by that label. Latent spaces with more
# than three dimensions are first reduced with UMAP; exactly three dimensions are plotted
# directly; fewer than three are skipped.
for label in labels:
    df = load_vis_data(label)
    data = preload_data_as_tensor(df)
    data_np = evaluate_model(df, data)
    colors = get_colors(df, data_np)
    n_dim = data_np['z'].shape[1]

    if n_dim < 3:
        continue

    if n_dim > 3:
        umap3_data = compute_umap(data_np['z'], n=3)
        plot_points = umap3_data
        if label:
            plot_title = 'UMAP analysis of VAE latent space for %s dataset (%s)' % (dataset, label)
        else:
            plot_title = 'UMAP analysis of VAE latent space for %s dataset' % dataset
    else:
        plot_points = data_np['z']
        if label:
            plot_title = 'Direct representation of VAE latent space for %s dataset (%s)' % (dataset, label)
        else:
            plot_title = 'Direct representation of VAE latent space for %s dataset' % dataset

    # Per-point details shown when hovering over a marker.
    hover_info = {
        'ids': data_np['ids'],
        'mode': data_np['ionization_mode_id'],
        'energy': data_np['collision_energy']}
    plot_data(plot_points, colors[label], label, plot_components=3,
        hover_data=hover_info, title=plot_title)


Encode N=5000 instances from HMDB dataset...
torch.Size([5000, 2])
Encode N=4998 instances from HMDB dataset...
torch.Size([4998, 2])
Encode N=4998 instances from HMDB dataset...
torch.Size([4998, 2])
Encode N=5000 instances from HMDB dataset...
torch.Size([5000, 2])
