# Neural Network Decompression Without Storing Weights

#### 1. Arithmetic Coding for compressing sequences of digits (i.e. 000100) based on the probability of 0/1
Arithmetic coding is a method that compresses data by representing a sequence of symbols (like '0' and '1') as a single fractional number between 0 and 1. Instead of representing each symbol with a fixed number of bits, arithmetic coding allocates a range within that fraction based on the symbol's predicted probability.

Key Points:
- Probability Distribution: The efficiency of arithmetic coding depends on how well we understand the probability distribution of the symbols. In our case, it's the likelihood of encountering '0' or '1' in the sequence.
- Compression Efficiency: The better we can predict the distribution (i.e., the more accurate our probabilities), the more efficient our compression will be. For example, if '0' is highly probable and we predict it correctly, it gets a larger range and thus requires fewer bits to represent.
- Neural Networks & Compression: If we can optimally learn the probability distribution for 0/1 with a NN, we can achieve a better compression

In [96]:
!pip install arithmetic_compressor



In [97]:
import numpy as np
from arithmetic_compressor import AECompressor
from arithmetic_compressor.models import StaticModel

# Create the model, with the probability of 0 being 0.65 and 1 being 0.35 (arbitrary)
# The better we learn the probabilities, the better the compression
model = StaticModel({'0': 0.8, '1': 0.20})

# create an arithmetic coder
coder = AECompressor(model)

# encode some data
data = "000000"

# Compress, convert to string
compressed = coder.compress(data)
compressed_str = "".join([str(s) for s in coder.compress(data)])

# Decompress
N = len(data)
decoded = coder.decompress(compressed, N)
decoded_str = "".join([str(s) for s in coder.decompress(compressed, N)])

print(f"Data: ({len(data)}): {data}")
print(f"Compressed ({len(compressed_str)}): {compressed_str}")
print(f"Decoded ({len(decoded_str)}): {decoded_str}")
print(f"Compression ratio: {len(data) / len(compressed):.2f}")

Data: (6): 000000
Compressed (2): 01
Decoded (6): 000000
Compression ratio: 3.00


#### Neural network compression using an Arithmetic Coder, by learning probabilities for 0/1

In [98]:
import torch
import torch.nn as nn
import torch.optim as optim

import torch.nn as nn

class BasicLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, fixed_value=0.1):
        super(BasicLSTM, self).__init__()

        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=1)

        # Setting fixed initial values for weights and biases
        with torch.no_grad():
            # For LSTM layer
            self.lstm.weight_ih_l0.fill_(fixed_value)
            self.lstm.weight_hh_l0.fill_(fixed_value)
            self.lstm.bias_ih_l0.fill_(fixed_value)
            self.lstm.bias_hh_l0.fill_(fixed_value)

            # For FC layer
            self.fc.weight.fill_(fixed_value)
            self.fc.bias.fill_(fixed_value)

    def forward(self, x):
        h0 = torch.zeros(1, x.size(0), self.lstm.hidden_size)
        c0 = torch.zeros(1, x.size(0), self.lstm.hidden_size)
        
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        out = self.softmax(out)
        return out

In [99]:
# Load the data file into a PyTorch tensor
dataset = []
with open("data.txt", 'r') as in_file:
    for line in in_file:
        tensor_representation = torch.tensor([int(bit) for bit in line.strip()], dtype=torch.int)
        dataset.append(tensor_representation)

dataset[0:4]

[tensor([0, 0, 0, 0, 0, 0], dtype=torch.int32),
 tensor([0, 0, 0, 0, 0, 1], dtype=torch.int32),
 tensor([0, 0, 0, 0, 1, 0], dtype=torch.int32),
 tensor([0, 0, 0, 0, 1, 1], dtype=torch.int32)]

In [100]:
# Data we want to compress
sample = dataset[0]
max_sentence_len = len(sample)

# Network hyperparameters
input_dim = max_sentence_len  # Max length of sentence (5 tokens)
num_layers = 1 # One layer LSTM
hidden_dim = 10 # Hidden dimension
output_dim = 2 # since we have two possible outputs: 0 and 1
learning_rate = 1e-01 # Learning rate

