In [37]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, count, when, isnull, isnan, desc, to_date,current_date,mean ,lit,substring,ceil
# Création d'une session Spark
spark = SparkSession.builder\
         .appName("RatingEngine")\
         .config("spark.sql.shuffle.partitions", "50")\
         .config("spark.sql.autoBroadcastJoinThreshold", "10485760")\
         .config("spark.driver.memory", "3g")\
         .getOrCreate()


In [38]:
data_path = r".\cleand_df"  # Ajustez le chemin selon votre environnement
cleanedCdrsDf  = spark.read.format("parquet") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(data_path)
cleanedCdrsDf.show()

+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+
|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec| product_code|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|
+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+
|3fb0ed98-9087-47c...|2025-06-10 12:59:...|   TETOUAN_01|        5G|     NULL|     NULL|        NULL|    DATA_PLUS|        ready|     NULL|       NULL|212675185622|                 499|        1159.5|       data|
|6206e624-ce2c-40a...|2025-06-10 12:59:...|CASABLANCA_02|        4G|     NULL|     NULL|        NULL|DATA_STANDARD|        ready|     NULL|       NU

In [39]:
custuomers_df = spark.read \
    .option("header", True) \
    .csv("customer_subscriptions.csv")

In [40]:
catalogDf = spark.read \
    .option("header", True) \
    .csv("plans/product_catalog.csv")


In [41]:
from pyspark.sql.functions import col

ratePlansDf = spark.read \
    .option("header", True) \
    .csv("plans/rate_plans.csv") \
    .withColumn("unit_price", col("unit_price").cast("double")) \
    .withColumn("free_units", col("free_units").cast("int")) \
    .withColumn("tier_threshold", col("tier_threshold").cast("int")) \
    .withColumn("tier_price", col("tier_price").cast("double"))


In [42]:
cdrs = (cleanedCdrsDf
        .withColumn(
            "customer_id",
            when(col("record_type") == "voice", col("caller_id"))
            .when(col("record_type") == "sms",  col("sender_id"))
            .otherwise(col("user_id"))
        ))

In [43]:
cdrs.show()

+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+
|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec| product_code|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type| customer_id|
+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+
|3fb0ed98-9087-47c...|2025-06-10 12:59:...|   TETOUAN_01|        5G|     NULL|     NULL|        NULL|    DATA_PLUS|        ready|     NULL|       NULL|212675185622|                 499|        1159.5|       data|212675185622|
|6206e624-ce2c-40a...|2025-06-10 12:59:...|CASABLANCA_02|        4G|     NULL|     NULL|        

In [44]:
# ─── jointure client → plan ─────────────────────────────────────────────
cdrs1 = cdrs.join(
    custuomers_df.filter((col("status") == "active") &
                       (col("subscription_type") == "postpaid"))
               .select("customer_id", "rate_plan_id"),
    on="customer_id",
    how="inner"
)
cdrs1.show()

+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+
| customer_id|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec| product_code|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|rate_plan_id|
+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+
|212675185622|3fb0ed98-9087-47c...|2025-06-10 12:59:...|   TETOUAN_01|        5G|     NULL|     NULL|        NULL|    DATA_PLUS|        ready|     NULL|       NULL|212675185622|                 499|        1159.5|       data|     Starter|
|212600679822|6206e624-ce2c-40a...|2025-06-1

In [45]:
# ─── jointure produit (rate_type, service_type) ────────────────────────
cdrs2 = cdrs1.join(catalogDf, on="product_code", how="left")
cdrs2.show()

+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+------------+----+---------+--------------------+
| product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|rate_plan_id|service_type|unit|rate_type|         description|
+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+------------+----+---------+--------------------+
|    DATA_PLUS|212675185622|3fb0ed98-9087-47c...|2025-06-10 12:59:...|   TETOUAN_01|        5G|     NULL|     NULL|        NULL|        

In [46]:
cdrs3 = cdrs2.join(ratePlansDf,
                   on=["rate_plan_id", "product_code"],
                   how="left")

In [47]:
cdrs3.show()

+------------+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+----+---------+--------------------+------------+----------+----------+--------------+----------+
|rate_plan_id| product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|service_type|unit|rate_type|         description|service_type|unit_price|free_units|tier_threshold|tier_price|
+------------+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------------+----+---------+--------------------+------------+--

In [48]:
cdrs3=cdrs3.drop("service_type")
cdrs3.show()

+------------+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+----+---------+--------------------+----------+----------+--------------+----------+
|rate_plan_id| product_code| customer_id|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|
+------------+-------------+------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+----+---------+--------------------+----------+----------+--------------+----------+
|     Starter|    DATA_PLUS|212

In [49]:
from pyspark.sql import Window
agg = (cdrs3.filter(col("record_type")=="data")
           .groupBy("customer_id")
           .agg(mean("data_volume_mb").alias("user_mean_mb")))

