# Spark homework

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [15]:
schema_fhvhv = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [32]:
schema_zones = types.StructType([
    types.StructField('LocationID', types.StringType(), True), 
    types.StructField('Borough', types.StringType(), True), 
    types.StructField('Zone', types.StringType(), True), 
    types.StructField('service_zone', types.StringType(), True)
])

In [3]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName('test') \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 14:07:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Spark version

In [48]:
spark.version

'3.3.2'

In [16]:
df_fhvhv = spark.read\
    .option('header', 'true')\
    .schema(schema_fhvhv)\
    .csv('/home/fedrpi/de-zoomcamp-2023/data/fhvhv/fhvhv_tripdata_2021-06.csv.gz')

In [17]:
df_fhvhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

                                                                                

[Stage 0:>                                                          (0 + 1) / 1]

In [18]:
df_fhvhv = df_fhvhv.repartition(12)

In [19]:
df_fhvhv.write.parquet('/home/fedrpi/de-zoomcamp-2023/data/pq')

[Stage 0:>                  (0 + 1) / 1][Stage 4:>                  (0 + 1) / 1]

23/03/03 14:27:52 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (de-zoomcamp.europe-west6-a.c.disco-bedrock-375516.internal executor driver): TaskKilled (Stage cancelled)


                                                                                

In [20]:
dfp = spark.read.parquet('/home/fedrpi/de-zoomcamp-2023/data/pq/')

In [21]:
dfp

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, Affiliated_base_number: string]

### How many taxi trips were started on June 15th?

In [28]:
dfp.withColumn('pickup_date', F.to_date(dfp.pickup_datetime))\
   .withColumn('dropoff_date', F.to_date(dfp.dropoff_datetime))\
   .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')\
   .filter(col('pickup_date') == '2021-06-15')\
   .count()

                                                                                

452470

### How long is the longest trip in the dataset? 

In [31]:
dfp.withColumn('hours_diff', (F.unix_timestamp(col("dropoff_datetime")) - F.unix_timestamp(col("pickup_datetime"))) / 3600)\
   .select('pickup_datetime', 'dropoff_datetime', 'hours_diff', 'PULocationID', 'DOLocationID')\
   .orderBy(col('hours_diff').desc())\
   .show()

                                                                                

+-------------------+-------------------+------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|        hours_diff|PULocationID|DOLocationID|
+-------------------+-------------------+------------------+------------+------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|  66.8788888888889|          98|         265|
|2021-06-22 12:09:45|2021-06-23 13:42:44|25.549722222222222|         188|         198|
|2021-06-27 10:32:29|2021-06-28 06:31:20|19.980833333333333|          78|         169|
|2021-06-26 22:37:11|2021-06-27 16:49:01|18.197222222222223|         263|          36|
|2021-06-23 20:40:43|2021-06-24 13:08:44|16.466944444444444|           3|         247|
|2021-06-23 22:03:31|2021-06-24 12:19:39|14.268888888888888|         186|         216|
|2021-06-24 23:11:00|2021-06-25 13:05:35|13.909722222222221|         181|          61|
|2021-06-04 20:56:02|2021-06-05 08:36:14|             11.67|          53|         252|
|2021-06-27 07:45:19|2021-06-27 19:07:16|11

In [37]:
dfp.registerTempTable('fhvhv')




In [33]:
df_zones = spark.read\
            .option('header', 'true')\
            .schema(schema_zones)\
            .csv('/home/fedrpi/de-zoomcamp-2023/data/fhvhv/taxi_zone_lookup.csv')

In [38]:
df_zones.registerTempTable('zones')

### What is the name of the most frequent pickup location zone?

In [46]:
gr_df = spark.sql('''
    select 
        f.PULocationID,
        z.Zone,
        count(*) total
    from 
        fhvhv f
    left join zones z
           on f.PULocationID = z.LocationID
    group by 1,2
    order by total desc
'''
)

In [47]:
gr_df.show()



+------------+--------------------+------+
|PULocationID|                Zone| total|
+------------+--------------------+------+
|          61| Crown Heights North|231279|
|          79|        East Village|221244|
|         132|         JFK Airport|188867|
|          37|      Bushwick South|187929|
|          76|       East New York|186780|
|         231|TriBeCa/Civic Center|164344|
|         138|   LaGuardia Airport|161596|
|         234|            Union Sq|158937|
|         249|        West Village|154698|
|           7|             Astoria|152493|
|         148|     Lower East Side|151020|
|          68|        East Chelsea|147673|
|          42|Central Harlem North|146402|
|         255|Williamsburg (Nor...|143683|
|         181|          Park Slope|143594|
|         225|  Stuyvesant Heights|141427|
|          48|        Clinton East|139611|
|         246|West Chelsea/Huds...|139431|
|          17|             Bedford|138428|
|         170|         Murray Hill|137879|
+----------

                                                                                