In [1]:
%pip install python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
from dotenv import load_dotenv

# تحميل المتغيرات من ملف .env
load_dotenv()


storage_account_key = os.getenv("AZURE_STORAGE_KEY")


if storage_account_key:
    spark.conf.set(
        "fs.azure.account.key.kirostoragep1.dfs.core.windows.net",
        storage_account_key
    )
    spark.conf.set(
        "fs.azure.account.key.kirostoragep1.blob.core.windows.net",
        storage_account_key
    )
    print("✅")
else:
    print("❌")

In [0]:

df_store = spark.read.parquet("abfss://filesp1@kirostoragep1.dfs.core.windows.net/dbo.stores.parquet")


df_store.show(5)

+--------+--------------------+------------+
|store_id|          store_name|    location|
+--------+--------------------+------------+
|       1|     City Mall Store|         UAE|
|       2|   High Street Store|Saudi Arabia|
|       3|   Tech World Outlet|       Qatar|
|       4|Cairo Festival Ci...|       Egypt|
|       5|          Mega Plaza|      Kuwait|
+--------+--------------------+------------+



In [0]:
df_transections=spark.read.parquet("abfss://filesp1@kirostoragep1.dfs.core.windows.net/dbo.transactions.parquet")
df_transections.show(5)
# https://kirostoragep1.blob.core.windows.net/filesp1/dbo.transactions.parquet

+--------------+-----------+----------+--------+--------+----------------+
|transaction_id|customer_id|product_id|store_id|quantity|transaction_date|
+--------------+-----------+----------+--------+--------+----------------+
|            31|        101|         1|       1|       3|      2025-04-01|
|            32|        102|         2|       2|       2|      2025-04-03|
|            33|        103|         3|       3|       1|      2025-04-05|
|            34|        104|         4|       4|       5|      2025-04-07|
|            35|        105|         5|       5|       2|      2025-04-09|
+--------------+-----------+----------+--------+--------+----------------+
only showing top 5 rows


In [0]:
df_products=spark.read.parquet("abfss://filesp1@kirostoragep1.dfs.core.windows.net/dbo.products.parquet")
df_products.show(5)

+----------+-----------------+-----------+-----+
|product_id|     product_name|   category|price|
+----------+-----------------+-----------+-----+
|         1|   Wireless Mouse|Electronics|  800|
|         2|Bluetooth Speaker|Electronics| 1200|
|         3|         Yoga Mat|    Fitness|  499|
|         4|     Laptop Stand|Accessories|  999|
|         5|     Notebook Set| Stationery|  149|
+----------+-----------------+-----------+-----+
only showing top 5 rows


In [0]:

df_customers=spark.read.parquet("abfss://filesp1@kirostoragep1.dfs.core.windows.net/customers (2).parquet")
df_customers.show(5)

+-----------+----------------+--------------------+------------+-----------------+
|customer_id|       full_name|               email|     country|registration_date|
+-----------+----------------+--------------------+------------+-----------------+
|       1001|    Ahmed Khaled|ahmed.khaled1@gma...|       Egypt|       2025-10-16|
|       1002| Sara Al Mansour|sara.almansour2@o...|Saudi Arabia|       2025-10-18|
|       1003|    Layla Kazemi|layla.kazemi3@yah...|         UAE|       2025-10-19|
|       1004|     Omar Farouk|omar.farouk4@gmai...|       Egypt|       2025-10-17|
|       1005|Fatima Al Rashid|fatima.alrashid5@...|Saudi Arabia|       2025-10-15|
+-----------+----------------+--------------------+------------+-----------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import col, sum, countDistinct, max, datediff, current_date, expr


df_sales = df_transections.join(df_products, "product_id") \
                          .withColumn("total_sale_amount", col("quantity") * col("price"))


df_sales_fixed = df_sales.withColumn("customer_id_fixed", col("customer_id") + 900)




