In [None]:
# Mount Google Drive at /content/gdrive; the model checkpoint loaded later in
# this notebook is read from a path under that mount point.
# The google.colab package is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
def unpickle(file):
    """Return the object stored in the pickle file at path ``file``.

    The payload is decoded with ``encoding='bytes'``, i.e. 8-bit strings
    written by Python 2 are returned as ``bytes`` objects.

    NOTE: unpickling can execute arbitrary code; only use on trusted files.
    """
    import pickle

    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

In [None]:
import os
import torchvision.transforms as T
import torchvision.transforms.functional as TF
import torchvision
import json
import numpy as np
import pickle 
import torch

from PIL import Image
from torch.utils.data import Dataset

# Experiment configuration. The dataset classes defined below read this dict as
# a module-level global (test_classes, w2v_dir, support_file, ...).
# NOTE: a later cell rebinds `config` with different dataset/model_name/
# batch_size/epochs values; whichever assignment ran last is the one in effect.
config = {
    # Documentation-only entry listing the values accepted for "dataset".
    "_dataset_choices": "cifar_original | cifar_zs | cifar_wv | cifar_oneshot",
    "dataset": "cifar_original", 
    "model_name": "base.pth",
    "data_dir": "../data",
    "model_dir": "./saved_models",
    "w2v_dir": "../data/wordvectors/",
    
    # Optimisation hyper-parameters.
    "lr": 0.1,
    "batch_size": 128,
    "epochs": 200,
    "resume": None,

    # Class names held out of training; the zero-/k-shot datasets evaluate on these.
    "test_classes": ["bicycle", "girl", "table", "crocodile", "maple_tree", "wolf", "raccoon", "dolphin", "train", "crab", "boy", "pear", "leopard", "cup", "lizard"],

    # Word-vector pickle file names, looked up by `vector_type` inside w2v_dir.
    "fasttext": "w2v_fasttext.pkl",
    "googlenews": "w2v_google.pkl",
    "glove": "w2v_glove.pkl",
    "glove_twitter": "w2v_glove_twitter.pkl",
    "bert": "w2v_bert.pkl",
    # Pickle with the k-shot support images, resolved relative to data_dir.
    "support_file": "cifar_kshot_support.pkl"
}

def cifarOriginal(data_dir, train_transforms, transforms):
    """Build the unmodified CIFAR-100 train and test datasets.

    Args:
        data_dir: root directory handed to torchvision (data is downloaded
            there when missing).
        train_transforms: list of transforms applied to training images only,
            ahead of ``transforms``.
        transforms: list of transforms applied to both splits.

    Returns:
        ``(train_data, test_data)`` as ``torchvision.datasets.CIFAR100`` objects.
    """
    augmented_pipeline = T.Compose(train_transforms + transforms)
    plain_pipeline = T.Compose(transforms)

    train_data = torchvision.datasets.CIFAR100(
        root=data_dir, train=True, download=True, transform=augmented_pipeline)
    test_data = torchvision.datasets.CIFAR100(
        root=data_dir, train=False, download=True, transform=plain_pipeline)

    return train_data, test_data

class cifarZSClassification(Dataset):
    """CIFAR-100 restricted to one side of the zero-shot class split.

    With ``train == True`` the training images of every class that is NOT in
    ``config["test_classes"]`` are kept; otherwise the test images of the
    classes that ARE listed are kept. Targets stay the original CIFAR-100
    class indices.
    """

    def __init__(self, data_dir, train_transforms, transforms, train):
        self.train = train
        use_train_split = self.train == True

        # Augmentations are only prepended for the training split.
        pipeline = train_transforms + transforms if use_train_split else transforms
        self.transform = T.Compose(pipeline)
        torch_data = torchvision.datasets.CIFAR100(
            root=data_dir, train=bool(use_train_split), download=True, transform=self.transform)

        held_out = [torch_data.classes.index(name) for name in config["test_classes"]]
        if use_train_split:
            self.target_classes = np.delete(np.arange(100), held_out)
        else:
            self.target_classes = np.array(held_out)

        # Positions of the samples whose label belongs to the selected classes.
        wanted = set(self.target_classes.tolist())
        self.indices = [pos for pos, label in enumerate(torch_data.targets) if label in wanted]

        self.data = torch_data.data[self.indices]
        self.targets = list(np.array(torch_data.targets)[self.indices])

    def __getitem__(self, index):
        """Return ``(transformed image, class index)`` for sample ``index``."""
        target = self.targets[index]
        img = Image.fromarray(self.data[index])

        if self.transform is not None:
            img = self.transform(img)

        return img, target

    def __len__(self):
        """Number of samples kept after class filtering."""
        return len(self.data)

