In [10]:

import Modules.statplot as sp
import Modules.cleaning as cl
import matplotlib.pyplot as plt
from ipywidgets import interact_manual
from scipy.stats import zscore
import pandas as pd
import numpy as np
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from scipy.optimize import curve_fit
from scipy.stats import norm, pearsonr



feat = sp.Filter('../feature/fl_ms1_h.csv')
fl = feat.featurelist
feat2 = sp.Filter('../feature/fl_ms2_h.csv')
fl2 = feat2.featurelist
# Create a new column 'ms_level' in df1 and set all values to 'ms1'
fl['ms_level'] = 'ms1'

# Create a new column 'ms_level' in df2 and set all values to 'ms2'
fl2['ms_level'] = 'ms2'

feature = pd.concat([fl, fl2], ignore_index=True)



class TrainingToolRF:
    def __init__(self, data, rt_tolerance=0.005):
        self.data = data # data is a df with ms1 and ms2 features, adjusted for NeatMS so with minimum informations
        self.rt_tolerance = rt_tolerance # tolerance max for ms2 to be linked to a ms1 feature
        self.labels = [] # The output taking labels information
        self.spectra = {}
        self.metrics = [] # The output taking matching information (m/z diff, rt diff, similarity metric, euclidean distance)
        self.block = False
        
        # Group the features based on ms_level and sample
        self.ms1_data = self.data[self.data['ms_level'] == 'ms1']
        self.ms2_data = self.data[self.data['ms_level'] == 'ms2']

        # Create a nested list to store MS1 features and their potential MS2 matches
        self.paired_features_id = [] # we indicate here the paired feature id, ta see how many times a ms2 is matching in the ms1's feature list
        self.paired_features = []
        for _, ms1_feature in self.ms1_data.iterrows():
            matching_ms2 = self.ms2_data[(self.ms2_data['sample'] == ms1_feature['sample']) & 
                                          (np.abs(self.ms2_data['rt'] - ms1_feature['rt']) <= self.rt_tolerance) &
                                         ((self.ms2_data['m/z'] + 1.007) < ms1_feature['m/z'])]
            self.paired_features.append((ms1_feature, matching_ms2))
            if matching_ms2.empty == True:
                self.paired_features_id.append((ms1_feature['feature ID'], 0))
            else:
                self.paired_features_id.append((ms1_feature['feature ID'], matching_ms2['feature ID'].to_list()))
        self.length_model = 0 #to have the length of the training model
        for i in self.paired_features_id:
            if type(i[1]) == list:
                self.length_model += len(i[1])
        self.count = 0 # to create a count for the training model to be displayed, it begin at 0 because at the first click it is not to be taken into account
        self.current_index = [1, 0]  # first index for MS1, second index for MS2
        
        for s in data['sample'].unique(): #get samples names to take spectra from mzml files
            spec = cl.Spectra('../mzML/'+s+'.mzML')
            spec.extract_peaks()
            self.spectra[s] = spec
            
    # Function to adjust the length of the ms1 and ms2 feture intensities points for similarity metric and euclidean distance calculation
    def adjust_lengths(self, a, b):
        if len(a) > len(b):
            larger = a
            smaller = b
        else:
            larger = b
            smaller = a

        diff = len(larger) - len(smaller)
        for _ in range(diff):
            # Remove elements alternatively from the beginning (index 0) and end (index -1)
            if len(larger) % 2 == 0:
                larger = np.delete(larger, 0)
            else:
                larger = np.delete(larger, -1)
        return larger, smaller
    
    # Obtain here the different information for the decision tree
    def get_feature_vector(self, ms1_feature, ms2_feature, ms1_shape, ms2_shape):
        ms1_i = [i[0] for i in ms1_shape[0]]
        ms2_i = [i[0] for i in ms2_shape[0]]
        
        similarity_metric = round(pearsonr(zscore(ms1_i), zscore(ms2_i))[0], 3)
        euclidean_distance = round(np.linalg.norm(zscore(ms1_i) - zscore(ms2_i)), 3)
        
        mz_diff = round(ms1_feature['m/z'] - ms2_feature['m/z'], 4)
        rt_diff = round(ms1_feature['rt'] - ms2_feature['rt'], 4)
        
        return [mz_diff, rt_diff, similarity_metric, euclidean_distance]


    def plot_features(self, ms1_shape, ms2_shape):
        ms1_i = [i[0] for i in ms1_shape[0]]
        ms1_rt = [i[1] for i in ms1_shape[0]]
        ms2_i = [i[0] for i in ms2_shape[0]]
        ms2_rt = [i[1] for i in ms2_shape[0]]
             
        similarity_metric = round(pearsonr(zscore(ms1_i), zscore(ms2_i))[0], 3)
        euclidean_distance = round(np.linalg.norm(zscore(ms1_i) - zscore(ms2_i)), 3)
        
        fig, axs = plt.subplots(1, 2, figsize=(20, 5))

        # Plot absolute intensities
        axs[0].plot(ms1_rt, ms1_i, label=f'MS1 - {ms1_shape[1]} Da', color='blue')
        axs[0].plot(ms2_rt, ms2_i, label=f'MS2 - {ms2_shape[1]} Da', color='red')
        axs[0].set_xlabel('rt (min)')
        axs[0].set_ylabel('Absolute Intensity')
        axs[0].legend()

        # Plot relative intensities (Z-score normalized)
        axs[1].plot(ms1_rt, zscore(ms1_i), label=f'MS1 - {ms1_shape[1]} Da', color='blue')
        axs[1].plot(ms2_rt, zscore(ms2_i), label=f'MS2 - {ms2_shape[1]} Da', color='red')
        axs[1].set_xlabel('rt (min)')
        axs[1].set_ylabel('Relative Intensity (Z-score)')
        axs[1].set_title(f'{round(similarity_metric, 3)} - {euclidean_distance}')
        axs[1].legend()

        plt.show()
    
    # Function to fill 0 intensities values with border intensities values
    def fix_zeros(self, data):
        for i in range(len(data)):
            if data[i] == 0:
                if i == 0:  # If it's the first element
                    next_val = next((x for x in data[i+1:] if x != 0), 0)
                    data[i] = next_val
                elif i == len(data) - 1:  # If it's the last element
                    prev_val = next((x for x in reversed(data[:i]) if x != 0), 0)
                    data[i] = prev_val
                else:  # If it's any other element
                    prev_val = next((x for x in reversed(data[:i]) if x != 0), 0)
                    next_val = next((x for x in data[i+1:] if x != 0), 0)
                    data[i] = (prev_val + next_val) / 2

        return data

    # Function to obtain intensities for plotting features
    def spectra_get_ms_intensities(self, ms1_feature, ms2_feature):
        print( ms1_feature['sample'],  ms2_feature['sample'])
        rt_shift = 0.01 #this is to increase the observation window of the features
        sample = ms1_feature['sample']
        rt_min = float(ms1_feature['peak_rt_start'])
        rt_max = float(ms1_feature['peak_rt_end'])
        mz_tolerance = 0.002 # mz tolerance is to take all mz around the feature mz values with a tolerance
        mz_min_ms1 = float(ms1_feature['peak_mz_min'])
        mz_max_ms1 = float(ms1_feature['peak_mz_max'])
        spec = self.spectra[sample]       
        scans_ms1 = [] # scans are minimal and maximal scans index based of the rt range
        for rt_value  in [rt_min - rt_shift, rt_max + rt_shift]:
            match = 100 # match is 100min value to begin with a very high impossible rt value
            for idx, rt in enumerate(spec.rt1): # the idea is to have the lowest rt difference find the corresponding rt value of a scan
                difference = float(rt_value) - float(rt)  # take the difference of rt
                if abs(difference) < abs(match):
                    match = difference
                else: # if the difference increases rather than decreasing we have found the right rt, because the loop is in increase rt duration order
                    scans_ms1.append(idx-1)
                    break

        rt_min = float(ms2_feature['peak_rt_start'])
        rt_max = float(ms2_feature['peak_rt_end'])
        mz_min_ms2 = float(ms2_feature['peak_mz_min'])
        mz_max_ms2 = float(ms2_feature['peak_mz_max'])      
        scans_ms2 = [] # scans are minimal and maximal scans index based of the rt range
        for rt_value  in [rt_min - rt_shift, rt_max + rt_shift]:
            match = 100 # match is 100min value to begin with a very high impossible rt value
            for idx, rt in enumerate(spec.rt2): # the idea is to have the lowest rt difference find the corresponding rt value of a scan
                difference = float(rt_value) - float(rt)  # take the difference of rt
                if abs(difference) < abs(match):
                    match = difference
                else: # if the difference increases rather than decreasing we have found the right rt, because the loop is in increase rt duration order
                    scans_ms2.append(idx-1)
                    break  
        scans = [] #take the minimum and maximum values of scan for harmonization between ms1 and ms2 features
        scans.append(min([scans_ms1[0], scans_ms2[0]]))
        scans.append(max([scans_ms1[1], scans_ms2[1]]))    
                         
        intensities1 = []
        intensities2 = []
        for scan in range(scans[0], scans[1]):
            ms1_arr = spec.peakarray1[scan] #take all peakarraay values to make dataframes
            ms2_arr = spec.peakarray2[scan]
            df1 = pd.DataFrame(ms1_arr, columns=['mz', 'int'])
            df2 = pd.DataFrame(ms2_arr, columns=['mz', 'int'])
            matches1 = df1.loc[(df1['mz'] >= (mz_min_ms1 - mz_tolerance)) & (df1['mz'] <= (mz_max_ms1 + mz_tolerance))] # Perform a match within the m/z range
            matches2 = df2.loc[(df2['mz'] >= (mz_min_ms2 - mz_tolerance)) & (df2['mz'] <= (mz_max_ms2 + mz_tolerance))]
            if matches1.empty == False:
                intensities1.append((max(matches1['int']), spec.rt1[scan]))
            else:
                intensities1.append((0, spec.rt1[scan]))
            if matches2.empty == False:
                intensities2.append((max(matches2['int']), spec.rt2[scan]))
            else:
                intensities2.append((0, spec.rt2[scan]))
            intensities1 = self.fix_zeros(intensities1)
            intensities2 = self.fix_zeros(intensities2)
        return intensities1, intensities2

    # Function to handle the button click
    def on_button_click(self, label):
        if self.count >= (self.length_model):
            if self.block == False:
                self.labels.append(label)  # Map 'Yes' to 1, 'No' to 0, and 'Skip' to None
            print("All pairs have been labeled.")
            self.block = True # prevent for adding new labels
        else:
            if self.block == False:
                self.labels.append(label)  # Map 'Yes' to 1, 'No' to 0, and 'Skip' to None
            self.display_next_pair()
            self.count += 1
    # Before appending the label, also append the current feature values
        
        print(f'{self.count}/{self.length_model}')        
        print(self.labels)
        print(self.metrics)
        
    
    # Function to display the next pair
    def display_next_pair(self):            
        ms1_feature, matching_ms2 = self.paired_features[self.current_index[0]] # matching ms2 are all the ms2 feature matching the ms1 feature

        if self.current_index[1] >= len(matching_ms2): # if no matching ms2, len will be 0 and it will be skept
            self.current_index =  [self.current_index[0] + 1, 0]
            self.display_next_pair()
            return

        current_pair_ms2 = matching_ms2.iloc[self.current_index[1]] # take the current ms2 feature
        self.current_index[1] += 1
        
        intensities1, intensities2 = self.spectra_get_ms_intensities(ms1_feature, current_pair_ms2)
        ms1_shape = intensities1, ms1_feature['m/z']   # Fetch shape data for MS1 feature
        ms2_shape = intensities2, current_pair_ms2['m/z']   # Fetch shape data for MS2 feature
        self.metrics.append(self.get_feature_vector(ms1_feature, current_pair_ms2, ms1_shape, ms2_shape))
        self.plot_features(ms1_shape, ms2_shape)

        
tool = TrainingTool(feature)
I = interact_manual(tool.on_button_click, label=['Yes', 'No', 'Skip'])

interactive(children=(Dropdown(description='label', options=('Yes', 'No', 'Skip'), value='Yes'), Button(descri…

In [12]:
from sklearn.ensemble import RandomForestClassifier
# Collect your training data using the tool...
# Pay attention! The first label from tool.label are just a first label recorded to start the training session, reason why it isremoved below
def prepare_training_data(Tool):
    X = Tool.metrics
    y = Tool.labels[1:]
    print(f"Size of X: {len(X)}, Size of y: {len(y)}")  # Print the sizes of X and y

    return np.array(X), np.array(y)

X, y = prepare_training_data(tool)
clf = RandomForestClassifier()
clf.fit(X, y)

Size of X: 13, Size of y: 13


In [14]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Split your data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create a classifier and fit it to your data
clf = RandomForestClassifier()
clf.fit(X_train, y_train)

# Use the fitted model to make predictions on the test data
y_pred = clf.predict(X_test)

from sklearn.metrics import accuracy_score
print("Accuracy: ", accuracy_score(y_test, y_pred))

Accuracy:  0.6666666666666666
