In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from tqdm import tqdm
import itertools
import pickle
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score, precision_score, recall_score, brier_score_loss 

In [None]:
# ---- Experiment configuration ----
# Focal-loss hyper-parameters (passed to BCEFocalLoss below).
alpha = 0.5
gamma = 1
# Optimiser / training-loop settings.
learning_rate = 1e-4
batch_size = 128
max_epoch = 50
experiment_time = 1          # number of independent training repetitions
limit_early_stop_count = 5   # early-stopping patience, read as a global by train_and_test_model
device = torch.device("cuda")

use_upsample = True
use_mini_sample = True

# Alternative task combinations tried previously (kept for reference).
#task_name_list = ['dialysis','DNR']
#task_name_list = ['dod_7day','DNR']
#task_name_list = ['dod_30day','DNR']
#task_name_list = ['dod_90day','DNR']
#task_name_list = ['Weaning_successful','DNR']
#task_name_list = ['SBT','DNR']
#task_name_list = ['dod_30day']
#task_name_list = ['dod','dod_90day']
#task_name_list = ['dod_30day','dod_90day']
#task_name_list = ['dod']
#task_name_list = ['DNR']

# Active task set; with more than one task the MTL checkpoint directory is used further below.
task_name_list = ['DNR','dod_30day','Vasopressor']

#task_name_list = ['DNR','dod_30day','Vasopressor','InvasiveVent','dialysis']





In [None]:
##++
class RNN_MTL(nn.Module):
    """Multi-task binary classifier: shared 2-layer bidirectional LSTM + one MLP tower per task.

    Args:
        input_dim: number of features per time step (LSTM input size).
        task_name_list: task names; one tower (and one output) is created per name.
        window_size: number of time steps in each sample (default 3).
        dropout_ratio: dropout probability applied inside each tower.

    Attributes:
        n_temporal, n_static: optional feature counts set by callers after construction.
            When their sum is non-zero it is used to un-flatten 2-D input; otherwise
            ``input_dim`` is used.
    """

    def __init__(self, input_dim, task_name_list,window_size = 3, dropout_ratio=0.0):
        super(RNN_MTL, self).__init__()

        self.dropout = nn.Dropout(dropout_ratio)
        self.relu = nn.ReLU()  # Activation function for hidden layers
        self.sigmoid = nn.Sigmoid()
        self.task_name_list = task_name_list
        self.num_tasks = len(task_name_list)

        # Remembered so forward() can un-flatten 2-D input without external setup.
        # Plain attributes only: the state_dict layout is unchanged, so old checkpoints still load.
        self.input_dim = input_dim
        self.window_size = window_size

        # Optional overrides that callers assign after construction.
        self.n_temporal = 0
        self.n_static = 0

        hidden_dim = [256, 64]
        output_size = 1

        # Bottom (shared by every task)
        self.bi_lstm = torch.nn.LSTM(input_dim, hidden_dim[0], num_layers=2, batch_first = True, bidirectional = True)

        # Towers: the flattened LSTM output has window_size * 2 (directions) * hidden features.
        self.task_fc0 = nn.ModuleList([nn.Linear(hidden_dim[0]*window_size*2, hidden_dim[1]) for _ in range(self.num_tasks)])
        self.task_fc1 = nn.ModuleList([nn.Linear(hidden_dim[1], output_size) for _ in range(self.num_tasks)])

    def _model_device(self):
        """Return the device the model's parameters live on."""
        return next(self.parameters()).device

    def data_check(self,x):
        """Convert a numpy array to a float32 tensor and move the input to the model's device."""
        if isinstance(x, np.ndarray):
            x = torch.tensor(x, dtype=torch.float32)
        # Follow the model rather than a global device so CPU and GPU models both work.
        return x.to(self._model_device())

    def forward(self, x):
        """Return per-task probabilities of shape (batch, 1).

        ``x`` may be (batch, window_size, features) or the flattened
        (batch, window_size * features) form. With a single task the bare tensor is
        returned; otherwise a dict keyed by task name.
        """
        # NOTE(review): forward() forces training mode, overriding the eval() calls made by
        # predict_*/test. This is harmless while dropout_ratio is 0; it may be deliberate
        # (cuDNN RNN backward requires training mode) -- confirm before removing.
        self.train()

        if x.ndim == 2:
            # Flattened input: restore the time axis. Fall back to input_dim when the
            # caller never set n_temporal / n_static (their sum is then 0).
            feature_dim = (self.n_temporal + self.n_static) or self.input_dim
            x = x.reshape(x.shape[0], self.window_size, feature_dim)

        x = self.data_check(x)

        h, _ = self.bi_lstm(x)
        h = torch.nn.functional.relu(h)
        h = torch.flatten(h, start_dim = 1)

        # Towers
        task_out = {}
        for task_index, task_name in enumerate(self.task_name_list):
            hi = self.task_fc0[task_index](h)
            hi = self.relu(hi)
            hi = self.dropout(hi)
            hi = self.task_fc1[task_index](hi)
            task_out[task_name] = self.sigmoid(hi)

        if len(self.task_name_list) == 1:
            return task_out[self.task_name_list[0]]
        return task_out

    def predict_prob(self, x):
        """Return ``{task_name: probability tensor}`` for every task, including the single-task case."""
        self.eval()
        prob_dict = self.forward(x)

        if len(self.task_name_list) == 1:
            # forward() returns a bare tensor for one task; wrap it for a uniform interface.
            return {self.task_name_list[0]: prob_dict}
        return prob_dict

    def predict_proba(self, x):
        """Alias of :meth:`predict_prob` under the scikit-learn style name."""
        return self.predict_prob(x)

    @staticmethod
    def _to_labels(prob, threshold):
        """Threshold an array of probabilities into a flat array of 0/1 labels."""
        return (np.asarray(prob).reshape(-1) > threshold).astype(int)

    def predict(self, x, threshold = 0.5):
        """Return ``{task_name: 1-D numpy array of 0/1 labels}`` using ``prob > threshold``."""
        self.eval()
        prob_dict = self.predict_prob(x)
        return {
            key: self._to_labels(value.cpu().detach().numpy(), threshold)
            for key, value in prob_dict.items()
        }

    def evaluate(self,X,label,task_name,criterion):
        """Score one task on (X, label).

        Returns the dict produced by ``compute_scores`` with ``'task'`` set to
        ``task_name`` and ``'loss'`` set to ``criterion(prob, label) / len(label)``.
        """
        with torch.no_grad():
            # One forward pass feeds both the probabilities and the thresholded labels.
            prob = self.predict_prob(X)[task_name].cpu().detach().numpy()
            pred = self._to_labels(prob, 0.5)
            score = compute_scores(label,pred,prob)
            score['task'] = task_name
            target_device = self._model_device()
            loss = criterion(torch.from_numpy(prob).to(target_device),torch.from_numpy(label).to(target_device)).item()
            score['loss'] = loss/len(label)
            return score
        
        

In [None]:
def compute_scores(y_true, y_pred,y_prob):
    """Compute classification metrics, each rounded to 3 decimals.

    Args:
        y_true: ground-truth binary labels.
        y_pred: predicted binary labels.
        y_prob: predicted probabilities of the positive class.

    Returns:
        dict with keys 'task' (placeholder 'Null'), 'auroc', 'acc', 'f1', 'pre',
        'recall' and 'brier_score'. A metric that cannot be computed is reported
        as NaN, so every key is always present for callers.
    """
    if np.any(np.isnan(y_prob)):
        # Warn instead of blocking on input(): a prompt hangs unattended runs.
        print("Warning: y_prob contains NaN values; affected metrics will be NaN.")

    scores = {'task': 'Null'}
    # Each metric is evaluated lazily and independently so one failure does not discard the rest.
    metric_functions = (
        ('auroc', lambda: roc_auc_score(y_true, y_prob)),
        ('acc', lambda: accuracy_score(y_true, y_pred)),
        ('f1', lambda: f1_score(y_true, y_pred)),
        ('pre', lambda: precision_score(y_true, y_pred)),
        ('recall', lambda: recall_score(y_true, y_pred)),
        ('brier_score', lambda: brier_score_loss(y_true, y_prob)),
    )
    for metric_name, compute_metric in metric_functions:
        try:
            scores[metric_name] = round(compute_metric(), 3)
        except Exception as e:
            # Best effort, as before: report the problem and keep going.
            print(f"An error occurred while computing {metric_name}:", str(e))
            scores[metric_name] = float('nan')
    return scores


