In [25]:
import os
from urllib.parse import urlsplit

import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, sha2, concat_ws, current_timestamp, lit, expr, to_date
from pyspark.sql.types import *

In [26]:
# 1. Start Spark session
# The PostgreSQL JDBC driver jar is put on the driver classpath so the
# spark.read.jdbc / df.write.jdbc calls in this notebook can load
# org.postgresql.Driver.
# NOTE(review): the jar location is an absolute, machine-specific Windows path.
spark = (
    SparkSession.builder
    .appName("Gold Layer ETL with Auto-DDL")
    .config(
        "spark.driver.extraClassPath",
        r"C:\Users\Mind-Graph\Desktop\etlproject\jars\postgresql-42.2.18.jar",
    )
    .getOrCreate()
)

In [27]:
# Explicitly request case-insensitive identifier resolution in Spark SQL.
# Step 4 selects columns with mixed-case names (e.g. "i.Masked_DL_Number")
# from frames whose columns were lower-cased by read_and_normalize.
spark.conf.set("spark.sql.caseSensitive", "false")


In [28]:
# 2. JDBC configs
# Source (silver) and target (gold) databases on the local PostgreSQL server.
silver_url = "jdbc:postgresql://localhost:5432/silver_layer_db"
gold_url = "jdbc:postgresql://localhost:5432/gold_layer_db"

# Credentials are taken from the PGUSER / PGPASSWORD environment variables when
# they are set, so real secrets do not have to be stored in the notebook.
# TODO: the literal fallbacks are kept only so existing local runs keep
# working; remove them once the environment variables are provisioned.
properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "rathi"),
    "driver": "org.postgresql.Driver",
}


In [30]:

# 3. Read silver-layer tables
def read_and_normalize(table):
    """Load one silver-layer table over JDBC with lower-cased column names.

    Args:
        table: Name of the table to read through ``silver_url``.

    Returns:
        A DataFrame containing every column of the source table, each one
        renamed to the lower-case form of its original name.
    """
    source_df = spark.read.jdbc(silver_url, table, properties=properties)
    lowered_columns = []
    for column_name in source_df.columns:
        lowered_columns.append(col(column_name).alias(column_name.lower()))
    return source_df.select(lowered_columns)

# Load the seven silver-layer inputs; each target name on the left is paired
# positionally with the table name at the same position on the right.
(
    silver_customer,
    silver_identity,
    silver_relation,
    silver_policies,
    silver_claims,
    silver_underwriting,
    silver_insurance_ds,
) = [
    read_and_normalize(table_name)
    for table_name in (
        "silver_customer",
        "silver_customer_identity",
        "silver_customer_policy_relation",
        "silver_policies",
        "silver_claims",
        "silver_underwriting",
        "silver_insurance_dataset",
    )
]


In [77]:
from pyspark.sql.functions import col, sha2, concat_ws, expr, current_timestamp

# Step 4: Join with aliasing
# Every silver frame gets a short alias so that columns sharing a name across
# tables (e.g. "status") can be selected unambiguously. All joins are left
# joins starting from the customer frame, so rows without a match on the
# right-hand side are kept with NULLs in that side's columns.
df = (
    silver_customer.alias("c")
    .join(silver_identity.alias("i"), on="customer_id", how="left")
    .join(silver_relation.alias("r"), on="customer_id", how="left")
    .join(silver_policies.alias("p"), on="policy_id", how="left")
    .join(silver_claims.alias("cl"), on="policy_id", how="left")
    .join(silver_underwriting.alias("u"), on=["policy_id", "customer_id"], how="left")
    .join(silver_insurance_ds.alias("ds"), on="customer_id", how="left")
    .select(
        *[
            col(source).alias(target)
            # (qualified source column, output column name)
            for source, target in (
                # Keys
                ("c.customer_id", "customer_id"),
                ("p.policy_id", "policy_id"),
                ("cl.claim_id", "claim_id"),
                # Customer identity
                ("c.name", "name"),
                ("c.dob", "dob"),
                ("i.Masked_DL_Number", "Masked_DL_Number"),
                ("i.Masked_Aadhaar_Number", "Masked_Aadhaar_Number"),
                # Policy
                ("p.Policy_Type", "Policy_Type"),
                ("p.Premium", "Premium"),
                ("p.Coverage_Amount", "Coverage_Amount"),
                ("p.Status", "Policy_Status"),
                # Customer demographics and address
                ("c.age", "age"),
                ("c.door_no", "door_no"),
                ("c.street", "street"),
                ("c.city", "city"),
                ("c.state", "state"),
                ("c.country", "country"),
                ("ds.credit_score", "credit_score"),
                # Customer-policy relation
                ("r.Start_Date", "Policy_Start_Date"),
                ("r.End_Date", "Policy_End_Date"),
                ("r.Status", "Relation_Status"),
                # Underwriting
                ("u.Risk_Score", "Underwriting_Risk_Score"),
                ("u.Approval_Status", "Approval_Status"),
                ("ds.risk_profile", "risk_profile"),
                ("ds.claims_history", "claims_history"),
                # Claims
                ("cl.Investigation_Required", "Investigation_Required"),
                ("cl.Claim_Amount", "Claim_Amount"),
                ("cl.Date", "Claim_Date"),
                ("cl.Status", "Claim_Status"),
                ("ds.src_transaction_date", "src_transaction_date"),
            )
        ]
    )
)



