In [0]:
# Declare the notebook's two text parameters. The second argument is the
# widget's default value, which here doubles as a prompt for the user.
for _widget_name, _widget_default in (
    ("Catalog", "enter a catalog"),
    ("Schema", "enter a schema"),
):
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
def _require_widget(name, placeholder):
    """Return the trimmed value of text widget `name`.

    Raises:
        ValueError: if the widget is blank or still holds `placeholder`
            (the prompt text used as the widget's default), because that
            value would otherwise be interpolated into table names and
            volume paths further down the notebook.
    """
    value = dbutils.widgets.get(name).strip()
    if not value or value == placeholder:
        raise ValueError(
            f"Widget '{name}' is not set; enter a real value before running the notebook."
        )
    return value


# Fail fast here rather than later with an obscure path/table error.
catalog = _require_widget("Catalog", "enter a catalog")
schema = _require_widget("Schema", "enter a schema")

In [0]:
from pyspark.sql.functions import col, current_timestamp, date_trunc, row_number, to_timestamp
from pyspark.sql.window import Window

In [0]:
volume_name = "stream"

# Every location used by this notebook lives under the same volume root.
_volume_root = f"/Volumes/{catalog}/{schema}/{volume_name}"

# Landing folders for the source files, and the base folder for stream checkpoints.
transaction_output = f"{_volume_root}/stream_data/source_transactions"
dimension_output = f"{_volume_root}/stream_data/source_dimensions"
checkpoint = f"{_volume_root}/stream_metadata/checkpoint"

In [0]:
bronze_table = f"{catalog}.{schema}.bronze_source_transactions"

# Auto Loader ("cloudFiles") settings for the CSV files in `transaction_output`.
# The inferred schema is tracked under the same folder that the bronze write
# uses as its checkpoint location.
# NOTE(review): no CSV `header` option is set, yet the silver cell refers to a
# column named `timestamp` — confirm how the column names are derived.
_autoloader_options = {
    "cloudFiles.format": "csv",
    "cloudFiles.schemaLocation": f"{checkpoint}/{bronze_table}",
}

raw_reader = spark.readStream.format("cloudFiles").options(**_autoloader_options)
raw_df = raw_reader.load(transaction_output)

In [0]:
# Bronze ingest: append every raw column plus the originating file path and
# the processing timestamp to `bronze_table`.
#
# NOTE: despite its name, `transformed_df` holds the StreamingQuery handle
# returned by `toTable`, not a DataFrame. The name is kept so any other cell
# referring to it keeps working.
transformed_df = (
    raw_df.select(
        "*",
        col("_metadata.file_path").alias("source_file"),
        current_timestamp().alias("processing_time"),
    )
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f"{checkpoint}/{bronze_table}")
    .toTable(f"{bronze_table}")
)

# The query runs in the background; wait for it to drain the available files
# so that later cells reading `bronze_table` do not start before it exists
# or while it is still incomplete.
transformed_df.awaitTermination()

In [0]:
from pyspark.sql.functions import to_timestamp

# Scratch check: parse one sample timestamp string with the same
# microsecond-precision pattern that the silver transformation uses.
_sample_rows = [("2025-02-06 04:58:04.512093",)]
_parsed_timestamp = to_timestamp("timestamp_str", "yyyy-MM-dd HH:mm:ss.SSSSSS")

df = (
    spark.createDataFrame(_sample_rows, ["timestamp_str"])
    .withColumn("timestamp", _parsed_timestamp)
)
display(df)

In [0]:
silver_table = f"{catalog}.{schema}.silver_source_transactions"

# Stream the bronze table incrementally.
bronze_df = spark.readStream.table(bronze_table)

# Derive hour/day buckets from the bronze `timestamp` column and add a parsed
# copy of it (`timestamp_ts`) using a microsecond-precision pattern.
transformed_silver_df = (
    bronze_df
    .withColumn("trade_hour", date_trunc("hour", col("timestamp")))
    .withColumn("trade_day", date_trunc("day", col("timestamp")))
    .withColumn("timestamp_ts", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSSSSS"))
)

version = 0 # use this to restart checkpoint

# `toTable` is the documented DataStreamWriter method and matches the bronze
# write; the checkpoint folder is versioned so bumping `version` starts over.
# NOTE(review): "startingOffsets" looks like a source-side (reader) option and
# is probably ignored on the writer — confirm before removing it.
silver_query = (
    transformed_silver_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f"{checkpoint}/{silver_table}/v{version}")
    .option("mergeSchema", "true")
    .option("startingOffsets", "earliest")
    .toTable(silver_table)
)

# Wait for the run to finish so the preview cells below see the written data.
silver_query.awaitTermination()

In [0]:
# Preview the silver table written by this notebook. `silver_table` is built
# from the Catalog/Schema widgets, so the preview always matches the table
# that was just written rather than a hard-coded catalog and schema.
display(spark.table(silver_table))

In [0]:
# Preview the gold table, resolved from the Catalog/Schema widgets instead of
# a hard-coded catalog and schema.
# NOTE(review): `gold_latest_trades` is not created in the cells visible here;
# this assumes it lives in the same catalog and schema as the bronze and
# silver tables — confirm.
display(spark.table(f"{catalog}.{schema}.gold_latest_trades"))