In [None]:
"""
Input:
    model
    dict: Mydataset
    loss_function
Output:
    score: dict + dict
    result: dict => ['total_auc','total_loss']
"""
def test(model, dataset_dict, criterion, is_show = True):
    model.eval()

    task_name_list = list(dataset_dict.keys())
    score = {}
    result = {'total_auc': 0, 'total_loss': 0}
    for task_name in task_name_list:  # 循環每個任務
        X = dataset_dict[task_name].inputs.numpy()
        Y = dataset_dict[task_name].labels.unsqueeze(1).numpy()
    
        score[task_name] = model.evaluate(X,Y,task_name,criterion)
        
        result['total_auc'] = result['total_auc'] + score[task_name]['auroc']
        result['total_loss'] = result['total_loss'] + score[task_name]['loss']
            
        if is_show:
            print(score[task_name])
    
    return score,result

"""
local_best_model_dict: #dict{'task_name':{'model','performance(target_score)','id'}}
model
"""
def test2(local_best_model_dict, dataset_dict, criterion, is_show = True):
    score = {}
    result = {'total_auc': 0, 'total_loss': 0}
    task_name_list = list(dataset_dict.keys())
    
    for task_name in task_name_list:
        #modelr.load_state_dict(local_best_model_dict[task_name]['model'])
        modelr = local_best_model_dict[task_name]['model']
        modelr.eval()
        X = dataset_dict[task_name].inputs.numpy()
        Y = dataset_dict[task_name].labels.unsqueeze(1).numpy()
        score[task_name] = modelr.evaluate(X,Y,task_name,criterion)
        result['total_auc'] = result['total_auc'] + score[task_name]['auroc']
        result['total_loss'] = result['total_loss'] + score[task_name]['loss']
        
        if is_show:
            print(score[task_name])
    return score,result

In [None]:
from torch.utils.data import Dataset
from torch.utils.data import DataLoader


class MyDataset(Dataset):
    """Dataset holding scaled inputs, the matching unscaled inputs and labels as float tensors.

    Indexing yields ``(scaled_input, label)``; the unscaled inputs are only kept as
    the ``inputs_original`` attribute.
    """

    @staticmethod
    def _as_float_tensor(array):
        # numpy array -> float32 torch tensor
        return torch.from_numpy(array).float()

    def __init__(self, np_X_scalar,np_X_original, np_Y):
        convert = self._as_float_tensor
        self.inputs = convert(np_X_scalar)
        self.inputs_original = convert(np_X_original)
        self.labels = convert(np_Y)

    def __len__(self):
        # Number of samples = length of the first axis of the scaled inputs.
        return self.inputs.shape[0]

    def __getitem__(self, idx):
        sample = self.inputs[idx]
        target = self.labels[idx]
        return sample, target
    

class BCEFocalLoss(torch.nn.Module):
    """Binary focal loss on probabilities (not logits).

    loss = -alpha * (1 - p)^gamma * y * log(p) - (1 - alpha) * p^gamma * (1 - y) * log(1 - p)

    Args:
        gamma: focusing exponent.
        alpha: weight of the positive-class term (the negative term gets ``1 - alpha``).
        reduction: 'elementwise_mean' averages, 'sum' sums, anything else returns
            the element-wise loss.
        eps: probabilities are clamped to ``[eps, 1 - eps]`` before the logarithm.
    """

    def __init__(self, gamma=2, alpha=0.25, reduction='elementwise_mean', eps=1e-7):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction
        self.eps = eps

    def forward(self, _input, target):
        # A saturated sigmoid output (exactly 0 or 1) would give log(0) = -inf, and
        # 0 * -inf = NaN in the term that should vanish; clamping keeps the loss finite.
        pt = torch.clamp(_input, min=self.eps, max=1 - self.eps)
        alpha = self.alpha
        positive_term = alpha * (1 - pt) ** self.gamma * target * torch.log(pt)
        negative_term = (1 - alpha) * pt ** self.gamma * (1 - target) * torch.log(1 - pt)
        loss = -positive_term - negative_term
        if self.reduction == 'elementwise_mean':
            loss = torch.mean(loss)
        elif self.reduction == 'sum':
            loss = torch.sum(loss)
        return loss

    
def check_label_distribution (data_Y):
    """Print how many labels equal 1, equal 0, or are anything else, with percentages."""
    total = len(data_Y)

    def _percent(count):
        # Share of `count` among all labels, in percent with two decimals.
        return round(count/total*100,2)

    count_1 = np.count_nonzero(data_Y == 1)
    count_0 = np.count_nonzero(data_Y == 0)
    count_others = np.count_nonzero((data_Y != 1) & (data_Y != 0))

    ratio_1, ratio_0, ratio_others = (_percent(c) for c in (count_1, count_0, count_others))
    print(f'Distribution: 1=>{count_1}({ratio_1}%),  0=>{count_0}({ratio_0}%),  others=>{count_others}({ratio_others}%)')

    
def upsampling_auto(X,X_original,Y,up_ratio):
    """Oversample the positive class by repeating every label-1 sample ``up_ratio`` times.

    Args:
        X: scaled features, first axis = samples (any number of trailing axes).
        X_original: unscaled features aligned with ``X``.
        Y: labels aligned with ``X``.
        up_ratio: how many copies of each positive sample the output contains.

    Returns:
        (X_upsampled, X_original_upsampled, Y_upsampled): all negatives first, then
        the repeated positives. If ``Y`` contains any value other than 0/1 the inputs
        are returned unchanged -- as a 3-tuple, so callers can always unpack three values.
    """
    check_label_distribution(Y)
    zero_idx = np.where(Y == 0)[0]
    one_idx = np.where(Y == 1)[0]
    other_idx = np.where((Y != 1) & (Y != 0))[0]
    if len(other_idx) > 0:
        # Non-binary labels: leave the data untouched.
        return X, X_original, Y

    def _repeat_rows(array):
        # Tile along the sample axis only, whatever the rank of the array is.
        repetitions = (up_ratio,) + (1,) * (array.ndim - 1)
        return np.tile(array, repetitions)

    X_upsampled = np.concatenate((X[zero_idx], _repeat_rows(X[one_idx])), axis=0)
    X_original_upsampled = np.concatenate((X_original[zero_idx], _repeat_rows(X_original[one_idx])), axis=0)
    Y_upsampled = np.concatenate((Y[zero_idx], _repeat_rows(Y[one_idx])), axis=0)
    return X_upsampled,X_original_upsampled, Y_upsampled

In [None]:
import numpy as np

def select_features(X, feature_name_list, select_feature_list):
    """Keep only the requested features along the last axis of a 3-D array.

    Args:
        X: numpy array indexed as ``X[:, :, feature]``.
        feature_name_list: names of the features, in the order of ``X``'s last axis.
        select_feature_list: names to keep (must be a subset of ``feature_name_list``);
            the output follows this order.

    Returns:
        ``X`` restricted to the selected features.

    Raises:
        ValueError: if a requested name is not in ``feature_name_list``.
    """
    invalid_features = set(select_feature_list) - set(feature_name_list)
    if len(invalid_features) > 0:
        raise ValueError(f"Invalid features in select_feature_list: {invalid_features}")

    # Position of every requested feature inside the full feature list.
    column_positions = []
    for wanted_name in select_feature_list:
        column_positions.append(feature_name_list.index(wanted_name))

    return X[:, :, column_positions]


In [None]:
import numpy as np

def read_data(task_name_list, data_date ,data_type, select_feature_list = [], batch_size = 256,use_upsample = False):
    """Load the pre-computed ``.npy`` arrays of every task and wrap them for PyTorch.

    Args:
        task_name_list: tasks to load.
        data_date: not used by the current implementation.
        data_type: file-name prefix of the split (the notebook passes 'train',
            'validation' and 'test').
        select_feature_list: currently ignored (feature selection is disabled).
        batch_size: batch size of the returned DataLoaders.
        use_upsample: currently ignored (upsampling is disabled).

    Returns:
        (dataset_dict, loader_dict, feature_name_list, original_data_dict) where
        ``dataset_dict``/``loader_dict`` are keyed by task name, ``feature_name_list``
        is the column list of the feature-name CSV, and ``original_data_dict`` holds
        'X_scalar', 'X' and 'Y' of the LAST task processed only.
    """
    data_path = "C:/Users/USER/M1326168/MIMIC/DNR/20250507/data/sample/standard_data"

    # Feature counts of the 20250507 data release (remember to update for other releases).
    n_temporal = 126
    n_static = 145
    # Each sample is flattened to 3 time steps x (temporal + static) features.
    flat_width = (n_temporal+n_static)*3

    # Feature names are the header row of the CSV.
    feature_name_list = pd.read_csv("C:/Users/USER/M1326168/MIMIC/DNR/20250507/data/sample/full_feature_name.csv").columns.to_list()

    dataset_dict = {}
    original_data_dict = {}
    for task_name in task_name_list:
        X_scalar = np.load(f"{data_path}/{data_type}_scalar_X_{task_name}.npy", allow_pickle=True)
        X_original = np.load(f"{data_path}/{data_type}_X_{task_name}.npy", allow_pickle=True)

        # Flatten (sample, step, feature) to (sample, step * feature).
        X_scalar = X_scalar.reshape(X_scalar.shape[0], flat_width)
        X_original = X_original.reshape(X_original.shape[0], flat_width)

        Y = np.load(f"{data_path}/{data_type}_Y_{task_name}.npy", allow_pickle=True)

        # Feature selection and upsampling of the 'DNR' task were disabled by the author.
        dataset_dict[task_name] = MyDataset(X_scalar,X_original,Y)
        # Overwritten on every iteration: only the last task's arrays survive.
        original_data_dict.update({'X_scalar': X_scalar, 'X': X_original, 'Y': Y})

    # One shuffling DataLoader per task.
    loader_dict = {
        key: DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)
        for key, dataset in dataset_dict.items()
    }

    return dataset_dict,loader_dict,feature_name_list,original_data_dict


