In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Create Spark session
spark = SparkSession.builder \
    .appName("SampleData") \
    .getOrCreate()

# Create sample data without 'amount' first
data = [(i, f"Product_{i%10}", i%5, 2024) for i in range(1, 100001)]
columns = ["order_id", "product", "category_id", "year"]

df = spark.createDataFrame(data, columns)

# Add 'amount' column using rand()
df = df.withColumn("amount", rand())

df.show(5)

+--------+---------+-----------+----+-------------------+
|order_id|  product|category_id|year|             amount|
+--------+---------+-----------+----+-------------------+
|       1|Product_1|          1|2024| 0.5992323041299771|
|       2|Product_2|          2|2024|0.07822848401093152|
|       3|Product_3|          3|2024| 0.2427299357775693|
|       4|Product_4|          4|2024|0.07179594608397344|
|       5|Product_5|          0|2024| 0.6011166094871854|
+--------+---------+-----------+----+-------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import col

# Filter orders with amount > 0.1 (to remove very small random amounts)
# and select only required columns
df_transformed = df.filter(col("amount") > 0.1) \
                   .select("order_id", "product", "category_id", "year", "amount")

# Show sample data
df_transformed.show(5)

+--------+---------+-----------+----+-------------------+
|order_id|  product|category_id|year|             amount|
+--------+---------+-----------+----+-------------------+
|       1|Product_1|          1|2024| 0.5992323041299771|
|       3|Product_3|          3|2024| 0.2427299357775693|
|       5|Product_5|          0|2024| 0.6011166094871854|
|       8|Product_8|          3|2024|0.11165680345968576|
|      10|Product_0|          0|2024|0.20870566569233995|
+--------+---------+-----------+----+-------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import sum, avg, round

# Compute total and average amount per category
df_summary = df_transformed.groupBy("category_id") \
                           .agg(
                               round(sum("amount"),2).alias("total_amount"),
                               round(avg("amount"),2).alias("avg_amount")
                           )

df_summary.show()


+-----------+------------+----------+
|category_id|total_amount|avg_amount|
+-----------+------------+----------+
|          0|     9836.18|      0.55|
|          2|     9911.57|      0.55|
|          1|     9933.45|      0.55|
|          3|     9968.72|      0.55|
|          4|     9886.08|      0.55|
+-----------+------------+----------+



In [0]:
# Convert to Pandas and download locally
df_summary_pd = df_summary.toPandas()

# Display in notebook
display(df_summary_pd)

# Save locally on your machine (optional)
df_summary_pd.to_csv("cost_optimized_summary.csv", index=False)
print("Pipeline complete: summary ready in-memory and downloadable locally.")

category_id,total_amount,avg_amount
0,9836.18,0.55
2,9911.57,0.55
1,9933.45,0.55
3,9968.72,0.55
4,9886.08,0.55


Pipeline complete: summary ready in-memory and downloadable locally.


In [0]:
display(df_summary)

category_id,total_amount,avg_amount
0,9951.33,0.55
2,9829.88,0.55
1,9861.99,0.55
3,9941.22,0.55
4,9912.37,0.55


In [0]:
# On serverless compute, caching and persisting is not supported
# So just use the transformed DataFrame directly

# Check row count after filtering
print("Total rows after filter:", df_transformed.count())

Total rows after filter: 89990


In [0]:
from pyspark.sql.functions import sum, avg, round

df_summary = df_transformed.groupBy("category_id") \
                           .agg(
                               round(sum("amount"),2).alias("total_amount"),
                               round(avg("amount"),2).alias("avg_amount")
                           )

display(df_summary)

category_id,total_amount,avg_amount
0,9951.33,0.55
2,9829.88,0.55
1,9861.99,0.55
3,9941.22,0.55
4,9912.37,0.55


In [0]:
# Convert to Pandas for local download
df_summary_pd = df_summary.toPandas()

# Display in notebook
display(df_summary_pd)

# Optional: Save CSV / Excel locally
df_summary_pd.to_csv("cost_optimized_summary.csv", index=False)
df_summary_pd.to_excel("cost_optimized_summary.xlsx", index=False)

print("Pipeline complete: summary ready and downloadable locally.")

category_id,total_amount,avg_amount
0,9836.18,0.55
2,9911.57,0.55
1,9933.45,0.55
3,9968.72,0.55
4,9886.08,0.55


Pipeline complete: summary ready and downloadable locally.


In [0]:
# Install openpyxl in the cluster
%pip install openpyxl

Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Restart Python to activate the newly installed package
%restart_python

In [0]:
# Convert to Pandas (if not already)
df_summary_pd = df_summary.toPandas()

# Save as Excel
df_summary_pd.to_excel("cost_optimized_summary.xlsx", index=False)

print("Pipeline complete: Excel file saved locally!")

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-6936184569399163>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Convert to Pandas (if not already)[39;00m
[0;32m----> 2[0m df_summary_pd [38;5;241m=[39m df_summary[38;5;241m.[39mtoPandas()
[1;32m      4[0m [38;5;66;03m# Save as Excel[39;00m
[1;32m      5[0m df_summary_pd[38;5;241m.[39mto_excel([38;5;124m"[39m[38;5;124mcost_optimized_summary.xlsx[39m[38;5;124m"[39m, index[38;5;241m=[39m[38;5;28;01mFalse[39;00m)

[0;31mNameError[0m: name 'df_summary' is not defined

In [0]:
spark.stop()