# Imputation Pipeline

* Remove Trip & Field Blanks
* Use dilution factor (for afff data)
* Rename compounds based on acronyms
* ROS in R
* 1/2 MDL/RL Imputation
    * If RL is missing - fill in with approximates

In [1]:
import pandas as pd
import numpy as np
import rpy2.robjects as robjects
import os
import imputation_utils
import warnings
warnings.filterwarnings('ignore')

### Inputs

In [20]:
run constants.py

In [9]:
# Load the extracted measurements; the location is taken from extracted_df_path
extracted_df = pd.read_csv(filepath_or_buffer=extracted_df_path)

#### Data manipulation
Remove all blanks, mid, effluent, and treatment values

In [10]:
# Drop the rows that should not be imputed (blanks, mid, effluent and treatment
# values, per the note above); the helper is told which columns identify them.
extracted_df = imputation_utils.remove_blanks(
    df=extracted_df,
    matrix_col='Matrix',
    report_col='report',
    address_col='address',
    sample_id_col='sample_id',
)

Create censored column

In [11]:
# Build the censoring information from 'Result' and 'limit'.
# NOTE(review): later cells read 'Result_val' and 'Result_val_cen'; presumably both
# are produced by this helper - confirm in imputation_utils.
extracted_df = imputation_utils.create_censored_col(
    df=extracted_df,
    result_col='Result',
    output_col='Result_val',
    limit_col='limit',
)

Convert measurement columns to numeric

In [12]:
# Columns holding measured values or limits
measurement_cols = ["Result", "RL", "MCL", "MDL", "Result_val", "limit"]

# Coerce each of them to a numeric dtype; entries that cannot be parsed become NaN
extracted_df[measurement_cols] = extracted_df[measurement_cols].apply(pd.to_numeric, errors='coerce')

Convert microgram/l -> nanogram/l
* g/kg & ng/g are within our realm of values 

In [13]:
# Bring every measurement column onto a common unit (microgram/l -> nanogram/l, per the note above).
# A fresh list with the same six column names is passed, so the helper cannot alter measurement_cols.
extracted_df = imputation_utils.convert_units(
    df=extracted_df,
    units_col='Units',
    matrix_col='Matrix',
    measurement_cols=list(measurement_cols),
)

Convert by dilution factor

In [14]:
# Adjust every measurement column using the dilution factor stored in 'DF'.
# A fresh list with the same six column names is passed, so the helper cannot alter measurement_cols.
extracted_df = imputation_utils.dilute_measurements(
    df=extracted_df,
    dilution_factor_col='DF',
    measurement_cols=list(measurement_cols),
)

Combine inconsistent compound acronyms

In [15]:
# Only care about PFAS18
extracted_df['Acronym'] = extracted_df['Acronym'].replace({'N-MeFOSAA' : 'NMeFOSAA',
                                                           'N-EtFOSAA' : 'NEtFOSAA'})

Check whether any RL values are missing. If so, impute RL.

In [16]:
# Verify the reporting limit column before imputing.
# NOTE(review): the message printed below appears to come from this helper - confirm.
extracted_df = imputation_utils.check_RL(
    df=extracted_df,
    rl_col='RL',
    output_col='Result_val',
)

No RL issues. All good for imputation!


In [17]:
# (rows, columns) of the cleaned data, as a sanity check before imputation
extracted_df.shape

(21260, 24)

#### ROS

ROS needs to be done on separate compounds with their separate matrices
* AFFF should be done separately

In [12]:
# ROS step: rows are ordered by ascending Result_val beforehand
extracted_df = extracted_df.sort_values(by='Result_val')

In [13]:
# Build one boolean mask per imputation group.
# Lower-case the two text columns once so every match below is case-insensitive.
folder_lower = extracted_df['folder'].str.lower()
matrix_lower = extracted_df['Matrix'].str.lower()

# AFFF rows are recognised by their folder name and kept apart from everything else
afff_idxs = folder_lower.str.contains('afff')

# Non-AFFF rows are split by matrix: water ('water' or 'dw') versus soil ('so')
is_water_matrix = (matrix_lower.str.contains('water')) | (matrix_lower.str.contains('dw'))
is_soil_matrix = matrix_lower.str.contains('so')
water_idxs = (~afff_idxs) & is_water_matrix
soil_idxs = (~afff_idxs) & is_soil_matrix

In [14]:
# The three masks together should account for exactly as many rows as the full table
(afff_idxs.sum() + water_idxs.sum() + soil_idxs.sum()) == extracted_df.shape[0]

True

Create directories if they don't exist

In [15]:
# Folders used by the ROS step and by the final imputed output
paths = [f'{ros_folder}/ros/input',
         f'{ros_folder}/ros/output',
         f'{ros_folder}/ros/analysis',
         f'{ros_folder}/imputed']

for path in paths:
    # exist_ok=True creates the folder (and any missing parents) only when it is absent,
    # without a separate exists() check that could race with another process.
    os.makedirs(path, exist_ok=True)

Separate files for ROS or impute using 1/2 limit

In [16]:
# One sub-table per imputation group, keyed by the name used in the output file names
df_imputation_dict = {
    group_name: extracted_df[group_mask]
    for group_name, group_mask in (('afff', afff_idxs),
                                   ('water', water_idxs),
                                   ('soil', soil_idxs))
}

