In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
torch.manual_seed(1)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

https://github.com/stevenliuyi/information-bottleneck/blob/master/information_bottleneck.ipynb <br>
function: calc_mutual_information

In [6]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

In [4]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(784 , 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        if self.btn1 is None:
            self.btn1 = x.detach().copy()
        x = F.relu(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

In [5]:
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = F.log_softmax(model(data), dim=1)
        loss = F.nll_loss(output, target) # nn.CrossEntropyLoss applies internally F.log_softmax and nn.NLLLoss
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = F.log_softmax(model(data), dim=1)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [6]:


transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
    ])
dataset1 = datasets.MNIST('./data', train=True, download=True,
                   transform=transform)
dataset2 = datasets.MNIST('./data', train=False,
                   transform=transform)

train_loader = torch.utils.data.DataLoader(dataset1, batch_size=64, num_workers=1, pin_memory=True, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset2, batch_size=64, num_workers=1, pin_memory=True, shuffle=True)

model = Net().to(device)
checkpoint = torch.load('./model.pt')
model.load_state_dict(checkpoint)
optimizer = optim.Adadelta(model.parameters(), lr=1)

In [14]:
# scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
# for epoch in range(1, 10 + 1):
#     train(model, train_loader, optimizer, epoch)
#     test(model, test_loader)
#     scheduler.step()

In [35]:
# torch.save(model.state_dict(), './model.pt')

In [11]:
temp = []
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(784 , 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        temp.append(x.detach().clone())
        x = F.relu(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
model = Net().to(device)
# checkpoint = torch.load('./model.pt')
# model.load_state_dict(checkpoint)
optimizer = optim.Adadelta(model.parameters(), lr=1)
test(model, test_loader)


Test set: Average loss: 2.3492, Accuracy: 672/10000 (7%)



In [13]:
torch.tensor(temp).shape

ValueError: only one element tensors can be converted to Python scalars

### Information Bottleneck theory for Deep Learning

In [13]:
import numpy as np
from random import randint, seed
from torch.utils.data import Dataset
from collections import Counter

In [14]:

def calc_mutual_information(hidden, n_samples):
    n_neurons = hidden.shape[1]
  
    # discretization 
    n_bins = 30
    bins = np.linspace(-1, 1, n_bins+1)
    indices = np.digitize(hidden, bins)
    
    # initialize pdfs
    pdf_x = Counter(); pdf_y = Counter(); pdf_t = Counter(); pdf_xt = Counter(); pdf_yt = Counter()

#     n_samples = n_test_samples
    for i in range(n_samples):
        pdf_x[i] += 1/float(n_samples)
        pdf_y[y_train[i,0]] += 1/float(n_samples)      
        pdf_xt[(i,)+tuple(indices[i,:])] += 1/float(n_samples)
        pdf_yt[(y_train[i,0],)+tuple(indices[i,:])] += 1/float(n_samples)
        pdf_t[tuple(indices[i,:])] += 1/float(n_samples)
    
    # calcuate encoder mutual information I(X;T)
    mi_xt = 0
    for i in pdf_xt:
        # P(x,t), P(x) and P(t)
        p_xt = pdf_xt[i]; p_x = pdf_x[i[0]]; p_t = pdf_t[i[1:]]
        # I(X;T)
        mi_xt += p_xt * np.log(p_xt / p_x / p_t)
 
    # calculate decoder mutual information I(T;Y)
    mi_ty = 0
    for i in pdf_yt:
        # P(t,y), P(t) and P(y)
        p_yt = pdf_yt[i]; p_t = pdf_t[i[1:]]; p_y = pdf_y[i[0]]
        # I(X;T)
        mi_ty += p_yt * np.log(p_yt / p_t / p_y)
            
    return mi_xt, mi_ty

# get mutual information for all hidden layers
def get_mutual_information(hiddens):
    mi_xt_list = []; mi_ty_list = []
    for hidden in hiddens:
        mi_xt, mi_ty = calc_mutual_information(hidden, hiddens[0].shape[0])
        mi_xt_list.append(mi_xt)
        mi_ty_list.append(mi_ty)
    return mi_xt_list, mi_ty_list

In [15]:
n_train_samples = 50000 # number of train samples
n_test_samples = 10000 # number of test samples

groups = np.append(np.zeros(8),np.ones(8)) # 16 groups
np.random.seed(1234)
np.random.shuffle(groups)

# generate samples
seed(1234)
def generate_samples(n_samples):
    x_data = np.zeros((n_samples, 10)) # inputs
    x_int = np.zeros(n_samples) # integers representing the inputs
    y_data = np.zeros((n_samples, 2)) # outputs
    
    for i in range(n_samples):
        random_int = randint(0, 1023)
        x_data[i,:] = [int(b) for b in list("{0:b}".format(random_int).zfill(10))]
        x_int[i] = random_int
        y_data[i,0] = groups[random_int % 16]
        y_data[i,1] = 1 - y_data[i,0]
    return np.array(x_data, dtype=np.float32), np.array(y_data, dtype=np.float32), x_int

x_train, y_train, x_train_int = generate_samples(n_train_samples) # training dataset
x_test, y_test, _ = generate_samples(n_test_samples) # testing dataset

In [16]:
class RandomDataset(Dataset):
    def __init__(self, n_samples):
        super().__init__()
        self.x, self.y, _ = generate_samples(n_samples)
        self.y = np.argmax(self.y, axis=1)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, index):
        return self.x[index], self.y[index]

train_loader = torch.utils.data.DataLoader(
    RandomDataset(n_train_samples),
    batch_size=1024,
    num_workers=8,
    pin_memory=True,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    RandomDataset(n_test_samples),
    batch_size=1024,
    num_workers=8,
    pin_memory=True,
    shuffle=True,
)

In [17]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(10 , 8)
        self.fc2 = nn.Linear(8, 6)
        self.fc3 = nn.Linear(6, 4)
        self.fc4 = nn.Linear(4, 2)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)
        x = F.relu(x)
        x = self.fc4(x)
        output = F.log_softmax(x, dim=1)
        return output
