In [2]:
import pandas as pd
import numpy as np
import csv
import json
from scipy.stats import linregress
from statsmodels.stats.weightstats import ztest as ztest

# local code
from source.interactive_plots import interactive_linear_regression_calibration_plot
from source.get_elements      import get_elements
from source.outliers          import dixon_test

Load data from XRF analyses

In [3]:
xrf_data = pd.read_csv("../data/interim/xrf_data_clean.csv") # load xrf data
# drop uncertainty columns 
xrf_data.drop([column for column in xrf_data.columns if column.endswith("+/-")], axis=1, inplace=True)

# Calibration

Load data for standard reference materials and clean

In [4]:
srm_data = pd.read_csv("../data/interim/standard_reference_material_certified_values.csv") # load SRM data
srm_data.drop(["1SD", "95% Confidence Low", "95% Confidence High"], axis=1, inplace=True) # drop unnecessary columns

# clean SRM data
for row in srm_data.iterrows(): 
    row[1]["Sample ID"] = row[1]["Sample ID"].lower()
    row[1]["Analyte"] = row[1]["Analyte"].split(",",1)[0]

    # clean certified value
    cert_val = row[1]["Certified Value"]
    if cert_val.startswith("<"): 
        row[1]["Certified Value"] = float(cert_val.lstrip("< ")) / 2 # replace BDL with half value
    else:
        row[1]["Certified Value"] = float(row[1]["Certified Value"])

    # clean units
    row[1]["Units"] = row[1]["Units"].lstrip("(").rstrip(")")
    if row[1]["Units"] == "wt.%":  # convert units
       row[1]["Certified Value"] = row[1]["Certified Value"] * 1e4
       row[1]["Units"] = "ppm"

## Remove standard data for a given element that is unsuitable for calibration

Compare the measured concentration of each element for a given standard to the distribution of measured concentrations for that element across all non-standards. If the measured concentration 

    1.) falls outside the range of the distribution 
    2.) is more than three standard deviations from the mean 

that value is set to `NaN` (all values set to `NaN` are filtered out during calibration). 

Note: I should consider conducting a test to determine if each distribution is normally or log-normally distributed; if the latter, I should log-transform the data and then calculate the z-score. 

In [5]:
def z_score(data, value): 
    mean = np.mean(data)
    std = np.std(data)

    return (value - mean) / std

In [6]:
non_standards_data = xrf_data.loc[xrf_data["qaqc_type"]!="standard"]

outlier_stddev_cutoff = 5

for standard in xrf_data.loc[xrf_data["qaqc_type"]=="standard", "sample_id"].unique(): 
    for date in xrf_data.loc[xrf_data["sample_id"]==standard, "date"].unique():
        for element in get_elements(xrf_data.columns.to_list()): 
            standard_date_element = xrf_data.loc[(xrf_data["sample_id"]==standard) & (xrf_data["date"]==date)][element].values

            if (non_standards_data[element] > standard_date_element[0]).all() | \
               (non_standards_data[element] < standard_date_element[0]).all(): 
                
                # print(non_standards_data[element].to_numpy())
                z = z_score(non_standards_data[element], standard_date_element[0])
                if abs(z) > outlier_stddev_cutoff: 
                    # print(standard + ", " + date + ", " + element + ", " + str(standard_date_element[0]) + ", " + str(z))

                    xrf_data.loc[(xrf_data["sample_id"]==standard) & (xrf_data["date"]==date), element] = np.nan

Get the elements analyzed by the XRF and for which concentrations are reported for one or more standard reference materials

In [7]:
elements = get_elements(
                        list(
                            set(srm_data["Analyte"].unique()) & \
                            set(xrf_data.columns.to_list())
                            )
                        )

Construct a linear regression model for each element in order to predict the true concentration from the measured concentration