class cifarZSW2V(Dataset):
    """Zero-shot CIFAR-100 split whose targets are class word vectors.

    Each item is ``(image, target_vector, target_class, negative_vector)``
    where ``negative_vector`` is the word vector of a randomly chosen
    *different* class from the same split, sampled once at construction.

    Args:
        data_dir: root directory handed to torchvision.
        train_transforms: transforms applied to training images only.
        transforms: transforms applied to both splits.
        train: ``True`` selects the seen classes of the training split,
            anything else the held-out classes of the test split.
        vector_type: key into ``config`` naming the word-vector pickle.
    """

    def __init__(self, data_dir, train_transforms, transforms, train, vector_type='glove'):
        self.train = train
        # Side effect: reseeds NumPy's global RNG so the negatives sampled
        # below are identical on every construction.
        np.random.seed(10)
        if train == True:
            self.transform = T.Compose(train_transforms + transforms)
            torch_data = torchvision.datasets.CIFAR100(
                root=data_dir, train=True, download=True, transform=self.transform)
        else:
            self.transform = T.Compose(transforms)
            torch_data = torchvision.datasets.CIFAR100(
                root=data_dir, train=False, download=True, transform=self.transform)

        test_classes = [torch_data.classes.index(c) for c in config["test_classes"]]
        self.classes = torch_data.classes

        # NOTE: pickle can execute arbitrary code; only load trusted files.
        # The context manager closes the handle the original left open.
        w2v_path = os.path.join(config['w2v_dir'], config[vector_type])
        with open(w2v_path, 'rb') as w2v_file:
            self.word_vectors = pickle.load(w2v_file)

        if train == True:
            self.target_classes = np.delete(np.arange(100), test_classes)
        else:
            self.target_classes = np.array(test_classes)

        self.target_wv = [self.word_vectors[self.classes[idx]] for idx in self.target_classes]

        self.indices = [i for i in range(len(torch_data.targets)) if torch_data.targets[i] in self.target_classes]

        self.data = torch_data.data[self.indices]
        self.targets = list(np.array(torch_data.targets)[self.indices])

        # Candidate negatives depend only on the sample's class, so build them
        # once per class instead of once per sample. Element order matches the
        # original list comprehension, so the seeded draws are unchanged.
        other_classes = {
            int(c): self.target_classes[self.target_classes != c]
            for c in self.target_classes
        }
        self.negatives = [
            self.word_vectors[self.classes[np.random.choice(other_classes[int(t)])]]
            for t in self.targets
        ]

    def __getitem__(self, index):
        """Return ``(image, target_vector, target_class, negative_vector)``."""
        target_class = self.targets[index]
        img = Image.fromarray(self.data[index])

        if self.transform is not None:
            img = self.transform(img)

        # torch.Tensor is the same class the original reached through
        # torchvision.transforms.functional's internal import.
        target = torch.Tensor(self.word_vectors[self.classes[target_class]])
        negative = torch.Tensor(self.negatives[index])
        return img, target, target_class, negative

    def __len__(self):
        """Number of samples kept after class filtering."""
        return len(self.data)