In [None]:
def MTL_to_STL(multi_task_model):
    """Split a multi-task ``RNN_MTL`` into one single-task ``RNN_MTL`` per task.

    Each single-task model receives a copy of the complete shared LSTM (both layers,
    both directions) and of the two tower layers belonging to its task. The new
    models are built with the source model's input size, window size and dropout
    ratio, placed on the source model's device, and inherit its
    ``n_temporal`` / ``n_static`` settings.

    Returns:
        dict ``{task_name: single-task RNN_MTL}``.
    """
    shared_lstm = multi_task_model.bi_lstm
    source_device = next(multi_task_model.parameters()).device

    single_task_models = {}
    for task_index, task_name in enumerate(multi_task_model.task_name_list):
        first_tower_layer = multi_task_model.task_fc0[task_index]
        # Tower input = window_size * 2 directions * hidden_size, so recover window_size from it.
        window_size = first_tower_layer.in_features // (2 * shared_lstm.hidden_size)

        single_task_model = RNN_MTL(
            shared_lstm.input_size,
            [task_name],
            window_size=window_size,
            dropout_ratio=multi_task_model.dropout.p,
        ).to(source_device)

        # Bottom: load_state_dict copies every LSTM parameter by value, including the
        # *_reverse weights of the backward direction that a per-attribute copy of
        # weight_ih_l0/weight_hh_l0/... would leave randomly initialised.
        single_task_model.bi_lstm.load_state_dict(shared_lstm.state_dict())

        # Tower: the single-task model has exactly one tower, at index 0.
        single_task_model.task_fc0[0].load_state_dict(first_tower_layer.state_dict())
        single_task_model.task_fc1[0].load_state_dict(multi_task_model.task_fc1[task_index].state_dict())

        # Keep the un-flattening configuration consistent with the source model.
        single_task_model.n_temporal = multi_task_model.n_temporal
        single_task_model.n_static = multi_task_model.n_static

        single_task_models[task_name] = single_task_model
    return single_task_models

In [None]:
def train_and_test_model(experiment_time, max_epoch, learning_rate, input_dim, task_name_list, train_loader_dict, val_dataset_dict, test_dataset_dict, device,is_show = True):
    """Train the multi-task model ``experiment_time`` times and test the best checkpoints.

    For every repetition a fresh ``RNN_MTL`` is trained for up to ``max_epoch`` epochs
    with early stopping on the summed validation loss (patience = the global
    ``limit_early_stop_count``). For each task the weights with the highest validation
    AUROC are remembered, both within the repetition and across all repetitions.
    After each repetition the per-task best weights of that repetition are scored on
    the test set.

    Relies on names defined elsewhere in the notebook: ``train`` (not visible in this
    section), ``test``, ``test2``, ``RNN_MTL``, ``BCEFocalLoss``, ``MTL_to_STL`` and
    the globals ``alpha``, ``gamma`` and ``limit_early_stop_count``.

    Args:
        experiment_time: number of independent repetitions.
        max_epoch: maximum epochs per repetition.
        learning_rate: AdamW learning rate.
        input_dim: features per time step.
        task_name_list: tasks to train jointly.
        train_loader_dict: training loaders, forwarded to ``train``.
        val_dataset_dict: validation datasets keyed by task.
        test_dataset_dict: test datasets keyed by task.
        device: device the models are moved to.
        is_show: print progress and per-task scores.

    Returns:
        (df_grade, global_stl_model_dict, best_model_dict): the test scores of every
        repetition (one row per task, rows numbered consecutively), one single-task
        model per task built from the overall best weights, and
        ``{task: {'model': state_dict, 'performance': val_score, 'id': epoch_counter}}``.
    """
    grade_columns = ['time', 'task', 'auroc', 'acc', 'f1', 'pre', 'recall', 'brier_score', 'loss']
    grade_records = []
    df_grade = pd.DataFrame(columns=grade_columns)
    best_model_dict = {}

    count = 1
    local_indicator = 'auroc'
    global_indicator = 'loss'

    def _snapshot_weights(net):
        # state_dict() returns references to the live parameters and dict.copy() is
        # shallow, so the tensors must be cloned or the "best" weights would silently
        # follow every later optimiser step.
        live_state = net.state_dict()
        return type(live_state)((key, value.detach().clone()) for key, value in live_state.items())

    def _improves(candidate, incumbent):
        # AUROC is maximised; any other indicator (e.g. loss) is minimised.
        if local_indicator == 'auroc':
            return incumbent[local_indicator] < candidate[local_indicator]
        return incumbent[local_indicator] > candidate[local_indicator]

    def _record_best(registry, weights, val_score_dict, snapshot_id):
        # Store the weights for each task that is new to the registry or has improved.
        for task_name in task_name_list:
            candidate = val_score_dict[task_name]
            if task_name not in registry or _improves(candidate, registry[task_name]['performance']):
                registry[task_name] = {'model': weights, 'performance': candidate, 'id': snapshot_id}

    for run_index in range(experiment_time):
        local_best_AUC = 0
        local_best_loss = 10000
        local_best_model_dict = {}
        patience_counter = 0

        model = RNN_MTL(input_dim, task_name_list).to(device)
        optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=0.001)
        loss_func = BCEFocalLoss(alpha=alpha, gamma=gamma)

        for epoch in tqdm(range(max_epoch)):
            if is_show:
                print(f'Time:{run_index+1}/{experiment_time} - Epoch:{epoch+1}/{max_epoch}...')

            train(model, train_loader_dict, loss_func, optimizer)
            val_score_dict, result = test(model, val_dataset_dict, loss_func, is_show= is_show)

            # One independent copy per epoch, shared by both registries (never mutated afterwards).
            epoch_weights = _snapshot_weights(model)
            _record_best(local_best_model_dict, epoch_weights, val_score_dict, count)
            _record_best(best_model_dict, epoch_weights, val_score_dict, count)
            count+=1

            # Early stop: reset patience whenever the monitored total improves.
            if global_indicator == 'loss':
                improved = result['total_loss'] < local_best_loss
                if improved:
                    local_best_loss = result['total_loss']
            else:
                improved = result['total_auc'] > local_best_AUC
                if improved:
                    local_best_AUC = result['total_auc']
            patience_counter = 0 if improved else patience_counter + 1

            if patience_counter >= limit_early_stop_count:
                break

        # test2 expects ready-to-use models, so materialise each task's best weights first.
        local_best_models = {}
        for task_name in task_name_list:
            task_model = RNN_MTL(input_dim, task_name_list).to(device)
            task_model.load_state_dict(local_best_model_dict[task_name]['model'])
            local_best_models[task_name] = {'model': task_model}
        test_score_dict, result = test2(local_best_models, test_dataset_dict, loss_func, is_show = is_show)

        for task_name in task_name_list:
            test_score_dict[task_name]['time'] = run_index + 1
            grade_records.append(test_score_dict[task_name])
        # Build the frame from all records at once instead of concatenating row by row.
        df_grade = pd.DataFrame(grade_records, columns=grade_columns)
        print(df_grade)
        print('----------finished----------')

    # Turn the overall best weights of every task into a stand-alone single-task model.
    global_stl_model_dict = {}
    for task_name in task_name_list:
        model = RNN_MTL(input_dim, task_name_list).to(device)
        model.load_state_dict(best_model_dict[task_name]['model'])
        local_stl_model_dict = MTL_to_STL(model)
        global_stl_model_dict[task_name] = local_stl_model_dict[task_name]

    return df_grade, global_stl_model_dict, best_model_dict
    
    

