In [2]:
import pandas as pd
import numpy as np
import h5py
import datetime
import matplotlib.pyplot as plt
from matplotlib import colors as col
from matplotlib.dates import DateFormatter
import seaborn as sns
from scipy.signal import savgol_filter
plt.style.use('seaborn')

In [3]:
def get_calibration_temperature(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal_temp = h5_file[calibration_path].attrs['ftir_temperature']
        return cal_temp
    
def get_insertion_temperature(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_temp = h5_file[insertion_path].attrs['ftir_temp']
        return ins_temp
    
def get_visible_white_calibration_curve(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal = h5_file[calibration_path].attrs['white_spectrum'][:]
        return cal
    
def get_ftir_white_calibration_curve(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal = h5_file[calibration_path].attrs['white_spectrum2'][:]
        return cal
    
def get_visible_wavelength_vector(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        waves = h5_file[calibration_path].attrs['spec1_wavelengths_vector'][:]
        return waves
    
def get_ftir_wavelength_vector(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        waves = h5_file[calibration_path].attrs['spec2_wavelengths_vector'][:]
        return waves
    
def get_ftir_insertion_absorbances(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_abs = h5_file[f'{insertion_path}/spectrometer2/derived/absorbances'][:]
        return ins_abs
    
def get_ftir_insertion_raw_spectra(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_raw = h5_file[f'{insertion_path}/spectrometer2/spectra'][:]
        return ins_raw
    
def get_visible_insertion_absorbances(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_abs = h5_file[f'{insertion_path}/spectrometer1/derived/absorbances'][:]
        return ins_abs
    
def get_visible_insertion_raw_spectra(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_raw = h5_file[f'{insertion_path}/spectrometer1/spectra'][:]
        return ins_raw
    
def create_list_of_items_in_node(item_type, file, node):
    with h5py.File(file, 'r') as h5_file:
        keys = []
        if item_type == "group":
            my_type = h5py._hl.group.Group
        if item_type == "dataset":
            my_type = h5py._hl.dataset.Dataset
        h5_file[node].visit(lambda key: keys.append(key) if type(h5_file[node][key]) is my_type else None)
        return keys

def create_list_of_calibrations_in_node(file, node):
    calibrations = []
    all_groups = create_list_of_items_in_node("group", file, node)
    for group in all_groups:
        if group[-6:-3] == 'cal':
            calibrations.append(node + '/' + group)
    return calibrations

def create_list_of_insertions_in_calibration(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        insertion_keys = list(h5_file[calibration_path].keys())
        insertions = [f'{calibration_path}/{key}' for key in insertion_keys]            
        return insertions
    
def select_by_depth_range(df, range_start, range_end):
    df_out = df.loc[(df['depth'] > range_start) & (df['depth'] < range_end)]
    return df_out

def calculate_absorbance_from_raw(raw_spectrum, white_spectrum, dark_spectrum):
    reflectance = ((raw_spectrum - dark_spectrum) / (white_spectrum - dark_spectrum))
    inverse_reflectance = 1/reflectance
    absorbance = np.log10(inverse_reflectance.astype(np.float64))
    return absorbance

def calculate_absorbance_for_2D_array(array, white_spectrum, dark_spectrum):
    absorbance_array = np.empty_like(array, dtype=np.float64)
    for i in range(array.shape[0]):
        absorbance_spectrum = calculate_absorbance_from_raw(array[i, :], white_spectrum, dark_spectrum)
        absorbance_array[i, :] = absorbance_spectrum
    return absorbance_array

In [4]:
def construct_full_file_path(data_path, file_name):
    file_path = data_path + file_name
    return file_path

def create_list_of_sessions_in_file(file_name):
    sessions = []
    all_groups = create_list_of_items_in_node("group", file_name, "/")
    for group in all_groups:
        if (group[0:3] == 'ses') and (len(group) == 10):
            sessions.append(group)
    return sessions

def create_list_of_insertions_in_file(file_name):
    insertions = []
    sessions = create_list_of_sessions_in_file(file_name)
    for session in sessions:
        calibrations = create_list_of_calibrations_in_node(file_name, session)
        for calibration in calibrations:
            cal_insertions = create_list_of_insertions_in_calibration(file_name, calibration)
            for insertion in cal_insertions:
                insertions.append(insertion)
    return insertions
            
def get_insertion_timestamp(file, insertion_path):
    with h5py.File(file, 'r') as h5_file:
        ins_time = h5_file[insertion_path].attrs['start_time']
        ins_timestamp = pd.Timestamp(ins_time, unit='us')
        return ins_timestamp
    
def get_calibration_timestamp(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal_time = h5_file[calibration_path].attrs['calibration_start_time']
        cal_timestamp = pd.Timestamp(cal_time, unit='us')
        return cal_timestamp  
    
def find_position_in_wavelength_vector(wavelength_vector, integer):
    position = np.where(np.isclose(wavelength_vector, integer, 1e-3))[0][0]
    return position

def normalize(value, max_value, min_value):
    normalized_value = (value - min_value)/(max_value - min_value)
    return normalized_value

def get_ftir_dark_calibration_curve(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal = h5_file[calibration_path].attrs['dark_spectrum2'][:]
        return cal
    
def get_visible_dark_calibration_curve(file, calibration_path):
    with h5py.File(file, 'r') as h5_file:
        cal = h5_file[calibration_path].attrs['dark_spectrum'][:]
        return cal
        
def get_ftir_spectrum_timestamp(file, insertion_path, index):
    with h5py.File(file, 'r') as h5_file:
        time = h5_file[f'{insertion_path}/spectrometer2/timestamps'][index]
        timestamp = pd.Timestamp(time, unit='us')
        return timestamp
    
def get_visible_spectrum_timestamp(file, insertion_path, index):
    with h5py.File(file, 'r') as h5_file:
        time = h5_file[f'{insertion_path}/spectrometer1/timestamps'][index]
        timestamp = pd.Timestamp(time, unit='us')
        return timestamp


In [None]:
path_name = "/Users/linda/OneDrive/Documents/S4_mine_p/Projects/Data_collected/"
df = pd.read_csv('data/white_insertions.csv')
inside_df = df.loc[df['conditions'] == 'inside'].copy()
calibrations_df = inside_df[['date', 'calibration']].drop_duplicates().copy()
calibrations_count = calibrations_df.shape[0]
small_df = inside_df.loc[(inside_df['calibration'] == calibration) & (inside_df['date'] == date)].copy()
records = list(calibrations_df.to_records(index=False))
nrows = (calibrations_count // 2) + (calibrations_count % 2)
fig, axs = plt.subplots(nrows=nrows, ncols=2, figsize=(12, 10))
fig.suptitle("Inside Garage, spectrum absorbance range by time elapsed in calibration", fontsize=16)
for nn, ax in enumerate(axs.flat):
    try:
        date = records[nn][0]
        calibration = records[nn][1]        
        small_df = inside_df.loc[(inside_df['calibration'] == calibration) & (inside_df['date'] == date)].copy()
        file = construct_full_file_path(path_name, small_df['file_name'][small_df.index[0]])
        calibration_path = small_df['session'][small_df.index[0]] + '/' + small_df['calibration'][small_df.index[0]]
        calibration_insertions = create_list_of_insertions_in_calibration(file, calibration_path)
        calibration_date = small_df.iat[0, 1]    
        first_timestamp = get_ftir_spectrum_timestamp(file, calibration_insertions[0], 0)
        
        for index in small_df.index:  
            insertion_path = calibration_path + '/' + small_df['insertion'][index]
            insertion = small_df['insertion'][index]        
            ax.set_title(f'{calibration_date}, {calibration}, {insertion}')
            ax.set_ylabel('Absorbance')
            ax.set_xlabel('Wavelength (nm)')
            ax.set_ylim(-0.1, 0.1)
            
            ftir_waves = get_ftir_wavelength_vector(file, calibration_path)
            start_index = find_position_in_wavelength_vector(ftir_waves, 1200)
            end_index = find_position_in_wavelength_vector(ftir_waves, 2200)
            ftir_raw_spectra = get_ftir_insertion_raw_spectra(file, insertion_path)
            absorbance_spectra = np.empty_like(ftir_raw_spectra[:, start_index:end_index], dtype=np.double)
            ftir_white_spectrum = get_ftir_white_calibration_curve(file, calibration_path)[start_index:end_index]
            ftir_dark_spectrum = get_ftir_dark_calibration_curve(file, calibration_path)[start_index:end_index]
            insertion_df = pd.DataFrame()
            for i in range(ftir_raw_spectra.shape[0]):
                absorbance_spectrum = calculate_absorbance_from_raw(ftir_raw_spectra[i, start_index:end_index], ftir_white_spectrum, ftir_dark_spectrum)
                color = plt.cm.viridis(normalize(i, ftir_raw_spectra.shape[0], 0))
                absorbance_spectra[i, :] = absorbance_spectrum
                ax.plot(ftir_waves[start_index:end_index], absorbance_spectrum, color=color, alpha=0.2)
            ax.plot(ftir_waves[start_index:end_index], absorbance_spectra.mean(axis=0), 'k', lw=2)
    except:
        pass
    
plt.subplots_adjust(hspace=0.3)
plt.show()
plt.close()