In [0]:
print("Loading enviroment setup function...")
# Import libraries
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.window import Window
import time

# Turn on Delta Change Data Feed by default for tables created in this session
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", True)

# Path settings shared by every cell below
main_path = "dbfs:/mnt/borges_portifolio"
# Folder watched by Auto Loader (the Bronze stream loads from here)
files_path = f"{main_path}/auto_loader_files/source_folder"
# Folder holding the pre-staged files that get copied into `files_path` to simulate arrivals
source = f"{main_path}/auto_loader_files/stream_files"

# Establishing the folder where Auto Loader and the streaming queries keep their checkpoint information
checkpoint_path = f'{main_path}/_checkpoints'

# Column types of the raw records ingested into the Bronze table
bronze_schema = "key BINARY, value BINARY, topic STRING, partition INT, offset INT, timestamp LONG"

# Schema of the JSON payload carried in `value` for the 'bpm' topic
heartrate_schema = "device_id INTEGER, time TIMESTAMP, heartrate DOUBLE"

# Schema of the JSON payload carried in `value` for the 'workout' topic
workout_json_schema = "user_id INTEGER, workout_id INTEGER, timestamp DOUBLE, action STRING, session_id INTEGER"

def env_setup():
    """Reset the lakehouse environment to a clean starting state.

    Drops and recreates the bronze, silver and gold schemas (deleting their
    warehouse folders first), clears every streaming checkpoint, makes sure the
    Auto Loader source folder exists, aligns the shuffle partition count with
    the cluster's default parallelism and shallow-clones the `gym_logs` and
    `user-lookup` Delta tables into the silver schema.

    Relies on the notebook-provided `spark`, `dbutils` and `sc` objects and on
    the module-level `main_path`, `checkpoint_path` and `files_path` settings.
    """
    warehouse_root = "dbfs:/user/hive/warehouse"

    # Rebuild each medallion schema from scratch
    for schema_name in ("bronze", "silver", "gold"):
        dbutils.fs.rm(f"{warehouse_root}/{schema_name}.db", True) # Clearing previous data
        spark.sql(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE;") # Dropping schema
        spark.sql(f"CREATE SCHEMA {schema_name};") # Creating schema

    # Setting the bronze schema as default for this notebook
    spark.catalog.setCurrentDatabase('bronze')

    # Cleaning any structured streaming or Auto Loader checkpoints.
    # Uses the shared setting so this always matches the paths the streams write to.
    dbutils.fs.rm(checkpoint_path, True)

    # Making sure the folder watched by Auto Loader exists
    dbutils.fs.mkdirs(files_path)

    # Setting parallelism
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

    # Clone gym_logs table to Silver schema
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.gym_logs
    SHALLOW CLONE delta.`{main_path}/gym_logs`
    LOCATION '{warehouse_root}/silver.db/gym_logs'
    """)

    # Clone user_lookup table to Silver schema
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.user_lookup
    SHALLOW CLONE delta.`{main_path}/user-lookup`
    LOCATION '{warehouse_root}/silver.db/user_lookup'
    """)

    # Enable CDF on user_lookup (its change feed is read by the delete propagation stream)
    spark.sql("ALTER TABLE silver.user_lookup SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")



time.sleep(0.7)

In [0]:
print("Loading Bronze related functions...")

# Creating the function that will ingest the files into the Bronze table
def process_bronze():
    """Start the Auto Loader stream that ingests the JSON files into `bronze.bronze`.

    Every record is left-joined with the `date_lookup` table (version 0) to
    attach the `week_part` of its event date, and the output is partitioned by
    `topic` and `week_part`. The query is started asynchronously under the name
    "bronze"; the function returns None without waiting for it.
    """
    # Time traveling to obtain the date_lookup in a DataFrame
    date_lookup_df = spark.read.option("versionAsOf", 0).load(f"{main_path}/date_lookup").select("date", "week_part")

    (spark.readStream
                  .format("cloudFiles")
                  .schema(bronze_schema)
                  .option("cloudFiles.format", "json")
                  .option("cloudFiles.schemaLocation", f"{checkpoint_path}/bronze_schema") # Location to store schema information
                  .load(files_path) # Source Folder
                  # `timestamp` is divided by 1000 before the cast, i.e. treated as epoch milliseconds
                  .join(f.broadcast(date_lookup_df), f.to_date((f.col("timestamp")/1000).cast("timestamp")) == f.col("date"), "left")
                  .writeStream
                        .option("checkpointLocation", f"{checkpoint_path}/bronze") # Location to store checkpoint information
                        .partitionBy("topic", "week_part") # Partitioning by topic and week_part to increase performance
                        .trigger(processingTime='5 seconds')
                        .queryName("bronze")
                        # Fully qualified so the target does not depend on the current database;
                        # readers and reprocess_bronze() address the table as bronze.bronze.
                        .table("bronze.bronze"))

# Creating a function for reprocessing all the files into the Bronze table
def reprocess_bronze():
    """Rebuild the Bronze table: drop it, discard its checkpoint and restart the ingestion."""
    spark.sql("DROP TABLE IF EXISTS bronze.bronze")

    # Without the old checkpoint the new stream starts over instead of resuming
    bronze_checkpoint = f"{checkpoint_path}/bronze"
    dbutils.fs.rm(bronze_checkpoint, True)

    process_bronze()

time.sleep(0.7)

In [0]:
print("Loading Heartrate related functions...")

