In [33]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
from pathlib import Path

In [17]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Prepare source data

In [18]:
BASE_DATA_DIR = Path().resolve() / "data"
SOURCE_DATA_DIR = BASE_DATA_DIR / "pq" / "yellow" / "2024" / "10"
SOURCE_FN = SOURCE_DATA_DIR / "yellow_tripdata_2024-10.parquet"

In [19]:
d_cmd = "wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet"
assert SOURCE_FN.exists(), f"Source file not found. Please run {d_cmd!r} into {str(SOURCE_DATA_DIR)!r} directory"

In [20]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('hw04') \
    .getOrCreate()

## Questions

### Question 1

In [21]:
spark.version

'3.3.2'

### Question 2

In [22]:
df = spark.read.parquet(str(SOURCE_FN))

In [23]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 04:30:44|  2024-10-01 04:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [24]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [25]:
OUTPUT_DIR = str(BASE_DATA_DIR / "work" / "yellow")
df.repartition(4).write.parquet(OUTPUT_DIR, mode="overwrite")

                                                                                

In [26]:
!ls -lh $OUTPUT_DIR

итого 97M
-rw-r--r--. 1 student student 25M мар  2 00:30 part-00000-c43a0319-1a77-4214-9cd4-c8d67b48252d-c000.snappy.parquet
-rw-r--r--. 1 student student 25M мар  2 00:30 part-00001-c43a0319-1a77-4214-9cd4-c8d67b48252d-c000.snappy.parquet
-rw-r--r--. 1 student student 25M мар  2 00:30 part-00002-c43a0319-1a77-4214-9cd4-c8d67b48252d-c000.snappy.parquet
-rw-r--r--. 1 student student 25M мар  2 00:30 part-00003-c43a0319-1a77-4214-9cd4-c8d67b48252d-c000.snappy.parquet
-rw-r--r--. 1 student student   0 мар  2 00:30 _SUCCESS


**Answer**: average size of the a Parquete file is `25Mb`

### Question 3

In [27]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

#### Method A. SQL

In [28]:
query = """
SELECT COUNT(*) 
  FROM yellow_tripdata
 WHERE cast(tpep_pickup_datetime as date)='2024-10-15'
"""

In [36]:
df.createOrReplaceTempView("yellow_tripdata")
# df.registerTempTable("yellow_tripdata")
spark.sql(query).show()

Py4JError: An error occurred while calling o25.sql. Trace:
py4j.Py4JException: Method sql([class java.lang.String, class [Ljava.lang.Object;]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)



#### Method B. RDD

In [54]:
q3_df = df.filter(
    F.to_date(F.col('tpep_pickup_datetime')) == '2024-10-15'
)
q3_df.count()

120790

In [32]:
df.filter(
    (F.col('tpep_pickup_datetime')   >= F.lit('2024-10-15 00:00:00'))
    & (F.col('tpep_pickup_datetime') <  F.lit('2024-10-16 00:00:00'))
).count()

120790

In [143]:
df.filter(
        (F.col('tpep_pickup_datetime') >= F.lit('2024-10-01 00:00:00')) & 
        (F.col('tpep_pickup_datetime') < F.lit('2024-11-01 00:00:00'))
    )\
    .groupby(F.to_date(F.col('tpep_pickup_datetime'))) \
    .count() \
    .sort(F.lit(1)) \
    .show(32)

+-----------------------------+------+
|to_date(tpep_pickup_datetime)| count|
+-----------------------------+------+
|                   2024-10-01| 91677|
|                   2024-10-02|118722|
|                   2024-10-03|107726|
|                   2024-10-04|109602|
|                   2024-10-05|120550|
|                   2024-10-06|118414|
|                   2024-10-07| 98754|
|                   2024-10-08|112161|
|                   2024-10-09|126646|
|                   2024-10-10|138069|
|                   2024-10-11|137417|
|                   2024-10-12|126473|
|                   2024-10-13|124164|
|                   2024-10-14| 97424|
|                   2024-10-15|120790|
|                   2024-10-16|132709|
|                   2024-10-17|134963|
|                   2024-10-18|134964|
|                   2024-10-19|136337|
|                   2024-10-20|130470|
|                   2024-10-21|104589|
|                   2024-10-22|114718|
|                   2024-

**Answer**: The closest answer from the suggested list is `125,567`

### Question 4

In [94]:
qr4_df = df.select(
    [
        'tpep_pickup_datetime',
        'tpep_dropoff_datetime',
        F.col('tpep_dropoff_datetime') - F.col('tpep_pickup_datetime')
        # F.to_unix_timestamp(F.col('tpep_dropoff_datetime'))
    ]
).sort(F.lit(3).desc()).take(1)

In [114]:
longest_trip_hours = qr4_df[0][-1].total_seconds() / 60 / 60
print(f"Answer: the longest trip in the dataset is '{int(longest_trip_hours)}' hours")

Answer: the longest trip in the dataset is '162' hours


### Question 5

**Answer**: Spark’s User Interface which shows the application's dashboard runs on `4040` local port

### Question 6

In [119]:
df_zone = spark.read \
    .option("header", "true") \
    .csv(str(BASE_DATA_DIR / "taxi_zone_lookup.csv"))

In [139]:
df_zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [131]:
joined_df = df.join(df_zone, df.PULocationID==df_zone.LocationID, "inner") 
joined_df.show(5, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+---------+-------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|LocationID|Borough  |Zone               |service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+---------+-------------------+------------+
|2       |2024-10-01 04:3

In [125]:
joined_df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee',
 'LocationID',
 'Borough',
 'Zone',
 'service_zone']

In [134]:
cols = ['PULocationID', 'Zone']
qr6_rows = joined_df.groupby(cols) \
    .count() \
    .sort(F.lit(3)) \
    .take(1)

In [140]:
print(f"Answer: The name of the LEAST frequent pickup zone location: {qr6_rows[0][1]!r}")

Answer: The name of the LEAST frequent pickup zone location: "Governor's Island/Ellis Island/Liberty Island"
