In [1]:
import torch
import os
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import json
import math
from collections import Counter

# Define the LeNet-5 architecture
class LeNet5(nn.Module):
    def __init__(self):
        super(LeNet5, self).__init__()
        # First convolutional layer: input 1 channel, output 6 channels, kernel size 5x5
        self.conv1 = nn.Conv2d(1, 6, 5)
        # Average pooling layer with kernel size 2x2 and stride 2
        self.pool = nn.AvgPool2d(2, 2)
        # Second convolutional layer: input 6 channels, output 16 channels, kernel size 5x5
        self.conv2 = nn.Conv2d(6, 16, 5)
        # Fully connected layers
        self.fc1 = nn.Linear(16*4*4, 120)  # Flattened input size 16*4*4, output 120
        self.fc2 = nn.Linear(120, 84)  # Output 84 neurons
        self.fc3 = nn.Linear(84, 10)  # Output 10 classes

    def forward(self, x):
        # Apply first convolution, then activation function (tanh), then pooling
        x = torch.tanh(self.conv1(x))
        x = self.pool(x)
        # Apply second convolution, then activation function (tanh), then pooling
        x = torch.tanh(self.conv2(x))
        x = self.pool(x)
        # Flatten the tensor for the fully connected layers
        x = x.view(-1, 16*4*4)
        # Pass through fully connected layers with activation (tanh)
        x = torch.tanh(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        # Output layer without activation (raw scores for classification)
        x = self.fc3(x)
        return x
        
def model_to_json(model):
    """
    Function to convert a PyTorch model to a JSON representation
    """
    layers = []
    for name, module in model.named_modules():
        if len(list(module.children())) == 0:  # Consider only terminal modules (not containers)
            layer = {"name": name, "type": module.__class__.__name__}
            
            # Add specific parameters based on the layer type
            if isinstance(module, nn.Conv2d):
                layer.update({
                    "in_channels": module.in_channels,
                    "out_channels": module.out_channels,
                    "kernel_size": module.kernel_size,
                    "stride": module.stride,
                    "padding": module.padding
                })
            elif isinstance(module, nn.AvgPool2d):
                layer.update({
                    "kernel_size": module.kernel_size,
                    "stride": module.stride
                })
            elif isinstance(module, nn.Linear):
                layer.update({
                    "in_features": module.in_features,
                    "out_features": module.out_features
                })
            layers.append(layer)
    return {"layers": layers}

def json_to_model(json_file):
    """
    Function to load a model from a JSON file
    """
    with open(json_file, "r") as f:
        model_data = json.load(f)
    
    # Manually create a LeNet-5 model instance
    model = LeNet5()  # Directly instantiate the LeNet-5 class
    layers = model.children()  # Retrieve model layers
    
    for layer_data, layer in zip(model_data["layers"], layers):
        layer_type = layer_data["type"]
        
        if isinstance(layer, nn.Conv2d):
            # Randomly initialize weights and biases for Conv2d layers
            layer.weight.data = torch.randn_like(layer.weight.data)
            layer.bias.data = torch.randn_like(layer.bias.data)
        
        elif isinstance(layer, nn.Linear):
            # Randomly initialize weights and biases for Linear layers
            layer.weight.data = torch.randn_like(layer.weight.data)
            layer.bias.data = torch.randn_like(layer.bias.data)
        
        # Additional logic can be added to update other layer types if necessary
        
    return model

"""
# Creates the model
model = LeNet5()

# Conversion in JSON
model_dict = model_to_json(model)
json_file = "lenet5.json"
with open(json_file, "w") as f:
    json.dump(model_dict, f, indent=4)

print(f"FILE {json_file} CREATED")
"""

def test_accuracy(model, dataloader, device):
    """
    Function to calculate the accuracy of a model on a given dataloader.
    """
    correct, total = 0, 0
    with torch.no_grad():  # Disable gradient computation for evaluation
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)  # Move data to the appropriate device
            outputs = model(images)  # Get model predictions
            _, predicted = torch.max(outputs.data, 1)  # Get the class with the highest probability
            total += labels.size(0)  # Update total number of samples
            correct += (predicted == labels).sum().item()  # Count correct predictions
    
    accuracy = 100 * correct / total  # Compute accuracy percentage
    return accuracy