df_customer_summary = df_sales_fixed.groupBy("customer_id_fixed") \
                              .agg(
                                  sum("total_sale_amount").alias("total_monetary_value"),
                                  countDistinct("transaction_id").alias("frequency"),
                                  max("transaction_date").alias("last_purchase_date")
                              )


df_customer_summary = df_customer_summary.withColumn("recency_days", datediff(current_date(), col("last_purchase_date")))


df_final_customers = df_customer_summary.join(
    df_customers,
    df_customer_summary["customer_id_fixed"] == df_customers["customer_id"]
)


print("--- الداتا النهائية للعملاء (بعد إصلاح الـ ID) ---")
df_final_customers.show(10)

--- الداتا النهائية للعملاء (بعد إصلاح الـ ID) ---
+-----------------+--------------------+---------+------------------+------------+-----------+-----------------+--------------------+------------+-----------------+
|customer_id_fixed|total_monetary_value|frequency|last_purchase_date|recency_days|customer_id|        full_name|               email|     country|registration_date|
+-----------------+--------------------+---------+------------------+------------+-----------+-----------------+--------------------+------------+-----------------+
|             1025|                 149|        1|        2025-05-19|         164|       1025|      Huda Nasser|huda.nasser25@hot...|       Egypt|       2025-10-17|
|             1005|                3898|        2|        2025-06-03|         149|       1005| Fatima Al Rashid|fatima.alrashid5@...|Saudi Arabia|       2025-10-15|
|             1016|                1097|        2|        2025-06-25|         127|       1016|       Adel Nabil|adel.nabil16

In [0]:
from pyspark.sql.functions import percent_rank, when
from pyspark.sql.window import Window


windowSpec = Window.orderBy(col("total_monetary_value").desc())
df_ranked = df_final_customers.withColumn("rank", percent_rank().over(windowSpec))


df_segmented = df_ranked.withColumn("Customer_Tier",
    when(col("rank") <= 0.20, "Gold")  # Top 20%
    .when((col("rank") > 0.20) & (col("rank") <= 0.50), "Silver") # Next 30%
    .otherwise("Bronze") # Bottom 50%
)


print("العملاء بعد التقسيم لطبقات:")
df_segmented.show()

العملاء بعد التقسيم لطبقات:




+-----------------+--------------------+---------+------------------+------------+-----------+----------------+--------------------+------------+-----------------+--------------------+-------------+
|customer_id_fixed|total_monetary_value|frequency|last_purchase_date|recency_days|customer_id|       full_name|               email|     country|registration_date|                rank|Customer_Tier|
+-----------------+--------------------+---------+------------------+------------+-----------+----------------+--------------------+------------+-----------------+--------------------+-------------+
|             1017|               28991|        2|        2025-06-27|         125|       1017| Mariam Al Farsi|mariam.alfarsi17@...|Saudi Arabia|       2025-10-18|                 0.0|         Gold|
|             1010|               17992|        2|        2025-06-13|         139|       1010|      Tamer Adel|tamer.adel10@hotm...|       Egypt|       2025-10-17|0.038461538461538464|         Gold|
|    

In [0]:
from pyspark.sql.functions import avg
df_segmented.groupBy("Customer_Tier") \
            .agg(
                sum("total_monetary_value").alias("Total_Sales"),
                avg("total_monetary_value").alias("Avg_Sales_Per_Customer"),
                avg("frequency").alias("Avg_Frequency"),
                avg("recency_days").alias("Avg_Recency_Days")
            ).orderBy(col("Total_Sales").desc()).show()



+-------------+-----------+----------------------+------------------+------------------+
|Customer_Tier|Total_Sales|Avg_Sales_Per_Customer|     Avg_Frequency|  Avg_Recency_Days|
+-------------+-----------+----------------------+------------------+------------------+
|         Gold|     100366|    16727.666666666668|1.8333333333333333|             139.0|
|       Silver|      50966|               6370.75|             1.875|           135.875|
|       Bronze|      26253|    2019.4615384615386|1.8461538461538463|141.46153846153845|
+-------------+-----------+----------------------+------------------+------------------+



