# FlinkSQL Development to aggregate coupon issue log from MSK

## 1. Source Table
Connect to Kafka. Fill in your MSK broker URL, consumer group ID, and Kafka topic name.
* **properties.bootstrap.servers** : Comma separated list of Kafka brokers.
* **properties.group.id** : The id of the consumer group for Kafka source. 
* **topic** : Topic name to read data from when the table is used as source.
 
See also [Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/).

In [2]:
%flink.ssql

-- Source table: coupon issue log events read from an MSK (Kafka) topic.
-- Replace every <<...>> placeholder with your own values before running.

CREATE TABLE source_table_dev (
  -- Fields read from each JSON message
  msg_id        STRING,
  msg_type      STRING,
  device_id     STRING,
  location_code STRING,
  coupon_code   STRING,
  response      STRING,
  create_time   TIMESTAMP(3),
  -- create_time is the event-time attribute; the watermark trails it by 5 seconds
  WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
) WITH (
  -- Kafka connection and consumer settings
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<<YOUR-KAFKA-BROKER-ENDPOINTS>>',
  'properties.group.id' = '<<YOUR-CONSUMER-GROUP_ID>>',
  'topic' = '<<YOUR-KAFKA-TOPIC-NAME>>',
  'scan.startup.mode' = 'earliest-offset',
  -- MSK IAM authentication over SASL_SSL
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  -- JSON decoding: do not fail on missing fields, and skip parse errors instead of failing
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)

## 2. Sink Table
Write the aggregated features to a Kafka topic as a sink. Fill in your MSK broker URL and Kafka topic name.

In [4]:
%flink.ssql

-- (Query 1) Sink table for the coupon-prefix-count feature.
-- Rows are written as JSON to the Kafka topic configured below;
-- replace every <<...>> placeholder with your own values before running.

CREATE TABLE sink_table_coupon_prefix_count_dev (
  feature_group_name STRING,
  loc_coupon_prefix  STRING,
  msg_count          BIGINT,
  event_time         TIMESTAMP(3)
) WITH (
  -- Kafka connection
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<<YOUR-KAFKA-BROKER-ENDPOINTS>>',
  'topic' = '<<YOUR-KAFKA-TOPIC-NAME>>',
  -- MSK IAM authentication over SASL_SSL
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  -- Output encoding
  'format' = 'json'
)

In [5]:
%flink.ssql

-- (Query 2) Sink table for the proto-coupon-location-invalid-count feature.
-- Rows are written as JSON to the Kafka topic configured below;
-- replace every <<...>> placeholder with your own values before running.

CREATE TABLE sink_table_loc_invalid_cnt_dev (
  feature_group_name STRING,
  location_code      STRING,
  msg_count          BIGINT,
  event_time         TIMESTAMP(3)
) WITH (
  -- Kafka connection
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<<YOUR-KAFKA-BROKER-ENDPOINTS>>',
  'topic' = '<<YOUR-KAFKA-TOPIC-NAME>>',
  -- MSK IAM authentication over SASL_SSL
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  -- Output encoding
  'format' = 'json'
)

## 3. Feature aggregation query
Process the streaming data.
* Change `feature_group_name` to the actual feature group name in SageMaker Feature Store.
* If you want to push the aggregated feature values to the sink, uncomment the `INSERT INTO` line.

In [7]:
%flink.ssql

-- (Query 1) coupon-prefix-count
-- Number of validation requests per (location, coupon prefix) over a 10-minute
-- window that slides every 5 minutes.
-- The coupon prefix is coupon_code with its last 4 characters removed.
-- NOTE(review): assumes coupon_code is longer than 4 characters - confirm against the producer.
-- Uncomment the INSERT line below to write the results to the sink table.

-- INSERT INTO sink_table_coupon_prefix_count_dev
SELECT
  'proto-coupon-prefix-count' AS feature_group_name,
  CONCAT(location_code,'#', SUBSTRING(coupon_code, 1, CHAR_LENGTH(coupon_code) - 4)) AS loc_coupon_prefix,
  COUNT(*) AS msg_count,
  MAX(create_time) AS event_time
FROM source_table_dev
GROUP BY
  CONCAT(location_code,'#', SUBSTRING(coupon_code, 1, CHAR_LENGTH(coupon_code) - 4)),
  -- HOP(time attribute, slide, size): 10-minute windows advancing every 5 minutes
  HOP(create_time, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)

In [8]:
%flink.ssql

-- (Query 2) proto-coupon-location-invalid-count
-- Number of coupon requests whose response is 'INVALID', per location,
-- in 5-minute tumbling windows.
-- Uncomment the INSERT line below to write the results to the sink table.

-- INSERT INTO sink_table_loc_invalid_cnt_dev
SELECT
  'proto-coupon-location-invalid-count' AS feature_group_name,
  location_code,
  COUNT(*) AS msg_count,
  MAX(create_time) AS event_time
FROM source_table_dev
WHERE response = 'INVALID'
GROUP BY
  TUMBLE(create_time, INTERVAL '5' MINUTE),
  location_code