## Constants

In [1]:
# Imports 
import numpy as np
import pandas as pd
from pandas import ExcelWriter
from pandas import ExcelFile
from numpy import diff
from scipy.signal import savgol_filter
from scipy.interpolate import interp1d
from operator import truediv
pd.options.mode.chained_assignment = None

In [2]:
# Constants 

# Filepaths for the raw data and the preprocessed data that this code will output
FILEPATH_RAW = '../Data_Raw/'
FILEPATH_PREPROCESSED = '../Data_Preprocessed/'

# Dictionary of the datasets files as values, and names attributed to them as keys
NAMES_DATASETS = {
    'OD' : 'F0.csv', # Raw optical density data
    'FT' : 'F1.csv', # Raw test fluorescence data
    'FC' : 'F2.csv'  # Raw control fluorescence data
}

# Define the names for the final metrics
NAMES_FINAL = [
    'FT_OD', # Refers to the fluorescence of the test gene normalised by OD
    'FC_OD', # Refers to the fluorescence of the control gene normalised by OD
    'FT_FC'  # Refers to the fluorescence of the test gene normalised by that of the control gene
]

# Interpolation parameters
INTERPOLATION_POINTS = 750
WINDOW_SIZE = 101
POLY_ORDER = 2
KIND_OF_INTERPOLATION = 'linear'

# Stable region inference parameters
TIME_BEFORE_MAX_GROWTH = 8 # How much do we allow the window to go before Max Growth.
WINDOW_DURATION = 40 # How long do we want the window to be.

# Exporting the processed data
EXPORT_CSV = True # Whether the output of this code will be saved
EXPORT_NAME = '_final_stats.csv' # Suffix of the preprocessed datasets if they are saved

## Functions

In [3]:
# Function to load data 

def func_load_data(filepath, names):
    """
    Load raw data from csv files into a dictionary.

    Args:
    filepath (str): The directory path where the data files are stored (defined in Constants).
    names (dict): The names of the datasets and the corresponding filenames (defined in Constants).

    Returns:
    data (dict): The loaded data, where the keys are dataset names and values are DataFrames.
    """

    data_raw = {key: pd.read_csv(filepath + value, index_col=0) for key, value in names.items()}
    return data_raw

In [4]:
# Function to interpolate, filter, differentiate and normalize 

def func_IFDN(
    data_raw,
    names_raw,
    interpolation_points,
    window_size,
    poly_order,
    kind_of_interpolation,
):

    """
    Preprocess the raw data. This function interpolates, filters, and computes the relevant derivatives of the raw data.

    Args:
    data_raw (dict): The raw data dictionary (output of func_load_data).
    names_raw (dict): The names of the datasets and the corresponding filenames (defined in Constants).
    interpolation_points (int): The number of interpolation points (defined in Constants).
    window_size (int): The size of the window for the Savitzky-Golay filter (defined in Constants).
    poly_order (int): The polynomial order for the Savitzky-Golay filter (defined in Constants).
    kind_of_interpolation (str): The type of interpolation to be used (defined in Constants).

    Returns:
    data_modified (dict): The interpolated and filtered data. Here:
        • Interpolated OD
        • Filtered OD
        • Interpolated test fluorescence
        • Filtered test fluorescence
        • Interpolated control fluorescence
        • Filtered control fluorescence
    data_derivatives (dict): The derivatives of the interpolated and filtered data, with relevant normalizations. Here:
        • d(OD)/dt
        • d(test fluorescence)/dt
        • d(control fluorescence)/dt
        • (d(test fluorescence)/dt)/(d(OD)/dt)
        • (d(control fluorescence)/dt)/(d(OD)/dt)
        • (d(test fluorescence)/dt)/(d(test fluorescence)/dt)
    """
    
    
    # Initialise interpolated time array
    Time_itp = np.linspace(
        float(data_raw['OD'].columns.tolist()[0]), # Interval beginning
        float(data_raw['OD'].columns.tolist()[-1]), # Interval end
        interpolation_points # This is the number of evenly spaced values within the specified interval
    )

    # Initialise dataframes for interpolated (itp) and filtered (fil) data
    data_modified = {key + suffix: pd.DataFrame(index=data_raw['OD'].index, columns=Time_itp)
             for key in names_raw.keys()
             for suffix in ['_itp', '_fil']}

    # Initialise dataframes for time derived (der), and normalised (Fx_Fy) data
    keys = [key + '_der' for key in names_raw.keys()] + ['FT_OD', 'FC_OD', 'FT_FC']
    data_derivatives = {key: pd.DataFrame(index=data_raw['OD'].index, columns=Time_itp[1:])
                        for key in keys}

    # Loop over all the samples (a sample is constituted by the temporal signals acquired for a given microplate well).
    for i in range(len(data_raw['OD'])):

        # Counter to keep track of the computation progress
        progress=np.round(i/len(data_raw['OD']),2)*100
        print(f"{progress:.2f}%", end="\r")

        # Obtain the time stamps associated with the current sample
        time = list(map(float, data_raw['OD'].columns.tolist()))
        Nb_nan = data_raw['OD'].iloc[i].isnull().sum() # Number of NaN values because not all data acquisitions are as long as each other
        if Nb_nan:
            time = time[:-Nb_nan]
        time_stamps = np.linspace(time[0], time[-1], interpolation_points)

        # Loop over the datasets
        for j in names_raw.keys():

            # Remove nan values from the current sample
            values = data_raw[j].iloc[i]
            values_nonan = [value for value in values if not np.isnan(value)]

            # Interpolate the raw data
            data_modified[j+'_itp'].iloc[i] = interp1d(
                time,
                values_nonan,
                kind=kind_of_interpolation
            )(time_stamps) 

            # Filter the interpolated data
            data_modified[j+'_fil'].iloc[i] = savgol_filter(
                data_modified[j+'_itp'].iloc[i].tolist(),
                window_size,
                poly_order)  

            # Compute the derivatives of the interpolated data
            data_derivatives[j+'_der'].iloc[i] = diff( data_modified[j+'_fil'].iloc[i] ) / diff(time_stamps)

        # Compute the normalisations for the differentiated data
        data_derivatives['FT_OD'].iloc[i] = np.divide(
            data_derivatives['FT_der'].iloc[i].values,
            data_derivatives['OD_der'].iloc[i].values,
            out=np.zeros_like(data_derivatives['FT_der'].iloc[i]),
            where=data_derivatives['OD_der'].iloc[i]!=0
        )
        data_derivatives['FC_OD'].iloc[i] = np.divide(
            data_derivatives['FC_der'].iloc[i].values,
            data_derivatives['OD_der'].iloc[i].values,
            out=np.zeros_like(data_derivatives['FC_der'].iloc[i]),
            where=data_derivatives['OD_der'].iloc[i]!=0
        )
        data_derivatives['FT_FC'].iloc[i] = np.divide(
            data_derivatives['FT_der'].iloc[i].values,
            data_derivatives['FC_der'].iloc[i].values,
            out=np.zeros_like(data_derivatives['FT_der'].iloc[i]),
            where=data_derivatives['FC_der'].iloc[i]!=0
        )

    return data_modified, data_derivatives

