In [1]:
import pandas as pd
from river import utils
import os


In [2]:
def save_model(model, path):
    utils.dump(model, path)
    print(f"Model saved to → {path}")
def load_model(path):
    if os.path.exists(path):
        model = utils.load(path)
        print(f"Model loaded from → {path}")
        return model
    return None

In [3]:
df = pd.read_csv( 'cluster-trace-gpu-v2023/csv/openb_pod_list_cpu100.csv')
df

Unnamed: 0,name,cpu_milli,memory_mib,num_gpu,gpu_milli,gpu_spec,qos,pod_phase,creation_time,deletion_time,scheduled_time
0,openb-pod-0000,12500,57344,0,0,,LS,Running,10644355,10644932,10644358.0
1,openb-pod-0001,12500,57344,0,0,,LS,Running,11556813,11557292,11556817.0
2,openb-pod-0002,32000,49152,0,0,,BE,Running,11412186,11412330,11412189.0
3,openb-pod-0003,16500,51200,0,0,,LS,Running,12524063,12524844,12524064.0
4,openb-pod-0004,8000,30517,0,0,,BE,Running,10741000,10744539,10741002.0
...,...,...,...,...,...,...,...,...,...,...,...
7848,openb-pod-7848,16000,32768,1,1000,,LS,Succeeded,12897160,12900786,12897161.0
7849,openb-pod-7849,11400,77824,1,1000,,LS,Running,12897659,12898203,12897692.0
7850,openb-pod-7850,6000,18432,1,460,,LS,Running,12898342,12900034,12898342.0
7851,openb-pod-7851,3152,5600,1,590,,BE,Failed,12900505,12900526,12900505.0


In [None]:
import os
import math
import numpy as np
import pandas as pd
from collections import deque

from river import (
    ensemble, tree, preprocessing as pp, stream, metrics,
    linear_model, neighbors, optim, utils
)


# Save / Load helpers
def save_model(model, path):
    utils.dump(model, path)
    print(f"Model saved to → {path}")

def load_model(path):
    if os.path.exists(path):
        model = utils.load(path)
        print(f"Model loaded from → {path}")
        return model
    return None


# Data
CSV_PATH = 'cluster-trace-gpu-v2023/csv/openb_pod_list_cpu100.csv'
df = pd.read_csv(CSV_PATH)

# Feature builder
def build_features(df: pd.DataFrame):
    df = df.copy()

    num_cols = ['cpu_milli','memory_mib','num_gpu','gpu_milli',
                'creation_time','deletion_time','scheduled_time']
    for c in num_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
        else:
            df[c] = 0.0

    y = df['cpu_milli'].fillna(0).astype(float)

    X = pd.DataFrame(index=df.index)
    X['mem_gib'] = (df['memory_mib'].fillna(0) / 1024.0).astype(float)
    X['num_gpu'] = df['num_gpu'].fillna(0).astype(float)
    X['gpu_milli'] = df['gpu_milli'].fillna(0).astype(float)

    ct = df['creation_time'].fillna(0).astype(float)
    dt = df['deletion_time'].fillna(0).astype(float)
    st = df['scheduled_time'].fillna(0).astype(float)
    X['lifetime']   = (dt - ct).clip(lower=0.0)
    X['sched_delay'] = (st - ct).clip(lower=0.0)

    X['has_gpu'] = (X['num_gpu'] > 0).astype(float)

    qos = df.get('qos', '').astype(str).str.upper()
    X['qos_LS'] = (qos == 'LS').astype(float)
    X['qos_BE'] = (qos == 'BE').astype(float)

    phase = df.get('pod_phase', '').astype(str).str.title()
    X['ph_Running']   = (phase == 'Running').astype(float)
    X['ph_Succeeded'] = (phase == 'Succeeded').astype(float)
    X['ph_Failed']    = (phase == 'Failed').astype(float)

    X = X.replace([np.inf, -np.inf], np.nan).fillna(0.0).astype(float)
    features = list(X.columns)
    return X, y, features

X, y, features = build_features(df)

# Models (experts)
def make_bag_ht():
    return ensemble.BaggingRegressor(
        model=tree.HoeffdingTreeRegressor(
            grace_period=100,
            max_depth=20,
            leaf_prediction='mean'
        ),
        n_models=20,
        seed=42
    )

def make_hat():
    return tree.HoeffdingAdaptiveTreeRegressor(
        grace_period=100,
        leaf_prediction='mean'
    )

def make_lin():
    return pp.StandardScaler() | linear_model.PARegressor()

def make_knn():
    engine = neighbors.LazySearch(
        window_size=3000, 
        min_distance_keep=0.0  
    )
    return pp.StandardScaler() | neighbors.KNNRegressor(
        n_neighbors=30,
        engine=engine,
        aggregation_method="mean"  
    )

experts = [make_bag_ht(), make_hat(), make_lin(), make_knn()]

# EWA over experts
ewa = ensemble.EWARegressor(models=experts, loss=optim.losses.Squared(), learning_rate=2e-3)

# Metrics
mae = metrics.MAE()

class OnlineMAPE:
    def __init__(self, eps=1e-8):
        self.eps = eps
        self.n = 0
        self.sum_pe = 0.0
    def update(self, y_true, y_pred):
        y_true = float(y_true); y_pred = float(y_pred)
        pe = abs(y_true - y_pred) / max(self.eps, abs(y_true))
        self.sum_pe += pe
        self.n += 1
    def get(self):
        return (self.sum_pe / max(1, self.n)) * 100.0

class OnlineWAPE:
    def __init__(self, eps=1e-8):
        self.eps = eps
        self.sum_abs_err = 0.0
        self.sum_abs_true = 0.0
    def update(self, y_true, y_pred):
        self.sum_abs_err += abs(float(y_true) - float(y_pred))
        self.sum_abs_true += abs(float(y_true))
    def get(self):
        return 100.0 * self.sum_abs_err / max(self.eps, self.sum_abs_true)