def compute_entropy(string):
    """
    Function to compute the entropy of a given string.
    """
    # Count the frequency of each character in the string
    frequencies = Counter(string)
    # Calculate the total length of the string
    total_length = len(string)
    
    # Compute entropy
    entropy = 0
    for freq in frequencies.values():
        probability = freq / total_length  # Compute probability of each character
        entropy -= probability * math.log2(probability)  # Apply entropy formula
    
    return entropy * total_length  # Return the entropy weighted by string length


In [2]:
# Initialize the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the MNIST test dataset
transform = transforms.Compose([transforms.ToTensor()])
testset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=1000, shuffle=False)

# Load the network from JSON and calculate its size
json_file = "lenet5.json"
model = json_to_model(json_file).to(device)

# Compute the file size in bits
architecture_size_bytes = os.path.getsize(json_file)
architecture_size_bits = architecture_size_bytes * 8

# Output information
print(f"Saved JSON file: {json_file}")
print(f"File size: {architecture_size_bits} bits")

Saved JSON file: lenet5.json
File size: 10656 bits


In [3]:
# Load model weights
model_name = "r1114_1"
model_path = "../BestModelsBeforeQuantization/" + model_name + ".pth"
#model_path = "../BestModelsAfterQuantization/" + model_name + "_quantized.pth"

w0 = -0.11
r = 1.114
min_w = w0 - r
max_w = w0 + r

state_dict = torch.load(model_path, map_location=device, weights_only=False)
model.load_state_dict(state_dict)

w_saved = torch.cat([param.data.view(-1) for param in model.parameters()])

model.eval()

num_unique_weights_saved = torch.unique(w_saved).numel()
original_accuracy = test_accuracy(model, testloader, device)
print(f"Number of unique weights in the original model: {num_unique_weights_saved}")
print(f"Original model accuracy: {original_accuracy:.2f}%")

encoded_list = []
for elem in w_saved:
    loaded_element = float(elem)
    if loaded_element == -0.0:
        loaded_element = 0.0
    encoded_list.append(loaded_element)

entropy = round(compute_entropy(encoded_list)) + 1
print(f"\nEntropy of the string multiplied by its length: {entropy}")

Number of unique weights in the original model: 6751
Original model accuracy: 98.95%

Entropy of the string multiplied by its length: 114234


In [4]:
from IPython.display import clear_output

def quantize_weights_center(weights, v, v_centers):
    """
    Function for weight quantization using central value.
    Quantizes weights based on the central value of the buckets in vector v.
    """
    indices = torch.bucketize(weights, v, right=False) - 1
    indices = torch.clamp(indices, min=0, max=len(v_centers) - 1)  # Ensure indices are valid
    return v_centers[indices]

l1 = []
l2 = []

for C in range(1, 1024):
    # Construct vector v
    v = torch.linspace(min_w, max_w - (max_w - min_w)/C, steps=C)

    # Compute central values of the buckets
    v_centers = (v[:-1] + v[1:]) / 2
    v_centers = torch.cat([v_centers, v[-1:]])  # Add final value to handle the last bucket

    state_dict = torch.load(model_path, map_location='cpu', weights_only=False)  # Adjust device accordingly
    model.load_state_dict(state_dict)

    # Extract model weights
    w_saved = torch.cat([param.data.view(-1) for param in model.parameters()])

    # Quantize weights using central values
    w_quantized = quantize_weights_center(w_saved, v, v_centers)

    # Replace quantized weights in the model
    start_idx = 0
    for param in model.parameters():
        numel = param.data.numel()
        param.data = w_quantized[start_idx:start_idx + numel].view(param.data.size())
        start_idx += numel

    # Evaluate quantized model
    model.eval()
    num_unique_weights_quantized = torch.unique(w_quantized).numel()
    quantized_accuracy = test_accuracy(model, testloader, device)

    # Compute entropy of the quantized string
    encoded_list = [float(elem) if float(elem) != -0.0 else 0.0 for elem in w_quantized]
    entropy = round(compute_entropy(encoded_list)) + 1
    
    l1.append(quantized_accuracy)
    l2.append(entropy)
    
    print(f"C = {C} analyzed")
    clear_output(wait=True)
    
