In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Mount the `transformed-data` ADLS Gen2 container using a service principal (OAuth client credentials).
#
# SECURITY: the client secret must never be committed in the notebook; it is read from a
# Databricks secret scope instead. The secret that used to be hardcoded here has been exposed
# (notebook source / exports) and should be rotated in Azure AD.
# TODO: set SECRET_SCOPE / SECRET_KEY to the scope and key that actually hold the secret.
SECRET_SCOPE = "adls-service-principal"
SECRET_KEY = "client-secret"

TENANT_ID = "1aff2d1a-b404-4b47-a8f3-eca6b373b83f"
CLIENT_ID = "ab75fa94-e653-43f4-8e3b-59524ed11b4d"
MOUNT_SOURCE = "abfss://transformed-data@fdadls.dfs.core.windows.net"  # <container>@<storageaccount>
MOUNT_POINT = "/mnt/transformed-data"

# Configs for mounting
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": CLIENT_ID,
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SECRET_SCOPE, key=SECRET_KEY),
  "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token"
}

# dbutils.fs.mount raises when the mount point already exists, so guard it to keep the cell
# safe to re-run. If the credentials change (e.g. after rotating the secret), unmount first
# (dbutils.fs.unmount(MOUNT_POINT)) so the new configs take effect.
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    print(f"{MOUNT_POINT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
      source = MOUNT_SOURCE,
      mount_point = MOUNT_POINT,
      extra_configs = configs
    )




Out[41]: True

In [0]:
# Display the active SparkSession. `spark` is not created in this notebook; presumably it is the
# session injected by the Databricks runtime -- this cell just confirms the cluster is attached.
spark

In [0]:
# List the top-level folders of the mounted container to confirm the mount is readable.
transformed_data_listing = dbutils.fs.ls("/mnt/transformed-data")
display(transformed_data_listing)

path,name,size,modificationTime
dbfs:/mnt/transformed-data/Customer_Profile/,Customer_Profile/,0,1747907848000
dbfs:/mnt/transformed-data/Fraudulent Patterns/,Fraudulent Patterns/,0,1747907870000
dbfs:/mnt/transformed-data/Merchant Information/,Merchant Information/,0,1747907876000
dbfs:/mnt/transformed-data/Transaction Amounts/,Transaction Amounts/,0,1747907883000
dbfs:/mnt/transformed-data/Transaction Data/,Transaction Data/,0,1747907889000


### Load Transformed Delta Tables

In [0]:
# Step 1: Load Transformed Data
# Every input is a Delta table under the mounted container, laid out as
# <base_path>/<source folder>/<table name>.
base_path = "/mnt/transformed-data"


def _load_delta(relative_path):
    """Return the Delta table stored at ``<base_path>/<relative_path>`` as a DataFrame."""
    return spark.read.format('delta').load(f"{base_path}/{relative_path}")


# Customer profile
customer_df = _load_delta("Customer_Profile/customer_data")
account_df = _load_delta("Customer_Profile/account_activity")

# Fraudulent patterns
fraud_indicators_df = _load_delta("Fraudulent Patterns/fraud_indicators")
suspicious_activity_df = _load_delta("Fraudulent Patterns/suspicious_activity")

# Merchant information
merchant_df = _load_delta("Merchant Information/merchant_data")
category_labels_df = _load_delta("Merchant Information/transaction_category_labels")

# Transaction amounts
amount_df = _load_delta("Transaction Amounts/amount_data")
anomaly_scores_df = _load_delta("Transaction Amounts/anomaly_scores")

# Transaction data
transaction_metadata_df = _load_delta("Transaction Data/transaction_metadata")
transaction_records_df = _load_delta("Transaction Data/transaction_records")

In [0]:
# Join transaction tables
# Start from the transaction records and left-join every per-transaction table on TransactionID,
# so each record is kept even when a lookup table has no matching row (its columns become null).
transactions_full = (
    transaction_records_df
    .join(transaction_metadata_df, on="TransactionID", how="left")
    .join(amount_df, on="TransactionID", how="left")
    .join(anomaly_scores_df, on="TransactionID", how="left")
    .join(category_labels_df, on="TransactionID", how="left")
    .join(fraud_indicators_df, on="TransactionID", how="left")
)

In [0]:
# Preview the first five joined transaction rows.
transactions_full.show(n=5)

+-------------+------------------+----------+-------------------+----------+------------------+--------------------+--------+--------------+
|TransactionID|            Amount|CustomerID|          Timestamp|MerchantID| TransactionAmount|        AnomalyScore|Category|FraudIndicator|
+-------------+------------------+----------+-------------------+----------+------------------+--------------------+--------+--------------+
|            1|55.530334429869185|      1952|2022-01-01 00:00:00|      2701| 79.41360746377397|  0.6866994638180963|   Other|             0|
|            2|12.881180192784143|      1027|2022-01-01 01:00:00|      2070|12.053087153568082| 0.08174887080114657|  Online|             0|
|            3|50.176321517065674|      1955|2022-01-01 02:00:00|      2238| 33.31035719105734|0.023856830105308702|  Travel|             0|
|            4| 41.63400105303006|      1796|2022-01-01 03:00:00|      2879| 46.12111728963105|  0.8769943477359176|  Travel|             0|
|            

