In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# from pyspark.sql import types

In [2]:
# Build (or reuse) a local Spark session; "local[*]" runs Spark in-process
# with as many worker threads as there are cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/13 01:31:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Question 1. Install Spark and PySpark

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version

What's the output?

In [3]:
# Answer to Question 1: the version string of the running Spark session.
spark.version

'3.3.0'

### Question 2. HVFHV February 2021

Download the HVFHV data for February 2021.

Read it with Spark using the same schema as we did in the lessons. We will use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [5]:
# Fetch the February 2021 trip file that the rest of the notebook reads.
# NOTE(review): the file downloaded here is `fhv_tripdata_2021-02.parquet` (FHV),
# while the question text asks for the HVFHV data — confirm which dataset is intended.
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-10-13 00:34:59--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.224.194.107, 13.224.194.4, 13.224.194.50, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.224.194.107|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet’


2022-10-13 00:35:03 (4.39 MB/s) - ‘fhv_tripdata_2021-02.parquet’ saved [10645466/10645466]



In [3]:
# List the working directory to confirm the downloaded parquet file is present.
!ls

03_test.ipynb		fhv_tripdata_2021-02.parquet
04_pyspark.ipynb	fhvhv
05_taxi_schema.ipynb	fhvhv_tripdata_2021-01.csv
06_spark_sql.ipynb	fhvhv_tripdata_2021-01.parquet
07_groupBy_join.ipynb	head.parquet
09_spark_sql_gcd.ipynb	homework_solutions.ipynb
data			lib
download_data.sh	taxi+_zone_lookup.csv


In [4]:
# Load the downloaded trip file. No explicit schema is passed: parquet files
# carry their own schema, which Spark picks up on read.
df_hw = spark.read.format("parquet").load("fhv_tripdata_2021-02.parquet")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [5]:
# Split the data into 24 partitions and write them out as parquet,
# replacing whatever an earlier run left in the target folder.
(
    df_hw
    .repartition(24)
    .write
    .mode('overwrite')
    .parquet('data/pq/fhvhv/2021/02/')
)

                                                                                

In [6]:
# NOTE(review): this reports the size of the downloaded input file. The question
# asks for the size of the folder written by the repartition step
# (data/pq/fhvhv/2021/02/), e.g. via `du -sh` on that folder — confirm and re-run.
!ls -lh fhv_tripdata_2021-02.parquet

-rw-rw-r-- 1 baluramachandra baluramachandra 11M Jul 18 19:25 fhv_tripdata_2021-02.parquet


### Question 3. Count records

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [7]:
# Inspect column names and types before filtering on the pickup timestamp.
df_hw.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [8]:
# Count the trips whose pickup timestamp falls on 2021-02-15.
(
    df_hw
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .filter("pickup_date = '2021-02-15'")
    .count()
)

34814

### Question 4. Longest trip for each day

Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [9]:
# Recall the available column names before deriving the trip duration.
df_hw.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [10]:
# Add two derived columns:
#   duration    - drop-off minus pickup, each timestamp cast to long
#                 (epoch seconds in Spark), i.e. the trip length in seconds
#   pickup_date - calendar date of the pickup timestamp
df_hw = (
    df_hw
    .withColumn(
        'duration',
        F.col('dropOff_datetime').cast('long') - F.col('pickup_datetime').cast('long'),
    )
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
)

In [11]:
# Longest trip for each pickup day, ordered from the longest downwards.
(
    df_hw
    .groupBy('pickup_date')
    .agg(F.max('duration'))
    .orderBy(F.col('max(duration)').desc())
    .show()
)

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-02-04|      6655140|
| 2021-02-01|      2777400|
| 2021-02-25|      2429340|
| 2021-02-23|      2421120|
| 2021-02-03|      2402093|
| 2021-02-27|      1025040|
| 2021-02-28|       945780|
| 2021-02-15|       880209|
| 2021-02-22|       780092|
| 2021-02-08|       565495|
| 2021-02-19|       540729|
| 2021-02-13|       505361|
| 2021-02-16|       288966|
| 2021-02-12|       260640|
| 2021-02-17|       257087|
| 2021-02-10|       193189|
| 2021-02-05|       191081|
| 2021-02-11|       169153|
| 2021-02-20|       156934|
| 2021-02-18|       116086|
+-----------+-------------+
only showing top 20 rows