In [8]:
# initialize dictionary to hold lin. reg. models for each element
reg = {}
for element in elements: 

    # get IDs of standard reference materials
    srm = srm_data.loc[srm_data["Analyte"]==element]["Sample ID"].unique() 

    ## TRAIN 
    # limit training data to standards for which we have standard reference material info for the element at hand
    data_train = xrf_data.loc[(xrf_data["qaqc_type"]=="standard") & (xrf_data["sample_id"].isin(srm))]
    data_train = data_train.dropna(subset=[element]) # change to true condition statement

    x_train = [srm_data.loc[
                            (srm_data["Sample ID"]==sample) & \
                            (srm_data["Analyte"]==element)
                            ]["Certified Value"].values[0] for sample in data_train["sample_id"]]
    x_train = np.array(x_train)
    y_train = data_train[element].to_numpy()

    model = linregress(x_train, y_train) # fit linear regression model

    if model.slope != 0: #only use calibration curve if meaningful (i.e., if variance in dep. var. explained by variance in indep. var.)
        # invert model so that measured concentration is independent var. and true concentration is dependent var. (i.e., y = m*x + b --> x = (1/m)*y - (b/m))
        intercept_inv = -model.intercept / model.slope
        slope_inv = model.slope ** -1

        ## PREDICT (i.e., calibrate)
        data_predict = xrf_data.loc[xrf_data["qaqc_type"]!="standard"]
        data_predict = data_predict.dropna(subset=[element])

        x_predict = data_predict[element]
        x_predict = x_predict.to_numpy()
        y_predict = slope_inv * x_predict + intercept_inv

        ## Save model results
        reg[element] = {} # initialize empty dict to save model results

        reg[element]["model"]                 = model
        reg[element]["x_train"]               = x_train
        reg[element]["y_train"]               = y_train
        reg[element]["score"]                 = model.rvalue
        reg[element]["y-intercept std error"] = model.intercept_stderr
        reg[element]["slope_inv"]             = slope_inv
        reg[element]["intercept_inv"]         = intercept_inv
        reg[element]["x_predict"]             = x_predict
        reg[element]["y_predict"]             = y_predict
        
        xrf_data.loc[xrf_data["qaqc_type"]!="standard", element] = y_predict

  slope = ssxym / ssxm
  slope_stderr = np.sqrt((1 - r**2) * ssym / ssxm / df)


Display the results of the calibration

In [9]:
dropdown_buttons = {
    "data": 
        {
            "name": "Elements", 
            "columns": list(reg.keys())
        }
    }

interactive_linear_regression_calibration_plot(dropdown_buttons, reg, x_axis_label="True concentration (ppm)", y_axis_label="Measured concentration (ppm)", title="")