In [None]:
import shap
import matplotlib.pyplot as plt
def calculate_feature_important(shap_value,feature_name_list,seq_day, n_temporal, n_static):
    """Rank flattened features by their summed absolute SHAP value.

    Args:
        shap_value: flattened SHAP values, indexed [sample, feature].
        feature_name_list: base feature names; the first ``n_temporal`` are temporal.
        seq_day: number of days; every temporal name expands to ``name_D1 .. name_D{seq_day}``.
        n_temporal: number of temporal base features.
        n_static: not used by this function.

    Returns:
        (sorted_feature_names, sum_per_feature, feature_important_dict): names in
        descending importance, the per-feature sums in flattened order, and the
        unsorted ``{name: sum}`` mapping.
    """
    # Expand names so they line up with the flattened layout, e.g. PEEP_D1 PEEP_D2 ...
    expanded_names = []
    for position, base_name in enumerate(feature_name_list):
        if position < n_temporal:
            # temporal feature: one entry per day
            expanded_names.extend(f'{base_name}_D{day+1}' for day in range(seq_day))
        else:
            # static feature: a single entry
            expanded_names.append(f'{base_name}')

    sum_per_feature = np.abs(shap_value).sum(axis=0)

    assert len(expanded_names) == sum_per_feature.shape[0], f'{len(expanded_names)}//{sum_per_feature.shape[0]}'

    # Unsorted lookup table.
    feature_important_dict = dict(zip(expanded_names, sum_per_feature))

    # argsort is ascending; reverse it for descending importance.
    descending_order = np.argsort(sum_per_feature)[::-1]
    sorted_feature_names = [expanded_names[index] for index in descending_order]
    return sorted_feature_names, sum_per_feature, feature_important_dict

def _flatten_temporal_static(values, seq_day, n_temporal, n_static):
    """Flatten (sample, day, feature) values to (sample, seq_day * n_temporal + n_static).

    Temporal features come first, each expanded day by day (f0_D1, f0_D2, ...);
    every static feature is reduced to its mean over the days.
    """
    values = np.asarray(values, dtype=np.float64)
    n_samples = values.shape[0]
    n_temporal_columns = seq_day*n_temporal
    flattened = np.zeros((n_samples, n_temporal_columns + n_static))
    # (sample, day, feature) -> (sample, feature, day) -> feature-major, day-minor columns.
    temporal_part = values[:, :seq_day, :n_temporal]
    flattened[:, :n_temporal_columns] = temporal_part.transpose(0, 2, 1).reshape(n_samples, n_temporal_columns)
    static_part = values[:, :seq_day, n_temporal:n_temporal + n_static]
    flattened[:, n_temporal_columns:] = static_part.mean(axis=1)
    return flattened


def get_model_shap(model,data_X_train,data_X_test,data_X_test_original,feature_name_list,task_name,use_mini_sample = True,n_sample = 100,n_temporal = 103,n_static = 53):
    """Compute SHAP values with ``shap.GradientExplainer`` and flatten them per feature.

    Args:
        model: model to explain.
        data_X_train: numpy background data, indexed [sample, day, feature].
        data_X_test: numpy data to explain (scaled), same layout.
        data_X_test_original: unscaled counterpart of ``data_X_test``.
        feature_name_list: base feature names, temporal features first.
        task_name: not used by this function.
        use_mini_sample: when True only the first 1000 samples of each array are used.
        n_sample: ``nsamples`` argument of ``explainer.shap_values``.
        n_temporal: number of temporal features.
        n_static: number of static features.

    Returns:
        (feature_important, shap_value_flatten, shap_data_flatten, feature_important_dict),
        the two arrays having shape (sample, seq_day * n_temporal + n_static).

    Raises:
        ValueError: if the feature axis does not have ``n_temporal + n_static`` entries.
    """
    max_sample = 1000
    seq_day = data_X_train.shape[1]
    feature_count = data_X_train.shape[2]
    if feature_count != n_temporal + n_static:
        raise ValueError(
            f'feature axis has {feature_count} entries but n_temporal + n_static = {n_temporal + n_static}'
        )

    sample_slice = slice(None, max_sample) if use_mini_sample else slice(None)
    background_data = torch.from_numpy(data_X_train[sample_slice]).float().to(device)
    shap_data = torch.from_numpy(data_X_test[sample_slice]).float().to(device)
    # The unscaled values are only read back, so keep them on the host (float32 as before)
    # instead of indexing a device tensor element by element.
    original_values = np.asarray(data_X_test_original[sample_slice], dtype=np.float32)

    model.eval()
    explainer = shap.GradientExplainer(model, background_data)

    shap_values = np.array(explainer.shap_values(shap_data,nsamples=n_sample))
    # NOTE(review): assumes the explainer returns [sample, day, feature]; a trailing
    # singleton output axis is dropped -- confirm against the installed shap version.
    if shap_values.ndim == 4 and shap_values.shape[-1] == 1:
        shap_values = shap_values[..., 0]

    # Static features are averaged over the days. The previous loop overwrote the
    # slot on each day and then divided by seq_day, yielding last-day value / seq_day.
    # NOTE(review): the mean follows the original "/= seq_day" intent; confirm whether a
    # sum would be preferred for the SHAP attributions themselves.
    shap_value_flatten = _flatten_temporal_static(shap_values, seq_day, n_temporal, n_static)
    shap_data_flatten = _flatten_temporal_static(original_values, seq_day, n_temporal, n_static)

    feature_important, _, feature_important_dict = calculate_feature_important(shap_value_flatten, feature_name_list, seq_day, n_temporal, n_static)
    return feature_important, shap_value_flatten, shap_data_flatten, feature_important_dict

def show_shap(shap_value_flatten, shap_data_flatten,feature_name_list, max_display = 20,task_name = '',plot_type = 'dot'):
    """Draw and show a SHAP summary plot.

    Args:
        shap_value_flatten: SHAP values, indexed [sample, flattened feature].
        shap_data_flatten: feature values with the same layout.
        feature_name_list: names of the flattened features.
        max_display: maximum number of features shown.
        task_name: not used by this function.
        plot_type: forwarded to ``shap.summary_plot``.
    """
    # show=False defers rendering to plt.show() below.
    # Optional title / tick-font styling and saving the figure to a TIFF file were
    # left disabled by the author.
    shap.summary_plot(
        shap_value_flatten,
        shap_data_flatten,
        feature_names=feature_name_list,
        plot_type=plot_type,
        show=False,
        max_display=max_display,
    )
    plt.show()





In [None]:
from datetime import datetime

# Wall-clock timestamp taken when this cell is executed.
start_time = datetime.now()

# Start

In [None]:
#select_feature_list = []
top_percent = 0.9
remove_time_count = 0
full_result_dict = {}
select_feature_list = []  # empty => keep every feature

# Feature counts of the 20241002 data release (kept for reference)
# n_temporal = 128
# n_static = 143

# Feature counts of the 20250507 data release
n_temporal = 126
n_static = 145

"""
read data
"""
# One (datasets, loaders) pair per split; the feature names are taken from the train call only.
train_dataset_dict,train_loader_dict,feature_name_list,_ = read_data(task_name_list,'','train',select_feature_list,batch_size = batch_size,use_upsample = use_upsample)
val_dataset_dict,val_loader_dict,_,_ = read_data(task_name_list,'','validation',select_feature_list,batch_size = batch_size,use_upsample = use_upsample)
test_dataset_dict,test_loader_dict,_,_ = read_data(task_name_list,'','test',select_feature_list,batch_size = batch_size,use_upsample = use_upsample)
if len(select_feature_list)!=0:
    feature_name_list = select_feature_list
# Features per time step = temporal + static.
input_dim = n_temporal + n_static
print(f'==> input_dim: {input_dim}')


In [None]:
# Focal loss shared by the evaluation cells below; alpha and gamma come from the config cell.
loss_func = BCEFocalLoss(alpha=alpha, gamma=gamma)

In [None]:
"""
n_temporal = 130
n_static = 143
"""

In [None]:
#++
# Load one saved single-task model per task and print its test-set scores.
stl_model_dict = {}

for task_name in task_name_list:
    model = RNN_MTL(input_dim,[task_name]).to(device)
    # Checkpoint directory: STL/<task> for a single task, MTL/<task1+task2+...> otherwise.
    route = ''
    if len(task_name_list) == 1:
        #route = f'model_parm/STL/{task_name}'
        
        route = f'C:/Users/USER/M1326168/MIMIC/DNR/20250507/model_parm/STL/{task_name}'
    else:
        #route = 'model_parm/MTL/'+('+'.join(task_name_list)) 
        route = 'C:/Users/USER/M1326168/MIMIC/DNR/20250507/model_parm/MTL/'+('+'.join(task_name_list)) 
         
    model.load_state_dict(torch.load(f'{route}/{task_name}_1'))    
    # Needed by RNN_MTL.forward to un-flatten the 2-D inputs produced by read_data.
    model.n_temporal = n_temporal
    model.n_static = n_static
    stl_model_dict[task_name] = model
    
    # Evaluate on this task's test set only.
    task_dict = {}
    task_dict[task_name] = test_dataset_dict[task_name]
    result,_ = test(model, task_dict, loss_func, is_show = False)
    print(result[task_name])

In [None]:
test_dataset_dict['DNR'].inputs.shape

# Error analysis starts below

