### Q1

In [55]:
spark.version

'3.3.2'

### Q2

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [5]:
!wc -l data/fhvhv_tripdata_2021-06.csv.gz

651315 data/fhvhv_tripdata_2021-06.csv.gz


In [6]:
!gzip -d data/fhvhv_tripdata_2021-06.csv.gz

In [8]:
df = spark.read \
    .option("header", "true") \
    .csv('data/fhvhv_tripdata_2021-06.csv')

In [9]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [10]:
!head -n 51 data/fhvhv_tripdata_2021-06.csv > head.csv

In [11]:
!wc -l head.csv

51 head.csv


In [12]:
import pandas as pd
df_pandas = pd.read_csv('head.csv')

In [13]:
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [16]:
spark.createDataFrame(df_pandas).schema

TypeError: field Affiliated_base_number: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [17]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [18]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-06.csv')

In [20]:
df = df.repartition(12)

In [21]:
df.write.parquet('fhvhv/2021/06/')

In [43]:
df = spark.read.parquet('fhvhv/2021/06/')

In [44]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [45]:
from pyspark.sql import functions as F

### Q3

In [46]:
dates = ("2021-06-15",  "2021-06-16")
df.where(F.col('pickup_datetime').between(*dates)).count()

452474

In [54]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter(F.col('pickup_date') == "2021-06-15") \
    .count()

452470

### Q4

In [57]:
df \
    .withColumn('DiffInSeconds',F.unix_timestamp("dropoff_datetime") - F.unix_timestamp('pickup_datetime')) \
    .withColumn('DiffInHours', F.round(F.col('DiffInSeconds')/3600, 2)) \
    .select('DiffInHours') \
    .orderBy(F.col("DiffInHours").desc()) \
    .show(truncate=False)

+-----------+
|DiffInHours|
+-----------+
|66.88      |
|25.55      |
|19.98      |
|18.2       |
|16.47      |
|14.27      |
|13.91      |
|11.67      |
|11.37      |
|10.98      |
|10.27      |
|9.97       |
|9.97       |
|9.64       |
|9.62       |
|9.48       |
|9.47       |
|9.4        |
|9.39       |
|9.38       |
+-----------+
only showing top 20 rows



### Q6

In [58]:
df_zone = spark.read \
    .option("header", "true") \
    .csv('data/taxi_zone_lookup.csv')

In [59]:
df_zone.write.parquet('zones')

In [60]:
df_zones = spark.read.parquet('zones/')

In [61]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [62]:
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [66]:
df.join(df_zones, df.PULocationID == df_zones.LocationID) \
    .select('Borough', 'Zone').show()

+---------+--------------------+
|  Borough|                Zone|
+---------+--------------------+
|    Bronx|          Mount Hope|
| Brooklyn|East Flatbush/Far...|
| Brooklyn| Crown Heights North|
|Manhattan|Upper East Side N...|
|   Queens|           Sunnyside|
|Manhattan|            Kips Bay|
|Manhattan|    Hamilton Heights|
|   Queens|             Astoria|
|Manhattan|            Kips Bay|
|    Bronx|             Belmont|
|   Queens|         JFK Airport|
| Brooklyn|East Flatbush/Far...|
|Manhattan|Times Sq/Theatre ...|
|Manhattan|   Battery Park City|
|   Queens|         Kew Gardens|
|   Queens|         JFK Airport|
|   Queens|   LaGuardia Airport|
|    Bronx|Van Nest/Morris Park|
| Brooklyn|          Greenpoint|
|   Queens|     Queensboro Hill|
+---------+--------------------+
only showing top 20 rows



In [67]:
df_results = df.join(df_zones, df.PULocationID == df_zones.LocationID) 

In [68]:
df_results.registerTempTable('fhv_zones')



In [72]:
df_results.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [80]:
spark.sql("""
SELECT 
    dispatching_base_num
FROM
    fhv_zones
WHERE 
    pickup_datetime  BETWEEN '2021-06-01' AND '2021-06-30' 
""")

DataFrame[dispatching_base_num: string]

In [76]:
spark.sql("""
SELECT 
    Borough,
    COUNT(*) AS number_pickups
FROM
    fhv_zones
GROUP BY
    Borough
""")

DataFrame[Borough: string, number_pickups: bigint]

