In [None]:
import numpy as np
import pandas as pd
import time
import os
import PIL
from PIL import Image
from glob import glob
import matplotlib.pyplot as plt

In [None]:
import torch
from torch import nn, optim

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
# Glob pattern matching every JPEG stored one directory level below
# data/train_sets (the sub-directory name is later used as the class label).
path = os.path.join('data', 'train_sets', '*', '*.jpg')

# glob() returns files in an arbitrary, OS-dependent order; sorting makes the
# file list (and therefore the seeded train/validation split) reproducible.
fname = sorted(glob(path))

# Show only a preview instead of dumping the full file list.
fname[:5]

In [None]:
# label = []
# for fn in fname :
#     fn = fn.split("/")[2]
#     label.append(fn)

In [None]:
# Class label of each file = name of its parent directory.
# os.path is used instead of splitting on "/" so this also works where the
# path separator is a backslash (paths here come from os.path.join + glob).
label = [os.path.basename(os.path.dirname(fn)) for fn in fname]
label[:5]

In [None]:
# File name (without directories) of each image.
# os.path.basename is separator-aware, unlike splitting on "/".
images = [os.path.basename(fn) for fn in fname]
images[:5]

In [None]:
# Sorted, de-duplicated class names and one consecutive integer id per class.
labels = np.unique(label)
ints = np.arange(len(labels))

# Lookup table: class name -> integer id.
dicts = {class_name: class_id for class_name, class_id in zip(labels, ints)}

dicts

In [None]:
df_batik = pd.DataFrame({"image_id": images, "label": label})

# Root directory of the training images, with a trailing separator.
# It is built directly instead of slicing the glob pattern in place
# (path = path[:-7]), so re-running this cell no longer truncates `path` further.
path = os.path.join('data', 'train_sets', '')

# Check image files: collect the row positions of files Pillow cannot identify.
index = []
for i, (image_id, image_label) in enumerate(zip(df_batik["image_id"], df_batik["label"])):
    try:
        # The context manager closes the file handle opened by Image.open.
        with Image.open(path + str(image_label) + '/' + str(image_id)):
            pass
    except PIL.UnidentifiedImageError:
        index.append(i)

# Drop the unreadable files; df_batik itself is left untouched.
df = df_batik.drop(index)
df.head()

## Plot images

In [None]:
# Share of each class among the readable images.
target, count = np.unique(df['label'], return_counts=True)
percentage = count / count.sum()

# The 'seaborn' style sheet was renamed to 'seaborn-v0_8' in Matplotlib 3.6 and
# the old name was later removed, so pick whichever name this install provides.
style_name = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'
plt.style.use(style_name)

plt.pie(percentage, labels=target, autopct='%1.f%%', shadow=True)
plt.title('DATA PERCENTAGE')
plt.show()

## Datasets and dataloaders

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms

