### Silver Layer Transformations – TelecomSparkTransformations

This notebook applies business transformations on Bronze tables
to create clean, enriched Silver Delta tables.

Principles:
- Apply business logic
- Perform joins and filters
- Create derived columns
- Prepare data for analytics


##### Read Bronze Tables

In [0]:
# Load the raw Bronze tables that feed the Silver layer
subscribers_df = spark.table("telecomsparktransformations_catalog.bronze.subscribers")

call_records_df = spark.table("telecomsparktransformations_catalog.bronze.call_records")

data_usage_df = spark.table("telecomsparktransformations_catalog.bronze.data_usage")

recharge_df = spark.table("telecomsparktransformations_catalog.bronze.recharge")


##### Basic Data Quality Filters

In [0]:
from pyspark.sql.functions import col

# Keep only rows that pass basic sanity checks
valid_calls_df = call_records_df.filter(col("call_duration_sec") > 0)
valid_data_usage_df = data_usage_df.filter(col("data_mb") > 0)
valid_recharge_df = recharge_df.filter(col("status") == "SUCCESS")


**Exam Rule:**
Filtering happens in Silver, never in Bronze. Bronze keeps the raw data unchanged.
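
To make the rule concrete, here is a minimal plain-Python sketch (no Spark needed) of the call-duration filter. The sample rows and the helper name `is_valid_call` are hypothetical; only the column name `call_duration_sec` comes from the notebook. In Spark, a comparison against NULL evaluates to NULL and `filter` keeps only rows where the condition is true, so the sketch treats `None` as failing the check.

```python
# Hypothetical Bronze call records; None stands in for a NULL duration
bronze_calls = [
    {"call_id": 1, "call_duration_sec": 120},
    {"call_id": 2, "call_duration_sec": 0},     # zero-length call
    {"call_id": 3, "call_duration_sec": -5},    # corrupt value
    {"call_id": 4, "call_duration_sec": None},  # missing value
]

def is_valid_call(row):
    """Mirror col("call_duration_sec") > 0: a NULL never satisfies the comparison."""
    duration = row["call_duration_sec"]
    return duration is not None and duration > 0

# The Silver-side view keeps only valid rows; the Bronze list is left untouched
silver_calls = [row for row in bronze_calls if is_valid_call(row)]
print([row["call_id"] for row in silver_calls])
```

Only the first record passes, while `bronze_calls` still holds all four rows, which is the point of filtering in Silver rather than in Bronze.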

##### Aggregate Daily Call Usage

In [0]:
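# Import under an alias so Python's built-in sum() is not shadowed
from pyspark.sql.functions import sum as spark_sum

# One row per subscriber per day with the total call duration
daily_call_usage_df = (
    valid_calls_df
        .groupBy("subscriber_id", "call_date")
        .agg(spark_sum("call_duration_sec").alias("total_call_duration_sec"))
)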


**Note:**
Aggregating before the join reduces the data volume the join has to process (a performance best practice).
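
A small plain-Python sketch (no Spark needed) illustrates the effect of aggregating first. The sample rows and the helper name `aggregate_daily_calls` are hypothetical; only the column names come from the notebook.

```python
from collections import defaultdict

# Hypothetical valid call records: (subscriber_id, call_date, call_duration_sec)
sample_calls = [
    ("S1", "2024-01-01", 60),
    ("S1", "2024-01-01", 120),
    ("S1", "2024-01-02", 30),
    ("S2", "2024-01-01", 45),
    ("S2", "2024-01-01", 15),
]

def aggregate_daily_calls(calls):
    """Sum call_duration_sec per (subscriber_id, call_date), like groupBy().agg()."""
    totals = defaultdict(int)
    for subscriber_id, call_date, duration_sec in calls:
        totals[(subscriber_id, call_date)] += duration_sec
    return dict(totals)

daily_totals = aggregate_daily_calls(sample_calls)
print(len(sample_calls), "call rows ->", len(daily_totals), "daily rows")
```

Five call rows collapse into three daily rows here, so the later join matches against at most one call row per subscriber per day.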

##### Join Data Usage + Call Usage

In [0]:
usage_enriched_df = (
    valid_data_usage_df
        # Left join: keep every data usage row, even on days with no valid calls
        .join(
            daily_call_usage_df,
            (valid_data_usage_df.subscriber_id == daily_call_usage_df.subscriber_id) &
            (valid_data_usage_df.usage_date == daily_call_usage_df.call_date),
            "left"
        )
        # Drop the duplicate join keys that came from the call-usage side
        .drop(daily_call_usage_df.subscriber_id)
        .drop("call_date")
)
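
The left-join behavior can be sketched in plain Python (no Spark needed). The sample data and the helper name `left_join_usage` are hypothetical; the column names follow the notebook. The sketch shows that a usage day without calls is kept, with a missing call total.

```python
# Hypothetical daily data usage rows: (subscriber_id, usage_date, data_mb)
data_usage = [
    ("S1", "2024-01-01", 500.0),
    ("S1", "2024-01-03", 250.0),  # no calls on this day
]

# Hypothetical daily call totals keyed by (subscriber_id, call_date)
daily_calls = {("S1", "2024-01-01"): 180}

def left_join_usage(usage_rows, call_totals):
    """Keep every usage row; attach the call total, or None when nothing matches."""
    return [
        {
            "subscriber_id": subscriber_id,
            "usage_date": usage_date,
            "data_mb": data_mb,
            "total_call_duration_sec": call_totals.get((subscriber_id, usage_date)),
        }
        for subscriber_id, usage_date, data_mb in usage_rows
    ]

joined = left_join_usage(data_usage, daily_calls)
for row in joined:
    print(row)
```

The second row carries `None` for `total_call_duration_sec`, which corresponds to the NULL that a Spark left join produces for unmatched rows.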


##### Add Subscriber Attributes

In [0]:
usage_enriched_df = (
    usage_enriched_df
        # Inner join: usage rows without a matching subscriber are dropped
        .join(subscribers_df, "subscriber_id", "inner")
)

##### Create Derived Business Columns

In [0]:
from pyspark.sql.functions import coalesce, lit

usage_enriched_df = (
    usage_enriched_df
        .withColumn(
            "total_usage_mb",
            col("data_mb") + coalesce(col("total_call_duration_sec"), lit(0)) * 0.5
        )
)




**Simple business logic:**
- 1 second of call time ≈ 0.5 MB of equivalent usage
- Days with no valid calls contribute 0 call seconds (handled by `coalesce`)
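
As a worked example of the conversion, the following plain-Python sketch (no Spark needed) mirrors the `total_usage_mb` expression from the cell above. The function name, the constant name `MB_PER_CALL_SECOND`, and the sample values are hypothetical.

```python
MB_PER_CALL_SECOND = 0.5  # conversion factor taken from the notebook's formula

def total_usage_mb(data_mb, total_call_duration_sec):
    """data_mb plus call seconds converted to MB; a missing call total counts as 0."""
    call_sec = total_call_duration_sec if total_call_duration_sec is not None else 0
    return data_mb + call_sec * MB_PER_CALL_SECOND

print(total_usage_mb(500.0, 180))   # 500 + 180 * 0.5 = 590.0
print(total_usage_mb(250.0, None))  # no calls that day, so 250.0
```

Treating a missing call total as 0 matters: without it, the sum would be NULL in Spark and the day's data usage would be lost from the derived column.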


##### Write Silver Table


In [0]:
usage_enriched_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("telecomsparktransformations_catalog.silver.usage_enriched")

display(spark.table("telecomsparktransformations_catalog.silver.usage_enriched"))

**Why overwrite?**
- Silver tables are derived
- Can be rebuilt safely


##### Validate Silver Output

In [0]:
spark.table("telecomsparktransformations_catalog.silver.usage_enriched").show()

spark.sql("""
SELECT subscriber_id,
       COUNT(*) AS records
FROM telecomsparktransformations_catalog.silver.usage_enriched
GROUP BY subscriber_id
""").show()

#### Silver Layer Rules (VERY IMPORTANT – Exam)
- Business logic allowed
- Joins happen here
- Filters and aggregations allowed
- Silver tables are clean and reusable

> Common Exam Traps (Read Carefully)

| Question                            | Correct Answer |
|-------------------------------------|----------------|
| Where to remove invalid records?    | Silver         |
| Where to aggregate raw events?      | Silver         |
| Where to calculate derived columns? | Silver         |
| Where to keep raw data unchanged?   | Bronze         |


