In [None]:
import polars as pl
import datashader as ds
import datashader.transfer_functions as tf
from datetime import timedelta, datetime
import matplotlib.pyplot as plt
from threading import Lock
import numpy as np
import warnings

# Session-wide Polars configuration: set the engine affinity to "streaming" and
# the streaming chunk size to 2000 (presumably rows per chunk — TODO confirm
# the unit against the Polars Config documentation).
pl.Config.set_engine_affinity("streaming")
pl.Config.set_streaming_chunk_size(2000)

polars.config.Config

In [2]:
import os

# NOTE(review): everything in this cell is specific to one workstation (absolute
# paths under /home/caleb and /var/home/caleb). It must be adjusted before the
# notebook can run anywhere else; consider deriving these from a configurable
# project directory instead.

# Override the shell used by child processes with the host-spawn wrapper.
os.environ["SHELL"] = "/app/bin/host-spawn"
# Replace PATH outright: the value inherited by the kernel is discarded, not extended.
os.environ["PATH"] = (
    "/var/home/caleb/Projects/stream-plotting/.venv/bin:/home/caleb/.local/share/zinit/plugins/starship---starship:/home/caleb/.local/share/zinit/polaris/bin:/home/linuxbrew/.linuxbrew/bin:/home/linuxbrew/.linuxbrew/sbin:/home/caleb/.cargo/bin:/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin"
)
# Presumably asks Polars to dump the physical plan as a Graphviz file at this
# path — TODO confirm against the Polars documentation.
os.environ["POLARS_VISUALIZE_PHYSICAL_PLAN"] = (
    "/home/caleb/Projects/stream-plotting/a.dot"
)

In [3]:
# Writing the dataset is opt-in: 999 files x 100M rows is expensive, and the
# rest of the notebook only needs the files to exist under TEST_DATA_DIR.
REGENERATE_TEST_DATA = False
TEST_DATA_DIR = "test"
ROWS_PER_FILE = 100_000_000
BASE_SEED = 0


def make_test_frame(i: int, n_rows: int = ROWS_PER_FILE) -> pl.LazyFrame:
    """
    Build the lazy synthetic frame for file number ``i`` (1-based).

    The frame covers the ``i``-th minute of 2001-01-01 and has three columns:
    - time: ``n_rows`` sorted timestamps sampled (without replacement) from a
      123 ns grid over that minute.
    - value / colour: ``n_rows`` values sampled with replacement from
      ``int_range(100000) / 10000``.

    Sampling is seeded from ``BASE_SEED`` and ``i`` so the data is reproducible;
    each column gets its own seed so ``value`` and ``colour`` are not identical.
    """
    origin = datetime(2001, 1, 1)
    # timedelta arithmetic replaces the hand-rolled day/hour/minute split and
    # stays valid past the end of January.
    start = origin + timedelta(minutes=i - 1)
    end = origin + timedelta(minutes=i)
    seed = BASE_SEED + 3 * i
    frame = pl.LazyFrame().with_columns(
        pl.datetime_range(start, end, "123ns", eager=False)
        .sample(n_rows, seed=seed)
        .sort()
        .alias("time")
    )
    return frame.with_columns(
        (
            pl.int_range(100000).sample(n_rows, with_replacement=True, seed=seed + 1)
            / 10000
        ).alias("value"),
        (
            pl.int_range(100000).sample(n_rows, with_replacement=True, seed=seed + 2)
            / 10000
        ).alias("colour"),
    )


# Building the lazy plans is cheap; nothing is computed until a frame is sunk.
all_lf = [make_test_frame(i) for i in range(1, 1000)]

if REGENERATE_TEST_DATA:
    os.makedirs(TEST_DATA_DIR, exist_ok=True)
    for file_no, frame in enumerate(all_lf, start=1):
        # Zero-padded names keep lexicographic file order equal to
        # chronological order (test0002 < test0010), which matters if the files
        # are later read with a glob and treated as sorted by time.
        # NOTE(review): remove any older, unpadded test*.parquet files first.
        frame.sink_parquet(f"{TEST_DATA_DIR}/test{file_no:04d}.parquet")

In [4]:
# pl.scan_parquet("test/").head().collect()

In [5]:
# Lazily scan every file under test/ as one frame (nothing is read yet).
# NOTE(review): plot_lf later flags the time column as sorted via set_sorted,
# which is only valid if the files are scanned in chronological order —
# confirm the glob expansion order matches (e.g. test10 vs test2).
lf = pl.scan_parquet("test/*")

