In [None]:
# https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

# %pip install torch==1.8.0+cpu torchvision==0.9.0+cpu -f https://download.pytorch.org/whl/torch_stable.html
# %pip install timm==0.3.2 six


from __future__ import print_function, division
from ConvNeXt.convnext_isotropic import convnext_isotropic_small
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
from torch.utils.tensorboard import SummaryWriter
import datetime
import re

plt.ion()   # interactive mode

#### The cells below define helper functions. See the last cell for the main training script.

In [None]:
# Per-split preprocessing pipelines, keyed by the dataset sub-folder name.
#   'train': random horizontal flip (augmentation), then conversion to a tensor.
#   'valid': conversion to a tensor only.
# Disabled steps, kept for reference:
#   train: transforms.RandomResizedCrop(32)
#   valid: transforms.Resize(32), transforms.CenterCrop(32)
#   both:  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# NOTE(review): a horizontal flip may not be label-preserving for every kind
# of image -- confirm it is appropriate for this dataset.
data_transforms = dict(
    train=transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ]),
    valid=transforms.Compose([
        transforms.ToTensor(),
    ]),
)

from torch.utils.data import Dataset, DataLoader
import cv2
    

In [None]:
# Installs OpenCV, the package that provides the `cv2` module imported in the
# previous cell. On a fresh kernel this cell has to run before that import.
%pip install opencv-python

In [None]:
# def imshow(inp, title=None):
#     """Imshow for Tensor."""
#     inp = inp.numpy().transpose((1, 2, 0))
#     mean = np.array([0.485, 0.456, 0.406])
#     std = np.array([0.229, 0.224, 0.225])
#     inp = std * inp + mean
#     inp = np.clip(inp, 0, 1)
#     plt.imshow(inp)
#     if title is not None:
#         plt.title(title)
#     plt.pause(0.001)  # pause a bit so that plots are updated


In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25, fineTune = False, dataName=None):
    """Train ``model`` and return it loaded with its best validation weights.

    Every epoch runs a 'train' pass followed by a 'valid' pass over the
    notebook-level ``dataloaders``. Batches are moved to the notebook-level
    ``device`` and per-sample averages are computed with ``dataset_sizes``.
    Loss, accuracy and elapsed wall-clock time are written to TensorBoard
    under ``./runs/FeatureExtractTL/`` once per phase and epoch.

    Args:
        model: network to train; it is modified in place.
        criterion: loss, called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped after every training batch.
        scheduler: learning-rate scheduler stepped once after each training pass.
        num_epochs: number of train + valid epochs to run.
        fineTune: only selects the tag used in the TensorBoard run name
            ('_FineTune_' vs '_FeatureExtractor_'). It does not change which
            parameters are trained; that is decided by the caller.
        dataName: optional label appended to the run name. ``None`` adds nothing.

    Returns:
        The same ``model`` object, loaded with the weights from the epoch with
        the highest validation accuracy (or its initial weights when no epoch
        scores above 0.0).
    """
    mode_tag = '_FineTune_' if fineTune else '_FeatureExtractor_'
    # dataName defaults to None, and str + None raises TypeError, so fall back
    # to an empty suffix instead of crashing before training starts.
    _type = mode_tag + ('' if dataName is None else str(dataName))

    since = time.time()
    # re.sub strips every non-word character so the run name is a safe directory name.
    run_name = re.sub(
        r'\W+', '',
        model._get_name() + _type + datetime.datetime.now().strftime("%Y-%m-%d%H:%M:%S"))
    writer = SummaryWriter(log_dir="./runs/FeatureExtractTL/" + run_name)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    try:
        for epoch in range(num_epochs):
            print('Epoch {}/{}'.format(epoch, num_epochs - 1))
            print('-' * 10)

            # Each epoch has a training and a validation phase.
            for phase in ['train', 'valid']:
                is_train = phase == 'train'
                if is_train:
                    model.train()  # enable dropout / batch-norm updates
                else:
                    model.eval()   # freeze dropout / batch-norm statistics

                running_loss = 0.0
                running_corrects = 0

                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    optimizer.zero_grad()

                    # Build the autograd graph only while training.
                    with torch.set_grad_enabled(is_train):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        if is_train:
                            loss.backward()
                            optimizer.step()

                    # criterion returns a batch mean; weight it by the batch
                    # size so the epoch average is per sample.
                    running_loss += loss.item() * inputs.size(0)
                    # Accumulate as a plain int so the accuracy (and best_acc)
                    # stays a Python float instead of a device tensor.
                    running_corrects += torch.sum(preds == labels).item()

                if is_train:
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]

                print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                    phase, epoch_loss, epoch_acc))

                # Snapshot the weights whenever validation accuracy improves.
                if phase == 'valid' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())

                writer.add_scalar('Loss/' + phase, epoch_loss, epoch)
                writer.add_scalar('Accuracy/' + phase, epoch_acc, epoch)
                writer.add_scalar('Time/' + phase, time.time() - since, epoch)

            print()
    finally:
        # Flush pending events and release the file handle even when training
        # is interrupted or fails.
        writer.close()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    # '.4f' (precision) matches the per-epoch prints; the previous '4f' was a width spec.
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
def visualize_model(model, num_images=6):
    """Plot up to ``num_images`` validation images titled with the predicted class.

    Batches come from the notebook-level ``dataloaders['valid']`` and are moved
    to the notebook-level ``device``; titles are looked up in ``class_names``.
    Images are laid out on a two-column grid in a new figure. The model is put
    in eval mode for the predictions and its previous train/eval mode is
    restored before returning, including when an error is raised.

    Args:
        model: network used to produce the predictions.
        num_images: maximum number of images to show. Values <= 0 show nothing.
    """
    if num_images <= 0:
        return

    was_training = model.training
    model.eval()
    images_so_far = 0
    # Round up so an odd num_images (or 1) still gets enough grid cells;
    # num_images // 2 rows made the last subplot index fall outside the grid.
    n_rows = (num_images + 1) // 2
    fig = plt.figure()

    try:
        with torch.no_grad():
            for inputs, _ in dataloaders['valid']:
                inputs = inputs.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                for j in range(inputs.size(0)):
                    images_so_far += 1
                    ax = fig.add_subplot(n_rows, 2, images_so_far)
                    ax.axis('off')
                    ax.set_title('predicted: {}'.format(class_names[preds[j].item()]))
                    # Draw the image itself; previously only empty titled axes
                    # were produced. Assumes (C, H, W) 3-channel tensors with
                    # values in [0, 1], which is what the ToTensor-only
                    # pipelines in this notebook yield -- revisit if
                    # Normalize is re-enabled.
                    image = inputs[j].detach().cpu().permute(1, 2, 0).clamp(0, 1).numpy()
                    ax.imshow(image)

                    if images_so_far == num_images:
                        return
    finally:
        model.train(mode=was_training)

