# PTIR Data from Harvested Biomass Samples
## Setup
### Library Imports
#### Maths and Plotting

In [None]:
import numpy as np

import scipy as sp

import sklearn.decomposition as decomp
import sklearn.cluster as cluster

In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt

%config InlineBackend.figure_format = 'retina'

#### I/O

In [None]:
import glob

import yaml
import h5py

import pprint

#### PTIR dataset handling

In [None]:
import ptirtools as ptir

### Load Config

In [None]:
CONFIG = None
with open("config.yml", 'r') as handle:
    CONFIG = yaml.safe_load(handle)
assert CONFIG is not None , "Failed to load config file."
pprint.pp(CONFIG)

In [None]:
COLORS = None
with open("gruvbox_colors.yml", 'r') as handle:
    COLORS = yaml.safe_load(handle)
assert COLORS is not None , "Failed to load config file."

## Load Data
### Search for Input Files

In [None]:
INPUT_FILE_NAMES = glob.glob(f"{CONFIG['directories']['ptirfiles']}/*.ptir")
INPUT_FILE_NAMES = [ f.split("/")[-1] for f in INPUT_FILE_NAMES ]
INPUT_FILE_NAMES

### Load Relevant Data
#### Accumulate from Files

In [None]:
DATA = {}

for ifn in INPUT_FILE_NAMES:
    h5file = h5py.File(f"{CONFIG['directories']['ptirfiles']}/{ifn}", 'r')
    DATA[ifn] = ptir.h5Group2Dict( h5file, h5file, [] )
    h5file.close()

#### Extract Image and Spectral Data

In [None]:
SAMPLES = { fn:dict(index=int(fn[1]), images=[], spectra=[]) for i,fn in enumerate(INPUT_FILE_NAMES) }

In [None]:
for filename,data in DATA.items():
    for img_key, image in data["Images"].items():
        image_data = image['data']
        metadata = image['meta']

        SAMPLES[filename]["images"].append( dict(data=image_data, extent=ptir.image_extent(metadata)) )

In [None]:
for filename,data in DATA.items():
    for meas_key, measurement in data.items():
        if meas_key.startswith("Measurement_"):
            if not bool(measurement['attribs']['IsBackground']):
                wavenumbers = measurement['Spectroscopic_Values']['data']
                channels = [ y['Raw_Data']['data'] for ch,y in measurement.items() if ch.startswith("Channel_") ]
                if len(channels) > 0:
                    SAMPLES[filename]['spectra'].append(
                        dict( 
                            position=tuple( measurement['attribs'][f"Location{X}"][0] for X in "XYZ" ),
                            wavenumbers=np.array(wavenumbers[0]), 
                            channels=np.array([channel[0] for channel in channels]),
                        )
                    )


## Inspection

