### Bronze to Silver (Transform Bronze Data)

In [1]:
from pyspark.sql.functions import *

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("BronzeToSilver")
    .master("local[*]")
    .getOrCreate()
)

spark

In [33]:
raw_data_lake_schema = '''
  _row_id BIGINT,
  txn_date STRING,
  filler STRING,
  txn_time STRING,
  store_id STRING,
  terminal_id STRING,
  txn_id STRING,
  cust_id STRING,
  payment_mode STRING,
  partner_bank STRING,
  amount_paid DECIMAL(9,2),
  bank_payable DECIMAL(9,2),
  customer_payable DECIMAL(9,2),
  currency_code STRING,
  txn_status STRING
'''


In [34]:
df = spark.read.format("csv").schema(raw_data_lake_schema).option('header', True).load(r'D:\mainframe_to_analytics_dev\databricks\catalog\bronze\BRONZE20260104.CSV')

In [35]:
df.show(10)

+-------+----------+------+--------+--------+-----------+------------+----------+------------+---------------+-----------+------------+----------------+-------------+----------+
|_row_id|  txn_date|filler|txn_time|store_id|terminal_id|      txn_id|   cust_id|payment_mode|   partner_bank|amount_paid|bank_payable|customer_payable|currency_code|txn_status|
+-------+----------+------+--------+--------+-----------+------------+----------+------------+---------------+-----------+------------+----------------+-------------+----------+
|      0|2026-01-04|      |12:47:16|  STR001|       T01 |TXN000001   |CUST89581 |  CASH      |NA             |    1988.85|     2038.57|         2018.68|          ESS|          |
|      1|2026-01-04|      |12:47:16|  STR002|       T01 |TXN000002   |CUST62506 |  UPI       |HDFC           |     716.64|      734.56|          727.39|          ED |          |
|      2|2026-01-04|      |12:47:16|  STR002|       T01 |TXN000003   |CUST55832 |  CARD      |HDFC           |

In [36]:
df.printSchema()

root
 |-- _row_id: long (nullable = true)
 |-- txn_date: string (nullable = true)
 |-- filler: string (nullable = true)
 |-- txn_time: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- cust_id: string (nullable = true)
 |-- payment_mode: string (nullable = true)
 |-- partner_bank: string (nullable = true)
 |-- amount_paid: decimal(9,2) (nullable = true)
 |-- bank_payable: decimal(9,2) (nullable = true)
 |-- customer_payable: decimal(9,2) (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- txn_status: string (nullable = true)



In [37]:
## Type Casting

# txn_date, txn_time -> datetime, timestamp


In [38]:
from pyspark.sql.functions import to_timestamp, concat_ws, col

df = df.withColumn(
    "txn_timestamp",
    to_timestamp(
        concat_ws(" ", col("txn_date"), col("txn_time")),
        "yyyy-MM-dd HH:mm:ss"
    )
)

In [39]:
df.show(10)

+-------+----------+------+--------+--------+-----------+------------+----------+------------+---------------+-----------+------------+----------------+-------------+----------+-------------------+
|_row_id|  txn_date|filler|txn_time|store_id|terminal_id|      txn_id|   cust_id|payment_mode|   partner_bank|amount_paid|bank_payable|customer_payable|currency_code|txn_status|      txn_timestamp|
+-------+----------+------+--------+--------+-----------+------------+----------+------------+---------------+-----------+------------+----------------+-------------+----------+-------------------+
|      0|2026-01-04|      |12:47:16|  STR001|       T01 |TXN000001   |CUST89581 |  CASH      |NA             |    1988.85|     2038.57|         2018.68|          ESS|          |2026-01-04 12:47:16|
|      1|2026-01-04|      |12:47:16|  STR002|       T01 |TXN000002   |CUST62506 |  UPI       |HDFC           |     716.64|      734.56|          727.39|          ED |          |2026-01-04 12:47:16|
|      2|2

In [42]:
### Normalize values

## eg 'UPI   ' to 'UPI' 



In [52]:
from pyspark.sql.functions import trim, col

df = df.withColumn('payment_mode', trim(col('payment_mode')))
df = df.withColumn('partner_bank', trim(col('partner_bank')))
df = df.withColumn('currency_code', trim(col('currency_code')))
df = df.withColumn('terminal_id', trim(col('terminal_id')))
df.show(10)

+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|_row_id|  txn_date|filler|txn_time|store_id|terminal_id|      txn_id|   cust_id|payment_mode|partner_bank|amount_paid|bank_payable|customer_payable|currency_code|txn_status|      txn_timestamp|IS_CASH|is_self_kiosk|
+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|      0|2026-01-04|      |12:47:16|  STR001|        T01|TXN000001   |CUST89581 |        CASH|          NA|    1988.85|     2038.57|         2018.68|          ESS|          |2026-01-04 12:47:16|      Y|            Y|
|      1|2026-01-04|      |12:47:16|  STR002|        T01|TXN000002   |CUST62506 |         UPI|        HDFC|     716.64|      734.56|

In [53]:
df = df.withColumn('IS_CASH', when(((col('payment_mode') == 'UPI') | (col('payment_mode') == 'CARD')),'N').otherwise('Y'))

In [54]:
df.show(10)

+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|_row_id|  txn_date|filler|txn_time|store_id|terminal_id|      txn_id|   cust_id|payment_mode|partner_bank|amount_paid|bank_payable|customer_payable|currency_code|txn_status|      txn_timestamp|IS_CASH|is_self_kiosk|
+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|      0|2026-01-04|      |12:47:16|  STR001|        T01|TXN000001   |CUST89581 |        CASH|          NA|    1988.85|     2038.57|         2018.68|          ESS|          |2026-01-04 12:47:16|      Y|            Y|
|      1|2026-01-04|      |12:47:16|  STR002|        T01|TXN000002   |CUST62506 |         UPI|        HDFC|     716.64|      734.56|

In [55]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "is_self_kiosk",
    when(col("terminal_id").isin("T01", "T02"), "N")
    .otherwise("Y")
)

In [56]:
df.show(10)

+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|_row_id|  txn_date|filler|txn_time|store_id|terminal_id|      txn_id|   cust_id|payment_mode|partner_bank|amount_paid|bank_payable|customer_payable|currency_code|txn_status|      txn_timestamp|IS_CASH|is_self_kiosk|
+-------+----------+------+--------+--------+-----------+------------+----------+------------+------------+-----------+------------+----------------+-------------+----------+-------------------+-------+-------------+
|      0|2026-01-04|      |12:47:16|  STR001|        T01|TXN000001   |CUST89581 |        CASH|          NA|    1988.85|     2038.57|         2018.68|          ESS|          |2026-01-04 12:47:16|      Y|            N|
|      1|2026-01-04|      |12:47:16|  STR002|        T01|TXN000002   |CUST62506 |         UPI|        HDFC|     716.64|      734.56|

In [60]:
silver_df = df

In [None]:
## STEP 3 : BRONZE CSV FILE -> CATALOG

## WILL WORK ON DATABRICKS
import os
from datetime import datetime
SILVER_DIR = r"D:\mainframe_to_analytics_dev\databricks\catalog\silver"
run_date = datetime.now().strftime("%Y%m%d")
BRONZE_FILE = os.path.join(
            SILVER_DIR, f"BRONZE{run_date}.CSV"
        )

os.makedirs(SILVER_DIR, exist_ok=True)

silver_df.to_csv(BRONZE_FILE)

Py4JJavaError: An error occurred while calling o331.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
