In [None]:
import pandas as pd
import os
import numpy as np
import glob
from huggingface_hub import snapshot_download
from sklearn.model_selection import train_test_split
import optuna
import lightgbm as lgb
import xgboost as xgb
from sklearn.model_selection import GroupKFold, cross_val_score
from sklearn.metrics import root_mean_squared_error, mean_absolute_percentage_error, mean_absolute_error, r2_score
import seaborn as sns
import matplotlib.pyplot as plt

#### Loading the data

In [None]:
# 1. Download the dataset repository from the Hugging Face Hub
local_dir = snapshot_download(
    repo_id="ejhusom/llm-inference-energy-consumption",
    repo_type="dataset",
)

# 2. Point to the folder with the CSVs
data_dir = os.path.join(local_dir, "data")

# 3. Read each CSV and label it with the device named in its file name
dfs = []
# sorted() makes the row order of the merged frame independent of the
# filesystem listing order, so the seeded splits further down are reproducible.
for path in sorted(glob.glob(os.path.join(data_dir, "*.csv"))):
    fname = os.path.basename(path)
    print("Reading:", fname)

    # Infer the device label from the file name.
    # This must be ONE if/elif chain: with two separate `if` statements the
    # trailing `else` overwrote "laptop1" with "unknown".
    fname_lower = fname.lower()
    if "laptop1" in fname_lower:
        device_type = "laptop1"
    elif "laptop2" in fname_lower:
        device_type = "laptop2"
    elif "workstation" in fname_lower:
        device_type = "workstation"
    elif "server" in fname_lower:
        device_type = "server"
    else:
        device_type = "unknown"

    df = pd.read_csv(path)
    df["device_type"] = device_type   # add as a new column
    dfs.append(df)

# 4. Merge all CSVs into one frame with a fresh 0..n-1 index
full_df = pd.concat(dfs, ignore_index=True)

# 5. Save the merged version (exist_ok avoids the exists()/mkdir() race)
os.makedirs("data", exist_ok=True)
full_df.to_csv(os.path.join("data", "llm_inference_energy.csv"), index=False)

print("✅ Combined shape:", full_df.shape)
print("✅ Unique device types:", full_df["device_type"].unique())

In [None]:
# Reload the merged CSV from disk. The path is built component-wise, the same
# way it is built where the file is written (a single-argument os.path.join is a no-op).
data = pd.read_csv(os.path.join("data", "llm_inference_energy.csv"))

data.info()

Add hardware specification columns for each device type

In [None]:
# Hardware description of each measurement device, keyed by the `device_type` label.
device_specs = {
    "server": {
        "cpu_vendor": "AMD",
        "cpu_family": "EPYC 7643",
        "cpu_cores": 48,
        "cpu_base_clock_ghz": np.nan,  # base clock not recorded for this CPU; left missing
        "ram_gb": 528,
        "has_gpu": 1,
        "gpu_model": "RTX A5000",
        "gpu_vram_gb": 24,
        "gpu_class": "pro",
        "device_family": "server",
        "is_mobile": 0,
    },
    "workstation": {
        "cpu_vendor": "Intel",
        "cpu_family": "Xeon W-2223",
        "cpu_cores": 8,
        "cpu_base_clock_ghz": 3.6,
        "ram_gb": 128,
        "has_gpu": 1,
        "gpu_model": "RTX A2000",
        "gpu_vram_gb": 12,
        "gpu_class": "pro",
        "device_family": "workstation",
        "is_mobile": 0,
    },
    "laptop1": {
        "cpu_vendor": "Intel",
        "cpu_family": "Core i5 11th Gen",
        "cpu_cores": 12,
        "cpu_base_clock_ghz": 2.4,
        "ram_gb": 16,
        "has_gpu": 0,
        "gpu_model": "None",
        "gpu_vram_gb": 0,
        "gpu_class": "none",
        "device_family": "laptop",
        "is_mobile": 1,
    },
    "laptop2": {
        "cpu_vendor": "Intel",
        "cpu_family": "Core i7 10th Gen",
        "cpu_cores": 12,
        "cpu_base_clock_ghz": 2.7,
        "ram_gb": 32,
        "has_gpu": 1,
        "gpu_model": "Quadro RTX 4000",
        "gpu_vram_gb": 8,
        "gpu_class": "pro",
        "device_family": "laptop",
        "is_mobile": 1,
    },
}

