In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Start (or reuse) a Spark session in local mode; "local[*]" asks Spark to
# run with as many worker threads as there are logical cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/03 22:07:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Show the Spark version of the active session.
spark.version

'3.5.1'

In [4]:
# First pass over the FHV trips CSV: only the header option is set, so no
# schema is applied and every column comes back as a string.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

In [5]:
# Peek at the first row to see the column names and the raw (string) values.
df.head()

Row(dispatching_base_num='B00009', pickup_datetime='2019-10-01 00:23:00', dropOff_datetime='2019-10-01 00:35:00', PUlocationID='264', DOlocationID='264', SR_Flag=None, Affiliated_base_number='B00009')

In [6]:
# Explicit schema for the FHV trips file: (column name, Spark type) pairs in
# file order. Every field is declared nullable.
_fhv_columns = [
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.TimestampType()),
    ('dropOff_datetime', types.TimestampType()),
    ('PULocationID', types.IntegerType()),
    ('DOLocationID', types.IntegerType()),
    ('SR_Flag', types.StringType()),
    ('Affiliated_base_number', types.StringType()),
]

schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in _fhv_columns
])

In [7]:
# Re-read the CSV, this time applying the explicit schema so timestamps and
# location ids are parsed into proper types instead of strings.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [8]:
# Check the first row again to confirm the schema was applied.
df.head()

Row(dispatching_base_num='B00009', pickup_datetime=datetime.datetime(2019, 10, 1, 0, 23), dropOff_datetime=datetime.datetime(2019, 10, 1, 0, 35), PULocationID=264, DOLocationID=264, SR_Flag=None, Affiliated_base_number='B00009')

In [9]:
# Redistribute the rows across 6 partitions; a write of this DataFrame then
# emits one part file per partition.
df = df.repartition(6)

In [10]:
# Persist the repartitioned trips as Parquet.
# "overwrite" replaces output left behind by a previous run; the default save
# mode raises an error when the target path already exists, which would make
# this cell fail on Restart & Run All.
df.write.mode('overwrite').parquet('fhv/2019/10/')

                                                                                

In [11]:
# List the written Parquet part files with human-readable sizes.
!ls -lh fhv/2019/10/

total 74496
-rw-r--r--  1 himanshu  staff     0B Mar  3 22:07 _SUCCESS
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00000-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00001-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00002-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00003-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00004-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet
-rw-r--r--  1 himanshu  staff   5.9M Mar  3 22:07 part-00005-79f78a29-1e85-4552-80c0-ef75c228961b-c000.snappy.parquet


In [12]:
# Load the trips back from the Parquet directory; from here on `df` refers to
# the Parquet-backed data rather than the CSV.
df = spark.read.parquet('fhv/2019/10/')

In [13]:
# Number of trips whose pickup timestamp falls on 15 October 2019
# (the timestamp is truncated to a date before comparing).
(
    df
    .where(F.to_date(F.col('pickup_datetime')) == '2019-10-15')
    .count()
)

62610

In [14]:
# Expose the trips DataFrame to Spark SQL under the name `trips_data`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely because it replaces an existing view of the same name.
df.createOrReplaceTempView('trips_data')



In [15]:
# Longest trip in the dataset, in hours: the pickup/drop-off difference is
# computed in seconds (UNIX_TIMESTAMP) and the maximum is divided by 3600.
# NOTE(review): the query spells the column `dropoff_datetime` while the schema
# declares `dropOff_datetime`; this relies on Spark SQL's default
# case-insensitive column resolution.
spark.sql("""
SELECT
    max(UNIX_TIMESTAMP(dropoff_datetime)-UNIX_TIMESTAMP(pickup_datetime))/3600 as duration
FROM
    trips_data
""").show()

+--------+
|duration|
+--------+
|631152.5|
+--------+



In [16]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-03 22:07:41--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240304%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240304T030741Z&X-Amz-Expires=300&X-Amz-Signature=fe42479543c68ac47a5360fad59a7914275f8a94f0b2b1437a9a7bf8a5084ced&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 22:07:41--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [17]:
# Load the taxi zone lookup table (the header row supplies the column names;
# no schema is given, so the columns are read as strings) and preview it.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi_zone_lookup.csv')
)

df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [18]:
# Make the zone lookup table queryable from Spark SQL as `zones`.
df_zones.createOrReplaceTempView("zones")

In [19]:
# Trips per pickup zone, sorted ascending so the least-used zones come first.
# The LEFT JOIN keeps trips whose PULocationID has no match in `zones`; those
# are grouped under a NULL zone.
# NOTE(review): PULocationID is an integer while LocationID was read without a
# schema (string), so the join condition relies on Spark's implicit cast.
spark.sql("""
SELECT
    z.zone, count(1)
FROM
    trips_data t 
left join zones z
    on t.PULocationID = z.LocationID
group by z.zone
order by count(1)
""").show()

+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows

