# Prediction model

In [None]:
import pandas as pd
import numpy as np
import json
from collections import defaultdict
from sklearn.model_selection import LeaveOneOut
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.linear_model import RidgeCV
import lightgbm as lgb
import matplotlib.pyplot as plt
from tqdm import tqdm  
import joblib          

# Training inputs: one log file and one metrics file per workflow run.
# The two lists are paired positionally (log_paths[i] belongs with
# metric_paths[i]), so they must have the same length and order.
log_paths = [
    'log_file_1',
    'log_file2',
    # … add further log files here
]
metric_paths = [
    'metrics_file_1',
    'metrics_file2',
    # … add the matching metrics files here, in the same order
]

def _load_json_document(path, kind_label):
    """Load and return the JSON document stored at *path*.

    ``kind_label`` is interpolated into the error message so a broken log file
    can be told apart from a broken metrics file.

    Raises:
        RuntimeError: if the file content is not valid JSON (chained to the
            original ``json.JSONDecodeError``).
    """
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(path, 'r', encoding='utf-8') as f:
        try:
            return json.load(f)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Błąd przy parsowaniu pliku {kind_label}: {path}\n{e}") from e


def _max_concurrency(interval_list):
    """Return the peak number of simultaneously open ``(start, end)`` intervals."""
    events = []
    for (st, en) in interval_list:
        events.append((st, 1))
        events.append((en, -1))
    # Tuples sort by time first and delta second, so at an identical timestamp
    # an end (-1) is processed before a start (+1): back-to-back jobs are not
    # counted as overlapping.
    events.sort()
    cur = mx = 0
    for _, delta in events:
        cur += delta
        mx = max(mx, cur)
    return mx