sorted_indices = np.argsort(l1)
for i in range(1, 10):
    print(f"Quantization at C={sorted_indices[-i] + 1}, Accuracy:{l1[sorted_indices[-i]]}, Entropy:{l2[sorted_indices[-i]]}")


Quantization at C=140, Accuracy:99.01, Entropy:59601
Quantization at C=386, Accuracy:99.01, Entropy:70135
Quantization at C=331, Accuracy:99.01, Entropy:68712
Quantization at C=327, Accuracy:99.0, Entropy:68583
Quantization at C=322, Accuracy:99.0, Entropy:68531
Quantization at C=678, Accuracy:98.99, Entropy:76202
Quantization at C=220, Accuracy:98.99, Entropy:63668
Quantization at C=350, Accuracy:98.99, Entropy:69694
Quantization at C=377, Accuracy:98.99, Entropy:70439


In [5]:
# Quantization with C = 140
C = 140

# Construct vector v
v = torch.linspace(min_w, max_w - (max_w - min_w)/C, steps=C)

# Compute central values of the buckets
v_centers = (v[:-1] + v[1:]) / 2
v_centers = torch.cat([v_centers, v[-1:]])  # Add final value to handle the last bucket

state_dict = torch.load(model_path, map_location='cpu', weights_only=False)  # Adjust device accordingly
model.load_state_dict(state_dict)

# Extract model weights
w_saved = torch.cat([param.data.view(-1) for param in model.parameters()])

# Quantize weights using central values
w_quantized = quantize_weights_center(w_saved, v, v_centers)

# Replace quantized weights in the model
start_idx = 0
for param in model.parameters():
    numel = param.data.numel()
    param.data = w_quantized[start_idx:start_idx + numel].view(param.data.size())
    start_idx += numel

# Evaluate quantized model
model.eval()
num_unique_weights_quantized = torch.unique(w_quantized).numel()
quantized_accuracy = test_accuracy(model, testloader, device)

print(f"Number of unique weights in the quantized model: {num_unique_weights_quantized}")
print(f"Quantized model accuracy: {quantized_accuracy:.2f}%")

# Compute entropy of the quantized string
encoded_list = [float(elem) if float(elem) != -0.0 else 0.0 for elem in w_quantized]
entropy = round(compute_entropy(encoded_list)) + 1
print(f"\nEntropy of the string multiplied by its length: {entropy}")

model_path = "../BestModelsAfterQuantization/" + model_name + "_quantized.pth"
torch.save(model.state_dict(), model_path)
print("\nModel quantized saved!")

Number of unique weights in the quantized model: 127
Quantized model accuracy: 99.01%

Entropy of the string multiplied by its length: 59601

Model quantized saved!


## Compression: Huffman Coding ( See https://rosettacode.org/wiki/Huffman_coding#Python)

In [6]:
from heapq import heappush, heappop, heapify
from collections import defaultdict

def encode(symb2freq):
    """Huffman encode the given dict mapping symbols to weights"""
    heap = [[wt, [sym, ""]] for sym, wt in symb2freq.items()]
    heapify(heap)
    while len(heap) > 1:
        lo = heappop(heap)
        hi = heappop(heap)
        for pair in lo[1:]:
            pair[1] = '0' + pair[1]
        for pair in hi[1:]:
            pair[1] = '1' + pair[1]
        heappush(heap, [lo[0] + hi[0]] + lo[1:] + hi[1:])
    return sorted(heappop(heap)[1:], key=lambda p: (len(p[-1]), p))

def decode(encoded_text, huffman_code):
    """
    Decodes the given encoded text using the provided Huffman code.
    """
    code_to_symbol = {code: symbol for symbol, code in huffman_code}
    decoded_text = []
    temp_code = ""
    for bit in encoded_text:
        temp_code += bit
        if temp_code in code_to_symbol:
            decoded_text.append(code_to_symbol[temp_code])
            temp_code = ""
    return decoded_text

# Apply Huffman encoding
symb2freq = defaultdict(int)
for sym in encoded_list:
    symb2freq[sym] += 1
huff = encode(symb2freq)
encoded_huffman = ''.join([dict(huff)[sym] for sym in encoded_list])
decoded_list = decode(encoded_huffman, huff)

