# Assignment 8 -  Multimedia Lab
### Name - Anirban Dey
### Roll -  002111001108

# Problem Description

Implement Huffman algorithm, Shanon - Fano algorithm

# Documentation

In [1]:
from collections import defaultdict
from typing import List,Dict,Tuple
import os
import time
import heapq
import bitarray as ba
import pickle
import tarfile
import struct
from abc import ABC, abstractmethod

# assert bitarray.test().wasSuccessful()

In [2]:

class Compression(ABC):
  def __init__(self):
    pass

  def encode(self,input_path:str,output_path:str):
    """
      input_path : uncompressed file
      output_path : compressed file
    """
    init_time = time.time()

    prob_arr = self.generate_probability_table(file_path=input_path)
    huffman_code = self.generate_code(prob_arr)
    encoded_bitarray = ba.bitarray()
    # for num in data_list:
    #   encoded_bitarray.extend(ba.bitarray(huffman_code[num]))
    with open(input_path,"rb") as f:
            byte = f.read(1) # Read one byte at a time
            while byte :
                byte_num : int = ord(byte)
                # print(byte_num)
                encoded_bitarray.extend(ba.bitarray(huffman_code[byte_num]))
                byte = f.read(1)

    encoded_bitarray_file_path="tmp_file1.bin"
    dictionary_map_file_path="tmp_file2.bin"
    tmp_file_path_list = [encoded_bitarray_file_path,dictionary_map_file_path]

    with open(encoded_bitarray_file_path,"wb") as f:
      encoded_bitarray.tofile(f)
    with open(dictionary_map_file_path,"wb") as f:
      pickle.dump(huffman_code,f)

    self.merge_files(tmp_file_path_list,output_path)

    average_code_length = sum(len(val) for val in huffman_code.values()) / len(huffman_code.values())
    finish_time = time.time()
    original_size = os.path.getsize(input_path)
    compressed_size = os.path.getsize(output_path)
    compression_ratio = original_size / compressed_size
    efficiency = (1-(compressed_size/original_size))*100
    print("Time taken to compress file (in seconds) : ", finish_time - init_time )
    print("Size of input file (in bytes) : ",original_size)
    print("Size of output file (in bytes) : ",compressed_size)
    print("Average Code Length : ",average_code_length)
    print("Compression Ratio : ",compression_ratio)
    print(f"Efficiency : {efficiency}%")
  

    # with tarfile.open(output_path,"w") as tar:
    #   for file_path in tmp_file_path_list:
    #     tar.add(file_path)
    #   tar.close()

    for file_path in tmp_file_path_list:
      if os.path.exists(file_path) :
        os.unlink(file_path)

  def decode(self,input_path:str,output_path:str):
    """
      input_path : compressed file
      output_path : decompressed file
    """

    init_time = time.time()

    # tmp_file_path_list = [encoded_biarray_file_path,dictionary_map_file_path]
    tmp_file_path_list = self.separate_files(merged_filename=input_path,output_dir=os.getcwd())
    encoded_bitarray_file_path = tmp_file_path_list[0]
    dictionary_map_file_path = tmp_file_path_list[1]

    # with tarfile.open(input_path,"r") as tar:
    #   # file_names = tar.getnames()
    #   tar.extractall() 

    encoded_bitarray = ba.bitarray()
    with open(encoded_bitarray_file_path,"rb") as f:
      encoded_bitarray.fromfile(f)

    with open(dictionary_map_file_path,"rb") as f:
      code = pickle.load(f)
    
    reverse_code:Dict[str,int] = {} # if code[x] = y  then reverse_code[y] = x
    for key,val in code.items():
      reverse_code[val] = key


    # print("Encoded Bitarray")
    # print(encoded_bitarray)
    # print("Huffman Code")
    # print(huffman_code)

    for file_path in tmp_file_path_list:
      if os.path.exists(file_path) :
        os.unlink(file_path)

    with open(output_path,"wb") as of:
      current_code = ba.bitarray()
      for bit in encoded_bitarray:
          current_code.append(bit)
          # Check if the current code is in the Huffman code dictionary
          if current_code.to01() in code.values():
              # Find the corresponding symbol for the current code using the dictionary
              # symbol = [k for k, v in code.items() if v == current_code.to01()][0]
              symbol = reverse_code[current_code.to01()]
             
              # print(symbol)
              # Convert the integer to a single byte and write it to the file
              # byte = bytes([symbol])
              byte = struct.pack('B',symbol)
              # Append the symbol to the the output file
              of.write(byte)
              # Reset the current code
              current_code = ba.bitarray()

    finish_time = time.time()
    print("Time taken to decompress file (in seconds) : ",finish_time-init_time)
  
  def generate_probability_table(self,file_path:str)->List[Tuple[int,float]]:

      freq = defaultdict(int)

      with open(file_path,"rb") as f:
          byte = f.read(1) # Read one byte at a time
          while byte :
              byte_num : int = ord(byte)
              # print(byte_num)
              freq[byte_num] += 1
              byte = f.read(1)

      freq_sum:int = 0
      for key,val in freq.items():
          freq_sum += val


      prob_arr:List[Tuple[int,float]] = []
      for key,val in freq.items():
          prob_arr.append((key,val/freq_sum))
      # print("Probability Array")
      # print(prob_arr)

      # sort freqlist by the second element in the tuple which is
      prob_arr.sort(key= lambda x : -x[1])

      return prob_arr
  def merge_files(self,file_list:List[str], merged_filename:str):
      # Initialize list to store file sizes and contents
      merged_content = []

      # Iterate through each file in the list
      for filename in file_list:
          # Get the size of the file
          file_size = os.path.getsize(filename)
          
          # Read the content of the file
          with open(filename, 'rb') as file:
              file_content = file.read()
          
          # Add size and content to the merged list
          merged_content.append(file_size.to_bytes(8, byteorder='big'))
          merged_content.append(file_content)

      # Write the merged content to a custom merged file
      with open(merged_filename, 'wb') as merged_file:
          for item in merged_content:
              merged_file.write(item)

  def separate_files(self,merged_filename:str, output_dir:str):
      # Read the content of the merged file
      with open(merged_filename, 'rb') as merged_file:
          merged_content = merged_file.read()

      # Initialize list to store the names of separated files
      separated_files = []
      # Iterate through the merged content to separate files
      index = 0
      file_number = 0
      while index < len(merged_content):
          # Get size of the file
          file_size = int.from_bytes(merged_content[index:index+8], byteorder='big')
          index += 8
          file_number += 1
          
          # Get content of the file
          file_content = merged_content[index:index+file_size]
          index += file_size
          
          # Write content to separate file
          output_filename = os.path.join(output_dir, f"tmp_file{file_number}.bin")
          separated_files.append(output_filename)
          with open(output_filename, 'wb') as output_file:
              output_file.write(file_content)
      return separated_files

  @abstractmethod
  def generate_code(self,prob_arr:List[Tuple[int,float]])->Dict[int,str]:
    pass


