In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  when,mean ,lit,ceil
# Création d'une session Spark
spark = SparkSession.builder\
         .appName("RatingEngine")\
         .config("spark.jars", r".conf\postgresql-42.7.6.jar")\
         .config("spark.driver.extraClassPath", r".\conf\postgresql-42.7.6.jar")\
         .config("spark.executor.extraClassPath", r".\conf\\postgresql-42.7.6.jar")\
         .config("spark.sql.shuffle.partitions", "50")\
         .config("spark.sql.autoBroadcastJoinThreshold", "10485760")\
         .config("spark.driver.memory", "6g")\
         .getOrCreate()




In [2]:
data_path = r".\output_dir" 
cleanedCdrsDf  = spark.read.format("parquet") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(data_path)

In [3]:
customersDf = spark.read \
    .format("jdbc")\
    .option("url",      "jdbc:postgresql://localhost:5432/projet_spark")\
    .option("dbtable",  "customers")\
    .option("user",     "postgres")\
    .option("password", "0000")\
    .load() 

customersDf.show(5)


+------------+----------------+-----------------+------------+---------------+------+--------------------+
| customer_id|   customer_name|subscription_type|rate_plan_id|activation_date|status|              region|
+------------+----------------+-----------------+------------+---------------+------+--------------------+
|212621008730|  Fouad El Ghazi|         postpaid|      PLAN_B|     2024-09-30|active|Béni Mellal-Khénifra|
|212680058037|Soufiane Belkadi|         postpaid|      PLAN_A|     2023-10-16|active|  Rabat-Salé-Kénitra|
|212705083484|    Rania Skalli|         postpaid|      PLAN_C|     2023-09-13|active|          L'Oriental|
|212742643119|    Hind Belkadi|         postpaid|      PLAN_B|     2024-10-09|active|   Guelmim-Oued Noun|
|212625950696|  Ehab El Amrani|         postpaid|      PLAN_C|     2024-12-16|active|      Marrakech-Safi|
+------------+----------------+-----------------+------------+---------------+------+--------------------+
only showing top 5 rows



In [4]:
catalogDf = spark.read \
    .option("header", True) \
    .csv("ressources/product_catalog.csv")


In [5]:
from pyspark.sql.functions import col

ratePlansDf = spark.read \
    .option("header", True) \
    .csv("ressources/rate_plans.csv") \
    .withColumn("unit_price", col("unit_price").cast("double")) \
    .withColumn("free_units", col("free_units").cast("int")) \
    .withColumn("tier_threshold", col("tier_threshold").cast("int")) \
    .withColumn("tier_price", col("tier_price").cast("double"))


In [6]:
cdrs = (cleanedCdrsDf
        .withColumn(
            "customer_id",
            when(col("record_type") == "voice", col("caller_id"))
            .when(col("record_type") == "sms",  col("sender_id"))
            .otherwise(col("user_id"))
        ))

In [7]:
# ─── jointure client → plan ─────────────────────────────────────────────
cdrs1 = cdrs.join(
    customersDf.filter((col("status") == "active") &
                       (col("subscription_type") == "postpaid"))
               .select("customer_id", "rate_plan_id"),
    on="customer_id",
    how="inner"
)
cdrs1.show()

+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+
| customer_id|           record_ID|          timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|product_code|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|rate_plan_id|
+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+
|212621008730|928baca9-3631-422...|2025-05-01 02:42:28|    MEKNES_01|        4G|        NULL|        NULL|        NULL|  DATA_BASIC|        ready|        NULL|        NULL|212621008730|                 140|        336.96|       2|  

In [8]:
# ─── jointure produit (rate_type, service_type) ────────────────────────
cdrs2 = cdrs1.join(catalogDf, on="product_code", how="left")
cdrs2.show()

+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------------+------+---------+--------------------+
|product_code| customer_id|           record_ID|          timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|rate_plan_id|service_type|  unit|rate_type|         description|
+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------------+------+---------+--------------------+
|  DATA_BASIC|212621008730|928baca9-3631-422...|2025-05-01 02:42:28|    MEKNES_

In [9]:
cdrs3 = cdrs2.join(ratePlansDf,
                   on=["rate_plan_id", "product_code"],
                   how="left")

In [10]:
cdrs3.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------------+------+---------+--------------------+------------+----------+----------+--------------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|          timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|service_type|  unit|rate_type|         description|service_type|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+---------

In [11]:
cdrs3=cdrs3.drop("service_type")
cdrs3.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+
|rate_plan_id|product_code| customer_id|           record_ID|          timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+-

In [12]:
from pyspark.sql.functions import col, min as spark_min

agg = (cdrs3
       .filter(col("record_type") == "data")
       .groupBy("customer_id")
       .agg(
           spark_min("data_volume_mb")
           .alias("user_min_mb")
       )
)

cdrs4 = cdrs3.join(agg, on="customer_id", how="left")

In [13]:
cdrs4.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+-----------+
| customer_id|rate_plan_id|product_code|           record_ID|          timestamp|      cell_id|technology|   caller_id|   callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_min_mb|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------

In [14]:
cdrs4 = cdrs4.withColumn(
    "data_volume_mb",
    when( (col("data_volume_mb")=="inconnu"), col("user_min_mb"))
    .otherwise(col("data_volume_mb"))
)

