In [None]:
import sys
import os

# Absolute path of the "2_0_empirical_modelling" directory, which sits next to
# the notebook's current working directory.
src_dir = os.path.abspath(os.path.join(os.getcwd(), "../2_0_empirical_modelling"))

# Make the modules in that directory importable. Guarded so that re-running
# this cell does not keep appending duplicate entries to sys.path.
if src_dir not in sys.path:
    sys.path.append(src_dir)

# Now, import the module
from generator import javh_groups, ja_groups, JAModel, JAVHModel
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Plot typography defaults, assigned one rcParams key at a time.
plt.rcParams['font.size'] = 12          # default font size
plt.rcParams['axes.labelsize'] = 14     # axis labels
plt.rcParams['axes.titlesize'] = 16     # subplot titles
plt.rcParams['xtick.labelsize'] = 12    # x-axis tick labels
plt.rcParams['ytick.labelsize'] = 12    # y-axis tick labels
plt.rcParams['legend.fontsize'] = 12    # legend text
plt.rcParams['figure.titlesize'] = 18   # figure title

In [None]:
# Load the JAVH and JA models from the CSV files under ../../output.
# NOTE(review): each call appears to return a sequence of model objects (a later
# cell indexes the JAModel result with [-1]) — confirm against generator.py.
modelsJAVH = JAVHModel.load_from_csvs('../../output')
modelsJA = JAModel.load_from_csvs('../../output')

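# Fixed Everything
In this experiment all four identifiers of a system are fixed, i.e. each combination is treated as one known system:
- API
- Solvent 1
- Solvent 2
- Temperature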


From the `ja_groups` data alone, can a neural network (NN) predict the values of the J parameters (J0, J1, J2)?

### Inputs
- solvent_1 - one hot encoded
- solvent_2 - one hot encoded
- compound_id - one hot encoded
- temperature - scaled
- solvent_1_pure - scaled
- solvent_2_pure - scaled

- system - solvent_1, solvent_2, compound_id and temperature combined into a single system label - one hot encoded

### Outputs
- J0, J1, J2 - scaled

In [None]:
class MLModel:
    """Shared data source for the ML experiments in this notebook.

    Everything is evaluated once, when the class body executes, and is stored
    as class attributes:

    - ``best_JAModel``: the last entry returned by ``JAModel.load_from_csvs``.
    - ``groups``: ``JAModel.groups`` as read right after that load.
    - ``df``: a copy of the selected columns of ``best_JAModel.results_df``.
    """

    best_JAModel = JAModel.load_from_csvs('../../output')[-1]
    groups = JAModel.groups
    df = best_JAModel.results_df[
        [
            'group_index',
            'solvent_1',
            'solvent_2',
            'compound_id',
            'temperature',
            'solvent_1_pure',
            'solvent_2_pure',
            'J0',
            'J1',
            'J2',
        ]
    ].copy()

In [None]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import RobustScaler

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.model_selection import train_test_split

from graphing import plot_JA



class FixedMLModel(MLModel):
    """Neural network that maps a fully specified system to J0, J1 and J2.

    A "system" is the string ``solvent_1-solvent_2-compound_id-temperature``
    built from the corresponding columns of ``self.df``. It is one-hot encoded
    as a single categorical feature, and the three J columns are scaled with a
    ``RobustScaler`` before being used as regression targets.
    """

    def __init__(self):
        # Work on a per-instance copy so the extra system column added by
        # in_out_split() is not written into the DataFrame held on the class.
        self.df = self.df.copy()

        self.input_df, self.output_df = self.in_out_split()
        self.input_encoded = self.encoding()
        self.output_scaled = self.decoding()

    def in_out_split(self):
        """Add the ``System_SSCT`` label column and split inputs from targets.

        Returns:
            tuple: ``(input_df, output_df)`` where ``input_df`` holds only the
            ``System_SSCT`` column and ``output_df`` holds ``J0``, ``J1``, ``J2``.
        """
        self.df['System_SSCT'] = (
            self.df['solvent_1'].astype(str) + '-'
            + self.df['solvent_2'].astype(str) + '-'
            + self.df['compound_id'].astype(str) + '-'
            + self.df['temperature'].astype(str)
        )

        return self.df[['System_SSCT']], self.df[['J0', 'J1', 'J2']]

    def encoding(self):
        """Fit the one-hot encoder on the system labels and encode the inputs.

        The fitted encoder is kept on ``self.encoder`` so that
        ``predict_j_parameters`` can reuse it.

        Returns:
            pandas.DataFrame: one ``system_<label>`` column per distinct system,
            indexed like ``self.input_df``.
        """
        self.encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')

        system_encoded = self.encoder.fit_transform(self.input_df[['System_SSCT']])

        # Wrap the encoded array in a DataFrame with readable column names.
        system_encoded_df = pd.DataFrame(
            system_encoded,
            columns=[f'system_{val}' for val in self.encoder.categories_[0]],
            index=self.input_df.index
        )

        # Only the one-hot columns are fed to the network; the raw label is dropped.
        input_encoded = pd.concat([self.input_df, system_encoded_df], axis=1)
        input_encoded.drop(columns=['System_SSCT'], inplace=True)

        return input_encoded

    def decoding(self):
        """Fit the output scaler and return the scaled targets.

        Despite its name, this method scales ``self.output_df``; the inverse
        transform is applied later in ``train_model`` and
        ``predict_j_parameters``. The fitted scaler is kept on
        ``self.output_scaler``.

        Returns:
            pandas.DataFrame: scaled ``J0``, ``J1``, ``J2`` with the original
            column names and index.
        """
        self.output_scaler = RobustScaler()
        output_scaled = self.output_scaler.fit_transform(self.output_df)

        output_df_scaled = pd.DataFrame(
            output_scaled,
            columns=self.output_df.columns,
            index=self.output_df.index
        )

        return output_df_scaled

    def train_model(self, epochs=200, batch_size=32, test_size=0.2,
                    validation_split=0.2, random_state=42):
        """Train the network and report its error on a held-out test split.

        Args:
            epochs: number of training epochs.
            batch_size: mini-batch size passed to ``fit``.
            test_size: fraction of the rows held out for the final evaluation.
            validation_split: fraction of the training rows ``fit`` uses for
                validation.
            random_state: seed for the train/test split.

        Returns:
            tuple: ``(model, history)``; both are also stored on the instance
            as ``self.model`` and ``self.history``.
        """
        X = self.input_encoded.values  # Features
        y = self.output_scaled.values  # Target values
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        # Build the model
        model = Sequential([
            Dense(256, activation='relu', input_shape=(X_train.shape[1],)),
            Dropout(0.3),
            Dense(128, activation='relu'),
            Dropout(0.2),
            Dense(128, activation='relu'),
            Dropout(0.2),
            Dense(128, activation='relu'),
            Dropout(0.2),
            Dense(64, activation='relu'),
            Dense(3)  # 3 output parameters: J0, J1, J2
        ])

        model.compile(optimizer='adam', loss='mse', metrics=['mae', 'mape'])

        history = model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
            verbose=1
        )

        # evaluate() returns the loss followed by one value per compiled metric
        # (loss, mae, mape here), so the MAE is picked by position instead of
        # unpacking into a fixed number of names.
        test_scores = model.evaluate(X_test, y_test, verbose=0)
        mae = test_scores[1]
        print(f"Test Mean Absolute Error: {mae:.4f}")

        # Make predictions
        y_pred = model.predict(X_test)

        # Convert scaled predictions back to original scale
        y_pred_original = self.output_scaler.inverse_transform(y_pred)
        y_test_original = self.output_scaler.inverse_transform(y_test)

        # Mean absolute error for each parameter in original scale
        mae_original = np.mean(np.abs(y_pred_original - y_test_original), axis=0)
        print(f"MAE in original scale - J0: {mae_original[0]:.2f}, J1: {mae_original[1]:.2f}, J2: {mae_original[2]:.2f}")
        # Mean Absolute Percentage Error (MAPE) for each parameter
        mape = np.mean(np.abs((y_test_original - y_pred_original) / y_test_original), axis=0) * 100
        print(f"MAPE - J0: {mape[0]:.2f}%, J1: {mape[1]:.2f}%, J2: {mape[2]:.2f}%")

        self.model = model
        self.history = history

        return model, history

    def save_model(self, name='fixed_model'):
        """Save the current network to ``<name>.keras``.

        Raises:
            ValueError: if no network has been trained or loaded yet.
        """
        if not hasattr(self, 'model'):
            raise ValueError("Model has not been trained yet. Call train_model() first.")
        self.model.save(f'{name}.keras')

    def load_model(self, name='fixed_model'):
        """Load a network from ``<name>.keras`` into ``self.model``."""
        self.model = tf.keras.models.load_model(f'{name}.keras')

    def predict_j_parameters(self, system_name):
        """Predict J0, J1 and J2 for one system label.

        Args:
            system_name: label in the ``System_SSCT`` format produced by
                ``in_out_split``.

        Returns:
            dict: keys ``'J0'``, ``'J1'``, ``'J2'`` with the predictions in the
            original (unscaled) units.

        Raises:
            ValueError: if no network has been trained or loaded yet.
        """
        if not hasattr(self, 'model'):
            raise ValueError("Model has not been trained yet. Call train_model() first.")

        input_data = pd.DataFrame({'System_SSCT': [system_name]})

        # Encode with the encoder fitted in encoding(). It was created with
        # handle_unknown='ignore', so an unseen label does not raise here.
        system_encoded = self.encoder.transform(input_data[['System_SSCT']])

        scaled_prediction = self.model.predict(system_encoded, verbose=0)

        # Convert back to original scale
        original_prediction = self.output_scaler.inverse_transform(scaled_prediction)

        return {
            'J0': original_prediction[0][0],
            'J1': original_prediction[0][1],
            'J2': original_prediction[0][2]
        }

    def plot(self, system_name, solubility_1, solubility_2, experimental_data=None, temperature=298.15):
        """Predict the J parameters for ``system_name`` and pass them to ``plot_JA``.

        ``solubility_1``, ``solubility_2``, ``temperature`` and
        ``experimental_data`` are forwarded to ``plot_JA`` unchanged.

        Raises:
            ValueError: if no network has been trained or loaded yet.
        """
        if not hasattr(self, 'model'):
            raise ValueError("Model has not been trained yet. Call train_model() first.")

        predictions = self.predict_j_parameters(system_name)

        plot_JA(
            predictions['J0'], predictions['J1'], predictions['J2'],
            solubility_1, solubility_2,
            temperature=temperature,
            experimental_data=experimental_data
        )

In [None]:
# Build the fixed-system model; the constructor prepares the encoded inputs and scaled targets.
model = FixedMLModel()

In [None]:
# Fit the network and print its test-set error metrics; returns (model, history).
model.train_model()

In [None]:
# Replace model.model with the network stored in 'fixed_model.keras'.
# NOTE(review): this cell sits before the save cell, so on a fresh top-to-bottom
# run it needs a 'fixed_model.keras' from an earlier session and discards the
# network that was just trained — confirm the intended order.
model.load_model('fixed_model')

In [None]:
# Write the current network to 'fixed_model.keras'.
model.save_model('fixed_model')

In [None]:
def get_system(index):
    """Return the identifying fields of the row at position ``index`` of ``model.df``.

    The result is the tuple ``(solvent_1, solvent_2, compound_id, temperature)``.
    """
    record = model.df.iloc[index]
    fields = ('solvent_1', 'solvent_2', 'compound_id', 'temperature')
    return tuple(record[field] for field in fields)

In [None]:
# Row cursor for the inspection cell below, which increments it before use,
# so starting at -1 makes the first run show row 0.
n = -1

In [None]:
# Step to the next row and read its identifying fields.
n += 1
solvent_1, solvent_2, compound_id, temperature = get_system(n)

# Rows of model.df that share all four identifying fields with row n.
row = model.df[
    (model.df['compound_id'] == compound_id)
    & (model.df['temperature'] == temperature)
    & (model.df['solvent_1'] == solvent_1)
    & (model.df['solvent_2'] == solvent_2)
]

# Plot the model's prediction for the first matching row, passing that row's
# experimental group along.
model.plot(
    row['System_SSCT'].values[0],
    row['solvent_1_pure'].values[0],
    row['solvent_2_pure'].values[0],
    temperature=row['temperature'].values[0],
    experimental_data=model.groups[row['group_index'].values[0]],
)

In [None]:
# Actual J parameters of the selected row, formatted to 3 significant figures.
actual_values = {
    name: f"{row[name].values[0]:.3g}" for name in ('J0', 'J1', 'J2')
}

# Predicted values: run the model once and reuse the result for all three
# parameters instead of doing a separate inference per parameter.
prediction = model.predict_j_parameters(row['System_SSCT'].values[0])
predicted_values = {
    name: f"{float(prediction[name]):.3g}" for name in ('J0', 'J1', 'J2')
}

# Display both in a clear format
print("Actual J parameters:")
for param, value in actual_values.items():
    print(f"{param}: {value}")

print("\nPredicted J parameters:")
for param, value in predicted_values.items():
    print(f"{param}: {value}")

In [None]:
import sqlite3

In [None]:
# Read the whole compounds table. The try/finally guarantees the connection is
# closed even when the query raises.
connection = sqlite3.connect('../../db/MasterDatabase.db')
try:
    df = pd.read_sql_query("SELECT * FROM compounds", connection)
finally:
    connection.close()

# Distinct compound ids that occur in any of the JA groups.
compounds = pd.concat(ja_groups, ignore_index=True)['compound_id'].unique()

# Keep only the compounds present in the JA groups and drop the name/SMILES/id columns.
compound_features_df = df.merge(
    pd.DataFrame(compounds, columns=['compound_id']),
    how='inner',
    left_on='id',
    right_on='compound_id',
).drop(columns=['canonical_smiles', 'molecular_name', 'id'])

# Drop every column that contains at least one missing value.
compound_features_df = compound_features_df.dropna(axis=1)

In [None]:
class APIVariableModel(FixedMLModel):
    """Variant of ``FixedMLModel`` that describes the compound (API) by features.

    The solvent pair and temperature are still collapsed into one one-hot
    encoded label (``System_SST``), while the compound is represented by the
    columns of the module-level ``compound_features_df``, scaled with a
    ``RobustScaler``.

    NOTE(review): the inherited ``predict_j_parameters`` and ``plot`` build a
    ``System_SSCT`` input and pass only the one-hot columns to the network, so
    they do not match the inputs this subclass trains on — confirm before use.
    """

    def in_out_split(self):
        """Add the ``System_SST`` label and join the compound features.

        Returns:
            tuple: ``(input_df, output_df)`` with identical row order and
            index. ``input_df`` holds ``System_SST`` plus the compound feature
            columns; ``output_df`` holds ``J0``, ``J1``, ``J2``.
        """
        self.df['System_SST'] = (
            self.df['solvent_1'].astype(str) + '-'
            + self.df['solvent_2'].astype(str) + '-'
            + self.df['temperature'].astype(str)
        )

        left = self.df[['System_SST', 'compound_id']].copy()
        # The merge below resets the index and, being an inner join, can drop
        # rows. Recording each row's position lets the targets be selected for
        # exactly the rows that survive, in the same order.
        left['_row_pos'] = range(len(left))

        merged = left.merge(compound_features_df, how='inner', on='compound_id')

        output_df = self.df[['J0', 'J1', 'J2']].iloc[merged['_row_pos'].to_numpy()]

        input_df = merged.drop(columns=['compound_id', '_row_pos'])
        # Restore the original row labels so inputs and targets share an index.
        input_df.index = output_df.index

        return input_df, output_df

    def encoding(self):
        """Fit the encoder and input scaler and build the network inputs.

        The fitted objects are kept on ``self.encoder`` and
        ``self.input_scaler``.

        Returns:
            pandas.DataFrame: the scaled compound feature columns followed by
            one ``system_<label>`` column per distinct ``System_SST`` value.
        """
        self.encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')

        system_encoded = self.encoder.fit_transform(self.input_df[['System_SST']])

        # Wrap the encoded array in a DataFrame with readable column names.
        system_encoded_df = pd.DataFrame(
            system_encoded,
            columns=[f'system_{val}' for val in self.encoder.categories_[0]],
            index=self.input_df.index
        )
        self.input_scaler = RobustScaler()

        # Everything except the system label is a compound feature to scale.
        scaler_input = self.input_df.drop(columns=['System_SST']).copy()

        input_scaled = self.input_scaler.fit_transform(scaler_input)

        # Convert scaled data back to DataFrame with original column names
        input_df_scaled = pd.DataFrame(
            input_scaled,
            columns=scaler_input.columns,
            index=scaler_input.index
        )
        input_encoded = pd.concat([input_df_scaled, system_encoded_df], axis=1)

        return input_encoded

In [None]:
# Build the API-variable model and train it with the train_model() it inherits
# from FixedMLModel.
model2 = APIVariableModel()

model2.train_model()
