# This notebook contains the code required to show that changing Air Gap de-calibrates pipettes

## Code required to make Fig. 3a is available in this notebook

### Import necessary modules

In [1]:
import pandas as pd
from openpyxl import load_workbook
from scipy.stats import linregress, ttest_ind
import numpy as np
import sys
from plotly import tools, subplots
import plotly.graph_objs as go
import pickle
import plotly.io as pio
pio.templates.default = "none"

if 'ipykernel' in sys.modules:
    from plotly.offline import init_notebook_mode
    from plotly.offline import iplot as plot
    from IPython.display import HTML
    HTML("""
         <script>
          var waitForPlotly = setInterval( function() {
          if( typeof(window.Plotly) !== "undefined" ){
          MathJax.Hub.Config({ SVG: { font: "STIX-Web" }, displayAlign: "center" });
          MathJax.Hub.Queue(["setRenderer", MathJax.Hub, "SVG"]);
          clearInterval(waitForPlotly);}}, 250 );
        </script>
        """
    )
    init_notebook_mode(connected=True)

### Make Pipette Calibration classes/attributes

In [2]:
class Calibration(object):
    
    #Calibration class will contain all info about one calibration run for one class of volumes
    
    def __init__(self,**kwargs):
        self.max_systematic_error=None
        self.manual_slope=None
        self.manual_intercept=None
        self.number_tips=8
        self.number_replicates=None
        for key in kwargs:
            setattr(self, key, kwargs[key])
        self.tip_objects={"tip"+str(i+1): None for i in range(self.number_tips)}
        self.tip_objects["manual"]=None
        
        
    def scale_od(self):
        
        #Figures out max and min absorbance of manually pipetted samples for each replicate separately
        minmax_params={replicate.replicate_id: {'max':max([point.raw_od for point in replicate.points]),
                                                'min':min([point.raw_od for point in replicate.points])}
                           for replicate in self.tip_objects["manual"].replicates}
        
        #Scale the raw_OD of every point between the min and max values for that replicate
        for tip_id, tip in self.tip_objects.items():
            for replicate in tip.replicates:
                if replicate.replicate_id!='solution':
                    for point in replicate.points:
                        point.scaled_od = (point.raw_od - minmax_params[replicate.replicate_id]['min'])/(minmax_params[replicate.replicate_id]['max'] - minmax_params[replicate.replicate_id]['min'])
                 

    def calculate_std_curve(self):
        #Calculate std curve that relates OD to volume (and consequently concentration)
        slopes=[]
        intercepts=[]
        for replicate in self.tip_objects["manual"].replicates:
            replicate.calculate_std_curve()
            slopes.append(replicate.slope)
            intercepts.append(replicate.intercept)
        self.man_slope = np.mean(slopes)
        self.man_slope_std = np.std(slopes)
        self.man_intercept = np.mean(intercepts)
        self.man_intercept_std = np.std(intercepts)
    
    
    def predict_volumes(self):
        
        #Go to each point and predict the volume pipetted as the OD*avg_slope+avg_intercept.
        #Also calculate the pct_deviation from expected values
        for tip_id,tip in self.tip_objects.items():
            if tip_id!='manual':
                for replicate in tip.replicates:
                    if replicate.replicate_id!='solution':
                        for point in replicate.points:
                            point.predicted_value = self.man_slope * point.scaled_od + self.man_intercept
                            point.pct_deviation = abs(point.expected_value - point.predicted_value)/point.expected_value*100


    def calculate_calibration_params(self):
        
        #Create a calibration point for each volume with the average predicted value and expected value
        for tip_id, tip in self.tip_objects.items():
            if tip_id!='manual':
                volumes_list = [point.expected_value for point in tip.replicates[0].points]
                tip.replicate_dict['solution'].points.extend([CalibrationPoint(expected_value=volume,
                                                          tip_id=tip_id,
                                                          predicted_value=np.mean([replicate.points_dict[volume].predicted_value 
                                                                                   for replicate in tip.replicates
                                                                                  if replicate.replicate_id!='solution']),
                                                          pct_deviation=np.mean([replicate.points_dict[volume].pct_deviation
                                                                             for replicate in tip.replicates
                                                                             if replicate.replicate_id!='solution']))
                                                         for volume in volumes_list])
                
                #For tip, calculate an offset and a factor that can convert actual pipetted volumes to desired volumes
                #Tecan's EvoWare interface requires tip_factor,tip_offset to be the slope and intercept that transforms
                #  pipetted values(x's) into expected values(y's)
                x_vals=[point.predicted_value for point in tip.replicate_dict['solution'].points]
                y_vals=[point.expected_value for point in tip.replicate_dict['solution'].points]
                sol = linregress(x_vals, y_vals)
                tip.replicate_dict['solution'].tip_factor = sol.slope
                tip.replicate_dict['solution'].tip_offset = sol.intercept
                
                #Calculate rsq to get the feasibility of a linear regression and the average pct deviation for the tip WITHOUT calibration
                tip.replicate_dict['solution'].tip_rsq = sol.rvalue**2
                tip.replicate_dict['solution'].avg_pct_deviation = np.mean([point.pct_deviation for point in tip.replicate_dict['solution'].points])
        
    
    def load_calibration_data(self, filename='None'):
        #Load calibration file
        if filename == 'None':
            print("No filename specified")
        else:
            #Load entire workbook
            wb=load_workbook(filename)
            
            #Find which sheets begin with 'Rep' and calculate number of replicate calibrations done
            self.number_replicates=len([sheetname for sheetname in wb.sheetnames if 'Rep' in sheetname])
            if self.number_replicates==0 or self.number_replicates is None:
                print("Please check sheet names. No sheets with the name Rep to indicate replicates.")
            else:
                
                # Initiate empty Tip object for each tip in the dictionary 'tip_objects'
                for tip_id in self.tip_objects:
                    self.tip_objects[tip_id] = Tip(tip_id=tip_id,
                                                  number_replicates=self.number_replicates)
                    
                #For each replicate, find the number of OD reads (tech replicates to identify random errors)
                #Also, store the index of the row where OD data begins
                for rep in range(self.number_replicates):
                    datasheet = 'Rep'+str(rep+1)
                    data_start_index = [index+5 for index, row in enumerate(wb[datasheet].rows) 
                                        if row[0].value if "Start Time" in str(row[0].value)]
                    number_reads = len(data_start_index)
                    
                    #For each calibration point - i.e. pipetted volume, get expected value from the setup sheet and 
                    # the raw OD from the appropriate replicate sheet
                    for j,column in enumerate(wb['Setup'].iter_cols()):
                        if str(column[0].value).title() != 'Manual':
                            for i in range(8):
                                if column[i+1].value is not None:
                                    #Read error stores the random error/systematic error from the platereader
                                    point = CalibrationPoint(expected_value=column[i+1].value, tip_id=i+1)        
                                    point.raw_od = np.mean([wb[datasheet][index+i][j+1].value for index in data_start_index])
                                    point.read_error = np.std([wb[datasheet][index+i][j+1].value for index in data_start_index])
                                    self.tip_objects["tip"+str(i+1)].replicate_dict[rep].points.append(point)


                        else:
                            #Same as above for manually pipetted values
                            for i in range(8):
                                if column[i+1].value is not None:
                                    point = ManualPoint(expected_value=column[i+1].value, tip_id='manual')
                                    point.raw_od = np.mean([wb[datasheet][index+i][j+1].value for index in data_start_index])
                                    point.read_error = np.std([wb[datasheet][index+i][j+1].value for index in data_start_index])
                                    self.tip_objects["manual"].replicates[rep].points.append(point)
                                    
                #Identify max systematic error from platereader for an estimation of the number of significant figures to display
                self.max_systematic_error = np.amax([max([max([point.read_error for point in replicate.points])
                                                     for replicate in tip_object.replicates
                                                     if replicate.replicate_id!='solution']) 
                                                    for tip_id,tip_object in self.tip_objects.items()])/np.sqrt(number_reads)
                
                #Call functions to scale OD data, calculate std curve for manually pipetted samples, predict volumes, and calculate calibration params
                self.scale_od()
                self.calculate_std_curve()
                self.predict_volumes()
                self.calculate_calibration_params()

    
