# I. Import Libraries
Here we will focus on Torchvision, since it supports transfer learning and is quite comfortable to use.

In [1]:
import numpy as np 
import pandas as pd 
import os
import matplotlib.pyplot as plt
import copy
import time
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from skimage import io
from sklearn.preprocessing import LabelEncoder
from mpl_toolkits.axes_grid1 import ImageGrid

# II. Import Necessary Data For Training, Validation & Testing

Here, we have 9 kinds of fish that we will try to classify. These are:
1. Hourse Mackerel
2. Black Sea Sprat
3. Sea Bass
4. Red Mullet
5. Trout
6. Striped Red Mullet
7. Shrimp
8. Gilt-Head Bream
9. Red Sea Bream

In [2]:
# Display the entries of the dataset root directory (same path that is later
# stored in `main_dir`) to check which folders are available.
os.listdir('../input/a-large-scale-fish-dataset/Fish_Dataset/Fish_Dataset')

In [3]:
img_class = ['Hourse Mackerel', 'Black Sea Sprat', 'Sea Bass', 'Red Mullet', 'Trout', 'Striped Red Mullet',
             'Shrimp', 'Gilt-Head Bream', 'Red Sea Bream']

main_dir = '../input/a-large-scale-fish-dataset/Fish_Dataset/Fish_Dataset'


def list_class_files(root_dir, class_name):
    """Return the relative paths of the files stored for one class.

    Files are read from ``<root_dir>/<class_name>/<class_name>`` only; files
    inside nested sub-directories are ignored.

    Parameters
    ----------
    root_dir : str
        Dataset root directory.
    class_name : str
        Name of the class folder.

    Returns
    -------
    list of str
        Paths of the form ``<class_name>/<class_name>/<file>``, sorted by file
        name. Empty when the class directory does not exist.
    """
    class_dir = root_dir + '/' + class_name + '/' + class_name
    # Only the first os.walk() entry describes class_dir itself. Taking just
    # that entry avoids re-adding the same files once per nested directory,
    # and the default keeps a missing directory from raising.
    _, _, file_names = next(os.walk(class_dir), (class_dir, [], []))
    # Sort so that the row order does not depend on filesystem ordering.
    return [class_name + '/' + class_name + '/' + name for name in sorted(file_names)]


list_dir = []
list_class = []

for i in img_class:
    class_files = list_class_files(main_dir, i)
    list_dir.extend(class_files)
    list_class.extend([i] * len(class_files))

# One row per file: relative path in 'dir', class name in 'class'.
df_img = pd.DataFrame({'dir': list_dir, 'class': list_class})

In [4]:
class fish_dataset(Dataset):
    """Dataset that loads images listed in a DataFrame.

    Column 0 of ``df`` holds the image path relative to ``root_dir`` and
    column 1 holds the label of that image.
    """

    def __init__(self, df, root_dir, transform=None):
        """Store the annotation table, the root directory and the transform.

        Parameters
        ----------
        df : pandas.DataFrame
            Annotation table (path in column 0, label in column 1).
        root_dir : str
            Directory that the paths in ``df`` are relative to.
        transform : callable, optional
            Applied to every loaded image when given.
        """
        self.annotations = df
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.annotations)

    def __getitem__(self, index):
        """Load one sample and return it as ``(image, label)``."""
        img_path = os.path.join(self.root_dir, self.annotations.iloc[index, 0])
        image = io.imread(img_path)
        y_label = self.annotations.iloc[index, 1]
        # Identity check: `!= None` would dispatch to the transform's own
        # comparison operators, which may not return a plain boolean.
        if self.transform is not None:
            image = self.transform(image)
        return (image, y_label)
    
# Transform applied to every image when it is fetched from the dataset.
img_transformer = transforms.ToTensor()

# Dataset over all files listed in df_img; paths are resolved against main_dir.
scaled_fish_dataset = fish_dataset(
    df=df_img,
    root_dir=main_dir,
    transform=img_transformer,
)

In [5]:
# Split the dataset into 5000 training, 2000 validation and 2000 test samples
# (the sizes must add up to len(scaled_fish_dataset)). The seeded generator
# makes the split identical on every run, so the held-out test set does not
# change between kernel restarts.
split_generator = torch.Generator().manual_seed(42)
train_set, valid_set, test_set = random_split(
    scaled_fish_dataset, [5000, 2000, 2000], generator=split_generator
)

train_dataloader = DataLoader(train_set, shuffle=True, batch_size=10, num_workers=4)
# Evaluation metrics do not depend on sample order, so these are not shuffled.
valid_dataloader = DataLoader(valid_set, shuffle=False, batch_size=10, num_workers=4)
test_dataloader = DataLoader(test_set, shuffle=False, batch_size=10, num_workers=4)

# Lookup tables keyed by phase name, used by the training loop.
dataloader = {'train': train_dataloader, 'valid': valid_dataloader}
dataset_size = {'train': len(train_set), 'valid': len(valid_set)}

# III. Showing Some Examples of the Data

In [6]:
def transform_to_numpy(inp):
    """Convert a batch of channel-first image tensors to a channel-last array.

    Parameters
    ----------
    inp : torch.Tensor
        4-D tensor laid out as (batch, channels, height, width).

    Returns
    -------
    numpy.ndarray
        Array of shape (batch, height, width, channels), the layout that
        ``imshow`` expects for each image. An empty batch gives an empty array.
    """
    batch = inp.numpy()
    # One axis permutation replaces the per-sample transpose + vstack loop,
    # which re-copied the growing result on every iteration and failed on an
    # empty batch. copy() returns a fresh C-contiguous array.
    return batch.transpose((0, 2, 3, 1)).copy()

