In [1]:
import random
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from torchsummary import summary
from thop import profile
import os

# Initialize random seed for reproducibility.
# The same seed is pushed into Python's `random`, NumPy, and PyTorch
# (CPU generator, current CUDA device, and all CUDA devices).
seed = 1787
random.seed(seed)
np.random.seed(seed)
th.manual_seed(seed)
th.cuda.manual_seed(seed)
th.cuda.manual_seed_all(seed)
# Ask cuDNN for deterministic kernels and disable its autotuner so the
# algorithm choice does not vary between runs.
th.backends.cudnn.deterministic = True
th.backends.cudnn.benchmark = False

# Set device: use the GPU when one is visible, otherwise fall back to CPU.
device = th.device("cuda" if th.cuda.is_available() else "cpu")

# Training / pruning hyperparameters.
# NOTE(review): none of the values below are read by the cells visible in this
# part of the notebook; their exact meaning must be confirmed against the
# training / pruning loop that consumes them.
custom_epochs = 40
prune_value=[1,2,4]
# 54 entries in total: 18 x 8, then 18 x 15, then 18 x 30.
# presumably one limit per prunable conv layer of the three ResNet-56 stages — TODO confirm
prune_limits=[8]*18 + [15]*18 + [30]*18

optim_lr = 0.1
lamda = 0.01
alpha = 0.0001
beta = 0.0001

regularization_prune_percentage = 0.02
decorrelation_lower_bound = 0.3
decorrelation_higher_bound = 0.4

# Per-channel normalisation shared by the train and test pipelines so the two
# can never drift apart. The constants are the (mean, std) pairs the original
# cell used for both transforms.
cifar10_normalize = transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))

# Training pipeline: random 32x32 crop after 4-pixel padding, random
# horizontal flip, tensor conversion, then normalisation.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    cifar10_normalize,
    ])

# Evaluation pipeline: no augmentation, only tensor conversion + normalisation.
transform_test = transforms.Compose([
        transforms.ToTensor(),
        cifar10_normalize,
    ])

trainset = torchvision.datasets.CIFAR10(root='../data', train=True, download=True, transform=transform_train)
trainloader = th.utils.data.DataLoader(trainset, batch_size=100, shuffle=True)

testset = torchvision.datasets.CIFAR10(root='../data', train=False, download=True, transform=transform_test)
# Evaluation does not benefit from shuffling; a fixed order keeps per-batch
# results reproducible and avoids consuming RNG state on every pass.
testloader = th.utils.data.DataLoader(testset, batch_size=100, shuffle=False)

