# **ML-Ops | DC Taxi Fares**

## 00 - Data Preprocessing



### Downloading the DC taxi dataset

`gdown` is a Python utility for downloading files stored in Google Drive. The bash script in the following cell iterates over the Google Drive identifiers of the files `taxi_2015.zip` through `taxi_2019.zip`, which are stored in a shared Google Drive. The notebook uses these copies instead of the original files from https://opendata.dc.gov/search?categories=transportation&q=taxi&type=document%20link because the originals cannot be easily downloaded using a bash script.

In [None]:
%%bash
pip install gdown
for ID in '1yF2hYrVjAZ3VPFo1dDkN80wUV2eEq65O'\
          '1Z7ZVi79wKEbnc0FH3o0XKHGUS8MQOU6R'\
          '1I_uraLKNbGPe3IeE7FUa9gPfwBHjthu4'\
          '1MoY3THytktrxam6_hFSC8kW1DeHZ8_g1'\
          '1balxhT6Qh_WDp4wq4OsG40iwqFa86QgW'
do

  gdown --id $ID

done

### Unzipping the dataset

The script in the following cell unzips the downloaded dataset files to the `dctaxi` subdirectory in the current directory of the notebook.

The `-o` flag used by the `unzip` command overwrites existing files in case we execute the next cell more than once.


In [None]:
%%bash

mkdir -p dctaxi

for YEAR in '2015' \
            '2016' \
            '2017' \
            '2018' \
            '2019'
do

  unzip -o taxi_$YEAR.zip -d dctaxi

done

### Reporting on the disk space used by the dataset files

The next cell reports on the disk usage (`du`) by the files from the DC taxi dataset. All of the files in the dataset have the `taxi_` prefix.

Since the full output of the `du` command lists the disk usage of every file, the `tail` command is used to limit the output to just the last line. We can remove the `tail` command if we wish to report on the disk usage of the individual files in the dataset.



In [None]:
!du -cha --block-size=1MB dctaxi/taxi_*.txt | tail -n 1

11987	total
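
For readers who prefer to stay in Python, a similar total can be computed without `du`. The sketch below is not part of the original notebook: the helper name `total_size_mb` is hypothetical, and it sums apparent file sizes in units of 1,000,000 bytes, which can differ slightly from the allocated disk space that `du` reports.

```python
from pathlib import Path


def total_size_mb(directory, pattern="taxi_*.txt"):
    """Return the combined apparent size, in units of 1,000,000 bytes,
    of the files in `directory` whose names match `pattern`."""
    total_bytes = sum(
        path.stat().st_size
        for path in Path(directory).glob(pattern)
        if path.is_file()
    )
    return total_bytes / 1_000_000
```

Assuming the dataset was unzipped to `dctaxi` as in the cells above, `total_size_mb('dctaxi')` should give a figure close to the `du` total.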


### Scanning the dataset documentation

The dataset includes a `README_DC_Taxicab_trip.txt` file with brief documentation of the dataset contents. We will run the next cell and take a moment to review the documentation, focusing on the schema used by the dataset.



In [None]:
%%bash
cat dctaxi/README_DC_Taxicab_trip.txt

TITLE: Taxicab Trip Information

ABSTRACT:  DC Taxi trips from January 2015 to June 2019.  The zip file contains zip files of pipe (|) delimeted text files for trips by month.  The record counts are:

Month			Count
5/2015		1,397,102 
6/2015		1,470,466 
7/2015		1,401,792 
8/2015		1,129,707 
9/2015		1,308,445 
10/2015		1,487,133 
11/2015	 	  993,502 
12/2015		1,081,726 
1/2016	 	  922,338 
2/2016		1,194,698 
3/2016		1,404,639 
4/2016		1,369,882 
5/2016		1,323,155 
6/2016		1,282,651 
7/2016		1,109,118 
8/2016		  949,650 
9/2016		1,203,388 
10/2016		1,027,036 
11/2016		1,011,673 
12/2016	 	  898,993 
1/2017	  	  901,807 
2/2017	 	  949,578 
3/2017		1,237,621
4/2017		1,185,859
5/2017		1,188,944
6/2017		1,182,526
7/2017		  993,834
8/2017     	  813,513
9/2017		  902,795
10/2017    	1,056,864
11/2017    	  867,709
12/2017    	  730,679
1/2018     	  686,993
2/2018     	  750,363
3/2018     	  999,048
4/2018     	1,040,147
5/2018     	  984,593
6/2018 

### Previewing the dataset

We run the next cell to confirm that the dataset consists of pipe (|) separated values organized according to the schema described by the documentation. The `taxi_2015_09.txt` file used in the next cell was picked arbitrarily, just to illustrate the dataset.

In [None]:
!head dctaxi/taxi_2015_09.txt

OBJECTID|TRIPTYPE|PROVIDERNAME|FAREAMOUNT|GRATUITYAMOUNT|SURCHARGEAMOUNT|EXTRAFAREAMOUNT|TOLLAMOUNT|TOTALAMOUNT|PAYMENTTYPE|ORIGINCITY|ORIGINSTATE|ORIGINZIP|DESTINATIONCITY|DESTINATIONSTATE|DESTINATIONZIP|MILEAGE|DURATION|ORIGIN_BLOCK_LATITUDE|ORIGIN_BLOCK_LONGITUDE|ORIGIN_BLOCKNAME|DESTINATION_BLOCK_LATITUDE|DESTINATION_BLOCK_LONGITUDE|DESTINATION_BLOCKNAME|AIRPORT|ORIGINDATETIME_TR|DESTINATIONDATETIME_TR
6482604|1|Transco|3.25|0.00|0.25|0.00|0.00|3.50|1|Washington|DC|20002|Washington|DC|20002|0|1|38.897204|-77.008388|1 - 99 BLOCK OF MASSACHUSETTS AVENUE NE|38.896039|-77.007142|40 - 49 BLOCK OF COLUMBUS CIRCLE NE|N|09/01/2015 00:00|09/01/2015 00:00
6482605|1|Transco|7.30|1.89|0.25|0.00|0.00|9.44|1|Washington|DC|20005|Washington|DC|20009|2|7|38.904692|-77.034573|1100 - 1199 BLOCK OF 15TH STREET NW|38.921623|-77.042365|2400 - 2499 BLOCK OF 18TH STREET NW|N|09/01/2015 00:00|09/01/2015 00:00
6482606|1|Transco|5.68|0.00|0.25|0.00|0.00|5.93|2||NULL|00000||NULL|00000|1|3|||||||N|09/01/201
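
To illustrate how the pipe-separated schema maps onto named fields, here is a minimal sketch that parses the header and the first record from the preview above with Python's standard `csv` module. The helper name `read_trips` is hypothetical, and the values are returned as strings exactly as they appear in the file.

```python
import csv
import io

# Header and first record copied from the preview above.
SAMPLE = (
    "OBJECTID|TRIPTYPE|PROVIDERNAME|FAREAMOUNT|GRATUITYAMOUNT|SURCHARGEAMOUNT|"
    "EXTRAFAREAMOUNT|TOLLAMOUNT|TOTALAMOUNT|PAYMENTTYPE|ORIGINCITY|ORIGINSTATE|"
    "ORIGINZIP|DESTINATIONCITY|DESTINATIONSTATE|DESTINATIONZIP|MILEAGE|DURATION|"
    "ORIGIN_BLOCK_LATITUDE|ORIGIN_BLOCK_LONGITUDE|ORIGIN_BLOCKNAME|"
    "DESTINATION_BLOCK_LATITUDE|DESTINATION_BLOCK_LONGITUDE|DESTINATION_BLOCKNAME|"
    "AIRPORT|ORIGINDATETIME_TR|DESTINATIONDATETIME_TR\n"
    "6482604|1|Transco|3.25|0.00|0.25|0.00|0.00|3.50|1|Washington|DC|20002|"
    "Washington|DC|20002|0|1|38.897204|-77.008388|"
    "1 - 99 BLOCK OF MASSACHUSETTS AVENUE NE|38.896039|-77.007142|"
    "40 - 49 BLOCK OF COLUMBUS CIRCLE NE|N|09/01/2015 00:00|09/01/2015 00:00\n"
)


def read_trips(text_stream):
    """Yield each pipe-delimited record as a dict keyed by the header names."""
    yield from csv.DictReader(text_stream, delimiter="|")


rows = list(read_trips(io.StringIO(SAMPLE)))
print(rows[0]["FAREAMOUNT"], rows[0]["ORIGINDATETIME_TR"])
```

The same function should work on an open file handle, for example `open('dctaxi/taxi_2015_09.txt', newline='')`.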

### Downloading and installing the AWS CLI



In [None]:
%%bash
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip -o awscliv2.zip
sudo ./aws/install

### Configuring AWS credentials and region
The contents of the next cell should be modified to specify AWS credentials as strings.

*This will soon be replaced with a more secure and automated way of handling secrets.*

If you see the following exception:

`TypeError: str expected, not NoneType`

It means that you did not specify the credentials correctly.

In [None]:
import os
# *** REPLACE None in the next 2 lines with your AWS key values ***
os.environ['AWS_ACCESS_KEY_ID'] = None
os.environ['AWS_SECRET_ACCESS_KEY'] = None
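
The `TypeError` arises because `os.environ` accepts only string values. As an optional sketch, a small wrapper can fail earlier with a more readable message; the helper name `set_required_env` and the variable `EXAMPLE_SETTING` are hypothetical and used only for illustration.

```python
import os


def set_required_env(name, value):
    """Set an environment variable, raising a descriptive error
    if the value is missing or not a string."""
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"{name} must be a non-empty string, got {value!r}")
    os.environ[name] = value


# Placeholder value for illustration only; never commit real credentials.
set_required_env("EXAMPLE_SETTING", "example-value")
```

Calling `set_required_env('AWS_ACCESS_KEY_ID', None)` would then raise a `ValueError` that names the variable left unset.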

Run the next cell to validate your credentials.

In [None]:
%%bash
aws sts get-caller-identity

In [None]:
# *** REPLACE None in the next line with your AWS region ***
os.environ['AWS_DEFAULT_REGION'] = None

If you have specified the region correctly, the following cell should print the region that you specified.

In [None]:
%%bash
echo $AWS_DEFAULT_REGION

### Creating unique bucket ID

We will use the bash `$RANDOM` pseudo-random number generator and the first 32 characters of the `md5sum` output to produce a unique bucket ID.

In [None]:
BUCKET_ID = !echo $(echo $RANDOM | md5sum | cut -c -32)
os.environ['BUCKET_ID'] = next(iter(BUCKET_ID))
os.environ['BUCKET_ID']
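
For comparison, the same idea can be sketched with the Python standard library alone. The helper name `make_bucket_id` is hypothetical. The code assumes the shell pipeline hashes the decimal number followed by the newline that `echo` appends, and it draws from the 0 to 32767 range that bash `$RANDOM` produces.

```python
import hashlib
import random


def make_bucket_id(seed_number=None):
    """Mimic `echo $RANDOM | md5sum | cut -c -32` using the standard library."""
    if seed_number is None:
        # bash $RANDOM yields integers in the range 0..32767
        seed_number = random.randint(0, 32767)
    # `echo` appends a newline, so it is part of the hashed bytes.
    digest = hashlib.md5(f"{seed_number}\n".encode("ascii")).hexdigest()
    return digest[:32]
```

Because the seed takes only 32,768 distinct values, this approach yields at most that many distinct IDs, so a name collision with another user's bucket remains possible.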

The next cell saves the contents of the `BUCKET_ID` environment variable to a `BUCKET_ID` file as a backup.



In [None]:
val = os.environ['BUCKET_ID']
%store val > BUCKET_ID
!cat BUCKET_ID

### Downloading the BUCKET_ID file

Ensure that you have a backup copy of the `BUCKET_ID` file created by the previous cell before proceeding. The contents of the `BUCKET_ID` file are going to be reused later in this notebook and in the other notebooks.

### Creating an AWS bucket


     

In [None]:
%%bash
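# Note: S3 rejects LocationConstraint=us-east-1; if that is your region,
# omit the --create-bucket-configuration option.
aws s3api create-bucket --bucket dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION \
--create-bucket-configuration LocationConstraint=$AWS_DEFAULT_REGION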

You can print the name of the bucket by running the following cell:

In [None]:
!echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION
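
Since the bucket name is assembled from two variables, it can be worth checking the result against the S3 naming rules before calling the API. The sketch below is illustrative only: the helper name `dc_taxi_bucket_name` is hypothetical, and the regular expression covers only the basic rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit).

```python
import re

# Basic S3 bucket naming rules: 3-63 characters, lowercase letters, digits,
# dots, and hyphens, beginning and ending with a letter or digit.
_BUCKET_NAME_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def dc_taxi_bucket_name(bucket_id, region):
    """Compose the bucket name used in this notebook and validate it."""
    name = f"dc-taxi-{bucket_id}-{region}"
    if not _BUCKET_NAME_RE.fullmatch(name):
        raise ValueError(f"invalid S3 bucket name: {name!r}")
    return name
```

With a 32-character ID and a region such as `us-west-2` (an example value), the resulting name is 50 characters long, within the 63-character limit.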

You can also use the AWS CLI `list-buckets` command to print all the buckets that exist in your AWS account. Note that the printed names do not include the `s3://` prefix:

In [None]:
!aws s3api list-buckets

### Uploading the DC taxi dataset to AWS S3

Synchronize the contents of the `dctaxi` directory (where you unzipped the dataset) to the `csv` folder in the S3 bucket you just created.

In [None]:
%%bash
aws s3 sync \
  --exclude 'README*' \
  dctaxi/ s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/

We can check whether the `aws s3 sync` command completed successfully by listing the contents of the newly created bucket.

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/ | tail -n 2


### Creating AWS role and policy to allow Glue to access the S3 bucket

In [None]:
%%bash
# Remove any role left over from a previous run of this cell.
# On a first run these commands report errors, which can be ignored.
aws iam detach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole && \
aws iam delete-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy && \
aws iam delete-role --role-name AWSGlueServiceRole-dc-taxi

aws iam create-role --path "/service-role/" --role-name AWSGlueServiceRole-dc-taxi --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

aws iam attach-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

aws iam put-role-policy --role-name AWSGlueServiceRole-dc-taxi --policy-name GlueBucketPolicy --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/*"
            ]
        }
    ]
}'
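
The inline policy above is built by splicing shell variables into a single-quoted JSON string, which is easy to get wrong. As an alternative sketch, the same document can be generated in Python and then passed to the CLI; the helper name `glue_bucket_policy` is hypothetical.

```python
import json


def glue_bucket_policy(bucket_id, region):
    """Return the inline policy JSON that grants S3 access to the objects
    in the dc-taxi bucket for the given bucket ID and region."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:*"],
                "Resource": [f"arn:aws:s3:::dc-taxi-{bucket_id}-{region}/*"],
            }
        ],
    }
    return json.dumps(policy, indent=2)
```

Building the document from a dictionary means `json.dumps` takes care of quoting and nesting, so a malformed policy is less likely.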

### Creating an AWS Glue database

We create the **Glue metadata database** named `dc_taxi_db`, which is going to be used to store a schema for the
DC taxi data set along with a table based on the schema.

In [None]:
%%bash
aws glue delete-database --name dc_taxi_db 2> /dev/null

aws glue create-database --database-input '{
  "Name": "dc_taxi_db"
}'

aws glue get-database --name 'dc_taxi_db'

### Creating an AWS Glue crawler

Glue is an umbrella name for a toolkit of different AWS services that you can use to
prepare your data set for analysis.

We will save the results of crawling the S3 bucket with the DC taxi dataset to the AWS Glue database created in the previous cell.

In [None]:
%%bash
aws glue delete-crawler --name dc-taxi-csv-crawler 2> /dev/null

aws glue create-crawler \
  --name dc-taxi-csv-crawler \
  --database-name dc_taxi_db \
  --table-prefix dc_taxi_ \
  --role $( aws iam get-role \
              --role-name AWSGlueServiceRole-dc-taxi \
              --query 'Role.Arn' \
              --output text ) \
   --targets '{
  "S3Targets": [
    {
      "Path": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/csv"
    }]
}'

aws glue start-crawler --name dc-taxi-csv-crawler

### Checking the status of the AWS Glue crawler



In [None]:
%%bash
aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text

Poll the crawler state every minute to wait for it to finish.

In [None]:
%%bash
printf "Waiting for crawler to finish..."
until echo "$(aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.State' --output text)" | grep -q "READY"; do
   sleep 60
   printf "..."
done
printf "done\n"
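
The polling loop above follows a general pattern: query a state, stop when it reaches a terminal value, otherwise sleep and retry. A minimal Python sketch of that pattern is shown below. The function `wait_for_state` and its parameters are hypothetical, and the attempt limit is an addition that the bash loop does not have.

```python
import time


def wait_for_state(get_state, done_states, interval_seconds=60,
                   max_attempts=60, sleep=time.sleep):
    """Call get_state() until it returns one of done_states, then return it.

    Raises TimeoutError if no terminal state is seen within max_attempts.
    """
    for _ in range(max_attempts):
        state = get_state()
        if state in done_states:
            return state
        sleep(interval_seconds)
    raise TimeoutError(
        f"state not in {sorted(done_states)} after {max_attempts} attempts"
    )


# Hypothetical usage with boto3 (assumed to be installed and configured):
#   import boto3
#   glue = boto3.client("glue")
#   wait_for_state(
#       lambda: glue.get_crawler(Name="dc-taxi-csv-crawler")["Crawler"]["State"],
#       {"READY"},
#   )
```

Passing the state getter and the `sleep` function as arguments keeps the loop independent of AWS, so it can be exercised without waiting or making network calls.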

### Finding out the last known status of the AWS Glue crawler



In [None]:
!aws glue get-crawler --name dc-taxi-csv-crawler --query 'Crawler.LastCrawl'

### Describing the table created by the AWS Glue crawler



In [None]:
!aws glue get-table --database-name dc_taxi_db --name dc_taxi_csv

### Creating a PySpark job to convert CSV to Parquet



In [None]:
%%writefile dctaxi_csv_to_parquet.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME',
                                     'BUCKET_SRC_PATH',
                                     'BUCKET_DST_PATH',
                                     'DST_VIEW_NAME'])

BUCKET_SRC_PATH = args['BUCKET_SRC_PATH']
BUCKET_DST_PATH = args['BUCKET_DST_PATH']
DST_VIEW_NAME = args['DST_VIEW_NAME']

sc = SparkContext()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = ( spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .option("multiLine", True)
        .option("delimiter", "|")
        .load(f"{BUCKET_SRC_PATH}") )

df.createOrReplaceTempView(f"{DST_VIEW_NAME}")

query_df = spark.sql(f"""
SELECT

  CAST(fareamount AS DOUBLE) AS fareamount_double,
  CAST(fareamount AS STRING) AS fareamount_string,

  CAST(origindatetime_tr AS STRING) AS origindatetime_tr,

  CAST(origin_block_latitude AS DOUBLE) AS origin_block_latitude_double,
  CAST(origin_block_latitude AS STRING) AS origin_block_latitude_string,

  CAST(origin_block_longitude AS DOUBLE) AS origin_block_longitude_double,
  CAST(origin_block_longitude AS STRING) AS origin_block_longitude_string,

  CAST(destination_block_latitude AS DOUBLE) AS destination_block_latitude_double,
  CAST(destination_block_latitude AS STRING) AS destination_block_latitude_string,

  CAST(destination_block_longitude AS DOUBLE) AS destination_block_longitude_double,
  CAST(destination_block_longitude AS STRING) AS destination_block_longitude_string,

  CAST(mileage AS DOUBLE) AS mileage_double,
  CAST(mileage AS STRING) AS mileage_string

FROM {DST_VIEW_NAME}""".replace('\n', ' '))

query_df.write.parquet(f"{BUCKET_DST_PATH}", mode="overwrite")

job.commit()
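
The query in the job casts most columns to both `DOUBLE` and `STRING`, which makes the `SELECT` list repetitive. As an illustration only, the same statement could be generated from a short specification. The names `COLUMN_CASTS` and `build_cast_query` are hypothetical and are not part of the job script above.

```python
# Column names and target types taken from the query in the job script.
COLUMN_CASTS = [
    ("fareamount", ["DOUBLE", "STRING"]),
    ("origindatetime_tr", ["STRING"]),
    ("origin_block_latitude", ["DOUBLE", "STRING"]),
    ("origin_block_longitude", ["DOUBLE", "STRING"]),
    ("destination_block_latitude", ["DOUBLE", "STRING"]),
    ("destination_block_longitude", ["DOUBLE", "STRING"]),
    ("mileage", ["DOUBLE", "STRING"]),
]


def build_cast_query(view_name, column_casts=COLUMN_CASTS):
    """Build a single-line SELECT statement with one CAST per column and type."""
    items = []
    for column, sql_types in column_casts:
        for sql_type in sql_types:
            # A column cast to a single type keeps its name;
            # otherwise the alias gets a lowercase type suffix.
            alias = column if len(sql_types) == 1 else f"{column}_{sql_type.lower()}"
            items.append(f"CAST({column} AS {sql_type}) AS {alias}")
    return "SELECT " + ", ".join(items) + f" FROM {view_name}"
```

The generated string contains no newlines, so it can be passed to `spark.sql` directly.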

### Copying the PySpark job code to the S3 bucket

In [None]:
%%bash
aws s3 cp dctaxi_csv_to_parquet.py s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/
aws s3 ls s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/glue/dctaxi_csv_to_parquet.py

### Creating and starting the PySpark job

In [None]:
%%bash
aws glue delete-job --job-name dc-taxi-csv-to-parquet-job 2> /dev/null

aws glue create-job \
  --name dc-taxi-csv-to-parquet-job \
  --role $(aws iam get-role --role-name AWSGlueServiceRole-dc-taxi --query 'Role.Arn' --output text) \
  --default-arguments '{"--TempDir":"s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/"}' \
  --command '{
    "ScriptLocation": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/glue/dctaxi_csv_to_parquet.py",
    "Name": "glueetl",
    "PythonVersion": "3"
  }' \
  --glue-version "2.0"

aws glue start-job-run \
  --job-name dc-taxi-csv-to-parquet-job \
  --arguments='--BUCKET_SRC_PATH="'$(
      echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/csv/
    )'",
  --BUCKET_DST_PATH="'$(
      echo s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/
    )'",
  --DST_VIEW_NAME="dc_taxi_csv"'
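
The `--arguments` value above relies on nested shell quoting. The AWS CLI also accepts this option as a JSON map, so one alternative is to build the map in Python. The helper name `job_run_arguments` below is hypothetical, and the sketch assumes the same bucket layout as the rest of this notebook.

```python
import json


def job_run_arguments(bucket_id, region, view_name="dc_taxi_csv"):
    """Return the Glue job-run arguments as a JSON string."""
    bucket = f"s3://dc-taxi-{bucket_id}-{region}"
    arguments = {
        "--BUCKET_SRC_PATH": f"{bucket}/csv/",
        "--BUCKET_DST_PATH": f"{bucket}/parquet/",
        "--DST_VIEW_NAME": view_name,
    }
    return json.dumps(arguments)
```

The resulting string could then be supplied as the value of `--arguments` in place of the hand-quoted shorthand.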

Once the PySpark job completes successfully, the job execution status should change from `RUNNING` to `SUCCEEDED`. We can re-run the next cell to get the updated job status.

In [None]:
!aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'

Poll the job every minute to wait for it to finish.

In [None]:
%%bash
printf "Waiting for the job to finish..."
while echo "$(aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --query 'JobRuns[0].JobRunState')" | grep -q -E "STARTING|RUNNING|STOPPING"; do
   sleep 60
   printf "..."
done
aws glue get-job-runs --job-name dc-taxi-csv-to-parquet-job --output text --query 'JobRuns[0].JobRunState'


After the job succeeds, you can list the contents of the `parquet` folder in the bucket
using the following cell:

In [None]:
!aws s3 ls --recursive --summarize --human-readable s3://dc-taxi-$BUCKET_ID-$AWS_DEFAULT_REGION/parquet/

### Crawling the Parquet dataset

In [None]:
%%bash
aws glue delete-crawler --name dc-taxi-parquet-crawler  2> /dev/null

aws glue create-crawler --name dc-taxi-parquet-crawler --database-name dc_taxi_db --table-prefix dc_taxi_ --role `aws iam get-role --role-name AWSGlueServiceRole-dc-taxi --query 'Role.Arn' --output text` --targets '{
  "S3Targets": [
    {
      "Path": "s3://dc-taxi-'$BUCKET_ID'-'$AWS_DEFAULT_REGION'/parquet/"
    }]
}'

aws glue start-crawler --name dc-taxi-parquet-crawler

### Monitoring the Parquet crawler status

Poll the crawler status every 10 seconds to wait for it to finish.

In [None]:
%%bash
printf "Waiting for crawler to finish..."
until echo "$(aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text)" | grep -q "READY"; do
   sleep 10
   printf "..."
done
aws glue get-crawler --name dc-taxi-parquet-crawler --query 'Crawler.State' --output text

### Confirming that the crawler successfully created the `dc_taxi_parquet` table

If the crawler completed successfully, the number of records in the `dc_taxi_parquet` table should be equal to `53173692`.

In [None]:
!aws glue get-table --database-name dc_taxi_db --name dc_taxi_parquet --query "Table.Parameters.recordCount" --output text
