In [1]:
from datetime import datetime
import polars as pl
import numpy as np # strictly for random number generation

## Sample DataFrame Creation

We will first generate a data set with two timestamps, `start` and `end`, representing a time interval.

In [2]:
# Seeded generator so that Restart & Run All reproduces the same sample data.
SEED = 42
rng = np.random.default_rng(SEED)

# One transaction start per second across the sample day.
df = pl.DataFrame(
    {
        "start": pl.date_range(
            start = datetime(2021, 12, 16, 0, 0, 2),
            end = datetime(2021, 12, 16, 23),
            interval = "1s",
            eager = True
        )
    }
)

df = (
    df.lazy()
    .with_columns([
        # Random transaction length (0-100, used as seconds below) and payload size (0-1e8 bytes)
        pl.lit(rng.random(df.height) * 100).round().cast(pl.Int64).alias("delta_start_end"),
        pl.lit(rng.random(df.height) * 1e8).round().cast(pl.Int64).alias("FileSize_Bytes")
    ])
    .with_columns([
        # end = start + delta_start_end seconds
        (pl.col("start") + pl.duration(seconds="delta_start_end")).alias("end")
    ])
    .collect()
)

df.head()

start,delta_start_end,FileSize_Bytes,end
datetime[μs],i64,i64,datetime[μs]
2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04
2021-12-16 00:00:03,82,41543953,2021-12-16 00:01:25
2021-12-16 00:00:04,27,37736275,2021-12-16 00:00:31
2021-12-16 00:00:05,37,94180945,2021-12-16 00:00:42
2021-12-16 00:00:06,31,93165930,2021-12-16 00:00:37


## Calculate Duration and Average Mbps

Strictly speaking, the duration does not need to be recalculated, because it was already generated randomly as `delta_start_end`; it is recomputed here from `start` and `end` for demonstration purposes.

In [3]:
# Elapsed whole seconds between the two timestamps.
duration_expr = (pl.col("end") - pl.col("start")).dt.seconds().alias("duration_seconds")

# File size expressed in megabits: bytes -> bits (* 8) -> megabits (/ 1e6).
megabits_expr = (pl.col("FileSize_Bytes") * 8) / 1e6

# Average throughput; zero-length transactions get 0 instead of dividing by zero.
mbps_expr = (
    pl.when(pl.col("duration_seconds") == pl.lit(0))
    .then(pl.lit(0))
    .otherwise(megabits_expr / pl.col("duration_seconds"))
    .alias("Mbps_avg")
)

# Two separate with_columns steps: Mbps_avg reads the duration_seconds column
# created by the first one.
df = (
    df.lazy()
    .with_columns([duration_expr])
    .with_columns([mbps_expr])
    .collect()
)

df.head()

start,delta_start_end,FileSize_Bytes,end,duration_seconds,Mbps_avg
datetime[μs],i64,i64,datetime[μs],i64,f64
2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04,2,329.265436
2021-12-16 00:00:03,82,41543953,2021-12-16 00:01:25,82,4.053069
2021-12-16 00:00:04,27,37736275,2021-12-16 00:00:31,27,11.181119
2021-12-16 00:00:05,37,94180945,2021-12-16 00:00:42,37,20.363448
2021-12-16 00:00:06,31,93165930,2021-12-16 00:00:37,31,24.042821


## Calculating Overlapping Time Intervals

### Generate values between each time interval

First, we want to identify the one-second "bins" that each time interval spans.

In [4]:
# For every row, the list of timestamps from start to end in 1s steps.
tick_list = pl.date_range(
    start=pl.col("start"),
    end=pl.col("end"),
    interval="1s",
    eager=False,
).alias("duration_dt")

(
    df.lazy()
    .with_row_count()  # row index, so each source row stays identifiable after explode/grouping
    .with_columns([tick_list])
    .head()
    .collect()
)

row_nr,start,delta_start_end,FileSize_Bytes,end,duration_seconds,Mbps_avg,duration_dt
u32,datetime[μs],i64,i64,datetime[μs],i64,f64,list[datetime[μs]]
0,2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04,2,329.265436,"[2021-12-16 00:00:02, 2021-12-16 00:00:03, 2021-12-16 00:00:04]"
1,2021-12-16 00:00:03,82,41543953,2021-12-16 00:01:25,82,4.053069,"[2021-12-16 00:00:03, 2021-12-16 00:00:04, … 2021-12-16 00:01:25]"
2,2021-12-16 00:00:04,27,37736275,2021-12-16 00:00:31,27,11.181119,"[2021-12-16 00:00:04, 2021-12-16 00:00:05, … 2021-12-16 00:00:31]"
3,2021-12-16 00:00:05,37,94180945,2021-12-16 00:00:42,37,20.363448,"[2021-12-16 00:00:05, 2021-12-16 00:00:06, … 2021-12-16 00:00:42]"
4,2021-12-16 00:00:06,31,93165930,2021-12-16 00:00:37,31,24.042821,"[2021-12-16 00:00:06, 2021-12-16 00:00:07, … 2021-12-16 00:00:37]"