mape, wape = OnlineMAPE(), OnlineWAPE()

# Scaling and loop settings
TARGET_SCALE = float(np.percentile(np.abs(y.values), 75))
TARGET_SCALE = max(1.0, TARGET_SCALE)

last_preds = deque(maxlen=40)
WARM_UP = 200
SEEN = 0

NODE_CAPACITY = 8000  
sla_violations = 0
overprov = 0.0
scaling_actions = 0
current_nodes = 1

# Online loop
for x_row, y_true in stream.iter_pandas(X[features], y):
    SEEN += 1

    # Predict
    if SEEN <= WARM_UP:
        preds = [(m.predict_one(x_row) or 0.0) for m in experts]
        y_hat_scaled = float(np.mean(preds)) if preds else 0.0
    else:
        y_hat_scaled = ewa.predict_one(x_row) or 0.0

    # Rescale to original target and clip negatives
    y_hat = max(0.0, y_hat_scaled * TARGET_SCALE)

    # Metrics
    mae.update(y_true, y_hat)
    mape.update(y_true, y_hat)
    wape.update(y_true, y_hat)

    # Autoscaling policy
    needed_nodes = max(1, int(np.ceil(y_hat / NODE_CAPACITY)))
    if needed_nodes * NODE_CAPACITY < float(y_true):
        sla_violations += 1
    overprov += max(0.0, needed_nodes * NODE_CAPACITY - float(y_true))
    if needed_nodes != current_nodes:
        scaling_actions += 1
        current_nodes = needed_nodes

    # Train (scale target for stability)
    y_scaled = float(y_true) / TARGET_SCALE
    for m in experts:
        m.learn_one(x_row, y_scaled)
    if SEEN > WARM_UP:
        ewa.learn_one(x_row, y_scaled)

    last_preds.append((float(y_true), float(y_hat)))

# Report
print("Last 40 predictions (EWA over heterogeneous experts, scaled):")
for i, (yt, yh) in enumerate(last_preds, 1):
    print(f"{i:02d}  Actual={yt:8.2f}  Predicted={yh:8.2f}")

print(f"\nMAE={mae.get():.2f}")
print(f"MAPE={mape.get():.2f}%")
print(f"WAPE={wape.get():.2f}%")

if hasattr(ewa, "weights"):
    print("\nFinal EWA weights:")
    for name, w in zip(["bag_ht","hat","lin","knn"], ewa.weights):
        print(f"  {name:8s}: {w:.3f}")

total_steps = max(1, SEEN)
sla_violation_rate = sla_violations / total_steps
print(f"\nSLA violations: {sla_violations}  (rate = {sla_violation_rate:.3f})")
print(f"Over-provisioned capacity (mCPU): {overprov:.2f}")
print(f"Scaling actions taken: {scaling_actions}")


Last 40 predictions (EWA over heterogeneous experts, scaled):
01  Actual= 3152.00  Predicted= 3290.87
02  Actual=11400.00  Predicted=11613.40
03  Actual=11400.00  Predicted=11693.60
04  Actual= 8000.00  Predicted= 7929.64
05  Actual=11400.00  Predicted=11580.05
06  Actual= 3152.00  Predicted= 3120.45
07  Actual=11908.00  Predicted=11903.56
08  Actual=11908.00  Predicted=11911.12
09  Actual= 8000.00  Predicted= 6404.24
10  Actual=11908.00  Predicted=11909.46
11  Actual=11908.00  Predicted=11953.72
12  Actual=11908.00  Predicted=11911.88
13  Actual=11400.00  Predicted=11653.00
14  Actual= 8000.00  Predicted= 7954.48
15  Actual=11908.00  Predicted=11884.18
16  Actual=11908.00  Predicted=11904.79
17  Actual=11908.00  Predicted=11899.60
18  Actual=11908.00  Predicted=11915.43
19  Actual=11908.00  Predicted=11909.79
20  Actual= 3152.00  Predicted= 3105.31
21  Actual=11908.00  Predicted=11909.86
22  Actual=11908.00  Predicted=11909.92
23  Actual= 3152.00  Predicted= 3099.63
24  Actual= 3152.0

In [None]:
import os
import numpy as np
import pandas as pd
from collections import deque

from statsmodels.tsa.api import ExponentialSmoothing, SARIMAX


CSV_PATH = 'cluster-trace-gpu-v2023/csv/openb_pod_list_cpu100.csv'

NODE_CAPACITY = 8000  

INIT_WINDOW = 500     
REFIT_EVERY = 250     
CLIP_NEGATIVE = True    

class MAE:
    def __init__(self): self.err = 0.0; self.n = 0
    def update(self, y, yhat):
        self.err += abs(float(y) - float(yhat)); self.n += 1
    def get(self): return self.err / max(1, self.n)

class OnlineMAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.n=0; self.sum_pe=0.0
    def update(self, y, yhat):
        y = float(y); yhat = float(yhat)
        pe = abs(y - yhat) / max(self.eps, abs(y))
        self.sum_pe += pe; self.n += 1
    def get(self): return 100.0 * self.sum_pe / max(1, self.n)

class OnlineWAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.sae=0.0; self.sat=0.0
    def update(self, y, yhat):
        self.sae += abs(float(y)-float(yhat))
        self.sat += abs(float(y))
    def get(self): return 100.0 * self.sae / max(self.eps, self.sat)