In [None]:
for fn,data in SAMPLES.items():
    fig = plt.figure( figsize=(9,3) , dpi=200 )
    gs = fig.add_gridspec( 1, 2, width_ratios=[1,2.5] )
    
    ax = fig.add_subplot( gs[1] )

    positions = np.zeros( (2, len(data['spectra'])) )
    accumulator = np.zeros( (len(data['spectra']), *data['spectra'][0]['channels'][0].shape), dtype=np.complex128 )
    wavenumbers = data['spectra'][0]['wavenumbers']
    ### TODO: check that the domains are the same
    for ispec,spectrum in enumerate(data['spectra']):
        _optir, _phase, _x, _y = spectrum['channels']
        _amplitude = _x + 1j*_y
        accumulator[ispec] += _amplitude
        #ax.plot( wavenumbers, np.abs(_amplitude), color=COLORS['normal']['yellow'], alpha=0.1 )
        positions[0,ispec] = spectrum['position'][0]
        positions[1,ispec] = spectrum['position'][1]
    ax.plot( wavenumbers, np.abs(np.mean(accumulator, axis=0)), color=COLORS['normal']['red'], label='avg. amplitude' )
    ax.plot( [], [], color=COLORS['normal']['blue'], label='avg. phase' )
    ax2 = ax.twinx()
    ax2.plot( wavenumbers, np.unwrap(np.angle(np.mean(accumulator, axis=0)[::-1]))[::-1], color=COLORS['normal']['blue'] )
    ax.set_xlabel("Wavenumber [cm$^{-1}$]")
    ax.set_ylabel("Amplitude [a.u.]", color=COLORS['normal']['red'])
    ax.set_yticks([0])
    ax2.set_ylabel("Phase [rad]", color=COLORS['normal']['blue'])
    ax.tick_params(axis='y', labelcolor=COLORS['normal']['red'])
    ax2.tick_params(axis='y', labelcolor=COLORS['normal']['blue'])
    ax.set_ylim(ymax=1.5*np.max(np.abs(np.mean(accumulator, axis=0))), ymin=-0.15*np.max(np.abs(np.mean(accumulator, axis=0))))

    ax.set_xlim(xmin=np.min(wavenumbers), xmax=np.max(wavenumbers))
    ax.axhline( 0, c=COLORS['normal']['red'], lw=0.5, ls='--' )
    ax.set_title( f"{fn[:-len('.ptir')]} average (n={len(accumulator)})" )
    ax.invert_xaxis()
    #ax.get_yaxis().set_visible(False)

    xmin,ymin = np.min(positions,axis=1)
    xmax,ymax = np.max(positions,axis=1)

    ax = fig.add_subplot( gs[0] )
    for image in data["images"]:
        ax.imshow( image["data"], extent=image['extent'] )
    
    ax.fill( 
        [xmin, xmax, xmax, xmin],
        [ymin, ymin, ymax, ymax],
        fill=False,
        color=COLORS['normal']['yellow'],
        lw=2
    )
    ax.text(
        0.5*(xmin+xmax), ymin,
        f"{xmax-xmin:.0f}µm",
        size=8, color='k', va='center', ha='center', backgroundcolor=COLORS['normal']['yellow']
    )
    ax.text(
        xmin, 0.5*(ymin+ymax), 
        f"{ymax-ymin:.0f}µm",
        size=8, color='k', va='center', ha='center', backgroundcolor=COLORS['normal']['yellow'], rotation=90
    )
    ax.set_axis_off()

    fig.patch.set_alpha(0.0)

    fig.tight_layout()
    plt.show()
    fig.savefig(f"{CONFIG['directories']['output']}/{fn[:-len('.ptir')]} average.png", dpi=300, bbox_inches='tight')
    plt.close(fig)

