## Implementação do LZW e Trie Compacta

In [26]:
%%writefile lzw.py
import os
from typing import Tuple, List
import struct
import time
import tracemalloc
import json
import argparse
import json
import sys
from pickle import FALSE

class CompactTrieNode:
  def __init__(self, binary_string: str = "", value: str = "", is_leaf: bool = False) -> None:
    self.children = [None, None]
    self.is_leaf = is_leaf
    self.binary_string = binary_string
    self.value = value
    self.unique_id = id(self)  # Temporário

class CompactTrie:
  def __init__(self) -> None:
    self.root = None

  def get_common_prefix_length(self, key1: str, key2: str) -> int:
    i = 0
    while i < min(len(key1), len(key2)) and key1[i] == key2[i]:
        i += 1
    return i

  def search(self, key: str) -> CompactTrieNode:
    current_node = self.root

    if(current_node == None):
      return None

    while True:

      if(current_node.binary_string == key):
        if(current_node.is_leaf == True):
          return current_node
        else:
          return None

      common_prefix_length = self.get_common_prefix_length(current_node.binary_string, key)

      if(common_prefix_length == len(current_node.binary_string)):

        key = key[common_prefix_length:]

        if current_node.children[int(key[0])] == None:
          return None

        current_node = current_node.children[int(key[0])]
      else:
        return None

  def insert(self, key: str, value: str) -> None:
    # Caso em que não tínhamos raiz
    if(self.root == None):
      self.root = CompactTrieNode(key, value, True)
      return

    else:
      current_node = self.root

      while True:
        # Achei, posso parar
        if(current_node.binary_string == key):
          current_node.is_leaf = True
          return

        common_prefix_length = self.get_common_prefix_length(current_node.binary_string, key)

        # Sobrou parte da key ainda, com certeza
        if (common_prefix_length == len(current_node.binary_string)):
          key = key[common_prefix_length:]

          if current_node.children[int(key[0])] == None:
            current_node.children[int(key[0])] = CompactTrieNode(key, value, True)
            return

          current_node = current_node.children[int(key[0])]

        # Achei onde vou ter que criar novo nó para inserir
        else:
          key = key[common_prefix_length:]

          if(current_node.is_leaf == True):
            old_suffix_node = CompactTrieNode(current_node.binary_string[common_prefix_length:], current_node.value, True)
          else:
            old_suffix_node = CompactTrieNode(current_node.binary_string[common_prefix_length:], current_node.value)

          old_suffix_node.children[0] = current_node.children[0]
          old_suffix_node.children[1] = current_node.children[1]

          current_node.binary_string = current_node.binary_string[:common_prefix_length]

          if(len(key) > 0):
            key_suffix_node = CompactTrieNode(key, value, True)
            current_node.is_leaf = False
            current_node.children[int(key_suffix_node.binary_string[0])] = key_suffix_node
            current_node.children[int(old_suffix_node.binary_string[0])] = old_suffix_node
          else:
            current_node.value = value
            current_node.is_leaf = True
            current_node.children = [None, None]
            current_node.children[int(old_suffix_node.binary_string[0])] = old_suffix_node
          return

  def delete_key(self, key: str) -> None:
    if(self.search(key) == None):
      return

    current_node = self.root
    last_node = self.root

    if(current_node.binary_string == key):
      if(current_node.children[0] == None and current_node.children[1] == None):
        self.root = None
        return
      elif(current_node.children[0] != None and current_node.children[1] != None):
        current_node.is_leaf = False
        return
      else:
        valid_child = 0 if current_node.children[0] != None else 1
        current_node.children[valid_child].binary_string = current_node.binary_string + current_node.children[valid_child].binary_string
        self.root = current_node.children[valid_child]
        return

    while True:
      if((current_node.binary_string == key) and (current_node.is_leaf == True)):
        if((current_node.children[0] != None) and (current_node.children[1] != None)):
          current_node.is_leaf = False
        elif((current_node.children[0] == None) and (current_node.children[1] == None)):
          last_node.children[int(current_node.binary_string[0])] = None

          if(last_node.is_leaf == False):
            last_node_other_child = 0 if current_node.binary_string[0] == '1' else 1

            if(last_node.children[last_node_other_child] != None):
              last_node.binary_string += last_node.children[last_node_other_child].binary_string
              last_node.value = last_node.children[last_node_other_child].value
              if(last_node.children[last_node_other_child].is_leaf):
                last_node.is_leaf = True
              last_node.children = last_node.children[last_node_other_child].children
        else:
          valid_child = 0 if current_node.children[0] != None else 1
          current_node.children[valid_child].binary_string = current_node.binary_string + current_node.children[valid_child].binary_string
          last_node.children[int(current_node.children[valid_child].binary_string[0])] = current_node.children[valid_child]
        return

      common_prefix_length = self.get_common_prefix_length(current_node.binary_string, key)
      key = key[common_prefix_length:]
      last_node = current_node
      current_node = current_node.children[int(key[0])]

  # Temporário
  def visualize(self, filename="compact_trie"):
      dot = Digraph(comment="Compact Trie")

      def add_nodes_edges(node, parent_label=None):
          if node is None:
              return

          new_node_binary_string = node.binary_string
          new_node_binary_string = new_node_binary_string.replace("0", "a")
          new_node_binary_string = new_node_binary_string.replace("1", "b")

          node_label = f"{new_node_binary_string}_{node.unique_id}"

          display_label = f"{new_node_binary_string}"
          if(node.is_leaf):
            display_label  += f"\\nValue: {node.value}"

          dot.node(node_label, display_label, shape='circle', color='black', fontcolor='red' if node.is_leaf else 'blue')

          if parent_label:
              dot.edge(parent_label, node_label)
          if node.children[0] is not None:
              add_nodes_edges(node.children[0], node_label)
          if node.children[1] is not None:
              add_nodes_edges(node.children[1], node_label)

      if self.root is not None:
          add_nodes_edges(self.root)

      dot.render(filename, format="png", cleanup=True)
      print(f"Compact trie saved as {filename}.png")

