In [1]:
from pyspark.sql import SparkSession

In [2]:
# Get (or reuse) the session for this notebook. Adaptive Query Execution is
# turned off here -- presumably so the plans printed by explain() further down
# are the static ones; confirm before reusing this config elsewhere.
spark = (
    SparkSession.builder
    .appName("Catalyst and Tungsten Example")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

In [3]:
# Fixed status message only: it marks that execution reached this cell, but
# does not itself inspect or validate the session object.
print("SparkSession created!")

SparkSession created!


In [4]:
# Field names, one per position in each row tuple.
columns = ["id", "product", "price", "date"]

# Sample sales rows, written column-by-column (same order as `columns`) and
# zipped into a list of (id, product, price, date) tuples.
data = list(zip(
    (1, 2, 3, 4, 5),
    ("Laptop", "Phone", "Laptop", "Tablet", "Phone"),
    (1000, 500, 1200, 300, 800),
    ("2025-03-01", "2025-03-02", "2025-03-03", "2025-03-04", "2025-03-05"),
))

In [5]:
# Turn the in-memory rows into a DataFrame; `columns` supplies only the column
# names, so the column types are inferred from the Python values.
df = spark.createDataFrame(data, schema=columns)

# Print the rows as a text table to eyeball the loaded data.
df.show()

+---+-------+-----+----------+
| id|product|price|      date|
+---+-------+-----+----------+
|  1| Laptop| 1000|2025-03-01|
|  2|  Phone|  500|2025-03-02|
|  3| Laptop| 1200|2025-03-03|
|  4| Tablet|  300|2025-03-04|
|  5|  Phone|  800|2025-03-05|
+---+-------+-----+----------+



In [6]:
# Expose the DataFrame to SQL under the view name "sales"; an existing temp
# view with that name is replaced rather than raising an error.
df.createOrReplaceTempView("sales")

In [7]:
# Total price per product, keeping only rows priced at 500 or more before
# grouping. Reads from the "sales" temp view.
sales_by_product_sql = """
    SELECT product, SUM(price) AS total_sales
    FROM sales
    WHERE price >= 500
    GROUP BY product
"""
result = spark.sql(sales_by_product_sql)

In [8]:
# Print the per-product totals as a text table.
result.show()

+-------+-----------+
|product|total_sales|
+-------+-----------+
|  Phone|       1300|
| Laptop|       2200|
+-------+-----------+



In [9]:
# Extended explain: prints the parsed, analyzed and optimized logical plans,
# followed by the physical plan, for the aggregation query.
result.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['product], ['product, 'SUM('price) AS total_sales#25]
+- 'Filter ('price >= 500)
   +- 'UnresolvedRelation [sales], [], false

== Analyzed Logical Plan ==
product: string, total_sales: bigint
Aggregate [product#1], [product#1, sum(price#2L) AS total_sales#25L]
+- Filter (price#2L >= cast(500 as bigint))
   +- SubqueryAlias sales
      +- View (`sales`, [id#0L,product#1,price#2L,date#3])
         +- LogicalRDD [id#0L, product#1, price#2L, date#3], false

== Optimized Logical Plan ==
Aggregate [product#1], [product#1, sum(price#2L) AS total_sales#25L]
+- Project [product#1, price#2L]
   +- Filter (isnotnull(price#2L) AND (price#2L >= 500))
      +- LogicalRDD [id#0L, product#1, price#2L, date#3], false

== Physical Plan ==
*(2) HashAggregate(keys=[product#1], functions=[sum(price#2L)], output=[product#1, total_sales#25L])
+- Exchange hashpartitioning(product#1, 200), ENSURE_REQUIREMENTS, [plan_id=76]
   +- *(1) HashAggregate(keys=[product#1], functio