In [5]:
# Function to compute the final metrics 

def func_final_metrics(
    data_modified,
    data_derivatives,
    window_duration,
    time_before_max_growth,
    names_final
):
    
    """
    This function selects the window of time with the smallest variance in the normalised differentiated data, 
    and computes the mean value over this window, which serves as the final metric for each condition of each sample.

    Args:
    data_modified (dict): The interpolated and filtered data (output of func_IFDN).
    data_derivatives (dict): The derivatives of the interpolated and filtered data, with relevant normalizations (output of func_IFDN).
    window_duration (int): How long do we want the window to be (defined in Constants).
    time_before_max_growth (int): How much do we allow the window to go before maximum growth rate (defined in Constants).
    names_final (list): The names of the final datasets to be computed (defined in Constants).

    Returns:
    data_final_metrics (dict): A dictionary of pandas DataFrames. Each DataFrame contains the mean values over 
    the window with the smallest variance for each sample and each condition in the 'names_final' list.
    """

    # Initialise final computations dataframes which will store the final computed metrics for each sample. 
    data_final_metrics = {names_final: pd.DataFrame(index=data_derivatives['OD_der'].index,columns=['Mean'])
                         for names_final in names_final}

    # Loop over all samples
    for m in range(len(data_derivatives['OD_der'])):   

        # Counter to keep track of the computation progress
        progress=np.round(m/len(data_derivatives['OD_der']),2)*100
        print(f"{progress:.2f}%", end="\r")

        # Define current sample
        sample = data_derivatives['OD_der'].index[m]

        # Compute growth rate
        Growth_Rate = np.divide(data_derivatives['OD_der'].loc[sample].tolist(),
                                data_modified['OD_fil'].loc[sample].tolist()[:-1])

        # Compute the number of possible time windows
        Nb_possible_timewindows = max(1,abs(np.argmax(Growth_Rate) - window_duration))

        # Initialize all possible signal windows to store the possible signal windows and their variances.
        SignalWindows = {names_final: np.zeros((Nb_possible_timewindows,window_duration)) for names_final in names_final}
        Vars          = {names_final: np.zeros(Nb_possible_timewindows) for names_final in names_final}

        # Loop over all time windows. For each time window, it loops over conditions to store possible signal windows and 
        # compute their variances.
        for k in range(Nb_possible_timewindows):
            # Loop over conditions
            for names_final_i in names_final:
                # Store possible signal windows
                SignalWindows[names_final_i][k] = data_derivatives[names_final_i].loc[sample].tolist()[k:k+window_duration]
                # Compute the signal variance over the possible signal windows
                Vars[names_final_i][k] = np.var(SignalWindows[names_final_i][k])

        # Loop over conditions. For each condition, it finds the best signal window, i.e., the one with minimum variance and 
        # computes the temporal mean over the relevant window. This mean value is the final metric for the sample under that condition.
        for names_final_i in names_final:
            # Find the best signal window
            Var_MinIdx = list(Vars[names_final_i]).index(min(Vars[names_final_i][-time_before_max_growth:],key=abs))
            Best_SignalWindow = data_derivatives[names_final_i].loc[sample].tolist()[Var_MinIdx:Var_MinIdx+window_duration]
            # Compute temporal mean over the relevant window
            data_final_metrics[names_final_i].loc[sample] = np.mean(Best_SignalWindow)
            
    return data_final_metrics