# Create the LSTM model and optimizer
lstm_model = BasicLSTM(input_dim, hidden_dim, output_dim)
optimizer = optim.SGD(lstm_model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Test that it works
t = torch.Tensor([-1] * input_dim).view(1,1,-1)
print(f"Input: {t}")
print(f"Output: {lstm_model(t)}")

Input: tensor([[[-1., -1., -1., -1., -1., -1.]]])
Output: tensor([[0.5000, 0.5000]], grad_fn=<SoftmaxBackward0>)


In [101]:
lstm_model = BasicLSTM(input_dim, hidden_dim, output_dim, fixed_value=-0.1)
optimizer = optim.SGD(lstm_model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Prepare compressed text file
with open('compressed.txt', 'w') as file:
    # First sentence is actually not compressed
    compressed_str = "".join([str(s) for s in dataset[0].numpy()])
    file.write(f'{compressed_str}\n')

# Iterate over all sentences in dataset
for i2, sentence in enumerate(dataset):

    # Since we always compress the next sentence, we stop at the second last sentence
    if i2 == len(dataset)-1:
        break

    # Input tensor starts with -1 (missing) for all (6) positions
    # Gets filled one by one as we progress through the sentence
    input_tensor = torch.Tensor([-1]*max_sentence_len).view(1, 1, -1)

    # Iterate over all tokens in sentence
    for i, bit in enumerate(sentence):
        # Since we are predicting next token, we stop at the second last token
        if i == len(sentence)-1:
            break

        # Input is all tokens seen so far
        input_tensor[0, 0, i] = sentence[i].view(1, 1, 1)

        # Next token to predict
        target_tensor = sentence[i+1].view(1).long()
        
        # Forward pass
        output = lstm_model(input_tensor)
        loss = criterion(output, target_tensor)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print stats for first iterations
        if i < 6 and i2 < 3:
            print(f"Input: {input_tensor}")
            print(f"Output: {output.detach().numpy()} (probability distribution))")
            print(f"Target: {target_tensor.detach().numpy()}, Predicted {np.argmax(output.detach().numpy())}\n")

    # After training on sentence, we compress it with the learned probability distribution
    # The longer we train, the better the compression (should) be
    prob_distribution = {
        0: output[0, 0].item(), # Learned probability for 0
        1: output[0, 1].item(), # Learned probability for 1
        }

    model = StaticModel(prob_distribution)
    coder = AECompressor(model)

    next_sentence = dataset[i2+1].numpy().tolist()
    compressed = coder.compress(next_sentence)
    compressed_str = ''.join([str(i) for i in compressed])
    orig_str = "".join([str(s) for s in sentence.numpy()])

    with open('compressed.txt', 'a') as file:
        file.write(f'{compressed_str}\n')

    print(f"Original sentence: {orig_str}")
    print(f"Compessed sentence: {compressed_str}")
    print(f"Compression ratio: {len(orig_str)/len(compressed_str)}\n")

Input: tensor([[[ 0., -1., -1., -1., -1., -1.]]])
Output: [[0.5 0.5]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0., -1., -1., -1., -1.]]])
Output: [[0.5132045  0.48679546]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0., -1., -1., -1.]]])
Output: [[0.5253333  0.47466666]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0.,  0., -1., -1.]]])
Output: [[0.5369131  0.46308687]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0.,  0.,  0., -1.]]])
Output: [[0.5482952  0.45170477]] (probability distribution))
Target: [0], Predicted 0

Original sentence: 000000
Compessed sentence: 00001
Compression ratio: 1.2

Input: tensor([[[ 0., -1., -1., -1., -1., -1.]]])
Output: [[0.5623161 0.4376839]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0., -1., -1., -1., -1.]]])
Output: [[0.5735896  0.42641035]] (probability distribution))


In [102]:
# Calculate compression ratio
import os
size_data, size_compressed = os.path.getsize("data.txt"), os.path.getsize("compressed.txt")
print(f"Size data {size_data}, size compressed {size_compressed}, compression ratio {size_data/size_compressed}")

Size data 210, size compressed 148, compression ratio 1.4189189189189189


### Decompressing WITHOUT transmitting neural weights, by learning iteratively from the decompressed data

The essence of this method lies in the synchronized evolution of the encoder and decoder, even when they operate independently.

##### Initialization:
- Start with a neural network decoder having all weights set to a fixed, known value.

##### Kickstarting the Process:
- Remember, the first sequence isn't compressed. Load this uncompressed sequence as the initial data.

##### Iterative Learning:
- Prediction: Use the decoder to predict the next sequence based on the current state of the neural network.

- Decompression: Based on the predicted probabilities, decompress the sequence. This gives you the original data.

- Network Update: Now, train the neural network using this newly decompressed sequence. This refines its predictions for subsequent sequences.

##### Continuation:
For each subsequent sequence, repeat the above iterative learning process. The decompressed sequence from the previous step helps in predicting and decompressing the next.

##### Why This Works:
Both encoder and decoder start with identical neural network setups. As sequences are processed, both learn and adjust their weights in the same manner. This synchronized evolution ensures that the decoder can accurately decompress, even without receiving the encoder's neural weights.

In [103]:
# Now let's decompress!
# Load the compressed dataset into a PyTorch tensor
compressed_dataset = []
with open("compressed.txt", 'r') as in_file:
    for line in in_file:
        tensor_representation = torch.tensor([int(bit) for bit in line.strip()], dtype=torch.int)
        compressed_dataset.append(tensor_representation)

compressed_dataset[0:4]

[tensor([0, 0, 0, 0, 0, 0], dtype=torch.int32),
 tensor([0, 0, 0, 0, 1], dtype=torch.int32),
 tensor([0, 0, 0, 1, 1], dtype=torch.int32),
 tensor([0, 0, 1], dtype=torch.int32)]

