In [0]:
%flink.ssql(type=update)

DROP TABLE IF EXISTS stock_table;

-- (Re)create stock_table: a source table over the Kinesis stream of stock
-- quotes, decoded from JSON. Presumably it is registered in the AWS Glue Data
-- Catalog used by this notebook - confirm the active catalog.
-- event_time is the event-time attribute. Its watermark trails event_time by
-- 5 seconds, so records arriving up to 5 seconds out of order are still
-- accepted before event-time operators treat them as late.
CREATE TABLE stock_table(
    `date` STRING,
    ticker VARCHAR(6),
    open_price FLOAT,
    high FLOAT,
    low FLOAT,
    close_price FLOAT,
    adjclose FLOAT,
    volume BIGINT,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND
) WITH ( -- Kinesis source connector settings
    'connector' = 'kinesis', 
    'stream' = 'mp11v2_ds',
    'aws.region' = 'us-east-1',
    -- TRIM_HORIZON: start reading from the oldest record retained in the stream
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    -- TIMESTAMP fields in the JSON payload are parsed as ISO-8601 strings
    'json.timestamp-format.standard' = 'ISO-8601'
);

In [1]:
%flink.pyflink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
from datetime import datetime

# Unregister any previous definition so this cell can be re-run.
st_env.execute_sql("DROP TEMPORARY FUNCTION IF EXISTS calculate_cmgr")

# CMGR baseline: a hard-coded start date and the price used as the starting
# value (presumably the close on that date - confirm against the data).
CMGR_START_DATE = datetime.strptime("01/04/2021", "%m/%d/%Y").date()
CMGR_START_PRICE = 92.3
# Only rows dated on days 1..CMGR_MAX_DAY_OF_MONTH of a month are scored.
CMGR_MAX_DAY_OF_MONTH = 3


@udf(result_type=DataTypes.FLOAT(), input_types=[DataTypes.STRING(), DataTypes.FLOAT()])
def calculate_cmgr(date_string: str, close_price: float) -> float:
    """Return the compound monthly growth rate (percent) of ``close_price``.

    CMGR = ((close_price / CMGR_START_PRICE) ** (1 / months) - 1) * 100,
    where ``months`` is the number of calendar months between
    CMGR_START_DATE's month and the row's month.

    Args:
        date_string: Row date formatted as ``MM/DD/YYYY``.
        close_price: Close price of the row.

    Returns:
        The CMGR for rows dated on days 1-3 of a month after the baseline
        month. 0.0 for every other row (the "not scored" value the query
        filters out). None (SQL NULL) when an input is NULL or the price is
        negative.
    """
    # NULL in -> NULL out, instead of a TypeError that would fail the job.
    if date_string is None or close_price is None:
        return None
    # A negative base raised to a fractional power is a complex number in
    # Python, which cannot be returned as FLOAT.
    if close_price < 0:
        return None

    current_date = datetime.strptime(date_string, "%m/%d/%Y").date()
    if current_date.day > CMGR_MAX_DAY_OF_MONTH:
        return 0.0

    # Whole calendar months elapsed since the baseline month.
    number_of_months = (
        (current_date.year - CMGR_START_DATE.year) * 12
        + (current_date.month - CMGR_START_DATE.month)
    )
    # The baseline month itself (January 2021) and any earlier month have no
    # growth period; this also rules out division by zero below.
    if number_of_months <= 0:
        return 0.0

    return ((close_price / CMGR_START_PRICE) ** (1 / number_of_months) - 1) * 100


# Register the UDF under the name the SQL cells call.
st_env.create_temporary_function("calculate_cmgr", calculate_cmgr)


In [2]:
%flink.ssql(type=update)

-- Rows for which calculate_cmgr produced a CMGR value (0.0 marks rows the UDF
-- does not score). The UDF result is named once in the inner query and reused
-- by the outer filter instead of repeating the call.
SELECT event_time, cmgr, `date`, close_price
FROM (
    SELECT
        event_time,
        calculate_cmgr(`date`, close_price) AS cmgr,
        `date`,
        close_price
    FROM stock_table
) AS scored
WHERE cmgr <> 0.0