class cifarKShot(Dataset):
    """CIFAR-100 class split for k-shot evaluation.

    Training mode yields ``(image, target)`` for the seen classes. Otherwise
    items are ``(image, query, target)`` for the held-out classes, where
    ``query`` is the first stored support image of the sample's class.

    Args:
        data_dir: root directory handed to torchvision.
        train_transforms: transforms applied to training images only.
        transforms: transforms applied to both splits.
        train: selects the split (see above).
        k: stored on the instance; not otherwise used by this class.
    """

    def __init__(self, data_dir, train_transforms, transforms, train, k=1):
        self.k = k
        self.train = train
        if train == True:
            self.transform = T.Compose(train_transforms + transforms)
            torch_data = torchvision.datasets.CIFAR100(
                root=data_dir, train=True, download=True, transform=self.transform)
        else:
            self.transform = T.Compose(transforms)
            torch_data = torchvision.datasets.CIFAR100(
                root=data_dir, train=False, download=True, transform=self.transform)

        test_classes = [torch_data.classes.index(c) for c in config["test_classes"]]

        # NOTE: pickle can execute arbitrary code; only load trusted files.
        # The context manager closes the handle the original left open.
        support_path = os.path.join(config['data_dir'], config['support_file'])
        with open(support_path, "rb") as support_file:
            support = pickle.load(support_file)
        self.support_data, self.support_indices = support["images"], support["indices"]

        if train == True:
            self.target_classes = np.delete(np.arange(100), test_classes)
        else:
            self.target_classes = np.array(test_classes)

        self.indices = [i for i in range(len(torch_data.targets)) if torch_data.targets[i] in self.target_classes]

        self.data = torch_data.data[self.indices]
        self.targets = list(np.array(torch_data.targets)[self.indices])

    def __getitem__(self, index):
        """Return ``(img, target)`` when training, else ``(img, query, target)``."""
        target = self.targets[index]
        img = Image.fromarray(self.data[index])

        if self.transform is not None:
            img = self.transform(img)

        if self.train:
            return img, target

        # Evaluation: pair the sample with its class's first support image.
        query = Image.fromarray(self.support_data[target][0])
        if self.transform is not None:
            query = self.transform(query)

        return img, query, target

    def __len__(self):
        """Number of samples kept after class filtering."""
        return len(self.data)

class kShotSupport(Dataset):
    """Support set for k-shot evaluation, held fully in memory.

    After construction ``self.data`` is a stacked tensor of transformed
    support images and ``self.targets`` the matching list of class indices
    (``k`` entries per selected class).

    Args:
        data_dir: root directory handed to torchvision.
        transforms: transforms applied to every support image.
        train: ``True`` selects the classes outside ``config["test_classes"]``,
            anything else the held-out classes.
        k: number of support entries appended per class.
    """

    def __init__(self, data_dir, transforms, train, k=1):
        self.k = k
        self.train = train
        # Both splits use the same (augmentation-free) pipeline.
        self.transform = T.Compose(transforms)
        torch_data = torchvision.datasets.CIFAR100(
            root=data_dir, train=bool(train == True), download=True, transform=self.transform)

        test_classes = [torch_data.classes.index(c) for c in config["test_classes"]]

        # NOTE: pickle can execute arbitrary code; only load trusted files.
        # The context manager closes the handle the original left open.
        support_path = os.path.join(config['data_dir'], config['support_file'])
        with open(support_path, "rb") as support_file:
            support = pickle.load(support_file)
        self.support_data, self.support_indices = support["images"], support["indices"]

        if train == True:
            self.target_classes = np.delete(np.arange(100), test_classes)
        else:
            self.target_classes = np.array(test_classes)

        wanted = set(self.target_classes.tolist())
        self.indices = [i for i, label in enumerate(torch_data.targets) if label in wanted]

        self.data = []
        self.targets = []
        for c_i in self.target_classes:
            for j in range(self.k):
                # NOTE(review): index 0 is used for every j, so k > 1 repeats
                # the same image k times. `[j]` may have been intended; confirm
                # against the layout of the support pickle before changing.
                img = self.support_data[c_i][0]
                img = Image.fromarray(img)
                img = self.transform(img)
                self.data.append(img)
                self.targets.append(c_i)

        self.data = torch.stack(self.data)
        print ("Support Data: ", self.data.shape, self.targets)

    def __getitem__(self, index):
        """Return ``(support image tensor, class index)`` for entry ``index``."""
        return self.data[index], self.targets[index]

    def __len__(self):
        """Number of support entries."""
        return len(self.data)
        
        

In [None]:
## This is the torchvision implementation of ResNet.
# We modify the network for CIFAR, because CIFAR images are 32 x 32.
# A comment starting with "change:" marks every line that was modified.

from typing import Type, Any, Callable, Union, List, Optional