In [104]:
# Initialize new LSTM model, but with fixed weights same as before
# NB: No weights are loaded - we will learn them as we decompress
lstm_model = BasicLSTM(input_dim, hidden_dim, output_dim, fixed_value=-0.1)
optimizer = optim.SGD(lstm_model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# Prepare compressed text file
with open('decompressed.txt', 'w') as file:
    # First sentence is actually not compressed
    compressed_str = "".join([str(s) for s in dataset[0].numpy()])
    file.write(f'{compressed_str}\n')

# Nb: First sentence is not compressed!
decompressed_dataset = []
decompressed_dataset.append(compressed_dataset[0])

# Iterate over all sentences in dataset. Note that first sentence is uncompressed
for i2, _ in enumerate(compressed_dataset):

    # Grab last decompressed sentence
    sentence = decompressed_dataset[-1]
    
    # Input tensor starts with -1 (missing) for all (6) positions
    # Gets filled one by one as we progress through the sentence
    input_tensor = torch.Tensor([-1]*max_sentence_len).view(1, 1, -1)

    if i2 == len(compressed_dataset)-1:
        break

    for i, bit in enumerate(sentence):

        if i == len(sentence)-1:
            break

        # Input is all tokens so far
        input_tensor[0, 0, i] = sentence[i].view(1, 1, 1)

        # Next token to predict
        target_tensor = sentence[i+1].view(1).long()
        
        # Forward pass
        output = lstm_model(input_tensor)
        loss = criterion(output, target_tensor)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print stats for first 10 iterations
        if i < 7 and i2 < 3:
            print(f"Input: {input_tensor}")
            print(f"Output: {output.detach().numpy()} (probability distribution))")
            print(f"Target: {target_tensor.detach().numpy()}, Predicted {np.argmax(output.detach().numpy())}\n")

    # After training on sentence, we used learned probability distribution to decompress the next one!
    prob_distribution = {
        0: output[0, 0].item(), # Learned probability for 0
        1: output[0, 1].item() # Learned probability for 1
        }

    model = StaticModel(prob_distribution)
    coder = AECompressor(model)

    # Decompress next sentence
    next_comp_sentence = compressed_dataset[i2+1].numpy().tolist()
    decoded = coder.decompress(next_comp_sentence, max_sentence_len)
    decoded_str = "".join([str(s) for s in decoded])

    # Add decoded sentence to decompressed dataset
    decompressed_dataset.append(torch.Tensor(decoded))

    # Write to file
    with open('decompressed.txt', 'a') as file:
        file.write(f'{decoded_str}\n')    

    if i2 < 10:
        compressed_str = "".join([str(s) for s in next_comp_sentence])
        orig_str = "".join([str(s) for s in dataset[i2+1].numpy()])
        print(f"Original sentence: {orig_str}")
        print(f"Compressed sentence: {compressed_str}")
        print(f"Decoded sentence: {decoded_str}\n")

Input: tensor([[[ 0., -1., -1., -1., -1., -1.]]])
Output: [[0.5 0.5]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0., -1., -1., -1., -1.]]])
Output: [[0.5132045  0.48679546]] (probability distribution))
Target: [0], Predicted 0



Input: tensor([[[ 0.,  0.,  0., -1., -1., -1.]]])
Output: [[0.5253333  0.47466666]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0.,  0., -1., -1.]]])
Output: [[0.5369131  0.46308687]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0.,  0.,  0., -1.]]])
Output: [[0.5482952  0.45170477]] (probability distribution))
Target: [0], Predicted 0

Original sentence: 000001
Compressed sentence: 00001
Decoded sentence: 000001

Input: tensor([[[ 0., -1., -1., -1., -1., -1.]]])
Output: [[0.5623161 0.4376839]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0., -1., -1., -1., -1.]]])
Output: [[0.5735896  0.42641035]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0., -1., -1., -1.]]])
Output: [[0.5838591  0.41614088]] (probability distribution))
Target: [0], Predicted 0

Input: tensor([[[ 0.,  0.,  0.,  0., -1., -1.]]])
Output: [[0.5936383  0.40636164]] (probabilit

In [105]:
!ls -lh *.txt

-rw-r--r--  1 maghoi  staff   148B Aug 19 13:12 compressed.txt
-rw-r--r--  1 maghoi  staff   210B Aug 18 23:42 data.txt
-rw-r--r--  1 maghoi  staff   210B Aug 19 13:12 decompressed.txt


In [106]:
import filecmp
print(f"Check that decompressed data is the same as the original dataset")
filecmp.cmp('data.txt', 'decompressed.txt')

Check that decompressed data is the same as the original dataset


True

##### What we have achieved so far:
- We trained an encoder neural network to predict the next digits in the sequences of data.txt
- By iteratively using the learned NN encoder probability distributions, we to compress the sequences into compressed.txt using Arithmetic Coding
- By exactly mirroring the process using the decoder network, we iteratively decompress and train again on the original sequences
- Outcome: We losslessly decompressed all sequences in compressed.txt WITHOUT transmitting the compressor NN weights