# Chapter 3: Data preparation at scale using Amazon SageMaker Data Wrangler and Amazon SageMaker Processing

In this notebook we'll perform the following steps:

* Create a database and a partitioned table in the Glue catalog for our data set
* Run a SageMaker Processing job to prepare the full data set

You need to define the following variables:

* `s3_bucket`: Bucket with the data set
* `glue_db_name`: Glue database name
* `glue_tbl_name`: Glue table name
* `s3_prefix_parquet`: Location of the Parquet tables in the S3 bucket
* `s3_output_prefix`: Location to store the prepared data in the S3 bucket
* `s3_prefix`: Location of the JSON data in the S3 bucket


## Glue Catalog

In [None]:
# Variables the notebook needs you to define before running the cells below.
# Bucket with the data set.
s3_bucket = 'MyBucket'
# Glue database name.
glue_db_name = 'MyDatabase'
# Glue table name.
glue_tbl_name = 'openaq'
# Location of the JSON data in the S3 bucket.
s3_prefix = 'openaq/realtime'
# Location of the Parquet tables in the S3 bucket.
s3_prefix_parquet = 'openaq/realtime-parquet-gzipped/tables'
# Location to store the prepared data in the S3 bucket.
s3_output_prefix = 'prepared'

import boto3
# S3 client used later in the notebook to list the object keys under s3_prefix.
s3 = boto3.client('s3')

In [None]:
# Glue client shared by the catalog cells below.
glue = boto3.client('glue')

# Create the database, tolerating a re-run of this cell: Glue raises
# AlreadyExistsException when a database with this name is already present.
try:
    response = glue.create_database(
        DatabaseInput={
            'Name': glue_db_name,
        }
    )
except glue.exceptions.AlreadyExistsException:
    print(f"Glue database '{glue_db_name}' already exists; reusing it")

In [None]:
# Column schema of the JSON records as (column name, Hive type) pairs,
# in the order they are registered in the catalog.
table_columns = [
    ("date", "struct<utc:string,local:string>"),
    ("parameter", "string"),
    ("location", "string"),
    ("value", "double"),
    ("unit", "string"),
    ("city", "string"),
    ("attribution", "array<struct<name:string,url:string>>"),
    ("averagingperiod", "struct<value:double,unit:string>"),
    ("coordinates", "struct<latitude:double,longitude:double>"),
    ("country", "string"),
    ("sourcename", "string"),
    ("sourcetype", "string"),
    ("mobile", "boolean"),
]

# Register an external, uncompressed JSON table partitioned by `aggdate`.
# The call is wrapped so that re-running the cell does not fail once the
# table exists (Glue raises AlreadyExistsException in that case).
try:
    response = glue.create_table(
        DatabaseName=glue_db_name,
        TableInput={
            'Name': glue_tbl_name,
            'StorageDescriptor': {
                'Columns': [
                    {"Name": col_name, "Type": col_type}
                    for col_name, col_type in table_columns
                ],
                'Location': 's3://' + s3_bucket + '/' + s3_prefix + '/',
                'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                'Compressed': False,
                'SerdeInfo': {
                    'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
                    "Parameters": {
                        "paths": "attribution,averagingPeriod,city,coordinates,country,date,location,mobile,parameter,sourceName,sourceType,unit,value"
                    }
                },
                'Parameters': {
                    "classification": "json",
                    "compressionType": "none",
                },
                'StoredAsSubDirectories': False,
            },
            'PartitionKeys': [
                {
                    "Name": "aggdate",
                    "Type": "string"
                },
            ],
            'TableType': 'EXTERNAL_TABLE',
            'Parameters': {
                "classification": "json",
                "compressionType": "none",
            }
        }
    )
except glue.exceptions.AlreadyExistsException:
    print(f"Glue table '{glue_db_name}.{glue_tbl_name}' already exists; leaving it unchanged")

In [None]:
# Collect one representative object key per partition directory under the
# JSON data prefix. The partition value is the second-to-last path component
# of a key, i.e. the directory that directly contains the object.
partitions_to_add = []
seen_partition_values = set()

# The paginator follows continuation tokens itself and sends the same
# Bucket/Prefix on every request. The trailing '/' keeps sibling prefixes that
# merely start with the same text (e.g. 'openaq/realtime-parquet-gzipped')
# out of the listing.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix + '/'):
    # 'Contents' is absent from the response when no key matches the prefix.
    for obj in page.get('Contents', []):
        key = obj['Key']
        part_value = key.split('/')[-2]
        # Many objects can live in one partition directory; keep only the
        # first key seen so each partition is registered once.
        if part_value not in seen_partition_values:
            seen_partition_values.add(part_value)
            partitions_to_add.append(key)
    if page.get('IsTruncated'):
        print("Getting next batch")

In [None]:
# Report how many entries were collected in partitions_to_add before they are
# registered with Glue.
print(f"Need to add {len(partitions_to_add)} partitions")

