In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, split, when, to_date
from pyspark.sql.types import DoubleType, TimestampType, DateType, StructType, StructField, StringType
import requests
import time
import json

In [0]:
# %run "./bronze"

In [0]:
# ADLS Gen2 roots for the ETL container; the bronze and silver layers sit side by side under it.
root_path = "abfss://etl@banksourcedata.dfs.core.windows.net"
bronze_root, silver_root = (f"{root_path}/{layer}" for layer in ("bronze", "silver"))

# KYC documents are addressed through the separate "raw" container.
kyc_path = "abfss://raw@banksourcedata.dfs.core.windows.net/kyc"

In [0]:
# Bronze-layer inputs: each feed's folder is declared next to the JSON read that consumes it.
bronze_atm = f"{bronze_root}/atm"
df_atm = spark.read.json(bronze_atm)

bronze_upi = f"{bronze_root}/upi"
df_upi = spark.read.json(bronze_upi)

bronze_profile = f"{bronze_root}/accountprofile"
df_acount_profile = spark.read.json(bronze_profile)

# Despite the "silver_" prefix, this folder is resolved under the bronze root.
silver_frad = f"{bronze_root}/fraudalerts"
df_frad = spark.read.json(silver_frad)


In [0]:
%skip
# NOTE(review): this cell sits under a `%skip` magic, so it presumably does not run — confirm.
# If it were enabled it would rebind `df_frad` and `df_account_flat` and overwrite the Delta
# table at the hard-coded path /mnt/delta/account_flat.

# Explicit, all-nullable schema applied when re-reading the `silver_frad` folder.
account_schema = StructType([
    StructField("AccountNumber", StringType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("id", StringType(), True),
    StructField("ingest_source", StringType(), True),
    StructField("ingest_time", StringType(), True),
    # Declared as a struct with no fields.
    StructField("payload", StructType([]), True),
    StructField("AccountHolderName", StringType(), True),
    StructField("AccountOpenDate", StringType(), True),
    StructField("AccountStatus", StringType(), True),
    StructField("AccountType", StringType(), True),
    StructField("Balance", DoubleType(), True),
    StructField("BankName", StringType(), True),
    StructField("BranchName", StringType(), True),
    StructField("Currency", StringType(), True),
    StructField("IFSC_Code", StringType(), True),
    StructField("KYC_DocID", StringType(), True),
    StructField("KYC_DocumentVerificationStatus", StringType(), True),
    StructField("KYC_Done", StringType(), True),
    StructField("reason", StringType(), True),
    StructField("type", StringType(), True)
])

# Re-read the fraud-alerts folder with the schema above instead of inferring one.
df_frad = spark.read.schema(account_schema).json(silver_frad)

# Project every field with an explicit cast (string everywhere except Balance -> double;
# `payload` is passed through uncast).
df_account_flat = (
    df_frad.select(
        col("id").cast("string"),
        col("AccountNumber").cast("string"),
        col("CustomerID").cast("string"),
        col("ingest_source").cast("string"),
        col("ingest_time").cast("string"),
        col("payload"),
        col("AccountHolderName").cast("string"),
        col("AccountOpenDate").cast("string"),
        col("AccountStatus").cast("string"),
        col("AccountType").cast("string"),
        col("Balance").cast("double"),
        col("BankName").cast("string"),
        col("BranchName").cast("string"),
        col("Currency").cast("string"),
        col("IFSC_Code").cast("string"),
        col("KYC_DocID").cast("string"),
        col("KYC_DocumentVerificationStatus").cast("string"),
        col("KYC_Done").cast("string"),
        col("reason").cast("string"),
        col("type").cast("string")
    )
)

# Full overwrite (data and schema) of the Delta table at a fixed mount path.
df_account_flat.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/mnt/delta/account_flat")

In [0]:
# Split the profile feed by which nested object is populated:
# rows with a non-null `Account` struct ...
df_account = df_acount_profile.where(F.col("Account").isNotNull())

# ... and rows with a non-null `Customer` struct.
df_customer = df_acount_profile.where(F.col("Customer").isNotNull())

# print("Account docs:")
# display(df_account)

# print("Customer docs:")
# display(df_customer)


In [0]:
# Nominatim (OpenStreetMap) endpoints used for forward and reverse geocoding.
_NOMINATIM_HOST = "https://nominatim.openstreetmap.org"
NOMINATIM_URL = f"{_NOMINATIM_HOST}/search"
NOMINATIM_REVERSE_URL = f"{_NOMINATIM_HOST}/reverse"

# Sent as the User-Agent header on every request; update with your contact.
USER_AGENT = "MazenetSilverETL/1.0"

# Pause, in seconds, applied between Nominatim requests as a rate-limit safety margin.
NOMINATIM_DELAY = 1.0


def _nominatim_get(url, params):
    """Issue one GET request to a Nominatim endpoint and return the decoded JSON.

    This helper does not throttle; callers are responsible for pausing
    between requests.

    Args:
        url: Endpoint to call (for example NOMINATIM_URL or NOMINATIM_REVERSE_URL).
        params: Query-string parameters passed to ``requests.get``.

    Returns:
        The parsed JSON body when the response status is 200, otherwise ``None``.
        ``None`` is also returned when the request itself fails (connection
        error, timeout, ...) or the body is not valid JSON, so one bad lookup
        cannot abort a whole geocoding loop.
    """
    headers = {"User-Agent": USER_AGENT}
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=30)
        if resp.status_code != 200:
            return None
        return resp.json()
    except (requests.RequestException, ValueError):
        # RequestException: transport-level failure; ValueError: undecodable body.
        return None

