In [1]:
import findspark
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
findspark.init()
findspark.find()

'C:\\tools\\spark-3.3.2-bin-hadoop3'

In [2]:
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql import types

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [4]:
spark.version

'3.3.2'

In [5]:
!ls -lh ./data/homework_data/fhv_tripdata_2019-10.csv

-rw-r--r-- 1 pc 197121 115M Nov 21  2022 ./data/homework_data/fhv_tripdata_2019-10.csv


In [6]:
spark.conf.get("spark.sql.files.maxPartitionBytes")

'134217728b'

In [7]:
df_homework = spark.read.option("header","true").csv("./data/homework_data/fhv_tripdata_2019-10.csv")

In [8]:
df_homework.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [9]:
df_homework.coalesce(6).write.parquet('data/homework/', mode='overwrite')

In [10]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [11]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('./data/homework_data/fhv_tripdata_2019-10.csv')

df = df.repartition(6)

df.write.parquet('data/homework/pq/fhvhv/', mode='overwrite')

In [12]:
df = spark.read.parquet('data/homework/pq/fhvhv/')

**Q3**: How many taxi trips were there on February 15?

In [13]:
from pyspark.sql import functions as F

In [14]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2019-10-15'") \
    .count()

62610

In [15]:
df.registerTempTable('trip_data')



In [16]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    trip_data
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



**Q4**: Longest trip for each day

In [17]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [18]:
df_result = df \
    .withColumn('duration_seconds', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .withColumn('duration_hours', F.col('duration_seconds') / 3600) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration_hours') \
    .orderBy('max(duration_hours)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-------------------+
|pickup_date|max(duration_hours)|
+-----------+-------------------+
| 2019-10-28|           631152.5|
| 2019-10-11|           631152.5|
| 2019-10-31|  87672.44083333333|
| 2019-10-01|  70128.02805555555|
| 2019-10-17|             8794.0|
+-----------+-------------------+



In [19]:
zones = spark.read.option("header","true").csv("../data/taxi_zone_lookup.csv")

In [20]:
zones.registerTempTable("zone")

In [22]:
spark.sql("""
SELECT 
    z.zone,
    count(1)
FROM 
    trip_data t full outer join zone z on t.PULocationID=z.LocationID
GROUP BY 1
ORDER BY 2
""").show()


+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|       Rikers Island|       1|
|    Great Kills Park|       1|
|Governor's Island...|       4|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
+--------------------+--------+
only showing top 20 rows



**Q6**: Most common locations pair

In [None]:
df_zones = spark.read.parquet('zones')

In [None]:
df_zones.columns

In [None]:
df.columns

In [None]:
df_zones.registerTempTable('zones')

In [None]:
spark.sql("""
SELECT
    CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,
    COUNT(1)
FROM 
    fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID
                      LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID
GROUP BY 
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

In [None]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [None]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('./data/homework_data/fhv_tripdata_2019-10.csv')


In [None]:
df.show()