In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pyspark.sql.functions as F

## Q1 Answer '3.3.2'

In [2]:
# Print the installed PySpark version string.
print(f"{pyspark.__version__}")

3.3.2


In [3]:
# Show the filesystem path of the imported pyspark package's __init__.py.
pyspark.__file__

'/home/andrew/spark/spark-3.3.2-bin-hadoop3/python/pyspark/__init__.py'

In [4]:
# Explicit column schema for the FHV trip-data CSV; every field is nullable.
fhv_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("dispatching_base_num", types.StringType()),
        ("pickup_datetime", types.TimestampType()),
        ("dropoff_datetime", types.TimestampType()),
        ("PULocationID", types.IntegerType()),
        ("DOLocationID", types.IntegerType()),
        ("SR_Flag", types.StringType()),
        ("Affiliated_base_number", types.StringType()),
    )
])

In [5]:
# Start (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Load the raw FHV CSV using the explicit schema; the first row is a header.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv')
)

df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/23 02:14:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
+--------------------+------------------

In [6]:
# Rewrite the FHV data as 6 Parquet part files under ./fhv.
# mode("overwrite") makes the cell re-runnable (e.g. Restart & Run All):
# the default save mode raises an error when the 'fhv' directory already exists.
(
    df
    .repartition(6)
    .write
    .mode("overwrite")
    .parquet('fhv')
)

                                                                                

### Q2 Answer: 6 MB (each of the 6 Parquet part files is about 6.2M)

In [7]:
!ls fhv -lh

total 37M
-rw-r--r-- 1 andrew andrew    0 Feb 23 02:15 _SUCCESS
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00000-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00001-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00002-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00003-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00004-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet
-rw-r--r-- 1 andrew andrew 6.2M Feb 23 02:15 part-00005-5917e3ce-245b-4bf4-a3e1-5687a3c628f8-c000.snappy.parquet


### Q3 Answer: 62610 trips with a pickup on 2019-10-15

In [8]:
# Keep trips whose pickup timestamp falls on the calendar date 2019-10-15;
# to_date() truncates the timestamp to a date before comparing.
filtered_df = df.where(F.to_date(F.col("pickup_datetime")) == "2019-10-15")

# Number of matching trips.
row_count = filtered_df.count()
print(row_count)

[Stage 4:>                                                          (0 + 4) / 4]

62610




### Q4 Answer: 631,152.5 hours (longest trip)

In [9]:
# Add trip-duration columns: whole seconds between pickup and drop-off,
# and the same duration expressed in hours.
df_with_trip_time = (
    df
    .withColumn(
        "trip_time_seconds",
        F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime"),
    )
    .withColumn("trip_time_hours", F.col("trip_time_seconds") / 3600)
)

# Single-row global aggregate: the longest trip duration in hours.
max_trip_time = df_with_trip_time.select(F.max("trip_time_hours")).first()[0]

print("Max trip time in hours :", max_trip_time)



Max trip time in hours : 631152.5


                                                                                

In [10]:
# Sanity check: show the three rows with the longest durations.
# NOTE(review): the recorded output shows drop-off years such as 2091, so the
# maximum likely reflects bad source data rather than a real trip.
df_with_trip_time.sort(F.desc("trip_time_hours")).show(3)



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+-----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_time_seconds|  trip_time_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+-----------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   null|                B02832|       2272149000|         631152.5|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|       2272149000|         631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        null|        null|   null|                B02416|        315620787|87672.44083333333|
+--------------------+-------------------+-------------------+--

                                                                                

### Q5 Answer: port 4040 (Spark UI)

![Screenshot supporting the Q5 answer (port 4040)](https://i.imgur.com/LFroEn2.png "Spark UI port")


### Q6 Answer: Jamaica Bay (least frequent pickup zone)

In [11]:
!ls

HW5.ipynb  fhv_tripdata_2019-10.csv  taxi+_zone_lookup.csv  zones
fhv	   spark-warehouse	     taxi_zone_lookup.csv


In [12]:
# Taxi zone lookup table. No schema or type inference is requested,
# so every column (including LocationID) is read as a string.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi_zone_lookup.csv')
)

df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [13]:
# Print the column names and types of the zone lookup DataFrame.
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [14]:
# Expose the zone lookup to Spark SQL as the temp view "zones_table".
df_zones.createOrReplaceTempView(name='zones_table')

In [15]:
# Expose the FHV trips to Spark SQL as the temp view "fhv".
df.createOrReplaceTempView(name='fhv')

In [16]:
# Pickup zones with the fewest FHV pickups.
# PULocationID is an integer column, while the zone lookup's LocationID was
# read as a string, so cast explicitly instead of relying on implicit coercion.
# Zone is a secondary sort key so rows with equal counts are ordered
# deterministically before LIMIT picks the first five.
spark.sql("""
SELECT z.Zone, COUNT(*) AS pickup_count
FROM fhv f
JOIN zones_table z ON f.PULocationID = CAST(z.LocationID AS INT)
GROUP BY z.Zone
ORDER BY pickup_count ASC, z.Zone ASC
LIMIT 5;
""").show()



+--------------------+------------+
|                Zone|pickup_count|
+--------------------+------------+
|         Jamaica Bay|           1|
|Governor's Island...|           2|
| Green-Wood Cemetery|           5|
|       Broad Channel|           8|
|     Highbridge Park|          14|
+--------------------+------------+



                                                                                