In [1]:
from pathlib import Path
from glob import glob

import polars as pl
from tqdm.auto import tqdm

# ---- paths (keep these consistent with 00/01) ----
RAW_DATA_DIR = Path("data_raw")  # TODO: same as in 00/01
INTERMEDIATE_DIR = Path("data_intermediate")
# Create the intermediate folder up front so later cells can write into it.
INTERMEDIATE_DIR.mkdir(exist_ok=True, parents=True)

# ---- time constants used to reduce second-based timestamps to an hour of day ----
SECONDS_PER_HOUR = 3_600
SECONDS_PER_DAY = SECONDS_PER_HOUR * 24

# Echo the resolved absolute locations so a wrong working directory is obvious.
print("RAW_DATA_DIR     =", RAW_DATA_DIR.resolve())
print("INTERMEDIATE_DIR =", INTERMEDIATE_DIR.resolve())


RAW_DATA_DIR     = C:\Users\Layton\Desktop\cis-520\preprocess\data_raw
INTERMEDIATE_DIR = C:\Users\Layton\Desktop\cis-520\preprocess\data_intermediate


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def sec_to_hour_expr(col: str | pl.Expr) -> pl.Expr:
    """Build an expression that maps a timestamp in seconds to its hour of day.

    Parameters
    ----------
    col
        Either a column name or an already-built Polars expression holding
        timestamps expressed in seconds.

    Returns
    -------
    pl.Expr
        ``(seconds mod SECONDS_PER_DAY) floor-div SECONDS_PER_HOUR`` cast to
        ``Int32``. For non-negative timestamps this lies in 0..23.
    """
    seconds = col if not isinstance(col, str) else pl.col(col)
    seconds_into_day = seconds % SECONDS_PER_DAY
    hour_of_day = seconds_into_day // SECONDS_PER_HOUR
    return hour_of_day.cast(pl.Int32)


# Explicit column dtypes handed to pl.scan_csv when reading a CPU-readings shard.
cpu_schema = dict(
    timestamp=pl.Int64,  # seconds; reduced to an hour of day via sec_to_hour_expr
    vm_id=pl.Utf8,  # grouping key for the per-VM aggregates
    min_cpu=pl.Float64,
    max_cpu=pl.Float64,
    avg_cpu=pl.Float64,  # presumably a utilisation percentage (compared to 60/80) - confirm
)


In [3]:
# Raw CPU-reading shards live in a sub-folder of the raw data directory.
CPU_DIR = RAW_DATA_DIR.joinpath("vm_cpu")

# Sort in place so the shard order (and any index derived from it) is stable
# from one run to the next.
shard_paths = glob(str(CPU_DIR.joinpath("*.csv.gz")))
shard_paths.sort()
print(f"Found {len(shard_paths)} cpu shards under {CPU_DIR}")

# Folder that receives one partial-aggregate Parquet file per shard.
cpu_part_dir = INTERMEDIATE_DIR.joinpath("cpu_parts")
cpu_part_dir.mkdir(exist_ok=True, parents=True)
print("Partial aggregates will go to:", cpu_part_dir.resolve())


Found 125 cpu shards under data_raw\vm_cpu
Partial aggregates will go to: C:\Users\Layton\Desktop\cis-520\preprocess\data_intermediate\cpu_parts


