In [2]:
import torch
from sklearn.metrics import f1_score, precision_score, recall_score


In [3]:
import matplotlib.pyplot as plt
%matplotlib inline
import torch.nn.functional as F
from torch import nn
import torch
import numpy as np
from torch.nn import Parameter
from torchvision.datasets import CIFAR10
from torchvision.transforms import ToTensor, Compose, RandomRotation, ToPILImage
torch.cuda.is_available()
torch.random.manual_seed(42)
np.random.seed(42)

In [4]:
tfms = Compose([ToTensor()])

In [5]:
trainset = CIFAR10('./', 
                   download=True, 
                   train=True,
                   transform=tfms)
testset = CIFAR10('./', 
                  train=False,
                  transform=tfms)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128,
                                          # sampler=train_sampler,
                                          shuffle=False,
                                          num_workers=0)
testloader = torch.utils.data.DataLoader(testset, batch_size=128,
                                         # sampler=test_sampler,
                                         shuffle=False,
                                         num_workers=0)

from skimage.exposure import rescale_intensity
import numpy as np
from scipy import fftpack
from scipy.ndimage import maximum_filter, minimum_filter
from torch import Tensor, uint8


def rescale(image: np.ndarray, mn: int = 0, mx: int = 1):
    return rescale_intensity(image, out_range=(mn, mx))


def rescale_torch(image: Tensor) -> Tensor:
    return (255 * (image - image.min()) / (image.max() - image.min())).type(uint8)


def rot90(matrix: Tensor) -> Tensor:
    dims = range(len(matrix.shape))
    return matrix.transpose(dims[-2], dims[-1]).flip(2)


def weight_rotate_(weight: np.array, rot: int = 0) -> np.array:
    if rot % 4 == 0:
        return weight
    if rot % 4 == 1:
        return np.rot90(weight)
    if rot % 4 == 2:
        return np.rot90(np.rot90(weight))
    if rot % 4 == 3:
        return np.rot90(np.rot90(np.rot90(weight)))

def weight_rotate(weight: Tensor, rot: int = 0) -> Tensor:
    if rot % 4 == 0:
        return weight
    if rot % 4 == 1:
        return rot90(weight)
    if rot % 4 == 2:
        return rot90(rot90(weight))
    if rot % 4 == 3:
        return rot90(rot90(rot90(weight)))


def edges(image: np.ndarray,
          size: int = 3) -> np.ndarray:
    image = rescale(image)
    ratio = np.zeros_like(image)
    if len(image.shape) > 2:
        ratio = np.divide(maximum_filter(image, (size, size, size)) + 1,
                          minimum_filter(image, (size, size, size)) + 1)
    else:
        ratio = np.divide(maximum_filter(image, (size, size)) + 1,
                          minimum_filter(image, (size, size)) + 1)
    ratio = rescale(20 * np.log(ratio))
    return ratio


def add_edges(image: np.ndarray, a: float = 0.5, b: float = 0.5,
              size: int = 3) -> np.ndarray:
    ratio = np.zeros_like(image)
    if len(image.shape) > 2:
        ratio = np.divide(maximum_filter(image, (size, size, size)) + 1,
                          minimum_filter(image, (size, size, size)) + 1)
    else:
        ratio = np.divide(maximum_filter(image, (size, size)) + 1,
                          minimum_filter(image, (size, size)) + 1)
    ratio = rescale(20 * np.log(ratio))
    image = rescale(image)
    return (a * image + b * ratio) / (a + b)


def EME(image):
    return np.sum(edges(image)) / image.size


def alpha_rooting_fourier(image: np.ndarray, alpha: float = 0.9) -> np.array:
    ffted = fftpack.fft2(image)
    abs_ffted = np.absolute(ffted) ** alpha
    iffted = fftpack.ifft2(abs_ffted * np.divide(ffted, np.absolute(ffted),
                                                 out=np.zeros_like(ffted),
                                                 where=np.absolute(ffted) != 0))
    iffted = rescale(np.absolute(iffted), 0, 1)  # .astype(int)
    return iffted


