In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd
import sys
import os

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
print(os.environ['JAVA_HOME'])
print(os.environ['SPARK_HOME'])

C:\tools\java
C:\tools\spark\spark-3.3.2-bin-hadoop3


In [4]:
!java -version

java version "11.0.21" 2023-10-17 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.21+9-LTS-193)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.21+9-LTS-193, mixed mode)


In [5]:
!pip show py4j               

Name: py4j
Version: 0.10.9.5
Summary: Enables Python programs to dynamically access arbitrary Java objects
Home-page: https://www.py4j.org/
Author: Barthelemy Dagenais
Author-email: barthelemy@infobart.com
License: BSD License
Location: c:\users\artyom\anaconda3\envs\spark_env\lib\site-packages
Requires: 
Required-by: pyspark


In [6]:
!pip show pyspark

Name: pyspark
Version: 3.3.2
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: c:\users\artyom\anaconda3\envs\spark_env\lib\site-packages
Requires: py4j
Required-by: 


In [7]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [8]:
print(spark.version)

3.3.2


In [9]:
df = spark.read \
    .option("header", "true") \
    .csv('../data/fhv_tripdata_2019-10.csv')

In [10]:
df_pandas = pd.read_csv('../data/fhv_tripdata_2019-10.csv', nrows = 1000)

In [11]:
df_pandas.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [12]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),  
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),  
])

In [13]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('../data/fhv_tripdata_2019-10.csv')

In [14]:
df = df.repartition(6)

In [15]:
df.write.parquet('fhv/2019/10/', mode='overwrite')

In [16]:
from pyspark.sql import functions as F

In [17]:
df \
    .withColumn('pickup_date', F.to_date('pickup_datetime')) \
    .withColumn('dropoff_date', F.to_date('dropoff_datetime')) \
    .show(10)
    

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|              B00271|2019-10-02 00:42:46|2019-10-02 01:25:23|         264|         265|   null|                B00271| 2019-10-02|  2019-10-02|
|              B03107|2019-10-02 17:57:32|2019-10-02 19:22:26|          24|         256|   null|                B03107| 2019-10-02|  2019-10-02|
|              B01087|2019-10-01 09:01:20|2019-10-01 09:19:33|         143|         170|   null|                B01087| 2019-10-01|  2019-10-01|
|              B02550|2019-10-01 11:21:37|2019-10-01 11:30:56|         264|         167|   null|                B02550| 2019-10-01

In [18]:
df \
    .withColumn('pickup_date', F.to_date('pickup_datetime')) \
    .filter(F.col("pickup_date") == "2019-10-15") \
    .count()

62610

----

In [19]:
df_dur = df \
    .withColumn('pickup_date', F.to_date('pickup_datetime')) \
    .withColumn("duration_hours", (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600) \
    .groupby("pickup_date") 

In [20]:
# Calculate the duration in hours for each trip
df = df.withColumn("duration_hours", 
                   (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600)

# Find the maximum duration
longest_trip_duration = df.agg(F.max("duration_hours")).first()[0]

In [21]:
print(longest_trip_duration)

631152.5


---

In [22]:
zone_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [23]:
df_zone = spark.read \
    .option("header", "true") \
    .schema(zone_schema) \
    .csv('../data/taxi_zone_lookup.csv')

In [24]:
df_zone.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



In [25]:
df.show(10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|     duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B00271|2019-10-02 00:42:46|2019-10-02 01:25:23|         264|         265|   null|                B00271| 0.7102777777777778|
|              B03107|2019-10-02 17:57:32|2019-10-02 19:22:26|          24|         256|   null|                B03107|              1.415|
|              B01087|2019-10-01 09:01:20|2019-10-01 09:19:33|         143|         170|   null|                B01087| 0.3036111111111111|
|              B02550|2019-10-01 11:21:37|2019-10-01 11:30:56|         264|         167|   null|                B02550|0.15527777777777776|
|              B0172

In [26]:
df_zone.registerTempTable("zone_lookup")
df.registerTempTable("trips")



In [27]:
spark.sql("SELECT * FROM trips LIMIT 10").show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|     duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B00271|2019-10-02 00:42:46|2019-10-02 01:25:23|         264|         265|   null|                B00271| 0.7102777777777778|
|              B03107|2019-10-02 17:57:32|2019-10-02 19:22:26|          24|         256|   null|                B03107|              1.415|
|              B01087|2019-10-01 09:01:20|2019-10-01 09:19:33|         143|         170|   null|                B01087| 0.3036111111111111|
|              B02550|2019-10-01 11:21:37|2019-10-01 11:30:56|         264|         167|   null|                B02550|0.15527777777777776|
|              B0172

In [34]:
least_frequent_pickup_query = """
SELECT 
    z.Zone, 
    COUNT(*) as trip_count
FROM 
    trips t
JOIN 
    zone_lookup z 
ON 
    t.PULocationID = z.LocationID
GROUP BY 
    z.Zone
ORDER BY 
    trip_count ASC
LIMIT 10
"""

In [35]:
least_frequent_pickup_df = spark.sql(least_frequent_pickup_query)

least_frequent_pickup_df.show()

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|         Jamaica Bay|         1|
|Governor's Island...|         2|
| Green-Wood Cemetery|         5|
|       Broad Channel|         8|
|     Highbridge Park|        14|
|        Battery Park|        15|
|Saint Michaels Ce...|        23|
|Breezy Point/Fort...|        25|
|Marine Park/Floyd...|        26|
|        Astoria Park|        29|
+--------------------+----------+

