In [1]:
# Point MLRun at the API service and load the project stored in the `conf`
# directory. This notebook defines and registers the function that
# preprocesses the test data.

import mlrun

project_name = "widsdb2"
project_dir = "conf"

# set_environment returns a pair; the second item is reused below as the
# artifact path for the runs.
wids_prj, artifact_path = mlrun.set_environment(
    'http://mlrun-api:8080',
    project=project_name,
    user_project=False,
)
widsdb2_proj = mlrun.projects.load_project(project_dir, clone=True)


In [2]:
# nuclio: start-code

In [3]:


import os

import sys
sys.path.append('/v3io/projects/widsdb2/util')


import json
import pandas as pd
import numpy as np
from collections import defaultdict
import widsutil as util
from cloudpickle import dumps, dump, load


from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem

def tstdata_prep(
    context: MLClientCtx,
    src: DataItem,
    file_ext: str = "csv",
    test_enc: str = "test_enc",
):
    """Preprocess the raw test data and log the result as a dataset artifact.

    Steps performed by this handler:

    1. read ``src`` into a DataFrame (``src.as_df()``)
    2. run it through ``widsutil.DataPreprocess(...).preprocess()``
    3. replace every value that is still missing afterwards with ``0``
    4. log the resulting frame through ``context.log_dataset`` under the
       key ``test_enc``

    Design notes carried over from the original author (what
    ``preprocess()`` does internally is not visible from this function):

    * One-hot encoding is intentionally left to the training step. It is not
      needed by every algorithm and can produce a very large feature matrix
      that is not worth serializing.
    * Numerical scaling is likewise left out. Scaling before a train/test
      split is a form of data leakage, and many estimators are unaffected by
      the monotonic transformations scaling implies.

    TODO:
        * parallelize where possible
        * more abstraction (more parameters, chain sklearn transformers)
        * convert to marketplace function
        * add an integrity check of the whole transformation chain

    :param context:   the function execution context
    :param src:       an artifact or file path holding the raw test data
    :param file_ext:  format passed to ``log_dataset`` for the logged table
    :param test_enc:  key of the preprocessed test table in the artifact store
    """
    raw_frame = src.as_df()

    # Preprocess, then zero-fill whatever is still missing.
    encoded_frame = util.DataPreprocess(raw_frame).preprocess().fillna(0)

    context.log_dataset(test_enc, df=encoded_frame, format=file_ext, index=False)

In [4]:
# nuclio: end-code


In [5]:
# Build an MLRun job function named 'tstprep' whose entry point is the
# `tstdata_prep` handler, running on the mlrun/ml-models image.
test_data_prep_func = mlrun.code_to_function(
    name='tstprep',
    kind='job',
    handler='tstdata_prep',
    image='mlrun/ml-models',
)

In [6]:
# Save the function object; the recorded cell output is its db:// URI.
test_data_prep_func.save()

'db://widsdb2/tstprep'

In [7]:
# Attach the function object to the loaded project.
widsdb2_proj.set_function(test_data_prep_func)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f99e3d65c90>

In [8]:
# Save the project after attaching the function to it.
widsdb2_proj.save()

In [9]:
import mlrun
import pandas as pd
from mlrun.run import get_dataitem

# Read the unlabeled test file; the first CSV column becomes the index.
test_df = pd.read_csv('UnlabeledWiDS2021.csv', index_col=[0])
print(test_df.shape)

# Log the raw frame on the project under the key 'raw_test_data' so the
# preprocessing function can take it as its `src` input.
widsdb2_proj.log_dataset(
    key='raw_test_data',
    df=test_df,
    format='csv',
    index=False,
)





(10234, 179)


<mlrun.artifacts.dataset.DatasetArtifact at 0x7f99e3ceed10>

In [10]:
from mlrun.platforms import auto_mount
import sys
sys.path.append('/v3io/projects/widsdb2/util')

# Run the function as a job on the cluster, with the platform mount applied.
tstprep_cluster_func = test_data_prep_func.apply(auto_mount())

# `src` is a data input, so it belongs in `inputs`. `test_enc` is a plain
# string argument of the handler, so it goes in `params` under its exact
# argument name. Previously it was sent as an input named 'test-enc', which
# does not match the handler argument; the run only produced the expected
# artifact because the handler default is also 'test_enc'.
test_data_prep_cl = tstprep_cluster_func.run(
    inputs={"src": 'store://raw_test_data'},
    params={"test_enc": 'test_enc'},
    artifact_path=artifact_path,
)
 

