In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

schema = StructType([
    StructField("transactionId", StringType()),
    StructField("cardNumber", StringType()),
    StructField("amount", DoubleType()),
    StructField("location", StringType()),
    StructField("timestamp", StringType()),
    StructField("userId", StringType())
])

event_hub_config = {
    "kafka.bootstrap.servers": "balusudha-kafka-namespace.servicebus.windows.net:9093",
    "subscribe": "transactions",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": """
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="$ConnectionString"
        password="Endpoint=sb://balusudha-kafka-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ESenVx8WkrhMzJeYVzPYwSUFyj0XAPV+L+AEhKow1vk=";
    """
}

df_kafka = spark.readStream.format("kafka").options(**event_hub_config).load()

df_json = df_kafka.selectExpr("CAST(value AS STRING)") \
    .withColumn("data", from_json(col("value"), schema)) \
    .select("data.*")

sas_token_write = "sp=racwdl&st=2025-08-01T10:01:33Z&se=2025-08-01T18:16:33Z&spr=https&sv=2024-11-04&sr=c&sig=UnDZ1dX99v0qobX4Amu6CdT7e2i9KPN3cE%2F8XFt45mQ%3D"

spark.conf.set(f"fs.azure.sas.deltatables.balusudhastorageaccount.blob.core.windows.net",sas_token_write)

output_path=f"wasbs://deltatables@balusudhastorageaccount.blob.core.windows.net/output_files/"

df_json.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", f"wasbs://deltatables@balusudhastorageaccount.blob.core.windows.net/checkpoints") \
    .start(output_path)

print("data written to delta")

data written to delta


In [0]:
sas_token = "sp=racwdl&st=2025-08-01T10:01:33Z&se=2025-08-01T18:16:33Z&spr=https&sv=2024-11-04&sr=c&sig=UnDZ1dX99v0qobX4Amu6CdT7e2i9KPN3cE%2F8XFt45mQ%3D"
storage_account = "balusudhastorageaccount"
container = "deltatables"
delta_path = "output_files"

spark.conf.set(
    f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net",
    sas_token
)

df = spark.read.format("delta").load(f"wasbs://{container}@{storage_account}.blob.core.windows.net/{delta_path}")

print(df.count())


148


In [0]:
# %pip install great_expectations==0.15.39
%restart_python

In [0]:
from great_expectations.dataset import SparkDFDataset


storage_account = "balusudhastorageaccount"
container = "deltatables"
delta_path = "output_files"
df = spark.read.format("delta").load(f"wasbs://{container}@{storage_account}.blob.core.windows.net/{delta_path}")
ge_df = SparkDFDataset(df)

ge_df.expect_column_values_to_be_between("amount", 1, 500000)
ge_df.expect_column_values_to_not_be_null("timestamp")
ge_df.expect_column_value_lengths_to_be_between("cardNumber", 13, 19)

validation_result = ge_df.validate()
print(validation_result)

ge_df.save_expectation_suite("fraud_detection_expectations")

{
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {
          "column": "amount",
          "min_value": 1,
          "max_value": 500000,
          "result_format": "BASIC"
        },
        "meta": {}
      },
      "result": {
        "element_count": 148,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "column": "timestamp",
      

In [0]:
from pyspark.sql.functions import length

silver_df = df.filter(
    (df.amount >= 1) &
    (df.amount <= 500000) &
    (df.timestamp.isNotNull()) &
    (length(df.cardNumber) >= 13) &
    (length(df.cardNumber) <= 19)
)


silver_path = "silver_output_files"

silver_df.write.format("delta").mode("overwrite").save(
    f"wasbs://{container}@{storage_account}.blob.core.windows.net/{silver_path}"
)


silver_df.write.format("delta").mode("overwrite").saveAsTable("silver_sales_data")

spark.sql("SELECT * FROM silver_sales_data").show()


In [0]:
from pyspark.sql.functions import *

gold_df = (
    silver_df
    .groupBy("location")
    .agg(
        sum("amount").alias("total_sales"),
        countDistinct("cardNumber").alias("unique_customers")
    )
)

gold_df.write.format("delta").mode("overwrite").saveAsTable("gold_sales_summary")

spark.sql("select * from gold_sales_summary").show()


+---------+------------------+----------------+
| location|       total_sales|unique_customers|
+---------+------------------+----------------+
|Bangalore|        2369930.89|              43|
|   London|        1053685.52|              26|
|   Mumbai|        1309563.25|              25|
|    Delhi|1417256.7900000003|              30|
|      NYC|        1072501.31|              23|
+---------+------------------+----------------+