In [15]:
cdrs4 = cdrs4.withColumn(
    "data_volume_mb",
    when( ((col("user_min_mb")=="inconnu") &(col("data_volume_mb")=="inconnu")), lit(0)).otherwise(col("data_volume_mb")))

In [16]:
cdrs4 = cdrs4.withColumn(
    "rating_status",
    when((col("rating_status")=="needs_review")&(col("data_volume_mb")=="inconnu"), "unmatched").otherwise(col("rating_status"))
)

In [17]:
cdrs4 = cdrs4.withColumn(
    "rating_status",
    when((col("rating_status")=="needs_review"), "ready").otherwise(col("rating_status"))
)

In [18]:
cdrsss=cdrs4.filter(cdrs4.rating_status == "ready")
cdrsss.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+-----------+
| customer_id|rate_plan_id|product_code|           record_ID|          timestamp|      cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_min_mb|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+-----------------

In [19]:
cdrs4.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+-----------+
| customer_id|rate_plan_id|product_code|           record_ID|          timestamp|      cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_min_mb|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+-----------------

In [20]:
cdrs4 = cdrs4.withColumn(
    "session_duration_sec",
    when((col("session_duration_sec").isNull() | (col("session_duration_sec") < 0))&(col("record_type")=="data"), lit(0))
    .otherwise(col("session_duration_sec"))
)

In [21]:
cdrs4 = cdrs4.withColumn(
    "billable_units",
    when(col("record_type") == "voice",
         ceil(col("duration_sec") / 60))       
    .when(col("record_type") == "sms",
         lit(1))
    .otherwise(                               # data
         col("data_volume_mb"))
)


In [22]:
cdrs4.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+-----------+--------------+
| customer_id|rate_plan_id|product_code|           record_ID|          timestamp|      cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|user_min_mb|billable_units|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+----

In [23]:
from pyspark.sql.functions import coalesce

cdrs4 = (cdrs4
    .withColumn("free_units",    coalesce(col("free_units"),    lit(0)))
    .withColumn("tier_threshold",coalesce(col("tier_threshold"),lit(0)))
    .withColumn("unit_price",    coalesce(col("unit_price"),    lit(0.0)))
    .withColumn("tier_price",    coalesce(col("tier_price"),    lit(0.0)))
)


In [24]:
cdrsRated = cdrs4.withColumn(
    "cost",
    when(col("rate_type") == "flat",
         when(col("billable_units") <= col("free_units"), lit(0))
         .otherwise((col("billable_units") - col("free_units")) * col("unit_price")))
    .when(col("rate_type") == "tiered",
         when(col("billable_units") <= col("free_units"), lit(0))
         .when(col("billable_units") <= col("tier_threshold"),
               (col("billable_units") - col("free_units")) * col("unit_price"))
         .otherwise(
               (col("tier_threshold") - col("free_units")) * col("unit_price") +
               (col("billable_units") - col("tier_threshold")) * col("tier_price")))
)

cdrsRated = cdrsRated.withColumn(
    "rating_status",
    when(col("cost").isNull(), "unmatched")
    .otherwise("rated")
)


In [25]:
cdrsRated.select("record_ID","customer_id","product_code",
                 "billable_units","unit_price","cost","rating_status") \
         .show(truncate=False)


+------------------------------------+------------+------------+--------------+----------+--------------------+-------------+
|record_ID                           |customer_id |product_code|billable_units|unit_price|cost                |rating_status|
+------------------------------------+------------+------------+--------------+----------+--------------------+-------------+
|49d6cc2b-30a3-4592-aeb8-7af7622d2603|212629151506|VOICE_NAT   |8             |0.01      |0.03                |rated        |
|691b3d0c-56e7-4859-843d-c6dc702b80ee|212706798407|VOICE_NAT   |2             |0.01      |0.0                 |rated        |
|c270b926-5d1d-4633-a71d-18fe76eb25a6|212688851505|DATA_BASIC  |1977.95       |0.02      |39.159              |rated        |
|9c108463-2a61-40b8-a122-a87a3ca940a3|212714097600|VOICE_NAT   |1             |0.01      |0.0                 |rated        |
|b39ab5e2-c9d4-4157-92bc-fa3170d90e1c|212658902556|SMS_NAT     |1             |0.035     |0.0                 |rated  

In [26]:
from pyspark.sql.functions import date_format

cdrsRated = cdrsRated.withColumn(
    "billing_period",
    date_format(col("timestamp"), "yyyy-MM")
)
cdrsRated=cdrsRated.drop("user_min_mb")

In [27]:
(
 cdrsRated
 .write
 .mode("overwrite")
 .partitionBy("billing_period")
 .parquet("rated_cdrs/")
)

print("✓ Rating terminé : Parquet écrit dans rated_cdrs/")


✓ Rating terminé : Parquet écrit dans rated_cdrs/


In [28]:
cdrsRated.show()

+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+--------------------+--------------+--------+-----------+------+---------+--------------------+----------+----------+--------------+----------+--------------+--------------------+--------------+
| customer_id|rate_plan_id|product_code|           record_ID|          timestamp|      cell_id|technology|   caller_id|    callee_id|duration_sec|rating_status|   sender_id| receiver_id|     user_id|session_duration_sec|data_volume_mb|batch_id|record_type|  unit|rate_type|         description|unit_price|free_units|tier_threshold|tier_price|billable_units|                cost|billing_period|
+------------+------------+------------+--------------------+-------------------+-------------+----------+------------+-------------+------------+-------------+------------+------------+------------+-------------