In [None]:
# === Block 1: RAM logger ===
# from google.colab import drive
# drive.mount('/content/drive')

%pip -q install rpy2
%load_ext rpy2.ipython

import os, signal, subprocess, time
from datetime import datetime

# Files shared with the later cells: the sampled memory log and the PID of
# the background sampler that writes it.
LOG_PATH = "/content/process_ram_usage.txt"
PID_PATH = "/content/ram_logger.pid"

# Stop a logger left over from an earlier run of this cell so that two
# samplers never append to the same log file.
if os.path.exists(PID_PATH):
    try:
        with open(PID_PATH, "r") as f:
            old_pid = int(f.read().strip())
        os.kill(old_pid, signal.SIGTERM)
        print(f"Stopped previous RAM logger (PID {old_pid})")
    except Exception as e:
        # Best effort: a garbled PID file or an already-dead process must not
        # prevent a new logger from starting.
        print(f"Could not stop previous logger: {e}")
    finally:
        # Ignore filesystem errors only; a bare `except` would also swallow
        # KeyboardInterrupt and SystemExit.
        try:
            os.remove(PID_PATH)
        except OSError:
            pass

# Start every run with an empty log so old samples never leak into new plots.
if os.path.exists(LOG_PATH):
    os.remove(LOG_PATH)
    print("Cleared existing RAM log file")

# Background sampler: every 10 s append one record to the log file.
#   - The log path is passed in as "$1" so it always matches LOG_PATH instead
#     of being duplicated as a literal inside the script.
#   - `free` is called once per tick so TOTAL/USED/AVAILABLE describe the same
#     instant (three separate calls could straddle an allocation spike).
#   - The record layout (TS / SYSTEM_* / PROC lines) is unchanged.
bash_script = r'''
set -e
log_file="$1"
while true; do
  {
    echo "---------------"
    echo "TS $(date -u +%Y-%m-%dT%H:%M:%SZ)"
    free -b | awk '/^Mem:/{printf "SYSTEM_TOTAL %s\nSYSTEM_USED  %s\nSYSTEM_AVAILABLE %s\n", $2, $3, $7}'
    ps -eo pid,user,comm,rss --sort=-rss | head -20 | awk '{print "PROC",$1,$2,$3,$4}'
  } >> "$log_file"
  sleep 10
done
'''

