## Analyse Data in a Kafka Topic Using KDA Flink

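- Check that VPC endpoints exist for S3 and Glue, since the MSK cluster runs inside a VPC.
- Check the SSL bootstrap broker string (the tables below connect with SASL_SSL and IAM auth on port 9098).
- Check the MSK security settings (unauthorized access is allowed for testing).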

 
## Create a Table Connected to a Kafka Topic


In [2]:
%flink.ssql(type=update)

-- Source table over the MSK topic stock-topic, decoded as JSON.
-- event_time is the event-time attribute: the watermark lags it by 5 seconds,
-- so events up to 5 seconds out of order are still assigned to their windows.
DROP TABLE IF EXISTS stock_table;

CREATE TABLE stock_table (
    ticker     STRING,
    price      DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
    -- Kafka topic, brokers and consumer group
    'connector' = 'kafka',
    'topic' = 'stock-topic',
    'properties.bootstrap.servers' = 'b-2.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098,b-1.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098,b-3.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098',
    'properties.group.id' = 'KdaStudioGroup',
    -- Begin at the latest offset instead of replaying the topic
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    -- MSK IAM authentication over SASL_SSL
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'AWS_MSK_IAM',
    'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
    'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler'
);

 
## Simple Query

In [4]:
%flink.ssql(type=update)
-- Preview the raw stream from the Kafka source table.
-- Columns are listed explicitly so the output shape does not change silently
-- if stock_table is ever redefined with extra columns.
SELECT
    ticker,
    price,
    event_time
FROM stock_table;

 
## Tumbling Window

In [6]:
%flink.ssql(type=update)
-- Average price per ticker over non-overlapping 10-second event-time windows.
-- time_event is the window rowtime produced by TUMBLE_ROWTIME.
SELECT
    s.ticker                                         AS ticker,
    AVG(s.price)                                     AS avg_price,
    TUMBLE_ROWTIME(s.event_time, INTERVAL '10' SECOND) AS time_event
FROM stock_table AS s
GROUP BY
    TUMBLE(s.event_time, INTERVAL '10' SECOND),
    s.ticker;

 
## Sliding (Hopping) Window

In [8]:
%flink.ssql(type=update)
-- Average price per ticker over 1-minute event-time windows that advance
-- every 10 seconds, so consecutive windows overlap.
SELECT
    s.ticker                                                              AS ticker,
    AVG(s.price)                                                          AS avg_price,
    HOP_ROWTIME(s.event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE)  AS hop_time
FROM stock_table AS s
GROUP BY
    HOP(s.event_time, INTERVAL '10' SECOND, INTERVAL '1' MINUTE),
    s.ticker;

 
## Sink Table - Write to S3 (CSV)


In [10]:
%flink.ssql(type=update)
-- Drop sink tables left over from a previous run so the CREATE TABLE
-- statements that follow can be re-executed.
DROP TABLE IF EXISTS stock_output_table;
DROP TABLE IF EXISTS stock_output_table_json;

In [11]:
%flink.pyflink

st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)

st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)

In [12]:
%flink.ssql(type=update)
-- CSV sink on S3, partitioned by ticker.
-- A partition is committed by writing a success file, with a 1 min commit delay.
CREATE TABLE stock_output_table (
    ticker     STRING,
    price      DOUBLE,
    event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'filesystem',
    'path' = 's3a://data-lake-stream-20072023/kafka-data/',
    'format' = 'csv',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.delay' = '1 min'
);

In [13]:
%flink.ssql(type=update)
-- Streaming job: copy every record from the Kafka source into the CSV sink.
INSERT INTO stock_output_table
SELECT
    src.ticker,
    src.price,
    src.event_time
FROM stock_table AS src;

 

## Sink Table - Write to S3 (JSON)


In [15]:
%flink.ssql(type=update)
-- JSON sink on S3, partitioned by ticker.
-- Part files are rolled over after 60s, with the rollover condition checked every 30s.
-- NOTE(review): unlike stock_output_table, no sink.partition-commit.policy.kind
-- is configured here, so presumably no success file is written per partition.
-- Confirm this is intended.
CREATE TABLE stock_output_table_json (
    ticker     STRING,
    price      DOUBLE,
    event_time TIMESTAMP(3)
)
PARTITIONED BY (ticker)
WITH (
    'connector' = 'filesystem',
    'path' = 's3a://data-lake-stream-20072023/kafka-data-json/',
    'format' = 'json',
    'sink.rolling-policy.rollover-interval' = '60s',
    'sink.rolling-policy.check-interval' = '30s'
);

In [16]:
%flink.ssql(type=update)
-- Streaming job: copy every record from the Kafka source into the JSON sink.
INSERT INTO stock_output_table_json
SELECT
    src.ticker,
    src.price,
    src.event_time
FROM stock_table AS src;

In [17]:
%flink.ssql


In [18]:
%flink.ssql


In [19]:
%flink.ssql


In [20]:
%flink.pyflink
