# 新規材料の予測

In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='whitegrid', font_scale=1.2)
import numpy as np
import pandas as pd

from sklearn.cross_decomposition import PLSRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, KFold, cross_val_predict
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
import lightgbm as lgb
import optuna

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pickle

from scipy.spatial.distance import cdist
from sklearn.svm import OneClassSVM
from sklearn.decomposition import PCA

  import pkg_resources


## データ読み込み＆前処理

In [2]:
# データ読み込み、学習データ＋未知のデータ
data = pd.read_csv('material_data.csv', index_col=0)
des_rdkit = pd.read_csv('descriptor_rdkit.csv', index_col=0)
fingerprint_df = pd.read_csv('morganFP.csv', index_col=0)
des_mordred_2d = pd.read_csv('descriptor_mordred_2d.csv', index_col=0)
des_mordred_3d = pd.read_csv('descriptor_mordred_3d.csv', index_col=0)

print("データ形状:", data.shape, des_rdkit.shape, fingerprint_df.shape, des_mordred_2d.shape, des_mordred_3d.shape)

データ形状: (233, 3) (233, 217) (233, 2048) (233, 1158) (233, 1826)


In [4]:
# 結合
dataset = pd.concat([data.reset_index(), des_rdkit.reset_index(drop=True)], axis=1)
dataset.index = dataset['Material']
dataset = dataset.drop(dataset.columns[0], axis=1)

# TypeとSMILESも消す
dataset = dataset.drop(['SMILES', 'Type'], axis=1)

# 学習用と予測用に分ける
dataset_train = dataset.dropna(subset='PL')
dataset_test = dataset[dataset['PL'].isnull()]

# 予測データのPLは空なので消す
dataset_test = dataset_test.drop('PL', axis=1)

# infをNaNに置き換え
dataset_train = dataset_train.replace(np.inf, np.nan).fillna(np.nan)
dataset_train = dataset_train.drop(dataset_train.columns[dataset_train.isnull().any()], axis=1)

dataset_test = dataset_test.replace(np.inf, np.nan).fillna(np.nan)
dataset_test = dataset_test.drop(dataset_test.columns[dataset_test.isnull().any()], axis=1)

# 学習データのstdが0の列を特定
zero_std_cols = dataset_train.columns[dataset_train.std() == 0]

# 学習・未知データから同じ列を削除
dataset_train = dataset_train.drop(columns=zero_std_cols)
dataset_test = dataset_test.drop(columns=zero_std_cols, errors='ignore')

print("前処理後のデータ形状:", dataset_train.shape, dataset_test.shape)

前処理後のデータ形状: (228, 152) (5, 151)


In [5]:
# 最終的な特徴量共通セット
X_train_all = dataset_train.drop(columns='PL')
y_train_all = dataset_train['PL']
X_test = dataset_test  # PLなし

## NN

In [6]:
# CPU専用設定
device = torch.device("cpu")

print(f"使用デバイス: {device}")

使用デバイス: cpu