def process_single_run(log_path, metrics_path):
    """Build the per-workflow feature table for one run.

    The log file is a JSON list of records. Only records that carry a
    ``log.attributes.workflowId`` and whose ``body`` is ``'Job started'`` or
    ``'Job finished'`` are used. From them the function derives, per workflow:

    * ``makespan`` - seconds between the earliest start and the latest finish,
    * one column per task name (``log.attributes.name``) holding the number of
      ``'Job started'`` events for that task,
    * ``max_concurrency`` - peak number of jobs running at the same time,
    * ``cpu_speed`` / ``cpu_cores`` / ``cpu_pcores`` - placeholders, always 0.

    The metrics file is a JSON list of records (optionally wrapped in a
    ``'_source'`` key). For the metrics named ``'cpu-usage'`` and
    ``'memory-usage'`` it yields ``<metric>_mean``, ``_max``, ``_min`` and
    ``_std`` (population standard deviation) per workflow.

    Args:
        log_path: path to the JSON log file of the run.
        metrics_path: path to the JSON metrics file of the run.

    Returns:
        pandas.DataFrame with one row per workflow that has both a start and a
        finish timestamp; missing values are filled with 0. When the log
        contains no such workflow, an empty frame with only the columns
        ``workflowId`` and ``makespan`` is returned.

    Raises:
        RuntimeError: if either file is not valid JSON.
    """
    makespan = {}     # workflowId -> [earliest start, latest finish]
    task_counts = {}  # (workflowId, task name) -> number of 'Job started' events
    intervals = {}    # (workflowId, jobId) -> [start, end]

    log_data = _load_json_document(log_path, 'logów')

    for src in log_data:
        wf = src.get('log.attributes.workflowId')
        if wf is None:
            continue
        body = src.get('body')
        if body not in ('Job started', 'Job finished'):
            continue
        started = body == 'Job started'

        # Task counts
        if started:
            task = src.get('log.attributes.name')
            if task:
                task_counts[(wf, task)] = task_counts.get((wf, task), 0) + 1

        # Timestamps are parsed only for the events that need them. A record
        # without a usable time cannot contribute to makespan or concurrency.
        raw_time = src.get('time')
        if raw_time is None:
            continue
        t = pd.to_datetime(raw_time)
        if pd.isna(t):
            continue

        # Makespan
        span = makespan.setdefault(wf, [None, None])
        if started:
            if span[0] is None or t < span[0]:
                span[0] = t
        elif span[1] is None or t > span[1]:
            span[1] = t

        # Concurrency: a repeated event for the same job overwrites the
        # previously stored timestamp.
        jb = src.get('log.attributes.jobId')
        if jb is not None:
            interval = intervals.setdefault((wf, jb), [None, None])
            interval[0 if started else 1] = t

    # Makespan: only workflows with both a start and a finish are kept.
    makespan_final = {
        wf: (fi - st).total_seconds()
        for wf, (st, fi) in makespan.items()
        if st is not None and fi is not None
    }

    # Task counts: one column per task name.
    rows = [
        {'workflowId': wf, 'task_name': task, 'count': cnt}
        for (wf, task), cnt in task_counts.items()
    ]
    if rows:
        df_tasks = pd.DataFrame(rows)
        task_counts_ct = df_tasks.pivot_table(
            index='workflowId', columns='task_name', values='count', fill_value=0
        ).reset_index()
    else:
        task_counts_ct = pd.DataFrame({'workflowId': list(makespan_final.keys())})

    # Max concurrency per workflow, using only jobs with both timestamps.
    by_wf = defaultdict(list)
    for (wf, jb), (st, en) in intervals.items():
        if st is None or en is None:
            continue
        by_wf[wf].append((st, en))

    concurrency_rows = [
        {'workflowId': wf, 'max_concurrency': _max_concurrency(lst)}
        for wf, lst in by_wf.items()
    ]
    if concurrency_rows:
        df_concurrency = pd.DataFrame(concurrency_rows)
    else:
        df_concurrency = pd.DataFrame({'workflowId': list(makespan_final.keys()), 'max_concurrency': 0})

    # CPU info (all zeros for now, because it is not collected)
    cpu_agg = pd.DataFrame({
        'workflowId': list(makespan_final.keys()),
        'cpu_speed': 0, 'cpu_cores': 0, 'cpu_pcores': 0
    })

    # Running aggregates per (workflowId, metric): [count, sum, min, max, sum of squares]
    cpu_mem_stats = defaultdict(lambda: [0, 0.0, float('inf'), float('-inf'), 0.0])

    metrics_data = _load_json_document(metrics_path, 'metryk')

    for src in metrics_data:
        # Records may be wrapped in a '_source' envelope.
        if isinstance(src, dict) and '_source' in src:
            src = src['_source']

        wf   = src.get('metric.attributes.workflowId')
        name = src.get('name')
        val  = src.get('value')
        if wf is None or val is None or name not in ('cpu-usage', 'memory-usage'):
            continue
        key = (wf, name)
        cnt, s, mn, mx, sq = cpu_mem_stats[key]
        cnt += 1
        s   += val
        mn   = min(mn, val)
        mx   = max(mx, val)
        sq  += val * val
        cpu_mem_stats[key] = [cnt, s, mn, mx, sq]

    rows = []
    for (wf, m_), (cnt, s, mn, mx, sq) in cpu_mem_stats.items():
        mean = s / cnt
        # Population variance E[x^2] - E[x]^2; rounding can push it slightly
        # below zero, hence the guard before the square root.
        var  = (sq / cnt) - mean * mean
        std  = np.sqrt(var) if var > 0 else 0.0
        rows.append({
            'workflowId': wf,
            f'{m_}_mean': mean,
            f'{m_}_max':  mx,
            f'{m_}_min':  mn,
            f'{m_}_std':  std
        })
    if rows:
        df_cpu_mem = pd.DataFrame(rows)
        # Each metric produced its own sparse row; first() keeps the first
        # non-null value per column, collapsing them into one row per workflow.
        df_cpu_mem = df_cpu_mem.groupby('workflowId').first().reset_index()
    else:
        df_cpu_mem = pd.DataFrame({
            'workflowId': list(makespan_final.keys()),
            'cpu-usage_mean': 0, 'cpu-usage_max': 0, 'cpu-usage_min': 0, 'cpu-usage_std': 0,
            'memory-usage_mean': 0, 'memory-usage_max': 0, 'memory-usage_min': 0, 'memory-usage_std': 0
        })

    # Explicit columns keep the merge key present even when there are no rows.
    df_makespan = pd.DataFrame(
        [{'workflowId': wf, 'makespan': ms} for wf, ms in makespan_final.items()],
        columns=['workflowId', 'makespan'],
    )
    if df_makespan.empty:
        # No complete workflow in this run: nothing to merge features onto.
        return df_makespan

    df_run = (
        df_makespan
        .merge(task_counts_ct, on='workflowId', how='left')
        .merge(df_concurrency, on='workflowId', how='left')
        .merge(cpu_agg, on='workflowId', how='left')
        .merge(df_cpu_mem, on='workflowId', how='left')
        .fillna(0)
    )
    return df_run


