In [3]:
# Data processing script: read the pre-built train/test feature tables, stack
# them into one table and multiply impute its missing values.

# Input CSVs further down are read with relative paths from this directory.
wd.path <- "~/in"
setwd(wd.path)

# Seed this R session's RNG.
set.seed(1234)

# Packages ----
# Load order is kept as-is: later packages mask functions of earlier ones.
library(dplyr)
library(data.table)   # fread(), set(), setDF()
# Previously loaded, currently disabled: foreach, caret, reshape2
library(rms)          # attaches Hmisc (aregImpute, impute.transcan)
library(WeightedROC)

# Mode (most frequent value) of a vector.
# Adapted from: https://stackoverflow.com/questions/2547402/is-there-a-built-in-function-for-finding-the-mode
#
# x     : atomic vector.
# na.rm : if TRUE, NA values are dropped before counting; otherwise NA is
#         counted like any other value.
# Returns the single most frequent value of x. Ties go to the value that
# appears first in x.
Mode <- function(x, na.rm = FALSE) {
  values <- if (na.rm) x[!is.na(x)] else x
  distinct_values <- unique(values)
  counts <- tabulate(match(values, distinct_values))
  distinct_values[which.max(counts)]
}

imputationFunction <- function(imputeToData, imputeFromData, FUN, missingCols, suffix) {
  # Fill NAs column-by-column with a statistic computed on a (possibly
  # different) reference data set.
  #
  # imputeToData   - data.frame whose NAs are filled in
  # imputeFromData - data.frame the per-column statistic is computed from
  # FUN            - statistic, called as FUN(column, na.rm = TRUE)
  #                  (e.g. mean, median, or the Mode function in this file)
  # missingCols    - names of the columns to impute
  # suffix         - string appended to every returned column name
  #
  # Returns only the columns of imputeToData named in missingCols (in
  # imputeToData's column order), NAs replaced, names suffixed with `suffix`.
  # NOTE(review): written for data.frames; a data.table should be converted
  # (e.g. with setDF()) before calling.

  # drop = FALSE keeps a data.frame even when a single column matches.
  imputeToData <- imputeToData[, names(imputeToData) %in% missingCols, drop = FALSE]
  imputeFromData <- imputeFromData[, names(imputeFromData) %in% missingCols, drop = FALSE]

  # lapply (not apply) so every column keeps its own type; apply() would
  # first coerce the whole data.frame to a single-type matrix.
  imputeVec <- lapply(imputeFromData, function(x) FUN(x, na.rm = TRUE))

  for (col in names(imputeToData)) {
    fill <- imputeVec[[col]]
    # No reference column, or an empty statistic (e.g. all-NA reference):
    # leave the NAs in place.
    if (length(fill) == 0L) next
    missing_rows <- is.na(imputeToData[[col]])
    imputeToData[[col]][missing_rows] <- fill
  }

  names(imputeToData) <- paste0(names(imputeToData), suffix)
  imputeToData
}


Attaching package: ‘dplyr’

The following objects are masked from ‘package:stats’:

    filter, lag

The following objects are masked from ‘package:base’:

    intersect, setdiff, setequal, union


Attaching package: ‘data.table’

The following objects are masked from ‘package:dplyr’:

    between, first, last

Loading required package: Hmisc
Loading required package: lattice
Loading required package: survival
Loading required package: Formula
Loading required package: ggplot2

Attaching package: ‘Hmisc’

The following objects are masked from ‘package:dplyr’:

    combine, src, summarize

The following objects are masked from ‘package:base’:

    format.pval, round.POSIXt, trunc.POSIXt, units

Loading required package: SparseM

Attaching package: ‘SparseM’

The following object is masked from ‘package:base’:

    backsolve



### Get a small sample dataset to test the pipeline

In [4]:
# Pipeline configuration ----
samp_row <- 1000   # intended sample size for a test run (not referenced in this chunk)
num.impute <- 2    # imputations per aregImpute call, i.e. per parallel worker