# Para debug
def binary_to_chars(binary_string):
    # Ensure the binary string length is a multiple of 8 (since each character is 8 bits)
    if len(binary_string) % 8 != 0:
        raise ValueError("Binary string length must be a multiple of 8.")

    # Split the binary string into chunks of 8 bits
    chars = [binary_string[i:i+8] for i in range(0, len(binary_string), 8)]

    # Convert each 8-bit binary chunk to its corresponding character
    result = ''.join(chr(int(char, 2)) for char in chars)

    return result

# Função para converter um conjunto de bytes em um '.txt'
def write_txt_file(decoded_bytes, output_file):
    decoded_string = ''.join([chr(byte) for byte in decoded_bytes])
    with open(output_file, 'w', encoding='utf-8') as file:
        file.write(decoded_string)

# Função para converter um conjunto de bytes em um '.bmp' ou '.tiff'
def write_image_file(decoded_bytes, output_file):
    with open(output_file, 'wb') as file:
        file.write(bytes(decoded_bytes))

# Função para remover zeros à esquerda em uma string binárias
def remove_leading_zeros(binary_str):
  return binary_str.lstrip('0') or '0'

# Classe que implementa o algoritmo LZW, empregando árvores Tries Binárias como dicionários
class TrieLZW:

  # Atributos para armazenar as estatísticas
  def __init__(self):
      self.stats = {
          'compression_ratio_over_time': [],
          'dictionary_size_over_time': [],
          'execution_time': 0,
          'memory_usage': 0
      }

  # Realiza a compressão LZW do arquivo passado como parâmetro
  def compress(self, file_path, compression_type, codes_max_size) -> Tuple[str, str, List]:

    # Iniciar rastreamento de tempo e memória
    start_time = time.time()
    tracemalloc.start()

    # Utilizando a Trie Compacta Binária como dicionário no algoritmo
    dictionary = CompactTrie()

    # Inicializando todas as chaves de 00000000 até 11111111 no dicionário, com respectivos valores sendo a própria chave
    for num in range(256):
      byte_num = format(num, '08b')
      dictionary.insert(byte_num, byte_num)

    dict_size = 256 # O dicionário começa com todos os símbolos ASCII
    reset = False

    if(compression_type == "s"):
      num_bits_values = 12 # No caso estático, os códigos terão tamanho 12 bits
      dict_size_limit = 2 ** num_bits_values # Os 12 bits serão suficientes para representar códigos de 000000000000 (0) até 111111111111 (4095)
    else:
      num_bits_values = 9 # No caso estático dinâmico, os códigos terão tamanho 9 bits inicialmente
      next_dict_size_limit = 512 # Os 9 bits serão suficientes para representar códigos de 000000000 (0) até 111111111 (511)

    # Inicializando variáveis utilizadas pela compressão LZW
    string = ""
    compressed_data = []
    resets = []

    count = 0
    # Aplicando a compressão LZW, considerando cada byte do arquivo original como um símbolo de entrada
    try:
      with open(file_path, "rb") as file:
        while (byte := file.read(1)):
          #print(count)
          count += 1
          symbol = bin(int.from_bytes(byte, "big"))[2:].zfill(8)

          string_plus_symbol = string + symbol

          if dictionary.search(string_plus_symbol) != None:
              string = string_plus_symbol
          else:
            # Reiniciando o dicionário no algoritmo estático
            if(compression_type == "s"):
              if(dict_size == dict_size_limit - 1):
                reset = True

            # Expandindo os códigos no algoritmo dinâmico (até o limite) ou reiniciando o dicionário
            else:
              if(dict_size == (2 ** codes_max_size) - 1):
                reset = True
              else:
                if(dict_size == next_dict_size_limit):
                    num_bits_values += 1
                    next_dict_size_limit *= 2

            current_string_formatted_binary_value = dictionary.search(string).value.zfill(num_bits_values)
            compressed_data.append(current_string_formatted_binary_value)
            new_key_value = bin(dict_size)[2:].zfill(num_bits_values)
            dictionary.insert(string_plus_symbol, new_key_value)
            dict_size += 1
            string = symbol

            if(reset):
              reset = False
              dict_size = 256

              if(compression_type != 's'):
                num_bits_values = 9
                next_dict_size_limit = 512

              dictionary = CompactTrie()

              for num in range(256):
                byte_num = format(num, '08b')
                dictionary.insert(byte_num, byte_num)

        if(dictionary.search(string) != None):
          current_string_formatted_binary_value = dictionary.search(string).value.zfill(num_bits_values)
          compressed_data.append(current_string_formatted_binary_value)

    # Tratando erros que podem ocorrer na abertura de um arquivo
    except FileNotFoundError as e:
      print(f"Arquivo não encontrado -> {e}")
    except IOError as e:
      print(f"Erro de I/O -> {e}")
    except Exception as e:
      print(f"Um erro ocorreu: {e}")

    # Salvando o nome do arquivo original e sua extensão para que o arquivo de compressão possa ser salvo
    file_name, file_extension = os.path.splitext(file_path)

    with open(f"compressed_{file_name}.bin", "wb") as file:
      # Convertendo os códigos para uma única string
      final_concat_string_data = ''.join(compressed_data)

      # Verificando se algum padding deverá ser adicionado para que tenhamos um número inteiro de bytes no arquivo comprimido
      padding_size = (8 - len(final_concat_string_data) % 8) % 8
      final_concat_string_data = '0' * padding_size + final_concat_string_data

      # Adicionando uma flag, no arquivo comprimido, para representar o tamanho do padding que foi adicionado
      file.write(bytes([padding_size]))

      # Convertendo a string de dados comprimidos para bytes e escrevendo no arquivo comprimido final
      bits = []
      for char in final_concat_string_data:
        bits.append(1 if char == '1' else 0)
      byte = 0
      bit_count = 0
      for bit in bits:
        byte = (byte << 1) | bit
        bit_count += 1
        if bit_count == 8:
          file.write(bytes([byte]))
          byte = 0
          bit_count = 0

      # Finalizar rastreamento de tempo e memória
      end_time = time.time()
      self.stats['execution_time'] = end_time - start_time

      current_memory, peak_memory = tracemalloc.get_traced_memory()
      self.stats['memory_usage'] = peak_memory / 1024  # Converter para KB
      tracemalloc.stop()

      return file_name, file_extension

  # Realiza a descompressão LZW do arquivo passado como parâmetro
  def decompress(self, file_path, compression_type, original_file_name, original_file_extension, codes_max_size) -> None:

    # Iniciar rastreamento de tempo e memória
    start_time = time.time()
    tracemalloc.start()

    # Utilizando a Trie Compacta Binária como dicionário no algoritmo
    dictionary = CompactTrie()

    # Inicializando todas as chaves de 0, 1, 10, 11, 100, ... até 11111111 no dicionário (sem padding), com respectivos valores sendo a própria chave (com padding)
    for num in range(256):
      numeric_key = bin(num)[2:]
      numeric_key_8bit_value = format(num, '08b')
      dictionary.insert(numeric_key, numeric_key_8bit_value)

    dict_size = 256 # O dicionário começa com todos os símbolos ASCII
    reset = False

    if(compression_type == "s"):
      decompression_size = 12 # Sempre puxaremos 12 bits do arquivo comprimido por vez no caso estático
      dict_size_limit = 2 ** decompression_size # Os 12 bits serão suficientes para representar códigos de 000000000000 (0) até 111111111111 (4095)
    else:
      decompression_size = 9 # Começaremos puxando 9 bits do arquivo comprimido por vez
      next_size_limit = 512 # Os 9 bits serão suficientes para representar códigos de 000000000 (0) até 111111111 (511)

    # Abrindo o arquivo já comprimido e convertendo seus bytes em uma única string
    compressed_data = ""
    try:
      with open(file_path, "rb") as file:
        file_data = file.read()
        compressed_data = ''.join(format(byte, '08b') for byte in file_data)
    # Tratando erros que podem ocorrer na abertura de um arquivo
    except FileNotFoundError:
      print(f"File not found: {file_path}")
    except IOError as e:
      print(f"Error reading the file: {e}")
    except Exception as e:
      print(f"An unexpected error occurred: {e}")

    # Retirando os bits de padding do arquivo comprimido
    padding_size = int(compressed_data[:8], 2)
    compressed_data = compressed_data[(8 + padding_size):]

    # Inicializando as variáveis que serão empregadas posteriormente
    string = compressed_data[:decompression_size]
    string = string[-8:]
    compressed_data = compressed_data[decompression_size:]
    decompressed_data = [string]

    count = 0
    # Aplicando a descompressão LZW
    while(len(compressed_data) > 0):
        #print(count)
        count += 1
        if(compression_type == "s"):
          # Caso em que os códigos têm tamanho estático
          if(dict_size == dict_size_limit - 1):
            reset = True
        else:
          # Caso em que os códigos têm tamanho dinâmico
          if(dict_size == 2 ** codes_max_size - 1):
            decompression_size = 9
            reset = True
          else:
            if(dict_size == next_size_limit - 1):
              decompression_size += 1
              next_size_limit *= 2

        k = compressed_data[:decompression_size]
        compressed_data = compressed_data[decompression_size:]
        k = remove_leading_zeros(k)

        if dictionary.search(k) != None:
          entry = dictionary.search(k).value
        else:
          new_value_entry_concat = string[:8]
          entry = string + new_value_entry_concat

        decompressed_data.append(entry)
        new_key = bin(dict_size)[2:] if dict_size != 0 else '0'
        dictionary.insert(new_key, string + entry[:8])
        dict_size += 1
        string = entry

        if(reset):
          reset = False
          dict_size = 256

          if(compression_type != "s"):
            decompression_size = 9
            next_size_limit = 512

          dictionary = CompactTrie()
          for num in range(256):
            numeric_key = bin(num)[2:]
            numeric_key_8bit_value = format(num, '08b')
            dictionary.insert(numeric_key, numeric_key_8bit_value)

    decompressed_concat_string = ''.join(decompressed_data)

    # Checando algum possível erro que possa ter ocorrido na descompressão (o tamanho do arquivo final deve ser um múltiplo de 8)
    if len(decompressed_concat_string) % 8 != 0:
      raise ValueError("O arquivo descomprimido não tem um número inteiro de bytes!")

    # Transformando a string binárias em bytes individuais
    byte_chunks = [decompressed_concat_string[i:i+8] for i in range(0, len(decompressed_concat_string), 8)]
    bytes_ = [int(chunk, 2) for chunk in byte_chunks]

    # Convert integers to their ASCII characters
    ascii_characters = [chr(i) for i in bytes_]
    # Join characters into a string (optional)
    ascii_string = ''.join(ascii_characters)

    # Gerando o arquivo inicial
    switch = {
        '.txt': write_txt_file,
        '.bmp': write_image_file,
        '.tiff': write_image_file
        }
    handler = switch.get(original_file_extension)
    if handler:
        handler(bytes_, f"decompressed_{original_file_name}{original_file_extension}")
    else:
        print("Tipo de arquivo não suportado!")

    # Finalizar rastreamento de tempo e memória
    end_time = time.time()
    self.stats['execution_time'] = end_time - start_time

    current_memory, peak_memory = tracemalloc.get_traced_memory()
    self.stats['memory_usage'] = peak_memory / 1024  # Converter para KB
    tracemalloc.stop()