VBox(children=(HBox(children=(Dropdown(description='Elements', options=('Ag', 'As', 'Ba', 'Bi', 'Ca', 'Cd', 'C…

Calculate the detection limit for each element. 

In [10]:
for element in elements: 
    detection_limit = reg[element]["slope_inv"] * (reg[element]["model"].intercept + 3*reg[element]["y-intercept std error"]) + reg[element]["intercept_inv"]
    reg[element]["detection_limit"] = detection_limit

Apply detection limit to dataset.

In [11]:
for element in reg.keys(): 
    limit = reg[element]["detection_limit"]
    xrf_data[element].where(xrf_data[element] >= limit, other=f"<{limit}", inplace=True)

# Evaluating Precision

In [12]:
lab_duplicates = xrf_data[xrf_data["qaqc_type"]=="lab duplicate"]

lab_parent_indexes = []

for row in lab_duplicates.iterrows(): 
    lab_dup_id = row[1]["sample_id"]
    lab_parent_id = lab_dup_id.rstrip('L')

    condition = xrf_data["sample_id"] == lab_parent_id
    index = xrf_data.index[condition][0]
    lab_parent_indexes.append(index)

lab_parents = xrf_data.iloc[lab_parent_indexes]

lab_pairs = pd.concat([lab_duplicates, lab_parents])

In [56]:
def half_detection_limit(x): 
    if isinstance(x, str): 
        if x.startswith("<"): 
            x = float(x.lstrip("< ")) / 2 # replace BDL with half value
    return x

In [109]:
lab_pair_diffs = lab_parents.copy()

elements_dup = get_elements(xrf_data.columns.to_list())

lab_pair_diffs.loc[:, elements_dup] = lab_pair_diffs.loc[:, elements_dup].apply(pd.to_numeric, errors="coerce")
lab_duplicates = lab_duplicates.copy()
lab_duplicates.loc[:, elements_dup] = lab_duplicates.loc[:, elements_dup].apply(pd.to_numeric, errors="coerce")

for row in lab_pair_diffs.iterrows():
    parent_id = row[1]["sample_id"]
    duplicate_id = parent_id + "L"

    parent_entry = row[1].to_frame().transpose()
    parent_entry.reset_index(inplace=True)
    parent_values = parent_entry.loc[:, elements_dup]

    duplicate_entry = lab_duplicates.loc[lab_duplicates["sample_id"]==duplicate_id]
    duplicate_entry.reset_index(inplace=True)
    duplicate_values = duplicate_entry.loc[:, elements_dup]
    
    condition = lab_pair_diffs["sample_id"] == parent_id
    lab_pair_diffs.loc[condition, elements_dup] = parent_values.subtract(duplicate_values).values

    parent_values = parent_values.applymap(half_detection_limit)
    duplicate_values = duplicate_values.applymap(half_detection_limit)
    pair_values = pd.concat((parent_values, duplicate_values))

    condition = xrf_data["sample_id"] == parent_id
    xrf_data.loc[condition, elements_dup] = pair_values.mean().values

    condition = xrf_data["sample_id"] == duplicate_id
    xrf_data.drop(xrf_data[condition].index, inplace=True)

lab_pair_diffs = lab_pair_diffs[["sample_id"] + elements_dup]

In [15]:
def std_dev_from_pairs(diffs): 
    diffs = np.asarray(diffs)
    n = diffs.size
    
    std_dev = np.sqrt(np.sum(np.square(diffs)) / (2 * n))

    return std_dev, n

In [16]:
analytical_precision = {}
for element in elements_dup: 
    analytical_precision[element] = dict.fromkeys(["standard deviation", "number of pairs"])

    diffs = lab_pair_diffs[element]
    if (len(diffs) >=3) & (False in lab_pair_diffs["Cd"].isnull()): # min. for Dixon's Q Test & remove elements w/ all null values
        diffs = dixon_test(list(diffs), left=True, right=True, confidence_level=95)[0]
    std_dev, n = std_dev_from_pairs(diffs)

    if np.isnan(std_dev): 
        std_dev = None

    analytical_precision[element]["standard deviation"] = std_dev
    analytical_precision[element]["number of pairs"] = n

In [17]:
with open("../data/interim/analytical_precision.json", "w") as outfile: 
    json.dump(analytical_precision, outfile, indent=4)

In [None]:
field_duplicates = xrf_data[xrf_data["qaqc_type"]=="field duplicate"]
field_parent_indexes = []

for row in field_duplicates.iterrows(): 
        field_dup_id = row[1]["sample_id"]
        field_parent_id = field_dup_id.rstrip('F')

        condition = xrf_data["sample_id"] == field_parent_id
        index = xrf_data.index[condition][0]
        field_parent_indexes.append(index)

field_parents = xrf_data.iloc[field_parent_indexes]

field_pairs = pd.concat([field_duplicates, field_parents])
field_parents.shape

In [None]:
# alpha= 0.05
# element = "Ba"
# normality_test_data = xrf_data.loc[xrf_data["qaqc_type"]!="standard", element]
# k, p = stats.normaltest(normality_test_data)
# if p < alpha:
#     print("reject null hypothesis that sample is normall distributed")

In [None]:
# export data to csv file
xrf_data.to_csv('../data/interim/xrf_data_calib.csv')

In [21]:

    # xrf_data.to_excel('../data/interim/xrf_data_calib.xlsx', na_rep="NaN")