# Code / work for Module 5 on batch processing

In [1]:
from pyspark.sql import SparkSession

### Question 1: Spark version

In [2]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('hw') \
    .getOrCreate()
spark.version

25/03/06 21:14:08 WARN Utils: Your hostname, eli-mac.local resolves to a loopback address: 127.0.0.1; using 192.168.1.25 instead (on interface en0)
25/03/06 21:14:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/06 21:14:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.5.5'

### Question 2: Size of partitioned parquet files?

In [3]:
# Download 2024-10 yellow taxi data (parquet)
!cd ../data/hw/ && curl -O https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.3M  100 61.3M    0     0  34.6M      0  0:00:01  0:00:01 --:--:-- 34.6M


In [4]:
df = spark.read.parquet('../data/hw/yellow_tripdata_2024-10.parquet')

                                                                                

In [6]:
df = df.repartition(4)
df.write.parquet('../data/hw/pq/2024/10/', mode='overwrite')

                                                                                

In [7]:
!ls -lh ../data/hw/pq/2024/10/

total 188928
-rw-r--r--  1 elijahsutton  staff     0B Mar  6 21:15 _SUCCESS
-rw-r--r--  1 elijahsutton  staff    22M Mar  6 21:15 part-00000-d6cc26b4-71a3-49dd-90c2-5bd153f67b1c-c000.snappy.parquet
-rw-r--r--  1 elijahsutton  staff    22M Mar  6 21:15 part-00001-d6cc26b4-71a3-49dd-90c2-5bd153f67b1c-c000.snappy.parquet
-rw-r--r--  1 elijahsutton  staff    22M Mar  6 21:15 part-00002-d6cc26b4-71a3-49dd-90c2-5bd153f67b1c-c000.snappy.parquet
-rw-r--r--  1 elijahsutton  staff    22M Mar  6 21:15 part-00003-d6cc26b4-71a3-49dd-90c2-5bd153f67b1c-c000.snappy.parquet


### Question 3: Number of taxi trips on Oct. 15?

In [3]:
from pyspark.sql import functions as F

In [18]:
df.filter(
    F.to_date('tpep_pickup_datetime') == '2024-10-15'
).count()

128893

### Question 4: Longest trip (in hours)?

In [31]:
df.withColumn(
    'time_diff_hrs', 
    (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 3600
).select(
    F.max('time_diff_hrs')
).show()



+------------------+
|max(time_diff_hrs)|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

### Question 6:

In [9]:
zones_df = spark.read \
    .option('header', 'true') \
    .csv('../data/tst/taxi_zone_lookup.csv')

In [10]:
zones_df.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [11]:
zones_df.createOrReplaceTempView('zones')
df.createOrReplaceTempView('trips')

In [12]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [None]:
spark.sql(
    """
    SELECT
        pickup_zone.zone as pickup_zone,
        COUNT(1) AS zone_pickups_cnt
    FROM trips
        INNER JOIN zones AS pickup_zone
            ON trips.PULocationID = pickup_zone.LocationID
        INNER JOIN zones AS dropoff_zone
            ON trips.DOLocationID = dropoff_zone.LocationID
    GROUP BY pickup_zone
    ORDER BY zone_pickups_cnt
    """
).show()

[Stage 20:>                                                         (0 + 4) / 4]

+--------------------+----------------+
|         pickup_zone|zone_pickups_cnt|
+--------------------+----------------+
|Governor's Island...|               1|
|       Arden Heights|               2|
|       Rikers Island|               2|
|         Jamaica Bay|               3|
| Green-Wood Cemetery|               3|
|   Rossville/Woodrow|               4|
|       West Brighton|               4|
|       Port Richmond|               4|
|Charleston/Totten...|               4|
|Eltingville/Annad...|               4|
|         Great Kills|               6|
|        Crotona Park|               6|
|Heartland Village...|               7|
|     Mariners Harbor|               7|
|Saint George/New ...|               9|
|             Oakwood|               9|
|       Broad Channel|              10|
|New Dorp/Midland ...|              10|
|         Westerleigh|              12|
|     Pelham Bay Park|              12|
+--------------------+----------------+
only showing top 20 rows



                                                                                