## Fashion-MNIST-based simulated production line prediction
<font color=#FF0000>Description, TO BE DONE!!</font>

### Packages import

In [1]:
# necessary packages
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose
from torch import nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import random

# additional certain short functions
from torch import is_tensor
from matplotlib.pyplot import pause
from random import randint
from copy import deepcopy
from math import floor, ceil
from torch import stack
from torch import cat

# import custom functions
from data_generate import *
from data_io import *
from CNN import *
from FCN import *

# ignore warnings
import warnings
warnings.filterwarnings('ignore')

In [2]:
# apply CUDA
USE_GPU = True
if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cuda


### Custom data loading

In [3]:
# Customize dataset
class ProductLineDataset(Dataset):
    def __init__(self, image: torch.Tensor, gt: torch.Tensor):
        self.x = image.reshape(len(image), 1, 28*3, 28*4).type(torch.float32)
        self.y = gt.type(torch.float32)
    
    def __len__(self):
        return len(self.x)
    
    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

In [4]:
# define file names
CSV_NAME_TRAIN_02346 = 'train_02346.csv'
CSV_NAME_TEST_02346 = 'test_02346.csv'
CSV_NAME_TEST_579 = 'test_579.csv'
CSV_NAME_TEST_18 = 'test_18.csv'
IMAGE_NAME_TRAIN_02346 = 'train_02346'
IMAGE_NAME_TEST_02346 = 'test_02346'
IMAGE_NAME_TEST_579 = 'test_579'
IMAGE_NAME_TEST_18 = 'test_18'

# import data
gt_train_02346 = gt_import(CSV_NAME_TRAIN_02346)
gt_test_02346 = gt_import(CSV_NAME_TEST_02346)
gt_test_579 = gt_import(CSV_NAME_TEST_579)
gt_test_18 = gt_import(CSV_NAME_TEST_18)
image_train_02346 = image_import(gt_train_02346, IMAGE_NAME_TRAIN_02346)/255.
image_test_02346 = image_import(gt_test_02346, IMAGE_NAME_TEST_02346)/255.
image_test_579 = image_import(gt_test_579, IMAGE_NAME_TEST_579)/255.
image_test_18 = image_import(gt_test_18, IMAGE_NAME_TEST_18)/255.

In [5]:
# define dataset
dataset_train_02346 = ProductLineDataset(image_train_02346, gt_train_02346)
dataset_test_02346 = ProductLineDataset(image_test_02346, gt_test_02346)
dataset_test_579 = ProductLineDataset(image_test_579, gt_test_579)
dataset_test_18 = ProductLineDataset(image_test_18, gt_test_18)

# define dataloader
dataloader_train_02346 = DataLoader(dataset_train_02346, batch_size=64, shuffle=True)
dataloader_test_02346 = DataLoader(dataset_test_02346, batch_size=1, shuffle=True)  # batch_size=1 for test
dataloader_test_579 = DataLoader(dataset_test_579, batch_size=1, shuffle=True)
dataloader_test_18 = DataLoader(dataset_test_18, batch_size=1, shuffle=True)

In [6]:
# visualize the shape
for X, y in dataloader_train_02346:
    print("Shape of X [N, C, H, W]: ", X.shape)
    print("Shape of y: ", y.shape)
    print("Data type of y: ", y.dtype)
    print(y)    # need to transfer [64] to [64, 10] by one-hot coding
    break

Shape of X [N, C, H, W]:  torch.Size([64, 1, 84, 112])
Shape of y:  torch.Size([64])
Data type of y:  torch.float32
tensor([4., 4., 6., 6., 7., 6., 3., 5., 7., 7., 7., 6., 3., 4., 3., 3., 6., 3.,
        6., 3., 7., 4., 7., 4., 6., 4., 5., 4., 5., 5., 3., 3., 3., 3., 4., 5.,
        3., 6., 5., 5., 5., 4., 7., 3., 4., 5., 5., 6., 3., 7., 7., 3., 3., 5.,
        6., 4., 7., 7., 4., 5., 3., 3., 5., 3.])


### Define functions

