In [21]:
import os
import torch
import torchvision
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import urllib.request
import shutil
import json
from PIL import Image
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score

In [22]:
class image_data_set(torch.utils.data.Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
  
    def __len__(self):
        return len(self.data)
  
    def __getitem__(self, index):
        return {'data': self.data[index], 'label': self.labels[index]}

In [23]:
def download_json_file(downloaded_data_dir, json_file_name, blob_name):    
    if not os.path.isdir(downloaded_data_dir + json_file_name):
        json_file_zip_name = json_file_name + ".zip"
        json_zip_to_download = blob_name + json_file_zip_name
        download_json_zip_command = "azcopy cp '%s' '%s'" % (json_zip_to_download, downloaded_data_dir)
        os.system(download_json_zip_command)
        shutil.unpack_archive(downloaded_data_dir + json_file_zip_name, downloaded_data_dir)
        os.remove(downloaded_data_dir + json_file_zip_name)
    else:
        print("Required json zip already downloaded")
        
def download_images(dir_name, downloaded_data_dir, blob_name):    
    
    print("Downloading data from directory: " + dir_name)
    
    if not os.path.isdir(downloaded_data_dir + dir_name):
        dir_to_download = blob_name + "public/" + dir_name
        download_dir_command = "azcopy cp '%s' '%s' --recursive" % (dir_to_download, downloaded_data_dir)
        os.system(download_dir_command)
    else:
        print("Required directory already downloaded")

In [38]:
def get_image_tensor(file_path):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    
    image = Image.open(file_path)
    return transform(image)
    
    
#TODO: train on single images and test on batches
def get_data_sets(dir_name, downloaded_data_dir, json_file_name, categories_to_label_dict): 
    json_file = open(downloaded_data_dir + json_file_name)
    coco_key = json.load(json_file)
    images = coco_key["images"]

    data, labels = [], []
    sequence_data, sequence_labels = [], []
    index = 0
    old_sequence_id = None
    while index < len(images):
        
        image = images[index]
        file_name = image["file_name"]
        file_path = downloaded_data_dir + file_name
        
        if file_name.startswith(dir_name) and os.path.isfile(file_path):
            category_id = coco_key["annotations"][index]["category_id"]
            label = categories_to_label_dict[category_id]
            try:
                image_tensor = get_image_tensor(file_path)
            except:
                print("Truncated image encountered, leaving out of training and testing")
                    
            sequence_id = image["seq_id"]
            if sequence_id == old_sequence_id or old_sequence_id == None:
                sequence_data.append(image_tensor)
                sequence_labels.append(label)
            else:
                data.append(sequence_data)
                labels.append(sequence_labels)
                sequence_data, sequence_labels = [], []
            
            old_sequence_id = sequence_id
            
            
    training_data, testing_data, training_labels, testing_labels = train_test_split(data, labels, test_size = 0.005)
    
    print("\nNumber of training sequences: " + str(len(training_data)))
    print("Number of testing sequences: " + str(len(testing_data)))
    
    json_file.close()
    
    #Chase change this back
    #shutil.rmtree(downloaded_data_dir + dir_name)
    
    return training_data, testing_data, training_labels, testing_labels

In [33]:
def print_image(image_tensor, prediction):
    if(prediction == 1):
        prediction_string = "Wildlife Present"
    else:
        prediction_string = "No Wildlife Present"

    #Alternative normalized RGB visualization: plt.imshow(image_tensor.cpu().permute(1, 2, 0).numpy())
    plt.imshow(image_tensor[0].cpu(), cmap="gray")
    plt.title("Incorrectly Predicted " + prediction_string) 
    plt.show()

def print_testing_analysis(all_labels, all_predictions, title):
    subplot = plt.subplot()

    cf_matrix = confusion_matrix(all_labels, all_predictions, labels=[1, 0])
    sns.heatmap(cf_matrix, annot=True, fmt='g', cmap='Blues')

    subplot.set_xlabel('Predictions')
    subplot.set_ylabel('Labels')
    subplot.set_title(title + ' Testing Confusion Matrix')
    subplot.xaxis.set_ticklabels(['Wildlife Present', 'No Wildlife Present'])
    subplot.yaxis.set_ticklabels(['Wildlife Present', 'No Wildlife Present'])
    plt.show()

    accuracy = accuracy_score(all_labels, all_predictions)
    print(title + " Accuracy: " + str(accuracy))

    precision, recall, f_score, support = precision_recall_fscore_support(all_labels, all_predictions, average='binary')
    print(title + " Precision: " + str(precision))
    print(title + " Recall: " + str(recall))
    print(title + " F-Score: " + str(f_score))

def train(model, training_loader, criterion, optimizer):
    model.train()
    running_loss = 0.0
    num_correct = 0
    for i, data in enumerate(training_loader):
        data, labels = data['data'].to(device), data['label'].to(device)
        optimizer.zero_grad()
        output = model(data)
        
        loss = criterion(output, labels)
        running_loss += loss.item()
        _, predictions = torch.max(output.data, 1)
        num_correct += (predictions == labels).sum().item()
        loss.backward()
        optimizer.step()
    
    loss = running_loss/len(training_loader.dataset)
    accuracy = num_correct/len(training_loader.dataset)
    return loss, accuracy

def test(model, testing_loader, criterion, print_incorrect_images):
    model.eval()
    running_loss = 0.0
    num_correct = 0
    all_labels, all_predictions = [], []

    for i, data in enumerate(testing_loader):
        data, labels = data['data'].to(device), data['label'].to(device)
        
        output = torch.sum(output, 0)
        output = model(data)
        
        loss = criterion(output, labels)
        running_loss += loss.item()
        _, predictions = torch.max(output.data, 1)
        for index, prediction in enumerate(predictions):
            if(prediction == labels[index]):
                num_correct += 1
            elif(print_incorrect_images):
                print_image(data[index], prediction)

        all_labels.extend(labels.cpu())
        all_predictions.extend(predictions.cpu())
    
    loss = running_loss/len(testing_loader.dataset)
    accuracy = num_correct/len(testing_loader.dataset)
    return loss, accuracy, all_labels, all_predictions

In [34]:
def train_and_test_models(resnet50, resnet152, vit_l_16, training_loader, testing_loader, device, criterion):
    print("\nTraining and Testing ResNet50")
    train_and_test(resnet50, training_loader, testing_loader, device, criterion)

    print("\nTraining and Testing ResNet152")
    train_and_test(resnet152, training_loader, testing_loader, device, criterion)

    print("\nTraining and Testing ViT Large 16")
    train_and_test(vit_l_16, training_loader, testing_loader, device, criterion)

def train_and_test(model, training_loader, testing_loader, device, criterion):
    if torch.cuda.device_count() > 1:
        print("Multiple GPUs available, using: " + str(torch.cuda.device_count()))
        model = nn.DataParallel(model)
        
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
        
    training_loss, training_accuracy = train(model, training_loader, criterion, optimizer)
    print("training loss: " + str(training_loss) + " and training accuracy: " + str(training_accuracy))
        
    testing_loss, testing_accuracy, _, _ = test(model, testing_loader, criterion, False)
    print("testing loss: " + str(testing_loss) + " and testing accuracy: " + str(testing_accuracy))

# Declaring Constants

In [35]:
num_epochs = 2
num_classes = 2
batch_size = 20
json_file_name = "idaho-camera-traps.json"
dir_prefix = "loc_"

#Chase change this back
downloaded_data_dir = "downloaded_data/"

blob_name = "https://lilablobssc.blob.core.windows.net/idaho-camera-traps/"

# Mapping canines, big cats, bears, and ungulates to wildlife present and all other categories to no wildlife present
# This is mostly arbitrary and could be reworked, we just need to draw the line somewhere
categories_to_label_dict = {
    0:0, 1:0, 2:0, 3:1, 4:0, 5:1, 6:1, 7:0, 8:0, 9:1, 
    10:1, 11:0, 12:1, 13:1, 14:0, 15:0, 16:1, 17:0, 18:1, 19:0,
    20:1, 21:0, 22:1, 23:0, 24:1, 25:0, 26:0, 27:0, 28:0, 29:0,
    30:0, 31:0, 32:0, 33:0, 34:0, 35:0, 36:0, 37:0, 38:1, 39:1,
    40:1, 41:0, 42:0, 43:0, 44:0, 45:1, 46:0, 47:0, 48:1, 49:0,
    50:0, 51:0, 52:0, 53:0, 54:0, 55:0, 56:0, 57:0, 58:0, 59:0,
    60:0, 61:0,
}

print(torch.__version__)
print(torchvision.__version__)
print("torch.cuda.is_available(): " + str(torch.cuda.is_available()))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.cuda.empty_cache()
criterion = nn.CrossEntropyLoss()

1.12.1
0.13.1
torch.cuda.is_available(): False


# Declaring Models

In [36]:
resnet50 = models.resnet50(weights = models.ResNet50_Weights.DEFAULT)
resnet50.fc.out_features = num_classes

resnet152 = models.resnet152(weights = models.ResNet152_Weights.DEFAULT)
resnet152.fc.out_features = num_classes

vit_l_16 = models.vit_l_16(weights = models.ViT_L_16_Weights.DEFAULT)
vit_l_16.heads.out_features = num_classes

# Orchestration

In [None]:
download_json_file(downloaded_data_dir, json_file_name, blob_name)

all_testing_data, all_testing_labels = [], []

for epoch in range(num_epochs):
    
    print("Training epoch: " + str(epoch))
    
    #Chase change this back
    for i in range(1):
        
        dir_name = dir_prefix + '{0:04}'.format(i)
        download_images(dir_name, downloaded_data_dir, blob_name)
        training_data, testing_data, training_labels, testing_labels = get_data_sets(dir_name, downloaded_data_dir, json_file_name, categories_to_label_dict)
    
        if (epoch == 0):
            all_testing_data.append(testing_data)
            all_testing_labels.append(testing_labels)
    
        training_data_set = image_data_set(training_data, training_labels)
        testing_data_set = image_data_set(testing_data, testing_labels)
        training_loader = DataLoader(dataset = training_data_set, batch_size = batch_size, shuffle = True)
        testing_loader = DataLoader(dataset = testing_data_set, batch_size = batch_size, shuffle = True)
    
        train_and_test_models(resnet50, resnet152, vit_l_16, training_loader, testing_loader, device, criterion)

INFO: Scanning...
INFO: azcopy: A newer version 10.18.1 is available to download

INFO: Any empty folders will not be processed, because source and/or destination doesn't have full folder support

Job de3bc525-00c2-e84f-6150-4c8930b00878 has started
Log file is located at: /Users/ChaseIson 1/.azcopy/de3bc525-00c2-e84f-6150-4c8930b00878.log

100.0 %, 1 Done, 0 Failed, 0 Pending, 0 Skipped, 1 Total,                                


Job de3bc525-00c2-e84f-6150-4c8930b00878 summary
Elapsed Time (Minutes): 0.2001
Number of File Transfers: 1
Number of Folder Property Transfers: 0
Total Number of Transfers: 1
Number of Transfers Completed: 1
Number of Transfers Failed: 0
Number of Transfers Skipped: 0
TotalBytesTransferred: 26765533
Final Job Status: Completed

Training epoch: 0
Downloading data from directory: loc_0000
Required directory already downloaded


# Final Testing

In [None]:
os.remove(downloaded_data_dir + json_file_name)
final_testing_data_set = image_data_set(sum(all_testing_data, []), sum(all_testing_labels, []))
final_testing_loader = DataLoader(dataset = final_testing_data_set, batch_size = batch_size, shuffle = True)

testing_loss, testing_accuracy, labels, predictions = test(resnet50, final_testing_loader, criterion, True)
print_testing_analysis(labels, predictions, "ResNet50 Overall")

testing_loss, testing_accuracy, labels, predictions = test(resnet152, final_testing_loader, criterion, True)
print_testing_analysis(labels, predictions, "ResNet152 Overall")

testing_loss, testing_accuracy, labels, predictions = test(vit_l_16, final_testing_loader, criterion, True)
print_testing_analysis(labels, predictions, "ViT Large 16 Overall")