<a href="https://colab.research.google.com/github/alainray/vision/blob/master/models/MACNet_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## MACNet: 
Implementation based on the paper "Compositional Attention Networks for Machine Reasoning,  Drew A. Hudson, Christopher D. Manning"  https://arxiv.org/pdf/1803.03067.pdf

In [1]:
!pip install torch torchvision
!pip install -U pillow

Requirement already up-to-date: pillow in /usr/local/lib/python2.7/dist-packages (5.3.0)


In [2]:
!curl -L -o 'sample.zip' 'https://www.dropbox.com/s/zpubau7qezrwfx4/dogs_cats_sample.zip?dl=0'

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  1204    0  1204    0     0    740      0 --:--:--  0:00:01 --:--:--     0
100 4357k  100 4357k    0     0  1364k      0  0:00:03  0:00:03 --:--:-- 5669k


In [0]:
!unzip sample.zip

In [0]:
import torch.nn as nn
import torchvision
import torch

class MACNet(nn.Module):
  def __init__(self, embed_dim, hidden_dim, reasoning_steps=9,n_classes=2):
    super(MACNet, self).__init__()
    
    #Net to process inputs
    self.inputLSTM=nn.LSTM(embed_dim, hidden_dim, bidirectional=True)
    self.image_processor=self.load_resnet_model(hidden_dim)
    
    #Net defining params
    self.p=reasoning_steps
    self.hidden_dim=hidden_dim
    self.embed_dim=embed_dim
    
    #Parameters
    #Input Unit
    self.q=torch.zeros(2*hidden_dim)
    self.q_i=torch.zeros(self.p,hidden_dim)
    self.q_bias=nn.Parameter(torch.randn(self.p, hidden_dim))
    self.cq_i=None
    self.knowledge_base=None
    self.contextual_words=None
    #Control Unit
    self.W_CQ=nn.Parameter(torch.randn(hidden_dim,2*hidden_dim))
    self.W_CA=nn.Parameter(torch.randn(1,hidden_dim))
    self.bias_ca=nn.Parameter(torch.randn(self.p, 1))
    #Read Unit
    self.W_M=nn.Parameter(torch.randn(hidden_dim,hidden_dim))
    self.W_K=nn.Parameter(torch.randn(hidden_dim,hidden_dim))
    self.bias_m=nn.Parameter(torch.randn(hidden_dim,1))
    self.bias_k=nn.Parameter(torch.randn(hidden_dim,1))
    self.bias_ihw=nn.Parameter(torch.randn(hidden_dim,1))
    self.bias_ra=nn.Parameter(torch.randn(1,1))
    self.W_IHW=nn.Parameter(torch.randn(hidden_dim,2*hidden_dim))
    self.W_R=nn.Parameter(torch.randn(1,hidden_dim))
    #Write Unit
    self.W_S=nn.Parameter(torch.randn(hidden_dim,hidden_dim))
    self.W_P=nn.Parameter(torch.randn(hidden_dim,hidden_dim))
    self.W_CI=nn.Parameter(torch.randn(1,hidden_dim))
    self.W_MI=nn.Parameter(torch.randn(hidden_dim,2*hidden_dim))
    self.W_SA=nn.Parameter(torch.randn(1,hidden_dim))
    self.bias_mi=nn.Parameter(torch.randn(hidden_dim,1))
    self.bias_mip=nn.Parameter(torch.randn(hidden_dim,1))
    self.bias_ci=nn.Parameter(torch.randn(1))
    self.bias_sa=nn.Parameter(torch.randn(1))
    #Output Unit
    self.output_layer1=nn.Linear(3*hidden_dim,hidden_dim)
    self.output_layer2=nn.Linear(hidden_dim,n_classes)
    #Working variables of model
    self.reasoning_matrix=nn.Parameter(torch.randn(self.p,hidden_dim,hidden_dim*2))

    # Utility Params
    self.FORWARD=0
    self.BACKWARD=1
    self.NDIRECTIONS=2
    self.n_classes=n_classes
    #Initial control and memory states
    self.c0=nn.Parameter(torch.randn(hidden_dim))
    self.m0=nn.Parameter(torch.randn(hidden_dim))
    self.m=torch.zeros(self.p+1, hidden_dim)
    self.c=torch.zeros(self.p+1, hidden_dim)
    self.r=torch.zeros(self.p, hidden_dim)
    self.wipe_memory()
  
  
  def wipe_memory(self):
    self.c=torch.zeros(self.p+1, self.hidden_dim)
    self.m=torch.zeros(self.p+1, self.hidden_dim)
    self.r=torch.zeros(self.p+1, self.hidden_dim)
    self.q=torch.zeros(2*self.hidden_dim)
    self.q_i=torch.zeros(self.p,self.hidden_dim)
    self.c[0]=self.c0.clone()
    self.m[0]=self.m0.clone()
    
  def forward(self, input, image):
    self.contextual_words=None #We clear last input
    self.input_step(input, image)
    self.mac_step()
  
    return self.output_step()
  
  def input_step(self,input, image):
    image=image.view(1,3,224,224)
    question_result=self.question_processing(input)
    self.knowledge_base=self.get_knowledge_base(image)
    
    return question_result, self.knowledge_base
  
  def question_processing(self,input):
    result, (h_n,c_n)=self.inputLSTM(input)
    #We get Contextual Words from LSTM Output
    seq_length=len(input)
    result=result.view(seq_length, 1, self.NDIRECTIONS, self.hidden_dim)
    self.contextual_words=result[:,0,self.FORWARD,:]
    h_n=h_n.view(1, 2, 1, self.hidden_dim)
    cw_b=h_n[0][self.BACKWARD][0]
    cw_f=h_n[0][self.FORWARD][0]
    #Calculate q_i
    self.q=torch.cat((cw_b,cw_f),0).view(2*self.hidden_dim,1)
    
    for i in range(self.p):
      candidate=torch.mm(self.reasoning_matrix[i], self.q)+self.q_bias[i].view(self.hidden_dim,1)
      self.q_i[i]=candidate.view(self.hidden_dim)
      
    return self.q_i
  
  def get_knowledge_base(self, image):
    result=self.image_processor(image)
    return result
  
  def mac_step(self):
    for r in range(self.p):
      self.control_step(r)
      self.read_step(r)
      self.write_step(r)

  def control_step(self, reasoning_step):
      seq_length=len(self.contextual_words)
      #c1
      cq=torch.mm(self.W_CQ,torch.cat((self.c[reasoning_step].clone(),self.q_i[reasoning_step])).view(2*self.hidden_dim,1))
      #c2.1
      cq_M=cq.repeat(1,seq_length)
      ca=torch.mm(self.W_CA, (cq_M.transpose(1,0)*self.contextual_words).transpose(1,0))+self.bias_ca[reasoning_step] 
      #c2.2
      cv=nn.functional.softmax(ca.view(seq_length),dim=0)
      #c2.3
      self.c[reasoning_step+1]=torch.mm(cv.view(1,seq_length),self.contextual_words)
 
  def read_step(self, reasoning_step):
    
    H=14
    W=14
    d=self.hidden_dim
    
    #r1
    memory_vector=torch.mm(self.W_M,self.m[reasoning_step].clone().view(hidden_dim,1))+self.bias_m
    memory_matrix=memory_vector.repeat(14,1,14).transpose(0,1)
    kb_matrix = torch.mm(self.W_K,(self.knowledge_base[0]).view(d,H*W)+self.bias_k).view(d,H,W)
    I_hw=memory_matrix*kb_matrix
    #r2
    tempI=I_hw.view(d,H*W)
    tempKB=self.knowledge_base[0].view(d,H*W)
    I_hwp=(torch.mm(self.W_IHW,torch.cat((tempI,tempKB),dim=0))+self.bias_ihw).view(d,H,W)
    #r3.1
    temp=self.c[reasoning_step+1].clone().repeat(H,W,1).transpose(2,0)
    ra=(torch.mm(self.W_R,(temp*I_hwp).view(d,H*W))+self.bias_ra).view(1,H,W)
    #r3.2
    rv=nn.functional.softmax(ra.view(H*W),dim=0).view(1,H,W)
    #r3.3
    #TODO: redo as matrix multiplication
    weighted_vectors=torch.zeros(H,W,d)
    result=torch.zeros(d)
    for i in range(H):
      for j in range(W):
        result+=rv[:,i,j]*self.knowledge_base[0,:,i,j]
    
    self.r[reasoning_step]=result
    
    return self.r
  def write_step(self, reasoning_step):
    H=14
    W=14
    d=self.hidden_dim
   
    #w1
    m_info=torch.mm(self.W_MI,torch.cat((self.r[reasoning_step].clone(),self.m[reasoning_step].clone())).view(2*d,1))+self.bias_mip
    #w2.1
    sa_ij=nn.functional.softmax(torch.mm(self.W_SA,(self.c[reasoning_step+1].clone()*self.c[0:reasoning_step+1].clone()).transpose(1,0)).view(reasoning_step+1,1)+self.bias_sa,dim=0)
    #w2.2
    mi_sa=torch.mm(sa_ij.view(1,reasoning_step+1),self.m[0:reasoning_step+1].clone()).transpose(1,0)
    #w2.3
    mi_prime=torch.mm(self.W_P,m_info)+self.bias_mi#+torch.mm(self.W_S,mi_sa)
    #w3.1
    c_ip=torch.mm(self.W_CI,self.c[reasoning_step].clone().view(d,1))+self.bias_ci
    #w3.2
    new_memory=torch.sigmoid(c_ip)*self.m[reasoning_step].clone()+(1-torch.sigmoid(c_ip))*mi_prime.view(d)
    #I added this line to avoid memory values exploding over the reasoning process
    new_memory=new_memory/new_memory.norm()
    self.m[reasoning_step+1]=new_memory
    return None
  def output_step(self):
    output=torch.cat((self.q.view(2*self.hidden_dim),self.m[self.p]),dim=0)
    output=nn.functional.relu(self.output_layer1(output))
    output=self.output_layer2(output)
    output=nn.functional.softmax(output,dim=0)
    return output
    
  #Code for extracting features from ResNet101
  def load_resnet_model(self,d=100,pretrained=True):
    class ResNetNoBottom(torch.nn.Module):
      def __init__(self, original_model,d):
        super(ResNetNoBottom, self).__init__()
        self.features = torch.nn.Sequential(*list(original_model.children())[:-3])
        for param in self.features.parameters():
          param.requires_grad=False
        self.convExtra1=nn.Conv2d(1024,d,1,1)
        self.convExtra2=nn.Conv2d(d,d,1,1)

      def forward(self, x):
        x = self.features(x)
        x = self.convExtra1(x)
        x = self.convExtra2(x)
        return x

    resnet101 = torchvision.models.resnet101(pretrained=pretrained)
    return ResNetNoBottom(resnet101,d)

