In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/06 03:34:55 WARN Utils: Your hostname, codespaces-800b10 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/06 03:34:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/06 03:34:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [24]:
spark.version

'3.5.1'

In [41]:
# download csv data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-06 04:27:58--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240306T042758Z&X-Amz-Expires=300&X-Amz-Signature=f4594a3adb0e967813bbd0e521cd0c38bada54249fba02f34b6a85264fad316c&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-06 04:27:58--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [45]:
# unzip data and count lines
!gzip -dc fhv_tripdata_2019-10.csv.gz > fhv_tripdata_2019-10.csv
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


In [46]:
# create schema for pyspark
from pyspark.sql import types
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])
    

In [47]:
# read data to spark df
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

In [48]:
# repartition and save to parquet
parquet_path = 'fhv/2019/10'
df = df.repartition(6)
df.write.mode('overwrite').parquet(parquet_path)
df = spark.read.parquet(parquet_path)

                                                                                

In [49]:
# How many taxi trips were there on the 15th of October?
from pyspark.sql.functions import col, date_format
df \
    .withColumn('pickup_date', date_format(col('pickup_datetime'), 'yyyy-MM-dd')) \
    .filter(col('pickup_date') == '2019-10-15') \
    .count()

                                                                                

62610

In [50]:
# alternatively...
from pyspark.sql import functions as F
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2019-10-15'") \
    .count()

62610

In [51]:
# how long was the longest trip? (hours)
df \
    .withColumn('duration', (df.dropOff_datetime.cast('long') - df.pickup_datetime.cast('long'))/3600) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

[Stage 75:>                                                         (0 + 2) / 2]

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



                                                                                

In [52]:
# least frequent pickup zone
df \
    .groupBy('PULocationID') \
        .count() \
    .orderBy('count', ascending=True) \
    .limit(5) \
    .show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|           2|    1|
|         105|    2|
|         111|    5|
|          30|    8|
|         120|   14|
+------------+-----+



In [54]:
# least frequent pickup zone
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')
df_joined = df.join(
    df_zones,
    df_zones['LocationID'] == df['PULocationID'], 'left')
df_joined.groupBy('Zone')\
    .count()\
    .orderBy('count').show()


+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