# Every log file needs its matching metrics file; zip() would otherwise
# silently drop the unpaired tail of the longer list.
if len(log_paths) != len(metric_paths):
    raise ValueError(
        f"log_paths ({len(log_paths)}) and metric_paths ({len(metric_paths)}) "
        "must have the same length"
    )

n_runs = len(log_paths)
all_runs = []

for log_path, metrics_path in tqdm(zip(log_paths, metric_paths), total=n_runs, desc="Przetwarzam pliki"):
    df_run = process_single_run(log_path, metrics_path)
    all_runs.append(df_run)

# Runs can contain different task names, so their feature tables have different
# columns. concat() aligns on the union of columns and inserts NaN where a run
# lacks a task; a task that never ran has a count of 0, so fill accordingly.
df_all = pd.concat(all_runs, ignore_index=True).fillna(0)

# Target: makespan in seconds. Features: every remaining column except the id.
y = df_all['makespan']
X = df_all.drop(columns=['workflowId', 'makespan'])

num_cols = X.select_dtypes(include=[np.number]).columns

# Regularisation grid for RidgeCV; identical for every fold, so built once.
alphas = np.logspace(-3, 3, 50)

y_true = []
y_pred = []
loo = LeaveOneOut()

# Leave-one-out evaluation on the still unscaled features. The scaler is
# fitted on the training rows of each fold only, so the held-out row never
# influences the preprocessing (no leakage into the reported metrics).
for train_idx, test_idx in loo.split(X):
    X_train, X_test = X.iloc[train_idx].copy(), X.iloc[test_idx].copy()
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

    fold_scaler = StandardScaler()
    X_train[num_cols] = fold_scaler.fit_transform(X_train[num_cols])
    X_test[num_cols] = fold_scaler.transform(X_test[num_cols])

    # Inner 5-fold CV selects alpha on the training rows of this fold.
    model_cv = RidgeCV(alphas=alphas, cv=5, scoring='neg_mean_squared_error')
    model_cv.fit(X_train, y_train)

    pred = model_cv.predict(X_test)
    y_true.append(y_test.values[0])
    y_pred.append(pred[0])

mae  = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))
r2   = r2_score(y_true, y_pred)
print("--- Ocena LOO-CV (RidgeCV) ---")
print(f"LOO  MAE: {mae:.2f}")
print(f"LOO RMSE: {rmse:.2f}")
print(f"LOO   R²: {r2:.2f}")

# Final scaler: fitted on all rows and applied to X in place, so the code that
# follows trains the final models on scaled features and persists this scaler.
scaler = StandardScaler()
X[num_cols] = scaler.fit_transform(X[num_cols])


# Ridge model: refit on the complete feature matrix, alpha chosen by 5-fold CV.
alphas_final = np.logspace(-3, 3, 50)
ridge_final = RidgeCV(
    alphas=alphas_final,
    cv=5,
    scoring='neg_mean_squared_error',
).fit(X, y)

# LightGBM model: small trees and L2 regularisation, trained on the same data.
params = dict(
    objective='regression',
    metric=['mae', 'rmse'],
    num_leaves=4,
    min_data_in_leaf=3,
    lambda_l2=1.0,
    verbosity=-1,
)
train_data = lgb.Dataset(X, label=y)
lgbm_final = lgb.train(params, train_data)

# Show the ten most important features of the boosted model.
lgb.plot_importance(lgbm_final, max_num_features=10)
plt.tight_layout()
plt.show()


# Persist the scaler and both models so predictions can reload them from disk.
joblib.dump(scaler, 'scaler.pkl')
joblib.dump(ridge_final, 'ridge_model.pkl')
lgbm_final.save_model('lgbm_model.txt')