### Question 5. Most frequent dispatching_base_num

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages does this Spark job have? 2

In [12]:
# Rank dispatching bases by number of trips, most frequent first.
(
    df_hw
    .groupBy('dispatching_base_num')
    .count()
    .sort(F.desc('count'))
    .show()
)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
|              B01239|24591|
|              B02735|21031|
|              B00310|18141|
|              B01899|16563|
|              B00900|16024|
|              B01437|15494|
|              B01231|14622|
|              B02849|14584|
|              B00647|13167|
|              B02550|12847|
|              B00821|12788|
|              B00256|12314|
|              B02563|10967|
|              B01536|10542|
|              B01667| 9864|
+--------------------+-----+
only showing top 20 rows



### Question 6. Most common locations pair

Find the most common pickup-dropoff pair.

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East".

In [28]:
# Fetch the zone lookup CSV; the queries below join on its LocationID column
# to obtain the Zone name.
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2022-10-13 00:54:11--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.224.194.4, 13.224.194.91, 13.224.194.107, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.224.194.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv’


2022-10-13 00:54:13 (887 KB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [13]:
# Read the zone lookup CSV, treating its first line as the column names.
zones = spark.read.csv("taxi+_zone_lookup.csv", header=True)

In [14]:
# Expose both DataFrames to Spark SQL under the table names the queries use.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; re-running the cell simply
# replaces the existing views.
df_hw.createOrReplaceTempView('fhvhv_2021_02')
zones.createOrReplaceTempView('zones')



In [19]:
# Count trips per pickup / drop-off zone pair and show the five most common.
# Both joins are LEFT JOINs, so trips whose location ID has no match in `zones`
# are kept with a NULL zone name.
# NOTE(review): CONCAT yields NULL when either zone is NULL, so every trip with a
# missing pickup or drop-off zone lands in a single NULL group instead of being
# labelled "Unknown" as the question asks. Wrapping each zone in
# COALESCE(<zone>, 'Unknown') would address that — confirm and re-run.
spark.sql("""
SELECT
    CONCAT(pu.Zone, ' / ', do.Zone) AS pu_do_pair,
    COUNT(*)as count
FROM 
    fhvhv_2021_02 fhv 
LEFT JOIN zones pu 
    ON fhv.PULocationID = pu.LocationID
LEFT JOIN zones do 
    ON fhv.DOLocationID = do.LocationID
GROUP BY 
    1
ORDER BY
    2 DESC;
""").show(5)

+--------------------+------+
|          pu_do_pair| count|
+--------------------+------+
|                null|897362|
|Saint George/New ...|  2374|
|Stapleton / Saint...|  2112|
|Jackson Heights /...|  1902|
|   Astoria / Astoria|  1829|
+--------------------+------+
only showing top 5 rows



In [23]:
# Diagnostic: show the raw join keys and zone names next to the concatenated
# pair, to see which rows end up with a NULL pu_do_pair.
spark.sql("""
SELECT fhv.PULocationID, pu.LocationID, pu.Zone, fhv.DOLocationID, do.LocationID, do.Zone,
    CONCAT(pu.Zone, ' / ', do.Zone) AS pu_do_pair
FROM 
    fhvhv_2021_02 fhv 
LEFT JOIN zones pu 
    ON fhv.PULocationID = pu.LocationID
LEFT JOIN zones do 
    ON fhv.DOLocationID = do.LocationID
""").show()

+------------+----------+-----------------+------------+----------+--------------------+--------------------+
|PULocationID|LocationID|             Zone|DOLocationID|LocationID|                Zone|          pu_do_pair|
+------------+----------+-----------------+------------+----------+--------------------+--------------------+
|        null|      null|             null|        null|      null|                null|                null|
|       173.0|       173|     North Corona|        82.0|        82|            Elmhurst|North Corona / El...|
|       173.0|       173|     North Corona|        56.0|        56|              Corona|North Corona / Co...|
|        82.0|        82|         Elmhurst|       129.0|       129|     Jackson Heights|Elmhurst / Jackso...|
|        null|      null|             null|       225.0|       225|  Stuyvesant Heights|                null|
|        null|      null|             null|        61.0|        61| Crown Heights North|                null|
|        n