In [None]:
# from PIL import Image
# import os


# # from DHCD_loader import NPZLoader
# import numpy as np
# from matplotlib import pyplot as plt

# # dataManager = NPZLoader('DHCD_dataset.npz')
# _npz = np.load("DHCD_dataset.npz")
# list(_npz.keys())

# image_train = _npz['arr_0']
# label_train =_npz['arr_1']
# image_test = _npz['arr_2']
# label_test = _npz['arr_3']
# label_train.shape

# for idx, img in enumerate(image_train):
#     label = label_train[idx]
#     folderName = 'data_DHCD/train/' + str(label) + '/'
#     if not os.path.exists(folderName):
#         os.makedirs(folderName)
#     im = Image.fromarray(img)
#     im.save(folderName+str(idx) +".jpeg")
    
# for idx, img in enumerate(image_test):
#     label = label_test[idx]
#     folderName = 'data_DHCD/valid/' + str(label) + '/'
#     if not os.path.exists(folderName):
#         os.makedirs(folderName)
#     im = Image.fromarray(img)
#     im.save(folderName+str(idx) +".jpeg")

In [None]:
# Dataset root; it is read as <data_dir>/train and <data_dir>/valid, each
# holding one sub-folder per class (the layout ImageFolder expects).
data_dir = './data_DHCD'

# One ImageFolder per split, each using the preprocessing pipeline of the same name.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), data_transforms[split])
    for split in ('train', 'valid')
}

# Both splits are served in shuffled mini-batches of 4 samples.
dataloaders = {
    split: torch.utils.data.DataLoader(split_dataset, batch_size=4, shuffle=True)
    for split, split_dataset in image_datasets.items()
}

# Sample counts per split and the class names taken from the training folders.
dataset_sizes = {split: len(split_dataset) for split, split_dataset in image_datasets.items()}
class_names = image_datasets['train'].classes

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
# device ='cpu'

# Pull one training batch and tile its images into a single grid tensor.
inputs, classes = next(iter(dataloaders['train']))
out = torchvision.utils.make_grid(inputs)

# imshow(out, title=[class_names[x] for x in classes])



# Transfer-learning mode switch:
#   True  -> fine-tuning: every weight of the pretrained network is updated.
#   False -> feature extractor: the pretrained backbone is frozen and only the
#            newly attached classification layer is trained.
fineTune = True

model_conv = models.resnet18(pretrained=True)
# model_conv  = convnext_isotropic_small(pretrained=True,)

if not fineTune:
    # Freeze the pretrained weights. Previously the flag only changed the
    # TensorBoard run name, so "feature extractor" runs still trained the
    # whole network. The replacement head created below is a fresh module and
    # therefore stays trainable.
    for param in model_conv.parameters():
        param.requires_grad = False

# Size the classification head from the dataset's class folders rather than a
# hard-coded count, so it always matches the labels produced by ImageFolder.
model_conv.fc = nn.Linear(model_conv.fc.in_features, len(class_names))
# model_conv.head = nn.Linear(model_conv.head.in_features , len(class_names))

model_ft = model_conv.to(device)
criterion = nn.CrossEntropyLoss()

# Optimize only the parameters that require gradients: all of them when
# fine-tuning, just the new head in feature-extractor mode.
trainable_params = [p for p in model_ft.parameters() if p.requires_grad]
optimizer_ft = optim.SGD(trainable_params, lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

model_conv = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,
                         num_epochs=25, fineTune=fineTune, dataName=data_dir)

visualize_model(model_conv, num_images=10)