In [0]:
import torch.optim as opt
import time
def runModel(model,nEpochs,input_data,output_data,n_print=5):
  n_print=1 #How many epochs until we write an update for the MSE
  n_samples=len(input_data[0])
  errors=list()
  x=list()
  print("Running Model: {}".format(model.__class__.__name__))
  optimizer = opt.Adam(model.parameters(), lr=0.1)
  loss=nn.MSELoss()
  for epoch in range(nEpochs):
    print("Starting Epoch N°{}".format(epoch+1))
    running_loss = 0.0
    sentences=input_data[0]
    images=input_data[1]
    for i,sample in enumerate(sentences):
      model.zero_grad()
      
      model.wipe_memory()
     
      output=model(sample,images[i])
      
      result=loss(output,output_data[i])

      result.backward(retain_graph=False)
   
      optimizer.step()
      
      running_loss += result.item()
      '''print("R:",model.r)
      print("C:",model.c)
      print("M:",model.m)
      print("M0:",model.m0)
      print("C0:",model.c0)'''
    error=running_loss/n_samples
    errors.append(error)
    if  epoch % n_print == 0:
      print("Average MSE: {} for Epoch {}".format(error,epoch+1))
  return model, errors #For plotting purposes

In [98]:
embed_dim=10
batch_size=1
seq_length=5
hidden_dim=10
reasoning_steps=5
samples=100
n_classes=2
mac_net=MACNet(embed_dim,hidden_dim, reasoning_steps,n_classes)