def autoscale_metrics(y_true_seq, y_hat_seq, node_capacity=NODE_CAPACITY):
    """
    Compute SLA violations, Over-Provision (raw and normalized), Scaling actions.
    """
    sla_violations = 0
    overprov_raw = 0.0
    scaling_actions = 0
    current_nodes = 1

    for y_true, y_hat in zip(y_true_seq, y_hat_seq):
        needed_nodes = max(1, int(np.ceil(float(y_hat) / node_capacity)))
        if needed_nodes * node_capacity < float(y_true):
            sla_violations += 1
        overprov_raw += max(0.0, needed_nodes * node_capacity - float(y_true))
        if needed_nodes != current_nodes:
            scaling_actions += 1
            current_nodes = needed_nodes

    total = len(y_true_seq)
    sla_rate = sla_violations / max(1, total)
    denom = sum(map(float, y_true_seq)) + 1e-8
    overprov_rate = overprov_raw / denom

    return dict(
        sla_violations=sla_violations,
        sla_rate=sla_rate,
        overprov_raw=overprov_raw,
        overprov_rate=overprov_rate,
        scaling_actions=scaling_actions
    )

def load_ordered_series(csv_path=CSV_PATH):
    df = pd.read_csv(csv_path)
    if 'creation_time' not in df.columns:
        raise ValueError("creation_time column is required for proper time ordering.")

    df['creation_time'] = pd.to_numeric(df['creation_time'], errors='coerce').fillna(0).astype(float)
    df['cpu_milli'] = pd.to_numeric(df['cpu_milli'], errors='coerce').fillna(0).astype(float)

    df = df.sort_values('creation_time').reset_index(drop=True)
    y = df['cpu_milli'].astype(float).values 
    return y, df
def baseline_naive_last(y):
    preds = []
    last = y[0]
    for t in range(len(y)):
        if t < 1:
            preds.append(y[0])  
        else:
            preds.append(last)
        last = y[t]
    return np.array(preds, dtype=float)

def baseline_moving_average(y, window=20):
    preds = []
    csum = 0.0
    buf = []
    for t in range(len(y)):
        if t < 1:
            preds.append(y[0])
        else:
            w = min(window, len(buf))
            mean = np.mean(buf[-w:]) if w > 0 else y[t-1]
            preds.append(mean)
        buf.append(y[t])
    return np.array(preds, dtype=float)

def baseline_holt_winters(y, trend='add', seasonal=None, seasonal_periods=None):

    n = len(y)
    preds = np.zeros(n, dtype=float)
    preds[:INIT_WINDOW] = baseline_naive_last(y)[:INIT_WINDOW]

    last_refit = INIT_WINDOW
    model = None
    fit = None

    for t in range(INIT_WINDOW, n):
        if (t - last_refit) >= REFIT_EVERY or fit is None:
            model = ExponentialSmoothing(
                y[:t],
                trend=trend,
                seasonal=seasonal,
                seasonal_periods=seasonal_periods,
                initialization_method="estimated"
            )
            fit = model.fit(optimized=True, use_brute=False)
            last_refit = t
        preds[t] = float(fit.forecast(1)[0])
    return preds

def baseline_sarima(y, order=(1,1,1), seasonal_order=(0,0,0,0)):
    n = len(y)
    preds = np.zeros(n, dtype=float)
    preds[:INIT_WINDOW] = baseline_naive_last(y)[:INIT_WINDOW]

    last_refit = INIT_WINDOW
    fit = None

    for t in range(INIT_WINDOW, n):
        if (t - last_refit) >= REFIT_EVERY or fit is None:
            try:
                model = SARIMAX(
                    y[:t],
                    order=order,
                    seasonal_order=seasonal_order,
                    enforce_stationarity=False,
                    enforce_invertibility=False
                )
                fit = model.fit(disp=False)
                last_refit = t
            except Exception as e:
                fit = None
        if fit is not None:
            try:
                preds[t] = float(fit.forecast(1)[0])
            except Exception:
                preds[t] = y[t-1]
        else:
            preds[t] = y[t-1]
    return preds

def evaluate_forecasts(y_true, y_hat, name="MODEL"):
    if CLIP_NEGATIVE:
        y_hat = np.maximum(0.0, y_hat)

    last_preds = deque(maxlen=40)
    mae = MAE(); mape = OnlineMAPE(); wape = OnlineWAPE()

    for yt, yh in zip(y_true, y_hat):
        mae.update(yt, yh); mape.update(yt, yh); wape.update(yt, yh)
        last_preds.append((float(yt), float(yh)))

    auto = autoscale_metrics(y_true, y_hat, node_capacity=NODE_CAPACITY)

    print(f"\n=== {name} ===")
    print("Last 40 predictions:")
    for i, (yt, yh) in enumerate(last_preds, 1):
        print(f"{i:02d}  Actual={yt:8.2f}  Predicted={yh:8.2f}")

    print(f"\nMAE={mae.get():.2f}")
    print(f"MAPE={mape.get():.2f}%")
    print(f"WAPE={wape.get():.2f}%")
    print(f"SLA violations: {auto['sla_violations']}  (rate={auto['sla_rate']:.3f})")
    print(f"Over-provision (raw mCPU): {auto['overprov_raw']:.2f}")
    print(f"Over-provision RATE: {100.0*auto['overprov_rate']:.2f}%")
    print(f"Scaling actions taken: {auto['scaling_actions']}")


if __name__ == "__main__":
    y, df = load_ordered_series(CSV_PATH)

    yhat_naive = baseline_naive_last(y)
    evaluate_forecasts(y, yhat_naive, name="Naive-LAST")

    yhat_ma = baseline_moving_average(y, window=30)
    evaluate_forecasts(y, yhat_ma, name="MovingAverage-30")

    yhat_hw = baseline_holt_winters(y, trend="add", seasonal=None, seasonal_periods=None)
    evaluate_forecasts(y, yhat_hw, name="HoltWinters(trend=add)")
    yhat_arima = baseline_sarima(y, order=(1,1,1), seasonal_order=(0,0,0,0))
    evaluate_forecasts(y, yhat_arima, name="SARIMA(1,1,1)")



