AUTHENTICATION AND DATA READING

In [0]:
#dbutils.secrets.listScopes()
dbutils.secrets.list('databricks-keyvault-scope')
#secret1=dbutils.secrets.get('databricks-keyvault-scope','ServiceCredential')

[SecretMetadata(key='ApplicationId'),
 SecretMetadata(key='directoryid'),
 SecretMetadata(key='ServiceCredential')]

In [0]:
#Mounting Azure Data Lake Storage Gen2

application_id=dbutils.secrets.get('databricks-keyvault-scope','ApplicationId')
service_credential=dbutils.secrets.get('databricks-keyvault-scope','ServiceCredential')
directory_id=dbutils.secrets.get('databricks-keyvault-scope','directoryid')

configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": application_id,
          "fs.azure.account.oauth2.client.secret": service_credential,
          "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
#dbutils.fs.mount(
#  source = "abfss://project040raw@project040datalake.dfs.core.windows.net/",
#  mount_point = "/mnt/project040datalake/project040raw",
#  extra_configs = configs)

In [0]:
mount_point = "/mnt/project040datalake/project040raw"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")
    
#  Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://project040raw@project040datalake.dfs.core.windows.net/",
  mount_point = mount_point,
  extra_configs = configs)
print(f"Mounted successfully at {mount_point}")

/mnt/project040datalake/project040raw has been unmounted.
Unmounted existing mount at /mnt/project040datalake/project040raw
Mounted successfully at /mnt/project040datalake/project040raw


In [0]:
mount_point = "/mnt/project040datalake/project040processed"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")
    
#  Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://project040processed@project040datalake.dfs.core.windows.net/",
  mount_point = mount_point,
  extra_configs = configs)
print(f"Mounted successfully at {mount_point}")

/mnt/project040datalake/project040processed has been unmounted.
Unmounted existing mount at /mnt/project040datalake/project040processed
Mounted successfully at /mnt/project040datalake/project040processed


In [0]:
dbutils.fs.mounts()

