In [None]:
import os
import re
import cv2
import time
import shutil
import zipfile
import urllib.request
import copy
import numpy as np
import random
import scipy
from PIL import Image
from os import listdir
from os.path import isfile, join
from random import randrange
import matplotlib.pyplot as plt
import glob
from pandas.core.common import flatten

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
# Dataset locations, given relative to the notebook's working directory.
# The cells below treat every immediate sub-directory of these folders as one
# class and use the sub-directory name as the label.
train_data_path = '../input/soil-image-dataset/Dataset/Train'
test_data_path = '../input/soil-image-dataset/Dataset/test'
# Target size handed to transforms.Resize for every image (train and test).
img_size = (256, 256)

In [None]:
# Collect image paths. Every immediate sub-directory of `train_data_path` is
# one class; the directory name is the class label.
SPLIT_SEED = 42        # fixed seed so the train/validation split is reproducible
TRAIN_FRACTION = 0.8   # share of the labelled images used for training

# glob returns entries in arbitrary order, so sort: the position of a class in
# `classes` becomes its integer label and must be stable between runs.
# Plain files sitting next to the class folders are ignored.
class_dirs = sorted(
    path
    for path in glob.glob(os.path.join(train_data_path, '*'))
    if os.path.isdir(path)
)
classes = [os.path.basename(path) for path in class_dirs]

train_image_paths = sorted(
    image_path
    for class_dir in class_dirs
    for image_path in glob.glob(os.path.join(class_dir, '*'))
)
if not train_image_paths:
    raise FileNotFoundError(
        "No training images found under {!r}".format(train_data_path)
    )

# A private RNG keeps the shuffle deterministic without touching the global
# `random` state used elsewhere.
random.Random(SPLIT_SEED).shuffle(train_image_paths)

print('train_image_path example: ', train_image_paths[0])
print('class example: ', classes[0])

# Split once, using a single split index for both halves.
split_index = int(TRAIN_FRACTION * len(train_image_paths))
train_image_paths, valid_image_paths = (
    train_image_paths[:split_index],
    train_image_paths[split_index:],
)

# The test set follows the same <class folder>/<image> layout.
test_image_paths = sorted(
    image_path
    for class_dir in glob.glob(os.path.join(test_data_path, '*'))
    for image_path in glob.glob(os.path.join(class_dir, '*'))
)

print("Train size: {}\nValid size: {}\nTest size: {}".format(len(train_image_paths), len(valid_image_paths), len(test_image_paths)))

In [None]:
# Two-way lookup between integer labels and class names: a class's position
# in `classes` is its integer label.
idx_to_class = dict(enumerate(classes))
class_to_idx = {name: index for index, name in idx_to_class.items()}

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
class AddGaussianNoise:
    """Transform that adds element-wise Gaussian noise to a tensor.

    Args:
        mean: mean of the noise distribution.
        stddev: standard deviation of the noise distribution.
    """

    def __init__(self, mean=0., stddev=.1):
        self.mean = mean
        self.stddev = stddev

    def __call__(self, tensor):
        """Return a new tensor equal to ``tensor`` plus sampled noise.

        The input is left untouched: an in-place add would silently corrupt
        any tensor the caller still holds a reference to.
        """
        noise = torch.zeros_like(tensor).normal_(self.mean, self.stddev)
        return tensor + noise

    def __repr__(self):
        # Readable entry when the enclosing transforms.Compose is printed.
        return "{}(mean={}, stddev={})".format(
            self.__class__.__name__, self.mean, self.stddev
        )


def _resize_then_tensorize():
    """Return fresh Resize + ToTensor steps; both pipelines start with them."""
    return [transforms.Resize(size=img_size), transforms.ToTensor()]


# Training pipeline: common preprocessing followed by random flips.
train_transformation = transforms.Compose(
    _resize_then_tensorize()
    + [
        # AddGaussianNoise(),  # noise augmentation is currently switched off
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.3),
    ]
)

# Evaluation pipeline: common preprocessing only, no random augmentation.
test_transformation = transforms.Compose(_resize_then_tensorize())