# Columns pulled from train/test: ID and bookkeeping columns first, then the
# candidate model features.
feat_list <- c(
  # ID / bookkeeping
  "SK_ID_CURR", "TARGET", "fold", "Weights",
  # features
  "Avg_EXT_SOURCE_application",
  "ANNUITY_LENGTH",
  "MAX_DAYS_CREDIT_bureau",
  "CNT_INSTALMENT_FUTURE_min_sd_POS_CASH",
  "MAX_DAYS_CREDIT_ENDDATE_bureau",
  "DAYS_BIRTH",
  "Refused_count_NAME_CONTRACT_STATUS_prev_app",
  "CODE_GENDER",
  "MIN_AMT_CREDIT_SUM_bureau",
  "AMT_ANNUITY_AMT_GOODS_PRICE_ratio_application",
  "DAYS_BIRTH_EXT_SOURCE_1_ratio_application",
  "MEAN_AMT_CREDIT_SUM_DEBT_bureau",
  "MEAN_AMT_CREDIT_SUM_bureau",
  "EXT_SOURCE_1_DAYS_ID_PUBLISH_ratio_application",
  "CREDIT_TO_GOODS_RATIO",
  "AMT_ANNUITY_EXT_SOURCE_1_ratio_application",
  "AMT_PAYMENT_min_sum_installments_payments",
  "SD_DAYS_CREDIT_ENDDATE_bureau",
  "CNT_FAM_MEMBERS_EXT_SOURCE_3_ratio_application",
  "DAYS_BIRTH_EXT_SOURCE_3_ratio_application",
  "SD_AMT_CREDIT_SUM_DEBT_bureau",
  "CNT_INSTALMENT_FUTURE_Mode_sd_POS_CASH",
  "MIN_Delta_Credit_prev_app",
  "MEAN_DAYS_ENDDATE_FACT_bureau",
  "AMT_PAYMENT_n_distinct_mean_installments_payments",
  "AMT_ANNUITY_EXT_SOURCE_3_ratio_application",
  "SD_AMT_ANNUITY_bureau",
  "SOURCES_PROD",
  "INCOME_CREDIT_PERC",
  "high_count_NAME_YIELD_GROUP_prev_app",
  "DAYS_EMPLOYED_PERC",
  "REGION_RATING_CLIENT_W_CITY",
  "MEAN_DAYS_CREDIT_bureau",
  "MAX_AMT_CREDIT_SUM_DEBT_bureau",
  "SD_DAYS_DECISION_prev_app",
  "0_count_bbalance",
  "AMT_PAYMENT_min_max_installments_payments",
  "AMT_ANNUITY",
  "CNT_INSTALMENT_FUTURE_mean_median_POS_CASH",
  "Day_Diff_installments_min_median_installments_payments",
  "Day_Diff_installments_min_min_installments_payments",
  "CNT_INSTALMENT_FUTURE_mean_max_POS_CASH",
  "SUM_AMT_CREDIT_SUM_DEBT_bureau",
  "CNT_INSTALMENT_FUTURE_median_sd_POS_CASH",
  "NAME_EDUCATION_TYPE",  # cf. dummy 'NAME_EDUCATION_TYPE_Secondary/secondaryspecial'
  "MAX_AMT_CREDIT_SUM_bureau",
  "CNT_INSTALMENT_FUTURE_median_mean_POS_CASH",
  "AMT_DRAWINGS_ATM_CURRENT_mean_mean_credit_card_balance",
  "Day_Diff_installments_min_sum_installments_payments",
  "1_proportion_bbalance",
  "DAYS_LAST_PHONE_CHANGE_AMT_ANNUITY_ratio_application",
  "EXT_SOURCE_2.x",
  "MEAN_AMT_ANNUITY_bureau",
  "MEAN_AMT_DOWN_PAYMENT_prev_app",
  "OWN_CAR_AGE_EXT_SOURCE_3_ratio_application",
  "AMT_CREDIT_DAYS_EMPLOYED_ratio_application",
  "DAYS_LAST_PHONE_CHANGE_DEF_30_CNT_SOCIAL_CIRCLE_ratio_application",
  "OWN_CAR_AGE_AMT_REQ_CREDIT_BUREAU_QRT_ratio_application",
  "AMT_ANNUITY_DAYS_EMPLOYED_ratio_application"
)

