In [None]:
import pandas as pd
import numpy as np
import os
import joblib
import matplotlib.pyplot as plt
import seaborn as sns
from category_encoders import TargetEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import r2_score, mean_squared_error, accuracy_score, f1_score
from xgboost import XGBRegressor, XGBClassifier
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.layers import Dense, Dropout, Conv1D, MaxPooling1D, Flatten # type: ignore
from tensorflow.keras.optimizers import Adam # type: ignore
import tensorflow as tf

# Force CPU usage
tf.config.set_visible_devices([], 'GPU')

# ===============================
# Helper Functions
# ===============================

def clean_numeric_string(numeric_str):
    """Cleans numeric-like strings and returns mean if multiple values."""
    if pd.isna(numeric_str) or not isinstance(numeric_str, str):
        return numeric_str
    temp_str = numeric_str.strip().replace(' | ', ';').replace(' >> ', ';').replace(' ', '').replace(':', ';')
    parts = temp_str.split(';')
    temps = []
    for p in parts:
        p_cleaned = p.strip()
        p_cleaned = p_cleaned.replace('%', '').replace('mol%', '').replace('mg/ml', '').replace('uL', '').replace('mM', '').replace('vol%', '')
        try:
            if p_cleaned and p_cleaned.replace('.', '', 1).isdigit():
                temps.append(float(p_cleaned))
        except ValueError:
            continue
    return np.mean(temps) if temps else np.nan

def preprocess_dataframe(df, target_col='Stability_PCE_T80'):
    """Preprocess a single dataframe (NIP or PIN) independently."""
    missing_value_strings = ['unknown', 'nan', 'na', 'none', 'n/a', 'not available', '', ' ']
    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].astype(str).str.strip().str.lower().replace(missing_value_strings, np.nan)

    threshold = 0.70
    missing_percentages = df.isnull().sum() / len(df)
    cols_to_drop = missing_percentages[missing_percentages >= threshold].index.tolist()
    if target_col in cols_to_drop:
        cols_to_drop.remove(target_col)
    df = df.drop(columns=cols_to_drop, errors='ignore')

    numeric_columns_to_clean = [
        'Cell_area_measured', 'ETL_thickness', 'Perovskite_composition_a_ions_coefficients',
        'Perovskite_composition_b_ions_coefficients', 'Perovskite_composition_c_ions_coefficients',
        'Perovskite_thickness', 'Perovskite_band_gap', 'Perovskite_deposition_thermal_annealing_temperature',
        'Perovskite_deposition_thermal_annealing_time', 'Backcontact_thickness_list',
        'Stability_potential_bias_range', 'Stability_temperature_range',
        'JV_default_Voc', 'JV_default_Jsc', 'JV_default_FF', 'JV_default_PCE'
    ]
    for col in numeric_columns_to_clean:
        if col in df.columns:
            df[col] = df[col].apply(clean_numeric_string)

    boolean_columns_to_convert = [
        'Perovskite_single_crystal', 'Perovskite_dimension_0D', 'Perovskite_dimension_2D',
        'Perovskite_dimension_2D3D_mixture', 'Perovskite_dimension_3D_with_2D_capping_layer',
        'Perovskite_composition_inorganic', 'Perovskite_composition_leadfree',
        'Perovskite_deposition_quenching_induced_crystallisation', 'Perovskite_deposition_solvent_annealing',
        'Add_lay_front', 'Add_lay_back', 'Encapsulation', 'Stability_light_UV_filter'
    ]
    for col in boolean_columns_to_convert:
        if col in df.columns:
            df[col] = df[col].astype(str).str.lower().replace({'true': 1, 'false': 0}).astype(float)
    if 'Perovskite_dimension_3D' in df.columns:
        df['Perovskite_dimension_3D'] = df['Perovskite_dimension_3D'].astype(float)

    categorical_cols = [col for col in df.columns if df[col].dtype == 'object' and col != target_col]
    if categorical_cols:
        encoder = TargetEncoder(cols=categorical_cols)
        encoder.fit(df[categorical_cols], df[target_col])
        df_encoded_categorical = encoder.transform(df[categorical_cols])
        non_categorical_cols = [col for col in df.columns if col not in categorical_cols]
        df = pd.concat([df[non_categorical_cols].reset_index(drop=True),
                        df_encoded_categorical.reset_index(drop=True)], axis=1)

    numeric_cols = df.select_dtypes(include=np.number).columns
    for col in numeric_cols:
        if col != target_col:
            df[col] = df[col].fillna(df[col].median())

    return df

