In [None]:
import os
import re
import torch
import shap
import matplotlib
import joblib
import pandas as pd
import numpy as np
import torch.nn as nn
import matplotlib.cm as cm
import seaborn as sns
from sklearn.svm import SVR
from IPython.display import display, HTML
import torch.nn.functional as F
import torch.nn.init as init
from mpl_toolkits.mplot3d import Axes3D
from sklearn.metrics import accuracy_score
import torch.optim as optim
from sklearn.metrics import r2_score, mean_squared_error
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.multioutput import MultiOutputRegressor
from sklearn.pipeline import make_pipeline
from scipy.interpolate import make_interp_spline

In [None]:
Syngas_data=pd.read_csv('./Paper_1_own')  
Syngas_data=Syngas_data.iloc[:, 2:-1]

In [None]:
Paper_1_corr_matrix = Syngas_data.corr()  
plt.figure(figsize=(12,8))
sns.heatmap(Paper_1_corr_matrix,annot=True,cmap='coolwarm',fmt='.2f',linewidths=.5)
plt.title('Pearson correlation heatmap of input features and syngas experimental results')
plt.show()

In [None]:
X_columns = ['C (%)', 'H (%)', 'O (%)', 'N (%)','S(%)', 'Ash (%)', 'BTW(ratio)', 'Particle size(mm)','Temp. C', 'P (MPa)', 'Res- Time (min)']  # 选择数据，并将其分成x，y
Y_columns = ['H2 mol/kg', 'CO2 mol/kg', 'CO mol/kg', 'CH4 mol/kg']

In [None]:
X = Syngas_data[X_columns]
Y = Syngas_data[Y_columns]

scaler_X = StandardScaler()
scaler_Y = StandardScaler()

features = scaler_X.fit_transform(X)
labels = scaler_Y.fit_transform(Y)

In [None]:
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.Tensor(features)
        self.labels = torch.Tensor(labels)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]

In [None]:
features_train, features_test, labels_train, labels_test = train_test_split(features, labels, test_size=0.2)
features_test, features_dev, labels_test, labels_dev = train_test_split(features_test, labels_test, test_size=0.15)
train_dataset = CustomDataset(features_train, labels_train)
test_dataset = CustomDataset(features_test, labels_test)
dev_dataset = CustomDataset(features_dev, labels_dev)

In [None]:
class ANeuralNetwork(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size, weight_decay=0.0):
        super(ANeuralNetwork, self).__init__()

        self.hidden_layers = nn.ModuleList()
        for i in range(len(hidden_sizes)):
            if i == 0:
                layer = nn.Linear(input_size, hidden_sizes[i])
            else:
                layer = nn.Linear(hidden_sizes[i - 1], hidden_sizes[i])
            self.hidden_layers.append(layer)
            self.hidden_layers.append(nn.Tanh())
        self.output_layer = nn.Linear(hidden_sizes[-1] if hidden_sizes else input_size, output_size)
        self.weight_decay = weight_decay

    def forward(self, x):
        for layer in self.hidden_layers:
            x = F.relu(layer(x))
        x = self.output_layer(x)
        return x

In [None]:
batch_size = 512
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,drop_last=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,drop_last=False)
dev_dataloader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False,drop_last=False)

In [None]:
def train_model(train_dataloader, input_size, hidden_sizes, output_size, num_epochs=10, learning_rate=0.001, device='cpu',weight_decay=0.01):
    train_losses = []
    model = ANeuralNetwork(input_size, hidden_sizes, output_size,weight_decay=weight_decay)
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        epoch_loss = 0.0
        for inputs, labels in train_dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        average_loss = epoch_loss / len(train_dataloader)
        train_losses.append(average_loss)
        if epoch % 20 == 0:
            print(f'Epoch [{epoch}/{num_epochs}], Loss: {average_loss:.4f}')
    plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Over Epochs')
    plt.legend()
    plt.show()

    return model

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
trained_model = train_model(train_dataloader, input_size=11, hidden_sizes=[16,8], output_size=4, num_epochs=1000, learning_rate=0.038, device=device,weight_decay=0.005)