In [None]:
class Batik(Dataset) :
    """Dataset of images stored on disk as ``<path>/<label>/<image file>``.

    Args:
        x: sequence of image file names.
        y: sequence of class-name strings, aligned with ``x``.
        path: root directory that contains one sub-directory per class.
        map_label: mapping from class name to the integer target returned
            by ``__getitem__``. Defaults to the notebook-level ``dicts``.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, x, y, path, map_label = dicts, transform=None) :
        super().__init__()
        self.X = x
        self.y = y
        self.path = path
        self.map_label = map_label
        self.transform = transform

    def __getitem__(self, idx) :
        """Return ``(image, integer label)`` for the sample at position ``idx``."""
        # Use the root directory given to this instance (self.path) rather than
        # whatever the notebook-level `path` variable happens to hold.
        img_path = os.path.join(self.path, str(self.y[idx]), str(self.X[idx]))

        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(img_path) as raw:
            img = raw.convert("RGB")
        label = self.map_label[self.y[idx]]

        if self.transform is not None :
            img = self.transform(img)

        return img, label

    def __len__(self) :
        """Number of samples; len() works for lists as well as numpy arrays."""
        return len(self.X)

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Inputs are the image file names, targets are the class-name strings.
X, y = df["image_id"].values, df["label"].values

# Stratified, shuffled 70/30 train/validation split with a fixed seed.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.3,
    shuffle=True,
    stratify=y,
    random_state=42,
)

(len(X_train), len(X_val))

In [None]:
#DATALOADER
batch_size = 32
bath_size = batch_size  # original (misspelled) name kept so existing references still work
crop_size = 128

# Data-augmentation pipeline applied to training images only.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(crop_size, scale=(0.8, 1.0)),
    transforms.RandomVerticalFlip(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomAffine(0, shear=10),
    transforms.RandomRotation(20),
    transforms.ToTensor()
])

# Deterministic preprocessing for validation: resize, then centre-crop to the
# same spatial size the training pipeline produces.
val_transform = transforms.Compose([
    transforms.Resize(135),
    transforms.CenterCrop(crop_size),
    transforms.ToTensor()
])

train_set = Batik(X_train, y_train, path, transform=train_transform)
val_set = Batik(X_val, y_val, path, transform=val_transform)

# PyTorch dataloaders. Only the training data is shuffled; shuffling the
# validation data would not change the metrics and only makes runs harder to compare.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True,
                          num_workers=4, pin_memory=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False,
                        num_workers=4, pin_memory=True)

In [None]:
# Test data, used for prediction.
# Images are read from class sub-folders and preprocessed like the validation set.
test_set = datasets.ImageFolder(
    root='data/test_train/',
    transform=val_transform,
)

## error

In [None]:
# feature, target = next(iter(train_loader))
# feature.shape

### Check a random training sample

In [None]:
import random

In [None]:
# Reverse lookup: integer id -> class name.
int_label = dict(zip(ints, labels))

# random.randint includes BOTH end points, so randint(0, len(X_train)) could
# return an index one past the end; randrange excludes the upper bound.
sample_idx = random.randrange(len(train_set))

# A separate name is used for the target so the notebook-level `label` list
# (needed to rebuild the dataframe) is not overwritten by a single integer.
img, sample_label = train_set[sample_idx]

# The transform ends with ToTensor(), which yields channels-first data;
# imshow expects channels last, hence the permute.
plt.imshow(img.permute(1, 2, 0));
print(int_label[sample_label])

## Model architecture

In [None]:
class CNN(nn.Module) :
    """Five-stage convolutional classifier that outputs log-probabilities.

    Each stage is Conv2d(3x3, padding 1) -> ReLU -> MaxPool2d(2), with the
    channel count going 3 -> 8 -> 16 -> 32 -> 64 -> 128. The flattened features
    feed a two-layer fully connected head ending in LogSoftmax, so the output
    pairs with ``nn.NLLLoss``.

    The first linear layer expects 2048 features, i.e. 128 channels x 4 x 4.
    Five 2x poolings reduce 128x128 inputs to 4x4, so inputs must be 128x128.

    Args:
        num_classes (int): number of output classes. Default: 7.
    """

    def __init__(self, num_classes=7) :
        super().__init__()

        # Layers are appended in the same order as a hand-written Sequential
        # (conv.0, conv.3, ..., conv.12), so state-dict keys stay compatible
        # with previously saved checkpoints.
        channels = (3, 8, 16, 32, 64, 128)
        stages = []
        for c_in, c_out in zip(channels[:-1], channels[1:]) :
            stages.append(nn.Conv2d(in_channels=c_in, out_channels=c_out,
                                    kernel_size=3, stride=1, padding=1))
            stages.append(nn.ReLU())
            stages.append(nn.MaxPool2d(kernel_size=2, stride=2))
        stages.append(nn.Flatten())
        self.conv = nn.Sequential(*stages)

        self.fc = nn.Sequential(
            nn.Linear(in_features=2048, out_features=1024),
            nn.ReLU(),
            nn.Linear(in_features=1024, out_features=num_classes),
            nn.LogSoftmax(1)
        )

    def forward(self, x) :
        """Return per-class log-probabilities with shape (batch, num_classes)."""
        x = self.conv(x)
        x = self.fc(x)
        return x
        

### Early-Stopping Class

In [None]:
class EarlyStopping() :
    """Early stops the training if validation loss doesn't improve after a given patience.

    Call the instance once per epoch with the current validation loss and the
    model. Whenever the loss improves, the model's state dict is saved to
    ``path``. After ``patience`` consecutive calls without improvement,
    ``early_stop`` is set to True.
    """

    def __init__(self, patience=5, delta=0, verbose= False, path='checkpoint.pt', trace_func=print) :
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 5
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf is the portable spelling; the np.Inf alias was removed in NumPy 2.0.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model) :
        """Record one epoch's validation loss and update the stopping state."""
        # Negate the loss so that a higher score means a better model.
        score = -val_loss

        if self.best_score is None :
            # First call: whatever we have is the best so far.
            self.best_score = score
            self.save_checkpoint(val_loss, model)

        elif score < self.best_score + self.delta :
            # Not better than the best score by at least `delta`.
            self.counter += 1
            self.trace_func(f'\n |EarlyStopping counter: {self.counter} out of {self.patience}|')

            # early stopping
            if self.counter >= self.patience :
                self.early_stop = True

        else :
            # Improvement: checkpoint the model and restart the patience counter.
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    #save checkpoint
    def save_checkpoint (self, val_loss, model) :
        """Save the model's state dict to ``self.path`` and remember ``val_loss`` as the minimum."""
        if self.verbose:
            self.trace_func(f'\nValidation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

## Training preparation

In [None]:
from torch import optim
from tqdm.auto import tqdm

In [None]:
# Learning rate for the optimizer.
lr = 1e-3

# Network on the selected device, negative log-likelihood loss (the model
# already ends in LogSoftmax), and an AdamW optimizer over all parameters.
model = CNN().to(device)
criterion = nn.NLLLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=lr)