# Funções de Nível Superior para Compressão e Descompressão
def compress(input_file, output_file, compression_type, max_bits, stats_file=None):
    print(f"Compressão de '{input_file}'...")
    trie_lzw = TrieLZW()

    # Executa a compressão com os parâmetros especificados
    file_name, file_extension = trie_lzw.compress(input_file, compression_type, max_bits)
    compressed_file_name = f"compressed_{file_name}.bin"
    print(f"Arquivo comprimido criado: {compressed_file_name}")

    # Define output_file como o nome do arquivo comprimido gerado
    output_file = compressed_file_name
    print(f"Compressão concluída: {output_file}")

    # Salva as estatísticas, se especificado
    if stats_file:
        with open(stats_file, 'w') as f:
            json.dump(trie_lzw.stats, f, indent=4)
        print(f"Estatísticas de compressão salvas em '{stats_file}'")

def decompress(input_file, output_file, compression_type, max_bits, stats_file=None):
    print(f"Descompressão de '{input_file}'...")
    trie_lzw = TrieLZW()

    # Extrai o nome base e a extensão do arquivo de saída desejado
    output_base_name, output_extension = os.path.splitext(os.path.basename(output_file))
    original_file_name = output_base_name.replace('decompressed_', '')

    # Executa a descompressão
    trie_lzw.decompress(input_file, compression_type, original_file_name, output_extension, max_bits)

    # Define decompressed_file_name conforme gerado pela descompressão
    decompressed_file_name = f"decompressed_{original_file_name}{output_extension}"
    print(f"Arquivo descomprimido criado: {decompressed_file_name}")

    # Define output_file como o nome do arquivo descomprimido gerado
    output_file = decompressed_file_name
    print(f"Descompressão concluída: {output_file}")

    # Salva as estatísticas, se especificado
    if stats_file:
        with open(stats_file, 'w') as f:
            json.dump(trie_lzw.stats, f, indent=4)
        print(f"Estatísticas de descompressão salvas em '{stats_file}'")