class Tip(object):
    
    #Each tip object has information about the calibration replicates available for that tip.
    #After all calculations are done, it will also contain the 'solution' replicate which has the offset and factor values
    def __init__(self,**kwargs):
        self.number_replicates=None
        self.tip_id=None
        for key in kwargs:
            setattr(self, key, kwargs[key])
        self.replicates=[Replicate(tip_id=self.tip_id, replicate_id=i)
                         if self.tip_id!="manual"
                         else ManualReplicate(tip_id=self.tip_id, replicate_id=i)
                         for i in range(self.number_replicates)]
        
        if self.tip_id!="manual":
            self.replicates.append(Replicate(tip_id=self.tip_id, replicate_id='solution')) 
        
        
    @property 
    def replicate_dict(self):
        return{replicate.replicate_id:replicate for replicate in self.replicates}

    
class Replicate(object):
    
    #Each Replicate object contains calibration points
    def __init__(self, **kwargs):
        self.points=[]
        self.tip_id=None
        self.replicate_id=None
        for key in kwargs:
            setattr(self, key, kwargs[key])
            
    @property
    def points_dict(self):
        return {point.expected_value:point for point in self.points}
        
    

class ManualReplicate(Replicate):
    
    #Child class of Replicate object. Has the slope, intercept and rsq values for the manually pipetted replicate
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.slope=None
        self.intercept=None
        self.rsq=None
        
        
    def calculate_std_curve(self):
        #Calculate linear regression params that transform OD values to Volume (as a proxy for concentration)
        if self.points:
            x_vals=[point.scaled_od for point in self.points]
            y_vals=[point.expected_value for point in self.points]
            lin_fit = linregress(x_vals, y_vals)
            self.slope=lin_fit.slope
            self.intercept=lin_fit.intercept
            self.rsq=lin_fit.rvalue**2
              
            
