In [None]:
!pip install captum

In [None]:
# pytorch組件
import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
from torch.optim import Adam, SGD
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torchvision.io import read_image

# 基礎套件
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import random
from collections import Counter

# sklearn套包
from sklearn.metrics import recall_score, f1_score, classification_report, accuracy_score
from sklearn.utils import shuffle
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix
from PIL import Image

# 訓練讀調
from tqdm import tqdm, trange

# 要train很多次的東西
import optuna

# 可視化AI
from captum.attr import IntegratedGradients

## **設定GPU模式與決定隨機**

In [None]:
device = torch.device('cuda:0')
device

In [None]:
# 決定Seed
# seed = 695
# torch.manual_seed(seed)
# torch.cuda.manual_seed(seed)
# torch.cuda.manual_seed_all(seed)
# np.random.seed(seed)
# random.seed(seed)
# torch.backends.cudnn.benchmark = False
# torch.backends.cudnn.deterministic = True

## **資料前處理**
1. 從各類別讀取所有資料並給與label
2. 合併各類資料成一個Dataframe
3. 把不要的去除掉

In [None]:
# 健康設為1
health_list = [] #各筆資料連結
health_label = [] #各筆資料label
for dirname, file, filename in os.walk(r'/kaggle/input/syn-time-series/region_health'):
    
    if len(filename) > 0:
        
        for files in filename:                
            
            health_list.append(os.path.join(dirname, files)) 
            health_label.append(1)
         
print(len(health_list))
health_list[:10], health_label[:10]

In [None]:
# unhealth
unhealth_list = []
unhealth_label = []

for dirname, file, filename in os.walk(r'/kaggle/input/syn-time-series/region_unhealth'):
    
    if len(filename) > 0:
        
        for files in filename:                
            
            unhealth_list.append(os.path.join(dirname, files))
            unhealth_label.append(0)        

print(len(unhealth_list))
unhealth_list[20:30], unhealth_label[20:30]

In [None]:
# 加再一起，還沒打亂
all_list = health_list + unhealth_list 
all_label = health_label + unhealth_label

# 建成dataframe
df_kaggle = pd.DataFrame()
df_kaggle['picture'] = all_list
df_kaggle['label'] = all_label
df_kaggle.head()

In [None]:
# 刪除不要的data
df_second = df_kaggle[df_kaggle['picture'] != '/kaggle/input/syn-time-series/region_health/INT-033_HBA_Probe1_Oxy_region.npy']
df_second.index = range(len(df_second))
df_second

## **製作訓練資料**
1. 使用StratifiedKFold分訓練以及測試資料
2. 建立dataset
3. 放入dataloader

In [None]:
# dataload 用dataset去inheret
class CustomImageDataset(Dataset):
    def __init__(self, annotations_file, transform=None):

        self.dataframe = annotations_file
        self.transform = transform
        

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # 重點，自寫讀檔案方法
        img_path = self.dataframe.iloc[idx, 0]        
        np_araay = np.transpose(np.load(img_path.replace('\\', '/')))

        
        label = self.dataframe.iloc[idx, 1]

        if self.transform:
            np_araay = self.transform(np_araay)
        
        
        return np_araay, label, img_path 

In [None]:
# https://shenxiaohai.me/2018/10/19/pytorch_tutorial_intermediate_02/
class Network(nn.Module):
    def __init__(self, pool = 6, hid_lay=1, fc1= 256, outlay = 64):
        super(Network, self).__init__()
        
        self.cnn1 = nn.Conv1d(2, 64, 3, padding=1)
        self.cnn2 = nn.Conv1d(16, 32, 3, padding=1)
        self.maxpool1 = nn.MaxPool1d(32, 2)
        self.outlay = outlay
        self.LSTM1 = nn.LSTM(1251, self.outlay, hid_lay)#, bidirectional=True)
        
        self.pools = pool
        self.ave_pool = nn.AdaptiveAvgPool1d(self.pools)
        
        # FC
        
        self.fc1num = fc1
        self.fc1 = nn.Linear(2*self.outlay, self.fc1num)
        self.fc2 = nn.Linear(self.fc1num, 64)
        self.fc3 = nn.Linear(64, 1)

        self.soft = nn.Sigmoid()

        self.drop = nn.Dropout(0.3)


    def forward(self, input1):
        output = self.LSTM1(input1)[0]

        output = output.view(-1, 2*self.outlay)
                
        con = self.fc1(output)
        con = F.relu(self.drop(con))
        con = self.fc2(con)   
        con = self.fc3(con)
        con = self.soft(con)

        return con

