In [1]:
import torch
import math
from matplotlib import pyplot as plt
import numpy as np
import torch.nn.functional as F
from torchvision import datasets, transforms

In [2]:
def gaussian_filter_1d(center, var=1, filter_size=5, filter_range=None):
    if filter_range is None:
        x = torch.linspace(center - var * 2, center + var * 2, filter_size)
    else:
        x = torch.linspace(filter_range[0], filter_range[1], filter_size) - center
    fx = (1 / (2 * math.pi * var) ** .5) * torch.exp(-(x ** 2 / (2 * var)))
    return fx

In [3]:
class GaussianRadialLayer(torch.nn.Module):
    def __init__(self,input_channels=1,output_channels=8,radius=1,filter_sz=9,variance=1):
        super(GaussianRadialLayer,self).__init__()
        assert filter_sz % 2 ==1
        
        self.filter_sz=filter_sz
        self.filter_from=-(filter_sz/2)
        self.filter_to=(filter_sz/2)
        self.output_channels=output_channels
        
        self.radius=torch.nn.Parameter(torch.tensor(float(radius)))
        self.channel_weights=torch.nn.Parameter(torch.ones(input_channels).view([1,-1,1,1]))
        
        self.register_buffer("pi",torch.tensor(float(math.pi)))
        self.register_buffer("variance",torch.tensor(float(variance)))
        #precomputing tensors
        self.register_buffer("filter_x",torch.linspace(self.filter_from, self.filter_to,self.filter_sz).view([1,1,-1,1]).repeat(output_channels,1,1,1))
        self.register_buffer("filter_y",torch.linspace(self.filter_from, self.filter_to,self.filter_sz).view([1,1,1,-1]).repeat(output_channels,1,1,1))
        self.register_buffer("x_angles",torch.cos(torch.linspace(0,math.pi*2,output_channels+1)[:-1].view(-1,1,1,1)))
        self.register_buffer("y_angles",torch.sin(torch.linspace(0,math.pi*2,output_channels+1)[:-1].view(-1,1,1,1)))

    def forward(self,input_tensors):
        batch_sz,in_channels,x,y=input_tensors.size()
        x_field = self.filter_x-self.x_angles*self.radius
        y_field = self.filter_y-self.y_angles*self.radius
        filter_x=(1 / (2 * self.pi * self.variance) ** .5) * torch.exp(-(x_field ** 2 / (2 * self.variance)))
        filter_y=(1 / (2 * self.pi * self.variance) ** .5) * torch.exp(-(y_field ** 2 / (2 * self.variance)))
        filter_x=filter_x*self.channel_weights
        x_applied=F.conv2d(input_tensors,filter_x,padding=[self.filter_sz/2,0])
        y_applied=F.conv2d(x_applied,filter_y,padding=[0,self.filter_sz/2],groups=self.output_channels)
        return y_applied

class DeltaLayer(torch.nn.Module):
    def __init__(self,input_channels=1,radius=1,output_channels=8,filter_sz=9,variance=1,single_input_reference=False):
        super(DeltaLayer,self).__init__()
        self.radial=GaussianRadialLayer(input_channels=input_channels,output_channels=output_channels,radius=radius,filter_sz=filter_sz,variance=variance)
        self.radial_bias=torch.nn.Parameter(torch.zeros([1,output_channels,1,1]))
        if single_input_reference:
            self.input_reference=torch.nn.Conv2d(input_channels,1,1)
            self.input_reference.weight.data[:]=1.0/input_channels
            self.input_reference.bias.data[:]=0
        else:
            self.input_reference=torch.nn.Conv2d(input_channels,output_channels,1)
    
    def forward(self,input_tensors):
        radial=self.radial(input_tensors)+self.radial_bias
        return radial-self.input_reference(input_tensors)

