<a href="https://colab.research.google.com/github/ckodser/RFCL/blob/main/RFCL_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt

In [None]:
# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

In [None]:
batch_size = 64
# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print("Shape of X [N, C, H, W]: ", X.shape)
    print("Shape of y: ", y.shape, y.dtype)
    break

Shape of X [N, C, H, W]:  torch.Size([64, 1, 28, 28])
Shape of y:  torch.Size([64]) torch.int64


In [None]:
reconnection_rate=500
reduntant_node_ratio=0.3

In [None]:
at=torch.Tensor([False,True,True,True])
bt=torch.Tensor([1,2,3,4])
print(at*bt)


tensor([0., 2., 3., 4.])


In [None]:
class FewConnectionLinearFunction(torch.autograd.Function):

    @staticmethod
    # bias is an optional argument
    def forward(ctx, input, weight, connections, bias=None):
        
        ctx.save_for_backward(input, weight, connections, bias)
        output_feature=connections.shape[0]
        output=torch.zeros([output_feature,input.shape[0]], dtype=torch.float32,device=device)
        input=input.t()
        for i in range(0,output_feature):
            output[i]=torch.matmul(input[connections[i]].t(), weight[i])
        if bias is not None:
            output += bias.unsqueeze(1).expand_as(output)
        return output.t()



    @staticmethod
    def find_reduntant_connections(weight):
        #there is a problem in this part
        #each node makes a useless edge which this part keep choosing. 
        #that node count on the other edge for computing its output and  
        #when it should introduce a reduntant edge it will intruduce the
        #same edge each time.

        weight_2=torch.square(weight)
        minn=torch.min(weight_2,1)
        y=torch.div(minn.values,torch.mean(weight_2,1))
        y=(torch.kthvalue(y,int(reduntant_node_ratio*weight.shape[0])).values>y)
        print("removed connections:"+str(torch.sum(y))+"   ration of removed connections:"+str(torch.sum(y)/weight.shape[0]))
        print((y*minn.indices)[0:min(20,y.shape[0])])
        return y,minn.indices


    @staticmethod
    def find_best_match(input,grad_output_t,is_remove,connections): 
      # this part didn't check and debug
      # this part should improve its performance 
      points_num=input.shape[1]
      lines_num=grad_output_t.shape[0]
      best_match=torch.zeros_like(is_remove)
      for i in range(lines_num):
          if is_remove[i]:
            line=grad_output_t[i].t()/torch.sqrt(torch.sum(torch.square(grad_output_t[i])))
            dist=torch.sum(torch.square(torch.matmul(torch.matmul(line,input).t(),line).t()-input),0)
            dist[connections[i]]+=10000
            best_match[i]=torch.argmin(dist)
      return best_match
                

    @staticmethod
    def layer_reconnection(ctx, grad_output_t):
        if torch.rand(1)<1-(1/reconnection_rate):  #for each reconnection_rate mini batch preform layer_reconnection
            return 
        input, weight, connections, bias = ctx.saved_tensors
        output_feature=connections.shape[0]
        input_feature=input.shape[1]
        is_remove, which_remove=FewConnectionLinearFunction.find_reduntant_connections(weight)
        best_match = FewConnectionLinearFunction.find_best_match(input,grad_output_t,is_remove,connections)
        for i in range(output_feature):
            if is_remove[i]:
              weight[i][which_remove[i]]=0
              connections[i][which_remove[i]]=best_match[i]


    @staticmethod
    def backward(ctx, grad_output):
        grad_output_t=torch.unsqueeze(grad_output.t(),2)
        input, weight, connections, bias = ctx.saved_tensors
        input_t=input.t()
        grad_input=torch.zeros_like(input_t)
        grad_weight=torch.zeros_like(weight)
        weight_u=torch.unsqueeze(weight,1)
        grad_bias=torch.zeros_like(bias)
        output_feature=connections.shape[0]

        if ctx.needs_input_grad[0]:
            for i in range(0,output_feature):
                grad_input[connections[i]]+= torch.matmul(grad_output_t[i],weight_u[i]).t()  
        
        if ctx.needs_input_grad[1]:
            for i in range(0,output_feature):
                grad_weight[i]+= torch.squeeze(torch.matmul(input_t[connections[i]],grad_output_t[i]).t(),0)   
        
        if bias is not None and ctx.needs_input_grad[3]:
            grad_bias = grad_output.sum(0)
        
        FewConnectionLinearFunction.layer_reconnection(ctx, grad_output_t)

        return grad_input.t(), grad_weight, None , grad_bias

