# First PySpark Application using NYC Taxi data
To build our first PySpark application, we will use the easily accessible New York City taxi data. I chose it because it is available in Databricks and Azure Synapse, and as a download from the web. We can start small with this sample dataset and scale up to a large volume when we are ready.

In [0]:
# Yellow Trip Read
# header=True takes the column names from the file's first line. No schema is
# supplied (and inferSchema is left at its default), so every column is loaded
# as a string.
yellow_trip_csv_path = "/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz"
trip_input_df = spark.read.csv(yellow_trip_csv_path, header=True)

display(trip_input_df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,
2,2018-11-28 16:25:49,2018-11-28 16:28:26,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,5.76,0.3,13.31,
2,2018-11-28 16:29:37,2018-11-28 16:33:43,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.3,1,N,163,229,1,6.5,0.5,0.5,1.25,0.0,0.3,9.05,
1,2019-01-01 00:32:01,2019-01-01 00:45:39,1,3.7,1,N,229,7,1,13.5,0.5,0.5,3.7,0.0,0.3,18.5,
1,2019-01-01 00:57:32,2019-01-01 01:09:32,2,2.1,1,N,141,234,1,10.0,0.5,0.5,1.7,0.0,0.3,13.0,


In [0]:
from pyspark.sql.functions import col, substring, to_date, regexp_replace, when

# Derive reporting columns and a tip percentage, then keep a 1,000-row sample.
trip_df = (trip_input_df
    # "2019-01-01 00:46:40" -> first 7 chars "2019-01" -> "2019_01"
    .withColumn("year_month", regexp_replace(substring("tpep_pickup_datetime", 1, 7), '-', '_'))
    # Calendar date (time of day dropped) of pickup and dropoff.
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))
    # The CSV was read without a schema, so the amounts are strings: cast them
    # explicitly instead of relying on implicit coercion.
    # Guard the divisor: with ANSI mode enabled (spark.sql.ansi.enabled),
    # division by zero raises an error instead of returning null. when()
    # without otherwise() yields null, so a zero or null total gives a null
    # tip_pct in either mode.
    .withColumn(
        "tip_pct",
        when(
            col("total_amount").cast("double") != 0,
            col("tip_amount").cast("double") / col("total_amount").cast("double"),
        ),
    )
    # limit() without orderBy() does not guarantee which 1,000 rows are kept.
    .limit(1000)
)
display(trip_df)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,year_month,pickup_dt,dropoff_dt,tip_pct
1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,,2019_01,2019-01-01,2019-01-01,0.1658291457286432
1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,,2019_01,2019-01-01,2019-01-01,0.0613496932515337
2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,2018_12,2018-12-21,2018-12-21,0.0
2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,,2018_11,2018-11-28,2018-11-28,0.0
2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,2018_11,2018-11-28,2018-11-28,0.0
2,2018-11-28 16:25:49,2018-11-28 16:28:26,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,5.76,0.3,13.31,,2018_11,2018-11-28,2018-11-28,0.0
2,2018-11-28 16:29:37,2018-11-28 16:33:43,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,,2018_11,2018-11-28,2018-11-28,0.0
1,2019-01-01 00:21:28,2019-01-01 00:28:37,1,1.3,1,N,163,229,1,6.5,0.5,0.5,1.25,0.0,0.3,9.05,,2019_01,2019-01-01,2019-01-01,0.1381215469613259
1,2019-01-01 00:32:01,2019-01-01 00:45:39,1,3.7,1,N,229,7,1,13.5,0.5,0.5,3.7,0.0,0.3,18.5,,2019_01,2019-01-01,2019-01-01,0.2
1,2019-01-01 00:57:32,2019-01-01 01:09:32,2,2.1,1,N,141,234,1,10.0,0.5,0.5,1.7,0.0,0.3,13.0,,2019_01,2019-01-01,2019-01-01,0.1307692307692307


In [0]:
# Zone Read
# Taxi Zone Lookup Table: one row per LocationID with its Borough, Zone and
# service_zone. No schema is supplied, so all columns are read as strings.
zone_lookup_path = "/databricks-datasets/nyctaxi/taxizone/taxi_zone_lookup.csv"
zone_df = spark.read.csv(zone_lookup_path, header="true")

zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [0]:

# Attach pickup-zone attributes (Borough, Zone, service_zone) to each trip.
# The left join keeps trips whose PULocationID has no match in the lookup;
# only the pickup location is joined, not DOLocationID.
pickup_zone_match = trip_df["PULocationID"] == zone_df["LocationID"]
full_df = (
    trip_df.join(zone_df, on=pickup_zone_match, how="left")
    # LocationID duplicates PULocationID after the join.
    .drop("LocationID")
)