In [None]:
class LandmarkDataset(Dataset):
    """Image-classification dataset backed by a list of image file paths.

    The label of an image is the name of the directory that contains it,
    mapped to an integer through the notebook-level ``class_to_idx`` dict.

    Args:
        image_paths: sequence of image file paths.
        transform: optional callable applied to the RGB PIL image. ``None``
            (or the legacy default ``False``) means "return the PIL image
            unchanged".

    Raises:
        TypeError: if ``transform`` is neither ``None``/``False`` nor callable.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        # The previous default was ``False``, which passed the later
        # ``is not None`` check and was then called like a function.
        # Normalise both "no transform" spellings to ``None``.
        if transform is None or transform is False:
            transform = None
        elif not callable(transform):
            raise TypeError(
                "transform must be callable or None, got {!r}".format(type(transform))
            )
        self.transform = transform

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_paths)

    @staticmethod
    def _class_name_from_path(image_filepath):
        """Return the name of the directory that directly contains the file."""
        return os.path.basename(os.path.dirname(image_filepath))

    def __getitem__(self, idx):
        """Load image ``idx`` and return ``(image, label)``.

        Raises:
            KeyError: if the containing directory is not a known class.
            RuntimeError: if the transform fails; the message names the file.
        """
        image_filepath = self.image_paths[idx]
        # The context manager closes the file handle; convert() loads the
        # pixel data and returns an independent image.
        with Image.open(image_filepath) as raw_image:
            image = raw_image.convert('RGB')

        label = class_to_idx[self._class_name_from_path(image_filepath)]

        if self.transform is not None:
            try:
                image = self.transform(image)
            except RuntimeError as e:
                # Do not hand back an untransformed PIL image: batching would
                # fail later with a far less helpful error. Name the file.
                raise RuntimeError(
                    "Could not transform image {!r}".format(image_filepath)
                ) from e

        return image, label
    

train_dataset = LandmarkDataset(train_image_paths, train_transformation)
# Validation uses the deterministic test pipeline: random flips would make the
# validation loss vary between epochs for reasons unrelated to the model.
valid_dataset = LandmarkDataset(valid_image_paths, test_transformation)
test_dataset = LandmarkDataset(test_image_paths, test_transformation)

In [None]:
# Fetch the sample once: every dataset access re-reads the file and re-draws
# the random augmentations, so indexing twice does the work twice.
sample_image, sample_label = train_dataset[49]
print('The shape of tensor for 50th image in train dataset: ', sample_image.shape)
print('The label for 50th image in train dataset: ', sample_label)

In [None]:
# Preview the second test sample: take its image tensor, turn it back into a
# PIL image and draw it.
test_sample = test_dataset[1]
image = test_sample[0]
to_pil = transforms.ToPILImage()
plt.imshow(to_pil(image), interpolation="bicubic")


In [None]:
# All three loaders use the same batch size; only the test loader is not
# shuffled.
_BATCH_SIZE = 8

train_loader = DataLoader(dataset=train_dataset, batch_size=_BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=valid_dataset, batch_size=_BATCH_SIZE, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=_BATCH_SIZE, shuffle=False)

In [None]:
import tqdm
from torchvision.models import mobilenet_v3_small

# MobileNetV3-small built with weights=None (no pretrained weights requested).
# The final classifier layer is replaced by a small two-layer head whose
# output size follows the number of classes found in the dataset rather than
# a hard-coded 4.
HIDDEN_FEATURES = 64
num_classes = len(classes)

model = mobilenet_v3_small(weights=None)
# Read the input width from the layer being replaced instead of hard-coding it.
head_in_features = model.classifier[3].in_features
model.classifier[3] = nn.Linear(in_features=head_in_features, out_features=HIDDEN_FEATURES, bias=True)
# NOTE(review): there is no activation between these two Linear layers, so
# together they are still a single linear map. Kept as-is so saved state_dict
# keys stay compatible; confirm whether a non-linearity was intended.
model.classifier.append(nn.Linear(in_features=HIDDEN_FEATURES, out_features=num_classes, bias=True))
model.classifier

In [None]:
def train(model, device, train_loader, optimizer, epoch, loss_fn=None):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: network to optimise; switched to training mode.
        device: device the batches are moved to before the forward pass.
        train_loader: iterable yielding ``(data, target)`` batches.
        optimizer: optimizer already bound to ``model``'s parameters.
        epoch: epoch number, used only for the progress printout.
        loss_fn: criterion called as ``loss_fn(output, target)``. Defaults to
            the notebook-level ``loss_function``.

    Returns:
        Mean of the per-batch losses as a float, or ``nan`` when the loader
        yields no batches.
    """
    # tqdm.tqdm_notebook is a deprecated alias of tqdm.notebook.tqdm.
    from tqdm.notebook import tqdm as notebook_tqdm

    # Fall back to the global criterion so existing calls keep working.
    criterion = loss_function if loss_fn is None else loss_fn

    # Set the model to training mode
    model.train()
    train_loss = 0.0
    batch_count = 0
    print("Epoch:", epoch)

    # Process the images in batches
    for data, target in notebook_tqdm(train_loader):
        # Use the CPU or GPU as appropriate
        data, target = data.to(device), target.to(device)

        # Reset the gradients accumulated by the previous step
        optimizer.zero_grad()

        # Forward pass and loss
        output = model(data)
        loss = criterion(output, target)

        # Keep a running total
        train_loss += loss.item()
        batch_count += 1

        # Backpropagate and update the weights
        loss.backward()
        optimizer.step()

        # Print metrics so we see some progress
        print('\tLoss: {:.6f}'.format(loss.item()))

    # Divide by the number of batches actually processed. The previous counter
    # started at 1 and therefore under-reported the average.
    avg_loss = train_loss / batch_count if batch_count else float("nan")
    print('Training set: Average loss: {:.6f}'.format(avg_loss))
    return avg_loss

