In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Create (or reuse) the notebook's Spark session, running in local mode.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/04 16:18:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/04 16:18:36 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Display the Spark version behind this session.
spark.version

'3.3.2'

In [4]:
# Confirm the raw trip CSV is present and check its size on disk.
!ls -lh fhvhv_tripdata_2021-06.csv

-rw-rw-r-- 1 Dmitrii_Kalmanovich Dmitrii_Kalmanovich 878M Dec 20 00:13 fhvhv_tripdata_2021-06.csv


In [6]:
# Explicit schema for the FHVHV trip CSV; every column is declared nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [27]:
# Load the raw trip CSV using the explicit schema defined above
# (the first line of the file is treated as a header).
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

# Preview only the pickup and drop-off timestamps.
df.select('pickup_datetime', 'dropoff_datetime').show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [10]:
# Repartition into 12 partitions before writing the Parquet copy.
df = df.repartition(12)

# Overwrite any previous output so this cell can be re-run: the default save
# mode raises an error when the target path already exists.
# NOTE: run this while `df` still points at the CSV source; Spark refuses to
# overwrite a path that the same DataFrame is being read from.
df.write.mode('overwrite').parquet('data/pq/fhvhv/2021/06/')

                                                                                

In [13]:
# Rebind `df` to the Parquet copy written above; the cells below use this frame.
df = spark.read.parquet('data/pq/fhvhv/2021/06/')

**Q3**: How many taxi trips were there on June 15?

In [14]:
from pyspark.sql import functions as F
import datetime

In [15]:
   # Count trips whose pickup falls in the half-open window
   # [2021-06-15 00:00:00, 2021-06-16 00:00:00).
   # NOTE(review): the recorded result (452470) differs from the to_date-based
   # counts further down (450872) — possibly a timezone mismatch between the
   # Python datetime literals and the Spark session; confirm.
   (
       df
       .where(df['pickup_datetime'] >= datetime.datetime(2021, 6, 15))
       .where(df['pickup_datetime'] < datetime.datetime(2021, 6, 16))
       .count()
   )

                                                                                

452470

In [36]:
# Same question answered by truncating the pickup timestamp to a date
# and comparing it with the target day.
(
    df
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .filter("pickup_date = '2021-06-15'")
    .count()
)

                                                                                

450872

In [16]:
# Expose the trips frame to Spark SQL as `fhvhv_2021_06`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it also makes the cell safe to
# re-run because an existing view of the same name is replaced.
df.createOrReplaceTempView('fhvhv_2021_06')



In [9]:
# SQL version of the June 15 count: rows of the fhvhv_2021_06 view whose
# pickup timestamp, truncated to a date, equals 2021-06-15.
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhvhv_2021_06
WHERE
    to_date(pickup_datetime) = '2021-06-15';
""").show()



+--------+
|count(1)|
+--------+
|  450872|
+--------+



                                                                                

**Q4**: Longest trip for each day

In [17]:
# List the column names available for the duration calculation.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [18]:
# Eyeball a few rows of the Parquet-backed frame.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|                B02871|
|              B02888|2021-06-03 08:45:25|2021-06-03 09:00:12|           9|          92|      N|                B02888|
|              B02510|2021-06-05 09:50:43|2021-06-05 10:06:53|          14|         133|      N|                  null|
|              B02764|2021-06-03 22:55:5

In [33]:
# Preview trip duration in hours: both timestamps are cast to long and their
# difference is divided by the number of seconds in an hour.
(
    df
    .select('pickup_datetime', 'dropoff_datetime')
    .withColumn(
        'durationHours',
        (df['dropoff_datetime'].cast('long') - df['pickup_datetime'].cast('long')) / 3600,
    )
    .show()
)

+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropoff_datetime|      durationHours|
+-------------------+-------------------+-------------------+
|2021-06-01 00:02:41|2021-06-01 00:07:46|0.08472222222222223|
|2021-06-01 00:16:16|2021-06-01 00:21:14|0.08277777777777778|
|2021-06-01 00:27:01|2021-06-01 00:42:11|0.25277777777777777|
|2021-06-01 00:46:08|2021-06-01 00:53:45|0.12694444444444444|
|2021-06-01 00:45:42|2021-06-01 01:03:33|             0.2975|
|2021-06-01 00:18:15|2021-06-01 00:25:47|0.12555555555555556|
|2021-06-01 00:33:06|2021-06-01 00:42:46|0.16111111111111112|
|2021-06-01 00:46:27|2021-06-01 00:56:50|0.17305555555555555|
|2021-06-01 00:48:06|2021-06-01 01:04:10| 0.2677777777777778|
|2021-06-01 00:18:54|2021-06-01 00:26:14|0.12222222222222222|
|2021-06-01 00:31:02|2021-06-01 00:36:39|0.09361111111111112|
|2021-06-01 00:41:53|2021-06-01 01:07:32|             0.4275|
|2021-06-01 00:29:52|2021-06-01 00:54:41| 0.4136111111111111|
|2021-06

In [39]:
# Longest trip (in hours) per pickup date, showing the five largest maxima.
(
    df
    .withColumn(
        'durationHours',
        (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600,
    )
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .groupBy('pickup_date')
    .max('durationHours')
    .orderBy('max(durationHours)', ascending=False)
    .limit(5)
    .show()
)



+-----------+------------------+
|pickup_date|max(durationHours)|
+-----------+------------------+
| 2021-06-25|  66.8788888888889|
| 2021-06-22|25.549722222222222|
| 2021-06-27|19.980833333333333|
| 2021-06-26|18.197222222222223|
| 2021-06-23|16.466944444444444|
+-----------+------------------+



                                                                                

In [42]:
# SQL version of the per-day longest trip: maximum (dropoff - pickup) in hours
# for each pickup date, largest first.
# Note: the query allows up to 100 rows, but show() prints only its default
# first 20, as the recorded output indicates.
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
       MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
FROM 
    fhvhv_2021_06
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 100;
""").show()



