In [27]:
import polars as pl
import re
import os

In [13]:
# df = pl.read_parquet("../logs/quick_test/kafka/1p2c1t64_0b-30s_events.parquet")
df = pl.read_parquet("../logs/quick_test/zeromq_p2p/1p2c1t64_0b-30s_events.parquet")
print(df.schema)
print(df.head())

Schema([('container_name', String), ('timestamp', Datetime(time_unit='us', time_zone=None)), ('event_type', String), ('message_id', String), ('logical_size', Int64), ('topic', String), ('serialized_size', Int64)])
shape: (5, 7)
┌────────────────┬─────────────────────────┬──────────────┬────────────┬──────────────┬───────┬─────────────────┐
│ container_name ┆ timestamp               ┆ event_type   ┆ message_id ┆ logical_size ┆ topic ┆ serialized_size │
│ ---            ┆ ---                     ┆ ---          ┆ ---        ┆ ---          ┆ ---   ┆ ---             │
│ str            ┆ datetime[μs]            ┆ str          ┆ str        ┆ i64          ┆ str   ┆ i64             │
╞════════════════╪═════════════════════════╪══════════════╪════════════╪══════════════╪═══════╪═════════════════╡
│ zeromq_p2p-C1  ┆ 2025-04-24 13:55:42.772 ┆ Initializing ┆ null       ┆ null         ┆ null  ┆ null            │
│ zeromq_p2p-C1  ┆ 2025-04-24 13:55:42.872 ┆ Initialized  ┆ null       ┆ null         ┆ 

In [15]:
df = pl.read_parquet("../logs/quick_test/kafka/1p2c1t64_0b-30s_events.parquet")
df = pl.concat([df, pl.read_parquet("../logs/quick_test/zeromq_p2p/1p2c1t64_0b-30s_events.parquet")], how="vertical")
print(df.schema)
print(df.head())

Schema([('container_name', String), ('timestamp', Datetime(time_unit='us', time_zone=None)), ('event_type', String), ('message_id', String), ('logical_size', Int64), ('topic', String), ('serialized_size', Int64)])
shape: (5, 7)
┌────────────────┬─────────────────────────┬──────────────┬────────────┬──────────────┬───────┬─────────────────┐
│ container_name ┆ timestamp               ┆ event_type   ┆ message_id ┆ logical_size ┆ topic ┆ serialized_size │
│ ---            ┆ ---                     ┆ ---          ┆ ---        ┆ ---          ┆ ---   ┆ ---             │
│ str            ┆ datetime[μs]            ┆ str          ┆ str        ┆ i64          ┆ str   ┆ i64             │
╞════════════════╪═════════════════════════╪══════════════╪════════════╪══════════════╪═══════╪═════════════════╡
│ kafka-C1       ┆ 2025-04-24 13:54:10.053 ┆ Initializing ┆ null       ┆ null         ┆ null  ┆ null            │
│ kafka-C1       ┆ 2025-04-24 13:54:10.054 ┆ Initialized  ┆ null       ┆ null         ┆ 

In [18]:
df.filter(
    pl.col("event_type") == "Publication"
).group_by(
    pl.col("container_name")
).agg(
    pl.len().alias("num_logs"),
    pl.col("logical_size").sum(),
    pl.col("serialized_size").sum()
)

container_name,num_logs,logical_size,serialized_size
str,u32,i64,i64
"""kafka-P0""",884431,74181032,74181032
"""zeromq_p2p-P0""",1544205,98829057,0


In [64]:
def load_and_annotate_events(path: str):
    dfs = []
    pattern = re.compile(r'(?P<p>\d+)p(?P<c>\d+)c(?P<t>\d+)t(?P<ms>[\d_]+)b\-(?P<s>\d+)s_events\.parquet')

    for fname in os.listdir(path):
        match = pattern.match(fname)
        if match:
            file_path = os.path.join(path, fname)
            df = pl.read_parquet(file_path)

            # Extract metadata from filename
            metadata = match.groupdict()
            df = df.with_columns([
                pl.lit(int(metadata['p'])).alias("num_producers"),
                pl.lit(int(metadata['c'])).alias("num_consumers"),
                pl.lit(int(metadata['t'])).alias("num_topics"),
                pl.lit(int(ms.split("_")[0])).alias("message_size"),
                pl.lit(int(metadata['s'])).alias("duration_seconds")
            ])

            dfs.append(df)

    if not dfs:
        raise FileNotFoundError("No matching *_events.parquet files found in the specified directory.")

    return pl.concat(dfs, how="vertical")

In [115]:
dfs_zmq = load_and_annotate_events("../logs/quick_test/zeromq_p2p/")
dfs_kafka = load_and_annotate_events("../logs/quick_test/kafka/")
dfs_total = pl.concat([dfs_zmq, dfs_kafka], how="vertical")
dfs_total = dfs_total.filter(
    pl.col("event_type").is_in(["Intention", "Publication"])
).select(
    ["container_name","message_id", "event_type", "timestamp"]
).pivot(
    values="timestamp",
    index=["container_name","message_id"],
    on="event_type",
    aggregate_function="first"
).with_columns(
    Delay=(pl.col("Publication")-pl.col("Intention")).dt.total_milliseconds()
)

In [116]:
dfs_total.filter(
    pl.col("container_name") == "kafka-P0"
).select(
    pl.col("Delay").alias("delay_kafka")
).describe()

statistic,delay_kafka
str,f64
"""count""",906077.0
"""null_count""",0.0
"""mean""",4.58258
"""std""",26.014411
"""min""",0.0
"""25%""",2.0
"""50%""",3.0
"""75%""",4.0
"""max""",1026.0


In [117]:
pl.concat([
    dfs_total.filter(
        pl.col("container_name") == "kafka-P0"
    ).select(
        pl.col("Delay").alias("delay_kafka")
    ),
    dfs_total.filter(
        pl.col("container_name") == "zeromq_p2p-P0"
    ).select(
        pl.col("Delay").alias("delay_zeromq_p2p")
    )]
, how="horizontal").describe()

statistic,delay_kafka,delay_zeromq_p2p
str,f64,f64
"""count""",906077.0,1560228.0
"""null_count""",654151.0,0.0
"""mean""",4.58258,0.019194
"""std""",26.014411,0.137206
"""min""",0.0,0.0
"""25%""",2.0,0.0
"""50%""",3.0,0.0
"""75%""",4.0,0.0
"""max""",1026.0,1.0


In [95]:
dfs_total.filter(
    pl.col("container_name") == "zeromq_p2p-P0"
).select("Delay").describe()

statistic,Delay
str,str
"""count""","""1560228"""
"""null_count""","""0"""
"""mean""","""0:00:00.000019"""
"""std""",
"""min""","""0:00:00"""
"""25%""","""0:00:00"""
"""50%""","""0:00:00"""
"""75%""","""0:00:00"""
"""max""","""0:00:00.001000"""


In [70]:
dfs_total.filter(
    pl.col("event_type") == "Publication"
).group_by(
    pl.col("container_name")
).agg(
    pl.len().alias("num_logs"),
    pl.col("logical_size").sum(),
    pl.col("serialized_size").sum()
)

container_name,num_logs,logical_size,serialized_size
str,u32,i64,i64
"""zeromq_p2p-P0""",4491289,1567016518,0
"""kafka-P0""",2984703,1370699220,1370699220