def heartrate_merge_data(df, batch_id):
    """Upsert one micro-batch into `silver.heart_rate`.

    Rows are matched on (`device_id`, `time`): matches are overwritten, the
    rest are inserted. `batch_id` is accepted because `foreachBatch` passes it,
    but it is not used.
    """
    # Expose the micro-batch to SQL under the name the MERGE statement reads from
    df.createOrReplaceTempView("stream_updates")

    # Run the MERGE through the session the micro-batch belongs to
    batch_session = df.sparkSession
    batch_session.sql("""
    MERGE INTO silver.heart_rate a
    USING stream_updates b
    ON a.device_id=b.device_id AND a.time=b.time
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT * 
    """)

def process_heart_rate():
    """Start the streaming query that maintains `silver.heart_rate`.

    Creates the target Delta table if needed, makes sure its CHECK constraints
    exist, then streams the 'bpm' records from the Bronze table, parses their
    JSON payload with `heartrate_schema`, drops non-positive readings and exact
    duplicates and upserts each micro-batch through `heartrate_merge_data`.

    The query is started asynchronously under the name "silver_heart_rate";
    the function returns None without waiting for it.
    """
    target_table = "silver.heart_rate"

    # Create heart_rate table
    spark.sql(f"CREATE TABLE IF NOT EXISTS {target_table} (device_id INTEGER, time TIMESTAMP, heartrate DOUBLE) USING DELTA")

    # Define constraints based on previous analysis.
    # Delta lists CHECK constraints as `delta.constraints.<name>` table properties and rejects
    # ADD CONSTRAINT for a name that already exists, so only the missing ones are added.
    # This keeps the function re-runnable on an existing table.
    existing_properties = {row["key"].lower() for row in spark.sql(f"SHOW TBLPROPERTIES {target_table}").collect()}
    required_constraints = {
        "validbpm": "heartrate > 0",
        "check_date": "time > '2019-11-30'",
    }
    for constraint_name, predicate in required_constraints.items():
        if f"delta.constraints.{constraint_name}" not in existing_properties:
            spark.sql(f"ALTER TABLE {target_table} ADD CONSTRAINT {constraint_name} CHECK ({predicate})")

    # Structured Streaming query
    (spark.readStream
            .table("bronze.bronze") # Fully qualified: independent of the current database
            .filter("topic = 'bpm'")
            .select(f.from_json(f.col("value").cast("string"), heartrate_schema).alias("bpm"))
            .filter("bpm.heartrate > 0") # Mirrors the validbpm constraint
            .select("bpm.*")
            .withWatermark("time", "30 seconds")
            .dropDuplicates()
            .writeStream
                .foreachBatch(heartrate_merge_data) # Apply merge logic to each batch
                .option("checkpointLocation", f"{checkpoint_path}/heart_rate") # Set checkpoint location
                .outputMode("update") # Allow changing existing data in the table
                .trigger(processingTime='5 seconds') # Start a micro-batch every 5 seconds
                .queryName("silver_heart_rate") # Name the query for easy reference
                .start() # Start the query; it keeps running in the background
                )

def reprocess_heart_rate():
    """Rebuild `silver.heart_rate`: drop it, discard its checkpoint and restart its stream."""
    spark.sql("DROP TABLE IF EXISTS silver.heart_rate")

    # Without the old checkpoint the new stream reprocesses the Bronze data from the start
    heart_rate_checkpoint = f"{checkpoint_path}/heart_rate"
    dbutils.fs.rm(heart_rate_checkpoint, True)

    process_heart_rate()

time.sleep(0.7)

In [0]:
print("Loading Workout related functions...")

def workout_merge_data(df, batch_id):
    """Upsert one micro-batch into `silver.workouts`.

    Rows are matched on (`user_id`, `time`): matches are overwritten, the rest
    are inserted. `batch_id` is accepted because `foreachBatch` passes it, but
    it is not used.
    """
    # Expose the micro-batch to SQL under the name the MERGE statement reads from
    df.createOrReplaceTempView("stream_updates")

    # Run the MERGE through the session the micro-batch belongs to
    batch_session = df.sparkSession
    batch_session.sql("""
    MERGE INTO silver.workouts a
    USING stream_updates b
    ON a.user_id = b.user_id AND a.time = b.time
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT *
    """)

def process_workouts():
    """Start the streaming query that maintains `silver.workouts`.

    Creates the target Delta table if needed, makes sure its `check_date`
    constraint exists, then streams the 'workout' records from `bronze.bronze`,
    parses their JSON payload with `workout_json_schema`, converts the numeric
    `timestamp` into the `time` column, de-duplicates on (`user_id`, `time`)
    and upserts each micro-batch through `workout_merge_data`.

    The query is started asynchronously under the name "silver_workouts"; the
    function returns None without waiting for it.
    """
    target_table = "silver.workouts"

    # Create the workouts table if it doesn't exist
    spark.sql(f"""
                CREATE TABLE IF NOT EXISTS 
                    {target_table} ( 
                        user_id     INTEGER,   
                        workout_id  INTEGER,   
                        time        TIMESTAMP, 
                        action      STRING,    
                        session_id  INTEGER    
                                    ) 
                USING DELTA
                """)

    # Add constraint to ensure timestamp is after a specific date.
    # Delta lists CHECK constraints as `delta.constraints.<name>` table properties and rejects
    # ADD CONSTRAINT for a name that already exists, so it is added only when missing.
    # This keeps the function re-runnable on an existing table.
    existing_properties = {row["key"].lower() for row in spark.sql(f"SHOW TBLPROPERTIES {target_table}").collect()}
    if "delta.constraints.check_date" not in existing_properties:
        spark.sql(f"ALTER TABLE {target_table} ADD CONSTRAINT check_date CHECK (time > '2019-11-30')")

    # Read stream from the bronze table, apply transformations, and write to the workouts table
    (spark.readStream
            .table("bronze.bronze")
            .filter("topic = 'workout'")
            .select(f.from_json(f.col("value").cast("string"), workout_json_schema).alias("wkrt"))
            .select("wkrt.*")
            .select("user_id", "workout_id", f.col("timestamp").cast("timestamp").alias("time"), "action", "session_id")
            .withWatermark("time", "30 seconds")
            .dropDuplicates(["user_id", "time"]) # Same key the MERGE matches on
                .writeStream
                .foreachBatch(workout_merge_data)
                .outputMode("update")
                .option("checkpointLocation", f"{checkpoint_path}/workouts")
                .queryName("silver_workouts")
                .trigger(processingTime='5 seconds')
                .start()
                )