In [7]:
# define functions 
def train(model, dataloader, loss_fn, optimizer):   # put epoch in main better for loss calculation
    # size of dataset
    size = len(dataloader.dataset)
    
    # set model mode
    model.train()
    
    # train batches per epoch
    for batch, (X, y) in enumerate(dataloader):
        # move data to device
        X, y = X.to(device), y.to(device)
        
        score = model(X)
        loss = loss_fn(score, y/10.)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 40 == 0:
            loss, current = loss.item(), batch*len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            
def test(model, dataloader, loss_fn_train, loss_fn_test):
    # size of dataset
    size = len(dataloader.dataset)
    
    # number of batches
    num_batches = len(dataloader)
    
    # set model mode
    model.eval()
    
    test_loss, correct = 0, 0
    
    with torch.no_grad():
        for X,y in dataloader:
            # move data to device
            X, y = X.to(device), y.to(device)
            
            score = model(X)
            test_loss += loss_fn_train(score, y/10.).item()
            correct += (y.item()/10. - loss_fn_test(score, y/10.).item()) / (y.item()/10.)    # only for batch_size=1
    
    # calculate the average loss and accuracy
    test_loss /= num_batches
    correct /= num_batches
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

### Custom model

In [8]:
in_channel = (28*3)*(28*4)
node_1 = 4096
node_2 = 1024
node_3 = 1024
node_4 = 256
node_5 = 64
node_6 = 8
out_channel = 1

FCN_model = FCN(in_channel, node_1, node_2, node_3, node_4, \
    node_5, node_6, out_channel).to(device=device)
print(FCN_model)