def plot_correlation_heatmap(df, prefix, target_col):
    numeric_df = df.select_dtypes(include=np.number)
    corr = numeric_df.corr()
    target_corr = corr[[target_col]].sort_values(by=target_col, ascending=False)
    target_corr.to_csv(f"{prefix}_target_correlation.csv")
    plt.figure(figsize=(12, 10))
    sns.heatmap(corr, cmap="coolwarm", annot=False, fmt=".2f", cbar=True)
    plt.title(f"{prefix} - Feature Correlation Heatmap", fontsize=16)
    plt.tight_layout()
    plt.savefig(f"{prefix}_heatmap.png", dpi=300)
    plt.close()

def extract_feature_importance(model, X, prefix, task_type, model_name):
    if hasattr(model, "feature_importances_"):
        feature_importances = pd.DataFrame({
            'Feature': X.columns,
            'Importance': model.feature_importances_
        }).sort_values(by="Importance", ascending=False)
        feature_importances.to_csv(f"{prefix}_{model_name}_{task_type}_feature_importance.csv", index=False)
        plt.figure(figsize=(10, 6))
        sns.barplot(x="Importance", y="Feature", data=feature_importances.head(15), palette="viridis")
        plt.title(f"{prefix} - Top 15 Features ({model_name} {task_type})", fontsize=14)
        plt.tight_layout()
        plt.savefig(f"{prefix}_{model_name}_{task_type}_feature_importance.png", dpi=300)
        plt.close()

def build_ann(input_dim):
    model = Sequential([
        Dense(64, activation='relu', input_dim=input_dim),
        Dropout(0.2),
        Dense(32, activation='relu'),
        Dense(1)
    ])
    model.compile(optimizer=Adam(0.001), loss='mse')
    return model

def build_cnn(input_dim):
    model = Sequential([
        Conv1D(32, kernel_size=3, activation='relu', input_shape=(input_dim, 1)),
        MaxPooling1D(pool_size=2),
        Flatten(),
        Dense(32, activation='relu'),
        Dense(1)
    ])
    model.compile(optimizer=Adam(0.001), loss='mse')
    return model

