In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
# master "local[*]" runs Spark in local mode; the app is registered under the name 'hw'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('hw')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 20:54:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# First pass over the gzipped CSV: treat the first line as the header and
# supply no schema, so the column types are whatever Spark assigns on its own.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [5]:
# Inspect the (column, type) pairs produced by the schemaless CSV read.
df.dtypes

[('dispatching_base_num', 'string'),
 ('pickup_datetime', 'string'),
 ('dropOff_datetime', 'string'),
 ('PUlocationID', 'string'),
 ('DOlocationID', 'string'),
 ('SR_Flag', 'string'),
 ('Affiliated_base_number', 'string')]

In [6]:
# Show the same type information as a full StructType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
# Preview the first rows of the raw, untyped read.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [8]:
from pyspark.sql import types

In [10]:
# Explicit schema for the FHV trip file: timestamps for the two datetime
# columns, integers for the location IDs, strings for everything else.
# Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [11]:
# Second pass over the same CSV, this time applying the explicit `schema`
# so the columns are parsed into their declared types.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [12]:
# Preview the rows again now that the explicit schema has been applied.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [14]:
# Pull the first 10 rows back as Row objects to check the parsed Python values.
df.head(10)

[Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00009'),
 Row(dispatching_base_num='B00013', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 13, 22), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00013'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 11, 43), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 37, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 56, 29), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 57, 47), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B00014'),
 Row(dispatching_base_num='B00014', pickup_datetime=

In [17]:
# Redistribute the rows into 6 partitions ahead of the parquet write.
df = df.repartition(6)

In [58]:
# Persist the repartitioned trips as parquet, replacing any previous output at this path.
# NOTE(review): the recorded output for this cell is an AttributeError on a NoneType,
# i.e. `df` was None when it ran. Presumably `df` had been rebound to the result of an
# action in a cell that no longer exists; re-run from a fresh kernel to confirm.
df.write.mode("overwrite").parquet('fhv2019_repartitioned')

AttributeError: 'NoneType' object has no attribute 'write'

In [5]:
# Reload the trips from the 'fhv2019_repartitioned' parquet path (the same path the write cell targets).
df = spark.read.parquet('fhv2019_repartitioned')

                                                                                

In [6]:
# Print the schema of the parquet-backed DataFrame to check the column types.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [8]:
from pyspark.sql import functions as F

In [106]:
# Count the trips whose pickup date is 2019-10-15.
# Both datetimes are truncated to dates and the frame is narrowed to four
# columns before filtering; only `pickup_date` takes part in the filter.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropOff_datetime']))
    .select('pickup_date', 'dropoff_date', 'PUlocationID', 'DOlocationID')
    .where("pickup_date='2019-10-15'")
    .count()
)
                

62610

In [103]:
# Minimal form of the same question: derive the pickup date, keep only
# 2019-10-15, and count the remaining rows.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .where("pickup_date = '2019-10-15'")
    .count()
)

62610

In [100]:
# Peek at 10 rows of the current DataFrame as Row objects.
df.head(10)