In [6]:
# Function to compute the statistics over final metrics 

def func_stats(
    data_raw,
    final_metrics,
    names_final
):

    """
    This function computes the mean and the standard error of the mean (SEM) over replicas 
    for each unique condition in the dataset. The unique conditions are determined by splitting 
    the 'OD' index in the raw data. The function assumes that replicas are distinguished by a 
    '_r' in the index name. The final metrics are then averaged over these replicas.

    Args:
    final_metrics (dict): A dictionary of pandas DataFrames (output of func_final_metrics).
    data_raw (dict): The raw data dictionary (output of func_load_data).
    names_final (List[str]): The names of the final datasets to be computed (defined in Constants).

    Returns:
    data_final_stats (dict): A dictionary of pandas DataFrames. Each DataFrame contains the mean 
    and SEM of the final metrics for each unique condition and each dataset in the 'names_final' list.
    """
    
    # Compute Final Metrics over replica 

    # Extract unique conditions from sample names by splitting at '_r'
    unique_samples = set([var[0] for var in data_raw['OD'].index.str.split('_r')])
    
    # Initialize dataframes a new dictionary of dataframes to store the statistical results
    data_final_stats = {names_final: pd.DataFrame(index=sorted(unique_samples),columns=['Mean','SEM'])
                        for names_final in names_final}
    
    # Loop over all unique conditions
    for var in sorted(unique_samples):
        
        # Loop over each condition to compute and store statistics
        for names_final_i in names_final:
            
            # Find all replicates for a given condition in final_metrics
            replicates = final_metrics[names_final_i][final_metrics[names_final_i].index.str.contains(var)]
            
            # Compute mean and standard error of the mean (SEM) across replicates
            mean = replicates.mean(axis=0)[0]
            sem = replicates.sem(axis=0)[0]
            
            # Store mean and SEM in the appropriate dataframe
            data_final_stats[names_final_i].loc[var] = mean, sem
            
    return data_final_stats

In [7]:
# Function to export preprocessed data 

def func_export(export_csv, export_name, filepath_preprocessed, names_final, stats):
    
    """
    This function takes statistics of the preprocessed data "stats" and exports them to CSV format, if the 'export_csv' flag is set to True
    The name of each CSV file is determined by the corresponding element in 'names_final' and the 'export_name' string.

    Args:
    export_csv (bool): A flag indicating whether the data should be exported to CSV (defined in Constants).
    export_name (str): A string to be appended to the name of each exported file (defined in Constants).
    filepath_preprocessed (str): The file path where the exported files will be saved (defined in Constants).
    names_final (List[str]): The names of the datasets to be exported (defined in Constants).
    stats (Dict[str, pd.DataFrame]): The preprocessed data to be exported (output of func_stats).
    """
    
    # Check if the export_csv flag is set to True
    if export_csv:
        # Loop over each dataset name in names_final
        for names_final_i in names_final:
            # Save the dataset to a CSV file, with the filename determined by the dataset name and export_name
            stats[names_final_i].to_csv(filepath_preprocessed+names_final_i+export_name)

## Data Preprocessing

In [8]:
# Load data 

Data_Raw = func_load_data(
    FILEPATH_RAW,
    NAMES_DATASETS
)

In [9]:
# Interpolate, Filter, Differentiate and Normalise data  

Data_Modified, Data_Derivatives = func_IFDN(
    Data_Raw,
    NAMES_DATASETS,
    INTERPOLATION_POINTS,
    WINDOW_SIZE,
    POLY_ORDER,
    KIND_OF_INTERPOLATION
)

100.00%

In [10]:
# Compute final metrics 
Final_Metrics = func_final_metrics(
    Data_Modified,
    Data_Derivatives,
    WINDOW_DURATION,
    TIME_BEFORE_MAX_GROWTH,
    NAMES_FINAL
)

100.00%

In [11]:
# Compute final statistics 
Stats = func_stats(
    Data_Raw,
    Final_Metrics,
    NAMES_FINAL
)

In [12]:
# Export final statistics 

func_export(
    EXPORT_CSV,
    EXPORT_NAME,
    FILEPATH_PREPROCESSED,
    NAMES_FINAL,
    Stats
)