In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("ColabSparkDemo").getOrCreate()

In [22]:
dataj1 = [
    ('1', 'Abubakar', 'd01'),
    ('2', 'Umer', 'd02'),
    ('3', 'Usman', 'd03'),
    ('4', 'Ali', 'd03'),
    ('5', 'Talha', 'd05'),
    ('6', 'Amar', 'd06')
]

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'

df1 = spark.createDataFrame(dataj1, schemaj1)

In [6]:
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Nurbeting'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance')
]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2, schemaj2)

In [23]:
df = df1.show()

+------+--------+-------+
|emp_id|emp_name|dept_id|
+------+--------+-------+
|     1|Abubakar|    d01|
|     2|    Umer|    d02|
|     3|   Usman|    d03|
|     4|     Ali|    d03|
|     5|   Talha|    d05|
|     6|    Amar|    d06|
+------+--------+-------+



In [18]:
df = df2.show()

+-------+----------+
|dept_id|department|
+-------+----------+
|    d01|        HR|
|    d02| Nurbeting|
|    d03|  Accounts|
|    d04|        IT|
|    d05|   Finance|
+-------+----------+



Joins

In [24]:
#inner Join
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'inner').show()

+------+--------+-------+-------+----------+
|emp_id|emp_name|dept_id|dept_id|department|
+------+--------+-------+-------+----------+
|     1|Abubakar|    d01|    d01|        HR|
|     2|    Umer|    d02|    d02| Nurbeting|
|     3|   Usman|    d03|    d03|  Accounts|
|     4|     Ali|    d03|    d03|  Accounts|
|     5|   Talha|    d05|    d05|   Finance|
+------+--------+-------+-------+----------+



In [25]:
#left join
df1.join(df2, df1['dept_id']==df2['dept_id'], 'left').show()

+------+--------+-------+-------+----------+
|emp_id|emp_name|dept_id|dept_id|department|
+------+--------+-------+-------+----------+
|     1|Abubakar|    d01|    d01|        HR|
|     3|   Usman|    d03|    d03|  Accounts|
|     2|    Umer|    d02|    d02| Nurbeting|
|     4|     Ali|    d03|    d03|  Accounts|
|     6|    Amar|    d06|   NULL|      NULL|
|     5|   Talha|    d05|    d05|   Finance|
+------+--------+-------+-------+----------+



In [27]:
#right join
df1.join(df2, df1['dept_id']==df2['dept_id'], 'right').show()

+------+--------+-------+-------+----------+
|emp_id|emp_name|dept_id|dept_id|department|
+------+--------+-------+-------+----------+
|     1|Abubakar|    d01|    d01|        HR|
|     2|    Umer|    d02|    d02| Nurbeting|
|     4|     Ali|    d03|    d03|  Accounts|
|     3|   Usman|    d03|    d03|  Accounts|
|  NULL|    NULL|   NULL|    d04|        IT|
|     5|   Talha|    d05|    d05|   Finance|
+------+--------+-------+-------+----------+



In [28]:
#Anti Join
df1.join(df2, df1['dept_id']==df2['dept_id'], 'anti').show()

+------+--------+-------+
|emp_id|emp_name|dept_id|
+------+--------+-------+
|     6|    Amar|    d06|
+------+--------+-------+



Window Functions

Import the Dataset

In [30]:
from google.colab import files

uploaded = files.upload()

Saving BigMart Sales.csv to BigMart Sales.csv


In [31]:
df = spark.read.csv('BigMart Sales.csv', header=True,inferSchema=True)

df.show()

+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|           Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|
+---------------+-----------+----------------+---------------+--------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+
|          FDA15|        9.3|         Low Fat|    0.016047301|               Dairy|249.8092|           OUT049|                     1999|     Medium|              Tier 1|Supermarket Type1|         3735.138|
|          DRC01|       5.92|         Regular|    0.019278216|         Soft Drinks| 48.2692|           OUT018|                     2009|     Medium|              Tier 3|Superma

In [34]:
from pyspark.sql import window

Row_number()Function

In [37]:
from pyspark.sql.functions import row_number
from pyspark.sql import window

df.withColumn('rowCol', row_number().over(window.OrderBy('Item_Identifier'))).show()

+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|rowCol|
+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+------+
|          DRA12|       11.6|         Low Fat|    0.041177505|Soft Drinks|140.3154|           OUT017|                     2007|       NULL|              Tier 2|Supermarket Type1|        2552.6772|     1|
|          DRA12|       11.6|         Low Fat|            0.0|Soft Drinks|141.6154|           OUT045|                     2002|       NULL|              Tier 2|Supermarket Type1|      

Rank & Dense Rank

In [46]:
from pyspark.sql.functions import rank
from pyspark.sql.window import Window



df.withColumn("rank", rank().over(Window.orderBy("item_identifier"))).show()

