In [11]:
from pathlib import Path
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lower, trim, regexp_replace, when, length, lit, concat,
    to_timestamp, current_timestamp, regexp_extract, row_number,
    countDistinct, count
)

BASE_IN = "./"      # Change to your path if needed
BASE_OUT = "./medallion_profiles"
BRONZE = f"{BASE_OUT}/bronze"
SILVER = f"{BASE_OUT}/silver"
GOLD = f"{BASE_OUT}/gold"

for p in [BASE_OUT, BRONZE, SILVER, GOLD]:
    Path(p).mkdir(parents=True, exist_ok=True)

spark = (
    SparkSession.builder
    .appName("ProfilesMedallionETL")
    .master("local[*]")
    .config("spark.sql.session.timeZone", "Asia/Kolkata")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

FILES = {
    "profiles": f"{BASE_IN}/profiles.csv",
    "phones": f"{BASE_IN}/profile_phones.csv",
    "addresses": f"{BASE_IN}/profile_addresses.csv",
    "history": f"{BASE_IN}/profile_history.csv",
}


In [12]:

# Helpers
def read_csv(path):
    return (
        spark.read
        .option("header", True)
        .option("inferSchema", True)
        .option("mode", "PERMISSIVE")
        .csv(path)
        .withColumn("_ingest_ts", current_timestamp())
    )

def choose_col(df, candidates):
    cols_lower = {c.lower(): c for c in df.columns}
    for cand in candidates:
        if cand.lower() in cols_lower:
            return cols_lower[cand.lower()]
    return None

def ensure_profile_id(df):
    pid = choose_col(df, ["profile_id", "profileId", "user_id", "userid", "id"])
    if pid and pid != "profile_id":
        df = df.withColumnRenamed(pid, "profile_id")
    return df


In [13]:

# ---------------- Bronze ----------------
df_profiles_bz = read_csv(FILES["profiles"])
df_phones_bz = read_csv(FILES["phones"])
df_addresses_bz = read_csv(FILES["addresses"])
df_history_bz = read_csv(FILES["history"])

print("=== Bronze Schemas ===")
df_profiles_bz.printSchema()
df_phones_bz.printSchema()
df_addresses_bz.printSchema()
df_history_bz.printSchema()

print("=== Bronze Samples ===")
df_profiles_bz.show(10, truncate=False)
df_phones_bz.show(10, truncate=False)
df_addresses_bz.show(10, truncate=False)
df_history_bz.show(10, truncate=False)

# Persist Bronze
df_profiles_bz.write.mode("overwrite").parquet(f"{BRONZE}/profiles")
df_phones_bz.write.mode("overwrite").parquet(f"{BRONZE}/phones")
df_addresses_bz.write.mode("overwrite").parquet(f"{BRONZE}/addresses")
df_history_bz.write.mode("overwrite").parquet(f"{BRONZE}/history")


=== Bronze Schemas ===
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- password: string (nullable = true)
 |-- update_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- _ingest_ts: timestamp (nullable = false)

root
 |-- id: integer (nullable = true)
 |-- phone: string (nullable = true)
 |-- _ingest_ts: timestamp (nullable = false)

root
 |-- profile_id: integer (nullable = true)
 |-- address: string (nullable = true)
 |-- _ingest_ts: timestamp (nullable = false)

root
 |-- id: integer (nullable = true)
 |-- profile_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- password_hash: string (nullable = true)
 |-- previous_values: string (nullable = tru

In [14]:

# ---------------- Silver: profiles ----------------
df_profiles = ensure_profile_id(df_profiles_bz)

gender_col = choose_col(df_profiles, ["gender", "sex"])
email_col = choose_col(df_profiles, ["email", "email_id", "mail"])
first_name_col = choose_col(df_profiles, ["first_name", "firstname", "given_name"])
last_name_col = choose_col(df_profiles, ["last_name", "lastname", "surname", "family_name"])
updated_at_col = choose_col(df_profiles, ["updated_at", "updatedAt", "modified_at", "modifiedAt", "last_updated"])
created_at_col = choose_col(df_profiles, ["created_at", "createdAt", "inserted_at", "insertedAt", "createdon"])

if email_col:
    df_profiles = df_profiles.withColumn("email_std", lower(trim(col(email_col))))
else:
    df_profiles = df_profiles.withColumn("email_std", lit(None).cast("string"))

if first_name_col:
    df_profiles = df_profiles.withColumn("first_name_std", trim(col(first_name_col)))
else:
    df_profiles = df_profiles.withColumn("first_name_std", lit(None).cast("string"))

if last_name_col:
    df_profiles = df_profiles.withColumn("last_name_std", trim(col(last_name_col)))
else:
    df_profiles = df_profiles.withColumn("last_name_std", lit(None).cast("string"))

if gender_col:
    g = lower(trim(col(gender_col)))
    df_profiles = df_profiles.withColumn(
        "gender_std",
        when((g.isNull()) | (g == ""), lit("unknown"))
        .when(g.startswith("m"), lit("male"))
        .when(g.startswith("f"), lit("female"))
        .otherwise(lit("other"))
    )
else:
    df_profiles = df_profiles.withColumn("gender_std", lit("unknown"))

if created_at_col:
    df_profiles = df_profiles.withColumn("created_at_ts", to_timestamp(col(created_at_col)))
else:
    df_profiles = df_profiles.withColumn("created_at_ts", lit(None).cast("timestamp"))
if updated_at_col:
    df_profiles = df_profiles.withColumn("updated_at_ts", to_timestamp(col(updated_at_col)))
else:
    df_profiles = df_profiles.withColumn("updated_at_ts", lit(None).cast("timestamp"))

if "profile_id" in df_profiles.columns:
    w = Window.partitionBy("profile_id").orderBy(col("updated_at_ts").desc_nulls_last(), col("_ingest_ts").desc())
    df_profiles = df_profiles.withColumn("_rn", row_number().over(w)).filter(col("_rn") == 1).drop("_rn")

df_profiles_silver = df_profiles
print("=== Silver: profiles ===")
df_profiles_silver.show(10, truncate=False)

df_profiles_silver.write.mode("overwrite").parquet(f"{SILVER}/profiles")


=== Silver: profiles ===
+----------+----------------+------------------------+----------+----+------+----------------------------------------------------------------+------------+--------------------------+--------------------------+--------------------------+------------------------+--------------+-------------+----------+--------------------------+--------------------------+
|profile_id|name            |email                   |dob       |age |sex   |password                                                        |update_count|created_at                |updated_at                |_ingest_ts                |email_std               |first_name_std|last_name_std|gender_std|created_at_ts             |updated_at_ts             |
+----------+----------------+------------------------+----------+----+------+----------------------------------------------------------------+------------+--------------------------+--------------------------+--------------------------+------------------------+--

In [15]:

# ---------------- Silver: phones (NO phone number transformations) ----------------
df_phones = ensure_profile_id(df_phones_bz)

# Detect useful columns WITHOUT touching the phone value
phone_col = choose_col(df_phones, ["phone", "phone_number", "mobile", "msisdn"])  # we won't transform this
is_primary_col = choose_col(df_phones, ["is_primary", "primary", "isPrimary"])
phone_updated_col = choose_col(df_phones, ["updated_at", "updatedAt", "modified_at", "modifiedAt", "last_updated"])
phone_type_col = choose_col(df_phones, ["type", "phone_type"])

# Parse timestamp only (not phone value)
if phone_updated_col:
    df_phones = df_phones.withColumn("phone_updated_ts", to_timestamp(col(phone_updated_col)))
else:
    df_phones = df_phones.withColumn("phone_updated_ts", lit(None).cast("timestamp"))

# Normalize primary flag text to boolean, but do not touch phone
if is_primary_col:
    df_phones = df_phones.withColumn("is_primary_flag", when(lower(col(is_primary_col)).isin("true","t","1","yes","y"), lit(True)).otherwise(lit(False)))
else:
    df_phones = df_phones.withColumn("is_primary_flag", lit(False))

# Select a single phone row per profile (prefer primary, then latest by timestamp, then ingest time)
if "profile_id" in df_phones.columns:
    wph = Window.partitionBy("profile_id").orderBy(col("is_primary_flag").desc(), col("phone_updated_ts").desc_nulls_last(), col("_ingest_ts").desc())
    df_phones = df_phones.withColumn("_rn", row_number().over(wph)).filter(col("_rn")==1).drop("_rn")

# Keep original phone column name as-is if available
keep_cols = ["profile_id", "is_primary_flag", "phone_updated_ts"]
if phone_type_col and (phone_type_col in df_phones.columns):
    keep_cols.append(phone_type_col)
if phone_col and (phone_col in df_phones.columns):
    keep_cols.append(phone_col)

df_phones_silver = df_phones.select(*[c for c in keep_cols if c in df_phones.columns])

print("=== Silver: phones (no phone number transformations) ===")
df_phones_silver.show(20, truncate=False)

df_phones_silver.write.mode("overwrite").parquet(f"{SILVER}/phones")


=== Silver: phones (no phone number transformations) ===
+----------+---------------+----------------+------------------------------------------------------+
|profile_id|is_primary_flag|phone_updated_ts|phone                                                 |
+----------+---------------+----------------+------------------------------------------------------+
|1         |false          |NULL            |+444316890158;+33441999413                            |
|2         |false          |NULL            |+61950319558;+61133794074                             |
|3         |false          |NULL            |+812831985603;+33347183297;+446854674104              |
|4         |false          |NULL            |+61650135286;+912207151773                            |
|5         |false          |NULL            |+33010545792;+915639983771                            |
|6         |false          |NULL            |+33440430255;+18457210525                             |
|7         |false          |NULL  

In [16]:

# ---------------- Silver: addresses (UPDATED PINCODE LOGIC) ----------------
df_addr = ensure_profile_id(df_addresses_bz)

# Try to locate explicit columns if present
pincode_col = choose_col(df_addr, ["pincode", "pin_code", "postal_code", "zip", "zipcode"])
addr_text_col = choose_col(df_addr, [
    "address", "address_line", "address_line1", "address1", "addr", "line1",
    "full_address", "street", "address_text", "addressline1"
])
addr_text_col2 = choose_col(df_addr, ["address2", "address_line2", "line2", "addressline2"])

addr_updated_col = choose_col(df_addr, ["updated_at", "updatedAt", "modified_at", "modifiedAt", "last_updated"])
is_primary_addr_col = choose_col(df_addr, ["is_primary", "primary", "isPrimary"])
city_col = choose_col(df_addr, ["city", "district", "town"])
state_col = choose_col(df_addr, ["state", "state_name"])
country_col = choose_col(df_addr, ["country"])

# 1) Extract 6-digit from explicit pincode field if available
if pincode_col:
    pin_field = regexp_extract(col(pincode_col).cast("string"), r"(\d{5})", 1)
else:
    pin_field = lit(None).cast("string")

# 2) Otherwise extract LAST 6 digits at the end of the address string(s)
if addr_text_col:
    pin_from_addr1 = regexp_extract(col(addr_text_col).cast("string"), r"(\d{5})(?!\d)[^\d]*$", 1)
else:
    pin_from_addr1 = lit(None).cast("string")
if addr_text_col2:
    pin_from_addr2 = regexp_extract(col(addr_text_col2).cast("string"), r"(\d{5})(?!\d)[^\d]*$", 1)
else:
    pin_from_addr2 = lit(None).cast("string")

df_addr = df_addr.withColumn("pincode_from_field", when(length(pin_field)==5, pin_field))
df_addr = df_addr.withColumn("pincode_from_address_end", when(length(pin_from_addr1)==5, pin_from_addr1).otherwise(when(length(pin_from_addr2)==5, pin_from_addr2)))
df_addr = df_addr.withColumn(
    "pincode6",
    when(length(col("pincode_from_field"))==5, col("pincode_from_field"))
    .otherwise(col("pincode_from_address_end"))
)
df_addr = df_addr.withColumn(
    "pincode_source",
    when(length(col("pincode_from_field"))==5, lit("field"))
    .when(length(col("pincode_from_address_end"))==5, lit("address_end"))
    .otherwise(lit("none"))
)

# Timestamps & primary flag
if addr_updated_col:
    df_addr = df_addr.withColumn("addr_updated_ts", to_timestamp(col(addr_updated_col)))
else:
    df_addr = df_addr.withColumn("addr_updated_ts", lit(None).cast("timestamp"))
if is_primary_addr_col:
    df_addr = df_addr.withColumn("is_primary_addr", when(lower(col(is_primary_addr_col)).isin("true","t","1","yes","y"), lit(True)).otherwise(lit(False)))
else:
    df_addr = df_addr.withColumn("is_primary_addr", lit(False))

# Select one address per profile (primary, then latest by timestamp, then ingest)
if "profile_id" in df_addr.columns:
    waddr = Window.partitionBy("profile_id").orderBy(col("is_primary_addr").desc(), col("addr_updated_ts").desc_nulls_last(), col("_ingest_ts").desc())
    df_addr = df_addr.withColumn("_rn", row_number().over(waddr)).filter(col("_rn")==1).drop("_rn")

# Columns to keep
keep_addr_cols = ["profile_id", "pincode6", "pincode_source", "addr_updated_ts", "is_primary_addr"]
if city_col: keep_addr_cols.append(city_col)
if state_col: keep_addr_cols.append(state_col)
if country_col: keep_addr_cols.append(country_col)

df_addr_silver = df_addr.select(*[c for c in keep_addr_cols if c in df_addr.columns])

print("=== Silver: addresses (pincode from field or address-end) ===")
df_addr_silver.show(20, truncate=False)

df_addr_silver.write.mode("overwrite").parquet(f"{SILVER}/addresses")


=== Silver: addresses (pincode from field or address-end) ===
+----------+--------+--------------+---------------+---------------+
|profile_id|pincode6|pincode_source|addr_updated_ts|is_primary_addr|
+----------+--------+--------------+---------------+---------------+
|1         |81505   |address_end   |NULL           |false          |
|2         |64640   |address_end   |NULL           |false          |
|3         |47682   |address_end   |NULL           |false          |
|4         |11126   |address_end   |NULL           |false          |
|5         |19360   |address_end   |NULL           |false          |
|6         |38227   |address_end   |NULL           |false          |
|7         |10383   |address_end   |NULL           |false          |
|8         |54092   |address_end   |NULL           |false          |
|9         |NULL    |none          |NULL           |false          |
|10        |19820   |address_end   |NULL           |false          |
|11        |02333   |address_end   |NULL 

In [17]:

# ---------------- Silver: history ----------------
df_hist = ensure_profile_id(df_history_bz)

event_ts_col = choose_col(df_hist, ["event_ts", "event_time", "timestamp", "time", "ts", "updated_at", "updatedAt"])
event_type_col = choose_col(df_hist, ["event_type", "event", "action", "status"])

if event_ts_col:
    df_hist = df_hist.withColumn("event_ts_std", to_timestamp(col(event_ts_col)))
else:
    df_hist = df_hist.withColumn("event_ts_std", lit(None).cast("timestamp"))
if event_type_col:
    df_hist = df_hist.withColumn("event_type_std", lower(trim(col(event_type_col))))
else:
    df_hist = df_hist.withColumn("event_type_std", lit(None).cast("string"))

if "profile_id" in df_hist.columns:
    wh = Window.partitionBy("profile_id").orderBy(col("event_ts_std").desc_nulls_last(), col("_ingest_ts").desc())
    df_hist = df_hist.withColumn("_rn", row_number().over(wh)).filter(col("_rn")==1).drop("_rn")

df_hist_silver = df_hist.select([c for c in df_hist.columns if c not in ["_ingest_ts"]])
print("=== Silver: history ===")
df_hist_silver.show(10, truncate=False)

df_hist_silver.write.mode("overwrite").parquet(f"{SILVER}/history")


=== Silver: history ===
+---+----------+----------------+------------------------+------+----------------------------------------------------------------+------------------------------------+-----------------------------------------+--------------------+------------+--------------+
|id |profile_id|name            |email                   |sex   |password_hash                                                   |previous_values                     |updated_at                               |update_count        |event_ts_std|event_type_std|
+---+----------+----------------+------------------------+------+----------------------------------------------------------------+------------------------------------+-----------------------------------------+--------------------+------------+--------------+
|1  |1         |NULL            |julia61@example.net     |NULL  |6b774cc0935c64071765ad3a933c9af81084882f7b6de23c32b186e8cc13768b|"{""name"": ""Benjamin Leonard""    | ""email"": ""richardtran@exampl

In [18]:

# ---------------- Silver: profile_master ----------------
master = df_profiles_silver

# Join a single selected phone row per profile (with original phone column name intact)
if "profile_id" in master.columns and "profile_id" in df_phones_silver.columns:
    # Build the selection list dynamically so the original phone column name is preserved
    phone_col = None
    for c in df_phones_silver.columns:
        if c.lower() in ["phone", "phone_number", "mobile", "msisdn"]:
            phone_col = c
            break
    sel_cols = ["profile_id", "is_primary_flag", "phone_updated_ts"]
    if "phone_type" in df_phones_silver.columns: sel_cols.append("phone_type")
    if "type" in df_phones_silver.columns and "phone_type" not in df_phones_silver.columns: sel_cols.append("type")
    if phone_col: sel_cols.append(phone_col)
    master = master.join(df_phones_silver.select(*[c for c in sel_cols if c in df_phones_silver.columns]), on="profile_id", how="left")

# Join primary/latest address
if "profile_id" in master.columns and "profile_id" in df_addr_silver.columns:
    join_cols = ["profile_id","pincode6","pincode_source","addr_updated_ts","is_primary_addr"]
    if "city" in df_addr_silver.columns: join_cols.append("city")
    if "state" in df_addr_silver.columns: join_cols.append("state")
    if "country" in df_addr_silver.columns: join_cols.append("country")
    master = master.join(df_addr_silver.select(*[c for c in join_cols if c in df_addr_silver.columns]), on="profile_id", how="left")

# Join latest history
if "profile_id" in master.columns and "profile_id" in df_hist_silver.columns:
    master = master.join(df_hist_silver.select("profile_id","event_ts_std","event_type_std"), on="profile_id", how="left")

print("=== Silver: profile_master ===")
master.show(20, truncate=False)

master.write.mode("overwrite").parquet(f"{SILVER}/profile_master")


=== Silver: profile_master ===
+----------+----------------+--------------------------+----------+----+------+----------------------------------------------------------------+------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------+-------------+----------+--------------------------+--------------------------+---------------+----------------+------------------------------------------------------+--------+--------------+---------------+---------------+------------+--------------+
|profile_id|name            |email                     |dob       |age |sex   |password                                                        |update_count|created_at                |updated_at                |_ingest_ts                |email_std                 |first_name_std|last_name_std|gender_std|created_at_ts             |updated_at_ts             |is_primary_flag|phone_updated_ts|phone                                        

In [19]:

# ---------------- Gold: analytics ----------------
# 1) gender counts
gender_counts = master.groupBy("gender_std").agg(
    countDistinct("profile_id").alias("profile_count")
).orderBy(col("profile_count").desc())

# 2) pincode counts
pincode_counts = master.groupBy("pincode6").agg(
    countDistinct("profile_id").alias("profile_count")
).orderBy(col("profile_count").desc(), col("pincode6").asc())

print("=== Gold: gender_counts ===")
gender_counts.show(50, truncate=False)

print("=== Gold: pincode_counts (top 50) ===")
pincode_counts.show(50, truncate=False)

gender_counts.write.mode("overwrite").parquet(f"{GOLD}/gender_counts")
pincode_counts.write.mode("overwrite").parquet(f"{GOLD}/pincode_counts")
(gender_counts.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{GOLD}/gender_counts_csv"))
(pincode_counts.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{GOLD}/pincode_counts_csv"))


=== Gold: gender_counts ===
+----------+-------------+
|gender_std|profile_count|
+----------+-------------+
|other     |296          |
|female    |292          |
|male      |277          |
|unknown   |135          |
+----------+-------------+

=== Gold: pincode_counts (top 50) ===
+--------+-------------+
|pincode6|profile_count|
+--------+-------------+
|NULL    |140          |
|02232   |2            |
|18488   |2            |
|00651   |1            |
|01109   |1            |
|01128   |1            |
|01205   |1            |
|01472   |1            |
|01603   |1            |
|01642   |1            |
|01718   |1            |
|01725   |1            |
|01920   |1            |
|02040   |1            |
|02092   |1            |
|02185   |1            |
|02333   |1            |
|02495   |1            |
|02530   |1            |
|02623   |1            |
|03011   |1            |
|03034   |1            |
|03066   |1            |
|03466   |1            |
|03489   |1            |
|03638   |1      