In [1]:
from collections import defaultdict
import glob

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm

from msaris.reader.reader import load_data
from msaris.reader.preprocessing import filter_intensities, reduce_data_resolution
from msaris.molecule.molecule import Molecule
from msaris.clusterisation.clusterisation import ClusterSearch, MaxClustering
from msaris.search.optimization_search import SearchClusters

In [2]:
file_name = "PdCl2_neg_maxis_2"
DATA_SOURCE = f"./data/{file_name}.mzML"

In [None]:
#original data and processed
mz, it = load_data(
    DATA_SOURCE, range_spectrum=(100, 1500), min_intensity=100, mz_binning_width=5.0,
)
mz_processed, it_processed = reduce_data_resolution(mz, it, mz.shape[0], it.shape[0])

In [None]:
# Finding clusters
# ClusterSearch is far more computationally intensive
# while 

# clust = ClusterSearch(
#             resolution = 10 ** 5,
#             charge = 1,
#             min_peaks = 1,
#             cluster_width = 8,
#             tolerance = 0.2,
#             threshold = max(it)/500,
#             cluster_min_dist = 5,  # TODO: add support for CI
#         )
# masses_ = clust.find(mz_processed, it_processed)
clust = MaxClustering(
    window=5,
    threshold = max(it)/500
)
masses_ = clust.find(mz_processed, it_processed)

In [None]:
clust = MaxClustering(
    window=5,
    threshold = max(it)/500
)
masses_ = clust.find(mz_processed, it_processed)

In [None]:
#original idea to perform calculations based on the 
#provides ability to calculate some ions which 
#can be missed otherwise
initial_params = dict(
    no_TBAB=True,
    no_K=True,
    no_MeOH=True,
    no_Cu=False,
    no_Pd1=False,
    no_Pd=False,
    no_NaTFA=True,
    no_OH=True,
    no_H2O=True,
    no_O2=True,
    no_O=True,
    no_N2=True,
    no_Na=True,
    no_CH3CN=True,
)
iteration_steps = dict(
    no_Cu=True,
    no_Pd=True,
    no_Na=False,
    no_CH3CN=False,
    no_NaTFA=False,
    no_H2O=False,
    no_MeOH=False,
    no_TBAB=False,
    no_O2=False,
    no_O=False,
    no_N2=False,
)

In [None]:
# Calculation can take far too long, hence it can be 
# sped up by loading already calculated molecules
ions_path = "./ions/"
calculated_ions = {}
for entry in tqdm(glob.glob(f"{ions_path}*.json")):
    mol = Molecule()
    mol.load(entry)
    calculated_ions[mol.formula] = mol

Here is custom script to achieve and process calculated molecules
In future it would be calculated in script

In [None]:
ranked = defaultdict(list)
srch = SearchClusters(
    mz=mz,
    it=it,
    charge = -1,
    threshold = 0.7
)
path = f"./data/{file_name}"
for target_mass in tqdm(sorted(masses_)):
    found = []
    for to_change, value in iteration_steps.items():
        params = initial_params.copy()
        params[to_change] = value
        found.extend(srch.recognise_masses(
                    target_mass,
                    params,
                    epsilon_range =(0, 5, 0.25,), # due high mistakes I decided to take such big range
                    calculated_ions=calculated_ions,
                    ions_path = ions_path
                ))
    ranked[target_mass] = sorted(found, key=lambda x: x["delta"])  
    

In [None]:
# Calculating total representation of found spectrum
count_entries = len(ranked)
fig, ax = plt.subplots(1, 1, figsize=(60, 40))
max_it = max(it_reg)
ax.plot(mz_reg, it_reg / max_it, color="black")
count: int = 0
c1: str = "blue"
c2: str = "red"
for mass, data in ranked.items():
    if data:
        colour = color_fader(c1, c2, mix=count / (count_entries + 1))
        it_n = (data[0]["spectrum"][1] / max_it) * 100
        ax.plot(
            data[0]["spectrum"][0], it_n, color=colour,
        )
        max_ind = np.argmax(it_n)
        height = it_n[max_ind]
        ax.text(
            data[0]["spectrum"][0][max_ind],
            height + 1,
            round(mass),
            color=colour,
            horizontalalignment="center",
            verticalalignment="center",
            fontsize=30,
        )
        count += 1
ax.set_title(f"Recognised masses for total spectrum", fontsize=40)
ax.set_xlabel("M/Z", fontsize=40)
ax.set_ylabel("Intensity", fontsize=40)
if not os.path.exists(path):
    os.makedirs(path)
plt.savefig(f"{path}/total.png", dpi=300)

In [None]:
# Plotting results for individual recognised spectr
for mass, data in ranked.items():
    print(f"Mass {mass} has found {len(data)}")
    for match in data:
        fig, ax = plt.subplots(1, 1, figsize=(15, 5))

        ax.plot(
            match["spectrum"][0], norm(match["spectrum"][1]) * 100, color="blue",
        )
        ax.plot(match["mz"], norm(match["it"]) * 100, color="green")
        ax.set_xlabel("M/Z", fontsize=20)
        ax.set_ylabel("Intensity", fontsize=20)

        test_label = [
            f"{formal_formula(match['formula'])}",
            f"Delta m/z: {match['delta']:.3f}",
            f"Cosine: {match['metrics']['cosine']:.3f}",
            f"Relative: {match['relative']:.3f}",
        ]

        ax.text(
            0.8,
            0.8,
            "\n".join(test_label),
            color="black",
            horizontalalignment="center",
            verticalalignment="center",
            fontsize=12,
            transform=ax.transAxes,
        )
        ax.set_title(f"Plotting relative intensity for {int(mass+0.5)}", fontsize=20)
        cluster_path = f"{path}/{int(mass+0.5)}"
        if not os.path.exists(cluster_path):
            os.makedirs(cluster_path)
        plt.savefig(f"{cluster_path}/{match['composition']}.png", dpi=300)

In [None]:
# Making CSV report for individual recognised spectr
report = {
    "mass": [],
    "brutto": [],
    "brutto_formal": [],
    "cosine": [],
    "relative": [],
}

for variable in get_coefficients(model):
    report[variable] = []
for mass, data in ranked.items():

    for match in data:
        composition = match["formula"]
        report["mass"].append(round(mass))
        report["brutto"].append(match["composition"])
        report["brutto_formal"].append(formal_formula(composition))
        report["cosine"].append(match["metrics"]["cosine"])
        report["relative"].append(match["relative"])
        for variable in variables:
            if variable in composition:
                report[variable].append(composition[variable])
            else:
                report[variable].append(0)
    pd.DataFrame(report).to_csv(f"{path}/{file_name}.csv")