# Lookup table: one row per device type, one column per spec field.
device_specs_df = pd.DataFrame.from_dict(device_specs, orient="index")

# Left-join the specs onto every measurement row via its device label.
# Unlike map() + json_normalize(), a device type missing from `device_specs`
# (e.g. "unknown") just gets NaN specs instead of raising. Dropping any spec
# columns left over from a previous run keeps this cell re-runnable.
data = (
    data
    .drop(columns=device_specs_df.columns, errors="ignore")
    .join(device_specs_df, on="device_type")
)

data.head()

Normalize model names to explicit `name:size` tags

In [None]:
# Bare model names and the explicit `name:size` tag each one is replaced with.
rename_map = {
    "codellama": "codellama:7b",
    "llama3": "llama3:8b",
}

# Only values equal to a key of rename_map are replaced; all other names are kept as they are.
data['model_name'] = data['model_name'].replace(rename_map)

Get the number of parameters of each model

In [None]:
# Parameter count assigned to each model tag.
param_map = {
    "gemma:2b":       2e9,
    "gemma:7b":       7e9,
    "codellama:7b":   7e9,
    "llama3:8b":      8e9,
    "llama3:70b":     70e9,
    "codellama:70b":  70e9,
}

# NOTE(review): a model name that is not a key of param_map ends up as NaN here — confirm that is acceptable.
data["model_params"] = data["model_name"].map(param_map).astype(float)  # number of parameters
data["model_params_billion"] = data["model_params"] / 1e9  # same value expressed in billions

Level of prompt complexity

In [None]:
# Derive numeric grade bounds from the `text_standard` label.
# assumes labels look like "8th and 9th grade" — TODO confirm against the data
_ordinal_suffix = "th|st|nd|rd"
_text_standard = data["text_standard"]

# Lower bound: first space-separated token. Upper bound: first token after " and ".
_min_token = _text_standard.str.split(" ").str[0]
_max_token = _text_standard.str.split(" and ").str[1].str.split(" ").str[0]

data["readability_min_grade"] = _min_token.str.replace(_ordinal_suffix, "", regex=True).astype(float)
data["readability_max_grade"] = _max_token.str.replace(_ordinal_suffix, "", regex=True).astype(float)

# Width of the grade range
data["readability_diff"] = data["readability_max_grade"] - data["readability_min_grade"]

In [None]:
def prepare_data(dataframe: pd.DataFrame):
    """Return a modelling-ready copy of ``dataframe``.

    Drops the bookkeeping / raw-text / alternative-target columns listed below
    (those that are present) and removes all rows of the 70b models, then
    renumbers the index from 0.

    The input frame is NOT modified: the previous in-place drop silently
    removed columns from the caller's DataFrame as a side effect.

    Args:
        dataframe: frame that contains at least a ``model_name`` column.

    Returns:
        A new DataFrame with the listed columns and the 70b rows removed.
    """
    basic_columns = ["Unnamed: 0.2", "Unnamed: 0.1", "Unnamed: 0", "index", "created_at", "start_time", "end_time", "energy_consumption_llm_total", "type", "energy_consumption_monitoring", "response", "prompt", "text_standard", "energy_consumption_llm_gpu", "energy_consumption_llm_cpu"]
    keep_rows = ~dataframe["model_name"].isin(["llama3:70b", "codellama:70b"])
    return (
        dataframe
        .drop(columns=basic_columns, errors="ignore")  # ignore listed columns that are absent
        .loc[keep_rows]
        .reset_index(drop=True)
    )

#### Column groups

In [None]:
# Columns treated as prompt-side features (raw prompt text plus text statistics).
prompt_columns = [
    'prompt', 'word_count', 'sentence_count', 'avg_word_length',
    'word_diversity', 'unique_word_count', 'avg_sentence_length',
    'punctuation_count', 'stop_word_count', 'long_word_count',
    'named_entity_count', 'noun_count', 'verb_count', 'adj_count',
    'adverb_count', 'pronoun_count', 'prop_adverbs', 'prop_pronouns',
    'sentiment_polarity', 'sentiment_subjectivity', 'flesch_reading_ease',
    'flesch_kincaid_grade', 'gunning_fog', 'smog_index',
    'automated_readability_index', 'coleman_liau_index',
    'linsear_write_formula', 'dale_chall_readability_score',
    'text_standard', 'spache_readability', 'mcalpine_eflaw', 'reading_time',
    'fernandez_huerta', 'szigriszt_pazos', 'gutierrez_polini', 'crawford',
    'osman', 'gulpease_index', 'wiener_sachtextformel', 'syllable_count',
    'lexicon_count', 'char_count', 'letter_count', 'polysyllabcount',
    'monosyllabcount', 'question_marks', 'exclamation_marks',
    'sentence_embedding_variance', 'personal_pronouns', 'named_entities',
    'adjectives', 'adverbs', 'length_x_complexity',
    'questions_about_entities', 'desc_complexity_ratio',
    'word_count_squared', 'avg_sentence_length_cubed', 'lexical_diversity'
]

