In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from tqdm import tqdm
import torch
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader, Subset
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torchsummary import summary
import torchvision
import torchdata as td
import time
import copy

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class ProductImageCategoryDataset(Dataset):
    """Map-style dataset pairing flattened product images with category labels.

    Features and labels are read from two pickled pandas objects that must
    have the same length. Indexing returns a ``(features, label)`` tuple in
    which ``features`` is a float32 tensor reshaped to ``(3, 64, 64)`` and
    ``label`` is a plain Python ``int``.

    Args:
        learning_rate: Unused by this class; kept so existing callers that
            pass it keep working.
        x_path: Path of the pickle holding the image features.
        y_path: Path of the pickle holding the labels.

    Raises:
        AssertionError: If the features and labels differ in length.
    """

    def __init__(self, learning_rate=1e-3,
                 x_path='data/image_model_X.pkl',
                 y_path='data/image_model_y.pkl'):
        super().__init__()
        # NOTE(review): unpickling can execute arbitrary code -- only point
        # these paths at files produced by a trusted pipeline.
        self.X = pd.read_pickle(x_path)
        self.y = pd.read_pickle(y_path)
        assert len(self.X) == len(self.y), (
            f'features ({len(self.X)}) and labels ({len(self.y)}) differ in length'
        )

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at position ``index``."""
        features = self.X.iloc[index]
        label = self.y.iloc[index]
        features = torch.tensor(features).float()
        # A single sample is reshaped to (3, 64, 64). The leading 3 is a
        # per-sample axis (presumably colour channels -- confirm against the
        # preprocessing step), NOT the batch size; batching is done later by
        # the DataLoader.
        features = features.reshape(3, 64, 64)
        label = int(label)

        return (features, label)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

# Tunable constants for the split and the loaders.
SPLIT_SEED = 42        # fixed seed so the partition is identical on every run
BATCH_SIZE = 16
TRAIN_FRACTION = 0.7
VALID_FRACTION = 0.2   # whatever is left after train + validation is the test set

dataset = ProductImageCategoryDataset()
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Split the dataset into train / validation / test subsets.
total_count = len(dataset)
train_count = int(TRAIN_FRACTION * total_count)
valid_count = int(VALID_FRACTION * total_count)
test_count = total_count - train_count - valid_count
train_dataset, valid_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    (train_count, valid_count, test_count),
    # Seeded generator: without it the split changes on every kernel restart,
    # so reported validation/test numbers would not be comparable across runs.
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# NOTE: despite its name, `train_loader` maps EVERY phase name to its
# DataLoader. Only the training split is shuffled; evaluation order does not
# affect the aggregated metrics.
train_loader = {
    'train': torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=1),
    'validation': torch.utils.data.DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=1),
    'test': torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=1),
}
# NOTE: len(DataLoader) is the number of BATCHES, so these values are batch
# counts per phase, not sample counts.
dataset_sizes = {x: len(train_loader[x]) for x in ['train', 'validation', 'test']}

In [4]:
dataset_sizes

{'train': 542, 'validation': 155, 'test': 78}

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10,
                dataloaders=None, device=None):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a ``'train'`` phase (gradients enabled, optimizer stepped
    per batch, scheduler stepped once at the end of the phase) followed by a
    ``'validation'`` phase (gradients disabled). Per-phase loss and accuracy
    are averaged over the number of samples actually seen and printed.

    Args:
        model: Module mapping a feature batch to class scores.
        criterion: Loss called as ``criterion(outputs, label)``.
        optimizer: Object providing ``zero_grad()`` and ``step()``.
        scheduler: Object providing ``step()``; called once per epoch.
        num_epochs: Number of passes over the training data.
        dataloaders: Mapping with ``'train'`` and ``'validation'`` iterables
            yielding ``(features, label)`` batches. Defaults to the
            module-level ``train_loader`` dict.
        device: Device batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).

    Returns:
        The same ``model`` object, with the weights of the epoch that had the
        highest validation accuracy loaded (initial weights if no epoch
        scored above 0).
    """
    if dataloaders is None:
        dataloaders = train_loader
    if device is None:
        # Inputs have to live wherever the weights live.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs-1}')
        print('-' * 10)

        for phase in ['train', 'validation']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            for features, label in dataloaders[phase]:
                features = features.to(device)
                label = label.to(device)

                with torch.set_grad_enabled(is_train):
                    outputs = model(features)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, label)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # The loss is weighted by batch size so that a shorter final
                # batch does not skew the per-sample average.
                batch_size = features.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == label).item()
                samples_seen += batch_size

            if is_train:
                scheduler.step()

            # Normalise by samples (not batches): both accumulators above are
            # per-sample sums. max(..., 1) keeps an empty loader from dividing
            # by zero.
            denom = max(samples_seen, 1)
            epoch_loss = running_loss / denom
            epoch_acc = running_corrects / denom

            print(f'{phase} Loss: {epoch_loss:.4f} Accuracy: {epoch_acc:.4f}')

            # Model selection is done on the validation phase; only the
            # 'train' and 'validation' phases ever run in this loop.
            if phase == 'validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed%60:.0f}s')
    print(f'Best Validation Acc: {best_acc:.4f}')

    # Restore the best-scoring weights before handing the model back.
    model.load_state_dict(best_model_wts)
    return model

# Tunable constants for the classifier head and the optimisation schedule.
NUM_CLASSES = 13       # width of the replacement output layer
LEARNING_RATE = 0.001
LR_STEP_SIZE = 7       # epochs between learning-rate decays
LR_GAMMA = 0.1         # multiplicative decay applied every LR_STEP_SIZE epochs
NUM_EPOCHS = 10

# ImageNet-pretrained ResNet-50 used as a frozen feature extractor.
# NOTE(review): `pretrained=True` is the legacy flag; newer torchvision
# releases prefer the `weights=` argument -- switch once the minimum
# torchvision version for this notebook is confirmed.
model = torchvision.models.resnet50(pretrained=True)
for param in model.parameters():
    param.requires_grad = False

# Replace the final fully connected layer; the new layer's parameters default
# to requires_grad=True, so it is the only trainable part of the network.
num_fltrs = model.fc.in_features
model.fc = torch.nn.Linear(num_fltrs, NUM_CLASSES)
model = model.to(device)

criterion = torch.nn.CrossEntropyLoss()
# Only the new head is trainable, so only its parameters go to the optimizer
# instead of every frozen backbone parameter.
optimizer = optim.Adam(model.fc.parameters(), lr=LEARNING_RATE)

scheduler = lr_scheduler.StepLR(optimizer, step_size=LR_STEP_SIZE, gamma=LR_GAMMA)
model = train_model(model, criterion, optimizer, scheduler, num_epochs=NUM_EPOCHS)