# Import Packages

In [2]:
#pandas
import pandas as pd

#scipy
import scipy as sp, numpy as np
import scipy.io as io

#sklearn
from sklearn import preprocessing, model_selection
from sklearn.preprocessing import scale, StandardScaler, normalize, Normalizer
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.linear_model import LinearRegression, ElasticNet, ElasticNetCV, MultiTaskElasticNetCV,MultiTaskElasticNet
from sklearn.model_selection import ShuffleSplit, LeaveOneGroupOut, LeaveOneOut, train_test_split, learning_curve, GridSearchCV, cross_val_score, cross_val_predict, RepeatedKFold

#matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.colors import ListedColormap, LinearSegmentedColormap

#misc
import warnings
import pickle
import mat73
from pca import pca

# Import Functions

In [3]:
#error handling
warnings.filterwarnings('ignore')

In [4]:
def create_schema_from_excel(schemaFile,schemaSheet):
    """convert excel schema to pandas dataframe"""
    mySchema = pd.read_excel(schemaFile, sheet_name=schemaSheet) 
    mySchema.set_index('Std ID',inplace=True)
    return mySchema

In [5]:
def rename_samples(myY,to_replace,replace_with):
    """wrapper to help rename y data as pandas dataframe"""
    myY.replace(to_replace,replace_with,inplace=True);
    return myY

In [6]:
def label_concentrations(mySchema,analyteList,myXlabels,fill_null=True):
    """use mySchema as a lookup table for all voltammogram labels (myXlabels) for given analyteList, 
    to build and return a corresponding list of concentrations (myY) for each voltammogram. 
    Empty values are assumed as 0 concentration"""
    myY = pd.DataFrame(columns=analyteList)
    myY['peak_labels'] = myXlabels
    myY.set_index('peak_labels',inplace=True)
    for col in myY.columns:
        myY[col]=myXlabels
    
    for col in myY.columns:
        myY[col]=myY.index.map(mySchema[col])

    if fill_null == True:
        myY.fillna(0,inplace=True) 
        #empty values will be assumed as 0; dangerous if samples are mislabeled!
    
    return myY

In [3]:
def import_data(myFile,analyteList,mySamples='Signals',myConcentrations='PeaksLabel'): 
    """Takes in file location (myFile) and list of desired analytes (analyteList)
    and returns X_train and y_train arrays. Analytes must be named in same manner as data file"""
    
    if myFile[-3:] == 'mat':
        mat1 = mat73.loadmat(myFile) 
        myX = mat1.get('Signals')
        peak_labels = mat1.get('PeaksLabel') 
        myX = pd.DataFrame(myX,index=peak_labels)

        return myX.T, peak_labels
    
    if myFile[-4:] == 'xlsx':
        myData = pd.read_excel(myFile, sheet_name='Sheet1') 
        peak_labels=myData.columns.to_list()
        myX=myData
        
        return myX, peak_labels


In [10]:
def remove_delimiter(myData,myDelimiter='.'):
    """given data (myData) as a list or pandas dataframe, 
    return data with delimiter (myDelimiter, default is a period) removed"""
    if isinstance(myData, pd.DataFrame):
        col_no_decimal = myData.columns.str.split(myDelimiter).str[0]
        for i in range(len(myData.columns)):
            col = str(myData.columns[i])
            if myDelimiter in col:
                myData.rename(columns={col:col_no_decimal[i]},inplace=True)
                
    if isinstance(myData, list):
        for i in range(len(myData)):
            if myDelimiter in str(myData[i]):
                myData[i] = str(myData[i]).split('.')[0]
            else:
                myData[i] = str(myData[i])
                

In [7]:
#TODO: remove duplicates
def plot_raw_voltammograms(myData):
    """plots raw data"""
    plt.plot(myData)
    #legend_without_duplicate_labels(plt.gca)
    #plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
    plt.xlabel("Sample");
    plt.ylabel("Current (nA)");
    plt.title("Overlaid Voltammograms");

In [3]:
def plot_preprocessers(myData):
    """Return plots of preprocessed data"""
    ##TODO:add subplots
    # eliminate offset by mean-centering each variable j in a sample i (subtracts mean of j across all i)
    # normalize each sample to maintain signal shape (for identification)
    # standardize each variable to change signal shape (for concentration)

    plt.plot(scale(myData.T).T);
    plt.title("Standardized Features");
    plt.xlabel("Sample number");
    plt.show();

    for myNorm in ['l2','l1','max']:
        plt.plot(normalize(myData.T,norm=myNorm).T);
        plt.xlabel("Sample number");
        plt.title(myNorm + " Normalized Samples");
        plt.show();
  
    plt.plot((np.diff(myData.T)).T);
    plt.xlabel("Sample number");
    plt.title("Derivative");
    plt.show();

