In [1]:
import pandas as pd
import numpy as np
import h5py
import datetime
import math
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
import seaborn as sns
from scipy.signal import savgol_filter
# The bare 'seaborn' style name was renamed to 'seaborn-v0_8' in newer matplotlib
# releases; try the new name first and fall back for older installations.
try:
    plt.style.use('seaborn-v0_8')
except OSError:
    plt.style.use('seaborn')
%matplotlib widget
from mpl_toolkits.mplot3d import Axes3D

from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import scale
from sklearn import model_selection
from sklearn import linear_model

In [4]:
def get_calibration_temperature(file, calibration_path):
    """Return the 'ftir_temperature' attribute of the group at calibration_path.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The attribute value as stored in the file.
    """
    with h5py.File(file, 'r') as handle:
        temperature = handle[calibration_path].attrs['ftir_temperature']
    return temperature
    
def get_insertion_temperature(file, insertion_path):
    """Return the 'ftir_temp' attribute of the group at insertion_path.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The attribute value as stored in the file.
    """
    with h5py.File(file, 'r') as handle:
        temperature = handle[insertion_path].attrs['ftir_temp']
    return temperature
    
def get_visible_white_calibration_curve(file, calibration_path):
    """Return the 'white_spectrum' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'white_spectrum' attribute.
    """
    with h5py.File(file, 'r') as handle:
        white_curve = handle[calibration_path].attrs['white_spectrum'][:]
    return white_curve
    
def get_ftir_white_calibration_curve(file, calibration_path):
    """Return the 'white_spectrum2' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'white_spectrum2' attribute.
    """
    with h5py.File(file, 'r') as handle:
        white_curve = handle[calibration_path].attrs['white_spectrum2'][:]
    return white_curve
    
def get_visible_wavelength_vector(file, calibration_path):
    """Return the 'spec1_wavelengths_vector' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'spec1_wavelengths_vector' attribute.
    """
    with h5py.File(file, 'r') as handle:
        wavelengths = handle[calibration_path].attrs['spec1_wavelengths_vector'][:]
    return wavelengths
    
def get_ftir_wavelength_vector(file, calibration_path):
    """Return the 'spec2_wavelengths_vector' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'spec2_wavelengths_vector' attribute.
    """
    with h5py.File(file, 'r') as handle:
        wavelengths = handle[calibration_path].attrs['spec2_wavelengths_vector'][:]
    return wavelengths
    
def get_ftir_insertion_absorbances(file, insertion_path):
    """Read the 'spectrometer2/derived/absorbances' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer2/derived/absorbances'
    with h5py.File(file, 'r') as handle:
        absorbances = handle[dataset_path][:]
    return absorbances
    
def get_ftir_insertion_raw_spectra(file, insertion_path):
    """Read the 'spectrometer2/spectra' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer2/spectra'
    with h5py.File(file, 'r') as handle:
        raw_spectra = handle[dataset_path][:]
    return raw_spectra
    
def get_ftir_insertion_timestamps(file, insertion_path):
    """Read the 'spectrometer2/timestamps' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer2/timestamps'
    with h5py.File(file, 'r') as handle:
        stamps = handle[dataset_path][:]
    return stamps
    
def get_visible_insertion_timestamps(file, insertion_path):
    """Read the 'spectrometer1/timestamps' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer1/timestamps'
    with h5py.File(file, 'r') as handle:
        stamps = handle[dataset_path][:]
    return stamps
    
