<a href="https://colab.research.google.com/github/Palaeoprot/ModulAAR/blob/main/data_processor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np

class DataProcessor:
    """Load, clean and summarise amino-acid concentration and D/L measurements.

    The pipeline is: ``load_data`` -> ``clean_data`` -> ``calculate_real_DL``
    -> ``handle_single_value_stdev`` -> ``save_to_csv``; ``process_data`` runs
    all of them in order.

    Rows are split on the ``sample`` column into ``'FAA'`` and ``'THAA'``
    measurements; "BAA" quantities are derived as THAA minus FAA.
    """

    def __init__(self):
        # DataFrame exactly as read by load_data(), or None if nothing loaded.
        self.raw_data = None
        # Cleaned copy of raw_data produced by clean_data().
        self.processed_data = None
        # Per (temp, time) summary table produced by calculate_real_DL().
        self.real_DL = None
        # Amino acids expected as '[<aa>]' and '<aa> D/L' columns in the data.
        self.amino_acids = ['Asx', 'Glx', 'Ser', 'Ala', 'Val', 'Phe', 'Ile']

    def load_data(self, source, is_gsheet=False):
        """Read the raw data into ``self.raw_data`` and return it.

        Args:
            source: A path/URL accepted by ``pd.read_csv``; or, when
                ``is_gsheet`` is true, a mapping with ``'sheet_id'`` and
                ``'gid'`` keys identifying a Google Sheet tab.
            is_gsheet: Whether ``source`` describes a Google Sheet.

        Returns:
            The loaded DataFrame, or ``None`` if loading failed (the error is
            printed rather than raised).
        """
        try:
            if is_gsheet:
                sheet_id = source['sheet_id']
                gid = source['gid']
                export_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
                self.raw_data = pd.read_csv(export_url)
                print("Data loaded successfully from Google Sheet")
            else:
                self.raw_data = pd.read_csv(source)
                print(f"Data loaded successfully from {source}")
        except Exception as e:
            # Best-effort boundary: report the problem and leave raw_data unset.
            print(f"Error loading data: {str(e)}")
            self.raw_data = None
        return self.raw_data

    @staticmethod
    def _strip_if_str(value):
        """Strip surrounding whitespace from strings; pass anything else through."""
        return value.strip() if isinstance(value, str) else value

    @staticmethod
    def _coerce_numeric(series):
        """Convert a column to numbers; unparseable or non-finite cells become NaN.

        Columns that are already numeric are kept as they are: round-tripping
        them through ``str`` would render small floats as e.g. ``'1e-05'`` and
        the character filter below would then destroy the exponent.
        """
        dtype = series.dtype
        if pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(dtype):
            values = series
        else:
            text = series.astype(str).str.strip()
            # Try the text as written first so scientific notation survives.
            direct = pd.to_numeric(text, errors='coerce')
            # Fall back to dropping stray characters (units, flags, commas).
            scrubbed = pd.to_numeric(
                text.str.replace(r'[^\d.-]', '', regex=True), errors='coerce'
            )
            values = direct.fillna(scrubbed)
        # Infinite values are treated as missing so they get dropped with NaN.
        return values.replace([np.inf, -np.inf], np.nan)

    def clean_data(self):
        """Build ``self.processed_data`` from ``self.raw_data`` and return it.

        Renames a few columns to the names used downstream, strips whitespace
        from text cells, converts the concentration and D/L columns of every
        amino acid in ``self.amino_acids`` to numbers, and drops rows where any
        of those columns is missing.

        Returns:
            The cleaned DataFrame, or ``None`` if no raw data is loaded.

        Raises:
            KeyError: If a required concentration or D/L column is absent.
        """
        if self.raw_data is None:
            print("No data available to clean.")
            return None

        # Rename columns for consistency
        column_mapping = {
            'time (h)': 'time',
            'temp': 'temp (°C)',
            'Pre-heat bleach time': 'Pre-heat bleach time (h)'
        }
        data = self.raw_data.copy().rename(columns=column_mapping)

        numeric_columns = (
            [f'[{aa}]' for aa in self.amino_acids]
            + [f'{aa} D/L' for aa in self.amino_acids]
        )
        missing = [col for col in numeric_columns if col not in data.columns]
        if missing:
            raise KeyError(f"Required column(s) missing from data: {missing}")

        # Strip whitespace in text columns (object or dedicated string dtype).
        # Element-wise so that non-string cells in a mixed column are kept
        # instead of being turned into NaN by the .str accessor.
        for col in data.columns:
            col_dtype = data[col].dtype
            if col_dtype == object or pd.api.types.is_string_dtype(col_dtype):
                data[col] = data[col].map(self._strip_if_str)

        for col in numeric_columns:
            data[col] = self._coerce_numeric(data[col])

        # Drop rows with NaN values in numeric columns
        data = data.dropna(subset=numeric_columns)

        # Only publish the result once cleaning has fully succeeded.
        self.processed_data = data
        print("Data cleaning completed.")
        return self.processed_data

    def calculate_real_DL(self):
        """Build the per-(temp, time) summary table ``self.real_DL``.

        Starts from the distinct ``temp (°C)`` / ``time`` / ``pH`` combinations
        in the processed data, outer-merges the statistics of every amino acid
        onto them, and adds a ``temp (K)`` column.

        Returns:
            The summary DataFrame, or ``None`` if data has not been cleaned.
        """
        if self.processed_data is None:
            print("No processed data available. Please clean data first.")
            return None

        faa_df = self.processed_data[self.processed_data['sample'] == 'FAA']
        thaa_df = self.processed_data[self.processed_data['sample'] == 'THAA']

        self.real_DL = self.processed_data[['temp (°C)', 'time', 'pH']].drop_duplicates().reset_index(drop=True)

        for aa in self.amino_acids:
            aa_data = self._calculate_amino_acid_distribution(aa, faa_df, thaa_df)
            self.real_DL = pd.merge(self.real_DL, aa_data, on=['temp (°C)', 'time'], how='outer')

        self.real_DL['temp (K)'] = self.real_DL['temp (°C)'] + 273.15

        print("real_DL calculation completed.")
        return self.real_DL

    def _calculate_amino_acid_distribution(self, amino_acid, faa_df, thaa_df):
        """Summarise one amino acid per (temp, time) group.

        Args:
            amino_acid: Name used in the ``'[<aa>]'`` and ``'<aa> D/L'`` columns.
            faa_df: Rows of the processed data whose ``sample`` is ``'FAA'``.
            thaa_df: Rows of the processed data whose ``sample`` is ``'THAA'``.

        Returns:
            A DataFrame keyed by ``temp (°C)`` and ``time`` holding mean / std /
            count of concentration and D/L for FAA and THAA, plus the derived
            BAA columns. Only groups present in both inputs are kept.
        """
        keys = ['temp (°C)', 'time']
        # Source column -> label used in the output column names.
        source_kinds = {f'[{amino_acid}]': 'Conc', f'{amino_acid} D/L': 'D/L'}

        def summarise(frame, fraction):
            stats = frame.groupby(keys)[list(source_kinds)].agg(['mean', 'std', 'count'])
            # Flatten to final names *before* merging so the two tables never
            # share a column name and no suffix handling is needed.
            stats.columns = [
                f'{amino_acid}_{source_kinds[src]}_{fraction}_{stat.capitalize()}'
                for src, stat in stats.columns
            ]
            return stats

        # Merge FAA and THAA data (inner join on the group keys)
        aa_data = pd.merge(
            summarise(faa_df, 'FAA'),
            summarise(thaa_df, 'THAA'),
            left_index=True,
            right_index=True,
        ).reset_index()

        # Calculate BAA concentrations
        aa_data[f'{amino_acid}_Conc_BAA_Mean'] = aa_data[f'{amino_acid}_Conc_THAA_Mean'] - aa_data[f'{amino_acid}_Conc_FAA_Mean']
        aa_data[f'{amino_acid}_Conc_BAA_Std'] = np.sqrt(aa_data[f'{amino_acid}_Conc_THAA_Std']**2 + aa_data[f'{amino_acid}_Conc_FAA_Std']**2)
        aa_data[f'{amino_acid}_Conc_BAA_Count'] = np.minimum(aa_data[f'{amino_acid}_Conc_THAA_Count'], aa_data[f'{amino_acid}_Conc_FAA_Count'])

        # Split each mean concentration into D and L parts: D = C * r / (1 + r)
        for fraction in ('FAA', 'THAA'):
            conc = aa_data[f'{amino_acid}_Conc_{fraction}_Mean']
            ratio = aa_data[f'{amino_acid}_D/L_{fraction}_Mean']
            aa_data[f'{amino_acid}_{fraction}_D'] = conc * ratio / (1 + ratio)
            aa_data[f'{amino_acid}_{fraction}_L'] = conc - aa_data[f'{amino_acid}_{fraction}_D']

        # Calculate BAA D and L concentrations
        aa_data[f'{amino_acid}_BAA_D'] = aa_data[f'{amino_acid}_THAA_D'] - aa_data[f'{amino_acid}_FAA_D']
        aa_data[f'{amino_acid}_BAA_L'] = aa_data[f'{amino_acid}_THAA_L'] - aa_data[f'{amino_acid}_FAA_L']

        # Calculate BAA D/L ratio
        aa_data[f'{amino_acid}_D/L_BAA_Mean'] = aa_data[f'{amino_acid}_BAA_D'] / aa_data[f'{amino_acid}_BAA_L']

        # Error propagation for BAA D/L ratio (relative errors added in quadrature)
        aa_data[f'{amino_acid}_D/L_BAA_Std'] = aa_data[f'{amino_acid}_D/L_BAA_Mean'] * np.sqrt(
            (aa_data[f'{amino_acid}_Conc_BAA_Std'] / aa_data[f'{amino_acid}_Conc_BAA_Mean'])**2 +
            (aa_data[f'{amino_acid}_D/L_THAA_Std'] / aa_data[f'{amino_acid}_D/L_THAA_Mean'])**2 +
            (aa_data[f'{amino_acid}_D/L_FAA_Std'] / aa_data[f'{amino_acid}_D/L_FAA_Mean'])**2
        )

        # Add BAA D/L Count column
        aa_data[f'{amino_acid}_D/L_BAA_Count'] = np.minimum(aa_data[f'{amino_acid}_D/L_THAA_Count'], aa_data[f'{amino_acid}_D/L_FAA_Count'])

        return aa_data

    def handle_single_value_stdev(self):
        """Set the standard deviation to 0 if there is only one value.

        Every ``*_Std`` column of ``self.real_DL`` is zeroed on rows where the
        matching ``*_Count`` column equals 1. Does nothing (apart from printing
        a message) when ``real_DL`` has not been calculated.
        """
        if self.real_DL is None:
            print("No real_DL data available. Please calculate real_DL first.")
            return

        for col in self.real_DL.columns:
            if '_Std' in col:
                count_col = col.replace('_Std', '_Count')
                if count_col in self.real_DL.columns:
                    self.real_DL.loc[self.real_DL[count_col] == 1, col] = 0
                else:
                    print(f"Warning: Count column {count_col} not found for {col}")

    def save_to_csv(self, file_path):
        """Write ``self.real_DL`` to ``file_path`` as CSV without the index.

        Failures are printed rather than raised.
        """
        if self.real_DL is None:
            print("No real_DL data to save. Please calculate real_DL first.")
            return

        try:
            self.real_DL.to_csv(file_path, index=False)
            print(f"Data saved to {file_path}")
        except Exception as e:
            print(f"Error saving data: {str(e)}")

    def process_data(self, source, output_file, is_gsheet=False):
        """Run the full pipeline: load, clean, summarise, fix stds, save.

        Stops after the first stage that yields no data; that stage has
        already printed the reason.
        """
        if self.load_data(source, is_gsheet) is None:
            return
        if self.clean_data() is None:
            return
        if self.calculate_real_DL() is None:
            return
        self.handle_single_value_stdev()
        self.save_to_csv(output_file)


def main():
    """Run the whole pipeline on the hard-coded Google Sheet tab and write the CSV result."""
    destination = "/content/drive/MyDrive/Colab_Notebooks/MoDuLAAR/ProcessedData/real_DL_output.csv"
    sheet_ref = dict(
        sheet_id="1nA6jSAkAf1Ud-kHdaYTMtBPgKhe9nBg_IjM9idLlj8E",
        gid="1259514505",
    )
    DataProcessor().process_data(sheet_ref, destination, is_gsheet=True)

# Entry point: run main() only when this code is executed directly
# (script or notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

In [None]:
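# NOTE(review): everything below in this cell is a commented-out variant of
# DataProcessor (it uses calculate_stats() instead of calculate_real_DL()).
# It looks superseded by the cell above -- confirm and remove if no longer needed.
# # data_processor.ipynb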
# # from google.colab import drive
# # drive.mount('/content/drive')
# import pandas as pd
# import numpy as np

# class DataProcessor:
#     def __init__(self):
#         self.raw_data = None
#         self.processed_data = None
#         self.real_DL = None
#         self.amino_acids = ['Asx', 'Glx', 'Ser', 'Ala', 'Val', 'Phe', 'Ile']

#     def load_data(self, source, is_gsheet=False):
#         try:
#             if is_gsheet:
#                 sheet_id = source['sheet_id']
#                 gid = source['gid']
#                 export_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv&gid={gid}"
#                 self.raw_data = pd.read_csv(export_url)
#                 print(f"Data loaded successfully from Google Sheet")
#             else:
#                 self.raw_data = pd.read_csv(source)
#                 print(f"Data loaded successfully from {source}")
#         except Exception as e:
#             print(f"Error loading data: {str(e)}")
#             self.raw_data = None
#         return self.raw_data

#     def clean_data(self):
#         if self.raw_data is None:
#             print("No data available to clean.")
#             return None

#         self.processed_data = self.raw_data.copy()

#         # Rename columns for consistency
#         column_mapping = {
#             'time (h)': 'time',
#             'temp': 'temp (°C)',
#             'Pre-heat bleach time': 'Pre-heat bleach time (h)'
#         }
#         self.processed_data.rename(columns=column_mapping, inplace=True)

#         # Strip whitespace and replace non-numeric characters in relevant columns
#         numeric_columns = [
#             '[Asx]', '[Glx]', '[Ser]', '[Ala]', '[Val]', '[Phe]', '[Ile]',
#             'Asx D/L', 'Glx D/L', 'Ser D/L', 'Ala D/L', 'Val D/L', 'Phe D/L', 'Ile D/L'
#         ]

#         for col in self.processed_data.columns:
#             if self.processed_data[col].dtype == 'object':
#                 self.processed_data[col] = self.processed_data[col].str.strip()

#         for col in numeric_columns:
#             self.processed_data[col] = self.processed_data[col].astype(str).str.replace(r'[^\d.-]', '', regex=True)
#             self.processed_data[col] = pd.to_numeric(self.processed_data[col], errors='coerce')

#         # Drop rows with NaN values in numeric columns
#         self.processed_data.dropna(subset=numeric_columns, inplace=True)

#         print("Data cleaning completed.")
#         return self.processed_data

#     def calculate_stats(self):
#         if self.processed_data is None:
#             print("No processed data available. Please clean data first.")
#             return None

#         faa_df = self.processed_data[self.processed_data['sample'] == 'FAA']
#         thaa_df = self.processed_data[self.processed_data['sample'] == 'THAA']

#         agg_dict = {
#             **{f'[{aa}]': ['mean', 'std', 'count'] for aa in self.amino_acids},
#             **{f'{aa} D/L': ['mean', 'std', 'count'] for aa in self.amino_acids}
#         }

#         grouped_faa = faa_df.groupby(['temp (°C)', 'time']).agg(agg_dict).reset_index()
#         grouped_thaa = thaa_df.groupby(['temp (°C)', 'time']).agg(agg_dict).reset_index()

#         # Rename columns
#         new_columns_faa = ['temp (°C)', 'time']
#         new_columns_thaa = ['temp (°C)', 'time']

#         for aa in self.amino_acids:
#             for stat in ['Mean', 'Std', 'Count']:
#                 new_columns_faa.extend([f'{aa}_Conc_FAA_{stat}', f'{aa}_D/L_FAA_{stat}'])
#                 new_columns_thaa.extend([f'{aa}_Conc_THAA_{stat}', f'{aa}_D/L_THAA_{stat}'])

#         grouped_faa.columns = new_columns_faa
#         grouped_thaa.columns = new_columns_thaa

#         # Handle single values in standard deviations
#         for df in [grouped_faa, grouped_thaa]:
#             for col in df.columns:
#                 if '_Std' in col:
#                     count_col = col.replace('_Std', '_Count')
#                     df.loc[df[count_col] == 1, col] = 0

#         self.real_DL = pd.merge(grouped_faa, grouped_thaa, on=['temp (°C)', 'time'], how='outer')
#         self.real_DL['temp (K)'] = self.real_DL['temp (°C)'] + 273.15

#         print("Statistics calculation completed.")
#         return self.real_DL

#     def save_to_csv(self, file_path):
#         if self.real_DL is None:
#             print("No real_DL data to save. Please calculate statistics first.")
#             return

#         try:
#             self.real_DL.to_csv(file_path, index=False)
#             print(f"Data saved to {file_path}")
#         except Exception as e:
#             print(f"Error saving data: {str(e)}")

#     def process_data(self, source, output_file, is_gsheet=False):
#         self.load_data(source, is_gsheet)
#         self.clean_data()
#         self.calculate_stats()
#         self.save_to_csv(output_file)

# def main():
#     processor = DataProcessor()
#     sheet_info = {'sheet_id': "1nA6jSAkAf1Ud-kHdaYTMtBPgKhe9nBg_IjM9idLlj8E", 'gid': "1259514505"}
#     output_file = "/content/drive/MyDrive/Colab_Notebooks/MoDuLAAR/ProcessedData/real_DL_output.csv"
#     processor.process_data(sheet_info, output_file, is_gsheet=True)

# if __name__ == "__main__":
#     main()