In [1]:
def get_PCA(myData,myY,analyteList,xDim = 0, yDim = 1, zDim = None):
    """plots 2d PCA plots of various preprocessed voltammograms colored by concentration of analytes"""
    pca = PCA(n_components = 2) #for visualization
    label_to_color = {}
    viridis = cm.get_cmap('viridis', 8)
    for i in myY.index.unique():
        label_to_color[i]=np.unique(np.sum(myY,axis=1)[i])/np.max(np.sum(myY,axis=1))
        
    for preProcess in ['Standardized Features','Normalized Samples','Differentiated Samples']:
       
        if preProcess == 'Differentiated Samples':
            principalComponents = pca.fit_transform(np.diff(myData.T))
            
        if preProcess == 'Standardized Features':
            principalComponents = pca.fit_transform(scale(myData.T)) 
            
        if preProcess == 'Normalized Samples':
            principalComponents = pca.fit_transform(normalize(myData.T)) 
        
        #plot and label the data in PC space
        plt.figure(figsize=(10,10))
        k=[]
        for i in range(len(myData.columns)):
            x_coord=principalComponents[i,xDim]
            y_coord=principalComponents[i,yDim]
            j=myData.columns[i]
            plt.scatter(x_coord, y_coord,color=viridis(label_to_color[j][0]));
            if j not in k:
                plt.annotate(j,(x_coord,y_coord))
                k.append(j)
                
        #plot origin axes
        axes=plt.gca()  
        plt.plot([axes.get_xlim()[0],0,axes.get_xlim()[1]],[0,0,0],c='k')
        plt.plot([0,0,0],[axes.get_ylim()[0],0,axes.get_ylim()[1]],c='k')
        plt.title(str(preProcess) + " PCs in 2D")
        plt.xlabel("PC1" + ' (' + str(round(pca.explained_variance_ratio_[0]*100,2)) + '%)' )
        plt.ylabel("PC2" + ' (' + str(round(pca.explained_variance_ratio_[1]*100,2)) + '%)' )
        plt.show()

In [2]:
def legend_without_duplicate_labels(ax):
    """removes duplicate labels from plot legend"""
    handles, labels = ax.get_legend_handles_labels()
    unique = [(h, l) for i, (h, l) in enumerate(zip(handles, labels)) if l not in labels[:i]]
    ax.legend(*zip(*unique))

In [None]:
##in development##
#def plot_outliers(myX,nComps):
#    """use pca package to help plot outliers
#    see: https://pypi.org/project/pca/"""
#    myModel = pca(n_components=nComps, alpha=0.05, normalize=True, detect_outliers=['ht2', 'spe'])
#    out = myModel.fit_transform(myX)
#    model.biplot(legend=True, SPE=True, hotellingt2=True)     
#    myXoutliers = myX[results['outliers']['y_bool'],:]
#    myXnormal = myX[~results['outliers']['y_bool'],:]
#    return myXoutliers, myXnormal
#TODO: require updating y as well.

In [6]:
##in development##
#def remove_outliers(removeIndex=[],removeColumns=[],search_mode='manual'):   
#    if search_mode == 'SPE/ht2':
#        X_train = Xnormal
#    if search_mode == 'manual':
#        try:
#            y_train = y_train.drop(index=removeIndex)
#            X_train = X_train.drop(columns=removeColumns)
#        except:
#            print("Could not find samples listed.")

In [22]:
def set_preprocess(preProcess,myData,myNorm='max'):
    """set a pre-processer"""
    if preProcess == 'Scale Features':
        preProcesser = preprocessing.StandardScaler().fit(myData) 

    if preProcess == 'Normalize Samples':
        if myNorm == 'l1': 
            preProcesser = preprocessing.Normalizer(norm=myNorm).fit(myData) 
        if myNorm == 'l2': 
            preProcesser = preprocessing.Normalizer(norm=myNorm).fit(myData) 
        if myNorm == 'max': 
            preProcesser = preprocessing.Normalizer(norm=myNorm).fit(myData) 

    if preProcess == 'No Scale':
        preProcesser = preprocessing.StandardScaler(with_mean=False,with_std=False).fit(myData) 

    if preProcess == 'Derivative': 
        preProcesser = preprocessing.StandardScaler(with_mean=False,with_std=False).fit(np.diff(myData))
         
    return preProcesser