In [0]:
# Print the column names, types and nullability of the joined transaction DataFrame.
transactions_full.printSchema()

root
 |-- TransactionID: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- MerchantID: integer (nullable = true)
 |-- TransactionAmount: double (nullable = true)
 |-- AnomalyScore: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- FraudIndicator: integer (nullable = true)



In [0]:
# Join customer-related tables
# Enrich each transaction with customer profile, account activity and suspicious-activity data.
# Left joins on CustomerID keep transactions whose customer is missing from a lookup table.
tx_customers_full = (
    transactions_full
    .join(customer_df, on="CustomerID", how="left")
    .join(account_df, on="CustomerID", how="left")
    .join(suspicious_activity_df, on="CustomerID", how="left")
)

In [0]:
# Preview the first five transaction rows enriched with customer data.
tx_customers_full.show(n=5)

+----------+-------------+------------------+-------------------+----------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+
|CustomerID|TransactionID|            Amount|          Timestamp|MerchantID| TransactionAmount|        AnomalyScore|Category|FraudIndicator|         Name|Age|     Address|   AccountBalance| LastLogin|SuspiciousFlag|
+----------+-------------+------------------+-------------------+----------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+
|      1952|            1|55.530334429869185|2022-01-01 00:00:00|      2701| 79.41360746377397|  0.6866994638180963|   Other|             0|Customer 1952| 50|Address 1952|2869.689912187824|2024-08-09|             0|
|      1027|            2|12.881180192784143|2022-01-01 01:00:00|      2070|12.053087153568082| 0.08174887080114657|  Online|           

In [0]:
# Print the schema after the customer joins (CustomerID, the join key, moves to the first column).
tx_customers_full.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- TransactionID: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- MerchantID: integer (nullable = true)
 |-- TransactionAmount: double (nullable = true)
 |-- AnomalyScore: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- FraudIndicator: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- AccountBalance: double (nullable = true)
 |-- LastLogin: date (nullable = true)
 |-- SuspiciousFlag: integer (nullable = true)



In [0]:
# Join merchant info: attach merchant attributes to every transaction. The left join keeps
# transactions whose MerchantID has no match in merchant_df.
full_df = tx_customers_full.join(merchant_df, on="MerchantID", how="left")

In [0]:
# Preview the first five fully joined rows (transactions + customer + merchant).
full_df.show(n=5)

+----------+----------+-------------+------------------+-------------------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+-------------+-------------+
|MerchantID|CustomerID|TransactionID|            Amount|          Timestamp| TransactionAmount|        AnomalyScore|Category|FraudIndicator|         Name|Age|     Address|   AccountBalance| LastLogin|SuspiciousFlag| MerchantName|     Location|
+----------+----------+-------------+------------------+-------------------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+-------------+-------------+
|      2701|      1952|            1|55.530334429869185|2022-01-01 00:00:00| 79.41360746377397|  0.6866994638180963|   Other|             0|Customer 1952| 50|Address 1952|2869.689912187824|2024-08-09|             0|Merchant 2701|Location 2701|
|      2070|      1027| 

In [0]:
# Print the schema of the fully joined DataFrame before feature engineering.
full_df.printSchema()

root
 |-- MerchantID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- TransactionID: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- TransactionAmount: double (nullable = true)
 |-- AnomalyScore: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- FraudIndicator: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- AccountBalance: double (nullable = true)
 |-- LastLogin: date (nullable = true)
 |-- SuspiciousFlag: integer (nullable = true)
 |-- MerchantName: string (nullable = true)
 |-- Location: string (nullable = true)



In [0]:
# Feature Engineering
# Adds derived per-transaction features to `full_df`. Re-running this cell is safe because
# `withColumn` replaces an existing column of the same name.

# Transactions with TransactionAmount strictly above this value get IsHighAmount = 1.
# TODO: confirm this cut-off against the data -- the sample rows previewed above have
# TransactionAmount values in the tens, so 10000 may flag nothing.
HIGH_AMOUNT_THRESHOLD = 10000

# RecentLoginGapDays is measured from LastLogin to this date. `current_date()` keeps the original
# behaviour, but it makes the feature (and every dataset written from it) change from run to run;
# set e.g. to_date(lit("2024-12-31")) to pin a reference date for reproducible output.
LOGIN_GAP_REFERENCE_DATE = current_date()

