In [None]:
# paragraph 1
%flink.ssql(type=update)

-- Source table: JSON stock quotes read from the Kinesis data stream.
-- event_time is declared as the event-time attribute via the WATERMARK clause.
-- NOTE(review): the watermark equals event_time (no out-of-orderness allowance),
-- so records arriving out of event-time order may be treated as late by the
-- event-time operators that read this table -- confirm the stream is ordered,
-- or use e.g. `event_time - INTERVAL '5' SECOND`.
DROP TABLE IF EXISTS stock_table;
CREATE TABLE stock_table (
    `date`       STRING,
    ticker       VARCHAR(6),
    open_price   FLOAT,
    high         FLOAT,
    low          FLOAT,
    close_price  FLOAT,
    adjclose     FLOAT,
    volume       BIGINT,
    event_time   TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time
) WITH (
    -- Point 'stream' and 'aws.region' at your own Kinesis data stream.
    'connector' = 'kinesis',
    'stream' = 'new-data-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

-- Sink tables: JSON files under s3://d-bucket/, one directory per result set.
DROP TABLE IF EXISTS part_a;
DROP TABLE IF EXISTS part_b;
DROP TABLE IF EXISTS part_c;

CREATE TABLE part_a (
    event_time   TIMESTAMP(3),
    cmgr         FLOAT,
    `date`       STRING,
    close_price  FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://d-bucket/part_a',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

CREATE TABLE part_b (
    ticker           VARCHAR(6),
    `date`           STRING,
    close_price      FLOAT,
    avg_close_price  FLOAT,
    ema_close_price  DOUBLE
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://d-bucket/part_b',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

CREATE TABLE part_c (
    ticker            VARCHAR(6),
    event_time        TIMESTAMP(3),
    initialPriceDate  STRING,
    dropDate          STRING,
    dropPercentage    DOUBLE,
    initialPrice      FLOAT,
    lastPrice         FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 's3://d-bucket/part_c',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

# paragraph 2
%flink.ssql(type=update)
-- cmgr is registered from PyFlink with create_temporary_system_function(),
-- i.e. as a temporary SYSTEM function. A plain DROP TEMPORARY FUNCTION only
-- targets temporary catalog functions, so with IF EXISTS it would silently do
-- nothing and re-registering cmgr would fail. SYSTEM targets the right namespace.
DROP TEMPORARY SYSTEM FUNCTION IF EXISTS cmgr;

# paragraph 3
%flink.pyflink

from pyflink.table import DataTypes
from pyflink.table.udf import udf
from datetime import datetime

# Baseline for the growth metric computed by cmgr(): growth is measured
# relative to START_PRICE over whole months counted from START_YEAR/START_MONTH.
# NOTE(review): START_PRICE presumably is the close price at the start of
# Jan 2021 for the tracked ticker -- confirm where this value comes from.
START_PRICE = 92.30
START_YEAR = 2021
START_MONTH = 1


@udf(result_type=DataTypes.FLOAT(), input_types=[DataTypes.STRING(), DataTypes.FLOAT()])
def cmgr(date_str: str, price: float) -> float:
    """Return the compound monthly growth rate of ``price``, in percent.

    The rate is taken against ``START_PRICE`` over the number of whole
    calendar months between ``START_YEAR``/``START_MONTH`` and the month of
    ``date_str``::

        ((price / START_PRICE) ** (1 / months) - 1) * 100

    Only rows dated on days 1-3 of a month after the start month are
    evaluated. Every other row yields ``0.0``, which the part_a INSERT uses as
    a "skip" marker (it keeps only rows where the result is ``<> 0.0``).

    Args:
        date_str: Date formatted as ``MM/DD/YYYY``; may be NULL (None).
        price: Closing price; may be NULL (None).

    Returns:
        The growth rate rounded to 6 decimals, or ``0.0`` when the row is
        outside the evaluated window, or the date/price is missing, malformed,
        or the price is negative.
    """
    try:
        date_obj = datetime.strptime(date_str, "%m/%d/%Y")
    except (TypeError, ValueError):
        # NULL or non-MM/DD/YYYY date string: not an error for the stream,
        # just a row we cannot evaluate.
        return 0.0

    year, month, day = date_obj.year, date_obj.month, date_obj.day

    # Evaluate only days 1-3 of each month, and skip the start month itself.
    if (year == START_YEAR and month == START_MONTH) or day > 3:
        return 0.0

    # Whole months elapsed since the start month; dates before it have no
    # defined growth period.
    num_months = (year - START_YEAR) * 12 + (month - START_MONTH)
    if num_months <= 0 or START_PRICE == 0:
        return 0.0

    # A negative price ratio has no real fractional root (Python would yield a
    # complex number), so treat NULL/negative prices as unevaluable.
    if price is None or price < 0:
        return 0.0

    growth_rate = ((price / START_PRICE) ** (1 / num_months) - 1) * 100
    return round(growth_rate, 6)

# Register cmgr as a temporary system function so the SQL paragraphs can call
# it unqualified. Any previous registration is dropped first (a no-op when none
# exists), because registering an already-registered name raises and would make
# this paragraph fail when re-run.
st_env.drop_temporary_system_function("cmgr")
st_env.create_temporary_system_function("cmgr", cmgr)

# paragraph 4
%flink.ssql(runAsOne=True)

-- part_a: rows whose compound monthly growth rate (cmgr UDF) is non-zero;
-- cmgr returns 0.0 for rows it does not evaluate. The call is written once in
-- the inner projection and filtered on its alias.
INSERT INTO part_a
    SELECT t.event_time, t.cmgr, t.`date`, t.close_price
    FROM (
        SELECT event_time, cmgr(`date`, close_price) AS cmgr, `date`, close_price
        FROM stock_table
    ) t
    WHERE t.cmgr <> 0.0;

-- part_b: rows whose close price is below the 10-row moving value
-- ema_close_price of the same ticker, plus the arithmetic mean of that window.
-- NOTE: ema_close_price is EXP(AVG(LN(close_price))), i.e. a geometric moving
-- average, not an exponential one; the name is kept because it is part of the
-- part_b sink schema. AVG (instead of SUM / 10) keeps the value correct for the
-- first 9 rows of each ticker, where the window holds fewer than 10 rows.
INSERT INTO part_b
    SELECT ticker, `date`, close_price, t.avg_close_price, t.ema_close_price
    FROM (
        SELECT
            ticker,
            `date`,
            close_price,
            AVG(close_price) OVER w AS avg_close_price,
            EXP(AVG(LN(close_price)) OVER w) AS ema_close_price
        FROM stock_table
        WINDOW w AS (
            PARTITION BY ticker
            ORDER BY event_time
            ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
        )
    ) t
    WHERE close_price < t.ema_close_price;

-- part_c: per ticker, a starting row A, then at most two rows B whose price is
-- less than 8% below A, then a row C at least 8% below A. One summary row is
-- emitted per match.
INSERT INTO part_c
    SELECT ticker, event_time, initialPriceDate, dropDate, dropPercentage, initialPrice, lastPrice
    FROM stock_table
    MATCH_RECOGNIZE (
        PARTITION BY ticker
        ORDER BY event_time
        MEASURES
            A.event_time AS event_time,
            A.`date` AS initialPriceDate,
            C.`date` AS dropDate,
            (100 * (A.close_price - C.close_price) / A.close_price) AS dropPercentage,
            A.close_price AS initialPrice,
            C.close_price AS lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B{0, 2} C)
        DEFINE
            -- Both conditions use the same "percent drop from A" expression.
            B AS 100 * (A.close_price - B.close_price) / A.close_price < 8.0,
            C AS 100 * (A.close_price - C.close_price) / A.close_price >= 8.0
    );