def calc_output_shape(in_shape, kernel, stride, pad):
    w, h = in_shape
    w_ = (w - kernel + 2 * pad) / stride + 1
    h_ = (h - kernel + 2 * pad) / stride + 1
    return int(w_), int(h_)


def train_1_batch(model):
    model.train()
    _, (data, label) = list(enumerate(trainloader))[0]

    data, label = torch.autograd.Variable(data).cuda(), torch.autograd.Variable(label).cuda()
    optimizer.zero_grad()
    output = model(data)
    # label_ = one_hot_enc(output, label, 7)
    loss = criterion(output, label)
    y_pred = torch.max(output, 1)[1]
    print("Output:", output)
    print("Loss on 1 batch:", loss.data.item())
    print("Accuracy on 1 batch:", y_pred.eq(label.data).cpu().sum())
    print("predictions:", y_pred.data)
    print("truth:", label.data)
    loss.backward()
    optimizer.step()


def train_portion(model, epoch, step, portion=20):
    model.train()
    assert isinstance(portion, int)
    for batch_id, (data, label) in enumerate(trainloader):
        if batch_id < portion:
            data, label = torch.autograd.Variable(data).cuda(), torch.autograd.Variable(label).cuda()
            optimizer.zero_grad()
            output = model(data)
            label_ = one_hot_enc(output, label, 10)
            loss = criterion(output, label_)
            y_pred = torch.max(output, 1)[1]
            # print("Output:", output)
            if epoch % step == 0:
                print(f"Epoch {epoch}, training loss on 1 batch:", loss.data.item())
            train_losses.append(loss.data.item())
            loss.backward()
            optimizer.step()


def test_portion(model, epoch, step, portion=20):
    model.eval()
    assert isinstance(portion, int)
    with torch.no_grad():
        for batch_id, (data, label) in enumerate(testloader):
            if batch_id < portion:
                data, label = torch.autograd.Variable(data).cuda(), torch.autograd.Variable(label).cuda()
                output = model(data)
                label_ = one_hot_enc(output, label, 10)
                loss = criterion(output, label_)
                y_pred = torch.max(output, 1)[1]
                test_losses.append(loss.data.item())
                # print("Output:", output)
                if epoch % step == 0:
                    print(f"Epoch {epoch}, testing loss on 1 batch:", loss.data.item())





Files already downloaded and verified


In [289]:
import torch.nn.functional as F
from torch import nn
import torch
import numpy as np
from matplotlib import pyplot as plt


