In [1]:
import findspark
findspark.init("C:/spark/spark-3.4.1-bin-hadoop3")

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("CosmosETL")
        
        # Cosmos DB Spark Connector
        .config(
            "spark.jars.packages",
            "com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12:4.41.0"
        )
        
        # ADLS Gen2 (ABFS) authentication
        .config("fs.azure.account.key.charithastorage123.dfs.core.windows.net",
                "k3f/N47wk7KRksHyAa6fcFAAJVPLFLNBRejSUYM+85v71Oz7uWx5fm6s9dEVfGmWIMhKyzLoSIPX+ASt0lHp2Q==")
        
        .config("fs.azure.account.auth.type.charithastorage123.dfs.core.windows.net", "SharedKey")
        .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .config("fs.azure.skip.metrics", "true")

        # Performance tuning
        .config("spark.sql.shuffle.partitions", "4")
        
        .getOrCreate()
)

spark


In [2]:
cosmosConfig = {
  "spark.cosmos.accountEndpoint": "https://charithacosmos.documents.azure.com:443/",
  "spark.cosmos.accountKey": "VoP3rX3G2qkeiRYr7n0oRcATCwRRs9KYSaakIBgeCkaTjfgEhAHhBMOHNYZX48P0CgDAi0ffAS6CACDbLNczfA==",
  "spark.cosmos.database": "OperationalDB"
}

In [3]:
atm_raw = (
    spark.read.format("cosmos.oltp")
    .options(**cosmosConfig, **{"spark.cosmos.container": "ATMTransactions"})
    .load()
)

upi_raw = (
    spark.read.format("cosmos.oltp")
    .options(**cosmosConfig, **{"spark.cosmos.container": "UPIEvents"})
    .load()
)

cust_raw = (
    spark.read.format("cosmos.oltp")
    .options(**cosmosConfig, **{"spark.cosmos.container": "AccountProfile"})
    .load()
)

print("ATM:", atm_raw.count())
print("UPI:", upi_raw.count())
print("Customers:", cust_raw.count())


ATM: 5000
UPI: 500
Customers: 100


In [4]:
atm_raw.printSchema()
atm_raw.columns

root
 |-- Location: string (nullable = true)
 |-- ATMID: string (nullable = true)
 |-- AccountNumber: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- TransactionType: string (nullable = true)
 |-- TransactionAmount: integer (nullable = true)
 |-- id: string (nullable = false)
 |-- transaction_type: string (nullable = true)
 |-- TransactionID: string (nullable = true)
 |-- Status: string (nullable = true)



['Location',
 'ATMID',
 'AccountNumber',
 'CustomerID',
 'partitionKey',
 'TransactionTime',
 'TransactionType',
 'TransactionAmount',
 'id',
 'transaction_type',
 'TransactionID',
 'Status']

In [5]:
upi_raw.printSchema()
upi_raw.columns

root
 |-- TxnID: string (nullable = true)
 |-- AccountNumber: integer (nullable = true)
 |-- TxnTimestamp: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- TxnType: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- PayeeUPI: string (nullable = true)
 |-- Amount: integer (nullable = true)
 |-- EventID: string (nullable = true)
 |-- id: string (nullable = false)
 |-- GeoLocation: string (nullable = true)
 |-- PayerUPI: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- DeviceID: string (nullable = true)
 |-- Status: string (nullable = true)



['TxnID',
 'AccountNumber',
 'TxnTimestamp',
 'CustomerID',
 'TxnType',
 'partitionKey',
 'PayeeUPI',
 'Amount',
 'EventID',
 'id',
 'GeoLocation',
 'PayerUPI',
 'transaction_type',
 'DeviceID',
 'Status']

In [6]:
cust_raw.printSchema()
cust_raw.columns

