# Microwave acid digestion run analysis

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
# options: inline or widget
%matplotlib inline

In [None]:
class MADProfile(object):
    def __init__(self, filename, sample_info=None):
        self.read_multiwave_csv(filename)
        self.process_multiwave_metadata()
        self.sample_info = sample_info

    def read_multiwave_csv(self, filename):
        from datetime import datetime, timedelta
        with open(filename, encoding = "utf-16le") as multiwave_file:
            self.metadata = []
            for line in multiwave_file:
                self.metadata.append(line)
                if line.startswith('Result Data'):
                    break
            self.data = pd.read_csv(multiwave_file, sep='\t')
            self.data["Time"] = self.data["Time"].apply(
                lambda x: datetime.strptime(x, '%H:%M:%S')#.time()
            ).apply(lambda t: timedelta(hours=t.hour, minutes=t.minute, seconds=t.second).total_seconds())
        return self.data

    def process_multiwave_metadata(self):
        self.metadata = [tuple(l.strip().split('\t')) for l in self.metadata]
        try:
            self.warnings = self.metadata[self.metadata.index(('Warnings',))+1:-2]
            self.metadata = self.metadata[:self.metadata.index(('Warnings',))-1]
        except ValueError:
            self.warnings = None
            self.metadata = self.metadata[:-3]
        self.metadata = pd.Series({l[0]:l[1] if len(l)==2 else l[1:] for l in self.metadata if l[0]})
        self.vessels = int(self.metadata['Number of Vessels'])
        
    def plot(self):
        return self.data[[
            "Time","T Pos. 1","T Pos. 4","T Pos. 7","T Pos. 10","Power"
        ]].ffill().set_index("Time").plot.line()

    def identify_profile_features(self, sample, plateau_threshold=0.1, inflection_threshold = 0.02, peak_height=10, ax=None, legend=True):
        from scipy.signal import savgol_filter, find_peaks
        import numpy as np
        sample_data = pd.DataFrame({'raw':self.data.set_index('Time')[sample].dropna()})
        sample_data['smooth'] = savgol_filter(sample_data['raw'], window_length=11, polyorder=2)
        with np.errstate(divide='ignore'):
            sample_data['diff'] = np.gradient(sample_data["smooth"], sample_data.index)
            sample_data['diff2nd'] = np.gradient(sample_data["diff"], sample_data.index)
        ax = sample_data.plot.line(ax=ax)
        # Identify key points (e.g., peaks or plateaus)
        key_points = np.where(np.abs(sample_data['diff']) < plateau_threshold)[0]  # Example threshold, start with 0.1
        sample_data['key_point'] = False
        sample_data.loc[sample_data.index[key_points], 'key_point'] = True
        for kp in sample_data.index[key_points]: ax.axvline(kp, linestyle='--', alpha=0.5)
        # Peaks
        peaks, peak_heights = find_peaks(sample_data.smooth, height=peak_height)
        sample_data['peak'] = False
        sample_data.loc[sample_data.index[peaks], 'peak'] = True
        ax.scatter(sample_data.index[peaks], sample_data.iloc[peaks]['smooth'], color='green', label='Peaks', zorder=5)
        # Detect inflection points (where second derivative changes sign)
        inflection_points = np.where(
            (np.diff(np.sign(sample_data['diff2nd'])) != 0) & 
            (np.abs(sample_data['diff2nd']) > inflection_threshold).iloc[:-1]
        )[0]
        #inflection_points = np.where(np.diff(np.sign(sample_data['diff2nd'])))[0]
        sample_data['inflection_point'] = False
        sample_data.loc[sample_data.index[inflection_points], 'inflection_point'] = True
        ax.scatter(sample_data.index[inflection_points], sample_data.iloc[inflection_points]['smooth'], color='purple', label='Inflection points', zorder=5)
        if not legend: ax.get_legend().remove()
            
        return sample_data, ax

    def profile_feature_analysis(self, plateau_threshold=0.1, inflection_threshold = 0.02, peak_height=10):
        if self.vessels == 4:
            samples = [f"T Pos. {i}" for i in (1,4,7,10)]
            self.features = {}
            fig, axes = plt.subplots(nrows=2, ncols=2)
            for s, ax in zip(samples,axes.flatten()):
                sd, _ = self.identify_profile_features(
                    s, plateau_threshold=plateau_threshold, 
                    inflection_threshold = inflection_threshold,
                    peak_height=peak_height, ax=ax, legend=False
                )
                self.features[s] = {
                    'plateaus':sd.key_point.sum(), 'peaks':sd.peak.sum(),
                    'inflections':sd.inflection_point.sum()
                }
            self.features = pd.DataFrame(self.features)
        else: raise NotImplementedError

In [None]:
#! file -bi user/85000797 DSK0761_2.csv
mad_results = MADProfile('data/85000797_DSK0761_2.csv')
mad_results.plot()

In [None]:
# Experiment with interpolation
mad_results.data[["Time","T Pos. 1","T Pos. 4","T Pos. 7","T Pos. 10","Power"]].set_index("Time").infer_objects(copy=False).interpolate().plot.line()

In [None]:
mad_results, metadata = read_multiwave_csv('data/300mg_random_coffee.csv')
mad_results[["Time","T Pos. 1","T Pos. 4","T Pos. 7","T Pos. 10","Power"]].ffill().set_index("Time").plot.line()

In [None]:
mad_results = MADProfile('data/500mg organic b ramp to 100.csv')
mad_results.plot()

## Analyze profile
### Profile for 1 sample

In [None]:
sample_data, ax = mad_results.identify_profile_features('T Pos. 1',0.05)

### Profiles for all samples

In [None]:
mad_results.profile_feature_analysis(plateau_threshold=0.05, inflection_threshold = 0.05, peak_height=20)
print(mad_results.features)