## L2 - Silver Stage

Notebook ini bertanggung jawab untuk mengubah data dari layer Bronze ke Silver.
 
**Proses yang Dilakukan:**
1.  **Baca Data Bronze**: Mengambil data mentah yang sudah di-ingest dari L1.
2.  **Pembersihan & Aturan Bisnis**: Melakukan trim pada semua kolom string, menerapkan validasi data spesifik per tabel (misalnya, menangani nilai negatif, standardisasi teks, dll.), lalu menghapus baris yang merupakan duplikat penuh.
3.  **Casting Tipe Data**: Memastikan setiap kolom memiliki tipe data yang benar dan sesuai untuk analisis (misalnya, string, integer, double, date).
4.  **Tambahkan Metadata**: Menambahkan kolom `update_date` dan `checksum`.
5.  **Merge ke Silver**: Melakukan operasi `MERGE` (upsert) ke tabel Silver. Update hanya terjadi jika `checksum` berubah, sehingga proses lebih efisien.

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Impor semua fungsi yang akan digunakan
from pyspark.sql.functions import (
    col, when, lit, lower, trim, initcap, sha2, 
    concat_ws, coalesce, current_timestamp, to_date
)

# Base locations of the Bronze (input) and Silver (output) Delta tables.
bronze_base_path = "Tables/Bronze"
silver_base_path = "Tables/Silver"

# Tables to process, in processing order.
TABLES = [
    "claim",
    "claim_similarity",
    "disease",
    "disease_ontology",
    "has_disease",
    "incharge",
    "incharge_of_claim",
    "insured_of_claim",
    "patient",
    "policyholder",
    "policyholder_connection",
    "policyholder_of_claim",
    "service",
]

# Primary-key columns per table; they form the MERGE join condition.
# Column names are already snake_case (as produced by the L1 stage).
KEYS = dict(
    claim=["claim_id"],
    claim_similarity=["claim_id", "sim_claim_id"],
    disease=["disease_id"],
    disease_ontology=["parent_disease_id", "child_disease_id"],
    has_disease=["patient_id", "disease_id"],
    incharge=["incharge_id"],
    incharge_of_claim=["claim_id", "person_incharge_id"],
    insured_of_claim=["claim_id", "patient_id"],
    patient=["patient_id"],
    policyholder=["policyholder_id"],
    policyholder_connection=["policyholder_id", "policyholder_associate_id"],
    policyholder_of_claim=["claim_id", "policyholder_id"],
    service=["service_id"],
)

# Metadata columns that are left out of the checksum computation.
META_COLS = {"load_date", "update_date", "checksum"}

StatementMeta(, e6caee0c-694c-4de6-a95a-e5ac5f0bd09d, 3, Finished, Available, Finished)

In [12]:
def trim_all_string_columns(df):
    """
    Strip leading and trailing whitespace from every string column.

    Args:
        df: Input Spark DataFrame.

    Returns:
        A DataFrame with the same columns in the same order. Columns whose
        schema type is ``StringType`` are passed through ``trim``; all other
        columns are selected unchanged.
    """
    fields = df.schema.fields
    if not any(isinstance(f.dataType, StringType) for f in fields):
        # Nothing to trim: hand back the input as-is.
        return df

    # One projection for all columns. Chaining withColumn in a loop adds a
    # separate projection per call, which the Spark docs warn can produce
    # very large plans.
    projection = [
        trim(col(f.name)).alias(f.name)
        if isinstance(f.dataType, StringType)
        else col(f.name)
        for f in fields
    ]
    return df.select(*projection)

def _null_if_out_of_range(df, column, low=0, high=100):
    """Replace values of ``column`` that are below ``low`` or above ``high`` with null."""
    value = col(column)
    return df.withColumn(
        column,
        when((value < low) | (value > high), None).otherwise(value),
    )


def _normalize_text(df, columns, transform, placeholder):
    """Apply ``transform`` to each of ``columns``, then fill their nulls with ``placeholder``."""
    for name in columns:
        df = df.withColumn(name, transform(col(name)))
    return df.na.fill(placeholder, subset=columns)