def reprocess_workouts():
    """Rebuild `silver.workouts`: drop it, discard its checkpoint and restart its stream."""
    # Start from an empty target
    spark.sql("drop table if exists silver.workouts")

    # Forget the stream's progress so the Bronze data is read again from the beginning
    workouts_checkpoint = f"{checkpoint_path}/workouts"
    dbutils.fs.rm(workouts_checkpoint, True)

    # Recreate the table and restart the stream
    process_workouts()

time.sleep(0.7)

In [0]:
print("Loading Completed Workouts related functions...")

def completed_wrkt_merge_data(df, batch_id):
    """Upsert one micro-batch into `silver.completed_workouts`.

    Rows are matched on (`user_id`, `session_id`): matches are overwritten, the
    rest are inserted. `batch_id` is accepted because `foreachBatch` passes it,
    but it is not used.
    """
    # Expose the micro-batch to SQL under the name the MERGE statement reads from
    df.createOrReplaceTempView("stream_updates")

    # Run the MERGE through the session the micro-batch belongs to
    batch_session = df.sparkSession
    batch_session.sql("""
    MERGE INTO silver.completed_workouts a
    USING stream_updates b
    ON a.user_id=b.user_id AND a.session_id=b.session_id
    WHEN MATCHED THEN
    UPDATE SET *
    WHEN NOT MATCHED THEN
    INSERT * 
    """)


def process_completed_workouts():
    """Start the streaming query that maintains `silver.completed_workouts`.

    Creates the target Delta table if needed, then streams `silver.workouts`,
    derives one row per (`user_id`, `session_id`) holding the latest 'start'
    and 'stop' times, keeps only sessions that have both, and upserts each
    micro-batch through `completed_wrkt_merge_data`.

    The query is started asynchronously under the name
    "silver_completed_workouts"; the function returns None without waiting.
    """
    spark.sql("""
      CREATE TABLE IF NOT EXISTS
      silver.completed_workouts
        (user_id      INTEGER,
        session_id   INTEGER,
        start_time   TIMESTAMP,
        end_time     TIMESTAMP)
      USING DELTA
    """)

    (spark.readStream
          # silver.workouts is not append-only: its MERGE can update rows and the delete
          # propagation removes rows. A Delta streaming read fails on such commits unless
          # they are skipped (same option process_user_bins uses for silver.users).
          .option("skipChangeCommits", "true")
          .table("silver.workouts")
          .groupBy("user_id", "session_id")
          .agg(
              f.max(f.when(f.col("action") == "start", f.col("time"))).alias("start_time"),
              f.max(f.when(f.col("action") == "stop", f.col("time"))).alias("end_time")
              )
          # A workout counts as completed only once both events have been seen
          .filter(f.col("start_time").isNotNull() & f.col("end_time").isNotNull())
          .select("user_id", "session_id", "start_time", "end_time")
          .withWatermark("start_time", "30 seconds")
          .writeStream
              .foreachBatch(completed_wrkt_merge_data)
              .option("checkpointLocation", f"{checkpoint_path}/completed_workouts")
              .outputMode("update")
              .trigger(processingTime='5 seconds')
              .queryName("silver_completed_workouts")
              .start()
    )

def reprocess_completed_workouts():
    """Rebuild `silver.completed_workouts`: discard its checkpoint, drop it and restart its stream."""
    # Forget the stream's progress so silver.workouts is read again from the beginning
    completed_checkpoint = f"{checkpoint_path}/completed_workouts"
    dbutils.fs.rm(completed_checkpoint, True)

    # Start from an empty target
    spark.sql("drop table if exists silver.completed_workouts")

    process_completed_workouts()

time.sleep(0.7)


In [0]:
print("Loading Gym Report related functions...")

def process_gym_reports():
  """(Re)create the `gold.gym_report` view.

  The view joins `silver.gym_logs` with the completed workouts of each device
  (resolved through `silver.user_lookup`) per day, and exposes the minutes
  spent in the gym next to the minutes spent exercising.
  """
  drop_view_sql = """
            DROP VIEW if EXISTS gold.gym_report;
            """

  create_view_sql = """
                CREATE OR REPLACE VIEW gold.gym_report AS (
                  SELECT gym, mac_address, date, workouts, (last_timestamp - first_timestamp)/60 minutes_in_gym, (to_unix_timestamp(end_workout) - to_unix_timestamp(start_workout))/60 minutes_exercising
                  FROM silver.gym_logs c
                  INNER JOIN (
                    SELECT b.mac_address, to_date(start_time) date, collect_set(session_id) workouts, min(start_time) start_workout, max(end_time) end_workout
                    FROM silver.completed_workouts a
                    INNER JOIN silver.user_lookup b
                    ON a.user_id = b.user_id
                    GROUP BY mac_address, to_date(start_time)
                    ) d
                    ON c.mac = d.mac_address AND to_date(CAST(c.first_timestamp AS timestamp)) = d.date
                );
            """

  # Drop first, then define the view again
  for statement in (drop_view_sql, create_view_sql):
    spark.sql(statement)

time.sleep(0.7)

In [0]:
print("Loading Users related functions...")