## Dimension Reduction
We are dealing with quite a large amount of data. Even if the spectra were perfectly precise (which they definitely aren't), to make any statements about the samples, we must somehow reduce the number of variables we're analyzing. 

A basic way to do that is **Singular Value Decomposition (SVD)**. To implement that, we would write all the spectra into a huge matrix. As additional components, we would include any kind of classification we can tag the spectra with. That is, for instance, the sample it appears in (but not the index of the sample as a value, but a bitmap whose size is the number of samples and all bits are zero except the one at the position that corresponds to the sample) and the values of the RGB fluorescence channels.

### Arrange all spectra into one data structure

We'll allocate an array with dimensions
- number of spectra
- (number of samples) + 3 (number of wide field image channels) + (number of manually determined classes) + (number of wavenumbers)

and then start slicing...

In [None]:
NUM = {
    'samples' : 0,
    'widefieldchannels' : 3,
    'classes' : 0,
    'spectra' : 0,
}
for fn,data in SAMPLES.items():
    NUM['samples'] = max( NUM['samples'] , int(fn[1])+1 )
    NUM['spectra'] += len(data['spectra'])
WAVENUMBERS = np.copy(data['spectra'][0]['wavenumbers'])
NUM['wavenumbers'] = len(WAVENUMBERS)

OFFSET = { key:sum( [ NUM[key2] for key2 in list(NUM.keys())[:list(NUM.keys()).index(key)] ] ) for key in NUM }
del OFFSET['wavenumbers']

In [None]:
DATASET = np.zeros( ( NUM['spectra'] , sum([NUM[key] for key in NUM.keys() if key != 'spectra']) ) , dtype=np.complex128 )
SORTED_FILES = {}

In [None]:
idx_dataset = 0

for fn,data in SAMPLES.items():
    idx_file = int(fn[1])
    SORTED_FILES[idx_file] = fn[:-len('.ptir')]

    for ispec,spectrum in enumerate(data['spectra']):
        DATASET[idx_dataset,idx_file] = 1.0
        _optir, _phase, _x, _y = spectrum['channels']
        #DATASET[idx_dataset,OFFSET['spectra']:] += _x + 1j*_y
        ### analyse only amplitude, not complex phase
        DATASET[idx_dataset,OFFSET['spectra']:] += np.sqrt( _x**2 + _y**2 )

        idx_dataset += 1


To later accurately label any plots, let's keep track of which column corresponds to which feature.

In [None]:
FEATURE_NAMES = ["None"] + [ SORTED_FILES[i] for i in sorted(list(SORTED_FILES.keys())) ]+[ "R", "G", "B" ]
for i in range(len(FEATURE_NAMES)):
    if FEATURE_NAMES[i][0] == '[':
        FEATURE_NAMES[i] = "Sample " + FEATURE_NAMES[i].split(']')[0] + "]"
for i,f in enumerate(FEATURE_NAMES): print(f"{i:>02d} : {f}")

Classes and wide field colours remain to be done...

### Unguided Singular Value Decomposition
Since `sklearn.decomposition.PCA` does not support complex-valued input...

In [None]:
UMAT, SIGMA, VADJ = sp.linalg.svd( DATASET, full_matrices=False )

In [None]:
MANUAL_PCA = { 'explained_variance' : (SIGMA**2) / DATASET.shape[0] }
MANUAL_PCA['explained_variance_ratio'] = MANUAL_PCA['explained_variance']/np.sum(MANUAL_PCA['explained_variance'])

In [None]:
fig = plt.figure( figsize=(4,3), dpi=100 )
ax = fig.add_subplot()
ax.plot(
    np.arange(len(MANUAL_PCA['explained_variance_ratio']))+1,
    MANUAL_PCA['explained_variance_ratio'], 
    color=COLORS['normal']['blue'], marker='o'
)
ax.set_yscale("log")
ax.set_ylim(ymin=1e-7, ymax=1)
ax.set_xscale("log")
ax.set_xlabel("SVD Component Index")
ax.set_ylabel("Explained Variance")
fig.patch.set_alpha(0.0)
fig.tight_layout()
plt.show()

Apparently, the first 8 components are significant and, from then on, we have mostly noise over-fitting. So let's have a look at these 8 components:

In [None]:
CUTOFF = 8
SVDCOLORS = [ COLORS['normal'][c] for c in ["yellow","orange","red","purple","blue","aqua","green"] ]
SVDCOLORS.insert(0,COLORS['light_to_dark'][4])
SVDCOLORS.append(COLORS['normal']['gray'])

In [None]:
from collections.abc import Iterable

def ring_access(a:Iterable, i:int):
    return a[i%len(a)]

In [None]:
fig = plt.figure( figsize=(12,6), dpi=100 )
gs = fig.add_gridspec( 2, 2, width_ratios=[1,4], height_ratios=[3,2] )

axes = [ 
    fig.add_subplot( gs[:,1] ), 
    fig.add_subplot( gs[0,0] ), 
    fig.add_subplot( gs[1,0] ), 
]

y_offset_abs = 0

for idx in range(CUTOFF)[::-1]:
    component = VADJ[idx,OFFSET['spectra']:]
    axes[0].plot( WAVENUMBERS, 1.25*np.abs(component)/np.max(np.abs(component)) + y_offset_abs, color=ring_access(SVDCOLORS, idx) )
    axes[0].axhline( y_offset_abs, ls='--', lw=0.5, color=ring_access(SVDCOLORS, idx) )
    axes[0].text(np.max(WAVENUMBERS), y_offset_abs-0.05, f" Component {idx}", va='top', ha='left', color=ring_access(SVDCOLORS, idx))
    y_offset_abs += 1 # np.max(np.abs(component))

axes[0].set_ylabel("Amplitude [a.u.]")
axes[0].set_yticks([])

for ax in axes[:1]:
    ax.set_xlim(xmin=np.min(WAVENUMBERS), xmax=np.max(WAVENUMBERS))
    ax.invert_xaxis()
    ax.set_xlabel("Wavenumber [cm$^{-1}$]")

axes[1].imshow( np.abs(  VADJ[:CUTOFF,:OFFSET['spectra']]), cmap='magma' )

for ax in axes[1:2]:
    ax.set_ylabel("SVD Component")
    #ax.set_xlabel("Feature")
    ax.set_xticks(
        ticks = list(range(len(FEATURE_NAMES))),
        labels = FEATURE_NAMES,
        rotation=90
    )
    ax.set_yticks( ticks = list(range(CUTOFF)) )

#cutoff = 8
pie = dict()
pie['x'] = [ r for r in MANUAL_PCA['explained_variance_ratio'][:CUTOFF] ]
pie['x'].append( np.sum( [ r for r in MANUAL_PCA['explained_variance_ratio'][CUTOFF:] ] ) )
pie['colors'] = [ COLORS['bright'][c] for c in ["yellow","orange","red","purple","blue","aqua","green"] ]
pie['colors'].insert(0,COLORS['light_to_dark'][4])
pie['colors'].append(COLORS['normal']['gray'])
pie['autopct'] = lambda pct: f"{pct:.1f}%" if pct > 10 else ""
pie['textprops'] = dict( size=7 )
pie['explode'] = np.cos( np.array(pie['x']) * 2*np.pi )*0.2

wedges,texts,autotexts = axes[2].pie(**pie)

# axes[2].legend(
#     wedges, 
#     [f"Component {i}" for i in range(CUTOFF)]+["other components"],
#     loc="center left",
#     bbox_to_anchor=(1,0,0.5,1)
# )
axes[2].set_ylabel("Explained Variance")

fig.patch.set_alpha(0.0)
fig.tight_layout()

plt.show()

In [None]:
fig.savefig(f"{CONFIG['directories']['output']}/../SVD_amplitude_components.png", dpi=300, bbox_inches='tight')

It appears, that the 8 principal components we see are just the average spectra of the 8 samples. 

Another way would be to **fit a fixed number of Gaussians** to each individual spectrum and then try to correlate these to the classifications. Each Gaussian would have as parameters just magnitude and width. The central value would have to be pre-set to ensure comparability, though a small deviation parameter may be included, allowing the bell curve to shift a few wavenumbers. Central value, offset and width shall be real-valued, the amplitude may be complex-valued.

### Decomposition by Peaks

Likely locations where we would expect to find peaks and should thus start the gaussians for fitting at are...
- $1650 \, \mathrm{cm}^{-1}:$ Amide-I band and water absorption peak
- $1550 \, \mathrm{cm}^{-1}:$ Amide-II band
- $1730 \, \mathrm{cm}^{-1}:$ C=O vibration band
- $1470 \, \mathrm{cm}^{-1}:$ CH<sub>2</sub> scissoring
- $1150 \, \mathrm{cm}^{-1}$
- $1080 \, \mathrm{cm}^{-1}:$ PO<sub>4</sub> stretching
- $1020 \, \mathrm{cm}^{-1}:$ 

In [None]:
LIKELY_PEAKS = [ 1730, 1650, 1550, 1470, 1150, 1080, 1020 ]

We'll attempt to find peaks in all recorded spectra...

In [None]:
PEAKS = []
PROMS = []
WIDTHS = []

for line in range(CUTOFF):
    spectrum = np.abs( DATASET[line,OFFSET['spectra']:] )
    #spectrum = np.abs( VADJ[line,OFFSET['spectra']:] )

    peaks,props = sp.signal.find_peaks( 
        spectrum / np.max(spectrum),
        prominence = 1e-3,
        width=2.0
    )
    PEAKS.append(peaks)
    PROMS.append(props['prominences']*np.max(spectrum))
    WIDTHS.append(props['widths'])

In [None]:
fig = plt.figure( figsize=(6,4), dpi=100 )
gs = fig.add_gridspec( 1, 1 )

ax = fig.add_subplot( gs[:,:] )

idx_to_wavenumbers = np.mean(np.diff(WAVENUMBERS))

i_comp = 0
for peaks,proms,widths in zip(PEAKS,PROMS,WIDTHS):
    ax.scatter(
        WAVENUMBERS[peaks],
        proms*widths,
        color = COLORS['normal']['red'],
        marker = 'o',
        s=proms*widths*100
    )
    i_comp += 1

ax.invert_xaxis()
ax.set_xlabel("Wavenumber [cm$^{-1}$]")
ax.set_ylabel("Peak Magnitude [a.u.]")

fig.patch.set_alpha(0.0)
fig.tight_layout()

plt.show()

In [None]:
fig.savefig(f"{CONFIG['directories']['output']}/peak_magnitudes.png", dpi=300, bbox_inches='tight')

In [None]:
fig = plt.figure( figsize=(6,6), dpi=100 )
gs = fig.add_gridspec( 1, 1 )

ax = fig.add_subplot( gs[:,:] )

idx_to_wavenumbers = np.mean(np.diff(WAVENUMBERS))

flatten = lambda xss : [x for xs in xss for x in xs]
FLAT_PEAKS = np.array( flatten(PEAKS) )
FLAT_PROMS = np.array( flatten(PROMS) )
FLAT_WIDTHS = np.array( flatten(WIDTHS) )

ax.scatter(
    WAVENUMBERS[FLAT_PEAKS],
    FLAT_WIDTHS,
    c = FLAT_PROMS,
    cmap = mpl.cm.Spectral_r,
    marker = 'o', 
    s=FLAT_PROMS*FLAT_WIDTHS*400,
    alpha=1.0, 
)

ax.invert_xaxis()
ax.set_xlabel("Wavenumber [cm$^{-1}$]")
ax.set_ylabel("Peak Width [cm$^{-1}$]")

fig.patch.set_alpha(0.0)
fig.tight_layout()

plt.show()

In [None]:
fig.savefig(f"{CONFIG['directories']['output']}/peak_widths_and_proms_log.png", dpi=300, bbox_inches='tight')

In [None]:
clustering_weights = FLAT_WIDTHS*FLAT_PROMS
clustering_weights /= np.max(clustering_weights)
clustering_weights *= 1000

clustering_data = []
for weight,peak,width,prom in zip(clustering_weights, FLAT_PEAKS, FLAT_WIDTHS, FLAT_PROMS):
    for _ in range(int(max(1,weight))):
        clustering_data.append( [ WAVENUMBERS[peak], width ] )
clustering_data = np.array(clustering_data)

clustering_norm = np.ones( ( clustering_data.shape[1] , ) )
clustering_norm[1] *= 0.1

In [None]:

kmeans = cluster.KMeans(n_clusters=17, random_state=0, n_init="auto").fit( clustering_data / clustering_norm[None,:] )

In [None]:
fig = plt.figure( figsize=(8,5), dpi=100 )
gs = fig.add_gridspec( 1, 1 )

ax = fig.add_subplot( gs[:,:] )

idx_to_wavenumbers = np.mean(np.diff(WAVENUMBERS))

ax.scatter(
    clustering_data[:,0],
    clustering_data[:,1],
    c = [ ring_access( SVDCOLORS[1:], l ) for l in kmeans.labels_ ],
    marker = 'o', 
    #s=FLAT_PROMS*FLAT_WIDTHS*400,
    alpha=1.0,
    s=3 
)

for i,center in enumerate(kmeans.cluster_centers_):
    ax.text(
        *(center * clustering_norm), 
        f"{center[0] * clustering_norm[0]:.0f}",
        ha='center', va='center_baseline', color=ring_access( SVDCOLORS[1:], i ), 
        bbox=dict(facecolor='w', alpha=0.666, edgecolor=ring_access( SVDCOLORS[1:], i ), boxstyle='circle')
    )

ax.invert_xaxis()
ax.set_xlabel("Wavenumber [cm$^{-1}$]")
ax.set_ylabel("Peak Width [cm$^{-1}$]")

ax.set_xticks( np.arange(1000,1900,100), minor=False )
ax.set_xticks( np.arange(950,1900,50), minor=True )

fig.patch.set_alpha(0.0)
fig.tight_layout()

plt.show()

In [None]:
fig.savefig(f"{CONFIG['directories']['output']}/peak_clusters.png", dpi=300, bbox_inches='tight')

In [None]:
for wavenumber,widthx10 in kmeans.cluster_centers_[np.argsort(kmeans.cluster_centers_[:,1])][::-1]:
    print(f"- {wavenumber:.0f} cm⁻¹ ± {widthx10/10:.1f}")