In [None]:
import numpy as np
import os
import sys
import matplotlib.pyplot as plt

In [None]:
# Run-Length Encoding (Lossless)

def rle_encode(input, dimension=1):
    """Run-length encode a string (``dimension=1``) or an array (``dimension=2``).

    Parameters
    ----------
    input : str or array-like
        For ``dimension=1`` a sequence of characters (normally a ``str``).
        For ``dimension=2`` anything ``np.asarray`` accepts; it is flattened
        in row-major order before encoding.
    dimension : int, default 1
        Selects the text (1) or array (2) encoder.

    Returns
    -------
    str or list of tuple
        ``dimension=1``: the concatenation of ``<run length><character>`` for
        every run, e.g. ``'AAAB' -> '3A1B'``; an empty input gives ``''``.
        ``dimension=2``: a list of ``(run_length, value)`` tuples; an empty
        array gives ``[]``. The original shape is not stored, so the caller
        must pass it to the decoder separately.

    Raises
    ------
    ValueError
        If ``dimension`` is neither 1 nor 2, or if the text input contains a
        digit character (the textual format could not be decoded again).
    """
    if dimension == 1:
        return _rle_encode_text(input)
    if dimension == 2:
        return _rle_encode_array(input)
    raise ValueError(f"unsupported dimension {dimension!r}; expected 1 or 2")


def _rle_encode_text(text):
    """Encode a character sequence as concatenated ``<count><char>`` pairs."""
    from itertools import groupby

    if not text:
        return ''  # Handle empty input

    pieces = []
    for symbol, run in groupby(text):
        # The textual format separates counts from symbols by "is it a digit",
        # so a digit symbol would make the output ambiguous for the decoder.
        if symbol.isdigit():
            raise ValueError(
                f"cannot run-length encode digit character {symbol!r}: "
                "the encoded text would be ambiguous"
            )
        pieces.append(f"{sum(1 for _ in run)}{symbol}")

    # Joining once avoids the quadratic cost of repeated string concatenation.
    return ''.join(pieces)


def _rle_encode_array(array):
    """Encode the row-major flattening of an array as ``(count, value)`` runs."""
    flat = np.asarray(array).flatten()
    if flat.size == 0:
        return []

    # Positions where an element differs from its predecessor start a new run.
    boundaries = np.flatnonzero(flat[1:] != flat[:-1]) + 1
    starts = np.concatenate(([0], boundaries))
    ends = np.concatenate((boundaries, [flat.size]))

    # Each run is reported with its last element, paired with a plain int count.
    return [(int(stop - start), flat[stop - 1]) for start, stop in zip(starts, ends)]



def rle_decode(input, dimension=1, shape=None):
    """Invert :func:`rle_encode` for text (``dimension=1``) or arrays (``dimension=2``).

    Parameters
    ----------
    input : str or iterable of (int, value)
        ``dimension=1``: text made of ``<run length><character>`` pairs, where
        the run length is one or more digits.
        ``dimension=2``: an iterable of ``(run_length, value)`` pairs.
    dimension : int, default 1
        Selects the text (1) or array (2) decoder.
    shape : tuple of int, optional
        Only used for ``dimension=2``. Target shape of the returned array;
        when omitted the flat (1-D) array is returned.

    Returns
    -------
    str or numpy.ndarray
        The decoded string, or the decoded array reshaped to ``shape``.

    Raises
    ------
    ValueError
        If ``dimension`` is neither 1 nor 2, or if the encoded text has a
        character without a preceding run length or ends with a run length
        that has no character to repeat.
    """
    if dimension == 1:
        return _rle_decode_text(input)

    if dimension == 2:
        decompressed = []
        for run_len, val in input:
            decompressed.extend(run_len * [val])

        flat = np.array(decompressed)
        # Without a target shape there is nothing to reshape to.
        return flat if shape is None else flat.reshape(shape)

    raise ValueError(f"unsupported dimension {dimension!r}; expected 1 or 2")


def _rle_decode_text(encoded):
    """Expand ``<count><char>`` pairs back into the original string."""
    pieces = []
    pos = 0
    end = len(encoded)

    while pos < end:
        # Consume every digit of the (possibly multi-digit) run length.
        count_start = pos
        while pos < end and encoded[pos].isdigit():
            pos += 1

        if pos == count_start:
            raise ValueError(
                f"malformed RLE text: no run length before {encoded[pos]!r} at index {pos}"
            )
        if pos == end:
            raise ValueError(
                "malformed RLE text: trailing run length without a character to repeat"
            )

        run_length = int(''.join(encoded[count_start:pos]))
        pieces.append(run_length * encoded[pos])

        # Step past the repeated character to the next pair.
        pos += 1

    # Joining once avoids the quadratic cost of repeated string concatenation.
    return ''.join(pieces)

In [None]:
# For the one-dimensional case the input is assumed to be a string.

test1 = 'AAAAABBBBCCCCCDDDDDEEEEEELLLLLPPPPBBBGGGCCCDDDWWQQQ'
test2 = 'JJJHHHAANBVAJJJHHHPLLLLKQ'
test3 = 'HHHHUUUUUUUUUUNNNNNNNNNBBBBBBBVVVCCCCCCCCAAAAEEEEEE'

