<a href="https://colab.research.google.com/github/Nadiyapathan/pyspark/blob/main/Tungsten_%26_Catalyst.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
from pyspark.sql import SparkSession

Spark=SparkSession.builder.appName("Pyspark").getOrCreate()

In [11]:
data=[("nadi",32),("Jabi",39),("Ayesha",6),("Ayaan",3)]
columns=["Name","Age"]
df = Spark.createDataFrame(data,columns)
df.show()

+------+---+
|  Name|Age|
+------+---+
|  nadi| 32|
|  Jabi| 39|
|Ayesha|  6|
| Ayaan|  3|
+------+---+



In [12]:
from pyspark.sql.functions import col

large_df = Spark.range(1, 1000000)
result_df=large_df.select((col("id")*2).alias("double_id"))
result_df.show(10)

+---------+
|double_id|
+---------+
|        2|
|        4|
|        6|
|        8|
|       10|
|       12|
|       14|
|       16|
|       18|
|       20|
+---------+
only showing top 10 rows



In [13]:
df.createOrReplaceTempView("people")


optimized_df = Spark.sql("Select name from people where age>30")
optimized_df.show()



+----+
|name|
+----+
|nadi|
|Jabi|
+----+



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit

In [3]:
spark =SparkSession.builder.appName("pysparkPractice").getOrCreate()

data =[("John","Engineering",633333),("nadi","Marketing",75000),("sonu","Finance",500000)]
columns=["Name","Department","Salary"]


In [4]:
df = spark.createDataFrame(data, columns)
df.show()

+----+-----------+------+
|Name| Department|Salary|
+----+-----------+------+
|John|Engineering|633333|
|nadi|  Marketing| 75000|
|sonu|    Finance|500000|
+----+-----------+------+



In [5]:
df_filtered=df.filter(col("Salary")>70000)
df_filtered.show()

+----+-----------+------+
|Name| Department|Salary|
+----+-----------+------+
|John|Engineering|633333|
|nadi|  Marketing| 75000|
|sonu|    Finance|500000|
+----+-----------+------+



In [6]:
df_with_bouns=df.withColumn("Bonus",col("Salary")*0.1)
df_with_bouns.show()

+----+-----------+------+-------+
|Name| Department|Salary|  Bonus|
+----+-----------+------+-------+
|John|Engineering|633333|63333.3|
|nadi|  Marketing| 75000| 7500.0|
|sonu|    Finance|500000|50000.0|
+----+-----------+------+-------+



In [7]:
df_with_new_col = df.withColumn("Location", lit("New York"))
df_with_new_col.show()


+----+-----------+------+--------+
|Name| Department|Salary|Location|
+----+-----------+------+--------+
|John|Engineering|633333|New York|
|nadi|  Marketing| 75000|New York|
|sonu|    Finance|500000|New York|
+----+-----------+------+--------+



In [17]:

data = [("Finance", 80000),
        ("Marketing", 75000),
        ("Finance", 90000),
        ("Engineering", 70000),
        ("Marketing", 72000)]

columns = ["Department", "Salary"]

df = spark.createDataFrame(data, columns)

df_grouped = df.groupBy("Department").avg("Salary")
df_grouped.show()

df_count = df.groupBy("Department").count()
df_count.show()

+-----------+-----------+
| Department|avg(Salary)|
+-----------+-----------+
|    Finance|    85000.0|
|  Marketing|    73500.0|
|Engineering|    70000.0|
+-----------+-----------+

+-----------+-----+
| Department|count|
+-----------+-----+
|    Finance|    2|
|  Marketing|    2|
|Engineering|    1|
+-----------+-----+



In [18]:


from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("TungstenOptimization").getOrCreate()

data = [("Apple", "Fruit", 52),
        ("Banana", "Fruit", 89),
        ("Carrot", "Vegetable", 41),
        ("Tomato", "Vegetable", 18),
        ("Mango", "Fruit", 60),
        ("Broccoli", "Vegetable", 55)]

columns = ["Name", "Type", "Calories"]

df = spark.createDataFrame(data, columns)

optimized_df = df.withColumn("Calories_Doubled", col("Calories") * 2)

optimized_df.show()




+--------+---------+--------+----------------+
|    Name|     Type|Calories|Calories_Doubled|
+--------+---------+--------+----------------+
|   Apple|    Fruit|      52|             104|
|  Banana|    Fruit|      89|             178|
|  Carrot|Vegetable|      41|              82|
|  Tomato|Vegetable|      18|              36|
|   Mango|    Fruit|      60|             120|
|Broccoli|Vegetable|      55|             110|
+--------+---------+--------+----------------+