+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2021-06-25|  66.8788888888889|
| 2021-06-22|25.549722222222222|
| 2021-06-27|19.980833333333333|
| 2021-06-26|18.197222222222223|
| 2021-06-23|16.466944444444444|
| 2021-06-24|13.909722222222221|
| 2021-06-04|             11.67|
| 2021-06-20|10.984444444444444|
| 2021-06-01|           10.2675|
| 2021-06-28| 9.966388888888888|
| 2021-06-18| 9.624444444444444|
| 2021-06-08| 9.480277777777777|
| 2021-06-11| 9.471666666666666|
| 2021-06-15| 9.402222222222223|
| 2021-06-03| 9.365833333333333|
| 2021-06-19| 9.106944444444444|
| 2021-06-30| 9.056111111111111|
| 2021-06-09| 9.030277777777778|
| 2021-06-17| 8.774166666666666|
| 2021-06-29| 8.571666666666667|
+-----------+------------------+
only showing top 20 rows



                                                                                

**Q5**: Most frequent `dispatching_base_num`

How many stages does this Spark job have?



In [44]:
# Five dispatching bases with the most trips.
# Reads the fhvhv_2021_06 view, the only trips view this notebook registers;
# querying an unregistered name such as fhvhv_2021_02 fails on a fresh kernel.
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(1)
FROM 
    fhvhv_2021_06
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
+--------------------+--------+





In [46]:
# DataFrame-API version: trips per dispatching base, five largest counts.
(
    df
    .groupBy('dispatching_base_num')
    .count()
    .orderBy('count', ascending=False)
    .limit(5)
    .show()
)



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+





In [79]:
# Explicit schema for the taxi zone lookup CSV; every column is declared nullable.
z_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

**Q6**: Most frequent pickup location zone

In [82]:
# Load the taxi zone lookup CSV with its explicit schema
# (the first line of the file is treated as a header).
df_zones = (
    spark.read
    .option("header", "true")
    .schema(z_schema)
    .csv('./data/lookup/taxi_zone_lookup.csv')
)

In [70]:
# Peek at the first lines of the lookup file to check its header and quoting.
!head ./data/lookup/taxi_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


In [83]:
# Verify that the explicit zone schema was applied to the loaded frame.
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [51]:
# Re-check the trip columns before writing the join.
# NOTE(review): the recorded output lists 'hvfhs_license_num' and omits
# 'Affiliated_base_number', which does not match the schema this notebook
# defines — it looks like stale output from an earlier dataset; re-run to refresh.
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [84]:
# Expose the zone lookup frame to Spark SQL as `zones`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0; it also makes the cell safe to
# re-run because an existing view of the same name is replaced.
df_zones.createOrReplaceTempView('zones')

In [86]:
# Five most frequent pickup zones, resolved through the zone lookup view.
# The column is labelled `pickup_zone` because only the pickup side is joined;
# the LEFT JOIN keeps trips whose PULocationID has no lookup row (they group
# under a NULL zone).
spark.sql("""
SELECT
    pul.Zone AS pickup_zone,
    COUNT(1)
FROM
    fhvhv_2021_06 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()



+-------------------+--------+
|         pu_do_pair|count(1)|
+-------------------+--------+
|Crown Heights North|  231279|
|       East Village|  221244|
|        JFK Airport|  188867|
|     Bushwick South|  187929|
|      East New York|  186780|
+-------------------+--------+



                                                                                