tests = [test1, test2, test3]

for test in tests:

    print('Input: ', test)
    compressed = rle_encode(test)

    # Ratio of the in-memory sizes of the two str objects (compressed / original).
    print('Compression: ', sys.getsizeof(compressed)/sys.getsizeof(test))

    # Report both outcomes so a broken round trip cannot go unnoticed.
    if rle_decode(compressed) == test:
        print('SUCCESS')
    else:
        print('FAILURE')

In [None]:
# For the two-dimensional case the input is assumed to be an image.
test1 = np.zeros((32,32))
test2 = np.zeros((32,32))
test3 = np.zeros((32,32))  # left all-zero

# Every second row set to 1
for i in range(0,test1.shape[0], 2):
    test1[i] = 1

# Every third column set to 1
for i in range(0,test2.shape[1],3):
    test2[:,i] = 1

test_images = [test1, test2, test3]

for img in test_images:
    compressed = rle_encode(img,dimension=2)
    decompressed = rle_decode(compressed, dimension=2,shape=img.shape)

    # sys.getsizeof is shallow: on a list it counts only the list object, not
    # the tuples (or their items) it refers to. Add those in so the ratio
    # reflects the approximate memory actually held by the encoded runs.
    compressed_bytes = sys.getsizeof(compressed) + sum(
        sys.getsizeof(run) + sum(sys.getsizeof(item) for item in run)
        for run in compressed
    )
    print(compressed_bytes/sys.getsizeof(img))

    # RLE is lossless, so the decoded image must equal the input exactly.
    print('SUCCESS' if np.array_equal(img, decompressed) else 'FAILURE')

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    axes[0].imshow(img)
    axes[0].set_title('Original')
    axes[1].imshow(decompressed)
    axes[1].set_title('Decoded')



In [None]:
# Training AutoEncoders for compression
# following https://www.geeksforgeeks.org/implement-deep-autoencoder-in-pytorch-for-image-reconstruction/
import torch
import torchvision
# Default size (width, height) applied to figures created from here on.
plt.rcParams['figure.figsize'] = (15, 10)

In [None]:
# Transform pipeline with normalisation.
# NOTE(review): this object is defined but not passed to the datasets below,
# which use a plain ToTensor() instead — confirm that is intended before
# wiring it in (the decoder in this notebook ends in a Sigmoid).
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.5), (0.5))
])

batch_size = 256

# Downloading the MNIST dataset
train_dataset = torchvision.datasets.MNIST(
    root="./MNIST/train", train=True,
    transform=torchvision.transforms.ToTensor(),
    download=True)

test_dataset = torchvision.datasets.MNIST(
    root="./MNIST/test", train=False,
    transform=torchvision.transforms.ToTensor(),
    download=True)

# Creating Dataloaders from the training and testing dataset.
# No shuffle argument is passed, so the loaders keep their default ordering.
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size)

# Drawing 25 random indices into the training dataset
# (lower bound 0 so the first image can be picked as well)
random_samples = np.random.randint(
    0, len(train_dataset), (25))

# Showing the images at those random indices on a 5x5 grid
for idx in range(random_samples.shape[0]):
    # Fetch the sample once; indexing the dataset applies the transform each time.
    image, label = train_dataset[int(random_samples[idx])]
    plt.subplot(5, 5, idx + 1)
    plt.imshow(image[0].numpy(), cmap='gray')
    plt.title(label)
    plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# Create DeepAutoencoder class
class DeepAutoencoder(torch.nn.Module):
    """Fully connected autoencoder for flattened 28x28 inputs.

    The encoder narrows 784 -> 256 -> 128 -> 64 -> 10 with ReLU between the
    linear layers (none after the last one). The decoder mirrors those widths
    back to 784 and finishes with a Sigmoid.
    """

    # Layer widths from the flattened input down to the bottleneck code.
    _WIDTHS = (28 * 28, 256, 128, 64, 10)

    def __init__(self):
        super().__init__()

        # Encoder is built before the decoder so parameters are created in
        # the same order as the layers are listed.
        self.encoder = self._stack(self._WIDTHS)
        self.decoder = self._stack(self._WIDTHS[::-1], torch.nn.Sigmoid())

    @staticmethod
    def _stack(widths, final_activation=None):
        """Chain Linear layers over consecutive widths, ReLU between them."""
        pairs = list(zip(widths[:-1], widths[1:]))
        last = len(pairs) - 1

        layers = []
        for position, (n_in, n_out) in enumerate(pairs):
            layers.append(torch.nn.Linear(n_in, n_out))
            if position != last:
                layers.append(torch.nn.ReLU())

        if final_activation is not None:
            layers.append(final_activation)
        return torch.nn.Sequential(*layers)

    def forward(self, x):
        """Return the reconstruction of ``x`` (encode, then decode)."""
        return self.decoder(self.encoder(x))

    def encode(self, x):
        """Return only the encoder output for ``x``."""
        return self.encoder(x)