In [7]:
# データセット定義
class RegressionDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).view(-1, 1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 学習loop
def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for X_batch, y_batch in dataloader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        pred = model(X_batch)
        loss = criterion(pred, y_batch)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss / len(dataloader)

# 評価（修正版）
def evaluate(model, dataloader, device, y_scaler):
    model.eval()
    preds = []
    trues = []
    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.cpu().numpy()
            pred = model(X_batch).cpu().numpy()
            preds.append(pred)
            trues.append(y_batch)

    y_pred_std = np.vstack(preds)
    y_true_std = np.vstack(trues)

    # 逆標準化
    y_pred = y_scaler.inverse_transform(y_pred_std).flatten()
    y_true = y_scaler.inverse_transform(y_true_std).flatten()

    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    return rmse

In [None]:
# Optuna設定（修正版）
def define_model(trial, input_dim):
    n_layers = trial.suggest_int("n_layers", 2, 4)  # 範囲を狭める
    hidden_dim = trial.suggest_int("hidden_dim", 32, 256)  # 範囲を狭める
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
    activation = trial.suggest_categorical("activation", ["relu", "leaky_relu"])

    layers = []
    current_dim = input_dim
    
    for i in range(n_layers):
        layers.append(nn.Linear(current_dim, hidden_dim))
        layers.append(nn.ReLU() if activation == "relu" else nn.LeakyReLU())
        layers.append(nn.Dropout(dropout_rate))
        current_dim = hidden_dim
    
    layers.append(nn.Linear(hidden_dim, 1))
    return nn.Sequential(*layers)

def objective(trial):
    # ハイパーパラメータ提案
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "SGD"])
    epochs = trial.suggest_int("epochs", 30, 100)  # エポック数も最適化

    kf = KFold(n_splits=5, shuffle=True, random_state=1234)
    val_losses = []

    # 分割前に標準化
    X_scaler = StandardScaler()
    autoscaled_X = X_scaler.fit_transform(X_train_all)
    y_scaler = StandardScaler()
    autoscaled_y = y_scaler.fit_transform(y_train_all.values.reshape(-1, 1))

    for train_idx, val_idx in kf.split(autoscaled_X):
        # スライス
        X_train, X_val = autoscaled_X[train_idx], autoscaled_X[val_idx]
        y_train, y_val = autoscaled_y[train_idx], autoscaled_y[val_idx]

        # Dataset, DataLoader
        train_ds = RegressionDataset(X_train, y_train)
        val_ds = RegressionDataset(X_val, y_val)

        train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
        val_dl = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

        # モデル定義
        model = define_model(trial, X_train.shape[1]).to(device)
        criterion = nn.MSELoss()

        if optimizer_name == "Adam":
            optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        else:
            optimizer = torch.optim.SGD(model.parameters(), lr=lr)

        # 学習ループ（Early Stopping追加）
        best_val_loss = float('inf')
        patience = 10
        no_improve = 0
        
        for epoch in range(epochs):
            train_epoch(model, train_dl, optimizer, criterion, device)
            val_loss = evaluate(model, val_dl, device, y_scaler)
            
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                no_improve = 0
            else:
                no_improve += 1
                
            if no_improve >= patience:
                break

        val_losses.append(best_val_loss)

    return np.mean(val_losses)

In [None]:
# 実行
print("Optuna最適化開始...")
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)  # 試行回数を増やす

print('Best trial:', study.best_trial.params)

[I 2025-07-03 21:03:41,567] A new study created in memory with name: no-name-9da7f0b6-d4f1-42dc-a2f6-9d5c85bff04f


Optuna最適化開始...


[I 2025-07-03 21:03:54,431] Trial 0 finished with value: 47.105861695292774 and parameters: {'batch_size': 32, 'lr': 0.008481733450322751, 'optimizer': 'SGD', 'epochs': 92, 'n_layers': 2, 'hidden_dim': 164, 'dropout_rate': 0.4433784950601598, 'activation': 'relu'}. Best is trial 0 with value: 47.105861695292774.
[I 2025-07-03 21:03:59,238] Trial 1 finished with value: 42.43999237766509 and parameters: {'batch_size': 64, 'lr': 0.000271920742701073, 'optimizer': 'Adam', 'epochs': 75, 'n_layers': 2, 'hidden_dim': 123, 'dropout_rate': 0.3318458204753981, 'activation': 'leaky_relu'}. Best is trial 1 with value: 42.43999237766509.
[I 2025-07-03 21:04:13,764] Trial 2 finished with value: 80.64582844813569 and parameters: {'batch_size': 16, 'lr': 0.00041933820615399215, 'optimizer': 'SGD', 'epochs': 78, 'n_layers': 2, 'hidden_dim': 37, 'dropout_rate': 0.4000135600453857, 'activation': 'leaky_relu'}. Best is trial 1 with value: 42.43999237766509.
[I 2025-07-03 21:04:29,650] Trial 3 finished wit

Best trial : {'batch_size': 64, 'lr': 0.0007159882185765599, 'optimizer': 'Adam', 'epochs': 31, 'n_layers': 3, 'hidden_dim': 213, 'dropout_rate': 0.20536190735059906, 'activation': 'relu'}


In [None]:
# 汎化性能チェック（修正版）
print("\n汎化性能評価開始...")
rmse_scores = []
mae_scores = []
r2_scores = []