In [0]:
# Persist the joined sample as Delta files at a path, replacing any earlier run.
full_df.write.save(
    "/tmp/datasets/datakickstart/yellow_trips_sample",
    format="delta",
    mode="overwrite",
)

In [0]:
# Register the joined sample as a table named yellow_trips_sample, replacing it
# if it already exists. No format is given, so the session's default data
# source (spark.sql.sources.default) decides the storage format.
full_df.write.saveAsTable("yellow_trips_sample", mode="overwrite")

In [0]:
# Read the saved table back by name.
new_df = spark.table("yellow_trips_sample")

## Alternate Read & Write (meant for local environment)
If you need this data locally, you can download just one month of Yellow trip data plus the Taxi Zone Lookup Table. The data can be found at https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Bash commands to download the two files:
```bash
# Taxi Zone Lookup Table. -O writes to a fixed name (dropping the "+" from the
# published filename) and overwrites any existing copy, so re-running is safe.
mkdir -p /tmp/datasets/nyctaxi/taxizone
wget -O /tmp/datasets/nyctaxi/taxizone/taxi_zone_lookup.csv \
  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

# One month of Yellow trip data. Spark later reads this whole directory, so a
# re-download must replace the file rather than add a "*.parquet.1" copy
# (wget's default when the target file already exists), which would duplicate rows.
mkdir -p /tmp/datasets/nyctaxi/tables/nyctaxi_yellow
wget -O /tmp/datasets/nyctaxi/tables/nyctaxi_yellow/yellow_tripdata_2019-01.parquet \
  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet
```

In [0]:
# Yellow Trip Read

# Storage format of the local trip directory: "parquet" for the downloaded TLC
# file; set this to "delta" if the directory holds a Delta table instead.
trip_format = "parquet"

trip_input_df = spark.read.load(
    "/tmp/datasets/nyctaxi/tables/nyctaxi_yellow",
    format=trip_format,
)

trip_input_df.show()

In [0]:
# Zone Read
# Local copy of the Taxi Zone Lookup Table; no schema is supplied, so all
# columns are read as strings.
local_zone_lookup_path = "/tmp/datasets/nyctaxi/taxizone/taxi_zone_lookup.csv"
zone_df = spark.read.csv(local_zone_lookup_path, header="true")

zone_df.show()

In [0]:
from pyspark.sql.functions import col, substring, to_date, regexp_replace, when

# Derive reporting columns and a tip percentage, then keep a 1,000-row sample.
# NOTE(review): on this Parquet path the pickup/dropoff columns are presumably
# timestamps and the amounts already numeric; substring()/to_date() then rely
# on Spark's implicit casts and the .cast("double") calls would be no-ops --
# confirm with trip_input_df.printSchema().
trip_df = (trip_input_df
    # "2019-01-01 00:46:40" -> first 7 chars "2019-01" -> "2019_01"
    .withColumn("year_month", regexp_replace(substring("tpep_pickup_datetime", 1, 7), '-', '_'))
    # Calendar date (time of day dropped) of pickup and dropoff.
    .withColumn("pickup_dt", to_date("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("dropoff_dt", to_date("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"))
    # Guard the divisor: with ANSI mode enabled (spark.sql.ansi.enabled),
    # division by zero raises an error instead of returning null. when()
    # without otherwise() yields null, so a zero or null total gives a null
    # tip_pct in either mode.
    .withColumn(
        "tip_pct",
        when(
            col("total_amount").cast("double") != 0,
            col("tip_amount").cast("double") / col("total_amount").cast("double"),
        ),
    )
    # limit() without orderBy() does not guarantee which 1,000 rows are kept.
    .limit(1000)
)


In [0]:
# Attach pickup-zone attributes to each trip; the left join keeps trips whose
# PULocationID has no match in the lookup.
pickup_zone_match = trip_df["PULocationID"] == zone_df["LocationID"]
full_df = (
    trip_df.join(zone_df, on=pickup_zone_match, how="left")
    # LocationID duplicates PULocationID after the join.
    .drop("LocationID")
)

In [0]:
# Write the joined sample as Parquet files. Without an explicit mode Spark uses
# "errorifexists", so a second run of this cell would fail because the output
# path already exists; "overwrite" keeps the cell re-runnable.
(full_df.write
    .format("parquet")
    .mode("overwrite")
    .save("/tmp/datasets/datakickstart/yellow_trips_sample_parquet")
)