# Visual Analytics

## Assignment 3

**Instructor:** Dr. Marco D'Ambros  
**TAs:** Carmen Armenti, Mattia Giannaccari

**Contacts:** marco.dambros@usi.ch, carmen.armenti@usi.ch, mattia.giannaccari@usi.ch

**Due Date:** May 16, 2025 @ 23:55

---
The goal of this assignment is to use **Spark (PySpark)** and **Polars** in Jupyter notebooks.  
The files `trip_data.csv`, `trip_fare.csv`, and `nyc_boroughs.geojson` are available in the provided folder: [Assignment3-data](https://usi365-my.sharepoint.com/:f:/g/personal/armenc_usi_ch/Ejp7sb8QAMROoWe0XUDcAkMBoqUFk-w2Vgroup025NhAww?e=2I7SMC).

You may clean the data as needed; however, please note that specific data cleaning steps will be required in **Exercise 5**. If you choose to clean the data before Exercise 5, make sure to retain the **original dataset** for use with the Polars exercises.

- Use **Spark** to solve **Exercises 1–4**
- Use **Polars** to solve **Exercises 5–8**

You are encouraged to use [Spark window functions](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) whenever appropriate.

Please name your notebook file `SurnameName_Assignment3.ipynb`.

In [1]:
%pip install pyspark polars bokeh geopandas bokeh_sampledata chardet

Note: you may need to restart the kernel to use updated packages.


## Spark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

### Exercise 1
Join the `trip_data` and `trip_fare` dataframes into one and consider only data on 2013-01-01. Please specify the number of rows obtained after joining the 2 datasets.

The first thing to do is to create a `SparkSession`.

In [None]:
spark = SparkSession.builder.getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 400)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)


In [4]:
trip_data_df = spark.read.csv('./data/trip_data.csv', header=True, inferSchema=True)
trip_fare_df = spark.read.csv('./data/trip_fare.csv', header=True, inferSchema=True)

                                                                                

Before joining the two dataframes, I strip whitespace from the column names, because the CSV headers contain leading spaces. I then filter the data by the given date.

In [5]:
trip_data_df = trip_data_df.select([col(c).alias(c.strip()) for c in trip_data_df.columns])
trip_data_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)



In [6]:
trip_fare_df = trip_fare_df.select([col(c).alias(c.strip()) for c in trip_fare_df.columns])
trip_fare_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



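Since there are two timestamps, I filter on both. This excludes trips whose `pickup_datetime` is on 2012-12-31 but whose `dropoff_datetime` is on 2013-01-01. It also excludes trips that start on 2013-01-01 and end on 2013-01-02. The fare data only contains `pickup_datetime`, so it is filtered on that column alone. The inner join below then drops the fares of trips that ended on another day.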

In [7]:
trip_data_filtered_df = trip_data_df.filter(
    (to_date("pickup_datetime") == "2013-01-01") &
    (to_date("dropoff_datetime") == "2013-01-01")
)

trip_fare_filtered_df = trip_fare_df.filter(
    (to_date("pickup_datetime") == "2013-01-01")
)
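The two-sided date rule can be illustrated without Spark. The sketch below uses plain Python `datetime` objects and three hypothetical trips that cover the boundary cases. It shows which trips survive when both timestamps must fall on 2013-01-01. It only illustrates the rule and is not the Spark code path.

```python
from datetime import date, datetime

TARGET_DAY = date(2013, 1, 1)

def on_target_day(pickup: datetime, dropoff: datetime) -> bool:
    """Keep a trip only if it both starts and ends on TARGET_DAY."""
    return pickup.date() == TARGET_DAY and dropoff.date() == TARGET_DAY

# Hypothetical (pickup_datetime, dropoff_datetime) pairs covering the boundary cases
trips = [
    (datetime(2012, 12, 31, 23, 55), datetime(2013, 1, 1, 0, 10)),     # starts the day before
    (datetime(2013, 1, 1, 7, 4, 33), datetime(2013, 1, 1, 7, 16, 3)),  # entirely on the day
    (datetime(2013, 1, 1, 23, 50), datetime(2013, 1, 2, 0, 5)),        # ends the day after
]

kept = [trip for trip in trips if on_target_day(*trip)]
print(len(kept))  # 1
```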

I can now join the two dataframes on `medallion`, `hack_license`, `vendor_id` and `pickup_datetime`, the four columns that both files have in common.

In [8]:
trip_filtered_df = trip_data_filtered_df.join(
    trip_fare_filtered_df,
    on=["medallion", "hack_license", "vendor_id", "pickup_datetime"],
    how="inner"
)
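The sketch below makes the semantics of `how="inner"` on a four-column key concrete. It is plain Python on hypothetical, simplified rows:

- A trip row survives only if a fare row agrees on all of `medallion`, `hack_license`, `vendor_id` and `pickup_datetime`.
- A key that appears several times on one side multiplies the output rows, as in a SQL join.

```python
# Hypothetical, simplified rows: both sides carry the same four key columns
KEY = ("medallion", "hack_license", "vendor_id", "pickup_datetime")

trips = [
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "CMT",
     "pickup_datetime": "2013-01-01 07:04:33", "trip_distance": 3.1},
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "CMT",
     "pickup_datetime": "2013-01-01 08:03:10", "trip_distance": 1.3},
]
fares = [
    {"medallion": "M1", "hack_license": "H1", "vendor_id": "CMT",
     "pickup_datetime": "2013-01-01 07:04:33", "fare_amount": 12.0},
    # Same pickup time as the second trip, but a different driver: no match
    {"medallion": "M1", "hack_license": "H2", "vendor_id": "CMT",
     "pickup_datetime": "2013-01-01 08:03:10", "fare_amount": 6.0},
]

def key_of(row):
    return tuple(row[column] for column in KEY)

# Index fares by key; keeping a list per key means duplicate keys
# multiply the output rows, like a SQL inner join
fare_index = {}
for fare in fares:
    fare_index.setdefault(key_of(fare), []).append(fare)

# Inner join: one merged row per matching (trip, fare) pair; unmatched trips are dropped
joined = [{**trip, **fare} for trip in trips for fare in fare_index.get(key_of(trip), [])]
print(len(joined))  # 1
```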

In [9]:
print(f"Number of rows after the join: {trip_filtered_df.count()}")

Number of rows after the join: 410816

### Exercise 2
Provide a graphical representation to compare the average fare amount for trips _within_ and _across_ all the boroughs. You may want to have a look at: https://docs.bokeh.org/en/latest/docs/user_guide/topics/categorical.html#categorical-heatmaps

In [10]:
import geopandas as gpd

from pyspark.sql.functions import max as spark_max, min as spark_min

from bokeh.plotting import figure, show, reset_output, output_notebook
from bokeh.models import GeoJSONDataSource

reset_output()
output_notebook()

In [11]:
borough_gdf = gpd.read_file('./data/nyc-boroughs.geojson')

In [12]:
trip_filtered_pd_df = trip_filtered_df.toPandas()

pickup_gdf = gpd.GeoDataFrame(
    trip_filtered_pd_df,
    geometry=gpd.points_from_xy(trip_filtered_pd_df['pickup_longitude'], trip_filtered_pd_df['pickup_latitude']),
    crs=borough_gdf.crs
)

dropoff_gdf = gpd.GeoDataFrame(
    trip_filtered_pd_df,
    geometry=gpd.points_from_xy(trip_filtered_pd_df['dropoff_longitude'], trip_filtered_pd_df['dropoff_latitude']),
    crs=borough_gdf.crs
)

pickup_with_borough = gpd.sjoin(pickup_gdf, borough_gdf[['borough', 'geometry']], how='left', predicate='within')
dropoff_with_borough = gpd.sjoin(dropoff_gdf, borough_gdf[['borough', 'geometry']], how='left', predicate='within')

trip_filtered_pd_df['pickup_borough'] = pickup_with_borough['borough']
trip_filtered_pd_df['dropoff_borough'] = dropoff_with_borough['borough']

trip_filtered_df = spark.createDataFrame(trip_filtered_pd_df)
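`gpd.sjoin(..., predicate='within')` gives each pickup and dropoff point the borough polygon that contains it. Conceptually this is a point-in-polygon test for every point. Geopandas delegates the exact geometry to shapely and prunes candidates with a spatial index.

The sketch below shows the classic even-odd ray-casting test in plain Python on two hypothetical rectangles (`Toy-A`, `Toy-B`). These are not real borough boundaries, and the sketch ignores holes and multipolygons.

GeoJSON stores coordinates as (longitude, latitude), so longitude is x and latitude is y. Points outside every polygon get no borough, which is what the `how='left'` join above produces for them.

```python
def point_in_polygon(x, y, polygon):
    """Even-odd ray casting: count how often a ray going right from (x, y) crosses an edge."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # the edge straddles the horizontal line through y
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

def assign_borough(lon, lat, boroughs):
    """Return the first polygon name containing (lon, lat), or None if none does."""
    for name, polygon in boroughs.items():
        if point_in_polygon(lon, lat, polygon):
            return name
    return None

# HYPOTHETICAL rectangles in (longitude, latitude) order, not real borough shapes
toy_boroughs = {
    "Toy-A": [(-74.02, 40.70), (-73.93, 40.70), (-73.93, 40.88), (-74.02, 40.88)],
    "Toy-B": [(-74.05, 40.57), (-73.83, 40.57), (-73.83, 40.70), (-74.05, 40.70)],
}

print(assign_borough(-73.997002, 40.732533, toy_boroughs))  # Toy-A
print(assign_borough(-73.874786, 40.774025, toy_boroughs))  # None
```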

                                                                                

In [13]:
from bokeh.models import BasicTicker, PrintfTickFormatter
from bokeh.transform import linear_cmap

# Average fare for every (pickup, dropoff) borough pair
trip_group_df = trip_filtered_df \
    .groupBy(['pickup_borough', 'dropoff_borough']) \
    .avg('fare_amount') \
    .withColumnRenamed('avg(fare_amount)', 'avg_fare')

# Categorical axes: every borough appears on both axes
pickup_borough = list(borough_gdf['borough'].unique())
dropoff_borough = list(borough_gdf['borough'].unique())

# Bounds of the colour scale, computed in a single Spark job
# (named min_fare/max_fare so the built-in min/max are not shadowed)
fare_bounds = trip_group_df.agg(spark_min('avg_fare'), spark_max('avg_fare')).first()
min_fare, max_fare = fare_bounds[0], fare_bounds[1]

colors = ["#03045e", "#023e8a", "#0077b6", "#0096c7", "#00b4d8", "#48cae4", "#90e0ef", "#ade8f4", "#caf0f8"]

TOOLS = "hover"
TOOLTIPS = [
    ('Pickup Borough', '@pickup_borough'),
    ('Dropoff Borough', '@dropoff_borough'),
    ('Average Fare Amount', '@avg_fare{0.00}')  # numeral format: two decimals
]

p = figure(title="Average Fare Amount for Pickup and Dropoff Boroughs",
           x_range=pickup_borough, y_range=dropoff_borough,
           x_axis_location="above", width=900, height=400,
           tools=TOOLS, toolbar_location='below', tooltips=TOOLTIPS)

p.grid.grid_line_color = None
p.axis.axis_line_color = None
p.axis.major_tick_line_color = None
p.axis.major_label_text_font_size = "7px"
p.axis.major_label_standoff = 0

# Reversed palette: darker cells mean a higher average fare
r = p.rect(x="pickup_borough", y="dropoff_borough", width=1, height=1, source=trip_group_df.toPandas(),
           fill_color=linear_cmap("avg_fare", colors[::-1], low=min_fare, high=max_fare),
           line_color=None)

p.add_layout(r.construct_color_bar(
    major_label_text_font_size="7px",
    ticker=BasicTicker(desired_num_ticks=len(colors)),
    formatter=PrintfTickFormatter(format="$%.2f"),  # fares are dollar amounts, not percentages
    label_standoff=6,
    border_line_color=None,
    padding=5,
), 'right')

show(p)



### Exercise 3
Consider only the Manhattan, Bronx and Brooklyn boroughs. Then create a dataframe that shows the total number of trips *within* the same borough and *across* the other boroughs mentioned before (Manhattan, Bronx, and Brooklyn) where the number of passengers is greater than or equal to 3.

For example, for Manhattan borough you should consider the total number of the following trips:
- Manhattan → Manhattan
- Manhattan → Bronx
- Manhattan → Brooklyn

You should then do the same for Bronx and Brooklyn boroughs.
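The filter-then-count logic can be sketched in plain Python with `collections.Counter` on hypothetical `(pickup_borough, dropoff_borough, passenger_count)` tuples. Besides the per-pair counts that the Spark `groupBy` produces, the sketch also sums them per pickup borough. That covers the case where the Manhattan example above is meant as one total per origin; this reading is an assumption.

```python
from collections import Counter

BOROUGHS = {"Manhattan", "Brooklyn", "Bronx"}
MIN_PASSENGERS = 3

# Hypothetical trips: (pickup_borough, dropoff_borough, passenger_count)
trips = [
    ("Manhattan", "Manhattan", 3),
    ("Manhattan", "Manhattan", 5),
    ("Manhattan", "Bronx", 4),
    ("Manhattan", "Brooklyn", 1),  # too few passengers
    ("Queens", "Manhattan", 6),    # Queens is not one of the three boroughs
    ("Bronx", "Brooklyn", 3),
]

# Count trips per (pickup, dropoff) pair after filtering
counts = Counter(
    (pickup, dropoff)
    for pickup, dropoff, passengers in trips
    if pickup in BOROUGHS and dropoff in BOROUGHS and passengers >= MIN_PASSENGERS
)

# Per-origin totals, e.g. Manhattan = M→M + M→Bronx + M→Brooklyn
per_origin = Counter()
for (pickup, _), n in counts.items():
    per_origin[pickup] += n

print(per_origin["Manhattan"])  # 3
```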

In [14]:
trip_filtered_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- rate_code: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)

In [15]:
boroughs = ['Manhattan', 'Brooklyn', 'Bronx']
min_passenger = 3

trip_group_bp_df = trip_filtered_df \
    .filter(
        (col('pickup_borough').isin(boroughs)) &
        (col('dropoff_borough').isin(boroughs)) &
        (col('passenger_count') >= min_passenger)
    ) \
    .groupBy(['pickup_borough', 'dropoff_borough']) \
    .count() \
    .withColumnRenamed('count', 'trip_count') \
    .orderBy('trip_count', ascending=False)

trip_group_bp_df


| pickup_borough | dropoff_borough | trip_count |
|---|---|---|
| Manhattan | Manhattan | 62391 |
| Manhattan | Brooklyn | 2762 |
| Brooklyn | Brooklyn | 2014 |
| Brooklyn | Manhattan | 1326 |
| Manhattan | Bronx | 527 |
| Bronx | Bronx | 103 |
| Bronx | Manhattan | 55 |
| Brooklyn | Bronx | 10 |
| Bronx | Brooklyn | 2 |


### Exercise 4
Create a dataframe where each row represents a driver, and there is one column per borough.
For each driver-borough pair, the dataframe provides the maximum number of consecutive trips
for the given driver within the given borough. Please consider only trips that were paid by card.

For example, if for driver A we have (sorted by time):
- Trip 1: Bronx → Bronx
- Trip 2: Bronx → Bronx
- Trip 3: Bronx → Manhattan
- Trip 4: Manhattan → Bronx

The maximum number of consecutive trips for Bronx is 2.
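The core of this exercise is a "gaps and islands" problem: order each driver's trips by time, then measure the longest run of within-borough trips. In Spark the same idea is usually expressed with `Window.partitionBy("hack_license").orderBy("pickup_datetime")` plus `lag` to detect where a run starts.

Below is a minimal pure-Python sketch. It assumes trips arrive as hypothetical `(driver, pickup_time, pickup_borough, dropoff_borough, payment_type)` tuples. It also assumes non-card trips are dropped *before* counting, so a cash trip does not break a run; the other reading is equally defensible.

```python
from datetime import datetime
from itertools import groupby

def max_borough_streaks(trips, payment="CRD"):
    """
    trips: iterable of (driver, pickup_time, pickup_borough, dropoff_borough, payment_type).
    Returns {driver: {borough: longest run of consecutive within-borough trips}}.
    """
    # Keep card trips only, then order by driver and time so "consecutive" means in time
    card_trips = sorted((t for t in trips if t[4] == payment), key=lambda t: (t[0], t[1]))
    result = {}
    for driver, driver_trips in groupby(card_trips, key=lambda t: t[0]):
        best, current, run = {}, None, 0
        for _, _, pickup, dropoff, _ in driver_trips:
            if pickup == dropoff:
                # Within-borough trip: extend the run if the borough is unchanged, else restart
                run = run + 1 if pickup == current else 1
                current = pickup
                best[pickup] = max(best.get(pickup, 0), run)
            else:
                # A cross-borough trip ends the current run
                current, run = None, 0
        result[driver] = best
    return result

# The exercise's driver A (hypothetical times, deliberately unsorted) plus an ignored cash trip
trips = [
    ("A", datetime(2013, 1, 1, 11, 0), "Bronx", "Bronx", "CRD"),
    ("A", datetime(2013, 1, 1, 10, 0), "Bronx", "Bronx", "CRD"),
    ("A", datetime(2013, 1, 1, 10, 30), "Manhattan", "Manhattan", "CSH"),
    ("A", datetime(2013, 1, 1, 12, 0), "Bronx", "Manhattan", "CRD"),
    ("A", datetime(2013, 1, 1, 13, 0), "Manhattan", "Bronx", "CRD"),
]
print(max_borough_streaks(trips))  # {'A': {'Bronx': 2}}
```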

In [16]:
trip_filtered_df



medallion,hack_license,vendor_id,pickup_datetime,rate_code,store_and_fwd_flag,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount,pickup_borough,dropoff_borough
00005007A9F30E289...,43468C5D35F828693...,CMT,2013-01-01 07:04:33,1,N,2013-01-01 07:16:03,1,689,3.1,-73.997002,40.732533,-73.970032,40.764423,CRD,12.0,0.0,0.5,2.5,0.0,15.0,Manhattan,Manhattan
00005007A9F30E289...,43468C5D35F828693...,CMT,2013-01-01 08:03:10,1,N,2013-01-01 08:06:41,1,211,1.3,-73.976471,40.743862,-73.985283,40.728035,CSH,6.0,0.0,0.5,0.0,0.0,6.5,Manhattan,Manhattan
00005007A9F30E289...,43468C5D35F828693...,CMT,2013-01-01 12:11:15,1,N,2013-01-01 12:26:47,1,931,8.3,-73.874786,40.774025,-73.970665,40.749886,CRD,24.0,0.0,0.5,5.86,4.8,35.16,Queens,Manhattan
00005007A9F30E289...,43468C5D35F828693...,CMT,2013-01-01 14:01:48,1,N,2013-01-01 14:11:06,1,558,2.3,-73.953491,40.785721,-73.967339,40.759228,CSH,10.0,0.0,0.5,0.0,0.0,10.5,Manhattan,Manhattan
00005007A9F30E289...,A9AE329EA1138052D...,CMT,2013-01-01 02:43:02,1,N,2013-01-01 03:00:27,1,1045,3.7,-73.953056,40.78009,-73.993317,40.758999,CRD,15.5,0.5,0.5,3.3,0.0,19.8,Manhattan,Manhattan
00005007A9F30E289...,C72A773829ED990AF...,CMT,2013-01-01 18:33:03,1,N,2013-01-01 18:38:18,1,315,1.4,-73.958992,40.780998,-73.950035,40.795525,CSH,6.5,0.0,0.5,0.0,0.0,7.0,Manhattan,Manhattan
000318C2E3E638158...,91CE3B3A2F548CD8A...,VTS,2013-01-01 18:22:00,1,,2013-01-01 18:27:00,5,300,1.17,-73.982239,40.773361,-73.991432,40.760235,CRD,6.0,0.0,0.5,1.5,0.0,8.0,Manhattan,Manhattan
000351EDC735C0792...,9413377237F83B3FE...,CMT,2013-01-01 02:02:24,1,N,2013-01-01 02:06:30,2,246,0.7,-73.946609,40.792274,-73.936211,40.794765,CSH,5.0,0.5,0.5,0.0,0.0,6.0,Manhattan,Manhattan
000351EDC735C0792...,9413377237F83B3FE...,CMT,2013-01-01 02:37:23,1,N,2013-01-01 02:39:21,2,118,0.4,-73.980286,40.751598,-73.974274,40.750317,CRD,3.5,0.5,0.5,3.0,0.0,7.5,Manhattan,Manhattan
0009986BDBAB2F9A1...,44CED38841518B1FB...,CMT,2013-01-01 01:10:38,1,N,2013-01-01 01:23:30,1,771,5.3,-73.950111,40.780079,-73.982079,40.722294,CSH,17.5,0.5,0.5,0.0,0.0,18.5,Manhattan,Manhattan


In [17]:
trip_filtered_df = trip_filtered_df.sort(['hack_license', 'pickup_datetime'], ascending=True)
trip_filtered_df



medallion,hack_license,vendor_id,pickup_datetime,rate_code,store_and_fwd_flag,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount,pickup_borough,dropoff_borough
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 19:00:00,1,N,2013-01-01 19:16:33,1,992,7.9,-73.885361,40.77314,-73.985107,40.745296,CRD,24.0,0.0,0.5,5.5,4.8,34.8,Queens,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 19:52:35,1,N,2013-01-01 19:58:07,2,331,1.5,-73.973091,40.748756,-73.984703,40.72897,CRD,7.0,0.0,0.5,1.5,0.0,9.0,Manhattan,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 19:59:25,1,N,2013-01-01 20:07:57,2,512,2.2,-73.984062,40.72908,-73.981079,40.753746,CSH,9.0,0.0,0.5,0.0,0.0,9.5,Manhattan,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 20:15:23,1,N,2013-01-01 20:26:49,3,685,1.0,-73.975067,40.765324,-73.985733,40.755859,CSH,8.5,0.5,0.5,0.0,0.0,9.5,Manhattan,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 20:52:23,1,N,2013-01-01 20:58:23,2,360,1.6,-73.995766,40.754169,-74.003349,40.732578,CSH,7.0,0.5,0.5,0.0,0.0,8.0,Manhattan,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 21:23:24,1,N,2013-01-01 21:40:06,1,1001,9.2,-73.885406,40.773132,-73.983795,40.729431,CRD,27.0,0.5,0.5,8.4,0.0,36.4,Queens,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 21:47:40,1,N,2013-01-01 21:57:05,1,564,4.2,-73.988541,40.720161,-73.955399,40.772987,CSH,13.0,0.5,0.5,0.0,0.0,14.0,Manhattan,Manhattan
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 23:01:07,1,N,2013-01-01 23:05:41,2,273,1.7,-73.862869,40.768734,-73.866905,40.767513,CRD,7.0,0.5,0.5,1.0,0.0,9.0,Queens,Queens
BE530E79CB7E459DE...,0002555BBE359440D...,CMT,2013-01-01 23:40:33,1,N,2013-01-01 23:53:54,1,801,7.5,-73.873032,40.774212,-73.944611,40.780037,CRD,22.0,0.5,0.5,5.0,4.8,32.8,Queens,Manhattan
515D614B20D2BF3BE...,000A4EBF1CEB9C6BD...,CMT,2013-01-01 18:09:09,1,N,2013-01-01 18:15:36,1,386,0.9,-74.014488,40.715797,-74.004692,40.71751,CSH,6.0,0.0,0.5,0.0,0.0,6.5,Manhattan,Manhattan


In [None]:
import pandas as pd

payment = 'CRD'
boroughs = borough_gdf['borough'].unique()

# Keep only card-paid trips and sort each driver's trips by pickup time,
# so that "consecutive" really means consecutive in time.
# .copy() avoids pandas' SettingWithCopyWarning when adding a column below.
trip_drivers_df = (
    trip_filtered_pd_df.loc[
        trip_filtered_pd_df['payment_type'] == payment,
        ['hack_license', 'pickup_datetime', 'pickup_borough', 'dropoff_borough']
    ]
    .sort_values(['hack_license', 'pickup_datetime'])
    .copy()
)

def max_consecutive_trues(s):
    # Every False starts a new group; summing the Trues in each group gives the run lengths
    return int(s.groupby((~s).cumsum()).sum().max())

result = {}
for borough in boroughs:
    # True when the trip starts and ends in this borough
    trip_drivers_df['condition'] = (
        (trip_drivers_df['pickup_borough'] == borough) &
        (trip_drivers_df['dropoff_borough'] == borough)
    )
    result[borough] = trip_drivers_df.groupby('hack_license')['condition'].apply(max_consecutive_trues)

# One row per driver (hack_license), one column per borough
max_streaks_df = pd.DataFrame(result)
max_streaks_df


## Polars

### Exercise 5

Please work on the merged dataset of trips and fares and perform the following data cleaning tasks:

1. Remove trips with invalid locations (i.e. not in New York City);
2. Remove trips with invalid amounts:
    - Total amount must be greater than zero;
    - Total amount must correspond to the sum of all the other amounts.
3. Remove trips with invalid time:
    - Pick-up before drop-off;
    - Valid duration.

After each data cleaning task, report how many rows were removed. Finally report:
- Are there **duplicate trips**?
- How many trips remain after cleaning?
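Each cleaning rule can be written as a row predicate and applied in sequence, counting how many rows each step removes. The sketch below does this in plain Python on hypothetical dict rows. The following choices are all assumptions:

- the NYC bounding box;
- the 0.01 tolerance on amounts;
- the 3-hour cap on trip duration;
- the 60-second tolerance between `trip_time_in_secs` and the timestamps;
- `(medallion, hack_license, pickup_datetime)` as the duplicate key.

In Polars, each predicate would become an expression passed to `DataFrame.filter`, and `DataFrame.is_duplicated()` can flag repeated rows.

```python
from datetime import datetime

# ASSUMPTION: rough NYC bounding box (lon/lat), not an official city boundary
NYC_LON = (-74.26, -73.70)
NYC_LAT = (40.49, 40.92)
AMOUNT_TOLERANCE = 0.01       # ASSUMPTION: absorbs floating-point rounding
MAX_TRIP_SECS = 3 * 3600      # ASSUMPTION: longest plausible trip
DURATION_TOLERANCE_SECS = 60  # ASSUMPTION: allowed gap vs. the timestamps

def valid_location(t):
    points = [("pickup_longitude", "pickup_latitude"), ("dropoff_longitude", "dropoff_latitude")]
    return all(
        NYC_LON[0] <= t[lon] <= NYC_LON[1] and NYC_LAT[0] <= t[lat] <= NYC_LAT[1]
        for lon, lat in points
    )

def valid_amounts(t):
    parts = t["fare_amount"] + t["surcharge"] + t["mta_tax"] + t["tip_amount"] + t["tolls_amount"]
    return t["total_amount"] > 0 and abs(t["total_amount"] - parts) <= AMOUNT_TOLERANCE

def valid_time(t):
    elapsed = (t["dropoff_datetime"] - t["pickup_datetime"]).total_seconds()
    return (
        t["pickup_datetime"] < t["dropoff_datetime"]
        and 0 < t["trip_time_in_secs"] <= MAX_TRIP_SECS
        and abs(elapsed - t["trip_time_in_secs"]) <= DURATION_TOLERANCE_SECS
    )

def clean(trips):
    """Apply the checks in order and record how many rows each one removes."""
    removed = {}
    for name, check in [("location", valid_location), ("amounts", valid_amounts), ("time", valid_time)]:
        kept = [t for t in trips if check(t)]
        removed[name] = len(trips) - len(kept)
        trips = kept
    return trips, removed

def count_duplicates(trips, key=("medallion", "hack_license", "pickup_datetime")):
    """Count rows whose key was already seen (ASSUMPTION: this key identifies a trip)."""
    seen, duplicates = set(), 0
    for t in trips:
        k = tuple(t[c] for c in key)
        if k in seen:
            duplicates += 1
        seen.add(k)
    return duplicates

# Hypothetical rows; amounts and coordinates follow a sample trip from the Spark output
base = {
    "medallion": "M1", "hack_license": "H1",
    "pickup_datetime": datetime(2013, 1, 1, 7, 4, 33),
    "dropoff_datetime": datetime(2013, 1, 1, 7, 16, 3),
    "trip_time_in_secs": 689,
    "pickup_longitude": -73.997002, "pickup_latitude": 40.732533,
    "dropoff_longitude": -73.970032, "dropoff_latitude": 40.764423,
    "fare_amount": 12.0, "surcharge": 0.0, "mta_tax": 0.5,
    "tip_amount": 2.5, "tolls_amount": 0.0, "total_amount": 15.0,
}
trips = [
    base,
    {**base, "pickup_longitude": 0.0, "pickup_latitude": 0.0},   # outside NYC
    {**base, "total_amount": 20.0},                               # amounts do not add up
    {**base, "dropoff_datetime": datetime(2013, 1, 1, 7, 0, 0)},  # drop-off before pick-up
    dict(base),                                                   # exact duplicate
]
cleaned, removed = clean(trips)
print(removed, len(cleaned), count_duplicates(cleaned))
# {'location': 1, 'amounts': 1, 'time': 1} 2 1
```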

In [None]:
import polars as pl
import chardet


In [None]:
with open('data/trip_fare.csv', 'rb') as f:
    # A sample of the file is usually enough for chardet to guess the encoding
    data = f.read(1_000_000)

encoding_result = chardet.detect(data)
encoding = encoding_result['encoding']
print(f"Detected encoding: {encoding}")

Detected encoding: ascii


In [None]:
# Some columns (e.g. ` tip_amount`) look like integers in the first rows but later hold values
# such as 1.75, so scan the whole file when inferring dtypes (infer_schema_length=None; slower but safe).
# The headers also carry leading spaces, so strip them to match the Spark column names.
tripdata_polar_df = pl.read_csv('./data/trip_data.csv', infer_schema_length=None).rename(lambda c: c.strip())
tripfare_polar_df = pl.read_csv('./data/trip_fare.csv', encoding=encoding, infer_schema_length=None).rename(lambda c: c.strip())

### Exercise 6

Compute the **total revenue** (total_amount) grouped by:
- Pick-up hour of the day (0–23)
- Passenger count (group >=6 into “6+”)

Create a heatmap where:
- X-axis = hour
- Y-axis = passenger count group
- Cell value = average revenue per trip
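The grouping for the heatmap reduces to two keys per trip: the pick-up hour and the passenger bucket. Below is a plain-Python sketch on hypothetical `(pickup_datetime, passenger_count, total_amount)` tuples. It returns both the total revenue and the average revenue per trip for each cell, since the exercise mentions both. In Polars, the hour would typically come from `pl.col("pickup_datetime").dt.hour()` and the bucket from `pl.when(...).then(...).otherwise(...)`.

```python
from collections import defaultdict
from datetime import datetime

def passenger_group(count: int) -> str:
    """Bucket passenger counts, folding 6 and above into "6+"."""
    return "6+" if count >= 6 else str(count)

def revenue_by_hour_and_group(trips):
    """
    trips: iterable of (pickup_datetime, passenger_count, total_amount).
    Returns {(hour, group): (total_revenue, average_revenue_per_trip)}.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for pickup, passengers, total in trips:
        key = (pickup.hour, passenger_group(passengers))
        totals[key] += total
        counts[key] += 1
    return {key: (totals[key], totals[key] / counts[key]) for key in totals}

# Hypothetical trips
trips = [
    (datetime(2013, 1, 1, 7, 4, 33), 1, 15.0),
    (datetime(2013, 1, 1, 7, 50), 1, 5.0),
    (datetime(2013, 1, 1, 7, 10), 6, 20.0),
    (datetime(2013, 1, 1, 7, 20), 8, 10.0),
    (datetime(2013, 1, 1, 8, 3, 10), 1, 6.5),
]
cells = revenue_by_hour_and_group(trips)
print(cells[(7, "6+")])  # (30.0, 15.0)
```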

### Exercise 7

Define an "anomalous trip" as one that satisfies at least two of the following:
- Fare per mile is above the 95th percentile
- Tip amount > 100% of fare
- trip_time_in_secs is less than 60 seconds but distance is more than 1 mile

Create a dataframe of anomalous trips and:
- Report how many such trips exist
- Create a scatterplot to visualize the anomaly metrics
- Describe the visualization identifying groups and outliers
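A minimal plain-Python sketch of the "at least two of three" rule, on hypothetical dict rows. It assumes fare per mile is defined only for trips with a positive distance. It computes the 95th percentile with `statistics.quantiles` and its default `exclusive` method. Polars' `quantile(0.95)` may interpolate slightly differently.

```python
import statistics

def find_anomalies(trips):
    """
    trips: list of dicts with fare_amount, tip_amount, trip_distance, trip_time_in_secs.
    Returns (anomalous_trips, p95) where p95 is the 95th percentile of fare per mile.
    """
    # ASSUMPTION: fare per mile is only defined for trips with a positive distance
    fare_per_mile = [t["fare_amount"] / t["trip_distance"] for t in trips if t["trip_distance"] > 0]
    # quantiles(n=100) returns the 99 cut points P1..P99; index 94 is P95
    p95 = statistics.quantiles(fare_per_mile, n=100)[94]

    anomalies = []
    for t in trips:
        conditions = [
            t["trip_distance"] > 0 and t["fare_amount"] / t["trip_distance"] > p95,
            t["tip_amount"] > t["fare_amount"],  # tip above 100% of the fare
            t["trip_time_in_secs"] < 60 and t["trip_distance"] > 1,
        ]
        if sum(conditions) >= 2:  # each True counts as 1
            anomalies.append(t)
    return anomalies, p95

# Hypothetical data: 20 ordinary trips, one generous tipper, one implausible trip
normal = {"fare_amount": 10.0, "tip_amount": 2.0, "trip_distance": 3.0, "trip_time_in_secs": 600}
big_tip = {"fare_amount": 10.0, "tip_amount": 15.0, "trip_distance": 3.0, "trip_time_in_secs": 600}
suspicious = {"fare_amount": 50.0, "tip_amount": 60.0, "trip_distance": 1.5, "trip_time_in_secs": 30}
trips = [dict(normal) for _ in range(20)] + [big_tip, suspicious]

anomalies, p95 = find_anomalies(trips)
print(len(anomalies))  # 1: only the implausible trip meets two or more conditions
```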

### Exercise 8
For each driver (hack_license), calculate the **total profit per hour worked**, where:
> profit = 0.7 * (fare_amount + tip_amount) when the trip starts between 7:01 AM and 7:00 PM\
> profit = 0.8 * (fare_amount + tip_amount) when the trip starts between 7:01 PM and 7:00 AM

Estimate "hours worked" by summing trip_time_in_secs.
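The profit rule and the "hours worked" estimate can be sketched in plain Python on hypothetical `(hack_license, pickup_datetime, trip_time_in_secs, fare_amount, tip_amount)` tuples. The exercise leaves the minute between 7:00 and 7:01 unassigned. The sketch therefore assumes the day rate applies when 07:00:00 < start time <= 19:00:00 and the night rate otherwise. Drivers with no recorded trip time are skipped to avoid dividing by zero.

```python
from collections import defaultdict
from datetime import datetime, time

# ASSUMPTION: day rate when 07:00:00 < start time <= 19:00:00, night rate otherwise
DAY_START, DAY_END = time(7, 0, 0), time(19, 0, 0)

def trip_profit(pickup: datetime, fare: float, tip: float) -> float:
    rate = 0.7 if DAY_START < pickup.time() <= DAY_END else 0.8
    return rate * (fare + tip)

def profit_per_hour(trips):
    """
    trips: iterable of (hack_license, pickup_datetime, trip_time_in_secs, fare_amount, tip_amount).
    Returns {hack_license: total profit / hours worked}, with hours worked estimated
    as the sum of trip_time_in_secs / 3600; drivers with zero recorded time are skipped.
    """
    profit = defaultdict(float)
    seconds = defaultdict(int)
    for driver, pickup, secs, fare, tip in trips:
        profit[driver] += trip_profit(pickup, fare, tip)
        seconds[driver] += secs
    return {d: profit[d] / (seconds[d] / 3600) for d in profit if seconds[d] > 0}

# Hypothetical trips
trips = [
    ("H1", datetime(2013, 1, 1, 8, 0), 1800, 10.0, 0.0),   # day trip: 0.7 * 10 = 7
    ("H1", datetime(2013, 1, 1, 20, 0), 1800, 10.0, 0.0),  # night trip: 0.8 * 10 = 8
    ("H2", datetime(2013, 1, 1, 9, 0), 0, 5.0, 0.0),       # no recorded time: skipped
]
print(profit_per_hour(trips))  # {'H1': 15.0}
```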

Plot a line chart showing the distribution of average profit per hour **for the top 10% of drivers** in terms of total trips.

Which time of day offers **best earning efficiency**?