import torch
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a bias-free 3x3 convolution whose padding equals its dilation."""
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        bias=False,
        dilation=dilation,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)


def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) convolution."""
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut connection.

    Only ``groups=1``, ``base_width=64`` and ``dilation=1`` are accepted.
    """

    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer

        # Reject configurations this block does not implement.
        if not (groups == 1 and base_width == 64):
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")

        # When stride != 1, conv1 and the downsample module both reduce the
        # spatial resolution so the two branches stay aligned.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = make_norm(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = make_norm(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        """Apply conv-bn-relu-conv-bn, add the shortcut, then a final ReLU."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # The shortcut is the input itself unless a downsample module is set.
        shortcut = x if self.downsample is None else self.downsample(x)

        residual += shortcut
        return self.relu(residual)


class Bottleneck(nn.Module):
    """Residual block of 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut."""

    # torchvision puts the downsampling stride on the 3x3 convolution
    # (self.conv2), whereas the original paper ("Deep residual learning for
    # image recognition", https://arxiv.org/abs/1512.03385) puts it on the
    # first 1x1 convolution (self.conv1). This variant is also known as
    # ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion: int = 4

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer

        # Channel count of the inner 3x3 stage and of the block output.
        width = int(planes * (base_width / 64.0)) * groups
        out_planes = planes * self.expansion

        # When stride != 1, conv2 and the downsample module both reduce the
        # spatial resolution so the two branches stay aligned.
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = make_norm(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = make_norm(width)
        self.conv3 = conv1x1(width, out_planes)
        self.bn3 = make_norm(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        """Run the three conv stages, add the shortcut, then a final ReLU."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.relu(self.bn2(self.conv2(residual)))
        residual = self.bn3(self.conv3(residual))

        # The shortcut is the input itself unless a downsample module is set.
        shortcut = x if self.downsample is None else self.downsample(x)

        residual += shortcut
        return self.relu(residual)


class ResNet(nn.Module):
    """torchvision-style ResNet adapted for small (CIFAR-sized) images.

    Compared with the stock implementation, the lines marked "change:" use a
    3x3 stride-1 stem convolution, apply dropout to the layer4 feature map,
    initialise linear layers from a normal distribution, and pool with a fixed
    4x4 average pool before the fully connected head.

    Attribute names (conv1, bn1, layer1..layer4, fc) define the state_dict keys,
    so they must stay stable for existing checkpoints to load.
    """

    def __init__(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        num_outputs: int = 100,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """Build the network.

        Args:
            block: residual block class used in every stage.
            layers: number of blocks in each of the four stages.
            num_outputs: size of the final linear layer's output.
            zero_init_residual: zero the last norm weight of each block so the
                block starts as an identity mapping.
            groups: forwarded to every block.
            width_per_group: forwarded to every block as ``base_width``.
            replace_stride_with_dilation: three flags, one per strided stage
                (layer2..layer4), replacing the stride with dilation.
            norm_layer: normalisation layer factory; defaults to BatchNorm2d.

        Raises:
            ValueError: if ``replace_stride_with_dilation`` does not have
                exactly three elements.
        """
        super().__init__()
        
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        # Running channel count / dilation, updated by _make_layer as stages are built.
        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                f"or a 3-element tuple, got {replace_stride_with_dilation}"
            )
        self.groups = groups
        self.base_width = width_per_group
        
        # change: kernel size and stride (3x3, stride 1, so the stem keeps the input resolution)
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        
        # change: dropout applied to the output of the last stage (see _forward_impl)
        self.dropout = nn.Dropout(0.4)
        
        self.fc = nn.Linear(512 * block.expansion, num_outputs)

        # Weight initialisation, applied to every submodule registered above.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            # change: initialize linear weights from a normal distribution (std=1e-3), zero bias
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, std=1e-3)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    def _make_layer(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        planes: int,
        blocks: int,
        stride: int = 1,
        dilate: bool = False,
    ) -> nn.Sequential:
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and the optional downsample
        shortcut; the remaining blocks keep resolution and channel count.
        Updates ``self.inplanes`` (and ``self.dilation`` when ``dilate``).
        """
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            # Trade the stride for dilation: resolution is kept for this stage.
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            # Shortcut must match the main branch in resolution and channels.
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(
            block(
                self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
            )
        )
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.inplanes,
                    planes,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm_layer=norm_layer,
                )
            )

        return nn.Sequential(*layers)

    def _forward_impl(self, x: Tensor) -> Tensor:
        """Stem -> four stages -> dropout -> 4x4 average pool -> flatten -> fc."""
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.dropout(x)

        # change: fixed 4x4 average pool. NOTE(review): fc expects
        # 512 * block.expansion features, so this presumably relies on the
        # layer4 map being 4x4 (32x32 inputs, no dilation) - confirm.
        x = F.avg_pool2d(x, 4)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x: Tensor) -> Tensor:
        """Return the fc output for the image batch ``x``."""
        return self._forward_impl(x)