In [None]:
# Model under analysis: the single-task DNR model.
model = RNN_MTL(input_dim,['DNR']).to(device)
# Load the DNR checkpoint explicitly. `task_name` is a leftover of the loading loop
# above and holds the LAST task of task_name_list, so f'{route}/{task_name}_1' would
# silently load another task's weights into this DNR model.
model.load_state_dict(torch.load(f'{route}/DNR_1'))
model.n_temporal = n_temporal
model.n_static = n_static
result,_ = test(model, {'DNR':test_dataset_dict['DNR']}, loss_func, is_show = False)
print(result['DNR'])

In [None]:
"""
分析的model
"""
# Earlier variants of this cell, kept for reference:
# model = RNN_MTL(input_dim,task_name_list).to(device)
# model.load_state_dict(torch.load(f'{route}/{task_name}_1'))    
# model.n_temporal = n_temporal
# model.n_static = n_static

# model.load_state_dict(torch.load(f'{route}/{task_name}_1'))    
#     model.n_temporal = n_temporal
#     model.n_static = n_static
#     stl_model_dict[task_name] = model
    
#     task_dict = {}
#     task_dict[task_name] = test_dataset_dict[task_name]
#     result,_ = test(model, task_dict, loss_func, is_show = False)
#     print(result[task_name])

# Model used by the error analysis below: a multi-task RNN_MTL.
# NOTE(review): `route`, `task_name` and `task_dict` are leftovers of the loading loop
# above (task_name is the last entry of task_name_list) -- confirm this is the intended task.
model = RNN_MTL(input_dim, task_name_list).to(device)
# NOTE(review): strict=False ignores missing/unexpected keys. The file was loaded into a
# single-task model in the loop above, so presumably only the shared LSTM and tower
# index 0 receive weights here while the remaining towers stay randomly initialised -- verify.
model.load_state_dict(torch.load(f'{route}/{task_name}_1'), strict=False)
model.n_temporal = n_temporal
model.n_static = n_static
task_dict[task_name] = test_dataset_dict[task_name]
result,_ = test(model, task_dict, loss_func, is_show = False)
print(result[task_name])


In [None]:
from interpret_community.common.constants import ShapValuesOutput, ModelTask
from interpret.ext.blackbox import MimicExplainer
from interpret.ext.glassbox import LGBMExplainableModel
from sklearn import svm
import pandas as pd
import zipfile
from lightgbm import LGBMClassifier

# Explainer Used: Mimic Explainer
from interpret.ext.blackbox import MimicExplainer
from interpret.ext.glassbox import LinearExplainableModel
from interpret.ext.glassbox import LGBMExplainableModel

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer

from raiwidgets import ErrorAnalysisDashboard
from interpret_community.common.constants import ShapValuesOutput, ModelTask


In [None]:
# Prediction helpers (class labels and probabilities) used by the pipeline wrapper.
def predict_func(model, X):
    """Return the predicted labels of the model's first task for ``X``."""
    predictions_by_task = model.predict(X)
    first_task = model.task_name_list[0]
    return predictions_by_task[first_task]

def predict_proba_func(model, X):
    """Return the first task's probabilities for ``X`` as a numpy array."""
    first_task = model.task_name_list[0]
    probabilities = model.predict_proba(X)[first_task]
    # Tensor -> host -> detached from autograd -> numpy.
    return probabilities.cpu().detach().numpy()

def create_model_pipeline(model):
    """Wrap ``model`` in a one-step scikit-learn ``Pipeline`` with custom predict methods.

    The pipeline's ``predict`` and ``predict_proba`` are replaced by functions that
    delegate to ``predict_func`` / ``predict_proba_func`` on the 'model' step.
    """
    # A scaler step was considered by the author but is disabled; the model is the only step.
    steps = [('model', model)]
    model_pipeline = Pipeline(steps)

    def _predict(X):
        return predict_func(model_pipeline.named_steps['model'], X)

    def _predict_proba(X):
        return predict_proba_func(model_pipeline.named_steps['model'], X)

    # Override the pipeline's own prediction methods on this instance.
    model_pipeline.predict = _predict
    model_pipeline.predict_proba = _predict_proba

    return model_pipeline

# from sklearn.preprocessing import StandardScaler
# import joblib

# def predict_func(model, X, scaler_model=None):
#     task_name = model.task_name_list[0]
#     predictions = model.predict(X)[task_name]
    
#     # 還原標準化，如果提供了 scaler_model
#     if scaler_model is not None:
#         predictions = scaler_model.inverse_transform(predictions.reshape(-1, 1)).flatten()
    
#     return predictions
# def create_model_pipeline(model):
#     # 載入標準化模型
#     scaler_model = joblib.load(f'C:/Users/USER/M1326168/MIMIC/DNR/20240904/data/scaler_model.joblib')
    
#     # 將模型包裝在Pipeline中，依序進行轉換和預測
#     model_pipeline = Pipeline([
#         ('scaler', scaler_model),
#         ('model', model)
#     ])

#     # 添加自定义的 predict 和 predict_proba 方法
#     model_pipeline.predict = lambda X: predict_func(model_pipeline.named_steps['model'], X, scaler_model=scaler_model)
#     model_pipeline.predict_proba = lambda X: predict_proba_func(model_pipeline.named_steps['model'], X)

#     return model_pipeline

In [None]:
# Pipeline wrapper around the analysis model; it is the object handed to MimicExplainer below.
model_pipeline = create_model_pipeline(model)

In [None]:
# Training inputs of the selected task as a NumPy array with singleton axes removed.
# (An earlier revision hard-coded the 'DNR' task here.)
X_train_original = np.squeeze(train_dataset_dict[task_name].inputs.numpy())

# Same conversion for the complete test inputs of the selected task.
X_test_original_full = np.squeeze(test_dataset_dict[task_name].inputs.numpy())

In [None]:
# Inspect the squeezed test inputs.
print(X_test_original_full)

In [None]:
# Total number of test samples and the labels of the selected task
# (an earlier revision hard-coded the 'DNR' task).
sample_count = X_test_original_full.shape[0]
y_test_full = test_dataset_dict[task_name].labels.numpy()

# The first 90 % of the test samples (and their labels) form the subset used for the explanation.
explanation_rows = slice(int(sample_count*0.9))
X_test_original = X_test_original_full[explanation_rows, :]
y_test = y_test_full[explanation_rows]


In [None]:
# Number of rows in the full test set.
print(sample_count)

In [None]:
# Shape of the squeezed training inputs.
print(X_train_original.shape)

In [None]:
# Feature names expanded with a time-step suffix.
full_feature_name_list = []
seq_day = 3  # number of day steps generated for every feature

"""
for i in range(len(feature_name_list)):
    name = feature_name_list[i]
    for day in range(seq_day):
        full_feature_name_list.append(f'{name}_D{day+1}')
"""
# Day-major order: every feature for the first step, then every feature for the next step, ...
# (the disabled variant in the string above was feature-major and used a "_D{day+1}" suffix).
for day in range(seq_day):
    for i in range(len(feature_name_list)):
        name = feature_name_list[i]
        #full_feature_name_list.append(f'{name}_day{day+1}')
        # NOTE(review): for day = 0, 1, 2 this yields the suffixes "_day--1", "_day-0", "_day-1".
        # Confirm that is the intended labelling; later cells build lookup names with a
        # "_D{day}" suffix, which this loop never produces.
        full_feature_name_list.append(f'{name}_day-{day-1}')

In [None]:
# Build a MimicExplainer around the wrapped model: X_train_original is the initialization
# data, LGBMExplainableModel the explainable model class, feature names come from
# full_feature_name_list, and the task is declared as binary classification (classes 0/1).
model_task = ModelTask.Classification
explainer = MimicExplainer(model_pipeline, X_train_original, LGBMExplainableModel,
                           augment_data=True, max_num_of_augmentations=10,
                           features=full_feature_name_list, classes=[0,1], model_task=model_task)

In [None]:
# Global explanation computed on the 90 % test subset (X_test_original).
global_explanation = explainer.explain_global(X_test_original)

In [None]:
# Second wrapper around the same `model`, handed to ErrorAnalysisDashboard below.
dashboard_pipeline = create_model_pipeline(model)

In [None]:
# Run error analysis on the full test set (X_test_original_full) while the explanation
# was computed on a subset of it (X_test_original, the first 90 % of the rows).
# Because the two differ, true_y carries the labels of the explanation subset and
# true_y_dataset carries the labels matching the full dataset.
ErrorAnalysisDashboard(global_explanation, dashboard_pipeline, dataset=X_test_original_full,
                       true_y=y_test, categorical_features = [],
                       true_y_dataset=y_test_full)

In [None]:
# Reshape the first entry of y_test_full into a single row and show it.
# NOTE(review): the name suggests a scaled feature sample, but the value is taken from the
# label array — confirm X_test_original_full[0] was not intended.
sample_scaled = y_test_full[0].reshape(1, -1)
print(sample_scaled)

