<a href="https://colab.research.google.com/github/chinnuanna123/spark/blob/main/tungsten_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Tungsten Project

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Initialize Spark Session
spark = SparkSession.builder.appName("TungstenPlans").getOrCreate()

# Sample Employee Data
employees = [
    (101, "Alice", "HR", 60000),
    (102, "Bob", "Finance", 75000),
    (103, "Charlie", "IT", 90000),
    (104, "David", "HR", 65000),
    (105, "Eve", "IT", 95000)
]

# Column names only; Spark infers the column types from the data
columns = ["emp_id", "name", "department", "salary"]

# Create DataFrame
df = spark.createDataFrame(employees, columns)

# Show DataFrame
df.show()


+------+-------+----------+------+
|emp_id|   name|department|salary|
+------+-------+----------+------+
|   101|  Alice|        HR| 60000|
|   102|    Bob|   Finance| 75000|
|   103|Charlie|        IT| 90000|
|   104|  David|        HR| 65000|
|   105|    Eve|        IT| 95000|
+------+-------+----------+------+



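Get the Physical Plan (Default explain)
Calling .explain() with no arguments prints only the physical plan, i.e. the operators Spark will actually run. The *(1) prefix marks operators that are fused together into whole-stage code generation stage 1.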

In [2]:
df.filter(df.salary > 70000).select("name", "salary").explain()


== Physical Plan ==
*(1) Project [name#1, salary#3L]
+- *(1) Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
   +- *(1) Scan ExistingRDD[emp_id#0L,name#1,department#2,salary#3L]
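The `*(1)` markers in the plan above can also be picked out programmatically. The following is a minimal sketch in plain Python (no Spark required) that scans plan text for them; the helper name `codegen_operators` and the regular expression are illustrative assumptions, not part of any Spark API, and the plan text is copied from the output above.

```python
import re

# Physical plan text copied from the explain() output above.
PLAN = """\
*(1) Project [name#1, salary#3L]
+- *(1) Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
   +- *(1) Scan ExistingRDD[emp_id#0L,name#1,department#2,salary#3L]
"""

# Optional tree prefix ("+- ", ":- ", indentation), then "*(N) OperatorName".
_CODEGEN = re.compile(r"^[\s:+\-|]*\*\((\d+)\)\s+(\w+)")


def codegen_operators(plan_text):
    """Return (stage_id, operator_name) for every operator marked with *(N)."""
    found = []
    for line in plan_text.splitlines():
        match = _CODEGEN.match(line)
        if match:
            found.append((int(match.group(1)), match.group(2)))
    return found


print(codegen_operators(PLAN))
# → [(1, 'Project'), (1, 'Filter'), (1, 'Scan')]
```

All three operators share stage id 1, which is consistent with Spark compiling them into a single generated function for this query.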




Get the Logical and Physical Plans (Extended explain)
To see the parsed, analyzed, and optimized logical plans followed by the physical plan, use .explain(True).

In [3]:
df.filter(df.salary > 70000).select("name", "salary").explain(True)


== Parsed Logical Plan ==
'Project ['name, 'salary]
+- Filter (salary#3L > cast(70000 as bigint))
   +- LogicalRDD [emp_id#0L, name#1, department#2, salary#3L], false

== Analyzed Logical Plan ==
name: string, salary: bigint
Project [name#1, salary#3L]
+- Filter (salary#3L > cast(70000 as bigint))
   +- LogicalRDD [emp_id#0L, name#1, department#2, salary#3L], false

== Optimized Logical Plan ==
Project [name#1, salary#3L]
+- Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
   +- LogicalRDD [emp_id#0L, name#1, department#2, salary#3L], false

== Physical Plan ==
*(1) Project [name#1, salary#3L]
+- *(1) Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
   +- *(1) Scan ExistingRDD[emp_id#0L,name#1,department#2,salary#3L]
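To compare the stages of the extended output side by side, it can help to split the text on its `== ... ==` headers. The sketch below does that in plain Python; `split_plan_sections` is a hypothetical helper written for this notebook, and the sample text is an excerpt of the output above.

```python
import re

# Excerpt of the explain(True) output above (two of the four sections).
EXTENDED = """\
== Analyzed Logical Plan ==
name: string, salary: bigint
Project [name#1, salary#3L]
+- Filter (salary#3L > cast(70000 as bigint))
   +- LogicalRDD [emp_id#0L, name#1, department#2, salary#3L], false

== Optimized Logical Plan ==
Project [name#1, salary#3L]
+- Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
   +- LogicalRDD [emp_id#0L, name#1, department#2, salary#3L], false
"""


def split_plan_sections(explain_text):
    """Map each '== Title ==' header to the plan text beneath it."""
    sections = {}
    current = None
    for line in explain_text.splitlines():
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None and line.strip():
            sections[current].append(line)
    return {title: "\n".join(lines) for title, lines in sections.items()}


sections = split_plan_sections(EXTENDED)
for title in ("Analyzed Logical Plan", "Optimized Logical Plan"):
    filter_line = next(l for l in sections[title].splitlines() if "Filter" in l)
    print(f"{title}: {filter_line.strip()}")
# → Analyzed Logical Plan: +- Filter (salary#3L > cast(70000 as bigint))
# → Optimized Logical Plan: +- Filter (isnotnull(salary#3L) AND (salary#3L > 70000))
```

The two Filter lines show what the optimizer changed for this query: the cast on the literal is gone and an isnotnull check on salary has been added.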



Checking Tungsten Optimizations
To check whether Whole-Stage Code Generation (WSCG) is enabled, read the spark.sql.codegen.wholeStage setting:

In [4]:
print(spark.conf.get("spark.sql.codegen.wholeStage"))  # Should return "true"


true
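One way to see what this setting does is to switch it off temporarily and compare the plans. The sketch below is a hedged outline of that idea: `temporary_conf` and `explain_to_string` are hypothetical helpers, not PySpark functions. They rely only on `spark.conf.get`, `spark.conf.set`, and `df.explain`, and they assume that `explain()` prints through Python's standard output and that the conf key already has a value or default.

```python
import contextlib
import io


@contextlib.contextmanager
def temporary_conf(spark, key, value):
    """Set a Spark SQL conf inside a with-block, then restore the old value."""
    previous = spark.conf.get(key)  # assumes the key has a value or a default
    spark.conf.set(key, value)
    try:
        yield
    finally:
        spark.conf.set(key, previous)


def explain_to_string(df, *args, **kwargs):
    """Capture what df.explain(...) prints and return it as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(*args, **kwargs)
    return buffer.getvalue()


# Usage in this notebook (needs the `spark` session and `df` defined earlier):
# query = df.filter(df.salary > 70000).select("name", "salary")
# with temporary_conf(spark, "spark.sql.codegen.wholeStage", "false"):
#     plan_without_wscg = explain_to_string(query)
# plan_with_wscg = explain_to_string(query)
# print("*(" in plan_with_wscg, "*(" in plan_without_wscg)
```

With the setting disabled, the expectation is that the `*(1)` markers no longer appear in the physical plan, though this is worth confirming on the Spark version in use.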


In [5]:
df.orderBy(df.salary.desc()).show()


+------+-------+----------+------+
|emp_id|   name|department|salary|
+------+-------+----------+------+
|   105|    Eve|        IT| 95000|
|   103|Charlie|        IT| 90000|
|   102|    Bob|   Finance| 75000|
|   104|  David|        HR| 65000|
|   101|  Alice|        HR| 60000|
+------+-------+----------+------+



In [7]:
df.groupBy("department").agg(avg("salary").alias("avg_salary")).show()

+----------+----------+
|department|avg_salary|
+----------+----------+
|        HR|   62500.0|
|   Finance|   75000.0|
|        IT|   92500.0|
+----------+----------+



In [8]:
df.groupBy("department").agg(avg("salary").alias("avg_salary")).explain(mode="extended")

== Parsed Logical Plan ==
'Aggregate ['department], ['department, avg('salary) AS avg_salary#99]
+- LogicalRDD [emp_id#46L, name#47, department#48, salary#49L], false

== Analyzed Logical Plan ==
department: string, avg_salary: double
Aggregate [department#48], [department#48, avg(salary#49L) AS avg_salary#99]
+- LogicalRDD [emp_id#46L, name#47, department#48, salary#49L], false

== Optimized Logical Plan ==
Aggregate [department#48], [department#48, avg(salary#49L) AS avg_salary#99]
+- Project [department#48, salary#49L]
   +- LogicalRDD [emp_id#46L, name#47, department#48, salary#49L], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[department#48], functions=[avg(salary#49L)], output=[department#48, avg_salary#99])
   +- Exchange hashpartitioning(department#48, 200), ENSURE_REQUIREMENTS, [plan_id=113]
      +- HashAggregate(keys=[department#48], functions=[partial_avg(salary#49L)], output=[department#48, sum#104, count#105L])
         +- Project 
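The physical plan above computes the average in two steps: a HashAggregate with partial_avg that emits a sum and a count per department, an Exchange that hash-partitions by department, and a final HashAggregate that merges those buffers. The plain-Python sketch below imitates that shape. It is not Spark's implementation, and the split of the five rows into two partitions is a made-up assumption for illustration.

```python
from collections import defaultdict

# Same (department, salary) pairs as the employee DataFrame, split into two
# hypothetical partitions to mimic rows spread across tasks.
partitions = [
    [("HR", 60000), ("Finance", 75000), ("IT", 90000)],
    [("HR", 65000), ("IT", 95000)],
]


def partial_avg(rows):
    """Per-partition step: keep a running [sum, count] buffer per department."""
    acc = defaultdict(lambda: [0, 0])
    for dept, salary in rows:
        acc[dept][0] += salary
        acc[dept][1] += 1
    return dict(acc)


def final_avg(partials):
    """Post-shuffle step: merge the [sum, count] buffers, then divide."""
    merged = defaultdict(lambda: [0, 0])
    for partial in partials:
        for dept, (total, count) in partial.items():
            merged[dept][0] += total
            merged[dept][1] += count
    return {dept: total / count for dept, (total, count) in merged.items()}


result = final_avg(partial_avg(p) for p in partitions)
print(result)
# → {'HR': 62500.0, 'Finance': 75000.0, 'IT': 92500.0}
```

Only the small per-department buffers need to cross the shuffle boundary, not every input row, which is why the plan aggregates once before the Exchange and once after it.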

Two DataFrames in PySpark

In [9]:
# Department Data
department_data = [
    ("HR", "New York"),
    ("Finance", "London"),
    ("IT", "San Francisco"),
    ("Marketing", "Berlin")
]

# Column names only; Spark infers the column types from the data
department_columns = ["department", "location"]

# Create Department DataFrame
df_departments = spark.createDataFrame(department_data, department_columns)

# Show Department DataFrame
df_departments.show()


+----------+-------------+
|department|     location|
+----------+-------------+
|        HR|     New York|
|   Finance|       London|
|        IT|San Francisco|
| Marketing|       Berlin|
+----------+-------------+

