### Data Preparation Notebook for DengAI on AzureML
This notebook contains the initial AzureML setup for the DengAI project, along with the programmatic components for data science (DS) operations on Azure: workspace, experiment, pipeline, and endpoint management.

#### Workspace Setup

In [61]:
#Create workspace configuration file - one time code for this project - assumes workspace already exists
from azureml.core import Workspace
ws=Workspace.get(name='ENTER WS NAME HERE',subscription_id='ENTER SUBSCRIPTION HERE',resource_group='ENTER RG NAME HERE')

#write out a config file
ws.write_config(file_name="ws_config.json")

print(ws.name,'Config written as ws_config.json')


Azure-ML-WS Config written as ws_config.json


In [62]:
#Load the workspace from the config file
from azureml.core import Workspace

ws = Workspace.from_config(path='.azureml/ws_config.json')
print(ws.name, "loaded")

Azure-ML-WS loaded
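
The first cell writes `ws_config.json` and the second loads `.azureml/ws_config.json` because, as far as I know, `write_config` places the file in a `.azureml` subfolder of the working directory by default. A minimal sketch of a pre-flight check is shown below. The helper `check_ws_config` is hypothetical, the key names are what the SDK is understood to write, and the demo file uses placeholder values rather than a real workspace.

```python
import json
import os
import tempfile

# Keys that a workspace config file is expected to contain (assumption based on the SDK's config format)
REQUIRED_KEYS = {"subscription_id", "resource_group", "workspace_name"}

def check_ws_config(path):
    """Hypothetical helper: parse the config file and fail early if it is missing or incomplete."""
    if not os.path.isfile(path):
        raise FileNotFoundError("No workspace config at " + path)
    with open(path) as fh:
        cfg = json.load(fh)
    missing = REQUIRED_KEYS - cfg.keys()
    if missing:
        raise KeyError("Config is missing keys: " + ", ".join(sorted(missing)))
    return cfg

# Demo against a placeholder file in a temporary .azureml folder
demo_dir = os.path.join(tempfile.mkdtemp(), ".azureml")
os.makedirs(demo_dir)
demo_path = os.path.join(demo_dir, "ws_config.json")
with open(demo_path, "w") as fh:
    json.dump({"subscription_id": "<subscription>",
               "resource_group": "<resource group>",
               "workspace_name": "<workspace>"}, fh)

cfg = check_ws_config(demo_path)
print(sorted(cfg))
```

Running a check like this before `Workspace.from_config` gives a clearer error than a failed authentication when the path is wrong.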


#### Data Upload, Initial Prep., and Dataset Creation
Create datasets from the initial competition data files. These files are then processed further in preparation for experiments and data analysis.
##### Upload data to datastore

In [16]:
#Get a list of current datastores in the workspace
default_ds=ws.get_default_datastore()

for ds_name in ws.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)

azureml_globaldatasets - Default = False
workspacefilestore - Default = False
workspaceblobstore - Default = True


In [17]:
#Upload the source competition files to the datastore
default_ds.upload_files(files=['inputdata/dengue_features_train.csv', 'inputdata/dengue_features_test.csv'], # Upload the dengue csv files
                       target_path='dengueAI/inputdata', # Put it in a folder path in the datastore
                       overwrite=True, # Replace existing files of the same name
                       show_progress=True)
#Create a data_ref object for the file location
data_ref = default_ds.path('dengueAI/inputdata').as_download(path_on_compute='inputdata')
print(data_ref)

Uploading an estimated of 2 files
Uploading inputdata/dengue_features_test.csv
Uploaded inputdata/dengue_features_test.csv, 1 files out of an estimated total of 2
Uploading inputdata/dengue_features_train.csv
Uploaded inputdata/dengue_features_train.csv, 2 files out of an estimated total of 2
Uploaded 2 files
$AZUREML_DATAREFERENCE_3db657dad66c43a9acdc149ee6edbc24
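
The later cells refer to the uploaded files by datastore path, for example `dengueAI/inputdata/dengue_features_train.csv`. The sketch below shows how those paths follow from the upload arguments, assuming `upload_files` is called without `relative_root` so that only the file name is kept under `target_path`. The helper `datastore_paths` is hypothetical and does not call the SDK.

```python
import os
import posixpath

def datastore_paths(local_files, target_path):
    """Hypothetical helper: expected datastore path for each uploaded file.

    Assumes only the base file name is kept under target_path.
    """
    return [posixpath.join(target_path, os.path.basename(f)) for f in local_files]

local_files = ['inputdata/dengue_features_train.csv',
               'inputdata/dengue_features_test.csv']
expected = datastore_paths(local_files, 'dengueAI/inputdata')
for p in expected:
    print(p)
```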


##### Create and Register Datasets from Raw Data Files

In [63]:
#set up variable to contain input data folder
inputdata_folder='inputdata'

In [64]:
%%writefile $inputdata_folder/create_raw_datasets.py

#Import needed libraries
from azureml.core import Workspace, Dataset

#Create datasets for the raw training and holdout/test file
ws = Workspace.from_config(path='.azureml/ws_config.json')
default_ds = ws.get_default_datastore()

default_ds.upload_files(files=['inputdata/dengue_features_train.csv'],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_train_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/dengue_features_train.csv'))

# Register the tabular dataset
try:
    tab_train_ds = tab_train_ds.register(workspace=ws, 
                            name='dengue-features-train-ds',
                            description='Raw dengue feature training data',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

    
default_ds.upload_files(files=['inputdata/dengue_features_test.csv'],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_test_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/dengue_features_test.csv'))

# Register the tabular dataset
try:
    tab_test_ds = tab_test_ds.register(workspace=ws, 
                            name='dengue-features-test-ds',
                            description='Raw dengue feature test/holdout data',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)


Overwriting inputdata/create_raw_datasets.py


##### Initial Data Prep and Dataset Creation
Do initial data prep to create a single, clean copy of the training and holdout data, and then create a dataset for the result. 
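
One part of this prep is filling gaps by linear interpolation with `limit_direction='forward'`. The toy example below (made-up values, not competition data) sketches what that setting does: interior gaps are interpolated and a trailing gap takes the last valid value, but a gap at the very start of a column is left as `NaN`. Restricting the call to numeric columns is a choice made for this sketch, not something the script does.

```python
import numpy as np
import pandas as pd

# Toy frame with a leading, an interior, and a trailing gap
toy = pd.DataFrame({
    "city": ["sj"] * 5,
    "precipitation_amt_mm": [np.nan, 1.0, np.nan, 3.0, np.nan],
})

# Interpolate numeric columns only, leaving text columns such as 'city' untouched
num_cols = toy.select_dtypes(include="number").columns
filled = toy.copy()
filled[num_cols] = toy[num_cols].interpolate(method="linear", limit_direction="forward")

print(filled)
print("Remaining NaNs:", int(filled[num_cols].isna().sum().sum()))
```

If the first rows of a feature are missing, they would still need separate handling, for example a backward fill.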


In [65]:
%%writefile $inputdata_folder/create_clean_datasets.py

from azureml.core import Workspace,Datastore,Dataset,Run
import pandas as pd
import numpy as np
import argparse
import os

#Set run context
run=Run.get_context()

# Get PipelineData argument
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

#Get the datasets from the input params
df_i=run.input_datasets['dengue_train'].to_pandas_dataframe()
df_h=run.input_datasets['dengue_test'].to_pandas_dataframe()

#interpolate missing values
df_i.interpolate(inplace=True,method='linear',limit_direction='forward')
df_h.interpolate(inplace=True,method='linear',limit_direction='forward')

#Add a total cases column to the holdout set
df_h['total_cases']=0

#split by city
df_sj=df_i[df_i['city']=='sj']
df_iq=df_i[df_i['city']=='iq']

df_sj_h=df_h[df_h['city']=='sj']
df_iq_h=df_h[df_h['city']=='iq']

len_sj=len(df_sj)
len_sj_h=len(df_sj_h)

#concat training and holdout data
#(pd.concat is used because DataFrame.append was removed in pandas 2.0)
df_sj=pd.concat([df_sj,df_sj_h])

#Get rolling totals and averages over the past 2 to 24 weeks
for i in range(2,25):
    df_sj['cum_rain_prior_'+str(i)+'_wks']=df_sj['precipitation_amt_mm'].rolling(i).sum()

for i in range(2,25):
    df_sj['avg_min_temp_prior_'+str(i)+'_wks']=df_sj['station_min_temp_c'].rolling(i).mean()
    
for i in range(2,25):
    df_sj['avg_max_temp_prior_'+str(i)+'_wks']=df_sj['station_max_temp_c'].rolling(i).mean()
    
for i in range(2,25):
    df_sj['avg_specific_humidity_prior_'+str(i)+'_wks']=df_sj['reanalysis_specific_humidity_g_per_kg'].rolling(i).mean()
    
for i in range(2,25):
    df_sj['avg_relative_humidity_prior_'+str(i)+'_wks']=df_sj['reanalysis_relative_humidity_percent'].rolling(i).mean()

#split the files back apart
df_sj_h=df_sj.iloc[len_sj:]
df_sj=df_sj.iloc[:len_sj]

#remove the first 25 rows of the input file because of the rolling numbers and the nan's they create
df_sj=df_sj.iloc[25:,:]

#Begin IQ
#Break out source file and holdout file by city
len_iq=len(df_iq)
len_iq_h=len(df_iq_h)

#concat training and holdout data
#(pd.concat is used because DataFrame.append was removed in pandas 2.0)
df_iq=pd.concat([df_iq,df_iq_h])

#Get rolling rainfall totals and averages over the past 2 to 24 weeks
for i in range(2,25):
    df_iq['cum_rain_prior_'+str(i)+'_wks']=df_iq['precipitation_amt_mm'].rolling(i).sum()

for i in range(2,25):
    df_iq['avg_min_temp_prior_'+str(i)+'_wks']=df_iq['station_min_temp_c'].rolling(i).mean()
    
for i in range(2,25):
    df_iq['avg_max_temp_prior_'+str(i)+'_wks']=df_iq['station_max_temp_c'].rolling(i).mean()
    
for i in range(2,25):
    df_iq['avg_specific_humidity_prior_'+str(i)+'_wks']=df_iq['reanalysis_specific_humidity_g_per_kg'].rolling(i).mean()
    
for i in range(2,25):
    df_iq['avg_relative_humidity_prior_'+str(i)+'_wks']=df_iq['reanalysis_relative_humidity_percent'].rolling(i).mean()

#split the files back apart
df_iq_h=df_iq.iloc[len_iq:]
df_iq=df_iq.iloc[:len_iq]

#remove the first 25 rows of the input file because of the rolling numbers
df_iq=df_iq.iloc[25:,:]

#reconstruct df's
df_all=pd.concat([df_sj,df_iq])
df_holdout=pd.concat([df_sj_h,df_iq_h])
df_holdout.drop(columns=['total_cases'],inplace=True)

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
train_output_path = os.path.join(output_folder, 'dengue_train_all.csv')
df_all.to_csv(train_output_path,index=False)
test_output_path = os.path.join(output_folder, 'dengue_holdout_all.csv')
df_holdout.to_csv(test_output_path,index=False)

#Create datasets for the cleaned training and holdout/test files
#Get the default data store
ws=run.experiment.workspace
default_ds = ws.get_default_datastore()

default_ds.upload_files(files=[train_output_path],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_train_all_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/dengue_train_all.csv'))


# Register the tabular dataset
try:
    tab_train_all_ds = tab_train_all_ds.register(workspace=ws, 
                            name='dengue-features-train-all-ds',
                            description='Cleaned dengue feature training data',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

    

default_ds.upload_files(files=[test_output_path],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_holdout_all_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/dengue_holdout_all.csv'))

# Register the tabular dataset
try:
    tab_holdout_all_ds = tab_holdout_all_ds.register(workspace=ws, 
                            name='dengue-features-holdout-all-ds',
                            description='Cleaned dengue feature test/holdout data',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

run.complete()

Overwriting inputdata/create_clean_datasets.py
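
The script builds its rolling features with five near-identical loops per city. As a sketch of the same idea in one place, the hypothetical helper below computes rolling sums and means for windows of 2 to 24 weeks on toy data. It covers only two of the five source columns, and the function name and toy values are illustrative. It also shows why the leading rows are dropped afterwards: a window of `w` weeks leaves the first `w-1` rows as `NaN`.

```python
import pandas as pd

def add_rolling_features(df, windows=range(2, 25)):
    """Hypothetical helper: append rolling rainfall sums and min-temp means for each window."""
    new_cols = {}
    for w in windows:
        new_cols[f"cum_rain_prior_{w}_wks"] = df["precipitation_amt_mm"].rolling(w).sum()
        new_cols[f"avg_min_temp_prior_{w}_wks"] = df["station_min_temp_c"].rolling(w).mean()
    # Build all new columns at once to avoid repeated single-column inserts
    return pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)

# Toy data: 30 weeks of made-up values
toy = pd.DataFrame({
    "precipitation_amt_mm": [float(i) for i in range(30)],
    "station_min_temp_c": [20.0] * 30,
})

feat = add_rolling_features(toy)
nan_rows_24 = int(feat["cum_rain_prior_24_wks"].isna().sum())
print(feat.shape, "rows without a 24-week total:", nan_rows_24)
```

Because the holdout rows are appended to the training rows before the rolling step, the first holdout weeks can draw on the last training weeks instead of starting with empty windows.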


##### Data Pre-processing
Create a script to pre-process the data into datasets for each city ready for city-specific lagging, scaling, and model training

In [66]:
%%writefile $inputdata_folder/create_city_datasets.py

from azureml.core import Workspace,Datastore,Dataset,Run
import pandas as pd
import numpy as np
import argparse
import os

#Set run context
run=Run.get_context()

# Get PipelineData argument
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

#create a dataframe for each dataset, train and holdout
df_a=pd.read_csv(output_folder+'/dengue_train_all.csv')
df_h=pd.read_csv(output_folder+'/dengue_holdout_all.csv')

#create a different df for each city
df_sj=df_a[df_a['city']=='sj']
df_sj_h=df_h[df_h['city']=='sj']
df_iq=df_a[df_a['city']=='iq']
df_iq_h=df_h[df_h['city']=='iq']

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
train_sj_output_path = os.path.join(output_folder, 'train_all_sj.csv')
df_sj.to_csv(train_sj_output_path,index=False)
test_sj_output_path = os.path.join(output_folder, 'holdout_all_sj.csv')
df_sj_h.to_csv(test_sj_output_path,index=False)

train_iq_output_path = os.path.join(output_folder, 'train_all_iq.csv')
df_iq.to_csv(train_iq_output_path,index=False)
test_iq_output_path = os.path.join(output_folder, 'holdout_all_iq.csv')
df_iq_h.to_csv(test_iq_output_path,index=False)

#upload and create datasets
#Get the default data store
ws=run.experiment.workspace
default_ds = ws.get_default_datastore()

default_ds.upload_files(files=[train_sj_output_path],
                        target_path='dengueAI/inputdata',
                        overwrite=True, 
                        show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_train_all_sj_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/train_all_sj.csv'))

#Register the tabular dataset for sj train_all
try:
    tab_train_all_sj_ds = tab_train_all_sj_ds.register(workspace=ws, 
                            name='dengue-train-all-sj-ds',
                            description='Lagged feature training data for sj',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

    
default_ds.upload_files(files=[test_sj_output_path],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_holdout_all_sj_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/holdout_all_sj.csv'))

#Register the tabular dataset for sj holdout all
try:
    tab_holdout_all_sj_ds = tab_holdout_all_sj_ds.register(workspace=ws, 
                            name='dengue-holdout-all-sj-ds',
                            description='Lagged dengue feature test/holdout data for sj',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)
    

#Create upload for train iq
default_ds.upload_files(files=[train_iq_output_path],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_train_all_iq_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/train_all_iq.csv'))

#Register the tabular dataset for train iq
try:
    tab_train_all_iq_ds = tab_train_all_iq_ds.register(workspace=ws, 
                            name='dengue-train-all-iq-ds',
                            description='Lagged feature training data for iq',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

    
#Create upload for test iq
default_ds.upload_files(files=[test_iq_output_path],
                    target_path='dengueAI/inputdata',
                    overwrite=True, 
                    show_progress=True)

#Create a tabular dataset from the path on the datastore for the file
tab_holdout_all_iq_ds = Dataset.Tabular.from_delimited_files(path=(default_ds, 'dengueAI/inputdata/holdout_all_iq.csv'))

# Register the tabular dataset
try:
    tab_holdout_all_iq_ds = tab_holdout_all_iq_ds.register(workspace=ws, 
                            name='dengue-holdout-all-iq-ds',
                            description='Lagged dengue feature test/holdout data for iq',
                            tags = {'format':'CSV'},
                            create_new_version=True)
    print('Dataset registered.')
except Exception as ex:
    print(ex)

run.complete()

Overwriting inputdata/create_city_datasets.py
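
The per-city split in this script uses one boolean mask per city and file. An alternative sketch with `groupby` is shown below. The helper `split_by_city`, the temporary folder, and the toy rows are all illustrative, and the output file names simply follow the `train_all_<city>.csv` pattern used above.

```python
import os
import tempfile
import pandas as pd

def split_by_city(df, out_dir, prefix):
    """Hypothetical helper: write one CSV per city and return {city: path}."""
    os.makedirs(out_dir, exist_ok=True)
    paths = {}
    for city, part in df.groupby("city"):
        path = os.path.join(out_dir, f"{prefix}_{city}.csv")
        part.to_csv(path, index=False)
        paths[city] = path
    return paths

# Toy stand-in for the cleaned training data
toy = pd.DataFrame({
    "city": ["sj", "sj", "iq", "sj", "iq"],
    "total_cases": [4, 5, 0, 3, 1],
})

out_dir = tempfile.mkdtemp()
paths = split_by_city(toy, out_dir, "train_all")
for city, path in sorted(paths.items()):
    print(city, len(pd.read_csv(path)), os.path.basename(path))
```

A loop over the returned paths could then drive the upload and registration calls, which would remove the four repeated blocks.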


#### Create Compute Environment for Data Prep Pipeline
##### Compute Cluster

In [67]:
#Create a compute cluster if it does not exist
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core import Workspace

ws = Workspace.from_config(path='.azureml/ws_config.json')

cluster_name = "DS-Comp-Cluster"

try:
    #Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    #If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


##### Python Environment on the Cluster

In [68]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
dengue_env = Environment("dengue-pipeline-env")
dengue_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
dengue_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
dengue_packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]','keras','tensorflow'])

# Add the dependencies to the environment
dengue_env.python.conda_dependencies = dengue_packages

# Register the environment (just in case you want to use it again)
dengue_env.register(workspace=ws)
registered_env = Environment.get(ws, 'dengue-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


#### Create and Run the Data Prep Pipeline

In [69]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

#Get the dataset for the initial data files
dengue_train_ds = ws.datasets.get('dengue-features-train-ds')
dengue_test_ds = ws.datasets.get('dengue-features-test-ds')

#Create a PipelineData
ws = Workspace.from_config(path='.azureml/ws_config.json')
data_store=ws.get_default_datastore()
dengueAI=PipelineData('dengueAI',datastore=data_store)

#Step 1, clean the raw datasets
create_clean_datasets = PythonScriptStep(name = 'Create Clean Datasets',
                                source_directory = inputdata_folder,
                                script_name = 'create_clean_datasets.py',
                                arguments = ['--folder', dengueAI],
                                inputs=[dengue_train_ds.as_named_input('dengue_train'),dengue_test_ds.as_named_input('dengue_test')],
                                outputs=[dengueAI],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

#Step 2, split the cleaned data into per-city datasets
create_city_datasets = PythonScriptStep(name = 'Create City Datasets',
                                source_directory = inputdata_folder,
                                script_name = 'create_city_datasets.py',
                                arguments = ['--folder', dengueAI],
                                inputs=[dengueAI],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)


print("Pipeline steps defined")

Pipeline steps defined
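
Step 2 lists `dengueAI` as an input and step 1 lists it as an output, so the second step has to wait for the first. The sketch below illustrates that ordering idea with the standard library only. It is not how the AzureML SDK is implemented, and the dictionary layout is an assumption made for the illustration.

```python
from graphlib import TopologicalSorter

# Each step with the named data it reads and writes (names taken from the cell above)
steps = {
    "Create Clean Datasets": {"inputs": {"dengue_train", "dengue_test"}, "outputs": {"dengueAI"}},
    "Create City Datasets": {"inputs": {"dengueAI"}, "outputs": set()},
}

def execution_order(steps):
    """Order steps so that every producer of a data item runs before its consumers."""
    producers = {out: name for name, spec in steps.items() for out in spec["outputs"]}
    graph = {
        name: {producers[i] for i in spec["inputs"] if i in producers}
        for name, spec in steps.items()
    }
    return list(TopologicalSorter(graph).static_order())

order = execution_order(steps)
print(order)
```

Inputs with no producing step, such as the registered raw datasets, impose no ordering constraint.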


##### Create Experiment for the Data Prep Pipeline to Run In

In [70]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

#Construct the pipeline
pipeline_steps=[create_clean_datasets,create_city_datasets]
pipeline = Pipeline(workspace=ws,steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment=Experiment(workspace=ws, name='dengue-dataprep-pipeline')
pipeline_run=experiment.submit(pipeline,regenerate_outputs=True)
print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=False)

Pipeline is built.
Created step Create Clean Datasets [fabaa2f1][f673f654-ffa9-4e00-b841-d1a593975994], (This step will run and generate new outputs)
Created step Create City Datasets [f383a5da][1c2e6f86-9a65-4164-b114-b529c6695fc8], (This step will run and generate new outputs)

Submitted PipelineRun 1bffeb40-611f-49d9-a9c3-782bd47ed288
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/dengue-dataprep-pipeline/runs/1bffeb40-611f-49d9-a9c3-782bd47ed288?wsid=/subscriptions/fd2d8de8-17e1-4976-9906-fdde487edd5f/resourcegroups/AzureML-Learning/workspaces/Azure-ML-WS
Pipeline submitted for execution.
PipelineRunId: 1bffeb40-611f-49d9-a9c3-782bd47ed288
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/dengue-dataprep-pipeline/runs/1bffeb40-611f-49d9-a9c3-782bd47ed288?wsid=/subscriptions/fd2d8de8-17e1-4976-9906-fdde487edd5f/resourcegroups/AzureML-Learning/workspaces/Azure-ML-WS
{'runId': '1bffeb40-611f-49d9-a9c3-782bd47ed288', 'status': 'Completed', 

'Finished'