### CESNET-TimeSeries24 → minimal throughput datasets
Creates per-entity CSVs with only:
- time
- throughput_bytes (equal to n_bytes in the interval)
# 
Optional: bytes_per_second
#
Works for:
- institutions/
- institution_subnets/
- ip_addresses_sample/
- ip_addresses_full/ (nested folder structure handled)
#
Output mirrors the input structure by default (can be changed).

In [27]:
# %% Imports & Config
import os
import sys
import glob
from pathlib import Path
from typing import Dict, Iterable, Tuple

import pandas as pd

DATA_ROOT = Path("../CESNET-TimeSeries24") 
OUTPUT_ROOT = Path("cesnet-institutions-throughput") 

SCOPES = [
    "institutions",
    # "institution_subnets",
    # "ip_addresses_sample",
    # "ip_addresses_full", 
]

AGG_LEVELS = {
    "agg_10_minutes": 600,    # seconds in window
    "agg_1_hour": 3600,
    "agg_1_day": 86400,
}

CSV_COMPRESSION = "infer"       # e.g. "gzip" to compress (adds .gz)

pd.options.mode.copy_on_write = True


In [None]:
# %% Helpers to load the official time maps once per aggregation
def load_time_map(data_root: Path) -> Dict[str, pd.DataFrame]:
    """
    Load times_* CSVs into {agg_level: DataFrame[id_time, time]} dict.
    Ensures 'time' is pandas datetime.
    """
    times_dir = data_root / "times"
    time_map = {}

    fname_by_agg = {
        "agg_10_minutes": "times_10_minutes.csv",
        "agg_1_hour": "times_1_hour.csv",
        "agg_1_day": "times_1_day.csv",
    }

    for agg, fname in fname_by_agg.items():
        fp = times_dir / fname
        if not fp.exists():
            raise FileNotFoundError(f"Missing time file: {fp}")
        df = pd.read_csv(fp)
        # Expect columns: id_time, time (string)
        if "id_time" not in df or "time" not in df:
            raise ValueError(f"{fp} must contain 'id_time' and 'time' columns.")
        df = df[["id_time", "time"]].copy()
        df["time"] = pd.to_datetime(df["time"], utc=False)  # keep tz-aware; drop .tz_convert if needed
        # df["time"] = df["time"].dt.tz_localize(None)
        time_map[agg] = df
    return time_map

time_map = load_time_map(DATA_ROOT)
time_map.keys()


  df["time"] = pd.to_datetime(df["time"], utc=False)  # keep tz-aware; drop .tz_convert if needed


dict_keys(['agg_10_minutes', 'agg_1_hour', 'agg_1_day'])

In [29]:
# %% Utility: discover all per-entity CSVs for a given scope+agg level
def iter_entity_files(scope: str, agg_level: str) -> Iterable[Tuple[Path, Path]]:
    """
    Yield (input_csv_path, relative_output_path) for the given scope and agg_level.

    For 'ip_addresses_full', input files live in nested folders:
        ip_addresses_full/agg_10_minutes/<folder>/<id_ip>.csv
    For other scopes:
        <scope>/<agg_level>/<id>.csv
    """
    scope_dir = DATA_ROOT / scope / agg_level
    if not scope_dir.exists():
        return  # empty iterator

    if scope == "ip_addresses_full":
        # nested structure two levels deep: */*.csv
        pattern = str(scope_dir / "*" / "*.csv")
        for inp in glob.glob(pattern):
            inp = Path(inp)
            # preserve nested structure under output
            rel = inp.relative_to(DATA_ROOT)
            yield inp, rel
    else:
        # flat csvs directly in agg folder
        pattern = str(scope_dir / "*.csv")
        for inp in glob.glob(pattern):
            inp = Path(inp)
            rel = inp.relative_to(DATA_ROOT)
            yield inp, rel


In [30]:
# %% Core processor
def process_one_file(inp_csv: Path, rel_out_path: Path, agg_level: str, out_root: Path):
    """
    Read one CESNET CSV, join time, keep only [time, throughput_bytes, (bytes_per_second?)].
    Write to OUTPUT_ROOT mirrored path.
    """
    # Read minimal columns
    # Schemas differ slightly between native (10m) and re-aggregated (1h/1d),
    # but they always have: id_time, n_bytes
    usecols = ["id_time", "n_bytes"]
    df = pd.read_csv(inp_csv, usecols=usecols)

    # Join with time map
    tm = time_map[agg_level]
    out = df.merge(tm, on="id_time", how="left", validate="many_to_one")

    # Sanity checks
    if out["time"].isna().any():
        missing = out["time"].isna().sum()
        raise ValueError(f"{inp_csv}: {missing} rows had id_time not found in {agg_level} time map.")

    seconds = AGG_LEVELS[agg_level]
    
    # Build throughput (bps) dataset
    out_df = pd.DataFrame({
        "time": pd.to_datetime(out["time"], utc=False),
        "throughput_bps": out["n_bytes"].astype("int64") / float(seconds)
    })        

    # Prepare output path (mirror tree, but under OUTPUT_ROOT and rename file)
    # Change file name to emphasize this is a minimal throughput dataset
    # e.g., replace "x.csv" with "x.throughput.csv"
    os.makedirs(out_root, exist_ok=True)
    out_path = out_root / rel_out_path
    out_path = out_path.with_name(out_path.stem + ".throughput.csv")

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_df.to_csv(out_path, index=False, compression=CSV_COMPRESSION)

    return out_df.shape[0], out_path


