#### data load from google drive

In [None]:
import os
import zipfile

# 第一個檔案是訓練資料集 (6 種分類)。
!gdown --id '1QGB_C4DvhotmvFIH3BM3PBHTYG_clGQl' --output /content/train_6class.zip
# 第二個檔案是測試資料集 (10 種分類)。
!gdown --id '19ar3WmD3NDAmldXGyH06a6gf58tbQ5eA' --output /content/test_10class.zip

with zipfile.ZipFile('/content/train_6class.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/train_6class')

with zipfile.ZipFile('/content/test_10class.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/test_10class')

#### load library

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from torchvision import transforms
from torchvision.datasets import ImageFolder
import numpy as np
import matplotlib.pyplot as plt
import torchvision.models as models
from sklearn import metrics

#### preprocess

In [None]:
# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise each channel with the mean/std values listed below.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_data = ImageFolder('/content/train_6class', transform=preprocess)

# Fraction of the training images held out for validation.
valid_size = 0.2
# Mini-batch size for the loaders below; 32 matches the test loaders in this notebook.
# NOTE(review): the original notebook never showed this value - adjust if needed.
batch_size = 32

# Shuffle the sample indices so the train/validation split is random.
# (The full index list is no longer printed: it is one entry per image.)
num_train = len(train_data)
print(num_train)
indices = list(range(num_train))
np.random.shuffle(indices)

# Position in the shuffled index list where the validation part ends.
split = int(np.floor(valid_size * num_train))
print(split)

# The first `split` shuffled indices form the validation set, the rest the training set.
train_idx, valid_idx = indices[split:], indices[:split]
train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)
print(len(train_idx), len(valid_idx))

# Loaders consumed by the training cell. Both read from `train_data`; the samplers
# restrict each loader to its own disjoint subset of indices.
train_loader = DataLoader(train_data, batch_size=batch_size, sampler=train_sampler)
valid_loader = DataLoader(train_data, batch_size=batch_size, sampler=valid_sampler)

#### loss graph

In [None]:
# Helper that draws the loss curves.
def plotfigure(train_loss,valid_loss):
    """Plot training and validation loss against the epoch number.

    Args:
        train_loss: sequence of training-loss values, one per epoch.
        valid_loss: sequence of validation-loss values, one per epoch.

    Epochs are numbered from 1 on the x axis. The figure is shown with
    ``plt.show()``; nothing is returned.
    """
    plt.figure(figsize=(10, 8))

    curves = (
        (train_loss, 'Training Loss'),
        (valid_loss, 'Validation Loss'),
    )
    for loss_values, legend_label in curves:
        epoch_numbers = range(1, len(loss_values) + 1)
        plt.plot(epoch_numbers, loss_values, label=legend_label)

    plt.xlabel('epochs')
    plt.ylabel('loss')
    # The x range is derived from the training curve so it always spans all epochs.
    plt.xlim(0, len(train_loss) + 1)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

#### Train

In [None]:
!pip install efficientnet_pytorch
from efficientnet_pytorch import EfficientNet

In [None]:
# Training setup for the 6-class classifier.
# Alternative kept for reference:
#   EfficientNet.from_pretrained('efficientnet-b7', num_classes=num_class)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# EfficientNet-B1 built by name with a 6-way output layer.
model_nopre_6class = EfficientNet.from_name('efficientnet-b1', num_classes=6)

# Cross-entropy loss optimised with Adam.
criterion_nopre_6class = nn.CrossEntropyLoss()
optimizer_nopre_6class = optim.Adam(
    model_nopre_6class.parameters(),
    lr=0.005,
)

# Number of training epochs.
epoch = 50

In [None]:
# Load the previously trained 6-class model directly instead of training again.
model_nopre_6class = EfficientNet.from_name('efficientnet-b1', num_classes=6)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# map_location remaps the stored tensors onto `device`, so a checkpoint written on a
# GPU runtime can still be restored when only the CPU is available.
saved_state = torch.load('/content/save.pt', map_location=device)
model_nopre_6class.load_state_dict(saved_state)
model_nopre_6class.to(device)
# Switch to inference mode.
model_nopre_6class.eval()

In [None]:
# Per-batch losses of the epoch currently running (cleared at the start of each epoch).
train_losses = []
valid_losses = []
# Mean loss of every completed epoch; these are what gets plotted.
avg_train_losses = []
avg_valid_losses = []

# Per-class confidence threshold: the lowest softmax probability observed for a
# correctly classified validation sample of that class. Starts at 1.
# NOTE(review): the minimum is taken over ALL epochs, including early ones where the
# model is still weak - confirm that this is intended.
threshold = {0 : 1, 1 : 1, 2 : 1, 3 : 1, 4 : 1, 5 : 1}