# Use CUDA when PyTorch reports it as available, otherwise run on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)

# Training hyperparameters
num_epochs = 50

# Model, reconstruction loss and optimizer
model = DeepAutoencoder().to(device)
loss_fn = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Mean training loss of each epoch, in epoch order
train_loss = []

# Maps the 1-based epoch number to the inputs ('img') and reconstructions
# ('out') of the last batch processed in that epoch
outputs = {}

for epoch in range(num_epochs):

    # Sum of batch losses and number of batches seen in this epoch
    running_loss = 0.0
    total_batches = 0

    for total_batches, (img, _) in enumerate(train_loader, start=1):

        # Flatten every image to a 784-vector and move it to the model's device
        img = img.reshape(-1, 28*28).to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass: the target of the reconstruction is the input itself
        out = model(img)
        loss = loss_fn(out, img)

        # Backward pass and weight update
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the batch losses over the whole epoch
    average_loss = running_loss / total_batches
    train_loss.append(average_loss)
    print(f"Epoch: {epoch+1}, Loss: {average_loss:.4f}")

    # Keep the last batch and its reconstruction (on the CPU, without graph)
    outputs[epoch+1] = {
        'img': img.detach().cpu(),
        'out': out.detach().cpu(),
    }


# Plotting the training loss against the 1-based epoch number
epoch_axis = range(1, num_epochs+1)
plt.plot(epoch_axis, train_loss)
plt.xlabel("Number of epochs")
plt.ylabel("Training Loss")
plt.show()

In [None]:
# Plot reconstructions at selected epochs (one row of five images per epoch),
# followed by one row with the original images.

# Initializing subplot counter
counter = 1

# Epochs whose reconstructions are shown. Epochs that were not recorded in
# `outputs` (e.g. because num_epochs was lowered) are skipped instead of
# raising a KeyError.
epochs_list = [epoch_no for epoch_no in [1, 5, 10, 30, 50] if epoch_no in outputs]
if not epochs_list:
    raise RuntimeError(
        "none of the requested epochs is present in `outputs`; "
        "re-run the training cell before this one"
    )

# Grid size: one row per plotted epoch plus one row of originals, so no
# empty row is left at the bottom of the figure.
n_cols = 5
n_rows = len(epochs_list) + 1

# Iterating over specified epochs
for val in epochs_list:

    # Extracting recorded information
    temp = outputs[val]['out'].detach().numpy()
    title_text = f"Epoch = {val}"

    # Plotting first five images of the last batch
    for idx in range(n_cols):
        plt.subplot(n_rows, n_cols, counter)
        plt.title(title_text)
        plt.imshow(temp[idx].reshape(28,28), cmap= 'gray')
        plt.axis('off')

        # Incrementing the subplot counter
        counter+=1

# Plotting original images

# Inputs of the last batch, taken from the last plotted epoch so the lookup
# cannot fail when fewer epochs were trained. The lookup does not depend on
# the loop variable, so it is done once.
# NOTE(review): assumes the last batch holds the same images in every epoch
# (the training DataLoader is created without shuffling) — confirm if that changes.
val = outputs[epochs_list[-1]]['img']

# Iterating over first five images of the last batch
for idx in range(n_cols):

    # Plotting image
    plt.subplot(n_rows, n_cols, counter)
    plt.imshow(val[idx].reshape(28, 28),
               cmap = 'gray')
    plt.title("Original Image")
    plt.axis('off')

    # Incrementing subplot counter
    counter+=1

plt.tight_layout()
plt.show()

In [None]:
# Verifying performance on test set

# Dictionary holding the test inputs ('img') and their reconstructions ('out').
# NOTE: this rebinds `outputs`, replacing the per-epoch dictionary filled by
# the training cell.
outputs = {}

# Extracting the last batch from the test dataset
img, _ = list(test_loader)[-1]

# Reshaping into 1d vector
img = img.reshape(-1, 28 * 28)
img = img.to(device)

# Generating output for the obtained batch.
# This is inference only, so autograd is disabled to avoid building a graph
# that is never back-propagated.
with torch.no_grad():
    out = model(img)

# Storing information in dictionary
outputs['img'] = img
outputs['out'] = out

# Plotting reconstructed images
# Initializing subplot counter
counter = 1
val = outputs['out'].cpu().detach().numpy()

# Plotting first 10 images of the batch (top row)
for idx in range(10):
    plt.subplot(2, 10, counter)
    plt.title("Reconstructed \n image")
    plt.imshow(val[idx].reshape(28, 28), cmap='gray')
    plt.axis('off')

    # Incrementing subplot counter
    counter += 1

# Plotting original images

# The inputs do not change inside the loop, so look them up once
val = outputs['img']

# Plotting first 10 images (bottom row)
for idx in range(10):
    plt.subplot(2, 10, counter)
    plt.imshow(val[idx].cpu().detach().reshape(28, 28), cmap='gray')
    plt.title("Original Image")
    plt.axis('off')

    # Incrementing subplot counter
    counter += 1

plt.tight_layout()
plt.show()

In [None]:
# Looking at the compression stats