[Row(dispatching_base_num='B02784', pickup_datetime=datetime.datetime(2019, 10, 1, 9, 55, 38), dropOff_datetime=datetime.datetime(2019, 10, 1, 10, 5, 43), PUlocationID=89, DOlocationID=85, SR_Flag=None, Affiliated_base_number=None),
 Row(dispatching_base_num='B02429', pickup_datetime=datetime.datetime(2019, 10, 21, 4, 15, 47), dropOff_datetime=datetime.datetime(2019, 10, 21, 4, 36, 4), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02429'),
 Row(dispatching_base_num='B01482', pickup_datetime=datetime.datetime(2019, 10, 19, 12, 0), dropOff_datetime=datetime.datetime(2019, 10, 19, 12, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B01482'),
 Row(dispatching_base_num='B03015', pickup_datetime=datetime.datetime(2019, 10, 11, 14, 28), dropOff_datetime=datetime.datetime(2019, 10, 11, 14, 32, 44), PUlocationID=264, DOlocationID=216, SR_Flag=None, Affiliated_base_number='B03015'),
 Row(dispatching_base_num='B01529', pickup_datetime=da

In [98]:
# Count the rows whose pickup timestamp is at or after 2019-10-15 00:00:00.
# NOTE(review): the filter has no upper bound, so this counts every trip from
# the 15th onward, not only the trips that started on the 15th.
(
    df
    .select('pickup_datetime', 'dropOff_datetime', 'PUlocationID', 'DOlocationID')
    .where(df['pickup_datetime'] >= '2019-10-15 00:00:00')
    .count()
)

1054019

In [64]:
# Preview the first rows of the current DataFrame.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   NULL|                  NULL|
|              B02429|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|   NULL|                B02429|
|              B01482|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|   NULL|                B01482|
|              B03015|2019-10-11 14:28:00|2019-10-11 14:32:44|         264|         216|   NULL|                B03015|
|              B01529|2019-10-21 18:00:26|2019-10-21 18:07:21|         264|          80|   NULL|                B01529|
|              B00477|2019-10-03 19:30:3

In [121]:
# Longest trip per pickup date, top five.
# Duration is the difference of the two timestamps cast to long, divided by 3600
# to express it in hours. The aggregate column keeps Spark's default name,
# 'max(duration in hours)', which the ordering step refers to.
(
    df
    .withColumn(
        'duration in hours',
        (df['dropOff_datetime'].cast('long') - df['pickup_datetime'].cast('long')) / 3600,
    )
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .groupBy('pickup_date')
    .max('duration in hours')
    .orderBy('max(duration in hours)', ascending=False)
    .limit(5)
    .show()
)

+-----------+----------------------+
|pickup_date|max(duration in hours)|
+-----------+----------------------+
| 2019-10-28|              631152.5|
| 2019-10-11|              631152.5|
| 2019-10-31|     87672.44083333333|
| 2019-10-01|     70128.02805555555|
| 2019-10-17|                8794.0|
+-----------+----------------------+



In [9]:
# Enrich `df` in one chain:
#   1. cast both trip endpoints to timestamp,
#   2. add `duration_in_hours` (difference of the timestamps cast to long, divided by 3600),
#   3. add `pickup_date` (the date part of the pickup timestamp).
# `df` is rebound on purpose: later cells read the two added columns from it.
df = (
    df
    .withColumn('pickup_datetime', F.col('pickup_datetime').cast('timestamp'))
    .withColumn('dropOff_datetime', F.col('dropOff_datetime').cast('timestamp'))
    .withColumn(
        'duration_in_hours',
        (F.col('dropOff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600,
    )
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
)

# Longest trip per pickup date, sorted from longest to shortest, top five only.
result_df = (
    df
    .groupBy('pickup_date')
    .agg(F.max('duration_in_hours').alias('max_duration_in_hours'))
    .orderBy(F.col('max_duration_in_hours').desc())
    .limit(5)
)

result_df.show()


                                                                                

+-----------+---------------------+
|pickup_date|max_duration_in_hours|
+-----------+---------------------+
| 2019-10-28|             631152.5|
| 2019-10-11|             631152.5|
| 2019-10-31|    87672.44083333333|
| 2019-10-01|    70128.02805555555|
| 2019-10-17|               8794.0|
+-----------+---------------------+



In [10]:
# Preview df, which at this point also carries duration_in_hours and pickup_date.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|   duration_in_hours|pickup_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+-----------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   NULL|                  NULL| 0.16805555555555557| 2019-10-01|
|              B02429|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|   NULL|                B02429| 0.33805555555555555| 2019-10-21|
|              B01482|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|   NULL|                B01482|  0.3333333333333333| 2019-10-19|
|              B03015|2019-10-11 14:28:00|2019-10-11 14:32:44|         264|       

In [11]:
# Load the zone lookup table from the 'zones' parquet path.
df_zones = spark.read.parquet('zones')

In [12]:
# List the lookup table's column names.
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [18]:
# Register the zone lookup as the temp view `zones` so spark.sql can query it by name.
df_zones.createOrReplaceTempView('zones')

In [20]:
# Register the trips DataFrame as the temp view `fhv2019_repartitioned` for use in spark.sql.
df.createOrReplaceTempView('fhv2019_repartitioned')

In [21]:
# List the trip column names (with their exact casing) before referencing them in SQL.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'duration_in_hours',
 'pickup_date']

In [24]:
# Five pickup zones with the fewest trips.
# - The join key is spelled `PUlocationID`, exactly as in the trips schema, so the
#   query does not depend on Spark's case-insensitive identifier resolution.
# - LEFT JOIN keeps trips whose pickup ID has no match in `zones`; those rows are
#   grouped under a NULL zone.
# - GROUP BY 1 / ORDER BY 2 are ordinals into the select list (Zone, COUNT(1)).
spark.sql("""
SELECT
    pul.Zone,
    COUNT(1)
FROM 
    fhv2019_repartitioned fhv LEFT JOIN zones pul ON fhv.PUlocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show()



+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+



                                                                                