In [None]:
torch.save(trained_model.state_dict(), 'ANN.pth')

In [None]:
model_path = "ANN.pth"
model_dict = torch.load(model_path, map_location=torch.device('cpu'))

print("Layer Sizes:")
for key in model_dict.keys():
    if "weight" in key or "bias" in key: 
        param_tensor = model_dict[key]
        print(f"{key}: {param_tensor.size()}")

In [None]:
ANN_model = ANeuralNetwork(input_size=11, hidden_sizes=[16,8], output_size=4, weight_decay=0.015)
ANN_model.load_state_dict(torch.load('ANN.pth', map_location=torch.device('cpu')))
ANN_model.eval()

In [None]:
class FeatureExtractor(nn.Module):
    def __init__(self, model):
        super(FeatureExtractor, self).__init__()
        self.hidden_layers = model.hidden_layers

    def forward(self, x):
        features = []
        for layer in self.hidden_layers:
            x = F.tanh(layer(x))
            features.append(x)
        return torch.cat(features, dim=1)

In [None]:
def set_evaluation_mode(model):
    model.eval()

In [None]:
feature_extractor = FeatureExtractor(ANN_model)
feature_extractor.eval()

In [None]:
def extract_features(model, data_loader, device='cpu'):
    features = []
    labels = []
    model.to(device)
    with torch.no_grad():
        for inputs, targets in data_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            if isinstance(outputs, list):
                for output in outputs:
                    features.append(output.view(output.size(0), -1))
            else:
                features.append(outputs.view(outputs.size(0), -1))
            labels.append(targets)
    return torch.cat(features), torch.cat(labels)

In [None]:
X_train_features, Y_train = extract_features(feature_extractor, train_dataloader)
X_test_features, Y_test = extract_features(feature_extractor, test_dataloader)

In [None]:
def find_best_svm_params(X_train, Y_train, param_grid, cv=5, device='cuda:0'):
    X_train = X_train.to(device)
    Y_train = Y_train.to(device)
    
    base_svr_model = SVR()

    svm_model = make_pipeline(StandardScaler(), MultiOutputRegressor(base_svr_model))
    grid_search = GridSearchCV(svm_model, param_grid, scoring='neg_mean_squared_error', cv=cv)
    grid_search.fit(X_train.numpy(), Y_train.numpy())
    best_params = grid_search.best_params_
    return best_params

In [None]:
param_grid = {
    'multioutputregressor__estimator__kernel': ['linear', 'rbf', 'poly'],
    'multioutputregressor__estimator__C': [0.1, 1.0, 10.0],
    'multioutputregressor__estimator__epsilon': [0.01, 0.1, 0.2,0.6,0.9,1.2],
    'multioutputregressor__estimator__gamma': ['scale', 'auto', 0.1, 1.0]
}

In [None]:
X_train_features, Y_train = extract_features(feature_extractor, train_dataloader)
best_params = find_best_svm_params(X_train_features, Y_train, param_grid, device='cpu')

print("best_params:", best_params)

In [None]:
new_kernel = 'rbf'
new_C = 10
new_epsilon = 0.1
new_gamma = 'scale'

new_svr_model = SVR(kernel=new_kernel, C=new_C, epsilon=new_epsilon, gamma=new_gamma)
svm_model = make_pipeline(StandardScaler(), MultiOutputRegressor(new_svr_model))

In [None]:
svm_model.fit(X_train_features.numpy(), Y_train.numpy())
svm_predictions = svm_model.predict(X_train_features)

In [None]:
# torch.save(ANN_model.state_dict(), 'ANN-SVM_ann.pth')
# joblib.dump(svm_model, 'ANN-SVM_svm.pkl')

