In [0]:
# Unmount /mnt/dataset only when it is currently mounted, so this cell can be
# re-run (e.g. on a fresh cluster) instead of erroring on a path that is not
# mounted.
if any(mount.mountPoint == "/mnt/dataset" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/dataset")

/mnt/dataset has been unmounted.


True

In [0]:
# Unmount /mnt/streamdata only when it is currently mounted, so this cell can be
# re-run (e.g. on a fresh cluster) instead of erroring on a path that is not
# mounted.
if any(mount.mountPoint == "/mnt/streamdata" for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/streamdata")

In [0]:
container_name = "dataset"
storage_account_name = "mystorage142"
mount_point = "/mnt/dataset"

# Do not hardcode the SAS token: anyone who can read the notebook (or its
# revision history) would get access to the container. Fetch it from a
# Databricks secret scope instead. The token that used to be pasted here should
# be treated as leaked and revoked.
# TODO(review): scope/key names are placeholders -- create them (or point these
# at the real ones) before running.
sas_token = dbutils.secrets.get(scope="azure-storage", key="dataset-sas-token")

# Remount cleanly: drop an existing mount at this path first so the cell can be
# re-run (e.g. after rotating the token) instead of failing on an
# already-mounted path.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs={
        f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net": sas_token
    },
)

True

In [0]:
container_name = "streamdata"
storage_account_name = "mystorage142"
mount_point = "/mnt/streamdata"

# Do not hardcode the SAS token: anyone who can read the notebook (or its
# revision history) would get access to the container. Fetch it from a
# Databricks secret scope instead. The token that used to be pasted here should
# be treated as leaked and revoked.
# TODO(review): scope/key names are placeholders -- create them (or point these
# at the real ones) before running.
sas_token = dbutils.secrets.get(scope="azure-storage", key="streamdata-sas-token")

# Remount cleanly: drop an existing mount at this path first so the cell can be
# re-run (e.g. after rotating the token) instead of failing on an
# already-mounted path.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
    mount_point=mount_point,
    extra_configs={
        f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net": sas_token
    },
)

True

In [0]:
# Preview the raw CSV data (first row is the header). DataFrame.show() prints a
# text table and returns None, so wrapping it in display() handed display() a
# None; pass the DataFrame itself for the rich table view.
display(spark.read.csv("/mnt/dataset", header="true"))

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|        1|  15634602| Hargrave|        619|   France|Female| 42|     2|        0|            1|             1|      101348.88|     1|
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|             1|      112542.58|     0|
|        3|  15619304|     Onio|        502|   France|Female| 42|     8| 159660.8|            3|             0|      113931.57|     1|
|        4|  15701354|     Boni|        699|   France|Female| 39|     1|        0|            2|             0|       93826.63|     0|
|        5|  15737888| Mitchell|        850|    Spain|F

In [0]:
# Column names and types Spark assigns to the CSV when reading it with a header
# row and no schema inference (the output shows every column as string).
spark.read.option("header", "true").csv("/mnt/dataset").dtypes

[('RowNumber', 'string'),
 ('CustomerId', 'string'),
 ('Surname', 'string'),
 ('CreditScore', 'string'),
 ('Geography', 'string'),
 ('Gender', 'string'),
 ('Age', 'string'),
 ('Tenure', 'string'),
 ('Balance', 'string'),
 ('NumOfProducts', 'string'),
 ('IsActiveMember', 'string'),
 ('EstimatedSalary', 'string'),
 ('Exited', 'string')]

In [0]:
# DDL schema for the streaming CSV source: every column is declared as string,
# matching the types the header-only batch read reports. One "<name> string"
# entry per line, comma-separated, with a trailing newline.
schema = ",\n".join(
    f"{column} string"
    for column in (
        "RowNumber",
        "CustomerId",
        "Surname",
        "CreditScore",
        "Geography",
        "Gender",
        "Age",
        "Tenure",
        "Balance",
        "NumOfProducts",
        "IsActiveMember",
        "EstimatedSalary",
        "Exited",
    )
) + "\n"