# `bash -c SCRIPT NAME ARG` binds NAME to $0 and ARG to $1.
log_proc = subprocess.Popen(
    ["bash", "-c", bash_script, "ram_logger", LOG_PATH],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
# Persist the PID so a later run of this cell (or the user) can stop it.
with open(PID_PATH, "w") as f:
    f.write(str(log_proc.pid))

print("Background RAM logging started")
print(f"Log file: {LOG_PATH}")
print(f"Logger PID: {log_proc.pid}")
print("Stop later with: !kill -9 $(cat /content/ram_logger.pid)")

# Give the sampler time to write its first record, then make sure it is still
# alive: its stderr is discarded, so an early crash would otherwise be silent.
time.sleep(2)
if log_proc.poll() is not None:
    print(f"WARNING: RAM logger exited early (exit code {log_proc.returncode})")


In [None]:
%%R
# === Block 2: R training/eval with timing ===

# Attach the packages used throughout this cell without their startup banners.
suppressPackageStartupMessages(
  invisible(lapply(
    c("data.table", "rpart"),
    function(pkg) library(pkg, character.only = TRUE)
  ))
)

# Every timestamp captured below is taken in UTC.
Sys.setenv(TZ = "UTC")

# Global timing log: one list(process, start, end) entry per workflow step.
data <- vector("list", 0L)

# Open a timing record for step `name` in the global `data` list.
# The record's `end` stays NA until log_end() is called with the returned
# position. Returns that position invisibly.
log_start <- function(name) {
  slot <- length(data) + 1L
  entry <- list(
    process = name,
    start   = as.POSIXct(Sys.time(), tz = "UTC"),
    end     = as.POSIXct(NA)
  )
  data[[slot]] <<- entry
  invisible(slot)
}
# Close the timing record at position `idx` of the global `data` list by
# stamping its `end` with the current UTC time. Returns NULL invisibly.
log_end <- function(idx) {
  finished_at <- as.POSIXct(Sys.time(), tz = "UTC")
  data[[idx]]$end <<- finished_at
  invisible(NULL)
}

# Convert a timing log (list of list(process, start, end)) into a data.frame
# with one row per step and an extra `duration_secs` column.
# An empty log yields an empty data.frame with the same four columns.
as_timing_df <- function(x = data) {
  if (length(x) == 0L) {
    empty <- data.frame(
      process       = character(),
      start         = as.POSIXct(character()),
      end           = as.POSIXct(character()),
      duration_secs = numeric()
    )
    return(empty)
  }
  step_rows <- lapply(x, as.data.table)
  timings <- data.table::rbindlist(step_rows, fill = TRUE)
  timings[, duration_secs := as.numeric(difftime(end, start, units = "secs"))]
  as.data.frame(timings)
}

# --- Input locations and model schema ---
# Both roots currently point at the same Drive folder; they are kept separate
# so the raw inputs and the test file can be moved independently.
base_all   <- "/content/drive/MyDrive/data"
base_final <- "/content/drive/MyDrive/data"

# Raw CSVs that are joined into the training table
train_raw_path <- file.path(base_all, "train.csv")
stores_path    <- file.path(base_all, "stores.csv")
items_path     <- file.path(base_all, "items.csv")
hol_path       <- file.path(base_all, "holidays_events.csv")

# Test data file
test_path <- file.path(base_final, "df_test.csv")

# Model inputs and response
features <- c(
  "perishable", "onpromotion", "on_hol",
  "month", "is_weekend", "day",
  "store_nbr", "item_nbr",
  "city", "state", "type", "cluster",
  "family", "class"
)
target <- "unit_sales"

cat("=== Workflow start (UTC) ===\n")

# Timed step: read the raw training table and its three lookup tables.
i_load_tr <- log_start("load_train+meta")

# Fail fast if any input is missing (one check per file).
stopifnot(file.exists(train_raw_path))
stopifnot(file.exists(stores_path))
stopifnot(file.exists(items_path))
stopifnot(file.exists(hol_path))

cat("Loading raw train/stores/items/holidays...\n")
# Only the columns used downstream are read.
train <- fread(
  train_raw_path,
  select = c("id", "date", "store_nbr", "item_nbr", "unit_sales", "onpromotion"),
  showProgress = TRUE
)
stores <- fread(
  stores_path,
  select = c("store_nbr", "city", "state", "type", "cluster")
)
items <- fread(
  items_path,
  select = c("item_nbr", "family", "class", "perishable")
)
hol <- fread(
  hol_path,
  select = c("date", "locale", "locale_name", "type", "transferred")
)
cat(sprintf(
  "Train rows: %d | Stores: %d | Items: %d | Hol rows: %d\n",
  nrow(train), nrow(stores), nrow(items), nrow(hol)
))
log_end(i_load_tr)

# Timed step: read the earlier test file into memory. It is intentionally
# NOT used by any step below.
i_load_te <- log_start("load_test_keep_only")
if (!file.exists(test_path)) {
  cat("Test file not found; continuing without it.\n")
} else {
  cat("Loading test data\n")
  DT_test_unused <- data.table::fread(test_path, showProgress = TRUE)
  cat(sprintf(
    "Kept test data: %d rows, %d columns\n",
    nrow(DT_test_unused), ncol(DT_test_unused)
  ))
}
log_end(i_load_te)

# Prep holidays (mirror Python logic)
# Timed step: reduce the holidays table to per-locale lookup tables that each
# carry a "Holiday" flag and an "Event" flag (NA where the row is neither).
i_prep_hol <- log_start("prep_holidays")
cat("Preparing holiday/event flags...\n")
# Normalise `transferred` to lower-case text so the comparison below works
# whatever type fread inferred for the column.
hol[, transferred := tolower(as.character(transferred))]
hol <- hol[is.na(transferred) | transferred != "true"]      # drop transferred
hol <- hol[type != "Work Day"]                              # drop compensatory work days
# Holiday / Bridge / Additional all count as "Holiday"; everything else is NA.
hol[, on_hol := fifelse(type %in% c("Holiday","Bridge","Additional"), "Holiday", NA_character_)]
# Rows of type "Event" are flagged separately; everything else is NA.
hol[, on_evt := fifelse(type %in% c("Event"), "Event", NA_character_)]

# One lookup per locale. `locale_name` is renamed to the column it is joined
# on later (city for Local, state for Regional); National rows need only the
# date. Flag columns get an _L/_R/_N suffix so they can coexist after merging.
locL <- hol[locale == "Local",    .(date, city  = locale_name, on_hol_L = on_hol, on_evt_L = on_evt)]
locR <- hol[locale == "Regional", .(date, state = locale_name, on_hol_R = on_hol, on_evt_R = on_evt)]
locN <- hol[locale == "National", .(date, on_hol_N = on_hol, on_evt_N = on_evt)]
log_end(i_prep_hol)

# Preprocess & merge like Python (allow cartesian joins; no duplicate checks)
# Timed step: left-join store and item attributes plus the three holiday
# lookups onto the training rows, then collapse the per-locale flags.
i_merge <- log_start("merge_join")
cat("Merging train + stores + items + (Local/Regional/National) holidays...\n")

# ensure dates comparable
# (all four tables get the same IDate type so the date keys match)
train[, date := as.IDate(date)]
locL[,  date := as.IDate(date)]
locR[,  date := as.IDate(date)]
locN[,  date := as.IDate(date)]

# all.x = TRUE keeps every row of the left table. No de-duplication is done
# on the right-hand tables, so if a lookup has several rows for one key the
# matching left rows are repeated (hence allow.cartesian = TRUE).
DT <- merge(train,  stores, by = "store_nbr", all.x = TRUE, allow.cartesian = TRUE)
DT <- merge(DT,     items,  by = "item_nbr",  all.x = TRUE, allow.cartesian = TRUE)
DT <- merge(DT,     locL,   by = c("date","city"),  all.x = TRUE, allow.cartesian = TRUE)
DT <- merge(DT,     locR,   by = c("date","state"), all.x = TRUE, allow.cartesian = TRUE)
DT <- merge(DT,     locN,   by = "date",            all.x = TRUE, allow.cartesian = TRUE)

# coalesce holiday/event flags (prefer any that are present)
# fcoalesce takes the first non-NA value, i.e. Local, then Regional, then
# National; rows with no match in any lookup stay NA.
DT[, on_hol := fcoalesce(on_hol_L, on_hol_R, on_hol_N)]
DT[, on_evt := fcoalesce(on_evt_L, on_evt_R, on_evt_N)]
# The per-locale helper columns are no longer needed.
DT[, c("on_hol_L","on_hol_R","on_hol_N","on_evt_L","on_evt_R","on_evt_N") := NULL]

cat(sprintf("Post-merge shape: %d rows, %d cols\n", nrow(DT), ncol(DT)))
log_end(i_merge)

# Feature engineering on the WHOLE merged table
# Timed step: derive the model features in place on DT, convert categoricals
# to factors and keep only the columns the model needs.
i_feat <- log_start("feature_engineering")
cat("Engineering features...\n")

# target: clip returns (negative sales) -> 0
DT[, unit_sales := pmax(0, as.numeric(unit_sales))]

# promotions: true -> 1, false -> 0, anything else (including NA) -> 2
DT[, onpromotion := as.character(onpromotion)]
DT[, onpromotion := fifelse(onpromotion %in% c("True","TRUE","true"), 1L,
                      fifelse(onpromotion %in% c("False","FALSE","false"), 0L, 2L))]
# perishability weights: 0 -> 1.0, 1 -> 1.25, other values -> 2.0
DT[, perishable := fifelse(perishable == 0, 1.0,
                      fifelse(perishable == 1, 1.25, 2.0))]

# holiday flag: 'Holiday' -> 1 else -1
# After the left joins, rows without a matching holiday have on_hol = NA.
# `on_hol == "Holiday"` would be NA there and fifelse() would propagate NA
# instead of -1; `%in%` never returns NA, so those rows correctly become -1.
DT[, on_hol := fifelse(on_hol %in% "Holiday", 1L, -1L)]

# calendar features
DT[, date := as.POSIXct(date, tz = "UTC")]
DT[, month      := as.integer(format(date, "%m"))]
# Compute the weekday name once and derive the weekend flag from it.
DT[, day        := weekdays(date)]
DT[, is_weekend := as.integer(day %in% c("Saturday","Sunday"))]

# ensure categorical columns are factors
# set() updates the column by reference; `DT[[col]] <- ...` would go through
# the data.frame replacement method and copy the whole table.
cat_cols <- c("day","city","state","type","cluster","family","class")
for (col in cat_cols) {
  if (!is.factor(DT[[col]])) set(DT, j = col, value = as.factor(DT[[col]]))
}

# quick sanity: all features exist
# Checked BEFORE the column selection, which would otherwise fail first with
# a less helpful error.
needed_cols <- unique(c(features, target))
miss_tr <- setdiff(c(features, target), names(DT))
if (length(miss_tr)) stop("Merged table missing columns: ", paste(miss_tr, collapse = ", "))

# filter columns
DT <- DT[, ..needed_cols]

log_end(i_feat)


# Split 80/20 (training/test) from the engineered DT
# Timed step: draw a reproducible random 80% of rows for training; the
# remaining rows form the holdout.
i_split <- log_start("split_train_test")
cat("Splitting 80/20 train/test...\n")
set.seed(42)
N <- nrow(DT)
n_train <- floor(0.8 * N)
idx <- sample.int(N, size = n_train)
# Logical membership mask so both subsets keep the original row order.
in_train <- logical(N); in_train[idx] <- TRUE

TR <- DT[in_train]
TE <- DT[!in_train]

# Row-subsetting keeps every factor level, so TR and TE already share the same
# level sets. Verify that rather than assigning `levels(TE[[col]]) <- ...`:
# `levels<-` relabels codes by position (it does not re-encode values) and,
# via `[[<-`, copies the whole table.
for (col in cat_cols) {
  if (is.factor(TR[[col]]) && is.factor(TE[[col]])) {
    stopifnot(identical(levels(TR[[col]]), levels(TE[[col]])))
  }
}
cat(sprintf("Train rows: %d | Test rows: %d\n", nrow(TR), nrow(TE)))
log_end(i_split)

# Timed step: fit a regression tree (rpart, method "anova") on the training
# split.
i_train <- log_start("train_model")
cat("Training model...\n")

# Tree limits: no cross-validation, no competing or surrogate splits.
ctrl <- rpart.control(
  minsplit     = 1000,
  minbucket    = 25,
  cp           = 0.001,
  maxdepth     = 7,
  xval         = 0,
  maxcompete   = 0,
  maxsurrogate = 0,
  usesurrogate = 0
)

# "<target> ~ f1 + f2 + ..." built from the schema vectors.
form <- as.formula(paste(target, paste(features, collapse = " + "), sep = " ~ "))

set.seed(42)
t0 <- Sys.time()
# model/x/y = FALSE: the returned object does not keep copies of the
# model frame, design matrix or response.
model <- rpart(
  formula = form,
  data    = TR,
  method  = "anova",
  control = ctrl,
  model   = FALSE,
  x       = FALSE,
  y       = FALSE
)
t1 <- Sys.time()
cat(sprintf(
  "Model trained in %.1f sec\n",
  as.numeric(difftime(t1, t0, units = "secs"))
))
log_end(i_train)

# Predict on our 20% holdout
# Timed step: score the holdout rows using only the feature columns.
i_pred <- log_start("predict")
# cat() prints its argument verbatim (it is not sprintf), so a single "%"
# is correct here; "%%" would be printed literally.
cat("Predicting on 20% holdout...\n")
pred <- predict(model, newdata = TE[, features, with = FALSE])
log_end(i_pred)

# Timed step: regression metrics on the holdout, computed over the rows where
# both the observed value and the prediction are available.
i_eval <- log_start("evaluate")
cat("Evaluating...\n")
y_true <- as.numeric(TE[[target]])
valid  <- !(is.na(y_true) | is.na(pred))
y_true <- y_true[valid]
y_pred <- pred[valid]

# Error metrics
mse  <- mean((y_true - y_pred)^2)
rmse <- sqrt(mse)
mae  <- mean(abs(y_true - y_pred))

# Coefficient of determination: 1 - SSE / SST
sse <- sum((y_true - y_pred)^2)
sst <- sum((y_true - mean(y_true))^2)
r2  <- 1 - (sse / sst)
log_end(i_eval)

# Results
# Report every metric computed in the evaluate step (R2 was computed but never
# shown) and end with a newline so later output starts on its own line.
cat("\nEVALUATION RESULTS\n")
cat(sprintf("MSE : %.2f\nRMSE: %.2f\nMAE : %.2f\nR2  : %.4f\n", mse, rmse, mae, r2))

# cat("\n=== STEP TIMINGS (raw list) ===\n")
# print(data)
# cat("\n--- Pretty view ---\n")
# print(as_timing_df(data))
# cat("\n=== Workflow end ===\n")


In [None]:
%%R
# === Block 3: Export timing to CSV ===
# Produces one row per (process, phase), where phase in {"start","end"}.
# Columns: process, phase, time

# Guard: the timing log from the previous cell must exist and be non-empty.
# A plain exists("data") is always TRUE because it also finds the function
# utils::data, so restrict the lookup to list-mode objects.
stopifnot(exists("data", mode = "list"), length(data) > 0L)

# Expand one timing record (list with process, start, end) into two rows,
# one per phase ("start", "end"), with the time as an ISO-8601 UTC string.
to_rows <- function(step) {
  iso_utc <- "%Y-%m-%dT%H:%M:%SZ"
  stamps <- c(
    format(step$start, iso_utc, tz = "UTC"),
    format(step$end,   iso_utc, tz = "UTC")
  )
  data.frame(
    process = rep(step$process, 2L),
    phase   = c("start", "end"),
    time    = stamps,
    stringsAsFactors = FALSE
  )
}

# Stack the per-step rows into one table and write it where the plotting
# cell expects to find it.
save_df <- do.call("rbind", lapply(data, to_rows))

utils::write.csv(
  x         = save_df,
  file      = "/content/data.csv",
  row.names = FALSE
)
cat("Saved timing data to /content/data.csv\n")


In [None]:
# === Block 4: RAM plots - actual and normalized time ===
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# ---- Inputs produced by the earlier cells ----
TIMING_CSV = "/content/data.csv"                 # written by Block 3
LOG_PATH   = "/content/process_ram_usage.txt"    # written by Block 1

# ---- Output root: each run gets its own sub-folder below this ----
RUNS_BASE = "/content/drive/MyDrive/runs"
os.makedirs(RUNS_BASE, exist_ok=True)

# ---- Fixed palette: blue = loading, green = join, yellow = training, red = eval ----
COLOR_LOADING  = "#1f77b4"  # blue
COLOR_JOIN     = "#2ca02c"  # green
COLOR_TRAINING = "#ffd700"  # yellow (gold)
COLOR_EVAL     = "#d62728"  # red

# ---- Normalized-plot layout: segment order, x-axis share and colour ----
SEG_ORDER  = ["loading", "join", "training", "evaluate"]
SEG_WIDTHS = dict(zip(SEG_ORDER, (0.20, 0.20, 0.40, 0.20)))
SEG_COLORS = dict(zip(SEG_ORDER, (COLOR_LOADING, COLOR_JOIN, COLOR_TRAINING, COLOR_EVAL)))

# 1) Load workflow events (columns: process, phase, time) with UTC timestamps
events = pd.read_csv(TIMING_CSV).assign(
    time=lambda frame: pd.to_datetime(frame["time"], utc=True)
)

# The earliest "start" event anchors the whole run.
first_start = events.loc[events["phase"].eq("start"), "time"].min()
if pd.isna(first_start):
    raise RuntimeError("No start event found in timing CSV.")

# Run id derived from that first start time, and the matching output folder
run_id = f"run_{first_start.strftime('%Y%m%dT%H%M%SZ')}"
run_dir = os.path.join(RUNS_BASE, run_id)
os.makedirs(run_dir, exist_ok=True)



def _first_end(process_name):
    """Return the time of the first "end" event of `process_name`.

    Looks the event up in the module-level `events` frame and returns
    `pd.NaT` when the process has no "end" row.
    """
    is_match = events["process"].eq(process_name) & events["phase"].eq("end")
    matches = events[is_match]
    if matches.empty:
        return pd.NaT
    return pd.to_datetime(matches.iloc[0]["time"], utc=True)

def _max_end(mask):
    """Return the latest "end" time among the `events` rows selected by `mask`.

    `mask` is a boolean selector over the module-level `events` frame.
    Returns `pd.NaT` when no selected row is an "end" event.
    """
    ended = events[mask & events["phase"].eq("end")]
    if ended.empty:
        return pd.NaT
    return pd.to_datetime(ended["time"], utc=True).max()

# Milestone times. Loading and joining may each consist of several steps, so
# they end at the latest matching "end" event.
# Loaders: any process whose name starts with "load"
after_load = _max_end(events["process"].str.startswith("load", na=False))

# Join/merge: any process whose name contains "merge" or "join"
join_mask = events["process"].str.contains("merge|join", case=False, regex=True, na=False)
after_join = _max_end(join_mask)

# Training and evaluation are single named steps
after_train = _first_end("train_model")
after_eval  = _first_end("evaluate")

# Build markers: the run start is always present, later milestones only when
# their event was found.
markers = [{"label": "before_load", "time": first_start}]
markers += [
    {"label": label, "time": when}
    for label, when in (
        ("after_load", after_load),
        ("after_join", after_join),
        ("after_train", after_train),
        ("after_eval", after_eval),
    )
    if not pd.isna(when)
]

markers_df = pd.DataFrame(markers).sort_values("time").reset_index(drop=True)

# Quick diagnostics so you can see what was detected
print("== Detected milestones ==")
for m in markers:
    print(f"{m['label']:>12}: {m['time']}")

# 2) Parse RAM log (SYSTEM_USED bytes → GB)
def parse_ram_log(path=LOG_PATH):
    """Parse the sampler log into a time-sorted frame of used memory.

    Each "SYSTEM_USED <bytes>" line is paired with the most recent
    "TS <timestamp>" line seen before it. Malformed SYSTEM_USED lines and
    those that appear before any TS line are skipped.

    Parameters
    ----------
    path : str
        Location of the log file.

    Returns
    -------
    pandas.DataFrame
        Columns `time` (UTC timestamp) and `ram_gb` (bytes / 1024**3),
        sorted by `time`.

    Raises
    ------
    FileNotFoundError
        If `path` does not exist.
    RuntimeError
        If the log holds no usable sample.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"RAM log not found: {path}")
    ts = None
    rows = []
    with open(path, "r") as f:
        for line in f:
            s = line.strip()
            if s.startswith("TS "):
                ts = pd.to_datetime(s.split()[1], utc=True)
            elif s.startswith("SYSTEM_USED"):
                fields = s.split()
                if ts is None or len(fields) < 2:
                    continue
                try:
                    used_bytes = int(fields[1])
                except ValueError:
                    # Partially written or corrupted sample: skip it.
                    continue
                rows.append({"time": ts, "ram_gb": used_bytes / (1024**3)})
    # Check before building the frame: a frame made from an empty list has no
    # "time" column, so sorting it would raise KeyError instead of this
    # explanatory error.
    if not rows:
        raise RuntimeError("Parsed RAM log is empty. Has the logger run long enough?")
    return pd.DataFrame(rows).sort_values("time").reset_index(drop=True)

ram = parse_ram_log()
ram["time"] = pd.to_datetime(ram["time"], utc=True)

# 3) Snap each milestone to the latest RAM sample taken at or before it
markers_for_merge = (
    markers_df
    .rename(columns={"time": "event_time"})
    .sort_values("event_time")
)
aligned_df = pd.merge_asof(
    markers_for_merge,
    ram.sort_values("time"),
    left_on="event_time",
    right_on="time",
    direction="backward",
)
aligned_df = aligned_df.rename(columns={"time": "plot_time"})
aligned_df = aligned_df[["label", "event_time", "plot_time", "ram_gb"]]
# Milestones that precede the first sample have nothing to snap to; drop them.
aligned_df = aligned_df.dropna(subset=["plot_time"]).reset_index(drop=True)

# Lookups: milestone label -> snapped sample time / RAM at that sample
m_time = dict(zip(aligned_df["label"], aligned_df["plot_time"]))
m_y    = dict(zip(aligned_df["label"], aligned_df["ram_gb"]))

# 4) Consolidated segments: (segment name, start milestone, end milestone)
SEGMENTS = [
    ("loading",  "before_load", "after_load"),
    ("join",     "after_load",  "after_join"),
    ("training", "after_join",  "after_train"),
    ("evaluate", "after_train", "after_eval"),
]

# Helpers
def interp_ram_at(t, times, values):
    """Linearly interpolate `values` at time `t`.

    `times` must be sorted ascending and aligned position-by-position with
    `values`. Outside the sampled range the nearest end value is returned
    (no extrapolation).
    """
    first_t, last_t = times.iloc[0], times.iloc[-1]
    if t <= first_t:
        return float(values.iloc[0])
    if t >= last_t:
        return float(values.iloc[-1])
    # First sample at or after t; the one before it is strictly earlier.
    hi = times.searchsorted(t, side="left")
    lo = hi - 1
    t_lo, t_hi = times.iloc[lo], times.iloc[hi]
    v_lo, v_hi = values.iloc[lo], values.iloc[hi]
    weight = (t - t_lo) / (t_hi - t_lo)
    return float(v_lo + weight * (v_hi - v_lo))

def build_segment_series(t_start, t_end, ram_df):
    """Return the RAM curve restricted to the window [t_start, t_end].

    The result holds an interpolated point at each window edge plus every
    sample strictly inside the window, sorted by time. An empty frame with
    columns `time` and `ram_gb` is returned when either bound is missing or
    the window has no positive length.
    """
    samples = ram_df.sort_values("time").reset_index(drop=True)

    window_ok = (not pd.isna(t_start)) and (not pd.isna(t_end)) and (t_end > t_start)
    if not window_ok:
        return pd.DataFrame(columns=["time", "ram_gb"])

    strictly_inside = samples["time"].gt(t_start) & samples["time"].lt(t_end)
    interior = samples.loc[strictly_inside, ["time", "ram_gb"]].copy()

    # Edge points are interpolated so the curve starts and ends exactly on
    # the window bounds.
    left_edge = pd.DataFrame([{
        "time": t_start,
        "ram_gb": interp_ram_at(t_start, samples["time"], samples["ram_gb"]),
    }])
    right_edge = pd.DataFrame([{
        "time": t_end,
        "ram_gb": interp_ram_at(t_end, samples["time"], samples["ram_gb"]),
    }])

    stitched = pd.concat([left_edge, interior, right_edge], ignore_index=True)
    return stitched.sort_values("time").reset_index(drop=True)

def segment_stats(df):
    """Summarise one RAM curve.

    Parameters
    ----------
    df : pandas.DataFrame
        Columns `time` (datetime-like, sorted ascending) and `ram_gb`.

    Returns
    -------
    dict
        `mean` (time-weighted average via trapezoidal integration, NaN when
        it cannot be computed), `peak`, `duration_secs` and `samples`.
    """
    if len(df) < 2:
        # A single point has a peak but no duration to average over.
        return dict(mean=np.nan, peak=df["ram_gb"].max() if not df.empty else np.nan,
                    duration_secs=0.0, samples=len(df))
    # Seconds since the first sample. total_seconds() is correct whatever
    # resolution the timestamps are stored in, unlike int64 / 1e9 which
    # assumes nanoseconds.
    t = (df["time"] - df["time"].iloc[0]).dt.total_seconds().to_numpy()
    y = df["ram_gb"].to_numpy()
    duration = float(t[-1])
    # np.trapezoid only exists from NumPy 2.0; older releases provide np.trapz.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    area = integrate(y, t)
    mean = area / duration if duration > 0 else np.nan
    peak = float(np.max(y))
    return dict(mean=mean, peak=peak, duration_secs=duration, samples=len(df))

# 5) Build the per-segment curves and their statistics
seg_series, seg_stats = {}, {}
for seg_name, start_lab, end_lab in SEGMENTS:
    # A segment needs both of its milestones to have been aligned.
    if start_lab not in m_time or end_lab not in m_time:
        continue
    t0, t1 = m_time[start_lab], m_time[end_lab]
    s = build_segment_series(t0, t1, ram)
    if s.empty:
        continue
    seg_series[seg_name] = s
    seg_stats[seg_name] = segment_stats(s)

# Whole-run window: from the earliest to the latest aligned milestone, or,
# with fewer than two of them, from the first start event to the last sample.
avail_labels = [
    k
    for k in ["before_load", "after_load", "after_join", "after_train", "after_eval"]
    if k in m_time
]
avail_times = [m_time[k] for k in avail_labels]
if len(avail_times) < 2:
    run_start = first_start
    run_end   = ram["time"].iloc[-1]
else:
    run_start, run_end = min(avail_times), max(avail_times)
run_series = build_segment_series(run_start, run_end, ram)
overall_stats = segment_stats(run_series)

# 6) Plot 1 : Actual time (UTC)
# Full RAM curve with each workflow segment shaded in its own colour and its
# two boundary points marked.
fig1, ax1 = plt.subplots(figsize=(14, 6))
ax1.plot(ram["time"], ram["ram_gb"], linewidth=2, alpha=0.95, zorder=3)
# Segment captions sit slightly above the lowest sample so they stay inside
# the plotted range.
y_min, y_max = ram["ram_gb"].min(), ram["ram_gb"].max()
y_text = y_min + 0.05 * (y_max - y_min)

# Display names, built once rather than on every loop iteration.
segment_titles = {"loading":"Loading","join":"Join","training":"Training","evaluate":"Evaluate"}

for seg_name, start_lab, end_lab in SEGMENTS:
    if seg_name in seg_series:
        df = seg_series[seg_name]
        ax1.fill_between(df["time"], 0, df["ram_gb"], color=SEG_COLORS[seg_name], alpha=0.28, zorder=1)
        x0, x1 = df["time"].iloc[0], df["time"].iloc[-1]
        y0, y1 = df["ram_gb"].iloc[0], df["ram_gb"].iloc[-1]
        ax1.scatter([x0, x1], [y0, y1], s=80, zorder=4, color=SEG_COLORS[seg_name], edgecolors="black")
        label = segment_titles[seg_name]
        x_mid = x0 + (x1 - x0) / 2
        ax1.text(x_mid, y_text, label, ha="center", va="center",
                 fontsize=10, fontweight="bold",
                 bbox=dict(boxstyle="round,pad=0.25", facecolor="white", alpha=0.6),
                 zorder=5)
# Title fix: the model is R's rpart ("RAPRT" was a typo).
ax1.set_title("Memory Profile: RPART (DecisionTreeRegressor) — Join of Tables in Memory (Full Dataset)", fontweight="bold")
ax1.set_xlabel("Time (UTC)")
ax1.set_ylabel("Memory Usage (GB)")
ax1.grid(True, alpha=0.3, linestyle="--")
ax1.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M:%S", tz=mdates.UTC))
fig1.autofmt_xdate()
plt.tight_layout()
plt.show()

# 7) Normalized data + Plot 2 : Normalized time
# Each segment is mapped onto a fixed slice of [0, 1] (see SEG_WIDTHS) so runs
# of different lengths can be overlaid.
seg_spans, cursor = {}, 0.0
for name in SEG_ORDER:
    w = SEG_WIDTHS[name]
    seg_spans[name] = (cursor, cursor + w)
    cursor += w

norm_rows = []
for seg_name in SEG_ORDER:
    if seg_name not in seg_series:
        continue
    df = seg_series[seg_name]
    t0, t1 = df["time"].iloc[0], df["time"].iloc[-1]
    denom_ns = (t1.value - t0.value)
    if denom_ns <= 0:
        # Zero-length segment: nothing to stretch.
        continue
    times_ns = df["time"].astype("int64").to_numpy()
    x_seg = (times_ns - t0.value) / float(denom_ns)  # [0,1] within segment
    span0, span1 = seg_spans[seg_name]
    x_global = span0 + x_seg * (span1 - span0)
    for xs, xg, y, tt in zip(x_seg, x_global, df["ram_gb"].to_numpy(), df["time"]):
        norm_rows.append({
            "run_id": run_id, "segment": seg_name,
            "x_segment_norm": float(xs), "x_global_norm": float(xg),
            "ram_gb": float(y), "time_utc": tt.isoformat()
        })

# Declare the columns explicitly so a run with no usable segment still yields
# an (empty) frame with the expected schema; without them, sorting a frame
# built from an empty list raises KeyError. The stable sort keeps the
# segment order for points that share a boundary x value.
NORM_COLUMNS = ["run_id", "segment", "x_segment_norm", "x_global_norm", "ram_gb", "time_utc"]
norm_df = (
    pd.DataFrame(norm_rows, columns=NORM_COLUMNS)
    .sort_values(["x_global_norm"], kind="stable")
    .reset_index(drop=True)
)

# Save minimal per-run data for super plot
norm_csv_path = os.path.join(run_dir, "ram_profile_normalized.csv")
norm_df.to_csv(norm_csv_path, index=False)

# Plot 2 : Normalized
# Same curve as plot 1, on the normalized x axis where every segment occupies
# its fixed share of [0, 1].
fig2, ax2 = plt.subplots(figsize=(14, 6))
if not norm_df.empty:
    ax2.plot(norm_df["x_global_norm"], norm_df["ram_gb"], linewidth=2, alpha=0.95, zorder=3)

    # Loop-invariant values, computed once: caption height and display names.
    y_min2, y_max2 = norm_df["ram_gb"].min(), norm_df["ram_gb"].max()
    y_text2 = y_min2 + 0.05 * (y_max2 - y_min2)
    segment_titles = {"loading":"Loading","join":"Join","training":"Training","evaluate":"Evaluate"}

    for seg_name in SEG_ORDER:
        span = seg_spans[seg_name]
        df_seg = norm_df[norm_df["segment"] == seg_name]
        if df_seg.empty:
            continue
        ax2.fill_between(df_seg["x_global_norm"], 0, df_seg["ram_gb"],
                         color=SEG_COLORS[seg_name], alpha=0.28, zorder=1)
        x0, x1 = span
        # pick nearest points to the boundaries for markers
        y0 = df_seg.iloc[(df_seg["x_global_norm"]-x0).abs().argmin()]["ram_gb"]
        y1 = df_seg.iloc[(df_seg["x_global_norm"]-x1).abs().argmin()]["ram_gb"]
        ax2.scatter([x0, x1], [y0, y1], s=80, zorder=4, color=SEG_COLORS[seg_name], edgecolors="black")
        label = segment_titles[seg_name]
        x_mid = x0 + (x1 - x0) / 2
        ax2.text(x_mid, y_text2, label, ha="center", va="center",
                 fontsize=10, fontweight="bold",
                 bbox=dict(boxstyle="round,pad=0.25", facecolor="white", alpha=0.6),
                 zorder=5)
ax2.set_xlim(0.0, 1.0)
# Title fix: the model is R's rpart ("RAPRT" was a typo).
ax2.set_title("Memory Profile: RPART (DecisionTreeRegressor) — Join of Tables in Memory (Full Dataset)", fontweight="bold")
ax2.set_xlabel("Normalized Time")
ax2.set_ylabel("Memory Usage (GB)")
ax2.grid(True, alpha=0.3, linestyle="--")
plt.tight_layout()
plt.show()

# 8) Print and save summary stats (overall + per segment)
def pretty_stats(name, st):
    """Print one summary line for the stats dict `st`, labelled `name`.

    `st` must provide `mean`, `peak`, `duration_secs` and `samples`.
    """
    parts = (
        f"mean={st['mean']:.3f} GB",
        f"peak={st['peak']:.3f} GB",
        f"duration={st['duration_secs']:.1f}s",
        f"samples={st['samples']}",
    )
    print(f"{name:>10}: " + " | ".join(parts))
# Console report: overall figures first, then one line per available segment.
print("\n=== RAM SUMMARY (time-weighted) ===")
pretty_stats("OVERALL", overall_stats)
for seg_name in SEG_ORDER:
    if seg_name not in seg_stats:
        continue
    # The four segment names are shown in upper case.
    title = seg_name.upper()
    pretty_stats(title, seg_stats[seg_name])

# Same figures as rows of a per-run CSV: scope "overall" plus one per segment.
scoped_stats = [("overall", overall_stats)] + [
    (seg_name, seg_stats[seg_name]) for seg_name in SEG_ORDER if seg_name in seg_stats
]
summary_rows = [
    {
        "run_id": run_id,
        "scope": scope,
        "mean_ram_gb": stats["mean"],
        "peak_ram_gb": stats["peak"],
        "duration_secs": stats["duration_secs"],
        "samples": stats["samples"],
    }
    for scope, stats in scoped_stats
]
summary_csv_path = os.path.join(run_dir, "run_summary.csv")
pd.DataFrame(summary_rows).to_csv(summary_csv_path, index=False)
print(f"\n Saved normalized plot data: {norm_csv_path}")
print(f"Saved summary stats:       {summary_csv_path}")