In [0]:
# Flatten the nested `Account` struct: these fields keep their own name as the output column.
account_struct_fields = [
    "AccountHolderName",
    "BankName",
    "BranchName",
    "IFSC_Code",
    "AccountType",
    "AccountStatus",
    "AccountOpenDate",
    "Balance",
    "Currency",
    "KYC_Done",
    "KYC_DocID",
]

df_account_flat = df_account.select(
    "id",
    "AccountNumber",
    "CustomerID",
    *[col(f"Account.{field}").alias(field) for field in account_struct_fields],
    # The only field that is renamed on the way out.
    col("Account.KYC_DocumentVerificationStatus").alias("KYC_VerificationStatus"),
)

In [0]:
# COMMAND ----------
# Flatten the nested `Customer` struct; every field keeps its own name as the output column.
customer_struct_fields = [
    "FirstName",
    "LastName",
    "DOB",
    "Gender",
    "Email",
    "Phone",
    "Address",
    "City",
    "State",
    "ZipCode",
    "KYC_Status",
    "KYC_Tier",
    "Occupation",
    "AnnualIncome",
]

df_customer_flat = df_customer.select(
    "CustomerID",
    *[col(f"Customer.{field}").alias(field) for field in customer_struct_fields],
)

# print("Flattened Customer Data:")
# display(df_customer_flat)


In [0]:
# 6. ATM transactions: keep the needed fields, parse the timestamp and cast money columns to double
atm_columns = [
    F.col("TransactionID"),
    F.to_timestamp(F.col("TransactionTime"), "yyyy-MM-dd HH:mm:ss").alias("transaction_time"),
    F.col("TransactionType").alias("transaction_type"),
    F.col("TransactionStatus").alias("transaction_status"),
    F.col("Amount").cast("double").alias("amount"),
    F.col("AccountNumber"),
    F.col("BankName").alias("bank_name"),
    F.col("ATMID"),
    F.col("ATM_Bank"),
    F.col("Location"),
    F.col("BalanceBefore").cast("double").alias("balance_before"),
    F.col("BalanceAfter").cast("double").alias("balance_after"),
]
atm_clean = df_atm.select(*atm_columns)

In [0]:
# COMMAND ----------
# Silver layer: left-join every account row to its customer record on CustomerID,
# so accounts without a matching customer are kept.
accounts_aliased = df_account_flat.alias("a")
customers_aliased = df_customer_flat.alias("c")
silver_df = accounts_aliased.join(customers_aliased, on="CustomerID", how="left")

# display(silver_df)


In [0]:
# Target type for each column, applied in order; withColumn replaces the column in place.
silver_type_fixes = [
    # Keys
    ("AccountNumber", col("AccountNumber").cast("string")),
    ("CustomerID", col("CustomerID").cast("string")),
    # Account fields
    ("AccountOpenDate", to_date(col("AccountOpenDate"), "yyyy-MM-dd")),
    ("Balance", col("Balance").cast("double")),
    ("KYC_Done", col("KYC_Done").cast("boolean")),
    # Customer fields
    ("DOB", to_date(col("DOB"), "yyyy-MM-dd")),
    ("AnnualIncome", col("AnnualIncome").cast("double")),
]