def users_merge_crt_dlt(microBatchDF, batchId):
    """Apply one micro-batch of user events to the silver layer.

    * 'new' / 'update' events: the most recent record per `alt_id` is merged
      into `silver.users` (existing rows are only overwritten by newer data).
    * 'delete' events: appended to `silver.delete_requests` with status
      'requested' and a deadline 30 days after the request.

    Parameters
    ----------
    microBatchDF : DataFrame
        Micro-batch handed over by `foreachBatch`; must carry `alt_id`,
        `updated` and `update_type` next to the user attributes.
    batchId : int
        Micro-batch id, used as the transaction version of the idempotent
        append to `silver.delete_requests`.
    """
    latest_first = Window.partitionBy("alt_id").orderBy(f.col("updated").desc())

    # row_number() keeps exactly one row per alt_id. rank() would keep every row tied on
    # `updated`, leaving several source rows for the same merge key: that makes the MERGE
    # ambiguous for matched rows and inserts duplicates for unmatched ones.
    (microBatchDF
        .filter(f.col("update_type").isin(["new", "update"]))
        .withColumn("rank", f.row_number().over(latest_first))
        .filter("rank == 1")
        .drop("rank")
        .createOrReplaceTempView("ranked_updates"))

    microBatchDF.sparkSession.sql("""
        MERGE INTO silver.users u
        USING ranked_updates r
        ON u.alt_id=r.alt_id
        WHEN MATCHED AND u.updated < r.updated
          THEN UPDATE SET *
        WHEN NOT MATCHED
          THEN INSERT *
    """)

    # Register the delete requests; txnAppId/txnVersion are set so a retried batch
    # is not appended a second time
    (microBatchDF
        .filter("update_type = 'delete'")
        .select(f.col("alt_id"),
                f.col("updated").alias("requested"),
                f.date_add("updated", 30).alias("deadline"),
                f.lit("requested").alias("status"))
        .write
        .format("delta")
        .mode("append")
        .option("txnVersion", batchId)
        .option("txnAppId", "batch_rank_upsert")
        .saveAsTable("silver.delete_requests"))
    
def process_users():
    """Start the streaming query that maintains `silver.users` and `silver.delete_requests`.

    Creates `silver.users` if needed, then streams the 'user_info' records from
    `bronze.bronze`, parses their JSON payload, derives the pseudonymous
    `alt_id` (SHA-256 of `user_id` + salt), flattens the address and hands each
    micro-batch to `users_merge_crt_dlt`.

    The query is started asynchronously under the name "silver_users"; the
    function returns None without waiting for it.
    """
    # NOTE(review): the salt is hard-coded in the notebook; consider reading it from a secret scope
    salt = "BEANS"

    spark.sql("CREATE TABLE IF NOT EXISTS silver.users (alt_id STRING, user_id INTEGER, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, city STRING, state STRING, zip INT, updated TIMESTAMP) USING DELTA")

    # `timestamp` holds epoch seconds. It is parsed as DOUBLE (like workout_json_schema):
    # a 32-bit FLOAT cannot represent current epoch values exactly (neighbouring floats are
    # about 128 s apart), which distorts `updated` and therefore the "latest record wins" logic.
    users_schema = """
        user_id INTEGER, 
        update_type STRING, 
        timestamp DOUBLE, 
        dob STRING, 
        sex STRING, 
        gender STRING, 
        first_name STRING, 
        last_name STRING, 
        address STRUCT<
            street_address: STRING, 
            city: STRING, 
            state: STRING, 
            zip: INT
    >"""

    (spark.readStream
        .table("bronze.bronze")
        .filter("topic = 'user_info'")
        # NOTE(review): de-duplication without a watermark keeps state for every row seen — confirm this is acceptable
        .dropDuplicates()
        .select(f.from_json(f.col("value").cast("string"), users_schema).alias("users"))
        .select("users.*")
        .select(f.sha2(f.concat(f.col("user_id"), f.lit(salt)), 256).alias("alt_id"),
                f.col("user_id"),
                f.col('timestamp').cast("timestamp").alias("updated"),
                f.to_date('dob','MM/dd/yyyy').alias('dob'),
                'sex',
                'gender',
                'first_name',
                'last_name',
                'address.*',
                'update_type')
        .writeStream
        .foreachBatch(users_merge_crt_dlt)
        .outputMode("update")
        .option("checkpointLocation", f"{checkpoint_path}/users")
        .queryName("silver_users") # Name the query for easy reference
        .trigger(processingTime='5 seconds')
        .start()
        )
        
def reprocess_users():
    """Rebuild `silver.users` and `silver.delete_requests` and restart the users stream."""
    # Remove the stream's checkpoint and the delete_requests folder under main_path
    for stale_path in (f"{checkpoint_path}/users", f"{main_path}/delete_requests"):
        dbutils.fs.rm(stale_path, True)

    # Start from empty targets
    for table_name in ("silver.users", "silver.delete_requests"):
        spark.sql(f"drop table if exists {table_name}")

    process_users()

time.sleep(0.7)

In [0]:
print("Loading User Bins related functions...")

def age_bins(dob_col):
    """Return a Column named `age` that buckets a date-of-birth column into age ranges.

    The age is the number of whole years between the current date and
    `dob_col`. Buckets are "under 18", ten-year ranges from "18-25" up to
    "85-95", and "95+"; anything that matches no bucket (e.g. a null date of
    birth) becomes "invalid age".
    """
    years = f.floor(f.months_between(f.current_date(), dob_col)/12).alias("age")

    # Lower edges of the buckets; each range is [low, high)
    edges = (18, 25, 35, 45, 55, 65, 75, 85, 95)

    bucket = f.when((years < edges[0]), "under 18")
    for low, high in zip(edges, edges[1:]):
        bucket = bucket.when((years >= low) & (years < high), f"{low}-{high}")
    bucket = bucket.when((years >= edges[-1]), "95+")

    return bucket.otherwise("invalid age").alias("age")