# Função `process` para Compressão e Descompressão Sequencial
def process(input_file, output_file, compression_type, max_bits=12, stats_file=None):
    print("Compressão...")
    trie_lzw = TrieLZW()

    # Executa a compressão
    file_name, file_extension = trie_lzw.compress(input_file, compression_type, max_bits)
    compressed_file_name = f"compressed_{file_name}.bin"
    print(f"Arquivo comprimido criado: {compressed_file_name}")

    # Define output_file como o nome do arquivo comprimido gerado
    temp_compressed_file = compressed_file_name
    # print(f"Usando o arquivo comprimido '{temp_compressed_file}' para descompressão")

    # Salva as estatísticas da compressão, se especificado
    if stats_file:
        with open(stats_file, 'w') as f:
            json.dump(trie_lzw.stats, f, indent=4)
        print(f"Estatísticas de compressão salvas em '{stats_file}'")

    print("Descompressão...")
    # Extrai o nome base e a extensão do arquivo de saída desejado
    output_base_name, output_extension = os.path.splitext(os.path.basename(output_file))
    original_file_name = output_base_name.replace('decompressed_', '')

    # Executa a descompressão
    trie_lzw.decompress(temp_compressed_file, compression_type, original_file_name, output_extension, max_bits)

    # Define decompressed_file_name conforme gerado pela descompressão
    decompressed_file_name = f"decompressed_{original_file_name}{output_extension}"
    # print(f"Arquivo descomprimido criado: {decompressed_file_name}")

    # Define output_file como o nome do arquivo descomprimido gerado
    output_file = decompressed_file_name
    print(f"Descompressão concluída: {output_file}")

    # Salva as estatísticas da descompressão, se especificado
    if stats_file:
        with open(stats_file, 'w') as f:
            json.dump(trie_lzw.stats, f, indent=4)
        print(f"Estatísticas de descompressão salvas em '{stats_file}'")

    print(f"Processo concluído.")

