## Prerequisites

### Get the raw data
- The data used in this example is taken from the [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page), which was downloaded using the tools provided by https://github.com/toddwschneider/nyc-taxi-data. These tools can be used to download the entire dataset from scratch, or to update an existing dataset.
- Upload the dataset to an S3 bucket. The folder structure should look something like this:

```
$ aws s3 ls s3://BUCKET/csv/ --human-readable
                           PRE unaltered/
2020-10-13 13:47:21  364.0 KiB central_park_weather.csv
2020-10-13 13:47:21   45.7 KiB fhv_bases.csv
2020-10-13 13:47:21   81.8 MiB fhv_tripdata_2015-01.csv
2020-10-13 13:47:21   93.3 MiB fhv_tripdata_2015-02.csv
...
2020-10-13 13:51:20    1.2 GiB fhvhv_tripdata_2019-02.csv
2020-10-13 13:51:20    1.4 GiB fhvhv_tripdata_2019-03.csv
...
2020-10-13 13:53:16    1.1 MiB green_tripdata_2013-08.csv
2020-10-13 13:53:17    7.3 MiB green_tripdata_2013-09.csv
...
2020-10-13 13:54:32    2.4 GiB yellow_tripdata_2009-01.csv
2020-10-13 13:54:32    2.2 GiB yellow_tripdata_2009-02.csv
...
2020-10-13 14:21:57   30.2 MiB yellow_tripdata_2020-05.csv
2020-10-13 14:21:58   47.9 MiB yellow_tripdata_2020-06.csv
```

## Submit a SageMaker Processing job

### Create the processing script

In [None]:
!mkdir -p ./src/

In [None]:
%%writefile ./src/preprocess.py
import argparse
import pyspark.sql.functions as F

from pyspark.sql import SparkSession, DataFrame


def strip_s3_scheme(uri):
    """Return *uri* without a leading ``s3://`` scheme or trailing slashes.

    ``str.lstrip("s3://")`` must not be used for this: it removes any leading
    run of the characters ``s``, ``3``, ``:`` and ``/`` rather than the literal
    prefix, so it also eats the start of bucket names such as
    ``sagemaker-...``.
    """
    scheme = "s3://"
    if uri.startswith(scheme):
        uri = uri[len(scheme):]
    return uri.rstrip("/")


def main():
    """Aggregate the raw trip-record CSVs into 15-minute ride counts.

    Command-line arguments:
        --s3_input_prefix: S3 prefix holding the ``fhv``, ``fhvhv``, ``green``
            and ``yellow`` ``*_tripdata_*.csv`` files (with or without the
            ``s3://`` scheme and trailing slash).
        --s3_output_prefix: S3 prefix under which ``X.parquet`` is written
            (existing output is overwritten).

    The output has one row per 15-minute window with the columns
    ``num_rides``, ``timestamp`` (window start), ``minutes`` (minutes since
    midnight), ``dow``, ``month`` and ``doy``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--s3_input_prefix", type=str, required=True)
    parser.add_argument("--s3_output_prefix", type=str, required=True)
    args = parser.parse_args()

    # Normalise both prefixes to "bucket/key" form; the scheme is re-added
    # explicitly wherever a path is built below.
    s3_input_prefix = strip_s3_scheme(args.s3_input_prefix)
    s3_output_prefix = strip_s3_scheme(args.s3_output_prefix)

    # Build the spark session
    spark = SparkSession.builder.appName("SparkProcessor").getOrCreate()

    # Read the raw input csv from S3
    # NOTE(review): the pickup column names selected below must exist in the
    # schema Spark derives for each wildcard read -- confirm the CSV headers
    # are consistent across all matched files.
    sdf_fhv = spark.read.csv(f"s3://{s3_input_prefix}/fhv_tripdata_*.csv",
                             header=True, inferSchema=True)
    sdf_fhvhv = spark.read.csv(f"s3://{s3_input_prefix}/fhvhv_tripdata_*.csv",
                               header=True, inferSchema=True)
    sdf_green = spark.read.csv(f"s3://{s3_input_prefix}/green_tripdata_*.csv",
                               header=True, inferSchema=True)
    sdf_yellow = spark.read.csv(f"s3://{s3_input_prefix}/yellow_tripdata_*.csv",
                                header=True, inferSchema=True)

    # Stack the pickup timestamps of all four data sets into a single column.
    # `union` replaces the deprecated `unionAll`; duplicates are kept.
    sdf0 = sdf_fhv.select(F.col("Pickup_date").alias("dt_pickup")) \
                  .union(sdf_fhvhv.select(F.col("pickup_datetime").alias("dt_pickup"))) \
                  .union(sdf_green.select(F.col("lpep_pickup_datetime").alias("dt_pickup"))) \
                  .union(sdf_yellow.select(F.col("Trip_Pickup_DateTime").alias("dt_pickup")))

    # Generate ride-counts at 15-minute intervals
    sdf1 = \
        sdf0.groupBy(F.window("dt_pickup", "15 minutes")) \
            .agg(F.count("*").alias("num_rides")) \
            .withColumn("timestamp", F.col("window.start")) \
            .drop("window")

    # Add time-based features
    sdf2 = \
        sdf1.select("*",
                    (F.minute("timestamp") + F.hour("timestamp") * 60).alias("minutes"),
                    (F.dayofweek("timestamp")).alias("dow"),
                    (F.month("timestamp")).alias("month"),
                    (F.dayofyear("timestamp")).alias("doy"))

    # Write features to parquet (no "header" option: that setting only
    # applies to CSV output)
    sdf2.write.parquet(f"s3://{s3_output_prefix}/X.parquet", mode="overwrite")


# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

### Run the processing job

In [None]:
import sagemaker

from sagemaker.spark.processing import PySparkProcessor

# Placeholder: replace with the name of the S3 bucket that holds the csv/ data.
BUCKET = "YOUR-BUCKET-NAME"

# SageMaker session and execution role, passed to the processor created below.
sm_session = sagemaker.Session()
role = sagemaker.get_execution_role()

In [None]:
# Define the PySpark processing cluster: 8 x ml.m5.xlarge instances running
# the Spark 2.4 framework, using the role and session set up above.
processor = PySparkProcessor(
    framework_version="2.4",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,  # 1 hour
    sagemaker_session=sm_session
)

In [None]:
# Spark configuration override handed to the job: sets spark.executor.memory
# to 4g under the "spark-defaults" classification.
configuration = [{
    "Classification": "spark-defaults",
    "Properties": {"spark.executor.memory": "4g"},
}]

# Submit src/preprocess.py with the csv/ prefix of BUCKET as input and the
# parquet/ prefix as output.
# This takes ~45mins to complete using 8 x ml.m5.xlarge instances
processor.run(
    submit_app="src/preprocess.py",
    arguments=["--s3_input_prefix", f"s3://{BUCKET}/csv/",
               "--s3_output_prefix", f"s3://{BUCKET}/parquet/"],
    configuration=configuration,
    logs=True
)