In [None]:
# 分資料，使用StratifiedKFold
skf = StratifiedKFold(n_splits=5)
fold1 = list(skf.split(df_second, df_second['label']))

## **使用Optuna來找出最佳參數**
1. 建立 object_fun
2. 將預測試之變數用trial替換，如要測試categorical資料就用suggest_categorical，要測試

In [None]:
def object_fun(trial):
    higest_train = []
    higest_test = []
    for fold in fold1:   
        # 最高分
        higest_train_acc = 0
        higest_test_acc = 0.  
        
        # 存結果
        train_loss_list = []
        train_acc_list = []
        test_loss_list = []
        test_acc_list = []
        
        
        # 提出train_datasets與testing datasets
        X_df = df_second.loc[fold[0]]
        y_df = df_second.loc[fold[1]]

        # 建造transform, 
        trans_comp = transforms.Compose([transforms.ToTensor()])

        # 建立dataset變套用transform
        train_dataset = CustomImageDataset(X_df, transform= trans_comp)
        test_dataset = CustomImageDataset(y_df, transform= trans_comp)
        print(train_dataset.__len__(), test_dataset.__len__())

        # 把dataset放入dataloader
        train_dl = DataLoader(train_dataset, batch_size= 16, shuffle= False, drop_last=True)
        test_dl = DataLoader(test_dataset, batch_size= 16, shuffle= False)
        train_dl_eval = DataLoader(train_dataset, batch_size= 16, shuffle= False, drop_last=False)
        
        #  neural network model
        outlay = trial.suggest_categorical('outlay', [64, 96, 128])
        fc1 = trial.suggest_categorical('fc1', [256, 128, 64])
        
        model = Network(fc1= fc1, outlay = outlay).to(device)

        criterion = nn.BCELoss()

        # leaerning rate
        lr = trial.suggest_float('lr_rate', 0.001, 0.002)
        # optimizer
        optimizer = Adam(model.parameters(), lr=lr, weight_decay= 0.01)



        # 幾個epoch
        epoch = 300
        
        
        for epoch in tqdm(range(epoch)):
            # 訓練訓練訓練訓練訓練訓練訓練訓練訓練訓練訓練  
            losses = 0.  #記得加 . 代表float
            accuracies = 0.
            total = 0

            for batch, (X, y, zz) in enumerate(train_dl):
                # 每一個batch都要train
                model.train()

                inputs, labels = X.squeeze().float().to(device), y.float().reshape([-1, 1]).to(device)
                optimizer.zero_grad()
                pred_out = model(inputs)

                loss = criterion(pred_out, labels)
                losses = losses + loss.item()

                # Backpropagation        
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                digital = np.where(pred_out.to('cpu') < 0.5, 0, 1)
                labels = np.array(labels.to('cpu'), dtype= int)

                accuracies += np.sum(digital == labels)
                total += len(labels)
        
            # 計算loss
            loss_ave = losses/(batch+1)
            acc_ave = accuracies/total

            # 測試for training data
            model.eval()
            
            digital_list, label_list = [], []            
            losses1 = 0.
            for batch, (X, y, z) in enumerate(train_dl_eval):
            
                inputs, labels = X.squeeze().float().to(device), y.float().reshape([-1, 1]).to(device)
                pred_out = model(inputs)

                loss1 = criterion(pred_out, labels)

                # Backpropagation
                losses1 = losses1 + loss1.item()

                digital1 = np.where(pred_out.to('cpu') < 0.5, 0, 1)
                labels1 = np.array(labels.to('cpu'), dtype= int)

                digital_list += list(digital1.ravel())
                label_list += list(labels1.ravel())



            loss_ave = losses1/(batch+1)
            acc_ave = accuracy_score(digital_list, label_list)

            # 測試測試測試測試for testing data
            num_batches = len(test_dl)
            test_loss, test_accuracies, test_total = 0., 0., 0
            model.eval()

            with torch.no_grad():
                for batch, (X, y, z) in enumerate(test_dl):
                    X = X.squeeze().float().to(device)
                    y = y.float().reshape([-1, 1]).to(device)
                    pred = model(X)
                    test_loss += criterion(pred, y).item()
                    digital = np.where(pred.to('cpu') < 0.5, 0, 1)
                    labels = np.array(y.to('cpu'), dtype= int)

