## RAW DATA TABLE

In [None]:
create or replace TABLE STOCKS_DB.STOCKS_INFO.STOCKS_DATA (
	SYMBOL VARCHAR(16777216),
	TICKER_TIME TIMESTAMP_NTZ(9),
	CLOSE FLOAT,
	OPEN FLOAT,
	HIGH FLOAT,
	LOW FLOAT,
	VOLUME FLOAT,
	LAST_UPDATED_AT TIMESTAMP_NTZ(9)
);

## NORMALIZATION TO SNOWFLAKE SCHEMA

In [None]:
CREATE OR REPLACE TABLE STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK (
    STOCK_ID INT AUTOINCREMENT PRIMARY KEY,
    SYMBOL VARCHAR NOT NULL UNIQUE
);

CREATE OR REPLACE TABLE STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE (
    DATE_ID INT AUTOINCREMENT PRIMARY KEY,
    TICKER_TIME TIMESTAMP_NTZ(9) NOT NULL UNIQUE,
    TRADE_DATE DATE,
    TRADE_HOUR INT,
    TRADE_MINUTE INT,
    DAY_OF_WEEK STRING,
    MONTH INT,
    YEAR INT
);

CREATE OR REPLACE TABLE STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE (
    STOCK_ID INT,
    DATE_ID INT,
    TICKER_TIME TIMESTAMP_NTZ(9),
    OPEN FLOAT,
    HIGH FLOAT,
    LOW FLOAT,
    CLOSE FLOAT,
    VOLUME FLOAT,
    LAST_UPDATED_AT TIMESTAMP_NTZ(9),
    PRIMARY KEY (STOCK_ID, TICKER_TIME),
    FOREIGN KEY (STOCK_ID) REFERENCES STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK(STOCK_ID),
    FOREIGN KEY (DATE_ID) REFERENCES STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE(DATE_ID)
);


### ONE-TIME LOAD OF DIM AND FACT TABLES

In [None]:
INSERT INTO STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK (SYMBOL)
SELECT DISTINCT SYMBOL
FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA
WHERE SYMBOL NOT IN (
    SELECT SYMBOL FROM STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK
);


In [None]:
INSERT INTO STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE (
    TICKER_TIME, TRADE_DATE, TRADE_HOUR, TRADE_MINUTE, 
    DAY_OF_WEEK, MONTH, YEAR
)
SELECT DISTINCT 
    TICKER_TIME,
    CAST(TICKER_TIME AS DATE),
    EXTRACT(HOUR FROM TICKER_TIME),
    EXTRACT(MINUTE FROM TICKER_TIME),
    TO_CHAR(TICKER_TIME, 'DY'),
    EXTRACT(MONTH FROM TICKER_TIME),
    EXTRACT(YEAR FROM TICKER_TIME)
FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA
WHERE TICKER_TIME NOT IN (
    SELECT TICKER_TIME FROM STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE
);


In [None]:
INSERT INTO STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE (
    STOCK_ID, DATE_ID, TICKER_TIME,
    OPEN, HIGH, LOW, CLOSE, VOLUME, LAST_UPDATED_AT
)
SELECT 
    s.STOCK_ID,
    d.DATE_ID,
    r.TICKER_TIME,
    r.OPEN,
    r.HIGH,
    r.LOW,
    r.CLOSE,
    r.VOLUME,
    r.LAST_UPDATED_AT
FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA r
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK s ON r.SYMBOL = s.SYMBOL
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE d ON r.TICKER_TIME = d.TICKER_TIME
WHERE NOT EXISTS (
    SELECT 1 
    FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE f
    WHERE f.STOCK_ID = s.STOCK_ID AND f.TICKER_TIME = r.TICKER_TIME
);


## PROCEDURE TO INCREMENTALLY LOAD THE TABLES USING RAW TABLE

In [None]:
CREATE OR REPLACE PROCEDURE STOCKS_DB.STOCKS_ANALYTICS.SP_INCREMENTAL_LOAD()
RETURNS STRING
LANGUAGE SQL
AS
$$
-- Begin Transaction
BEGIN

-- Step 1: Load DIM_STOCK
INSERT INTO STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK (SYMBOL)
SELECT DISTINCT SYMBOL
FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA r
WHERE SYMBOL NOT IN (
    SELECT SYMBOL FROM STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK
);

-- Step 2: Load DIM_DATE
INSERT INTO STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE (
    TICKER_TIME, TRADE_DATE, TRADE_HOUR, TRADE_MINUTE, 
    DAY_OF_WEEK, MONTH, YEAR
)
SELECT DISTINCT 
    r.TICKER_TIME,
    CAST(r.TICKER_TIME AS DATE),
    EXTRACT(HOUR FROM r.TICKER_TIME),
    EXTRACT(MINUTE FROM r.TICKER_TIME),
    TO_CHAR(r.TICKER_TIME, 'DY'),
    EXTRACT(MONTH FROM r.TICKER_TIME),
    EXTRACT(YEAR FROM r.TICKER_TIME)
FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA r
WHERE r.TICKER_TIME NOT IN (
    SELECT TICKER_TIME FROM STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE
);

-- Step 3: Upsert FACT_STOCK_PRICE
MERGE INTO STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE AS tgt
USING (
    SELECT 
        r.SYMBOL,
        r.TICKER_TIME,
        r.OPEN, r.HIGH, r.LOW, r.CLOSE, r.VOLUME, r.LAST_UPDATED_AT,
        s.STOCK_ID,
        d.DATE_ID
    FROM STOCKS_DB.STOCKS_INFO.STOCKS_DATA r
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK s ON r.SYMBOL = s.SYMBOL
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE d ON r.TICKER_TIME = d.TICKER_TIME
) AS src
ON tgt.STOCK_ID = src.STOCK_ID AND tgt.TICKER_TIME = src.TICKER_TIME

WHEN MATCHED AND (
    tgt.OPEN != src.OPEN OR
    tgt.HIGH != src.HIGH OR
    tgt.LOW != src.LOW OR
    tgt.CLOSE != src.CLOSE OR
    tgt.VOLUME != src.VOLUME OR
    tgt.LAST_UPDATED_AT != src.LAST_UPDATED_AT
)
THEN UPDATE SET
    OPEN = src.OPEN,
    HIGH = src.HIGH,
    LOW = src.LOW,
    CLOSE = src.CLOSE,
    VOLUME = src.VOLUME,
    LAST_UPDATED_AT = src.LAST_UPDATED_AT

WHEN NOT MATCHED THEN
INSERT (
    STOCK_ID, DATE_ID, TICKER_TIME, 
    OPEN, HIGH, LOW, CLOSE, VOLUME, LAST_UPDATED_AT
)
VALUES (
    src.STOCK_ID, src.DATE_ID, src.TICKER_TIME,
    src.OPEN, src.HIGH, src.LOW, src.CLOSE, src.VOLUME, src.LAST_UPDATED_AT
);

-- End Transaction
RETURN 'Incremental load completed successfully.';

END;
$$;


In [None]:
CALL STOCKS_DB.STOCKS_ANALYTICS.SP_INCREMENTAL_LOAD();

## CREATION OF TASK TO RUN FOR EVERY 5 MIN

In [None]:
CREATE OR REPLACE TASK STOCKS_DB.STOCKS_ANALYTICS.TASK_INCREMENTAL_LOAD
WAREHOUSE = COMPUTE_WH  
SCHEDULE = '5 MINUTE'
AS
CALL STOCKS_DB.STOCKS_ANALYTICS.SP_INCREMENTAL_LOAD();

In [None]:
ALTER TASK STOCKS_DB.STOCKS_ANALYTICS.TASK_INCREMENTAL_LOAD RESUME;

In [None]:
SHOW TASKS IN SCHEMA STOCKS_DB.STOCKS_ANALYTICS;

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME => 'TASK_INCREMENTAL_LOAD',
    RESULT_LIMIT => 10
));


## Materialized Views (for Fast Reporting / Dashboards)

### Daily OHLC + Volume Summary by Stock

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_DAILY_OHLC_SUMMARY
AS
SELECT
    ds.SYMBOL,
    dd.TRADE_DATE,
    MIN(fsp.OPEN) AS OPEN_PRICE,
    MAX(fsp.HIGH) AS HIGH_PRICE,
    MIN(fsp.LOW) AS LOW_PRICE,
    MAX(fsp.CLOSE) AS CLOSE_PRICE,
    SUM(fsp.VOLUME) AS TOTAL_VOLUME
FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON fsp.STOCK_ID = ds.STOCK_ID
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE dd ON fsp.DATE_ID = dd.DATE_ID
GROUP BY ds.SYMBOL, dd.TRADE_DATE;

### Minute-Level Price Snapshot (Last 1 Hour)

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_LAST_1_HOUR_PRICES
AS
SELECT
    ds.SYMBOL,
    fsp.TICKER_TIME,
    fsp.OPEN,
    fsp.HIGH,
    fsp.LOW,
    fsp.CLOSE,
    fsp.VOLUME
FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON fsp.STOCK_ID = ds.STOCK_ID
WHERE fsp.TICKER_TIME >= DATEADD(HOUR, -1, CURRENT_TIMESTAMP());


### Top 10 Most Traded Stocks (Today)

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_TODAY_TOP_TRADED
AS
SELECT
    ds.SYMBOL,
    SUM(fsp.VOLUME) AS TOTAL_VOLUME
FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON fsp.STOCK_ID = ds.STOCK_ID
JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_DATE dd ON fsp.DATE_ID = dd.DATE_ID
WHERE dd.TRADE_DATE = CURRENT_DATE()
GROUP BY ds.SYMBOL
ORDER BY TOTAL_VOLUME DESC
LIMIT 10;

### Top Gainers in Last 1 Hour

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_TOP_GAINERS_LAST_1_HOUR AS
WITH prices AS (
    SELECT
        ds.SYMBOL,
        fsp.TICKER_TIME,
        fsp.CLOSE,
        ROW_NUMBER() OVER (PARTITION BY ds.SYMBOL ORDER BY fsp.TICKER_TIME ASC) AS rn_asc,
        ROW_NUMBER() OVER (PARTITION BY ds.SYMBOL ORDER BY fsp.TICKER_TIME DESC) AS rn_desc
    FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON ds.STOCK_ID = fsp.STOCK_ID
    WHERE fsp.TICKER_TIME >= DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
),
open_close AS (
    SELECT
        SYMBOL,
        MAX(CASE WHEN rn_asc = 1 THEN CLOSE END) AS OPEN_PRICE,
        MAX(CASE WHEN rn_desc = 1 THEN CLOSE END) AS CLOSE_PRICE
    FROM prices
    GROUP BY SYMBOL
)
SELECT
    SYMBOL,
    OPEN_PRICE,
    CLOSE_PRICE,
    ROUND(((CLOSE_PRICE - OPEN_PRICE) / NULLIF(OPEN_PRICE, 0)) * 100, 2) AS PERCENT_CHANGE
FROM open_close
ORDER BY PERCENT_CHANGE DESC
LIMIT 10;


### Volume Spikes by Symbol (Last 30 Minutes vs 24hr Avg)

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_VOLUME_SPIKE_ALERTS AS
WITH last_30 AS (
    SELECT
        ds.SYMBOL,
        SUM(fsp.VOLUME) AS volume_last_30
    FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON ds.STOCK_ID = fsp.STOCK_ID
    WHERE fsp.TICKER_TIME >= DATEADD(MINUTE, -30, CURRENT_TIMESTAMP())
    GROUP BY ds.SYMBOL
),
last_24hr AS (
    SELECT
        ds.SYMBOL,
        AVG(fsp.VOLUME) AS avg_volume_24hr
    FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON ds.STOCK_ID = fsp.STOCK_ID
    WHERE fsp.TICKER_TIME >= DATEADD(HOUR, -24, CURRENT_TIMESTAMP())
    GROUP BY ds.SYMBOL
)
SELECT
    l30.SYMBOL,
    l30.volume_last_30,
    l24.avg_volume_24hr,
    ROUND(l30.volume_last_30 / NULLIF(l24.avg_volume_24hr, 0), 2) AS spike_ratio
FROM last_30 l30
JOIN last_24hr l24 ON l30.SYMBOL = l24.SYMBOL
WHERE l30.volume_last_30 / NULLIF(l24.avg_volume_24hr, 0) > 2  -- Only show spikes > 2x
ORDER BY spike_ratio DESC;


### Hourly OHLC Summary (Candlestick)

In [None]:
CREATE OR REPLACE VIEW STOCKS_DB.STOCKS_ANALYTICS.VW_HOURLY_CANDLESTICK AS
WITH hourly_data AS (
    SELECT
        ds.SYMBOL,
        DATE_TRUNC('HOUR', fsp.TICKER_TIME) AS HOUR_BUCKET,
        fsp.TICKER_TIME,
        fsp.OPEN, fsp.HIGH, fsp.LOW, fsp.CLOSE
    FROM STOCKS_DB.STOCKS_ANALYTICS.FACT_STOCK_PRICE fsp
    JOIN STOCKS_DB.STOCKS_ANALYTICS.DIM_STOCK ds ON ds.STOCK_ID = fsp.STOCK_ID
    WHERE fsp.TICKER_TIME >= DATEADD(DAY, -1, CURRENT_TIMESTAMP())
),
ranked AS (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY SYMBOL, HOUR_BUCKET ORDER BY TICKER_TIME ASC) AS rn_open,
        ROW_NUMBER() OVER (PARTITION BY SYMBOL, HOUR_BUCKET ORDER BY TICKER_TIME DESC) AS rn_close
    FROM hourly_data
)
SELECT
    SYMBOL,
    HOUR_BUCKET,
    MAX(CASE WHEN rn_open = 1 THEN OPEN END) AS OPEN_PRICE,
    MAX(HIGH) AS HIGH_PRICE,
    MIN(LOW) AS LOW_PRICE,
    MAX(CASE WHEN rn_close = 1 THEN CLOSE END) AS CLOSE_PRICE
FROM ranked
GROUP BY SYMBOL, HOUR_BUCKET
ORDER BY SYMBOL, HOUR_BUCKET;
