In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [13]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
fhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.IntegerType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.IntegerType(), True), 
    types.StructField('Affiliated_base_number', types.IntegerType(), True)])

In [14]:
input_path = f'data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz'
output_path = f'data/pq/fhv/2019/10/'

df_fhv = spark.read \
        .option("header", "true") \
        .schema(fhv_schema) \
        .csv(input_path)

In [5]:
df_fhv \
    .repartition(6) \
    .write.mode("overwrite").parquet(output_path)

                                                                                

In [6]:
df_fhv.schema

StructType([StructField('dispatching_base_num', IntegerType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', IntegerType(), True), StructField('Affiliated_base_number', IntegerType(), True)])

In [7]:
df_fhv.registerTempTable("trips_data")



In [8]:
spark.sql("""
SELECT 
    COUNT(1)
FROM
    trips_data
WHERE pickup_datetime LIKE '2019-10-15%'
""").show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [9]:
from pyspark.sql import functions as F

df_fhv \
    .withColumn("unix_timestamp_pickup_datetime", F.unix_timestamp("pickup_datetime")) \
    .withColumn("unix_timestamp_dropOff_datetime", F.unix_timestamp("dropOff_datetime")) \
    .withColumn("seconds_between", (F.unix_timestamp("dropOff_datetime") - F.unix_timestamp("pickup_datetime"))/3600) \
    .registerTempTable("trips_data")

spark.sql("""
SELECT 
    MAX(seconds_between)
FROM
    trips_data
""").show()

[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+
|max(seconds_between)|
+--------------------+
|            631152.5|
+--------------------+



                                                                                

In [15]:
input_path = f'data/raw/taxi_zone_lookup.csv'

df_zones = spark.read \
        .option("header", "true") \
        .csv(input_path)

In [19]:
df_fhv.registerTempTable("trips_data")

df_zones.registerTempTable("zones_data")

In [17]:
spark.sql("""
SELECT *
FROM trips_data
""").show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|                null|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                  null|
|                null|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                  null|
|                null|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                  null|
|                null|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                  null|
|                null|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                  null|
|                null|2019-10-01 00:00:4

In [20]:
spark.sql("""
SELECT *
FROM zones_data
""").show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [23]:
spark.sql("""
SELECT
    zd.Zone
    , COUNT(1) AS trips_count
FROM trips_data td 
INNER JOIN zones_data zd
ON td.PUlocationID = zd.LocationID
GROUP BY 1
ORDER BY 2
""").show()

[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+-----------+
|                Zone|trips_count|
+--------------------+-----------+
|         Jamaica Bay|          1|
|Governor's Island...|          2|
| Green-Wood Cemetery|          5|
|       Broad Channel|          8|
|     Highbridge Park|         14|
|        Battery Park|         15|
|Saint Michaels Ce...|         23|
|Breezy Point/Fort...|         25|
|Marine Park/Floyd...|         26|
|        Astoria Park|         29|
|    Inwood Hill Park|         39|
|       Willets Point|         47|
|Forest Park/Highl...|         53|
|  Brooklyn Navy Yard|         57|
|        Crotona Park|         62|
|        Country Club|         77|
|     Freshkills Park|         89|
|       Prospect Park|         98|
|     Columbia Street|        105|
|  South Williamsburg|        110|
+--------------------+-----------+
only showing top 20 rows



                                                                                