In [8]:
def aggregate_cpu_shard(csv_path: str | Path, out_parquet: str | Path) -> None:
    """
    Aggregate one vm_cpu_readings shard into per-VM partial stats and save as Parquet.

    The shard is scanned lazily, grouped by ``vm_id`` and reduced to additive
    partial statistics (row counts, sums, a sum of squares and a max), so the
    partials of several shards can later be merged by summing / taking the max.

    Parameters
    ----------
    csv_path
        Path of the CSV shard to read (columns as described by ``cpu_schema``).
    out_parquet
        Destination Parquet file. It is written to a sibling ``<name>.tmp``
        file first and renamed into place, so a file at ``out_parquet`` is
        always complete. This matters because callers may treat the mere
        existence of the file as "shard already processed".

    Output columns (one row per ``vm_id``)
    --------------------------------------
    n_readings, sum_avg, sum_avg_sq, max_cpu, cnt_gt_60, cnt_gt_80,
    sum_day, cnt_day, sum_night, cnt_night,
    sum_hour_0..sum_hour_23, cnt_hour_0..cnt_hour_23
    """

    csv_path = str(csv_path)
    out_parquet = Path(out_parquet)
    # Temp name deliberately does not end in ".parquet" so that a "*.parquet"
    # glob over the output folder never picks up a half-written file.
    tmp_parquet = out_parquet.with_name(out_parquet.name + ".tmp")

    # Lazy scan for streaming-friendly groupby.
    # NOTE(review): has_header=True discards the first line of every shard as a
    # header row - confirm the shards really start with one, otherwise one
    # reading per shard is silently lost.
    lf = pl.scan_csv(
        csv_path,
        has_header=True,
        schema=cpu_schema,
    ).with_columns(sec_to_hour_expr("timestamp").alias("hour_of_day"))

    day_hours = list(range(8, 20))  # 08:00 - 19:59
    night_hours = list(range(0, 8)) + list(range(20, 24))

    avg = pl.col("avg_cpu")
    hour = pl.col("hour_of_day")

    def _sum_where(mask: pl.Expr, name: str) -> pl.Expr:
        # Sum of avg_cpu over the rows selected by `mask`.
        return avg.filter(mask).sum().alias(name)

    def _count_where(mask: pl.Expr, name: str) -> pl.Expr:
        # Number of rows selected by `mask` (1/0 indicator, then summed).
        return pl.when(mask).then(1).otherwise(0).sum().alias(name)

    is_day = hour.is_in(day_hours)
    is_night = hour.is_in(night_hours)

    agg_exprs = [
        # basic counts/sums; pl.len() replaces the deprecated pl.count()
        pl.len().alias("n_readings"),
        avg.sum().alias("sum_avg"),
        (avg ** 2).sum().alias("sum_avg_sq"),
        pl.col("max_cpu").max().alias("max_cpu"),
        (avg > 60.0).sum().alias("cnt_gt_60"),
        (avg > 80.0).sum().alias("cnt_gt_80"),
        # day/night aggregates
        _sum_where(is_day, "sum_day"),
        _count_where(is_day, "cnt_day"),
        _sum_where(is_night, "sum_night"),
        _count_where(is_night, "cnt_night"),
    ]

    # per-hour aggregates (0..23): all sums first, then all counts, so the
    # output column order stays sum_hour_0..23 followed by cnt_hour_0..23.
    agg_exprs.extend(_sum_where(hour == h, f"sum_hour_{h}") for h in range(24))
    agg_exprs.extend(_count_where(hour == h, f"cnt_hour_{h}") for h in range(24))

    grouped = lf.group_by("vm_id").agg(agg_exprs)

    # `engine="streaming"` is the replacement for the deprecated
    # `streaming=True` flag of LazyFrame.collect.
    df = grouped.collect(engine="streaming")

    # Write-then-rename: an interrupted run leaves at most a stale ".tmp" file
    # (overwritten on the next attempt), never a truncated final Parquet file.
    df.write_parquet(tmp_parquet)
    tmp_parquet.replace(out_parquet)

In [9]:
pbar = tqdm(shard_paths, desc="CPU shards → partial per-VM stats")

# Each part file is named after the shard's position in `shard_paths`.
for i, path in enumerate(pbar):
    out_path = cpu_part_dir.joinpath(f"cpu_part_{i:03d}.parquet")
    needs_work = not out_path.exists()

    # Resume support: a shard whose part file is already on disk is not redone.
    pbar.set_postfix_str(out_path.name if needs_work else "skip (exists)")
    if needs_work:
        aggregate_cpu_shard(path, out_path)

print("Pass 1 complete: partial aggregates written to", cpu_part_dir)


(Deprecated in version 0.20.5)
  pl.count().alias("n_readings"),
  df = grouped.collect(streaming=True)
CPU shards → partial per-VM stats: 100%|██████████| 125/125 [15:14<00:00,  7.32s/it, cpu_part_124.parquet]

Pass 1 complete: partial aggregates written to data_intermediate\cpu_parts





In [10]:
part_paths = sorted(glob(str(cpu_part_dir / "cpu_part_*.parquet")))
print(f"Combining {len(part_paths)} partial files...")

# One lazy scan over every partial file (Polars expands the glob itself).
lf_parts = pl.scan_parquet(str(cpu_part_dir / "cpu_part_*.parquet"))

# Columns that are additive across shards: merging partials means summing them.
sum_cols = [
    "n_readings",
    "sum_avg",
    "sum_avg_sq",
    "cnt_gt_60",
    "cnt_gt_80",
    "sum_day",
    "cnt_day",
    "sum_night",
    "cnt_night",
    *(f"sum_hour_{h}" for h in range(24)),
    *(f"cnt_hour_{h}" for h in range(24)),
]

# Every additive column is summed; max_cpu is the one exception and is
# merged with max instead.
agg_exprs_final = [pl.col(name).sum().alias(name) for name in sum_cols]
agg_exprs_final.append(pl.col("max_cpu").max().alias("max_cpu"))