In [None]:
# print(model)

In [None]:
#function looping
def looping(mode, dataset, dataloader, model, criterion, optimizer, device) :
    """Run one full pass over ``dataloader`` and return ``(cost, acc)``.

    Args:
        mode (str): "train" to update the model, "val" or "test" to only evaluate.
        dataset: dataset behind ``dataloader``; only ``len(dataset)`` is used,
            to average the loss and accuracy.
        dataloader: iterable yielding ``(feature, target)`` batches.
        model: network to train or evaluate.
        criterion: loss function called as ``criterion(output, target)``.
        optimizer: optimizer stepped once per batch in "train" mode.
        device: device the batches are moved to.

    Returns:
        tuple: ``(cost, acc)``, the summed per-batch loss (weighted by batch
        size) and the number of correct predictions, both divided by
        ``len(dataset)``.

    Raises:
        ValueError: if ``mode`` is not "train", "val" or "test".
    """
    if mode == "train" :
        model.train()
    elif mode in ("val", "test") :
        model.eval()
    else :
        # Previously an unknown mode silently ran in whatever state the model was in.
        raise ValueError(f"mode must be 'train', 'val' or 'test', got {mode!r}")

    is_train = mode == "train"
    cost = correct = 0

    # Track gradients only while training, so evaluation does not build
    # autograd graphs even if the caller forgets torch.no_grad().
    with torch.set_grad_enabled(is_train) :
        for feature, target in tqdm(dataloader, desc=mode.title()) :
            feature, target = feature.to(device), target.to(device)
            output = model(feature)
            loss = criterion(output, target)

            if is_train :
                # Clear gradients first so stale gradients left over from
                # elsewhere cannot leak into this update.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # loss is averaged over the batch by default; re-weight by batch size.
            cost += loss.item() * feature.shape[0]
            correct += (output.argmax(1) == target).sum().item()

    cost = cost / len(dataset)
    acc = correct / len(dataset)

    return cost, acc

In [None]:
# #test

# epochs = 12
# train_cost, val_cost = [], []
# train_acc, val_acc = [], []
# for i in range (epochs) :
#     since = time.time()
    
#     # training for data train
#     cost, acc = looping("train", train_set, train_loader, model, criterion, optimizer, device)
#     train_cost.append(cost)
#     train_acc.append(acc)
    
#     # training for data val
#     with torch.no_grad() :
#         cost, acc = looping("val", val_set, val_loader, model, criterion, optimizer, device)
#         val_cost.append(cost)
#         val_acc.append(acc)
    
#     print("Epochs : {}/{} | ".format(i+1, epochs),
#           "train_cost : {} | ".format(train_cost[-1]),
#           "val_cost : {} | ".format(val_cost[-1]),
#           "train_acc : {} | ".format(train_acc[-1]),
#           "val_cost : {} | ".format(val_cost[-1]),
#           'time {:.3f} s'.format(time.time() - since)
#          )
    

In [None]:
# test - 2

# Per-epoch history of loss ("cost") and accuracy for both splits.
train_cost, val_cost = [], []
train_acc, val_acc = [], []

# Early-stopping helper: also checkpoints the model whenever validation loss improves.
early_stopping = EarlyStopping(patience=7, verbose=True)

epoch = 1
# Keep training until the early-stopping helper raises its flag.
while not early_stopping.early_stop :
    since = time.time()

    # One pass over the training data (updates the weights).
    cost, acc = looping("train", train_set, train_loader, model, criterion, optimizer, device)
    train_cost.append(cost)
    train_acc.append(acc)

    # One pass over the validation data (no gradients needed).
    with torch.no_grad() :
        cost, acc = looping("val", val_set, val_loader, model, criterion, optimizer, device)
    val_cost.append(cost)
    val_acc.append(acc)

    print(f"Epochs : {epoch} | ",
          f"train_cost : {train_cost[-1]} | ",
          f"val_cost : {val_cost[-1]} | ",
          f"train_acc : {train_acc[-1]} | ",
          f"val_acc : {val_acc[-1]} | ",
          f'time {time.time() - since:.3f} s'
         )
    epoch += 1

    # Feed the latest validation loss to the early-stopping helper.
    early_stopping(val_cost[-1], model)

print("Early stopping")
    

In [None]:
# Load the checkpointed weights (the best model saved by early stopping).
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can still be loaded on a CPU-only machine.
model.load_state_dict(torch.load('checkpoint.pt', map_location=device))