[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/mnt/project040datalake/project040raw', source='abfss://project040raw@project040datalake.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/mnt/project040datalake/project040processed', source='abfss://project040processed@project040datalake.dfs.core.windows.net/', encryptionType=''),
 MountInfo(mountPoint='/', so

In [0]:
#read files in container
transactions_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/project040datalake/project040raw/transactions.csv")
transactions_df.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
|             1|        45|      2024-01-01|             100.5|         Deposit|
|             2|        12|      2024-01-02|            200.75|      Withdrawal|
|             3|        78|      2024-01-03|             150.0|         Deposit|
|             4|        34|      2024-01-04|            300.25|      Withdrawal|
|             5|        56|      2024-01-05|             250.0|         Deposit|
+--------------+----------+----------------+------------------+----------------+
only showing top 5 rows



DATA CLEANING

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define schema for transactions.csv
transactions_df_schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("account_id", IntegerType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("transaction_amount", FloatType(), True),
    StructField("transaction_type", StringType(), True)
])

# Read data with custom schema
transactions_df = spark.read.option("header", "true").schema(transactions_df_schema).csv("/mnt/project040datalake/project040raw/transactions.csv")
transactions_df.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
|             1|        45|      2024-01-01|             100.5|         Deposit|
|             2|        12|      2024-01-02|            200.75|      Withdrawal|
|             3|        78|      2024-01-03|             150.0|         Deposit|
|             4|        34|      2024-01-04|            300.25|      Withdrawal|
|             5|        56|      2024-01-05|             250.0|         Deposit|
+--------------+----------+----------------+------------------+----------------+
only showing top 5 rows



In [0]:
#Checking the DataFrame Schema:
transactions_df.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- account_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_amount: float (nullable = true)
 |-- transaction_type: string (nullable = true)



In [0]:
#Checking the DataFrame Count:
print("Transactions count:", transactions_df.count())

Transactions count: 100


In [0]:
# Drop rows where any of the critical columns have missing values
transactions_df_cleaned = transactions_df.dropna(subset=["transaction_id", "transaction_amount"])

# Show the cleaned data
transactions_df_cleaned.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
|             1|        45|      2024-01-01|             100.5|         Deposit|
|             2|        12|      2024-01-02|            200.75|      Withdrawal|
|             3|        78|      2024-01-03|             150.0|         Deposit|
|             4|        34|      2024-01-04|            300.25|      Withdrawal|
|             5|        56|      2024-01-05|             250.0|         Deposit|
+--------------+----------+----------------+------------------+----------------+
only showing top 5 rows



In [0]:
# Fill missing values in the 'description' column with a default string 'No description'
transactions_df_cleaned = transactions_df.fillna({"transaction_type": "No transaction type"})

# Show the cleaned data
transactions_df_cleaned.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
|             1|        45|      2024-01-01|             100.5|         Deposit|
|             2|        12|      2024-01-02|            200.75|      Withdrawal|
|             3|        78|      2024-01-03|             150.0|         Deposit|
|             4|        34|      2024-01-04|            300.25|      Withdrawal|
|             5|        56|      2024-01-05|             250.0|         Deposit|
+--------------+----------+----------------+------------------+----------------+
only showing top 5 rows



In [0]:
# Drop duplicate rows based on 'transaction_id' (assuming this is the unique key)
transactions_df_cleaned = transactions_df_cleaned.dropDuplicates(["transaction_id"])

# Show the cleaned data
transactions_df_cleaned.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
|            31|        71|      2024-01-31|             100.5|         Deposit|
|            85|        65|      2024-03-25|             250.0|         Deposit|
|            65|        69|      2024-03-05|             250.0|         Deposit|
|            53|        86|      2024-02-22|             150.0|         Deposit|
|            78|         4|      2024-03-18|            275.75|      Withdrawal|
+--------------+----------+----------------+------------------+----------------+
only showing top 5 rows



In [0]:
# Filter out transactions where the amount is negative or suspiciously high
transactions_df_cleaned1 = transactions_df_cleaned.filter((col("transaction_amount") <= 0) & (col("transaction_amount") >= 10000))

# Show the cleaned data
transactions_df_cleaned1.show(5)


+--------------+----------+----------------+------------------+----------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|
+--------------+----------+----------------+------------------+----------------+
+--------------+----------+----------------+------------------+----------------+



In [0]:
# Rename columns to standardize them
transactions_df_cleaned = transactions_df_cleaned.withColumnRenamed("transaction_id","TransactionID") \
                                                 .withColumnRenamed("account_id","AccountID",)\
                                                 .withColumnRenamed("transaction_amount","Amount",) \
                                                 .withColumnRenamed("transaction_date","TransactionDate") \
                                                 .withColumnRenamed("transaction_type","TransactionType")
                                                 

# Show the cleaned data
transactions_df_cleaned.show(5)


+-------------+---------+---------------+------+---------------+
|TransactionID|AccountID|TransactionDate|Amount|TransactionType|
+-------------+---------+---------------+------+---------------+
|           31|       71|     2024-01-31| 100.5|        Deposit|
|           85|       65|     2024-03-25| 250.0|        Deposit|
|           65|       69|     2024-03-05| 250.0|        Deposit|
|           53|       86|     2024-02-22| 150.0|        Deposit|
|           78|        4|     2024-03-18|275.75|     Withdrawal|
+-------------+---------+---------------+------+---------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import year, month, day

# Extract year and month from timestamp
transactions_df_cleaned = transactions_df_cleaned.withColumn("Year", year(col("TransactionDate"))) \
                                                 .withColumn("Month", month(col("TransactionDate")))\
                                                 .withColumn("Day", day(col("TransactionDate")))

# Show the cleaned data
transactions_df_cleaned.show(5)


+-------------+---------+---------------+------+---------------+----+-----+---+
|TransactionID|AccountID|TransactionDate|Amount|TransactionType|Year|Month|Day|
+-------------+---------+---------------+------+---------------+----+-----+---+
|           31|       71|     2024-01-31| 100.5|        Deposit|2024|    1| 31|
|           85|       65|     2024-03-25| 250.0|        Deposit|2024|    3| 25|
|           65|       69|     2024-03-05| 250.0|        Deposit|2024|    3|  5|
|           53|       86|     2024-02-22| 150.0|        Deposit|2024|    2| 22|
|           78|        4|     2024-03-18|275.75|     Withdrawal|2024|    3| 18|
+-------------+---------+---------------+------+---------------+----+-----+---+
only showing top 5 rows



In [0]:
# Check if there are any missing values in critical columns
transactions_df_cleaned.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in transactions_df_cleaned.columns]
).display()

TransactionID,AccountID,TransactionDate,Amount,TransactionType,Year,Month,Day
0,0,0,0,0,0,0,0


In [0]:
# Check for duplicates in transaction_id
duplicate_count = transactions_df_cleaned.groupBy("TransactionID").count().filter("count > 1").count()
print(f"Number of duplicate transactions: {duplicate_count}")


Number of duplicate transactions: 0


In [0]:
#Partition by Year,month,day and save in processed container in JSON format.
transactions_df_cleaned.write.mode("overwrite").partitionBy("year","month","day").format("parquet").save("/mnt/project040datalake/project040processed/transactions/accounts_json_out")

In [0]:
#Partition by Year,month,day and save in processed container in JSON format.
transactions_df_cleaned.write.mode("append").partitionBy("year","month","day").format("parquet").save("/mnt/project040datalake/project040processed/transactions/accounts_json_out")