#                     test_accuracies += np.sum(digital == labels)
#                     test_total += len(labels)


            test_loss = test_loss/(batch+1)
            test_accuracies = accuracy_score(digital, labels)


            # append到list上面
            train_loss_list.append(loss_ave)
            train_acc_list.append(acc_ave)
            test_loss_list.append(test_loss)
            test_acc_list.append(test_accuracies)
            
            
            # 存Test best model
            if test_accuracies > higest_test_acc and acc_ave > test_accuracies  and acc_ave >= 0.8 and acc_ave <= 0.94:
                higest_test_acc = test_accuracies
                higest_train_acc = acc_ave
                print(higest_test_acc, higest_train_acc)
    

        higest_train.append(higest_train_acc)
        higest_test.append(higest_test_acc)
        print(higest_train_acc, " ", higest_test_acc)



    print(np.mean(higest_train))
    print(np.mean(higest_test))
    print("Data Length", len(higest_train))

    return np.mean(higest_test)

In [None]:
# define sample
sampler = optuna.samplers.TPESampler(seed=10)

In [None]:
try:
    study = optuna.create_study(storage="sqlite:///cnn_npy_52_channel.db", study_name="mystudy1", direction='maximize', sampler=sampler)
except:
    study = optuna.load_study(study_name="mystudy0", storage="sqlite:///cnn_npy_52_channel.db")
    
study.optimize(object_fun, n_trials=1000)

## **選出最好的參數並跑一次**

In [None]:
good_param = {'fc1': 128, 'lr_rate': 0.0010904593492709073, 'outlay': 128}

In [None]:
df_one = pd.DataFrame()