In [78]:
# Step 5: Data cleansing and governance
#   - keep a single row per (customer_id, policy_id, claim_id)
#   - record_uuid:    random identifier generated by Spark's uuid()
#   - record_hash_id: SHA-256 over the three key columns joined with "||"
#   - ingested_at:    timestamp of this load
#
# concat_ws() skips NULL arguments, and the left joins in Step 4 can leave
# policy_id / claim_id NULL. Without the coalesce below, (A, NULL, B) and
# (A, B, NULL) would both be hashed from "A||B". Replacing NULL with "" keeps
# every key in its own position; rows with no NULL key hash exactly as before.
df = (
    df.dropDuplicates(["customer_id", "policy_id", "claim_id"])
    .withColumn("record_uuid", expr("uuid()"))
    .withColumn(
        "record_hash_id",
        sha2(
            concat_ws(
                "||",
                *[
                    coalesce(col(key_column).cast("string"), lit(""))
                    for key_column in ("customer_id", "policy_id", "claim_id")
                ]
            ),
            256,
        ),
    )
    .withColumn("ingested_at", current_timestamp())
)


In [79]:
# Inspect the column names and types of the gold DataFrame before deriving DDL from it.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- policy_id: string (nullable = true)
 |-- claim_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- Masked_DL_Number: string (nullable = true)
 |-- Masked_Aadhaar_Number: string (nullable = true)
 |-- Policy_Type: string (nullable = true)
 |-- Premium: double (nullable = true)
 |-- Coverage_Amount: double (nullable = true)
 |-- Policy_Status: string (nullable = true)
 |-- age: string (nullable = true)
 |-- door_no: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- Policy_Start_Date: date (nullable = true)
 |-- Policy_End_Date: date (nullable = true)
 |-- Relation_Status: string (nullable = true)
 |-- Underwriting_Risk_Score: double (nullable = true)
 |-- Approval_Status: string (nullable = true)
 |-- risk_profi

In [80]:
# Preview the first five rows as a sanity check.
# NOTE(review): the frame carries customer name, dob and address columns, so
# clear this output before sharing the notebook.
df.show(5)

+-----------+---------+--------+----------------+----------+--------------------+---------------------+-----------+-------+---------------+-------------+---+-------+--------------------+------------+--------+-------+------------+-----------------+---------------+---------------+-----------------------+---------------+------------+--------------+----------------------+------------+----------+------------+--------------------+--------------------+--------------------+--------------------+
|customer_id|policy_id|claim_id|            name|       dob|    Masked_DL_Number|Masked_Aadhaar_Number|Policy_Type|Premium|Coverage_Amount|Policy_Status|age|door_no|              street|        city|   state|country|credit_score|Policy_Start_Date|Policy_End_Date|Relation_Status|Underwriting_Risk_Score|Approval_Status|risk_profile|claims_history|Investigation_Required|Claim_Amount|Claim_Date|Claim_Status|src_transaction_date|         record_uuid|      record_hash_id|         ingested_at|
+-----------+---

In [81]:

# Stamp every row with the load time.
# NOTE(review): Step 5 already adds "ingested_at" with the same
# current_timestamp() expression, so this cell only replaces that column with
# an equivalent one; confirm whether it is still needed.
df = df.withColumn("ingested_at", current_timestamp())