In [0]:
# Stream the CSV files under /mnt/dataset using the explicit `schema`.
# The files start with a header row (the batch preview reads them with
# header='true'), so the streaming reader must skip it too; without this option
# each file's header line would be ingested as a data row.
raw_df = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .load("/mnt/dataset")
)

In [0]:
# Explicit projection of the stream onto its schema columns, in schema order.
# Every column passes through unchanged, so this serves as an allow-list of the
# fields that reach the sink rather than as a transformation.
transformed_df = raw_df.select(
    "RowNumber", "CustomerId", "Surname", "CreditScore",
    "Geography", "Gender", "Age", "Tenure", "Balance",
    "NumOfProducts", "IsActiveMember", "EstimatedSalary", "Exited",
)


In [0]:
from pyspark.sql.functions import expr

In [0]:
from pyspark.sql.functions import col

# Rename two columns and derive an Age_Group bucket from Age.
# Age is declared as a string in the stream schema, so it is cast to INT
# explicitly rather than relying on implicit string/number comparison.
# The previous `ELSE '50+'` also labelled ages under 18 and NULL ages as '50+';
# those rows now get a NULL Age_Group. (With spark.sql.ansi.enabled on, a
# non-numeric Age makes the cast fail instead of yielding NULL.)
flattened_df = (
    transformed_df
    .withColumnRenamed("CreditScore", "Credit_Score")
    .withColumnRenamed("IsActiveMember", "Active_Member")
    .withColumn(
        "Age_Group",
        expr(
            "CASE "
            "WHEN CAST(Age AS INT) >= 18 AND CAST(Age AS INT) < 30 THEN '18-29' "
            "WHEN CAST(Age AS INT) >= 30 AND CAST(Age AS INT) < 50 THEN '30-49' "
            "WHEN CAST(Age AS INT) >= 50 THEN '50+' "
            "END"
        ),
    )
)


In [0]:
import threading

# Name of the streaming query; Spark rejects starting a query whose name is
# already used by an active query in the session.
QUERY_NAME = "Flattened Customer Writer"
# How long the query runs before the background timer stops it.
STOP_AFTER_SECONDS = 300

# Stop a query left running by a previous execution of this cell, so re-running
# the cell does not fail on the duplicate query name.
for active_query in spark.streams.active:
    if active_query.name == QUERY_NAME:
        active_query.stop()

# Start the streaming query: write flattened_df as JSON files in append mode,
# one micro-batch per minute, with progress tracked in the checkpoint dir.
# (The variable name is kept for compatibility with other cells; the records
# are customers, not invoices.)
invoiceWriterQuery = (
    flattened_df.writeStream
    .format("json")
    .queryName(QUERY_NAME)
    .outputMode("append")
    .option("path", "/mnt/streamdata/output")
    .option("checkpointLocation", "/mnt/streamdata/chk-point-dir")
    .trigger(processingTime="1 minute")
    .start()
)


def stop_query(query=None):
    """Stop a streaming query if it is still running and report it.

    Args:
        query: the StreamingQuery to stop; defaults to ``invoiceWriterQuery``.
    """
    # Resolve the default at call time only when no query was bound, so a timer
    # from an earlier run never stops a query started by a later run.
    if query is None:
        query = invoiceWriterQuery
    if query.isActive:
        query.stop()
        print(f"Streaming query stopped after {STOP_AFTER_SECONDS / 60:g} minutes.")


# Stop this run's query from a background thread so the cell returns at once.
# The query object is bound now (not looked up when the timer fires), and
# daemon=True keeps the pending timer from blocking interpreter shutdown.
timer = threading.Timer(STOP_AFTER_SECONDS, stop_query, args=(invoiceWriterQuery,))
timer.daemon = True
timer.start()