# Função `main` com Suporte para Processamento Sequencial
def main():
    parser = argparse.ArgumentParser(description="Compressão e Descompressão LZW")
    subparsers = parser.add_subparsers(dest='command', help='Comandos disponíveis: compress, decompress, process')

    # Parser para o comando 'compress'
    compress_parser = subparsers.add_parser('compress', help='Comprimir um arquivo')
    compress_parser.add_argument('input_file', type=str, help='Caminho para o arquivo de entrada a ser comprimido')
    compress_parser.add_argument('output_file', type=str, help='Caminho para o arquivo comprimido de saída')
    compress_parser.add_argument('--type', type=str, choices=['s', 'd'], required=True, help='Tipo de compressão: "s" para tamanho fixo, "d" para tamanho variável')
    compress_parser.add_argument('--max-bits', type=int, default=12, help='Número máximo de bits (padrão: 12)')
    compress_parser.add_argument('--stats-file', type=str, help='Caminho para salvar as estatísticas de compressão', default=None)

    # Parser para o comando 'decompress'
    decompress_parser = subparsers.add_parser('decompress', help='Descomprimir um arquivo')
    decompress_parser.add_argument('input_file', type=str, help='Caminho para o arquivo comprimido de entrada')
    decompress_parser.add_argument('output_file', type=str, help='Caminho para o arquivo descomprimido de saída')
    decompress_parser.add_argument('--type', type=str, choices=['s', 'd'], required=True, help='Tipo de compressão: "s" para tamanho fixo, "d" para tamanho variável')
    decompress_parser.add_argument('--max-bits', type=int, default=12, help='Número máximo de bits (padrão: 12)')
    decompress_parser.add_argument('--stats-file', type=str, help='Caminho para salvar as estatísticas de descompressão', default=None)

    # Parser para o comando 'process'
    process_parser = subparsers.add_parser('process', help='Comprimir e descomprimir um arquivo sequencialmente')
    process_parser.add_argument('input_file', type=str, help='Caminho para o arquivo de entrada a ser comprimido e descomprimido')
    process_parser.add_argument('output_file', type=str, help='Caminho para o arquivo descomprimido final de saída')
    process_parser.add_argument('--type', type=str, choices=['s', 'd'], required=True, help='Tipo de compressão: "s" para tamanho fixo, "d" para tamanho variável')
    process_parser.add_argument('--max-bits', type=int, default=12, help='Número máximo de bits (padrão: 12)')
    process_parser.add_argument('--stats-file', type=str, help='Caminho para salvar as estatísticas de compressão e descompressão', default=None)

    args = parser.parse_args()

    if args.command == 'compress':
        compress(args.input_file, args.output_file, args.type, args.max_bits, args.stats_file)
    elif args.command == 'decompress':
        decompress(args.input_file, args.output_file, args.type, args.max_bits, args.stats_file)
    elif args.command == 'process':
        process(args.input_file, args.output_file, args.type, args.max_bits, args.stats_file)
    else:
        parser.print_help()

if __name__ == "__main__":
    main()

Overwriting lzw.py


## Teste texto gerado

Gerando artificialmente um texto para teste:

In [27]:
import random

# Generate the random text
input_data = ''.join(random.choices("abcd", k=100000))

#input_data = 'geekific-geekific'

# Specify the file path
file_path = "test_text.txt"

# Save the text to a .txt file
with open(file_path, "w") as file:
    file.write(input_data)

Aplicando a compressão e a descompressão em sequência:

In [28]:
!python lzw.py process test_text.txt saida_final.txt --type d

Compressão...
Arquivo comprimido criado: compressed_test_text.bin
Descompressão...
Descompressão concluída: decompressed_saida_final.txt
Processo concluído.


Comparando o tamanho dos arquivos iniciais e finais:

In [29]:
# Calcula e imprime os tamanhos dos arquivos original e comprimido
original_file_size = os.path.getsize("test_text.txt")
compressed_file_size = os.path.getsize("compressed_test_text.bin")
print(f"Tamanho do arquivo original: {original_file_size} bytes.")
print(f"Tamanho do arquivo comprimido: {compressed_file_size} bytes.")

Tamanho do arquivo original: 100000 bytes.
Tamanho do arquivo comprimido: 29883 bytes.


Verificando se os arquivos são equivalentes:

In [30]:
# Verifica se os arquivos original e descomprimido são iguais
def are_files_identical(file1, file2):
    with open(file1, 'r') as f1, open(file2, 'r') as f2:
        return f1.read() == f2.read()

txt_file1 = "test_text.txt"
txt_file2 = "decompressed_saida_final.txt"
print(f"Os arquivos .txt são iguais? {are_files_identical(txt_file1, txt_file2)}")

Os arquivos .txt são iguais? True


## Teste imagem gerada

In [31]:
from PIL import Image

# Create a new monochrome (mode '1') image with 10x10 pixels
width, height = 1500, 1500
image = Image.new("1", (width, height))  # Mode "1" means 1-bit pixels (monochrome)

# Set some pixels to black (1 = white, 0 = black)
pixels = image.load()
for x in range(width):
    for y in range(height):
        if (x + y) % 2 == 0:  # Example pattern: checkerboard
            pixels[x, y] = 0  # Black pixel
        else:
            pixels[x, y] = 1  # White pixel

# Save the image as a BMP file
image.save("monochrome_test_image.bmp")

Aplicando a compressão e a descompressão em sequência:

In [None]:
!python lzw.py process imagemhomog_30kb.bmp imagemhomog_des.bmp --type s

Comparando o tamanho dos arquivos iniciais e finais:

In [36]:
original_file_size = os.path.getsize("imagemhomog_30kb.bmp")
compressed_file_size = os.path.getsize("compressed_monochrome_test_image.bin")

print(f"Tamanho do arquivo original: {original_file_size} bytes.")
print(f"Tamanho do arquivo comprimido: {compressed_file_size} bytes.")

Tamanho do arquivo original: 282062 bytes.
Tamanho do arquivo comprimido: 3417 bytes.


Verificando se os arquivos são equivalentes:

In [34]:
!python lzw.py process imagemhomog_30kb.bmp imagemhomog_des.bmp --type s

Compressão...
Arquivo comprimido criado: compressed_imagemhomog_30kb.bin
Descompressão...
Descompressão concluída: decompressed_imagemhomog_des.bmp
Processo concluído.


## Arquivos de teste

In [5]:
# Instalar o gdown
!pip install gdown

# Baixar o arquivo usando o ID do arquivo
!gdown --id 1MK1qVVt3kmuohGnZVx2riHmVaKuGvs_z

# # Descompactar o arquivo
print("Descompactando arquivo. Aguarde...")
!unzip -oq 'test_cases.zip' -d './'
print("Descompactação concluída.")