> 2021-07-01 12:41:13,970 [info] starting run tstprep-tstdata_prep uid=d58de60272ec4ce29b75ac96c63b7297 DB=http://mlrun-api:8080
> 2021-07-01 12:41:14,452 [info] Job is running in the background, pod: tstprep-tstdata-prep-fd2nd
Percent of Nans in Train Data : 61.7
Categorical columns: ['ethnicity', 'gender', 'hospital_admit_source', 'icu_admit_source', 'icu_stay_type', 'icu_type']
Number of unique Profiles : 2460
len lab_col 128
len lab_col_names 32
lab_col_names
 ['calcium', 'mbp', 'diasbp_invasive', 'albumin', 'spo2', 'wbc', 'inr', 'arterial_ph', 'bilirubin', 'resprate', 'potassium', 'hco3', 'diasbp_noninvasive', 'bun', 'hematocrit', 'arterial_po2', 'sysbp_noninvasive', 'hemaglobin', 'sodium', 'glucose', 'heartrate', 'mbp_invasive', 'sysbp', 'temp', 'lactate', 'arterial_pco2', 'pao2fio2ratio', 'creatinine', 'platelets', 'mbp_noninvasive', 'diasbp', 'sysbp_invasive']

> 2021-07-01 12:41:26,514 [info] run executed, status=completed
final state: completed


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
widsdb2,...c63b7297,0,Jul 01 12:41:21,completed,tstprep-tstdata_prep,v3io_user=aruna_lankakind=jobowner=aruna_lankahost=tstprep-tstdata-prep-fd2nd,srctest-enc,,,test_enc


to track results use .show() or .logs() or in CLI: 
!mlrun get run d58de60272ec4ce29b75ac96c63b7297 --project widsdb2 , !mlrun logs d58de60272ec4ce29b75ac96c63b7297 --project widsdb2
> 2021-07-01 12:41:33,846 [info] run executed, status=completed


In [11]:
# Run the same handler locally (local=True) instead of as a cluster job.
# `src` is a data input; `test_enc` is a plain string argument of the handler
# and is therefore passed through `params` under its exact argument name
# rather than as an input called 'test-enc'.
test_data_prep_run = test_data_prep_func.run(
    inputs={"src": 'store://raw_test_data'},
    params={"test_enc": 'test_enc'},
    artifact_path=artifact_path,
    local=True,
)
 

> 2021-07-01 12:41:33,859 [info] starting run tstprep-tstdata_prep uid=c1371fec39ff4d3dbf10bfe1bca85c7e DB=http://mlrun-api:8080
Percent of Nans in Train Data : 61.7
Categorical columns: ['ethnicity', 'gender', 'hospital_admit_source', 'icu_admit_source', 'icu_stay_type', 'icu_type']
Number of unique Profiles : 2460
len lab_col 128
len lab_col_names 32
lab_col_names
 ['arterial_ph', 'diasbp_invasive', 'wbc', 'hemaglobin', 'platelets', 'bilirubin', 'bun', 'diasbp_noninvasive', 'calcium', 'mbp', 'heartrate', 'temp', 'sodium', 'sysbp_noninvasive', 'arterial_pco2', 'spo2', 'hco3', 'potassium', 'albumin', 'sysbp_invasive', 'glucose', 'inr', 'resprate', 'mbp_invasive', 'mbp_noninvasive', 'diasbp', 'sysbp', 'lactate', 'hematocrit', 'arterial_po2', 'creatinine', 'pao2fio2ratio']



project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
widsdb2,...bca85c7e,0,Jul 01 12:41:34,completed,tstprep-tstdata_prep,v3io_user=aruna_lankakind=owner=aruna_lankahost=jupyter-svc-68f96fcc4c-wjtq2,srctest-enc,,,test_enc


to track results use .show() or .logs() or in CLI: 
!mlrun get run c1371fec39ff4d3dbf10bfe1bca85c7e --project widsdb2 , !mlrun logs c1371fec39ff4d3dbf10bfe1bca85c7e --project widsdb2
> 2021-07-01 12:41:39,549 [info] run executed, status=completed


In [12]:

# Show the 'test_enc' output of the local run; the recorded output is a store:// URI.
test_data_prep_run.outputs['test_enc']


