In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a Spark session on the local machine using all cores,
# with 4g configured for both executor and driver memory.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

23/03/06 13:08:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/06 13:08:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [34]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-04 15:11:00--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230304T141101Z&X-Amz-Expires=300&X-Amz-Signature=21665bde382795741be3ccb98937391df76fdd78ccd81376a422cabb472a8daf&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-04 15:11:01--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [35]:
!gzip -d fhvhv_tripdata_2021-06.csv.gz

gzip: fhvhv_tripdata_2021-06.csv already exists; do you wish to overwrite (y or n)? ^C


In [36]:
# First look at the raw file: read it with the header row as column names and
# no explicit schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [37]:
# Print the first five rows as a text table.
df.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
+--------------------+------------------

In [38]:
# Fetch the first row as a list of Row objects to inspect the raw values.
df.head(n=1)

[Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:02:41', dropoff_datetime='2021-06-01 00:07:46', PULocationID='174', DOLocationID='18', SR_Flag='N', Affiliated_base_number='B02764')]

In [40]:
# Display the schema of the DataFrame read without an explicit schema.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [5]:
from pyspark.sql import types

In [6]:
# Explicit schema for the trip CSV: timestamps for the pickup/dropoff columns,
# integers for the two location ids, strings for everything else.
# Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [7]:
# Re-read the CSV, this time applying the explicit schema defined above.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [8]:
# Fetch the first row again to confirm the typed columns were parsed.
df.head(n=1)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764')]

In [9]:
# Request 12 partitions. repartition is a lazy transformation: nothing is
# shuffled until an action runs on the DataFrame.
df = df.repartition(numPartitions=12)

In [11]:
# Write the DataFrame as Parquet, replacing any previous contents of the
# target directory.
df.write.parquet(
    path='fhvhv/2021/06/',
    mode="overwrite",
)

                                                                                

In [3]:
# Load the trips back from the Parquet directory instead of the CSV.
df = spark.read.parquet(
    'fhvhv/2021/06'
)

In [19]:
# List the column names of the DataFrame loaded from Parquet.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [4]:
from pyspark.sql import functions as F

In [12]:
# Count the trips whose pickup timestamp falls on 2021-06-15.
(
    df
    .filter(F.to_date(df['pickup_datetime']) == '2021-06-15')
    .count()
)

                                                                                

452470

In [21]:
# Longest trip: take dropoff minus pickup, cast the difference to an integer,
# divide by 60 twice (the printed maximum reads as hours), and show the maximum.
(
    df
    .withColumn(
        'trip_duration',
        (df['dropoff_datetime'] - df['pickup_datetime']).cast("integer") / 60.0 / 60.0,
    )
    .agg(F.max('trip_duration'))
    .show()
)



+------------------+
|max(trip_duration)|
+------------------+
| 66.87888888888888|
+------------------+



                                                                                

In [6]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-06 13:13:46--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T121346Z&X-Amz-Expires=300&X-Amz-Signature=598ab44183c7a444219d1a623b072ad4b1915048003dfe36b4c54ccc116c40ba&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-06 13:13:46--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [13]:
# The published taxi_zone_lookup.csv begins with a column-name row. Without
# header=True that row is loaded as a data record (its LocationID parses to
# null). The explicit DDL schema still decides the column names and types.
df_zones = spark.read.csv(
    "taxi_zone_lookup.csv",
    schema="LocationID INT, Borough STRING, Zone STRING, ServiceZone STRING",
    header=True,
)

In [16]:
# Expose the trips DataFrame to Spark SQL as "trips_data".
# registerTempTable is deprecated; createOrReplaceTempView is its supported
# replacement and also makes the cell safe to re-run.
df.createOrReplaceTempView("trips_data")

In [18]:
# Expose the zone lookup DataFrame to Spark SQL as "zones".
# registerTempTable is deprecated; createOrReplaceTempView is its supported
# replacement and also makes the cell safe to re-run.
df_zones.createOrReplaceTempView("zones")

In [35]:
# Trips per pickup zone: join the trips to the zone lookup on the pickup
# location id, count the rows for each PULocationID, and sort by that count
# (the third selected column), largest first.
df_result = spark.sql("""
select td.PULocationID, first(z.Zone), count(*)
from trips_data td join zones z
on td.PULocationID = z.LocationID
group by td.PULocationID
order by 3 desc
""")

In [36]:
# Print the top of the ranking (20 rows, long strings truncated: show's defaults).
df_result.show(n=20, truncate=True)



+------------+--------------------+--------+
|PULocationID|         first(Zone)|count(1)|
+------------+--------------------+--------+
|          61| Crown Heights North|  231279|
|          79|        East Village|  221244|
|         132|         JFK Airport|  188867|
|          37|      Bushwick South|  187929|
|          76|       East New York|  186780|
|         231|TriBeCa/Civic Center|  164344|
|         138|   LaGuardia Airport|  161596|
|         234|            Union Sq|  158937|
|         249|        West Village|  154698|
|           7|             Astoria|  152493|
|         148|     Lower East Side|  151020|
|          68|        East Chelsea|  147673|
|          42|Central Harlem North|  146402|
|         255|Williamsburg (Nor...|  143683|
|         181|          Park Slope|  143594|
|         225|  Stuyvesant Heights|  141427|
|          48|        Clinton East|  139611|
|         246|West Chelsea/Huds...|  139431|
|          17|             Bedford|  138428|
|         

                                                                                