In [0]:
# Notebook parameters, exposed as Databricks text widgets.
# Each entry is (widget name, default value, label shown next to the widget).
_widget_specs = (
    ("Environment", "dev", "Set the current environment/catalog name"),
    ("RunType", "once", "Set once to run as a batch"),
    ("ProcessingTime", "5 seconds", "Set the microbatch interval"),
)
for _widget_name, _widget_default, _widget_label in _widget_specs:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

In [0]:
# Resolve the run parameters from the notebook widgets.
env = dbutils.widgets.get("Environment")

# RunType is compared case-insensitively and with surrounding whitespace
# removed, so values such as "Once" or "once " still select batch mode instead
# of silently falling through to stream mode.
run_type = dbutils.widgets.get("RunType").strip().lower()
once = run_type == "once"

# Microbatch interval string; only reported below when running in stream mode.
processing_time = dbutils.widgets.get("ProcessingTime")

if once:
    print("Starting sbit in batch mode.")
else:
    print(f"Starting sbit in stream mode with {processing_time} microbatch.")

In [0]:
# Session-level Spark settings, applied in the order listed:
#   - shuffle partitions set to the SparkContext's default parallelism
#   - Delta optimizeWrite and autoCompact switched on
#   - streaming state store provider class set to the RocksDB provider
_session_conf = {
    "spark.sql.shuffle.partitions": sc.defaultParallelism,
    "spark.databricks.delta.optimizeWrite.enabled": True,
    "spark.databricks.delta.autoCompact.enabled": True,
    "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
}
for _conf_key, _conf_value in _session_conf.items():
    spark.conf.set(_conf_key, _conf_value)

In [0]:
%run ./02-setup

In [0]:
%run ./03-history-loader

In [0]:
# Build the setup and history-loading helpers for the selected environment.
# NOTE(review): SetupHelper and HistoryLoader are not defined in this notebook;
# presumably they are brought into scope by the %run cells above - confirm.
SH = SetupHelper(env)
HL = HistoryLoader(env)

In [0]:
# Count how many databases in the target catalog carry the expected name.
_matching_db_count = (
    spark.sql(f"SHOW DATABASES IN {SH.catalog}")
    .filter(f"databaseName == '{SH.db_name}'")
    .count()
)

# Setup is needed unless the database is listed exactly once.
setup_required = _matching_db_count != 1

if not setup_required:
    # Database already present: just make it the current one.
    spark.sql(f"USE {SH.catalog}.{SH.db_name}")
else:
    # First run for this environment: run setup and the history load,
    # validating after each step.
    # NOTE(review): no USE statement is issued on this path - presumably
    # SH.setup() selects the database itself; confirm.
    SH.setup()
    SH.validate()
    HL.load_history()
    HL.validate()

In [0]:
%run ./04-bronze

In [0]:
%run ./05-silver

In [0]:
%run ./06-gold

In [0]:
# Instantiate one object per pipeline layer, each bound to the selected environment.
# NOTE(review): Bronze, Silver and Gold are not defined in this notebook;
# presumably they are brought into scope by the %run cells above - confirm.
BZ = Bronze(env)
SL = Silver(env)
GL = Gold(env)

In [0]:
# Run the Bronze layer, passing the batch/stream flag and the microbatch
# interval resolved from the widgets.
# NOTE(review): whether this call returns immediately or blocks in stream mode
# depends on Bronze.consume, which is not visible here.
BZ.consume(once, processing_time)

In [0]:
# Run the Silver layer with the same batch/stream flag and microbatch interval.
# NOTE(review): Silver.upsert is not visible here; its behaviour is not documented in this notebook.
SL.upsert(once, processing_time)

In [0]:
# Run the Gold layer with the same batch/stream flag and microbatch interval.
# NOTE(review): Gold.upsert is not visible here; its behaviour is not documented in this notebook.
GL.upsert(once, processing_time)