In [5]:
import numpy as np 
import cv2 as cv
import os 
from pathlib import Path
from imutils.paths import list_images
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torchvision import models
from torch.utils.data import Dataset, DataLoader
import pickle

import pv_vision.defective_cell_detection.model.cnn_train_val as cnn

# Load the data
The images are expected in the following folder structure, where the name of each class folder is used as the label: \
. \
|-- rf_train_inference.ipynb \
|-- segmented_cells \
....|-- train \
....|...|-- class 1 \
....|...|-- class 2 \
....|...|-- class ... \
....|...\`-- class n \
....|-- val \
....|...|-- class 1 \
....|...|-- class 2 \
....|...|-- class ... \
....|...\`-- class n \
....\`-- test \
........|-- class 1 \
........|-- class 2 \
........|-- class ... \
........\`-- class n



In [None]:
def _load_split(im_dir):
    """Read every image below ``im_dir`` and derive its label and name from the path.

    Parameters
    ----------
    im_dir : str or pathlib.Path
        Root folder of one data split. Each image is expected to sit in a
        sub-folder named after its class.

    Returns
    -------
    images : list
        The arrays returned by ``cv.imread``, one per image file.
    labels : list of str
        Name of the folder that directly contains each image.
    names : list of str
        File name of each image without its extension.

    Raises
    ------
    IOError
        If ``cv.imread`` returns ``None`` for a file, i.e. it could not be read.
    """
    images, labels, names = [], [], []
    # Sort the paths so the sample order does not depend on the directory listing order.
    for im_path in tqdm(sorted(list_images(im_dir))):
        image = cv.imread(im_path)
        if image is None:
            # Fail early with the offending path instead of carrying None into np.array
            raise IOError(f"Could not read image: {im_path}")
        path = Path(im_path)
        images.append(image)
        # pathlib handles both '/' and '\\' separators, unlike splitting on '/'
        labels.append(path.parent.name)
        names.append(path.stem)
    return images, labels, names


im_train_dir = Path('segmented_cells/train')
im_val_dir = Path('segmented_cells/val')
im_test_dir = Path('segmented_cells/test')

# File names are only kept for the validation and test splits
images_train, labels_train, _ = _load_split(im_train_dir)
images_val, labels_val, names_val = _load_split(im_val_dir)
images_test, labels_test, names_test = _load_split(im_test_dir)

images_train = np.array(images_train)
images_val = np.array(images_val)
images_test = np.array(images_test)

# Encode the class-folder names as integers; the encoder is fitted on the training labels only
le = LabelEncoder()
y_train = le.fit_transform(labels_train)
y_val = le.transform(labels_val)
y_test = le.transform(labels_test)

# assign the device to run the code on
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Transforms, datasets and dataloaders for the three splits
BATCH_SIZE = 128  # mini-batch size shared by all three loaders


def _tensor_and_normalize():
    """Return fresh ToTensor + Normalize steps used by both the training and evaluation pipelines."""
    # A single mean/std value is given (the original note says "grayscale only").
    # The original cell also kept the ImageNet statistics as a commented-out alternative:
    # mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225].
    return [
        transforms.ToTensor(),
        transforms.Normalize((0.5, ), (0.5, )),
    ]


# Training pipeline: random flips, then the pv_vision rotation transform configured with [0, 180]
# (presumably picks one of the listed angles -- confirm in pv_vision), then tensor conversion.
solar_transform_train = transforms.Compose(
    [
        transforms.RandomVerticalFlip(),
        transforms.RandomHorizontalFlip(),
        cnn.OneRotationTransform([0,180]),
    ]
    + _tensor_and_normalize()
)

# Validation and test pipeline: no augmentation
solar_transform_val = transforms.Compose(_tensor_and_normalize())

# Datasets. The original cell noted optional SolarDataset arguments for selective augmentation
# (transform2=solar_transform_val, inx_aug=[0, 2, 3, 4]); they are not used here.
solar_train = cnn.SolarDataset(images_train, y_train, transform=solar_transform_train)
solar_val = cnn.SolarDataset(images_val, y_val, transform=solar_transform_val)
solar_test = cnn.SolarDataset(images_test, y_test, transform=solar_transform_val)

# Only the training loader shuffles
trainloader = DataLoader(solar_train, batch_size=BATCH_SIZE, shuffle=True)
valloader = DataLoader(solar_val, batch_size=BATCH_SIZE, shuffle=False)
testloader = DataLoader(solar_test, batch_size=BATCH_SIZE, shuffle=False)

# Train the model (ResNet18)

In [None]:
# Build the network: pretrained ResNet18 whose final fully connected layer is replaced
# by a new linear layer with 5 outputs.
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=num_ftrs, out_features=5)
model = model.to(device)

# Loss, optimizer and learning-rate schedule (lr is multiplied by 0.1 every 5 scheduler steps)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Run training. The helper returns a mapping of metric name -> weights plus the loss/accuracy record.
model_fit_wts, loss_acc = cnn.train_model(
    model,
    trainloader,
    valloader,
    solar_train,
    solar_val,
    criterion,
    optimizer,
    lr_scheduler,
    device,
    num_epochs=20,
)

# Persist the training record and one weights file per metric under cnn/<metric name>/
model_path = Path('cnn')
model_path.mkdir(parents=True, exist_ok=True)
with (model_path/'resnet18_loss_acc.pkl').open('wb') as record_file:
    pickle.dump(loss_acc, record_file)

for metric, weights in model_fit_wts.items():
    metric_dir = model_path/metric
    metric_dir.mkdir(parents=True, exist_ok=True)
    torch.save(weights, metric_dir/'resnet18_model.pth')

# Inference

In [None]:
# load model
# Folder the training cell saves to; redefined here so inference can run on a fresh
# kernel without retraining first.
model_path = Path('cnn')

# Rebuild the ResNet18 architecture with the same 5-output head used for training.
# Pretrained weights are not requested: load_state_dict below overwrites the
# parameters anyway, so downloading them would be wasted work.
model_fit = models.resnet18()
num_ftrs = model_fit.fc.in_features
model_fit.fc = nn.Linear(num_ftrs, 5)

# map_location=device (instead of a hard-coded "cuda:0") lets the checkpoint load on
# CPU-only machines, matching the cuda/cpu fallback used when `device` is defined.
model_fit.load_state_dict(torch.load(model_path/'best_loss'/'resnet18_model.pth', map_location=device))
model_fit.to(device)
# Switch to evaluation mode; the trailing ';' suppresses the printed model repr
model_fit.eval();

In [None]:
# Build predloader.
# When applying the model to new dataset, the labels are unknown, so the dataset is
# built from the images only.
# A separate name is used so that `solar_test` keeps referring to the labelled test
# dataset instead of being silently replaced by an object of a different type.
solar_pred = cnn.PredDataset(images_test, transform=solar_transform_val)

# No shuffling: predictions must stay in the same order as the input images
predloader = DataLoader(solar_pred, batch_size=128, shuffle=False)

In [None]:
# make prediction
# Run the loaded model over predloader on `device`. The helper returns two values:
# pred_test is later decoded with LabelEncoder.inverse_transform, so it holds encoded
# class labels; prob_test presumably holds the matching class probabilities -- TODO confirm.
pred_test, prob_test = cnn.predict_test(predloader, model_fit, device)

In [None]:
# save the prediction
# Encoder used to turn predicted class indices back into defect names.
# NOTE(review): these names must match the class folders the model was trained on --
# verify if the dataset changes.
le = LabelEncoder()
le.fit(['crack', 'intact', 'intra', 'oxygen', 'solder'])

# Make sure the output folder exists; open(..., 'wb') does not create missing directories
results_dir = Path('cnn')/'results'
os.makedirs(results_dir, exist_ok=True)

# Stored per test image: file name (without extension), decoded defect name, raw predicted index
with open(results_dir/'resnet18_predicted.pkl', 'wb') as f:
    pickle.dump({'name': np.array(names_test), 
                'defects_pred': le.inverse_transform(pred_test),
                'y_pred': np.array(pred_test)}, f)