def train_and_save_models(df, target_col, prefix, metrics_list):
    X = df.drop(columns=[target_col])
    y = df[target_col]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    y_class = (y >= y.median()).astype(int)
    X_train_c, X_test_c, y_train_c, y_test_c = train_test_split(X, y_class, test_size=0.2, random_state=42)

    # ======= Add these conversions to fix dtype issues =======
    # Convert all to numeric numpy arrays with float32 dtype
    X_train = X_train.astype(np.float32).to_numpy()
    X_test = X_test.astype(np.float32).to_numpy()
    y_train = y_train.astype(np.float32).to_numpy()
    y_test = y_test.astype(np.float32).to_numpy()

    X_train_c = X_train_c.astype(np.float32).to_numpy()
    X_test_c = X_test_c.astype(np.float32).to_numpy()
    y_train_c = y_train_c.astype(np.float32).to_numpy()
    y_test_c = y_test_c.astype(np.float32).to_numpy()
    # ==========================================================

    os.makedirs("models", exist_ok=True)

    # Rest of your code unchanged ...

    # Models to train
    models = {
        "RF": (RandomForestRegressor(random_state=42), RandomForestClassifier(random_state=42)),
        "XGB": (XGBRegressor(random_state=42, n_jobs=-1), XGBClassifier(random_state=42, n_jobs=-1)),
    }

    for name, (reg_model, clf_model) in models.items():
        # Regression
        reg_model.fit(X_train, y_train)
        y_pred = reg_model.predict(X_test)
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        metrics_list.append([prefix, name, "Regression", r2_score(y_test, y_pred), rmse, None, None])
        extract_feature_importance(reg_model, X, prefix, "regression", name)
        joblib.dump(reg_model, f"models/{prefix}_{name}_regression.joblib")

        # Classification
        clf_model.fit(X_train_c, y_train_c)
        y_pred_c = clf_model.predict(X_test_c)
        metrics_list.append([prefix, name, "Classification", None, None, accuracy_score(y_test_c, y_pred_c), f1_score(y_test_c, y_pred_c)])
        extract_feature_importance(clf_model, X, prefix, "classification", name)
        joblib.dump(clf_model, f"models/{prefix}_{name}_classification.joblib")

    # ANN Regression
    ann = build_ann(X_train.shape[1])
    ann.fit(X_train, y_train, epochs=30, batch_size=16, verbose=0)
    y_pred_ann = ann.predict(X_test).flatten()
    rmse = np.sqrt(mean_squared_error(y_test, y_pred_ann))
    metrics_list.append([prefix, "ANN", "Regression", r2_score(y_test, y_pred_ann), rmse, None, None])
    ann.save(f"models/{prefix}_ANN_regression.h5")

    # ANN Classification
    ann_c = build_ann(X_train_c.shape[1])
    ann_c.compile(optimizer=Adam(0.001), loss='binary_crossentropy', metrics=['accuracy'])
    ann_c.fit(X_train_c, y_train_c, epochs=30, batch_size=16, verbose=0)
    y_pred_ann_c = (ann_c.predict(X_test_c) >= 0.5).astype(int)
    metrics_list.append([prefix, "ANN", "Classification", None, None, accuracy_score(y_test_c, y_pred_ann_c), f1_score(y_test_c, y_pred_ann_c)])
    ann_c.save(f"models/{prefix}_ANN_classification.h5")

    # CNN Regression
    cnn = build_cnn(X_train.shape[1])
    cnn.fit(np.expand_dims(X_train, axis=2), y_train, epochs=30, batch_size=16, verbose=0)
    y_pred_cnn = cnn.predict(np.expand_dims(X_test, axis=2)).flatten()
    metrics_list.append([prefix, "CNN", "Regression", r2_score(y_test, y_pred_cnn), np.sqrt(mean_squared_error(y_test, y_pred_cnn)), None, None])
    cnn.save(f"models/{prefix}_CNN_regression.h5")

    # CNN Classification
    cnn_c = build_cnn(X_train_c.shape[1])
    cnn_c.compile(optimizer=Adam(0.001), loss='binary_crossentropy', metrics=['accuracy'])
    cnn_c.fit(np.expand_dims(X_train_c, axis=2), y_train_c, epochs=30, batch_size=16, verbose=0)
    y_pred_cnn_c = (cnn_c.predict(np.expand_dims(X_test_c, axis=2)) >= 0.5).astype(int)
    metrics_list.append([prefix, "CNN", "Classification", None, None, accuracy_score(y_test_c, y_pred_cnn_c), f1_score(y_test_c, y_pred_cnn_c)])
    cnn_c.save(f"models/{prefix}_CNN_classification.h5")

# ===============================
# Main Script
# ===============================
dtype_spec = {
    'Substrate_thickness': 'object',
    'Perovskite_single_crystal': 'object',
    'JV_default_PCE': 'object'
}
df = pd.read_csv("Perovskite_database.csv", dtype=dtype_spec)
df = df.dropna(subset=['Stability_PCE_T80'])

df_nip = df[df['Cell_architecture'] == 'nip'].copy()
df_pin = df[df['Cell_architecture'] == 'pin'].copy()

df_nip_cleaned = preprocess_dataframe(df_nip)
df_pin_cleaned = preprocess_dataframe(df_pin)

plot_correlation_heatmap(df_nip_cleaned, "NIP", "Stability_PCE_T80")
plot_correlation_heatmap(df_pin_cleaned, "PIN", "Stability_PCE_T80")

df_nip_cleaned.to_csv("Perovskite_NIP_cleaned.csv", index=False)
df_pin_cleaned.to_csv("Perovskite_PIN_cleaned.csv", index=False)

metrics = []
train_and_save_models(df_nip_cleaned, 'Stability_PCE_T80', "NIP", metrics)
train_and_save_models(df_pin_cleaned, 'Stability_PCE_T80', "PIN", metrics)

metrics_df = pd.DataFrame(metrics, columns=["Architecture", "Model", "Task", "R2", "RMSE", "Accuracy", "F1"])
metrics_df.to_csv("model_performance_summary.csv", index=False)
print(metrics_df)



# Show top 5 regression models
print("\nTop 5 Regression Models:")
top_reg = metrics_df.dropna(subset=["R2"]).sort_values(by="R2", ascending=False).head(5)
print(top_reg)

# Show top 5 classification models
print("\nTop 5 Classification Models:")
top_clf = metrics_df.dropna(subset=["F1"]).sort_values(by="F1", ascending=False).head(5)
print(top_clf)
