Copyright (c) 2025 Mitsuru Ohno  
Use of this source code is governed by a BSD-3-style  
license that can be found in the LICENSE file.  

## 当ノートブックのワークフロー  
1. csvに書き込んだ反応式を読み込む。ただし速度定数はすべて既知とする  
2. 化学種ごとの微分型の反応速度式を作成する  
3. 作成した微分方程式を、数値解析可能な形式にする  
4. 化学種ごとの濃度の経時変化を求める  
5. 経時変化を図示する  


## 使用方法の要約
基本的な使用: RxnIVPsolv("sample_data.csv")でインスタンス化  
数値積分: get_ode_system()で必要なオブジェクトを取得  
エラーハンドリング: try-except文でエラーをキャッチ  
可視化: matplotlibで結果をプロット  
パラメータ解析: 異なる初期条件での比較  
これらの使用例は、現在のコードが正しく動作することを前提としています。もしエラーが発生した場合は、debug_ode_system()メソッドで詳細な情報を確認できます。  

## 反応式を記載したcsvファイルを指定する  

In [1]:
file_path = '../sample_data/sample_rxn_3.csv'  # CSVファイルのパスを指定

In [2]:
# development phase
# Add the parent directory (one level up from the notebook's location) to the Python path
import os
import sys

sys.path.append(os.path.join(os.getcwd(), '..'))

## 反応速度式をscipy.integrate.solve_ivpで処理できる連立微分方程式にする  

In [3]:
from dataclasses import dataclass, field
from typing import Optional

import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import solve_ivp

from src.rxnfit.build_ode import RxnODEbuild
from src.rxnfit.solv_ode import SolverConfig, RxnODEsolver

In [4]:
builded_rxnode = RxnODEbuild(file_path)

In [5]:
builded_rxnode.get_ode_info(debug_info=True)

