In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, avg, countDistinct, month, year

spark = (SparkSession.builder
         .appName("GreenSupply-BI")
         .master("local[*]")
         .getOrCreate())

base = "../data"  

customers = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/customers.csv")
products  = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/products.csv")
orders    = spark.read.option("header", True).option("inferSchema", True).csv(f"{base}/orders.csv")

df = (orders
      .join(products, "product_id", "left")
      .join(customers, "customer_id", "left"))


In [4]:
by_category = (df.groupBy("category")
                 .agg(_sum("net_sales").alias("total_revenue"),
                      _sum("contribution_margin").alias("total_margin"))
                 .orderBy(col("total_revenue").desc()))

by_category.show(truncate=False)

+------------+-----------------+------------------+
|category    |total_revenue    |total_margin      |
+------------+-----------------+------------------+
|Taschen     |77565.26999999999|30038.15          |
|Becher      |77493.00000000007|30168.85000000002 |
|Füllmaterial|62928.96         |24493.069999999985|
|Flaschen    |48152.55000000001|18747.050000000025|
|Boxen       |28699.59000000001|11107.359999999997|
+------------+-----------------+------------------+



In [5]:
customer_rev = (df.groupBy("customer_id","company_name","segment","region")
                  .agg(_sum("net_sales").alias("total_revenue"),
                       _sum("contribution_margin").alias("total_margin"))
                  .orderBy(col("total_revenue").desc()))

total_rev = customer_rev.agg(_sum("total_revenue").alias("t")).collect()[0]["t"]
top10_rev = customer_rev.limit(10).agg(_sum("total_revenue").alias("t")).collect()[0]["t"]

print(f"Top-10 Kunden Anteil am Umsatz: {top10_rev/total_rev:.1%}")
customer_rev.show(10, truncate=False)


Top-10 Kunden Anteil am Umsatz: 10.8%
+-----------+-------------------------+-------+------+------------------+------------------+
|customer_id|company_name             |segment|region|total_revenue     |total_margin      |
+-----------+-------------------------+-------+------+------------------+------------------+
|CUST1039   |Meier-Müller             |C      |SG    |3691.44           |1524.53           |
|CUST1118   |Bianchi and Sons         |B      |GE    |3396.59           |1348.93           |
|CUST1036   |Huber LLC                |B      |GE    |3262.29           |1266.4399999999998|
|CUST1144   |Bianchi-Schäfer          |A      |ZH    |3258.6899999999996|1271.68           |
|CUST1003   |Hürlimann-Meister        |B      |SG    |3166.22           |1214.13           |
|CUST1197   |Stalder, Fuchs and Kohler|A      |GE    |3117.46           |1226.75           |
|CUST1045   |Hug Group                |A      |BE    |3046.8199999999997|1194.1            |
|CUST1062   |Schaub-Ammann      

In [6]:
monthly = (df.withColumn("year", year("order_date"))
             .withColumn("month", month("order_date"))
             .groupBy("year","month")
             .agg(_sum("net_sales").alias("total_revenue"),
                  _sum("contribution_margin").alias("total_margin"))
             .orderBy("year","month"))

monthly.show(24, truncate=False)


+----+-----+------------------+------------------+
|year|month|total_revenue     |total_margin      |
+----+-----+------------------+------------------+
|2024|1    |48220.400000000016|18733.60000000001 |
|2024|2    |43603.200000000004|16854.440000000002|
|2024|3    |51752.63          |20199.82999999999 |
|2024|4    |47815.02999999999 |18587.709999999995|
|2024|5    |51005.54          |19681.160000000007|
|2024|6    |52442.56999999996 |20497.739999999976|
+----+-----+------------------+------------------+



In [7]:
from pyspark.sql.functions import avg

discount_effect = (df.groupBy("category")
                     .agg(avg("discount").alias("avg_discount"),
                          avg("contribution_margin").alias("avg_margin"))
                     .orderBy(col("avg_discount").desc()))

discount_effect.show(truncate=False)


+------------+-------------------+-----------------+
|category    |avg_discount       |avg_margin       |
+------------+-------------------+-----------------+
|Boxen       |0.07604938271604936|68.56395061728394|
|Becher      |0.07409374999999997|94.27765625000006|
|Flaschen    |0.0739860139860139 |65.54912587412596|
|Taschen     |0.07377833753148616|75.66284634760706|
|Füllmaterial|0.07146268656716409|73.11364179104473|
+------------+-------------------+-----------------+



In [8]:
orders_per_customer = df.groupBy("customer_id").agg(countDistinct("order_id").alias("n_orders"))
n_repeat = orders_per_customer.filter(col("n_orders") >= 2).count()
n_total  = orders_per_customer.count()
print(f"Wiederkaufsrate: {n_repeat/n_total:.1%}")


Wiederkaufsrate: 99.0%


In [9]:
dq = {
    "missing_customer_id": df.filter(col("customer_id").isNull()).count(),
    "missing_product_id":  df.filter(col("product_id").isNull()).count(),
    "non_positive_qty":    df.filter(col("quantity") <= 0).count(),
    "neg_sales":           df.filter(col("net_sales") < 0).count(),
}
print(dq)


{'missing_customer_id': 0, 'missing_product_id': 0, 'non_positive_qty': 0, 'neg_sales': 0}