Downloading...
From: https://drive.google.com/uc?id=1MK1qVVt3kmuohGnZVx2riHmVaKuGvs_z
To: /content/test_cases.zip
100% 935k/935k [00:00<00:00, 98.0MB/s]
Descompactando arquivo. Aguarde...
Descompactação concluída.


In [None]:
import subprocess
import os
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import filecmp

# Configurações para melhor visualização dos gráficos
sns.set(style="whitegrid")
plt.rcParams['figure.figsize'] = (18, 12)  # Aumenta o tamanho padrão das figuras

# Diretório onde os arquivos de teste estão localizados
test_dir = 'test_cases'

# Diretório onde os arquivos comprimidos serão salvos
compressed_dir = 'compressed_test_cases'

# Verifica se o diretório 'test_cases' existe
if not os.path.exists(test_dir):
    print(f"O diretório '{test_dir}' não existe. Por favor, certifique-se de que os arquivos de teste estão na pasta '{test_dir}'.")
    exit(1)

# Cria o diretório 'compressed_test_cases' se não existir
if not os.path.exists(compressed_dir):
    os.makedirs(compressed_dir)
    print(f"Diretório '{compressed_dir}' criado para armazenar os arquivos comprimidos.")

# Lista para armazenar os arquivos que precisam ser processados
files_to_process = []

# Extensões de arquivos a serem considerados
valid_extensions = ['.txt', '.bmp', '.bin', '.pgm', '.wav']

# Percorrer todos os arquivos no diretório 'test_cases'
for filename in os.listdir(test_dir):
    file_path = os.path.join(test_dir, filename)
    # Ignorar diretórios
    if os.path.isdir(file_path):
        continue
    # Ignorar arquivos que já foram descomprimidos
    if '_decompressed' in filename:
        continue
    # Ignorar arquivos de estatísticas ou outros arquivos não relevantes
    if filename.endswith('_process_stats.json'):
        continue
    # Verificar se a extensão é válida
    _, ext = os.path.splitext(filename)
    if ext.lower() in valid_extensions:
        files_to_process.append(filename)
    else:
        print(f"Arquivo com extensão não suportada encontrado e será ignorado: {filename}")

# Função para determinar o tipo de arquivo com base na extensão
def get_file_type(extension):
    if extension == '.txt':
        return 'Texto'
    elif extension in ['.bmp', '.pgm']:
        return 'Imagem'
    elif extension == '.wav':
        return 'Áudio'
    elif extension == '.bin':
        return 'Binário'
    else:
        return 'Desconhecido'

# Listas para os dados de Compressão e Descompressão
compression_data = []
decompression_data = []

# Dicionário para armazenar dados de estatísticas detalhadas para cada arquivo
detailed_stats = {}

# Automatizar o Processamento Sequencial (Compressão e Descompressão)
for filename in files_to_process:
    input_file = os.path.join(test_dir, filename)
    decompressed_filename = f'{os.path.splitext(filename)[0]}_decompressed{os.path.splitext(filename)[1]}'
    output_file = os.path.join(test_dir, decompressed_filename)
    stats_file = os.path.join(test_dir, f'{filename}_process_stats.json')

    # Verifica se o arquivo já foi descomprimido
    if os.path.exists(output_file):
        print(f"Arquivo descomprimido já existe: {output_file}. Pulando o processamento deste arquivo.")
        continue

    # Comando para executar o processo de compressão e descompressão
    command = [
        'python3', 'lzw.py', 'process', input_file, output_file,
        '--type', 's',
        '--max-bits', '12',
        '--stats-file', stats_file
    ]

    print(f'Processando {input_file}...')
    result = subprocess.run(command, capture_output=True, text=True)

    if result.returncode != 0:
        print(f"Erro ao processar {input_file}:")
        print(result.stderr)
    else:
        print(result.stdout)

# Verificar os Arquivos Descomprimidos
for filename in files_to_process:
    original_file = os.path.join(test_dir, filename)
    original_extension = os.path.splitext(filename)[1]
    decompressed_filename = f'{os.path.splitext(filename)[0]}_decompressed{original_extension}'
    decompressed_file = os.path.join(test_dir, decompressed_filename)

    # Verifica se o arquivo descomprimido existe
    if not os.path.exists(decompressed_file):
        print(f"Arquivo descomprimido não encontrado: {decompressed_file}. Pulando verificação deste arquivo.")
        continue

    # Verifica se o arquivo já foi descomprimido corretamente
    if original_extension.lower() in ['.bmp', '.bin', '.pgm', '.wav']:
        compressed_file_path = os.path.join(compressed_dir, f"{filename}.bin")  # Ajuste aqui
        files_are_equal = filecmp.cmp(original_file, decompressed_file, shallow=False)
    else:
        # Para arquivos de texto, podemos comparar como strings
        with open(original_file, 'r', encoding='utf-8') as f1, open(decompressed_file, 'r', encoding='utf-8') as f2:
            files_are_equal = f1.read() == f2.read()

    if files_are_equal:
        print(f'{filename} descomprimido com sucesso.')
    else:
        print(f'Erro: {filename} foi descomprimido incorretamente.')

