In [1]:

import os
import math
import random
import functools
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
from typing import List, Dict, Tuple

import skopt

import config
from config import *
from ms_io import mgf_io
from preprocessing import preprocessing
from cluster import similarity, masking, clustering, hierarchical
from eval import eval

import warnings
warnings.filterwarnings("ignore")


In [2]:

spectra_filename = '../data/extracted_2d16b7f8-6954-4ba1-b5fa-3c467b43227d.mgf'
annotations_filename = '../data/annotations.tsv'

distance_matrix_filename = '../distance_matrix.npz'

best_config = ('complete', 0.005444, 2) # (linkage, eps, min_samples)


In [3]:

config.parse((spectra_filename, annotations_filename))

if not os.path.isfile(spectra_filename):
    raise ValueError(f'Non-existing peak file (spectra_filename)')
if not os.path.isfile(annotations_filename):
    raise ValueError(f'Non-existing annotations file (annotations_filename)')
if distance_matrix_filename is not None:
    if not os.path.isfile(distance_matrix_filename):
        raise ValueError(f'Non-existing distance matrix file (distance_matrix_filename)')

# read file and process spectra
raw_spectra = mgf_io.get_spectra(source=spectra_filename)
spectra = list(preprocessing.process_all_spectra(raw_spectra, 
                                                config.min_peaks, config.min_mz_range,
                                                config.min_mz, config.max_mz,
                                                config.remove_precursor_tol,
                                                config.min_intensity, 
                                                config.max_peaks_used, config.scaling))
spectra = [spectrum for spectrum in spectra if spectrum is not None]
spectra.sort(key=lambda x: x.precursor_mz)
n_spectra = len(spectra)

scan_idx_list = [int(spec.identifier) for spec in spectra]

if distance_matrix_filename is not None:
    # read distance matrix file and create similarity matrix
    distance_matrix = similarity.load_matrix(distance_matrix_filename)
    similarity_matrix = similarity.similarity_to_distance(distance_matrix)
else:
    # calculate pairwise mod cos similarity
    similarity_matrix = similarity.create_mod_cos_similarity_matrix(spectra, config.fragment_tol)
    distance_matrix = similarity.similarity_to_distance(similarity_matrix)
    if config.export_dist_matrix:
        similarity.save_matrix(distance_matrix, 'distance_matrix.npz')

# create masked distance matrix for clustering based on precursor mass
mask = masking.generate_mask(spectra, config.precursor_tol)
masked_distance_matrix = similarity.similarity_to_distance(np.multiply(similarity_matrix, mask))
# deal with floating point inaccuracy 
# np.clip results in "ValueError: Linkage 'Z' uses the same cluster more than once." when plotting dendrogram
masked_distance_matrix = np.where(masked_distance_matrix>0, masked_distance_matrix, 0)


In [9]:

# config.plot_dendrogram = True
best_cluster = clustering.generate_clusters(masked_distance_matrix, 'hierarchical', best_config[0], best_config[1], best_config[2])
performance = eval.evaluate_clustering(annotations_filename, best_cluster, scan_idx_list)
print(performance)

annotations = eval._read_tsv_file(annotations_filename)
annotations_subset = annotations[annotations['#Scan#'].isin(scan_idx_list)]
identified_scan_idx = eval._get_identified_spectra(annotations_subset)
true_labels = eval._get_spectrum_labels(annotations_subset)

pred_labels = best_cluster.labels
pred_labels_identified = [pred_labels[scan_idx_list.index(scan_id)] for scan_id in identified_scan_idx]


(197, 0.5974107024513716, 0.6141953619114546, 0.0)


In [12]:

for label in np.unique(pred_labels_identified):
    # skip noise
    if label == -1:
        continue
    # get idx of all spectra in cluster
    cluster_members = [idx for idx, pred_label in enumerate(pred_labels_identified) if pred_label == label]
    members_true_labels = np.array(true_labels)[cluster_members]
    true_compound_labels = annotations.loc[annotations['inchikey_p1'].isin(members_true_labels)]
    if len(np.unique(members_true_labels)) > 1:
        unique_true_labels = true_compound_labels[['inchikey_p1', 'Compound_Name']].drop_duplicates(subset='inchikey_p1')
        print(unique_true_labels)
        print()
        # print(np.unique(true_compound_labels['Compound_Name'], return_counts=True))
        # print(np.unique(members_true_labels, return_counts=True))
    # print(np.unique(true_compound_labels, return_counts=True))
    