In [0]:
%python
# Storage settings. The account key is fetched from a Databricks secret scope so
# that it is never pasted into (or saved with) the notebook source.
storage_account_name = "ENTER YOUR STORAGE ACCOUNT NAME"    # e.g., financestreamdatastorage
secret_scope_name = "ENTER YOUR SECRET SCOPE NAME"           # secret scope that holds the key
secret_key_name = "ENTER YOUR SECRET KEY NAME"               # secret containing the storage account key
storage_account_key = dbutils.secrets.get(scope=secret_scope_name, key=secret_key_name)


def remount(source, mount_point):
    """Mount `source` at `mount_point`, unmounting any existing mount there first.

    Args:
        source: wasbs:// URL of the container to mount.
        mount_point: DBFS path to mount the container at (e.g. "/mnt/raw-transactions").

    Returns:
        Whatever dbutils.fs.mount returns for the new mount.
    """
    # Unmount if already mounted, so the cell can be re-run safely
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        dbutils.fs.unmount(mount_point)

    return dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
        },
    )


# Mount raw-transactions container
source_url = f"wasbs://raw-transactions@{storage_account_name}.blob.core.windows.net"
remount(source_url, "/mnt/raw-transactions")

# Similarly, mount processed-transactions container
source_url2 = f"wasbs://processed-transactions@{storage_account_name}.blob.core.windows.net"
remount(source_url2, "/mnt/processed-transactions")

/mnt/raw-transactions has been unmounted.
/mnt/processed-transactions has been unmounted.
Out[18]: True

In [0]:
# ----------------------------------------
# 1. Import Libraries
# ----------------------------------------
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
import pyspark.sql.functions as F

# ----------------------------------------
# 2. Define Schema for Transactions
# ----------------------------------------
# Column name -> Spark type for each transaction field, in file order.
_txn_columns = [
    ("transaction_id", StringType()),
    ("account_id", StringType()),
    ("timestamp", TimestampType()),
    ("amount", DoubleType()),
    ("location", StringType()),
    ("channel", StringType()),
]

# Every field is declared nullable (third StructField argument is True).
txnSchema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _txn_columns]
)




In [0]:
# ----------------------------------------
# 3. Read Streaming Files (Auto Loader)
# ----------------------------------------
# Configure a "cloudFiles" streaming reader for CSV input, applying the
# explicit txnSchema rather than relying on schema inference.
raw_reader = spark.readStream.format("cloudFiles")
raw_reader = raw_reader.option("cloudFiles.format", "csv")
raw_reader = raw_reader.option("header", "true")
raw_reader = raw_reader.schema(txnSchema)

# Streaming DataFrame over the files under the raw-transactions mount.
stream_df = raw_reader.load("/mnt/raw-transactions/")




In [0]:
# ----------------------------------------
# 4. Add Fraud Detection Logic
# ----------------------------------------
# A transaction is flagged when its amount is strictly greater than 10000.
is_suspicious_expr = F.col("amount") > 10000
# Time at which the row was processed by this stream.
ingest_time_expr = F.current_timestamp()

processed_df = stream_df.withColumn("is_suspicious", is_suspicious_expr)
processed_df = processed_df.withColumn("ingest_timestamp", ingest_time_expr)




In [0]:
# ----------------------------------------
# 5. Write Stream to Delta Lake
# ----------------------------------------
# Target table path and checkpoint directory, both on the processed-transactions mount.
delta_target_path = "/mnt/processed-transactions/transactions_delta/"
delta_checkpoint_path = "/mnt/processed-transactions/checkpoints/"

# Append-only Delta writer for the flagged transactions.
delta_writer = processed_df.writeStream.format("delta").outputMode("append")
delta_writer = delta_writer.option("checkpointLocation", delta_checkpoint_path)

# Start the query; as the cell's last expression, the query handle is displayed.
delta_writer.start(delta_target_path)




In [0]:
# ----------------------------------------
# 6. (Optional) Aggregated Metrics for Monitoring
# ----------------------------------------
# Group transactions into one-minute windows on the "timestamp" column.
minute_window = F.window("timestamp", "1 minute")

# Per-window aggregates: row count, summed amount, and number of flagged rows.
txn_count_expr = F.count("*").alias("txn_count")
total_amount_expr = F.sum("amount").alias("total_amount")
suspicious_as_int = F.when(F.col("is_suspicious"), 1).otherwise(0)
suspicious_count_expr = F.sum(suspicious_as_int).alias("suspicious_count")

metrics_df = processed_df.groupBy(minute_window).agg(
    txn_count_expr,
    total_amount_expr,
    suspicious_count_expr,
)

# Publish the full aggregate result to an in-memory table named "TxnMetrics".
metrics_writer = metrics_df.writeStream.format("memory").queryName("TxnMetrics")
metrics_writer.outputMode("complete").start()



In [0]:
%sql
-- Show every row of the TxnMetrics table, ordered by window ascending.
SELECT *
FROM TxnMetrics
ORDER BY window ASC

window,txn_count,suspicious_count,total_amount
"List(2025-04-26T17:00:00.000+0000, 2025-04-26T17:01:00.000+0000)",1,0,5000.0
"List(2025-04-26T17:01:00.000+0000, 2025-04-26T17:02:00.000+0000)",1,1,12000.0
"List(2025-04-26T17:02:00.000+0000, 2025-04-26T17:03:00.000+0000)",1,0,750.0
"List(2025-04-26T17:03:00.000+0000, 2025-04-26T17:04:00.000+0000)",1,1,15000.0
"List(2025-04-26T17:04:00.000+0000, 2025-04-26T17:05:00.000+0000)",1,0,300.0