if encoded_list == decoded_list:
    print("\nEncoding successful!")
else:
    print("Error during encoding!")

print("Compressed size in bits:", len(encoded_huffman))
print("Uncompressed size in bits:", len(encoded_list) * 32)
print("\nCompression ratio:", 100 * round(len(encoded_huffman) / (len(encoded_list) * 32), 4), "%")


Encoding successful!
Compressed size in bits: 77872
Uncompressed size in bits: 1421632

Compression ratio: 5.48 %


In [7]:
dictionary_size = 0
for sym, code in huff:
    dictionary_size += 32 + len(code)

# Total dimension
total_size = len(encoded_list) + dictionary_size + architecture_size_bits

# Compression ratio
original_size = len(encoded_list) * 32  # Ogni peso è un float a 32 bit
compression_ratio = total_size / original_size

print(f"Original dimension: {original_size} bit")

print(f"\nEncoded list dimension: {len(encoded_list)} bit")
print(f"Huffman dictionary dimension: {dictionary_size} bit")
print(f"Network architecture dimension: {architecture_size_bits} bit")
print(f"\nTotal dimension: {total_size} bit")
print(f"\nCompression ratio: {compression_ratio:.2%}")


Original dimension: 1421632 bit

Encoded list dimension: 44426 bit
Huffman dictionary dimension: 5421 bit
Network architecture dimension: 10656 bit

Total dimension: 60503 bit

Compression ratio: 4.26%


## Arithmetic coding

In [8]:
## from collections import defaultdict
from decimal import Decimal, getcontext
import time

def encode_arithmetic(symb2freq):
    """
    Performs arithmetic encoding for a dictionary of symbols with their respective frequencies.
    """
    total_freq = sum(symb2freq.values())  # Compute total frequency
    prob_intervals = {}  # Dictionary to store probability intervals
    low = Decimal(0.0)
    
    for sym, freq in sorted(symb2freq.items()):  # Iterate through symbols in sorted order
        prob_intervals[sym] = (low, low + Decimal(freq) / Decimal(total_freq))  # Assign probability range
        low = prob_intervals[sym][1]  # Update lower bound for next symbol
    
    return prob_intervals  # Return dictionary of intervals

def encode_arithmetic_text(symbol_list, prob_intervals):
    """
    Encodes a list of symbols using arithmetic encoding.
    """
    low = Decimal(0.0)
    high = Decimal(1.0)
    
    for sym in symbol_list:  # Process each symbol in the list
        range_ = high - low  # Compute current range
        high = low + range_ * prob_intervals[sym][1]  # Update upper bound
        low = low + range_ * prob_intervals[sym][0]  # Update lower bound
    
    R = high - low  # Final range after encoding
    dimension = - R.ln() / Decimal(2).ln()  # Compute encoding length in bits
    return (low + high) / 2, dimension  # Return the midpoint representation and bit size

def decode_arithmetic(encoded_value, symb2freq, length_of_symbols):
    """
    Decodes an arithmetic-encoded value given the symbol frequency dictionary.
    """
    prob_intervals = encode_arithmetic(symb2freq)  # Recompute probability intervals
    decoded_symbols = []
    
    for _ in range(length_of_symbols):  # Decode for the original length of symbols
        for sym, (low, high) in prob_intervals.items():  # Iterate through interval mappings
            if low <= encoded_value < high:  # Check if value falls in symbol range
                decoded_symbols.append(sym)  # Append decoded symbol
                range_ = high - low  # Compute new range
                encoded_value = (encoded_value - low) / range_  # Normalize value
                break
        else:
            raise ValueError(f"Unable to decode symbol. encoded_value: {encoded_value}")
    
    return decoded_symbols  # Return decoded symbol list

# Set higher precision for decimal operations
getcontext().prec = round(len(encoded_list) * 1.4)

# Create the frequency dictionary
symb2freq = defaultdict(int)
for sym in encoded_list:
    symb2freq[sym] += 1  # Count symbol occurrences

# Perform arithmetic encoding
start_time = time.time()
print("...STARTING ARITHMETIC CODING...")
prob_intervals = encode_arithmetic(symb2freq)  # Compute probability intervals
encoded_value, dimension = encode_arithmetic_text(encoded_list, prob_intervals)  # Encode data
final_time = time.time() - start_time
print(f'Encoding time: {final_time:.2f} seconds')