# One-sided formula of the variables handed to aregImpute (a subset of
# feat_list). Currently excluded: REGION_RATING_CLIENT_W_CITY, and
# 0_count_bbalance (its name starts with a digit, so it would need backticks).
requested_formula <- ~ TARGET + Avg_EXT_SOURCE_application + ANNUITY_LENGTH +
  MAX_DAYS_CREDIT_bureau + CNT_INSTALMENT_FUTURE_min_sd_POS_CASH +
  MAX_DAYS_CREDIT_ENDDATE_bureau + DAYS_BIRTH +
  Refused_count_NAME_CONTRACT_STATUS_prev_app + CODE_GENDER +
  MIN_AMT_CREDIT_SUM_bureau + AMT_ANNUITY_AMT_GOODS_PRICE_ratio_application +
  DAYS_BIRTH_EXT_SOURCE_1_ratio_application + MEAN_AMT_CREDIT_SUM_DEBT_bureau +
  MEAN_AMT_CREDIT_SUM_bureau + EXT_SOURCE_1_DAYS_ID_PUBLISH_ratio_application +
  CREDIT_TO_GOODS_RATIO + AMT_ANNUITY_EXT_SOURCE_1_ratio_application +
  AMT_PAYMENT_min_sum_installments_payments + SD_DAYS_CREDIT_ENDDATE_bureau +
  CNT_FAM_MEMBERS_EXT_SOURCE_3_ratio_application +
  DAYS_BIRTH_EXT_SOURCE_3_ratio_application + SD_AMT_CREDIT_SUM_DEBT_bureau +
  CNT_INSTALMENT_FUTURE_Mode_sd_POS_CASH + MIN_Delta_Credit_prev_app +
  MEAN_DAYS_ENDDATE_FACT_bureau +
  AMT_PAYMENT_n_distinct_mean_installments_payments +
  AMT_ANNUITY_EXT_SOURCE_3_ratio_application + SD_AMT_ANNUITY_bureau +
  SOURCES_PROD + INCOME_CREDIT_PERC + high_count_NAME_YIELD_GROUP_prev_app +
  DAYS_EMPLOYED_PERC + MEAN_DAYS_CREDIT_bureau +
  MAX_AMT_CREDIT_SUM_DEBT_bureau + SD_DAYS_DECISION_prev_app +
  AMT_PAYMENT_min_max_installments_payments

In [5]:
# Read the pre-built feature tables with data.table::fread, printing a
# timestamp before and after to time the step.
print(Sys.time())
train <- fread('train_modified.csv') # applications train data (has TARGET)
test <- fread('test_modified.csv')   # applications test data
print(Sys.time())

[1] "2018-08-28 20:48:45 UTC"
Read 307511 rows and 3205 (of 3205) columns from 4.639 GB file in 00:01:58
Read 48744 rows and 3202 (of 3202) columns from 0.751 GB file in 00:00:15
[1] "2018-08-28 20:50:58 UTC"


In [None]:
# Build the combined imputation table M: train and test rows, restricted to
# the columns in feat_list.
train_imp <- train[, feat_list, with = FALSE]
print(Sys.time())

# Test rows get NA for the bookkeeping/label columns. Subsetting first creates
# a new table, so the in-place `:=` below cannot modify `test`. It also avoids
# `$<-`, which copies the whole (wide) data.table on every assignment.
test_na_cols <- c("fold", "Weights", "TARGET")
test_imp <- test[, setdiff(feat_list, test_na_cols), with = FALSE]
invisible(test_imp[, (test_na_cols) := NA])
setcolorder(test_imp, feat_list)
print(Sys.time())

M <- rbind(train_imp, test_imp)

# Treat the placeholder categories 'XNA' and 'Unknown' as missing. Only
# character/factor columns can contain these strings, so update just those
# columns in place rather than comparing every cell of M as text.
placeholder_values <- c('XNA', 'Unknown')
invisible(lapply(names(M), function(.name) {
  col <- M[[.name]]
  if (is.character(col) || is.factor(col)) {
    set(M, i = which(col %in% placeholder_values), j = .name, value = NA)
  }
}))
M <- droplevels(M)
print(Sys.time())

# replace Inf with NA as transcan, aregimpute cannot handle Inf
system.time(invisible(lapply(names(M), function(.name) set(M, which(is.infinite(M[[.name]])), j = .name, value = NA))))
print(Sys.time())