=== Naive-LAST ===
Last 40 predictions:
01  Actual=11400.00  Predicted= 3152.00
02  Actual=11400.00  Predicted=11400.00
03  Actual=32000.00  Predicted=11400.00
04  Actual= 8000.00  Predicted=32000.00
05  Actual=11400.00  Predicted= 8000.00
06  Actual= 3152.00  Predicted=11400.00
07  Actual=11908.00  Predicted= 3152.00
08  Actual=11908.00  Predicted=11908.00
09  Actual= 8000.00  Predicted=11908.00
10  Actual=11908.00  Predicted= 8000.00
11  Actual=11908.00  Predicted=11908.00
12  Actual=11908.00  Predicted=11908.00
13  Actual=11400.00  Predicted=11908.00
14  Actual= 8000.00  Predicted=11400.00
15  Actual=11908.00  Predicted= 8000.00
16  Actual=11908.00  Predicted=11908.00
17  Actual=11908.00  Predicted=11908.00
18  Actual=11908.00  Predicted=11908.00
19  Actual=11908.00  Predicted=11908.00
20  Actual= 3152.00  Predicted=11908.00
21  Actual=11908.00  Predicted= 3152.00
22  Actual=11908.00  Predicted=11908.00
23  Actual= 3152.00  Predicted=11908.00
24  Actual= 3152.00  Predicted= 3152.00

In [None]:
import numpy as np
import pandas as pd
from collections import deque
from statsmodels.tsa.api import ExponentialSmoothing
CSV_PATH = 'cluster-trace-gpu-v2023/csv/openb_pod_list_cpu100.csv'

NODE_CAPACITY = 8000        
CLIP_NEGATIVE = True

PER_SYNC_PERIOD_MS = 10_000
BIN_AGG = 'max'              

LOOKAHEAD_MS = 10_000      
HISTORY_SIZE = 24          

HW_TREND_ONLY = True     
HW_SEASONAL = 'add'      
HW_SEASONAL_PERIODS = 144   
HW_STORED_SEASONS = 4    
HW_MIN_WINDOW = 240          

class MAE:
    def __init__(self): self.err = 0.0; self.n = 0
    def update(self, y, yhat):
        self.err += abs(float(y) - float(yhat)); self.n += 1
    def get(self): return self.err / max(1, self.n)

class OnlineMAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.n=0; self.sum_pe=0.0
    def update(self, y, yhat):
        y = float(y); yhat = float(yhat)
        self.sum_pe += abs(y - yhat) / max(self.eps, abs(y)); self.n += 1
    def get(self): return 100.0 * self.sum_pe / max(1, self.n)

class OnlineWAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.sae=0.0; self.sat=0.0
    def update(self, y, yhat):
        self.sae += abs(float(y)-float(yhat))
        self.sat += abs(float(y))
    def get(self): return 100.0 * self.sae / max(self.eps, self.sat)

def autoscale_metrics(y_true_seq, y_hat_seq, node_capacity=NODE_CAPACITY):
    sla_violations = 0
    overprov_raw = 0.0
    scaling_actions = 0
    current_nodes = 1
    for yt, yh in zip(y_true_seq, y_hat_seq):
        need = max(1, int(np.ceil(float(yh)/node_capacity)))
        if need*node_capacity < float(yt): sla_violations += 1
        overprov_raw += max(0.0, need*node_capacity - float(yt))
        if need != current_nodes:
            scaling_actions += 1
            current_nodes = need
    total = len(y_true_seq)
    sla_rate = sla_violations / max(1, total)
    denom = float(np.sum(y_true_seq)) + 1e-8
    overprov_rate = overprov_raw / denom
    return dict(
        sla_violations=sla_violations,
        sla_rate=sla_rate,
        overprov_raw=overprov_raw,
        overprov_rate=overprov_rate,
        scaling_actions=scaling_actions
    )


def _normalize_creation_time_to_ms(ct: np.ndarray) -> np.ndarray:
    """Heuristic: convert creation_time to milliseconds if needed."""
    ct = ct.astype(float)
    mx = float(np.nanmax(ct)) if len(ct) else 0.0
    if mx > 1e15:   ct = ct / 1e6  
    elif mx > 1e12: ct = ct / 1e3   
    elif mx < 1e9:  ct = ct * 1e3  
    return ct

