In [0]:
import dlt
from pyspark.sql.functions import col

# bronze

@dlt.table(
    name="dlta_bronze_daily",
    comment="Ingest daily parquet files for 2330 from ADLS Gen2 using Autoloader"
)
@dlt.expect_or_fail("non_null_fields", "date IS NOT NULL AND open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL AND volume IS NOT NULL AND symbol IS NOT NULL")
@dlt.expect_or_fail("positive_prices_and_volume", "open > 0 AND high > 0 AND low > 0 AND close > 0 AND volume > 0")
def dlt_bronze_daily():
    """Return a streaming DataFrame over the daily parquet files in ADLS Gen2.

    Uses the ``cloudFiles`` (Auto Loader) streaming source with the parquet
    format. The ``expect_or_fail`` decorators above enforce that the price,
    volume, date and symbol fields are non-null and that prices/volume are
    strictly positive.
    """
    landing_path = "abfss://twstocks@kenspractice.dfs.core.windows.net/daily/"
    auto_loader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
    )
    return auto_loader.load(landing_path)

@dlt.table(
    name="dlta_bronze_monthly",
    comment="Ingest monthly parquet files for 2330 from ADLS Gen2 using Autoloader"
)
@dlt.expect_or_fail("non_null_fields", "date IS NOT NULL AND open IS NOT NULL AND high IS NOT NULL AND low IS NOT NULL AND close IS NOT NULL AND volume IS NOT NULL AND symbol IS NOT NULL")
@dlt.expect_or_fail("positive_prices_and_volume", "open > 0 AND high > 0 AND low > 0 AND close > 0 AND volume > 0")
def dlt_bronze_monthly():
    """Return a streaming DataFrame over the monthly parquet files in ADLS Gen2.

    Uses the ``cloudFiles`` (Auto Loader) streaming source with the parquet
    format. The ``expect_or_fail`` decorators above enforce that the price,
    volume, date and symbol fields are non-null and that prices/volume are
    strictly positive.
    """
    landing_path = "abfss://twstocks@kenspractice.dfs.core.windows.net/monthly/"
    auto_loader = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
    )
    return auto_loader.load(landing_path)

In [0]:
import dlt
from pyspark.sql.functions import (
    col, when, round as spark_round, row_number,
    max as spark_max, min as spark_min, count
)
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

@dlt.table(
    name="dlta_silver_daily",
    comment="Accurate version: Calculate the lowest price before the highest price within the next 60 days; return null if less than 60 days in the future"
)
def dlt_silver_daily():
    """
    Add forward-looking high/low returns to every daily bar.

    For each row (ordered by ``date``), look at the next 60 rows, i.e. the 60
    trading days present in the bronze table after it (rows, not calendar days):

    - ``60fhr_percent``: percentage change from the row's ``close`` to the
      highest ``high`` within those 60 rows.
    - ``60flr_percent``: percentage change from the row's ``close`` to the
      lowest ``low`` from the next row up to and including the earliest row on
      which that highest ``high`` occurs. The row's own ``low`` is excluded.
    - Both values are rounded to 2 decimals and are null for rows that have
      fewer than 60 later rows (the most recent dates).

    Returns:
        DataFrame with columns date, open, high, low, close, volume,
        60fhr_percent, 60flr_percent, ordered by date.
    """
    # Local import: only this table definition needs these functions.
    from pyspark.sql.functions import array_sort, collect_list, expr, size, struct

    horizon = 60  # look-ahead length in rows; the output column names assume 60

    # NOTE(review): the bronze table is read by its fully-qualified name rather
    # than via dlt.read(); confirm the pipeline still records the dependency.
    bronze = spark.read.table("kenworkspace.tw_stocks_db.dlta_bronze_daily")

    # Frame of the `horizon` rows strictly after the current one. One sliding
    # window replaces the previous non-equi self-joins, which Spark can only
    # plan as nested-loop/cartesian joins (quadratic in the number of rows).
    # NOTE(review): there is no partitionBy, so all rows are assumed to belong
    # to a single symbol (2330); multiple symbols would be mixed together.
    look_ahead = Window.orderBy("date").rowsBetween(1, horizon)

    # Gather (date, high, low) of the future rows. array_sort orders the structs
    # by their first field (date), so array positions below are chronological
    # regardless of how collect_list accumulates the frame.
    with_future = bronze.withColumn(
        "future_bars",
        array_sort(collect_list(struct("date", "high", "low")).over(look_ahead)),
    )

    metrics = (
        with_future
        .withColumn("future_days_count", size("future_bars"))
        .withColumn("period_max_high", expr("array_max(future_bars.high)"))
        # array_position returns the 1-based index of the FIRST match, i.e. the
        # earliest date on which the maximum high is reached.
        .withColumn(
            "max_high_pos",
            expr("array_position(future_bars.high, period_max_high)"),
        )
        # Lowest low from the first future row through the max-high row
        # (inclusive). Explicit INT cast keeps this valid under ANSI mode.
        .withColumn(
            "period_min_low",
            expr("array_min(slice(future_bars.low, 1, CAST(max_high_pos AS INT)))"),
        )
    )

    has_full_horizon = col("future_days_count") >= horizon
    close = col("close")

    # Percentages are only defined when a complete look-ahead window exists.
    fhr = when(
        has_full_horizon & col("period_max_high").isNotNull(),
        spark_round(((col("period_max_high") - close) / close) * 100, 2),
    ).otherwise(None).cast(DoubleType())

    flr = when(
        has_full_horizon & col("period_min_low").isNotNull(),
        spark_round(((col("period_min_low") - close) / close) * 100, 2),
    ).otherwise(None).cast(DoubleType())

    return (
        metrics
        .select(
            "date", "open", "high", "low", "close", "volume",
            fhr.alias("60fhr_percent"),
            flr.alias("60flr_percent"),
        )
        .orderBy("date")
    )