# 分割前に標準化
X_scaler = StandardScaler()
autoscaled_X = X_scaler.fit_transform(X_train_all)
y_scaler = StandardScaler()
autoscaled_y = y_scaler.fit_transform(y_train_all.values.reshape(-1, 1))

# 5分割交差検証
kf = KFold(n_splits=5, shuffle=True, random_state=1234)

for fold, (train_index, val_index) in enumerate(kf.split(autoscaled_X)):
    print(f"Fold {fold + 1}/5")
    
    # データ分割
    X_train, X_val = autoscaled_X[train_index], autoscaled_X[val_index]
    y_train, y_val = autoscaled_y[train_index], autoscaled_y[val_index]

    # Dataset/DataLoader
    train_ds = RegressionDataset(X_train, y_train)
    val_ds = RegressionDataset(X_val, y_val)
    train_dl = DataLoader(train_ds, batch_size=study.best_trial.params['batch_size'], shuffle=True)
    val_dl = DataLoader(val_ds, batch_size=study.best_trial.params['batch_size'], shuffle=False)

    # モデルと最適化
    model_nn_op = define_model(study.best_trial, input_dim=X_train.shape[1]).to(device)
    if study.best_trial.params['optimizer'] == "Adam":
        optimizer = torch.optim.Adam(model_nn_op.parameters(), lr=study.best_trial.params['lr'])
    else:
        optimizer = torch.optim.SGD(model_nn_op.parameters(), lr=study.best_trial.params['lr'])
    criterion = torch.nn.MSELoss()

    # 学習（Early Stopping付き）
    best_val_loss = float('inf')
    patience = 10
    no_improve = 0
    
    for epoch in range(100):  # 最大エポック数
        model_nn_op.train()
        for X_batch, y_batch in train_dl:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            pred = model_nn_op(X_batch)
            loss = criterion(pred, y_batch)
            loss.backward()
            optimizer.step()

        # 検証
        val_loss = evaluate(model_nn_op, val_dl, device, y_scaler)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            no_improve = 0
        else:
            no_improve += 1
            
        if no_improve >= patience:
            break

    # 最終推論
    model_nn_op.eval()
    preds = []
    with torch.no_grad():
        for X_batch, _ in val_dl:
            X_batch = X_batch.to(device)
            y_pred_std = model_nn_op(X_batch).cpu().numpy()
            preds.append(y_pred_std)
    
    y_pred_std = np.vstack(preds)
    y_pred = y_scaler.inverse_transform(y_pred_std).flatten()
    y_true = y_scaler.inverse_transform(y_val).flatten()

    # 評価
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)

    rmse_scores.append(rmse)
    mae_scores.append(mae)
    r2_scores.append(r2)

    print(f'Fold RMSE: {rmse:.4f}')
    print(f'Fold MAE: {mae:.4f}')
    print(f'Fold R2: {r2:.4f}\n')

# 平均結果出力
print(f'\n平均RMSE: {np.mean(rmse_scores):.4f} ± {np.std(rmse_scores):.4f}')
print(f'平均MAE: {np.mean(mae_scores):.4f} ± {np.std(mae_scores):.4f}')
print(f'平均R2: {np.mean(r2_scores):.4f} ± {np.std(r2_scores):.4f}')

Fold RMSE : 45.6510
Fold MAE : 29.5482
Fold R2  : 0.7745

Fold RMSE : 48.6657
Fold MAE : 33.2880
Fold R2  : 0.8075

Fold RMSE : 45.0280
Fold MAE : 34.1156
Fold R2  : 0.8035

Fold RMSE : 40.4799
Fold MAE : 29.1705
Fold R2  : 0.7934

Fold RMSE : 31.5687
Fold MAE : 25.4763
Fold R2  : 0.8773


平均RMSE : 42.2787
平均MAE  : 30.3197
平均R2   : 0.8112


In [None]:
# 最終モデルの学習と保存
print("\n最終モデルの学習...")
X_scaler_final = StandardScaler()
X_train_scaled = X_scaler_final.fit_transform(X_train_all)
y_scaler_final = StandardScaler()
y_train_scaled = y_scaler_final.fit_transform(y_train_all.values.reshape(-1, 1))