def get_visible_insertion_absorbances(file, insertion_path):
    """Read the 'spectrometer1/derived/absorbances' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer1/derived/absorbances'
    with h5py.File(file, 'r') as handle:
        absorbances = handle[dataset_path][:]
    return absorbances
    
def get_visible_insertion_raw_spectra(file, insertion_path):
    """Read the 'spectrometer1/spectra' dataset of an insertion.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        The whole dataset, read into memory before the file is closed.
    """
    dataset_path = f'{insertion_path}/spectrometer1/spectra'
    with h5py.File(file, 'r') as handle:
        raw_spectra = handle[dataset_path][:]
    return raw_spectra
    
def create_list_of_items_in_node(item_type, file, node):
    """List every group or dataset found (recursively) below a node.

    Args:
        item_type: either "group" or "dataset".
        file: HDF5 file to open read-only with h5py.
        node: path of the node whose descendants are visited.

    Returns:
        A list of names, relative to ``node``, of all descendants of the
        requested kind, in the order h5py visits them.

    Raises:
        ValueError: if ``item_type`` is neither "group" nor "dataset".
    """
    # Validate up front; previously an unknown item_type only failed with an
    # unbound-variable error once the first child was visited.
    if item_type not in ("group", "dataset"):
        raise ValueError(
            f'item_type must be "group" or "dataset", got {item_type!r}'
        )
    wanted_class = h5py.Group if item_type == "group" else h5py.Dataset

    keys = []

    def collect(name, obj):
        # visititems hands over the object itself, so no second lookup is needed.
        # Returning None keeps the traversal going.
        if isinstance(obj, wanted_class):
            keys.append(name)

    with h5py.File(file, 'r') as h5_file:
        h5_file[node].visititems(collect)
    return keys

def create_list_of_calibrations_in_node(file, node):
    """Return the paths of the calibration groups found below a node.

    A group counts as a calibration when characters [-6:-3] of its name
    spell 'cal'. Each returned path is ``node`` joined to the group name
    with a '/'.

    Args:
        file: HDF5 file, passed on to create_list_of_items_in_node.
        node: path of the node to search.

    Returns:
        A list of calibration group paths.
    """
    group_names = create_list_of_items_in_node("group", file, node)
    return [
        '/'.join((node, name))
        for name in group_names
        if name[-6:-3] == 'cal'
    ]

def create_list_of_insertions_in_calibration(file, calibration_path):
    """Return the paths of all direct children of a calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        A list of '<calibration_path>/<child key>' strings, one per key of
        the calibration group.
    """
    with h5py.File(file, 'r') as handle:
        child_names = list(handle[calibration_path].keys())
    return [f'{calibration_path}/{name}' for name in child_names]
    
def select_by_depth_range(df, range_start, range_end):
    """Keep the rows whose 'depth' lies strictly between two bounds.

    Args:
        df: DataFrame with a 'depth' column.
        range_start: exclusive lower bound.
        range_end: exclusive upper bound.

    Returns:
        The rows of ``df`` with range_start < depth < range_end.
    """
    depth = df['depth']
    inside_range = depth.gt(range_start) & depth.lt(range_end)
    return df.loc[inside_range]

def calculate_absorbance_from_raw(raw_spectrum, white_spectrum, dark_spectrum):
    """Convert a raw spectrum to absorbance using white and dark references.

    Computes reflectance = (raw - dark) / (white - dark) and returns
    log10(1 / reflectance), evaluated in float64.

    Args:
        raw_spectrum: measured spectrum (NumPy array).
        white_spectrum: white reference, broadcastable against raw_spectrum.
        dark_spectrum: dark reference, broadcastable against raw_spectrum.

    Returns:
        Array of absorbance values.
    """
    # NOTE(review): if the inputs are unsigned integer arrays, the subtractions
    # wrap around wherever raw < dark -- confirm the stored dtype.
    signal = raw_spectrum - dark_spectrum
    reference = white_spectrum - dark_spectrum
    reflectance = signal / reference
    inverse_reflectance = 1 / reflectance
    return np.log10(inverse_reflectance.astype(np.float64))

def calculate_absorbance_for_2D_array(array, white_spectrum, dark_spectrum):
    """Convert every row of a 2D array of raw spectra to absorbance.

    Each row is treated as one spectrum: reflectance is
    (row - dark) / (white - dark) and absorbance is log10(1 / reflectance).
    The references are broadcast across rows, so all rows are processed in a
    single vectorized pass instead of a Python-level loop.

    Args:
        array: 2D NumPy array with one raw spectrum per row.
        white_spectrum: white reference, broadcastable against a row.
        dark_spectrum: dark reference, broadcastable against a row.

    Returns:
        float64 array with the same shape as ``array``.
    """
    # NOTE(review): if the inputs are unsigned integer arrays, the subtractions
    # wrap around wherever raw < dark -- confirm the stored dtype.
    reference = white_spectrum - dark_spectrum  # identical for every row
    reflectance = (array - dark_spectrum) / reference
    inverse_reflectance = 1 / reflectance
    return np.log10(np.asarray(inverse_reflectance, dtype=np.float64))

def construct_full_file_path(data_path, file_name):
    """Concatenate a directory prefix and a file name.

    No path separator is inserted, so ``data_path`` must already end with
    one if a separator is needed.

    Args:
        data_path: directory prefix.
        file_name: name appended directly to the prefix.

    Returns:
        ``data_path`` followed immediately by ``file_name``.
    """
    return data_path + file_name

def create_list_of_sessions_in_file(file_name):
    """Return the names of the session groups in a file.

    A group counts as a session when its name (relative to the root) starts
    with 'ses' and is exactly 10 characters long.

    Args:
        file_name: HDF5 file, passed on to create_list_of_items_in_node.

    Returns:
        A list of session group names.
    """
    group_names = create_list_of_items_in_node("group", file_name, "/")
    return [
        name
        for name in group_names
        if name.startswith('ses') and len(name) == 10
    ]

def create_list_of_insertions_in_file(file_name):
    """Return the paths of every insertion in a file.

    Walks sessions, then the calibrations of each session, then the
    insertions of each calibration, preserving that nesting order.

    Args:
        file_name: HDF5 file to inspect.

    Returns:
        A flat list of insertion paths.
    """
    return [
        insertion
        for session in create_list_of_sessions_in_file(file_name)
        for calibration in create_list_of_calibrations_in_node(file_name, session)
        for insertion in create_list_of_insertions_in_calibration(file_name, calibration)
    ]
            
def get_insertion_timestamp(file, insertion_path):
    """Return an insertion's 'start_time' attribute as a pandas Timestamp.

    The stored value is interpreted as a count of microseconds.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.

    Returns:
        pd.Timestamp built from the 'start_time' attribute.
    """
    with h5py.File(file, 'r') as handle:
        start_time = handle[insertion_path].attrs['start_time']
    return pd.Timestamp(start_time, unit='us')
    
def get_calibration_timestamp(file, calibration_path):
    """Return a calibration's 'calibration_start_time' as a pandas Timestamp.

    The stored value is interpreted as a count of microseconds.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        pd.Timestamp built from the 'calibration_start_time' attribute.
    """
    with h5py.File(file, 'r') as handle:
        start_time = handle[calibration_path].attrs['calibration_start_time']
    return pd.Timestamp(start_time, unit='us')
    
def find_position_in_wavelength_vector(wavelength_vector, integer):
    """Return the index of the first wavelength close to a target value.

    Closeness is decided by np.isclose with a relative tolerance of 1e-3
    (plus NumPy's default absolute tolerance).

    Args:
        wavelength_vector: array of wavelengths to search.
        integer: target wavelength.

    Returns:
        Index of the first matching entry.

    Raises:
        IndexError: if no entry is within tolerance of the target.
    """
    is_match = np.isclose(wavelength_vector, integer, rtol=1e-3)
    match_indices = np.nonzero(is_match)[0]
    return match_indices[0]

def normalize(value, max_value, min_value):
    """Rescale a value relative to a [min_value, max_value] range.

    Args:
        value: value to rescale.
        max_value: value that maps to 1.
        min_value: value that maps to 0.

    Returns:
        (value - min_value) / (max_value - min_value).
    """
    offset = value - min_value
    span = max_value - min_value
    return offset / span

def get_ftir_dark_calibration_curve(file, calibration_path):
    """Return the 'dark_spectrum2' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'dark_spectrum2' attribute.
    """
    with h5py.File(file, 'r') as handle:
        dark_curve = handle[calibration_path].attrs['dark_spectrum2'][:]
    return dark_curve
    
def get_visible_dark_calibration_curve(file, calibration_path):
    """Return the 'dark_spectrum' attribute of the calibration group.

    Args:
        file: HDF5 file to open read-only with h5py.
        calibration_path: path of the calibration group inside the file.

    Returns:
        The full contents of the 'dark_spectrum' attribute.
    """
    with h5py.File(file, 'r') as handle:
        dark_curve = handle[calibration_path].attrs['dark_spectrum'][:]
    return dark_curve
        
def get_ftir_spectrum_timestamp(file, insertion_path, index):
    """Return one entry of 'spectrometer2/timestamps' as a pandas Timestamp.

    The stored value is interpreted as a count of microseconds.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.
        index: position of the wanted entry in the timestamps dataset.

    Returns:
        pd.Timestamp for the selected entry.
    """
    dataset_path = f'{insertion_path}/spectrometer2/timestamps'
    with h5py.File(file, 'r') as handle:
        raw_time = handle[dataset_path][index]
    return pd.Timestamp(raw_time, unit='us')
    
def get_visible_spectrum_timestamp(file, insertion_path, index):
    """Return one entry of 'spectrometer1/timestamps' as a pandas Timestamp.

    The stored value is interpreted as a count of microseconds.

    Args:
        file: HDF5 file to open read-only with h5py.
        insertion_path: path of the insertion group inside the file.
        index: position of the wanted entry in the timestamps dataset.

    Returns:
        pd.Timestamp for the selected entry.
    """
    dataset_path = f'{insertion_path}/spectrometer1/timestamps'
    with h5py.File(file, 'r') as handle:
        raw_time = handle[dataset_path][index]
    return pd.Timestamp(raw_time, unit='us')
    
def compute_3D_distance(x1, y1, z1, x2, y2, z2):
    """Return the Euclidean distance between two points in 3D.

    Args:
        x1, y1, z1: coordinates of the first point.
        x2, y2, z2: coordinates of the second point.

    Returns:
        The straight-line distance as a float.
    """
    dx = x1 - x2
    dy = y1 - y2
    dz = z1 - z2
    return math.sqrt(dx**2 + dy**2 + dz**2)

In [15]:
# NOTE(review): absolute, user-specific data directory; consider making it configurable.
path_name = "/Users/linda/OneDrive/Documents/S4_mine_p/Projects/Data_collected/"
# One row per insertion to load.
df = pd.read_csv('data/white_insertions.csv')
if df.empty:
    raise ValueError("data/white_insertions.csv lists no insertions to load")

# Per-insertion pieces are collected in lists and concatenated once after the
# loop; concatenating inside the loop re-copies the accumulated frame each time.
info_parts = []
timestamp_parts = []
spectra_parts = []
# each i represents an insertion
for i in range(df.shape[0]):
    row_file = df['file_name'][i]
    file = construct_full_file_path(path_name, row_file)
    calibration_path = df['session'][i] + "/" + df['calibration'][i]
    calibration_insertions = create_list_of_insertions_in_calibration(file, calibration_path)
    # First visible-spectrometer timestamp of the first insertion in this calibration.
    calibration_first_timestamp = get_visible_spectrum_timestamp(file, calibration_insertions[0], 0)
    insertion_path = calibration_path + "/" + df['insertion'][i]
    insertion_temperature = get_insertion_temperature(file, insertion_path)

    # raw_spectra and timestamps have many spectra and timestamps per insertion
    raw_spectra = pd.DataFrame(get_visible_insertion_raw_spectra(file, insertion_path))
    spectra_parts.append(raw_spectra)
    ts_array = get_visible_insertion_timestamps(file, insertion_path)
    timestamps = pd.DataFrame(pd.to_datetime(ts_array, unit='us'), columns=['timestamp'])
    timestamp_parts.append(timestamps)

    # info and temperature will be the same for every spectrum in insertion
    info_row = df.iloc[i:i+1, :].copy()
    info_row['cal_time_0'] = calibration_first_timestamp
    info_row['temperature'] = insertion_temperature
    # Repeat the single info row once per spectrum of this insertion.
    info_parts.append(info_row.iloc[[0] * raw_spectra.shape[0]])

# `file` and `calibration_path` keep their values from the last iteration; the
# cell that builds the PCA input reads them to fetch the wavelength vector.
info_df = pd.concat(info_parts, axis=0, ignore_index=True)
timestamps_df = pd.concat(timestamp_parts, axis=0, ignore_index=True)
spectra = pd.concat(spectra_parts, axis=0, ignore_index=True)

print(info_df.shape)
print(timestamps_df.shape)
print(spectra.shape)
# Column layout: insertion info, then the timestamp, then the spectral columns.
spectra_df = pd.concat([info_df, timestamps_df, spectra], axis=1)
print(spectra_df.shape)

(17880, 8)
(17880, 1)
(17880, 512)
(17880, 521)


In [20]:
# Create the input data for the PCA, restricting the spectra to the entries of
# the visible wavelength vector that match 500 through 1000 (inclusive;
# presumably nm -- confirm the units of the stored wavelength vector).
# A Savitzky-Golay filter (window length 21, polynomial order 3) is applied
# along each spectrum before creating X.
waves = get_visible_wavelength_vector(file, calibration_path)
start_index = find_position_in_wavelength_vector(waves, 500)
end_index = find_position_in_wavelength_vector(waves, 1000)
# spectra_df holds the info columns, then the timestamp column, then the
# spectral columns, so the spectral block starts after the first two frames.
first_spectral_column = info_df.shape[1] + timestamps_df.shape[1]
X = savgol_filter(
    spectra_df.iloc[:, (first_spectral_column + start_index):(first_spectral_column + end_index + 1)],
    window_length=21,
    polyorder=3,
    axis=1,
)
X.shape

(17880, 400)

In [21]:
# Do the PCA on standardized features (zero mean, unit variance per column).
X_scaled = StandardScaler().fit_transform(X)
# random_state is fixed so the result is reproducible if scikit-learn selects
# a randomized solver for this input size.
pca = PCA(n_components=10, random_state=0)
X_pca = pca.fit_transform(X_scaled)

In [22]:
# Cumulative fraction of the variance explained by the first k components
pca.explained_variance_ratio_.cumsum()

array([0.81609008, 0.88951504, 0.93799813, 0.9712531 , 0.98580507,
       0.99236722, 0.99555232, 0.99620729, 0.99664186, 0.99688567])

In [23]:
# Cumulative explained variance against the number of components kept.
fig, ax = plt.subplots()
cumulative_variance = np.cumsum(pca.explained_variance_ratio_[:10])
# Use a 1-based x axis so the value at k is the variance explained by k components.
ax.plot(np.arange(1, len(cumulative_variance) + 1), cumulative_variance)
ax.set_xlabel("Number of principal components")
ax.set_ylabel("Cumulative explained variance ratio");

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [24]:
# Wrap the component scores in a DataFrame; columns are the integers 0..n-1,
# one per principal component.
X_pca_df = pd.DataFrame(data=X_pca)

In [25]:
# PC1 against PC2 for every spectrum, coloured by measurement condition
fig, ax = plt.subplots(figsize=(10, 10))
ax.set_xlim(-23, 23)
ax.set_ylim(-23, 23)
# NOTE(review): `size` is seaborn's semantic size variable; if a fixed marker
# size was intended, matplotlib's `s=` is probably the keyword wanted -- confirm.
sns.scatterplot(
    x=X_pca_df[0],
    y=X_pca_df[1],
    hue=spectra_df['conditions'],
    alpha=0.2,
    size=2.0,
    palette='gist_ncar',
    ax=ax,
)
ax.set_xlabel("PC1")
ax.set_ylabel("PC2");

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [26]:
# PC1 against PC2 for the 'sunshine' and 'inside' spectra only; colour encodes
# the condition and marker size the insertion temperature.
# The row mask is built once and shared by both frames so they stay aligned.
is_selected = spectra_df['conditions'].isin(['sunshine', 'inside'])
selected = X_pca_df.loc[is_selected]
selected_conditions = spectra_df.loc[is_selected]

fig, ax = plt.subplots(figsize=(10, 10))
ax.set_xlim(-23, 23)
ax.set_ylim(-23, 23)

sns.scatterplot(
    x=selected[0],
    y=selected[1],
    hue=selected_conditions['conditions'],
    alpha=0.2,
    size=selected_conditions['temperature'],
    palette='gist_ncar',
    ax=ax,
)
# Without explicit labels the axes would show the column names "0" and "1".
ax.set_xlabel("PC1")
ax.set_ylabel("PC2");

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [28]:
# PC1 against PC2 for the 'heat spectrometer' and 'cool spectrometer' spectra
# only; colour encodes the condition and marker size the insertion temperature.
# The row mask is built once and shared by both frames so they stay aligned.
is_selected = spectra_df['conditions'].isin(['heat spectrometer', 'cool spectrometer'])
selected = X_pca_df.loc[is_selected]
selected_conditions = spectra_df.loc[is_selected]

fig, ax = plt.subplots(figsize=(10, 10))

sns.scatterplot(
    x=selected[0],
    y=selected[1],
    hue=selected_conditions['conditions'],
    alpha=0.2,
    size=selected_conditions['temperature'],
    palette='gist_ncar',
    ax=ax,
)
# Without explicit labels the axes would show the column names "0" and "1".
ax.set_xlabel("PC1")
ax.set_ylabel("PC2");

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [29]:
# 3D scatter of the first three principal components for three conditions.
# Each condition gets its own colour and a legend entry.
condition_colors = {
    'heat spectrometer': 'tab:blue',
    'cool spectrometer': 'tab:orange',
    'heat light source': 'tab:green',
}

fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection="3d")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.set_zlabel("PC3")
for condition, color in condition_colors.items():
    # Rows of X_pca_df and spectra_df share the same index, so the boolean
    # mask built on spectra_df selects the matching component scores.
    points = X_pca_df.loc[spectra_df['conditions'] == condition]
    ax.scatter(xs=points[0], ys=points[1], zs=points[2], color=color, alpha=0.2, label=condition)
ax.legend();

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

In [30]:
# First principal component score against insertion temperature.
fig, ax = plt.subplots()
sns.scatterplot(x=X_pca_df[0], y=spectra_df['temperature'], alpha=0.2, ax=ax)
# The x data is column 0 of X_pca_df; label it as PC1 instead of "0".
ax.set_xlabel("PC1");

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

<AxesSubplot:xlabel='0', ylabel='temperature'>