In [3]:
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

# Create the SparkSession for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("logical_plan_and_physical_plan")
    .getOrCreate()
)

######################
# code block 1 - Job #
######################
# Load the sample file with the text data source; the scan exposes a single
# string column named `value` (see ReadSchema in the output below).
file_path = "file:///home/jovyan/work/sample/lorem_ipsum.txt"
df = spark.read.text(file_path)

# "formatted" mode prints an operator outline, then a details section per node.
df.explain(mode="formatted")

== Physical Plan ==
Scan text  (1)


(1) Scan text 
Output [1]: [value#0]
Batched: false
Location: InMemoryFileIndex [file:/home/jovyan/work/sample/lorem_ipsum.txt]
ReadSchema: struct<value:string>




In [5]:
######################
# code block 2 - Job #
######################
# One row per token: split each line on a single space, then explode the array.
# NOTE(review): a literal ' ' separator yields empty-string tokens for runs of
# spaces and for blank lines, and leaves punctuation attached to words. It is
# kept as-is because the plans printed below reflect this exact expression.
words = df.select(
    f.explode(f.split(df["value"], ' ')).alias("word")
)

# Group identical tokens; `.count()` adds a bigint column named `count`.
word_counts = words.groupBy("word").count()

# extended=True prints the parsed, analyzed and optimized logical plans in
# addition to the physical plan.
word_counts.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['word], ['word, count(1) AS count#16L]
+- Project [word#12]
   +- Generate explode(split(value#0,  , -1)), false, [word#12]
      +- Relation [value#0] text

== Analyzed Logical Plan ==
word: string, count: bigint
Aggregate [word#12], [word#12, count(1) AS count#16L]
+- Project [word#12]
   +- Generate explode(split(value#0,  , -1)), false, [word#12]
      +- Relation [value#0] text

== Optimized Logical Plan ==
Aggregate [word#12], [word#12, count(1) AS count#16L]
+- Generate explode(split(value#0,  , -1)), [0], false, [word#12]
   +- Relation [value#0] text

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[word#12], functions=[count(1)], output=[word#12, count#16L])
   +- Exchange hashpartitioning(word#12, 200), ENSURE_REQUIREMENTS, [plan_id=18]
      +- HashAggregate(keys=[word#12], functions=[partial_count(1)], output=[word#12, count#20L])
         +- Generate explode(split(value#0,  , -1)), false, [word#12]
      

In [7]:
# Print the physical plan in formatted mode: an operator tree followed by a
# details section for each numbered node. explain() only prints plans; the
# adaptive plan is not final yet (isFinalPlan=false). Run an action such as
# word_counts.show() in its own cell to actually execute the query.
word_counts.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Generate (2)
            +- Scan text  (1)


(1) Scan text 
Output [1]: [value#0]
Batched: false
Location: InMemoryFileIndex [file:/home/jovyan/work/sample/lorem_ipsum.txt]
ReadSchema: struct<value:string>

(2) Generate
Input [1]: [value#0]
Arguments: explode(split(value#0,  , -1)), false, [word#12]

(3) HashAggregate
Input [1]: [word#12]
Keys [1]: [word#12]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#19L]
Results [2]: [word#12, count#20L]

(4) Exchange
Input [2]: [word#12, count#20L]
Arguments: hashpartitioning(word#12, 200), ENSURE_REQUIREMENTS, [plan_id=18]

(5) HashAggregate
Input [2]: [word#12, count#20L]
Keys [1]: [word#12]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#15L]
Results [2]: [word#12, count(1)#15L AS count#16L]

(6) AdaptiveSparkPlan
Output [2]: [word#12, count#16L]
Arguments: isFinalPlan=false


+------------+