In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session in local mode on all available cores,
# with the executor and driver memory settings both set to 4g.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-03-01 19:10:17,749 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Display the Spark version of the active session.
spark.version

'3.2.1'

In [5]:
# Confirm the compressed FHV trip file is present and check its size.
!ls -lh fhv_tripdata_2019-10.csv.gz

-rw-r--r--@ 1 djr  staff    18M Mar  1 19:16 fhv_tripdata_2019-10.csv.gz


In [6]:
import gzip
import shutil

# Decompress the archive next to itself as a plain CSV.
# Both files are opened in binary mode and copied in chunks, so the
# decompressed payload is never held in memory as a single string and the
# bytes are written verbatim (no text decoding or newline translation).
with gzip.open('fhv_tripdata_2019-10.csv.gz', 'rb') as gz_file, \
        open('fhv_tripdata_2019-10.csv', 'wb') as out_file:
    shutil.copyfileobj(gz_file, out_file)

In [7]:
# Check the size of the decompressed CSV.
!ls -lh fhv_tripdata_2019-10.csv

-rw-r--r--  1 djr  staff   114M Mar  1 19:20 fhv_tripdata_2019-10.csv


In [8]:
# Count the lines in the decompressed CSV.
!wc -l fhv_tripdata_2019-10.csv

 1897494 fhv_tripdata_2019-10.csv


In [9]:
# First pass: read the CSV using its header row for column names and
# without supplying a schema, to see what Spark produces by default.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

In [10]:
# Inspect the schema Spark assigned when none was supplied.
df.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropOff_datetime,StringType,true),StructField(PUlocationID,StringType,true),StructField(DOlocationID,StringType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [11]:
from pyspark.sql import types

In [12]:
# Explicit schema for the FHV trip CSV: timestamps and location IDs get real
# types instead of strings, and every field is nullable.
# The names below use different casing from three of the CSV header columns
# (dropOff_datetime, PUlocationID, DOlocationID).
schema = types.StructType([
    types.StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [14]:
# Re-read the CSV, this time applying the explicit schema defined above.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [15]:
# Redistribute the rows across 6 partitions before writing.
df = df.repartition(6)

In [17]:
# Write the trips as Parquet. Overwrite any earlier export so this cell can be
# re-run: the default save mode raises an error when the target directory
# already exists.
df.write.mode('overwrite').parquet('data/pq/fhv/2019/10/')

                                                                                

In [18]:
# Load the Parquet export back; from here on df is read from Parquet.
df = spark.read.parquet('data/pq/fhv/2019/10/')

In [19]:
from pyspark.sql import functions as F

In [20]:
# Number of trips whose pickup date is 2019-10-15 (DataFrame API).
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .filter("pickup_date = '2019-10-15'")
    .count()
)

62610

In [21]:
# Expose df to Spark SQL as the temporary view fhv_2019_10.
df.createOrReplaceTempView('fhv_2019_10')

In [22]:
# Same count as the DataFrame version, expressed in Spark SQL:
# rows of fhv_2019_10 whose pickup date is 2019-10-15.
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [23]:
# List the column names of the trips DataFrame.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [31]:
# Longest trip per pickup date, keeping the five largest (DataFrame API).
# duration = (dropoff - pickup), both cast to long, divided by 60*60.
(
    df
    .withColumn(
        'duration',
        (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / (60 * 60),
    )
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .groupBy('pickup_date')
    .max('duration')
    .orderBy('max(duration)', ascending=False)
    .limit(5)
    .show()
)

+-----------+-----------------+
|pickup_date|    max(duration)|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [32]:
# Same aggregation in Spark SQL: for each pickup date, the maximum of
# (dropoff - pickup) cast to long and divided by 60*60; top five by duration.
# GROUP BY 1 / ORDER BY 2 refer to the SELECT list positions.
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / (60*60)) AS duration
FROM 
    fhv_2019_10
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+-----------+-----------------+
|pickup_date|         duration|
+-----------+-----------------+
| 2019-10-28|         631152.5|
| 2019-10-11|         631152.5|
| 2019-10-31|87672.44083333333|
| 2019-10-01|70128.02805555555|
| 2019-10-17|           8794.0|
+-----------+-----------------+



In [34]:
# Confirm the taxi zone lookup file is present and check its size.
!ls -lh taxi_zone_lookup.csv

-rw-r--r--@ 1 djr  staff    12K Mar  1 19:47 taxi_zone_lookup.csv


In [42]:
# First pass over the zone lookup: header row for names, no explicit schema.
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi_zone_lookup.csv')
)

In [43]:
# Inspect the schema Spark assigned to the zone lookup when none was supplied.
df_zones.schema

StructType(List(StructField(LocationID,StringType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [44]:
# Explicit schema for the zone lookup: LocationID as an integer, the
# remaining columns as strings; every field is nullable.
zone_schema = types.StructType([
    types.StructField(col_name, col_type, True)
    for col_name, col_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

In [45]:
# Re-read the zone lookup with the explicit schema applied.
df_zones = (
    spark.read
    .option("header", "true")
    .schema(zone_schema)
    .csv('taxi_zone_lookup.csv')
)

In [46]:
# Peek at the first 10 zone rows to check the typed read.
df_zones.head(10)

[Row(LocationID=1, Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone'),
 Row(LocationID=6, Borough='Staten Island', Zone='Arrochar/Fort Wadsworth', service_zone='Boro Zone'),
 Row(LocationID=7, Borough='Queens', Zone='Astoria', service_zone='Boro Zone'),
 Row(LocationID=8, Borough='Queens', Zone='Astoria Park', service_zone='Boro Zone'),
 Row(LocationID=9, Borough='Queens', Zone='Auburndale', service_zone='Boro Zone'),
 Row(LocationID=10, Borough='Queens', Zone='Baisley Park', service_zone='Boro Zone')]

In [48]:
# List the column names of the zone lookup DataFrame.
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [49]:
# List the column names of the trips DataFrame.
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [50]:
# Expose df_zones to Spark SQL as the temporary view zones.
df_zones.createOrReplaceTempView('zones')

In [53]:
# Five pickup zones with the fewest trips.
# Only the pickup-side zone lookup is joined: nothing in the query reads
# drop-off zone columns, so a second join on DOLocationID would add work
# without changing the counts (this relies on LocationID being unique in
# zones). The LEFT JOIN keeps trips whose PULocationID has no matching zone;
# they are grouped under a NULL pu_zone.
spark.sql("""
SELECT
    pul.Zone AS pu_zone,
    COUNT(1)
FROM
    fhv_2019_10 fhv
LEFT JOIN
    zones pul
ON
    fhv.PULocationID = pul.LocationID
GROUP BY
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show()

+--------------------+--------+
|             pu_zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+



                                                                                