In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=132ead334bb01688a71235ccc718df161efc8fceba11783c9d88b8ccc9cda185
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("WindowExample").getOrCreate()

data = [(1, "Product A", 100),
 (2, "Product B", 150),
 (3, "Product C", 200),
 (4, "Product D", 120)]
columns = ["product_id", "product_name", "revenue"]
df = spark.createDataFrame(data, columns)

In [9]:
window_spec = Window.orderBy("product_id")
result = df.withColumn("total_revenue", F.sum("revenue").over(window_spec))
result.show()

+----------+------------+-------+-------------+
|product_id|product_name|revenue|total_revenue|
+----------+------------+-------+-------------+
|         1|   Product A|    100|          100|
|         2|   Product B|    150|          250|
|         3|   Product C|    200|          450|
|         4|   Product D|    120|          570|
+----------+------------+-------+-------------+



In [13]:
window_spec = Window.partitionBy("product_name").orderBy("revenue")
result = df.withColumn("rank", F.rank().over(window_spec))
result.show()

+----------+------------+-------+----+
|product_id|product_name|revenue|rank|
+----------+------------+-------+----+
|         1|   Product A|    100|   1|
|         2|   Product B|    150|   1|
|         3|   Product C|    200|   1|
|         4|   Product D|    120|   1|
+----------+------------+-------+----+



In [14]:
window_spec = Window.orderBy("product_id").rowsBetween(-1, 1)
result = df.withColumn("moving_avg", F.avg("revenue").over(window_spec))
result.show()

+----------+------------+-------+------------------+
|product_id|product_name|revenue|        moving_avg|
+----------+------------+-------+------------------+
|         1|   Product A|    100|             125.0|
|         2|   Product B|    150|             150.0|
|         3|   Product C|    200|156.66666666666666|
|         4|   Product D|    120|             160.0|
+----------+------------+-------+------------------+



In [12]:
import time

start_time = time.time()
grouped_df = df.groupBy("product_name").agg(F.sum("revenue"))
group_by_time = time.time() - start_time

start_time = time.time()
window_spec = Window.partitionBy("product_name")
windowed_df = df.withColumn("total_revenue", F.sum("revenue").over(window_spec))
window_time = time.time() - start_time

print(f"Time taken for GROUP BY: {group_by_time:.4f} seconds")
print(f"Time taken for Window Expression: {window_time:.4f} seconds")

Time taken for GROUP BY: 0.0720 seconds
Time taken for Window Expression: 0.0391 seconds
