<h1>Data exploration, preprocessing and feature engineering</h1>

In this and the following notebooks we will demonstrate how to build an ML pipeline using Spark feature transformers and the SageMaker XGBoost algorithm. After the model is trained, we will deploy the pipeline (feature transformer and XGBoost) as a SageMaker Inference Pipeline behind a single endpoint for real-time inference.

In particular, in this notebook we will tackle the first steps: data exploration and preparation. We will use [Amazon Athena](https://aws.amazon.com/athena/) to query our dataset and get a first look at data quality and the available features, and [AWS Glue](https://aws.amazon.com/glue/) to create a Data Catalog and run serverless Spark jobs.

<span style="color: red"><strong>To get started, replace [your-initials] in the bucket_name variable in the cell below, so that it matches the name of the bucket you created in the previous steps.</strong></span>

In [None]:
import boto3
import sagemaker
import time

role = sagemaker.get_execution_role()
region = boto3.Session().region_name

print(region)
print(role)
 
# replace [your-initials] according to the bucket name you have defined.
bucket_name = 'endtoendml-workshop-[your-initials]'

print(bucket_name)
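
Before moving on, it can help to check that the placeholder was actually replaced. The following is a minimal sketch and not part of the workshop code: `is_valid_bucket_name` is a hypothetical helper, and it checks only a subset of the S3 bucket naming rules (3 to 63 characters; lowercase letters, digits, hyphens and dots; a letter or digit at both ends).

```python
import re

# Hypothetical helper (not part of the workshop code).
# It covers only a subset of the S3 bucket naming rules.
_BUCKET_NAME_PATTERN = re.compile(r'^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$')


def is_valid_bucket_name(name):
    """Return True if `name` looks like a usable S3 bucket name."""
    return bool(_BUCKET_NAME_PATTERN.match(name))


# The unreplaced placeholder contains square brackets and is rejected.
print(is_valid_bucket_name('endtoendml-workshop-[your-initials]'))  # False
print(is_valid_bucket_name('endtoendml-workshop-abc'))              # True
```

In the notebook, this could be used as `assert is_valid_bucket_name(bucket_name)` right after the variable is defined.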

We can now copy the dataset used for this use case to our bucket. We will use the `windturbine_raw_data.csv` file made available for this workshop in the `gianpo-public` public S3 bucket, and copy it from that bucket to yours so that AWS Glue can access the data.

In [None]:
import boto3

s3 = boto3.resource('s3')

file_key = 'data/raw/windturbine_raw_data.csv'
copy_source = {
    'Bucket': 'gianpo-public',
    'Key': 'endtoendml/{0}'.format(file_key)
}

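# Remove all existing objects (and object versions) from the bucket before copying.
# WARNING: this permanently deletes everything currently stored in the bucket.
s3.Bucket(bucket_name).object_versions.delete()
# Copy the raw dataset from the public bucket into our bucket.
s3.Bucket(bucket_name).copy(copy_source, file_key)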

The first thing we need now is to infer a schema for our dataset. Later, we will use Amazon Athena to run SQL queries against our data stored in S3, without the need to import it into a relational database. This is possible thanks to the [integration of Athena with AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html): Amazon Athena uses the AWS Glue Data Catalog as a central location to store and retrieve table metadata throughout an AWS account. The Athena execution engine requires this table metadata to know where to read data from, how to read it, and other information necessary to process it.

To organize our Glue Data Catalog we create a new database named `endtoendml-db`. To do so, we create a Glue client via Boto and invoke the `create_database` method.

However, we first want to make sure that these AWS resources do not exist yet, to avoid any errors.

In [None]:
glue_client = boto3.client('glue')

# Trying to remove any existing database, crawler and job with the same name.

crawler_found = True
try:
    glue_client.get_crawler(Name = 'endtoendml-crawler')
except glue_client.exceptions.EntityNotFoundException:
    crawler_found = False

db_found = True
try:
    glue_client.get_database(Name = 'endtoendml-db')
except glue_client.exceptions.EntityNotFoundException:
    db_found = False
    
job_found = True
try:
    glue_client.get_job(JobName = 'endtoendml-job')
except glue_client.exceptions.EntityNotFoundException:
    job_found = False

if crawler_found:
    glue_client.delete_crawler(Name = 'endtoendml-crawler')
if db_found:
    glue_client.delete_database(Name = 'endtoendml-db')
if job_found:
    glue_client.delete_job(JobName = 'endtoendml-job')

print("Cleanup completed.")
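
The three existence checks above share one pattern: call a getter and treat a "not found" exception as "does not exist". As a sketch only, that pattern could be factored into a small helper. `resource_exists` is a hypothetical name, and the fake getter and exception below are stand-ins so the example runs without calling AWS.

```python
def resource_exists(getter, not_found_error, **kwargs):
    """Call `getter(**kwargs)`; return False if it raises `not_found_error`."""
    try:
        getter(**kwargs)
    except not_found_error:
        return False
    return True


# Stand-ins used only to demonstrate the helper without calling AWS.
class FakeNotFound(Exception):
    pass


def fake_get_crawler(Name):
    if Name != 'existing-crawler':
        raise FakeNotFound(Name)
    return {'Crawler': {'Name': Name}}


print(resource_exists(fake_get_crawler, FakeNotFound, Name='existing-crawler'))  # True
print(resource_exists(fake_get_crawler, FakeNotFound, Name='missing-crawler'))   # False
```

With the Glue client from the cell above, the call would look like `resource_exists(glue_client.get_crawler, glue_client.exceptions.EntityNotFoundException, Name='endtoendml-crawler')`.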

In [None]:
response = glue_client.create_database(DatabaseInput={'Name': 'endtoendml-db'})
response = glue_client.get_database(Name='endtoendml-db')
assert response['Database']['Name'] == 'endtoendml-db'
# Keep the response as the last expression so that the notebook displays it.
response

Now we define a Glue Crawler and point it to the S3 path where the dataset resides; the crawler then creates table definitions in the Data Catalog.
To grant the correct set of access permissions to the crawler, we use one of the roles created before (`GlueServiceRole-endtoendml`), whose policy grants AWS Glue access to data stored in your S3 buckets.

In [None]:
response = glue_client.create_crawler(
    Name='endtoendml-crawler',
    Role='service-role/GlueServiceRole-endtoendml', 
    DatabaseName='endtoendml-db',
    Targets={'S3Targets': [{'Path': '{0}/data/raw/'.format(bucket_name)}]}
)

We are ready to run the crawler with the `start_crawler` API and to monitor its progress until completion through the `get_crawler_metrics` API.

In [None]:
glue_client.start_crawler(Name='endtoendml-crawler')

while glue_client.get_crawler_metrics(CrawlerNameList=['endtoendml-crawler'])['CrawlerMetricsList'][0]['TablesCreated'] == 0:
    print('RUNNING')
    time.sleep(15)
    
assert glue_client.get_crawler_metrics(CrawlerNameList=['endtoendml-crawler'])['CrawlerMetricsList'][0]['TablesCreated'] == 1
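
The loop above keeps polling until a table has been created, so it would never end if the crawler failed. As a sketch under that assumption, a generic polling helper with a timeout could look like the following. `wait_until` and its parameters are hypothetical names, and the condition used here is a stand-in rather than a real Glue call.

```python
import time


def wait_until(condition, timeout_seconds=600, poll_seconds=15, sleep=time.sleep):
    """Poll `condition()` until it returns True or the timeout elapses.

    Returns True if the condition was met, False on timeout.
    """
    waited = 0
    while not condition():
        if waited >= timeout_seconds:
            return False
        sleep(poll_seconds)
        waited += poll_seconds
    return True


# Demonstration with a stand-in condition that becomes true on the third check.
checks = {'count': 0}


def fake_table_created():
    checks['count'] += 1
    return checks['count'] >= 3


print(wait_until(fake_table_created, poll_seconds=0))  # True
```

With the real client, the condition could be a lambda that reads `TablesCreated` from `get_crawler_metrics`, as in the loop above.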


When the crawler has finished its job, we can retrieve the Table definition for the newly created table.
As you can see, the crawler has been able to correctly identify 12 fields, infer a type for each column and assign a name.

In [None]:
table = glue_client.get_table(DatabaseName='endtoendml-db', Name='raw')
table

Based on our knowledge of the dataset, we can assign more specific names to columns.

In [None]:
table['Table']['StorageDescriptor']['Columns'] = [{'Name': 'turbine_id', 'Type': 'string'},
                                                  {'Name': 'turbine_type', 'Type': 'string'},
                                                  {'Name': 'wind_speed', 'Type': 'double'},
                                                  {'Name': 'rpm_blade', 'Type': 'double'},
                                                  {'Name': 'oil_temperature', 'Type': 'double'},
                                                  {'Name': 'oil_level', 'Type': 'double'},
                                                  {'Name': 'temperature', 'Type': 'double'},
                                                  {'Name': 'humidity', 'Type': 'double'},
                                                  {'Name': 'vibrations_frequency', 'Type': 'double'},
                                                  {'Name': 'pressure', 'Type': 'double'},
                                                  {'Name': 'wind_direction', 'Type': 'string'},
                                                  {'Name': 'breakdown', 'Type': 'string'}]
updated_table = table['Table']
updated_table.pop('DatabaseName', None)
updated_table.pop('CreateTime', None)
updated_table.pop('UpdateTime', None)
updated_table.pop('CreatedBy', None)
updated_table.pop('IsRegisteredWithLakeFormation', None)

glue_client.update_table(
    DatabaseName='endtoendml-db',
    TableInput=updated_table
)
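
The cell above removes read-only keys one by one before calling `update_table`. If the `get_table` response contains other read-only keys, the call may still be rejected. An alternative sketch is to keep only the keys that `TableInput` accepts. The key list below is an assumption based on the Glue API and should be verified against the current API reference; `to_table_input` is a hypothetical helper, and the example dictionary is a toy value.

```python
# Assumption: these are keys accepted by the Glue `TableInput` structure.
# Verify the list against the current Glue API reference before relying on it.
TABLE_INPUT_KEYS = {
    'Name', 'Description', 'Owner', 'LastAccessTime', 'LastAnalyzedTime',
    'Retention', 'StorageDescriptor', 'PartitionKeys', 'ViewOriginalText',
    'ViewExpandedText', 'TableType', 'Parameters',
}


def to_table_input(table_definition):
    """Return a copy of `table_definition` keeping only TableInput keys."""
    return {k: v for k, v in table_definition.items() if k in TABLE_INPUT_KEYS}


# Toy table definition with a few read-only fields, for illustration only.
example = {
    'Name': 'raw',
    'DatabaseName': 'endtoendml-db',
    'CreatedBy': 'example-creator',
    'StorageDescriptor': {'Columns': [{'Name': 'turbine_id', 'Type': 'string'}]},
}
print(sorted(to_table_input(example)))  # ['Name', 'StorageDescriptor']
```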

<h2>Data exploration with Amazon Athena</h2>

For data exploration, let's install PyAthena, a Python client for Amazon Athena. Note: PyAthena is not maintained by AWS; please visit https://pypi.org/project/PyAthena/ for additional information.

In [None]:
!pip install pyathena

In [None]:
import pyathena
from pyathena import connect
import pandas as pd

athena_cursor = connect(s3_staging_dir='s3://{0}/staging/'.format(bucket_name), 
                        region_name=region).cursor()

athena_cursor.execute('SELECT * FROM "endtoendml-db".raw limit 8;')
pd.read_csv(athena_cursor.output_location)

Another SQL query counts how many records we have.

In [None]:
athena_cursor.execute('SELECT COUNT(*) FROM "endtoendml-db".raw;')
pd.read_csv(athena_cursor.output_location)

Let's see which values the `breakdown` field can take and how frequently each of them occurs across the entire dataset.

In [None]:
athena_cursor.execute('SELECT breakdown, (COUNT(breakdown) * 100.0 / (SELECT COUNT(*) FROM "endtoendml-db".raw)) \
            AS percent FROM "endtoendml-db".raw GROUP BY breakdown;')
pd.read_csv(athena_cursor.output_location)
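
To make the query above easier to read, here is the same computation as a pure-Python sketch: group by label, count each group, and divide by the total number of rows. The labels are toy values for illustration and are not taken from the dataset.

```python
from collections import Counter

# Toy labels for illustration only; they are not taken from the dataset.
labels = ['no', 'no', 'no', 'yes']

counts = Counter(labels)      # equivalent of GROUP BY breakdown + COUNT(breakdown)
total = sum(counts.values())  # equivalent of the SELECT COUNT(*) subquery
percent = {label: count * 100.0 / total for label, count in counts.items()}

print(percent)  # {'no': 75.0, 'yes': 25.0}
```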

In [None]:
athena_cursor.execute('SELECT breakdown, COUNT(breakdown) AS bd_count FROM "endtoendml-db".raw GROUP BY breakdown;')
df = pd.read_csv(athena_cursor.output_location)

%matplotlib inline
import matplotlib.pyplot as plt

plt.bar(df.breakdown, df.bd_count)

We have discovered that the dataset is quite imbalanced, although we are not going to try to balance it.

In [None]:
athena_cursor.execute('SELECT DISTINCT(turbine_type) FROM "endtoendml-db".raw')
pd.read_csv(athena_cursor.output_location)

In [None]:
# Count the rows where oil_temperature is missing (no GROUP BY needed for a single count).
athena_cursor.execute('SELECT COUNT(*) FROM "endtoendml-db".raw WHERE oil_temperature IS NULL')
pd.read_csv(athena_cursor.output_location)

We also realized there are a few null values that need to be managed during the data preparation steps.
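
The query above checks a single column. As a sketch, the null counts for several columns can be requested in one query, because `COUNT(*)` counts all rows while `COUNT(column)` counts only non-NULL values. `build_null_count_query` is a hypothetical helper that only builds the SQL string; the column names come from the table definition above.

```python
def build_null_count_query(table, columns):
    """Build one SQL query that returns the number of NULLs per column.

    COUNT(*) counts all rows and COUNT(col) only non-NULL values, so their
    difference is the number of NULLs in that column.
    """
    parts = ['COUNT(*) - COUNT({0}) AS {0}_nulls'.format(c) for c in columns]
    return 'SELECT {0} FROM {1};'.format(', '.join(parts), table)


query = build_null_count_query('"endtoendml-db".raw', ['oil_temperature', 'oil_level'])
print(query)
```

The resulting string could then be passed to `athena_cursor.execute(query)` in the same way as the other queries in this notebook.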

For the purpose of keeping the data exploration step short during the workshop, we are not going to execute additional queries. However, feel free to explore the dataset more if you have time.

**Note**: you can go to the Amazon Athena console and check the query duration under the History tab. Queries are usually executed in a few seconds; it then takes some time for Pandas to load the results into a dataframe.

<h2>Preprocessing and Feature Engineering with AWS Glue</h2>

The preprocessing and feature engineering code is implemented in the `endtoendml_etl.py` file. You can go through the code and see that several categorical columns require indexing and one-hot encoding.
Once the Spark ML Pipeline `fit()` and `transform()` steps are done, the script splits the dataset 80-20 into training and validation sets and uploads them to S3 so that they can be used with XGBoost for training.
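
To illustrate what indexing and one-hot encoding do to a categorical column, here is a pure-Python sketch. It is not the code from `endtoendml_etl.py`, which uses Spark transformers; the function names are hypothetical, the values are toy data, and the alphabetical tie-breaking is an assumption made only to keep the output deterministic.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct value to an index, most frequent value first.

    Ties are broken alphabetically (an assumption made for determinism).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: index for index, value in enumerate(ordered)}


def one_hot(index, size):
    """Return a list of `size` zeros with a 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]


# Toy values for illustration only; they are not taken from the dataset.
wind_direction = ['N', 'E', 'N', 'S']
mapping = fit_string_index(wind_direction)
encoded = [one_hot(mapping[v], len(mapping)) for v in wind_direction]

print(mapping)     # {'N': 0, 'E': 1, 'S': 2}
print(encoded[1])  # [0.0, 1.0, 0.0]
```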

In [None]:
!pygmentize endtoendml_etl.py

For our job, we will also have to pass MLeap dependencies to Glue. [MLeap](https://github.com/combust/mleap) is an additional library that does not come bundled with default Spark. It is used here to serialize the SparkML model and create a bundle that will be used later for inference.

Similar to most of the packages in the Spark ecosystem, MLeap is implemented as a Scala package with a front-end wrapper written in Python so that it can be used from PySpark. We need to make sure that both the MLeap Python library and the JAR are available within the Glue job environment. In the following cells, we will download the MLeap Python dependency and JAR from a SageMaker hosted bucket and upload them to the S3 bucket we created above in your account.

In [None]:
!wget -nc https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget -nc https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

In [None]:
s3.Bucket(bucket_name).upload_file('python.zip', 'dependencies/python/python.zip')
s3.Bucket(bucket_name).upload_file('mleap_spark_assembly.jar', 'dependencies/jar/mleap_spark_assembly.jar')

We now upload the `endtoendml_etl.py` script to S3 so that Glue can use it to run the PySpark job. Then, we create the AWS Glue job definition, which includes the S3 paths for the extra dependencies defined above.

In [None]:
s3.Bucket(bucket_name).upload_file('endtoendml_etl.py', 'code/endtoendml_etl.py')

ETLJob = glue_client.create_job(Name='endtoendml-job', 
                                Role='GlueServiceRole-endtoendml',
                                Command={
                                    'Name': 'glueetl',
                                    'ScriptLocation': 's3://{0}/code/endtoendml_etl.py'.format(bucket_name)
                                },
                                DefaultArguments={
                                    '--job-language': 'python',
                                    '--extra-jars': 's3://{0}/dependencies/jar/mleap_spark_assembly.jar'.format(bucket_name),
                                    '--extra-py-files': 's3://{0}/dependencies/python/python.zip'.format(bucket_name),
                                    '--enable-continuous-cloudwatch-log': 'true',
                                    '--enable-continuous-log-filter': 'true'
                                })
glue_job_name = ETLJob['Name']
print(glue_job_name)

Finally, we can **start** our preprocessing and feature engineering job.

<div style="color: red"><strong>Note: running the job has been commented out since it requires around 15 minutes to execute. To save time during this workshop, the outputs of the job (preprocessed files and Feature Transformer SparkML model) are provided in an S3 bucket and can be copied to the expected target locations by executing the code below.</strong></div>
<div style="color: red">Alternatively, you can skip the next cell and uncomment the code in the subsequent cells in order to run the job from scratch.</div>

In [None]:
import boto3

s3 = boto3.resource('s3')

# Copy the precomputed job outputs (training set, validation set and
# serialized SparkML model) from the public bucket to the same keys in our bucket.
file_keys = [
    'data/preprocessed/train/part-00000',
    'data/preprocessed/val/part-00000',
    'output/sparkml/model.tar.gz'
]

for file_key in file_keys:
    copy_source = {
        'Bucket': 'gianpo-public',
        'Key': 'endtoendml/{0}'.format(file_key)
    }
    s3.Bucket(bucket_name).copy(copy_source, file_key)

In [None]:
#JobRun = glue_client.start_job_run(JobName=glue_job_name, 
#                                  Arguments = {'--S3_BUCKET': bucket_name})
#print(JobRun)

# Running the job will take around 15 minutes

In [None]:
#status = glue_client.get_job_run(JobName=ETLJob['Name'], RunId=JobRun['JobRunId'])
#while status['JobRun']['JobRunState'] not in ('FAILED', 'SUCCEEDED', 'STOPPED'):
#    print('Job status: ' + status['JobRun']['JobRunState'])
#    time.sleep(30)
#    status = glue_client.get_job_run(JobName=ETLJob['Name'], RunId=JobRun['JobRunId'])

#print(status['JobRun']['JobRunState'])

After the preprocessing and feature engineering are completed, you can move to the next notebook in the **03_train_model** folder to start model training.