class ManualPoint(object):
    
    #Class that contains a calibration point for manually pipetted sample
    def __init__(self,**kwargs):
        self.number_reads=None
        self.expected_value=None
        self.raw_od=None
        self.scaled_od=None
        self.read_error=None
        self.tip_id=None
        for key in kwargs:
            setattr(self, key, kwargs[key])

        
class CalibrationPoint(ManualPoint):
        
    #Child class of Manual Point that contains a calibration point for liha pipetted sample

    def __init__(self,**kwargs):
        super().__init__(**kwargs)
        self.predicted_value=None
        self.pct_deviation=None
        for key in kwargs:
            setattr(self, key, kwargs[key])

### Calibration Performed for 2 Levels:


- Low Volume: 3$\mu$L - 10$\mu$L
- Mid Volume: 10$\mu$L - 50$\mu$L
- High Volume: 50$\mu$L - 200$\mu$L

In [3]:
#Load necessary excel files with absorbance data

pre_stag_high = Calibration()
pre_stag_high.load_calibration_data('20201021_PreSTAGChange_Calibration_Test_HighVolume.xlsx')

pre_stag_low = Calibration()
pre_stag_low.load_calibration_data('20201021_PreSTAGChange_Calibration_Test_LowVolume.xlsx')

post_stag_high = Calibration()
post_stag_high.load_calibration_data('20201030_PostSTAGChange_Calibration_Test_HighVolume.xlsx')

post_stag_low = Calibration()
post_stag_low.load_calibration_data('20201030_PostSTAGChange_Calibration_Test_LowVolume.xlsx')

calib_list = [pre_stag_low, pre_stag_high]
post_calib_list = [post_stag_low, post_stag_high]

### Determine whether calibration was successful.
#### Obtain average %error for each tip in every volume level.
#### Perform a one-tailed t-test with the alternative hypothesis : Post-calibration error is smaller than Pre-calibration error

### Considering all volume ranges together

In [4]:
df_list = []

#List to store deviation in pipetted volume before air gap change
prior_error = []
prior_error_stderr = []

#List to store deviation in pipetted volume after air gap change
post_error = []
post_error_stderr = []
pvalue = []

for tip_id in pre_stag_high.tip_objects:
    if tip_id!='manual':
        
        temp_priors = []
        temp_posts = []

        for i, (calib,post_calib) in enumerate(zip(calib_list,post_calib_list)):
        
        
                            
            temp_priors.extend(np.array([[point.pct_deviation for point in replicate.points] 
                                     for replicate in calib.tip_objects[tip_id].replicates
                                     if replicate.replicate_id!='solution']).flatten())
            temp_posts.extend(np.array([[point.pct_deviation for point in replicate.points] 
                                    for replicate in post_calib.tip_objects[tip_id].replicates
                                    if replicate.replicate_id!='solution']).flatten())
      
        prior_error.append(np.mean(temp_priors))
        prior_error_stderr.append(np.std(temp_priors)/np.sqrt(len(temp_priors)))
        post_error.append(np.mean(temp_posts))
        post_error_stderr.append(np.std(temp_posts)/np.sqrt(len(temp_posts)))
        pvalue.append(ttest_ind(temp_posts,temp_priors,equal_var=False,alternative='greater').pvalue)

    