### Explode Time Bins for Metrics Generation

Each `row_nr` now appears in *one to many* records: one record for every time bin (`duration_dt`) that the transaction's interval overlaps.

In [5]:
# For every row, the list of timestamps from start to end in 1s steps.
tick_list = pl.date_range(
    start=pl.col("start"),
    end=pl.col("end"),
    interval="1s",
    eager=False,
).alias("duration_dt")

(
    df.lazy()
    .with_row_count()  # row index, so each source row stays identifiable after explode/grouping
    .with_columns([tick_list])
    .explode("duration_dt")  # one record per (row, timestamp) pair instead of one list per row
    .sort("duration_dt")
    .head()
    .collect()
)

row_nr,start,delta_start_end,FileSize_Bytes,end,duration_seconds,Mbps_avg,duration_dt
u32,datetime[μs],i64,i64,datetime[μs],i64,f64,datetime[μs]
0,2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04,2,329.265436,2021-12-16 00:00:02
0,2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04,2,329.265436,2021-12-16 00:00:03
1,2021-12-16 00:00:03,82,41543953,2021-12-16 00:01:25,82,4.053069,2021-12-16 00:00:03
2,2021-12-16 00:00:04,27,37736275,2021-12-16 00:00:31,27,11.181119,2021-12-16 00:00:04
0,2021-12-16 00:00:02,2,82316359,2021-12-16 00:00:04,2,329.265436,2021-12-16 00:00:04


## Metrics Generation

Now we can either:
 1. Calculate the total # of **concurrent transactions** and ~ cumulative (average) **bandwidth**
 1. Calculate the # of ***other*** transactions occurring during a transaction interval

 We will start with the first.

 ### *Total* Concurrent Transactions & Cumulative Bandwidth

In [6]:
def calc_total_concurrent(
        df: pl.DataFrame, start_timestamp_col: str, end_timestamp_col: str,
        avg_Mbps_col: str, delta_duration_col: str, bin_size = "1s", lazy_return = False
):
    """Summarise concurrent transactions and cumulative bandwidth per time bin.

    Every row of ``df`` is expanded into the timestamps between its start and
    end columns (stepping by ``bin_size``); the expanded records are then
    grouped into ``bin_size``-wide windows on ``duration_dt`` and aggregated.

    Parameters
    ----------
    df
        Input frame, one row per transaction.
    start_timestamp_col, end_timestamp_col
        Names of the columns holding the interval start and end timestamps.
    avg_Mbps_col
        Name of the per-transaction average-throughput column; summed per bin
        into ``cum_Mbps``.
    delta_duration_col
        Name of the per-transaction duration column; summarised per bin as
        ``<name>_min``, ``_mean``, ``_p50``, ``_p75``, ``_p95`` and ``_max``.
    bin_size
        Duration string used both as the step of the generated timestamps and
        as the width of the grouping windows.
    lazy_return
        When truthy, return the un-collected lazy query; otherwise collect it.

    Returns
    -------
    A frame keyed by ``duration_dt`` with ``num_concurrent_transaction``
    (distinct source rows in the bin), ``count``, ``cum_Mbps`` and the
    duration statistics listed above.
    """
    binned_lazy = (
        df.lazy()
        .with_row_count() # index rows to keep unique row integrity between explosion and grouping
        .with_columns([
            # Generate values in between [start, end] timestamps, stepping by bin_size
            pl.date_range(
                start=pl.col(start_timestamp_col),
                end=pl.col(end_timestamp_col),
                interval=bin_size,
                eager=False
            ).alias("duration_dt")
        ])
        .explode(["duration_dt"]) # Explode the list of date (time) range values into individual records
        .sort("duration_dt") # groupby_dynamic needs its index column sorted
        .groupby_dynamic(
            # Window width follows bin_size (previously hard-coded to "1s", which
            # silently ignored the bin_size argument at this step).
            "duration_dt", every = bin_size, period = bin_size,
            closed = "left", truncate = True, start_by = "datapoint"
        )
        .agg([
            pl.n_unique("row_nr").alias("num_concurrent_transaction"),
            pl.count(), pl.col(avg_Mbps_col).sum().alias("cum_Mbps"),
            pl.col(delta_duration_col).min().cast(pl.Float64).suffix("_min"),
            pl.col(delta_duration_col).mean().cast(pl.Float64).suffix("_mean"),
            pl.col(delta_duration_col).quantile(.5).cast(pl.Float64).suffix("_p50"),
            pl.col(delta_duration_col).quantile(.75).cast(pl.Float64).suffix("_p75"),
            pl.col(delta_duration_col).quantile(.95).cast(pl.Float64).suffix("_p95"),
            pl.col(delta_duration_col).max().cast(pl.Float64).suffix("_max")
        ])
    )
    if lazy_return:
        return binned_lazy
    return binned_lazy.collect()