for _fix_name, _fix_expr in silver_type_fixes:
    silver_df = silver_df.withColumn(_fix_name, _fix_expr)

silver_df.printSchema()
# display(silver_df)


In [0]:


# Text columns in which empty strings and the placeholders "N/A", "NA", "None" are turned into NULL.
null_cleanup_cols = [
    "AccountHolderName", "BranchName", "Email", "Phone",
    "Address", "City", "State", "ZipCode"
]

for field_name in null_cleanup_cols:
    value = col(field_name)
    is_placeholder = value.isNull() | (value == "") | value.isin("N/A", "NA", "None")
    silver_df = silver_df.withColumn(field_name, when(is_placeholder, None).otherwise(value))

# display(silver_df)


In [0]:
# COMMAND ----------
from pyspark.sql.functions import lit

# A row is flagged INVALID when any of its identifying fields is NULL, otherwise VALID.
missing_identity = (
    col("CustomerID").isNull()
    | col("AccountNumber").isNull()
    | col("AccountHolderName").isNull()
)
validity_flag = when(missing_identity, lit("INVALID")).otherwise(lit("VALID"))

silver_df = silver_df.withColumn("is_valid_record", validity_flag)

# display(silver_df.select("CustomerID", "AccountNumber", "AccountHolderName", "is_valid_record"))


In [0]:
# COMMAND ----------
from pyspark.sql.functions import col, lit, create_map, element_at, coalesce

# Fixed conversion rates: how many INR one unit of each currency is worth.
conversion_rates = {
    "INR": 1.0,
    "USD": 83.2,
    "AED": 22.7,
    "EUR": 90.1,
    "SGD": 62.0
}

# 1) Any currency code outside the table (including NULL) is treated as INR.
known_currencies = list(conversion_rates.keys())
normalized_currency = when(col("Currency").isin(known_currencies), col("Currency")).otherwise(lit("INR"))
silver_df = silver_df.withColumn("Currency", normalized_currency)

# 2) Spark map literal built from alternating key/value literals: {"INR": 1.0, "USD": 83.2, ...}
map_literals = []
for currency_code, inr_rate in conversion_rates.items():
    map_literals.append(lit(currency_code))
    map_literals.append(lit(inr_rate))
rate_map = create_map(*map_literals)

# 3) Overwrite Balance with its INR value (rate falls back to 1.0), then relabel every row as INR.
rate_to_inr = coalesce(element_at(rate_map, col("Currency")), lit(1.0))
silver_df = silver_df.withColumn("Balance", col("Balance").cast("double") * rate_to_inr)
silver_df = silver_df.withColumn("Currency", lit("INR"))

# display(silver_df.select("CustomerID", "AccountNumber", "Balance", "Currency"))


In [0]:
# Persist the account/customer profile to the silver layer as Delta (full overwrite), one partition per City.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("City")
    .save(f"{silver_root}/AccountProfile")
)

In [0]:
# 7. UPI transactions: select and cast, then split "lat,long" GeoLocation into numeric columns
geo_parts = split(col("GeoLocation"), ",")

upi_clean = (
    df_upi.select(
        F.col("TransactionID"),
        F.to_timestamp(F.col("TransactionTime"), "yyyy-MM-dd HH:mm:ss").alias("transaction_time"),
        F.col("TransactionType").alias("transaction_type"),
        F.col("Status").alias("transaction_status"),
        F.col("Amount").cast("double").alias("amount"),
        F.col("AccountNumber"),
        F.col("BankName").alias("bank_name"),
        F.col("Payer_UPI_ID"),
        F.col("Payee_UPI_ID"),
        F.col("DeviceID"),
        F.col("AppUsed"),
        F.col("GeoLocation"),
        F.col("BalanceBefore").cast("double").alias("balance_before"),
        F.col("BalanceAfter").cast("double").alias("balance_after"),
    )
    # First element is the latitude, second the longitude; both are cast to double.
    .withColumn("latitude", geo_parts.getItem(0).cast(DoubleType()))
    .withColumn("longitude", geo_parts.getItem(1).cast(DoubleType()))
    # The raw string is no longer needed once it has been split.
    .drop("GeoLocation")
)