'store://artifacts/widsdb2/tstprep-tstdata_prep_test_enc:c1371fec39ff4d3dbf10bfe1bca85c7e'

In [13]:
# Load the local run's 'test_enc' output artifact back as a DataFrame.
dft = mlrun.get_dataitem(test_data_prep_run.outputs['test_enc']).as_df()

In [14]:
# Display the preprocessed test frame.
dft

Unnamed: 0,age,bmi,elective_surgery,ethnicity,gender,height,hospital_admit_source,icu_admit_source,icu_id,icu_stay_type,...,d1_resprate_div_sysbp_min,d1_lactate_min_div_diasbp_min,d1_heartrate_min_div_d1_sysbp_min,d1_hco3_div,d1_resprate_times_resprate,left_average_spo2,total_chronic,total_cancer_immuno,has_complicator,apache_3j
0,72.0,0.000000,0,2,0,152.4,3,0,82,0,...,0.197368,0.0,0.500000,1.130435,525.0,98.666667,0,0,0,0
1,86.0,0.000000,0,2,0,175.3,2,0,82,0,...,0.215385,0.0,0.861538,1.000000,714.0,98.333333,0,0,0,0
2,72.0,0.000000,0,2,0,162.6,3,1,82,0,...,0.077670,0.0,0.660194,1.032258,392.0,94.333333,0,0,0,0
3,66.0,0.000000,0,2,1,177.8,3,1,82,0,...,0.200000,0.0,0.905263,1.000000,1064.0,92.666667,0,0,0,7
4,89.0,0.000000,0,2,1,170.2,1,0,82,0,...,0.078947,0.0,0.298246,1.100000,441.0,97.666667,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10229,36.0,37.533684,0,2,0,170.1,3,1,1108,0,...,0.064000,0.0,0.400000,1.000000,584.0,99.000000,0,0,0,5
10230,61.0,32.148438,0,2,0,160.0,2,0,1108,0,...,0.072000,0.0,0.528000,1.000000,441.0,98.333333,0,0,0,6
10231,74.0,22.745608,0,2,0,165.1,9,1,1108,0,...,0.103093,0.0,0.484536,1.000000,370.0,97.666667,0,0,0,7
10232,90.0,19.882812,0,2,0,160.0,2,0,1108,0,...,0.193548,0.0,0.655914,1.025641,864.0,98.666667,0,0,0,6


In [15]:
# Fetch the logged artifact by its project-level store URI (no run uid) and
# preview the first rows.
test_enc_item = mlrun.get_dataitem('store://widsdb2/tstprep-tstdata_prep_test_enc')
dfa = test_enc_item.as_df()
dfa.head()




Unnamed: 0,age,bmi,elective_surgery,ethnicity,gender,height,hospital_admit_source,icu_admit_source,icu_id,icu_stay_type,...,d1_resprate_div_sysbp_min,d1_lactate_min_div_diasbp_min,d1_heartrate_min_div_d1_sysbp_min,d1_hco3_div,d1_resprate_times_resprate,left_average_spo2,total_chronic,total_cancer_immuno,has_complicator,apache_3j
0,72.0,0.0,0,2,0,152.4,3,0,82,0,...,0.197368,0.0,0.5,1.130435,525.0,98.666667,0,0,0,0
1,86.0,0.0,0,2,0,175.3,2,0,82,0,...,0.215385,0.0,0.861538,1.0,714.0,98.333333,0,0,0,0
2,72.0,0.0,0,2,0,162.6,3,1,82,0,...,0.07767,0.0,0.660194,1.032258,392.0,94.333333,0,0,0,0
3,66.0,0.0,0,2,1,177.8,3,1,82,0,...,0.2,0.0,0.905263,1.0,1064.0,92.666667,0,0,0,7
4,89.0,0.0,0,2,1,170.2,1,0,82,0,...,0.078947,0.0,0.298246,1.1,441.0,97.666667,0,0,0,0


In [16]:
# Save the project again after the runs above.
widsdb2_proj.save()

In [17]:
# Re-read the same artifact key using the 'store://artifacts/...' form of the URI.
# NOTE(review): no run uid or tag is given, so which version is returned is
# presumably the latest; confirm against the MLRun docs.
dfa = mlrun.get_dataitem('store://artifacts/widsdb2/tstprep-tstdata_prep_test_enc').as_df()