def apply_preprocess(df, table_name):
    """
    Apply the table-specific cleaning rules selected by ``table_name``.

    Rules per table:
        claim: negative ``charge`` becomes 0.0 and negative ``duration``
            becomes 1; ``diagnosis`` and ``claim_type`` are lower-cased and
            their nulls filled with "unknown".
        claim_similarity: ``similarity_score`` outside 0-100 becomes null.
        incharge: ``first_name``/``last_name`` are title-cased with nulls
            filled by "Unknown"; ``risk_score`` outside 0-100 becomes null.
        policyholder: same name and ``risk_score`` rules as ``incharge``;
            ``high_risk`` becomes 1 when it equals '1' and 0 otherwise
            (including when it is null).
        service: ``service_name`` is title-cased with nulls filled by
            "Unknown"; ``risk_score`` outside 0-100 becomes null.

    Any other table name is returned unchanged.

    Args:
        df: Spark DataFrame holding the table's rows.
        table_name: Name of the table, used to pick the rule set.

    Returns:
        The cleaned DataFrame.
    """
    print(f"[{table_name}] Applying preprocess...")

    if table_name == "claim":
        # Charge and duration must not be negative.
        df = df.withColumn("charge", when(col("charge") < 0, 0.0).otherwise(col("charge")))
        df = df.withColumn("duration", when(col("duration") < 0, 1).otherwise(col("duration")))
        # Standardize the categorical columns.
        df = _normalize_text(df, ["diagnosis", "claim_type"], lower, "unknown")

    elif table_name == "claim_similarity":
        df = _null_if_out_of_range(df, "similarity_score")

    elif table_name == "incharge":
        df = _normalize_text(df, ["first_name", "last_name"], initcap, "Unknown")
        df = _null_if_out_of_range(df, "risk_score")

    elif table_name == "policyholder":
        df = _normalize_text(df, ["first_name", "last_name"], initcap, "Unknown")
        df = _null_if_out_of_range(df, "risk_score")
        # Force high_risk to be exactly 0 or 1.
        df = df.withColumn("high_risk", when(col("high_risk") == "1", 1).otherwise(0))

    elif table_name == "service":
        df = _normalize_text(df, ["service_name"], initcap, "Unknown")
        df = _null_if_out_of_range(df, "risk_score")

    return df

def drop_full_duplicates(df, table_name):
    """
    Remove rows that are identical in every column and report how many went away.

    Args:
        df: Spark DataFrame to de-duplicate.
        table_name: Table name, used only as a prefix in the log lines.

    Returns:
        The de-duplicated DataFrame.
    """
    print(f"[{table_name}] Dropping full duplicates...")

    # Both counts are Spark actions; they exist only to feed the log line below.
    rows_in = df.count()
    deduped = df.dropDuplicates()
    rows_out = deduped.count()

    if rows_out < rows_in:
        print(f"[{table_name}] Dropped {rows_in - rows_out} full duplicate rows. ({rows_in} -> {rows_out})")

    return deduped

def apply_type_casting(df, table_name):
    """
    Cast the table's columns to their target data types.

    Every column whose name ends in ``_id`` is cast to string. Columns listed
    for ``table_name`` in the cast configuration are additionally cast to the
    configured type; columns named in the configuration but absent from
    ``df`` are ignored.

    Args:
        df: Spark DataFrame to cast.
        table_name: Table name used to look up the per-table cast rules.

    Returns:
        A DataFrame with the same columns in the same order and the casts
        applied.
    """
    print(f"[{table_name}] Applying type casting...")

    # Per-table target types for the non-ID columns.
    CAST_CONFIGS = {
        "claim": {
            "charge": "double", "claim_date": "date", "duration": "int"
        },
        "claim_similarity": {"similarity_score": "int"},
        "incharge": {"risk_score": "int"},
        "policyholder": {"risk_score": "int", "high_risk": "int"},
        "policyholder_connection": {"level": "int"},
        "service": {"risk_score": "int"}
    }
    table_casts = CAST_CONFIGS.get(table_name, {})

    # Build a single projection rather than chaining withColumn per column;
    # each withColumn call would add its own projection to the plan.
    projection = []
    for name in df.columns:
        expr = col(name)
        if name.endswith("_id"):
            # All identifier columns are kept as strings.
            expr = expr.cast(StringType())
        target_type = table_casts.get(name)
        if target_type == "date":
            # to_date without a format applies Spark's default date casting
            # rules only; it does NOT try alternative date formats.
            expr = to_date(expr)
        elif target_type is not None:
            expr = expr.cast(target_type)
        projection.append(expr.alias(name))

    return df.select(*projection)