In [None]:
import joblib

# NOTE(review): both paths below are absolute and machine-specific; the commented
# alternatives show the relative locations used by earlier revisions.
#scaler = joblib.load('./data/scaler_model.joblib')
scaler = joblib.load('C:/Users/USER/M1326168/MIMIC/DNR/20250507/data/scaler_model.joblib')

#df_feature = pd.read_csv(f'./full_feature/{data_date}_columns_name.csv')
df_feature = pd.read_csv('C:/Users/USER/M1326168/MIMIC/DNR/20250507/data/sample/full_feature_name.csv')

# Feature names without the time suffix: the column headers of the CSV.
feature_name_list = df_feature.columns.tolist()
# Feature names with the time suffix: the list built earlier in the notebook.
select_feature_list = full_feature_name_list

feature_count, feature_count_with_time = len(feature_name_list), len(select_feature_list)

# Report both counts (with time steps / without time steps).
print(f'含時序特徵數:{feature_count_with_time}\n不含時序特徵數:{feature_count}')


In [None]:
def choose_feature(select_feature_list):
    """Interactively pick one feature name and a numeric value.

    Prints every entry of ``select_feature_list`` with its index, then reads an integer
    index and a float value from standard input.

    Parameters:
    - select_feature_list (list): candidate feature names.

    Returns:
    - tuple: ``(selected feature name, value as float)``.

    Raises:
    - ValueError: if the typed index or value cannot be parsed.
    - IndexError: if the typed index is outside the list.
    """
    for position, candidate in enumerate(select_feature_list):
        print(f'[{position}]...{candidate}')

    # Read both answers first; the list is only indexed afterwards.
    chosen_position = int(input('feature id = '))
    chosen_value = float(input('value = '))

    return select_feature_list[chosen_position], chosen_value

# 手動選擇節點特徵與值

In [None]:
# Interactively choose the first feature, its threshold (in scaled units) and the day step.
feature_name1, value1 = choose_feature(feature_name_list)

day = int(input('day = '))
feature_name1_with_time = feature_name1 + f"_D{day}"

# Position of the feature in the list without time suffixes (used to read the scaler output).
index_of_feature1_full = feature_name_list.index(feature_name1)

# Position of the (feature, day) pair in the time-expanded list.
try:
    index_of_feature1_lite = select_feature_list.index(feature_name1_with_time)
except ValueError:
    # The time-expanded names may use a suffix other than "_D{day}", in which case the
    # name lookup cannot succeed. Fall back to the position implied by a day-major layout
    # (all features of day 1, then all features of day 2, ...) and verify that the entry
    # found there really belongs to the chosen feature.
    index_of_feature1_lite = (day - 1) * feature_count + index_of_feature1_full
    if not (day >= 1
            and index_of_feature1_lite < len(select_feature_list)
            and select_feature_list[index_of_feature1_lite].startswith(feature_name1)):
        raise ValueError(
            f"Cannot locate feature '{feature_name1}' for day {day} in select_feature_list"
        ) from None

# Build a dummy row filled with the scaled value, run it through the scaler's
# inverse transform and read back this feature's column to get the value in original units.
data = np.full(feature_count, value1)
data = data.reshape(1,feature_count)
original_value = scaler.inverse_transform(data)[0,index_of_feature1_full]

str1 = f'特徵:{feature_name1} = {value1} 的原始值為: {original_value}'
print(str1)


#154 0.35 1
#155 0.35

#apdiii = 131 0.30 1
#PO2 = 34
#first_day_Platelets x1000 = 214

#first_day_PT-INR = 226 0.1 1
#first_day_Sodium = 218 0.7 1
#first_day_Nutrition_Enteral_value = 233 0.39 1

In [None]:
# Interactively choose the second feature, its threshold (in scaled units) and the day step.
feature_name2, value2 = choose_feature(feature_name_list)

day = int(input('day = '))
feature_name2_with_time = feature_name2 + f"_D{day}"

# Position of the feature in the list without time suffixes (used to read the scaler output).
index_of_feature2_full = feature_name_list.index(feature_name2)

# Position of the (feature, day) pair in the time-expanded list.
try:
    index_of_feature2_lite = select_feature_list.index(feature_name2_with_time)
except ValueError:
    # The time-expanded names may use a suffix other than "_D{day}", in which case the
    # name lookup cannot succeed. Fall back to the position implied by a day-major layout
    # (all features of day 1, then all features of day 2, ...) and verify that the entry
    # found there really belongs to the chosen feature.
    index_of_feature2_lite = (day - 1) * feature_count + index_of_feature2_full
    if not (day >= 1
            and index_of_feature2_lite < len(select_feature_list)
            and select_feature_list[index_of_feature2_lite].startswith(feature_name2)):
        raise ValueError(
            f"Cannot locate feature '{feature_name2}' for day {day} in select_feature_list"
        ) from None

# Build a dummy row filled with the scaled value, run it through the scaler's
# inverse transform and read back this feature's column to get the value in original units.
data = np.full(feature_count, value2)
data = data.reshape(1,feature_count)
original_value = scaler.inverse_transform(data)[0,index_of_feature2_full]

str2 = f'特徵:{feature_name2} = {value2} 的原始值為: {original_value}'
print(str2)

#220 0.19 1

#PO2 = 34 0.08 3
#first_day_PH =  219 0.82 1

#first_day_Systemic Mean = 185 0.05 1

#first_day_Sodium = 218 0.7 1

#first_day_Sodium = 218 0.7 1


In [None]:
# Interactively choose the third feature, its threshold (in scaled units) and the day step.
feature_name3, value3 = choose_feature(feature_name_list)

day = int(input('day = '))
feature_name3_with_time = feature_name3 + f"_D{day}"

# Position of the feature in the list without time suffixes (used to read the scaler output).
index_of_feature3_full = feature_name_list.index(feature_name3)

# Position of the (feature, day) pair in the time-expanded list.
try:
    index_of_feature3_lite = select_feature_list.index(feature_name3_with_time)
except ValueError:
    # The time-expanded names may use a suffix other than "_D{day}", in which case the
    # name lookup cannot succeed. Fall back to the position implied by a day-major layout
    # (all features of day 1, then all features of day 2, ...) and verify that the entry
    # found there really belongs to the chosen feature.
    index_of_feature3_lite = (day - 1) * feature_count + index_of_feature3_full
    if not (day >= 1
            and index_of_feature3_lite < len(select_feature_list)
            and select_feature_list[index_of_feature3_lite].startswith(feature_name3)):
        raise ValueError(
            f"Cannot locate feature '{feature_name3}' for day {day} in select_feature_list"
        ) from None

# Build a dummy row filled with the scaled value, run it through the scaler's
# inverse transform and read back this feature's column to get the value in original units.
# The row must be filled with value3: filling it with value1 reported the original-unit
# value of the first feature's threshold under the third feature's name.
data = np.full(feature_count, value3)
data = data.reshape(1,feature_count)
original_value = scaler.inverse_transform(data)[0,index_of_feature3_full]

str3 = f'特徵:{feature_name3} = {value3} 的原始值為: {original_value}'
print(str3)

#236 0.13 1
#first_day_Platelets x1000 = 214 0.36 1
#first_day_Tidal = 209 0.57 1
#Cortisol = 176 0.5 1

#first_day_Hgb = 211 0.13 1

#first_day_Nutrition_Enteral_value = 233 0.39 1

In [None]:
# Recap of the three selected features with their scaled and original-unit values.
print(str1)
print(str2)
print(str3)

# Lite資料集

In [None]:

#print(MyDataset(X_scalar,X_original,Y))
# ErrorAnalysisDashboard(global_explanation, dashboard_pipeline, dataset=X_test_original_full,
#                        true_y=y_test, categorical_features = [],
#                        true_y_dataset=y_test_full)

#         dataset_dict[task_name] = MyDataset(X_scalar,X_original,Y)
#         original_data_dict['X_scalar'] = X_scalar
#         original_data_dict['X'] = X_original
        
#         original_data_dict['Y'] = Y

In [None]:
# #test
# """ 讀lite資料集 """
# _, _, _, data = read_data(
#     task_name_list,    # 多任務清單，例如 ['DNR','dod_30day','Vasopressor']
#     '',                # 資料路徑改為空字串，代表讀預設的 lite 版本
#     'test',            # 指定載入的資料切分(split)為 test
#     select_feature_list,  # 只選取你先前定義好的特徵清單
#     batch_size = batch_size,
#     use_upsample = use_upsample
# )

# # 從回傳的 data dict 中取出：
# X_scalar   = data['X_scalar']   # 經過 scaler 正規化後的特徵矩陣（float tensor）
# X_original = data['X']          # 原始值矩陣（inverse_transform 後的結果）
# Y          = data['Y']          # 對應的標籤向量