model_nopre_6class.to(device)
# A dedicated loop variable keeps `epoch` (the epoch count) intact, so re-running this
# cell trains for the same number of epochs again.
for epoch_idx in range(epoch):
    # Start every epoch with empty lists so the reported averages are per-epoch values
    # and not a running average over all previous epochs.
    train_losses = []
    valid_losses = []

    # ---- training pass ----
    model_nopre_6class.train()
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer_nopre_6class.zero_grad()
        outputs = model_nopre_6class(images)
        loss = criterion_nopre_6class(outputs, labels)
        loss.backward()
        optimizer_nopre_6class.step()
        train_losses.append(loss.item())

    # ---- validation pass ----
    model_nopre_6class.eval()
    correct_predictions = 0
    total_samples = 0
    with torch.no_grad():
        for images, labels in valid_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model_nopre_6class(images)
            val_loss = criterion_nopre_6class(outputs, labels)
            valid_losses.append(val_loss.item())

            # Confidence (top softmax probability) and predicted class per sample.
            probabilities = torch.softmax(outputs, dim=1)
            confidence, predicted = torch.max(probabilities, dim=1)
            is_correct = predicted == labels

            # Each sample is counted exactly once for the accuracy.
            correct_predictions += is_correct.sum().item()
            total_samples += labels.size(0)

            # Lower a class threshold whenever a correct prediction of that class
            # was made with less confidence than anything seen before.
            hit_classes = predicted[is_correct].tolist()
            hit_confidences = confidence[is_correct].tolist()
            for class_idx, class_confidence in zip(hit_classes, hit_confidences):
                if class_confidence < threshold[class_idx]:
                    threshold[class_idx] = class_confidence

    train_loss = np.average(train_losses)
    valid_loss = np.average(valid_losses)
    avg_train_losses.append(train_loss)
    avg_valid_losses.append(valid_loss)
    # Guard against an empty validation loader instead of dividing by zero.
    valid_accuracy = correct_predictions / total_samples if total_samples else float('nan')
    print("Epoch {} - Train loss: {:.4f}, Validation loss:{:.4f}, Validation accuracy:{:.4f}".format(epoch_idx+1, train_loss, valid_loss, valid_accuracy))

for class_idx in range(len(threshold)):
    print(f'Class {class_idx} threshold: {threshold[class_idx]}')
plotfigure(avg_train_losses,avg_valid_losses)

In [None]:
# Persist the weights (state_dict) of the 6-class model.
trained_weights = model_nopre_6class.state_dict()
torch.save(trained_weights, '/content/save.pt')

##### 用和訓練資料集同樣的六類測試data測試模型訓練分類效果

In [None]:
!gdown --id '1akuOLZjxZB813WFVBd6y-BQdqh9lz7Me' --output /content/test_6class.zip

with zipfile.ZipFile('/content/test_6class.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/test_6class')

In [None]:
# 6-class test images, preprocessed like the training data and read in a fixed order.
test_dataset_6c = ImageFolder('/content/test_6class', transform=preprocess)
test_loader_6c = DataLoader(test_dataset_6c, batch_size=32, shuffle=False)

In [None]:
# Evaluate the 6-class model on the 6-class test set.
model_nopre_6class.eval()
correct, total = 0, 0
predictions, true_labels = [], []

