<a href="https://colab.research.google.com/github/geraldmc/torch-draft-final_project/blob/main/load_deepweeds.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## DeepWeeds TESTING

In [1]:
import os 
import os.path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import sklearn.metrics as sm
from torchvision import transforms
from PIL import Image

## Steps For K-fold Test

##### 0) Get files in order (mainly Colab-specific).

    1) Get labels, instantiate data loaders.
    2) Load the trained ResNet50 model, alter it.
    3) Test the model. Save results.
    4) REPEAT 1-4 for each fold.

### Get labels.

In [4]:
LABEL_PATH = os.path.join('data/', 'labels') # provide your path.
!ls {LABEL_PATH}

labels.csv        test_subset3.csv  train_subset2.csv val_subset1.csv
test_subset0.csv  test_subset4.csv  train_subset3.csv val_subset2.csv
test_subset1.csv  train_subset0.csv train_subset4.csv val_subset3.csv
test_subset2.csv  train_subset1.csv val_subset0.csv   val_subset4.csv


### Only one transform for testing.

In [5]:
data_transforms = {
    'default': transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.485, 0.456, 0.406], 
            [0.229, 0.224, 0.225])
    ])
}

### Class to instantiate a test dataset, etc.

In [6]:
# Dataset for Deepweeds testing

class DeepWeeds_Test(Dataset):

    def __init__(self, csv_file):
        """
        """
        self.root = 'data/test/' # whatever your image file root is
        self.csv_file = csv_file
        self.transform = data_transforms['default']
        
        self.csv_data = pd.read_csv(self.csv_file)

    def __len__(self):
        return len(self.csv_data)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root, self.csv_data.Filename[idx])
        label = self.csv_data.Label[idx]
        img = Image.open(img_path)

        if self.transform is not None:
            img = self.transform(img)

        return img, label

In [29]:
test_label_file = "{}/test_subset{}.csv".format(LABEL_PATH, 0)
#!ls {test_label_file}
#test_df = pd.read_csv(test_label_file)

test_dataset = DeepWeeds_Test(test_label_file)
test_loader  = DataLoader(test_dataset, 
  batch_size=32, shuffle=False,
  pin_memory=torch.cuda.is_available(), 
  num_workers=1)

In [30]:
test_dataset.csv_data

Unnamed: 0,Filename,Label
0,20160928-140747-0.jpg,0
1,20160928-141437-0.jpg,0
2,20160928-142110-0.jpg,0
3,20161207-110730-0.jpg,0
4,20161207-110753-0.jpg,0
...,...,...
3502,20180322-133614-1.jpg,8
3503,20180322-133710-1.jpg,8
3504,20180322-133743-1.jpg,8
3505,20180322-133752-1.jpg,8


In [31]:
next(iter(test_loader))

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pytorch_m1/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/pytorch_m1/lib/python3.8/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'DeepWeeds_Test' on <module '__main__' (built-in)>


RuntimeError: DataLoader worker (pid(s) 48151) exited unexpectedly

In [23]:
dir(test_loader)

['_DataLoader__initialized',
 '_DataLoader__multiprocessing_context',
 '_IterableDataset_len_called',
 '__annotations__',
 '__class__',
 '__class_getitem__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__len__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__orig_bases__',
 '__parameters__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_auto_collation',
 '_dataset_kind',
 '_get_iterator',
 '_index_sampler',
 '_is_protocol',
 '_iterator',
 'batch_sampler',
 'batch_size',
 'check_worker_number_rationality',
 'collate_fn',
 'dataset',
 'drop_last',
 'generator',
 'multiprocessing_context',
 'num_workers',
 'persistent_workers',
 'pin_memory',
 'prefetch_factor',
 'sampler',
 'timeout',
 'worker_init_fn']

### Next three cells necessary to load and tweak a pre-trained ResNet-50 model

In [None]:
# This is here just to reset required before loading the model.

def get_parameters(model, features):
    '''  Only parameters that we've just initialized, i.e. the parameters with 
         requires_grad is True, are updated. (i.e. the last fc layer).
    '''

    params_to_update = model.parameters()

    print("[INFO] Params to learn:")
    if features:
        params_to_update = []
        for name,param in model.named_parameters():
            if param.requires_grad == True:
                params_to_update.append(param)
                print("\t",name)
    else:
        for name,param in model.named_parameters():
            if param.requires_grad == True:
                print("\t",name)
    print()

    # Observe that all parameters are optimized
    # optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)
    opt = optim.Adam(params_to_update, lr=1e-3)
    sch = optim.lr_scheduler.ReduceLROnPlateau(
        opt, patience=16, factor=0.5, min_lr=0.00003125)

    return opt, sch

In [None]:
# 3) Init a new ResNet50 model.

def set_parameter_requires_grad(model, feature_extracting):
    '''FIXME
    '''
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False
            
def initialize_model(model_name, num_classes, feature_extract):
    '''FIXME
    '''
     # Init a new ResNet50 model (called below)
    model_ft = None
    input_size = 0
    if model_name == "resnet50":
        """ Resnet50
        """
        model_ft = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224
    else:
        print("Invalid model name, exiting...")
        exit()

    return model_ft, input_size

def init_model():
    '''FIXME
    '''
    model, input_size = initialize_model('resnet50', 9, 
                                            feature_extract=True)
    if torch.cuda.is_available():
        model.to('cuda') #IMPORTANT!
    
    return model, input_size

In [None]:
import numpy as np
import pandas as pd

'''
TEST
'''

def test(test_loader, model):
  model.eval()
  correct = 0
  targets, preds = [], []

  with torch.no_grad():
    for data, target in test_loader:
      if torch.cuda.is_available():
        data = data.cuda()
        target = target.cuda()
      output = model(data)
      pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
      correct += pred.eq(target.view_as(pred)).sum().item()

      targets += list(target.cpu().numpy())
      preds += list(pred.cpu().numpy())
  
  test_acc = 100. * correct / len(test_loader.dataset)
  confusion_mtx = sm.confusion_matrix(targets, preds, normalize='all')

  return test_acc, confusion_mtx

In [None]:
def run_test_kfold(model):
  
  metrics = {}

  for idx in range(params.FOLDS):
    test_label_file = "{}/test_subset{}.csv".format(LABEL_PATH, idx)
    test_df = pd.read_csv(test_label_file)
    #copy_test_files(test_df, params.IMG_TEST_PATH)

    test_dataset = DeepWeeds_Test(test_label_file)
    test_loader  = DataLoader(test_dataset, 
      batch_size=params.BATCH_SIZE, shuffle=False,
      pin_memory=torch.cuda.is_available(), 
      num_workers=2)

    # --- Get metrics for each fold.
    metrics[idx] = test(test_loader, model)
    # ---

    #delete_test_files(params.IMG_TEST_PATH)
    #cnt = len([name for name in os.listdir(params.IMG_TEST_PATH) \
    #          if os.path.isfile(os.path.join(params.IMG_TEST_PATH, name))])
    #assert cnt == 0
  
  return metrics

### Next cell will instantite the model ready to run. 

In [None]:
def load_model(name):
    '''FIXME
    '''
    model, input_size = init_model()
    model.load_state_dict(torch.load(name))
    return model

### Load the model and test.

In [None]:
filename = '20220426-074813_no_aug_model.pth' # must provide
model = load_model(filename)
print("Running test set with: " + filename)

results = run_test_kfold(model)