In [33]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

In [2]:
# Explicit schema for the FHV trip CSV, built field by field with the fluent
# StructType.add API. Every column is declared nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [67]:
# Create (or reuse) a local Spark session with an 8g driver memory setting.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Load the trip CSV, treating the first row as a header and applying the
# explicit `schema` instead of letting Spark infer the column types.
df = spark.read.csv('fhv_tripdata_2021-06.csv', header=True, schema=schema)

In [5]:
# Redistribute the rows over 12 partitions before writing; Spark writes its
# output per partition.
df = df.repartition(12)

# Use overwrite mode so this cell can be re-run: with the default save mode the
# write raises as soon as the 'fhv/' directory already exists.
df.write.mode('overwrite').parquet('fhv/')

                                                                                

In [6]:
# Number of trips whose pickup timestamp falls on the calendar date 2021-06-15.
df.where(F.to_date(F.col('pickup_datetime')) == '2021-06-15').count()

                                                                                

452470

In [22]:
def to_hours(delta):
    """Convert a duration into a (possibly fractional) number of hours.

    Args:
        delta: A ``datetime.timedelta`` (or any object exposing
            ``total_seconds()``).

    Returns:
        float: The length of ``delta`` expressed in hours.
    """
    # timedelta has no `.total.hours` attribute; total_seconds() covers the
    # whole duration, including the days component.
    return delta.total_seconds() / 3600

In [43]:
# Longest trip in hours: cast both timestamps to epoch seconds, take the
# difference, convert to hours, and show only the largest value.
(
    df
    .select(
        (
            (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long'))
            / 3600
        ).alias('diff_in_hour')
    )
    .orderBy(F.col('diff_in_hour').desc())
    .limit(1)
    .show()
)
    



+----------------+
|    diff_in_hour|
+----------------+
|66.8788888888889|
+----------------+



                                                                                

In [54]:
# Load the zone lookup CSV (first row is a header) with an explicit,
# fully nullable schema.
df_zones = spark.read.csv(
    'taxi_zone_lookup.csv',
    header=True,
    schema=(
        types.StructType()
        .add('LocationID', types.IntegerType(), True)
        .add('Borough', types.StringType(), True)
        .add('Zone', types.StringType(), True)
        .add('service_zone', types.StringType(), True)
    ),
)

In [68]:
# Preview the first five rows of the zone lookup table.
df_zones.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [66]:
# Most frequent pickup zone: attach the zone name to each trip through its
# pickup location id (left join keeps trips without a match), count trips per
# zone, and show the single largest group.
(
    df
    .join(df_zones, on=df.PULocationID == df_zones.LocationID, how='left')
    .groupBy(df_zones.Zone)
    .count()
    .orderBy(F.col("count").desc())
    .limit(1)
    .show()
)

                                                                                

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+