# Coletar Estatísticas de Compressão e Descompressão
for filename in files_to_process:
    # Caminho para o arquivo de estatísticas do processamento
    process_stats_file = os.path.join(test_dir, f'{filename}_process_stats.json')

    # Determinar o tipo de arquivo com base na extensão
    _, ext = os.path.splitext(filename)
    ftype = get_file_type(ext.lower())

    # Verifica se o arquivo de estatísticas existe
    if os.path.exists(process_stats_file):
        with open(process_stats_file, 'r') as f:
            process_stats = json.load(f)

        # Extraindo estatísticas de compressão
        compress_stats = process_stats.get('compression_stats', {})
        # Extraindo estatísticas de descompressão
        decompress_stats = process_stats.get('decompression_stats', {})

        # Calcular a taxa de compressão com base nos tamanhos dos arquivos
        original_file_path = os.path.join(test_dir, filename)
        compressed_file_path = os.path.join(compressed_dir, f"{filename}.bin")  # Ajuste aqui
        if os.path.exists(original_file_path) and os.path.exists(compressed_file_path):
            original_size = os.path.getsize(original_file_path)
            compressed_size = os.path.getsize(compressed_file_path)
            compression_ratio = compressed_size / original_size if original_size != 0 else None
        else:
            compression_ratio = None

        # Adicionar dados à lista de compressão
        compression_data.append({
            'File_Name': filename,
            'File_Type': ftype,
            'Compression_Ratio': compression_ratio,
            'Dictionary_Size': compress_stats.get('dictionary_size_over_time', [])[-1] if compress_stats.get('dictionary_size_over_time') else None,
            'Compression_Time(s)': compress_stats.get('execution_time', 0),
            'Memory_Usage(KB)': compress_stats.get('memory_usage', 0)
        })

        # Adicionar dados à lista de descompressão
        decompression_data.append({
            'File_Name': filename,
            'File_Type': ftype,
            'Decompression_Time(s)': decompress_stats.get('execution_time', 0)
        })

        # Armazenar estatísticas detalhadas para o arquivo
        detailed_stats[filename] = {
            'compression_ratio_over_time': compress_stats.get('compression_ratio_over_time', []),
            'dictionary_size_over_time': compress_stats.get('dictionary_size_over_time', []),
            'memory_usage': compress_stats.get('memory_usage', 0)
        }

    else:
        print(f"Arquivo de estatísticas de processamento não encontrado: {process_stats_file}")

# Criar DataFrames
compression_df = pd.DataFrame(compression_data)
decompression_df = pd.DataFrame(decompression_data)

# Exibir DataFrames
print("Dados de Compressão:")
display(compression_df)

print("Dados de Descompressão:")
display(decompression_df)

# Função para determinar limites globais para escalas consistentes
def get_global_limits(df, column):
    min_val = df[column].min()
    max_val = df[column].max()
    return min_val, max_val

# Obter limites globais para cada métrica
compression_time_min, compression_time_max = get_global_limits(compression_df, 'Compression_Time(s)') if not compression_df.empty else (0, 1)
decompression_time_min, decompression_time_max = get_global_limits(decompression_df, 'Decompression_Time(s)') if not decompression_df.empty else (0, 1)
compression_ratio_min, compression_ratio_max = get_global_limits(compression_df, 'Compression_Ratio') if not compression_df.empty else (0, 1)
memory_usage_min, memory_usage_max = get_global_limits(compression_df, 'Memory_Usage(KB)') if not compression_df.empty else (0, 1)

# Função para ordenar o DataFrame e retornar a ordem dos arquivos
def get_sorted_order(df, y_column):
    sorted_df = df.sort_values(by=y_column)
    return sorted_df['File_Name']

# 1. Gráfico de Tempo de Compressão e Descompressão Ordenados
fig, axes = plt.subplots(1, 2, figsize=(24, 10))

# Tempo de Compressão
if not compression_df.empty:
    order = get_sorted_order(compression_df, 'Compression_Time(s)')
    sns.barplot(ax=axes[0], x='File_Name', y='Compression_Time(s)', hue='File_Type', data=compression_df, order=order)
    axes[0].set_title('Tempo de Compressão por Arquivo')
    axes[0].set_xlabel('Arquivo')
    axes[0].set_ylabel('Tempo de Compressão (segundos)')
    axes[0].set_ylim(compression_time_min, compression_time_max * 1.1)  # Adiciona 10% para visualização
    axes[0].tick_params(axis='x', rotation=45)
else:
    axes[0].text(0.5, 0.5, 'Nenhum dado de compressão disponível para plotar.', horizontalalignment='center', verticalalignment='center')
    axes[0].set_axis_off()

# Tempo de Descompressão
if not decompression_df.empty:
    order = get_sorted_order(decompression_df, 'Decompression_Time(s)')
    sns.barplot(ax=axes[1], x='File_Name', y='Decompression_Time(s)', hue='File_Type', data=decompression_df, order=order)
    axes[1].set_title('Tempo de Descompressão por Arquivo')
    axes[1].set_xlabel('Arquivo')
    axes[1].set_ylabel('Tempo de Descompressão (segundos)')
    axes[1].set_ylim(decompression_time_min, decompression_time_max * 1.1)
    axes[1].tick_params(axis='x', rotation=45)
else:
    axes[1].text(0.5, 0.5, 'Nenhum dado de descompressão disponível para plotar.', horizontalalignment='center', verticalalignment='center')
    axes[1].set_axis_off()

plt.tight_layout()
plt.show()

# 2. Gráfico de Taxa de Compressão e Uso de Memória Ordenados
fig, axes = plt.subplots(1, 2, figsize=(24, 10))

# Taxa de Compressão
if not compression_df.empty:
    order = get_sorted_order(compression_df, 'Compression_Ratio')
    sns.barplot(ax=axes[0], x='File_Name', y='Compression_Ratio', hue='File_Type', data=compression_df, order=order)
    axes[0].set_title('Taxa de Compressão por Arquivo')
    axes[0].set_xlabel('Arquivo')
    axes[0].set_ylabel('Taxa de Compressão (Comprimido / Original)')
    axes[0].set_ylim(compression_ratio_min * 0.9, compression_ratio_max * 1.1)
    axes[0].tick_params(axis='x', rotation=45)