In [82]:
#Auto-DDL: PostgreSQL table creation
def _postgres_type(dtype):
    """Return the PostgreSQL column type used for a Spark ``DataType``."""
    if isinstance(dtype, DecimalType):
        # Carry precision and scale over instead of degrading decimals to TEXT.
        return f"NUMERIC({dtype.precision},{dtype.scale})"
    simple_types = (
        (StringType, "TEXT"),
        (IntegerType, "INTEGER"),
        (LongType, "BIGINT"),
        (ShortType, "SMALLINT"),
        (ByteType, "SMALLINT"),
        (FloatType, "REAL"),
        (DoubleType, "DOUBLE PRECISION"),
        (BooleanType, "BOOLEAN"),
        (TimestampType, "TIMESTAMP"),
        (DateType, "DATE"),
        (BinaryType, "BYTEA"),
    )
    for spark_type, pg_type in simple_types:
        if isinstance(dtype, spark_type):
            return pg_type
    # Arrays, maps, structs and anything else unrecognised are stored as text.
    return "TEXT"


def _postgres_target(jdbc_url):
    """Extract ``(host, port, database)`` from a ``jdbc:postgresql:`` URL.

    Any part the URL does not provide falls back to the values this notebook
    previously hard-coded: ``localhost``, ``5432`` and ``gold_layer_db``.
    """
    host, port, database = "localhost", "5432", "gold_layer_db"
    if jdbc_url and jdbc_url.startswith("jdbc:"):
        # Dropping the "jdbc:" prefix leaves an ordinary URL urlsplit can parse.
        parts = urlsplit(jdbc_url[len("jdbc:"):])
        host = parts.hostname or host
        port = str(parts.port) if parts.port else port
        database = parts.path.lstrip("/") or database
    return host, port, database


def create_gold_table_ddl(df, table_name, jdbc_url, props):
    """Create ``public.<table_name>`` in PostgreSQL from a DataFrame schema.

    One column is declared per field of ``df.schema``, using the field name
    (double-quoted) and the PostgreSQL type chosen by ``_postgres_type``. The
    statement is ``CREATE TABLE IF NOT EXISTS``, so an existing table is left
    unchanged.

    Args:
        df: Spark DataFrame whose schema defines the columns.
        table_name: Target table name. It is interpolated into the statement
            unquoted, so it must be a trusted, valid SQL identifier.
        jdbc_url: JDBC URL of the target database, e.g.
            ``jdbc:postgresql://localhost:5432/gold_layer_db``; host, port and
            database for the psycopg2 connection are derived from it.
        props: Mapping that provides at least ``"user"`` and ``"password"``.

    Returns:
        None.

    Raises:
        KeyError: if ``props`` lacks ``"user"`` or ``"password"``.
        psycopg2.Error: if connecting or executing the DDL fails; the
            transaction is rolled back and the connection is closed.
    """
    column_defs = []
    for field in df.schema.fields:
        # Double any embedded quote so the quoted identifier stays well-formed.
        quoted_name = '"' + field.name.replace('"', '""') + '"'
        column_defs.append(f"{quoted_name} {_postgres_type(field.dataType)}")

    ddl = f'CREATE TABLE IF NOT EXISTS public.{table_name} (\n  ' + ",\n  ".join(column_defs) + "\n);"

    host, port, database = _postgres_target(jdbc_url)
    conn = psycopg2.connect(
        database=database,
        user=props["user"],
        password=props["password"],
        host=host,
        port=port,
    )
    try:
        # "with conn" commits on success and rolls back on error; it does not
        # close the connection, hence the explicit close() in finally.
        with conn:
            with conn.cursor() as cur:
                cur.execute(ddl)
    finally:
        conn.close()


In [83]:
# Pre-create the gold table from the DataFrame schema.
# The table name must be the one df.write.jdbc writes to ("gold_customer_policy");
# it previously pointed at "gold_customer_policy_summary", which left an empty,
# never-populated table behind.
# NOTE(review): the write uses mode="overwrite"; confirm whether it should keep
# this pre-created table definition rather than recreate the table.
create_gold_table_ddl(
    df=df,                              # final gold-layer DataFrame; its schema drives the DDL
    table_name="gold_customer_policy",  # same table the JDBC write targets
    jdbc_url=gold_url,                  # JDBC URL of the gold database
    props=properties                    # connection properties (user / password / driver)
)


In [85]:
# Persist the gold DataFrame to PostgreSQL, replacing whatever the target
# table currently holds.
df.write.mode("overwrite").jdbc(
    url=gold_url,
    table="gold_customer_policy",
    properties=properties,
)
