## Configure Kafka source connector

In [None]:
# Kafka connector is not part of the binary distribution, so we need to download and link it for cluster execution explicitly
!wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

Example record from upstream Kafka source:
```json
{
    "createTime": "2023-09-20 22:19:02",
    "orderId": 1695248388,
    "payAmount": 88694.71922270155,
    "payPlatform": 0,
    "provinceId": 6
}
```

In [None]:
import ibis
import ibis.expr.schema as sch
import ibis.expr.datatypes as dt
from pyflink.table import EnvironmentSettings, TableEnvironment

# Columns of the records read from the Kafka source topic.
source_schema = sch.Schema(
    dict(
        createTime=dt.timestamp(scale=3),
        orderId=dt.int64,
        payAmount=dt.float64,
        payPlatform=dt.int32,
        provinceId=dt.int32,
    )
)

# Connector options for the Kafka-backed source table.
source_configs = {
    "connector": "kafka",
    "topic": "payment_msg",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "test_3",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

# Streaming-mode Flink table environment, wrapped in an ibis connection.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
connection = ibis.flink.connect(table_env)

# Register the connector JAR fetched by the download cell; the relative path
# means it is looked up in the current working directory.
connection._exec_sql("ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'")

# Watermark on `createTime` with an allowed delay of 15 seconds.
source_watermark = ibis.watermark(
    time_col="createTime",
    allowed_delay=ibis.interval(seconds=15),
)

connection.create_table(
    "payment_msg",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=source_watermark,
)

## Configure sink connectors

In [None]:
# Column layout used for the sink tables; it has the same fields and types as
# the schema of the `payment_msg` source table.
sink_schema = sch.Schema(
    dict(
        createTime=dt.timestamp(scale=3),
        orderId=dt.int64,
        payAmount=dt.float64,
        payPlatform=dt.int32,
        provinceId=dt.int32,
    )
)

# Only the connector name is configured for the print sink.
print_sink_configs = dict(connector="print")

connection.create_table(
    "print_sink", schema=sink_schema, tbl_properties=print_sink_configs
)

In [None]:
# Connector options for a Kafka-backed sink table that writes JSON records
# to the `sink` topic on the local broker.
kafka_sink_configs = dict(
    [
        ("connector", "kafka"),
        ("topic", "sink"),
        ("properties.bootstrap.servers", "localhost:9092"),
        ("format", "json"),
    ]
)

# Reuses `sink_schema` defined together with the print sink.
connection.create_table(
    "kafka_sink", schema=sink_schema, tbl_properties=kafka_sink_configs
)

In [None]:
# Copy every row of `payment_msg` into `kafka_sink`; the call's result is left
# as the cell's trailing expression and is not waited on here.
connection._exec_sql("INSERT INTO kafka_sink SELECT * FROM payment_msg")

In [None]:
# Send ten rows of `payment_msg` to `print_sink`, then wait on the submitted statement.
# NOTE(review): `payment_msg` is a Kafka source read in streaming mode; confirm the
# job finishes once the LIMIT is reached, otherwise `wait()` blocks until interrupted.
print_sink_job = connection._exec_sql(
    "INSERT INTO print_sink SELECT * FROM payment_msg LIMIT 10"
)
print_sink_job.wait()