In [None]:
def plot_with_datashader(
    plots_dict: dict,
    df: pl.DataFrame,
    x: str,
    y: str,
    c: str,
    period_ns: int,
    plot_width=800,
    plot_height=600,
    out_dir=".",
    prefix="plot",
) -> None:
    """
    Rasterises one chunk of a plot group with Datashader and saves the group's
    image once every row of the group has been seen.

    `df` must contain rows of a single group and carry the per-group metadata
    columns `group_idx`, `len`, `{x}_min`, `{x}_max`, `{y}_min` and `{y}_max`
    (read from the first row). Per-pixel sums and counts of `c` are accumulated
    in `plots_dict` across calls, so the final image is the true per-pixel mean
    of `c` over the whole group regardless of how the group was chunked.

    Parameters:
    - plots_dict (dict): Shared mutable state across calls; holds, per group,
      the running sum grid, the pixel count grid, the row count and a lock.
    - df (pl.DataFrame): Chunk of rows belonging to one group.
    - x (str), y (str): Column names used for the canvas axes.
    - c (str): Column whose per-pixel mean is shaded.
    - period_ns (int): Period in nanoseconds for splitting (used in the filename).
    - plot_width (int), plot_height (int): Canvas size in pixels.
    - out_dir (str): Output directory for saving plots (created if missing).
    - prefix (str): Prefix for filenames.

    Returns:
    - None. The image is written to `{out_dir}/{prefix}_{period_ns}_{group_idx}.png`.
    """
    import os  # local import: keeps the function independent of other cells

    if df.height == 0:
        return

    # Group metadata is repeated on every row, so the first row is enough.
    meta = df.row(0, named=True)
    group_len = meta["len"]
    plot_idx = meta["group_idx"]
    x_min, x_max = meta[f"{x}_min"], meta[f"{x}_max"]
    y_min, y_max = meta[f"{y}_min"], meta[f"{y}_max"]

    # Only the plotted columns need converting; dict.fromkeys drops duplicates
    # (e.g. c == y) while keeping order.
    pdf = df.select(list(dict.fromkeys((x, y, c)))).to_pandas()

    # Same canvas extent for every chunk of the group, so grids line up.
    cvs = ds.Canvas(
        plot_width=plot_width,
        plot_height=plot_height,
        x_range=(x_min, x_max),
        y_range=(y_min, y_max),
    )
    # Sum and count (rather than mean) so chunks can be combined exactly.
    chunk = cvs.points(pdf, x, y, ds.summary(total=ds.sum(c), n=ds.count(c)))
    chunk_sum = np.nan_to_num(chunk["total"].values.astype(np.float64), nan=0.0)
    chunk_n = chunk["n"].values.astype(np.int64)

    lock_key = f"{plot_idx}_lock"
    count_key = f"{plot_idx}_count"
    pixels_key = f"{plot_idx}_pixels"

    finished = None
    with plots_dict.setdefault(lock_key, Lock()):
        if plot_idx in plots_dict:
            plots_dict[plot_idx] += chunk_sum
            plots_dict[pixels_key] += chunk_n
            plots_dict[count_key] += df.height
        else:
            plots_dict[plot_idx] = chunk_sum
            plots_dict[pixels_key] = chunk_n
            plots_dict[count_key] = df.height

        # Decide completion and detach the state while still holding the lock,
        # so exactly one caller renders and cleans up the group.
        if plots_dict[count_key] == group_len:
            finished = (plots_dict.pop(plot_idx), plots_dict.pop(pixels_key))
            del plots_dict[count_key]
            del plots_dict[lock_key]

    if finished is None:
        return

    total_sum, total_n = finished
    # Per-pixel mean; pixels that received no points stay at 0.0.
    mean = np.zeros_like(total_sum)
    np.divide(total_sum, total_n, out=mean, where=total_n > 0)
    image_agg = chunk["total"].copy(data=mean)

    # Save the image
    os.makedirs(out_dir, exist_ok=True)
    fig, ax = plt.subplots()
    fig.set_figheight(plot_height / 100)
    fig.set_figwidth(plot_width / 100)
    ax.imshow(tf.shade(image_agg).to_pil(), aspect="auto")
    ax.axis("off")
    filename = f"{out_dir}/{prefix}_{period_ns}_{plot_idx}.png"
    fig.savefig(filename, bbox_inches="tight", pad_inches=0)
    plt.close(fig)