full_df = (
    full_df
    .withColumn("HourOfTransaction", hour("Timestamp"))
    # Spark's dayofweek convention: 1 = Sunday ... 7 = Saturday.
    .withColumn("DayOfWeek", dayofweek("Timestamp"))
    # A null TransactionAmount (no match in the earlier left join) makes the comparison null,
    # so such rows fall through to otherwise(0).
    .withColumn("IsHighAmount", when(col("TransactionAmount") > HIGH_AMOUNT_THRESHOLD, 1).otherwise(0))
    .withColumn("RecentLoginGapDays", datediff(LOGIN_GAP_REFERENCE_DATE, col("LastLogin")))
)

In [0]:
# Preview the first five rows including the engineered feature columns.
full_df.show(n=5)

+----------+----------+-------------+------------------+-------------------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+-------------+-------------+-----------------+---------+------------+------------------+
|MerchantID|CustomerID|TransactionID|            Amount|          Timestamp| TransactionAmount|        AnomalyScore|Category|FraudIndicator|         Name|Age|     Address|   AccountBalance| LastLogin|SuspiciousFlag| MerchantName|     Location|HourOfTransaction|DayOfWeek|IsHighAmount|RecentLoginGapDays|
+----------+----------+-------------+------------------+-------------------+------------------+--------------------+--------+--------------+-------------+---+------------+-----------------+----------+--------------+-------------+-------------+-----------------+---------+------------+------------------+
|      2701|      1952|            1|55.530334429869185|2022-01-01 00:00:00| 79.41360746

In [0]:
# Print the final feature schema (the four engineered columns appear at the end).
full_df.printSchema()

root
 |-- MerchantID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- TransactionID: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- TransactionAmount: double (nullable = true)
 |-- AnomalyScore: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- FraudIndicator: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- AccountBalance: double (nullable = true)
 |-- LastLogin: date (nullable = true)
 |-- SuspiciousFlag: integer (nullable = true)
 |-- MerchantName: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- HourOfTransaction: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- IsHighAmount: integer (nullable = false)
 |-- RecentLoginGapDays: integer (nullable = true)



In [0]:
# Save as feature dataset: write the full feature DataFrame as Parquet, replacing previous output.
# NOTE(review): "/mnt/final-dataset" is not the mount point created in this notebook
# (/mnt/transformed-data). Unless it is mounted elsewhere, this writes to the DBFS root rather
# than ADLS -- confirm the intended destination.
FEATURE_DATASET_PATH = "/mnt/final-dataset"
full_df.write.parquet(FEATURE_DATASET_PATH, mode="overwrite")


In [0]:
# Dimension: Customer
# Distinct customer attribute rows taken from the joined feature set.
# Rows with a null CustomerID are dropped: CustomerID is nullable after the left joins, and a null
# key can never match in a join. This mirrors dim_time, which already drops null timestamps.
# NOTE(review): dropDuplicates() compares all selected columns, so CustomerID is only unique if
# each customer has a single customer/account row -- verify if downstream needs a unique key.
dim_customer = (
    full_df
    .select("CustomerID", "Name", "Age", "Address", "AccountBalance", "LastLogin", "RecentLoginGapDays")
    .dropna(subset=["CustomerID"])
    .dropDuplicates()
)


In [0]:
# Dimension: Merchant
# Distinct merchant rows taken from the joined feature set.
# Rows with a null MerchantID are dropped: MerchantID is nullable after the left joins, and a null
# key can never match in a join. This mirrors dim_time, which already drops null timestamps.
dim_merchant = (
    full_df
    .select("MerchantID", "MerchantName", "Location")
    .dropna(subset=["MerchantID"])
    .dropDuplicates()
)

In [0]:
# Dimension: Time (derived from Timestamp)
# One row per distinct non-null Timestamp, with its calendar date and hour of day.
dim_time = (
    full_df
    .select("Timestamp")
    .dropna()
    .select(
        "Timestamp",
        to_date("Timestamp").alias("TransactionDate"),
        hour("Timestamp").alias("Hour"),
    )
    .dropDuplicates()
)

In [0]:
# Fact: Transactions
# One row per joined transaction row: foreign keys to the customer/merchant dimensions,
# the engineered features, and the fraud label.
fact_transactions = full_df.select(
    "TransactionID",
    "CustomerID",
    "MerchantID",
    "TransactionAmount",
    "HourOfTransaction",
    "DayOfWeek",
    "IsHighAmount",
    "AnomalyScore",
    "FraudIndicator",
    "Timestamp",
)

In [0]:
# Save each star-schema table as Parquet under /mnt/star-schema/<table name>, overwriting previous runs.
# NOTE(review): /mnt/star-schema is not the mount created in this notebook (/mnt/transformed-data);
# unless it is mounted elsewhere, these files land on the DBFS root rather than ADLS -- confirm.
star_schema_tables = {
    "dim_customer": dim_customer,
    "dim_merchant": dim_merchant,
    "dim_time": dim_time,
    "fact_transactions": fact_transactions,
}

for table_name, table_df in star_schema_tables.items():
    table_df.write.parquet(f"/mnt/star-schema/{table_name}", mode="overwrite")