In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('data-dransformer') \
    .getOrCreate()

In [7]:
pyspark.__version__

'3.2.1'

In [4]:
!wget wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-07 19:43:24--  http://wget/
Resolving wget (wget)... failed: Name or service not known.
wget: unable to resolve host address 'wget'
--2022-03-07 19:43:24--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.17.156
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.17.156|:443... connected.
HTTP request sent, awaiting response... ^C


In [8]:
df= spark.read.csv('fhvhv_tripdata_2021-02.csv', header=True, inferSchema=True)

                                                                                

In [9]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [10]:
df.show(2)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 2 rows



In [11]:
from pyspark.sql import functions as F, types as T

In [12]:
df = df.withColumn('pickup_datetime', df.pickup_datetime.cast(T.TimestampType()))\
.withColumn('dropoff_datetime', df.dropoff_datetime.cast(T.TimestampType()))

In [13]:
df.show(2)
df.printSchema()


+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 2 rows

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nulla

In [11]:
df.repartition(24).write.parquet('data/pq/fhv/')

                                                                                

In [14]:
df.count()

                                                                                

11613942

In [15]:
df.select('dispatching_base_num')\
.where(F.to_date(F.lit(F.col('pickup_datetime')),'yyyy-MM-dd HH:mm:ss') == '2021-02-15')\
.count()

                                                                                

367170

In [16]:
df.createOrReplaceTempView('fhv')

In [17]:
trips_15_feb= spark.sql('''
select count(*) as trips
from fhv
where cast(pickup_datetime as date) = '2021-02-15'
''')
trips_15_feb.show()



+------+
| trips|
+------+
|367170|
+------+



                                                                                

In [29]:
# day with longest trip
spark.sql('''
select 
cast(pickup_datetime as date) as dates,
pickup_datetime, dropoff_datetime,
unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime) as duration
from fhv
order by duration desc
limit 1
''').show()



+----------+-------------------+-------------------+--------+
|     dates|    pickup_datetime|   dropoff_datetime|duration|
+----------+-------------------+-------------------+--------+
|2021-02-11|2021-02-11 13:40:44|2021-02-12 10:39:44|   75540|
+----------+-------------------+-------------------+--------+



                                                                                

In [33]:
# most frequent base_num
spark.sql('''
select 
dispatching_base_num, count(*) as frequency
from fhv
group by dispatching_base_num
order by frequency desc 
limit 1
''').show()



+--------------------+---------+
|dispatching_base_num|frequency|
+--------------------+---------+
|              B02510|  3233664|
+--------------------+---------+



                                                                                

In [34]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-03-07 20:29:15--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.163.8
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.163.8|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: 'taxi+_zone_lookup.csv'


2022-03-07 20:29:16 (49.3 MB/s) - 'taxi+_zone_lookup.csv' saved [12322/12322]



In [35]:
df_zone= spark.read.csv('taxi+_zone_lookup.csv', header=True, inferSchema=True)
df_zone.show(2)
df_zone.printSchema()

+----------+-------+--------------+------------+
|LocationID|Borough|          Zone|service_zone|
+----------+-------+--------------+------------+
|         1|    EWR|Newark Airport|         EWR|
|         2| Queens|   Jamaica Bay|   Boro Zone|
+----------+-------+--------------+------------+
only showing top 2 rows

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [36]:
df_zone.createOrReplaceTempView('zones')

In [41]:
# most frequent location pair
spark.sql('''
select 
concat(coalesce(puzones.Zone, 'Unknown'), '/', coalesce(dozones.Zone, 'Unknown')) as pairs,
count(*) as trips
from fhv
left join zones as puzones
on fhv.PULocationID = puzones.LocationID
left join zones as dozones
on fhv.DOLocationID = dozones.LocationID
group by pairs
order by trips desc
limit 10
''').show()



+--------------------+-----+
|               pairs|trips|
+--------------------+-----+
|East New York/Eas...|45041|
|Borough Park/Boro...|37329|
|   Canarsie/Canarsie|28026|
|Crown Heights Nor...|25976|
| Bay Ridge/Bay Ridge|17934|
|Jackson Heights/J...|14688|
|     Astoria/Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South/Bu...|14424|
|Flatbush/Ditmas P...|13976|
+--------------------+-----+



                                                                                