In [8]:
def save_model(myModel,myFile):
    pickle.dump(myModel, open(myFile, 'wb'))
    print('Saved model successfully!')
    
def load_model(myFile):
    myModel = pickle.load(myFile)
    return myModel

In [9]:
def get_R2Y(myComp,myX,myY,analyteList,modelChoice,myPreProcess,myCmap):
    """fits a model (PCR or PLSR) to data using the set pre-processer and plots explained variance by analyte 
    for starting from 1 component model up to the number of components (myComp) desired"""
    if modelChoice == 'PLSR' or modelChoice == "PCR":
        
        variance_explained = pd.DataFrame(columns=['N']+analyteList)
        
        i_values = np.arange(1,myComp+1)
        
        for i in i_values:
            
            variance_explained.loc[i-1,'N'] = i
            
            if modelChoice == 'PLSR':
                myModel = make_pipeline(myPreProcess, PLSRegression(n_components=i,scale=False))
                myModel.fit_transform(myX.T, myY)

            if modelChoice == 'PCR':
                myModel = make_pipeline(myPreProcess, PCA(n_components=i), LinearRegression()) 
                myModel.fit(myX.T, myY)

            y_pred = myModel.predict(myX.T)

            for j in range(len(analyteList)):
                idx=analyteList[j]
                variance_explained.loc[i-1,idx] = r2_score(myY,y_pred,multioutput='raw_values')[j]
                
    print(variance_explained)

    for analyte in analyteList:
        plt.plot(i_values,variance_explained[analyte],linewidth=3,color=myCmap[analyte],label=str(analyte+' R2Y'),linestyle='dotted');

    plt.legend();
    plt.title("Variance Explained by N Components");
    plt.xlabel("Number Components");
    plt.ylabel("Variance Explained");
    #TODO:plot average in black?
    
    return y_pred, variance_explained

In [23]:
def get_Q2Y(myComp,myX,myY,analyteList,modelChoice,myPreProcess,myCmap,myCV=5):
    #TODO: update CV strategy?
    """cross validated version of get_R2Y. Set cross validation folders using myCV. Default is 5-fold."""
    if modelChoice == 'PLSR' or modelChoice == "PCR":
        
        variance_explained_CV = pd.DataFrame(columns=['N']+analyteList)
        
        i_values = np.arange(1,myComp+1)

        for i in i_values:
            
            variance_explained_CV.loc[i-1,'N'] = i
            
            if modelChoice == 'PLSR':
                myModel = make_pipeline(myPreProcess, PLSRegression(n_components=i,scale=False))
                myModel.fit_transform(myX.T, myY)

            if modelChoice == 'PCR':
                myModel = make_pipeline(myPreProcess, PCA(n_components=i), LinearRegression()) 
                myModel.fit(myX.T, myY) 

            y_pred_CV = cross_val_predict(myModel, myX.T, myY, cv=myCV)

            for j in range(len(analyteList)):
                idx=analyteList[j]
                variance_explained_CV.loc[i-1,idx] = r2_score(myY,y_pred_CV,multioutput='raw_values')[j]

    print(variance_explained_CV)

    for analyte in analyteList:
        plt.plot(i_values,variance_explained_CV[analyte],linewidth=3,color=myCmap[analyte],label=str(analyte+' Q2Y'));

    plt.legend();
    plt.title("Variance Explained by N Components during "+str(myCV)+" Fold CV");
    plt.xlabel("Number Components");
    plt.ylabel("Variance Explained");
    plt.show();
    
    return y_pred_CV, variance_explained_CV

In [25]:
def set_model(modelChoice,myX, myY,myComp,preProcesser):
    """sets and re-trains model on all data"""
    if modelChoice == 'PLSR':
        myModel = make_pipeline(preProcesser, PLSRegression(n_components=myComp,scale=False))
        myModel.fit_transform(myX, myY)

    if modelChoice == 'PCR':
        myModel = make_pipeline(preProcesser, PCA(n_components=myComp), LinearRegression()) 
        myModel.fit(myX, myY)
    
    return myModel
    #TODO:add elastic net, etc.