class HarmonicBlock(nn.Module):
    def __init__(self, input_channels, output_ch,
                 bn=True,
                 dropout=False,
                 kernel_size=3,
                 lmbda=None,
                 diag=False, 
                 pad=1, 
                 stride=1,
                 bias = False):
        super(HarmonicBlock, self).__init__()
        """
        :param input_channels: number of channels in the input
        :param kernel_size: size of the kernel in the filter bank
        :param pad: padding size
        :param stride: stride size
        :param lmbda: number of filters to be actually used (feature not implemented)
        """
        self.bn = bn
        self.drop = dropout
        self.input_channels = input_channels
        self.output_ch = output_ch
        self.pad = pad
        self.stride = stride
        self.K = kernel_size
        self.diag = diag
        self.N = self.K  # preferably to have N=K !! (to fully replicate the paper), this is the convolution window size
        self.PI = torch.as_tensor(np.pi)
        if lmbda is not None:
            if lmbda > self.K ** 2:
                self.lmbda = self.K ** 2  # limits the number of kernels
            else:
                self.lmbda = lmbda
        else:
            self.lmbda = lmbda
        self.diag = diag  # flag to select diagonal entries of the block
        self.filter_bank = self.get_filter_bank(N=self.N,
                                                K=self.K,  # kernel size
                                                input_channels=self.input_channels,
                                                lmbda=self.lmbda,
                                                diag=self.diag).to('cuda')
        self.conv = nn.Conv2d(in_channels=self.filter_bank.shape[0], out_channels=self.output_ch,
                              kernel_size=1,
                              padding=0,
                              stride=1, bias=bias)
        
        if self.bn:
            self.bnorm = nn.BatchNorm2d(self.filter_bank.shape[0])
        if self.drop:
            self.dropout = nn.Dropout(0.5)
    def fltr(self, u, v, N, k):
        return torch.as_tensor([[torch.cos(torch.as_tensor(self.PI / N * (ii + 0.5) * v)) * torch.cos(
            torch.as_tensor(self.PI / N * (jj + 0.5) * u)) for ii in range(k)] for jj in range(k)])


    def get_idx(self, K, l):
        out = []
        for i in range(K):
            for j in range(K):
                if i + j < l:
                    out.append(K * i + j)
        return tuple(out)

    def get_idx_diag(self, K):
        out = []
        for i in range(K):
            for j in range(K):
                if i == j:
                    out.append(i + j)
        return tuple(out)

    def draw_filters(self, fb_):
        fig, ax = plt.subplots(len(fb_), 1, figsize=(12, 4))
        j = 0
        for i in range(len(fb_)):
            ax[i].imshow(fb_[i, 0, :, :])
            ax[i].axis('off')
            ax[i].grid(False)

    def get_filter_bank(self, N, K, input_channels=3, lmbda=None, diag=False):
        filter_bank = torch.zeros((K, K, K, K))
        for i in range(K):
            for j in range(K):
                filter_bank[i, j, :, :] = self.fltr(i, j, N, K)
        if lmbda is not None:
            ids = self.get_idx(K, lmbda)
            return torch.stack(tuple([filter_bank.view(-1, 1, K, K)] * input_channels), dim=1).view(
                (-1, input_channels, K, K))[ids, :, :, :].view(-1,1,K,K)  # filter_bank.view(K**2,-1,K,K)
        if diag:
            ids = self.get_idx_diag(K)
            return torch.stack(tuple([filter_bank.view(-1, 1, K, K)] * input_channels), dim=1).view(
                (-1, input_channels, K, K))[ids, :, :, :].view(-1,1,K,K)   # filter_bank.view(K**2,-1,K,K)[ids,:,:,:]
        return torch.stack(tuple([filter_bank.view(-1, 1, K, K)] * input_channels), dim=1).view(
            (-1, 1, K, K))

    def forward(self, x):
        x = F.conv2d(x.to(torch.float32),
                     weight=self.filter_bank.to(torch.float32),
                     padding=self.pad,
                     stride=self.stride,
                     groups=self.input_channels)  # int(self.K/2)
        if self.bn:
            x = self.bnorm(x)
        if self.drop:
            x = self.dropout(x)
        x = self.conv(x)
        return x

In [290]:
h = HarmonicBlock(input_channels=5, output_ch=3,kernel_size=5).cuda()
x=torch.from_numpy(np.random.randn(1,5,32,32)).cuda()

In [291]:
h(x).shape

torch.Size([1, 3, 30, 30])