# Device label, device spec fields and model identity / size columns.
hardware_model_columns = [
    'device_type',
    'cpu_vendor', 'cpu_family', 'cpu_cores', 'cpu_base_clock_ghz',
    'ram_gb',
    'has_gpu', 'gpu_model', 'gpu_vram_gb', 'gpu_class',
    'device_family', 'is_mobile',
    'model_name', 'model_params', 'model_params_billion',
]

# Everything currently in `data` that belongs to neither group above,
# in the order in which the columns appear in `data`.
_grouped_columns = set(prompt_columns) | set(hardware_model_columns)
response_columns = [col for col in data.columns if col not in _grouped_columns]

#### Train/valid/test

In [None]:
data = prepare_data(data)

# Helper label combining model and device, so that both splits keep the same
# mix of (model, device) combinations.
data["strata"] = data["model_name"].astype(str) + "__" + data["device_type"].astype(str)

# Seeded 70/30 stratified split; the helper column is removed from both parts right away.
train, test = (
    part.drop(columns=["strata"])
    for part in train_test_split(
        data,
        test_size=0.3,
        random_state=42,
        stratify=data["strata"],
    )
)

#### Get metrics

In [None]:
from sklearn.metrics import make_scorer


def get_metrics(y_true, y_pred) -> dict:
    """Compute the regression metrics reported in this notebook.

    Args:
        y_true: observed target values.
        y_pred: predicted target values.

    Returns:
        Dict with keys ``rmse``, ``mape`` (scaled by 100), ``mae`` and ``r2``,
        in that order.
    """
    return {
        "rmse": root_mean_squared_error(y_true, y_pred),
        "mape": mean_absolute_percentage_error(y_true, y_pred) * 100,
        "mae": mean_absolute_error(y_true, y_pred),
        "r2": r2_score(y_true, y_pred),
    }

def smape(y_true, y_pred, epsilon = 1e-8):
    """Symmetric mean absolute percentage error, in percent.

    Computed as ``100 * mean(|pred - true| / (|true| + |pred| + epsilon))``.
    The denominator is the plain sum of magnitudes (not halved), so for
    non-degenerate inputs the result stays within 0..100.

    Args:
        y_true: observed values (any array-like).
        y_pred: predicted values (array-like matching ``y_true``).
        epsilon: small constant added to the denominator to avoid dividing
            by zero when both values are 0.

    Returns:
        The mean ratio multiplied by 100.
    """
    actual = np.asarray(y_true)
    forecast = np.asarray(y_pred)

    abs_error = np.abs(forecast - actual)
    scale = np.abs(actual) + np.abs(forecast) + epsilon

    return 100 * np.mean(abs_error / scale)

# scikit-learn scorer built from smape(); greater_is_better=False marks it as a loss (lower is better).
smape_scorer = make_scorer(smape, greater_is_better=False)

#### <center>Basic dataset</center>

In [None]:
# Names of the object-dtype columns of the training frame; they are cast to categoricals later.
categories = [name for name, dtype in train.dtypes.items() if dtype == "object"]

# Features = every column except the target; target = energy_consumption_llm.
whole_X_train_basic = train.drop(columns=["energy_consumption_llm"])
whole_y_train_basic = train["energy_consumption_llm"]

In [None]:
# Summary statistics of the training target.
whole_y_train_basic.describe()

In [None]:
# Cast the object columns to pandas categoricals.
whole_X_train_basic = whole_X_train_basic.astype({col: "category" for col in categories})
# whole_y_train_basic = np.log1p(whole_y_train_basic)

# Seeded 70/30 split *inside* the training data: X_test_basic / y_test_basic is the
# hold-out used for model checks in this notebook, separate from the outer `test` frame.
X_train_basic, X_test_basic, y_train_basic, y_test_basic = train_test_split(
    whole_X_train_basic,
    whole_y_train_basic,
    test_size=0.3,
    random_state=42,
)