In [14]:
#TODO: incorporate pipeline
def reconstruct_voltammograms(myX,modelChoice,nComponents):
    if modelChoice == 'PCR':
        pca = PCA(n_components=nComponents);
        myX_reduced = pca.fit_transform(myX);
        myX_recovered = pca.inverse_transform(myX_reduced);

    if modelChoice == 'PLSR':
        plsr = PLSRegression(n_components=nComponents,scale=False);
        myX_reduced = plsr.fit_transform(myX,y);
        myX_recovered = plsr.inverse_transform(plsr.x_scores_);

    plt.plot(myX.T);
    plt.xlabel('Sample')
    plt.ylabel('Preprocessed Current (nA)')
    plt.title('Preprocessed Voltammogram')
    plt.show();
    plt.plot((myX_recovered).T);
    plt.xlabel('Sample')
    plt.ylabel('Preprocessed Current (nA)')
    plt.title('Reconstructed Preprocessed Voltammogram')
    plt.show()

    if preProcess != 'Normalize Samples':
        plt.plot((preProcesser.inverse_transform(myX)).T);
        plt.xlabel('Sample')
        plt.ylabel('Current (nA)')
        plt.title('Voltammogram')
        plt.show();
        plt.plot((preProcesser.inverse_transform(myX_recovered)).T);
        plt.xlabel('Sample')
        plt.ylabel('Current (nA)')
        plt.title('Reconstructed Voltammogram')
        plt.show()

    print("Reconstruction Error (%):", 100*round(1-r2_score(myX,myX_recovered),4))

In [15]:
def calibration_curves(myModel,myX,myY,analyteList,myCmap):
    """plot predicted versus actual concentrations and r2-values"""
    myY_pred = myModel.predict(myX)
    
    myY_pred=pd.DataFrame(myY_pred,columns=[i for i in analyteList])

    r2_scores = pd.DataFrame(columns=analyteList)
    
    for i in range(len(analyteList)):
        r2_scores.loc[i,analyteList[i]] = r2_score(myY[analyteList[i]],myY_pred[analyteList[i]],multioutput='raw_values')
        
    for i in range(len(analyteList)):
        plt.scatter(myY[analyteList[i]],myY_pred[analyteList[i]], label=analyteList[i],color=myCmap[analyteList[i]]);
        
    plt.xlabel('Observed Concentration')
    plt.ylabel('Predicted Concentration')
    plt.title("Predicted versus Observed Concentration")
    abline(1,0)
    plt.plot();
    plt.legend();
    print(r2_scores)

In [None]:
def vip(myModel):
    """calculate and plot vip scores from set PLSR model. 
    Adapted from: https://github.com/scikit-learn/scikit-learn/issues/7050"""
    myModel = myModel.named_steps['plsregression']
    t = myModel.x_scores_
    w = myModel.x_weights_
    q = myModel.y_loadings_
    p, h = w.shape
    vips = np.zeros((p,))
    s = np.diag(t.T @ t @ q.T @ q).reshape(h, -1)
    total_s = np.sum(s)
    for i in range(p):
        weight = np.array([ (w[i,j] / np.linalg.norm(w[:,j]))**2 for j in range(h) ])
        vips[i] = np.sqrt(p*(s.T @ weight)/total_s)
    
    plt.plot(vips);
    plt.xlabel('Sampled Point')
    plt.ylabel('VIP Score')
    return vips;

In [None]:
def moving_average(a, n) :
    """moving average of data a for given interval n"""
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

In [None]:
def abline(slope, intercept):
    """Plot a line from slope and intercept"""
    axes = plt.gca()
    x_vals = np.array(axes.get_xlim())
    y_vals = intercept + slope * x_vals
    plt.plot(x_vals, y_vals, '--')

In [None]:
def downsample(myX,n):
    """takes every nth sample"""
    myXds=myX.iloc[::n, :]
    myXds.reset_index(drop=True,inplace=True)
    plt.plot(myX);
    plt.title('Original Data')
    plt.xlabel('Sampled Point')
    plt.ylabel('Current (nA)')
    plt.show()
    plt.plot(myXds);
    plt.xlabel('Sampled Point')
    plt.ylabel('Current (nA)')
    plt.title('Downsampled Data')
    plt.show()
    return myXds

In [None]:
def getIndexes(dfObj, value):
    """look for given value in dataframe dfObj and return its index"""
    # Empty list
    listOfPos = []
     
    # isin() method will return a dataframe with
    # boolean values, True at the positions   
    # where element exists
    result = dfObj.isin([value])
     
    # any() method will return
    # a boolean series
    seriesObj = result.any()
 
    # Get list of column names where
    # element exists
    columnNames = list(seriesObj[seriesObj == True].index)
    
    # Iterate over the list of columns and
    # extract the row index where element exists
    for col in columnNames:
        rows = list(result[col][result[col] == True].index)
 
        for row in rows:
            listOfPos.append((row, col))
             
    # This list contains a list tuples with
    # the index of element in the dataframe
    return listOfPos
 