In [292]:
class WideHarmonicResNet(nn.Module):
    def __init__(self, in_channels, depth, num_classes=10, widen_factor=1, lmbda=None, diag=False ):
        super(WideHarmonicResNet, self).__init__()
        nChannels = [16, 16*widen_factor, 32*widen_factor, 64*widen_factor]
        assert((depth - 4) % 6 == 0)
        n = (depth - 4) // 6
        block = HarmonicBlock
        self.lmbda=lmbda
        self.diag=diag
        self.conv1 = HarmonicBlock(input_channels=3, output_ch=nChannels[0],
                                   kernel_size=3, 
                                   stride=1,
                                   pad=1, bias=False)
        self.drop = nn.Dropout(0.5)
        
        self.stack1 = self._make_layer(block, 
                                       nb_layers=n, 
                                       in_planes=nChannels[0],
                                       out_planes=nChannels[1],
                                       kernel_size=3,
                                       stride=1,
                                       pad=1)
        self.stack2 = self._make_layer(block, 
                                       nb_layers=n, 
                                       in_planes=nChannels[1],
                                       out_planes=nChannels[2],
                                       kernel_size=3,
                                       stride=2,
                                       pad=1)
        self.stack3 = self._make_layer(block, 
                                       nb_layers=n, 
                                       in_planes=nChannels[2],
                                       out_planes=nChannels[3],
                                       kernel_size=3,
                                       stride=2,
                                       pad=1)
        self.bn1 = nn.BatchNorm2d(nChannels[3])
        self.relu = nn.ReLU(inplace=True)
        self.fc = nn.Linear(nChannels[3], num_classes)
        self.nChannels = nChannels[3]
        
    def _make_layer(self, block, in_planes, out_planes, nb_layers, stride, kernel_size, pad):
        strides = [stride] +[1]*(nb_layers-1)
        stacking = []
        for st in strides:
            stacking.append(block(input_channels=in_planes,
                                  lmbda=self.lmbda,
                                  diag=self.diag,
                                  output_ch=out_planes,
                                  kernel_size=kernel_size,
                                  stride=st,
                                  pad=pad))
            if in_planes!=out_planes:
                in_planes=out_planes
        return nn.Sequential(*stacking)
    def forward(self, x):
        out = self.conv1(x)
        out = self.drop(out)
        out = self.stack1(out)
        out = self.drop(out)
        out = self.stack2(out)
        out = self.drop(out)
        out = self.stack3(out)
        out = self.drop(out)
        out = self.relu(self.bn1(out))
        out = F.avg_pool2d(out, 8)
        out = out.view(-1, self.nChannels)
        return self.fc(out)



In [280]:
net = WideHarmonicResNet(3,28,widen_factor=10).cuda()


In [281]:
import gc
gc.collect()

107

In [295]:
from sklearn.metrics import f1_score, precision_score, recall_score
# , classification_report, confusion_matrix
from torchvision.transforms import ToTensor
from models.model_harmonic import HarmonicNet
# from local_loader import LocalLoader
from loader import LoaderSmall
import pandas as pd
import warnings
import torch
from torchvision.datasets import CIFAR10
from torch import nn
import torch.optim as optim
from sklearn.preprocessing import LabelEncoder
import gc
from typing import List  # pylint: ignore
from tqdm import tqdm_notebook as tqdm
import matplotlib.pyplot as plt
import numpy as np

warnings.filterwarnings('ignore')

torch.random.manual_seed(42)

tfms = None  # Compose(transforms=[RandomRotate90(p=0.4), Rotate(p=0.5)])

trainset = CIFAR10('./', download=True, train=True,
                   transform=ToTensor())  # LoaderSmall(imageid_path_dict, labels, train=True, transform=tfms, color_space=None)
# LoaderSmall(imageid_path_dict, labels, train=True, transform=tfms, color_space=None)
# #CIFAR10('./', download=True, train=True, transform=ToTensor())#
testset = CIFAR10('./', train=False,
                  transform=ToTensor())  # LoaderSmall(imageid_path_dict, labels, train=False, transform=tfms, color_space=None)
# LoaderSmall(imageid_path_dict, labels, train=False, transform=tfms, color_space=None)
# CIFAR10('./', train=False, transform=ToTensor())#
'''
train_sampler = torch.utils\
    .data.WeightedRandomSampler(trainset.weights[trainset.train_labels],
                                len(trainset.weights[trainset.train_labels]),
                                True)
test_sampler = torch.utils\
    .data.WeightedRandomSampler(testset.weights[testset.test_labels],
                                len(testset.weights[testset.test_labels]),
                                True)
'''
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128,
                                          # sampler=train_sampler,
                                          shuffle=True,
                                          num_workers=0)
