In [1]:
# 1. Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# 2. Download Spark 3.5.0 with Hadoop 3
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

# 3. Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [2]:
# 4. Install findspark
!pip install -q findspark
import findspark
findspark.init()

In [3]:
# 5. Start (or reuse) a local Spark session running on all available cores.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab-Spark")
    .getOrCreate()
)


In [4]:
# Bare last expression: Jupyter renders the active SparkSession's summary as the cell output.
spark

In [5]:
# URL of the Spark UI (jobs, stages, DAGs). It is built from the container's hostname
# (e.g. http://05aad96e7e25:4040), so on Colab it is presumably not reachable from your
# browser directly - the next cell exposes that port through an ngrok tunnel.
spark.sparkContext.uiWebUrl

'http://05aad96e7e25:4040'

In [6]:
!pip install -q pyngrok
from pyngrok import ngrok, conf

# Replace with your token
!ngrok config add-authtoken YOUR_AUTHENTICATION_TOKEN
# Connect to Spark UI (4040)
spark_ui = ngrok.connect(4040)
print("Spark UI link:", spark_ui.public_url)

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml




Spark UI link: https://arlo-unscorching-grumbly.ngrok-free.dev


In [10]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col

In [8]:
!pwd
!ls

/content
sample_data  spark-3.5.0-bin-hadoop3  spark-3.5.0-bin-hadoop3.tgz


In [11]:
# Turn off Adaptive Query Execution, so the Spark UI shows the plain physical plan
# (e.g. the default number of shuffle partitions rather than AQE-coalesced ones).
# The key must be spelled exactly: Spark accepts unknown keys silently, so a typo such as
# "spark.sal..." would leave AQE enabled.
spark.conf.set("spark.sql.adaptive.enabled", False)

In [18]:
# Load the MegaMart orders CSV: the first row is the header, and column types are inferred
# from the data (costs an extra pass over the file).
# NOTE(review): this notebook never downloads MegaMart.csv; it has to be uploaded to this
# path (inside the Spark distribution's R/ folder) before the cell runs.
df1 = spark.read.csv(
    "spark-3.5.0-bin-hadoop3/R/MegaMart.csv",
    header=True,
    inferSchema=True,
)

df1.show()

+--------+-------+----------+----------+----------------+--------------------+--------+--------------+--------------+------------+
|order_id|user_id|order_date|product_id|product_category|        product_name|quantity|price_per_unit|payment_method|order_status|
+--------+-------+----------+----------+----------------+--------------------+--------+--------------+--------------+------------+
|    1001|   U188|2025-04-20|      P940|         Fashion|            Sneakers|       2|         58.53|        PayPal|   Cancelled|
|    1002|   U062|2025-04-16|      P794|         Fashion|             T-Shirt|       3|         83.76|           UPI|    Returned|
|    1003|   U058|2025-04-18|      P326|         Fashion|          Sunglasses|       2|         78.85|        PayPal|  Processing|
|    1004|   U011|2025-04-10|      P574|         Fashion|          Sunglasses|       5|         46.49|        PayPal|   Delivered|
|    1005|   U003|2025-04-19|      P988|      Home Decor|         Photo Frame|     

**Narrow Transformations (no shuffle → 1 stage; one task per input partition, i.e. 1 task for this small CSV)**

In [19]:
# Narrow transformation: keep only the Sneakers rows (evaluated row by row, no shuffle).
df1 = df1.where(col("product_name") == "Sneakers")

In [20]:
# Narrow transformation: project down to the two columns used below.
df1 = df1.select(["order_id", "product_name"])

In [22]:
# Action: runs the lazily built filter + select and prints up to 20 rows.
df1.show(n=20, truncate=True)

+--------+------------+
|order_id|product_name|
+--------+------------+
|    1001|    Sneakers|
|    1013|    Sneakers|
|    1026|    Sneakers|
|    1039|    Sneakers|
|    1155|    Sneakers|
|    1175|    Sneakers|
|    1190|    Sneakers|
|    1195|    Sneakers|
|    1233|    Sneakers|
|    1267|    Sneakers|
|    1281|    Sneakers|
|    1325|    Sneakers|
|    1330|    Sneakers|
|    1349|    Sneakers|
|    1362|    Sneakers|
|    1379|    Sneakers|
|    1442|    Sneakers|
|    1457|    Sneakers|
|    1480|    Sneakers|
|    1494|    Sneakers|
+--------+------------+
only showing top 20 rows



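**Wide Transformation (needs a shuffle → the job splits into 2 stages; the post-shuffle stage has `spark.sql.shuffle.partitions` = 200 tasks by default when AQE is disabled)**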

In [23]:
# Wide transformation: every row of a given product_name must reach the same partition,
# so Spark shuffles. The result column is named "count(order_id)".
df1 = df1.groupBy("product_name").agg(count("order_id"))

In [24]:
# Action: triggers the shuffle + aggregation and prints the per-product counts.
df1.show(n=20, truncate=True)

+------------+---------------+
|product_name|count(order_id)|
+------------+---------------+
|    Sneakers|             41|
+------------+---------------+



**JOINS**

In [31]:
# Make the join demos below use a shuffle-based strategy:
#  - autoBroadcastJoinThreshold = -1 turns off automatic broadcast joins (otherwise these
#    tiny DataFrames would be broadcast instead of sort-merge joined);
#  - AQE off, so the join is not re-planned at runtime.
# Config keys are case-sensitive and unknown keys are accepted silently, so spellings
# like "autoBroadCastJoinThreshold" or "spark.sal..." would have no effect at all.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.adaptive.enabled", False)

In [33]:
from pyspark.sql.functions import broadcast

In [35]:
# Two small in-memory DataFrames for the join demos.
# df1: id -> name for ids 1-5.
# df2: id -> salary for ids 1, 2, 3 and 6. Id 6 has no match in df1; ids 4 and 5 have no salary.
data1 = list(enumerate(["Alice", "Bobby", "Charlie", "Don", "Eva"], start=1))
df1 = spark.createDataFrame(data1, ["id", "name"])

data2 = list(zip([1, 2, 3, 6], [50000, 60000, 70000, 90000]))
df2 = spark.createDataFrame(data2, ["id", "salary"])

In [36]:
# Left join on id. Spark plans a SortMergeJoin here only while automatic broadcasting is
# disabled (spark.sql.autoBroadcastJoinThreshold = -1); otherwise these tiny frames would be
# broadcast. Confirm the chosen strategy with df_join.explain().
# Joining on an expression (not on="id") keeps both `id` columns in the result.
df_join = df1.join(df2, on=df1["id"] == df2["id"], how="left")

In [37]:
# Action: run the join. Ids 4 and 5 have no salary, so df2's columns come back NULL;
# id 6 exists only in df2 and is therefore dropped by the left join.
df_join.show(n=20, truncate=True)

+---+-------+----+------+
| id|   name|  id|salary|
+---+-------+----+------+
|  1|  Alice|   1| 50000|
|  2|  Bobby|   2| 60000|
|  5|    Eva|NULL|  NULL|
|  3|Charlie|   3| 70000|
|  4|    Don|NULL|  NULL|
+---+-------+----+------+



In [38]:
# Broadcast join: send the small DataFrame (df2) whole to every executor, so df1 does not
# need to be shuffled.
# Automatic broadcasting is disabled with ->spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# (the key is case-sensitive: "autoBroadCastJoinThreshold" is silently ignored).
# An explicit broadcast() hint, as below, still requests a broadcast join regardless of that threshold.
#df_join_broadcast=df1.join(broadcast(df2),df1['id']==df2['id'],'left')