In [19]:

df.createOrReplaceTempView("foods")

optimized_query = spark.sql("SELECT Name, Calories FROM foods WHERE Calories > 50")

optimized_query.show()

optimized_query.explain(mode="formatted")

+--------+--------+
|    Name|Calories|
+--------+--------+
|   Apple|      52|
|  Banana|      89|
|   Mango|      60|
|Broccoli|      55|
+--------+--------+

== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#146, Type#147, Calories#148L]
Arguments: [Name#146, Type#147, Calories#148L], MapPartitionsRDD[45] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [Name#146, Type#147, Calories#148L]
Condition : (isnotnull(Calories#148L) AND (Calories#148L > 50))

(3) Project [codegen id : 1]
Output [2]: [Name#146, Calories#148L]
Input [3]: [Name#146, Type#147, Calories#148L]




In [20]:

df_transformed = df.withColumn("Calories_Doubled", col("Calories") * 2)

df_transformed.explain(mode="formatted")
df_transformed.show()


== Physical Plan ==
* Project (2)
+- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#146, Type#147, Calories#148L]
Arguments: [Name#146, Type#147, Calories#148L], MapPartitionsRDD[45] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [4]: [Name#146, Type#147, Calories#148L, (Calories#148L * 2) AS Calories_Doubled#185L]
Input [3]: [Name#146, Type#147, Calories#148L]


+--------+---------+--------+----------------+
|    Name|     Type|Calories|Calories_Doubled|
+--------+---------+--------+----------------+
|   Apple|    Fruit|      52|             104|
|  Banana|    Fruit|      89|             178|
|  Carrot|Vegetable|      41|              82|
|  Tomato|Vegetable|      18|              36|
|   Mango|    Fruit|      60|             120|
|Broccoli|Vegetable|      55|             110|
+--------+---------+--------+----------------+



In [21]:

df.createOrReplaceTempView("foods")

optimized_query = spark.sql("SELECT Name FROM foods WHERE Calories > 50")

optimized_query.explain(mode="formatted")
optimized_query.show()


== Physical Plan ==
* Project (3)
+- * Filter (2)
   +- * Scan ExistingRDD (1)


(1) Scan ExistingRDD [codegen id : 1]
Output [3]: [Name#146, Type#147, Calories#148L]
Arguments: [Name#146, Type#147, Calories#148L], MapPartitionsRDD[45] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [3]: [Name#146, Type#147, Calories#148L]
Condition : (isnotnull(Calories#148L) AND (Calories#148L > 50))

(3) Project [codegen id : 1]
Output [1]: [Name#146]
Input [3]: [Name#146, Type#147, Calories#148L]


+--------+
|    Name|
+--------+
|   Apple|
|  Banana|
|   Mango|
|Broccoli|
+--------+



In [22]:

from pyspark.sql.functions import broadcast

price_data = [("Apple", 1.2), ("Banana", 0.5), ("Carrot", 0.8), ("Tomato", 1.5), ("Mango", 2.0), ("Broccoli", 1.8)]
price_columns = ["Name", "Price_per_kg"]

df_price = spark.createDataFrame(price_data, price_columns)

joined_df = df.join(broadcast(df_price), "Name", "inner")

joined_df.explain(mode="formatted")
joined_df.show()


== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- BroadcastHashJoin Inner BuildRight (6)
      :- Filter (2)
      :  +- Scan ExistingRDD (1)
      +- BroadcastExchange (5)
         +- Filter (4)
            +- Scan ExistingRDD (3)


(1) Scan ExistingRDD
Output [3]: [Name#146, Type#147, Calories#148L]
Arguments: [Name#146, Type#147, Calories#148L], MapPartitionsRDD[45] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [3]: [Name#146, Type#147, Calories#148L]
Condition : isnotnull(Name#146)

(3) Scan ExistingRDD
Output [2]: [Name#213, Price_per_kg#214]
Arguments: [Name#213, Price_per_kg#214], MapPartitionsRDD[58] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(4) Filter
Input [2]: [Name#213, Price_per_kg#214]
Condition : isnotnull(Name#213)

(5) BroadcastExchange
Input [2]: [Name#213, Price_per_kg#214]
Arguments: HashedRelationBroadcastMode(List(input[0, str