# # 封裝成 PyTorch Dataset，方便後續 DataLoader 使用
# dataset_dict = {}
# dataset_dict[task_name] = MyDataset(X_scalar, X_original, Y)

In [None]:
# print(f"test_dataset_dict['DNR'].inputs shape: {test_dataset_dict['DNR'].inputs.shape}")
# print(f"X_scalar shape: {X_scalar.shape}")

In [None]:
# 檢查資料分割比例
# sample_count = X_scalar.shape[0]
# test_count = test_dataset_dict['DNR'].inputs.shape[0]
# print(f"總樣本數: {sample_count}, 測試集樣本數: {test_count}")

In [None]:
# print(data.keys())

In [None]:
# Load the 'test' split for the single selected task, restricted to select_feature_list.
# (An earlier revision passed the whole task_name_list instead of [task_name].)
_, _, _, data = read_data(
    [task_name], '', 'test', select_feature_list,
    batch_size=batch_size, use_upsample=use_upsample,
)

# Pull the arrays out of the returned dict by their key names
# (presumably scaled features, original-unit features and labels).
X_scalar, X_original, Y = data['X_scalar'], data['X'], data['Y']

print(f"X_scalar shape: {X_scalar.shape}")
print(f"X_original shape: {X_original.shape}")
print(f"Y shape: {Y.shape}")

In [None]:
#print(dataset_dict)

In [None]:
def remove_samples_np(data, feature_index, threshold, condition_type):
    """
    Return the row indices of the samples whose feature satisfies a threshold condition.

    Despite the name, nothing is removed here: the function only reports which rows
    match, and the caller decides what to do with them.

    Parameters:
    - data (numpy.ndarray): 2-D array indexed as data[:, feature_index] (one row per sample).
    - feature_index (int): Column index of the feature to compare.
    - threshold (float): Threshold value for the condition.
    - condition_type (str): Comparison applied to the feature column:
      'type1' for '<', 'type2' for '<=', 'type3' for '>', 'type4' for '>='.

    Returns:
    - numpy.ndarray: 1-D integer array with the indices of the matching rows. It is
      always 1-D, including when zero or exactly one row matches, so len() is safe.

    Raises:
    - ValueError: if condition_type is not one of the four supported values.
    """
    if condition_type == 'type1':
        matches = data[:,  feature_index] < threshold
    elif condition_type == 'type2':
        matches = data[:,  feature_index] <= threshold
    elif condition_type == 'type3':
        matches = data[:,  feature_index] > threshold
    elif condition_type == 'type4':
        matches = data[:,  feature_index] >= threshold
    else:
        raise ValueError(
            "Invalid condition_type. Use 'type1' for '<', 'type2' for '<=', "
            "'type3' for '>' or 'type4' for '>='."
        )

    # flatnonzero keeps the result 1-D; squeezing argwhere collapsed a single match
    # into a 0-d array, which breaks len() and other sequence operations downstream.
    return np.flatnonzero(matches)

In [None]:
# Pause: execution blocks here until the user submits a line of input.
input()

In [None]:
"""
手動選擇type
"""
# Condition types accepted by remove_samples_np (chosen by hand for each feature):
#type1: <
#type2: <=
#type3: >
#type4: >=
# Row indices of X_scalar matching each of the three feature conditions.
indices_to_remove1 = remove_samples_np(X_scalar,index_of_feature1_lite,value1,'type3')
indices_to_remove2 = remove_samples_np(X_scalar,index_of_feature2_lite,value2,'type3')
indices_to_remove3 = remove_samples_np(X_scalar,index_of_feature3_lite,value3,'type2')

# Disabled variant: intersection of all three conditions.
#common_indices = np.intersect1d(np.intersect1d(indices_to_remove1, indices_to_remove2), indices_to_remove3)

# Active variant: rows matching conditions 1 AND 2 only.
# NOTE(review): indices_to_remove3 is computed but not part of this intersection — confirm intended.
common_indices = np.intersect1d(indices_to_remove1, indices_to_remove2)



In [None]:
# Row counts of the three loaded arrays, printed for comparison.
print(X_original.shape[0])
print(X_scalar.shape[0])
print(Y.shape[0])

In [None]:
# Sample counts: total before removal, rows matching each condition, and rows in the intersection.
print(f'總樣本數(刪除前):{X_scalar.shape[0]}')
print(f'條件1樣本數:{len(indices_to_remove1)}')
print(f'條件2樣本數:{len(indices_to_remove2)}')
print(f'條件3樣本數:{len(indices_to_remove3)}')
print(f'符合條件的索引數:{len(common_indices)}')

In [None]:
""" 刪除的樣本 """
# Removed samples: the rows selected by common_indices.
X_scalar_remove = X_scalar[common_indices].copy()
X_original_remove = X_original[common_indices].copy()
Y_remove = Y[common_indices].copy()
print(X_scalar_remove.shape)

# Kept indices: every row index that is not in common_indices.
# np.setdiff1d returns them sorted and always as an integer array, so the result is a
# valid index even when nothing is kept (an empty list would become a float array),
# and it avoids a Python-level membership scan per row.
full_indices = np.arange(0, Y.shape[0])
keep_indices = np.setdiff1d(full_indices, common_indices)
print(keep_indices.shape)

# Kept samples.
X_scalar_keep = X_scalar[keep_indices].copy()
X_original_keep = X_original[keep_indices].copy()
Y_keep = Y[keep_indices].copy()

print(f'刪除後 剩餘樣本數:{X_scalar_keep.shape[0]}')


In [None]:
# """ 剩餘的樣本結果 """
# Dataset holding only the kept samples, keyed by the task being analysed.
dataset_dict_keep = {task_name: MyDataset(X_scalar_keep, X_original_keep, Y_keep)}

# Baseline score on the unmodified test set of the same task. The task key follows
# task_name (instead of a hard-coded 'DNR') so that baseline and filtered scores
# always refer to the same task.
print("*** 原始分數 ***")
result_before_remove, _ = test(model, {task_name: test_dataset_dict[task_name]}, loss_func, is_show=True)

# Score after removing the selected samples.
print("*** 新分數 ***")
result_after_remove, _ = test(model, dataset_dict_keep, loss_func, is_show=True)

In [None]:
# Fix the task under analysis and make sure a test dataset exists for it.
task_name = 'DNR'  # or take it from task_name_list
if task_name not in test_dataset_dict:
    raise KeyError(f"Task name '{task_name}' not found in test_dataset_dict.")

# Show the shapes of the kept arrays and require a consistent number of samples.
print(f"X_scalar_keep shape: {X_scalar_keep.shape}")
print(f"X_original_keep shape: {X_original_keep.shape}")
print(f"Y_keep shape: {Y_keep.shape}")
if not (X_scalar_keep.shape[0] == X_original_keep.shape[0] == Y_keep.shape[0]):
    raise ValueError("Mismatch in the number of samples between X_scalar_keep, X_original_keep, and Y_keep.")

# Dataset holding only the kept samples.
dataset_dict_keep = {task_name: MyDataset(X_scalar_keep, X_original_keep, Y_keep)}

# Baseline score on the unmodified test set (task_name is 'DNR' at this point).
print("*** 原始分數 ***")
result_before_remove, _ = test(model, {task_name: test_dataset_dict[task_name]}, loss_func, is_show=True)

# Score on the kept samples.
print("*** 新分數 ***")
result_after_remove, _ = test(model, dataset_dict_keep, loss_func, is_show=True)

In [None]:
# Two-row table of the 'DNR' results before and after removing the selected samples
# (the task key is hard-coded here).
df_result = pd.DataFrame([result_before_remove['DNR'], result_after_remove['DNR']], index=['before', 'after'])
# Ask for confirmation before writing the CSV.
# NOTE(review): the output path is absolute and machine-specific.
if input('Save? (y/n)') == 'y':
    df_result.to_csv(f'C:/Users/USER/M1326168/MIMIC/DNR/20250507/error_analysis/實驗圖/error_analysis結果.csv')
    print('儲存成功')
print(df_result)

In [None]:
""" 總樣本分布 """
import matplotlib.pyplot as plt  # stays in this cell: the next cell needs plt when it is defined

# --- Label distribution of all samples ---
arr = Y.copy()

# Count the 0 and 1 labels.
total_sample = Y.shape[0]
count_zero = np.count_nonzero(arr == 0)
count_one = np.count_nonzero(arr == 1)

# Percentages fall back to 0.0 for an empty set instead of raising ZeroDivisionError.
percent_zero = round(count_zero/total_sample*100,2) if total_sample else 0.0
percent_one = round(count_one/total_sample*100,2) if total_sample else 0.0

print(f'總樣本: {total_sample}')
print(f"Number of 0: {count_zero} ({percent_zero}%)")
print(f"Number of 1: {count_one} ({percent_one}%)")
print("**************************************")