def load_binned_series(csv_path=CSV_PATH, per_sync_ms=PER_SYNC_PERIOD_MS, how=BIN_AGG,
                       filter_col=None, filter_val=None):
    """
    Build a fixed-interval time series by binning creation_time into per_sync_ms buckets.
    If filter_col & filter_val provided and column exists → filter rows first.
    """
    df = pd.read_csv(csv_path)
    if 'creation_time' not in df.columns:
        raise ValueError("creation_time column is required for binning.")

    df['cpu_milli'] = pd.to_numeric(df.get('cpu_milli', 0), errors='coerce')
    df['creation_time'] = pd.to_numeric(df['creation_time'], errors='coerce')
    df = df.dropna(subset=['creation_time', 'cpu_milli']).copy()

    if filter_col and filter_col in df.columns:
        df = df[df[filter_col] == filter_val].copy()

    df['creation_time'] = _normalize_creation_time_to_ms(df['creation_time'].values)
    df['creation_time'] = df['creation_time'].astype(np.int64)
    df['bin'] = (df['creation_time'] // int(per_sync_ms)).astype(np.int64)

    if how == 'max':
        agg = df.groupby('bin', as_index=True)['cpu_milli'].max()
    else:
        agg = df.groupby('bin', as_index=True)['cpu_milli'].mean()

    agg = agg.sort_index()
    y = agg.values.astype(float)
    return y, agg.index.values 


class PHPA_Linear:
    def __init__(self, lookAhead_ms=LOOKAHEAD_MS, historySize=HISTORY_SIZE, perSync_ms=PER_SYNC_PERIOD_MS):
        self.lookAhead_ms = int(lookAhead_ms)
        self.historySize = int(historySize)
        self.perSync_ms = int(perSync_ms)
        self.stepsAhead = max(1, int(np.ceil(self.lookAhead_ms / self.perSync_ms)))

    def predict_series(self, y: np.ndarray) -> np.ndarray:
        n = len(y)
        yhat = np.zeros(n, dtype=float)
        if n == 0:
            return yhat
        yhat[0] = y[0]
        for t in range(1, n):
            start = max(0, t - self.historySize)
            hist = y[start:t]
            if len(hist) < 2:
                yhat[t] = y[t-1]
                continue
            xs = np.arange(len(hist), dtype=float)
            A = np.vstack([xs, np.ones_like(xs)]).T
            try:
                slope, intercept = np.linalg.lstsq(A, hist, rcond=None)[0]
                future_x = len(hist) - 1 + self.stepsAhead
                pred = slope * future_x + intercept
            except Exception:
                pred = y[t-1]
            yhat[t] = pred
        return yhat


class PHPA_HoltWinters:
 
    def __init__(self, trend_only=HW_TREND_ONLY, seasonal=HW_SEASONAL,
                 seasonal_periods=HW_SEASONAL_PERIODS, stored_seasons=HW_STORED_SEASONS,
                 min_window=HW_MIN_WINDOW):
        self.trend_only = bool(trend_only)
        self.seasonal = None if trend_only else str(seasonal)
        self.seasonal_periods = int(seasonal_periods)
        self.stored_seasons = int(stored_seasons)
        self.min_window = int(min_window)

    def predict_series(self, y: np.ndarray) -> np.ndarray:
        n = len(y)
        yhat = np.zeros(n, dtype=float)
        if n == 0:
            return yhat
        yhat[0] = y[0]

        if self.seasonal is None:
            window = max(self.min_window, 30)
        else:
            window = max(self.min_window, self.seasonal_periods * max(1, self.stored_seasons))

        for t in range(1, n):
            start = max(0, t - window)
            series = y[start:t]
            if len(series) < max(10, (self.seasonal_periods + 2 if self.seasonal else 10)):
                yhat[t] = y[t-1]
                continue
            try:
                model = ExponentialSmoothing(
                    series,
                    trend='add',                      
                    seasonal=self.seasonal,           
                    seasonal_periods=(self.seasonal_periods if self.seasonal else None),
                    initialization_method="estimated"
                )
                fit = model.fit(optimized=True)      
                yhat[t] = float(fit.forecast(1)[0])
            except Exception:
                yhat[t] = y[t-1]
        return yhat

def evaluate(y_true, y_hat, name="MODEL"):
    y_hat = np.asarray(y_hat, dtype=float)
    if CLIP_NEGATIVE:
        y_hat = np.maximum(0.0, y_hat)

    last_preds = deque(maxlen=40)
    mae = MAE(); mape = OnlineMAPE(); wape = OnlineWAPE()
    for yt, yh in zip(y_true, y_hat):
        mae.update(yt, yh); mape.update(yt, yh); wape.update(yt, yh)
        last_preds.append((float(yt), float(yh)))

    auto = autoscale_metrics(y_true, y_hat, node_capacity=NODE_CAPACITY)

    print(f"\n=== {name} (PHPA-like) ===")
    print("Last 40 predictions:")
    for i, (yt, yh) in enumerate(last_preds, 1):
        print(f"{i:02d}  Actual={yt:8.2f}  Predicted={yh:8.2f}")

    print(f"\nMAE={mae.get():.2f}")
    print(f"MAPE={mape.get():.2f}%")
    print(f"WAPE={wape.get():.2f}%")
    print(f"SLA violations: {auto['sla_violations']}  (rate={auto['sla_rate']:.3f})")
    print(f"Over-provision (raw mCPU): {auto['overprov_raw']:.2f}")
    print(f"Over-provision RATE: {100.0*auto['overprov_rate']:.2f}%")
    print(f"Scaling actions taken: {auto['scaling_actions']}")


if __name__ == "__main__":
    y, bins = load_binned_series(
        CSV_PATH,
        per_sync_ms=PER_SYNC_PERIOD_MS,
        how=BIN_AGG,
    )

    print(f"Binned series length = {len(y)} (per_sync_ms={PER_SYNC_PERIOD_MS}, agg={BIN_AGG})")

    lin = PHPA_Linear(lookAhead_ms=LOOKAHEAD_MS, historySize=HISTORY_SIZE, perSync_ms=PER_SYNC_PERIOD_MS)
    yhat_lin = lin.predict_series(y)
    evaluate(y, yhat_lin, name=f"PHPA Linear (lookAhead={LOOKAHEAD_MS}ms, hist={HISTORY_SIZE})")

    hw = PHPA_HoltWinters(
        trend_only=HW_TREND_ONLY,
        seasonal=HW_SEASONAL,
        seasonal_periods=HW_SEASONAL_PERIODS,
        stored_seasons=HW_STORED_SEASONS,
        min_window=HW_MIN_WINDOW
    )
    yhat_hw = hw.predict_series(y)
    evaluate(y, yhat_hw, name=f"Holt-Winters ({'trend-only' if HW_TREND_ONLY else f'seasonal={HW_SEASONAL}, P={HW_SEASONAL_PERIODS}'})")


Binned series length = 7225 (per_sync_ms=10000, agg=max)

=== PHPA Linear (lookAhead=10000ms, hist=24) (PHPA-like) ===
Last 40 predictions:
01  Actual= 4000.00  Predicted= 8539.62
02  Actual=32000.00  Predicted= 7631.10
03  Actual=11400.00  Predicted=11309.36
04  Actual=11400.00  Predicted=11240.72
05  Actual= 3152.00  Predicted=11162.32
06  Actual=11400.00  Predicted=10796.93
07  Actual=11400.00  Predicted=11527.04
08  Actual=32000.00  Predicted=11894.54
09  Actual= 8000.00  Predicted=15201.71
10  Actual=11400.00  Predicted=14195.32
11  Actual= 3152.00  Predicted=13702.90
12  Actual=11908.00  Predicted=12571.00
13  Actual= 8000.00  Predicted=12993.61
14  Actual=11908.00  Predicted=12764.88
15  Actual=11400.00  Predicted=12405.13
16  Actual= 8000.00  Predicted=12690.38
17  Actual=11908.00  Predicted=12414.48
18  Actual=11908.00  Predicted=12007.55
19  Actual=11908.00  Predicted=12753.45
20  Actual=11908.00  Predicted=12300.17
21  Actual=11908.00  Predicted=11751.72
22  Actual= 3152.00 

In [None]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from collections import deque
from datetime import datetime, timezone

try:
    from prophet import Prophet
except Exception:
    from fbprophet import Prophet 
from tensorflow import keras
from tensorflow.keras import layers
CSV_PATH = 'cluster-trace-gpu-v2023/csv/openb_pod_list_cpu100.csv'
NODE_CAPACITY = 8000
CLIP_NEGATIVE = True
PER_SYNC_PERIOD_MS = 10_000   
BIN_AGG = 'max'              
PROPHET_DAILY = True
PROPHET_WEEKLY = True
PROPHET_YEARLY = False
PROPHET_SEASONALITY_MODE = 'additive' 

INIT_WINDOW = 500        
REFIT_EVERY = 250       
LOOKAHEAD_STEPS = 1     

LSTM_LOOKBACK = 48     
LSTM_EPOCHS = 8            
LSTM_BATCH = 32
LSTM_VERBOSE = 0

class MAE:
    def __init__(self): self.err=0.0; self.n=0
    def update(self, y, yhat): self.err += abs(float(y)-float(yhat)); self.n+=1
    def get(self): return self.err / max(1, self.n)

class OnlineMAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.n=0; self.sum_pe=0.0
    def update(self, y, yhat):
        y=float(y); yhat=float(yhat)
        self.sum_pe += abs(y-yhat)/max(self.eps, abs(y)); self.n += 1
    def get(self): return 100.0*self.sum_pe/max(1,self.n)

class OnlineWAPE:
    def __init__(self, eps=1e-8): self.eps=eps; self.sae=0.0; self.sat=0.0
    def update(self, y, yhat):
        self.sae += abs(float(y)-float(yhat))
        self.sat += abs(float(y))
    def get(self): return 100.0*self.sae/max(self.eps,self.sat)

def autoscale_metrics(y_true_seq, y_hat_seq, node_capacity=NODE_CAPACITY):
    sla_violations=0; overprov_raw=0.0; scaling_actions=0; current_nodes=1
    for yt, yh in zip(y_true_seq, y_hat_seq):
        need = max(1, int(np.ceil(float(yh)/node_capacity)))
        if need*node_capacity < float(yt): sla_violations += 1
        overprov_raw += max(0.0, need*node_capacity - float(yt))
        if need != current_nodes:
            scaling_actions += 1; current_nodes = need
    total = len(y_true_seq)
    sla_rate = sla_violations / max(1,total)
    denom = float(np.sum(y_true_seq)) + 1e-8
    overprov_rate = overprov_raw / denom
    return dict(sla_violations=sla_violations, sla_rate=sla_rate,
                overprov_raw=overprov_raw, overprov_rate=overprov_rate,
                scaling_actions=scaling_actions)

def _normalize_creation_time_to_ms(ct: np.ndarray) -> np.ndarray:
    ct = ct.astype(float); mx = float(np.nanmax(ct)) if len(ct) else 0.0
    if mx > 1e15:   ct = ct / 1e6  
    elif mx > 1e12: ct = ct / 1e3 
    elif mx < 1e9:  ct = ct * 1e3  
    return ct

def load_binned_series(csv_path=CSV_PATH, per_sync_ms=PER_SYNC_PERIOD_MS, how=BIN_AGG,
                       filter_col=None, filter_val=None):
    df = pd.read_csv(csv_path)
    if 'creation_time' not in df.columns:
        raise ValueError("creation_time required.")
    df['cpu_milli'] = pd.to_numeric(df.get('cpu_milli', 0), errors='coerce')
    df['creation_time'] = pd.to_numeric(df['creation_time'], errors='coerce')
    df = df.dropna(subset=['creation_time', 'cpu_milli']).copy()

    if filter_col and filter_col in df.columns:
        df = df[df[filter_col] == filter_val].copy()

    df['creation_time'] = _normalize_creation_time_to_ms(df['creation_time'].values).astype(np.int64)
    df['bin'] = (df['creation_time'] // int(per_sync_ms)).astype(np.int64)

    if how == 'max':
        agg = df.groupby('bin', as_index=True)['cpu_milli'].max().sort_index()
    else:
        agg = df.groupby('bin', as_index=True)['cpu_milli'].mean().sort_index()

    y = agg.values.astype(float)
    bins = agg.index.values
    ds = pd.to_datetime(bins * int(per_sync_ms), unit='ms', utc=True).tz_convert(None)
    return y, ds


def prophet_walk_forward(y, ds, daily=True, weekly=True, yearly=False,
                         seasonality_mode='additive',
                         init_window=INIT_WINDOW, refit_every=REFIT_EVERY,
                         steps_ahead=LOOKAHEAD_STEPS):
    n = len(y)
    yhat = np.zeros(n, dtype=float)
    yhat[:init_window] = y[:init_window]  

    model = None; last_refit = -10**9
    history_df = pd.DataFrame({'ds': ds, 'y': y})

    for t in range(init_window, n):
        if (t - last_refit) >= refit_every or model is None:
            m = Prophet(daily_seasonality=daily, weekly_seasonality=weekly, yearly_seasonality=yearly)
            m.seasonality_mode = seasonality_mode
            m.fit(history_df.iloc[:t])  
            model = m; last_refit = t

        future_times = [ds[t-1] + pd.to_timedelta(PER_SYNC_PERIOD_MS*steps_ahead, unit='ms')]
        future_df = pd.DataFrame({'ds': future_times})
        fc = model.predict(future_df)
        yhat[t] = float(fc['yhat'].iloc[0])

    return yhat
def _make_lstm_model(input_len: int):
    model = keras.Sequential([
        layers.Input(shape=(input_len, 1)),
        layers.LSTM(50, return_sequences=True),
        layers.LSTM(50),
        layers.Dense(1)
    ])
    model.compile(optimizer='adam', loss='mae')
    return model

def _build_supervised(seq, lookback):
    X, Y = [], []
    for i in range(len(seq) - lookback):
        X.append(seq[i:i+lookback])
        Y.append(seq[i+lookback])
    X = np.array(X, dtype=np.float32).reshape(-1, lookback, 1)
    Y = np.array(Y, dtype=np.float32).reshape(-1, 1)
    return X, Y

def prophet_lstm_residual_walk_forward(y, ds,
                                       init_window=INIT_WINDOW,
                                       refit_every=REFIT_EVERY,
                                       steps_ahead=LOOKAHEAD_STEPS,
                                       lookback=LSTM_LOOKBACK,
                                       epochs=LSTM_EPOCHS,
                                       batch=LSTM_BATCH,
                                       verbose=LSTM_VERBOSE):
    n = len(y)
    yhat = np.zeros(n, dtype=float)
    yhat[:init_window] = y[:init_window]

    last_refit = -10**9
    p_model = None
    lstm = None
    history_df = pd.DataFrame({'ds': ds, 'y': y})
    residual_hist = None

    for t in range(init_window, n):
        if (t - last_refit) >= refit_every or p_model is None or lstm is None:
            p = Prophet(daily_seasonality=PROPHET_DAILY, weekly_seasonality=PROPHET_WEEKLY, yearly_seasonality=PROPHET_YEARLY)
            p.seasonality_mode = PROPHET_SEASONALITY_MODE
            p.fit(history_df.iloc[:t])
            hist_fc = p.predict(history_df.iloc[:t][['ds']])
            resid = history_df.iloc[:t]['y'].values - hist_fc['yhat'].values
            residual_hist = resid.astype(float)
            if len(residual_hist) > (lookback + 5):
                X, Y = _build_supervised(residual_hist, lookback)
                lstm = _make_lstm_model(lookback)
                lstm.fit(X, Y, epochs=epochs, batch_size=batch, verbose=verbose)
            else:
                lstm = None

            p_model = p
            last_refit = t

        future_times = [ds[t-1] + pd.to_timedelta(PER_SYNC_PERIOD_MS*steps_ahead, unit='ms')]
        future_df = pd.DataFrame({'ds': future_times})
        p_fc = p_model.predict(future_df)
        base = float(p_fc['yhat'].iloc[0])

        if lstm is not None:
            past_resid = residual_hist[-lookback:]
            if len(past_resid) < lookback:
                past = np.pad(past_resid, (lookback - len(past_resid), 0))
            else:
                past = past_resid
            x = past.reshape(1, lookback, 1).astype(np.float32)
            res_next = float(lstm.predict(x, verbose=0)[0,0])
        else:
            res_next = 0.0

        yhat[t] = base + res_next

    return yhat

def evaluate(y_true, y_hat, name="MODEL"):
    y_hat = np.array(y_hat, dtype=float)
    if CLIP_NEGATIVE:
        y_hat = np.maximum(0.0, y_hat)

    last_preds = deque(maxlen=40)
    mae = MAE(); mape = OnlineMAPE(); wape = OnlineWAPE()
    for yt, yh in zip(y_true, y_hat):
        mae.update(yt, yh); mape.update(yt, yh); wape.update(yt, yh)
        last_preds.append((float(yt), float(yh)))
    auto = autoscale_metrics(y_true, y_hat, node_capacity=NODE_CAPACITY)

    print(f"\n=== {name} ===")
    print("Last 40 predictions:")
    for i, (yt, yh) in enumerate(last_preds, 1):
        print(f"{i:02d}  Actual={yt:8.2f}  Predicted={yh:8.2f}")
    print(f"\nMAE={mae.get():.2f}")
    print(f"MAPE={mape.get():.2f}%")
    print(f"WAPE={wape.get():.2f}%")
    print(f"SLA violations: {auto['sla_violations']}  (rate={auto['sla_rate']:.3f})")
    print(f"Over-provision (raw mCPU): {auto['overprov_raw']:.2f}")
    print(f"Over-provision RATE: {100.0*auto['overprov_rate']:.2f}%")
    print(f"Scaling actions taken: {auto['scaling_actions']}")

if __name__ == "__main__":
    y, ds = load_binned_series(CSV_PATH, per_sync_ms=PER_SYNC_PERIOD_MS, how=BIN_AGG)
    print(f"Binned series length = {len(y)}; bin={PER_SYNC_PERIOD_MS} ms; agg={BIN_AGG}")
    yhat_p = prophet_walk_forward(
        y, ds,
        daily=PROPHET_DAILY, weekly=PROPHET_WEEKLY, yearly=PROPHET_YEARLY,
        seasonality_mode=PROPHET_SEASONALITY_MODE,
        init_window=INIT_WINDOW, refit_every=REFIT_EVERY,
        steps_ahead=LOOKAHEAD_STEPS
    )
    evaluate(y, yhat_p, name="Prophet (paper baseline)")
    yhat_pl = prophet_lstm_residual_walk_forward(
        y, ds,
        init_window=INIT_WINDOW,
        refit_every=REFIT_EVERY,
        steps_ahead=LOOKAHEAD_STEPS,
        lookback=LSTM_LOOKBACK,
        epochs=LSTM_EPOCHS,
        batch=LSTM_BATCH,
        verbose=LSTM_VERBOSE
    )
    evaluate(y, yhat_pl, name="Hybrid Prophet+LSTM (paper-style)")


Importing plotly failed. Interactive plots will not work.
18:05:25 - cmdstanpy - INFO - Chain [1] start processing
18:05:25 - cmdstanpy - INFO - Chain [1] done processing


Binned series length = 7225; bin=10000 ms; agg=max


18:05:29 - cmdstanpy - INFO - Chain [1] start processing
18:05:29 - cmdstanpy - INFO - Chain [1] done processing
18:05:32 - cmdstanpy - INFO - Chain [1] start processing
18:05:32 - cmdstanpy - INFO - Chain [1] done processing
18:05:36 - cmdstanpy - INFO - Chain [1] start processing
18:05:36 - cmdstanpy - INFO - Chain [1] done processing
18:05:39 - cmdstanpy - INFO - Chain [1] start processing
18:05:40 - cmdstanpy - INFO - Chain [1] done processing
18:05:43 - cmdstanpy - INFO - Chain [1] start processing
18:05:43 - cmdstanpy - INFO - Chain [1] done processing
18:05:47 - cmdstanpy - INFO - Chain [1] start processing
18:05:47 - cmdstanpy - INFO - Chain [1] done processing
18:05:51 - cmdstanpy - INFO - Chain [1] start processing
18:05:51 - cmdstanpy - INFO - Chain [1] done processing
18:05:54 - cmdstanpy - INFO - Chain [1] start processing
18:05:54 - cmdstanpy - INFO - Chain [1] done processing
18:05:58 - cmdstanpy - INFO - Chain [1] start processing
18:05:58 - cmdstanpy - INFO - Chain [1]


=== Prophet (paper baseline) ===
Last 40 predictions:
01  Actual= 4000.00  Predicted=11089.19
02  Actual=32000.00  Predicted=11089.42
03  Actual=11400.00  Predicted=11098.17
04  Actual=11400.00  Predicted=11104.32
05  Actual= 3152.00  Predicted=11105.58
06  Actual=11400.00  Predicted=11105.60
07  Actual=11400.00  Predicted=11105.65
08  Actual=32000.00  Predicted=11105.29
09  Actual= 8000.00  Predicted=11093.80
10  Actual=11400.00  Predicted=11076.78
11  Actual= 3152.00  Predicted=11063.23
12  Actual=11908.00  Predicted=11055.77
13  Actual= 8000.00  Predicted=11064.81
14  Actual=11908.00  Predicted=11066.96
15  Actual=11400.00  Predicted=11067.81
16  Actual= 8000.00  Predicted=11090.56
17  Actual=11908.00  Predicted=11138.32
18  Actual=11908.00  Predicted=11172.63
19  Actual=11908.00  Predicted=11180.26
20  Actual=11908.00  Predicted=11189.76
21  Actual=11908.00  Predicted=11193.00
22  Actual= 3152.00  Predicted=11196.29
23  Actual=11908.00  Predicted=11201.30
24  Actual= 3152.00  Pred

18:07:26 - cmdstanpy - INFO - Chain [1] start processing
18:07:26 - cmdstanpy - INFO - Chain [1] done processing
18:07:46 - cmdstanpy - INFO - Chain [1] start processing
18:07:46 - cmdstanpy - INFO - Chain [1] done processing
18:08:07 - cmdstanpy - INFO - Chain [1] start processing
18:08:07 - cmdstanpy - INFO - Chain [1] done processing
18:08:29 - cmdstanpy - INFO - Chain [1] start processing
18:08:29 - cmdstanpy - INFO - Chain [1] done processing
18:08:52 - cmdstanpy - INFO - Chain [1] start processing
18:08:52 - cmdstanpy - INFO - Chain [1] done processing
18:09:15 - cmdstanpy - INFO - Chain [1] start processing
18:09:15 - cmdstanpy - INFO - Chain [1] done processing
18:09:39 - cmdstanpy - INFO - Chain [1] start processing
18:09:39 - cmdstanpy - INFO - Chain [1] done processing
18:10:05 - cmdstanpy - INFO - Chain [1] start processing
18:10:05 - cmdstanpy - INFO - Chain [1] done processing
18:10:35 - cmdstanpy - INFO - Chain [1] start processing
18:10:35 - cmdstanpy - INFO - Chain [1]


=== Hybrid Prophet+LSTM (paper-style) ===
Last 40 predictions:
01  Actual= 4000.00  Predicted=11161.33
02  Actual=32000.00  Predicted=11161.56
03  Actual=11400.00  Predicted=11170.32
04  Actual=11400.00  Predicted=11176.46
05  Actual= 3152.00  Predicted=11177.73
06  Actual=11400.00  Predicted=11177.74
07  Actual=11400.00  Predicted=11177.80
08  Actual=32000.00  Predicted=11177.43
09  Actual= 8000.00  Predicted=11165.95
10  Actual=11400.00  Predicted=11148.92
11  Actual= 3152.00  Predicted=11135.38
12  Actual=11908.00  Predicted=11127.91
13  Actual= 8000.00  Predicted=11136.95
14  Actual=11908.00  Predicted=11139.11
15  Actual=11400.00  Predicted=11139.95
16  Actual= 8000.00  Predicted=11162.70
17  Actual=11908.00  Predicted=11210.46
18  Actual=11908.00  Predicted=11244.77
19  Actual=11908.00  Predicted=11252.40
20  Actual=11908.00  Predicted=11261.90
21  Actual=11908.00  Predicted=11265.15
22  Actual= 3152.00  Predicted=11268.44
23  Actual=11908.00  Predicted=11273.44
24  Actual= 3152