In [0]:
# 8. Hand-maintained overrides for geocodes or addresses that cannot be resolved automatically.
# Add entries to the lists below; each entry is a tuple.

# (Location, Latitude, Longitude)
manual_atm_latlong = [
    ("Bandra West, Kolkata", 22.5470, 88.3516),  # example; replace with accurate coords
]

# (TransactionID, ResolvedAddress)
manual_upi_address = [
    ("UPI000001", "Near Diamond Market, Anand, Gujarat"),  # example
]

manual_atm_columns = ["Location", "Latitude", "Longitude"]
manual_upi_columns = ["TransactionID", "ResolvedAddress"]

manual_atm_df = spark.createDataFrame(manual_atm_latlong, schema=manual_atm_columns)
manual_upi_df = spark.createDataFrame(manual_upi_address, schema=manual_upi_columns)

In [0]:
# 9. ATM forward geocoding (runs on the driver):
# - collect the distinct non-null Location values from atm_clean
# - use the manual lookup where it has an entry
# - otherwise query Nominatim (this is a LIVE HTTP call, one per unmapped location)
# - build a {Location -> (lat, lon)} mapping and turn it into a small DataFrame

distinct_locations = [
    row["Location"]
    for row in atm_clean.select("Location").distinct().na.drop().collect()
]

# Manual overrides, keyed by Location
manual_map = {
    row["Location"]: (row["Latitude"], row["Longitude"])
    for row in manual_atm_df.collect()
}

geocoded_map = {}
for loc in distinct_locations:
    if loc in manual_map:
        geocoded_map[loc] = manual_map[loc]
        continue

    resp = _nominatim_get(NOMINATIM_URL, {"q": loc, "format": "json", "limit": 1})
    # Throttle after EVERY request, successful or not, so a run of failed
    # lookups cannot exceed the rate limit.
    time.sleep(NOMINATIM_DELAY)

    # A successful search returns a list of matches; anything else
    # (None, empty list, error object) is treated as "not found".
    best_hit = resp[0] if isinstance(resp, list) and resp else None
    if best_hit and "lat" in best_hit and "lon" in best_hit:
        geocoded_map[loc] = (float(best_hit["lat"]), float(best_hit["lon"]))
    else:
        # Keep the location with empty coordinates; it can be fixed via the manual list.
        geocoded_map[loc] = (None, None)


def _as_float_or_none(value):
    """Coerce a coordinate to float, passing None through unchanged."""
    return None if value is None else float(value)


# Manual entries win over geocoded ones
final_atm_map = {**geocoded_map, **manual_map}

mapping_rows = [
    (location, _as_float_or_none(coords[0]), _as_float_or_none(coords[1]))
    for location, coords in final_atm_map.items()
]

# An explicit schema is used for both the empty and non-empty case: schema inference
# fails when every coordinate is None and when int and float values are mixed.
atm_geo_map_df = spark.createDataFrame(
    mapping_rows,
    schema="Location string, Latitude double, Longitude double",
)

In [0]:
# 10. Attach Latitude/Longitude to each ATM transaction; a left join keeps rows whose Location has no mapping.
atm_with_geo = atm_clean.join(other=atm_geo_map_df, on="Location", how="left")

In [0]:
# Preview at most 10 rows of the geocoded ATM transactions.
display(atm_with_geo.limit(10))

In [0]:
# Persist geocoded ATM transactions to the silver layer as Delta (full overwrite),
# partitioned by transaction type and status.
(
    atm_with_geo.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("transaction_type", "transaction_status")
    .save(f"{silver_root}/atm")
)

In [0]:
# Row layout for a coordinate -> address lookup table: two nullable doubles and a nullable string.
schema = (
    StructType()
    .add("latitude", DoubleType(), True)
    .add("longitude", DoubleType(), True)
    .add("Address", StringType(), True)
)

In [0]:
# 11. UPI reverse geocoding (runs on the driver):
# - collect the distinct (latitude, longitude) pairs where both values are present
# - resolve each pair to an address with Nominatim (this is a LIVE HTTP call, one per pair)
# - pairs that cannot be resolved keep a NULL address
# TODO: the transaction-level overrides in manual_upi_df are not applied anywhere yet.