# Decode the encoded text
print("...STARTING DECODING...")
start_time = time.time()
decoded_symbols = decode_arithmetic(encoded_value, symb2freq, len(encoded_list))  # Decode data
final_time = time.time() - start_time
print(f'Decoding time: {final_time:.2f} seconds')

# Verify that encoding and decoding are correct
if encoded_list == decoded_symbols:
    print("\nENCODING SUCCESSFUL!")
else:
    print(f"Encoding error! Decoded: {decoded_symbols}")

print("\nArithmetic Coding achieves", round(dimension) + 1, "bits.")

# Compute original file size
bits_per_float = 32  # Assuming 32-bit float representation
dimensione_originale = len(encoded_list) * bits_per_float  # Total bit size

# Compute compression ratio
compression_ratio = (round(dimension) + 1) / dimensione_originale

# Output compression ratio
print(f"Compression ratio: {compression_ratio:.4%}")

...STARTING ARITHMETIC CODING...
Encoding time: 599.79 seconds
...STARTING DECODING...
Decoding time: 734.95 seconds

ENCODING SUCCESSFUL!

Arithmetic Coding achieves 59601 bits.
Compression ratio: 4.1924%


In [9]:
print("ARITHMETIC CODING REACHES n*H?", entropy == round(dimension) + 1)

ARITHMETIC CODING REACHES n*H? True


## gzip-9 & zstd-22

In [10]:
import gzip
import zstandard as zstd
import struct

# Converts float list in byte
input_bytes = b''.join(struct.pack('f', num) for num in encoded_list)

# Compresses with gzip-9
def compress_gzip(data):
    return gzip.compress(data, compresslevel=9)

# Compresses with zstd-22
def compress_zstd(data):
    cctx = zstd.ZstdCompressor(level=22)  # Creates a compressor object with level 22
    return cctx.compress(data)

# Decompresses with gzip-9
def decompress_gzip(data):
    decompressed = gzip.decompress(data)
    return list(struct.unpack(f'{len(decompressed) // 4}f', decompressed))  # Casts to float

# Decompresses with zstd-22
def decompress_zstd(data):
    dctx = zstd.ZstdDecompressor()  # Creates a compressor object
    decompressed = dctx.decompress(data)
    return list(struct.unpack(f'{len(decompressed) // 4}f', decompressed))  # Casts to float

def compare_lists(list1, list2, tollerance=1e-4):
    if len(list1) != len(list2):
        return False
    return all(abs(a - b) <= tollerance for a, b in zip(list1, list2))

# Compression
gzip_compressed = compress_gzip(input_bytes)
zstd_compressed = compress_zstd(input_bytes)

# Decompression
gzip_decompressed = decompress_gzip(gzip_compressed)
zstd_decompressed = decompress_zstd(zstd_compressed)

# Verifies
if compare_lists(encoded_list, gzip_decompressed):
    print("\nENCODING SUCCESSFUL!")
else:
    print(f"Encoding error! Decoded")
    
if compare_lists(encoded_list, zstd_decompressed):
    print("\nENCODING SUCCESSFUL!")
else:
    print(f"Encoding error! Decoded")
    
# Calculates dimensions
original_size_bits = len(input_bytes) * 8
gzip_size = len(gzip_compressed) * 8
zstd_size = len(zstd_compressed) * 8

# Compression ratio
gzip_ratio = gzip_size / original_size_bits
zstd_ratio = zstd_size / original_size_bits

# Output delle dimensioni e del rapporto di compressione
print(f"\n\nOriginal dimension: {original_size_bits} bits")
print(f"Gzip-9 compressed dimension: {gzip_size} bits (Compression Ratio: {gzip_ratio:.2%})")
print(f"Zstd-22 compressed dimension: {zstd_size} bits (Compression Ratio: {zstd_ratio:.2%})")



ENCODING SUCCESSFUL!

ENCODING SUCCESSFUL!


Original dimension: 1421632 bits
Gzip-9 compressed dimension: 61336 bits (Compression Ratio: 4.31%)
Zstd-22 compressed dimension: 48824 bits (Compression Ratio: 3.43%)