testloader = torch.utils.data.DataLoader(testset, batch_size=100,
                                         # sampler=test_sampler,
                                         shuffle=False,
                                         num_workers=0)
gc.collect()

net =  WideHarmonicResNet(3,28,widen_factor=10)

for module in net.modules():
    if isinstance(module, nn.Conv2d):
        module.weight.data.normal_(0, 0.05)
        if module.bias is not None:
            module.bias.data.zero_()

net = net.to('cuda')
for parameter in net.parameters():
    parameter.to('cuda')
net.cuda()
net = torch.nn.DataParallel(
    net, device_ids=range(torch.cuda.device_count()))
base_lr = 0.1
param_dict = dict(net.named_parameters())
params = []  # type: List
train_losses = []  # type: List[float]
test_losses = []  # type: List[float]
train_accs = []  # type: List[float]
test_accs = []  # type: List[float]
train_precisions = []  # type: List[float]
test_precisions = []  # type: List[float]
train_f1 = []  # type: List[float]
test_f1 = []  # type: List[float]
train_recall = []  # type: List[float]
test_recall = []  # type: List[float]

criterion = nn.CrossEntropyLoss()  # BCEWithLogitsLoss()
# BCEWithLogitsLoss()

optimizer = optim.SGD(params=net.parameters(),
                      lr=base_lr,
                      momentum=0.9,
                      dampening=0,
                      weight_decay=0.0005)  # Adam(model_harmonic.parameters(),
#     lr=base_lr,
#     weight_decay=1e-3)

best_acc = 0

gc.collect()


def one_hot_enc(output, target, num_classes=7):
    labels = target.view((-1, 1))
    batch_size, _ = labels.size()
    labels_one_hot = torch.FloatTensor(
        batch_size, num_classes).zero_().to('cuda')
    labels_one_hot.scatter_(1, labels, 1)
    return labels_one_hot


def train(epoch, model):
    model.train()
    corrects = 0.0
    f1 = 0.0
    prec = 0.0
    rec = 0.0
    iteration = 0
    acc = 0.
    for batch_idx, (data, label) in enumerate(tqdm(trainloader)):
        data, label = torch.autograd.Variable(
            data.cuda()), torch.autograd.Variable(label.cuda())
        optimizer.zero_grad()
        output = model(data)
        #label_ = one_hot_enc(output, label, 10)
        loss = criterion(output, label)
        y_pred = torch.max(output, 1)[1]
        loss.backward()
        optimizer.step()
        corrects += y_pred.eq(label.data).cpu().sum()
        f1 += f1_score(y_true=label.data.cpu().numpy(),
                       y_pred=y_pred.cpu().numpy(),
                       average='weighted')
        prec += precision_score(y_true=label.data.cpu().numpy(),
                                y_pred=y_pred.cpu().numpy(),
                                average='weighted')
        rec += recall_score(y_true=label.data.cpu().numpy(),
                            y_pred=y_pred.cpu().numpy(),
                            average='weighted')
        iteration += 1
        # t.update(batch_idx)
    acc = 100. * corrects / len(trainloader.dataset)
    f1 = f1 / iteration
    prec = prec / iteration
    rec = rec / iteration
    train_losses.append(loss.data.item())
    train_accs.append(acc)
    train_precisions.append(100. * rec)
    train_recall.append(100. * rec)
    train_f1.append(100. * f1)
    print(f"\nTraining accuracy = {acc:.2f}%;\n\
             F1 = {100. * f1:.2f}%;\
             Precision = {100. * prec:.2f}%;\
             Recall = {100. * rec:.2f}%\
             Loss: {loss.data.item():1.2e}\n")

    # t.close()


