In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

# Local Spark session that uses every available core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

**Q1: Spark version**

In [5]:
# Q1: the Spark version backing the current session.
spark.sparkContext.version

'3.3.2'

In [6]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2019-10.parquet

--2024-03-07 02:09:52--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2019-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:26df:b000:b:20a5:b140:21, 2600:9000:26df:d800:b:20a5:b140:21, 2600:9000:26df:6e00:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:26df:b000:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18344035 (17M) [application/x-www-form-urlencoded]
Saving to: ‘fhv_tripdata_2019-10.parquet’


2024-03-07 02:09:53 (31,2 MB/s) - ‘fhv_tripdata_2019-10.parquet’ saved [18344035/18344035]



In [11]:
# Confirm the download by listing the raw parquet file with a human-readable size.
!ls -lh fhv_tripdata_2019-10.parquet

-rw-rw-r-- 1 d4m d4m 18M lip 18  2022 fhv_tripdata_2019-10.parquet


In [13]:
# Read the raw October 2019 FHV trips. Parquet carries its own schema, so the
# CSV-only "header" option is unnecessary here.
df_fhv_raw = spark.read.parquet('fhv_tripdata_2019-10.parquet')

# Q2: split the data into 6 partitions and write each one out as a parquet file.
# mode('overwrite') lets this cell be re-run; the default mode ('errorifexists')
# fails as soon as fhv/2019/10 already exists.
df_fhv_raw \
    .repartition(6) \
    .write \
    .mode('overwrite') \
    .parquet('fhv/2019/10')

                                                                                

**Q2: Partition size**

In [54]:
# Q2: list the part files written to fhv/2019/10 with their human-readable sizes.
!ls -lh fhv/2019/10

total 39M
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00000-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00001-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00002-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00003-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00004-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m 6,4M mar  7 02:17 part-00005-1696d466-562a-4717-bc0c-78e0e6cc1ae9-c000.snappy.parquet
-rw-r--r-- 1 d4m d4m    0 mar  7 02:17 _SUCCESS


In [16]:
# Load the repartitioned October 2019 FHV data written in the Q2 step; all the
# questions below (and the fhv_2019_10 view) are about this dataset. The path
# used previously, data/pq/fhvhv/2021/02/, pointed at a different dataset
# (by its name, FHVHV trips for February 2021).
df = spark.read.parquet('fhv/2019/10')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

**Q3: How many taxi trips were there on October 15?**

In [19]:
from pyspark.sql import functions as F

In [20]:
# Q3 (DataFrame API): number of trips whose pickup date is 2019-10-15.
df \
    .filter(F.to_date(df.pickup_datetime) == '2019-10-15') \
    .count()

62318

In [36]:
# Expose the trips to Spark SQL under the name used by the queries below.
df.createOrReplaceTempView(name='fhv_2019_10')

In [26]:
# Q3 (Spark SQL): the same pickup-date count, computed against the temp view.
spark.sql("""
    SELECT COUNT(1)
    FROM fhv_2019_10
    WHERE to_date(pickup_datetime) = '2019-10-15'
""").show()

+--------+
|count(1)|
+--------+
|   62318|
+--------+



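**Q4: Longest trip for each day** (the DataFrame query reports durations in seconds; the SQL query divides by 60 and reports minutes)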

In [30]:
# Column names of the loaded trip data, in schema order.
df.schema.fieldNames()

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [28]:
# Q4 (DataFrame API): for each pickup day, the longest trip duration in seconds.
# Casting a timestamp to long yields epoch seconds, so the difference is seconds.
# Shows the 5 days with the largest maximum.
(
    df
    .withColumn('duration', df.dropOff_datetime.cast('long') - df.pickup_datetime.cast('long'))
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .groupBy('pickup_date')
    .agg(F.max('duration'))
    .orderBy('max(duration)', ascending=False)
    .limit(5)
    .show()
)



+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2019-10-28|   2272149000|
| 2019-10-11|   2272149000|
| 2019-11-01|    315620787|
| 2019-10-01|    252460901|
| 2019-10-17|     31658400|
+-----------+-------------+



                                                                                

In [78]:
# Q4 (Spark SQL): for each pickup day, the longest trip duration. The epoch-second
# difference is divided by 60, so this `duration` is in minutes. Top 10 days.
spark.sql("""
    SELECT
        to_date(pickup_datetime) AS pickup_date,
        MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration
    FROM fhv_2019_10
    GROUP BY to_date(pickup_datetime)
    ORDER BY duration DESC
    LIMIT 10
""").show()



+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2019-10-28|        3.786915E7|
| 2019-10-11|        3.786915E7|
| 2019-11-01|        5260346.45|
| 2019-10-01| 4207681.683333334|
| 2019-10-17|          527640.0|
| 2019-10-26|          527050.0|
| 2019-10-30| 87872.06666666667|
| 2019-10-25|           63409.6|
| 2019-10-02|47552.183333333334|
| 2019-10-03| 46139.01666666667|
+-----------+------------------+



                                                                                

**Q6: Least frequent pickup location zone**





In [32]:
# Taxi zone lookup table (columns as shown by df_zones.show()). The explicit
# schema types LocationID as an integer instead of leaving every CSV column a
# string. The join on PUlocationID then compares numeric values rather than
# relying on an implicit string-to-number cast (PUlocationID is presumably
# numeric; confirm against the trips schema).
df_zones = spark.read \
    .option("header", "true") \
    .schema("LocationID INT, Borough STRING, Zone STRING, service_zone STRING") \
    .csv('taxi+_zone_lookup.csv')

In [33]:
# Preview the first 20 zone rows, truncating long cell values.
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [34]:
# Expose the zone lookup to Spark SQL for the pickup-zone join below.
df_zones.createOrReplaceTempView(name='zones')

In [51]:
# Q6: pickup zones ranked by trip count, fewest first. This is an inner join, so
# only zones that occur as a trip's PUlocationID are counted.
spark.sql("""
    SELECT
        z.LocationID,
        z.Zone,
        COUNT(1)
    FROM fhv_2019_10 AS f
    INNER JOIN zones AS z
        ON f.PUlocationID = z.LocationID
    GROUP BY z.LocationID, z.Zone
    ORDER BY COUNT(1) ASC
    LIMIT 5
""").show()


+----------+--------------------+--------+
|LocationID|                Zone|count(1)|
+----------+--------------------+--------+
|         2|         Jamaica Bay|       1|
|       105|Governor's Island...|       2|
|       111| Green-Wood Cemetery|       5|
|        30|       Broad Channel|       8|
|       120|     Highbridge Park|      14|
+----------+--------------------+--------+