FCN(
  (fc1): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=9408, out_features=4096, bias=True)
    (2): BatchNorm1d(4096, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): LeakyReLU(negative_slope=0.01)
    (4): Dropout(p=0.5, inplace=False)
  )
  (fc2): Sequential(
    (0): Linear(in_features=4096, out_features=1024, bias=True)
    (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): Dropout(p=0.3, inplace=False)
  )
  (fc3): Sequential(
    (0): Linear(in_features=1024, out_features=1024, bias=True)
    (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): LeakyReLU(negative_slope=0.01)
    (3): Dropout(p=0.3, inplace=False)
  )
  (fc4): Sequential(
    (0): Linear(in_features=1024, out_features=256, bias=True)
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  

In [9]:
in_channel = 1
channel_1 = 4
channel_2 = 8
channel_3 = 16
node_1 = 1024
node_2 = 1024
node_3 = 256
node_4 = 64
out_channel = 1

CNN_model = CNN(in_channel, channel_1, channel_2, channel_3, \
    node_1, node_2, node_3, node_4, out_channel).to(device=device)
print(CNN_model)

CNN(
  (conv1): Sequential(
    (0): Conv2d(1, 4, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(4, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=2240, out_features=1024, bias=True)
  

### Model setup and train/test

In [10]:
# define parameters
learning_rate = 1e-3

# define lose function
loss_fn_train = nn.MSELoss()
loss_fn_test = nn.L1Loss()

# define optimizer
optimizer_FCN = torch.optim.Adam(FCN_model.parameters(), lr=learning_rate)
optimizer_CNN = torch.optim.Adam(CNN_model.parameters(), lr=learning_rate)

In [11]:
# set epoch
epoch = 10

# start training
for t in range(epoch):
    print(f"Epoch {t+1}\n-------------------------------")
    train(FCN_model, dataloader_train_02346, loss_fn_train, optimizer_FCN)
    test(FCN_model, dataloader_test_02346, loss_fn_train, loss_fn_test)
print("Done!!")

# model saving
torch.save(FCN_model.state_dict(), "FCN.pth")

Epoch 1
-------------------------------
loss: 0.231278  [    0/10415]
loss: 0.031181  [ 2560/10415]
loss: 0.025409  [ 5120/10415]
loss: 0.023247  [ 7680/10415]
loss: 0.020751  [10240/10415]
Test Error: 
 Accuracy: 67.4%, Avg loss: 0.021580 

Epoch 2
-------------------------------
loss: 0.021952  [    0/10415]
loss: 0.022875  [ 2560/10415]
loss: 0.016569  [ 5120/10415]
loss: 0.018011  [ 7680/10415]
loss: 0.022932  [10240/10415]
Test Error: 
 Accuracy: 67.5%, Avg loss: 0.021443 

Epoch 3
-------------------------------
loss: 0.018424  [    0/10415]
loss: 0.019572  [ 2560/10415]
loss: 0.025865  [ 5120/10415]
loss: 0.017935  [ 7680/10415]
loss: 0.021914  [10240/10415]
Test Error: 
 Accuracy: 67.7%, Avg loss: 0.021398 

Epoch 4
-------------------------------
loss: 0.016854  [    0/10415]
loss: 0.021165  [ 2560/10415]
loss: 0.022858  [ 5120/10415]
loss: 0.024479  [ 7680/10415]
loss: 0.018903  [10240/10415]
Test Error: 
 Accuracy: 67.6%, Avg loss: 0.021340 

Epoch 5
------------------------

In [12]:
# set epoch
epoch = 10

# start training
for t in range(epoch):
    print(f"Epoch {t+1}\n-------------------------------")
    train(CNN_model, dataloader_train_02346, loss_fn_train, optimizer_CNN)
    test(CNN_model, dataloader_test_02346, loss_fn_train, loss_fn_test)
print("Done!!")

# model saving
torch.save(CNN_model.state_dict(), "CNN.pth")

Epoch 1
-------------------------------
loss: 0.123665  [    0/10415]
loss: 0.030165  [ 2560/10415]
loss: 0.027246  [ 5120/10415]
loss: 0.021267  [ 7680/10415]
loss: 0.023675  [10240/10415]
Test Error: 
 Accuracy: 74.4%, Avg loss: 0.028393 

Epoch 2
-------------------------------
loss: 0.023180  [    0/10415]
loss: 0.022445  [ 2560/10415]
loss: 0.020435  [ 5120/10415]
loss: 0.021895  [ 7680/10415]
loss: 0.021527  [10240/10415]
Test Error: 
 Accuracy: 74.4%, Avg loss: 0.030693 

Epoch 3
-------------------------------
loss: 0.018587  [    0/10415]
loss: 0.022913  [ 2560/10415]
loss: 0.023693  [ 5120/10415]
loss: 0.024786  [ 7680/10415]
loss: 0.021452  [10240/10415]
Test Error: 
 Accuracy: 74.5%, Avg loss: 0.027758 

Epoch 4
-------------------------------
loss: 0.018888  [    0/10415]
loss: 0.023242  [ 2560/10415]
loss: 0.025638  [ 5120/10415]
loss: 0.015348  [ 7680/10415]
loss: 0.024268  [10240/10415]
Test Error: 
 Accuracy: 74.7%, Avg loss: 0.022699 

Epoch 5
------------------------

### <font color=#FF0000>Visualization reference (non-used yet) <font>

#### single bounding box testing 

In [None]:
def box(col, row, box_x, box_y):    # position + length / width (visualization usage)
    view = torch.zeros([28*3, 28*4], dtype=torch.float32)
    # set the box, outside the target with one pixel
    view[row-box_y:row+1, col-box_x] = 1
    view[row-box_y:row+1, col] = 1
    view[row-box_y, col-box_x:col+1] = 1
    view[row, col-box_x:col+1] = 1
    return view

In [None]:
import torch
import matplotlib.pyplot as plt
from matplotlib.pyplot import pause
from random import randint
from copy import deepcopy

rand_x = randint(14, 27)
rand_y = randint(27, 28*3-1)
print("new start position [x, y] = [", rand_x,",", rand_y, "]")

rand_start = deepcopy(aug_sample)
box_origin = box(27, 21, 27, 15)    # array for visualization, the only manual input part
box_sample = deepcopy(box_origin)

plt.imshow(rand_start, cmap='gray')
plt.imshow(box_origin, cmap='gray', alpha=0.3)
pause(0.1)

# initial movement
for i in range(28):
    for j in range(28):
        rand_start[rand_y-i][rand_x-j] = rand_start[27-i][27-j]

for i in range(29):
    for j in range(29):
        box_origin[rand_y-i+1][rand_x-j+1] = box_origin[28-i][28-j]

# clean other part
rand_start[:rand_y-28+1, :] = 0
rand_start[:, rand_x+1:rand_x+28+1] = 0
box_origin[:rand_y-28, :] = 0       # 1-pixel cleaning region difference
box_origin[:, rand_x+1:rand_x+28+1] = 0

plt.imshow(rand_start, cmap='gray')
plt.imshow(box_origin, cmap='gray', alpha=0.3)
pause(0.1)


#### sequential bounding box visualization and random position

In [None]:
from math import floor, ceil
from torch import cat
import matplotlib.pyplot as plt
from matplotlib.pyplot import pause
from copy import deepcopy

sequential box preparation

In [None]:
# initialization
stride = 5
first_frame = deepcopy(box_sample)
next_frame = deepcopy(box_sample)
y_box = deepcopy(box_sample)

# show info
print(y_box.size())
plt.imshow(y_box, cmap='gray')
pause(0.1)

for i in range(floor((112-28)/stride)):
   # moving part (1-dim only)
   for j in range(30):  # bigger range to cover bounding box
      next_frame[:, 30+stride*(i+1)-(j+1)] = next_frame[:, 30+stride*i-(j+1)]
   next_frame[:, :stride*(i+1)] = 0  # clean other area
   
   # sequencing part
   if i == 0:
      y_box = stack((first_frame, next_frame)) 
   else:
      y_box = cat((y_box, next_frame.reshape(1, 28*3, 28*4)), dim=0)
      
   # # show info
   # print(y_box.size())
   # plt.imshow(y_box[i+1], cmap='gray')
   # pause(0.1)

parallel moving and concat last frames

In [None]:
# sequential data preparation
y_box_moved = deepcopy(y_box)
y_moved = deepcopy(y)
rand_x = randint(14, 27)
rand_y = randint(27, 28*3-1)
print("new start position [x, y] = [", rand_x,",", rand_y, "]")

# # visualization for no parallel movement
# for i in range(len(y_box_moved)):
#     plt.imshow(y_moved[i], cmap='gray')
#     plt.imshow(y_box_moved[i], cmap='gray', alpha=0.3)
#     pause(0.1)

# all frames move left parellel
for i in range(floor((112-28)/stride)+1): 
    # moving items
    for j in range(28):
        for k in range(28):
            y_moved[i][rand_y-j][rand_x+stride*i-k] = y_moved[i][27-j][27+stride*i-k]
    # moving box
    for j in range(29):
        for k in range(29):
            y_box_moved[i][rand_y-j+1][rand_x+stride*i-k+1] = y_box_moved[i][28-j][28+stride*i-k]
          
    # clean other area
    y_moved[i][:rand_y-28+1, :] = 0
    y_moved[i][:, rand_x+stride*i+1:] = 0
    y_box_moved[i][:rand_y-28+1, :] = 0
    y_box_moved[i][:, rand_x+stride*i+1:] = 0
   
    # show info
    plt.imshow(y_moved[i], cmap='gray')
    plt.imshow(y_box_moved[i], cmap='gray', alpha=0.3)
    pause(0.1)

# initialization
frame_old = len(y_moved)   # number of old frames
next_frame = deepcopy(y_moved[frame_old-1])
next_frame_box = deepcopy(y_box_moved[frame_old-1])

# last frame moves right and cat
for i in range(ceil((28-rand_x)/stride)+1):
    # move next frame
    for j in range(28):
        if rand_x+stride*(frame_old+i)-j < 112:
           next_frame[:, rand_x+stride*(frame_old+i)-j] = next_frame[:, rand_x+stride*(frame_old+i-1)-j]
    for j in range(29):
        if rand_x+stride*(frame_old+i)-j+1 < 112:
           next_frame_box[:, rand_x+stride*(frame_old+i)-j+1] = next_frame_box[:, rand_x+stride*(frame_old+i-1)-j+1]
    # clean and cat
    next_frame[:, :rand_x+stride*(frame_old+i)-28+1] = 0
    y_moved = cat((y_moved, next_frame.reshape(1, 28*3, 28*4)), dim=0)
    next_frame_box[:, :rand_x+stride*(frame_old+i)-28+1] = 0
    y_box_moved = cat((y_box_moved, next_frame_box.reshape(1, 28*3, 28*4)), dim=0)
   
    # show info
    plt.imshow(y_moved[frame_old+i], cmap='gray')
    plt.imshow(y_box_moved[frame_old+i], cmap='gray', alpha=0.3)
    pause(0.1)

print(y_moved.size())