In [None]:
class FewConnectionLinear(nn.Module):
    def __init__(self, input_features, output_features, connection_num,bias=True):
        super(FewConnectionLinear, self).__init__()
        self.input_features = input_features
        self.output_features = output_features

        self.connection_num=connection_num

        self.weight = nn.Parameter(torch.empty(output_features, connection_num))
        self.connections = nn.Parameter(torch.empty(output_features, connection_num), requires_grad=False)
        if bias:
            self.bias = nn.Parameter(torch.empty(output_features))
        else:
            self.register_parameter('bias', None)
        
        #  initialize weights
        
        self.weight.data.normal_(mean=0.0, std=np.sqrt(1/self.connection_num))
        if self.bias is not None:
            torch.nn.init.zeros_(self.bias)#uniform -0.0001 +0.0001 work very well to
      
        self.connections.data=torch.from_numpy(self.connection_initializer())

    def connection_initializer(self):
      # this part need improvement
      x=11 #x=(self.input_features)//(self.output_features)

      aa=np.arange(self.connection_num)
      bb=np.arange(self.output_features)*x
      bb=bb.reshape((self.output_features,1))
      cc=np.ones_like(aa).reshape((1,self.connection_num))

      return np.remainder(aa+np.matmul(bb,cc),self.input_features)

    def forward(self, input):
        # See the autograd section for explanation of what happens here.
        return FewConnectionLinearFunction.apply(input, self.weight, self.connections,self.bias)

In [None]:
# Get cpu or gpu device for training.1
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            FewConnectionLinear(28*28, 512,50),
            nn.BatchNorm1d(512,affine=False),nn.ReLU(),
            FewConnectionLinear(512, 512,10),
            nn.BatchNorm1d(512,affine=False),nn.ReLU(),
            FewConnectionLinear(512, 512,10),
            nn.BatchNorm1d(512,affine=False),nn.ReLU(),
            FewConnectionLinear(512, 512,10),
            nn.BatchNorm1d(512,affine=False),nn.ReLU(),
            FewConnectionLinear(512, 10,50),
            nn.Softmax(),      
        )

    def forward(self, x):
        x = self.flatten(x)
        x=self.linear_relu_stack(x)
        return x

model = NeuralNetwork().to(device)
print(model)

Using cpu device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): FewConnectionLinear()
    (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=False, track_running_stats=True)
    (2): ReLU()
    (3): FewConnectionLinear()
    (4): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=False, track_running_stats=True)
    (5): ReLU()
    (6): FewConnectionLinear()
    (7): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=False, track_running_stats=True)
    (8): ReLU()
    (9): FewConnectionLinear()
    (10): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=False, track_running_stats=True)
    (11): ReLU()
    (12): FewConnectionLinear()
    (13): Softmax(dim=None)
  )
)


In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-1)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)
        
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
       # return
        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(batch)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [None]:
def test(dataloader, model):
    size = len(dataloader.dataset)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
           # return #for testing pytorch
    test_loss /= size
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
epochs = 10
for name,param in model.named_parameters():
  if name=="linear_relu_stack.0.connections":
    print(param.data)
  print(name)


for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model)

for name,param in model.named_parameters():
  if name=="linear_relu_stack.0.connections":
    print(param.data)
  print(name)

print("Done!")

linear_relu_stack.0.weight
tensor([[  0,   1,   2,  ...,  47,  48,  49],
        [ 11,  12,  13,  ...,  58,  59,  60],
        [ 22,  23,  24,  ...,  69,  70,  71],
        ...,
        [111, 112, 113,  ..., 158, 159, 160],
        [122, 123, 124,  ..., 169, 170, 171],
        [133, 134, 135,  ..., 180, 181, 182]])
linear_relu_stack.0.connections
linear_relu_stack.0.bias
linear_relu_stack.3.weight
linear_relu_stack.3.connections
linear_relu_stack.3.bias
linear_relu_stack.6.weight
linear_relu_stack.6.connections
linear_relu_stack.6.bias
linear_relu_stack.9.weight
linear_relu_stack.9.connections
linear_relu_stack.9.bias
linear_relu_stack.12.weight
linear_relu_stack.12.connections
linear_relu_stack.12.bias
Epoch 1
-------------------------------
0
loss: 2.316355  [    0/60000]


  input = module(input)


removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([0, 0, 0, 4, 5, 6, 0, 3, 0, 0, 0, 0, 0, 0, 9, 0, 8, 0, 9, 0])
100
loss: 2.164378  [ 6400/60000]
removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([0, 0, 0, 7, 5, 6, 0, 3, 0, 0, 0, 0, 0, 0, 9, 0, 0, 0, 9, 0])
200
loss: 1.951322  [12800/60000]
removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([0, 0, 0, 0, 5, 0, 0, 3, 0, 5, 0, 0, 0, 0, 9, 0, 0, 0, 9, 0])
300
loss: 1.911749  [19200/60000]
removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([2, 1, 0, 4, 0, 8, 7, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 9, 0])
400
loss: 1.818340  [25600/60000]
removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([0, 0, 0, 0, 5, 0, 0, 3, 0, 5, 0, 0, 0, 0, 9, 0, 0, 5, 9, 0])
500
loss: 1.753925  [32000/60000]
removed connections:tensor(152)   ration of removed connections:tensor(0.2969)
tensor([0, 

KeyboardInterrupt: ignored

In [None]:
test_num=70
input_features=50
output_features=20
connection_num=10
input=(torch.randn(test_num,28,28,dtype=torch.float32,requires_grad=True))
test = torch.autograd.gradcheck(model, input, eps=1e-6, atol=1e-4)
print(test)

  f'Input #{idx} requires gradient and '


KeyboardInterrupt: ignored