# --- Label distribution of the removed samples ---
arr = Y_remove.copy()

# Count the 0 and 1 labels.
total_remove_sample = Y_remove.shape[0]
count_zero = np.count_nonzero(arr == 0)
count_one = np.count_nonzero(arr == 1)

# The removed set is empty when no row matches the chosen conditions, so guard the division.
percent_zero = round(count_zero/total_remove_sample*100,2) if total_remove_sample else 0.0
percent_one = round(count_one/total_remove_sample*100,2) if total_remove_sample else 0.0

print(f'移除的樣本: {total_remove_sample}')
print(f"標籤為0: {count_zero} ({percent_zero}%)")
print(f"標籤為1: {count_one} ({percent_one}%)")
print("**************************************")



In [None]:
def plot_confusion_matrix(y_true, y_pred, classes, file_name, title='Confusion Matrix', cmap=plt.cm.Blues):
    """
    Plot a confusion matrix as a heat map, print it, and save the figure as a PNG.

    Rows and columns are shown in reverse order of ``classes`` (for
    ['Class 0', 'Class 1'] the positive class comes first). ``confusion_matrix`` and
    ``sns`` (seaborn) must be imported in the notebook before this function is called.

    Parameters:
    y_true (array-like): true class labels
    y_pred (array-like): predicted class labels
    classes (list): display names of the classes, in ascending label order
    file_name (str): suffix of the output file ``./error_analysis/實驗圖/confusion_{file_name}.png``
    title (str): figure title
    cmap (matplotlib colormap): colour map of the heat map

    Returns:
    None
    """
    # Compute the confusion matrix.
    cm = confusion_matrix(y_true, y_pred)
    if cm.shape[0] < len(classes):
        # A class that occurs in neither y_true nor y_pred shrinks the matrix, which would
        # not fit the class names. Rebuild it with explicit labels, assuming the labels
        # are encoded as 0 .. len(classes) - 1.
        cm = confusion_matrix(y_true, y_pred, labels=list(range(len(classes))))

    # Label the matrix with the class names.
    cm_df = pd.DataFrame(cm, index=classes, columns=classes)

    # Show the classes in reversed order, derived from `classes` instead of
    # hard-coded 'Class 1' / 'Class 0' names, so any class names work.
    display_order = list(reversed(classes))
    cm_df = cm_df.reindex(index=display_order, columns=display_order)
    print(cm_df)

    # Draw the heat map with seaborn.
    plt.figure(figsize=(8, 6))
    font_properties = {'size': 20,  'family': 'Arial'}
    title_font_properties = {'size': 20, 'weight': 'bold', 'family': 'Arial'}

    annot_kws = {'size': 20, 'weight': 'bold', 'family': 'Arial'}
    sns.heatmap(cm_df, annot=True, fmt="d", cmap=cmap, cbar=False,
                xticklabels=display_order, yticklabels=display_order, annot_kws=annot_kws)

    plt.xticks(fontproperties='Arial', **font_properties)
    plt.yticks(fontproperties='Arial', **font_properties)

    # Axis labels: set once, with the bold label font.
    plt.xlabel('Predicted', fontproperties='Arial', **title_font_properties)
    plt.ylabel('Actual', fontproperties='Arial', **title_font_properties)

    # Use the caller-supplied title (the default keeps the previous fixed text).
    plt.title(title)
    plt.savefig(f'./error_analysis/實驗圖/confusion_{file_name}.png')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix



In [None]:
""" [1]所有的樣本 的 預測狀況 """
# [1] Prediction outcome on all samples.
correct_count_0 = 0
correct_count_1 = 0
count_zero = np.count_nonzero(Y == 0)
count_one = np.count_nonzero(Y == 1)
count = 0  # number of samples whose prediction differs from the label
# NOTE(review): the task key is hard-coded to 'DNR'.
pred_y = model.predict(X_scalar)['DNR']

for i in range(pred_y.shape[0]):
    if Y[i] != pred_y[i]:
        count+=1
    if Y[i] == 1 and pred_y[i] == 1:
        correct_count_1 += 1
    if Y[i] == 0 and pred_y[i] == 0:
        correct_count_0 += 1

plot_confusion_matrix(Y,pred_y,['Class 0','Class 1'],'full')

# Percentages fall back to 0.0 when a class has no samples instead of raising ZeroDivisionError.
wrong_count_1 = count_one-correct_count_1
wrong_count_0 = count_zero-correct_count_0
percent_correct_1 = round(correct_count_1/count_one*100,2) if count_one else 0.0
percent_wrong_1 = round(wrong_count_1/count_one*100,2) if count_one else 0.0
percent_correct_0 = round(correct_count_0/count_zero*100,2) if count_zero else 0.0
percent_wrong_0 = round(wrong_count_0/count_zero*100,2) if count_zero else 0.0

print(f'總樣本 1的樣本數: {count_one}')
print(f"--預測正確: {correct_count_1} ({percent_correct_1}%)")
print(f"--預測失敗: {wrong_count_1} ({percent_wrong_1}%)")

print("************************************************")

print(f'總樣本 0的樣本數: {count_zero}')
print(f"--預測正確: {correct_count_0} ({percent_correct_0}%)")
print(f"--預測失敗: {wrong_count_0} ({percent_wrong_0}%)")

In [None]:
""" [2]移除的樣本 的 預測狀況 """
# [2] Prediction outcome on the removed samples.
count_zero = np.count_nonzero(Y_remove == 0)
count_one = np.count_nonzero(Y_remove == 1)
correct_count_0 = 0
correct_count_1 = 0

# NOTE(review): the task key is hard-coded to 'DNR'.
pred_y = model.predict(X_scalar_remove)['DNR']

for i in range(pred_y.shape[0]):
    if Y_remove[i] == 1 and pred_y[i] == 1:
        correct_count_1 += 1
    if Y_remove[i] == 0 and pred_y[i] == 0:
        correct_count_0 += 1

plot_confusion_matrix(Y_remove,pred_y,['Class 0','Class 1'],'remove_sample')

# The removed subset can easily lack one class; percentages then fall back to 0.0
# instead of raising ZeroDivisionError.
wrong_count_1 = count_one-correct_count_1
wrong_count_0 = count_zero-correct_count_0
percent_correct_1 = round(correct_count_1/count_one*100,2) if count_one else 0.0
percent_wrong_1 = round(wrong_count_1/count_one*100,2) if count_one else 0.0
percent_correct_0 = round(correct_count_0/count_zero*100,2) if count_zero else 0.0
percent_wrong_0 = round(wrong_count_0/count_zero*100,2) if count_zero else 0.0

print(f'移除的樣本 1的樣本數: {count_one}')
print(f"--預測正確: {correct_count_1} ({percent_correct_1}%)")
print(f"--預測失敗: {wrong_count_1} ({percent_wrong_1}%)")

print("************************************************")

print(f'移除的樣本 0的樣本數: {count_zero}')
print(f"--預測正確: {correct_count_0} ({percent_correct_0}%)")
print(f"--預測失敗: {wrong_count_0} ({percent_wrong_0}%)")

In [None]:
""" [3]剩餘的樣本 的 預測狀況 """
# [3] Prediction outcome on the kept samples.
count_zero = np.count_nonzero(Y_keep == 0)
count_one = np.count_nonzero(Y_keep == 1)
correct_count_0 = 0
correct_count_1 = 0

# NOTE(review): the task key is hard-coded to 'DNR'.
pred_y = model.predict(X_scalar_keep)['DNR']

for i in range(pred_y.shape[0]):
    if Y_keep[i] == 1 and pred_y[i] == 1:
        correct_count_1 += 1
    if Y_keep[i] == 0 and pred_y[i] == 0:
        correct_count_0 += 1

plot_confusion_matrix(Y_keep,pred_y,['Class 0','Class 1'],'keep_sample')

# Percentages fall back to 0.0 when a class has no samples instead of raising ZeroDivisionError.
wrong_count_1 = count_one-correct_count_1
wrong_count_0 = count_zero-correct_count_0
percent_correct_1 = round(correct_count_1/count_one*100,2) if count_one else 0.0
percent_wrong_1 = round(wrong_count_1/count_one*100,2) if count_one else 0.0
percent_correct_0 = round(correct_count_0/count_zero*100,2) if count_zero else 0.0
percent_wrong_0 = round(wrong_count_0/count_zero*100,2) if count_zero else 0.0

print(f'保留的樣本 1的樣本數: {count_one}')
print(f"--預測正確: {correct_count_1} ({percent_correct_1}%)")
print(f"--預測失敗: {wrong_count_1} ({percent_wrong_1}%)")

print("************************************************")

print(f'保留的樣本 0的樣本數: {count_zero}')
print(f"--預測正確: {correct_count_0} ({percent_correct_0}%)")
print(f"--預測失敗: {wrong_count_0} ({percent_wrong_0}%)")