In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/03/01 20:12:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark.version

'3.0.3'

In [4]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-01 15:05:42--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.77.204
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.77.204|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.1’


2022-03-01 15:06:10 (26.0 MB/s) - ‘fhvhv_tripdata_2021-02.csv.1’ saved [733822658/733822658]



In [4]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [5]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [6]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [7]:
df = df.repartition(24)

In [9]:
df.write.parquet('fhvhv/2021/02/*', mode='overwrite')

                                                                                

### Question 3

In [28]:
df = spark.read.parquet('fhvhv/2021/02/*')

In [8]:
df.registerTempTable('test_table')

In [30]:
df_res = spark.sql("""
SELECT 
    count(1)
FROM
    test_table
WHERE
    DATE(pickup_datetime) = '2021-02-15'
""")

In [32]:
df_res.show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

### Question 4

In [50]:
df. \
    withColumn('date_diff', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')). \
    orderBy(F.col('date_diff').desc()). \
    show()

[Stage 24:>                                                         (0 + 4) / 4]

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+---------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|date_diff|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+---------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null|    75540|
|           HV0004|              B02800|2021-02-17 15:54:53|2021-02-18 07:48:34|         242|         254|   null|    57221|
|           HV0004|              B02800|2021-02-20 12:08:15|2021-02-21 00:22:14|         188|          55|   null|    44039|
|           HV0003|              B02864|2021-02-03 20:24:25|2021-02-04 07:41:58|          51|         147|   null|    40653|
|           HV0003|              B02887|2021-02-19 23:17:44|2021-02-20 09:44:01|         210|         149|   null|    37577|


                                                                                

### Question 5

In [15]:
df_res = spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1) AS number_records
FROM
    test_table
GROUP BY
    dispatching_base_num
ORDER BY
    number_records desc
""")

In [17]:
df_res.show()



+--------------------+--------------+
|dispatching_base_num|number_records|
+--------------------+--------------+
|              B02510|       3233664|
|              B02764|        965568|
|              B02872|        882689|
|              B02875|        685390|
|              B02765|        559768|
|              B02869|        429720|
|              B02887|        322331|
|              B02871|        312364|
|              B02864|        311603|
|              B02866|        311089|
|              B02878|        305185|
|              B02682|        303255|
|              B02617|        274510|
|              B02883|        251617|
|              B02884|        244963|
|              B02882|        232173|
|              B02876|        215693|
|              B02879|        210137|
|              B02867|        200530|
|              B02877|        198938|
+--------------------+--------------+
only showing top 20 rows



                                                                                

### Question 6

In [18]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-03-01 20:36:11--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.140.232
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.140.232|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-03-01 20:36:12 (154 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [19]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [32]:
df_join = df.join(df_zones, df.PULocationID == df_zones.LocationID).withColumnRenamed('Zone', 'PUzone') \
            .join(df_zones, df.DOLocationID == df_zones.LocationID).withColumnRenamed('Zone', 'DOzone')

In [33]:
df_join.registerTempTable('test_table_join')

In [34]:
df_res = spark.sql("""
SELECT
    PUzone, DOzone,
    COUNT(1) AS number_records
FROM
    test_table_join
GROUP BY
    PUzone, DOzone
ORDER BY
    number_records desc
""")

In [35]:
df_res.show()

                                                                                

+-------------------+--------------------+--------------+
|             PUzone|              DOzone|number_records|
+-------------------+--------------------+--------------+
|      East New York|Governor's Island...|        135123|
|       Borough Park|Governor's Island...|        111987|
|      East New York|              Corona|         90082|
|           Canarsie|Governor's Island...|         84078|
|Crown Heights North|Governor's Island...|         77928|
|       Borough Park|              Corona|         74658|
|           Canarsie|              Corona|         56052|
|          Bay Ridge|Governor's Island...|         53802|
|Crown Heights North|              Corona|         51952|
|      East New York|     Pelham Bay Park|         45041|
|      East New York|          Mount Hope|         45041|
|      East New York|Forest Park/Highl...|         45041|
|      East New York|     Freshkills Park|         45041|
|      East New York|       East New York|         45041|
|      East Ne