number of species: 5
unique species: ['ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
rate constant: {'k1': 0.0115, 'k2': k2, 'k3': 0.201}

=== debug info ===
Successfully created function for ClCH2COOH with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for H2O with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for ClCH2COOOH with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for O2 with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for H2O2 with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
order of args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
system of ODE: {'ClCH2COOH': '-k1*ClCH2COOH(t)*H2O(t)+k2*ClCH2COOOH(t)', 'H2O': '-k1*ClCH2COOH(t)*H2O(t)+k1*ClCH2COOH(t)*H2O(t)+k3*H2O2(t)', 'ClCH2COOOH': 'k1*ClCH2COOH(t)*H2O(t)-k2*ClCH2COOOH(t)', 'O2': '0.5*k2*ClCH2COOOH(t)+0.5*k3*H2O2(t)', '

In [6]:
builded_rxnode.rate_consts_dict

{'k1': 0.0115, 'k2': k2, 'k3': 0.201}

## 数値積分を実行する  

In [7]:
# get_ode_infoの出力を参照して、初期値等の入力項目を渡す
config = SolverConfig(
    y0 = [2.945, 6.807, 0, 13.99, 0], # 初期濃度（必須）
    t_span=(0, 30),         # 時間範囲（必須）
    # t_eval: Optional[np.ndarray] = field(default=None),  # 任意
    method = "RK45",  # 任意
    rtol = 1e-6,    # 任意    
)

In [8]:
# 基本的な数値積分 -インスタンス化-
solved_rxnode = RxnODEsolver(builded_rxnode, config)

In [9]:
ode_construct, sol = solved_rxnode.solve_system()

Successfully created function for ClCH2COOH with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for H2O with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for ClCH2COOOH with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for O2 with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
Successfully created function for H2O2 with args: ['t', 'ClCH2COOH', 'H2O', 'ClCH2COOOH', 'O2', 'H2O2']
An error occurred during numerical integration.: Cannot convert expression to float
Plz. review the debug info.


In [10]:
solved_rxnode.solution_plot()

No solution to plot. Run solve_system() first or pass a solution.


fitするときに一度作ってみたモジュールの控え

In [None]:
# expdata_fit.py の solve_fit_model を使ってシンボリックな速度定数を最適化
# 複数のDataFrameに対応：各DataFrameのt=0の値を初期値として使用
# すべてのDataFrameに対して共通のシンボリックな速度定数を最適化

# 解析対象のDataFrameリスト（任意の数に設定可能）
df_list = [df1, ]  # 必要に応じて [df1, df2, df3, ...] のように追加可能

# シンボリックな速度定数の初期値（リストで指定）
# 例：k2のみの場合 [0.04]、k2とk3の場合 [0.04, 0.03]
initial_rate_const_values = [0.04]  # k2の初期値

function_names = builded_rxnode.function_names
print(f"化学種: {function_names}")

# 各DataFrameから初期値を取得し、fit_funcを作成
fit_funcs = []  # 各DataFrame用のfit_funcを格納
initial_values_list = []  # 各DataFrameの初期値を格納

for idx, df in enumerate(df_list):
    # 各DataFrameのt=0の値を初期値として取得
    fixed_initial_values = [
        df[name].iloc[0] if name in df.columns else 0.0
        for name in function_names
    ]
    initial_values_list.append(fixed_initial_values)
    
    # 各DataFrame用のfit_funcを作成
    fit_func = solve_fit_model(builded_rxnode, fixed_initial_values)
    fit_funcs.append(fit_func)
    
    print(f"\nDataFrame {idx+1} の初期値:")
    for i, name in enumerate(function_names):
        print(f"  {name}: {fixed_initial_values[i]:.6f}")

# パラメータ情報を確認（すべてのfit_funcは同じパラメータ構造を持つ）
print(f"\nパラメータ情報:")
print(fit_funcs[0].param_info)
symbolic_rate_const_keys = fit_funcs[0].param_info['symbolic_rate_consts']
n_params = fit_funcs[0].param_info['n_params']

# 初期値の数がシンボリックな速度定数の数と一致するか確認
if len(initial_rate_const_values) != n_params:
    raise ValueError(
        f"初期値の数({len(initial_rate_const_values)})が"
        f"シンボリックな速度定数の数({n_params})と一致しません。"
        f"シンボリックな速度定数: {symbolic_rate_const_keys}"
    )

# 最適化に使用
from scipy.optimize import minimize

# 全DataFrameの実験データを準備
experimental_data_list = []  # 各DataFrameの実験データを格納
t_data_list = []  # 各DataFrameの時間データを格納

for idx, df in enumerate(df_list):
    t_data = df['t'].values
    t_data_list.append(t_data)
    
    experimental_data = {}
    for name in function_names:
        if name in df.columns:
            experimental_data[name] = df[name].values
    experimental_data_list.append(experimental_data)
    
    print(f"\nDataFrame {idx+1} の実験データ:")
    for name in function_names:
        if name in experimental_data:
            print(f"  {name}: {len(experimental_data[name])}点")

# 全DataFrameの残差の二乗和を最小化する目的関数
def objective_function(params):
    """全DataFrame、全化学種の残差の二乗和を計算"""
    # paramsはシンボリックな速度定数の値のリスト
    # 例：k2のみの場合、params = [0.04]
    params = np.atleast_1d(params)
    
    if len(params) != n_params:
        raise ValueError(
            f"パラメータの数({len(params)})が"
            f"シンボリックな速度定数の数({n_params})と一致しません。"
        )
    
    try:
        total_residual = 0.0
        
        # 各DataFrameに対して予測値を計算し、残差を合計
        for df_idx, (fit_func, t_data, experimental_data) in enumerate(
            zip(fit_funcs, t_data_list, experimental_data_list)
        ):
            # 予測値を計算（paramsを展開して渡す）
            y_pred_all = fit_func(t_data, *params)  # shape: (n_species, n_time)
            
            # 全化学種の残差の二乗和を計算
            for i, name in enumerate(function_names):
                if name in experimental_data:
                    y_exp = experimental_data[name]
                    y_pred = y_pred_all[i]
                    # 残差の二乗和を加算
                    total_residual += np.sum((y_exp - y_pred) ** 2)
        
        return total_residual
    except Exception as e:
        print(f"Error in objective_function: {e}")
        return np.inf

# 境界を設定（正の値）
bounds = [(0.001, 1.0)] * n_params  # 各パラメータに対して下限: 0.001, 上限: 1.0

print(f"\nフィッティング開始...")
print(f"解析対象DataFrame数: {len(df_list)}")
print(f"最適化パラメータ: {symbolic_rate_const_keys} ({n_params}パラメータ)")
print(f"最適化対象: 全DataFrame、全化学種の残差の二乗和")
print(f"パラメータ初期値: {initial_rate_const_values}")

try:
    result = minimize(
        objective_function,
        initial_rate_const_values,
        method='L-BFGS-B',  # 境界制約に対応
        bounds=bounds,
        options={'maxiter': 5000}
    )
    
    if result.success:
        popt = result.x
        print(f"\n=== フィッティング成功 ===")
        for i, key in enumerate(symbolic_rate_const_keys):
            print(f"最適化された{key}: {popt[i]:.6f}")
        print(f"最終残差の二乗和: {result.fun:.6f}")
        
        # 各DataFrameのフィッティング結果を可視化
        n_dfs = len(df_list)
        fig, axes = plt.subplots(n_dfs, len(function_names), figsize=(4*len(function_names), 4*n_dfs))
        if n_dfs == 1:
            axes = axes.reshape(1, -1)
        axes = axes.flatten()
        
        plot_idx = 0
        for df_idx, (fit_func, t_data, experimental_data) in enumerate(
            zip(fit_funcs, t_data_list, experimental_data_list)
        ):
            # 最適化されたパラメータで予測値を計算
            y_pred_all = fit_func(t_data, *popt)
            
            # 各化学種をプロット
            for i, name in enumerate(function_names):
                ax = axes[plot_idx]
                if name in experimental_data:
                    y_exp = experimental_data[name]
                    y_pred = y_pred_all[i]
                    
                    ax.plot(t_data, y_exp, 'o', label='実験データ', markersize=6)
                    ax.plot(t_data, y_pred, '-', label='フィッティング結果', linewidth=2)
                    ax.set_xlabel('Time (s)', fontsize=10)
                    ax.set_ylabel('Concentration', fontsize=10)
                    ax.set_title(f'DataFrame {df_idx+1}: {name}', fontsize=12)
                    ax.legend(fontsize=9)
                    ax.grid(True, alpha=0.3)
                else:
                    ax.set_title(f'DataFrame {df_idx+1}: {name} (データなし)', fontsize=12)
                plot_idx += 1
        
        plt.tight_layout()
        plt.show()
        
        # 初期推測値との比較も表示
        fig2, axes2 = plt.subplots(n_dfs, len(function_names), figsize=(4*len(function_names), 4*n_dfs))
        if n_dfs == 1:
            axes2 = axes2.reshape(1, -1)
        axes2 = axes2.flatten()
        
        plot_idx = 0
        for df_idx, (fit_func, t_data, experimental_data) in enumerate(
            zip(fit_funcs, t_data_list, experimental_data_list)
        ):
            # 最適化されたパラメータと初期推測値で予測値を計算
            y_pred_all = fit_func(t_data, *popt)
            y_pred_initial = fit_func(t_data, *initial_rate_const_values)
            
            # 各化学種をプロット
            for i, name in enumerate(function_names):
                ax = axes2[plot_idx]
                if name in experimental_data:
                    y_exp = experimental_data[name]
                    y_pred = y_pred_all[i]
                    y_pred_init = y_pred_initial[i]
                    
                    ax.plot(t_data, y_exp, 'o', label='実験データ', markersize=6)
                    ax.plot(t_data, y_pred_init, '--', 
                           label=f'初期推測 ({", ".join([f"{k}={v:.6f}" for k, v in zip(symbolic_rate_const_keys, initial_rate_const_values)])})', 
                           linewidth=1.5, alpha=0.7)
                    ax.plot(t_data, y_pred, '-', 
                           label=f'最適化後 ({", ".join([f"{k}={v:.6f}" for k, v in zip(symbolic_rate_const_keys, popt)])})', 
                           linewidth=2)
                    ax.set_xlabel('Time (s)', fontsize=10)
                    ax.set_ylabel('Concentration', fontsize=10)
                    ax.set_title(f'DataFrame {df_idx+1}: {name}', fontsize=12)
                    ax.legend(fontsize=8)
                    ax.grid(True, alpha=0.3)
                else:
                    ax.set_title(f'DataFrame {df_idx+1}: {name} (データなし)', fontsize=12)
                plot_idx += 1
        
        plt.tight_layout()
        plt.show()
        
    else:
        print(f"\n=== フィッティング失敗 ===")
        print(f"エラーメッセージ: {result.message}")
        
except Exception as e:
    print(f"\nエラーが発生しました: {e}")
    import traceback
    traceback.print_exc()
    
    # 初期推測値でのシミュレーションを試行
    print(f"\n初期推測値でのシミュレーションを試行...")
    try:
        n_dfs = len(df_list)
        fig, axes = plt.subplots(n_dfs, len(function_names), figsize=(4*len(function_names), 4*n_dfs))
        if n_dfs == 1:
            axes = axes.reshape(1, -1)
        axes = axes.flatten()
        
        plot_idx = 0
        for df_idx, (fit_func, t_data, experimental_data) in enumerate(
            zip(fit_funcs, t_data_list, experimental_data_list)
        ):
            y_pred_initial = fit_func(t_data, *initial_rate_const_values)
            
            for i, name in enumerate(function_names):
                ax = axes[plot_idx]
                if name in experimental_data:
                    y_exp = experimental_data[name]
                    y_pred = y_pred_initial[i]
                    
                    ax.plot(t_data, y_exp, 'o', label='実験データ', markersize=6)
                    ax.plot(t_data, y_pred, '-', 
                           label=f'初期推測値 ({", ".join([f"{k}={v:.6f}" for k, v in zip(symbolic_rate_const_keys, initial_rate_const_values)])})', 
                           linewidth=2)
                    ax.set_xlabel('Time (s)', fontsize=10)
                    ax.set_ylabel('Concentration', fontsize=10)
                    ax.set_title(f'DataFrame {df_idx+1}: {name}', fontsize=12)
                    ax.legend(fontsize=9)
                    ax.grid(True, alpha=0.3)
                else:
                    ax.set_title(f'DataFrame {df_idx+1}: {name} (データなし)', fontsize=12)
                plot_idx += 1
        
        plt.tight_layout()
        plt.show()
    except Exception as e2:
        print(f"初期推測値でのシミュレーションも失敗: {e2}")