In [None]:
# Summary statistics of the training target, shown again after the cast/split cell.
# NOTE(review): the log1p transform there is commented out, so the target is unchanged at this point.
whole_y_train_basic.describe()

#### 1. LightGBM

In [None]:
def define_lgb(trial: optuna.Trial) -> lgb.LGBMRegressor:
    """Build an unfitted LightGBM regressor from an Optuna trial.

    Each tuned hyper-parameter is requested from ``trial`` under the same name
    it is passed to LightGBM with. The function is also called with
    ``study.best_trial`` to rebuild the best configuration found by a study.

    Args:
        trial: Optuna trial that supplies the hyper-parameter values.

    Returns:
        An ``LGBMRegressor`` configured with the suggested values plus the
        fixed settings below.
    """
    params = {
        "reg_alpha": trial.suggest_float("reg_alpha", 1e-3, 1, log=True),
        "learning_rate": trial.suggest_float("learning_rate", 1e-2, 1, log=True),
        "n_estimators": trial.suggest_int("n_estimators", 10, 150),
        "num_leaves": trial.suggest_int("num_leaves", 8, 64),
        "max_depth": trial.suggest_int("max_depth", 8, 12),
        "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 3, 50),
        "subsample": trial.suggest_float("subsample", 0.7, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.7, 1.0),
        # LightGBM only performs row subsampling when the bagging frequency is > 0.
        # With the default of 0 the tuned `subsample` value above had no effect.
        "subsample_freq": 1,
        "random_state": 42,
        "objective": "regression",
        "boosting": "gbdt",
        "n_jobs": -1,
        "verbose": -1,
    }
    return lgb.LGBMRegressor(**params)


def optimize_lightgbm_basic(trial: optuna.Trial):
    """Optuna objective: grouped cross-validated RMSE of a LightGBM candidate.

    The rows of the notebook-level ``X_train_basic`` / ``y_train_basic`` are
    grouped by ``model_name`` and one fold is created per distinct model, so
    every fold is scored on a model that was absent from its training part.

    Args:
        trial: Optuna trial that supplies the hyper-parameter values.

    Returns:
        Mean RMSE over the folds (lower is better).
    """
    lightgbm = define_lgb(trial)
    groups = X_train_basic["model_name"]
    # With n_splits equal to the number of groups every fold holds exactly one
    # group, so shuffling cannot change the folds. Leaving out `shuffle` /
    # `random_state` (only accepted by scikit-learn >= 1.6) keeps this working
    # on older releases as well.
    kf = GroupKFold(n_splits=groups.nunique())
    scores = cross_val_score(
        lightgbm,
        X_train_basic,
        y_train_basic,
        cv=kf,
        groups=groups,
        scoring='neg_root_mean_squared_error'
    )
    # The scorer returns negated RMSE; flip the sign for a "minimize" study.
    return -scores.mean()


# Study that minimizes the value returned by optimize_lightgbm_basic;
# the TPE sampler is seeded with a fixed value.
study_lgb_basic = optuna.create_study(
    study_name="Optimize LGBM",
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=42),
)

# Run 10 trials of the LightGBM objective.
study_lgb_basic.optimize(optimize_lightgbm_basic, n_trials=10)

#### <center>Optuna visualization</center>

In [None]:
# Objective value per trial for the LightGBM study.
optuna.visualization.plot_optimization_history(study_lgb_basic)

In [None]:
# Objective value against each individual hyper-parameter for the LightGBM study.
optuna.visualization.plot_slice(study_lgb_basic)

In [None]:
# Hyper-parameter importances estimated by Optuna for the LightGBM study.
optuna.visualization.plot_param_importances(study_lgb_basic)

Save a model to a file and load it back

In [None]:
import joblib


def save_model(model: lgb.LGBMRegressor, filename: str):
    """Serialize ``model`` to ``filename`` with joblib.

    The ``models`` directory is always created (as before), and so is the
    parent directory of ``filename`` when it has one, so the dump cannot fail
    because its target directory is missing.

    Args:
        model: fitted estimator to persist.
        filename: destination path of the dump, e.g. ``"models/lgb.pkl"``.
    """
    # exist_ok=True avoids the race between an exists() check and mkdir().
    os.makedirs("models", exist_ok=True)
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    joblib.dump(model, filename)