class BinaryAlphabet(torch.nn.Module):
    def __init__(self,input_channels):
        super(BinaryAlphabet,self).__init__()
        weight ,bias = BinaryAlphabet.create_binary_alphabet_filter_bank(input_channels)
        self.register_buffer("weight",weight)
        self.register_buffer("bias",bias)

    def forward(self,x):
        x = torch.nn.functional.conv2d(x, weight=self.weight, groups=1)+self.bias[None,:,None,None]
        return torch.relu(x)

    @staticmethod
    def create_binary_alphabet_filter_bank(input_channels):
        W = torch.zeros([input_channels,2**input_channels])
        words = torch.arange(2**input_channels, dtype=torch.int32)
        b=[]
        W=[]
        for word in words:
            bit_string=[word%2]
            for pow in range(1,8):
                bit_string.append(((word/2**pow)%2))
            W.append([1.0 if n > 0 else float(-input_channels) for n in bit_string])
            b.append(1.0 - sum(bit_string))
        W=torch.Tensor(W).view(2**input_channels, input_channels, 1, 1)
        b=torch.Tensor(b)
        #W = torch.Tensor(W).reshape(words, input_channels, 1, 1)
        return W, b

class LBP(torch.nn.Module):
    def __init__(self,input_channels=1,radius=1,circle_points=8,filter_sz=9,variance=1.0,delta_scale=2.0):
        super(LBP,self).__init__()
        self.scale_delta=delta_scale
        self.delta=DeltaLayer(input_channels=input_channels,output_channels=circle_points,radius=radius,filter_sz=filter_sz,variance=variance)
        self.binary_alphabet=BinaryAlphabet(input_channels=circle_points)
    def forward(self,x):
        return self.binary_alphabet(torch.relu(self.delta(x))*self.scale_delta)
    def get_radial_constraint(self):
        return torch.relu(-self.delta.radial.radius)+torch.relu(-self.delta.radial.filter_sz/2+self.delta.radial.radius+2*self.delta.radial.radius)

class LbpNet(torch.nn.Module):
    def __init__(self,input_channels=1):
        super(LbpNet,self).__init__()
        self.lbp_layer1=LBP(input_channels=input_channels,circle_points=8,filter_sz=9,radius=2)
        self.lbp_layer2=LBP(input_channels=256,circle_points=8,filter_sz=9,radius=2)
        #self.lbp_layer2=LBP(input_channels=256,circle_points=8,filter_sz=9,radius=2)
        self.mp2=torch.nn.MaxPool2d(2)
        self.conv3=torch.nn.Conv2d(256,256,1)
        self.sp7=torch.nn.AvgPool2d(7)
        self.conv4=torch.nn.Conv2d(256,256,1)
        self.global_pool=torch.nn.AdaptiveAvgPool2d((3,3))
        self.fc1=torch.nn.Linear(256*3*3,128)
        self.fc_class=torch.nn.Linear(128,10)

    def forward(self,x):
        x=self.lbp_layer1(x)
        x=self.mp2(x)
        x=torch.relu(x)

        x=self.lbp_layer2(x)
        x=self.mp2(x)
        x=torch.relu(x)
        
        x=self.conv3(x)
        x=self.mp2(x)
        x=torch.relu(x)
        
        x=self.conv4(x)
        x=torch.relu(x)
        #x=self.sp7(x)
        x=self.global_pool(x)
        x=self.fc1(x.view(x.size(0),-1))
        x=torch.relu(x)
        
        x=self.fc_class(x)
        return F.log_softmax(x, dim=1)
    def __str__(self):
        return "LbpNet(radius={:.4f},radius={:.4f})".format(float(self.lbp_layer1.delta.radial.radius.data),float(self.lbp_layer2.delta.radial.radius.data))

    def get_radial_constraint(self):
        return self.lbp_layer1.get_radial_constraint() + self.lbp_layer2.get_radial_constraint()

    

    
class CnnNet(torch.nn.Module):
    def __init__(self):
        super(CnnNet,self).__init__()
        #self.lbp_layer1=LBP(input_channels=1,circle_points=8,filter_sz=9,radius=2)
        self.conv1=torch.nn.Conv2d(1,256,3,padding=1)
        #self.lbp_layer2=LBP(input_channels=256,circle_points=8,filter_sz=9,radius=2)
        self.mp2=torch.nn.MaxPool2d(2)
        self.conv3=torch.nn.Conv2d(256,256,1)
        self.conv4=torch.nn.Conv2d(256,256,1)
        self.fc1=torch.nn.Linear(256*14*14,128)
        self.fc_class=torch.nn.Linear(128,10)

    def forward(self,x):
        x=self.conv1(x)
        x=self.mp2(x)
        x=torch.relu(x)
        
        x=self.conv3(x)
        x=torch.relu(x)        
        #x=self.sp7(x)
        x=self.fc1(x.view(x.size(0),-1))
        x=torch.relu(x)        
        x=self.fc_class(x)
        return F.log_softmax(x, dim=1)
    
    def get_radial_constraint(self):
        return 0

