In [None]:
from confluent_kafka import Consumer

# Kafka consumer settings. "earliest" makes a consumer group with no committed
# offsets start reading from the beginning of the topic.
conf = {
    # host:port without embedded whitespace, matching the Spark cells below.
    "bootstrap.servers": "localhost:9092",
    "group.id": "mygroup",
    "auto.offset.reset": "earliest",
}
consumer = Consumer(conf)
rows = []
try:
    consumer.subscribe(["payment_msg"])
    # Pull a batch of up to 100 messages; no timeout is passed, so this call
    # waits for the broker to deliver them.
    for msg in consumer.consume(100):
        # Error events are delivered as messages too; they carry no payload
        # that can be decoded as a payment record.
        if msg.error() is not None:
            continue
        payload = msg.value()
        if payload is None:
            # Messages without a value have nothing to decode.
            continue
        rows.append(payload.decode("utf-8"))
finally:
    # Always release the consumer, even if reading the batch fails.
    consumer.close()

## Local execution on DuckDB

In [None]:
import json

import pandas as pd

# Each Kafka payload is a JSON object; decode them all, then build the frame
# in one go and convert createTime into pandas datetimes.
payment_records = list(map(json.loads, rows))
df = pd.DataFrame(payment_records).assign(
    createTime=lambda frame: pd.to_datetime(frame["createTime"])
)
df

In [None]:
import ibis

# Use Ibis's default backend for the local part of the walkthrough.
con = ibis.get_backend()
# overwrite=True keeps this cell re-runnable: without it, executing the cell a
# second time in the same kernel fails because "payments" already exists.
t = con.create_table("payments", df, overwrite=True)
t

In [None]:
# Display the column names and data types of the payments table expression.
t.schema()

### Example transformations: filter, transform, aggregations

In [None]:
from ibis import _

# Deferred predicate: payments whose amount exceeds 10000.
is_large_payment = _.payAmount > 10000
# Displaying the filtered expression (no explicit .execute() here).
t.filter(is_large_payment)

In [None]:
# Build the filtered expression first, then run it explicitly.
large_payments = t.filter(_.payAmount > 10000)
large_payments.execute()

### Transformation chaining

In [None]:
# Province names; a province's id is its position in this tuple.
provinces = (
    "Beijing", "Shanghai", "Hangzhou", "Shenzhen",
    "Jiangxi", "Chongqing", "Xizang",
)
# Lookup table mapping provinceId (0-based position) to the province name.
province_id_to_name_df = pd.DataFrame(
    {
        "provinceId": list(range(len(provinces))),
        "province": list(provinces),
    }
)
province_id_to_name_df

In [None]:
# Attach province names via provinceId, then keep payments above 10000.
payments_with_province = t.join(province_id_to_name_df, ["provinceId"])
join_filter = payments_with_province.filter(_.payAmount > 10000)
join_filter.execute()

In [None]:
# Same join + filter as before, built up one named stage at a time.
payments_joined = t.join(province_id_to_name_df, ["provinceId"])
payments_large = payments_joined.filter(_.payAmount > 10000)
# Total payment amount per (payPlatform, provinceId) pair.
pay_totals = payments_large.group_by(["payPlatform", "provinceId"]).aggregate(
    totalPayAmount=_.payAmount.sum()
)
# Largest totals first.
join_agg = pay_totals.order_by(_.totalPayAmount.desc())

In [None]:
# Draw the expression graph behind join_agg (join -> filter -> aggregate -> sort).
join_agg.visualize()

In [None]:
# Print the compiled form of the expression for the current backend; print()
# is used so the text is shown with its line breaks rather than as a repr.
print(join_agg.compile())

In [None]:
# Run the aggregation on the backend and show the materialised result.
join_agg.execute()

### Intersperse SQL with Ibis methods

In [None]:
# Start from a raw SQL query against the payments table...
large_payments_sql = t.sql("SELECT * FROM payments WHERE payAmount > 10000")
# ...and keep composing with regular Ibis methods on its result.
large_payments_sql.join(province_id_to_name_df, ["provinceId"]).execute()

### Window aggregation

In [None]:
# Range window: from 10 seconds before each row up to the row itself,
# ordered by createTime.
window_start = -ibis.interval(seconds=10)
rolling_pay_amount = (
    _.payAmount.sum()
    .over(range=(window_start, 0), order_by=_.createTime)
    .name("pay_amount")
)
# Project the province id next to the windowed sum.
window_agg = t.select("provinceId", rolling_pay_amount)
window_agg

In [None]:
# Run the windowed aggregation on the backend and show the result.
window_agg.execute()

## Execute on Spark

In [None]:
from pyspark.sql import SparkSession

# Spark session for the streaming part: pulls in the Kafka SQL connector
# package, enables streaming schema inference and sets the Spark UI port.
session = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
# Column name -> Ibis type for the payment messages read from Kafka.
payment_fields = {
    "createTime": "timestamp(3)",
    "orderId": "int64",
    "payAmount": "float64",
    "payPlatform": "int32",
    "provinceId": "int32",
}
schema = ibis.schema(payment_fields)

# Rebind `con` to a PySpark backend in streaming mode, on top of the Spark session.
con = ibis.pyspark.connect(session, mode="streaming")

### Connect to a streaming source

In [None]:
# Watermark on createTime with an allowed delay of 10 seconds.
payment_watermark = ibis.watermark(
    time_col="createTime", allowed_delay=ibis.interval(seconds=10)
)
# Kafka source settings: broker address, topic to subscribe to, and where to
# start reading.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "payment_msg",
    "startingOffsets": "earliest",
}
# Register the Kafka topic as the streaming table "payments".
con.read_kafka(
    "payments",
    watermark=payment_watermark,
    schema=schema,
    auto_parse=True,
    options=kafka_source_options,
)

In [None]:
# Rebind `t` to the streaming "payments" table and rebuild the same
# join + filter expression on top of it.
t = con.table("payments")
is_large_stream_payment = _.payAmount > 10000
join_filter = t.join(province_id_to_name_df, ["provinceId"]).filter(
    is_large_stream_payment
)

### Write results to sink

In [None]:
# Directory that receives the CSV output of the streaming query.
path = "output"
# Sink settings: checkpoint directory for the query, and write a header row.
csv_sink_options = {"checkpointLocation": "checkpoint", "header": True}
con.to_csv_dir(join_filter, path=path, options=csv_sink_options)

In [None]:
from pathlib import Path

# Directory the CSV sink writes into.
path = Path("output")
# Collect the CSV part files in a stable (sorted) order. Zero-byte files are
# skipped because pd.read_csv raises on them.
csv_files = sorted(f for f in path.glob("*.csv") if f.stat().st_size > 0)
frames = [pd.read_csv(f) for f in csv_files]
# pd.concat raises on an empty list, which happens whenever the streaming query
# has not written anything yet; fall back to an empty frame in that case.
# ignore_index gives the combined frame one continuous index instead of
# repeating each file's own 0..n-1 index.
output = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
output

In [None]:
# Kafka sink settings. The checkpoint directory is specific to this query: the
# CSV sink query above already uses "checkpoint", and two streaming queries
# must not share one checkpoint location.
kafka_sink_options = {
    "checkpointLocation": "checkpoint_kafka",
    "kafka.bootstrap.servers": "localhost:9092",
    "topic": "sink",
}
# Stream the joined + filtered payments to the "sink" topic.
con.to_kafka(
    join_filter,
    auto_format=True,
    options=kafka_sink_options,
)