root
 |-- Name: void (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: long (nullable = true)
 |-- id: string (nullable = false)
 |-- CreatedAt: string (nullable = true)



['Name', 'CustomerID', 'Email', 'Phone', 'id', 'CreatedAt']

In [7]:
upi_raw.show(5, truncate=False)

+---------+-------------+--------------------+----------+-------+------------+------------+------+---------+-------+---------------+-----------+----------------+--------+-------+
|TxnID    |AccountNumber|TxnTimestamp        |CustomerID|TxnType|partitionKey|PayeeUPI    |Amount|EventID  |id     |GeoLocation    |PayerUPI   |transaction_type|DeviceID|Status |
+---------+-------------+--------------------+----------+-------+------------+------------+------+---------+-------+---------------+-----------+----------------+--------+-------+
|TXN000001|1002003160   |2025-01-01T00:00:00Z|CUST148   |DEBIT  |CUST148     |store374@upi|2500  |UPI000001|CUST148|20.2018,87.4257|user189@upi|UPI_OTHER       |DEV129  |SUCCESS|
|TXN000002|1002003127   |2025-01-01T00:00:05Z|CUST041   |DEBIT  |CUST041     |store400@upi|95000 |UPI000002|CUST041|25.6749,75.5603|user389@upi|UPI_OTHER       |DEV095  |SUCCESS|
|TXN000003|1002003405   |2025-01-01T00:00:10Z|CUST146   |CREDIT |CUST146     |store172@upi|75000 |UPI0000

In [8]:
atm_raw.show(5, truncate=False)

+---------+------+-------------+----------+------------+--------------------+---------------+-----------------+---------+----------------+-------------+-------+
|Location |ATMID |AccountNumber|CustomerID|partitionKey|TransactionTime     |TransactionType|TransactionAmount|id       |transaction_type|TransactionID|Status |
+---------+------+-------------+----------+------------+--------------------+---------------+-----------------+---------+----------------+-------------+-------+
|Chennai  |ATM013|1002003352   |CUST372   |CUST372     |2025-01-01T00:00:00Z|WITHDRAWAL     |5000             |ATM000001|ATM_OTHER       |ATM000001    |SUCCESS|
|Mumbai   |ATM008|1002003348   |CUST032   |CUST032     |2025-01-01T00:01:00Z|WITHDRAWAL     |8000             |ATM000002|ATM_OTHER       |ATM000002    |SUCCESS|
|Bangalore|ATM047|1002003268   |CUST488   |CUST488     |2025-01-01T00:02:00Z|WITHDRAWAL     |8000             |ATM000003|ATM_OTHER       |ATM000003    |SUCCESS|
|Kolkata  |ATM015|1002003256   |CU

In [9]:
cust_raw.show(5, truncate=False)

+----+----------+--------------+----------+-------+--------------------------+
|Name|CustomerID|Email         |Phone     |id     |CreatedAt                 |
+----+----------+--------------+----------+-------+--------------------------+
|null|CUST001   |user1@mail.com|9000000001|CUST001|2025-12-08T15:57:32.054215|
|null|CUST002   |user2@mail.com|9000000002|CUST002|2025-12-08T15:57:32.081457|
|null|CUST003   |user3@mail.com|9000000003|CUST003|2025-12-08T15:57:32.108141|
|null|CUST004   |user4@mail.com|9000000004|CUST004|2025-12-08T15:57:32.134021|
|null|CUST005   |user5@mail.com|9000000005|CUST005|2025-12-08T15:57:32.159845|
+----+----------+--------------+----------+-------+--------------------------+
only showing top 5 rows



### ATM Silver

#### Clean UPI Transactions

In [10]:
from pyspark.sql.functions import col, to_timestamp, lit

upi_silver = (
    upi_raw
    .withColumn("TxnTimestamp", to_timestamp("TxnTimestamp"))
    .withColumn("TransactionAmount", col("Amount").cast("double"))
    .withColumn("Channel", lit("UPI"))
    .select(
        col("TxnID").alias("TransactionID"),
        "AccountNumber",
        "CustomerID",
        "TxnTimestamp",
        "TransactionAmount",
        "transaction_type",
        "DeviceID",
        "Status",
        "PayeeUPI",
        "PayerUPI",
        "GeoLocation",
        "EventID",
    )
)

upi_silver.show(5, truncate=False)
upi_silver.write.mode("overwrite").parquet(
    "abfs://silver@charithastorage123.dfs.core.windows.net/upi_silver/"
)


+-------------+-------------+----------+-------------------+-----------------+----------------+--------+-------+------------+-----------+---------------+---------+
|TransactionID|AccountNumber|CustomerID|TxnTimestamp       |TransactionAmount|transaction_type|DeviceID|Status |PayeeUPI    |PayerUPI   |GeoLocation    |EventID  |
+-------------+-------------+----------+-------------------+-----------------+----------------+--------+-------+------------+-----------+---------------+---------+
|TXN000001    |1002003160   |CUST148   |2025-01-01 05:30:00|2500.0           |UPI_OTHER       |DEV129  |SUCCESS|store374@upi|user189@upi|20.2018,87.4257|UPI000001|
|TXN000002    |1002003127   |CUST041   |2025-01-01 05:30:05|95000.0          |UPI_OTHER       |DEV095  |SUCCESS|store400@upi|user389@upi|25.6749,75.5603|UPI000002|
|TXN000003    |1002003405   |CUST146   |2025-01-01 05:30:10|75000.0          |UPI_OTHER       |DEV159  |SUCCESS|store172@upi|user252@upi|15.6949,80.2814|UPI000003|
|TXN000004    |1

### ATM Silver

#### Clean ATM Transactions

In [11]:
atm_silver = (
    atm_raw
    .withColumn("TransactionTime", to_timestamp("TransactionTime"))
    .withColumn("TransactionAmount", col("TransactionAmount").cast("double"))
    .withColumn("Channel", lit("ATM"))
    .select(
        col("TransactionID"),
        col("AccountNumber"),
        col("CustomerID"),
        col("TransactionTime").alias("TxnTimestamp"),
        "TransactionAmount",
        "TransactionType",
        "transaction_type",
        "Location",
        "ATMID",
        "Status"
    )
)

atm_silver.show(5, truncate=False)
atm_silver.write.mode("overwrite").parquet(
    "abfs://silver@charithastorage123.dfs.core.windows.net/atm_silver/"
)


+-------------+-------------+----------+-------------------+-----------------+---------------+----------------+---------+------+-------+
|TransactionID|AccountNumber|CustomerID|TxnTimestamp       |TransactionAmount|TransactionType|transaction_type|Location |ATMID |Status |
+-------------+-------------+----------+-------------------+-----------------+---------------+----------------+---------+------+-------+
|ATM000001    |1002003352   |CUST372   |2025-01-01 05:30:00|5000.0           |WITHDRAWAL     |ATM_OTHER       |Chennai  |ATM013|SUCCESS|
|ATM000002    |1002003348   |CUST032   |2025-01-01 05:31:00|8000.0           |WITHDRAWAL     |ATM_OTHER       |Mumbai   |ATM008|SUCCESS|
|ATM000003    |1002003268   |CUST488   |2025-01-01 05:32:00|8000.0           |WITHDRAWAL     |ATM_OTHER       |Bangalore|ATM047|SUCCESS|
|ATM000004    |1002003256   |CUST459   |2025-01-01 05:33:00|5000.0           |WITHDRAWAL     |ATM_OTHER       |Kolkata  |ATM015|SUCCESS|
|ATM000005    |1002003494   |CUST378   |2

### Silver Customer

#### Clean Customer Data

In [12]:
from pyspark.sql.functions import (
    col, when, lit, to_timestamp, date_format,
    year, month, dayofmonth, quarter, monotonically_increasing_id
)
cust_silver = (
    cust_raw
    .withColumn("Name", when(col("Name").isNull(), col("CustomerID")).otherwise(col("Name")))
    .select(
        col("CustomerID"),
        col("Name"),
        col("Email"),
        col("Phone"),
        to_timestamp("CreatedAt").alias("CreatedAt")
    )
)

cust_silver.show(5, truncate=False)
cust_silver.write.mode("overwrite").parquet(
    "abfs://silver@charithastorage123.dfs.core.windows.net/cust_silver/"
)

+----------+-------+--------------+----------+--------------------------+
|CustomerID|Name   |Email         |Phone     |CreatedAt                 |
+----------+-------+--------------+----------+--------------------------+
|CUST001   |CUST001|user1@mail.com|9000000001|2025-12-08 15:57:32.054215|
|CUST002   |CUST002|user2@mail.com|9000000002|2025-12-08 15:57:32.081457|
|CUST003   |CUST003|user3@mail.com|9000000003|2025-12-08 15:57:32.108141|
|CUST004   |CUST004|user4@mail.com|9000000004|2025-12-08 15:57:32.134021|
|CUST005   |CUST005|user5@mail.com|9000000005|2025-12-08 15:57:32.159845|
+----------+-------+--------------+----------+--------------------------+
only showing top 5 rows



### DIMENSION TABLES (GOLD)

#### DimCustomer

In [13]:
from pyspark.sql.functions import monotonically_increasing_id

dim_customer = (
    cust_silver
    .withColumn("CustomerSK", monotonically_increasing_id())
)

dim_customer.show(5, truncate=False)
dim_customer.write.mode("overwrite").parquet(
    "abfs://gold@charithastorage123.dfs.core.windows.net/DimCustomer/"
)



+----------+-------+--------------+----------+--------------------------+----------+
|CustomerID|Name   |Email         |Phone     |CreatedAt                 |CustomerSK|
+----------+-------+--------------+----------+--------------------------+----------+
|CUST001   |CUST001|user1@mail.com|9000000001|2025-12-08 15:57:32.054215|0         |
|CUST002   |CUST002|user2@mail.com|9000000002|2025-12-08 15:57:32.081457|1         |
|CUST003   |CUST003|user3@mail.com|9000000003|2025-12-08 15:57:32.108141|2         |
|CUST004   |CUST004|user4@mail.com|9000000004|2025-12-08 15:57:32.134021|3         |
|CUST005   |CUST005|user5@mail.com|9000000005|2025-12-08 15:57:32.159845|4         |
+----------+-------+--------------+----------+--------------------------+----------+
only showing top 5 rows



#### DimAccount

In [14]:
dim_account = (
    upi_silver.select("AccountNumber").union(atm_silver.select("AccountNumber"))
    .distinct()
    .withColumn("AccountSK", monotonically_increasing_id())
)

dim_account.show(5, truncate=False)
dim_account.write.mode("overwrite").parquet(
    "abfs://gold@charithastorage123.dfs.core.windows.net/DimAccount/"
)


+-------------+---------+
|AccountNumber|AccountSK|
+-------------+---------+
|1002003405   |0        |
|1002003007   |1        |
|1002003187   |2        |
|1002003139   |3        |
|1002003230   |4        |
+-------------+---------+
only showing top 5 rows



#### DimDate

In [15]:
from pyspark.sql.functions import year, month, dayofmonth, quarter, date_format

dates = (
    upi_silver.select(col("TxnTimestamp"))
    .union(atm_silver.select(col("TxnTimestamp")))
    .distinct()
    .withColumn("DateSK", date_format("TxnTimestamp", "yyyyMMdd"))
    .withColumn("Year", year("TxnTimestamp"))
    .withColumn("Month", month("TxnTimestamp"))
    .withColumn("Day", dayofmonth("TxnTimestamp"))
    .withColumn("Quarter", quarter("TxnTimestamp"))
)

dim_date = dates.select("DateSK", "Year", "Month", "Day", "Quarter")

dim_date.show(10, truncate=False)
dim_date.write.mode("overwrite").parquet(
    "abfs://gold@charithastorage123.dfs.core.windows.net/DimDate/"
)


+--------+----+-----+---+-------+
|DateSK  |Year|Month|Day|Quarter|
+--------+----+-----+---+-------+
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
|20250101|2025|1    |1  |1      |
+--------+----+-----+---+-------+
only showing top 10 rows



### FACT TABLES (GOLD)

#### UPI Transactions GOLD

In [16]:
from pyspark.sql.functions import col, date_format, lit

upi_gold = (
    upi_silver
    .withColumn("DateSK", date_format(col("TxnTimestamp"), "yyyyMMdd"))
    .withColumn("Channel", lit("UPI"))   # add Channel safely here
    .select(
        col("TransactionID"),
        col("CustomerID"),
        col("AccountNumber"),
        col("TxnTimestamp"),
        col("TransactionAmount"),
        col("transaction_type").alias("TransactionType"),
        col("Channel"),
        col("EventID"),
        col("Status"),
        col("DeviceID"),
        col("PayeeUPI"),
        col("PayerUPI"),
        col("GeoLocation"),
        col("DateSK")
    )
)


#### ATM Transactions GOLD

In [17]:
atm_gold = (
    atm_silver
    .withColumn("DateSK", date_format(col("TxnTimestamp"), "yyyyMMdd"))
    .withColumn("Channel", lit("ATM"))
    .select(
        col("TransactionID"),
        col("CustomerID"),
        col("AccountNumber"),
        col("TxnTimestamp"),
        col("TransactionAmount"),
        col("TransactionType"),
        col("Channel"),
        col("ATMID"),
        col("Location"),
        col("Status"),
        col("DateSK")
    )
)


#### UNION both to create FactTransactions (GOLD Layer)

In [18]:
from pyspark.sql.functions import lit

atm_common = atm_gold.select(
    "TransactionID","CustomerID","AccountNumber","TxnTimestamp",
    "TransactionAmount","TransactionType","Channel",
    lit(None).alias("EventID"),
    lit(None).alias("DeviceID"),
    lit(None).alias("PayeeUPI"),
    lit(None).alias("PayerUPI"),
    lit(None).alias("GeoLocation"),
    "Status","DateSK"
)

upi_common = upi_gold.select(
    "TransactionID","CustomerID","AccountNumber","TxnTimestamp",
    "TransactionAmount","TransactionType","Channel",
    "EventID",
    "DeviceID",
    "PayeeUPI",
    "PayerUPI",
    "GeoLocation",
    "Status","DateSK"
)


In [19]:
fact_transactions = atm_common.unionByName(upi_common)
fact_transactions.show()
fact_transactions.write.mode("overwrite").parquet(
    "abfs://gold@charithastorage123.dfs.core.windows.net/FactTransactions/"
)


+-------------+----------+-------------+-------------------+-----------------+---------------+-------+-------+--------+--------+--------+-----------+-------+--------+
|TransactionID|CustomerID|AccountNumber|       TxnTimestamp|TransactionAmount|TransactionType|Channel|EventID|DeviceID|PayeeUPI|PayerUPI|GeoLocation| Status|  DateSK|
+-------------+----------+-------------+-------------------+-----------------+---------------+-------+-------+--------+--------+--------+-----------+-------+--------+
|    ATM000001|   CUST372|   1002003352|2025-01-01 05:30:00|           5000.0|     WITHDRAWAL|    ATM|   null|    null|    null|    null|       null|SUCCESS|20250101|
|    ATM000002|   CUST032|   1002003348|2025-01-01 05:31:00|           8000.0|     WITHDRAWAL|    ATM|   null|    null|    null|    null|       null|SUCCESS|20250101|
|    ATM000003|   CUST488|   1002003268|2025-01-01 05:32:00|           8000.0|     WITHDRAWAL|    ATM|   null|    null|    null|    null|       null|SUCCESS|20250101

### Gold-Level Dimension Tables (DimCustomer, DimAccount, DimDate)

#### DimCustomer (SCD Type-2)

In [None]:
from pyspark.sql.functions import lit, to_timestamp, monotonically_increasing_id

dim_customer = (
    cust_silver
        .select(
            "CustomerID",
            "Name",
            "Email",
            "Phone",
            to_timestamp("CreatedAt").alias("StartDate")
        )
        .withColumn("EndDate", lit(None).cast("timestamp"))
        .withColumn("IsCurrent", lit(True))
        .withColumn("CustomerSK", monotonically_increasing_id())
)

dim_customer.show(5, truncate=False)

# Write to ADLS as DimCustomer (Gold)



+----------+-------+--------------+----------+--------------------------+-------+---------+----------+
|CustomerID|Name   |Email         |Phone     |StartDate                 |EndDate|IsCurrent|CustomerSK|
+----------+-------+--------------+----------+--------------------------+-------+---------+----------+
|CUST001   |CUST001|user1@mail.com|9000000001|2025-12-08 15:57:32.054215|null   |true     |0         |
|CUST002   |CUST002|user2@mail.com|9000000002|2025-12-08 15:57:32.081457|null   |true     |1         |
|CUST003   |CUST003|user3@mail.com|9000000003|2025-12-08 15:57:32.108141|null   |true     |2         |
|CUST004   |CUST004|user4@mail.com|9000000004|2025-12-08 15:57:32.134021|null   |true     |3         |
|CUST005   |CUST005|user5@mail.com|9000000005|2025-12-08 15:57:32.159845|null   |true     |4         |
+----------+-------+--------------+----------+--------------------------+-------+---------+----------+
only showing top 5 rows