# 最終モデル
final_model = define_model(study.best_trial, X_train_scaled.shape[1]).to(device)
if study.best_trial.params['optimizer'] == "Adam":
    optimizer = torch.optim.Adam(final_model.parameters(), lr=study.best_trial.params['lr'])
else:
    optimizer = torch.optim.SGD(final_model.parameters(), lr=study.best_trial.params['lr'])

criterion = nn.MSELoss()

# 全データで学習
train_dataset = RegressionDataset(X_train_scaled, y_train_scaled)
train_loader = DataLoader(train_dataset, batch_size=study.best_trial.params['batch_size'], shuffle=True)

for epoch in range(study.best_trial.params.get('epochs', 50)):
    train_epoch(final_model, train_loader, optimizer, criterion, device)

# モデルとスケーラーの保存
torch.save(final_model.state_dict(), 'final_nn_model.pth')
with open('scalers.pkl', 'wb') as f:
    pickle.dump({'X_scaler': X_scaler_final, 'y_scaler': y_scaler_final}, f)

print("最終モデルとスケーラーを保存しました。")


最終モデルの学習...
最終モデルとスケーラーを保存しました。


In [None]:
# 保存したモデルでX_testを予測
print("\nX_testの予測を開始...")

# X_testを標準化
X_test_scaled = X_scaler_final.transform(X_test)

# 予測用データセット（yはダミー）
test_dataset = RegressionDataset(X_test_scaled, np.zeros(len(X_test_scaled)))
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 予測実行
final_model.eval()
predictions = []
with torch.no_grad():
    for X_batch, _ in test_loader:
        X_batch = X_batch.to(device)
        y_pred_std = final_model(X_batch).cpu().numpy()
        predictions.append(y_pred_std)

# 予測結果をまとめる
y_pred_std = np.vstack(predictions)
y_pred = y_scaler_final.inverse_transform(y_pred_std).flatten()

# 結果をデータフレームに格納
results_df = pd.DataFrame({
    'Material': X_test.index,
    'Predicted_PL': y_pred
})

print("\n予測結果:")
print(results_df)


X_testの予測を開始...

予測結果:
  Material  Predicted_PL
0    test1    553.840149
1    test2    697.178528
2    test3    620.509521
3    test4    670.996704
4    test5    625.717773


In [None]:
# ===== 別途：保存したモデルを読み込んで予測する場合 =====
def load_model_and_predict(model_path, scaler_path, X_test_data):
    """
    保存したモデルとスケーラーを読み込んで予測を行う関数
    
    Args:
        model_path: 保存したモデルファイルのパス
        scaler_path: 保存したスケーラーファイルのパス
        X_test_data: 予測したいデータ
    
    Returns:
        予測結果のDataFrame
    """
    # スケーラーの読み込み
    with open(scaler_path, 'rb') as f:
        scalers = pickle.load(f)
    X_scaler = scalers['X_scaler']
    y_scaler = scalers['y_scaler']
    
    # モデルの読み込み
    # 注意: モデルの構造は事前に定義されている必要があります
    input_dim = X_test_data.shape[1]
    model = define_model(study.best_trial, input_dim).to(device)
    model.load_state_dict(torch.load(model_path, map_location='cpu'))  # CPU専用
    model.eval()
    
    # データの標準化
    X_test_scaled = X_scaler.transform(X_test_data)
    
    # 予測用データセット
    test_dataset = RegressionDataset(X_test_scaled, np.zeros(len(X_test_scaled)))
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    # 予測実行
    predictions = []
    with torch.no_grad():
        for X_batch, _ in test_loader:
            X_batch = X_batch.to(device)
            y_pred_std = model(X_batch).cpu().numpy()
            predictions.append(y_pred_std)
    
    # 予測結果をまとめる
    y_pred_std = np.vstack(predictions)
    y_pred = y_scaler.inverse_transform(y_pred_std).flatten()
    
    # 結果をデータフレームに格納
    results_df = pd.DataFrame({
        'Material': X_test_data.index,
        'Predicted_PL': y_pred
    })
    
    return results_df

読み込みモデルによる予測結果:
  Material  Predicted_PL
0    test1    553.840149
1    test2    697.178528
2    test3    620.509521
3    test4    670.996704
4    test5    625.717773