else:
    axes[0].text(0.5, 0.5, 'Nenhum dado de compressão disponível para plotar.', horizontalalignment='center', verticalalignment='center')
    axes[0].set_axis_off()

# Uso de Memória
if not compression_df.empty:
    order = get_sorted_order(compression_df, 'Memory_Usage(KB)')
    sns.barplot(ax=axes[1], x='File_Name', y='Memory_Usage(KB)', hue='File_Type', data=compression_df, order=order)
    axes[1].set_title('Uso de Memória do Dicionário por Arquivo')
    axes[1].set_xlabel('Arquivo')
    axes[1].set_ylabel('Uso de Memória (KB)')
    axes[1].set_ylim(memory_usage_min * 0.9, memory_usage_max * 1.1)
    axes[1].tick_params(axis='x', rotation=45)
else:
    axes[1].text(0.5, 0.5, 'Nenhum dado de uso de memória disponível para plotar.', horizontalalignment='center', verticalalignment='center')
    axes[1].set_axis_off()

plt.tight_layout()
plt.show()

# 3. Gráficos Detalhados por Arquivo

# Preparar limites globais para os gráficos de Taxa de Compressão e Tamanho do Dicionário
global_compression_ratio_max = max([max(stats['compression_ratio_over_time']) if stats['compression_ratio_over_time'] else 0 for stats in detailed_stats.values()], default=1)
global_dictionary_size_max = max([max(stats['dictionary_size_over_time']) if stats['dictionary_size_over_time'] else 0 for stats in detailed_stats.values()], default=1)

# Plotar Taxa de Compressão ao Longo do Tempo para cada arquivo
for filename in files_to_process:
    if filename in detailed_stats:
        compression_ratio_over_time = detailed_stats[filename]['compression_ratio_over_time']
        if compression_ratio_over_time:
            plt.figure(figsize=(12, 6))
            plt.plot(compression_ratio_over_time, marker='o')
            plt.title(f'Taxa de Compressão ao Longo do Tempo - {filename}')
            plt.xlabel('Iteração')
            plt.ylabel('Taxa de Compressão')
            plt.ylim(0, global_compression_ratio_max * 1.1)
            plt.grid(True)
            plt.tight_layout()
            plt.show()
        else:
            print(f"Sem dados de taxa de compressão ao longo do tempo para {filename}")
    else:
        print(f"Estatísticas detalhadas não encontradas para {filename}")

# Plotar Tamanho do Dicionário ao Longo do Tempo para cada arquivo
for filename in files_to_process:
    if filename in detailed_stats:
        dictionary_size_over_time = detailed_stats[filename]['dictionary_size_over_time']
        if dictionary_size_over_time:
            plt.figure(figsize=(12, 6))
            plt.plot(dictionary_size_over_time, color='orange', marker='o')
            plt.title(f'Tamanho do Dicionário ao Longo do Tempo - {filename}')
            plt.xlabel('Iteração')
            plt.ylabel('Tamanho do Dicionário')
            plt.ylim(0, global_dictionary_size_max * 1.1)
            plt.grid(True)
            plt.tight_layout()
            plt.show()
        else:
            print(f"Sem dados de tamanho do dicionário ao longo do tempo para {filename}")
    else:
        print(f"Estatísticas detalhadas não encontradas para {filename}")

# 4. Gráfico Comparativo do Uso de Memória

# Plotar Uso de Memória do Dicionário por Arquivo Ordenados
if not compression_df.empty:
    # Ordenar os arquivos do menor para o maior uso de memória
    compression_df_sorted_memory = compression_df.sort_values(by='Memory_Usage(KB)')
    order = compression_df_sorted_memory['File_Name']

    plt.figure(figsize=(18, 10))
    sns.barplot(x='File_Name', y='Memory_Usage(KB)', hue='File_Type', data=compression_df, order=order)
    plt.title('Uso de Memória do Dicionário por Arquivo')
    plt.xlabel('Arquivo')
    plt.ylabel('Uso de Memória (KB)')
    plt.ylim(memory_usage_min * 0.9, memory_usage_max * 1.1)
    plt.xticks(rotation=45)
    plt.legend(title='Tipo de Arquivo')
    plt.tight_layout()
    plt.show()
else:
    print("Nenhum dado de uso de memória disponível para plotar.")

Diretório 'compressed_test_cases' criado para armazenar os arquivos comprimidos.
Arquivo com extensão não suportada encontrado e será ignorado: notebook_1mb.tiff
Arquivo com extensão não suportada encontrado e será ignorado: lorem_5kb.docx
Arquivo com extensão não suportada encontrado e será ignorado: .DS_Store
Processando test_cases/imagemhomog_30kb.bmp...
Compressão...
Arquivo comprimido criado: compressed_test_cases/imagemhomog_30kb.bin
Estatísticas de compressão salvas em 'test_cases/imagemhomog_30kb.bmp_process_stats.json'
Descompressão...
Descompressão concluída: decompressed_imagemhomog_30kb_decompressed.bmp
Estatísticas de descompressão salvas em 'test_cases/imagemhomog_30kb.bmp_process_stats.json'
Processo concluído.

Processando test_cases/StarWars3_132kb.wav...
Compressão...
Arquivo comprimido criado: compressed_test_cases/StarWars3_132kb.bin
Estatísticas de compressão salvas em 'test_cases/StarWars3_132kb.wav_process_stats.json'
Descompressão...
Tipo de arquivo não suportad