In [11]:
# ==== Setup & Settings ====
import os, re, warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
matplotlib.rcParams["figure.dpi"] = 120

from flaml import AutoML
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import shap  # SHAP grafikleri için

# Input workbook name (leave as-is when it sits in the notebook's working folder)
DATA_FILE = "MOHTAT_dataset_4_PeerJ.xlsx"   # use a full path if needed
# Every artifact written by this notebook (plots, CSVs, models) goes under OUT_DIR.
OUT_DIR   = "./outputs_flaml_shap"
os.makedirs(OUT_DIR, exist_ok=True)

# FLAML settings
TIME_BUDGET_SEC = 300   # passed as time_budget to AutoML.fit per fold; 300-900 is the suggested range
SEED = 42               # passed as seed to AutoML.fit and as random_state when sampling SHAP backgrounds
N_JOBS = -1             # passed as n_jobs to AutoML.fit


In [12]:
# ==== Data Loading & Cleaning (single sheet, Battery column) ====

def to_num(x):
    """Convert ``x`` to float, accepting comma decimals (e.g. '0,987').

    Missing values and anything ``float()`` cannot parse come back as NaN.
    """
    if pd.isna(x):
        return np.nan
    # Only strings need the decimal-comma swap; other types go to float() untouched.
    candidate = x.replace(",", ".") if isinstance(x, str) else x
    try:
        return float(candidate)
    except Exception:
        return np.nan

def _norm(s: str) -> str:
    return re.sub(r"[^a-z0-9]", "", str(s).strip().lower())

def pick_col(cols, candidates):
    """Resolve one entry of ``cols`` by loosely matching it against ``candidates``.

    Both sides are compared after ``_norm`` normalisation. The first column whose
    normalised name equals a candidate wins; failing that, the first column whose
    normalised name contains a non-empty candidate as a substring. Returns ``None``
    when nothing matches.
    """
    normalized = [_norm(name) for name in cols]
    wanted = {_norm(alias) for alias in candidates}

    # Pass 1: exact match on the normalised name.
    for pos, key in enumerate(normalized):
        if key in wanted:
            return cols[pos]

    # Pass 2: substring match; empty aliases are skipped so they cannot match everything.
    for pos, key in enumerate(normalized):
        if any(alias and alias in key for alias in wanted):
            return cols[pos]

    return None

# Read the workbook (first sheet only)
raw = pd.read_excel(DATA_FILE, sheet_name=0)

# Locate the required / optional columns via flexible name matching
col_batt = pick_col(raw.columns, {"battery","cell","cellid","batteryid"})
col_cycle = pick_col(raw.columns, {"cycle","cycles","efc","equivalentfullcycles"})
col_soh = pick_col(raw.columns, {"soh","stateofhealth"})
col_peak = pick_col(raw.columns, {"normalized_peak","normalizedpmax","normalizedicpeak","peak","pmax","icpeak"})
col_temp = pick_col(raw.columns, {"temperature","meant","mean_temperature","temp","avgtemp"})

# Temperature is optional and therefore not part of this check.
missing = [name for name, col in {
    "Battery": col_batt, "Cycle": col_cycle, "SoH": col_soh, "Normalized_Peak": col_peak
}.items() if col is None]
if missing:
    raise ValueError(f"Zorunlu kolon(lar) eksik: {missing}\nMevcut kolonlar: {list(raw.columns)}")

# Standardised table: fixed column names regardless of how the workbook labels them
data = pd.DataFrame({
    "Battery": raw[col_batt].astype(str),
    "Cycle": raw[col_cycle].map(to_num),
    "SoH": raw[col_soh].map(to_num),
    "Normalized_Peak": raw[col_peak].map(to_num),
})
if col_temp is not None:
    data["Temperature"] = raw[col_temp].map(to_num)

# Clean: drop rows missing any required numeric field (Temperature is not checked here)
data = data.dropna(subset=["Cycle","SoH","Normalized_Peak"]).copy()