lf_agg = lf_parts.group_by("vm_id").agg(agg_exprs_final)

# Execute the lazy plan: one row per vm_id over all shards.
vm_usage_raw = lf_agg.collect()
print("Raw combined usage rows:", vm_usage_raw.height)
print("Columns:", vm_usage_raw.columns[:10], "...")


Combining 125 partial files...
Raw combined usage rows: 2013767
Columns: ['vm_id', 'n_readings', 'sum_avg', 'sum_avg_sq', 'cnt_gt_60', 'cnt_gt_80', 'sum_day', 'cnt_day', 'sum_night', 'cnt_night'] ...


In [11]:
def _mean_or_zero(total: str, count: str) -> pl.Expr:
    """Return ``total / count`` per row, or 0.0 where ``count`` is not positive."""
    return (
        pl.when(pl.col(count) > 0)
        .then(pl.col(total) / pl.col(count))
        .otherwise(0.0)
    )


# Names of the 24 per-hour mean features, in hour order.
hour_mean_cols = [f"cpu_hour_{h}_mean" for h in range(24)]

df = (
    vm_usage_raw
    # Overall mean / std / threshold fractions, plus guarded day and night means.
    .with_columns(
        [
            (pl.col("sum_avg") / pl.col("n_readings")).alias("cpu_mean"),
            (
                (
                    pl.col("sum_avg_sq") / pl.col("n_readings")
                    - (pl.col("sum_avg") / pl.col("n_readings")) ** 2
                )
                # E[x^2] - E[x]^2 can dip slightly below zero through rounding;
                # clamp before taking the square root.
                .clip(lower_bound=0.0)
                .sqrt()
            ).alias("cpu_std"),
            (pl.col("cnt_gt_60") / pl.col("n_readings")).alias("cpu_frac_gt_60"),
            (pl.col("cnt_gt_80") / pl.col("n_readings")).alias("cpu_frac_gt_80"),
            _mean_or_zero("sum_day", "cnt_day").alias("day_cpu_mean"),
            _mean_or_zero("sum_night", "cnt_night").alias("night_cpu_mean"),
        ]
    )
    # Needs its own step because it reads the two columns created just above;
    # the small constant keeps the denominator away from zero.
    .with_columns(
        (pl.col("day_cpu_mean") / (pl.col("night_cpu_mean") + 1e-3)).alias(
            "day_night_ratio"
        )
    )
    # Per-hour mean features
    .with_columns(
        [
            _mean_or_zero(f"sum_hour_{h}", f"cnt_hour_{h}").alias(
                f"cpu_hour_{h}_mean"
            )
            for h in range(24)
        ]
    )
)

print("Example rows with new features:")
print(df.select(
    ["vm_id", "n_readings", "cpu_mean", "cpu_std", "cpu_frac_gt_60",
     "day_cpu_mean", "night_cpu_mean", "day_night_ratio"] + hour_mean_cols[:4]
).head())


Example rows with new features:
shape: (5, 12)
┌───────────┬───────────┬───────────┬──────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ vm_id     ┆ n_reading ┆ cpu_mean  ┆ cpu_std  ┆ … ┆ cpu_hour_ ┆ cpu_hour_ ┆ cpu_hour_ ┆ cpu_hour_ │
│ ---       ┆ s         ┆ ---       ┆ ---      ┆   ┆ 0_mean    ┆ 1_mean    ┆ 2_mean    ┆ 3_mean    │
│ str       ┆ ---       ┆ f64       ┆ f64      ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│           ┆ u32       ┆           ┆          ┆   ┆ f64       ┆ f64       ┆ f64       ┆ f64       │
╞═══════════╪═══════════╪═══════════╪══════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ iDRcL2N6S ┆ 1         ┆ 0.154114  ┆ 0.0      ┆ … ┆ 0.0       ┆ 0.0       ┆ 0.0       ┆ 0.0       │
│ V3Hte50J0 ┆           ┆           ┆          ┆   ┆           ┆           ┆           ┆           │
│ wA9PsBXw2 ┆           ┆           ┆          ┆   ┆           ┆           ┆           ┆           │
│ 3+4…      ┆           ┆           ┆       

In [12]:
# Persist the per-VM feature table alongside the other intermediate artefacts.
usage_out_path = INTERMEDIATE_DIR.joinpath("vm_usage_agg.parquet")
df.write_parquet(usage_out_path)
print("Saved final per-VM usage table to:", usage_out_path)


Saved final per-VM usage table to: data_intermediate\vm_usage_agg.parquet