class HuffmanStandardNode():
      def __init__(self,num,val):
        self.num = num # represents the number whose frequency/probability is measured
        self.val = val # probability/ sum of probability
        self.left = None
        self.right = None

      def __lt__(self,other):
        return self.num < other.num

class HuffmanStandard(Compression):

  def __init__(self):
    pass

  def isLeafNode(self,node:HuffmanStandardNode):
    return node != None and node.left == None and node.right == None

  def dfs(self,node:HuffmanStandardNode,cur_str:str,huffman_code:Dict[int,str]):
    if node == None:
      return

    if self.isLeafNode(node):
      # print("Huffman Code assigned : ",node.num,cur_str)
      huffman_code[node.num] = cur_str

    # print(vars(node))
    self.dfs(node.left,cur_str+"0",huffman_code)
    self.dfs(node.right,cur_str+"1",huffman_code)

  def generate_code(self,prob_arr:List[Tuple[int,float]])->Dict[int,str]:

    pq = []
    for char,prob in prob_arr:
      # heapq.heappush(pq,(freq,char))
      heapq.heappush(pq,(prob,HuffmanStandardNode(num=char,val=prob)))

    while len(pq) > 1 :
      top1 = heapq.heappop(pq)
      top2 = heapq.heappop(pq)

      if top1[1].val > top1[1].val:
        top1,top2 = top2,top1

      # print(top1,top2)
      # since this is an internal node its key will -1 it does not correspond to any specific character
      sum_val = top1[0] + top2[0]
      newNode = HuffmanStandardNode(num = -1, val=sum_val)
      newNode.left = top1[1]
      newNode.right = top2[1]

      heapq.heappush(pq,(sum_val,newNode))

    root_of_tree = pq[0][1]
    # print("Root of tree : ",vars(root_of_tree))
    huffman_code = {}
    self.dfs(root_of_tree,"",huffman_code)
    # print("Huffman Code")
    # print(huffman_code)
    return huffman_code