calib_result_df = pd.DataFrame(data={'pre_error':prior_error,
                                     'pre_error_stderr':prior_error_stderr,
                                     'post_error': post_error,
                                     'post_error_stderr':post_error_stderr,
                                     'pvalue':pvalue},
                               index=[tip_id.title() for tip_id in calib.tip_objects if tip_id!='manual'])
    
display(calib_result_df)

#Shows deviation in pipetted volumes (as % error) before and after airgap change. A one tailed t-test is used to calculate p-values for each tip to 
#test the hypothesis that changing air gap does not affect the calibration.

Unnamed: 0,pre_error,pre_error_stderr,post_error,post_error_stderr,pvalue
Tip1,1.414316,0.403271,22.838742,6.691175,0.01641
Tip2,0.834132,0.28031,17.604271,5.867291,0.023839
Tip3,1.319355,0.430196,20.841409,5.740071,0.013289
Tip4,1.610212,0.575624,31.502854,9.066473,0.014835
Tip5,0.889256,0.302969,18.649287,3.939714,0.004549
Tip6,1.375025,0.509421,21.799325,5.688547,0.010911
Tip7,1.089109,0.412663,28.487972,9.188657,0.020818
Tip8,1.948443,0.682385,30.064403,9.767063,0.023286


### Plot average errors pre- and post-calibration

In [5]:
colors = ['#A0D297', '#AF7E5A']

yaxis_ranges = (0, 50)

#These are max acceptable error levels laid out by ISO for each volume level
iso_ranges = (2.4, 8)

#Typically found max errors for manual multichannel pipettes
manual_ranges = (1, 5.5)

trace_list = []
trace_list.append(go.Scatter(x=np.linspace(0, 10, 8), y=[iso_ranges[1]]*8,
                             name='Max Allowed Error (ISO)',
                             mode='lines', line=dict(dash='dot',color='#9b5151',width=1),
                             xaxis='x2', showlegend=True))

trace_list.append(go.Bar(x=list(calib_result_df.index), y=calib_result_df['pre_error'].to_list(),
                         error_y=dict(type='data', array=calib_result_df['pre_error_stderr'].to_list(),
                                      width=2, thickness=1, color='black'),
                         marker=dict(color=colors[0], line=dict(width=0, color='black')),
                         textfont=dict(family='Arial', size=20, color='black'),
                         name='Before STAG Change', showlegend=True))
trace_list.append(go.Bar(x=list(calib_result_df.index), y=calib_result_df['post_error'].to_list(),
                         error_y=dict(type='data', array=calib_result_df['post_error_stderr'].to_list(),
                                      width=2, thickness=1, color='black'),
                         marker=dict(color=colors[1], line=dict(width=0, color='black')),
                         textfont=dict(family='Arial', size=40, color='black'),
                         name='After STAG Change', showlegend=True))

layout = go.Layout(height=465, width=425, showlegend=True, barmode='group', bargap=0.2, bargroupgap=0.05,
                   legend_orientation='h',

                    xaxis=dict(#title_text=xaxis_titles[i], 
#                               title_font=dict(family='Arial',size=13,color='black'), title_standoff=0,
                              showline=True, linewidth=1, linecolor='black', mirror=True, showgrid=False,
                              ticks='outside', ticklen=4, tickfont=dict(size=13, family='Arial', color='black'),
                              tickangle=0, tickcolor='black', side='bottom', 
                              type='category', anchor='y', overlaying='x2'),

                   xaxis2=dict(showline=False,linewidth=0,ticklen=0,side='top',type='linear',range=(0,8),
                               anchor='y',visible=False),

                   yaxis=dict(title='Pipetting Accuracy<br>(% deviation)',
                              titlefont=dict(family='Arial', size=16, color='black'), title_standoff=0,
                              showline=True, linewidth=1, linecolor='black', mirror=True, showgrid=False,
                              ticks='outside', ticklen=4, tickfont=dict(family='Arial',size=13, color='black'),
                              tickangle=0, tickcolor='black', range=yaxis_ranges))

fig = go.Figure(data=trace_list,layout=layout)
plot(fig)
#pio.write_image(fig,"Figures/fig_2_airgapeffect.svg",format='svg')
display(pd.DataFrame(calib_result_df['pvalue']))

Unnamed: 0,pvalue
Tip1,0.01641
Tip2,0.023839
Tip3,0.013289
Tip4,0.014835
Tip5,0.004549
Tip6,0.010911
Tip7,0.020818
Tip8,0.023286