# Simple Merge function
def user_bins_merge_data(df, batch_id):
    """Upsert one micro-batch into `silver.user_bins`.

    The `updated` column is dropped before the merge. Rows are matched on
    `alt_id`: matches are overwritten, the rest are inserted. `batch_id` is
    accepted because `foreachBatch` passes it, but it is not used.
    """
    # `updated` is removed before the batch is exposed to the MERGE
    batch_without_updated = df.drop("updated")
    batch_without_updated.createOrReplaceTempView("stream_updates")

    merge_sql = """
          MERGE INTO silver.user_bins a
          USING stream_updates b
          ON a.alt_id=b.alt_id
          WHEN MATCHED THEN
            UPDATE SET *
          WHEN NOT MATCHED THEN
            INSERT * 
          """
    df.sparkSession.sql(merge_sql)


def process_user_bins():
    """Start the streaming query that maintains `silver.user_bins`.

    Creates the target Delta table if needed, then streams `silver.users`
    (skipping commits that change existing rows), replaces the date of birth
    with its age bucket from `age_bins` and upserts each micro-batch through
    `user_bins_merge_data`.

    The query is started asynchronously under the name "silver_user_bins"; the
    function returns None without waiting for it.
    """
    create_table_sql = """
      CREATE TABLE IF NOT EXISTS
      silver.user_bins
       (alt_id   STRING,
        user_id  INTEGER,
        age      STRING,
        sex      STRING,
        gender   STRING,
        city     STRING,
        state    STRING,
        zip      INTEGER) 
      USING DELTA
    """
    spark.sql(create_table_sql)

    # Source: user rows with the date of birth reduced to an age bucket
    binned_users = (spark.readStream
                         .option("skipChangeCommits", "true")
                         .table("silver.users")
                         .select("alt_id",
                                 "user_id",
                                 age_bins(f.col("dob")),
                                 "sex",
                                 "gender",
                                 "city",
                                 "state",
                                 "zip",
                                 "updated")
                         .withWatermark("updated", "30 seconds"))

    # Sink: merge every micro-batch into silver.user_bins
    (binned_users.writeStream
        .foreachBatch(user_bins_merge_data)
        .option("checkpointLocation", f"{checkpoint_path}/user_bins")
        .outputMode("update")
        .trigger(processingTime='5 seconds')
        .queryName("silver_user_bins")
        .start())

def reprocess_user_bins():
    """Rebuild `silver.user_bins`: discard its checkpoint, drop it and restart its stream."""
    # Forget the stream's progress so silver.users is read again from the beginning
    user_bins_checkpoint = f"{checkpoint_path}/user_bins"
    dbutils.fs.rm(user_bins_checkpoint, True)

    # Start from an empty target
    spark.sql("drop table if exists silver.user_bins")

    process_user_bins()

time.sleep(0.7)

In [0]:
print("Loading Workout User Summary related functions...")

def process_workout_user_summary():
  """(Re)create the `gold.workout_user_summary` view.

  For every completed workout session the view exposes the minimum, average
  and maximum heart rate recorded by the user's device during that session,
  together with the session's start/end time and the user's demographic bins.
  Sessions without any heart-rate reading inside their time window do not
  appear, because the joins are inner joins.
  """
  spark.sql("""
            DROP VIEW IF EXISTS gold.workout_user_summary;
            """)

  # The heart-rate join is restricted to the session's time window. Joining on the device
  # alone would aggregate every reading ever recorded for the device, giving all sessions
  # of a user the same statistics.
  spark.sql("""
                CREATE VIEW gold.workout_user_summary AS (
                SELECT
                    wh.user_id,
                    wh.min_heartrate,
                    wh.avg_heartrate,
                    wh.max_heartrate,
                    wh.start_time,
                    wh.end_time,
                    ub.age,
                    ub.sex,
                    ub.gender,
                    ub.city,
                    ub.state,
                    ub.zip
                FROM (SELECT
                          cw.user_id,
                          ul.device_id,
                          cw.session_id,
                          MIN(hr.heartrate) AS min_heartrate,
                          AVG(hr.heartrate) AS avg_heartrate,
                          MAX(hr.heartrate) AS max_heartrate,
                          MIN(cw.start_time) AS start_time,
                          MAX(cw.end_time) AS end_time
                      FROM silver.completed_workouts cw
                      JOIN silver.user_lookup ul ON cw.user_id = ul.user_id
                      JOIN silver.heart_rate hr
                        ON ul.device_id = hr.device_id
                       AND hr.time BETWEEN cw.start_time AND cw.end_time
                      GROUP BY cw.user_id, ul.device_id, cw.session_id
                ) wh
                JOIN silver.user_bins ub ON wh.user_id = ub.user_id
                );
            """)

time.sleep(0.7)

In [0]:
print("Loading Deletes related functions...")

