# DLT pipeline

This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/realtime_stock_pipeline.yml.

In [None]:

CREATE OR REFRESH STREAMING TABLE raws_quotes
COMMENT "This table represent raws data from adls storage"
AS 
SELECT * FROM cloud_files("/mnt/container_name/storage_account_name/directory_name/**/**/**/**/**/**/*.avro", 'avro')

# Bronze Table

In [None]:

CREATE OR REFRESH STREAMING TABLE bronze_quotes
COMMENT "This table represent bronze layer for quotes"
AS 
SELECT 
    body.ticker,
    body.c as current_price,
    body.d as change,
    body.dp as percent_change,
    body.h as high_price_of_the_day,
    body.l as low_price_of_the_day,
    body.o as open_price,
    body.pc as previous_close_price,
    body.t as timestamp 
FROM (
    SELECT 
    from_json(CAST(body AS STRING), 'ticker STRING, c FLOAT, d FLOAT, dp FLOAT, h FLOAT, l FLOAT, o FLOAT, pc FLOAT, t TIMESTAMP') as body
    FROM STREAM(LIVE.raws_quotes)
);


# Silver Table

In [None]:

CREATE OR REFRESH STREAMING TABLE silver_quote(
    CONSTRAINT valid_ticker EXPECT (ticker IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_current_price EXPECT (price IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Aggregated data with constraints partitoned by date and average price by hours"
AS 
SELECT 
    ticker,
    AVG(current_price) AS price,
    MIN(low_price_of_the_day) as low_price_of_the_hours,
    MAX(high_price_of_the_day) as high_price_of_the_hours,
    any_value(open_price) as open_price,
    any_value(previous_close_price) as previous_close_price,
    HOUR(CAST(timestamp as TIMESTAMP)) as hour,
    DATE(timestamp) as date
FROM STREAM(LIVE.bronze_quotes)
GROUP BY ticker,DATE(timestamp), HOUR(CAST(timestamp AS TIMESTAMP));