In [None]:
# loaded_svm_model = joblib.load('ANN-SVM_svm.pkl')
# loaded_svm_model = make_pipeline(StandardScaler(), MultiOutputRegressor(loaded_svm_model))

In [None]:
trained_ANN_model = ANeuralNetwork(input_size=11, hidden_sizes=[16,8], output_size=4, weight_decay=0.005)
trained_ANN_model.load_state_dict(torch.load('ANN-SVM_ann.pth', map_location=torch.device('cpu')))
trained_SVR_model = joblib.load('ANN-SVM_svm.pkl')

In [None]:
def combined_model_predict(trained_ann,trained_svm,features_train): 
    feature_extractor_own = FeatureExtractor(trained_ann)    
    features_train = torch.Tensor(features_train)
    X_features =  feature_extractor_own(features_train)
    predictions = trained_svm.predict(X_features.detach().numpy())
    return predictions

In [None]:
def combined_model(FEATURE):
    predictions_train_own = []
    predictions_batch = combined_model_predict(trained_ANN_model, trained_SVR_model, FEATURE)
    predictions_train_own.append(predictions_batch)
    return predictions_train_own[0]

In [None]:
explainer = shap.Explainer(combined_model, features_train)
shap_values = explainer.shap_values(features_train)

In [None]:
for output_idx in range(4):
    shap_vals_output = [shap_values[:,:,output_idx]]
    
    plt.figure(figsize=(10, 8)) 
    
    shap.summary_plot(shap_vals_output, features_train, feature_names=X_columns, show=False, 
                      color=matplotlib.cm.Oranges.reversed(), color_bar=False)
    
    plt.title(Y_columns[output_idx])  
    plt.gca().get_legend().remove()  
    # plt.savefig(f'{output_idx}.png')
    plt.show()

In [None]:
num_outputs = 4
fig, axs = plt.subplots(2, 2, figsize=(16, 12))

for output_idx in range(num_outputs):

    shap_abs_sum_per_feature = np.sum(np.abs(shap_values[:, :, output_idx]), axis=0)
    total_shap_abs_sum = np.sum(shap_abs_sum_per_feature)
    feature_proportions = shap_abs_sum_per_feature / total_shap_abs_sum

    row = output_idx // 2
    col = output_idx % 2
    ax = axs[row, col]
    ax.barh(X_columns, feature_proportions)
    ax.set_title(f'PDP of {Y_columns[output_idx]} for Each Feature')
    ax.set_xlabel('Proportion')

    np.savetxt(f'X_columns_output_{output_idx + 1}.txt', X_columns, fmt='%s', delimiter='\t', header='X Columns')
    np.savetxt(f'feature_proportions_output_{output_idx + 1}.txt', feature_proportions, fmt='%.18e', delimiter='\t', header='Feature Proportions')

    print(f'{Y_columns[output_idx]}:')
    print(f'  Biomass Element: {np.sum(feature_proportions[0:6])}')
    print(f'  Gasification Conditions: {np.sum(feature_proportions[6:])}')


plt.tight_layout()
# plt.savefig('PDP.png')
plt.show()


In [None]:
shap_values_3d = np.transpose(shap_values, (2, 0, 1))  # (n_classes, n_samples, n_features)

for i in range(len(Y_columns)):

    shap_df = pd.DataFrame(shap_values_3d[i], columns=X_columns)
    shap_df['Sample Index'] = range(shap_values_3d[i].shape[0])  

    plt.figure(figsize=(20, 15))
    shap.summary_plot(shap_values_3d[i], features_train, plot_type="dot", show=False, feature_names=X_columns)

    # plt.title(Y_columns[i], fontsize=20)
    plt.xlabel("SHAP Value", fontsize=18)

    plt.show()

In [None]:
def clean_filename(filename):
    return re.sub(r'[^\w\s-]', '', filename).strip()



shap_values_3d = np.transpose(shap_values, (2, 0, 1))
explainer = shap.Explainer(combined_model, features_train)
plt.rcParams["font.family"] = "Times New Roman"