def add_metadata_columns(df, table_name):
    """
    Attach the ``update_date`` and ``checksum`` metadata columns.

    ``update_date`` is set to the current timestamp. ``checksum`` is the
    SHA-256 of all remaining columns (everything except the table's key
    columns and the metadata columns), taken in sorted name order, cast to
    string with nulls replaced by "" and joined with "||".

    Args:
        df: Spark DataFrame to extend.
        table_name: Table name, used to look up the key columns to exclude.

    Returns:
        The DataFrame with both metadata columns added or overwritten.
    """
    print(f"[{table_name}] Adding metadata columns (update_date, checksum)...")

    # Step 1: stamp every row with the processing time.
    stamped = df.withColumn("update_date", current_timestamp())

    # Step 2: hash the payload columns only, so keys and bookkeeping
    # columns never influence the checksum.
    excluded = set(KEYS.get(table_name, [])) | META_COLS
    payload_cols = sorted(name for name in stamped.columns if name not in excluded)
    as_text = [coalesce(col(name).cast("string"), lit("")) for name in payload_cols]
    digest = sha2(concat_ws("||", *as_text), 256)

    return stamped.withColumn("checksum", digest)


StatementMeta(, e6caee0c-694c-4de6-a95a-e5ac5f0bd09d, 14, Finished, Available, Finished)

In [13]:
# Bronze -> Silver driver: for each configured table, read Bronze, run the
# transformation chain, then create the Silver table or MERGE (upsert) into it.
for table in TABLES:
    print(f"\n{'='*20} Processing Table: {table.upper()} {'='*20}")
    
    # A table without configured key columns cannot be merged; skip it.
    if table not in KEYS:
        print(f"⚠️  Skipping '{table}': No primary key defined in KEYS config.")
        continue
        
    # 1. Read the Bronze data.
    bronze_path = f"{bronze_base_path}/{table}"
    print(f"Reading from Bronze path: {bronze_path}")
    try:
        df_bronze = spark.read.format("delta").load(bronze_path)
    except Exception as e:
        # Best effort: report the failure and carry on with the next table.
        print(f"❌ ERROR: Could not read Bronze table {table}. Skipping. Details: {e}")
        continue

    # 2. Transform: trim -> table rules -> full-row dedup -> casting -> metadata.
    df_trimmed = trim_all_string_columns(df_bronze)
    df_preprocessed = apply_preprocess(df_trimmed, table)
    df_deduplicated = drop_full_duplicates(df_preprocessed, table)
    df_casted = apply_type_casting(df_deduplicated, table)
    df_final = add_metadata_columns(df_casted, table)
    
    # 3. Prepare the MERGE: join target and source on every key column.
    silver_path = f"{silver_base_path}/{table}"
    key_cols = KEYS[table]
    on_expression = " AND ".join([f"target.{k} = source.{k}" for k in key_cols])
    
    print(f"Preparing to merge into Silver path: {silver_path}")
    print(f"Merge condition (ON): {on_expression}")

    # 4. Execute: create the Silver table on first run, MERGE afterwards.
    if not DeltaTable.isDeltaTable(spark, silver_path):
        print(f"Silver table not found. Creating new table at {silver_path}.")
        df_final.write.format("delta").mode("overwrite").save(silver_path)
        print(f"✅ Successfully created and populated Silver table: {table}")
    else:
        delta_table = DeltaTable.forPath(spark, silver_path)
        
        # Key columns are never rewritten on update; inserts write every column.
        update_set = {c: f"source.{c}" for c in df_final.columns if c not in key_cols}
        insert_values = {c: f"source.{c}" for c in df_final.columns}
        
        # NOTE(review): only rows identical in every column were removed above.
        # If Bronze can hold several rows with the same key, this MERGE can
        # fail on multiple source matches - confirm key uniqueness upstream.
        (
            delta_table.alias("target")
            .merge(df_final.alias("source"), on_expression)
            .whenMatchedUpdate(
                # Null-safe comparison: with a plain "<>" a NULL target
                # checksum makes the condition NULL, so the row would never
                # be updated.
                condition="NOT (target.checksum <=> source.checksum)",
                set=update_set,
            )
            .whenNotMatchedInsert(values=insert_values)
            .execute()
        )
        print(f"✅ Successfully merged data into Silver table: {table}")


StatementMeta(, e6caee0c-694c-4de6-a95a-e5ac5f0bd09d, 15, Finished, Available, Finished)


Reading from Bronze path: Tables/Bronze/claim
[claim] Applying preprocess...
[claim] Dropping full duplicates...
[claim] Applying type casting...
[claim] Adding metadata columns (update_date, checksum)...
Preparing to merge into Silver path: Tables/Silver/claim
Merge condition (ON): target.claim_id = source.claim_id
✅ Successfully merged data into Silver table: claim

