In [0]:
%flink

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.Properties

// Builds the Kafka AdminClient (`admin`) used by the later paragraphs to list,
// create and delete topics on the MSK cluster.
//
// Replace the bootstrap servers with your own cluster endpoint. The same endpoint is
// repeated in the orders_msk table definition ('properties.bootstrap.servers') and
// must be kept in sync with this value by hand.
val bootstrapServers: String = "boot-jh3g3srn.c3.kafka-serverless.us-east-2.amazonaws.com:9098"

// Client properties: SASL over TLS using the AWS_MSK_IAM mechanism. The login module and
// callback handler are referenced by class name only.
// NOTE(review): these classes presumably come from the aws-msk-iam-auth library -- confirm it
// is on the interpreter classpath.
val config = new Properties()
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.put("security.protocol", "SASL_SSL")
config.put("sasl.mechanism", "AWS_MSK_IAM")
config.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
config.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")

// `val` rather than `var`: neither binding is ever reassigned.
// NOTE(review): nothing in this notebook closes the client; call admin.close() when finished.
val admin = AdminClient.create(config)


In [1]:
%flink

// list topics
// Fetch the topic listings visible to `admin`. listings() returns a future and get() blocks
// until the broker responds. `val` because the result is never reassigned.
val topicListing = admin.listTopics().listings().get()

In [2]:
%flink

import org.apache.kafka.clients.admin.NewTopic
// Collections was used here without being imported; only java.util.Properties is imported above.
import java.util.Collections

// Create MyOrdersTopic, the topic backing the orders_msk table:
// 3 partitions and replication factor of 1.
// NOTE(review): confirm a replication factor of 1 is accepted/appropriate for this cluster.
val newTopic = new NewTopic("MyOrdersTopic", 3, 1.toShort)

// createTopics is asynchronous. Waiting on all() surfaces failures (e.g. the topic already
// exists, or missing IAM permissions) as an exception instead of silently dropping them.
admin.createTopics(Collections.singleton(newTopic)).all().get()

In [3]:
%flink

// Collections is not otherwise imported in this notebook.
import java.util.Collections

// Delete the topic DatagenJsonTopic2.
// NOTE(review): this topic is not created anywhere in this notebook -- confirm the name is intended.
// deleteTopics is asynchronous. Waiting on all() rethrows failures (e.g. the topic does not
// exist) instead of silently discarding them.
admin.deleteTopics(Collections.singleton("DatagenJsonTopic2")).all().get()

In [4]:
%flink.ssql

-- Synthetic order stream generated by the built-in datagen connector.
-- Each field gets random values within the min/max bounds (or length) set below.
DROP TABLE IF EXISTS orders_datagen_source;

CREATE TABLE orders_datagen_source (
  product_id     BIGINT,
  order_number   BIGINT,
  quantity       INT,
  price_int      INT,
  -- computed column: price_int divided by 100.0, cast to DECIMAL(32, 2)
  price AS CAST(price_int/100.0 AS DECIMAL(32, 2)),
  buyer          STRING,
  order_time     TIMESTAMP(3)
) WITH (
  'connector'               = 'datagen',

  'fields.product_id.min'   = '1',
  'fields.product_id.max'   = '99999',

  'fields.order_number.min' = '1',
  'fields.order_number.max' = '9999999999',

  'fields.quantity.min'     = '1',
  'fields.quantity.max'     = '25',

  'fields.price_int.min'    = '29',
  'fields.price_int.max'    = '99999999',

  -- random strings of this length
  'fields.buyer.length'     = '15'
);


In [5]:
%flink.ssql

-- Kafka-backed table over the topic MyOrdersTopic, serialized as JSON.
-- It is written by the INSERT paragraph and read by the SELECT paragraph,
-- so it acts as both a sink and a source.
DROP TABLE IF EXISTS orders_msk;

CREATE TABLE orders_msk (
    product_id   BIGINT,
    order_number BIGINT,
    quantity     INT,
    price        DECIMAL(32, 2),
    buyer        STRING,
    order_time   TIMESTAMP(3)
)
WITH (
    'connector'= 'kafka',
    'topic' = 'MyOrdersTopic',
    'format' = 'json',
    -- reads start from the beginning of the topic
    'scan.startup.mode' = 'earliest-offset',
    -- consumer group used when the table is read as a source. Older Flink Kafka SQL
    -- connectors require it for sources, and newer ones accept it.
    -- NOTE(review): if the IAM policy restricts consumer group names, allow this one.
    'properties.group.id' = 'orders_msk_reader',
    -- must match the bootstrap servers used by the AdminClient paragraph
    'properties.bootstrap.servers' = 'boot-jh3g3srn.c3.kafka-serverless.us-east-2.amazonaws.com:9098',
    -- IAM authentication over SASL_SSL, same settings as the AdminClient
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'AWS_MSK_IAM',
    'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
    'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler'
);


In [6]:
%flink.pyflink

# Disable operator chaining on the stream execution environment `s_env`.
# NOTE(review): presumably done so each operator shows up as its own task in the Flink
# dashboard -- confirm. Chaining normally reduces serialization/hand-off overhead, so
# consider removing this outside of demos. Also confirm whether this setting reaches the
# jobs submitted from the %flink.ssql paragraphs.
s_env.disable_operator_chaining()


In [7]:
%flink.ssql(parallelism=2)

-- Copy generated orders from the datagen source into orders_msk (topic MyOrdersTopic).
-- The computed price column of the source is written as a plain column of the sink.
INSERT INTO orders_msk
SELECT
    src.product_id,
    src.order_number,
    src.quantity,
    src.price,
    src.buyer,
    src.order_time
FROM orders_datagen_source AS src;

In [8]:
%flink.ssql(type=update, parallelism=2)

-- Read orders_msk back, listing every column in its declared order.
SELECT product_id, order_number, quantity, price, buyer, order_time
FROM orders_msk;