input = torch.zeros(samples,seq_length,batch_size,embed_dim)
import torchvision.transforms as transforms
from skimage import io, transform
import os
import numpy as np
gatos="cat."
perros="dog."
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
data=list()
output=list()
for img in range(100):
  image=io.imread(os.path.join('dogs_cats_sample/', gatos+str(img)+".jpg"))
  image = torch.from_numpy(transform.resize(image, (224, 224))).float()

  data.append(normalize(image/255))
  output.append(torch.tensor([1,0]).float())
  image=io.imread(os.path.join('dogs_cats_sample/', perros+str(img)+".jpg"))
  image = torch.from_numpy(transform.resize(image, (224, 224))).float()
  data.append(normalize(image/255))
  output.append(torch.Tensor([0,1]).float())

#image_input=torch.zeros(samples,3,224,224)
#image_input.random_(0,255)
output_data=torch.randn(samples,n_classes)
for i,el in enumerate(output_data):
  output_data[i]=nn.functional.softmax(el, dim=0)
input_data=list()
input_data.append(input)
input_data.append(data)

mac_net(input[0], data[0])

#print("Control:",result)
#print("Memoria:",image)

tensor([0.5125, 0.4875], grad_fn=<SoftmaxBackward>)

In [101]:
model, errors=runModel(mac_net,10,input_data,output)


