In [1]:
import pyspark
from pyspark.sql import SparkSession
pyspark.__version__

'3.3.2'

In [2]:
# Create (or reuse) a Spark session with master "local[*]" and app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/05 06:41:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:

# First pass: load the trips CSV using its header row and no explicit schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [4]:
# Inspect the schema of the header-only read: every column comes back as StringType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [6]:
from pyspark.sql import types

In [7]:
# Explicit schema for the trips CSV, written as (column name, Spark type) pairs.
# Every column is declared nullable.
schema = types.StructType([
    types.StructField(column_name, spark_type, True)
    for column_name, spark_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [8]:
# Re-read the same CSV, this time applying the explicit schema so the
# timestamp and integer columns get their declared types.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [9]:
# Preview rows to confirm the datetime and location-ID columns parsed under the explicit schema.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [10]:
# Expose the trips DataFrame to Spark SQL under the name 'trips'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0. Re-running this cell is safe:
# an existing temp view with the same name is replaced.
df.createOrReplaceTempView('trips')



In [19]:
# Count the trips whose pickup_datetime, truncated to the day, equals 2021-06-15.
spark.sql("""
SELECT date_trunc('day', pickup_datetime) AS day, count(1)
FROM trips
Where date_trunc('day', pickup_datetime) = '2021-06-15 00:00:00'
GROUP BY 1;
""").show()



+-------------------+--------+
|                day|count(1)|
+-------------------+--------+
|2021-06-15 00:00:00|  452470|
+-------------------+--------+



                                                                                

In [35]:
# List each trip's pickup/dropoff times with its duration in hours, longest first.
# DATEDIFF(hour, ...) yields whole hours (the first row below spans ~66.9 h and shows 66).
spark.sql("""
SELECT  pickup_datetime, dropoff_datetime, DATEDIFF(hour, pickup_datetime, dropoff_datetime) AS trip_time
FROM trips
ORDER BY trip_time DESC;
""").show()



+-------------------+-------------------+---------+
|    pickup_datetime|   dropoff_datetime|trip_time|
+-------------------+-------------------+---------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|       66|
|2021-06-22 12:09:45|2021-06-23 13:42:44|       25|
|2021-06-27 10:32:29|2021-06-28 06:31:20|       19|
|2021-06-26 22:37:11|2021-06-27 16:49:01|       18|
|2021-06-23 20:40:43|2021-06-24 13:08:44|       16|
|2021-06-23 22:03:31|2021-06-24 12:19:39|       14|
|2021-06-24 23:11:00|2021-06-25 13:05:35|       13|
|2021-06-04 20:56:02|2021-06-05 08:36:14|       11|
|2021-06-27 07:45:19|2021-06-27 19:07:16|       11|
|2021-06-01 12:25:29|2021-06-01 22:41:32|       10|
|2021-06-20 17:05:12|2021-06-21 04:04:16|       10|
|2021-06-08 16:38:14|2021-06-09 02:07:03|        9|
|2021-06-03 11:10:01|2021-06-03 20:31:58|        9|
|2021-06-19 00:12:34|2021-06-19 09:18:59|        9|
|2021-06-11 23:26:20|2021-06-12 08:54:38|        9|
|2021-06-04 10:09:52|2021-06-04 19:24:40|        9|
|2021-06-30 

                                                                                

In [36]:
# Load the zones data from Parquet at 'zones/'; later queries use its LocationID and Zone columns.
df_zones = spark.read.parquet('zones/')

In [39]:
# Expose the zones DataFrame to Spark SQL under the name 'zones'.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0. Re-running this cell is safe:
# an existing temp view with the same name is replaced.
df_zones.createOrReplaceTempView('zones')

In [47]:
# Rank pickup locations by number of trips, most frequent first.
# The LEFT JOIN keeps trips whose PULocationID has no match in zones (Zone is then null).
spark.sql("""
SELECT t.PULocationID, z.Zone, COUNT(1) AS trips_count
FROM trips t
LEFT JOIN zones z
ON t.PULocationID = z.LocationID
GROUP BY 1,2
ORDER BY 3 DESC;
""").show()



+------------+--------------------+-----------+
|PULocationID|                Zone|trips_count|
+------------+--------------------+-----------+
|          61| Crown Heights North|     231279|
|          79|        East Village|     221244|
|         132|         JFK Airport|     188867|
|          37|      Bushwick South|     187929|
|          76|       East New York|     186780|
|         231|TriBeCa/Civic Center|     164344|
|         138|   LaGuardia Airport|     161596|
|         234|            Union Sq|     158937|
|         249|        West Village|     154698|
|           7|             Astoria|     152493|
|         148|     Lower East Side|     151020|
|          68|        East Chelsea|     147673|
|          42|Central Harlem North|     146402|
|         255|Williamsburg (Nor...|     143683|
|         181|          Park Slope|     143594|
|         225|  Stuyvesant Heights|     141427|
|          48|        Clinton East|     139611|
|         246|West Chelsea/Huds...|     

                                                                                