Query Execution Plans

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, year

In [7]:
# Build the SparkSession used throughout the notebook, or reuse the one that
# is already running in this kernel: local master with a single worker
# thread, application name "test".
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("test")
    .getOrCreate()
)

24/03/21 07:12:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
# Load the non-partitioned taxi dataset from a directory of Parquet files.
# NOTE: absolute, machine-specific path; adjust it to your environment.
local_files = '/home/sasa/Downloads/Code/notebooks/datasets/parquet/'
df_taxis_non_partitioned_raw = spark.read.format('parquet').load(local_files)

                                                                                

In [10]:
# Apply the same cleaning as in the previous notebook: keep only the rows
# whose pickup timestamp falls in 2023.
# year() yields an integer, so compare against the int 2023 rather than the
# string '2023' and avoid relying on an implicit string-to-int cast.
df_taxis_non_partitioned_raw = df_taxis_non_partitioned_raw.where(
    year(col('tpep_pickup_datetime')) == 2023
)

In [11]:
# Load the daily-partitioned taxi dataset from its root directory.
# NOTE: absolute, machine-specific path; adjust it to your environment.
local_path = '/home/sasa/Downloads/Code/notebooks/datasets/yellow_taxis_daily/'
df_taxis_daily_raw = spark.read.format('parquet').load(local_path)

                                                                                

In [12]:
# Print the schema of the partitioned dataset; the partition column appears
# as an extra field at the end (p_date in the output below)
df_taxis_daily_raw.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- p_date: date (nullable = true)



In [13]:
# Peek at the first three values of the partition column
df_taxis_daily_raw.select(col('p_date')).show(3)

+----------+
|    p_date|
+----------+
|2023-04-14|
|2023-04-14|
|2023-04-14|
+----------+
only showing top 3 rows



In [14]:
# Derive an equivalent p_date column (pickup timestamp converted to a date)
# on the non-partitioned data so that both datasets accept the same SQL and
# their plans can be compared
df_taxis_nopartitioned_raw = df_taxis_non_partitioned_raw.withColumn(
    'p_date',
    to_date('tpep_pickup_datetime'),
)

In [15]:
# Register the non-partitioned DataFrame as a temporary view so it can be
# queried by name through spark.sql
df_taxis_nopartitioned_raw.createOrReplaceTempView("tbl_taxis_nopartitioned_raw")


In [16]:
# Register the daily-partitioned DataFrame as a temporary view so it can be
# queried by name through spark.sql
df_taxis_daily_raw.createOrReplaceTempView("tbl_taxis_daily_raw")

In [17]:
# Average trip distance for a single day and rate code, filtering on the
# partition column p_date of the daily-partitioned view
q1a = spark.sql(
    "select avg(trip_distance) from tbl_taxis_daily_raw "
    "where p_date='2023-02-14' and RatecodeID=2"
)

In [18]:
# Run the query and display its single-row result
q1a.show()

+------------------+
|avg(trip_distance)|
+------------------+
|17.138035006604998|
+------------------+