In [7]:
print(Sys.time())

library(parallel)
# Leave one core free so the machine stays usable while this runs, but never
# ask for fewer than one worker (detectCores() can return 1 or NA).
cores_2_use <- max(1L, detectCores() - 1L, na.rm = TRUE)

cl <- makeCluster(cores_2_use)
# `finally` guarantees the worker processes are shut down even if a step fails.
imp_pars <- tryCatch({
  # Reproducible, distinct RNG streams per worker.
  clusterSetRNGStream(cl, 9956)
  # Objects each worker needs from the global environment.
  clusterExport(cl, c("M", "num.impute", "requested_formula"))
  invisible(clusterEvalQ(cl, library(rms)))
  # One aregImpute fit per worker, each with num.impute imputations of M;
  # imp_pars is a list of cores_2_use fits.
  parLapply(cl = cl, X = seq_len(cores_2_use), fun = function(no) {
    aregImpute(requested_formula, data = M, n.impute = num.impute,
               nk = 4, pr = FALSE)
  })
}, finally = stopCluster(cl))
print("Finished Imputation on all clusters.")
print(Sys.time())

[1] "2018-08-28 20:51:58 UTC"


In [None]:
# Convert M back to a data.frame in place; the list-style column replacement
# below (df[names] <- list) is written for data.frame semantics.
setDF(M)

# One completed copy of M per (worker fit, imputation) pair, stored
# worker-major: fit i, imputation j -> slot (i - 1) * num.impute + j.
completed.data <- vector("list", length(imp_pars) * num.impute)
for (i in seq_along(imp_pars)) {
  for (j in seq_len(num.impute)) {
    imputed.data <- impute.transcan(imp_pars[[i]], imputation = j, data = M,
                                    list.out = TRUE, pr = FALSE, check = FALSE)
    k <- (i - 1) * num.impute + j
    completed.data[[k]] <- M
    completed.data[[k]][names(imputed.data)] <- imputed.data
  }
}
print("Finished Imputing the datasets")
print(Sys.time())

## Experiments

1. Do the imputations from the different cores give rise to the same datasets?

In [None]:
library(parallel)
# Experiment: run a small aregImpute model on 2 workers (distinct RNG streams)
# and check whether different workers produce different completed datasets.

cores_2_use <- 2
num.impute <- 2

cl <- makeCluster(cores_2_use)
# `finally` guarantees the workers are shut down even if a step fails.
imp_pars <- tryCatch({
  clusterSetRNGStream(cl, 9956)
  clusterExport(cl, c("M", "num.impute"))
  invisible(clusterEvalQ(cl, library(rms)))
  parLapply(cl = cl, X = seq_len(cores_2_use), fun = function(no) {
    aregImpute(~ TARGET + Avg_EXT_SOURCE_application + ANNUITY_LENGTH +
                 MAX_DAYS_CREDIT_bureau + CNT_INSTALMENT_FUTURE_min_sd_POS_CASH,
               data = M, n.impute = num.impute, nk = 4, pr = FALSE)
  })
}, finally = stopCluster(cl))

# Worker-major layout: fit i, imputation j -> slot (i - 1) * num.impute + j.
completed.data <- vector("list", length(imp_pars) * num.impute)
for (i in seq_along(imp_pars)) {
  for (j in seq_len(num.impute)) {
    imputed.data <- impute.transcan(imp_pars[[i]], imputation = j, data = M,
                                    list.out = TRUE, pr = FALSE, check = FALSE)
    k <- (i - 1) * num.impute + j
    completed.data[[k]] <- M
    completed.data[[k]][names(imputed.data)] <- imputed.data
  }
}

# Slots 1..num.impute all come from worker 1, so compare the FIRST imputation
# of each worker: slot 1 (worker 1) vs slot num.impute + 1 (worker 2).
core1_first <- completed.data[[1]]
core2_first <- completed.data[[num.impute + 1]]

print("Variables that have any missing in completed data (core 1, imputation 1)")
print(names(core1_first)[colSums(is.na(core1_first)) > 0])

print("Variables that have any missing in completed data (core 2, imputation 1)")
print(names(core2_first)[colSums(is.na(core2_first)) > 0])