# Take one batch from the test loader and convert it to channel-last arrays.
inputs, labels = next(iter(test_dataloader))
inputs = transform_to_numpy(inputs)

# 2 x 5 image grid: one cell per picture of the batch.
fig = plt.figure(figsize=(14, 7))
grid = ImageGrid(fig, 111, nrows_ncols=(2, 5), axes_pad=0.1)

for ax, image in zip(grid, inputs):
    ax.imshow(image)

# Print the labels as two tab-separated rows that mirror the grid layout.
row_template = '{}\t{}\t{}\t{}\t{}'
for row_start in (0, 5):
    print(row_template.format(*labels[row_start:row_start + 5]))
plt.show()

# IV. Training the Model

Because we are using transfer learning here, we only need to train the last layer. The pre-trained model serves as the feature extractor: it extracts the features from each picture that it processes. The last layer is our classifier, which we build specifically for this particular problem.

In [7]:
def develop_model(model, encoder, criterion, optimizer, scheduler, num_epochs,
                  dataloaders=None, dataset_sizes=None):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a 'train' phase followed by a 'valid' phase. The weights
    that give the highest validation accuracy are kept and restored at the end.

    Parameters
    ----------
    model : torch.nn.Module
        Model to train; batches are moved to the device of its parameters.
    encoder : object
        Object whose ``transform(labels)`` maps a batch of labels to integer
        class ids (e.g. a fitted ``LabelEncoder``).
    criterion : callable
        Loss function called as ``criterion(outputs, labels)``.
    optimizer : torch.optim.Optimizer
        Optimizer that updates the trainable parameters.
    scheduler : object
        Learning-rate scheduler; stepped once per epoch after the train phase.
    num_epochs : int
        Number of epochs to run.
    dataloaders : dict, optional
        Iterables of ``(inputs, labels)`` batches under the keys 'train' and
        'valid'. Defaults to the notebook-level ``dataloader``.
    dataset_sizes : dict, optional
        Number of samples under the keys 'train' and 'valid'. Defaults to the
        notebook-level ``dataset_size``.

    Returns
    -------
    torch.nn.Module
        The same ``model`` object, holding the best validation weights.
    """
    # Fall back to the notebook-level objects so existing calls keep working.
    if dataloaders is None:
        dataloaders = dataloader
    if dataset_sizes is None:
        dataset_sizes = dataset_size

    # Run on whatever device the model already lives on instead of relying on
    # a global `device` variable.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    since = time.time()
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print('Epoch : {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'valid']:
            is_training = phase == 'train'
            if is_training:
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            # Plain int counter, so the accuracy is well defined even when a
            # phase yields no batches.
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(run_device)
                # Encode the labels as integer class ids on the target device.
                labels = torch.as_tensor(encoder.transform(labels),
                                         dtype=torch.long, device=run_device)

                optimizer.zero_grad()

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, predictions = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                if is_training:
                    loss.backward()
                    optimizer.step()

                # The loss is multiplied by the batch size so that dividing by
                # the dataset size below gives a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(predictions == labels).item()
            if is_training:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            # Keep a copy of the weights with the best validation accuracy.
            if (phase == 'valid') and (epoch_acc > best_acc):
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

            print('{} Loss : {:.3f} Accuracy {:.3f}'.format(phase, epoch_loss, epoch_acc))
        print()

    time_elapsed = time.time() - since

    print('The elapsed time is {} minutes {:.1f} seconds'.format(int(time_elapsed // 60), time_elapsed % 60))
    print('The best accuracy is {:.2f}%'.format(best_acc * 100))

    model.load_state_dict(best_model_wts)

    return model

In [8]:
# Label encoder fitted on the class names; it maps each name to an integer id.
le_process = LabelEncoder()
le_process.fit(img_class)

# Use the first GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

# Pre-trained ResNet-18 with every existing parameter frozen.
model = models.resnet18(pretrained=True)
for parameter in model.parameters():
    parameter.requires_grad_(False)

# Replace the final fully connected layer with one output per fish class.
# The new layer is created after the freeze, so it is the only trainable part.
model_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=model_ftrs, out_features=len(img_class))
model = model.to(device)

# Only the parameters of the new classifier layer are given to the optimizer.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.fc.parameters(), lr=5e-3, momentum=0.9)
scheduler = lr_scheduler.StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

model = develop_model(
    model=model,
    encoder=le_process,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=8,
)

# V. Testing

The last step is Testing. Let's see how it goes.

In [9]:
from sklearn.metrics import accuracy_score
from tqdm.auto import tqdm

preds = []
targs = []

# Put the model in inference mode explicitly instead of relying on the mode
# it was left in by an earlier cell.
model.eval()

# Iterating the tqdm wrapper advances the bar automatically, one step per batch.
progress_bar = tqdm(test_dataloader)

with torch.no_grad():
    for inputs, labels in progress_bar:
        inputs = inputs.to(device)
        # Encode the label names as the integer ids used during training.
        labels = le_process.transform(labels)
        outputs = model(inputs)
        # The index of the largest output is the predicted class id.
        _, predictions = torch.max(outputs, 1)
        preds.extend(predictions.cpu().numpy())
        targs.extend(labels)

# accuracy_score expects (y_true, y_pred).
test_score = accuracy_score(targs, preds)
print('The accuracy of the test is {:.2f}%'.format(test_score * 100))

The accuracy on the test set is 99.80%. That is high, but it is up to you to decide whether that is a good thing or not. It can be an indication of overfitting, since the data we have here does not have much variety.