In [0]:
#add columns for ingestion time
from pyspark.sql.functions import current_timestamp

df = transactions_df_cleaned.withColumn("ingestion_timestamp",current_timestamp())
df.show(5)

+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|TransactionID|AccountID|TransactionDate|Amount|TransactionType|Year|Month|Day| ingestion_timestamp|
+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|           31|       71|     2024-01-31| 100.5|        Deposit|2024|    1| 31|2024-11-14 22:12:...|
|           85|       65|     2024-03-25| 250.0|        Deposit|2024|    3| 25|2024-11-14 22:12:...|
|           65|       69|     2024-03-05| 250.0|        Deposit|2024|    3|  5|2024-11-14 22:12:...|
|           53|       86|     2024-02-22| 150.0|        Deposit|2024|    2| 22|2024-11-14 22:12:...|
|           78|        4|     2024-03-18|275.75|     Withdrawal|2024|    3| 18|2024-11-14 22:12:...|
+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
only showing top 5 rows



In [0]:
#DAY 01 FULL LOAD
df.write.format("delta").mode("overwrite").save("/mnt/project040datalake/project040processed/transactions/delta/sales")

In [0]:
spark.read.format("delta").load("/mnt/project040datalake/project040processed/transactions/delta/sales").show()


+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|TransactionID|AccountID|TransactionDate|Amount|TransactionType|Year|Month|Day| ingestion_timestamp|
+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|           31|       71|     2024-01-31| 100.5|        Deposit|2024|    1| 31|2024-11-14 22:28:...|
|           85|       65|     2024-03-25| 250.0|        Deposit|2024|    3| 25|2024-11-14 22:28:...|
|           65|       69|     2024-03-05| 250.0|        Deposit|2024|    3|  5|2024-11-14 22:28:...|
|           53|       86|     2024-02-22| 150.0|        Deposit|2024|    2| 22|2024-11-14 22:28:...|
|           78|        4|     2024-03-18|275.75|     Withdrawal|2024|    3| 18|2024-11-14 22:28:...|
|           34|       41|     2024-02-03|300.25|     Withdrawal|2024|    2|  3|2024-11-14 22:28:...|
|           81|       70|     2024-03-21| 100.5|        Deposit|2024|    3| 21|2024-11-14 2

In [0]:
#DAY 02 - Update existing and Insert New Data based on the TransactionID.
from delta.tables import DeltaTable

existing_data = DeltaTable.forPath(spark,"/mnt/project040datalake/project040processed/transactions/delta/sales")

existing_data.alias("existing").merge(df.alias("new"),"existing.TransactionID=new.TransactionID").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute

<bound method DeltaMergeBuilder.execute of <delta.tables.DeltaMergeBuilder object at 0x7fa994fa6510>>

In [0]:
spark.read.format("delta").load("/mnt/project040datalake/project040processed/transactions/delta/sales").show()

+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|TransactionID|AccountID|TransactionDate|Amount|TransactionType|Year|Month|Day| ingestion_timestamp|
+-------------+---------+---------------+------+---------------+----+-----+---+--------------------+
|           31|       71|     2024-01-31| 100.5|        Deposit|2024|    1| 31|2024-11-14 22:28:...|
|           85|       65|     2024-03-25| 250.0|        Deposit|2024|    3| 25|2024-11-14 22:28:...|
|           65|       69|     2024-03-05| 250.0|        Deposit|2024|    3|  5|2024-11-14 22:28:...|
|           53|       86|     2024-02-22| 150.0|        Deposit|2024|    2| 22|2024-11-14 22:28:...|
|           78|        4|     2024-03-18|275.75|     Withdrawal|2024|    3| 18|2024-11-14 22:28:...|
|           34|       41|     2024-02-03|300.25|     Withdrawal|2024|    2|  3|2024-11-14 22:28:...|
|           81|       70|     2024-03-21| 100.5|        Deposit|2024|    3| 21|2024-11-14 2

In [0]:
#read files in container
accounts_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/project040datalake/project040raw/accounts.csv")
customers_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/project040datalake/project040raw/customers.csv")
loan_payments_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/project040datalake/project040raw/loan_payments.csv")
loans_df = spark.read.option("header", "true").option("inferSchema", "true").csv("/mnt/project040datalake/project040raw/loans.csv")

In [0]:
accounts_df.write.mode("overwrite").parquet("/mnt/project040datalake/project040processed/silver/accounts_parquet_out")
customers_df.write.mode("overwrite").parquet("/mnt/project040datalake/project040processed/silver/customers_parquet_out")
loan_payments_df.write.mode("overwrite").parquet("/mnt/project040datalake/project040processed/silver/loan_payments_parquet_out")
loans_df.write.mode("overwrite").parquet("/mnt/project040datalake/project040processed/silver/loans_parquet_out")