class ShanonFano(Compression):
    def __init__(self):
        pass

    def dfs(self,l:int, r:int, code:str, arr:List[Tuple[int,float]],shanon_code:Dict[int,str]) -> None:
        # print(l,r,code,arr[l:r+1])

        if l>r :
            raise ValueError(f"Left Index {l} should be <= Right index {r}")

        if l == r:
            num:int = arr[l][0]
            # print("Shanon Code Assigned : " ,num,code)
            shanon_code[num] = code
            return

        prob_sum : float = 0
        
        for i in range(l,r+1):
            num,prob = arr[i]
            prob_sum += prob

        # print("Probability Sum for Current Range : ",prob_sum)

        """ 
            In the following implmentation :
            sum_prob_left >= sum_prob_right
        """
        i:int = l # i starts from l not 0
        cur_prob_sum:float = 0
        while i < r: # i cannot become r as then there would not be any element in the right subarray
            num,prob = arr[i]
            cur_prob_sum += prob
            if cur_prob_sum > prob_sum / 2 :
                break
            else :
                if i+1<r :
                    i+= 1

        # print("Rightmost Index of Left Subarray : ",i)

        # left subarray
        self.dfs(l,i,code+"0",arr,shanon_code)
        # right subarray
        self.dfs(i+1,r,code+"1",arr,shanon_code)

    def generate_code(self,prob_arr:List[Tuple[int,float]]):
        shanon_code = {}
        self.dfs(0,len(prob_arr)-1, "", prob_arr,shanon_code)
        # print("ShanonCode")
        # print(shanon_code)
        return shanon_code

        


    

In [3]:
# Create objects
hf = HuffmanStandard()
sf = ShanonFano()

In [4]:
# Image Input

print("Huffman Standard")
hf.encode(input_path="image_input.bmp",output_path="image_compressed_file_huffman_standard.bin")
hf.decode(input_path="image_compressed_file_huffman_standard.bin",output_path="image_decompressed_file_huffman_standard.bmp")

print("\nShanon Fano")
sf.encode(input_path="image_input.bmp",output_path="image_compressed_file_shanon_fano.bin")
sf.decode(input_path="image_compressed_file_shanon_fano.bin",output_path="image_decompressed_file_shanon_fano.bmp")

Huffman Standard
Time taken to compress file (in seconds) :  0.2653207778930664
Size of input file (in bytes) :  263224
Size of output file (in bytes) :  247093
Average Code Length :  9.984375
Compression Ratio :  1.065283112026646
Efficiency : 6.1282405859648055%
Time taken to decompress file (in seconds) :  10.088044881820679

Shanon Fano
Time taken to compress file (in seconds) :  0.22403383255004883
Size of input file (in bytes) :  263224
Size of output file (in bytes) :  248076
Average Code Length :  9.8671875
Compression Ratio :  1.0610619326335478
Efficiency : 5.754794395647811%
Time taken to decompress file (in seconds) :  9.405275821685791


In [5]:
# Text Input

print("Huffman Standard")
hf.encode(input_path="text_input.txt",output_path="text_compressed_file_huffman_standard.bin")
hf.decode(input_path="text_compressed_file_huffman_standard.bin",output_path="text_decompressed_file_huffman_standard.txt")

print("\nShanon Fano")
sf.encode(input_path="text_input.txt",output_path="text_compressed_file_shanon_fano.bin")
sf.decode(input_path="text_compressed_file_shanon_fano.bin",output_path="text_decompressed_file_shanon_fano.txt")

Huffman Standard
Time taken to compress file (in seconds) :  0.07370567321777344
Size of input file (in bytes) :  100000
Size of output file (in bytes) :  26710
Average Code Length :  4.444444444444445
Compression Ratio :  3.7439161362785476
Efficiency : 73.29%
Time taken to decompress file (in seconds) :  0.16224932670593262

Shanon Fano
Time taken to compress file (in seconds) :  0.06645035743713379
Size of input file (in bytes) :  100000
Size of output file (in bytes) :  28749
Average Code Length :  3.7777777777777777
Compression Ratio :  3.478381856760235
Efficiency : 71.251%
Time taken to decompress file (in seconds) :  0.11778521537780762