for i in range(len(Y_columns)):
    plt.figure(figsize=(20, 15)) 
    shap.summary_plot(shap_values_3d[i], features_train, plot_type="dot", show=False, feature_names=X_columns)

    # plt.title(Y_columns[i], fontdict={'fontsize': 20})
    plt.xlabel("SHAP Value", fontdict={'fontsize': 18})
    # plt.ylabel("Feature")
    cleaned_filename = clean_filename(Y_columns[i])

    plt.show() 


In [None]:
from scipy.interpolate import UnivariateSpline
original_feature_values = scaler_X .inverse_transform(features_train)

def clean_filename(filename):
    return re.sub(r'[^\w\s-]', '', filename).strip()



plt.rcParams["font.family"] = "Times New Roman"
shap_values_3d = np.transpose(shap_values, (2, 0, 1)) 


# selected_features = ["Temp. C", "H (%)", "BTW(ratio)", "N (%)"]
# X_columns = ['C (%)', 'H (%)', 'O (%)', 'N (%)','S(%)', 'Ash (%)', 'BTW(ratio)', 'Particle size(mm)','Temp. C', 'P (MPa)', 'Res- Time (min)'] 
selected_features = ['BTW(ratio)']
selected_indices = [X_columns.index(feature) for feature in selected_features]

# print(f"Selected indices: {selected_indices}")
# print(f"X_columns: {X_columns}")
# print(f"shap_values_3d shape: {shap_values_3d.shape}")

fig, axs = plt.subplots(1, len(selected_features), figsize=(5, 4))
# axs_flat = axs.flatten()

for i, feature_index in enumerate(selected_indices):
    feature_name = X_columns[feature_index]
    

    shap_values_for_feature = shap_values_3d[0][:, feature_index]
    feature_values = original_feature_values[:, feature_index]
    
    axs.scatter(feature_values, shap_values_for_feature, s=10, color='blue')
    unique_feature_values, unique_indices = np.unique(feature_values, return_index=True)
    unique_shap_values_for_feature = shap_values_for_feature[unique_indices]
    
    if len(unique_feature_values) > 1:
        sort_idx = np.argsort(unique_feature_values)
        spl = UnivariateSpline(unique_feature_values[sort_idx], unique_shap_values_for_feature[sort_idx], s=0.5, k=3)
        x_smooth = np.linspace(unique_feature_values.min(), unique_feature_values.max(), 300)
        y_smooth = spl(x_smooth)
        axs.plot(x_smooth, y_smooth, 'r--')
    # axs[i].set_title(feature_name)
    axs.set_xlabel(feature_name)
    axs.set_ylabel(f'SHAP Value for {feature_name}')

    cleaned_filename = clean_filename(X_columns[i])
    # plt.savefig('others.png')
plt.show()

In [None]:
# X_columns = ['C (%)', 'H (%)', 'O (%)', 'N (%)','S(%)', 'Ash (%)', 'BTW(ratio)', 'Particle size(mm)','Temp. C', 'P (MPa)', 'Res- Time (min)'] 
original_feature_values = scaler_X .inverse_transform(features_train)
feature_index = 6 
data = {
    'Temp. C':original_feature_values[:, feature_index],
    'SHAP_values': shap_values_3d[0][:, feature_index]  
}

num_bins = 10
df = pd.DataFrame(data)

df['Temp. C_bins'] = pd.cut(df['Temp. C'], bins=num_bins, labels=False)


average_shap = df.groupby('Temp. C_bins')['SHAP_values'].mean().reset_index()


print(average_shap)
original_feature_values[:, feature_index]
min_number = np.min(original_feature_values[:, feature_index])
max_number = np.max(original_feature_values[:, feature_index])
number_points = 5
X_number = np.linspace(min_number, max_number, number_points )



