#Assignment: Spark SQL and Data Frames

Dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page (February 2021)

Tech Stack:

1.   PySpark
2.   Google BigQuery



In [1]:
pip install pyspark==3.0.3

Collecting pyspark==3.0.3
  Downloading pyspark-3.0.3.tar.gz (209.1 MB)
[K     |████████████████████████████████| 209.1 MB 55 kB/s /s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 153.1 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.3-py2.py3-none-any.whl size=209435970 sha256=43bb864c589a2f4c81a3d9a6f7436e53e0d4201f1f0cd9b8e72e89be57bdd8d0
  Stored in directory: /home/Archie/.cache/pip/wheels/65/ff/a2/0e26ceacea69c69610bbbd569678580b54be2e6e8b88e0eb9a
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.5
    Uninstalling py4j-0.10.9.5:
      Successfully uninstalled py4j-0.10.9.5
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.3.2
 

In [2]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [4]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


In [1]:
import findspark
findspark.init()
findspark.find()

'/opt/spark'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("archiecm").config('spark.ui.port', '4050').getOrCreate()

23/03/16 00:42:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark

In [6]:
df = spark.read.parquet("yellow_tripdata_2021-02.parquet", header=True, inferSchema=True)
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [7]:
rows = df.count()
cols = len(df.columns)

print(f'Dimensions of Data: {(rows,cols)}')
print(f'Rows of Data: {rows}')
print(f'Columns of Data: {cols}')

Dimensions of Data: (1371709, 19)
Rows of Data: 1371709
Columns of Data: 19


In [8]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 00:40:47|  2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

#### How many Taxi Trips were there on February 15?

In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [10]:
df = df \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

df.registerTempTable('data_table')

In [11]:
total_taxi_trip_0215 = spark.sql(""" 

    SELECT COUNT(pickup_datetime) AS total_taxi_trip_0215
    FROM data_table
    WHERE pickup_datetime >= '2021-02-15 00:00:00' AND pickup_datetime < '2021-02-16 00:00:00'

""")

total_taxi_trip_0215.show()

+--------------------+
|total_taxi_trip_0215|
+--------------------+
|               40322|
+--------------------+



#### Find the longest trip for each day.

In [12]:
df.createOrReplaceTempView('data_view')

In [13]:
longesttrip_eachday = df.withColumn("pickup_datetime" , to_date(df['pickup_datetime']))\
                      .select(['pickup_datetime','trip_distance'])\
                      .where("pickup_datetime >= '2021-02-01' ")\
                      .groupby(F.col('pickup_datetime')).agg(F.max('trip_distance').alias('longest_trip')).sort(desc("longest_trip"))
longesttrip_eachday.show(10)




+---------------+------------+
|pickup_datetime|longest_trip|
+---------------+------------+
|     2021-02-16|   221188.25|
|     2021-02-20|   188054.03|
|     2021-02-08|   186617.92|
|     2021-02-07|   186510.67|
|     2021-02-03|   186079.73|
|     2021-02-17|   140145.44|
|     2021-02-13|   115928.92|
|     2021-02-05|    91134.16|
|     2021-02-26|    90796.21|
|     2021-02-24|    90073.44|
+---------------+------------+
only showing top 10 rows




                                                                                

#### Find top 5 most frequent 'dispatching_base_num'.

In [14]:
df_fhv = spark.read.parquet("fhv_tripdata_2021-02.parquet", header=True, inferSchema=True)
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [15]:
rows = df_fhv.count()
cols = len(df_fhv.columns)

print(f'Dimensions of Data: {(rows,cols)}')
print(f'Rows of Data: {rows}')
print(f'Columns of Data: {cols}')

Dimensions of Data: (1037692, 7)
Rows of Data: 1037692
Columns of Data: 7


In [16]:
df_fhv.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

In [17]:
top5_frequent_dbm = df_fhv.groupBy("dispatching_base_num").count() \
                    .orderBy(F.col('count').desc())

top5_frequent_dbm.show(5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+
only showing top 5 rows





                                                                                

#### Find top 5 most common location pairs (PULocationID and DOLocationID).

In [18]:
top5_location_pairs = df.where("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL") \
                      .groupBy(["PUlocationID",'DOlocationID']) \
                      .count() \
                      .orderBy(F.col('count').desc())
top5_location_pairs.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|         237|         236|11455|
|         236|         237| 9901|
|         236|         236| 8819|
|         237|         237| 7324|
|         264|         264| 5732|
+------------+------------+-----+
only showing top 5 rows





                                                                                

In [19]:
top5_location_pairs_fhv = df_fhv.where("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL") \
                          .groupBy(["PUlocationID",'DOlocationID']) \
                          .count() \
                          .orderBy(F.col('count').desc())
top5_location_pairs_fhv.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+
only showing top 5 rows



#### Write all of the result to BigQuery table.

In [39]:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ".google/credentials/google_credentials.json"

In [None]:
gcs_bucket = 'fellowship_9_staging'
bq_dataset = 'taxi_trip_spark'
bq_table = 'taxi_trip_02_2021'

df.write.format("bigquery") \
  .option("table","{}.{}".format(bq_dataset, bq_table)) \
  .option("temporaryGcsBucket", gcs_bucket) \
  .mode('overwrite') \
  .save()