In [None]:
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices of at most ``n`` items.

    Slices are produced in order; the final one is shorter when ``len(lst)``
    is not a multiple of ``n``. An empty ``lst`` yields nothing.
    """
    offsets = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in offsets)

In [None]:
def get_part_def(p):
    """Build the Glue partition definition for the S3 object key ``p``.

    The partition value is the second-to-last ``/``-separated component of
    ``p``. The partition location is assembled from the notebook-level
    ``s3_bucket`` and ``s3_prefix`` variables plus that value.

    Returns a dict with ``Values``, ``StorageDescriptor`` and ``Parameters``
    keys, used as an element of ``PartitionInputList`` in
    ``glue.batch_create_partition``.
    """
    part_value = p.split('/')[-2]

    # (column name, Hive type) pairs, in catalog order.
    column_specs = (
        ("date", "struct<utc:string,local:string>"),
        ("parameter", "string"),
        ("location", "string"),
        ("value", "double"),
        ("unit", "string"),
        ("city", "string"),
        ("attribution", "array<struct<name:string,url:string>>"),
        ("averagingperiod", "struct<value:double,unit:string>"),
        ("coordinates", "struct<latitude:double,longitude:double>"),
        ("country", "string"),
        ("sourcename", "string"),
        ("sourcetype", "string"),
        ("mobile", "boolean"),
    )
    columns = [
        {"Name": col_name, "Type": col_type}
        for col_name, col_type in column_specs
    ]

    serde_info = {
        'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
        "Parameters": {
            "paths": "attribution,averagingPeriod,city,coordinates,country,date,location,mobile,parameter,sourceName,sourceType,unit,value"
        },
    }

    storage_descriptor = {
        'Columns': columns,
        'Location': f"s3://{s3_bucket}/{s3_prefix}/{part_value}/",
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'Compressed': False,
        'SerdeInfo': serde_info,
        'StoredAsSubDirectories': False,
    }

    partition_parameters = {
        "classification": "json",
        "compressionType": "none",
    }

    return {
        'Values': [part_value],
        'StorageDescriptor': storage_descriptor,
        'Parameters': partition_parameters,
    }

In [None]:
# Register the partitions in batches of 100 entries per request.
# batch_create_partition does not raise for individual partitions it could
# not create; it lists them under 'Errors' in the response. Tally those by
# error code so failures are visible instead of being silently dropped.
partition_error_counts = {}
for batch in chunks(partitions_to_add, 100):
    response = glue.batch_create_partition(
        DatabaseName=glue_db_name,
        TableName=glue_tbl_name,
        PartitionInputList=[get_part_def(p) for p in batch]
    )
    for failure in response.get('Errors', []):
        error_code = failure.get('ErrorDetail', {}).get('ErrorCode', 'Unknown')
        partition_error_counts[error_code] = partition_error_counts.get(error_code, 0) + 1

if partition_error_counts:
    print(f"Partitions not created, by error code: {partition_error_counts}")

## Processing Job

In [None]:
import logging
import sagemaker
from time import gmtime, strftime

# Send the SageMaker SDK's INFO-level log records to the notebook output.
sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)

# Loggers are process-wide, so every execution of this cell would otherwise
# attach one more StreamHandler and each log line would be printed once per
# re-run. Tag our handler by name and only add it when it is not there yet.
notebook_handler_name = "notebook-stream"
if not any(h.get_name() == notebook_handler_name for h in sagemaker_logger.handlers):
    notebook_handler = logging.StreamHandler()
    notebook_handler.set_name(notebook_handler_name)
    sagemaker_logger.addHandler(notebook_handler)

# Session and execution role used by the processing job below.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

In [None]:
from sagemaker.spark.processing import PySparkProcessor

# Settings for the Spark processing job: 15 ml.m5.4xlarge instances running
# framework version 3.0 under the notebook's execution role, stopped after at
# most 7200 seconds.
processor_settings = dict(
    base_job_name="spark-preprocessor",
    framework_version="3.0",
    role=role,
    instance_count=15,
    instance_type="ml.m5.4xlarge",
    max_runtime_in_seconds=7200,
)
spark_processor = PySparkProcessor(**processor_settings)

# Cluster configuration handed to the Spark processing job.
configuration = [
    {
        # Fixed executor layout with dynamic allocation switched off:
        # 44 executors x 5 cores = 220 cores, and the default parallelism of
        # 440 is twice that core count.
        "Classification": "spark-defaults",
        "Properties": {
            "spark.executor.memory": "18g",
            "spark.yarn.executor.memoryOverhead": "3g",
            "spark.driver.memory": "18g",
            "spark.yarn.driver.memoryOverhead": "3g",
            "spark.executor.cores": "5",
            "spark.driver.cores": "5",
            "spark.executor.instances": "44",
            "spark.default.parallelism": "440",
            "spark.dynamicAllocation.enabled": "false",
        },
    },
    {
        # Disable YARN's virtual- and physical-memory limit checks.
        # The physical-memory property is `pmem-check-enabled`; the previous
        # `mmem-check-enabled` key is not a YARN property, so that check was
        # never actually turned off.
        "Classification": "yarn-site",
        "Properties": {
            "yarn.nodemanager.vmem-check-enabled": "false",
            "yarn.nodemanager.pmem-check-enabled": "false",
        },
    },
]

# Arguments given to scripts/preprocess.py: the job reads from the Parquet
# prefix and writes under the output prefix, both in the same bucket.
job_arguments = [
    '--s3_input_bucket', s3_bucket,
    '--s3_input_key_prefix', s3_prefix_parquet,
    '--s3_output_bucket', s3_bucket,
    '--s3_output_key_prefix', s3_output_prefix,
]

# Spark event logs are written under the 'sparklogs' prefix of the bucket.
event_logs_uri = "s3://{}/{}/spark_event_logs".format(s3_bucket, 'sparklogs')

# Launch the job with the cluster configuration defined above and stream its
# logs into the notebook (logs=True).
spark_processor.run(
    submit_app="scripts/preprocess.py",
    submit_jars=["s3://crawler-public/json/serde/json-serde.jar"],
    arguments=job_arguments,
    spark_event_logs_s3_uri=event_logs_uri,
    logs=True,
    configuration=configuration,
)