higest_train = []
higest_test = []
for kk, fold in enumerate(fold1):   
    higest_train_acc = 0
    higest_test_acc = 0


    X_df = df_second.loc[fold[0]]
    y_df = df_second.loc[fold[1]]
    X_df, y_df

    # 建造transform, 
    trans_comp = transforms.Compose([transforms.ToTensor()])  # transforms.Grayscale(), transforms.PILToTensor(), 

    # 建立dataset變套用transform
    train_dataset = CustomImageDataset(X_df, transform= trans_comp)
    test_dataset = CustomImageDataset(y_df, transform= trans_comp)

    print(train_dataset.__len__(), test_dataset.__len__())

    # 把dataset放入dataloader
    train_dl = DataLoader(train_dataset, batch_size= 16, shuffle= False, drop_last=True)
    test_dl = DataLoader(test_dataset, batch_size= 16, shuffle= False)
    
    train_dl_eval = DataLoader(train_dataset, batch_size= 16, shuffle= False, drop_last=False)

    #  neural network model 放到GPU
     
    outlay = good_param['outlay']
    fc1 = good_param['fc1']
    
    lr = good_param['lr_rate']

    model = Network(fc1= fc1, outlay = outlay).to(device)
    
    criterion = nn.BCELoss()

    # SGD
    optimizer = Adam(model.parameters(), lr=lr, weight_decay= 0.01)  # lr=0.0098632002427510 過小

    # 模型存下來要叫啥
    name = 'Test_minmax_task_m_rest_as_bpd_npy'

    # 存結果
    train_loss_list = []
    train_acc_list = []
    test_loss_list = []
    test_acc_list = []
    higest_test_acc = 0.

    # 存取learning rate
    cur_lr_list = []

    # 幾個epoch
    # 幾個epoch
    epoch = 300


    for epoch in tqdm(range(epoch)):
        # 訓練訓練訓練訓練訓練訓練訓練訓練訓練訓練訓練  
        losses = 0.  #記得加 . 代表float
        accuracies = 0.
        total = 0

        for batch, (X, y, zz) in enumerate(train_dl):
            # 每一個batch都要train
            model.train()

            inputs, labels = X.squeeze().float().to(device), y.float().reshape([-1, 1]).to(device)
            optimizer.zero_grad()
            pred_out = model(inputs)

            loss = criterion(pred_out, labels)
            losses = losses + loss.item()

            # Backpropagation        
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            digital = np.where(pred_out.to('cpu') < 0.5, 0, 1)
            labels = np.array(labels.to('cpu'), dtype= int)

            accuracies += np.sum(digital == labels)
            total += len(labels)

        # 計算loss
        loss_ave = losses/(batch+1)
        acc_ave = accuracies/total

        # 測試for training data
        model.eval()

        digital_list, label_list = [], []            
        losses1 = 0.
        for batch, (X, y, z) in enumerate(train_dl_eval):

            inputs, labels = X.squeeze().float().to(device), y.float().reshape([-1, 1]).to(device)
            pred_out = model(inputs)

            loss1 = criterion(pred_out, labels)

            # Backpropagation
            losses1 = losses1 + loss1.item()

            digital1 = np.where(pred_out.to('cpu') < 0.5, 0, 1)
            labels1 = np.array(labels.to('cpu'), dtype= int)

            digital_list += list(digital1.ravel())
            label_list += list(labels1.ravel())



        loss_ave = losses1/(batch+1)
        acc_ave = accuracy_score(digital_list, label_list)

        # 測試測試測試測試for testing data
        num_batches = len(test_dl)
        test_loss, test_accuracies, test_total = 0., 0., 0
        model.eval()

        with torch.no_grad():
            for batch, (X, y, z) in enumerate(test_dl):
                X = X.squeeze().float().to(device)
                y = y.float().reshape([-1, 1]).to(device)
                pred = model(X)
                test_loss += criterion(pred, y).item()
                digital = np.where(pred.to('cpu') < 0.5, 0, 1)
                labels = np.array(y.to('cpu'), dtype= int)


        test_loss = test_loss/(batch+1)
        test_accuracies = accuracy_score(digital, labels)


        # append到list上面
        train_loss_list.append(loss_ave)
        train_acc_list.append(acc_ave)
        test_loss_list.append(test_loss)
        test_acc_list.append(test_accuracies)


        # test acc大且 train acc大於test acc，則存模型，原本如下

        if test_accuracies > higest_test_acc and acc_ave > test_accuracies and acc_ave >= 0.76 and acc_ave <= 0.96: #and f1_score(labels, digital) > 0.8 
            higest_test_acc = test_accuracies
            higest_train_acc = acc_ave
            torch.save(model.state_dict(), "{}_{}.pth".format(kk, name+"_epoch_" +str(epoch)))
            print('F1_score= ', f1_score(labels, digital))
            print('File: ', "{}_{}.pth".format(kk, name+"_epoch_" +str(epoch)))     
                
    print('\n\n')
    print("Good train:", higest_train_acc)
    print("Good test:", higest_test_acc)
            

    higest_train.append(higest_train_acc)
    higest_test.append(higest_test_acc)

    
    df_one[f'train_loss_{kk}'] = train_loss_list
    df_one[f'train_acc_{kk}'] = train_acc_list
    df_one[f'test_acc_{kk}'] = test_loss_list
    df_one[f'test_acc_{kk}'] = test_acc_list