latlon_rows = upi_clean.select("latitude", "longitude").distinct().na.drop().collect()
latlon_pairs = [(r["latitude"], r["longitude"]) for r in latlon_rows]

reverse_geocode_map = {}
for lat, lon in latlon_pairs:
    resp = _nominatim_get(NOMINATIM_REVERSE_URL, {"lat": lat, "lon": lon, "format": "json"})
    # Throttle after EVERY request, successful or not, so a run of failed
    # lookups cannot exceed the rate limit.
    time.sleep(NOMINATIM_DELAY)
    # A successful reverse lookup is a JSON object carrying "display_name".
    reverse_geocode_map[(lat, lon)] = resp.get("display_name") if isinstance(resp, dict) else None

# The schema is declared here, next to its only use, so this cell does not depend on a
# generically named notebook-level variable that another cell may rebind.
upi_reverse_schema = "latitude double, longitude double, Address string"

reverse_map_rows = [
    (lat, lon, reverse_geocode_map.get((lat, lon)))
    for lat, lon in latlon_pairs
]
# An explicit schema makes the empty and non-empty cases identical.
upi_reverse_map_df = spark.createDataFrame(reverse_map_rows, schema=upi_reverse_schema)

In [0]:
# 12. Attach the reverse-geocoded Address to each UPI transaction.
# A left join keeps transactions whose coordinates are missing or unresolved (Address stays NULL).
upi_with_address = upi_clean.join(upi_reverse_map_df, on=["latitude", "longitude"], how="left")

# TODO: transaction-level manual fixes are not applied yet. When enabling them, join
# manual_upi_df on TransactionID and decide how ResolvedAddress combines with Address.
# upi_with_address = upi_with_address.join(manual_upi_df, on='TransactionID', how='left')

In [0]:
# upi_with_address.display()

In [0]:
# Persist address-enriched UPI transactions to the silver layer as Delta (full overwrite),
# partitioned by bank and transaction type.
(
    upi_with_address.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("bank_name", "transaction_type")
    .save(f"{silver_root}/upi")
)

In [0]:
# 13. Standardize ATM and UPI into a single transactions schema

# Columns that already carry the same name in both sources.
shared_leading_cols = [
    "TransactionID",
    "transaction_time",
    "transaction_type",
    "transaction_status",
    "amount",
    "bank_name",
]
shared_trailing_cols = ["balance_before", "balance_after"]

# ATM: the free-text Location and the joined coordinates are renamed to the common lower-case names.
atm_std = atm_with_geo.select(
    *shared_leading_cols,
    col("AccountNumber").alias("account_number"),
    col("Location").alias("location"),
    col("Latitude").alias("latitude"),
    col("Longitude").alias("longitude"),
    *shared_trailing_cols,
).withColumn("channel", lit("ATM"))

# UPI: the reverse-geocoded address plays the role of location; coordinates are already numeric.
upi_std = upi_with_address.select(
    *shared_leading_cols,
    col("AccountNumber").alias("account_number"),
    col("address").alias("location"),
    col("latitude"),
    col("longitude"),
    *shared_trailing_cols,
).withColumn("channel", lit("UPI"))

# Columns are matched by name, not by position.
transactions_union = atm_std.unionByName(upi_std)

In [0]:
# Preview at most 10 rows of the combined ATM + UPI transactions.
display(transactions_union.limit(10))

In [0]:
# Persist the unified transactions to the silver layer as Delta (full overwrite), one partition per bank.
(
    transactions_union.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("bank_name")
    .save(f"{silver_root}/transactions")
)

In [0]:
# Storage account and container holding the raw KYC files.
storage_account = "bnksource"
adls_container = "raw"

# SECURITY: the account key must never be stored in the notebook. It is fetched at run
# time from a Databricks secret scope instead.
# TODO: the scope/key names below are placeholders — create the secret (or point these at
# the existing one) before running, and rotate the key that was previously committed here.
storage_key = dbutils.secrets.get(scope="bank-etl", key="bnksource-storage-key")

In [0]:
# Register the account key on the Spark session so abfss:// paths on this storage account can be read.
account_key_setting = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
spark.conf.set(account_key_setting, storage_key)