with torch.no_grad():
    for images, labels in test_loader_6c:
        images = images.to(device)
        labels = labels.to(device)

        # Predicted class = index of the largest softmax probability.
        probabilities = torch.softmax(model_nopre_6class(images), dim=1)
        max_prob, predicted = probabilities.max(dim=1)

        # Running counts for the accuracy.
        correct += torch.eq(predicted, labels).sum().item()
        total += labels.size(0)

        # Collect everything for the sklearn reports below.
        predictions.extend(predicted.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

acc = correct / total
print("Test Accuracy: {:.4f}".format(acc))

# Confusion matrix and per-class precision/recall/F1.
print(metrics.confusion_matrix(true_labels, predictions))
print(metrics.classification_report(true_labels, predictions))

#### 用10類的test data作test並分出other類

In [None]:
# Evaluate the 6-class model on the 10-class test set. Predictions whose confidence is
# below the per-class threshold are relabelled as class 6 ("others"), and the pooled
# features of those samples are collected for the Mahalanobis step further down.
# NOTE(review): `test_loader` and `threshold_2` are not defined anywhere in the visible
# notebook - they must be created before this cell runs. `threshold_2` is presumably a
# dict mapping class index (0-5) to a confidence threshold, like `threshold` - confirm.
model_nopre_6class.eval()
correct = 0
total = 0
predictions = []
true_labels = []
other_samples_10class = [] # pooled features of the low-confidence samples
other_labels_10class = [] # original ground-truth labels of the low-confidence samples
avg_pooling = nn.AdaptiveAvgPool2d(1) # global average pooling to get a compact feature representation
with torch.no_grad():
    for i, (images, labels) in enumerate(test_loader):
        images, labels = images.to(device), labels.to(device)
        outputs = model_nopre_6class(images)
        probabilities = torch.softmax(outputs, dim=1)
        max_prob, predicted = torch.max(probabilities,1)
        total += labels.size(0)
        # correct += (predicted == labels).sum().item()

        # Relabel every prediction whose confidence is below the threshold of its
        # predicted class as class 6 (unknown class). `predicted` is modified in place.
        for j in range(len(predicted)):
          # if predicted[j] != labels[j]:
          if max_prob[j].cpu().numpy() < threshold_2[predicted.cpu().numpy()[j]]: 
            predicted[j] = 6

        # Extract the features and labels of the low-confidence samples.
        # Each image is passed through the feature extractor on its own, then pooled.
        for j in range(len(predicted)):
          if predicted[j] == 6:
            image_batch = images[j].unsqueeze(0)
            image_batch = image_batch.to(device)
            features = model_nopre_6class.extract_features(image_batch)
            fea = avg_pooling(features)
            other_samples_10class.append(fea.cpu().numpy())  # add the sample features to the list
            # The label is stored BEFORE the 7/8/9 -> 6 remapping below, so the
            # original class index is kept here.
            other_labels_10class.append(labels[j].cpu().numpy())  # add the label to the list
            # if labels[j] == 7 or labels[j] == 8 or labels[j] == 9:
            #   correct += 1
            
        # Map the true labels 7, 8 and 9 to class 6: 6 is the "others" class, and
        # classes 7, 8, 9 are not part of the training set, so they count as "others".
        for j in range(len(labels)):
          if(labels[j] == 7 or labels[j] == 8 or labels[j] == 9):
            labels[j] = 6
        # Accuracy is measured against the remapped labels.
        correct += (predicted == labels).sum().item()
        predictions.extend(predicted.cpu().numpy())
        # print(predictions)
        true_labels.extend(labels.cpu().numpy())
        # print(true_labels)

acc = correct / total
print("Test Accuracy: {:.4f}".format(acc))
# confusion matrix and per-class report
print(metrics.confusion_matrix(true_labels, predictions))
print(metrics.classification_report(true_labels, predictions))

將被分到others類別的樣本進行重塑

In [None]:
# Turn the collected "others" features and labels into NumPy arrays.
print(len(other_samples_10class))
print(len(other_labels_10class))

# Stack the per-sample arrays and drop the singleton axis at position 1
# (each stored entry carries a leading dimension of size 1).
other_samples_10class = np.squeeze(np.array(other_samples_10class), axis=1)
other_labels_10class = np.array(other_labels_10class)

print(other_samples_10class.shape)
print(other_labels_10class.shape)

# The stacked array is 4-D; unpack its dimensions for inspection.
n_samples, channels, width, height = other_samples_10class.shape
print(n_samples, channels, width, height)

# Flatten every sample into a single feature vector (one row per sample).
other_samples_10class_flat = other_samples_10class.reshape(n_samples, -1)
print(other_samples_10class_flat)

In [None]:
# Per the original author's note, class 9 is the "normal" class: it becomes 0 and
# every other label becomes 1.
# NOTE(review): this overwrites the labels in place, so running the cell a second time
# turns every label into 1.
for idx, original_label in enumerate(other_labels_10class):
    other_labels_10class[idx] = 0 if original_label == 9 else 1
print(other_labels_10class)

#### Mahalanobis distance

用normal data的訓練資料取得馬氏距離所需的平均值、協方差逆矩陣

In [None]:
import os
import zipfile
!gdown --id "1u3jlb_D5VrlIeK0wBC2D3qTMWLiJPK8B" --output /content/normalonly.zip

with zipfile.ZipFile('/content/normalonly.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/normalonly')

取特徵

In [None]:
# Preprocessing: resize to 224x224, convert to a tensor, normalise each channel.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
normalonly_loader = DataLoader(ImageFolder('/content/normalonly',transform = preprocess), batch_size=32, shuffle = False)

# Global average pooling, defined here as well so that this cell does not depend on
# the 10-class evaluation cell having been run first.
avg_pooling = nn.AdaptiveAvgPool2d(1)

model_nopre_6class.to(device)
model_nopre_6class.eval()

# One pooled feature array per "normal" image.
normalonlyarr = []
with torch.no_grad():
    for images, _ in normalonly_loader:
        images = images.to(device)
        # A single forward pass per batch replaces one pass per image. In eval mode
        # the features of an image do not depend on the other images in the batch.
        features = model_nopre_6class.extract_features(images)
        pooled = avg_pooling(features).cpu().numpy()
        # Store one entry per image and keep a leading axis of size 1 on each, which
        # is the layout the reshape cell below expects (it calls squeeze(1)).
        normalonlyarr.extend(pooled[:, np.newaxis])

重塑

In [None]:
# Turn the collected "normal" features into one 2-D matrix (one row per sample).
print(len(normalonlyarr))

# Stack the per-sample arrays and drop the singleton axis at position 1.
normalonlyarr = np.squeeze(np.array(normalonlyarr), axis=1)
print(normalonlyarr.shape)

# The stacked array is 4-D; unpack its dimensions for inspection.
n_samples, channels, width, height = normalonlyarr.shape
print(n_samples, channels, width, height)

# Flatten every sample into a single feature vector.
normalonlyarr_flat = normalonlyarr.reshape(n_samples, -1)
print(normalonlyarr_flat)

用normal類的訓練資料來獲得馬氏距離所需的平均值、協方差逆矩陣

In [None]:
# Estimate, from the "normal" training features, the mean vector and the inverse
# covariance matrix needed for the Mahalanobis distance.
mean = np.mean(normalonlyarr_flat, axis=0)
# Rows are samples and columns are features.
cov = np.cov(normalonlyarr_flat, rowvar=False)
# A sample covariance is singular when there are fewer samples than features or when
# features are linearly dependent. A plain inverse then yields meaningless values,
# including negative squared distances that become NaN after the square root.
# The Moore-Penrose pseudo-inverse stays well defined in that case.
cov_inv = np.linalg.pinv(cov)

# Sanity check: Mahalanobis distance of the first "normal" training sample.
diff = normalonlyarr_flat[0] - mean
squared_distance = np.dot(np.dot(diff.T, cov_inv), diff)
# Clamp tiny negative values caused by rounding so the square root never returns NaN.
mahalanobis_distance = np.sqrt(np.maximum(squared_distance, 0.0))
print(mahalanobis_distance)

In [None]:
# Mahalanobis distance of every "normal" training sample, collected in all_val.
all_val = []
for sample in normalonlyarr_flat:
    diff = sample - mean
    # sqrt(diff^T . cov_inv . diff)
    mahalanobis_distance = np.sqrt(diff.T.dot(cov_inv).dot(diff))
    all_val.append(mahalanobis_distance)

用全部normal類的訓練資料的馬氏距離最大值作threshold作OOD（NaN 的距離先以非 NaN 的平均值填補）

In [None]:
# Mean of the Mahalanobis distances, ignoring NaN entries.
mean_value = np.nanmean(all_val)
print(mean_value)

# Replace every NaN distance with that mean.
all_val = [mean_value if np.isnan(distance) else distance for distance in all_val]
print(all_val)

In [None]:
# Classify the samples that were routed to the "others" class as in-distribution (0)
# or out-of-distribution (1) using their Mahalanobis distance.
other_val = []        # Mahalanobis distance of every sample
other_val_label = []  # predicted label of every sample

# Distance of each "others" sample to the "normal" training distribution.
for sample in other_samples_10class_flat:
    diff = sample - mean
    mahalanobis_distance = np.dot(diff.T, cov_inv)
    mahalanobis_distance = np.dot(mahalanobis_distance, diff)
    mahalanobis_distance = np.sqrt(mahalanobis_distance)
    other_val.append(mahalanobis_distance)
print(other_val)

# Replace NaN distances with the mean of the non-NaN distances.
mean_value = np.nanmean(other_val)
print(mean_value)
other_val = [mean_value if np.isnan(distance) else distance for distance in other_val]
print(other_val)

# A sample is OOD when its distance exceeds the largest distance seen among the
# "normal" training samples. The maximum is computed once, not on every iteration.
ood_threshold = max(all_val)
other_val_label = [1 if distance > ood_threshold else 0 for distance in other_val]
print(metrics.confusion_matrix(other_labels_10class, other_val_label))

In [None]:
# Show the binarized ground-truth labels followed by the predicted OOD labels.
for label_sequence in (other_labels_10class, other_val_label):
    print(label_sequence)

用全部normal類的訓練資料作馬氏距離的中位數作threshold作OOD

In [None]:
# Repeat the OOD decision with the median "normal" distance as the threshold.
sorted_all_val = sorted(all_val)
print(all_val)
print(sorted_all_val)

med = np.median(sorted_all_val)
print(med)

# 1 = distance above the median (treated as OOD), 0 = otherwise.
med_label = [1 if distance > med else 0 for distance in other_val]
print(metrics.confusion_matrix(other_labels_10class, med_label))