# EMR Data Prep + SageMaker Deep Learning

This notebook was tested with the `Studio SparkMagic - PySpark Kernel` running on an `ml.t3.medium` instance, connected to an EMR cluster with one `m5.xlarge` master node and two `m5.xlarge` core nodes. Make sure you see `PySpark (SparkMagic)` in the top right of your notebook.

In this three-part notebook lesson, we'll first use EMR to prepare the data and serialize it to S3. Next, we'll prototype a deep learning architecture in SageMaker Studio notebooks, and finally we'll scale the training with ephemeral SageMaker training jobs.

In [None]:
# Uncomment and set your cluster ID to connect this notebook to the EMR cluster
# %load_ext sagemaker_studio_analytics_extension.magics
# %sm_analytics emr connect --cluster_id j-xxxxxxxxxxxx --auth-type None

## Inspect the public NYC Taxi Dataset

In [None]:
%%local
!aws s3 ls "s3://nyc-tlc/trip data/green" --human-readable | grep green_tripdata_2016

In [None]:
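# Read every 2016 green-taxi CSV file in one call; cache() marks the DataFrame for reuse
# and count() triggers the actual read
df = spark.read.csv("s3://nyc-tlc/trip data/green_tripdata_2016*.csv",
                    header=True, inferSchema=True,
                    timestampFormat='yyyy-MM-dd HH:mm:ss').cache()
df.count()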
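The wildcard in the path above lets Spark read all of the 2016 green-taxi files in a single call. As a minimal sketch of which object keys such a glob selects, the cell below uses Python's `fnmatch` as a stand-in for Hadoop's glob matching. The key names are assumed examples of the monthly file naming, not a real listing of the bucket.

```python
# Sketch: which object keys the glob "green_tripdata_2016*.csv" would select.
# The key names are assumed examples, not a real listing of s3://nyc-tlc.
from fnmatch import fnmatch

PATTERN = "trip data/green_tripdata_2016*.csv"


def matching_keys(keys, pattern=PATTERN):
    """Return the keys matched by the glob pattern.

    fnmatch's '*' also matches '/', whereas Hadoop globs stop at path
    separators; that difference does not matter for these flat key names.
    """
    return [key for key in keys if fnmatch(key, pattern)]


keys = [
    "trip data/green_tripdata_2015-12.csv",
    "trip data/green_tripdata_2016-01.csv",
    "trip data/green_tripdata_2016-02.csv",
    "trip data/yellow_tripdata_2016-01.csv",
]
print(matching_keys(keys))
```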

## Format the dataset

In [None]:
%%pretty
from pyspark.sql.functions import col, dayofweek, month, hour
df_dt = df.select(dayofweek(col('lpep_pickup_datetime')).alias('day_of_week'),
                   month(col('lpep_pickup_datetime')).alias('month'),
                   hour(col('lpep_pickup_datetime')).alias('hour'),
                   col("Pickup_latitude").alias("pickup_latitude"),
                   col("Pickup_longitude").alias("pickup_longitude"),
                   col("Dropoff_latitude").alias("dropoff_latitude"),
                   col("Dropoff_longitude").alias("dropoff_longitude"),
                   col("Trip_distance").alias("trip_distance"),
                   col("Fare_amount").alias("fare_amount")
                  )
df_dt.show()
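The `dayofweek`, `month` and `hour` functions above turn each pickup timestamp into numeric features. A plain-Python sketch of the same derivation is shown below; the one subtlety is that Spark's `dayofweek()` numbers days from 1 = Sunday to 7 = Saturday, whereas Python's `isoweekday()` runs from 1 = Monday to 7 = Sunday, so the helper converts between the two.

```python
# Sketch: plain-Python equivalents of the Spark date functions used above.
# Spark's dayofweek() uses 1 = Sunday ... 7 = Saturday; Python's isoweekday()
# uses 1 = Monday ... 7 = Sunday, so we convert.
from datetime import datetime

TS_FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of 'yyyy-MM-dd HH:mm:ss'


def spark_dayofweek(dt):
    """Map a datetime to Spark's dayofweek convention (1 = Sunday)."""
    return dt.isoweekday() % 7 + 1


def time_features(pickup):
    """Derive the day_of_week, month and hour columns from a pickup timestamp string."""
    dt = datetime.strptime(pickup, TS_FORMAT)
    return {"day_of_week": spark_dayofweek(dt), "month": dt.month, "hour": dt.hour}


print(time_features("2016-01-03 14:25:00"))  # 3 January 2016 was a Sunday
```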

## Clean Up the Data at Scale on the Cluster

In [None]:
# Keep only trips with a plausible fare: strictly between $0 and $200
df_dt = df_dt[
    (df_dt.fare_amount > 0)
    & (df_dt.fare_amount < 200)
]
df_dt.count()

In [None]:
%%pretty
# Drop trips with a zero pickup latitude, which most likely indicates missing coordinates
df_dt = df_dt[
    (df_dt.pickup_latitude != 0)
]
df_dt.show()
df_dt.count()
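Taken together, the two cleanup cells keep a row only if its fare lies strictly between 0 and 200 and its pickup latitude is non-zero. The sketch below expresses that as a per-row predicate over plain dicts, purely for illustration. It also mirrors a Spark detail: `filter` keeps only rows where the condition is true, so a row whose fare or latitude is NULL is dropped rather than kept. Treating a zero latitude as a missing GPS fix is an assumption about the data.

```python
# Sketch: the two cleanup filters above as a single per-row predicate.
# Rows are plain dicts here for illustration only.


def keep_row(row):
    """Return True if a trip survives both cleanup filters."""
    fare = row.get("fare_amount")
    lat = row.get("pickup_latitude")
    # Spark's filter drops rows where the condition evaluates to NULL,
    # so missing values are rejected here as well.
    if fare is None or lat is None:
        return False
    # Strict bounds: fares of exactly 0 or 200 are dropped.
    # A zero latitude is assumed to mean missing coordinates.
    return 0 < fare < 200 and lat != 0


rows = [
    {"fare_amount": 12.5, "pickup_latitude": 40.7},   # kept
    {"fare_amount": 0.0, "pickup_latitude": 40.7},    # fare too low
    {"fare_amount": 250.0, "pickup_latitude": 40.7},  # fare too high
    {"fare_amount": 9.0, "pickup_latitude": 0.0},     # missing coordinates
    {"fare_amount": None, "pickup_latitude": 40.7},   # NULL fare
]
print([keep_row(row) for row in rows])
```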

In [None]:
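# Split 80% train / 20% held out, then split the held-out 20% into 5% validation and 95% test
# (roughly 1% and 19% of all cleaned rows, respectively)
train_df, val_df = df_dt.randomSplit([0.8, 0.2], seed=42)
val_df, test_df = val_df.randomSplit([0.05, 0.95], seed=42)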

print("Train Count:", train_df.count())
print("Validation Count:", val_df.count())
print("Test Count:", test_df.count())
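Because the second `randomSplit` only divides the 20% hold-out, the validation and test shares of the full dataset are products of the two weight lists. The small helper below computes those expected shares, normalizing weights that do not sum to 1 in the same way `randomSplit` does. With the weights used above, it gives about 80% train, 1% validation and 19% test. The actual counts printed by the cell above will differ slightly, since `randomSplit` samples rows rather than cutting exact sizes.

```python
# Sketch: expected share of all rows landing in each split after two nested randomSplit calls.


def nested_split_fractions(first=(0.8, 0.2), second=(0.05, 0.95)):
    """Return expected train/validation/test fractions of the full dataset."""
    # randomSplit normalizes weights that do not sum to 1.0, so do the same here
    first_total = sum(first)
    second_total = sum(second)
    train = first[0] / first_total
    held_out = first[1] / first_total
    return {
        "train": train,
        "validation": held_out * second[0] / second_total,
        "test": held_out * second[1] / second_total,
    }


fractions = nested_split_fractions()
print({name: round(share, 4) for name, share in fractions.items()})
```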

In [None]:
%%local 
import sagemaker

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()

data_bucket = f"{bucket}/nyc-taxi/data/processed"
print(data_bucket)

In [None]:
%%send_to_spark -i data_bucket -t str -n data_bucket

In [None]:
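# Write the train and test splits to S3 as headerless CSV; each path becomes a directory of part files
# (val_df is not written to S3 in this notebook)
train_df.write.csv(f"s3://{data_bucket}/train", mode='overwrite')
test_df.write.csv(f"s3://{data_bucket}/test", mode='overwrite')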
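Since `write.csv` is called without `header=True`, the part files contain only values, and whoever reads them back has to supply the column names in the order they were selected into `df_dt`. A minimal sketch of that, using Python's `csv` module on the text of one part file, is shown below. The sample line is made up for illustration, and converting every field to `float` assumes all columns are numeric, as they are after the selection above.

```python
# Sketch: parsing the text of one headerless part file written by train_df.write.csv.
import csv
import io

# Column order as selected into df_dt; the CSV files themselves do not record it.
COLUMNS = [
    "day_of_week", "month", "hour",
    "pickup_latitude", "pickup_longitude",
    "dropoff_latitude", "dropoff_longitude",
    "trip_distance", "fare_amount",
]


def parse_part_file(text):
    """Parse the contents of one part file into dicts keyed by column name."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        if not fields:
            continue  # skip blank lines
        rows.append({name: float(value) for name, value in zip(COLUMNS, fields)})
    return rows


sample = "1,1,14,40.72,-73.95,40.75,-73.98,2.5,11.0\n"  # made-up values for illustration
print(parse_part_file(sample))
```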

## Store the data location for the next notebook

In [None]:
%%local
%store data_bucket