In [None]:
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        batch_count = 0
        for data, target in test_loader:
            batch_count += 1
            data, target = data.to(device), target.to(device)
            
            output = model(data)
            
            test_loss += loss_function(output, target).item()
            

            predicted = torch.argmax(output.data, dim=1)
            print("VALIDATION")
            print(f"target:{target[:10]}")
            print(f"predicted:{predicted[:10]}")
            correct += torch.sum(target.data==predicted)

    avg_loss = test_loss / batch_count
    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        avg_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    
    return avg_loss

In [None]:
# Training configuration.
epochs = 30
learning_rate = 1e-2

# Move the model to its device first and only then build the optimizer, so
# the optimizer is bound to the parameters that are actually trained.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss()

# Per-epoch history, rebuilt from scratch every time this cell runs.
epoch_nums = []
training_loss = []
validation_loss = []

print('Training on', device)
for epoch in range(1, epochs + 1):
    train_loss = train(model, device, train_loader, optimizer, epoch)
    val_loss = test(model, device, val_loader)
    epoch_nums.append(epoch)
    training_loss.append(train_loss)
    validation_loss.append(val_loss)

In [None]:
# Training and validation loss per epoch on one set of axes.
plt.figure(figsize=(15, 15))
for _series, _legend_name in ((training_loss, 'training'), (validation_loss, 'validation')):
    plt.plot(epoch_nums, _series, label=_legend_name)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(loc='upper right')
plt.show()

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix
import pandas as pd
import seaborn as sns


# Collect the true and predicted labels of the whole test set as plain ints.
truelabels = []
predictions = []
model.eval()
print("Getting predictions from test set...")
with torch.no_grad():
    for data, target in test_loader:
        output = model(data.to(device))
        predicted = torch.argmax(output, dim=1)
        # tolist() yields Python ints, not 0-dim tensors.
        truelabels.extend(target.tolist())
        predictions.extend(predicted.cpu().tolist())

# Pass every label index explicitly so the matrix is always
# len(classes) x len(classes), even when a class never appears in the test
# set or in the predictions. Otherwise the DataFrame below would fail on a
# shape mismatch.
cm = confusion_matrix(truelabels, predictions, labels=list(range(len(classes))))

# Plot the confusion matrix (rows: true class, columns: predicted class).
df_cm = pd.DataFrame(cm, index=classes, columns=classes)
plt.figure(figsize=(7, 7))
sns.heatmap(df_cm, annot=True, cmap=plt.cm.Blues, fmt='g')
# The axes show dataset classes; the old "Shape" wording did not describe them.
plt.xlabel("Predicted Class", fontsize=20)
plt.ylabel("True Class", fontsize=20)
plt.show()

In [None]:
# Write the model's state_dict (not the whole module object) to a file named
# 'model_params' in the current working directory.
model_state = model.state_dict()
torch.save(model_state, 'model_params')