def test(epoch, model):
    global best_acc
    model.eval()
    # w = testloader.dataset.weights
    acc = 0.0
    f1 = 0.0
    prec = 0.0
    rec = 0.0
    iteration = 0
    testloss = 0.0
    corrects = 0.0
    # criterion.weight = torch.from_numpy(testset.weights)\
    # .to('cuda').type(torch.float)
    # t = tqdm(total=len(testloader))
    for batch_id, (data, label) in enumerate(tqdm(testloader)):
        with torch.no_grad():
            data, label = torch.autograd.Variable(
                data).cuda(), torch.autograd.Variable(label).cuda()
            output = model(data)
            # one_hot_enc(output, label) # one hot encoding for loss function
            #label_ = one_hot_enc(output, label, 10)
            loss = criterion(output, label)
            y_pred = torch.max(output, 1)[1]
            corrects += y_pred.eq(label.data).cpu().sum()
            testloss += loss
            f1 += f1_score(y_true=label.data.cpu().numpy(),
                           y_pred=y_pred.cpu().numpy(),
                           average='weighted')
            prec += precision_score(y_true=label.data.cpu().numpy(),
                                    y_pred=y_pred.cpu().numpy(),
                                    average='weighted')
            rec += recall_score(y_true=label.data.cpu().numpy(),
                                y_pred=y_pred.cpu().numpy(),
                                average='weighted')
        iteration += 1
        # t.update(batch_id)
    acc = 100. * corrects / len(testloader.dataset)
    f1 = f1 / iteration
    prec = prec / iteration
    rec = rec / iteration
    testloss /= len(testloader.dataset)
    if best_acc < acc.item():
        best_acc = acc
        save_state(model, best_acc)
    print(
        f"\nTesting accuracy = {acc:.2f}%; \n \
            F1 = {100. * f1:.2f}%; \
            Precision = {100. * prec:.2f}%;\
            Recall = {100. * rec:.2f}% \
            Loss: {loss.data.item():1.2e}\n")
    test_losses.append(loss.data.item())
    test_accs.append(acc)
    test_precisions.append(100. * rec)
    test_recall.append(100. * rec)
    test_f1.append(100. * f1)
    # t.close()


def adjust_learning_rate(optimizer,
                         epoch,
                         update_list=[25, 75],
                         factor=10,
                         lim = 1.):
    # [60, 120, 160]  #[2,5,8,11,14,17,20]
    if epoch in update_list:
        for param_group in optimizer.param_groups:
            param_group['lr'] = min(param_group['lr'] * factor, lim)
    return


def save_state(model, best_acc):
    print('\n==> Saving model ...\n')
    state = {'best_acc': best_acc,
             'state_dict': model.state_dict()}
    keys = list(state['state_dict'].keys())
    for key in keys:
        if 'module' in key:
            state['state_dict'][key.replace('module.', '')] = \
                state['state_dict'].pop(key)
    torch.save(state, 'harmonic_network.tar')


def get_lr(optimizer=optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']


# t = tqdm(total=300)
for epoch in range(200):
    adjust_learning_rate(optimizer, epoch, [60,120,160],factor=0.2, lim=1e-4)
    lr = get_lr()
    print(f" Epoch: {epoch}, learning rate = {lr:1.1e};\n")
    train(epoch, net)
    test(epoch, net)
    gc.collect()
    if np.nan in test_losses or np.nan in train_losses:
        break
    torch.cuda.empty_cache()
    

Files already downloaded and verified
 Epoch: 0, learning rate = 1.0e-02;



HBox(children=(IntProgress(value=0, max=391), HTML(value='')))


Training accuracy = 14.00%;
             F1 = 8.24%;             Precision = 9.79%;             Recall = 14.52%             Loss: 2.20e+00



HBox(children=(IntProgress(value=0, max=79), HTML(value='')))


==> Saving model ...


Testing accuracy = 16.00%; 
             F1 = 9.53%;             Precision = 13.16%;            Recall = 16.95%             Loss: 2.14e+00

 Epoch: 1, learning rate = 1.0e-02;



HBox(children=(IntProgress(value=0, max=391), HTML(value='')))

KeyboardInterrupt: 