def plot_lf(
    lf: pl.LazyFrame, period: timedelta, x: str, y: str, c: str, out_dir="plots"
):
    """
    Builds a lazy pipeline that renders one Datashader plot per `period` of `x`.

    Rows are assigned a `group_idx` equal to the number of whole periods
    elapsed since the smallest `x` value. Per-group extents and row counts are
    joined back onto the rows, and each batch is passed to
    `plot_with_datashader`, which writes the images as a side effect when the
    returned frame is executed.

    Parameters:
    - lf (pl.LazyFrame): Input frame; `x` is flagged as sorted via `set_sorted`,
      so the caller must supply data that really is sorted by `x`.
    - period (timedelta): Length of each plot's time window; must be positive.
    - x (str): Temporal column used for the x axis and for grouping.
    - y (str): Column used for the y axis.
    - c (str): Column whose values are shaded.
    - out_dir (str): Output directory for saving plots.

    Returns:
    - pl.LazyFrame: The joined rows, unchanged by the plotting callback.

    Raises:
    - ValueError: If `period` is not a positive duration.
    """
    # Exact integer nanoseconds (timedelta resolution is one microsecond);
    # avoids float rounding from total_seconds() * 1e9.
    period_ns = (period // timedelta(microseconds=1)) * 1000
    if period_ns <= 0:
        raise ValueError("period must be a positive duration")

    # Add temporary epoch column for plotting
    epoch_col = f"{x}_epoch_tmp"
    lf = lf.set_sorted(x)
    lf = lf.with_columns(pl.col(x).dt.epoch("ns").alias(epoch_col)).with_columns(
        # Integer floor division keeps group boundaries exact for any offset.
        ((pl.col(epoch_col) - pl.col(epoch_col).min()) // period_ns)
        .cast(pl.UInt32)
        .alias("group_idx")
    )

    # State shared by every callback invocation of this pipeline.
    plots = {}

    def custom_plot_fn(df: pl.DataFrame):
        # A batch may span several groups; plot each group's rows separately.
        for tmp_df in df.partition_by("group_idx"):
            plot_with_datashader(
                plots,
                tmp_df,
                x=epoch_col,
                y=y,
                c=c,
                period_ns=period_ns,
                out_dir=out_dir,
                prefix="plot",
            )
        return df

    points = lf.select(epoch_col, "group_idx", y, c)

    # Per-group extents and row count, needed by the plotting callback.
    agg = points.group_by("group_idx").agg(
        pl.col(epoch_col).max().alias(f"{epoch_col}_max"),
        pl.col(epoch_col).min().alias(f"{epoch_col}_min"),
        pl.col(y).max().alias(f"{y}_max"),
        pl.col(y).min().alias(f"{y}_min"),
        pl.len().alias("len"),
    )
    return points.join(agg, on="group_idx").map_batches(
        custom_plot_fn,
        predicate_pushdown=False,
        projection_pushdown=False,
        slice_pushdown=False,
        streamable=True,
    )

In [8]:
# Single run: plot the first 100M rows in 1-second groups.
with warnings.catch_warnings():
    # Silence RuntimeWarning for the duration of this run only.
    warnings.simplefilter("ignore", category=RuntimeWarning)
    a = plot_lf(lf.head(100000000), timedelta(seconds=1), "time", "value", "colour")
    # Executing the plan triggers the plotting callback inside plot_lf; the
    # one-value parquet output is presumably just a cheap sink to drive it.
    a.select(pl.col("value").first()).sink_parquet("out.parquet", engine="streaming")

In [12]:
# NOTE(review): `d` is not assigned by any earlier cell visible in this
# notebook (it is first bound in a later cell), so this fails on a fresh
# Restart & Run All. The table below is output from an earlier session —
# presumably a profiling result. Consider removing or regenerating this cell.
d

node,start,end
str,u64,u64
"""optimization""",0,365204
"""with_column(time)""",365204,366597
"""with_column(time_epoch_tmp)""",366604,367791
"""with_column(group_idx)""",367800,1022416
"""simple-projection(time_epoch_t…",1022419,1022451
…,…,…
"""group_by_partitioned(group_idx…",1022487,1330473
"""join(group_idx)""",1330495,2201769
"""OPAQUE_PYTHON""",2202619,12182947
"""simple-projection(value)""",12182948,12182955


In [None]:
lf = lf.head(100000000)


def lazy_plot_sink(source: pl.LazyFrame, period: timedelta, sink_path: str):
    """
    Build, without executing, a plotting pipeline for one period.

    The plots are produced by the callback inside `plot_lf`; the single-value
    parquet written to `sink_path` only gives the streaming engine a sink.
    """
    return (
        plot_lf(source, period, "time", "value", "colour")
        .select(pl.col("value").first())
        .sink_parquet(sink_path, engine="streaming", lazy=True)
    )


# Each pipeline gets its own sink file: they are executed together by
# collect_all, so sharing one path would make them overwrite each other.
a = lazy_plot_sink(lf, timedelta(seconds=30), "out_30s.tmp")
b = lazy_plot_sink(lf, timedelta(seconds=10), "out_10s.tmp")
c = lazy_plot_sink(lf, timedelta(seconds=2), "out_2s.tmp")
d = lazy_plot_sink(lf, timedelta(milliseconds=1000), "out_1s.tmp")
pl.collect_all([a, b, c, d], engine="streaming")

In [None]:
# Print the combined query plan for the four lazy pipelines a, b, c and d.
print(pl.explain_all([a, b, c, d]))