def broadcast_deletes(microBatchDF, batchId):
    """Propagate the deletes found in one change-feed micro-batch to the silver tables.

    The 'delete' change rows are registered as the temp view `deletes`; the
    matching rows are then removed from `silver.workouts`,
    `silver.completed_workouts`, `silver.heart_rate`, `silver.gym_logs`,
    `silver.users` and `silver.user_bins`, and the corresponding entries of
    `silver.delete_requests` are marked as "deleted". `batchId` is accepted
    because `foreachBatch` passes it, but it is not used.
    """
    # Only the change rows that represent deletions are of interest
    deleted_rows = microBatchDF.filter("_change_type = 'delete'")
    deleted_rows.createOrReplaceTempView("deletes")

    # Statements are executed in the order listed
    purge_statements = ("""
        MERGE INTO silver.workouts w
        USING deletes d
        ON w.user_id = d.user_id
        WHEN MATCHED
            THEN DELETE
    """, """
        MERGE INTO silver.completed_workouts cw
        USING deletes d
        ON cw.user_id = d.user_id
        WHEN MATCHED
            THEN DELETE
    """, """
        MERGE INTO silver.heart_rate hr
        USING deletes d
        ON hr.device_id = d.device_id
        WHEN MATCHED
            THEN DELETE
    """, """
        MERGE INTO silver.gym_logs gl
        USING deletes d
        ON gl.mac = d.mac_address
        WHEN MATCHED
            THEN DELETE
    """, """
        MERGE INTO silver.users u
        USING deletes d
        ON u.alt_id = d.alt_id
        WHEN MATCHED
            THEN DELETE
    """, """
        DELETE FROM silver.user_bins
        WHERE user_id IN (SELECT user_id FROM deletes)
    """, """
        MERGE INTO silver.delete_requests dr
        USING deletes d
        ON d.alt_id = dr.alt_id
        WHEN MATCHED
          THEN UPDATE SET status = "deleted"
    """)

    for statement in purge_statements:
        microBatchDF.sparkSession.sql(statement)


def process_deletes():
    """Purge the users with a pending delete request and report how many rows were removed.

    Steps:
      1. Count the rows of every affected silver table.
      2. Delete the requested users from `silver.user_lookup`.
      3. Start a stream over the table's change feed that hands each
         micro-batch to `broadcast_deletes`, and wait until the changes
         available so far have been processed.
      4. Count the rows again and print the difference per table.

    The stream keeps running after the function returns.
    """
    # Label used in the report -> table it refers to (order defines the print order)
    tracked_tables = {
        "Workouts": "silver.workouts",
        "Completed_workouts": "silver.completed_workouts",
        "Gym_logs": "silver.gym_logs",
        "Heart_rate": "silver.heart_rate",
        "Users": "silver.users",
        "User_bins": "silver.user_bins",
    }

    #counting before purge
    before_counts = {label: spark.table(table_name).count() for label, table_name in tracked_tables.items()}

    spark.sql("DELETE FROM silver.user_lookup WHERE alt_id IN (SELECT alt_id FROM silver.delete_requests WHERE status = 'requested')")

    # NOTE(review): `start_version` is not defined in this function; it has to exist as a
    # notebook-level variable before this is called — confirm where it is set.
    purge_query = (spark.readStream
          .format("delta")
          .option("readChangeFeed", "true")
          .option("startingVersion", start_version)
          .table("silver.user_lookup")
          .writeStream
          .foreachBatch(broadcast_deletes)
          .outputMode("update")
          .option("checkpointLocation", f"{checkpoint_path}/deletes")
          .trigger(processingTime='5 seconds')
          .start()
    )

    # start() returns immediately; block until the pending changes have gone through
    # broadcast_deletes, otherwise the counts below are taken before anything is purged.
    purge_query.processAllAvailable()

    #counting after purge
    uptodate_counts = {label: spark.table(table_name).count() for label, table_name in tracked_tables.items()}

    #printing values obtained before and how many were purged in each table
    for label in tracked_tables:
        before = before_counts[label]
        remaining = uptodate_counts[label]
        print(f"Before broadcasting the deletes, the table {label} had {before} rows. {before - remaining} rows were deleted, remaining now {remaining}")
        print(" ")


time.sleep(0.7)

In [0]:
def initialize_pipeline():
  """Start every stream of the pipeline and create the gold views, layer by layer.

  Order: Bronze ingestion, then the Silver streams, then the Gold views. A
  progress message is printed after each layer.
  """
  print("Initializing pipeline...")
  print("\n")

  # Bronze layer
  process_bronze()
  print("Broze layer stream started")
  print("\n")

  # Silver layer: each step starts one stream
  for start_silver_stream in (process_heart_rate,
                              process_workouts,
                              process_users,
                              process_completed_workouts,
                              process_user_bins):
    start_silver_stream()
  print("Silver layer streams started")
  print("\n")

  # Gold layer: views on top of the Silver tables
  for create_gold_view in (process_gym_reports, process_workout_user_summary):
    create_gold_view()
  print("Gold layer Views created")
  print("\n")

  print("You can start the files ingestion!")

