In [1]:
from dotenv import load_dotenv
from pyflink.table.confluent import ConfluentSettings, ConfluentTools, ConfluentTableDescriptor
from pyflink.table import TableEnvironment, DataTypes, Schema
from pyflink.table.expressions import *
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Tumble, Slide, Session
from pyflink.table.udf import udf

In [2]:
# Load variables from the local .env file into the process environment
# (presumably the Confluent Cloud credentials/endpoints that the settings
# cell reads — confirm against the .env file).
# NOTE(review): load_dotenv is already imported in the first cell, so this
# repeated import is redundant.
from dotenv import load_dotenv
load_dotenv("./.env")

True

In [3]:
# Build the Confluent settings and switch them to streaming mode in a single
# expression, then create the table environment used by every cell below.
settings = ConfluentSettings.from_global_variables().in_streaming_mode()
env = TableEnvironment.create(settings)

In [5]:
# Discover which catalogs this session can see before picking one.
env.list_catalogs() # list Kafka environments

['default', 'examples', 'msds682']

In [8]:
# Make "msds682" the current catalog so later database/table lookups
# resolve inside it.
env.use_catalog("msds682")

In [9]:
# Enumerate the databases available in the current catalog.
env.list_databases() # list Kafka clusters

['INFORMATION_SCHEMA', 'msds682']

In [10]:
# Make "msds682" the current database so table names below need no
# catalog/database qualification.
env.use_database("msds682")

In [11]:
# Enumerate the tables available in the current database.
env.list_tables() # list Kafka topics

['system.usage', 'system.usage$errors']

In [12]:
# The table name itself contains a dot ("system.usage"), so it is wrapped in
# backticks; presumably an unquoted dot would be read as a path separator
# (catalog.database.table) — confirm against the Table API path rules.
table = env.from_path("`system.usage`")

In [13]:
# Peek at the raw topic: print the first 10 changelog rows, including the
# change kind in the "op" column (e.g. +I for inserts).
ConfluentTools.print_changelog_limit(table, 10)

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| op |                            key |                             id |                      cpu_usage |                      cpu_stats |                   memory_usage |                      timestamp |                $rowtime |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-------------------------+
| +I |                          x'31' |                              1 |                           26.8 | [19126.0, 220961.0, 1.76305... |                           73.3 |                   1.76361741E9 | 2025-11-19 21:43:28.973 |
| +I |                          x'31' |                              1 |    

In [14]:
# Keep only the rows whose id equals '2' and project three columns; the
# "$rowtime" column is exposed under the plain name "rowtime".
result = (
    table
    .where(col("id") == '2')
    .select(
        col("id"),
        col("memory_usage"),
        col("$rowtime").alias("rowtime"),
    )
)

# Show the first 10 rows of the filtered projection as a materialized table.
ConfluentTools.print_materialized_limit(result, 10)

+----+--------------+-------------------------+
| id | memory_usage |                 rowtime |
+----+--------------+-------------------------+
|  2 |         75.4 | 2025-11-19 21:48:33.730 |
|  2 |         74.5 | 2025-11-19 21:48:43.252 |
|  2 |         74.5 | 2025-11-19 21:48:43.886 |
|  2 |         72.7 | 2025-11-19 21:53:36.720 |
|  2 |         72.7 | 2025-11-19 21:53:36.887 |
|  2 |         72.7 | 2025-11-19 21:53:36.929 |
|  2 |         74.2 | 2025-11-19 21:59:54.190 |
|  2 |         74.1 | 2025-11-19 21:59:54.587 |
|  2 |         74.9 | 2025-11-19 22:00:00.817 |
|  2 |         74.9 | 2025-11-19 22:00:01.396 |
+----+--------------+-------------------------+
10 rows in set


In [15]:
# Show the filtered projection again, this time as changelog entries (with
# the "op" column), limited to the first 10 rows.
ConfluentTools.print_changelog_limit(result, 10)

+----+--------------------------------+--------------------------------+-------------------------+
| op |                             id |                   memory_usage |                 rowtime |
+----+--------------------------------+--------------------------------+-------------------------+
| +I |                              2 |                           75.4 | 2025-11-19 21:48:33.730 |
| +I |                              2 |                           74.5 | 2025-11-19 21:48:43.252 |
| +I |                              2 |                           74.5 | 2025-11-19 21:48:43.886 |
| +I |                              2 |                           72.7 | 2025-11-19 21:53:36.720 |
| +I |                              2 |                           72.7 | 2025-11-19 21:53:36.887 |
| +I |                              2 |                           72.7 | 2025-11-19 21:53:36.929 |
| +I |                              2 |                           74.2 | 2025-11-19 21:59:54.190 |
| +I |    

In [None]:
# Minimum memory usage per id over 1-minute tumbling windows.
#
# The window is declared on "$rowtime" because that is the time column that
# exists on `table`; the "rowtime" alias was only introduced on the projected
# `result` table and cannot be resolved here.
aggregated = (
    table
    .window(Tumble.over(lit(1).minutes).on(col("$rowtime")).alias("w"))
    .group_by(col("id"), col("w"))
    .select(
        col("id"),
        col("w").start.alias("window_start"),
        col("w").end.alias("window_end"),
        # Use the expression's own aggregate: a bare min(...) here is Python's
        # builtin, which cannot aggregate a column expression.
        col("memory_usage").min.alias("min_memory_usage"),
    )
)

In [None]:
# Preview the first 10 windowed rows with the same bounded helper used for the
# earlier previews. The environment runs in streaming mode, so
# `aggregated.execute().print()` would keep printing until interrupted, and
# assigning its return value to `result` would replace the filtered Table of
# that name with a different kind of object.
ConfluentTools.print_materialized_limit(aggregated, 10)