def get_model(filename: str) -> lgb.LGBMRegressor:
    """Load a model previously written with joblib.

    Args:
        filename: path of the dump to load.

    Returns:
        The deserialized model object.

    Raises:
        FileNotFoundError: if ``filename`` does not exist.
    """
    if os.path.exists(filename):
        # NOTE(review): joblib.load unpickles the file, which can run arbitrary
        # code — only load dumps produced by this project.
        return joblib.load(filename)

    raise FileNotFoundError(f"Brak zapisanego modelu o podanej nazwie {filename}")

def predict(model_name: str, x_test: np.typing.NDArray[np.float64]) -> np.typing.NDArray[np.float64]:
    """Load a saved model from the ``models`` directory and predict with it.

    Args:
        model_name: file name of the dump inside ``models`` (e.g. ``"lgb.pkl"``).
        x_test: feature matrix passed to the model's ``predict``.

    Returns:
        The predictions returned by the loaded model.

    Raises:
        FileNotFoundError: if ``models/<model_name>`` does not exist.
    """
    model_path = os.path.join("models", model_name)
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Brak zapisanego modelu o podanej nazwie {model_name}")

    # Load from the same path that was just checked. Previously the bare
    # `model_name` was passed on, so the file was looked up in the working
    # directory instead of in `models/`.
    model = get_model(model_path)
    return model.predict(x_test)

#### Best LightGBM

In [None]:
# Refit the best configuration found by the study on the inner training split.
lightGBM_basic = define_lgb(study_lgb_basic.best_trial).fit(X_train_basic, y_train_basic)
# Store the dump inside models/ (not the working directory), which is where
# predict() looks for saved models and where the XGBoost dump is written.
save_model(lightGBM_basic, os.path.join("models", "lgb.pkl"))
# Metrics on the inner hold-out split.
get_metrics(y_test_basic, lightGBM_basic.predict(X_test_basic))

In [None]:
# Hyper-parameter values of the best LightGBM trial.
study_lgb_basic.best_params

In [None]:
# Ten features with the highest importance value reported by the fitted LightGBM model.
_lgb_importance_all = pd.DataFrame({
    'feature': lightGBM_basic.feature_name_,
    'importance': lightGBM_basic.feature_importances_,
})
lightGBM_basic_importance = (
    _lgb_importance_all
    .sort_values(by='importance', ascending=False)
    .head(10)
)

plt.figure(figsize=(12, 8))
sns.set_style('whitegrid')

# Horizontal bars, one per feature; hue only drives the colours, hence no legend.
ax = sns.barplot(
    x='importance',
    y='feature',
    hue='feature',
    data=lightGBM_basic_importance,
    palette='viridis',
    legend=False,
)

# Write each bar's value at its end.
for bar_group in ax.containers:
    ax.bar_label(bar_group, fmt='%g', label_type='edge', fontsize=10, padding=3)

plt.title('10 the most important columns for LightGBM', fontsize=16)
plt.xlabel('Importance value', fontsize=12)
plt.ylabel('Column', fontsize=12)

plt.xticks(fontsize=10)
plt.yticks(fontsize=11)
plt.tight_layout()
plt.show()

#### 2. XGBoost

In [None]:
def define_xgb(trial: optuna.Trial) -> xgb.XGBRegressor:
    """Build an unfitted XGBoost regressor from an Optuna trial.

    Args:
        trial: Optuna trial that supplies the tuned hyper-parameter values.

    Returns:
        An ``XGBRegressor`` combining the fixed settings with the suggested values.
    """
    # Settings that are the same for every trial.
    fixed = {
        'tree_method': 'hist',
        'enable_categorical': True,
        'n_jobs': -1,
        'objective': 'reg:absoluteerror',
        'random_state': 42,
    }
    # Search space; the suggest_* calls are kept in their original order.
    tuned = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 150),
        'learning_rate': trial.suggest_float('learning_rate', 1e-2, 1, log=True),
        'max_depth': trial.suggest_int('max_depth', 5, 10),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'max_leaves': trial.suggest_int('max_leaves', 8, 48),
        'gamma': trial.suggest_float('gamma', 0.5, 1),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 1e-1, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-2, 1, log=True),
        'max_bin': trial.suggest_int('max_bin', 32, 256),
    }
    return xgb.XGBRegressor(**fixed, **tuned)


