In [None]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:

# Per-channel mean and standard deviation used to normalize the LAB images
# (passed to transforms.Normalize when the preprocessing pipeline is built).
# NOTE(review): these exact numbers are the widely used ImageNet RGB statistics,
# not statistics measured on LAB data — confirm whether dataset-specific LAB
# values were intended.
lab_mean = [0.485, 0.456, 0.406]
lab_std = [0.229, 0.224, 0.225]

class AppleDataset(Dataset):
    """Dataset of ``.jpg`` images from a single directory, served in LAB color space.

    Each item is read with OpenCV, converted from BGR to LAB, resized to
    256x256 and, when a transform was supplied, passed through it.

    Args:
        image_dir: Directory scanned (non-recursively) for files whose name
            ends with ``.jpg``.
        transform: Optional callable applied to the resized LAB image array.
    """

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable across runs and machines.
        self.image_paths = sorted(
            os.path.join(image_dir, f)
            for f in os.listdir(image_dir)
            if f.endswith('.jpg')
        )
        self.transform = transform

    def __len__(self):
        """Return the number of ``.jpg`` files found in ``image_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load the image at ``idx`` as a 256x256 LAB array (transformed if configured).

        Raises:
            OSError: If OpenCV could not read the file.
        """
        img_path = self.image_paths[idx]
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports failure by returning None rather than raising,
            # which would otherwise surface as an opaque cvtColor error.
            raise OSError(f'Could not read image: {img_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        image = cv2.resize(image, (256, 256))
        if self.transform:
            image = self.transform(image)
        return image

# Preprocessing applied to every LAB image: ToTensor turns the image array into
# a tensor, then Normalize applies the per-channel lab_mean / lab_std.
# (Per the torchvision docs ToTensor also rescales uint8 input to [0, 1] — verify
# against the installed version.)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=lab_mean, std=lab_std)
])

# Training data: the .jpg files in the Drive folder below, served in shuffled
# batches of 32.
train_dataset = AppleDataset(image_dir='/content/drive/MyDrive/aet/apple', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)


In [None]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm stages wrapped in an identity skip connection.

    Channel count and spatial size are preserved (kernel 3, padding 1), so the
    block input can be added directly onto the output of the second stage.

    Args:
        in_channels: Number of channels of both the input and the output.
    """

    def __init__(self, in_channels):
        super().__init__()
        channels = in_channels
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + x)``."""
        skip = x
        hidden = self.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        # Add the untouched input back before the final activation
        hidden += skip
        return self.relu(hidden)

