### Configure Kafka source connector

In [None]:
# Kafka connector is not part of the binary distribution, so we need to download and link it for cluster execution explicitly
!wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

Example record from upstream Kafka source:
```json
{
    "createTime": "2023-09-20 22:19:02.224",
    "orderId": 1695248388,
    "payAmount": 88694.71922270155,
    "payPlatform": 0,
    "provinceId": 6
}
```

In [None]:
import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment

# Column types of the JSON records on the upstream `payment_msg` topic
# (see the example record above).
source_schema = sch.Schema(
    dict(
        createTime=dt.timestamp(scale=3),  # millisecond precision, e.g. "…:02.224"
        orderId=dt.int64,
        payAmount=dt.float64,
        payPlatform=dt.int32,
        provinceId=dt.int32,
    )
)

# A Flink table environment running in streaming mode.
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Pin the default operator parallelism to 1 for this session.
config = Configuration()
config.set_string("parallelism.default", "1")
table_config = table_env.get_config()
table_config.add_configuration(config)

In [None]:
connection = ibis.flink.connect(table_env)

# Register the Kafka connector JAR downloaded above with this session. Use the
# public PyFlink `TableEnvironment.execute_sql` on the same environment the Ibis
# connection wraps, rather than the private `connection._exec_sql` helper.
table_env.execute_sql("ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'")

# Upstream topic with raw payment events, read from the earliest offset and
# decoded as JSON.
source_configs = {
    "connector": "kafka",
    "topic": "payment_msg",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "test_3",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

# Event-time watermark on `createTime`, allowing records to arrive up to 15 s late.
t = connection.create_table(
    "payment_msg",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="createTime", allowed_delay=ibis.interval(seconds=15)
    ),
)

### Configure Kafka sink connector

In [None]:
# Downstream `sink` topic: one JSON record per result row holding the province
# name and the windowed payment total.
sink_schema = sch.Schema(dict(province=dt.string, pay_amount=dt.float64))

kafka_sink_configs = {
    "connector": "kafka",
    "topic": "sink",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}

connection.create_table(
    "kafka_sink",
    schema=sink_schema,
    tbl_properties=kafka_sink_configs,
)

### Define reference table

In [None]:
import pandas as pd

# Lookup table joining the numeric `provinceId` (the 0-based position in this
# tuple) to a human-readable province name.
provinces = (
    "Beijing",
    "Shanghai",
    "Hangzhou",
    "Shenzhen",
    "Jiangxi",
    "Chongqing",
    "Xizang",
)
province_id_to_name_df = pd.DataFrame(
    {"provinceId": range(len(provinces)), "province": provinces}
)
province_id_to_name_df

### Construct Ibis expression

In [None]:
# Per-event running total of `payAmount` over the preceding 10 seconds of event
# time, then swap the numeric province id for its name.
agged = t.select(
    "provinceId",
    pay_amount=t.payAmount.sum().over(
        range=(-ibis.interval(seconds=10), 0), order_by=t.createTime
    ),
)
joined = agged.join(province_id_to_name_df, predicates="provinceId").select(
    "province", "pay_amount"
)
joined

In [None]:
# Show the Flink SQL that Ibis generates for the `joined` expression.
sql = connection.compile(joined)
print(sql)

### Stream results into Kafka sink topic

In [None]:
# Submit the INSERT of `joined` into the `kafka_sink` table (Kafka topic `sink`).
connection.insert("kafka_sink", joined)

In [None]:
!pip install kafka-python

In [None]:
from itertools import islice

from kafka import KafkaConsumer

# Print the first 10 records that arrive on the `sink` topic.
# `consumer_timeout_ms` ends iteration instead of blocking forever if the topic
# goes quiet, and the consumer is always closed so the cell does not leak it.
consumer = KafkaConsumer(
    "sink",
    bootstrap_servers="localhost:9092",
    consumer_timeout_ms=30_000,
)
try:
    for msg in islice(consumer, 10):
        print(msg)
finally:
    consumer.close()

### Now, run the same query in batch mode

In [None]:
from itertools import islice

from kafka import KafkaConsumer

# Collect up to 100 raw records from `payment_msg` for the batch comparison.
# If no message arrives within `consumer_timeout_ms`, iteration stops early, so
# `rows` may hold fewer than 100 records rather than the cell hanging forever.
consumer = KafkaConsumer(
    "payment_msg",
    bootstrap_servers="localhost:9092",
    consumer_timeout_ms=30_000,
)
try:
    rows = list(islice(consumer, 100))
finally:
    consumer.close()

In [None]:
import json

import pandas as pd

# Decode each Kafka message payload as JSON and parse `createTime` into a
# datetime column so it can be used for time-based windowing.
df = pd.DataFrame([json.loads(message.value) for message in rows]).assign(
    createTime=lambda frame: pd.to_datetime(frame["createTime"])
)
df

In [None]:
import ibis

# Load the sampled payments into an Ibis pandas backend for the batch run.
# NOTE(review): this rebinds `t`, shadowing the Flink `payment_msg` table from
# the streaming section; the batch cells below use this pandas-backed `t`.
con = ibis.pandas.connect()
t = con.create_table("payments", df)
t

In [None]:
# Same query as the streaming section, now against the pandas-backed `t`:
# a 10-second trailing sum of `payAmount`, then the province name lookup.
agged = t.select(
    "provinceId",
    pay_amount=t.payAmount.sum().over(
        range=(-ibis.interval(seconds=10), 0), order_by=t.createTime
    ),
)
joined = agged.join(province_id_to_name_df, predicates="provinceId").select(
    "province", "pay_amount"
)
joined

In [None]:
# Execute the batch expression and materialize the result as a pandas DataFrame.
joined.to_pandas()