def _resnet(
    arch: str,
    num_outputs: int,
    block: Type[Union[BasicBlock, Bottleneck]],
    layers: List[int],
    **kwargs: Any,
) -> ResNet:
    """Construct a ``ResNet`` from a block type and per-stage block counts.

    ``arch`` is accepted for parity with torchvision's factory but is not used.
    Extra keyword arguments are forwarded to ``ResNet``.
    """
    return ResNet(block, layers, num_outputs, **kwargs)


def resnet18(num_outputs=100, **kwargs: Any) -> ResNet:
    r"""Build the ResNet-18 variant described in
    `"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_.

    Args:
        num_outputs (int): number of neurons in the final output layer
        **kwargs: forwarded unchanged to the ``ResNet`` constructor
    """
    # change: num_outputs is passed through to the network constructor
    blocks_per_stage = [2, 2, 2, 2]
    return _resnet("resnet18", num_outputs, BasicBlock, blocks_per_stage, **kwargs)

In [None]:
import torch
import torch.nn as nn
from torch import Tensor
import torch.nn.functional as F
# from models.resnet import *

class OneShotBaseModel(nn.Module):
    """ResNet-18 embedding network followed by a linear + ReLU head.

    ``forward`` returns ``(out, embedding)``: the 300-dimensional ResNet
    output and the result of the head applied to it.

    ``pre_bn`` is registered but never called in ``forward``; it still
    contributes entries to the state_dict.
    """

    def __init__(self, num_outputs=100):
        super().__init__()

        self.resnet = resnet18(300)
        self.pre_bn = nn.BatchNorm1d(300)
        self.linear_1 = nn.Linear(300, num_outputs)
        self.relu = nn.ReLU(inplace=True)

        # Every linear layer, including the ResNet's own fc, gets small normal
        # weights and a zero bias.
        for layer in self.modules():
            if not isinstance(layer, nn.Linear):
                continue
            nn.init.normal_(layer.weight, std=1e-3)
            nn.init.constant_(layer.bias, 0)

    def forward(self, img):
        """Return ``(relu(linear_1(embedding)), embedding)`` for ``img``."""
        embedding = self.resnet(img)
        head_output = self.linear_1(embedding)
        return self.relu(head_output), embedding

class Siamese(nn.Module):
    """Two-input network that embeds both inputs with one shared ResNet-18.

    Args:
        num_outputs: accepted for interface compatibility; the embedding size
            is fixed at 300 by the ``resnet18(300)`` call below.
    """

    def __init__(self, num_outputs=300):
        super(Siamese, self).__init__()

        self.resnet = resnet18(300)

    def forward(self, img, target, return_pair=False):
        """Embed ``img`` and ``target`` with the shared backbone.

        The original implementation assigned both results to the same variable,
        so the image embedding was computed and then silently discarded.

        Args:
            img: first input batch.
            target: second input batch.
            return_pair: when true, return ``(img_embedding, target_embedding)``.
                The default keeps the legacy return value (the target
                embedding only) so existing callers are unaffected.

        Returns:
            The target embedding, or the pair of embeddings if ``return_pair``.
        """
        # Both passes always run, as before; in training mode each pass also
        # updates the backbone's normalisation statistics.
        img_embedding = self.resnet(img)
        target_embedding = self.resnet(target)

        if return_pair:
            return img_embedding, target_embedding
        return target_embedding

In [None]:
!pip3 install pytorch_metric_learning

Collecting pytorch_metric_learning
  Downloading pytorch_metric_learning-1.0.0-py3-none-any.whl (102 kB)
