# Pre-process COVID-19 cases per country from Delta Lake with SageMaker Processing

<b>This notebook was tested on SageMaker Studio with Python 3 (Data Science) Kernel.</b>

In this notebook, we provide a detailed walk-through of how to package a scikit-learn Docker image for a processing job that fetches data from a table on Delta Lake and aggregates the total COVID-19 cases per country.

## General settings

In [None]:
from sagemaker import get_execution_role, Session, image_uris
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
import sagemaker
import boto3

region = boto3.Session().region_name
role = get_execution_role()
sagemaker_session = Session()

print(region)

In [None]:
bucket = sagemaker_session.default_bucket()  # reuse the session created above
prefix = "sagemaker/delta-lake-scikit-learn-processing-demo"

print(bucket)

## Build a `scikit-learn` container for running the preprocessing job

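Let's start by installing the SageMaker Studio Image Build CLI (`sagemaker-studio-image-build`), which provides the `sm-docker` command for building container images directly from a Studio notebook.

You can read more about it in this blog post: https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/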

In [None]:
!pip install sagemaker-studio-image-build

In [None]:
!sm-docker build ./container/

When the build finishes, you should see output similar to the following:

```
Image URI: 0xxxxxxxxxx.dkr.ecr.us-east-1.amazonaws.com/sagemaker-studio-d-xxxxxxxxxxxx:default-1640008606254
```

In [None]:
image_uri = '<Image URI from the sm-docker build output>'
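
Instead of copying the URI by hand, it can be parsed from the build log. The following is a minimal sketch, assuming the log contains an `Image URI: ...` line as shown above; `extract_image_uri` is a hypothetical helper, not part of the `sm-docker` CLI. In a notebook, the log could be captured with IPython's `build_output = !sm-docker build ./container/` syntax and joined with `"\n".join(build_output)`.

```python
import re
from typing import Optional

# Matches a line such as "Image URI: <account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>"
IMAGE_URI_PATTERN = re.compile(r"^Image URI:\s*(\S+)\s*$", re.MULTILINE)


def extract_image_uri(build_output: str) -> Optional[str]:
    """Return the last image URI reported in the build output, or None if absent."""
    matches = IMAGE_URI_PATTERN.findall(build_output)
    return matches[-1] if matches else None


# Sample log text modeled on the output shown above.
sample_output = (
    "Build started\n"
    "Image URI: 0xxxxxxxxxx.dkr.ecr.us-east-1.amazonaws.com/"
    "sagemaker-studio-d-xxxxxxxxxxxx:default-1640008606254\n"
)
print(extract_image_uri(sample_output))
```

Returning `None` when no URI is found makes a failed build easy to detect before the processor is created.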

## Download profile file

We will download a profile file for the Delta Sharing server that Databricks hosts.

In [None]:
profile_file = "https://raw.githubusercontent.com/delta-io/delta-sharing/main/examples/open-datasets.share"

In [None]:
!wget {profile_file}

Typically, this file is managed and secured on the client side. Because our first experiment with Delta Sharing only reads data from the Databricks server, we can stick with the example profile file provided on GitHub and retrieve it over HTTP.

To get a better idea of the content and syntax of that file, let's display it.

In [None]:
!cat open-datasets.share
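
A Delta Sharing profile is a small JSON document with the fields `shareCredentialsVersion`, `endpoint`, and `bearerToken`. Because the token is a credential, it can be worth masking it before printing a profile that is not public. The sketch below shows one way to do that; `summarize_profile` is a hypothetical helper, and the sample endpoint and token are placeholders rather than the contents of `open-datasets.share`.

```python
import json


def summarize_profile(profile_text: str) -> dict:
    """Parse a Delta Sharing profile and mask the bearer token for display."""
    profile = json.loads(profile_text)
    summary = dict(profile)  # copy so the parsed profile itself is left untouched
    if "bearerToken" in summary:
        summary["bearerToken"] = "***"
    return summary


# Hypothetical profile content; the endpoint and token are placeholders.
sample_profile = (
    '{"shareCredentialsVersion": 1, '
    '"endpoint": "https://example.com/delta-sharing/", '
    '"bearerToken": "secret"}'
)
print(summarize_profile(sample_profile))
```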

## Upload profile file to S3

In [None]:
# Reuse the session created in the general settings
sample_profile_file_url = sagemaker_session.upload_data(
    "open-datasets.share", bucket=bucket, key_prefix=prefix + "/profile"
)

print(sample_profile_file_url)

## Writing a processing script

This notebook uses the file `processing_script.py`, which contains the pre-processing script.

In [None]:
!pygmentize processing_script.py

Note the relevant lines in the processing script that create a `SharingClient` and load the table as a pandas DataFrame:

```
    profile_file = profile_files[0]
    print(f'Found profile file: {profile_file}')

    # Create a SharingClient
    client = delta_sharing.SharingClient(profile_file)
    table_url = profile_file + "#delta_sharing.default.owid-covid-data"

    # Load the table as a Pandas DataFrame
    print('Loading owid-covid-data table from Delta Lake')
    data = delta_sharing.load_as_pandas(table_url)
    print(f'Data shape: {data.shape}')
```
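
The excerpt relies on a `profile_files` list and on a table URL of the form `<profile-file-path>#<share>.<schema>.<table>`. How the script fills `profile_files` is not shown, so the following is only a sketch under the assumption that it looks for `*.share` files in the directory where the profile input is mounted (`/opt/ml/processing/profile`, matching the `ProcessingInput` destination used in this notebook). Both helper names are hypothetical.

```python
import glob
import os


def find_profile_files(profile_dir: str) -> list:
    """Return the sorted paths of all *.share files found in profile_dir."""
    return sorted(glob.glob(os.path.join(profile_dir, "*.share")))


def build_table_url(profile_file: str, share: str, schema: str, table: str) -> str:
    """Build a Delta Sharing table URL: '<profile-path>#<share>.<schema>.<table>'."""
    return f"{profile_file}#{share}.{schema}.{table}"


# Assumed mount point of the profile input inside the processing container.
profile_files = find_profile_files("/opt/ml/processing/profile")
if profile_files:
    print(build_table_url(profile_files[0], "delta_sharing", "default", "owid-covid-data"))
else:
    print("No profile file found")
```

Checking for an empty list before indexing gives a clearer failure than an `IndexError` when the input was not mounted as expected.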

## SageMaker Processing

We will now launch a processing job with the Python SDK.

In [None]:
processor = ScriptProcessor(
    command=['python3'],
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
)

In [None]:
processor.run(
    code='processing_script.py',
    inputs=[
        ProcessingInput(
            source=sample_profile_file_url,
            destination='/opt/ml/processing/profile/',
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='delta_lake_processed_data',
            source='/opt/ml/processing/processed_data/',
        )
    ],
)

## Inspect the results of the processing job

In [None]:
preprocessing_job_description = processor.jobs[-1].describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']
output_config

In [None]:
for output in output_config['Outputs']:
    if output['OutputName'] == 'delta_lake_processed_data':
        # The S3 URI has the form s3://<bucket>/<key prefix>
        delta_lake_processed_data_file = output['S3Output']['S3Uri']
        uri_parts = delta_lake_processed_data_file.split("/")
        bucket = uri_parts[2]  # the bucket name follows "s3://"
        output_file_name = '/'.join(uri_parts[3:]) + "/total_cases_per_location.csv"

In [None]:
output_file_name
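
Splitting the URI on `/` works, but the standard library's `urlparse` makes the intent more explicit and allows validating the scheme. The following is a minimal sketch of that alternative; `split_s3_uri` is a hypothetical helper and the sample URI is a placeholder.

```python
import posixpath
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> tuple:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder URI standing in for the job's S3Output.S3Uri value.
sample_uri = "s3://my-bucket/job-name/output/delta_lake_processed_data"
sample_bucket, sample_prefix = split_s3_uri(sample_uri)
sample_key = posixpath.join(sample_prefix, "total_cases_per_location.csv")
print(sample_bucket, sample_key)
```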

In [None]:
s3 = boto3.client('s3')

In [None]:
data = s3.get_object(Bucket=bucket, Key=output_file_name)
content = data['Body'].read()
content.decode("utf-8")
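
The decoded content is one raw CSV string. To work with individual rows, it can be parsed with the standard `csv` module, as in the sketch below. `parse_csv_bytes` is a hypothetical helper, and the sample bytes and column names are placeholders; the actual header depends on what `processing_script.py` writes.

```python
import csv
import io


def parse_csv_bytes(content: bytes) -> list:
    """Decode UTF-8 CSV bytes into a list of dicts keyed by the header row."""
    text = content.decode("utf-8")
    return list(csv.DictReader(io.StringIO(text)))


# Hypothetical sample; real column names depend on processing_script.py.
sample = b"location,total_cases\nA,10\nB,25\n"
for row in parse_csv_bytes(sample):
    print(row)
```

In the notebook, the same function could be called on the `content` bytes read from S3. All values come back as strings, so numeric columns need an explicit conversion.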