print("Are the completions from the two cores identical?")
print(identical(x = core1_first, y = core2_first))

# head() instead of [1:3, 1] so fewer than 3 imputed values does not error.
print("Three sample imputed values for MAX_DAYS_CREDIT_bureau, imputation 1 on Core 1")
print(head(imp_pars[[1]]$imputed$MAX_DAYS_CREDIT_bureau[, 1], 3))
print("Three sample imputed values for MAX_DAYS_CREDIT_bureau, imputation 1 on Core 2")
print(head(imp_pars[[2]]$imputed$MAX_DAYS_CREDIT_bureau[, 1], 3))

In [None]:
# Time one serial aregImpute run with a fixed seed, so its random draws can
# be reproduced.
# NOTE(review): train_test_top1000 is not defined in this chunk; presumably a
# small subset of the data — confirm.
set.seed(17)
system.time(
  mi <- aregImpute(~ TARGET + Avg_EXT_SOURCE_application + ANNUITY_LENGTH +
                     MAX_DAYS_CREDIT_bureau + CNT_INSTALMENT_FUTURE_min_sd_POS_CASH,
                   data = train_test_top1000, n.impute = 3, nk = 4, pr = FALSE)
)

# Useful Tools

In [None]:
# Find out the memory usage of each object in the global environment, listed
# from largest to smallest. Sizes are sorted as numbers BEFORE formatting:
# sorting the formatted strings (e.g. "9.5 Kb" vs "1.2 Gb") orders them
# alphabetically, not by size. local() keeps the helpers out of the listing.
local({
  obj_names <- ls(envir = globalenv())
  obj_bytes <- vapply(
    obj_names,
    function(nm) as.numeric(object.size(get(nm, envir = globalenv()))),
    numeric(1)
  )
  obj_bytes <- sort(obj_bytes, decreasing = TRUE)
  vapply(
    obj_bytes,
    function(b) format(structure(b, class = "object_size"), units = "auto"),
    character(1)
  )
})

In [None]:
# Mechanically create a formula for imputation: TARGET ~ every other column
# of M except the ID/bookkeeping columns.
# NOTE(review): train_test_top1000 is not defined in this chunk.
M <- train_test_top1000

excluded_cols <- c("TARGET", "SK_ID_CURR", "fold", "Weights")
col_set <- names(M)[!names(M) %in% excluded_cols]

# Numeric and non-numeric columns enter the formula the same way, so no type
# dispatch is needed. Names are back-quoted so non-syntactic column names
# (e.g. "0_count_bbalance", which starts with a digit) still parse. With no
# predictors, fall back to an intercept-only formula.
rhs <- if (length(col_set) > 0) paste0("`", col_set, "`", collapse = " + ") else "1"
string_formula <- paste("TARGET ~", rhs)
requested_formula <- as.formula(string_formula)
print(requested_formula)

In [None]:
# Replace +Inf/-Inf with NA, column by column and in place (data.table::set),
# because transcan and aregImpute cannot handle infinite values.
system.time(invisible(lapply(names(M), function(col_name) {
  inf_rows <- which(is.infinite(M[[col_name]]))
  set(M, i = inf_rows, j = col_name, value = NA)
})))

## Conclusions / Takeaways

1. `transcan` and `aregImpute` cannot handle infinite values; replace them with `NA` first, e.g. with the following code:
> system.time(invisible(lapply(names(M),function(.name) set(M, which(is.infinite(M[[.name]])), j = .name,value =NA))))
    
2. `aregImpute` is more fault-tolerant than `transcan`: the latter stops with an error if it does not converge within 50 iterations, e.g.:
> Error in transcan(~Avg_EXT_SOURCE_application + ANNUITY_LENGTH + MAX_DAYS_CREDIT_bureau + : no convergence in 50 iterations

3. `data.table::fread` is much faster than `read.csv` for large datasets: 2.5 min vs. 40 min!

4. Peek at the source code of R packages to understand how to use their outputs, e.g. the imputed datasets (https://github.com/harrelfe/Hmisc/blob/master/R/fit.mult.impute.s).

5. `identical()` and `all.equal()` can be used to check whether two data frames are the same (https://stackoverflow.com/questions/10978895/how-to-compare-two-dataframes).