Running Model: MACNet
Starting Epoch N°1
Average MSE: 0.255278826058 for Epoch 1
Starting Epoch N°2
Average MSE: 0.251421414614 for Epoch 2
Starting Epoch N°3
Average MSE: 0.251315880716 for Epoch 3
Starting Epoch N°4
Average MSE: 0.251316294968 for Epoch 4
Starting Epoch N°5
Average MSE: 0.251316610277 for Epoch 5
Starting Epoch N°6
Average MSE: 0.251316821575 for Epoch 6
Starting Epoch N°7
Average MSE: 0.251316961944 for Epoch 7
Starting Epoch N°8
Average MSE: 0.251317068338 for Epoch 8
Starting Epoch N°9
Average MSE: 0.251317140758 for Epoch 9
Starting Epoch N°10
Average MSE: 0.251317205429 for Epoch 10


In [0]:
for name, param in model.named_parameters():
  print(name, param)


In [33]:
print("MEMORY,",model.m)
print("READ,",model.r)
print("CONTROL,",model.c)

('MEMORY,', tensor([[ -1.7046,  -0.8735,   0.1411,  -1.4196,   1.4526,  -0.3175,   0.0305,
          -0.6452,   0.0056,   1.5256],
        [ -2.1049,  -0.1052,   0.7353,  -1.1375,   1.7160,  -0.3764,   0.4080,
          -0.6157,  -0.1103,   1.0577],
        [ -9.6281,  14.3336,  11.9030,   4.1641,   6.6665,  -1.4816,   7.5037,
          -0.0612,  -2.2866,  -7.7356],
        [ -8.4134,  11.2923,   9.9382,   6.2258,   4.1051,   1.2810,  -0.2851,
           2.6954,   0.2859,  -9.0008],
        [ -7.2000,   8.2498,   7.9734,   8.2842,   1.5438,   4.0429,  -8.0737,
           5.4545,   2.8528, -10.2639],
        [ -5.9878,   5.2062,   6.0086,  10.3392,  -1.0173,   6.8038, -15.8620,
           8.2162,   5.4141, -11.5249]], grad_fn=<CopySlices>))
('READ,', tensor([[    0.0000,     0.0000,     0.0000,     0.0000,     0.0000,     0.0000,
             0.0000,     0.0000, 25900.0293,     0.0000],
        [    0.0000,     0.0000,     0.0000,     0.0000,     0.0000,     0.0000,
             0.0000,

In [0]:

resultado=model_ft(random_input)

In [0]:
resultado.size()

torch.Size([10, 100, 14, 14])

In [0]:
print(model_ft)

In [0]:
import math
def convolution_size(in_features,kernel_size,stride,padding):
  return math.floor((in_features+2*padding-kernel_size)/stride)+1

(layer3): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(256, 1024, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace)
      (downsample): Sequential(
        (0): Conv2d(512, 1024, kernel_size=(1, 1), stride=(2, 2), bias=False)
        (1): BatchNorm2d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )

In [0]:
convolution_size(224,7,2,3)
convolution_size(112,3,2,1)
convolution_size(56,1,1,0)
convolution_size(56,3,1,1)
convolution_size(56,1,1,0)
convolution_size(56,1,1,0)
convolution_size(56,3,2,1)
convolution_size(28,1,2,0)
convolution_size(14,3,2,1)
convolution_size(7,1,2,0)
convolution_size(4,3,2,1)
convolution_size(2,1,2,0)

1.0

In [0]:
for name, param in model_ft.named_parameters():
  print(name, param.requires_grad)

  warn("The default mode, 'constant', will be changed to 'reflect' in "


In [43]:
print(data[0])

tensor([[[0.0031, 0.0025, 0.0013],
         [0.0032, 0.0026, 0.0014],
         [0.0032, 0.0026, 0.0014],
         ...,
         [0.0038, 0.0031, 0.0018],
         [0.0037, 0.0031, 0.0019],
         [0.0037, 0.0031, 0.0019]],

        [[0.0031, 0.0025, 0.0013],
         [0.0032, 0.0026, 0.0014],
         [0.0032, 0.0026, 0.0014],
         ...,
         [0.0038, 0.0032, 0.0018],
         [0.0037, 0.0031, 0.0019],
         [0.0037, 0.0031, 0.0019]],

        [[0.0031, 0.0025, 0.0013],
         [0.0032, 0.0026, 0.0014],
         [0.0032, 0.0026, 0.0014],
         ...,
         [0.0038, 0.0031, 0.0019],
         [0.0037, 0.0031, 0.0019],
         [0.0037, 0.0031, 0.0019]],

        ...,

        [[0.0024, 0.0019, 0.0009],
         [0.0024, 0.0019, 0.0009],
         [0.0024, 0.0019, 0.0009],
         ...,
         [0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000]],

        [[0.0024, 0.0019, 0.0008],
         [0.0024, 0.0019, 0.0008],
         [0.

In [239]:
!ls data/data

ls: cannot access 'data/data': Not a directory