In [19]:
# Print the logical plans as well as the physical plan for q1a
q1a.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('avg('trip_distance), None)]
+- 'Filter (('p_date = 2023-02-14) AND ('RatecodeID = 2))
   +- 'UnresolvedRelation [tbl_taxis_daily_raw], [], false

== Analyzed Logical Plan ==
avg(trip_distance): double
Aggregate [avg(trip_distance#43) AS avg(trip_distance)#108]
+- Filter ((p_date#58 = cast(2023-02-14 as date)) AND (RatecodeID#44L = cast(2 as bigint)))
   +- SubqueryAlias tbl_taxis_daily_raw
      +- View (`tbl_taxis_daily_raw`, [VendorID#39,tpep_pickup_datetime#40,tpep_dropoff_datetime#41,passenger_count#42L,trip_distance#43,RatecodeID#44L,store_and_fwd_flag#45,PULocationID#46,DOLocationID#47,payment_type#48L,fare_amount#49,extra#50,mta_tax#51,tip_amount#52,tolls_amount#53,improvement_surcharge#54,total_amount#55,congestion_surcharge#56,Airport_fee#57,p_date#58])
         +- Relation [VendorID#39,tpep_pickup_datetime#40,tpep_dropoff_datetime#41,passenger_count#42L,trip_distance#43,RatecodeID#44L,store_and_fwd_flag#45,PULocationID#46,D

In [20]:
# Same query against the non-partitioned view; there p_date is a column
# derived from the pickup timestamp, not a partition key
q1b = spark.sql(
    "select avg(trip_distance) from tbl_taxis_nopartitioned_raw "
    "where p_date='2023-02-14' and RatecodeID=2"
)

In [21]:
# Print the logical plans as well as the physical plan for q1b
q1b.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('avg('trip_distance), None)]
+- 'Filter (('p_date = 2023-02-14) AND ('RatecodeID = 2))
   +- 'UnresolvedRelation [tbl_taxis_nopartitioned_raw], [], false

== Analyzed Logical Plan ==
avg(trip_distance): double
Aggregate [avg(trip_distance#4) AS avg(trip_distance)#124]
+- Filter ((p_date#86 = cast(2023-02-14 as date)) AND (RatecodeID#5L = cast(2 as bigint)))
   +- SubqueryAlias tbl_taxis_nopartitioned_raw
      +- View (`tbl_taxis_nopartitioned_raw`, [VendorID#0,tpep_pickup_datetime#1,tpep_dropoff_datetime#2,passenger_count#3L,trip_distance#4,RatecodeID#5L,store_and_fwd_flag#6,PULocationID#7,DOLocationID#8,payment_type#9L,fare_amount#10,extra#11,mta_tax#12,tip_amount#13,tolls_amount#14,improvement_surcharge#15,total_amount#16,congestion_surcharge#17,Airport_fee#18,p_date#86])
         +- Project [VendorID#0, tpep_pickup_datetime#1, tpep_dropoff_datetime#2, passenger_count#3L, trip_distance#4, RatecodeID#5L, store_and_fwd_flag#6, PULoca

In [22]:
# Count trips per day for three consecutive days, filtering on the partition
# column p_date of the daily-partitioned view.
# The SQL is assembled from adjacent string literals inside parentheses: a
# plain "..." literal cannot span two source lines (SyntaxError).
q2a = spark.sql(
    "select p_date,count(1) from tbl_taxis_daily_raw "
    "where p_date in ('2023-02-14','2023-02-15','2023-02-16') "
    "group by p_date"
)

In [23]:
# Count trips per day for the same three days on the non-partitioned view,
# where p_date is a column derived from the pickup timestamp rather than a
# partition key.
# The SQL is assembled from adjacent string literals inside parentheses: a
# plain "..." literal cannot span two source lines (SyntaxError).
q2b = spark.sql(
    "select p_date,count(1) from tbl_taxis_nopartitioned_raw "
    "where p_date in ('2023-02-14','2023-02-15','2023-02-16') "
    "group by p_date"
)

In [24]:
# Print only the physical plan of q2a (the default, non-extended mode)
q2a.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[p_date#58], functions=[count(1)])
   +- Exchange hashpartitioning(p_date#58, 200), ENSURE_REQUIREMENTS, [plan_id=120]
      +- HashAggregate(keys=[p_date#58], functions=[partial_count(1)])
         +- FileScan parquet [p_date#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/sasa/Downloads/Code/notebooks/datasets/yellow_taxis_daily], PartitionFilters: [cast(p_date#58 as string) IN (2023-02-14,2023-02-15,2023-02-16)], PushedFilters: [], ReadSchema: struct<>




In [25]:
# Print only the physical plan of q2b (the default, non-extended mode)
q2b.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[p_date#86], functions=[count(1)])
   +- Exchange hashpartitioning(p_date#86, 200), ENSURE_REQUIREMENTS, [plan_id=137]
      +- HashAggregate(keys=[p_date#86], functions=[partial_count(1)])
         +- Project [cast(tpep_pickup_datetime#1 as date) AS p_date#86]
            +- Filter ((isnotnull(tpep_pickup_datetime#1) AND (year(cast(tpep_pickup_datetime#1 as date)) = 2023)) AND cast(cast(tpep_pickup_datetime#1 as date) as string) IN (2023-02-14,2023-02-15,2023-02-16))
               +- FileScan parquet [tpep_pickup_datetime#1] Batched: true, DataFilters: [isnotnull(tpep_pickup_datetime#1), (year(cast(tpep_pickup_datetime#1 as date)) = 2023), cast(cas..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/sasa/Downloads/Code/notebooks/datasets/parquet], PartitionFilters: [], PushedFilters: [IsNotNull(tpep_pickup_datetime)], ReadSchema: struct<tpep_pickup_datetime:timestamp_ntz>




# Compare performance

In [26]:
# Print the physical plan of q2a in "formatted" mode: a plan outline followed
# by the details of each numbered node
q2a.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (5)
+- HashAggregate (4)
   +- Exchange (3)
      +- HashAggregate (2)
         +- Scan parquet  (1)


(1) Scan parquet 
Output [1]: [p_date#58]
Batched: true
Location: InMemoryFileIndex [file:/home/sasa/Downloads/Code/notebooks/datasets/yellow_taxis_daily]
PartitionFilters: [cast(p_date#58 as string) IN (2023-02-14,2023-02-15,2023-02-16)]
ReadSchema: struct<>

(2) HashAggregate
Input [1]: [p_date#58]
Keys [1]: [p_date#58]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#138L]
Results [2]: [p_date#58, count#139L]

(3) Exchange
Input [2]: [p_date#58, count#139L]
Arguments: hashpartitioning(p_date#58, 200), ENSURE_REQUIREMENTS, [plan_id=120]

(4) HashAggregate
Input [2]: [p_date#58, count#139L]
Keys [1]: [p_date#58]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#130L]
Results [2]: [p_date#58, count(1)#130L AS count(1)#131L]

(5) AdaptiveSparkPlan
Output [2]: [p_date#58, count(1)#131L]
Arguments: isFinalPlan=false




In [27]:
# Print the physical plan of q2b in "formatted" mode: a plan outline followed
# by the details of each numbered node
q2b.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (7)
+- HashAggregate (6)
   +- Exchange (5)
      +- HashAggregate (4)
         +- Project (3)
            +- Filter (2)
               +- Scan parquet  (1)


(1) Scan parquet 
Output [1]: [tpep_pickup_datetime#1]
Batched: true
Location: InMemoryFileIndex [file:/home/sasa/Downloads/Code/notebooks/datasets/parquet]
PushedFilters: [IsNotNull(tpep_pickup_datetime)]
ReadSchema: struct<tpep_pickup_datetime:timestamp_ntz>

(2) Filter
Input [1]: [tpep_pickup_datetime#1]
Condition : ((isnotnull(tpep_pickup_datetime#1) AND (year(cast(tpep_pickup_datetime#1 as date)) = 2023)) AND cast(cast(tpep_pickup_datetime#1 as date) as string) IN (2023-02-14,2023-02-15,2023-02-16))

(3) Project
Output [1]: [cast(tpep_pickup_datetime#1 as date) AS p_date#86]
Input [1]: [tpep_pickup_datetime#1]

(4) HashAggregate
Input [1]: [p_date#86]
Keys [1]: [p_date#86]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#140L]
Results [2]: [p_date#86, count#141L]

(5) E

In [28]:
%%timeit

# Query by partition Key; i.e. using '2023-02-14' as filter
spark.sql("select p_date,count(1) from tbl_taxis_daily_raw group by p_date order by to_date(p_date)").show(n=5)

                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770

In [29]:
%%timeit

# Query by partition Key; i.e. using '2023-02-14' as filter
spark.sql("select p_date,count(1) from tbl_taxis_nopartitioned_raw group by p_date order by to_date(p_date)").show(n=5)

                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows



                                                                                

+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows





+----------+--------+
|    p_date|count(1)|
+----------+--------+
|2023-01-31|       8|
|2023-02-01|  107770|
|2023-02-02|  113074|
|2023-02-03|  115149|
|2023-02-04|   98236|
+----------+--------+
only showing top 5 rows

1.83 s ± 330 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


24/03/21 08:30:08 WARN Executor: Issue communicating with driver in heartbeater 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from 10.0.2.15:42587 in 10000 milliseconds. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at scala.util.Failure.recover(Try.scala:234)
	at scala.concurrent.Future.$anonfun$recover$1(Future.scala:395)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at org.apache.spark.util.Thread

In [None]:
# Stop the session
# NOTE(review): the call is left commented out, presumably so the session
# stays available for re-running cells; uncomment it to shut Spark down.
# spark.stop()