# Preprocessing

> Data preprocessing utilities.

In [2]:
#| default_exp preprocessing

In [3]:
#| export
import fastcore.all as fc
from pathlib import Path
from sklearn.base import BaseEstimator, TransformerMixin
from typing import List
import numpy as np
import pywt

In [4]:
#|export
def wn_to_wl(
    wavenumber:float # wavenumber in cm^-1
) -> float: # wavelength in nm
    "Return the wavelength (nm) corresponding to a wavenumber (cm^-1)."
    nm_per_cm = 1e7  # 1 cm = 1e7 nm
    wavelength = nm_per_cm / wavenumber
    return wavelength

def wl_to_wn(
    wavelength:float # wavelength in nm
) -> float: # wavenumber in cm^-1
    "Return the wavenumber (cm^-1) corresponding to a wavelength (nm)."
    nm_per_cm = 1e7  # 1 cm = 1e7 nm
    wavenumber = nm_per_cm / wavelength
    return wavenumber

# Sanity checks for both conversions; `int()` truncates the fractional part
# so the results can be compared against whole numbers.
fc.test_eq(int(wn_to_wl(1300)), 7692)
fc.test_eq(int(wl_to_wn(7692)), 1300)

In [5]:
#|export
class MeanCenter(BaseEstimator, TransformerMixin):
    "Subtract from each row of `X` (one spectrum per row) its own mean."
    def fit(self, X, y=None):
        # Nothing to learn from the data.
        return self

    def transform(self, X, y=None):
        # keepdims=True keeps a column vector so it broadcasts across each row.
        row_means = X.mean(axis=1, keepdims=True)
        return X - row_means

In [6]:
#|export
class MeanReplicates(BaseEstimator, TransformerMixin):
    "Select replicates of the specified sample, average them per index and mean center the result."
    def __init__(self, 
                 smp_name:str, # Sample of interest: 'LUI', 'SPA1' or 'TM4.1'.
                 names:List, # Names of scanned samples and replicates.
                 idxs:tuple=(0, 1, 2, 3), # Indices `idx` searched as '{smp_name}-{idx}' in `names`.
                ):        
        fc.store_attr()
        
    def _mean(self, X, substring):
        "Average the rows of `X` whose entry in `self.names` contains `substring`."
        # NOTE: this is a substring match, so e.g. 'LUI-1' would also select a
        # name containing 'LUI-10'.
        return X[[substring in name for name in self.names],:].mean(axis=0)
    
    def fit(self, X, y=None): return self
        
    def transform(self, X, y=None):     
        # One averaged row per requested index, stacked in the order of `idxs`.
        X = np.array([self._mean(X, f'{self.smp_name}-{idx}') for idx in self.idxs])
        # Mean center each averaged row.
        return X - X.mean(axis=1, keepdims=True)

In [7]:
#|export
class DWTFiltering(BaseEstimator, TransformerMixin):
    "Filter spectra by zeroing selected discrete wavelet transform coefficient arrays."
    def __init__(self, 
                 wavelet='db2', # Wavelet passed to `pywt.wavedec` and `pywt.waverec`.
                 filtered_levels=(0, -1, -2), # Indices into the `pywt.wavedec` output to zero; negatives count from the end.
                 verbose=False, # Print the number of coefficient arrays for every spectrum.
                ):
        # The default for `filtered_levels` is a tuple (not a list) so that it
        # cannot be mutated and shared across instances.
        fc.store_attr()

    def fit(self, X, y=None): return self

    def transform(self, X, y=None):     
        X_filtered = []
        for spectrum in X:
            # Per the PyWavelets docs the result is ordered [cA_n, cD_n, ..., cD_1].
            coeffs = pywt.wavedec(spectrum, self.wavelet, level=None, mode='smooth')
            n_coeffs = len(coeffs)
            if self.verbose: print(f'Number of decomposition levels: {n_coeffs}')
            # Translate negative indices into positions in `coeffs`.
            levels = [level if level >= 0 else (n_coeffs + level) for level in self.filtered_levels]

            # Set the coefficients to filter to zero
            for level in levels:
                coeffs[level] = np.zeros_like(coeffs[level])

            # NOTE(review): `pywt.waverec` may return one more sample than the
            # input for odd-length signals -- confirm whether trimming is needed.
            X_filtered.append(pywt.waverec(coeffs, self.wavelet, mode='smooth'))
        return np.array(X_filtered)

In [8]:
#|export
class DiffFromUnspiked(BaseEstimator, TransformerMixin):
    "Subtract the first row of `X` (presumably the unspiked spectrum) from all following rows."
    def fit(self, X, y=None):
        # Nothing to learn from the data.
        return self

    def transform(self, X, y=None):
        reference, others = X[0], X[1:]
        return others - reference

In [9]:
#| hide
# Run nbdev's export step (presumably writes the exported cells to the package modules).
import nbdev; nbdev.nbdev_export()