In [None]:
import pyspark

from pyspark.sql import SparkSession
from datetime import datetime

# Local Spark session that uses every available core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

# Raw FHV trip records for October 2019, read from a gzipped CSV with a header row.
# Column types are inferred from the data (per the Spark CSV docs this costs an
# extra pass over the file).
FHV_CSV_PATH = "/sparkdata/data/fhv_tripdata_2019-10.csv.gz"
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", True)
    .csv(FHV_CSV_PATH)
)

In [None]:
# Total number of trip records loaded from the CSV.
fhv_row_count = df.count()
fhv_row_count

In [None]:
# Persist the trips as Parquet after redistributing them into 6 partitions
# (Spark writes roughly one part-file per non-empty partition).
# mode("overwrite") keeps the cell re-runnable: the default save mode
# ("errorifexists") raises as soon as the output directory already exists,
# which breaks Restart Kernel -> Run All.
df.repartition(6).write.mode("overwrite").parquet("/sparkdata/data/parquet")

In [None]:
from pyspark.sql import functions as F

# Count trips picked up on 15 October 2019: derive a calendar date from the
# pickup timestamp, keep only rows on that day, and count them.
trips_with_pickup_date = df.withColumn("pickup_date", F.to_date(df["pickup_datetime"]))
trips_with_pickup_date.filter("pickup_date = '2019-10-15'").count()

In [None]:
# Expose the trip DataFrame to Spark SQL as `fhv_2019_10`.
# createOrReplaceTempView replaces registerTempTable (deprecated since Spark 2.0)
# and is safe to re-run because it replaces an existing view of the same name.
df.createOrReplaceTempView("fhv_2019_10")

In [None]:
# Ten pickup dates ranked by their single longest trip.
# `duration` is (dropoff - pickup) after CAST(... AS LONG), divided by 60.
# Assuming both columns were inferred as timestamps, the cast yields epoch
# seconds, so `duration` is in minutes (TODO confirm the inferred schema).
longest_trip_by_day_sql = """
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropOff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration
FROM 
    fhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
"""
longest_trip_by_day = spark.sql(longest_trip_by_day_sql)
longest_trip_by_day.show()

In [46]:
# Taxi zone lookup table (columns: LocationID, Borough, Zone, service_zone).
# inferSchema=True mirrors the trip-table read so LocationID gets a numeric
# type, presumably matching the inferred type of PULocationID/DOLocationID.
# Joins then compare like-typed keys instead of relying on implicit
# string casting.
zones = spark.read.option("header", "true").csv(
    "/sparkdata/data/taxi_zone_lookup.csv", inferSchema=True
)
# createOrReplaceTempView supersedes the deprecated registerTempTable and is
# safe to re-run.
zones.createOrReplaceTempView("zones")



In [49]:
# Column names of the zone lookup table, read from its schema.
zones.schema.fieldNames()

['LocationID', 'Borough', 'Zone', 'service_zone']

In [55]:
# Five pickup zones with the fewest trips.
# Only the pickup-side zone name is selected, so the trip table is joined to
# `zones` once, on PULocationID. The previous second LEFT JOIN on DOLocationID
# was never referenced: it added a shuffle/join for nothing, and it would
# multiply counts if `zones` ever held duplicate LocationIDs. With unique
# LocationIDs the result is unchanged.
# LEFT JOIN keeps trips whose PULocationID has no zone match; those are
# grouped under a NULL zone.
spark.sql(
    """
SELECT
    pul.zone,
    COUNT(1)
FROM
    fhv_2019_10 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY
    1
ORDER BY
    2 ASC
LIMIT 5;
"""
).show()

+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+