# Per-bin concurrency and bandwidth summary of the sample transactions.
df_total_conc = calc_total_concurrent(
    df=df,
    start_timestamp_col="start",
    end_timestamp_col="end",
    avg_Mbps_col="Mbps_avg",
    delta_duration_col="duration_seconds",
    bin_size="1s",
)

df_total_conc.head()

duration_dt,num_concurrent_transaction,count,cum_Mbps,duration_seconds_min,duration_seconds_mean,duration_seconds_p50,duration_seconds_p75,duration_seconds_p95,duration_seconds_max
datetime[μs],u32,u32,f64,f64,f64,f64,f64,f64,f64
2021-12-16 00:00:02,1,1,329.265436,2.0,2.0,2.0,2.0,2.0,2.0
2021-12-16 00:00:03,2,2,333.318505,2.0,42.0,82.0,82.0,82.0,82.0
2021-12-16 00:00:04,3,3,344.499623,2.0,37.0,27.0,82.0,82.0,82.0
2021-12-16 00:00:05,3,3,35.597635,27.0,48.666667,37.0,82.0,82.0,82.0
2021-12-16 00:00:06,4,4,59.640455,27.0,44.25,37.0,82.0,82.0,82.0


### *Other* Concurrent Transactions

In [7]:
def calc_other_concurrent(
        df: pl.DataFrame, start_timestamp_col: str, end_timestamp_col: str,
        avg_Mbps_col: str, delta_duration_col: str, bin_size = "1s"
) -> pl.DataFrame:
    """Count, per transaction, the other transactions running alongside it.

    Each row of ``df`` is expanded into the timestamps between its start and
    end columns, every timestamp is left-joined to the per-bin totals from
    ``calc_total_concurrent``, and the records are collapsed back to one row
    per source row (``row_nr``).

    ``num_other_transactions`` is the largest ``num_concurrent_transaction``
    seen across the transaction's bins, minus one for the transaction itself
    (i.e. peak concurrency, not the number of distinct overlapping rows).

    Parameters
    ----------
    df
        Input frame, one row per transaction.
    start_timestamp_col, end_timestamp_col
        Names of the columns holding the interval start and end timestamps.
    avg_Mbps_col, delta_duration_col
        Names of the throughput and duration columns carried through to the
        result (first value per ``row_nr``).
    bin_size
        Duration string used as the step of the generated timestamps and
        forwarded to ``calc_total_concurrent``.

    Returns
    -------
    One row per input row, sorted by ``row_nr``. All rows are returned; call
    ``.head()`` on the result for a preview.

    NOTE(review): the join relies on the exploded ``duration_dt`` values
    matching the bin labels produced by ``calc_total_concurrent``. That holds
    for the whole-second timestamps and 1s bins used in this notebook; confirm
    before using other bin sizes.
    """
    return(
        df.lazy()
        .with_row_count()
        .with_columns([
            # Generate values in between [start, end] timestamps, stepping by bin_size
            pl.date_range(
                start=pl.col(start_timestamp_col),
                end=pl.col(end_timestamp_col),
                interval=bin_size,
                eager=False
            ).alias("duration_dt")
        ])
        .explode(["duration_dt"]) # Explode the list of date (time) range values into individual records
        .sort("duration_dt")
        .join(
            calc_total_concurrent(
                df = df, start_timestamp_col = start_timestamp_col, end_timestamp_col = end_timestamp_col,
                delta_duration_col = delta_duration_col,
                avg_Mbps_col = avg_Mbps_col, bin_size = bin_size, lazy_return = True
            ), on = "duration_dt", how = "left"
        )
        .groupby(["row_nr"])
        .agg([
            pl.col(delta_duration_col).first(), pl.col(avg_Mbps_col).first(),
            (pl.col("num_concurrent_transaction").max() - 1).alias("num_other_transactions"),
            pl.col(start_timestamp_col).min(), pl.col(end_timestamp_col).max()
        ])
        .sort(["row_nr"])
        # No .head() here: truncating to a preview is the caller's decision.
        .collect()
    )

# Preview the per-transaction "other concurrent transactions" result.
calc_other_concurrent(
    df=df,
    start_timestamp_col="start",
    end_timestamp_col="end",
    avg_Mbps_col="Mbps_avg",
    delta_duration_col="duration_seconds",
    bin_size="1s",
).head()

row_nr,duration_seconds,Mbps_avg,num_other_transactions,start,end
u32,i64,f64,u32,datetime[μs],datetime[μs]
0,2,329.265436,2,2021-12-16 00:00:02,2021-12-16 00:00:04
1,82,4.053069,51,2021-12-16 00:00:03,2021-12-16 00:01:25
2,27,11.181119,27,2021-12-16 00:00:04,2021-12-16 00:00:31
3,37,20.363448,32,2021-12-16 00:00:05,2021-12-16 00:00:42
4,31,24.042821,28,2021-12-16 00:00:06,2021-12-16 00:00:37