Reading from Bronze path: Tables/Bronze/claim_similarity
[claim_similarity] Applying preprocess...
[claim_similarity] Dropping full duplicates...
[claim_similarity] Applying type casting...
[claim_similarity] Adding metadata columns (update_date, checksum)...
Preparing to merge into Silver path: Tables/Silver/claim_similarity
Merge condition (ON): target.claim_id = source.claim_id AND target.sim_claim_id = source.sim_claim_id
✅ Successfully merged data into Silver table: claim_similarity

Reading from Bronze path: Tables/Bronze/disease
[disease] Applying preprocess...
[disease] Dropping full duplicates...
[disease] Apply

In [14]:
# Post-run check: for every configured table, print the Silver row count and
# schema and display a five-row sample.
print(f"\n{'='*20} VERIFICATION {'='*20}")
for table in TABLES:
    silver_path = f"{silver_base_path}/{table}"

    # Report missing tables and move on.
    if not DeltaTable.isDeltaTable(spark, silver_path):
        print(f"\n--- Table {table} not found in Silver layer. ---")
        continue

    print(f"\n--- Verifying: {table} ---")
    df_silver = spark.read.format("delta").load(silver_path)
    print(f"Row count: {df_silver.count()}")
    df_silver.printSchema()
    display(df_silver.limit(5))


StatementMeta(, e6caee0c-694c-4de6-a95a-e5ac5f0bd09d, 16, Finished, Available, Finished)



--- Verifying: claim ---
Row count: 100001
root
 |-- claim_id: string (nullable = true)
 |-- charge: double (nullable = true)
 |-- claim_date: date (nullable = true)
 |-- duration: integer (nullable = true)
 |-- insured_id: string (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- person_incharge_id: string (nullable = true)
 |-- claim_type: string (nullable = true)
 |-- policyholder_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, b0f55520-57d4-4f95-9d90-93634691109d)


--- Verifying: claim_similarity ---
Row count: 15279
root
 |-- claim_id: string (nullable = true)
 |-- sim_claim_id: string (nullable = true)
 |-- similarity_score: integer (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 2711cd86-af3a-4584-855e-9d435612b66b)


--- Verifying: disease ---
Row count: 397
root
 |-- disease_id: string (nullable = true)
 |-- concept_name: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 6bff864e-0935-4d5b-bd45-106675c89e65)


--- Verifying: disease_ontology ---
Row count: 448
root
 |-- parent_disease_id: string (nullable = true)
 |-- child_disease_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, e9431e5c-2f33-4415-842e-738e52c368e6)


--- Verifying: has_disease ---
Row count: 446
root
 |-- patient_id: string (nullable = true)
 |-- disease_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, e0a713c3-2d4e-4b72-ae0a-7ffecb9902ff)


--- Verifying: incharge ---
Row count: 10001
root
 |-- incharge_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- risk_score: integer (nullable = true)
 |-- service_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, df637f1b-33de-44b0-a945-733d79e9f604)


--- Verifying: incharge_of_claim ---
Row count: 100001
root
 |-- claim_id: string (nullable = true)
 |-- person_incharge_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 0cf07fad-15b2-436c-bfc7-ad96740084a0)


--- Verifying: insured_of_claim ---
Row count: 12
root
 |-- claim_id: string (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, b73a432c-9afb-4942-87dd-aaf503e9b721)


--- Verifying: patient ---
Row count: 166
root
 |-- patient_id: string (nullable = true)
 |-- subscription_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 3f87ac9c-3b84-4216-bf18-d669404d6684)


--- Verifying: policyholder ---
Row count: 10006
root
 |-- policyholder_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- risk_score: integer (nullable = true)
 |-- high_risk: integer (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, d5c0443f-4d59-440a-b658-34870c014371)


--- Verifying: policyholder_connection ---
Row count: 347
root
 |-- policyholder_id: string (nullable = true)
 |-- policyholder_associate_id: string (nullable = true)
 |-- level: integer (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 2aa8aaf0-ae9d-436d-aa27-387bd6cbbfed)


--- Verifying: policyholder_of_claim ---
Row count: 99983
root
 |-- claim_id: string (nullable = true)
 |-- policyholder_id: string (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 1ee50c19-faf1-449e-ad63-617fa46fe118)


--- Verifying: service ---
Row count: 8673
root
 |-- service_id: string (nullable = true)
 |-- service_name: string (nullable = true)
 |-- risk_score: integer (nullable = true)
 |-- load_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)
 |-- checksum: string (nullable = true)



SynapseWidget(Synapse.DataFrame, 40f23e9a-4ec2-4453-b547-322dfcba2ea8)