In [0]:
# Maps each batch number (1-16) to the names of the JSON files that are copied into the source folder to simulate that batch arriving
stream_files = {
1:[
"part-00000-tid-532491258944704650-a78b6d69-f3e1-42eb-aaae-c3f48e9d75c2-1663-1-c000.json",
"part-00003-tid-532491258944704650-a78b6d69-f3e1-42eb-aaae-c3f48e9d75c2-1666-1-c000.json",
"part-00004-tid-532491258944704650-a78b6d69-f3e1-42eb-aaae-c3f48e9d75c2-1667-1-c000.json",
"part-00005-tid-532491258944704650-a78b6d69-f3e1-42eb-aaae-c3f48e9d75c2-1668-1-c000.json"
],2:[
"part-00004-tid-4809231750595173478-9733187f-843a-46c7-9f52-eab674ba7338-1697-1-c000.json",
"part-00003-tid-4809231750595173478-9733187f-843a-46c7-9f52-eab674ba7338-1696-1-c000.json",
"part-00000-tid-4809231750595173478-9733187f-843a-46c7-9f52-eab674ba7338-1693-1-c000.json"
],3:[
"part-00000-tid-111368985018146951-3a640041-7f7f-44fd-9286-cb2f49cefac7-1714-1-c000.json",
"part-00003-tid-111368985018146951-3a640041-7f7f-44fd-9286-cb2f49cefac7-1717-1-c000.json",
"part-00004-tid-111368985018146951-3a640041-7f7f-44fd-9286-cb2f49cefac7-1718-1-c000.json"
],4:[
"part-00004-tid-8664805335701107839-047009b2-38c4-4637-b0da-a59ea7ab4725-1739-1-c000.json",
"part-00003-tid-8664805335701107839-047009b2-38c4-4637-b0da-a59ea7ab4725-1738-1-c000.json",
"part-00000-tid-8664805335701107839-047009b2-38c4-4637-b0da-a59ea7ab4725-1735-1-c000.json"
],5:[
"part-00004-tid-3723893643168635313-3ff2fc91-919e-4b54-959d-2fd836e454c9-1760-1-c000.json",
"part-00003-tid-3723893643168635313-3ff2fc91-919e-4b54-959d-2fd836e454c9-1759-1-c000.json",
"part-00000-tid-3723893643168635313-3ff2fc91-919e-4b54-959d-2fd836e454c9-1756-1-c000.json"
],6:[
"part-00004-tid-7816484799722101425-436eec6b-6daf-4672-9848-df532124b3b0-1781-1-c000.json",
"part-00003-tid-7816484799722101425-436eec6b-6daf-4672-9848-df532124b3b0-1780-1-c000.json",
"part-00000-tid-7816484799722101425-436eec6b-6daf-4672-9848-df532124b3b0-1777-1-c000.json"
],7:[
"part-00004-tid-3715053498828480727-07fd4187-5f08-408f-a02d-8664bb11ef18-1802-1-c000.json",
"part-00003-tid-3715053498828480727-07fd4187-5f08-408f-a02d-8664bb11ef18-1801-1-c000.json",
"part-00000-tid-3715053498828480727-07fd4187-5f08-408f-a02d-8664bb11ef18-1798-1-c000.json"
],8:[
"part-00004-tid-4510170731639983217-d3c96e48-d62c-409d-8b67-05e3743d8036-1823-1-c000.json",
"part-00003-tid-4510170731639983217-d3c96e48-d62c-409d-8b67-05e3743d8036-1822-1-c000.json",
"part-00000-tid-4510170731639983217-d3c96e48-d62c-409d-8b67-05e3743d8036-1819-1-c000.json"
],9:[
"part-00004-tid-3313816597353595179-9e5499ff-e6a8-477c-9662-8cab09d9db89-1844-1-c000.json",
"part-00003-tid-3313816597353595179-9e5499ff-e6a8-477c-9662-8cab09d9db89-1843-1-c000.json",
"part-00002-tid-3313816597353595179-9e5499ff-e6a8-477c-9662-8cab09d9db89-1842-1-c000.json",
"part-00001-tid-3313816597353595179-9e5499ff-e6a8-477c-9662-8cab09d9db89-1841-1-c000.json",
"part-00000-tid-3313816597353595179-9e5499ff-e6a8-477c-9662-8cab09d9db89-1840-1-c000.json"
],10:[
"part-00004-tid-4501507592585151880-6844bec2-7404-4906-b353-a3d81d2e649e-1865-1-c000.json",
"part-00003-tid-4501507592585151880-6844bec2-7404-4906-b353-a3d81d2e649e-1864-1-c000.json",
"part-00002-tid-4501507592585151880-6844bec2-7404-4906-b353-a3d81d2e649e-1863-1-c000.json",
"part-00001-tid-4501507592585151880-6844bec2-7404-4906-b353-a3d81d2e649e-1862-1-c000.json",
"part-00000-tid-4501507592585151880-6844bec2-7404-4906-b353-a3d81d2e649e-1861-1-c000.json"
],11:[
"part-00004-tid-8408848988009657416-45d2e867-810d-4f61-8f47-2884a06d772b-1886-1-c000.json",
"part-00003-tid-8408848988009657416-45d2e867-810d-4f61-8f47-2884a06d772b-1885-1-c000.json",
"part-00002-tid-8408848988009657416-45d2e867-810d-4f61-8f47-2884a06d772b-1884-1-c000.json",
"part-00001-tid-8408848988009657416-45d2e867-810d-4f61-8f47-2884a06d772b-1883-1-c000.json",
"part-00000-tid-8408848988009657416-45d2e867-810d-4f61-8f47-2884a06d772b-1882-1-c000.json"
],12:[
"part-00004-tid-8034711223202012424-14bb1983-06be-4427-bc93-8df65840763b-1907-1-c000.json",
"part-00003-tid-8034711223202012424-14bb1983-06be-4427-bc93-8df65840763b-1906-1-c000.json",
"part-00002-tid-8034711223202012424-14bb1983-06be-4427-bc93-8df65840763b-1905-1-c000.json",
"part-00001-tid-8034711223202012424-14bb1983-06be-4427-bc93-8df65840763b-1904-1-c000.json",
"part-00000-tid-8034711223202012424-14bb1983-06be-4427-bc93-8df65840763b-1903-1-c000.json"
],13:[
"part-00004-tid-5354968751320088002-9ee8ebbf-1623-44cb-aa20-e4ffd32909a4-1928-1-c000.json",
"part-00003-tid-5354968751320088002-9ee8ebbf-1623-44cb-aa20-e4ffd32909a4-1927-1-c000.json",
"part-00002-tid-5354968751320088002-9ee8ebbf-1623-44cb-aa20-e4ffd32909a4-1926-1-c000.json",
"part-00001-tid-5354968751320088002-9ee8ebbf-1623-44cb-aa20-e4ffd32909a4-1925-1-c000.json",
"part-00000-tid-5354968751320088002-9ee8ebbf-1623-44cb-aa20-e4ffd32909a4-1924-1-c000.json"
],14:[
"part-00004-tid-3713058930943657627-6ffcec81-9f8e-42b9-bfc8-e73734efd13b-1949-1-c000.json",
"part-00003-tid-3713058930943657627-6ffcec81-9f8e-42b9-bfc8-e73734efd13b-1948-1-c000.json",
"part-00002-tid-3713058930943657627-6ffcec81-9f8e-42b9-bfc8-e73734efd13b-1947-1-c000.json",
"part-00001-tid-3713058930943657627-6ffcec81-9f8e-42b9-bfc8-e73734efd13b-1946-1-c000.json",
"part-00000-tid-3713058930943657627-6ffcec81-9f8e-42b9-bfc8-e73734efd13b-1945-1-c000.json"
],15:[
"part-00004-tid-7685515290006980378-aedd825a-885d-4d2e-9a77-4adc6fa048e1-1970-1-c000.json",
"part-00003-tid-7685515290006980378-aedd825a-885d-4d2e-9a77-4adc6fa048e1-1969-1-c000.json",
"part-00002-tid-7685515290006980378-aedd825a-885d-4d2e-9a77-4adc6fa048e1-1968-1-c000.json",
"part-00001-tid-7685515290006980378-aedd825a-885d-4d2e-9a77-4adc6fa048e1-1967-1-c000.json",
"part-00000-tid-7685515290006980378-aedd825a-885d-4d2e-9a77-4adc6fa048e1-1966-1-c000.json"
],16:[
"part-00000-tid-1258914159397947207-0c400917-b30c-4bcb-825f-bef3aa9893be-1987-1-c000.json",
"part-00003-tid-1258914159397947207-0c400917-b30c-4bcb-825f-bef3aa9893be-1990-1-c000.json",
"part-00004-tid-1258914159397947207-0c400917-b30c-4bcb-825f-bef3aa9893be-1991-1-c000.json",
"part-00005-tid-1258914159397947207-0c400917-b30c-4bcb-825f-bef3aa9893be-1992-1-c000.json"
]
}