# Feature list (Temperature is included only if the column exists and has at least one value)
feature_cols = ["Cycle","Normalized_Peak"]
if "Temperature" in data.columns and data["Temperature"].notna().any():
    feature_cols.append("Temperature")
# Cell list (Battery values look like Cell01/Cell02…)
def _cell_key(x):
    m = re.search(r"\d+", x)
    return int(m.group(0)) if m else 999
# Unique battery labels ordered by the number embedded in each label (see _cell_key).
cells = sorted(data["Battery"].unique(), key=_cell_key)

# Quick sanity output: which cells and features the later cells will iterate over.
print("Hücreler:", cells)
print("Özellikler:", feature_cols)
display(data.head(8))


Hücreler: ['Cell01', 'Cell03', 'Cell10', 'Cell11', 'Cell12']
Özellikler: ['Cycle', 'Normalized_Peak', 'Temperature']


Unnamed: 0,Battery,Cycle,SoH,Normalized_Peak,Temperature
0,Cell01,1.0,1.0,1.0,25.0
1,Cell01,2.0,0.996359,1.025722,25.0
2,Cell01,3.0,0.997099,0.975626,25.0
3,Cell01,4.0,0.994473,1.012685,25.0
4,Cell01,5.0,0.994391,0.975566,25.0
5,Cell01,6.0,0.992173,0.987661,25.0
6,Cell01,7.0,0.991241,0.975672,25.0
7,Cell01,8.0,0.990925,0.975637,25.0


In [16]:
# ==== LOCO + FLAML + SHAP ====

