 
## Generate Data Using Python SDK

In [1]:
%flink.ipyflink
import datetime
import json
import random
import boto3
import time 


# Target Kinesis data stream and AWS region for the producer below. The Flink
# source table created later ('stream' = 'stock-input-stream',
# 'aws.region' = 'ap-southeast-1') reads the same stream, so keep them in sync.
STREAM_NAME = "stock-input-stream"
REGION = "ap-southeast-1"


def get_data():
    """Build one synthetic stock-tick record.

    Returns:
        dict with, in this key order:
          - ``event_time``: current local time as an ISO-8601 string
            (``datetime.isoformat()``),
          - ``ticker``: one of BTC / ETH / BNB / XRP / DOGE, chosen uniformly,
          - ``price``: ``random.random() * 100`` rounded to 2 decimals.
    """
    timestamp = datetime.datetime.now().isoformat()
    # Draw the ticker before the price so the RNG sequence matches a
    # straight dict-literal evaluation order.
    symbol = random.choice(["BTC", "ETH", "BNB", "XRP", "DOGE"])
    quote = round(random.random() * 100, 2)
    return dict(event_time=timestamp, ticker=symbol, price=quote)


def generate(stream_name, kinesis_client, *, delay=0.0, max_records=None):
    """Publish synthetic stock ticks to a Kinesis data stream.

    Each record comes from ``get_data()``, is echoed to stdout, JSON-encoded
    with ``json.dumps`` and written via ``kinesis_client.put_record`` under the
    fixed partition key ``"partitionkey"``.

    Args:
        stream_name: Name of the target Kinesis data stream.
        kinesis_client: Client exposing ``put_record`` (e.g. a boto3 Kinesis
            client).
        delay: Seconds to sleep after each record. The default ``0.0`` keeps
            the original unthrottled behaviour; pass e.g. ``1.0`` to throttle
            the producer instead of toggling a commented-out ``sleep``.
        max_records: Stop after this many records. ``None`` (the default)
            loops forever, as before.
    """
    sent = 0
    while max_records is None or sent < max_records:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            # A constant key sends every record to the same shard.
            # NOTE(review): if this is spread across shards, check that no
            # shard stays idle, since that could hold back Flink watermarks.
            PartitionKey="partitionkey")
        sent += 1
        if delay > 0:
            time.sleep(delay)


if __name__ == '__main__':
    # Stream synthetic ticks into STREAM_NAME; generate() keeps running until
    # the paragraph/process is interrupted.
    generate(STREAM_NAME, kinesis_client=boto3.client('kinesis', region_name=REGION))

## Create an In-Memory Table

In [3]:
%flink.pyflink
# Start from a clean catalog so the CREATE TABLE paragraph below can be re-run.
# NOTE(review): 'output_table' is never created in this notebook; the S3 sink
# is 'stock_output_table' (dropped in its own paragraph).
st_env.execute_sql("DROP TABLE IF EXISTS stock_table")
st_env.execute_sql("DROP TABLE IF EXISTS output_table")

In [4]:
%flink.pyflink
# Register the Kinesis-backed source table queried by the SQL paragraphs below.
# - Columns mirror the JSON records produced by get_data() in the first cell.
# - The WATERMARK clause makes event_time the rowtime attribute, tolerating
#   events up to 5 seconds out of order before event-time windows close.
# - 'scan.stream.initpos' = 'LATEST' reads only records written after start-up.
# - 'json.timestamp-format.standard' = 'ISO-8601' matches the isoformat()
#   strings the producer writes into event_time.
# NOTE(review): PARTITIONED BY on this source table presumably only matters if
# the table is also used as a sink — confirm it is intended.
st_env.execute_sql("""
CREATE TABLE stock_table (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = 'stock-input-stream',
                'aws.region' = 'ap-southeast-1',
                'scan.stream.initpos' = 'LATEST',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """)


 
## Select and Filter 


In [6]:
%flink.ssql(type=update)
-- Stream every incoming record, columns in table-definition order.
SELECT ticker, price, event_time FROM stock_table

## Tumbling Window
Calculate the average stock price per ticker in each 10-second window.


In [8]:
%flink.ssql(type=update)
-- Average price per ticker over 10-second tumbling event-time windows;
-- time_event is the window's rowtime (end of window).
SELECT
    s.ticker AS ticker,
    AVG(s.price) AS avg_price,
    TUMBLE_ROWTIME(s.event_time, INTERVAL '10' SECOND) AS time_event
FROM stock_table AS s
GROUP BY
    TUMBLE(s.event_time, INTERVAL '10' SECOND),
    s.ticker;

 
## Tumbling Window
Count the records per ticker in each 3-second window.


In [10]:
%flink.ssql(type=update)
-- Number of records per ticker over 3-second tumbling event-time windows.
SELECT
    s.ticker AS ticker,
    COUNT(s.ticker) AS ticker_count,
    TUMBLE_ROWTIME(s.event_time, INTERVAL '3' SECOND) AS time_event
FROM stock_table AS s
GROUP BY
    TUMBLE(s.event_time, INTERVAL '3' SECOND),
    s.ticker;

 

## Sliding Window 

- Calculate the average stock price per ticker in each window.
- Window size: 1 minute, sliding (updated) every 10 seconds.


In [12]:
%flink.ssql(type=update)
-- Average price per ticker over 1-minute windows that slide every 10 seconds
-- (HOP arguments: time attribute, slide, size).
SELECT
    s.ticker AS ticker,
    AVG(s.price) AS avg_price,
    HOP_ROWTIME(s.event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE) AS hop_time
FROM stock_table AS s
GROUP BY
    HOP(s.event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE),
    s.ticker;


 
## Enable Checkpointing

In [14]:
%flink.pyflink
# Take a checkpoint every minute. Flink's streaming filesystem sink (used for
# the S3 output below) only finalizes part files on checkpoints.
st_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "1min")


In [15]:
%flink.pyflink
# Remove any previous S3 sink definition so the next paragraph can recreate it.
st_env.execute_sql("DROP TABLE IF EXISTS stock_output_table")

 

## Writing Results to Amazon S3

In [17]:
%flink.pyflink

# Register a filesystem sink table that writes CSV files to S3, one
# sub-directory per ticker (PARTITIONED BY). A partition is committed with a
# _SUCCESS file ('success-file' policy) after the configured 1-minute delay.
# NOTE(review): 'hehe/' looks like a placeholder key prefix — confirm the
# intended S3 location.
output_table_name = "stock_output_table"
bucket_name = "data-lake-demo-17072023"

# Bug fix: the original formatted with the undefined name `table_name`
# (NameError); the table name variable defined above is output_table_name.
st_env.execute_sql("""CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3)
              )
              PARTITIONED BY (ticker)
              WITH (
                  'connector'='filesystem',
                  'path'='s3a://{1}/hehe/',
                  'format'='csv',
                  'sink.partition-commit.policy.kind'='success-file',
                  'sink.partition-commit.delay' = '1 min'
              )""".format(output_table_name, bucket_name))

In [18]:
%flink.pyflink
# Submit a continuous job copying every stock_table row into the S3 sink; the
# returned TableResult is kept so the job can be cancelled later.
table_result = st_env.execute_sql(
    "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, "stock_table")
)

In [19]:
%flink.pyflink
# Stop the streaming INSERT job started above and print what cancel() returns.
print(
    table_result
    .get_job_client()
    .cancel()
)

In [20]:
%flink.pyflink
