In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a Spark session on the local master, using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/31 23:06:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/31 23:06:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Read the FHV trip records from the local parquet directory.
df_fhv = (
    spark.read
    .parquet('data/pq/fhv/')
)

                                                                                

In [7]:
# Preview the first five FHV rows to check the columns that were loaded.
df_fhv.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   null|                  null|
|              B02429|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|   null|                B02429|
|              B01482|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|   null|                B01482|
|              B03015|2019-10-11 14:28:00|2019-10-11 14:32:44|         264|         216|   null|                B03015|
|              B01529|2019-10-21 18:00:26|2019-10-21 18:07:21|         264|          80|   null|                B01529|
+--------------------+------------------

In [8]:
# Expose the FHV DataFrame to Spark SQL under the name `fhv`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it is also safe to re-run because it overwrites an existing view.
df_fhv.createOrReplaceTempView('fhv')



In [12]:
# Count the trips whose pickup timestamp falls on 2019-10-15.
fhv_count_query = """
    SELECT COUNT(1) FROM fhv WHERE DATE(pickup_datetime) = '2019-10-15'
"""
df_fhv_count = spark.sql(fhv_count_query)

In [13]:
# Print the single-row result of the 2019-10-15 trip count query.
df_fhv_count.show()



+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [22]:
# Find the five longest trips, expressed in hours.
# unix_timestamp() yields seconds, so the difference is divided by 3600.
# The drop-off column is spelled exactly as in the table schema
# (`dropOff_datetime`) so the query does not rely on Spark's default
# case-insensitive identifier resolution.
df_fhv_length = spark.sql("""
    SELECT
        (unix_timestamp(dropOff_datetime) - unix_timestamp(pickup_datetime)) / 3600 AS length
    FROM fhv
    ORDER BY 1 DESC
    LIMIT 5
""")

In [23]:
# Print the five longest trip durations (in hours) returned by the query.
df_fhv_length.show()



+-----------------+
|           length|
+-----------------+
|         631152.5|
|         631152.5|
|87672.44083333333|
|70128.02805555555|
|           8794.0|
+-----------------+



                                                                                

In [24]:
# Load the taxi zone lookup CSV; the first line is a header and column types are inferred.
df_zones = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('taxi_zone_lookup.csv')
)

In [25]:
# Preview the first five rows of the zone lookup table.
df_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [39]:
# Expose the zone lookup DataFrame to Spark SQL under the name `zones`.
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it is also safe to re-run because it overwrites an existing view.
df_zones.createOrReplaceTempView('zones')



In [27]:
# Attach zone details to every trip by matching the pickup location id
# against the lookup table's LocationID. Inner join: trips without a
# matching LocationID are dropped.
df_join = df_fhv.join(
    df_zones,
    on=df_fhv['PUlocationID'] == df_zones['LocationID'],
    how='inner',
)

In [37]:
# Preview the first five joined rows (trip columns followed by zone columns).
df_join.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+--------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|LocationID| Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+--------+--------------------+------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   null|                  null|        89|Brooklyn|Flatbush/Ditmas Park|   Boro Zone|
|              B02429|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|   null|                B02429|       264| Unknown|                 N/A|         N/A|
|              B01482|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|   null|                B01482|       264|

In [51]:
# Count trips per pickup zone and show the five zones with the fewest trips
# (orderBy sorts ascending by default).
(
    df_join
    .select('Zone')
    .groupBy('Zone')
    .count()
    .orderBy('count')
    .show(5)
)

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
+--------------------+-----+
only showing top 5 rows



In [49]:
# Same "least frequent pickup zones" aggregation, expressed in Spark SQL
# against the `fhv` and `zones` temp views. GROUP BY 1 / ORDER BY 2 refer to
# the first and second columns in the SELECT list (Zone and the count).
least_frequent_zones_query = """
    SELECT Zone, COUNT(1)
    FROM fhv JOIN zones ON fhv.PUlocationID = zones.LocationID 
    GROUP BY 1
    ORDER BY 2
    LIMIT 5
"""
df_join_new = spark.sql(least_frequent_zones_query)

In [50]:
# Print the five least frequent pickup zones returned by the SQL query.
df_join_new.show()

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+

