In [None]:
import pandas as pd
import re
import matplotlib.pyplot as plt
import os
import numpy as np
from scipy.optimize import curve_fit
from scipy.special import expit

In [None]:
df_sol_list = pd.read_csv("df_sol_list_Feb18.csv")

In [None]:
def TXTConvertor(filepath, filename, start_marker= "Reaction Starts Now!", end_marker= "", end_marker2 = "Reaction Over!!"):
    data_raw = []
    _df_endpoint = ''
    reading = False
    reading_start = False

    with open(filepath, 'r') as file:
        for line in file:
            line = line.strip()
            if start_marker in line:
                reading = True
                reading_start = True
                continue  # Skip the start marker line
            if (reading_start) and (line == end_marker):
                break  # Stop reading when the end marker is reached
            if (reading_start) and (end_marker2 in line):
                if ':' in line: # Example: 'Reaction Over!! Final Time Diff:35.1380004882'
                    _df_endpoint = pd.DataFrame({
                        'trial': [filename],
                        'EndPoint': [float(line.split(':')[-1])]
                    })
                break  # Stop reading when the other end marker is reached
            if reading:
                data_raw.append(line)

    index = 0
    ls_df = []

    data = data_raw[:-1]

    for row in data:
        if index % 2 == 0:
            row_split = re.split(r' |: ', row)
            _df_RGBC = {row_split[i]: int(row_split[i + 1]) for i in range(0, len(row_split), 2)}
            _df_RGBC = pd.DataFrame([_df_RGBC])
        else:
            row_split = re.split(r'\|\| |:', row)
            _df_diff = {'Diff': float(row_split[0]), 'Time': float(row_split[2])}
            _df_diff = pd.DataFrame([_df_diff])
        
            df_row = pd.concat([_df_RGBC, _df_diff], axis=1)
            ls_df.append(df_row)

        index += 1

    df = pd.concat(ls_df)
    df['trial'] = filename

    return df, _df_endpoint

In [None]:
# Import the data
root_dir = 'Feb 18'
ls_df = []
ls_df_endpoint = []

for root, _, files in os.walk(root_dir):
    for file in files:
        if file.endswith('.txt'):  # Check if the file is a CSV file
            _file_path = os.path.join(root, file)
            _df, _df_endpoint = TXTConvertor(_file_path, file)
            ls_df.append(_df)
            
            if isinstance(_df_endpoint, pd.DataFrame):
                ls_df_endpoint.append(_df_endpoint)

df_main_raw = pd.concat(ls_df)

# Compile endpoint list if the endpoint df exist
if len(ls_df_endpoint) > 0:
    df_endpoint = pd.concat(ls_df_endpoint)

# Extract sample information from the file name
df_main_raw['trial'] = df_main_raw['trial'].str.replace('.txt', '', regex=False)
df_main_raw['Sample_ID'] = df_main_raw['trial'].str.extract(r'^(.*?)_')
df_main_raw[['TestBench', 'Sol_ID']] = df_main_raw['Sample_ID'].str.extract(r'([A-Za-z]+)(\d+)')