+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|rank|
+---------------+-----------+----------------+---------------+-----------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          DRA12|       11.6|         Low Fat|    0.041177505|Soft Drinks|140.3154|           OUT017|                     2007|       NULL|              Tier 2|Supermarket Type1|        2552.6772|   1|
|          DRA12|       11.6|         Low Fat|            0.0|Soft Drinks|141.6154|           OUT045|                     2002|       NULL|              Tier 2|Supermarket Type1|        3829.0

In [47]:
#arrange in desc first
from pyspark.sql.functions import rank
from pyspark.sql.window import Window
from pyspark.sql.functions import col



df.withColumn("rank", rank().over(Window.orderBy(col('Item_Identifier').desc()))).show()

+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|rank|
+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          NCZ54|      14.65|         Low Fat|    0.083359391|         Household|161.9552|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|        4711.2008|   1|
|          NCZ54|       NULL|         Low Fat|    0.082955719|         Household|164.0552|           OUT027|                     1985|     Medium|              Tier

In [51]:
#Dense rank

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import col



df.withColumn("rank", dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).show()

+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|Item_Identifier|Item_Weight|Item_Fat_Content|Item_Visibility|         Item_Type|Item_MRP|Outlet_Identifier|Outlet_Establishment_Year|Outlet_Size|Outlet_Location_Type|      Outlet_Type|Item_Outlet_Sales|rank|
+---------------+-----------+----------------+---------------+------------------+--------+-----------------+-------------------------+-----------+--------------------+-----------------+-----------------+----+
|          NCZ54|      14.65|         Low Fat|    0.083359391|         Household|161.9552|           OUT046|                     1997|      Small|              Tier 1|Supermarket Type1|        4711.2008|   1|
|          NCZ54|       NULL|         Low Fat|    0.082955719|         Household|164.0552|           OUT027|                     1985|     Medium|              Tier

Tasks/Project

In [54]:
from google.colab import files

uploaded = files.upload()

Saving customer_transaction_data.csv to customer_transaction_data.csv


In [55]:
df = spark.read.csv('customer_transaction_data.csv', header=True,inferSchema=True)

df.show()

+-----------+------+--------+-----------+------+----------+----+---------+
|customer_id|txn_id| product|   category|amount|  txn_date|name|     city|
+-----------+------+--------+-----------+------+----------+----+---------+
|          1|   101|ProductA|Electronics|   500|2024-05-01| Ali|   Lahore|
|          1|   102|ProductB|Electronics|   700|2024-06-01| Ali|   Lahore|
|          2|   103|ProductC|    Fashion|   200|2024-06-02|Sara|  Karachi|
|          2|   104|ProductB|Electronics|   700|2024-06-05|Sara|  Karachi|
|          3|   105|ProductA|Electronics|   500|2024-05-02|John|Islamabad|
+-----------+------+--------+-----------+------+----------+----+---------+



In [56]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, rank, dense_rank, sum as _sum
from pyspark.sql.window import Window

In [57]:
# Creating a window for customer-wise transaction ordering
txn_window = Window.partitionBy("customer_id").orderBy(df["txn_date"].desc())

# Adding row numbers and filter only the latest transaction per customer
from pyspark.sql.functions import row_number

latest_txn_df = df.withColumn("row_num", row_number().over(txn_window))
latest_txn_df = latest_txn_df.filter(latest_txn_df["row_num"] == 1).drop("row_num")


latest_txn_df.show()

+-----------+------+--------+-----------+------+----------+----+---------+
|customer_id|txn_id| product|   category|amount|  txn_date|name|     city|
+-----------+------+--------+-----------+------+----------+----+---------+
|          1|   102|ProductB|Electronics|   700|2024-06-01| Ali|   Lahore|
|          2|   104|ProductB|Electronics|   700|2024-06-05|Sara|  Karachi|
|          3|   105|ProductA|Electronics|   500|2024-05-02|John|Islamabad|
+-----------+------+--------+-----------+------+----------+----+---------+



In [64]:
#Now Rank products by revenue in each category
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, sum as _sum


# Step 1: Compute total revenue per product-category and set a clean column name
revenue_df = df.groupBy("product", "category") \
               .agg(_sum("amount").alias("total_revenue"))  # <-- use alias here

# Step 2: Define window using column name as a string
category_window = Window.partitionBy("category").orderBy("total_revenue")

# Step 3: Apply dense_rank safely
ranked_df = revenue_df.withColumn("dense_rank", dense_rank().over(category_window))

# Step 4: Show results
ranked_df.show()

+--------+-----------+-------------+----------+
| product|   category|total_revenue|dense_rank|
+--------+-----------+-------------+----------+
|ProductA|Electronics|         1000|         1|
|ProductB|Electronics|         1400|         2|
|ProductC|    Fashion|          200|         1|
+--------+-----------+-------------+----------+