class Network():
    """Utility mixin with initialisation, bookkeeping and logging helpers.

    The class defines no ``__init__``; the methods below read attributes that
    the inheriting class (or the caller) is expected to set beforehand:

    * ``a_type`` -- activation name used by :meth:`weight_init`.
    * ``train_accuracy`` / ``test_accuracy`` -- parallel lists of accuracy
      values used by :meth:`best_tetr_acc`.
    """

    def weight_init(self, m):
        """Initialise the weights (and bias, if any) of a Linear/Conv2d module.

        Intended to be passed to ``module.apply``. Modules of any other type
        are left untouched.

        Args:
            m: the module to initialise.

        Raises:
            NotImplementedError: if ``self.a_type`` is not one of ``'relu'``,
                ``'leaky_relu'``, ``'tanh'`` or ``'sigmoid'``.
        """
        if not isinstance(m, (nn.Linear, nn.Conv2d)):
            return

        if self.a_type in ('relu', 'leaky_relu'):
            nn.init.kaiming_normal_(m.weight.data, nonlinearity=self.a_type)
        elif self.a_type in ('tanh', 'sigmoid'):
            gain = nn.init.calculate_gain(self.a_type)
            nn.init.xavier_uniform_(m.weight.data, gain=gain)
        else:
            # NotImplementedError subclasses RuntimeError, which is what the
            # previous bare `raise` produced, so existing handlers still match.
            raise NotImplementedError(
                "unsupported activation type: {!r}".format(self.a_type))

        # Layers built with bias=False (e.g. every conv in this file) have no
        # bias tensor to reset.
        if m.bias is not None:
            nn.init.constant_(m.bias.data, 0)

    def one_hot(self, y, gpu):
        """Return a float one-hot matrix for the integer class labels ``y``.

        Args:
            y: labels as a tensor, NumPy array or sequence with one label per
                row (shape ``(n,)`` or ``(n, 1)``).
            gpu: when truthy the result is moved to the current CUDA device.

        Returns:
            A ``(n, max(y) + 1)`` float tensor with a single 1 per row. An
            empty ``y`` yields an empty ``(0, 0)`` tensor.
        """
        if not th.is_tensor(y):
            y = th.as_tensor(y)

        # Work on CPU integer labels so indexing is device-independent.
        labels = y.detach().long().cpu()
        n = labels.size(0) if labels.dim() > 0 else 0
        labels = labels.reshape(n)

        num_classes = int(labels.max()) + 1 if n else 0
        y_hot = th.zeros((n, num_classes))
        if n:
            y_hot[th.arange(n), labels] = 1

        return y_hot.cuda() if gpu else y_hot

    def best_tetr_acc(self, prunes=0):
        """Collapse the accuracy history from index ``prunes`` onward.

        Finds the best test accuracy in ``self.test_accuracy[prunes:]`` and,
        among the entries that reach it, the best matching train accuracy.
        The examined tail of both lists is then replaced by that single pair.

        The class used to define this method twice (with and without
        ``prunes``); the later definition shadowed the first and referenced an
        undefined ``prunes``. A default of 0 serves both call styles.

        Args:
            prunes: index of the first history entry to consider.

        Returns:
            ``(best_test_accuracy, best_train_accuracy)``.

        Raises:
            ValueError: if there is no history at or after ``prunes``.
        """
        tr_acc = self.train_accuracy[prunes:]
        te_acc = self.test_accuracy[prunes:]

        best_te_acc = max(te_acc)
        best_tr_acc = max(tr for tr, te in zip(tr_acc, te_acc) if te == best_te_acc)

        del self.test_accuracy[prunes:]
        del self.train_accuracy[prunes:]
        self.test_accuracy.append(best_te_acc)
        self.train_accuracy.append(best_tr_acc)
        return best_te_acc, best_tr_acc

    def create_folders(self, total_convs):
        """Create a timestamped results directory and return its path.

        The directory is ``<parent of the script dir>/Results/<timestamp>_resnet_56/``
        and a ``layer_file_info`` sub-directory is created inside it.

        Args:
            total_convs: unused; kept for interface compatibility.

        Returns:
            The results directory path as a string (with a trailing slash).
        """
        # Local import: the notebook never imports these names at the top.
        from time import localtime, strftime

        main_dir = strftime("/Results/%b%d_%H:%M:%S%p", localtime()) + "_resnet_56/"
        try:
            current_dir = os.path.abspath(os.path.dirname(__file__))
        except NameError:
            # `__file__` does not exist inside a notebook kernel; anchor on the
            # working directory instead.
            current_dir = os.path.abspath(os.getcwd())
        par_dir = os.path.abspath(current_dir + "/../")
        parent_dir = par_dir + main_dir
        path2 = os.path.join(parent_dir, "layer_file_info")
        os.makedirs(path2, exist_ok=True)
        return parent_dir

    def get_writerow(self, k):
        """Build the source text ``wr.writerow([d[0],...,d[k-1]])``.

        Args:
            k: number of ``d[i]`` entries to list.

        Returns:
            The statement as a string. For ``k <= 0`` the well-formed
            ``'wr.writerow([])'`` is returned (the previous implementation
            produced an unterminated ``'wr.writerow(['``).
        """
        cells = ','.join('d[' + str(i) + ']' for i in range(k))
        return 'wr.writerow([' + cells + '])'

    def get_logger(self, file_path):
        """Return the ``'gal'`` logger writing to ``file_path`` and the console.

        NOTE: every call attaches a new file handler and a new stream handler
        to the same named logger, so calling this repeatedly duplicates output.

        Args:
            file_path: path of the log file to append to.

        Returns:
            The configured ``logging.Logger`` at INFO level.
        """
        # Local import: the notebook never imports `logging` at the top.
        import logging

        logger = logging.getLogger('gal')
        log_format = '%(asctime)s | %(message)s'
        formatter = logging.Formatter(log_format, datefmt='%m/%d %I:%M:%S %p')
        file_handler = logging.FileHandler(file_path)
        file_handler.setFormatter(formatter)
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(stream_handler)
        logger.setLevel(logging.INFO)

        return logger