[?25l[K     |███▏                            | 10 kB 26.1 MB/s eta 0:00:01[K     |██████▍                         | 20 kB 22.0 MB/s eta 0:00:01[K     |█████████▋                      | 30 kB 9.9 MB/s eta 0:00:01[K     |████████████▊                   | 40 kB 9.1 MB/s eta 0:00:01[K     |████████████████                | 51 kB 5.1 MB/s eta 0:00:01[K     |███████████████████▏            | 61 kB 5.6 MB/s eta 0:00:01[K     |██████████████████████▎         | 71 kB 5.4 MB/s eta 0:00:01[K     |█████████████████████████▌      | 81 kB 6.0 MB/s eta 0:00:01[K     |████████████████████████████▊   | 92 kB 4.7 MB/s eta 0:00:01[K     |███████████████████████████████▉| 102 kB 5.1 MB/s eta 0:00:01[K     |████████████████████████████████| 102 kB 5.1 MB/s 
Installing collected packages: pytorch-metric-learning
Successfully installed pytorch-metric-learning-1.0.0


In [None]:
import os
import torch
import json

import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
import torch.backends.cudnn as cudnn

from pytorch_metric_learning import losses

from sklearn.neighbors import KNeighborsClassifier
# from datasets.dataloader import *
# from models.oneshot_model import *

# tqdm progress bar usage adapted from:
# https://towardsdatascience.com/training-models-with-a-progress-a-bar-2b664de3e13e
from tqdm import tqdm
from time import sleep
# config_file = "./configs/config_oneshot_base.json"
# Rebinds the global `config` for the one-shot experiment. The dataset classes
# read `config` when they are constructed, so from this cell on they see these
# values instead of the ones assigned in the earlier data-loading cell.
config = {
    # Documentation-only entry listing the values accepted for "dataset".
    "_dataset_choices": "cifar_original | cifar_zs | cifar_wv | cifar_oneshot",
    "dataset": "cifar_oneshot", 
    "model_name": "oneshot_10shot.pth",
    "data_dir": "../data",
    "model_dir": "./saved_models",
    "w2v_dir": "../data/wordvectors/",
    
    # Optimisation hyper-parameters.
    "lr": 0.1,
    "batch_size": 512,
    "epochs": 500,
    "resume": None,

    # Class names held out of training; the zero-/k-shot datasets evaluate on these.
    "test_classes": ["bicycle", "girl", "table", "crocodile", "maple_tree", "wolf", "raccoon", "dolphin", "train", "crab", "boy", "pear", "leopard", "cup", "lizard"],

    # Word-vector pickle file names, looked up by `vector_type` inside w2v_dir.
    "fasttext": "w2v_fasttext.pkl",
    "googlenews": "w2v_google.pkl",
    "glove": "w2v_glove.pkl",
    "glove_twitter": "w2v_glove_twitter.pkl",
    "bert": "w2v_bert.pkl",
    # Pickle with the k-shot support images, resolved relative to data_dir.
    "support_file": "cifar_kshot_support.pkl"
}

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# These four names were never assigned anywhere in the notebook, so the prints
# below raised NameError on a fresh kernel. They are now taken from `config`.
# NOTE(review): the recorded output shows model_dir as './saved_models/oneshot',
# so the original run used a value that differs from config["model_dir"].
data_dir = config["data_dir"]
dataset = config["dataset"]
model_dir = config["model_dir"]
model_name = config["model_name"]

print (f"Device: {device}")
print(f'Loading Data ({data_dir, dataset})...')
print(f'Models will be saved in {model_dir} as {model_name}')

# Reference for the data transformations:
# https://github.com/kuangliu/pytorch-cifar/blob/master/main.py
# Augmentations for training images only; the dataset classes prepend these
# to `transforms` when building the training pipeline.
train_transforms = [
    T.RandomCrop(32, padding=4),
    T.RandomHorizontalFlip(),
]

# Normalization constants from
# https://www.programcreek.com/python/example/105099/torchvision.datasets.CIFAR100
# NOTE(review): these look like the widely used CIFAR-10 channel statistics
# rather than CIFAR-100 ones - confirm. Any saved checkpoint was trained with
# whatever values were in place, so do not change them without retraining.
transforms = [
    T.ToTensor(),
    T.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
]

def test(epoch):
    """Evaluate the global ``model`` with a 3-nearest-neighbour classifier.

    The classifier is fitted on the embeddings of the support set and then
    predicts the class of every test embedding.

    Relies on notebook globals: ``model``, ``device``, ``test_loader`` (must
    yield ``(inputs, query, targets)`` batches), ``support`` (exposing
    ``.data`` and ``.targets``) and, optionally, ``criterion``.

    Args:
        epoch: value shown in the progress-bar description.

    Returns:
        Running accuracy in percent after the last batch, or ``0.0`` when the
        loader yields no batches.
    """
    model.eval()

    # `criterion` is only defined when the training setup has been run; in an
    # evaluation-only session it is absent, so the loss is reported only when
    # available instead of crashing with NameError.
    loss_fn = globals().get('criterion')

    correct = 0
    total = 0
    accuracy = 0.0  # keeps the return value defined for an empty loader
    with torch.no_grad():
        # The model is in eval mode, so the support embeddings and the fitted
        # classifier are identical for every batch: compute them once.
        _, support_embedding = model(support.data.to(device))
        support_embedding = support_embedding.cpu().numpy()

        test_classifier = KNeighborsClassifier(n_neighbors=3)
        test_classifier.fit(support_embedding, np.array(support.targets))

        with tqdm(test_loader, unit='batch') as tepoch:
            for inputs, query, targets in tepoch:
                tepoch.set_description(f'Epoch: {epoch}')
                inputs, targets = inputs.to(device), targets.to(device)
                outputs, embedding = model(inputs)

                preds = torch.Tensor(test_classifier.predict(embedding.cpu().numpy()))

                total += targets.size(0)
                correct += (preds == targets.cpu()).sum().item()
                accuracy = 100.*correct/total

                progress = {}
                if loss_fn is not None:
                    progress['loss'] = loss_fn(outputs, targets).item()
                progress.update(accuracy=accuracy, correct=correct, total=total)
                tepoch.set_postfix(**progress)

    return accuracy

# best_acc = 0
# print ("Starting Training...")
# for epoch in range(config['epochs']):
#     train(epoch)
#     test_acc = test(epoch)
#     if test_acc > best_acc:
#         best_acc = test_acc
#         save_model(epoch, test_acc)
#     scheduler.step()

Device: cuda
Loading Data (('../data', 'cifar_oneshot'))...
Models will be saved in ./saved_models/oneshot as oneshot_10shot.pth


In [None]:
# train_loader = torch.utils.data.DataLoader(
#     train_data, batch_size=batch_size, shuffle=True, num_workers=2)

# test_loader = torch.utils.data.DataLoader(
#     test_data, batch_size=batch_size, shuffle=False, num_workers=2)

print("Creating Model...")
model = OneShotBaseModel()

model = model.to(device)

# map_location remaps the stored tensors onto the active device, so a
# checkpoint saved on a GPU still loads when `device` is 'cpu'.
# NOTE(review): absolute Drive path; requires the Drive mount cell to have run.
checkpoint = torch.load('/content/gdrive/MyDrive/SchoolWork/CV/oneshot_triplet.pth', map_location=device)
model.load_state_dict(checkpoint['model'])

if device == "cuda":
    # Let cuDNN auto-tune convolution algorithms.
    cudnn.benchmark = True

# criterion = losses.TripletMarginLoss()

# optimizer = optim.SGD(model.parameters(), lr=config['lr'],
#                       momentum=0.9, weight_decay=0.0005)

# scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)


Creating Model...


In [None]:
from torchvision.datasets import CIFAR100

# DataLoader needs a dataset *instance*. Passing the CIFAR100 class itself is
# what made iterating the loader fail with TypeError.
# NOTE(review): test() unpacks (inputs, query, targets) batches, which plain
# CIFAR100 does not produce; build the loader from cifarKShot(train=False)
# before calling test().
cifar100_test = CIFAR100(
    root=config["data_dir"], train=False, download=True, transform=T.Compose(transforms))
test_loader = torch.utils.data.DataLoader(cifar100_test, batch_size=64)

In [None]:
# Pull a single batch to sanity-check the loader.
# NOTE(review): despite the `train_` prefix, this batch comes from `test_loader`.
train_features, train_labels = next(iter(test_loader))

TypeError: ignored