In [6]:
import torch
import torchvision
import torch.nn as nn
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import pathlib
import os
import random
import shutil
from tqdm import tqdm


DATA_BATCH_SIZE = 10
EPOCHS = 30
LEARNING_RATE = 0.01
GRAD_CLIP = 0.12
WEIGHT_DECAY = 0.0001

DATASET_PATH = pathlib.Path(os.path.dirname(os.path.realpath('__file__'))).parent.parent / "dataset" / "SDD" / "luxteel" / "resnet_dataset"

TRAIN_DATASET_PATH = pathlib.Path(os.path.dirname(os.path.realpath('__file__'))).parent.parent / "dataset" / "SDD" / "luxteel" / "resnet_dataset" / "train" / "normal"
TEST_DATASET_PATH = pathlib.Path(os.path.dirname(os.path.realpath('__file__'))).parent.parent / "dataset" / "SDD" / "luxteel" / "resnet_dataset" / "test"

#defect_image_files = os.listdir(DEFECT_DATASET_PATH.as_posix())
normal_image_files = os.listdir(TRAIN_DATASET_PATH.as_posix())

# random shuffle
#random.shuffle(defect_image_files)
#random.shuffle(normal_image_files)

# split dataset
trainset_rate = 0.7
#defect_train_size = int(trainset_rate*len(defect_image_files))
normal_train_size = int(trainset_rate*len(normal_image_files))


dataset = ImageFolder(TRAIN_DATASET_PATH, transform=transforms.ToTensor())
dataloader = DataLoader(dataset=dataset, batch_size=DATA_BATCH_SIZE)

# calc mean and std for image normalization
_sum_channels = 0
_squared_sum_channels =0
_total_batches = 0

for data, _ in tqdm(dataloader):
    _sum_channels += torch.mean(data, dim=[0,2,3]) # calc mean for each channels (dim=Batch, Channel, Height, Width)
    _squared_sum_channels += torch.mean(data**2, dim=[0,2,3])
    _total_batches += 1
mean = _sum_channels / _total_batches
std = (_squared_sum_channels / _squared_sum_channels - mean ** 2) ** 0.5

print(_sum_channels, _squared_sum_channels, _total_batches)
print(f"Mean : {mean}, Std:{std}")

# normalization(0~1) of dataset images
stats = (tuple(mean.tolist()), tuple(std.tolist()))
train_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize(*stats, inplace=True)])

train_dataset = ImageFolder((DATASET_PATH/"train").as_posix(), train_transform)

train_dataloader = DataLoader(train_dataset, batch_size=DATA_BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)


# device selection function (GPU, CPU, MPS for MacOS)
def get_device_use():
    if torch.cuda.is_available():
        return torch.device('cuda')
    elif torch.backends.mps.is_available():
        return torch.device('mps')
    else:
        return torch.device('cpu')
device = get_device_use()
print(f"Selected Device : {device}")
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
# transfer data into the selected device
def to_device(data, device):
    if isinstance(data, (list, tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    def __init__(self, dataloader, device) -> None:
        self.__dataloader = dataloader
        self.__device = device
        
    def __iter__(self):
        for b in self.__dataloader:
            yield to_device(b, self.__device)
            
    def __len__(self):
        return len(self.__dataloader)
    
train_dataloader = DeviceDataLoader(train_dataloader, device)


0it [00:00, ?it/s]


ZeroDivisionError: division by zero

In [3]:
# convolutional auto encoder model impl.
import torch.nn as nn
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class ConvAutoencoder(nn.Module):
    def __init__(self):
        super(ConvAutoencoder, self).__init__()
        
        # encoder layer
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1) # depth 3 -> 16
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=4, kernel_size=3, padding=1) # depth 16 -> 4
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # decoder layer
        self.t_conv1 = nn.ConvTranspose2d(4, 16, 2, stride=2)
        self.t_conv2 = nn.ConvTranspose2d(16, 3, 2, stride=2)
    
    def forward(self, x_in):
        
        # encode
        x = F.relu(self.conv1(x_in))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        
        #decode
        x = F.relu(self.t_conv1(x))
        x = F.sigmoid(self.t_conv2(x))
        
        return x

# initialize the NN
model = ConvAutoencoder()
print(model)

ConvAutoencoder(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(16, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (t_conv1): ConvTranspose2d(4, 16, kernel_size=(2, 2), stride=(2, 2))
  (t_conv2): ConvTranspose2d(16, 3, kernel_size=(2, 2), stride=(2, 2))
)


In [None]:
# specify loss function
criterion = nn.MSELoss()

# specify loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


# number of epochs to train the model
n_epochs = 30

for epoch in range(1, n_epochs+1):
    # monitor training loss
    train_loss = 0.0
    
    ###################
    # train the model #
    ###################
    for data in train_loader:
        # _ stands in for labels, here
        # no need to flatten images
        images, _ = data
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        outputs = model(images)
        # calculate the loss
        loss = criterion(outputs, images)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update running training loss
        train_loss += loss.item()*images.size(0)
            
    # print avg training statistics 
    train_loss = train_loss/len(train_loader)
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(
        epoch, 
        train_loss
        ))