In [1]:
import pyspark

from pyspark.sql import types, SparkSession, functions as F 

In [2]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("homework_spark")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/03 03:04:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.3.2'

In [4]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [5]:
df = spark.read.schema(schema)\
    .option("header","true")\
    .csv('fhv_tripdata_2019-10.csv.gz')

In [6]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
+--------------------+------------------

Trips started on Oct 15th

In [8]:
df.filter(df['pickup_datetime'] >= '2019-10-15 00:00:00')\
        .filter(df['pickup_datetime'] < '2019-10-16 00:00:00').count()

                                                                                

62610

In [9]:
df.registerTempTable('fhv')



In [10]:
spark.sql("""
    SELECT 
        count(1)
    FROM 
        fhv
    WHERE
        extract(year from cast(pickup_datetime as date)) = '2019'
""").show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
| 1897493|
+--------+



                                                                                

What is the length of the longest trip in the dataset in hours?

In [18]:
spark.sql("""
    SELECT 
        
        pickup_datetime,
        dropoff_datetime,
        datediff(dropoff_datetime, pickup_datetime) * 24 AS hour_difference
    FROM 
        fhv    
    WHERE 
        extract(year from cast(pickup_datetime as date)) = '2019'and extract(year from cast(dropoff_datetime as date)) = '2019'
    ORDER BY 3 desc
""").show()

[Stage 11:>                                                         (0 + 1) / 1]

+-------------------+-------------------+---------------+
|    pickup_datetime|   dropoff_datetime|hour_difference|
+-------------------+-------------------+---------------+
|2019-10-30 12:30:04|2019-12-30 13:02:08|           1464|
|2019-10-25 07:04:57|2019-12-08 07:54:33|           1056|
|2019-10-25 07:04:57|2019-12-08 07:21:11|           1056|
|2019-10-01 06:04:13|2019-11-03 06:32:29|            792|
|2019-10-01 07:21:19|2019-11-03 07:28:09|            792|
|2019-10-01 06:17:21|2019-11-03 06:38:43|            792|
|2019-10-01 06:52:48|2019-11-03 07:07:41|            792|
|2019-10-01 06:05:29|2019-11-03 06:25:53|            792|
|2019-10-01 06:13:24|2019-11-03 06:23:30|            792|
|2019-10-01 02:30:01|2019-11-03 03:02:02|            792|
|2019-10-01 06:29:06|2019-11-03 06:41:57|            792|
|2019-10-01 04:29:49|2019-11-03 04:56:10|            792|
|2019-10-01 06:54:57|2019-11-03 07:22:01|            792|
|2019-10-01 05:11:04|2019-11-03 05:13:25|            792|
|2019-10-01 05

                                                                                

In [13]:
df_zones = spark.read.option("header","true").csv("taxi+_zone_lookup.csv")

In [18]:
df_zones

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

In [17]:
df_zones.registerTempTable("zones")



In [21]:
spark.sql("""
    SELECT 
        zone,
        COUNT(1)
    FROM 
        zones z
    JOIN fhv f
        ON z.LocationID = f.PUlocationID
    GROUP BY 1
    ORDER BY 2

""").show()

[Stage 11:>                                                         (0 + 1) / 1]

+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows



                                                                                

In [23]:
df_joins = df.join(df_zones,df_zones.LocationID == df.PUlocationID,how='inner')

In [50]:
df_joins.select('zone','PUlocationID')\
        .groupBy('zone','PUlocationID').count()\
        .sort('count').show()

[Stage 64:>                                                         (0 + 1) / 1]

+--------------------+------------+-----+
|                zone|PUlocationID|count|
+--------------------+------------+-----+
|         Jamaica Bay|           2|    1|
|Governor's Island...|         105|    2|
| Green-Wood Cemetery|         111|    5|
|       Broad Channel|          30|    8|
|     Highbridge Park|         120|   14|
|        Battery Park|          12|   15|
|Saint Michaels Ce...|         207|   23|
|Breezy Point/Fort...|          27|   25|
|Marine Park/Floyd...|         154|   26|
|        Astoria Park|           8|   29|
|    Inwood Hill Park|         128|   39|
|       Willets Point|         253|   47|
|Forest Park/Highl...|          96|   53|
|  Brooklyn Navy Yard|          34|   57|
|        Crotona Park|          59|   62|
|        Country Club|          58|   77|
|     Freshkills Park|          99|   89|
|       Prospect Park|         190|   98|
|     Columbia Street|          54|  105|
|  South Williamsburg|         217|  110|
+--------------------+------------

                                                                                

In [19]:
df.repartition(6).write.parquet('result/fhv/2010')

                                                                                