In [31]:
# %% Driver: run for all SCOPES and AGG_LEVELS
from time import perf_counter

def run_all(
    scopes=SCOPES,
    agg_levels=AGG_LEVELS.keys(),
    out_root: Path = OUTPUT_ROOT,
):
    totals = []
    t0 = perf_counter()
    for scope in scopes:
        for agg in agg_levels:
            scope_dir = DATA_ROOT / scope / agg
            if not scope_dir.exists():
                print(f"[skip] {scope}/{agg}: not found.")
                continue

            print(f"[start] {scope}/{agg}")
            n_files = 0
            n_rows = 0
            for inp_csv, rel in iter_entity_files(scope, agg):
                try:
                    rows, outp = process_one_file(
                        inp_csv=inp_csv,
                        rel_out_path=rel,
                        agg_level=agg,
                        out_root=out_root,
                    )
                    n_files += 1
                    n_rows += rows
                except Exception as e:
                    # Avoid crashing on one bad file; report and continue
                    print(f"  [error] {inp_csv}: {e}", file=sys.stderr)
            print(f"[done ] {scope}/{agg} → files: {n_files:,} rows: {n_rows:,}")
            totals.append((scope, agg, n_files, n_rows))

    t1 = perf_counter()
    print(f"\nAll done in {t1 - t0:.1f}s")
    return pd.DataFrame(totals, columns=["scope", "agg_level", "n_files", "n_rows"])

summary = run_all()
summary


  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\0.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\1.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\10.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880


[start] institutions/agg_10_minutes


  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\100.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\101.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\102.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\103.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\104.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] ..\CESNET-TimeSeries24\institutions\agg_10_minutes\105.csv: Tz-aware datetime.datetime cannot be converted to datetime64 unless utc=True, at position 2880
  [error] 

[done ] institutions/agg_10_minutes → files: 5 rows: 0
[start] institutions/agg_1_hour
[done ] institutions/agg_1_hour → files: 283 rows: 1,880,182
[start] institutions/agg_1_day
[done ] institutions/agg_1_day → files: 283 rows: 78,667

All done in 25.2s


Unnamed: 0,scope,agg_level,n_files,n_rows
0,institutions,agg_10_minutes,5,0
1,institutions,agg_1_hour,283,1880182
2,institutions,agg_1_day,283,78667


### What you get
- Output folder structure mirrors input, e.g.:
#
    OUTPUT_ROOT/
      institutions/
          agg_10_minutes/1234.throughput.csv
          agg_1_hour/1234.throughput.csv
          agg_1_day/1234.throughput.csv
      institution_subnets/...
      ip_addresses_sample/...
      ip_addresses_full/<folder>/<id_ip>.throughput.csv
#
Each file has:
  time, throughput_bps
#
- throughput_bps == n_bytes per second for the given aggregation window (10m, 1h, 1d), "bytes per time" as a *rate*
- Timestamps are UTC; drop .tz_convert if you want naive timestamps.


In [32]:
# %% Quick verification (adjust paths to a recent output you have)
examples = list(OUTPUT_ROOT.rglob("5.throughput.csv"))[:3]
for p in examples:
    print(p)
    display(pd.read_csv(p, nrows=5))


cesnet-institutions-throughput\institutions\agg_1_day\5.throughput.csv


Unnamed: 0,time,throughput_bps
0,2023-10-09 00:00:00+00:00,203520.392338
1,2023-10-10 00:00:00+00:00,212837.188287
2,2023-10-11 00:00:00+00:00,186500.295255
3,2023-10-12 00:00:00+00:00,236941.003715
4,2023-10-13 00:00:00+00:00,191074.822303


cesnet-institutions-throughput\institutions\agg_1_hour\5.throughput.csv


Unnamed: 0,time,throughput_bps
0,2023-10-09 00:00:00+00:00,3560.270833
1,2023-10-09 01:00:00+00:00,4951.698056
2,2023-10-09 02:00:00+00:00,37455.998333
3,2023-10-09 03:00:00+00:00,422775.234444
4,2023-10-09 04:00:00+00:00,963996.973611