In [3]:
%flink.pyflink

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# Unregister any previous definition so this cell can be re-run.
st_env.execute_sql("DROP TEMPORARY FUNCTION IF EXISTS calculate_ema")

# Smoothing factor of an EMA_PERIOD-period EMA: k = 2 / (N + 1).
# With N = 10 this is the same 2.0/11.0 factor used by the moving-average query.
EMA_PERIOD = 10
EMA_ALPHA = 2.0 / (EMA_PERIOD + 1)


@udf(result_type=DataTypes.FLOAT(), input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()])
def calculate_ema(prev_ema: float, current_price: float) -> float:
    """Advance an exponential moving average by one observation.

    Computes ``prev_ema + EMA_ALPHA * (current_price - prev_ema)``, which
    equals ``EMA_ALPHA * current_price + (1 - EMA_ALPHA) * prev_ema``.

    Args:
        prev_ema: EMA up to the previous observation. NULL means there is no
            history yet.
        current_price: The new observation.

    Returns:
        The updated EMA. If ``prev_ema`` is NULL, the EMA is seeded with
        ``current_price``. If ``current_price`` is NULL, returns None (SQL NULL).
    """
    if current_price is None:
        return None
    if prev_ema is None:
        return current_price
    return prev_ema + EMA_ALPHA * (current_price - prev_ema)


# Register with create_temporary_function (register_function is deprecated)
# so the DROP TEMPORARY FUNCTION above removes it on re-run.
st_env.create_temporary_function("calculate_ema", calculate_ema)

In [4]:
%flink.ssql(type=update)
/* Rows whose close price is below an approximate 10-period EMA of the same
   ticker's close price (a possible bearish signal).
   A true EMA is recursive: EMA_t = k * close_t + (1 - k) * EMA_{t-1}, with
   k = 2 / (10 + 1). Here EMA_{t-1} is approximated by the previous row's
   10-row SMA of the same ticker (prev_sma); the smoothing step is applied once.
   On a ticker's first row prev_sma is NULL, so the row is filtered out. */
SELECT *
FROM (
    SELECT
        event_time,
        ticker,
        close_price,
        sma AS avg_close_price,
        (close_price - prev_sma) * (2.0/11.0) + prev_sma AS ema_close_price,
        `date`
    FROM (
        SELECT
            event_time,
            ticker,
            close_price,
            sma,
            -- previous row's SMA of the same ticker (partitioned so tickers don't mix)
            LAG(sma, 1) OVER (PARTITION BY ticker ORDER BY event_time) AS prev_sma,
            `date`
        FROM (
            SELECT
                event_time,
                ticker,
                close_price,
                -- simple moving average over the last 10 rows of this ticker
                AVG(close_price) OVER (
                    PARTITION BY ticker
                    ORDER BY event_time
                    ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
                ) AS sma,
                `date`
            FROM stock_table
        ) t
    ) t1
) t2
WHERE close_price < ema_close_price;

In [5]:
%flink.ssql(type=update)

-- Per ticker, detect a price drop of at least 8% within one minute of event time:
--   A: the reference row (any row),
--   B*: zero or more rows whose close stays within +/-8% of A's close,
--   C: a row whose close is at least 8% below A's close.
-- All percentages are relative to the initial price A. One summary row per match.
SELECT *
FROM stock_table
    MATCH_RECOGNIZE(
        PARTITION BY ticker
        ORDER BY event_time
        MEASURES
            A.event_time AS event_time,
            FIRST(A.`date`) AS initialPriceDate,
            LAST(C.`date`) AS dropDate,
            (100 * (A.close_price - C.close_price) / A.close_price) AS dropPercentage,
            FIRST(A.close_price) AS initialPrice,
            LAST(C.close_price) AS lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C) WITHIN INTERVAL '1' MINUTE
        DEFINE
            B AS (B.close_price >= (A.close_price * 0.92)) AND (B.close_price <= (A.close_price * 1.08)),
            C AS (100 * (A.close_price - C.close_price) / A.close_price) >= 8.0
    )

In [6]:
%flink.ssql