print('Mean of train: ', np.mean(higest_train),", Mean of test", np.mean(higest_test))
print("Data Length", len(higest_train))

## **畫出訓練結果**

In [None]:
df_one.loc[0] = 0
df_one = df_one.sort_index()

In [None]:
for i in range(5):
    plt.figure(figsize= (16,8))
    plt.plot(df_one.index, df_one[f'train_acc_{i}'], label= 'train_acc', linewidth=3)
    plt.plot(df_one.index, df_one[f'test_acc_{i}'], label= 'test_acc', linewidth=3)
    plt.legend(fontsize= 15 )
    plt.xlabel('Epochs', fontsize= 15)
    plt.ylabel('Accuracy', fontsize= 15)
    plt.title('Control vs Schizophrenia', fontsize= 15)
    plt.show()

## **讀取最好的模型並重現預測**
1. 讀取模型
2. 重現testing data 並畫confusion matrix
3. 重現training data 並畫confusion matrix

In [None]:
higest_train_acc = 0
higest_test_acc = 0


X_df = df_second.loc[fold1[3][0]]
y_df = df_second.loc[fold1[3][1]]
X_df, y_df

# 建造transform, 
trans_comp = transforms.Compose([transforms.ToTensor()])  # transforms.Grayscale(), transforms.PILToTensor(), 

# 建立dataset變套用transform
train_dataset = CustomImageDataset(X_df, transform= trans_comp)
test_dataset = CustomImageDataset(y_df, transform= trans_comp)

print(train_dataset.__len__(), test_dataset.__len__())

# 把dataset放入dataloader
train_dl = DataLoader(train_dataset, batch_size= 16, shuffle= False, drop_last=False)
test_dl = DataLoader(test_dataset, batch_size= 16, shuffle= False)

In [None]:
# Load model
model.load_state_dict(torch.load('/kaggle/working/{}_{}.pth'.format(3, name+"_epoch_" +str(129))))

In [None]:
# test
size = len(test_dl.dataset)
num_batches = len(test_dl)
test_loss, test_accuracies, total_len = 0, 0, 0
digital_list = []
label_list = []
model.eval()
with torch.no_grad():
    for batch, (X, y, z) in enumerate(test_dl):
        X = X.squeeze().float().to(device)
        y = y.float().reshape([-1, 1]).to(device)
        pred = model(X)
        test_loss += criterion(pred, y).item()
        digital = np.where(pred.to('cpu') < 0.5, 0, 1)
        labels = np.array(y.to('cpu'), dtype= int)
        digital_list += list(digital)
        label_list += list(labels)