plt.figure(figsize=(10, 6))
plt.plot(average_shap['Temp. C_bins'], average_shap['SHAP_values'], marker='o', linestyle='--', color='r')
plt.title('Average SHAP Values per Temp. C')
plt.xlabel('Temp. C')
plt.ylabel('Average SHAP Value')
plt.grid()
plt.xticks(range(num_bins))
plt.show()

In [None]:
# X_columns = ['C (%)', 'H (%)', 'O (%)', 'N (%)','S(%)', 'Ash (%)', 'BTW(ratio)', 'Particle size(mm)','Temp. C', 'P (MPa)', 'Res- Time (min)'] 
input_feature_index_1 = 8  
input_feature_index_2 = 7 
output_index = 3 


shap_values_input_output = shap_values[:, [input_feature_index_1, input_feature_index_2], output_index]

min_value_1 = np.min(features_train[:, input_feature_index_1])
max_value_1 = np.max(features_train[:, input_feature_index_1])
min_value_2 = np.min(features_train[:, input_feature_index_2])
max_value_2 = np.max(features_train[:, input_feature_index_2])
num_points = shap_values.shape[0] 

feature_values_1 = np.linspace(min_value_1, max_value_1, num_points)
feature_values_2 = np.linspace(min_value_2, max_value_2, num_points)


partial_dependence_values = np.zeros((len(feature_values_1), len(feature_values_2)))
for i, value_1 in enumerate(feature_values_1):
    for j, value_2 in enumerate(feature_values_2):
        input_feature = features_train.copy()
        input_feature[:, input_feature_index_1] = value_1
        input_feature[:, input_feature_index_2] = value_2
        prediction = combined_model(input_feature)
        partial_dependence_values[i, j] = prediction[0, output_index]


In [None]:
min_syn_1 = np.min(Syngas_data.iloc[:, input_feature_index_1])
max_syn_1 = np.max(Syngas_data.iloc[:, input_feature_index_1])
min_syn_2 = np.min(Syngas_data.iloc[:, input_feature_index_2])
max_syn_2 = np.max(Syngas_data.iloc[:, input_feature_index_2])
num_points = shap_values.shape[0]  

feature_values_1_syn = np.linspace(min_syn_1, max_syn_1 , num_points)
feature_values_2_syn = np.linspace(min_syn_2, max_syn_2, num_points)

fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

X, Y = np.meshgrid(feature_values_2_syn, feature_values_1_syn)
surf = ax.plot_surface(X, Y, partial_dependence_values, cmap='viridis')

ax.set_xlabel(f'{X_columns[input_feature_index_2]}')
ax.set_ylabel(f'{X_columns[input_feature_index_1]}')
ax.set_zlabel('Two-Way Partial Dependence')
ax.view_init(elev=50, azim=260)


plt.show()

In [None]:
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

X, Y = np.meshgrid(feature_values_2, feature_values_1)
surf = ax.plot_surface(X, Y, partial_dependence_values, cmap='viridis')

ax.set_xlabel(f'{X_columns[input_feature_index_2]}')
ax.set_ylabel(f'{X_columns[input_feature_index_1]}')
ax.set_zlabel('Two-Way Partial Dependence')

ax.view_init(elev=30, azim=240)

plt.title(f'Two-way partial dependance plots for {X_columns[input_feature_index_1]} and {X_columns[input_feature_index_2]} on {Y_columns[output_index]}')
plt.show(

In [None]:
feature_values_1_original = scaler_X.inverse_transform(np.tile(feature_values_1[:,np.newaxis],(1, 11)))
feature_values_1_ori = feature_values_1_original[:, input_feature_index_1]  

feature_values_2_original = scaler_X.inverse_transform(np.tile(feature_values_2[:,np.newaxis],(1, 11)))
feature_values_2_ori = feature_values_2_original[:, input_feature_index_2]

# data_to_save = np.column_stack((feature_values_1_ori.repeat(len(feature_values_2_ori)), 
#                                 np.tile(feature_values_2_ori, len(feature_values_1_ori)), 
#                                 partial_dependence_values.flatten()))