model = Net().to(device)

In [22]:
activation = {}
def get_activation(name):
    def hook(model, input, output):
        if name not in activation:
            activation[name] = output.detach()
        else:
            activation[name] = torch.cat([activation[name], output.detach()])
    return hook

handle1 = model.fc1.register_forward_hook(get_activation('fc1')) 
handle2 = model.fc2.register_forward_hook(get_activation('fc2')) 
handle3 = model.fc3.register_forward_hook(get_activation('fc3'))
handle4 = model.fc4.register_forward_hook(get_activation('fc4')) 

In [27]:
model.fc1.weight.shape

torch.Size([8, 10])

In [19]:
def get_all_mutual_information():
    _activation = []
    for k in activation.keys():
        _activation.append(activation[k].detach().cpu().numpy())
    mi_xt, mi_ty = get_mutual_information(_activation)
    return mi_xt, mi_ty

In [20]:
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = F.log_softmax(model(data), dim=1)
        loss = F.nll_loss(output, target) # nn.CrossEntropyLoss applies internally F.log_softmax and nn.NLLLoss
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = F.log_softmax(model(data), dim=1)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [21]:
optimizer = optim.SGD(model.parameters(), lr=.1)
for epoch in range(0, 40):
    activation = {}
    train(model, train_loader, optimizer, epoch)
    if epoch % 10 == 0:
        mi_xt, mi_ty = get_all_mutual_information()
        print(mi_xt, mi_ty)
    test(model, test_loader)


[7.014777425505188, 6.589065240920553, 2.872409623225701, 2.0743472328347443] [0.012214297958215346, 0.01153353603837338, 0.0005616772879522717, 0.0002623783441895671]

Test set: Average loss: 0.6936, Accuracy: 5027/10000 (50%)


Test set: Average loss: 0.6931, Accuracy: 5027/10000 (50%)


Test set: Average loss: 0.6928, Accuracy: 5199/10000 (52%)


Test set: Average loss: 0.6920, Accuracy: 5343/10000 (53%)


Test set: Average loss: 0.6906, Accuracy: 5256/10000 (53%)


Test set: Average loss: 0.6886, Accuracy: 5558/10000 (56%)


Test set: Average loss: 0.6850, Accuracy: 5833/10000 (58%)


Test set: Average loss: 0.6771, Accuracy: 6098/10000 (61%)


Test set: Average loss: 0.6601, Accuracy: 6487/10000 (65%)


Test set: Average loss: 0.6328, Accuracy: 6527/10000 (65%)

[8.007631457213526, 7.713728747732322, 4.736695994058063, 3.2237643891164223] [0.050251280927471574, 0.04383872875252049, 0.0029777773032134388, 0.0004264350756646436]

Test set: Average loss: 0.6142, Accuracy: 6234/10000 