In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session on the local master ("local[*]"),
# registered under the application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/10 22:39:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Display the version of the installed PySpark Python package.
pyspark.__version__

'3.3.2'

In [94]:
# Display the Spark version reported by the active session.
spark.version

'3.3.2'

In [13]:
# First pass over the CSV: treat the first row as a header and let Spark
# infer the column types, so the resulting schema can be inspected.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

                                                                                

In [15]:
# Inspect the schema of the DataFrame as a StructType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [19]:
from pyspark.sql import types

In [20]:
# Explicit schema for the FHVHV trip CSV: (column name, Spark type) pairs,
# listed in file order; every column is declared nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [21]:
# Re-read the CSV with the explicit schema instead of type inference;
# the first row is still treated as a header.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [23]:
# Print the DataFrame's schema as a tree (column name, type, nullability).
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [24]:
# Repartition the trips into 12 partitions and persist them as Parquet.
# mode("overwrite") keeps this cell re-runnable: with the default save mode
# ("errorifexists") a second run fails because the output directory
# already exists.
(
    df
    .repartition(12)
    .write
    .mode("overwrite")
    .parquet("data/pq/fhvhv/2021/06/")
)

                                                                                

In [26]:
from pyspark.sql import functions as F

In [27]:
# Count the trips whose pickup timestamp falls on 2021-06-15:
# derive a date column from pickup_datetime, keep that day, count the rows.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .where("pickup_date = '2021-06-15'")
    .count()
)

                                                                                

452470

In [29]:
# Expose the trips DataFrame to Spark SQL as the temp view 'fhvhv_2021_06'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run because it replaces
# an existing view of the same name.
df.createOrReplaceTempView('fhvhv_2021_06')



In [31]:
# Spark SQL query against the fhvhv_2021_06 temp view: count the trips
# whose pickup date is 2021-06-15.
trips_on_date_sql = """
SELECT
    COUNT(*)
FROM
    fhvhv_2021_06
WHERE
    to_date(pickup_datetime) = '2021-06-15';
"""
spark.sql(trips_on_date_sql).show()



+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

In [52]:
# Longest trip per pickup date, in hours.
# Casting a timestamp to long yields seconds, so the difference divided by
# 3600 is the trip length in hours (rounded to 2 decimals). Show the five
# dates with the largest maximum.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn(
        'duration_h',
        F.round(
            (df['dropoff_datetime'].cast("long") - df['pickup_datetime'].cast("long")) / 3600,
            2,
        ),
    )
    .groupBy('pickup_date')
    .max('duration_h')
    .sort(F.col('max(duration_h)').desc())
    .limit(5)
    .show()
)



+-----------+---------------+
|pickup_date|max(duration_h)|
+-----------+---------------+
| 2021-06-25|          66.88|
| 2021-06-22|          25.55|
| 2021-06-27|          19.98|
| 2021-06-26|           18.2|
| 2021-06-23|          16.47|
+-----------+---------------+



                                                                                

In [60]:
# Spark SQL query against the fhvhv_2021_06 temp view: longest trip
# (in hours, rounded to 2 decimals) per pickup date, top 10 dates by that maximum.
max_duration_sql = """
SELECT
    to_date(pickup_datetime),
    MAX(ROUND((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600, 2)) AS max_duration_h
FROM
    fhvhv_2021_06
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT
    10
;
"""
spark.sql(max_duration_sql).show()



+------------------------+--------------+
|to_date(pickup_datetime)|max_duration_h|
+------------------------+--------------+
|              2021-06-25|         66.88|
|              2021-06-22|         25.55|
|              2021-06-27|         19.98|
|              2021-06-26|          18.2|
|              2021-06-23|         16.47|
|              2021-06-24|         13.91|
|              2021-06-04|         11.67|
|              2021-06-20|         10.98|
|              2021-06-01|         10.27|
|              2021-06-28|          9.97|
+------------------------+--------------+



                                                                                

In [61]:
# Load the zone lookup table from the Parquet files under 'zones/'.
df_zones = spark.read.parquet('zones/')

In [72]:
# Preview the zone lookup rows (LocationID, Borough, Zone, service_zone).
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [83]:
# Most frequent pickup zones: join trips to the zone lookup on the pickup
# location id, count trips per (LocationID, Zone), and show the top five.
(
    df
    .join(df_zones, on=df.PULocationID == df_zones.LocationID)
    .groupBy(df_zones.LocationID, df_zones.Zone)
    .count()
    .sort(F.col('count').desc())
    .limit(5)
    .show()
)

+----------+-------------------+------+
|LocationID|               Zone| count|
+----------+-------------------+------+
|        61|Crown Heights North|231279|
|        79|       East Village|221244|
|       132|        JFK Airport|188867|
|        37|     Bushwick South|187929|
|        76|      East New York|186780|
+----------+-------------------+------+



In [85]:
# Expose the zone lookup DataFrame to Spark SQL as the temp view 'zones'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run because it replaces
# an existing view of the same name.
df_zones.createOrReplaceTempView('zones')



In [93]:
# Spark SQL query: join the fhvhv_2021_06 view to the zones view on the
# pickup location id and list the ten zones with the most trips.
top_pickup_zones_sql = """
SELECT
    zones.Zone,
    COUNT(*) AS cnt
FROM
    fhvhv_2021_06
INNER JOIN zones
        ON fhvhv_2021_06.PULocationID = zones.LocationID
GROUP BY
    fhvhv_2021_06.PULocationID, zones.Zone
ORDER BY
    cnt DESC
LIMIT
    10
;
"""
spark.sql(top_pickup_zones_sql).show()

+--------------------+------+
|                Zone|   cnt|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
+--------------------+------+