test_loss = test_loss/(batch+1)
test_accuracies = accuracy_score(digital, labels)
print(f"Test Error: \n Accuracy: {(100*test_accuracies):5f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
cm = confusion_matrix(np.array(label_list).squeeze(), np.array(digital_list).squeeze())
plt.figure(figsize = (8, 6))
sns.heatmap(cm, annot = True, fmt='.20g', cmap= sns.color_palette("Blues", as_cmap=True), xticklabels= ['Schizophrenia', 'Control'], yticklabels= ['Schizophrenia', 'Control'], annot_kws={"size": 16})
plt.ylabel('Actual', fontsize=15)
plt.xlabel('Predicted', fontsize=15)
plt.title('Test Confusion Matrix')

In [None]:
# train 
size = len(train_dl.dataset)    
print(size)
losses = 0.  
accuracies = 0.
total_len_t = 0
digital_list = []
label_list = []

model.eval()
with torch.no_grad():
    for batch, (X, y, z) in enumerate(train_dl):
        # Compute prediction and loss
        print(batch, "~~~~~", end= '\r')

        inputs, labels = X.squeeze().float().to(device), y.float().reshape([-1, 1]).to(device)

        pred_out = model(inputs)
        
        loss = criterion(pred_out, labels)

        # Backpropagation
        losses = losses + loss.item()

        digital = np.where(pred_out.to('cpu') < 0.5, 0, 1)
        labels = np.array(labels.to('cpu'), dtype= int)

        digital_list += list(digital.ravel())
        label_list += list(labels.ravel())
        

        accuracies += np.sum(digital == labels)
        total_len_t += len(labels)

loss_ave = losses/(batch+1)
acc_ave = accuracies/total_len_t

print(f"loss: {loss_ave} accuracy: {acc_ave}")

In [None]:
cm = confusion_matrix(label_list, digital_list)
plt.figure(figsize = (8, 6))
sns.heatmap(cm, annot = True, fmt='.20g', cmap= sns.color_palette("Blues", as_cmap=True), xticklabels= ['Schizophrenia', 'Control'], yticklabels= ['Schizophrenia', 'Control'], annot_kws={"size": 16})
plt.ylabel('Actual', fontsize=15)
plt.xlabel('Predicted', fontsize=15)
plt.title('Train Confusion Matrix')

# **可視化AI - Intergated Gradients **
> https://captum.ai/docs/extension/integrated_gradients

1. 使用Captum的IntegratedGradients，將模型與訊號丟入之後得出attributions，及模型所關注的區域
2. 使用rolling apply數值平滑，以方便畫圖
3. 在背景畫出模型主要所看的位置

In [None]:
def minmax(x):
    return (x - x.min())/(x.max() - x.min())

In [None]:
model.train()
ig = IntegratedGradients(model)
for batch, (X, y, zz) in enumerate(train_dl):
    
    df_cuba = pd.DataFrame()
    input1 = X.squeeze().float().to(device)
    # 把處理過的.npy改成原始.npy
    z = [x.replace('region_h', 'nominmax_h').replace('region_un', 'nominmax_un').replace('syn-time-series', 'nominmax').replace('.npy', '_nominmax.npy') for x in zz]
    
    # 製作IntegratedGradients
    attributions, delta = ig.attribute(input1, target=0, return_convergence_delta=True)
    for nums, att in enumerate(attributions):
        df_cuba[f'{nums}_Region_0'] = att.cpu()[0]
        df_cuba[f'{nums}_Region_1'] = att.cpu()[1]

    df_cleann = minmax(df_cuba.rolling(50).mean().bfill())

    
    for ind in range(len(attributions)):
        # First plot
        y = np.linspace(-0.6, 1, 100)
        cmapp1 = [(1, 0, 0, a*0.5) for a in df_cleann[f'{ind}_Region_0']]    
        # 繪製水平色彩漸進圖
        fig, ax = plt.subplots(figsize=(40, 15))

        for i in range(len(colors)):
            ax.plot(np.ones_like(y) * i/10, y, color=cmapp2[i], linewidth=4)

        datasss = np.transpose(np.load(z[ind]),(1,0))
        plt.plot(np.arange(0, len(datasss[0])/10, 0.1), datasss[0], label= 'region1', c='black', linewidth=10)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)
        plt.xlabel('Second(s)', fontsize=30)

        plt.xlim(-0.1, 125.1)
        plt.ylim(-0.6, 1)
        plt.title(f'{z[ind]}_Region 1', fontsize=30)

        # Second plot
        y = np.linspace(-0.6, 1, 100)

        cmapp2 = [(1, 0, 0, a*0.5) for a in df_cleann[f'{ind}_Region_1']]

        # 繪製水平色彩漸進圖
        fig, ax = plt.subplots(figsize=(40, 15))

        for i in range(len(colors)):
            ax.plot(np.ones_like(y) * i/10, y, color=cmapp2[i], linewidth=4)

        plt.plot(np.arange(0, len(datasss[1])/10, 0.1), datasss[1], label= 'region1', c='black', linewidth=10)
        plt.xlim(-0.1, 125.1)
        plt.ylim(-0.6, 1)
        plt.title(f'{z[ind]}_Region 2', fontsize=30)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)
        plt.xlabel('Second(s)', fontsize=30)
        plt.show()
        print("\n\n")