## Covid19 Projection using SageMaker

The purpose of this notebook is to run Covid-19 case projections at State or Country levels. The outcome is the projection of the total confirmed cases for the target geography. Please refer to covid19_simulator.ipynb for more details.

Users may not have the CPU and memory needed to run the simulation locally. This notebook therefore complements the core simulation notebook (covid19_simulator.ipynb) by offloading the compute- and memory-heavy operations to Amazon SageMaker, a cloud-based ML platform from AWS.

We'll use SageMaker Processing to run a processing script in a SageMaker-managed container created from a user-provided Docker image. We start by building a Docker image that contains the required Python libraries, our custom Python modules, and helper scripts. Once the image is built, we push it to Amazon Elastic Container Registry (ECR) so that SageMaker can locate it and launch the container from it.

Note: interventions_scorer_sagemaker.ipynb should be executed before running the projections on new data.

## Dockerize the core simulation modules and push the image to ECR

In [1]:
%%sh

# The name of our algorithm
algorithm_name='covid19-simulation'


account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-east-1 if none defined)
region=$(aws configure get region)
region=${region:-us-east-1}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

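# Get the login command from ECR and execute it directly.
# Note: 'aws ecr get-login' is available only in AWS CLI v1. With AWS CLI v2, use
# 'aws ecr get-login-password' piped into 'docker login --username AWS --password-stdin' instead.
$(aws ecr get-login --region ${region} --no-include-email)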

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build  -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}

docker push ${fullname}

Login Succeeded
Sending build context to Docker daemon  85.98MB
Step 1/21 : FROM ubuntu:16.04
 ---> dfeff22e96ae
Step 2/21 : RUN apt-get update
 ---> Using cache
 ---> 665014b1a950
Step 3/21 : RUN apt-get -y install software-properties-common python-software-properties
 ---> Using cache
 ---> fa225ddb5091
Step 4/21 : RUN add-apt-repository ppa:deadsnakes/ppa
 ---> Using cache
 ---> 2ec27d406b28
Step 5/21 : RUN apt-get update
 ---> Using cache
 ---> 3ca4a66260ff
Step 6/21 : RUN apt-get install --fix-missing -y wget curl unzip python3.6
 ---> Using cache
 ---> d108f0247c76
Step 7/21 : RUN wget https://bootstrap.pypa.io/get-pip.py
 ---> Using cache
 ---> 432a95efbbf8
Step 8/21 : RUN curl https://bootstrap.pypa.io/get-pip.py | python3.6
 ---> Using cache
 ---> 78a4b35a4909
Step 9/21 : RUN apt purge -y python2.7-minimal
 ---> Using cache
 ---> 71a215f0efb7
Step 10/21 : RUN ln -s /usr/bin/python3.6 /usr/bin/python
 ---> Using cache
 ---> d8c8d672db87
Step 11/21 : RUN apt-get install libgomp1




## Create a ScriptProcessor object 

In [2]:
import sagemaker
import boto3
import shutil
from time import gmtime, strftime
from sagemaker.processing import ScriptProcessor, ProcessingInput

account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

sagemaker_session = sagemaker.Session()
role = 'covid19_sagemaker_exec' #sagemaker.get_execution_role()
ecr_repository = 'covid19-simulation'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
covid_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)
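
The URI assembled above follows the ECR naming pattern `<account>.dkr.ecr.<region>.<suffix>/<repository>:<tag>`. As a minimal sketch, the same logic could be wrapped in a small helper. The function name `build_ecr_image_uri` and the sample account ID `123456789012` are hypothetical and used only for illustration.

```python
# Hypothetical helper (not part of the project); mirrors the URI logic of the cell above.
CHINA_REGIONS = ('cn-north-1', 'cn-northwest-1')


def build_ecr_image_uri(account_id, region, repository, tag='latest'):
    """Return the ECR image URI for the given account, region, repository and tag."""
    # The China regions use a different DNS suffix than the other AWS regions
    suffix = 'amazonaws.com.cn' if region in CHINA_REGIONS else 'amazonaws.com'
    return '{}.dkr.ecr.{}.{}/{}:{}'.format(account_id, region, suffix, repository, tag)


# '123456789012' is a placeholder account ID
print(build_ecr_image_uri('123456789012', 'us-east-1', 'covid19-simulation'))
```

Keeping the suffix decision in one place makes it easier to verify the URI before handing it to the `ScriptProcessor`.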

In [3]:
covid_processor = ScriptProcessor(base_job_name='covid19-simulation',
                                  image_uri=covid_repository_uri,
                                  command=['python'],
                                  role=role,
                                  instance_count=1,
                                  instance_type='ml.r5.xlarge',
                                  max_runtime_in_seconds=1200,
                                  env={'mode': 'python'})

## Refresh latest data in local project

In [4]:
import sys
import os
import urllib.request
sys.path.insert(1, 'src')
import config
import state_data_loader
import country_data_loader

# Set this flag to True if you want to download the latest COVID19 cases data from respective web sources
LOAD_LATEST_DATA = True

if LOAD_LATEST_DATA:
    
    # Function to refresh the local data file with the latest version from the web
    def download_latest_data (url, local_file):
        with urllib.request.urlopen(url) as response, open(local_file, 'wb') as out_file:
            data = response.read() # a `bytes` object
            out_file.write(data)
    
    # Mapping of online vs offline file locations to refresh
    online_offline_data = list()
    # Confirmed cases data maintained by Johns Hopkins University
    online_offline_data.append((config.confirmed_cases_global_online, 
                                os.path.join(config.base_data_dir, config.confirmed_cases_global_offline)))
    # Recovered cases data maintained by Johns Hopkins University
    online_offline_data.append((config.recovered_cases_global_online, 
                                os.path.join(config.base_data_dir, config.recovered_cases_global_offline)))
    # Deceased cases data maintained by Johns Hopkins University
    online_offline_data.append((config.deceased_cases_global_online, 
                                os.path.join(config.base_data_dir, config.deceased_cases_global_offline)))
    # Indian states specific cases maintained by COVID19INDIA (www.covid19india.org)
    online_offline_data.append((config.india_states_cases_online, 
                                os.path.join(config.base_data_dir, config.india_states_cases_offline)))
    
    # Refresh the local data files with the latest versions from respective web sources
    for path_pair in online_offline_data:
        try:
            download_latest_data (path_pair[0], path_pair[1])
            print ('Downloaded latest data from: {}'.format(path_pair[0]))
        except Exception as e:
            print ('Error while downloading {}: {}'.format(path_pair[0], e.__class__))
    
    # Transform and write the country-specific data for further processing
    country_data_loader.load()
    
    target_states = ['KA', 'KL', 'MH', 'GJ', 'WB']
    # Transform and write the Indian states specific data for further processing
    state_data_loader.load('India', target_states)

Downloaded latest data from: https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv
Downloaded latest data from: https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv
Downloaded latest data from: https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv
Downloaded latest data from: https://api.covid19india.org/csv/latest/state_wise_daily.csv
Storing data for country: Afghanistan  * * * * * * * * * *
Storing data for country: Algeria  * * * * * * * * * *
Storing data for country: Argentina  * * * * * * * * * *
Storing data for country: Armenia  * * * * * * * * * *
Storing data for country: Australia  * * * * * * * * * *
Storing data for country: Austria  * * * * * * * * * *
Storing data for country: Azerbaija

## Upload data to your S3 bucket

In [5]:
s3 = boto3.resource('s3')
def copy_to_s3(local_file, s3_path, override=False):
    assert s3_path.startswith('s3://')
    split = s3_path.split('/')
    bucket = split[2]
    path = '/'.join(split[3:])
    buk = s3.Bucket(bucket)
    
    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print('File already exists.\nSet override to upload anyway.\n')
            return
        else:
            print('Overwriting existing file')
    with open(local_file, 'rb') as data:
        print('Uploading file to {}'.format(s3_path))
        buk.put_object(Key=path, Body=data)
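
`copy_to_s3` splits the `s3://bucket/key` path by hand. As an alternative sketch, the standard library's `urllib.parse.urlparse` can do the same split and reject malformed paths with an explicit error instead of an `assert`. The helper name `split_s3_uri` is hypothetical and not part of the project.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/key' into a (bucket, key) tuple."""
    parsed = urlparse(s3_uri)
    # Anything without the s3 scheme or a bucket name is rejected
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError('Not a valid S3 URI: {}'.format(s3_uri))
    # The parsed path starts with '/', which is not part of the object key
    return parsed.netloc, parsed.path.lstrip('/')


bucket, key = split_s3_uri('s3://covid19-sim-dummy/covid19/input.tar.gz')
print(bucket, key)
```

Raising `ValueError` keeps the check active even when Python runs with assertions disabled (`python -O`).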
        
        

In [6]:
# A sample simulation for the Indian state of Kerala
state = 'KL'
LOAD_STATE_DATA = False

# Set your S3 bucket name here
bucket_name = 'covid19-sim-dummy'
input_prefix = 'covid19'
local_path = '../data/input/Cases_{}.csv'.format(state)
s3_data_path = 's3://{}/{}/input.tar.gz'.format(bucket_name, input_prefix)
print(s3_data_path)

shutil.make_archive('input', 'gztar', 'data/input')

copy_to_s3('input.tar.gz', s3_data_path, override=True)

s3://covid19-sim-dummy/covid19/input.tar.gz
Overwriting existing file
Uploading file to s3://covid19-sim-dummy/covid19/input.tar.gz
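
The cell above packs the input directory with `shutil.make_archive`, and the entry-point script later restores it with `shutil.unpack_archive`. The following sketch shows that round trip in a temporary directory so it can be tried without touching the project data. The file name and its one-line content are placeholders, not real case data.

```python
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as workdir:
    # Build a stand-in for the project's data/input directory (placeholder content)
    src_dir = os.path.join(workdir, 'data', 'input')
    os.makedirs(src_dir)
    with open(os.path.join(src_dir, 'Cases_KL.csv'), 'w') as f:
        f.write('date,confirmed\n')

    # Pack the directory contents into <workdir>/input.tar.gz
    archive_path = shutil.make_archive(os.path.join(workdir, 'input'), 'gztar', src_dir)

    # Unpack into a separate directory, as the container does before the simulation starts
    extract_dir = os.path.join(workdir, 'extracted')
    os.makedirs(extract_dir)
    shutil.unpack_archive(archive_path, extract_dir, 'gztar')
    extracted_files = sorted(os.listdir(extract_dir))

print(os.path.basename(archive_path), extracted_files)
```

Because `root_dir` points at the input directory itself, the files sit at the top level of the archive and are extracted directly into the target directory.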


## Create the simulation initiation script
This script is the entry point to the simulation process.

In [7]:
%%writefile simulation_enabler.py

import sys
import os
import csv
import tarfile
import shutil
import inspect
sys.path.insert(1, '/opt/program')

import config
config.sagemaker_run = True
config.base_data_dir = config.base_data_dir_sagemaker
config.base_output_dir = config.base_output_dir_sagemaker 

import state_data_loader
import country_data_loader
import simulation_orchestrator

if __name__=='__main__':
    shutil.unpack_archive(os.path.join(config.base_data_dir,'input.tar.gz'), config.base_data_dir,'gztar')
    
    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    country_level_projection_flag = args['country_level_projection'] == 'True'
    
    simulation_orchestrator.run(args['country_code'], args['state'], int(args['state_population']), 
                                int(args['actual_testing_capacity']), int(args['future_projection_days']),  
                                country_level_projection=country_level_projection_flag)
    

Writing simulation_enabler.py
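
The script turns its flat argument list into a dictionary with `dict(zip(args_iter, args_iter))`. Because both arguments to `zip` are the same iterator, each step consumes two consecutive items: a name and its value. The sketch below illustrates the idiom on a sample list; the wrapper name `pair_arguments` is hypothetical.

```python
def pair_arguments(argv):
    """Pair up a flat [name, value, name, value, ...] list into a dict."""
    args_iter = iter(argv)
    # zip pulls from the same iterator twice per step, yielding (name, value) tuples
    return dict(zip(args_iter, args_iter))


sample_argv = ['country_code', 'IND', 'state', 'KL', 'future_projection_days', '180']
args = pair_arguments(sample_argv)
print(args)

# All values arrive as strings and must be converted explicitly
projection_days = int(args['future_projection_days'])
```

Two caveats of this approach: every value is a string, which is why the script wraps numeric arguments in `int(...)`, and a trailing name without a value is dropped silently rather than reported.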


## Start the Simulation on SageMaker
You start the simulation by calling the <i>run</i> function on the <i>ScriptProcessor</i> object, passing the <i>simulation_enabler.py</i> script created above along with other parameters such as the input and output locations.

In [8]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
import shutil

# A sample simulation for the Indian state of Kerala
country_level_projection = False
projection_days = 180
country_code, state, state_population, actual_testing_capacity = 'IND', 'KL', 33406061, 6000

out_prefix = 'covid-19-out'

s3_out_path = 's3://{}/{}'.format(bucket_name, out_prefix)

covid_processor.run(code='simulation_enabler.py',
                        inputs=[ProcessingInput(
                        source=s3_data_path,
                        input_name='input.tar.gz',
                        destination='/opt/ml/processing/input')], 
                        outputs=[ProcessingOutput(output_name='param_file',
                                                source='/opt/ml/processing/param',
                                                destination=''),
                        ProcessingOutput(output_name='simulation_output',
                                                source='/opt/ml/processing/out',
                                                destination=s3_out_path)],
                        arguments=['country_code', country_code, 'state', state, 'state_population', str(state_population), 
                                    'actual_testing_capacity', str(actual_testing_capacity), 'future_projection_days', str(projection_days), 
                                    'country_level_projection', str(country_level_projection)],
                        logs=True
                     )

preprocessing_job_description = covid_processor.jobs[-1].describe()

preprocessing_job_description


Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  covid19-simulation-2020-10-30-12-29-20-729
Inputs:  [{'InputName': 'input.tar.gz', 'S3Input': {'S3Uri': 's3://covid19-sim-dummy/covid19/input.tar.gz', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-161150306912/covid19-simulation-2020-10-30-12-29-20-729/input/code/simulation_enabler.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'param_file', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-161150306912/covid19-simulation-2020-10-30-12-29-20-729/output/param_file', 'LocalPath': '/opt/ml/processing/param', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'simulation_output', 'S3Output': {'S3Uri': 's3://covid19-sim-dummy/covid-19-out', 'LocalPa

{'ProcessingInputs': [{'InputName': 'input.tar.gz',
   'S3Input': {'S3Uri': 's3://covid19-sim-dummy/covid19/input.tar.gz',
    'LocalPath': '/opt/ml/processing/input',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'code',
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-161150306912/covid19-simulation-2020-10-30-12-29-20-729/input/code/simulation_enabler.py',
    'LocalPath': '/opt/ml/processing/input/code',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'param_file',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-161150306912/covid19-simulation-2020-10-30-12-29-20-729/output/param_file',
     'LocalPath': '/opt/ml/processing/param',
     'S3UploadMode': 'EndOfJob'}},
   {'OutputName': 'simulation_output',
    'S3Output': {