In [11]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
# Create (or reuse) a Spark session named 'test' that runs locally on all
# available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1

In [46]:
# Question 1: display the version string of the running Spark session.
spark.version

'3.3.2'

In [5]:
# First pass over the raw CSV: use the header row for column names and let
# Spark pick the column types (no explicit schema is supplied here).
df = (
    spark.read
    .option("header", "true")
    .csv("fhvhv_tripdata_2021-06.csv")
)

In [6]:
# Preview the first rows of the raw CSV to see the column names and value formats.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [10]:
# Inspect the schema Spark assigned to the raw read; with no explicit schema
# every column comes back as a nullable string.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [90]:
# Explicit schema for the FHVHV trip CSV: timestamps for the pickup/dropoff
# columns, integers for the location ids, strings for everything else.
# Every field is nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

## Question 2

In [91]:
# Re-read the CSV with the explicit schema so timestamps and location ids get
# proper types. The raw values look like '2021-06-01 00:02:41' (no fractional
# seconds), so the timestamp pattern must match exactly that layout; a pattern
# that demands a '.S' suffix does not describe this data.
df = (
    spark.read
    .option("header", "true")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [92]:
# Redistribute the rows into 12 partitions; the parquet write below then
# produces one part file per partition.
df = df.repartition(numPartitions=12)

In [93]:
# Store timestamps as microsecond-precision values in the parquet files.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
# mode("overwrite") keeps this cell re-runnable: the default save mode raises
# an error when the output directory already exists, which breaks any second
# execution of the notebook.
df.write.mode("overwrite").parquet('fhvhv/2021/06/')

In [94]:
# Question 2: list the written parquet part files with human-readable sizes.
!ls -lh ./fhvhv/2021/06/

total 245M
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121   0 Mar  3 18:23 _SUCCESS
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00000-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00001-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00002-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00003-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00004-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00005-b2911a93-d9d2-4547-9efc-b9ea53c03517-c000.snappy.parquet
-rw-r--r-- 1 TENX-ISB-FT0456+Administrator 197121 21M Mar  3 18:23 part-00006-b2911a93-d9d2-4547-9efc-b

In [95]:
# Load the trips back from the parquet files written above; all later
# questions work from this DataFrame.
df = spark.read.format("parquet").load('fhvhv/2021/06/*')

In [96]:
# Preview the parquet-backed trips to confirm the typed columns round-tripped.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02510|2021-06-04 00:36:29|2021-06-04 00:49:07|         148|          25|      N|                  null|
|              B02884|2021-06-02 14:15:33|2021-06-02 14:25:34|         212|         250|      N|                B02884|
|              B02879|2021-06-02 08:31:42|2021-06-02 08:37:33|         125|         114|      N|                B02879|
|              B02682|2021-06-01 13:04:35|2021-06-01 13:10:29|          82|          82|      N|                B02682|
|              B02875|2021-06-04 11:46:23|2021-06-04 11:52:27|          41|          41|      N|                B02875|
|              B02765|2021-06-02 19:34:5

In [97]:
# Truncate both timestamps to calendar dates (in place, so the column order is
# unchanged) and then rename the columns to reflect that they now hold dates.
df_3 = (
    df
    .withColumn('pickup_datetime', F.to_date(F.col('pickup_datetime')))
    .withColumn('dropoff_datetime', F.to_date(F.col('dropoff_datetime')))
    .withColumnRenamed('pickup_datetime', 'pickup_date')
    .withColumnRenamed('dropoff_datetime', 'dropoff_date')
)

In [98]:
# Preview the date-truncated trips (pickup_date / dropoff_date columns).
df_3.show()

+--------------------+-----------+------------+------------+------------+-------+----------------------+
|dispatching_base_num|pickup_date|dropoff_date|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-----------+------------+------------+------------+-------+----------------------+
|              B02510| 2021-06-04|  2021-06-04|         148|          25|      N|                  null|
|              B02884| 2021-06-02|  2021-06-02|         212|         250|      N|                B02884|
|              B02879| 2021-06-02|  2021-06-02|         125|         114|      N|                B02879|
|              B02682| 2021-06-01|  2021-06-01|          82|          82|      N|                B02682|
|              B02875| 2021-06-04|  2021-06-04|          41|          41|      N|                B02875|
|              B02765| 2021-06-02|  2021-06-02|         261|          45|      N|                B02765|
|              B02764| 2021-06-01|  2021-06-01|        

## Question 3

In [99]:
# Question 3: number of trips whose pickup date is 2021-06-15.
(
    df_3
    .filter(F.col('pickup_date') == '2021-06-15')
    .groupBy('pickup_date')
    .count()
    .show()
)


+-----------+------+
|pickup_date| count|
+-----------+------+
| 2021-06-15|452470|
+-----------+------+



## Question 4

In [133]:
# Question 4: the single longest trip, measured in hours.
# Casting a timestamp to long yields epoch seconds, so the difference of the
# two casts is the trip duration in seconds; dividing by 3600 converts to hours.
trip_seconds = (
    F.col("dropoff_datetime").cast("long") - F.col("pickup_datetime").cast("long")
)
# Keep the DataFrame in df_4 and display it in a separate statement:
# DataFrame.show() returns None, so chaining it into the assignment would
# leave df_4 bound to None instead of the result.
df_4 = (
    df.select(F.col("dropoff_datetime"), F.col("pickup_datetime"))
    .withColumn("longest_trip_hrs", F.round(trip_seconds / 3600, 3))
    .orderBy("longest_trip_hrs", ascending=False)
    .limit(1)
)
df_4.show()

+-------------------+-------------------+----------------+
|   dropoff_datetime|    pickup_datetime|longest_trip_hrs|
+-------------------+-------------------+----------------+
|2021-06-28 08:48:25|2021-06-25 13:55:41|          66.879|
+-------------------+-------------------+----------------+



## Question 6

In [141]:
# Load the taxi zone lookup table (LocationID -> Borough / Zone / service_zone).
df_zones = spark.read.format("parquet").load('zones/')

In [144]:
# Preview the zone lookup table to check its columns before joining.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [150]:
# Attach zone details to every trip by matching the pickup location id against
# the lookup table (inner join: trips without a matching zone are dropped).
df_6 = df.join(
    df_zones,
    on=df.PULocationID == df_zones.LocationID,
    how="inner",
)

In [151]:
# Question 6: the pickup zone with the highest number of trips.
(
    df_6
    .groupBy("Zone")
    .count()
    .orderBy(F.col("count").desc())
    .limit(1)
    .show()
)

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+