In [0]:
class streaming_factory:
    """Simulates files arriving over time.

    Each batch listed in `stream_files` is copied from the staging folder
    (`source`) into the folder watched by Auto Loader (`files_path`).

    Parameters
    ----------
    starting_batch : int
        Number of the first batch to deliver.
    max_batch : int
        Number of the last batch that may be delivered.
    """
    def __init__(self, starting_batch=1, max_batch=16):
        self.max_batch = max_batch
        self.batch = starting_batch  # Next batch to deliver

    def _exhausted(self):
        """Return True when every batch up to `max_batch` has been delivered."""
        return self.batch > self.max_batch

    def _copy_batch(self, key):
        """Copy every file of batch `key` into the Auto Loader source folder."""
        for file_name in stream_files[key]:
            dbutils.fs.cp(f"{source}/{file_name}", f"{files_path}")

    def start(self):
        """Deliver the current batch as the first arrival and advance to the next one.

        Does nothing but print a notice when no batch is left (previously this
        raised a KeyError).
        """
        if self._exhausted():
            print("Data source exhausted", end="...")
            return

        print("Starting the stream")
        print("\n")

        time.sleep(0.7)

        print("Stream is ready!")
        print("\n")

        print(f"#{self.batch} batch of files arriving")
        print("\n")

        self._copy_batch(self.batch)
        self.batch += 1

    def load(self, continuous=False, wait_seconds=90):
        """Deliver the next batch, or all remaining batches when `continuous` is True.

        Parameters
        ----------
        continuous : bool
            False (default): deliver one batch, then pause. True: deliver
            every batch from the current one up to `max_batch` at once.
        wait_seconds : float
            Pause after a single batch, giving the streams time to process it
            before the caller continues. Ignored when `continuous` is True.
        """
        if self._exhausted():
            print("Data source exhausted", end="...")
            return

        if continuous:
            print("Loading all files", end="...")

            # Only the batches not delivered yet, and never past max_batch
            remaining = sorted(key for key in stream_files if self.batch <= key <= self.max_batch)
            for key in remaining:
                self._copy_batch(key)

            self.batch = self.max_batch+1

        else:
            print(f"Loading batch #{self.batch} to the daily stream", end="...")
            self._copy_batch(self.batch)

            time.sleep(wait_seconds)
            self.batch += 1

None # Suppressing Output

In [0]:
# File-arrival simulator used by the demo, created with the default batch range (1 to 16)
stream=streaming_factory()

In [0]:
class StreamManager:
    """Maintenance helper: stops streaming queries and clears the Auto Loader source folder."""

    def __init__(self, files_path):
        # Folder removed by remove_folder() / restart_environment()
        self.files_path = files_path

    def stop_stream(self, stream_name):
        """Stop a single stream by name."""
        for query in spark.streams.active:
            if query.name != stream_name:
                continue
            # First active query with that name: stop it and wait until it has terminated
            query.stop()
            query.awaitTermination()
            print(f"Stream {stream_name} stopped successfully.")
            return
        print(f"No active stream found with the name: {stream_name}")

    def stop_all_streams(self):
        """Stop all active streams."""
        active_queries = spark.streams.active
        for query in active_queries:
            query.stop()
            query.awaitTermination()
        print("All active streams stopped.")

    def remove_folder(self):
        """Recursively delete the folder given at construction time."""
        target = self.files_path
        dbutils.fs.rm(target, True)
        print(f"Folder {target} removed.")

    def restart_environment(self):
        """Stop all active streams and remove the source folder."""
        self.stop_all_streams()
        self.remove_folder()
        print("Environment restarted successfully.")

In [0]:
# Maintenance helper bound to the Auto Loader source folder
maintenance = StreamManager(files_path)

In [0]:
print("All functions are loaded!")
time.sleep(0.3)
print("\n")
print("Ready to start the pipeline")