def to_numeric_df(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce every column of ``df`` to numeric, treating ',' as the decimal mark.

    Values are stringified first; anything that still fails to parse becomes NaN.
    """
    def _dots_for_commas(col: pd.Series) -> pd.Series:
        return col.str.replace(",", ".", regex=False)

    as_text = df.astype(str)
    dotted = as_text.apply(_dots_for_commas)
    return dotted.apply(pd.to_numeric, errors="coerce")

def to_numeric_series(s: pd.Series) -> pd.Series:
    """Coerce a Series to numeric, treating ',' as the decimal mark; unparseable values become NaN."""
    dotted = s.astype(str).str.replace(",", ".", regex=False)
    return pd.to_numeric(dotted, errors="coerce")

results = []

for test_cell in cells:
    # LOCO (leave-one-cell-out): the current cell is held out for testing and
    # every other cell is used for training.
    train_df = data[data["Battery"] != test_cell].copy()
    test_df  = data[data["Battery"] == test_cell].copy()

    if len(train_df) == 0 or len(test_df) == 0:
        print(f"[UYARI] {test_cell}: boş fold, atlandı.")
        continue

    # Split features and target
    X_train_raw, y_train_raw = train_df[feature_cols], train_df["SoH"]
    X_test_raw,  y_test_raw  = test_df[feature_cols],  test_df["SoH"]

    # Defensive numeric coercion (vectorised); +/-inf is folded into NaN so a
    # single notna() mask below removes both kinds of bad value.
    X_train = to_numeric_df(X_train_raw).replace([np.inf, -np.inf], np.nan)
    y_train = to_numeric_series(y_train_raw).replace([np.inf, -np.inf], np.nan)

    X_test  = to_numeric_df(X_test_raw).replace([np.inf, -np.inf], np.nan)
    y_test  = to_numeric_series(y_test_raw).replace([np.inf, -np.inf], np.nan)

    # Keep only rows where every feature AND the target are present, using one
    # shared mask so X and y stay aligned.
    train_mask = X_train.notna().all(axis=1) & y_train.notna()
    X_train, y_train = X_train.loc[train_mask], y_train.loc[train_mask]

    test_mask = X_test.notna().all(axis=1) & y_test.notna()
    X_test, y_test = X_test.loc[test_mask], y_test.loc[test_mask]

    if len(X_train) == 0 or len(X_test) == 0:
        print(f"[UYARI] {test_cell}: NaN/inf temizliği sonrası boş, atlandı.")
        continue

    # ---- FLAML ----
    automl = AutoML()
    automl.fit(
        X_train=X_train, y_train=y_train,
        task="regression",
        time_budget=TIME_BUDGET_SEC,
        metric="rmse",
        estimator_list=["lgbm", "xgboost", "rf", "extra_tree"],  # tree-based, SHAP-friendly
        n_jobs=N_JOBS, seed=SEED, verbose=0
    )

    # Metrics on the held-out cell
    y_pred = automl.predict(X_test)
    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
    mae  = mean_absolute_error(y_test, y_pred)
    r2   = r2_score(y_test, y_pred)

    results.append({
        "Test_Cell": test_cell,
        "Best_Estimator": automl.best_estimator,
        "Best_Config": automl.best_config,
        "Best_Iteration": automl.best_iteration,
        "RMSE": rmse, "MAE": mae, "R2": r2
    })

    # ---- SHAP ----
    # Use the wrapped estimator when automl.model exposes one via `.model`;
    # otherwise fall back to automl.model itself.
    base_model = getattr(automl.model, "model", automl.model)
    shap_vals = None
    try:
        # Fast path: TreeExplainer
        explainer = shap.TreeExplainer(base_model)
        shap_vals = explainer.shap_values(X_test)
        if isinstance(shap_vals, list):
            shap_vals = shap_vals[0]
        shap_vals = np.asarray(shap_vals)
    except Exception:
        # Fallback: generic Explainer with a background sample drawn from the training rows
        try:
            bg = X_train.sample(min(1000, len(X_train)), random_state=SEED)
            masker = shap.maskers.Independent(bg, max_samples=1000)
            explainer = shap.Explainer(base_model, masker)
            sv = explainer(X_test, check_additivity=False)
            shap_vals = np.asarray(sv.values)
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP hesaplanamadı: {type(e).__name__}: {e}")
            shap_vals = None

    if shap_vals is not None and shap_vals.ndim == 2 and shap_vals.shape[0] == len(X_test):
        # SHAP summary (beeswarm)
        try:
            plt.figure()
            shap.summary_plot(shap_vals, X_test, feature_names=X_test.columns, show=False)
            plt.tight_layout()
            plt.savefig(os.path.join(OUT_DIR, f"shap_summary_{test_cell}.png"),
                        dpi=200, bbox_inches="tight")
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP summary çizimi hata: {e}")
        finally:
            # Close even when plotting failed, so no half-drawn figure stays open.
            plt.close("all")

        # SHAP CSV
        try:
            pd.DataFrame(shap_vals, columns=X_test.columns).to_csv(
                os.path.join(OUT_DIR, f"shap_values_{test_cell}.csv"), index=False
            )
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP CSV yazımı hata: {e}")

        # Dependence plots for the (up to) 3 features with the largest mean |SHAP|
        try:
            mean_abs = np.abs(shap_vals).mean(axis=0)
            order = np.argsort(-mean_abs)
            top_feats = [X_test.columns[i] for i in order[:min(3, X_test.shape[1])]]
            for f in top_feats:
                try:
                    # No plt.figure() here: shap.dependence_plot opens its own figure
                    # when no `ax` is passed, so a pre-created one would only be left
                    # behind as an empty, never-closed figure.
                    shap.dependence_plot(
                        ind=f, shap_values=shap_vals, features=X_test,
                        feature_names=X_test.columns, interaction_index=None, show=False
                    )
                    plt.tight_layout()
                    plt.savefig(os.path.join(OUT_DIR, f"shap_dependence_{test_cell}_{f}.png"),
                                dpi=200, bbox_inches="tight")
                finally:
                    # Release every figure opened for this feature, success or not.
                    plt.close("all")
        except Exception as e:
            print(f"[UYARI] {test_cell}: dependence çizimi hata: {e}")
    else:
        print(f"[UYARI] {test_cell}: SHAP üretilemedi, görseller atlandı.")

# Results table: one row per held-out cell
res_df = pd.DataFrame(results)
res_path = os.path.join(OUT_DIR, "flaml_loco_results.csv")
res_df.to_csv(res_path, index=False)
display(res_df)
print("[KAYIT] Sonuç tablosu:", res_path)


Unnamed: 0,Test_Cell,Best_Estimator,Best_Config,Best_Iteration,RMSE,MAE,R2
0,Cell01,xgboost,"{'n_estimators': 225, 'max_leaves': 83, 'min_c...",411,0.029635,0.027489,0.862294
1,Cell03,rf,"{'n_estimators': 86, 'max_features': 1.0, 'max...",451,0.016789,0.014492,0.865423
2,Cell10,xgboost,"{'n_estimators': 211, 'max_leaves': 99, 'min_c...",598,0.036275,0.034007,0.723274
3,Cell11,extra_tree,"{'n_estimators': 189, 'max_features': 1.0, 'ma...",408,0.023208,0.01651,0.914586
4,Cell12,extra_tree,"{'n_estimators': 36, 'max_features': 1.0, 'max...",199,0.032205,0.022821,0.857266


[KAYIT] Sonuç tablosu: ./outputs_flaml_shap\flaml_loco_results.csv


In [17]:
import zipfile
import os

ZIP_PATH = "flaml_shap_outputs_mohtat.zip"   # rename if you like

# Bundle every file found under OUT_DIR into a single deflate-compressed archive.
with zipfile.ZipFile(ZIP_PATH, "w", zipfile.ZIP_DEFLATED) as zf:
    for root, _, files in os.walk(OUT_DIR):
        for f in files:
            full_path = os.path.join(root, f)
            # Stored name is the path relative to the current working directory.
            rel_path  = os.path.relpath(full_path, ".")
            zf.write(full_path, rel_path)

print(f"✅ ZIP hazır: {ZIP_PATH}")


✅ ZIP hazır: flaml_shap_outputs_mohtat.zip


In [None]:
# ==== LOCO + FLAML + SHAP + MODEL SAVING ====

from joblib import dump
import json

def to_numeric_df(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce every column of ``df`` to numeric, treating ',' as the decimal mark.

    Values are stringified first; anything that still fails to parse becomes NaN.
    """
    def _dots_for_commas(col: pd.Series) -> pd.Series:
        return col.str.replace(",", ".", regex=False)

    as_text = df.astype(str)
    dotted = as_text.apply(_dots_for_commas)
    return dotted.apply(pd.to_numeric, errors="coerce")

def to_numeric_series(s: pd.Series) -> pd.Series:
    """Coerce a Series to numeric, treating ',' as the decimal mark; unparseable values become NaN."""
    dotted = s.astype(str).str.replace(",", ".", regex=False)
    return pd.to_numeric(dotted, errors="coerce")

results = []
MODEL_DIR = os.path.join(OUT_DIR, "models")
os.makedirs(MODEL_DIR, exist_ok=True)

# Also persist the feature set used throughout the experiment
with open(os.path.join(MODEL_DIR, "feature_cols.json"), "w", encoding="utf-8") as f:
    json.dump(feature_cols, f, ensure_ascii=False, indent=2)

for test_cell in cells:
    # LOCO (leave-one-cell-out): the current cell is held out for testing and
    # every other cell is used for training.
    train_df = data[data["Battery"] != test_cell].copy()
    test_df  = data[data["Battery"] == test_cell].copy()

    if len(train_df) == 0 or len(test_df) == 0:
        print(f"[UYARI] {test_cell}: boş fold, atlandı.")
        continue

    # Split features and target
    X_train_raw, y_train_raw = train_df[feature_cols], train_df["SoH"]
    X_test_raw,  y_test_raw  = test_df[feature_cols],  test_df["SoH"]

    # Defensive numeric coercion (vectorised); +/-inf is folded into NaN so a
    # single notna() mask below removes both kinds of bad value.
    X_train = to_numeric_df(X_train_raw).replace([np.inf, -np.inf], np.nan)
    y_train = to_numeric_series(y_train_raw).replace([np.inf, -np.inf], np.nan)

    X_test  = to_numeric_df(X_test_raw).replace([np.inf, -np.inf], np.nan)
    y_test  = to_numeric_series(y_test_raw).replace([np.inf, -np.inf], np.nan)

    # Keep only rows where every feature AND the target are present, using one
    # shared mask so X and y stay aligned.
    train_mask = X_train.notna().all(axis=1) & y_train.notna()
    X_train, y_train = X_train.loc[train_mask], y_train.loc[train_mask]

    test_mask = X_test.notna().all(axis=1) & y_test.notna()
    X_test, y_test = X_test.loc[test_mask], y_test.loc[test_mask]

    if len(X_train) == 0 or len(X_test) == 0:
        print(f"[UYARI] {test_cell}: NaN/inf temizliği sonrası boş, atlandı.")
        continue

    # ---- FLAML ----
    automl = AutoML()
    automl.fit(
        X_train=X_train, y_train=y_train,
        task="regression",
        time_budget=TIME_BUDGET_SEC,
        metric="rmse",
        estimator_list=["lgbm", "xgboost", "rf", "extra_tree"],  # tree-based, SHAP-friendly
        n_jobs=N_JOBS, seed=SEED, verbose=0
    )

    # Metrics on the held-out cell
    y_pred = automl.predict(X_test)
    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
    mae  = mean_absolute_error(y_test, y_pred)
    r2   = r2_score(y_test, y_pred)

    results.append({
        "Test_Cell": test_cell,
        "Best_Estimator": automl.best_estimator,
        "Best_Config": automl.best_config,
        "Best_Iteration": automl.best_iteration,
        "RMSE": rmse, "MAE": mae, "R2": r2
    })

    # === Persist model & data slices ===
    # 1) The whole AutoML object
    dump(automl, os.path.join(MODEL_DIR, f"automl_{test_cell}.joblib"))
    # 2) The best model only (automl.model)
    dump(automl.model, os.path.join(MODEL_DIR, f"bestmodel_{test_cell}.joblib"))
    # 3) Test features and labels (index kept so rows can be traced back to `data`)
    X_test.to_csv(os.path.join(MODEL_DIR, f"X_test_{test_cell}.csv"), index=True)
    y_test.to_csv(os.path.join(MODEL_DIR, f"y_test_{test_cell}.csv"), index=True)
    # 4) Background sample for a SHAP masker (in case SHAP is recomputed later)
    bg = X_train.sample(min(1000, len(X_train)), random_state=SEED)
    bg.to_csv(os.path.join(MODEL_DIR, f"bg_{test_cell}.csv"), index=True)

    # ---- SHAP ----
    # Use the wrapped estimator when automl.model exposes one via `.model`;
    # otherwise fall back to automl.model itself.
    base_model = getattr(automl.model, "model", automl.model)
    shap_vals = None
    try:
        # Fast path: TreeExplainer
        explainer = shap.TreeExplainer(base_model)
        shap_vals = explainer.shap_values(X_test)
        if isinstance(shap_vals, list):
            shap_vals = shap_vals[0]
        shap_vals = np.asarray(shap_vals)
    except Exception:
        # Fallback: generic Explainer using the background sample saved above
        try:
            masker = shap.maskers.Independent(bg, max_samples=1000)
            explainer = shap.Explainer(base_model, masker)
            sv = explainer(X_test, check_additivity=False)
            shap_vals = np.asarray(sv.values)
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP hesaplanamadı: {type(e).__name__}: {e}")
            shap_vals = None

    if shap_vals is not None and shap_vals.ndim == 2 and shap_vals.shape[0] == len(X_test):
        # SHAP summary (beeswarm)
        try:
            plt.figure()
            shap.summary_plot(shap_vals, X_test, feature_names=X_test.columns, show=False)
            plt.tight_layout()
            plt.savefig(os.path.join(OUT_DIR, f"shap_summary_{test_cell}.png"),
                        dpi=200, bbox_inches="tight")
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP summary çizimi hata: {e}")
        finally:
            # Close even when plotting failed, so no half-drawn figure stays open.
            plt.close("all")

        # SHAP CSV
        try:
            pd.DataFrame(shap_vals, columns=X_test.columns).to_csv(
                os.path.join(OUT_DIR, f"shap_values_{test_cell}.csv"), index=False
            )
        except Exception as e:
            print(f"[UYARI] {test_cell}: SHAP CSV yazımı hata: {e}")

        # Dependence plots for the (up to) 3 features with the largest mean |SHAP|
        try:
            mean_abs = np.abs(shap_vals).mean(axis=0)
            order = np.argsort(-mean_abs)
            top_feats = [X_test.columns[i] for i in order[:min(3, X_test.shape[1])]]
            for f in top_feats:
                try:
                    # No plt.figure() here: shap.dependence_plot opens its own figure
                    # when no `ax` is passed, so a pre-created one would only be left
                    # behind as an empty, never-closed figure.
                    shap.dependence_plot(
                        ind=f, shap_values=shap_vals, features=X_test,
                        feature_names=X_test.columns, interaction_index=None, show=False
                    )
                    plt.tight_layout()
                    plt.savefig(os.path.join(OUT_DIR, f"shap_dependence_{test_cell}_{f}.png"),
                                dpi=200, bbox_inches="tight")
                finally:
                    # Release every figure opened for this feature, success or not.
                    plt.close("all")
        except Exception as e:
            print(f"[UYARI] {test_cell}: dependence çizimi hata: {e}")
    else:
        print(f"[UYARI] {test_cell}: SHAP üretilemedi, görseller atlandı.")

# Results table: one row per held-out cell
res_df = pd.DataFrame(results)
res_path = os.path.join(OUT_DIR, "flaml_loco_results.csv")
res_df.to_csv(res_path, index=False)
display(res_df)
print("[KAYIT] Sonuç tablosu:", res_path)
print("[KAYIT] Modeller ve veri parçaları:", MODEL_DIR)


In [None]:
# ==== LOAD THE MODEL, REGENERATE THE PLOTS FOR THE SAME BATTERY ====

from joblib import load
import json

SELECT_BATTERY = "Cell01"   # e.g. "Cell02", "Cell07" ...

# Derive the model directory from OUT_DIR (the same expression the training cell
# uses) so this cell also works in a kernel where the training loop was not re-run.
MODEL_DIR = os.path.join(OUT_DIR, "models")

# Aliases used by the data-loading cell to locate each feature in the workbook.
# The saved feature names are the *standardised* ones, so they must be mapped back
# to whatever the raw sheet calls those columns.
FEATURE_ALIASES = {
    "Cycle": {"cycle","cycles","efc","equivalentfullcycles"},
    "Normalized_Peak": {"normalized_peak","normalizedpmax","normalizedicpeak","peak","pmax","icpeak"},
    "Temperature": {"temperature","meant","mean_temperature","temp","avgtemp"},
}

# 1) Load the saved feature list and the models
with open(os.path.join(MODEL_DIR, "feature_cols.json"), "r", encoding="utf-8") as f:
    feature_cols_saved = json.load(f)

# NOTE(review): joblib.load unpickles arbitrary objects; only load files written by this notebook.
automl = load(os.path.join(MODEL_DIR, f"automl_{SELECT_BATTERY}.joblib"))
best_model = load(os.path.join(MODEL_DIR, f"bestmodel_{SELECT_BATTERY}.joblib"))

# 2) Load the data for the selected battery
raw = pd.read_excel(DATA_FILE, sheet_name=0)
# Reuse the pick_col helper from the data-loading cell to find the battery column
col_batt = pick_col(raw.columns, {"battery","cell","cellid","batteryid"})
if col_batt is None:
    raise ValueError(f"Battery kolonu bulunamadı.\nMevcut kolonlar: {list(raw.columns)}")
# Rows of the workbook that belong to this battery
mask = raw[col_batt].astype(str) == SELECT_BATTERY
subset = raw.loc[mask, :]

# Vectorised numeric coercion (kept local so this cell does not depend on the training cell)
def to_numeric_df(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce every column to numeric, treating ',' as the decimal mark; failures become NaN."""
    return (df.astype(str)
              .apply(lambda s: s.str.replace(",", ".", regex=False))
              .apply(pd.to_numeric, errors="coerce"))

# Resolve each saved (standardised) feature name to its raw column, then rename so
# the frame carries the saved feature names in the saved order.
source_cols = []
for feat in feature_cols_saved:
    src = pick_col(raw.columns, FEATURE_ALIASES.get(feat, {feat}))
    if src is None:
        raise ValueError(f"Özellik kolonu bulunamadı: {feat!r}\nMevcut kolonlar: {list(raw.columns)}")
    source_cols.append(src)

X = subset[source_cols].copy()
X.columns = list(feature_cols_saved)
X = to_numeric_df(X).replace([np.inf,-np.inf], np.nan).dropna()
if len(X) == 0:
    raise ValueError(f"{SELECT_BATTERY}: geçerli satır bulunamadı (Battery değeri veya özellikler eksik).")

# 3) Compute SHAP (try TreeExplainer first; otherwise fall back to a masker)
base_model = getattr(best_model, "model", best_model)
try:
    explainer = shap.TreeExplainer(base_model)
    shap_vals = explainer.shap_values(X)
    if isinstance(shap_vals, list):
        shap_vals = shap_vals[0]
    shap_vals = np.asarray(shap_vals)
except Exception:
    # Prefer the background sample saved at training time
    bg_path = os.path.join(MODEL_DIR, f"bg_{SELECT_BATTERY}.csv")
    if not os.path.exists(bg_path):
        # No saved background: sample one from the rows being explained
        bg_df = X.sample(min(100, len(X)), random_state=SEED)
    else:
        bg_df = pd.read_csv(bg_path, index_col=0)
    masker = shap.maskers.Independent(bg_df, max_samples=1000)
    explainer = shap.Explainer(base_model, masker)
    sv = explainer(X, check_additivity=False)
    shap_vals = np.asarray(sv.values)

# 4) Produce and save the plots
out_png = os.path.join(OUT_DIR, f"reshap_summary_{SELECT_BATTERY}.png")
try:
    plt.figure()
    shap.summary_plot(shap_vals, X, feature_names=X.columns, show=False)
    plt.tight_layout()
    plt.savefig(out_png, dpi=200, bbox_inches="tight")
finally:
    # Close even if plotting raised, so nothing stays open in the kernel.
    plt.close("all")
print("Kaydedildi:", out_png)

# Dependence plots for the (up to) 3 features with the largest mean |SHAP|
mean_abs = np.abs(shap_vals).mean(axis=0)
order = np.argsort(-mean_abs)
top_feats = [X.columns[i] for i in order[:min(3, X.shape[1])]]
for f in top_feats:
    dep_png = os.path.join(OUT_DIR, f"reshap_dependence_{SELECT_BATTERY}_{f}.png")
    try:
        # No plt.figure() here: shap.dependence_plot opens its own figure when no
        # `ax` is passed, so a pre-created one would be left behind empty.
        shap.dependence_plot(f, shap_vals, X, feature_names=X.columns, interaction_index=None, show=False)
        plt.tight_layout()
        plt.savefig(dep_png, dpi=200, bbox_inches="tight")
    finally:
        plt.close("all")
    print("Kaydedildi:", dep_png)