In [0]:
df_sales.join(df_segmented.select("customer_id", "Customer_Tier"), "customer_id") \
        .filter(col("Customer_Tier") == "Gold") \
        .groupBy("product_name", "category") \
        .agg(sum("total_sale_amount").alias("Total_Sales")) \
        .orderBy(col("Total_Sales").desc()) \
        .show(10)



+------------+--------+-----------+
|product_name|category|Total_Sales|
+------------+--------+-----------+
+------------+--------+-----------+



In [0]:
from pyspark.sql.functions import count

df_segmented_grouped = (
    df_segmented
    .groupBy("country", "Customer_Tier")
    .agg(count("customer_id").alias("Customer_Count"))
    .orderBy("country", "Customer_Tier")
)

display(df_segmented_grouped)



country,Customer_Tier,Customer_Count
Egypt,Bronze,4
Egypt,Gold,1
Egypt,Silver,4
Saudi Arabia,Bronze,5
Saudi Arabia,Gold,3
Saudi Arabia,Silver,1
UAE,Bronze,4
UAE,Gold,2
UAE,Silver,3


In [0]:
print("--- 4. تحليل المتاجر والطبقات ---")
df_sales.join(df_store, "store_id") \
        .join(df_segmented.select("customer_id", "Customer_Tier"),
              df_sales["customer_id"] + 900 == df_segmented["customer_id"]) \
        .groupBy("store_name", "location", "Customer_Tier") \
        .agg(sum("total_sale_amount").alias("Total_Sales")) \
        .orderBy(col("Total_Sales").desc()) \
        .show()

--- 4. تحليل المتاجر والطبقات ---




+--------------------+------------+-------------+-----------+
|          store_name|    location|Customer_Tier|Total_Sales|
+--------------------+------------+-------------+-----------+
|   High Street Store|Saudi Arabia|         Gold|      72187|
|Cairo Festival Ci...|       Egypt|       Silver|      31975|
|Cairo Festival Ci...|       Egypt|         Gold|      23986|
|   Tech World Outlet|       Qatar|       Bronze|      10177|
|   High Street Store|Saudi Arabia|       Silver|       9799|
|     City Mall Store|         UAE|       Silver|       7596|
|     City Mall Store|         UAE|       Bronze|       6190|
|   High Street Store|Saudi Arabia|       Bronze|       6000|
|          Mega Plaza|      Kuwait|         Gold|       4193|
|          Mega Plaza|      Kuwait|       Bronze|       3886|
|   Tech World Outlet|       Qatar|       Silver|       1596|
+--------------------+------------+-------------+-----------+



In [0]:

df_final_report = df_segmented.select(
    "customer_id", "full_name", "country", "registration_date",
    "total_monetary_value", "frequency", "recency_days", "Customer_Tier"
)

df_final_report.write.mode("overwrite").parquet("wasbs://filesp1@kirostoragep1.blob.core.windows.net/analysis_results/customer_segmented_report")



df_sales_joined = df_sales.join(df_customers, df_sales["customer_id"] + 900 == df_customers["customer_id"]) \
                            .join(df_store, "store_id")

df_sales_detailed = df_sales_joined.select(
    "transaction_id",
    df_customers["customer_id"], 
    "product_id",
    "store_id",
    "quantity",
    "transaction_date",
    "price",
    "total_sale_amount",
    "product_name",
    "category",
    "country",
    "store_name",
    "location"
)

df_sales_detailed.write.mode("overwrite").parquet("wasbs://filesp1@kirostoragep1.blob.core.windows.net/analysis_results/fact_sales_detailed")

print("--- 🚀 تم حفظ الملفات بنجاح وجاهزة لـ Power BI ---")



--- 🚀 تم حفظ الملفات بنجاح وجاهزة لـ Power BI ---
