In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 07:29:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Q1**: Spark version

In [3]:
spark.version

'3.3.2'

# Files download

In [25]:
# Zones
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2023-03-03 07:45:56--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230303T074556Z&X-Amz-Expires=300&X-Amz-Signature=b5c86974a604fd9f6b4fb8ac79dea5afc662a908033609127a2a1b9794545024&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-03 07:45:56--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [4]:
# Trips
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-06.parquet

--2023-03-02 20:36:34--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-06.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.155.128.6, 18.155.128.46, 18.155.128.187, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.155.128.6|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 394114750 (376M) [application/x-www-form-urlencoded]
Saving to: ‘fhvhv_tripdata_2021-06.parquet’


2023-03-02 20:36:45 (37.9 MB/s) - ‘fhvhv_tripdata_2021-06.parquet’ saved [394114750/394114750]



In [7]:
# Transformation from parquet to CSV with required columns.
    
df_pandas = pd.read_parquet('fhvhv_tripdata_2021-06.parquet')
df_pandas = df_pandas[[
    'hvfhs_license_num', 
    'dispatching_base_num', 
    'pickup_datetime', 
    'dropoff_datetime', 
    'PULocationID', 
    'DOLocationID', 
    'shared_request_flag',
]]
df_pandas = df_pandas.rename(columns={'shared_request_flag': 'SR_Flag'})
df_pandas.to_csv('fhvhv_tripdata_2021-06.csv', index=False, encoding='UTF-8')
del(df_pandas)

# Reading and partitioning

In [61]:
!ls -lh fhvhv_tripdata_2021-06.csv

-rw-rw-r-- 1 alberto alberto 903M Mar  2 20:39 fhvhv_tripdata_2021-06.csv


In [62]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [63]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-06.csv')

In [64]:
df = df.repartition(12)

**Q2**: File size after splitting in 12 partitions

In [65]:
df.write.parquet('data/pq/fhvhv/2021/06/')

                                                                                

# Analysis

In [66]:
df = spark.read.parquet('data/pq/fhvhv/2021/06/')

**Q3**: How many taxi trips were there on June 15?

In [67]:
from pyspark.sql import functions as F

In [68]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-06-15'") \
    .count()

                                                                                

452470

In [69]:
df.registerTempTable('fhvhv_2021_06')

In [70]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_06
WHERE
    to_date(pickup_datetime) = '2021-06-15';
""").show()

+--------+
|count(1)|
+--------+
|  452470|
+--------+



**Q4**: Longest trip in hours

In [71]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [72]:
df \
    .withColumn('duration',(df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) / 3600) \
    .orderBy('duration', ascending=False) \
    .limit(5) \
    .show()

[Stage 67:>                                                         (0 + 4) / 4]

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+------------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|          duration|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+------------------+
|           HV0003|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|  66.8788888888889|
|           HV0003|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|25.549722222222222|
|           HV0003|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|19.980833333333333|
|           HV0004|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|      N|18.197222222222223|
|           HV0003|              B02682|2021-06-23 20:40:43|20

                                                                                

In [73]:
spark.sql("""
SELECT
    (CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600 AS duration
FROM 
    fhvhv_2021_06
ORDER BY
    duration DESC
LIMIT 5;
""").show()

[Stage 69:>                                                         (0 + 4) / 4]

+------------------+
|          duration|
+------------------+
|  66.8788888888889|
|25.549722222222222|
|19.980833333333333|
|18.197222222222223|
|16.466944444444444|
+------------------+



                                                                                

**Q6**: Most frequent pickup location zone

In [74]:
schema_zone = types.StructType([
    types.StructField('LocationID', types.StringType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [75]:
df_zones = spark.read \
    .option("header", "true") \
    .schema(schema_zone) \
    .csv('taxi_zone_lookup.csv')

In [76]:
df_zones.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [77]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [78]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [79]:
df_zones.registerTempTable('zones')

In [80]:
spark.sql("""
SELECT
    pul.Zone, count(*) 
FROM 
    fhvhv_2021_06 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY 
    pul.Zone
ORDER BY
    2 DESC
;
""").show()



+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
| Crown Heights North|  231279|
|        East Village|  221244|
|         JFK Airport|  188867|
|      Bushwick South|  187929|
|       East New York|  186780|
|TriBeCa/Civic Center|  164344|
|   LaGuardia Airport|  161596|
|            Union Sq|  158937|
|        West Village|  154698|
|             Astoria|  152493|
|     Lower East Side|  151020|
|        East Chelsea|  147673|
|Central Harlem North|  146402|
|Williamsburg (Nor...|  143683|
|          Park Slope|  143594|
|  Stuyvesant Heights|  141427|
|        Clinton East|  139611|
|West Chelsea/Huds...|  139431|
|             Bedford|  138428|
|         Murray Hill|  137879|
+--------------------+--------+
only showing top 20 rows



                                                                                