In [74]:
spark.sql("""
SELECT 
    Zone,
    COUNT(1) AS number_pickups
FROM
    fhv_zones
GROUP BY
    Zone
""")

DataFrame[Zone: string, number_pickups: bigint]

In [85]:
df \
    .join(df_zones, df.PULocationID == df_zones.LocationID) \
    .groupBy("Borough").count() \
    .show()

+-------------+-------+
|      Borough|  count|
+-------------+-------+
|       Queens|2878281|
|          EWR|    228|
|      Unknown|   1376|
|     Brooklyn|4208983|
|Staten Island| 211355|
|    Manhattan|5705227|
|        Bronx|1956442|
+-------------+-------+



In [91]:
df_results \
    .groupBy("Zone") \
    .count() \
    .show()

+--------------------+------+
|                Zone| count|
+--------------------+------+
|           Homecrest| 42555|
|              Corona| 48578|
|    Bensonhurst West| 52137|
|         Westerleigh| 10980|
|Charleston/Totten...|  5481|
|          Douglaston|  8849|
|      Newark Airport|   228|
|          Mount Hope| 73909|
|East Concourse/Co...| 74920|
|      Pelham Parkway| 50198|
|         Marble Hill| 14781|
|           Rego Park| 38686|
|Upper East Side S...|124621|
|Heartland Village...| 15737|
|       Dyker Heights| 24657|
|   Kew Gardens Hills| 24636|
|     Jackson Heights|114413|
|             Bayside| 28515|
|TriBeCa/Civic Center|164344|
|      Yorkville West| 80980|
+--------------------+------+
only showing top 20 rows



In [90]:
df_results \
    .groupBy("Zone") \
    .agg(F.count('dispatching_base_num').alias("number_pickup")) \
    .show()

+--------------------+-------------+
|                Zone|number_pickup|
+--------------------+-------------+
|           Homecrest|        42555|
|              Corona|        48578|
|    Bensonhurst West|        52137|
|         Westerleigh|        10980|
|Charleston/Totten...|         5481|
|          Douglaston|         8849|
|      Newark Airport|          228|
|          Mount Hope|        73909|
|East Concourse/Co...|        74920|
|      Pelham Parkway|        50198|
|         Marble Hill|        14781|
|           Rego Park|        38686|
|Upper East Side S...|       124621|
|Heartland Village...|        15737|
|       Dyker Heights|        24657|
|   Kew Gardens Hills|        24636|
|     Jackson Heights|       114413|
|             Bayside|        28515|
|TriBeCa/Civic Center|       164344|
|      Yorkville West|        80980|
+--------------------+-------------+
only showing top 20 rows



#### ANSWER Q6

In [93]:
df_results \
    .groupBy("Zone") \
    .agg(F.count('dispatching_base_num').alias("number_pickup")) \
    .sort(F.desc('number_pickup')) \
    .show()

+--------------------+-------------+
|                Zone|number_pickup|
+--------------------+-------------+
| Crown Heights North|       231279|
|        East Village|       221244|
|         JFK Airport|       188867|
|      Bushwick South|       187929|
|       East New York|       186780|
|TriBeCa/Civic Center|       164344|
|   LaGuardia Airport|       161596|
|            Union Sq|       158937|
|        West Village|       154698|
|             Astoria|       152493|
|     Lower East Side|       151020|
|        East Chelsea|       147673|
|Central Harlem North|       146402|
|Williamsburg (Nor...|       143683|
|          Park Slope|       143594|
|  Stuyvesant Heights|       141427|
|        Clinton East|       139611|
|West Chelsea/Huds...|       139431|
|             Bedford|       138428|
|         Murray Hill|       137879|
+--------------------+-------------+
only showing top 20 rows



In [94]:
df_results \
    .groupBy("Borough") \
    .agg(F.count('dispatching_base_num').alias("number_pickup")) \
    .sort(F.desc('number_pickup')) \
    .show()

+-------------+-------------+
|      Borough|number_pickup|
+-------------+-------------+
|    Manhattan|      5705227|
|     Brooklyn|      4208983|
|       Queens|      2878281|
|        Bronx|      1956442|
|Staten Island|       211355|
|      Unknown|         1376|
|          EWR|          228|
+-------------+-------------+