def optimize_xgb(trial: optuna.Trial):
    """Optuna objective: grouped cross-validated RMSE of an XGBoost candidate.

    The candidate is evaluated on the notebook-level ``X_train_basic`` /
    ``y_train_basic`` without the ``clock_duration`` column, with one fold per
    distinct ``device_type``, so every fold is scored on a device that was
    absent from its training part.

    NOTE(review): ``clock_duration`` is presumably excluded because it is a
    near-direct proxy of the target — confirm. A final model should be fitted
    on the same feature set that is used here.

    Args:
        trial: Optuna trial that supplies the hyper-parameter values.

    Returns:
        Mean RMSE over the folds (lower is better).
    """
    xgboost = define_xgb(trial)
    # Drop the column on a local copy. The previous in-place drop modified the
    # shared X_train_basic, so every trial after the first raised KeyError
    # because the column was already gone. errors="ignore" keeps this safe if
    # the column has been removed earlier in the session.
    features = X_train_basic.drop(columns=["clock_duration"], errors="ignore")
    groups = features["device_type"]
    # One fold per group: shuffling cannot change the folds, and leaving out
    # `shuffle` / `random_state` (scikit-learn >= 1.6 only) keeps older
    # releases working.
    kf = GroupKFold(n_splits=groups.nunique())
    scores = cross_val_score(
        xgboost,
        features,
        y_train_basic,
        cv=kf,
        groups=groups,
        scoring="neg_root_mean_squared_error"
    )
    # The scorer returns negated RMSE; flip the sign for a "minimize" study.
    return -scores.mean()


# Study that minimizes the value returned by optimize_xgb;
# the TPE sampler is seeded with a fixed value.
study_xgb = optuna.create_study(
    study_name="Optimize XGBoost",
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=42),
)

# Run 10 trials of the XGBoost objective.
study_xgb.optimize(optimize_xgb, n_trials=10)

#### <center>Optuna visualization</center>

In [None]:
# Objective value per trial for the XGBoost study.
optuna.visualization.plot_optimization_history(study_xgb)

In [None]:
# Objective value against each individual hyper-parameter for the XGBoost study.
optuna.visualization.plot_slice(study_xgb)

In [None]:
# Hyper-parameter importances estimated by Optuna for the XGBoost study.
optuna.visualization.plot_param_importances(study_xgb)

#### Best XGBoost

In [None]:
# Feature frames without `clock_duration`, built explicitly instead of relying
# on X_train_basic having been modified as a side effect of the tuning run.
# errors="ignore" makes this work whether or not the column is still present.
X_train_xgb = X_train_basic.drop(columns=["clock_duration"], errors="ignore")
X_test_xgb = X_test_basic.drop(columns=["clock_duration"], errors="ignore")

# Refit the best configuration found by the study on the inner training split.
xgBoost_basic = define_xgb(study_xgb.best_trial).fit(X_train_xgb, y_train_basic)
save_model(xgBoost_basic, "models/xgBoost.pkl")
# Report metrics on the inner hold-out split (as done for LightGBM), not on
# the rows the model was fitted on.
get_metrics(y_test_basic, xgBoost_basic.predict(X_test_xgb))

In [None]:
# Hyper-parameter values of the best XGBoost trial.
study_xgb.best_params

In [None]:
# Per-feature scores from the fitted booster, requested with importance_type='weight'.
importance_dict = xgBoost_basic.get_booster().get_score(importance_type='weight')

# Ten features with the highest score.
_xgb_importance_all = pd.DataFrame({
    'feature': list(importance_dict.keys()),
    'importance': list(importance_dict.values()),
})
xgBoost_importance = (
    _xgb_importance_all
    .sort_values('importance', ascending=False)
    .head(10)
)

plt.figure(figsize=(12, 8))
sns.set_style('whitegrid')

# Horizontal bars, one per feature; hue only drives the colours, hence no legend.
ax = sns.barplot(
    x='importance',
    y='feature',
    hue='feature',
    data=xgBoost_importance,
    palette='viridis',
    legend=False,
)

# Write each bar's value at its end.
for bar_group in ax.containers:
    ax.bar_label(bar_group, fmt='%g', label_type='edge', fontsize=10, padding=3)

plt.title('10 the most important columns for XGBoost', fontsize=16)
plt.xlabel('Importance value', fontsize=12)
plt.ylabel('Column', fontsize=12)

plt.xticks(fontsize=10)
plt.yticks(fontsize=11)
plt.tight_layout()
plt.show()