class PruningMethod(nn.Module):
    """``nn.Module`` base adding structured filter pruning helpers."""

    @staticmethod
    def _channel_index(channels, device):
        """Convert a sequence of channel indices to a long tensor on ``device``."""
        return th.as_tensor(channels, dtype=th.long, device=device)

    def prune_filters(self, indices):
        """Shrink every residual block to the channels listed in ``indices``.

        Walks ``self.named_modules()`` in registration order. The top-level
        ``conv1`` / ``bn1`` are skipped. For each block (modules whose name
        contains ``conv1`` / ``bn1`` / ``conv2``):

        * ``conv1`` keeps only the listed output filters,
        * ``bn1`` keeps the matching affine parameters and running statistics,
        * ``conv2`` keeps only the matching input channels.

        The walk stops at the first ``nn.Linear``. Tensors stay on the device
        they already live on (the previous version forced ``'cuda'`` through a
        NumPy round-trip, which failed on CPU-only machines).

        Args:
            indices: one sequence of kept channel indices per block, in module
                order.
        """
        conv_layer = 0

        for layer_name, layer_module in self.named_modules():

            if isinstance(layer_module, nn.Conv2d) and layer_name != 'conv1':
                weight = layer_module.weight.data
                if 'conv1' in layer_name:
                    # First conv of a block: select output filters (dim 0).
                    keep = self._channel_index(indices[conv_layer], weight.device)
                    layer_module.weight = nn.Parameter(weight[keep])
                    layer_module.in_channels = weight.shape[1]
                    layer_module.out_channels = len(keep)
                elif 'conv2' in layer_name:
                    # Second conv of a block: select input channels (dim 1),
                    # then advance to the next block's index list.
                    keep = self._channel_index(indices[conv_layer], weight.device)
                    layer_module.weight = nn.Parameter(weight[:, keep])
                    layer_module.in_channels = len(keep)
                    layer_module.out_channels = weight.shape[0]
                    conv_layer += 1

            if isinstance(layer_module, nn.BatchNorm2d) and layer_name != 'bn1' and 'bn1' in layer_name:
                # bn1 is visited between conv1 and conv2, so `conv_layer`
                # still points at this block's index list.
                keep = self._channel_index(indices[conv_layer], layer_module.weight.device)

                layer_module.weight = nn.Parameter(layer_module.weight.data[keep])
                layer_module.bias = nn.Parameter(layer_module.bias.data[keep])
                layer_module.running_mean = layer_module.running_mean[keep]
                layer_module.running_var = layer_module.running_var[keep]
                layer_module.num_features = len(keep)

            if isinstance(layer_module, nn.Linear):
                break

    def get_indices_topk(self, layer_bounds, layer_num, prune_limit, prune_value):
        """Return the indices of the lowest-scoring filters to remove.

        ``prune_value[layer_num]`` is the requested count. If removing that
        many would leave fewer than ``prune_limit`` filters, the count is
        reduced (never below zero) and written back into ``prune_value``.

        Args:
            layer_bounds: one score per filter of the layer.
            layer_num: index into ``prune_value`` for this layer.
            prune_limit: minimum number of filters that must remain.
            prune_value: mutable per-layer list of prune counts; updated in
                place when the count is reduced.

        Returns:
            List of filter indices with the smallest scores, ascending by score.
        """
        requested = prune_value[layer_num]
        total = len(layer_bounds)

        if (total - requested) < prune_limit:
            # Clamp at 0: a negative count would turn the slice below into
            # "everything except the last |count|" and prune a layer that is
            # already under its limit.
            requested = max(total - prune_limit, 0)
            prune_value[layer_num] = requested

        ranked = sorted(range(total), key=lambda j: layer_bounds[j])
        return ranked[:requested]

    def get_indices_bottomk(self, layer_bounds, i, prune_limit):
        """Return the indices of the ``prune_limit`` highest-scoring filters.

        Args:
            layer_bounds: one score per filter of the layer.
            i: unused; kept for interface compatibility.
            prune_limit: how many indices to return.

        Returns:
            List of filter indices, ascending by score. Empty when
            ``prune_limit`` is not positive (a bare ``[-0:]`` slice would
            return every index instead).
        """
        if prune_limit <= 0:
            return []
        ranked = sorted(range(len(layer_bounds)), key=lambda j: layer_bounds[j])
        return ranked[-prune_limit:]


    