In [4]:
def train(model, device, train_loader, optimizer, epoch,log_interval=100):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)+model.get_radial_constraint()**2
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('{}: Test loss: {:.4f}, Test Acc.: {}/{} ({:.0f}%)\n'.format(str(model).split("\n")[0],
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [5]:
device = torch.device("cuda")
batch_size=200
test_batch_size=100
train_ds=datasets.FashionMNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
#train_ds=[train_ds[n] for n in range(0,len(train_ds),len(train_ds)/2000)]

train_loader = torch.utils.data.DataLoader(train_ds,batch_size=batch_size, shuffle=True)

test_loader = torch.utils.data.DataLoader(
        datasets.FashionMNIST('../data', train=False, transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=test_batch_size, shuffle=True)

model = LbpNet().to(device)
#model = CnnNet().to(device)

for name,p in model.named_parameters():
    print name,"->",p.data.size()
    optimizer = torch.optim.SGD([p], lr=.001, momentum=.9)
optimizer = torch.optim.SGD(model.parameters(), lr=.001, momentum=.9)

for epoch in range(1, 150):
    train(model, device, train_loader, optimizer, epoch)
    print epoch,
    test(model, device, test_loader)

lbp_layer1.delta.radial_bias -> torch.Size([1, 8, 1, 1])
lbp_layer1.delta.radial.radius -> torch.Size([])
lbp_layer1.delta.radial.channel_weights -> torch.Size([1, 1, 1, 1])
lbp_layer1.delta.input_reference.weight -> torch.Size([8, 1, 1, 1])
lbp_layer1.delta.input_reference.bias -> torch.Size([8])
lbp_layer2.delta.radial_bias -> torch.Size([1, 8, 1, 1])
lbp_layer2.delta.radial.radius -> torch.Size([])
lbp_layer2.delta.radial.channel_weights -> torch.Size([1, 256, 1, 1])
lbp_layer2.delta.input_reference.weight -> torch.Size([8, 256, 1, 1])
lbp_layer2.delta.input_reference.bias -> torch.Size([8])
conv3.weight -> torch.Size([256, 256, 1, 1])
conv3.bias -> torch.Size([256])
conv4.weight -> torch.Size([256, 256, 1, 1])
conv4.bias -> torch.Size([256])
fc1.weight -> torch.Size([128, 2304])
fc1.bias -> torch.Size([128])
fc_class.weight -> torch.Size([10, 128])
fc_class.bias -> torch.Size([10])
1 LbpNet(radius=1.2411,radius=1.3702): Test loss: 0.6268, Test Acc.: 7559/10000 (76%)



KeyboardInterrupt: 

In [None]:
impulse=torch.zeros(1,1,19,19);impulse[:,:,9,9]=1
img=torch.zeros(1,1,501,501);img[:,:,22:30,22:30]=1;img[:,:,8:13,8:13]=1
img=img.cuda()
impulse=impulse.cuda()
#%timeit _,lbp_img=ba(delta(img))[0,:,:,:].max(dim=0)
_,lbp_img=model(img)#[0,:,:,:].max(dim=0)

#conv2=torch.nn.Conv2d(1,8,31)
fig,[ax1,ax2,ax3]=plt.subplots(1,3)
im1=ax1.imshow(model.lbp_layer1.delta.radial(impulse)[0,:,:,:].sum(dim=0).detach().cpu().numpy())
im2=ax2.imshow(delta_img[0,0,:40,:40].detach().cpu().numpy(),cmap="gray")
im3=ax3.imshow(lbp_img[:40,:40].detach().cpu().numpy(),cmap="d256")
fig.colorbar(im1)#, cax=cax, orientation='horizontal')