In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build the session with master "local[*]" and application name 'test';
# getOrCreate() hands back an already-running session when one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

24/02/28 20:10:33 WARN Utils: Your hostname, DESKTOP-6UVSTNS resolves to a loopback address: 127.0.1.1; using 172.26.255.122 instead (on interface eth0)
24/02/28 20:10:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/28 20:10:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/28 20:10:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [8]:
# Shell escape: count the lines in the CSV file.
# NOTE(review): this is fhvhv_tripdata_2021-01.csv, while the cells that load
# data read fhv_tripdata_2019-10.csv.gz — confirm this is the intended file.
!wc -l fhvhv_tripdata_2021-01.csv

11908469 fhvhv_tripdata_2021-01.csv


In [4]:
# First look at the data: read the gzipped CSV, treating its first line as
# the header, without supplying a schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [12]:
# Display the first row to inspect column names and raw values.
df.head()

Row(dispatching_base_num='B00009', pickup_datetime='2019-10-01 00:23:00', dropOff_datetime='2019-10-01 00:35:00', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00009')

In [8]:
from pyspark.sql import types

In [14]:
# Explicit schema for the FHV trip CSV: one nullable field per column,
# listed in file order as (column name, Spark type).
fhv_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [16]:
# Re-read the same gzipped CSV, this time applying fhv_schema so the
# timestamp and location-ID columns are typed.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [19]:
# Repartition the trips into 6 partitions and write them out as Parquet.
# mode('overwrite') keeps this cell re-runnable: with the default save mode
# the write raises once the output directory already exists, which breaks
# a Restart & Run All after the first successful run.
df \
    .repartition(6) \
    .write \
    .mode('overwrite') \
    .parquet('fhv_tripdata/2019/10/')

                                                                                

In [22]:
# Register the trips DataFrame as the temporary view 'tripdata' so it can be
# queried through spark.sql. createOrReplaceTempView is the supported
# replacement for the deprecated registerTempTable and is safe to re-run.
df.createOrReplaceTempView('tripdata')



In [35]:
# Count the trips whose pickup_datetime is on or after 2019-10-15 and
# strictly before 2019-10-16, and print the result.
spark.sql("""
Select count(1) from tripdata where pickup_datetime >='2019-10-15' and pickup_datetime < '2019-10-16'
""").show()

[Stage 26:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [53]:
# Longest trip: take drop-off minus pickup, cast the difference to long and
# divide by 3600 (column alias hours_taken), then sort descending and keep
# only the top row.
# presumably the cast yields seconds — TODO confirm for the Spark version in use.
# NOTE(review): the recorded result (631152.5 hours, roughly 72 years) looks
# like a bad drop-off timestamp in the data rather than a real trip — verify.
spark.sql("""
select cast((dropoff_datetime-pickup_datetime) as long)/3600 hours_taken from tripdata order by 1 desc limit 1
""").show()

[Stage 43:>                                                         (0 + 1) / 1]

+-----------+
|hours_taken|
+-----------+
|   631152.5|
+-----------+



                                                                                

In [56]:
# Explicit schema for the taxi zone lookup CSV: one nullable field per
# column, listed in file order as (column name, Spark type).
zone_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

In [57]:
# Load the zone lookup CSV (first line is the header) using zone_schema.
zone_df = (
    spark.read
    .option("header", "true")
    .schema(zone_schema)
    .csv('taxi_zone_lookup.csv')
)

In [58]:
# Print the leading rows of the zone lookup table as a sanity check.
zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [59]:
# Register the zone lookup as the temporary view 'zone' so it can be joined
# against in spark.sql queries.
zone_df.createOrReplaceTempView('zone')

In [64]:
# Trips per pickup zone: join tripdata to zone on PUlocationID = LocationID,
# count rows per (PUlocationID, Zone), and order by that count ascending so
# the least-used pickup zones are listed first.
spark.sql("""
select PUlocationID, Zone, count(1) as no_row from tripdata a join zone b on a.PUlocationID=b.LocationID
group by PUlocationID, Zone
order by count(1) 
""").show()

[Stage 49:>                                                         (0 + 1) / 1]

+------------+--------------------+------+
|PUlocationID|                Zone|no_row|
+------------+--------------------+------+
|           2|         Jamaica Bay|     1|
|         105|Governor's Island...|     2|
|         111| Green-Wood Cemetery|     5|
|          30|       Broad Channel|     8|
|         120|     Highbridge Park|    14|
|          12|        Battery Park|    15|
|         207|Saint Michaels Ce...|    23|
|          27|Breezy Point/Fort...|    25|
|         154|Marine Park/Floyd...|    26|
|           8|        Astoria Park|    29|
|         128|    Inwood Hill Park|    39|
|         253|       Willets Point|    47|
|          96|Forest Park/Highl...|    53|
|          34|  Brooklyn Navy Yard|    57|
|          59|        Crotona Park|    62|
|          58|        Country Club|    77|
|          99|     Freshkills Park|    89|
|         190|       Prospect Park|    98|
|          54|     Columbia Street|   105|
|         217|  South Williamsburg|   110|
+----------

                                                                                