# Create a figure and axis
num_trial = len(df_main_raw['trial'].unique())
ncol = 4 # Define the number of columns
nrow = -(-num_trial // ncol) # Calculate the number of rows needed

In [None]:
# Create an summary of endpoint
df_endpoint_summary = pd.DataFrame({
    'trial': df_main_raw['trial'].unique(),
    'Endpoint_Slope': None, 
    'Endpoint_Min': None, 
    'Endpoint_Std': None, 
    'Endpoint_SquareSlope': None, 
})
df_endpoint_summary['Sample_ID'] = df_endpoint_summary['trial'].str.extract(r'^(.*?)_')
df_endpoint_summary[['TestBench', 'Sol_ID']] = df_endpoint_summary['Sample_ID'].str.extract(r'([A-Za-z]+)(\d+)')

In [None]:
# Plot raw data
# Create a figure and axis
fig, axs = plt.subplots(nrow, ncol, figsize=(15, nrow*3.5))
axs = axs.flatten() # Flatten axs for easy iteration

# Plotting RGB vs Time
i = 0
for _trial, _df_select in df_main_raw.groupby(['trial']):
    axs[i].plot(_df_select['Time'], _df_select['R'], c = 'orangered', alpha = 0.7, label = 'R', linewidth = 0.7)
    # axs[i].plot(_df_select['Time'], _df_select['G'], c = 'forestgreen', alpha = 0.7, label = 'G')
    # axs[i].plot(_df_select['Time'], _df_select['B'], c = 'dodgerblue', alpha = 0.7, label = 'B')
    axs[i].set_title(_trial[0])
    axs[i].legend()

    if (len(ls_df_endpoint) > 0) and (_trial[0] in df_endpoint['trial'].values):
        axs[i].axvline(df_endpoint[df_endpoint['trial'] == _trial[0]].EndPoint.values[0])

    i += 1

In [None]:
# Moving slope
def rolling_avg_diff(window):
    if len(window) < 6:  # Ensure we have at least 12 values
        return np.nan  
    first_avg = np.mean(window[:5])  # Average of first 6 values
    last_avg = np.mean(window[-5:])  # Average of last 6 values
    return first_avg - last_avg  # Difference

def smooth_R(window):
    if len(window) < 50:
        return np.nan
    else:
        avg = np.mean(window[:])
        std_dev = np.std(window[:], ddof=0)
        # return std_dev
    
        if window[-1] < avg - std_dev:
            return np.nan
        else:
            return window[-1]


In [None]:
# Process  the data
ls_df_main = []
for _trial, _df_select in df_main_raw.groupby(['trial']):
    # Recalculate the difference
    _R_max = _df_select['R'].max()
    _df_select['Diff_corrected'] = _R_max - _df_select['R']

    # Applying rolling average to smooth out the curve
    _df_select['RollingAvg'] = _df_select['R'].rolling(window=50).mean()
    
    # Shift the starting point to capture actual starting time (Peak in R reading)
    _Start_time = _df_select[_df_select['R'] == _df_select['R'].max()].Time.values[0]
    _df_select['Time_corrected'] = _df_select['Time'] - _Start_time

    # Calculate the square of the rolling average
    _df_select['R_Square'] = _df_select['R'] ** 3
    _df_select['RollingAvg_Square'] = _df_select['R_Square'].rolling(window=50).mean()

    # Remove the data before 0 s (after correction)
    _df_select = _df_select.loc[_df_select['Time_corrected'] >= 0].copy()

    # Testing smoothing algorithm
    _df_select['R_smoothed'] = _df_select['R'].rolling(window=50).apply(smooth_R, raw=True)# Determine the endpoint

    # Calculate the endpoint using the min R reading
    Endpoint_min = _df_select[_df_select['RollingAvg'] == _df_select['RollingAvg'].min()].Time_corrected.values[0]
    df_endpoint_summary.loc[df_endpoint_summary['trial'] == _trial[0], 'Endpoint_Min'] = Endpoint_min

    # Calculate the endpoint using moving slope
    _df_select['rolling_slope'] = _df_select['RollingAvg'].rolling(window=10).apply(rolling_avg_diff, raw=True)# Determine the endpoint
    Endpoint_slope = _df_select[(_df_select['rolling_slope'] < 0) & (_df_select['Time_corrected'] > 50)].Time_corrected.values[0]
    df_endpoint_summary.loc[df_endpoint_summary['trial'] == _trial[0], 'Endpoint_Slope'] = Endpoint_slope

    # Calculate the endpoint using moving slope of R_square
    _df_select['rolling_square_slope'] = _df_select['RollingAvg_Square'].rolling(window=10).apply(rolling_avg_diff, raw=True)# Determine the endpoint
    Endpoint_SquareSlope = _df_select[(_df_select['rolling_square_slope'] < 0) & (_df_select['Time_corrected'] > 50)].Time_corrected.values[0]
    df_endpoint_summary.loc[df_endpoint_summary['trial'] == _trial[0], 'Endpoint_SquareSlope'] = Endpoint_SquareSlope
    
    ls_df_main.append(_df_select)

df_main = pd.concat(ls_df_main)

In [None]:
# Plotting Rolling average vs Time
fig, axs = plt.subplots(nrow, ncol, figsize=(20, nrow*5))
axs = axs.flatten() # Flatten axs for easy iteration

i = 0
for _trial, _df_select in df_main.groupby(['trial']):

    axs[i].plot(_df_select['Time_corrected'], _df_select['RollingAvg'], c = 'violet', alpha = 0.7, label = 'RollingAvg')
    axs[i].set_title(_trial[0])
    axs[i].legend()
    axs[i].set_xlim(left=0)

    _min_value = _df_select[_df_select['RollingAvg'] == _df_select['RollingAvg'].min()].Time_corrected.values[0]
    axs[i].axvline(_min_value) # Label the timestamp where the minimum occured

    _Endpoint_slope = _df_select[(_df_select['rolling_slope'] < 0) & (_df_select['Time_corrected'] > 50)].Time_corrected.values[0]
    axs[i].axvline(_Endpoint_slope, c = 'yellow') # Label the timestamp where the Slope Endpoint occured

    _Endpoint_SquareSlope = _df_select[(_df_select['rolling_square_slope'] < 0) & (_df_select['Time_corrected'] > 50)].Time_corrected.values[0]
    axs[i].axvline(_Endpoint_SquareSlope, c = 'lightgreen') # Label the timestamp where the SquareSlope Endpoint occured

    i += 1

In [None]:
# Plotting Rolling average vs Time
plt.figure(figsize=(8, 6))

for _trial, _df_select in df_main.groupby(['trial']):
    _df_select['RollingAvg_Diff'] = _df_select['Diff_corrected'].rolling(window=20).mean()
    plt.plot(_df_select['Time_corrected'], _df_select['RollingAvg_Diff'], alpha = 0.6)
    plt.xlim(0, 160)
    plt.ylim(0, 160)
    plt.xlabel('time (s)')
    plt.ylabel('Diff (Corrected)')

In [None]:
# Std in the Rolling average

# ls_df_main_Diffrollingavg = []

# Plotting Rolling average vs Time
fig, axs = plt.subplots(nrow, ncol, figsize=(20, nrow*5))
axs = axs.flatten() # Flatten axs for easy iteration

i = 0
for _trial, _df_select in df_main.groupby(['trial']):
    _df_select['rolling_std'] = _df_select['RollingAvg'].rolling(window=10).std()
    # ls_df_main_Diffrollingavg.append(_df_select)

    # Determine the endpoint
    # _std_value = _df_select[(_df_select['rolling_std'] < 0.25) & (_df_select['Time'] > 50)].Time.values[0]
    # df_endpoint_summary.loc[df_endpoint_summary['trial'] == _trial[0], 'Endpoint_Std'] = _std_value

    axs[i].plot(_df_select['Time'], _df_select['rolling_std'], c = 'royalblue', alpha = 0.7, label = 'DiffRollingAvg')
    axs[i].set_title(_trial[0])
    axs[i].legend()
    axs[i].set_ylim(0, 1)

    i += 1

# df_main_Diffrollingavg = pd.concat(ls_df_main_Diffrollingavg)

In [None]:
fig, axs = plt.subplots(nrow, ncol, figsize=(20, nrow*5))
axs = axs.flatten() # Flatten axs for easy iteration

i = 0
for _trial, _df_select in df_main.groupby(['trial']):
    axs[i].plot(_df_select['Time_corrected'], _df_select['rolling_square_slope'], c = 'royalblue', alpha = 0.7, label = 'Moving slope')
    axs[i].axhline(0, color='r')
    axs[i].set_title(_trial[0])
    axs[i].legend()
    axs[i].set_ylim(-1, 10)

    i += 1

In [None]:
# Add concentration info to the endpoint df
df_endpoint_summary['Sol_ID'] = df_endpoint_summary['Sol_ID'].astype(str)
df_sol_list['Sol_ID'] = df_sol_list['Sol_ID'].astype(str)
df_endpoint_summary_complete = pd.merge(df_endpoint_summary, df_sol_list, on="Sol_ID", how = 'left')

In [None]:
# Plot the KOH conc vs Reaction Time
plt.figure(figsize=(8, 5))
plt.scatter(df_endpoint_summary_complete['KOH_conc'], df_endpoint_summary_complete['Endpoint_SquareSlope'], marker='o', linestyle='-', color='crimson')

plt.xlabel('KOH Concentration (g/25 ml)')
plt.ylabel('Reaction Time (s)')

In [None]:
# Generate regression line
# Define the regression function
def exponential_func(x, a, b, c):
    return a * np.exp(-b * x) + c

KOH_data = df_endpoint_summary_complete['KOH_conc'].values
Endpoint_data = df_endpoint_summary_complete['Endpoint_SquareSlope'].values

# Fit the curve
popt, pcov = curve_fit(exponential_func, KOH_data, Endpoint_data)
a_opt, b_opt, c_opt = popt

x_smooth = np.linspace(min(KOH_data), max(KOH_data), 50)
y_fit = exponential_func(x_smooth, *popt)

# Compute R^2
Endpoint_fit = exponential_func(KOH_data, *popt)
ss_res = np.sum((Endpoint_data - Endpoint_fit) ** 2)
ss_tot = np.sum((Endpoint_data - np.mean(Endpoint_data)) ** 2)
r_squared = 1 - (ss_res / ss_tot)
print(f"R^2 is {r_squared}")

# Format equation string
equation = f"y = {a_opt:.2f} * exp({b_opt:.2f} * x) + {c_opt:.2f}\n$R^2$ = {r_squared:.4f}"

# Plot
plt.scatter(KOH_data, Endpoint_data, color="crimson")
plt.plot(x_smooth, y_fit, label="Fitted Curve", color="dodgerblue", linewidth=2)
plt.xlabel('KOH Concentration (g/25 ml)')
plt.ylabel('Reaction Time (s)')
plt.legend()

# Add equation and R² to plot
plt.text(0.6, 0.8, equation, transform=plt.gca().transAxes, fontsize=8, bbox=dict(facecolor='white', alpha=0.5))

plt.show()