def conv3x3(in_planes, out_planes, stride=1):
    """Create a bias-free 3x3 convolution with one pixel of zero padding.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: convolution stride (default 1).

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)


class LambdaLayer(nn.Module):
    """Wrap an arbitrary callable so it can be used as a module."""

    def __init__(self, lambd):
        """Store ``lambd``, the callable applied to every input."""
        super().__init__()
        self.lambd = lambd

    def forward(self, x):
        """Return the result of applying the stored callable to ``x``."""
        fn = self.lambd
        return fn(x)

class ResBasicBlock(Network,PruningMethod):
    """Two-conv residual block with a parameter-free shortcut.

    Main path: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, added to the shortcut
    and passed through a final ReLU.

    The shortcut is the identity when the block keeps resolution and width.
    Otherwise the input is spatially subsampled and zero-padded along the
    channel dimension so it matches the main path's output.

    Args:
        inplanes: number of input channels.
        planes: number of output channels.
        stride: stride of the first convolution (default 1).
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1):
        super(ResBasicBlock, self).__init__()
        self.inplanes = inplanes
        self.planes = planes
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu1 = nn.ReLU(inplace=True)

        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu2 = nn.ReLU(inplace=True)
        self.stride = stride
        self.shortcut = nn.Sequential()
        if stride != 1 or inplanes != planes:
            # Subsample with the block's own stride instead of a hard-coded 2:
            # identical for stride 2, and keeps the spatial size aligned with
            # the main path when only the channel count changes (stride 1).
            # Channel padding: planes // 4 zeros in front, the remainder behind.
            front = planes // 4
            back = planes - inplanes - front
            self.shortcut = LambdaLayer(
                lambda x: F.pad(x[:, :, ::stride, ::stride], (0, 0, 0, 0, front, back), "constant", 0))

    def forward(self, x):
        """Run the block on ``x`` and return the activated residual sum."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu1(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # In-place add of the (possibly subsampled / padded) input.
        out += self.shortcut(x)
        out = self.relu2(out)

        return out


class ResNet(Network, PruningMethod):
    """Three-stage residual network with a 3x3 stem and a linear classifier.

    Args:
        block: residual block class; must expose ``expansion`` and accept
            ``(inplanes, planes, stride)``.
        num_layers: network depth; must be of the form ``6n + 2``.
        covcfg: stored unchanged on the instance as ``self.covcfg``.
        num_classes: size of the classifier output (default 10).
    """

    def __init__(self, block, num_layers, covcfg, num_classes=10):
        super().__init__()
        assert (num_layers - 2) % 6 == 0, 'depth should be 6n+2'
        blocks_per_stage = (num_layers - 2) // 6
        self.covcfg = covcfg
        self.num_layers = num_layers

        # Stem: 3 -> 16 channels at full resolution.
        self.inplanes = 16
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.inplanes)
        self.relu = nn.ReLU(inplace=True)

        # Stages are registered as layer1..layer3, in that order.
        stage_specs = ((16, 1), (32, 2), (64, 2))
        for stage_no, (width, stride) in enumerate(stage_specs, start=1):
            stage = self._make_layer(stage_no, block, width, blocks=blocks_per_stage, stride=stride)
            setattr(self, f'layer{stage_no}', stage)
        self.avgpool = nn.AdaptiveAvgPool2d(1)

        # The classifier attribute is called `linear` for depth 110 and `fc`
        # for every other depth.
        classifier = nn.Linear(64 * block.expansion, num_classes)
        if num_layers == 110:
            self.linear = classifier
        else:
            self.fc = classifier

        self.initialize()

        # Bookkeeping containers; all start empty.
        self.layer_name_num = {}
        self.pruned_filters = {}
        self.remaining_filters = {}
        self.remaining_filters_each_epoch = []

    def initialize(self):
        """Kaiming-normal init for convs; BN weight set to 1 and bias to 0."""
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)

    def _make_layer(self,a, block, planes, blocks, stride):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride``; ``self.inplanes`` is updated
        after it so the remaining blocks use the new width. ``a`` is unused.
        """
        first = block(self.inplanes, planes, stride)
        self.inplanes = planes * block.expansion
        rest = [block(self.inplanes, planes) for _ in range(1, blocks)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Return class scores for the input batch ``x``."""
        out = self.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)

        classifier = self.linear if self.num_layers == 110 else self.fc
        return classifier(out)


def resnet_56():
    """Construct a 56-layer ``ResNet`` built from ``ResBasicBlock``.

    The config list passed along holds the 55 values 2, 5, 8, ..., 164.
    """
    n_entries = 9 * 3 * 2 + 1
    cov_cfg = list(range(2, 3 * n_entries + 2, 3))
    return ResNet(ResBasicBlock, 56, cov_cfg)

# Build a ResNet-56 with freshly initialised weights and move it to `device`.
# Trained weights are only restored later, when the checkpoint is loaded.
model = resnet_56().to(device)

Files already downloaded and verified
Files already downloaded and verified


In [2]:
# map_location remaps stored tensors onto the active device, so a checkpoint
# saved on a GPU can still be opened on a CPU-only machine.
# NOTE(review): th.load unpickles the file; only open checkpoints from a
# trusted source, or pass weights_only=True if every stored object supports it.
checkpoint = th.load('resnet56_base.pth', map_location=device)
model.load_state_dict(checkpoint['model'])

  checkpoint = th.load('resnet56_base.pth')


<All keys matched successfully>

In [3]:
# Show what the checkpoint contains, then the accuracies stored with it.
print(checkpoint.keys())
for metric_key in ('train_acc', 'test_acc'):
    print(checkpoint[metric_key])

dict_keys(['model', 'train_acc', 'test_acc', 'optimizer', 'scheduler'])
99.938
93.53


In [4]:
def _count_correct(net, loader):
    """Count correct top-1 predictions of ``net`` over ``loader``.

    Batches are moved to the notebook-level ``device``; gradients are disabled.

    Args:
        net: model returning per-class scores of shape (batch, classes).
        loader: iterable yielding ``(images, labels)`` batches.

    Returns:
        ``(n_correct, n_seen)`` as Python ints.
    """
    n_correct = 0
    n_seen = 0
    with th.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            predicted = net(images).argmax(dim=1)
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()
    return n_correct, n_seen


# Switch BatchNorm (and any dropout) to inference behaviour.
model.eval()

# NOTE: `trainloader` applies the random crop/flip augmentation, so this is
# accuracy on augmented training images and can vary slightly between runs.
correct, total = _count_correct(model, trainloader)
train_accuracy = 100 * correct / total

correct, total = _count_correct(model, testloader)
test_accuracy = 100 * correct / total


print(f"Train Accuracy: {train_accuracy:.2f}%")
print(f"Test Accuracy: {test_accuracy:.2f}%")

Train Accuracy: 99.99%
Test Accuracy: 93.54%


In [5]:
# Scratch check: a shallow copy taken before an in-place extension of `a`
# does not see the element added afterwards.
a = [1, 2, 3, 4]

b = list(a)

a.extend([69])

print(b)

[1, 2, 3, 4]
