In [2]:
!pip install pyspark
!pip install findspark



In [3]:
import findspark
findspark.init()

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, sum, rank
from pyspark.sql.window import Window

In [5]:
# 𝐐𝐮𝐞𝐬𝐭𝐢𝐨𝐧
# Imagine you're analyzing the monthly sales performance of a company across different regions. You want to calculate:
# The cumulative sales for each region over months.
# The rank of each month based on sales within the same region.

In [6]:
spark = SparkSession.builder.appName('monthly sales performance').getOrCreate()
columns = ["Region", "Month", "Sales"]

data = [ ("East", "Jan", 200), ("East", "Feb", 300), 
("East", "Mar", 250), ("West", "Jan", 400), 
("West", "Feb", 350), ("West", "Mar", 450) ]
sales_df = spark.createDataFrame(data, columns)
sales_df.show()

25/04/26 19:30:37 WARN Utils: Your hostname, MC1275 resolves to a loopback address: 127.0.1.1; using 192.168.29.179 instead (on interface wlp0s20f3)
25/04/26 19:30:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/26 19:30:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/26 19:30:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/26 19:30:38 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/04/26 19:30:38 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/04/26 19:30:38 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/04/26 19:30:38 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4

+------+-----+-----+
|Region|Month|Sales|
+------+-----+-----+
|  East|  Jan|  200|
|  East|  Feb|  300|
|  East|  Mar|  250|
|  West|  Jan|  400|
|  West|  Feb|  350|
|  West|  Mar|  450|
+------+-----+-----+



25/04/26 19:30:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [12]:
# define a window partition by region and ordered by sales
window_spec = Window.partitionBy("Region").orderBy("Sales")
# Add cumulative sum and  rank column
result_df = sales_df.withColumn("cumulative_sales", sum("Sales").over(window_spec)).withColumn("Rank", rank().over(window_spec))

result_df.show()
            

+------+-----+-----+----------------+----+
|Region|Month|Sales|cumulative_sales|Rank|
+------+-----+-----+----------------+----+
|  East|  Jan|  200|             200|   1|
|  East|  Mar|  250|             450|   2|
|  East|  Feb|  300|             750|   3|
|  West|  Feb|  350|             350|   1|
|  West|  Jan|  400|             750|   2|
|  West|  Mar|  450|            1200|   3|
+------+-----+-----+----------------+----+



In [14]:
# SQL Solutions
sales_df.createOrReplaceTempView('sales')

spark.sql("SELECT * FROM sales").show()

+------+-----+-----+
|Region|Month|Sales|
+------+-----+-----+
|  East|  Jan|  200|
|  East|  Feb|  300|
|  East|  Mar|  250|
|  West|  Jan|  400|
|  West|  Feb|  350|
|  West|  Mar|  450|
+------+-----+-----+



In [27]:
spark.sql("""
        SELECT *,
                SUM(Sales) OVER (PARTITION BY Region ORDER BY Sales) AS cumulative_sales,
                RANK() OVER (PARTITION BY Region ORDER BY Sales) AS Rank
        FROM Sales
        """).show()

+------+-----+-----+----------------+----+
|Region|Month|Sales|cumulative_sales|Rank|
+------+-----+-----+----------------+----+
|  East|  Jan|  200|             200|   1|
|  East|  Mar|  250|             450|   2|
|  East|  Feb|  300|             750|   3|
|  West|  Feb|  350|             350|   1|
|  West|  Jan|  400|             750|   2|
|  West|  Mar|  450|            1200|   3|
+------+-----+-----+----------------+----+

