In [1]:
from pyspark.sql.types import *
 
orderSchema = StructType([
     StructField("SalesOrderNumber", StringType()),
     StructField("SalesOrderLineNumber", IntegerType()),
     StructField("OrderDate", DateType()),
     StructField("CustomerName", StringType()),
     StructField("Email", StringType()),
     StructField("Item", StringType()),
     StructField("Quantity", IntegerType()),
     StructField("UnitPrice", FloatType()),
     StructField("Tax", FloatType())
     ])
    
 # Import all files from bronze folder of lakehouse
df = spark.read.format("csv").option("header", "true").schema(orderSchema).load("Files/Bronze/*.csv")

StatementMeta(, c81586df-5cb4-4404-91a7-5020d186b438, 3, Finished, Available, Finished)

In [2]:
from pyspark.sql.functions import col, sum as _sum, round

# Calculate total sales (Quantity * UnitPrice)
df = df.withColumn("TotalSales", col("Quantity") * col("UnitPrice"))

# Calculate Customer Lifetime Value (CLV)
clv = df.groupBy("CustomerName").agg(
    _sum("TotalSales").alias("TotalSales"),
    _sum("Tax").alias("TotalTax")
).withColumn("CustomerLifetimeValue", round(col("TotalSales") + col("TotalTax"), 2))

# Show results
clv.orderBy(col("CustomerLifetimeValue").desc()).show()

StatementMeta(, c81586df-5cb4-4404-91a7-5020d186b438, 4, Finished, Available, Finished)

+------------------+------------------+-----------------+---------------------+
|      CustomerName|        TotalSales|         TotalTax|CustomerLifetimeValue|
+------------------+------------------+-----------------+---------------------+
|     Larry Vazquez|10899.620180130005|871.9695977568626|             11771.59|
| Kaitlyn Henderson|10896.220178842545|871.6975977420807|             11767.92|
|      Nichole Nara|10876.320177316666|870.1055978238583|             11746.43|
|        Kate Anand| 10872.06017780304|869.7647977471352|             11741.82|
|       Margaret He|10841.220174789429|867.2975977361202|             11708.52|
|   Lawrence Alonso|10836.900176048279| 866.951997667551|             11703.85|
|Terrance Rodriguez|10829.220174789429|866.3375976979733|             11695.56|
|           Rosa Hu|10817.600177764893|865.4079977273941|             11683.01|
|      Aaron Wright|10813.630178451538|865.0903976857662|             11678.72|
|      Clarence Gao|10799.520174980164|8

In [3]:
from pyspark.sql.functions import col

# Calculate total sales per product
product_performance = df.groupBy("Item").agg(
    _sum("Quantity").alias("TotalQuantitySold"),
    _sum("TotalSales").alias("TotalRevenue")
)

# Show top 10 products by total revenue
product_performance.orderBy(col("TotalRevenue").desc()).show(10)

StatementMeta(, c81586df-5cb4-4404-91a7-5020d186b438, 5, Finished, Available, Finished)

+--------------------+-----------------+------------------+
|                Item|TotalQuantitySold|      TotalRevenue|
+--------------------+-----------------+------------------+
|    Road-150 Red, 48|              337|1205876.9965820312|
|    Road-150 Red, 62|              336|   1202298.7265625|
|    Road-150 Red, 52|              302|1080637.5458984375|
|    Road-150 Red, 56|              295|1055589.6557617188|
|    Road-150 Red, 44|              281|1005493.8754882812|
|Mountain-200 Blac...|              425| 925946.4848632812|
|Mountain-200 Blac...|              388| 844474.3410644531|
|Mountain-200 Silv...|              371| 816222.2004394531|
|Mountain-200 Silv...|              370| 815145.0620117188|
|Mountain-200 Blac...|              366| 799148.2900390625|
+--------------------+-----------------+------------------+
only showing top 10 rows



In [4]:
from pyspark.sql.functions import month, year, col, sum as _sum, lag
from pyspark.sql.window import Window

# Extract year and month from OrderDate
df = df.withColumn("Year", year("OrderDate")).withColumn("Month", month("OrderDate"))

# Calculate monthly total sales
monthly_sales = df.groupBy("Year", "Month").agg(_sum("TotalSales").alias("MonthlyRevenue"))

# Calculate monthly sales growth
window_spec = Window.orderBy("Year", "Month")
monthly_sales = monthly_sales.withColumn("PreviousMonthRevenue", lag("MonthlyRevenue").over(window_spec))
monthly_sales = monthly_sales.withColumn("MonthlyGrowth", 
                                         (col("MonthlyRevenue") - col("PreviousMonthRevenue")) / col("PreviousMonthRevenue") * 100)

# Show monthly sales growth
monthly_sales.orderBy("Year", "Month").show()

StatementMeta(, c81586df-5cb4-4404-91a7-5020d186b438, 6, Finished, Available, Finished)

+----+-----+------------------+--------------------+-------------------+
|Year|Month|    MonthlyRevenue|PreviousMonthRevenue|      MonthlyGrowth|
+----+-----+------------------+--------------------+-------------------+
|2019|    7| 930628.7577514648|                null|               null|
|2019|    8| 519494.1435546875|   930628.7577514648|-44.178154905736925|
|2019|    9|513329.47607421875|   519494.1435546875| -1.186667368045082|
|2019|   10| 561681.4779663086|  513329.47607421875|  9.419291925698605|
|2019|   11| 737839.8244018555|   561681.4779663086|  31.36267677427551|
|2019|   12| 596746.5590820312|   737839.8244018555| -19.12247897898494|
|2020|    1| 613858.8395996094|   596746.5590820312| 2.8675960099211566|
|2020|    2| 580393.9633789062|   613858.8395996094| -5.451558902781404|
|2020|    3| 700873.1859130859|   580393.9633789062|  20.75817981165418|
|2020|    4| 636375.3036499023|   700873.1859130859| -9.202503899354749|
|2020|    5| 691261.3830566406|   636375.3036499023