# Data Preparation at Scale Using Amazon SageMaker Data Wrangler and Processing

The smaller version of our dataset (covering one month) is about 5 GB, which we can analyze on a modern workstation without much difficulty. The full dataset, however, is closer to 500 GB. To prepare it, we need a horizontally scalable cluster computing framework. Scale matters for individual steps too: encoding categorical variables, for example, can take a long time with an inefficient processing framework.

## Exporting the flow

Data Wrangler is very handy for quickly exploring a dataset, but it can also export the results of a flow: to Amazon SageMaker Feature Store, as a SageMaker pipeline, as a Data Wrangler job, or as Python code. We won't use these export options now, but feel free to experiment with them.

## Data preparation at scale with SageMaker Processing

Now let's turn our attention to preparing the entire dataset. At 500 GB, it is too large to process with scikit-learn on a single EC2 instance, so we will write a SageMaker Processing job that uses Spark ML for data preparation. (Dask is an alternative, but at the time of writing, SageMaker Processing does not provide a prebuilt Dask container.)

The Processing Job section of this chapter's notebook walks you through launching the processing job. Note that we use a cluster of 15 ml.m5.4xlarge EC2 instances to run it:

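import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# IAM role that the processing job assumes (resolvable inside SageMaker notebooks/Studio)
role = sagemaker.get_execution_role()

spark_processor = PySparkProcessor(
    base_job_name="spark-preprocessor",
    framework_version="3.0",
    role=role,
    instance_count=15,
    instance_type="ml.m5.4xlarge",
    max_runtime_in_seconds=7200,
)
# The preprocessing script below is submitted with spark_processor.run(submit_app=...)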
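Before launching the job, a quick calculation helps sanity-check the cluster size. The sketch below is plain Python with no SageMaker calls, and `cluster_sizing` is a hypothetical helper. It assumes the published ml.m5.4xlarge specs (16 vCPUs and 64 GiB of memory per instance), Spark's default 128 MB input split size (`spark.sql.files.maxPartitionBytes`), and input stored in splittable files.

```python
import math


def cluster_sizing(dataset_gb, instance_count, vcpus_per_instance,
                   memory_gib_per_instance, split_mb=128):
    """Rough sizing figures for a Spark cluster (assumption-laden estimate)."""
    total_vcpus = instance_count * vcpus_per_instance
    total_memory_gib = instance_count * memory_gib_per_instance
    # Number of input tasks if Spark splits the input into split_mb-sized chunks
    input_tasks = math.ceil(dataset_gb * 1024 / split_mb)
    return {
        "gb_per_instance": dataset_gb / instance_count,
        "total_vcpus": total_vcpus,
        "total_memory_gib": total_memory_gib,
        "input_tasks": input_tasks,
        # Minimum number of task waves if each vCPU runs one task at a time
        "task_waves": math.ceil(input_tasks / total_vcpus),
    }


# 500 GB on 15 x ml.m5.4xlarge (assumed: 16 vCPUs, 64 GiB memory each)
print(cluster_sizing(500, 15, 16, 64))
```

Under these assumptions, each instance handles roughly 33 GB, the cluster offers 240 vCPUs and 960 GiB of memory, and the 4,000 input tasks run in at least 17 waves. Real figures depend on the file format and compression (gzip files, for example, cannot be split), on executor settings, and on the vCPUs reserved for the driver and system processes.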


from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

spark = SparkSession \
    .builder \
    .appName("rdtest") \
    .enableHiveSupport() \
    .getOrCreate()
    

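# Read the source table. The WHERE clause limits the query to a single month
# (January 2018); remove it to process the full dataset.
df = spark.sql("SELECT * FROM rdtest2.rdtest2 WHERE aggdate LIKE '2018-01-%'")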
df.head(3)

# Drop columns that are not used as features
df = df.drop('date', 'unit', 'attribution', 'averagingperiod', 'coordinates')
df.head(3)


# Mobile field to int
from pyspark.sql.functions import col
df = df.withColumn("ismobile",col("mobile").cast(IntegerType())).drop('mobile')
df.head(3)


# Scale 'value' with StandardScaler, which expects a vector column,
# so VectorAssembler first wraps 'value' in one
from pyspark.ml.feature import StandardScaler

value_assembler = VectorAssembler(inputCols=["value"], outputCol="value_vec")
value_scaler = StandardScaler(inputCol="value_vec", outputCol="value_scaled")
value_pipeline = Pipeline(stages=[value_assembler, value_scaler])
value_model = value_pipeline.fit(df)
xform_df = value_model.transform(df)
xform_df.head(3)

# Featurize date: parse the 'yyyy-MM-dd' string into a DateType column
from pyspark.sql.functions import to_date
xform_df = xform_df.withColumn('aggdt', to_date(col('aggdate'), 'yyyy-MM-dd'))
xform_df.head(3)

# Calendar features; dayofmonth returns an integer like the other date parts
from pyspark.sql.functions import year, month, quarter, dayofmonth
xform_df = (xform_df
            .withColumn('year', year('aggdt'))
            .withColumn('month', month('aggdt'))
            .withColumn('quarter', quarter('aggdt'))
            .withColumn('day', dayofmonth('aggdt')))
xform_df.head(3)

# Automatically assign good/bad air labels
from pyspark.sql.functions import udf

# Per-pollutant thresholds above which a reading is labeled bad air (1)
BAD_AIR_THRESHOLDS = {'pm10': 50, 'pm25': 25, 'so2': 20, 'no2': 200, 'o3': 100}

def isBadAir(v, p):
    threshold = BAD_AIR_THRESHOLDS.get(p)
    # Unknown pollutants and missing values are labeled 0
    if threshold is None or v is None:
        return 0
    return 1 if v > threshold else 0

# A Python UDF sends every row to a Python worker; native column expressions
# (pyspark.sql.functions.when) avoid that overhead on large datasets.
isBadAirUdf = udf(isBadAir, IntegerType())

xform_df = xform_df.withColumn('isBadAir', isBadAirUdf('value', 'parameter'))
xform_df.head(3)

# Index the categorical columns, then one-hot encode the pollutant parameter.
# In Spark 3.x, OneHotEncoderEstimator was renamed to OneHotEncoder.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

categorical_cols = ["parameter", "location", "city", "country", "sourcename", "sourcetype"]
indexers = [StringIndexer(inputCol=c, outputCol="indexed_" + c) for c in categorical_cols]

enc_est = OneHotEncoder(inputCols=["indexed_parameter"],
                        outputCols=["vec_parameter"])

enc_pipeline = Pipeline(stages=indexers + [enc_est])
enc_model = enc_pipeline.fit(xform_df)
enc_df = enc_model.transform(xform_df)
enc_df.head(3)

# Parameter labels in index order, read from the StringIndexer output metadata
param_cols = enc_df.schema['indexed_parameter'].metadata['ml_attr']['vals']
param_cols

final_df = enc_df.drop('parameter', 'location', 'city', 'country', 'sourcename', 'sourcetype',
                       'aggdate', 'value_vec', 'aggdt', 'indexed_parameter')
#final_df.head(3)

# Flatten vector columns into plain numeric columns. vector_to_array (Spark 3.0+)
# runs in the JVM, avoiding a Python UDF and an RDD round trip.
from pyspark.ml.functions import vector_to_array

param_vec = vector_to_array(col('vec_parameter'))
final_df = final_df.select(
    vector_to_array(col('value_scaled'))[0].alias('value'),
    'ismobile', 'year', 'month', 'quarter', 'day', 'isBadAir',
    col('indexed_location').alias('location'),
    col('indexed_city').alias('city'),
    col('indexed_sourcename').alias('sourcename'),
    col('indexed_sourcetype').alias('sourcetype'),
    # OneHotEncoder drops the last category by default, so the vector
    # has len(param_cols) - 1 entries
    *[param_vec[i].alias(name) for i, name in enumerate(param_cols[:-1])]
)


(train_df, validation_df, test_df) = final_df.randomSplit([0.7, 0.2, 0.1])

train_df.write.option("header",True).csv("s3://rdtest-data/prepared/train/")
test_df.write.option("header",True).csv("s3://rdtest-data/prepared/test/")
validation_df.write.option("header",True).csv("s3://rdtest-data/prepared/validation/")
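The script relies on two Spark ML defaults that are easy to miss. StandardScaler divides by the corrected (n - 1) sample standard deviation without centering the data (`withMean=False`), and OneHotEncoder drops the last category (`dropLast=True`), which is why the column names come from `param_cols[:-1]`. The plain-Python sketch below mimics both behaviors on toy data so they can be checked without a cluster. The function names are hypothetical, and breaking frequency ties alphabetically in `string_index` is an assumption of this sketch.

```python
import statistics
from collections import Counter


def standard_scale(values):
    """Mimic StandardScaler defaults: divide by the sample std, no centering."""
    std = statistics.stdev(values)  # corrected (n - 1) sample standard deviation
    if std == 0:
        return [0.0] * len(values)  # Spark outputs 0.0 for zero-variance features
    return [v / std for v in values]


def string_index(labels):
    """Mimic StringIndexer's frequencyDesc order: most frequent label gets 0."""
    counts = Counter(labels)
    # Assumption of this sketch: ties are broken alphabetically
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Mimic OneHotEncoder: with drop_last, the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


params = ["pm25", "pm25", "pm25", "pm10", "pm10", "o3"]
index = string_index(params)
param_cols = list(index)  # labels in index order, like the 'vals' metadata
print(standard_scale([2.0, 4.0, 6.0]))
print(index, param_cols[:-1])
print(one_hot(index["pm25"], len(param_cols)), one_hot(index["o3"], len(param_cols)))
```

Here `standard_scale([2.0, 4.0, 6.0])` yields `[1.0, 2.0, 3.0]`: the spread is normalized, but the mean is not shifted to zero. The least frequent parameter (`o3`) maps to an all-zeros vector, so only `len(param_cols) - 1` output columns need names.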

There are many ways to combine these tools, but we recommend using Data Wrangler for interactive exploration of small to mid-sized datasets. To process a large dataset in its entirety, switch to a programmatic SageMaker Processing job that uses the Spark framework to parallelize the work. (At the time of writing, Data Wrangler does not support running on multiple instances, whereas a processing job can run on many.) You can always export a Data Wrangler flow as a starting point.

If your dataset is many terabytes, consider running the Spark job directly on Amazon EMR or AWS Glue and invoking SageMaker through the SageMaker Spark SDK. EMR and Glue offer optimized Spark runtimes and more efficient integration with Amazon S3 storage.