cdrs4 = cdrs3.join(agg, "customer_id", "left")

In [50]:
cdrs4.show()

+------------+------------+-------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+----+---------+--------------------+----------+----------+--------------+----------+------------+
| customer_id|rate_plan_id| product_code|           record_ID|           timestamp|      cell_id|technology|caller_id|callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_mean_mb|
+------------+------------+-------------+--------------------+--------------------+-------------+----------+---------+---------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+----+---------+--------------------+----------+----------+--------------+----------+------

In [51]:
cdrs4 = cdrs4.withColumn(
    "imputed_mb",
    when((col("data_volume_mb").isNull()) | (col("data_volume_mb")=="inconnu"), col("user_mean_mb"))
    .otherwise(col("data_volume_mb"))
)

In [52]:
cdrs4 = cdrs4.withColumn(
    "imputed_mb",
    when((col("user_mean_mb").isNull()) & (col("imputed_mb").isNull()), lit(0)).otherwise(col("imputed_mb")))

In [53]:
cdrs4 = cdrs4.withColumn(
    "rating_status",
    when(col("rating_status")=="needs_review", "imputed").otherwise(col("rating_status"))
)

In [54]:
cdrs4.show()

+------------+------------+-------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------+---------+--------------------+----------+----------+--------------+----------+------------+----------+
| customer_id|rate_plan_id| product_code|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_mean_mb|imputed_mb|
+------------+------------+-------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------+---------+--------------------+--------

In [55]:
cdrs4 = cdrs4.withColumn(
    "dur_sec_imp",
    when(col("duration_sec").isNull() | (col("duration_sec") < 0), lit(0))
    .otherwise(col("duration_sec"))
)

In [56]:
cdrs4 = cdrs4.withColumn(
    "billable_units",
    when(col("record_type") == "voice",
         ceil(col("dur_sec_imp")))       # arrondi à la minute
    .when(col("record_type") == "sms",
         lit(1))
    .otherwise(                               # data
         col("imputed_mb"))
)


In [57]:
cdrs4.show()

+------------+------------+-------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+-----------+------+---------+--------------------+----------+----------+--------------+----------+------------+----------+-----------+--------------+
| customer_id|rate_plan_id| product_code|           record_ID|           timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|sender_id|receiver_id|     user_id|session_duration_sec|data_volume_mb|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_mean_mb|imputed_mb|dur_sec_imp|billable_units|
+------------+------------+-------------+--------------------+--------------------+-------------+----------+------------+------------+------------+-------------+---------+-----------+------------+--------------------+--------------+----

In [58]:
cdrsRated = cdrs4.withColumn(
    "cost",
    when(col("rate_type") == "flat",
         when(col("billable_units") <= col("free_units"), lit(0))
         .otherwise((col("billable_units") - col("free_units")) * col("unit_price")))
    .when(col("rate_type") == "tiered",
         when(col("billable_units") <= col("free_units"), lit(0))
         .when(col("billable_units") <= col("tier_threshold"),
               (col("billable_units") - col("free_units")) * col("unit_price"))
         .otherwise(
               (col("tier_threshold") - col("free_units")) * col("unit_price") +
               (col("billable_units") - col("tier_threshold")) * col("tier_price")))
)

cdrsRated = cdrsRated.withColumn(
    "rating_status",
    when(col("data_volume_mb").isNull(), "imputed")      # data imputée
    .when(col("duration_sec").isNull(),  "imputed")      # voice imputée
    .when(col("cost").isNull(),         "unmatched")
    .otherwise("rated")
)


In [59]:
cdrsRated.select("record_ID","customer_id","product_code",
                 "billable_units","unit_price","cost","rating_status") \
         .show(truncate=False)


+------------------------------------+------------+-------------+--------------+----------+------------------+-------------+
|record_ID                           |customer_id |product_code |billable_units|unit_price|cost              |rating_status|
+------------------------------------+------------+-------------+--------------+----------+------------------+-------------+
|3b05fe59-8ac1-4bd6-80e2-804949a8d849|212656101919|DATA_PLUS    |1073.35       |0.015     |0.0               |imputed      |
|f7a77471-0221-4213-aceb-a7ffdb2dd6cb|212715284078|VOICE_NAT    |239           |0.008     |0.712             |imputed      |
|1f87f2c1-e038-405c-b127-3035b0ba0d67|212617942523|VOICE_NAT    |399           |0.006     |0.594             |imputed      |
|ff79da47-77d4-4aaa-b4a9-a0cfc5b81b76|212793499686|VOICE_NAT    |527           |0.006     |1.362             |imputed      |
|a5115bcb-29ae-4a49-8209-a859fa82529b|212647686237|DATA_PLUS    |1846.16       |0.012     |0.0               |imputed      |