def predict_makespan_from_paths(new_log_path: str, new_metrics_path: str, 
                                model_type: str = 'lgbm') -> float:
    """
    Predict the makespan of a single run from its log and metrics files.

    The features are built with ``process_single_run`` exactly as for training,
    aligned to the training feature columns (columns missing in the new run are
    filled with 0, columns unknown to the model are dropped), scaled with the
    scaler stored in ``scaler.pkl`` and passed to the requested model.

    Note: the column layout is taken from the module-level ``X`` and
    ``num_cols`` created by the training code, so that code must have been run
    in the same session.

    Args:
        new_log_path (str): path to the JSON log file of a single run.
        new_metrics_path (str): path to the JSON metrics file of that run.
        model_type (str): 'ridge' or 'lgbm' (default 'lgbm').

    Returns:
        float: predicted makespan (in seconds).

    Raises:
        ValueError: if ``model_type`` is not supported, or if the input files
            do not describe exactly one workflow.
    """
    # Reject an unsupported model before any file is parsed or loaded.
    if model_type not in ('ridge', 'lgbm'):
        raise ValueError("model_type musi być 'ridge' lub 'lgbm'")

    df_run = process_single_run(new_log_path, new_metrics_path)
    if df_run.shape[0] != 1:
        raise ValueError("Funkcja process_single_run zwróciła != 1 wiersza. Sprawdź pliki wejściowe.")

    # Align to the training layout in one step: same columns, same order,
    # 0 for features (e.g. task names) that this run does not contain.
    original_cols = list(X.columns)
    X_new = (
        df_run
        .drop(columns=['workflowId', 'makespan'], errors='ignore')
        .reindex(columns=original_cols, fill_value=0)
    )

    scaler_loaded = joblib.load('scaler.pkl')
    X_new[num_cols] = scaler_loaded.transform(X_new[num_cols])

    if model_type == 'ridge':
        model_loaded = joblib.load('ridge_model.pkl')
    else:
        model_loaded = lgb.Booster(model_file='lgbm_model.txt')

    return float(model_loaded.predict(X_new)[0])


# Test model on real data

In [None]:
# Evaluation inputs, paired positionally like the training lists.
# NOTE: this rebinds `log_paths`, replacing the training list defined earlier.
log_paths = [
    'log_path_1'
]

# NOTE(review): the placeholder below is named like a log path, but it is used
# as the metrics file of the run above - replace it with a metrics file path.
metrics_paths = [
    'log_path_2'
]


def get_actual_makespan_from_log(log_path):
    """Return the workflow duration recorded in a log file, in seconds.

    The log is a JSON list of records. The first record whose ``body`` contains
    the text ``'Workflow finished'`` is used, and its
    ``log.attributes.timeInSeconds`` value is converted to ``float``.

    Args:
        log_path: path to the JSON log file.

    Returns:
        The duration as ``float``, or ``None`` when the log has no such record
        or the record has no usable (missing, null, non-numeric or NaN)
        duration.
    """
    import json
    import math

    with open(log_path, encoding='utf-8') as f:
        logs = json.load(f)

    # Scan the records directly: no table is needed to find a single entry, and
    # an empty log or records without a 'body' field simply yield no match.
    for record in logs:
        if not isinstance(record, dict):
            continue
        body = record.get('body')
        if not isinstance(body, str) or 'Workflow finished' not in body:
            continue
        try:
            duration_seconds = float(record['log.attributes.timeInSeconds'])
        except (KeyError, TypeError, ValueError):
            return None
        # Callers treat None as "not available"; NaN would slip through as a number.
        return None if math.isnan(duration_seconds) else duration_seconds

    return None


# Compare the predicted and the recorded makespan for every evaluation run.
results = []

for log_path, metrics_path in zip(log_paths, metrics_paths):
    # File name only; used in success and error rows alike so that every row
    # of the result table has the same columns and can be traced to its input.
    log_name = log_path.split('/')[-1]
    try:
        pred = predict_makespan_from_paths(log_path, metrics_path, model_type='lgbm')
        actual = get_actual_makespan_from_log(log_path)
        results.append({
            'Log file': log_name,
            'Predicted makespan (s)': round(pred, 2),
            'Actual makespan (s)': round(actual, 2) if actual is not None else 'N/A',
            'Error (s)': round(pred - actual, 2) if actual is not None else 'N/A'
        })
    except Exception as e:
        # Best-effort report: one failing run must not abort the comparison;
        # the error text is shown in the table instead.
        results.append({
            'Log file': log_name,
            'Predicted makespan (s)': 'ERROR',
            'Actual makespan (s)': 'ERROR',
            'Error (s)': str(e)
        })

results_df = pd.DataFrame(results)
display(results_df)