===================================
 # PEAK DETECTION #
===================================

Read the chromatogram, generate a peak table and identify the compounds using the NIST database. 

In [None]:
import os
import ipywidgets as widgets
from IPython.display import display
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
from identification import sample_identification
import netCDF4 as nc


class GCGCMSAnalysisUI:
    """
    Widget-based user interface for configuring and running a GC*GC-MS analysis.

    The UI collects an input folder, an optional list of CDF files, an output
    folder and the peak-detection parameters, then calls
    ``sample_identification`` once per file when the "Run" button is clicked.

    Paths are typed by the user as host paths. They are translated to the
    container's view using the ``HOST_VOLUME_PATH`` and ``DOCKER_VOLUME_PATH``
    environment variables; when either variable is unset, paths are used as
    typed.
    """

    # Known acquisitions: number of scans -> (modulation time in seconds,
    # message printed to tell the user which kind of data was recognised).
    _SCAN_PROFILES = {
        328125: (1.25, "type de donnees: G0/plasma"),
        540035: (1.7, "type de donnnees: air expire"),
    }

    def __init__(self):
        """Initialize the GCMS Analysis UI with default parameters and widgets."""
        # Environment setup (either variable may be unset -> None)
        self.docker_volume_path = os.getenv('DOCKER_VOLUME_PATH')
        self.host_volume_path = os.getenv('HOST_VOLUME_PATH')

        # Default paths. Fall back to empty fields instead of the literal
        # string 'None' when HOST_VOLUME_PATH is not defined.
        if self.host_volume_path:
            self.default_path_input = f'{self.host_volume_path}'
            self.default_path_output = f'{self.host_volume_path}/output/'
        else:
            self.default_path_input = ''
            self.default_path_output = ''

        # Style configuration
        self.style = {'description_width': 'initial'}

        # Initialize widgets
        self._create_widgets()

        # Configure widget callbacks
        self._setup_callbacks()

        # Empty green-bordered box, displayed several times as a visual separator
        self.vbox = widgets.VBox(layout=widgets.Layout(border='2px solid green'))

    def _create_widgets(self):
        """Create all UI widgets and store them as instance attributes."""
        # Title
        self.txt_title = widgets.HTML('<H1>Choisissez vos parametres:</H1>')

        # Path inputs
        self.w_path = widgets.Text(value=self.default_path_input)
        self.path = self._bold_widget("Path", self.w_path)

        self.w_files = widgets.Text(placeholder="ex: file1.cdf, file2.cdf")
        self.files = self._bold_widget("Files", self.w_files)

        self.w_output_path = widgets.Text(value=self.default_path_output)
        self.output_path = self._bold_widget("Output path", self.w_output_path)

        # Method selection
        self.w_method = widgets.RadioButtons(
            options=['persistent_homology', 'peak_local_max', 'LoG', 'DoG', 'DoH'],
            value='peak_local_max',
            description='<b>Method</b>',
            disabled=False)

        # Mode selection
        self.w_mode = widgets.RadioButtons(
            options=['tic', 'mass_per_mass', '3D'],
            value='tic',
            description='<b>Mode</b>',
            disabled=False)

        # Filtering factor
        self.w_filtering_factor = widgets.FloatSlider(value=0.1, min=0, max=1, step=0.1)
        self.filtering_factor = self._bold_widget("Filtering factor", self.w_filtering_factor)
        self.filtering_factor_def = widgets.HTML(value="""
                                        <div style="margin-left: 50px;">
                                        <p><i>Filtering factor is a floating-point value between 0 and 1  \
    used to filter detected peaks. A peak is retained if its intensity in the  \
    chromatogram is greater than the maximum intensity in the chromatogram  \
    multiplied by the filtering factor. 
    The noise level (sigma) is estimated, then adjusted using the filtering factor to define when a signal \
    is considered significant despite the noise. The result is normalized relative to the maximum signal in \
    the chromatogram, so that this threshold (dynamic_threshold_fact) is proportional to the overall signal \
    amplitude.</div>""")

        # Minimum persistence (kept as free text; forwarded as typed)
        self.w_min_persistence = widgets.Text(value="0.02")
        self.min_persistence = self._bold_widget("Minimum persistence", self.w_min_persistence)
        self.min_persistence_def = widgets.HTML(value="""
                                        <div style="margin-left: 50px;">
                                        <p><i>enter a min_persistence if method="persistent_homology".
    min_persistence is a floating-point value used to filter detected peaks based on their topological \
                                       persistence.
    It defines the minimum persistence threshold that a peak must exceed to be considered a true signal \
                                       rather than noise.
    A small min_persistence will detect many peaks, including weak ones — often noise.
    A large min_persistence will retain only the most prominent peaks, effectively filtering out noise, \
                                       but it may also miss subtle or low-intensity signals. Use \
                                       estimation_min_persistence.ipynb in order to estimate it.</div>""")

        # Hit probability
        self.hit_prob_min = widgets.IntSlider(
            value=15, min=0, max=30,
            description="<b>Minimum hit probability</b> [a modifier]",
            style=self.style)

        # Threshold for mass_per_mass or 3D mode (disabled while mode is 'tic')
        self.txt_abs_threshold_cluster = widgets.HTML("If mass_per_mass or 3D mode:")
        self.ABS_THRESHOLDS = widgets.Text(
            value=None,
            disabled=True,
            description="Seuils absolus(liste)",
            style=self.style)

        # Cluster checkbox (disabled while mode is 'tic')
        self.cluster = widgets.Checkbox(
            value=False,
            description='<b>cluster</b>',
            disabled=True)

        # Peak local max parameters
        self.txt_plm = widgets.HTML("Peak local max parameter: ")
        txt1 = "The minimal allowed distance separating peaks."
        txt2 = "To find the maximum number of peaks, use min_distance=1."
        self.txt_plm_description = widgets.HTML(f"({txt1} {txt2})")
        self.min_distance = widgets.IntSlider(
            value=1, min=0, max=30, step=1,
            description="<b>Minimal distance</b> [a modifier]",
            style=self.style)

        # DoG parameters
        self.txt_dog = widgets.HTML("DoG parameter: ")
        txt3 = ("The ratio between the standard deviation of Gaussian Kernels used for"
                " computing the Difference of Gaussians.")
        self.txt_dog_description = widgets.HTML(txt3)
        self.sigma_ratio = widgets.FloatSlider(
            value=1.6, min=0, max=2, step=0.01,
            description="<b>sigma ratio:</b>",
            style=self.style)

        # LoG and DoH parameters (the label previously read "LoG and DoG",
        # which duplicated the DoG section above)
        self.txt_log_doh = widgets.HTML("LoG and DoH parameter")
        txt4 = ("(The number of intermediate values of standard deviations to consider"
                " between min_sigma (1) and max_sigma (30))")
        self.txt_log_doh_description = widgets.HTML(txt4)
        self.num_sigma = widgets.IntSlider(
            value=10, min=1, max=30,
            description="<b>sigma:</b>")

        # Formatted spectra option
        self.formated_spectra = widgets.Checkbox(
            value=True,
            description='<b>formated spectra</b>',
            disabled=False)

        # Match factor
        self.match_factor_min = widgets.IntSlider(
            value=700, min=0, max=1000, step=1,
            description="<b>Match factor min</b> [a modifier]")

        # Run button and output area (analysis messages are captured here)
        self.run_button = widgets.Button(description="Run")
        self.output = widgets.Output()

    def _setup_callbacks(self):
        """Set up callbacks for interactive widgets."""
        self.w_mode.observe(self._update_ABS_THRESHOLDS, names='value')
        self.ABS_THRESHOLDS.observe(self._parse_abs_thresholds, names='value')
        self.run_button.on_click(self._on_button_click)

    def _bold_widget(self, value, widget):
        """Return an HBox holding a bold HTML label followed by ``widget``."""
        bold = widgets.HTML(value=f'<b>{value}</b>')
        return widgets.HBox([bold, widget])

    def _parse_abs_thresholds(self, change):
        """
        Validate the absolute thresholds typed as a comma-separated list.

        Args:
            change: Change notification; only ``change['new']`` (the new text)
                is read.

        Returns:
            The parsed list of floats, an empty list when the field is empty,
            or None when the text cannot be parsed. The return value is only a
            convenience; the widget text itself is left untouched.
        """
        text = change['new'] or ""
        if not text.strip():
            # Clearing the field is not an error.
            return []
        try:
            values = [float(token.strip()) for token in text.split(",")]
        except ValueError:
            print("Erreur : Veuillez entrer des nombres séparés par des virgules.")
            return None
        print("Valeurs converties :", values)
        return values

    def _update_ABS_THRESHOLDS(self, change):
        """Enable or disable ABS_THRESHOLDS and cluster widgets based on selected mode."""
        # Both widgets only apply to the 'mass_per_mass' and '3D' modes.
        tic_selected = change.new == 'tic'
        self.ABS_THRESHOLDS.disabled = tic_selected
        self.cluster.disabled = tic_selected

    def _to_docker_path(self, host_path):
        """
        Translate a host path into the matching path inside the container.

        The first occurrence of ``HOST_VOLUME_PATH`` is replaced by
        ``DOCKER_VOLUME_PATH``. When either variable is unset the path is
        returned unchanged instead of failing in ``str.replace``.
        """
        if not self.host_volume_path or self.docker_volume_path is None:
            return host_path
        return host_path.replace(self.host_volume_path, self.docker_volume_path, 1)

    def get_files_from_folder(self, path):
        """
        Return the names of the ``.cdf`` files found directly in ``path``.

        The names are sorted so files are processed in a reproducible order.
        An empty list is returned when ``path`` is not a directory.
        """
        if not os.path.isdir(path):
            return []
        return sorted(name for name in os.listdir(path) if name.endswith(".cdf"))

    @classmethod
    def _mod_time_for_scan_number(cls, scan_number):
        """
        Return the modulation time (seconds) associated with ``scan_number``.

        Raises:
            ValueError: if ``scan_number`` is not one of the known acquisitions.
        """
        try:
            mod_time, message = cls._SCAN_PROFILES[scan_number]
        except KeyError:
            raise ValueError(f"scan_number non reconnu: {scan_number}") from None
        print(message)
        return mod_time

    def get_mod_time(self, file_path):
        """
        Get the modulation time based on the ``scan_number`` dimension of a CDF file.

        Args:
            file_path: Path of the netCDF file to inspect.

        Returns:
            The modulation time in seconds.

        Raises:
            ValueError: if the file's scan count matches no known acquisition.
        """
        # The context manager closes the dataset even if the lookup fails.
        with nc.Dataset(file_path, 'r') as data:
            scan_number = data.dimensions['scan_number'].size
        return self._mod_time_for_scan_number(scan_number)

    def analyse(self, path, files_list, output_path, user_output_path, method, mode, filtering_factor,
                min_persistence, hit_prob_min, ABS_THRESHOLDS, cluster, min_distance,
                sigma_ratio, num_sigma, formated_spectra, match_factor_min):
        """
        Run the analysis on the specified files.

        Args:
            path: Input folder as seen from the running process.
            files_list: File names to analyse, or None to analyse every
                ``.cdf`` file found in ``path``.
            output_path: Output folder as seen from the running process;
                created when missing.
            user_output_path: Output folder as typed by the user; only used
                in the "created" message.
            method, mode, filtering_factor, min_persistence, hit_prob_min,
            ABS_THRESHOLDS, cluster, min_distance, sigma_ratio, num_sigma,
            formated_spectra, match_factor_min: Forwarded unchanged to
                ``sample_identification``.

        Errors are reported with ``print`` and stop the run; nothing is raised
        for missing paths, unreadable files or unrecognised acquisitions.
        """
        if not path:
            print("Erreur : Aucun chemin sélectionné.")
            return
        if not output_path:
            print("Erreur : Aucun chemin de sortie sélectionné.")
            return

        if files_list is None:
            files_list = self.get_files_from_folder(path)

        if not os.path.exists(output_path):
            os.makedirs(output_path, exist_ok=True)
            print(f"Created output directory: {user_output_path}")

        # Drop blank entries left by stray commas or spaces.
        files_list = [file.strip() for file in files_list if file.strip()]
        print(f"Fichiers à analyser : {files_list}")
        if not files_list:
            # Avoid reporting success when nothing was analysed.
            print("Erreur : Aucun fichier à analyser.")
            return

        for file in files_list:
            full_path = os.path.join(path, file)
            if not os.path.isfile(full_path):
                print(f"Erreur : Le fichier '{file}' est introuvable dans '{path}'")
                return
            if not os.access(full_path, os.R_OK):
                print(f"Erreur: Permission refusée pour accéder à '{file}' dans '{path}'")
                return

            try:
                mod_time = self.get_mod_time(full_path)
            except ValueError as exc:
                print(f"Erreur : {exc} ('{file}')")
                return

            print(f"Analyzing {file} with modulation time = {mod_time} secondes...\n")
            # NOTE(review): ABS_THRESHOLDS and min_persistence are forwarded as
            # the raw widget text - confirm sample_identification expects that.
            result = sample_identification(path, file, output_path, mod_time, method, mode, filtering_factor,
                                           hit_prob_min, ABS_THRESHOLDS, cluster, min_distance, sigma_ratio,
                                           num_sigma, formated_spectra, match_factor_min, min_persistence)
            print("Analyse terminée:", result)
        print("Tous les fichiers ont été analysés avec succès.")

    def _on_button_click(self, b):
        """Handle the "Run" click: read the widgets and start the analysis."""
        with self.output:
            self.output.clear_output()

            # Paths are typed as host paths; translate them for the container.
            user_input_path = self.w_path.value
            path_for_docker = self._to_docker_path(user_input_path)

            user_output_path = self.w_output_path.value
            output_path_for_docker = self._to_docker_path(user_output_path)

            # An empty (or blank) field means "every CDF file in the folder".
            typed_names = [name.strip() for name in self.w_files.value.split(",")]
            files_list = [name for name in typed_names if name] or None

            self.analyse(path_for_docker, files_list, output_path_for_docker, user_output_path,
                         self.w_method.value, self.w_mode.value, self.w_filtering_factor.value,
                         self.w_min_persistence.value, self.hit_prob_min.value,
                         self.ABS_THRESHOLDS.value, self.cluster.value, self.min_distance.value,
                         self.sigma_ratio.value, self.num_sigma.value, self.formated_spectra.value,
                         self.match_factor_min.value)

    def display(self):
        """Display the UI."""
        # self.vbox is repeated on purpose: it is the separator between sections.
        display(self.txt_title, self.path, self.files, self.output_path, self.w_method, self.w_mode,
                self.filtering_factor, self.filtering_factor_def, self.min_persistence,
                self.hit_prob_min, self.formated_spectra, self.match_factor_min,
                self.vbox, self.txt_abs_threshold_cluster, self.ABS_THRESHOLDS, self.cluster, self.vbox,
                self.txt_plm, self.txt_plm_description, self.min_distance, self.vbox, self.txt_dog,
                self.txt_dog_description, self.sigma_ratio, self.vbox, self.txt_log_doh,
                self.txt_log_doh_description, self.num_sigma, self.vbox,
                self.run_button, self.output)




In [None]:
# Build the parameter form and render it in this cell's output.
# `gcms_ui` stays bound at notebook level after the cell has run.
gcms_ui = GCGCMSAnalysisUI()
gcms_ui.display()