In [0]:
# Load every KYC CSV in the raw container; the first line is the header and column types are inferred.
kyc_csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
df_csv = kyc_csv_reader.csv("abfss://raw@bnksource.dfs.core.windows.net/kyc/*.csv")

# display(df_csv)


In [0]:
# Read every KYC .txt file line by line (one row per line, in column `value`) and tag each
# row with the file it came from. `input_file_name` is reached through the `F` module alias
# because the bare name is never imported in this notebook.
df_kyc_raw = (spark.read.format("text")
              .load("abfss://raw@bnksource.dfs.core.windows.net/kyc/*.txt")
              .withColumn("FileName", F.input_file_name()))
# df_kyc_raw.display()



In [0]:
from pyspark.sql.functions import collect_list, concat_ws

# Rebuild one text blob per file by joining its lines with newlines.
# NOTE(review): collect_list gives no ordering guarantee after a shuffle, so the lines may
# not come back in file order — confirm that downstream parsing does not rely on order.
kyc_file_text = F.concat_ws("\n", F.collect_list("value")).alias("FileText")
df_grouped = df_kyc_raw.groupBy("FileName").agg(kyc_file_text)
# display(df_grouped.limit(5))


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf

def parse_kyc(text):
    """Extract the known "Key: value" fields from the text of one KYC document.

    Each line is stripped and matched against a fixed set of prefixes; the part
    after the first colon (stripped) is stored under the corresponding field.
    Unrecognised lines are ignored, and a repeated key keeps its last value.

    Args:
        text: Full document text with lines separated by "\\n"; may be None or empty.

    Returns:
        A dict with the keys DocumentType, CustomerID, AccountNumber, DocumentID
        and Status. Fields that were not found are None.
    """
    # (line prefix, output field) pairs, checked in this order.
    prefix_to_field = (
        ("Type:", "DocumentType"),
        ("CustomerID:", "CustomerID"),
        ("AccountNumber:", "AccountNumber"),
        ("ID:", "DocumentID"),
        ("Status:", "Status"),
    )
    result = {field: None for _, field in prefix_to_field}

    if not text:
        return result

    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        for prefix, field in prefix_to_field:
            if stripped.startswith(prefix):
                # Everything after the first colon is the value.
                result[field] = stripped.partition(":")[2].strip()
                break

    return result

# Struct type returned by parse_kyc; field names match the keys of the dict it returns.
# It has its own name so that it does not overwrite the notebook-level `schema` variable
# that describes the latitude/longitude/address lookup rows.
kyc_schema = StructType([
    StructField("DocumentType", StringType()),
    StructField("CustomerID", StringType()),
    StructField("AccountNumber", StringType()),
    StructField("DocumentID", StringType()),
    StructField("Status", StringType()),
])

# Wrap the parser as a Spark UDF that yields the struct above.
parse_udf = udf(parse_kyc, kyc_schema)

# One parsed struct per file, computed from the re-assembled file text.
df_parsed = df_grouped.withColumn("parsed", parse_udf("FileText"))


In [0]:
# df_parsed.display(5)

In [0]:
# Promote the parsed struct fields to top-level columns; the raw file text is not carried forward.
# (source field inside `parsed`, output column name)
parsed_field_aliases = [
    ("DocumentType", "DocumentType"),
    ("CustomerID", "CustomerID"),
    ("AccountNumber", "AccountNumber"),
    ("DocumentID", "DocumentID"),
    ("Status", "KYCStatus"),
]

df_kyc = df_parsed.select(
    "FileName",
    *[col(f"parsed.{source}").alias(target) for source, target in parsed_field_aliases],
)


In [0]:
# display(df_kyc.limit(10))

In [0]:
# Normalise the categorical KYC fields to upper case and stamp each row with the load time.
# `upper` and `current_timestamp` are reached through the `F` module alias because the bare
# names are never imported in this notebook.
df_kyc_clean = (
    df_kyc
    .withColumn("DocumentType", F.upper(col("DocumentType")))
    .withColumn("KYCStatus", F.upper(col("KYCStatus")))
    .withColumn("IngestTime", F.current_timestamp())
)


In [0]:
# Persist the cleaned KYC records to the silver layer as Delta (full overwrite),
# partitioned by KYC status and document type.
(
    df_kyc_clean.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("KYCStatus", "DocumentType")
    .save(f"{silver_root}/kyc")
)