In [18]:
# Route every (matrix type, compound) group to one of three outcomes:
#   * ROS input  -> written unchanged to {ros_folder}/ros/input
#   * 1/2 limit  -> censored values halved, written to {ros_folder}/imputed
#   * excluded   -> reported below and recorded in too_many_non_detects
too_many_non_detects = []
for matrix_type in df_imputation_dict:

    df = df_imputation_dict[matrix_type]

    for acronym in extracted_df['Acronym'].unique():

        # Independent copy, so the Result_val assignment further down never writes
        # into a slice of the parent frame.
        df_filtered = df[df['Acronym'] == acronym].copy()

        # The acronym list comes from the whole dataset, so this matrix type may have
        # no rows for it; skip instead of computing a 0/0 non-detect fraction.
        if df_filtered.empty:
            continue

        # Fraction of rows flagged in Result_val_cen (the halving below treats 1 as censored)
        perc_non_detects = df_filtered['Result_val_cen'].sum() / df_filtered.shape[0]

        # At most 80% non-detects and not AFFF: output df as csv into the ROS input folder
        if (perc_non_detects <= 0.8) and (matrix_type != 'afff'):
            df_filtered.to_csv(f'{ros_folder}/ros/input/{matrix_type}_{acronym}.csv')

        # Otherwise use 1/2 limit when the non-detect fraction is below non_detect_threshold
        # OR the largest Result is above 20.
        # NOTE(review): the previous comment read "detected samples do not exceed 20 ng/l",
        # which is the opposite of what `> 20` tests - confirm which one is intended.
        elif (perc_non_detects < non_detect_threshold) or (df_filtered['Result'].max() > 20):
            df_filtered['Result_val'] = np.where(df_filtered['Result_val_cen'] == 1,
                                                 df_filtered['Result_val'] / 2,
                                                 df_filtered['Result_val'])
            df_filtered.to_csv(f'{ros_folder}/imputed/{matrix_type}_{acronym}.csv')

        # Too many non-detects: leave the group out, but keep a record of it
        else:
            too_many_non_detects.append((matrix_type, acronym))
            print(matrix_type, acronym, ':')
            print(round(perc_non_detects * 100, 2), '% ND - Not included in analysis')
            print(df_filtered['Result'].describe())
    

afff ADONA :
100.0 % ND - Not included in analysis
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Result, dtype: float64
afff NEtFOSAA :
100.0 % ND - Not included in analysis
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Result, dtype: float64
afff 11Cl-PF 3OUdS :
100.0 % ND - Not included in analysis
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Result, dtype: float64
afff NMeFOSAA :
100.0 % ND - Not included in analysis
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Result, dtype: float64
afff PFTrDA :
100.0 % ND - Not included in analysis
count    0.0
mean     NaN
std      NaN
min      NaN
25%      NaN
50%      NaN
75%      NaN
max      NaN
Name: Result, dtype: float64
afff 9Cl-PF 3ONS :
100.0 % ND - Not included in analysis
count   

In [19]:
# Report how many files sit directly inside each destination folder
for label, folder in (('# files for ros imputation :', f"{ros_folder}/ros/input"),
                      ('# files that were 1/2 imputed :', f"{ros_folder}/imputed")):
    # First item yielded by os.walk describes the folder itself: (path, sub-folders, files)
    path, dirs, files = next(os.walk(folder))
    print(label, len(files))

# files for ros imputation : 14
# files that were 1/2 imputed : 20


In [21]:
# Publish the working folders as variables in the R global environment, then run the R script.
# NOTE(review): ros.R is presumably the consumer of these three variables - confirm.
robjects.globalenv['ros_analysis_location'] = f"{ros_folder}/ros/analysis"
robjects.globalenv['ros_inputs_location'] = f"{ros_folder}/ros/input"
# Points at {ros_folder}/ros/output: the folder created earlier in this notebook and the one
# the read-back cell walks. It previously pointed at {ros_folder}/output, which is neither.
robjects.globalenv['ros_outputs_location'] = f"{ros_folder}/ros/output"

r_source = robjects.r['source']
r_source("./ros.R")

0,1
value,[RTYPES.NILSXP]
visible,[RTYPES.LGLSXP]


In [22]:
# Read data back in after ROS. Overwrite with ROS modeled data
path, dirs, files = next(os.walk(f"{ros_folder}/ros/output"))

for file in files:
    # File names are read as '<matrix_type>_<acronym>.<ext>'
    name_parts = file.split('_')
    matrix_type = name_parts[0]
    acronym = name_parts[1].split('.')[0]

    ros_df = pd.read_csv(f'{path}/{file}')

    # Rows of this matrix type and compound, renumbered 0..n-1
    # (reset_index keeps the old labels in an 'index' column)
    data = df_imputation_dict[matrix_type]
    data_filtered = data[data['Acronym'] == acronym].reset_index()

    # Overwrite with ROS modeled sample.
    # NOTE(review): values are matched on the 0..n-1 row labels, so this assumes the ROS
    # output lists the rows in the same order as the input file - confirm against ros.R.
    data_filtered['Result_val'] = ros_df['modeled']

    # Error in ROS - Dropped censored values that exceed max of uncensored values. Use 1/2 imputation for these places.
    data_filtered['Result_val'] = data_filtered['Result_val'].fillna(data_filtered['limit'] / 2)

    # write out to imputed folder
    data_filtered.to_csv(f'{ros_folder}/imputed/{matrix_type}_{acronym}.csv')