class ImprovedAutoencoder(nn.Module):
    """Convolutional autoencoder with a residual block after every resampling stage.

    The encoder applies four stride-2 convolutions (3 -> 64 -> 128 -> 256 -> 512
    channels), each halving height and width for even-sized inputs. The decoder
    mirrors it with four stride-2 transposed convolutions back to 3 channels.

    The decoder intentionally ends in a plain (linear) transposed convolution.
    The reconstruction target in this notebook is the *normalized* input
    (ToTensor followed by Normalize), whose values fall roughly in [-2.1, 2.6].
    A trailing Sigmoid would clamp the output to (0, 1), so it could never
    match those targets and the MSE loss could not approach zero.

    Layer positions inside ``encoder`` / ``decoder`` are unchanged, so parameter
    names in ``state_dict`` stay the same.
    """

    def __init__(self):
        super().__init__()

        # Encoder: each stage downsamples by 2 and widens the channels
        self.encoder = nn.Sequential(
            *self._stage(nn.Conv2d, 3, 64),
            *self._stage(nn.Conv2d, 64, 128),
            *self._stage(nn.Conv2d, 128, 256),
            *self._stage(nn.Conv2d, 256, 512),
        )

        # Decoder: each stage upsamples by 2 and narrows the channels; the last
        # layer has no activation so outputs can cover the normalized range.
        self.decoder = nn.Sequential(
            *self._stage(nn.ConvTranspose2d, 512, 256),
            *self._stage(nn.ConvTranspose2d, 256, 128),
            *self._stage(nn.ConvTranspose2d, 128, 64),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),
        )

    @staticmethod
    def _stage(conv_cls, in_channels, out_channels):
        """Build one resampling stage: conv (k=4, s=2, p=1) -> BN -> ReLU -> ResidualBlock."""
        return [
            conv_cls(in_channels, out_channels, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            ResidualBlock(out_channels),
        ]

    def forward(self, x):
        """Encode ``x`` and return its reconstruction with the same shape."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:

# Train the autoencoder to reproduce its own input (MSE between output and input).
model = ImprovedAutoencoder().cuda()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    num_samples = 0
    for images in train_loader:
        images = images.cuda()
        outputs = model(images)
        loss = criterion(outputs, images)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a smaller final batch does not skew
        # the epoch mean.
        batch_size = images.size(0)
        running_loss += loss.item() * batch_size
        num_samples += batch_size

    if num_samples == 0:
        # Without this guard the report below would fail on an undefined loss
        raise RuntimeError('train_loader yielded no batches; check the image directory')

    # Report the mean over the whole epoch rather than only the last mini-batch,
    # which is noisy and not representative of the epoch.
    epoch_loss = running_loss / num_samples
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')


Epoch [1/100], Loss: 0.6221
Epoch [2/100], Loss: 0.5531
Epoch [3/100], Loss: 0.5069
Epoch [4/100], Loss: 0.4835
Epoch [5/100], Loss: 0.4645
Epoch [6/100], Loss: 0.4498
Epoch [7/100], Loss: 0.4381
Epoch [8/100], Loss: 0.4276
Epoch [9/100], Loss: 0.4180
Epoch [10/100], Loss: 0.4104
Epoch [11/100], Loss: 0.3972
Epoch [12/100], Loss: 0.3866
Epoch [13/100], Loss: 0.3787
Epoch [14/100], Loss: 0.3708
Epoch [15/100], Loss: 0.3623
Epoch [16/100], Loss: 0.3527
Epoch [17/100], Loss: 0.3446
Epoch [18/100], Loss: 0.3283
Epoch [19/100], Loss: 0.3163
Epoch [20/100], Loss: 0.3181
Epoch [21/100], Loss: 0.2962
Epoch [22/100], Loss: 0.2760
Epoch [23/100], Loss: 0.2636
Epoch [24/100], Loss: 0.2508
Epoch [25/100], Loss: 0.2457
Epoch [26/100], Loss: 0.2375
Epoch [27/100], Loss: 0.2337
Epoch [28/100], Loss: 0.2292
Epoch [29/100], Loss: 0.2252
Epoch [30/100], Loss: 0.2210
Epoch [31/100], Loss: 0.2196
Epoch [32/100], Loss: 0.2174
Epoch [33/100], Loss: 0.2146
Epoch [34/100], Loss: 0.2122
Epoch [35/100], Loss: 0

In [None]:

def denormalize(tensor, mean, std):
    """Undo a per-channel ``(x - mean) / std`` normalization.

    Unlike an in-place implementation, the input tensor is left untouched and
    a new tensor is returned, so callers can keep using the normalized data.

    Args:
        tensor: Channel-first tensor, e.g. ``(C, H, W)`` or ``(N, C, H, W)``.
        mean: Sequence of per-channel means (length C).
        std: Sequence of per-channel standard deviations (length C).

    Returns:
        A new tensor equal to ``tensor * std + mean`` applied channel-wise.
    """
    # Keep the statistics in floating point even if an integer tensor is passed
    stat_dtype = tensor.dtype if tensor.is_floating_point() else torch.float32
    # Shape (C, 1, 1) broadcasts over height/width and any leading batch dim
    mean_t = torch.as_tensor(mean, dtype=stat_dtype, device=tensor.device).view(-1, 1, 1)
    std_t = torch.as_tensor(std, dtype=stat_dtype, device=tensor.device).view(-1, 1, 1)
    return tensor * std_t + mean_t

def _normalized_lab_to_uint8(chw_tensor):
    """Convert a normalized (C, H, W) LAB tensor into an (H, W, C) uint8 image."""
    image = denormalize(chw_tensor, lab_mean, lab_std).numpy().transpose(1, 2, 0)
    # Clip before the cast: values outside [0, 255] would otherwise wrap around
    # when converted to uint8.
    return np.clip(np.rint(image * 255), 0, 255).astype(np.uint8)


def detect_anomalies_and_visualize_heatmap(model, image_dir, threshold=0.01):
    """Flag images whose reconstruction loss exceeds ``threshold`` and plot them.

    Every ``.jpg`` in ``image_dir`` is converted to LAB, resized to 256x256,
    preprocessed with the module-level ``transform`` and reconstructed by
    ``model``. When the module-level ``criterion`` loss is above ``threshold``,
    the original, the reconstruction and a per-pixel error heatmap are shown.

    Args:
        model: Autoencoder that maps a ``(1, 3, 256, 256)`` CUDA tensor to a
            tensor of the same shape.
        image_dir: Directory scanned (non-recursively) for ``.jpg`` files.
        threshold: Loss value above which an image is reported as anomalous.
    """
    model.eval()
    for img_name in sorted(os.listdir(image_dir)):
        if not img_name.endswith('.jpg'):
            continue

        img_path = os.path.join(image_dir, img_name)
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread returns None on failure; skip instead of aborting the scan
            print(f'Skipping unreadable image: {img_name}')
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
        image = cv2.resize(image, (256, 256))
        image_tensor = transform(image).unsqueeze(0).cuda()

        with torch.no_grad():
            reconstructed = model(image_tensor)
            loss = criterion(reconstructed, image_tensor)

        if loss.item() <= threshold:
            continue
        print(f'Anomaly detected in image: {img_name}, Loss: {loss.item():.4f}')

        original_image = _normalized_lab_to_uint8(image_tensor.squeeze(0).cpu())
        reconstructed_image = _normalized_lab_to_uint8(reconstructed.squeeze(0).cpu())

        original_image_rgb = cv2.cvtColor(original_image, cv2.COLOR_LAB2RGB)
        reconstructed_image_rgb = cv2.cvtColor(reconstructed_image, cv2.COLOR_LAB2RGB)

        # Subtract in a signed type: uint8 - uint8 wraps around (e.g. 3 - 5 = 254),
        # which would turn small negative differences into huge errors.
        reconstruction_error = np.abs(
            original_image.astype(np.int16) - reconstructed_image.astype(np.int16)
        )
        error_map = np.mean(reconstruction_error, axis=2)

        fig, axes = plt.subplots(1, 3, figsize=(18, 6))
        axes[0].imshow(original_image_rgb)
        axes[0].set_title('Original Image')

        axes[1].imshow(reconstructed_image_rgb)
        axes[1].set_title('Reconstructed Image')

        sns.heatmap(error_map, cmap='viridis', ax=axes[2])
        axes[2].set_title('Reconstruction Error Heatmap')

        plt.show()
        # Release the figure so long scans do not accumulate open figures
        plt.close(fig)


In [None]:

# Scan the apple_d folder with the default threshold (0.01); every image whose
# reconstruction loss exceeds it is printed and plotted.
detect_anomalies_and_visualize_heatmap(model, '/content/drive/MyDrive/aet/apple_d')

Output hidden; open in https://colab.research.google.com to view.

In [None]:
def preprocess_image(image_path):
    """Load one image file and turn it into a model-ready CUDA batch.

    The image is read with OpenCV, converted from BGR to LAB, resized to
    256x256, passed through the module-level ``transform`` and given a leading
    batch dimension.

    Args:
        image_path: Path of the image file to load.

    Returns:
        Tensor on the CUDA device with a leading batch dimension of 1.

    Raises:
        OSError: If OpenCV could not read the file.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None on failure instead of raising; fail early with
        # the offending path rather than with an opaque cvtColor error.
        raise OSError(f'Could not read image: {image_path}')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    image = cv2.resize(image, (256, 256))
    image_tensor = transform(image).unsqueeze(0).cuda()
    return image_tensor

def calculate_reconstruction_error(model, image_tensor):
    """Return the element-wise absolute reconstruction error as a NumPy array.

    The model is switched to eval mode (and left there) and run without
    gradient tracking.

    Args:
        model: Module mapping ``image_tensor`` to a tensor of the same shape.
        image_tensor: Input batch fed to the model.

    Returns:
        ``numpy.ndarray`` with the same shape as ``image_tensor`` holding
        ``|image_tensor - model(image_tensor)|``.
    """
    model.eval()
    with torch.no_grad():
        reconstructed = model(image_tensor)
        # Only the per-element error is needed; no scalar loss is computed here
        return torch.abs(image_tensor - reconstructed).cpu().numpy()

def calculate_grid_errors(reconstruction_error, grid_size=32):
    """Average the reconstruction error over non-overlapping square grid cells.

    The last two axes are treated as height and width; all leading axes (for
    example batch and channel) are averaged into each cell. Height and width
    are handled independently, so non-square inputs are supported. Rows and
    columns that do not fill a complete cell are ignored.

    Args:
        reconstruction_error: Array whose last two axes are ``(H, W)``.
        grid_size: Side length of each grid cell in pixels; must be positive.

    Returns:
        ``numpy.ndarray`` of shape ``(H // grid_size, W // grid_size)`` and
        dtype float64 holding the mean error of each cell.

    Raises:
        ValueError: If ``grid_size`` is not positive.
    """
    if grid_size <= 0:
        raise ValueError(f'grid_size must be positive, got {grid_size}')

    errors = np.asarray(reconstruction_error)
    # Number of complete cells along each spatial axis
    num_rows = errors.shape[-2] // grid_size
    num_cols = errors.shape[-1] // grid_size

    # Drop the incomplete border, then split H and W into (cell index, offset)
    cropped = errors[..., :num_rows * grid_size, :num_cols * grid_size]
    leading = cropped.shape[:-2]
    cells = cropped.reshape(*leading, num_rows, grid_size, num_cols, grid_size)

    # Average over every leading axis and over the two within-cell offset axes,
    # leaving only the (row, col) cell indices.
    num_leading = len(leading)
    reduce_axes = tuple(range(num_leading)) + (num_leading + 1, num_leading + 3)
    return cells.mean(axis=reduce_axes, dtype=np.float64)

def main(image_path):
    """Compute the per-grid-cell mean reconstruction error for one image file.

    Steps: preprocess the image, obtain the element-wise reconstruction error
    from the module-level ``model``, then average that error per grid cell
    using the default grid size.

    Args:
        image_path: Path of the image file to analyze.

    Returns:
        The grid of mean reconstruction errors produced by
        ``calculate_grid_errors``.
    """
    per_pixel_error = calculate_reconstruction_error(model, preprocess_image(image_path))
    return calculate_grid_errors(per_pixel_error)

In [None]:
# Example image path
image_path = '/content/drive/MyDrive/aet/apple_d/1.jpg'

# Compute the mean reconstruction error of each grid cell and print the grid
grid_errors = main(image_path)
print(grid_errors)

[[0.43899611 0.34221041 0.28271416 0.33106139 0.31633359 0.25639081
  0.33745381 0.41102383]
 [0.32531136 0.26268396 0.24896197 0.36678827 0.31475291 0.21755044
  0.17883919 0.32101327]
 [0.25493965 0.2753531  0.27632645 0.26044083 0.27489141 0.26071429
  0.27822918 0.22903883]
 [0.23702605 0.30346826 0.25868067 0.27108127 0.26246473 0.24997322
  0.25375375 0.19978721]
 [0.2344483  0.2836     0.31607214 0.32422081 0.32485929 0.26121745
  0.29733595 0.18308286]
 [0.30211014 0.23036294 0.3235437  0.34191474 0.27281979 0.18649103
  0.251277   0.22659469]
 [0.34630856 0.21630366 0.2882711  0.30055165 0.19955151 0.19971411
  0.17909354 0.19255282]
 [0.28574589 0.20141335 0.17005186 0.26548681 0.24592817 0.20620064
  0.14684455 0.1153819 ]]
