# Compress and decompress text file

In [52]:
import os
import struct
import csv
import time
import shutil
import json

def findRangePow(x):
    i = 0
    while pow(2,i) - 1 < x:i+=1
    return i

def packZip(outputFolderName, inputParentPath):
    shutil.make_archive(outputFolderName, 'zip', inputParentPath)

def unpackZip(inputPath, inputParentPath):
    shutil.unpack_archive(inputPath ,inputParentPath, 'zip')

def setUpPathToCom(inputPath, opt='txt'):
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)
    if opt == 'png':
        outputFolderName = inputName + '.comPng'

        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        dicPath = os.path.join(outputFolderPath, 'dictonary.csv')
        compressedPath = os.path.join(outputFolderPath, '{}.comPng.bin'.format(inputName))
    else:
        outputFolderName = inputName + '.comTxt'

        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        dicPath = os.path.join(outputFolderPath, 'dictonary.csv')
        compressedPath = os.path.join(outputFolderPath, '{}.comTxt.bin'.format(inputName))
        
    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)

    return outputFolderPath, outputFolderName, dicPath, compressedPath

def setUpPathToDecom(inputPath, opt='txt'):
    # Check valid
    if inputPath.endswith('.comTxt.zip') or inputPath.endswith('.comPng.zip'):
        message = "Invalid file"
    
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)



    if opt == 'txt':
        outputFolderName = inputName + '.decomTxt'
        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        outputFileName = inputName + '.txt'
        comparePath = os.path.join(inputParentPath,outputFileName)

        toDecompressedPath = os.path.join(outputFolderPath, '{}.comTxt.bin'.format(inputName))
        decompressedPath = os.path.join(outputFolderPath, '{}.decom.txt'.format(inputName))
    else:
        outputFolderName = inputName + '.decomPng'
        outputFolderPath = os.path.join(inputParentPath,outputFolderName)


        outputFileName = inputName + '.png'
        comparePath = os.path.join(inputParentPath,outputFileName)

        toDecompressedPath = os.path.join(outputFolderPath, '{}.comPng.bin'.format(inputName))

        decompressedPath = os.path.join(outputFolderPath, '{}.decomPng.png'.format(inputName))

    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)
    
    return toDecompressedPath, decompressedPath, outputFolderPath, comparePath
    

def calDiffTxt(firstPath, secondPath):
    with open(firstPath, "r", encoding="utf-8") as input_file:
        dataFirst = input_file.read().encode("utf-8")

    with open(secondPath, "r", encoding="utf-8") as input_file:
        dataSecond = input_file.read().encode("utf-8")
    
    diff = 0
    for i in range(len(dataFirst)):
        if dataFirst[i] != dataSecond[i]:
            diff +=1

    return diff



def saveDicttoCSV(dicPath, dictionary):
    headers = ['CodeWord', 'Code']
    with open(dicPath, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)
        for key, value in dictionary.items():
            # print(key, value)
            writer.writerow([key,value])

def compress_txt(inputPath, outputPath, type='H'):
    # Set up parameter
    code = 256
    dictionary = {bytes([i]): i for i in range(code)}
    compressed_data = bytearray()
    current_sequence = b""

    # Read input file
    with open(inputPath, "r", encoding="utf-8") as input_file:
        data = input_file.read().encode("utf-8")

    inputSize = len(data)*8
    compressedSize = 0

    # Compress data using LZW
    for byte in data:
        sequence = current_sequence + bytes([byte])
        if sequence in dictionary:
            current_sequence = sequence
        else:
            compressedSize += findRangePow(dictionary[current_sequence])
            compressed_data += struct.pack(type, dictionary[current_sequence])
            dictionary[sequence] = code
            code += 1
            current_sequence = bytes([byte])

    if current_sequence:
        compressedSize += findRangePow(dictionary[current_sequence])
        compressed_data += struct.pack(type, dictionary[current_sequence])

    #Save type in compressed data
    compressed_data = compressed_data + bytes('<.>{}'.format(type), 'utf-8')
    
    # Write compressed data to output file
    with open(outputPath, "wb") as output_file:
        output_file.write(compressed_data)

    return inputSize, compressedSize, dictionary

def compress_png(inputPath, outputPath, type='H'):
    # Set up parameter
    code = 256
    dictionary = {bytes([i]): i for i in range(code)}
    compressed_data = bytearray()
    current_sequence = b""

    # Open image and convert to raw pixel data
    with Image.open(inputPath) as img:
        data = img.convert("RGB").tobytes()
        width, height = img.size
        print(width, height)

    inputSize = len(data)*8
    compressedSize = 0

    # Compress data using LZW
    for byte in data:
        sequence = current_sequence + bytes([byte])
        if sequence in dictionary:
            current_sequence = sequence
        else:
            compressedSize += findRangePow(dictionary[current_sequence])
            compressed_data += struct.pack(type, dictionary[current_sequence])
            dictionary[sequence] = code
            code += 1
            current_sequence = bytes([byte])

    if current_sequence:
        compressedSize += findRangePow(dictionary[current_sequence])
        compressed_data += struct.pack(type, dictionary[current_sequence])

    #Save type in parameter image and type
    compressed_data = compressed_data + bytes('<.>{}'.format(width), 'utf-8')
    compressed_data = compressed_data + bytes('<.>{}'.format(height), 'utf-8')
    compressed_data = compressed_data + bytes('<.>{}'.format(type), 'utf-8')

    print(compressed_data)
    
    # Write compressed data to output file
    with open(outputPath, "wb") as output_file:
        output_file.write(compressed_data)

    return inputSize, compressedSize, dictionary

def decompress_txt(inputPath, outputPath):
    # Read compressed data from input file
    with open(inputPath, "rb") as input_file:
        compressed_data = input_file.read()
    print(compressed_data)

    compressed_data, type = compressed_data.split(bytes('<.>', 'utf-8'))[:]
    step = 2
    match type:
        case 'H':
            step = 2
        case 'I':
            step = 4
        case 'Q':
            step = 8

    # Decompress data using LZW
    dictionary = {i: bytes([i]) for i in range(256)}
    code = 256
    decompressed_data = bytearray()
    current_sequence = bytes()
    

    for i in range(0, len(compressed_data), step):
        value = struct.unpack(type, compressed_data[i:i+step])[0]
        if value in dictionary:
            sequence = dictionary[value]
            # print("In dic", sequence[0])
            # print("sequence", sequence)
        elif value == code:
            sequence = current_sequence + bytes([current_sequence[0]])
        else:
            raise ValueError("Invalid compressed data")
        decompressed_data += sequence
        # print("Decompress", decompressed_data)
        if current_sequence:
            # print("in if current", sequence[0])
            dictionary[code] = current_sequence + bytes([sequence[0]])
            code += 1
            # print("Code", code)
        current_sequence = sequence

    # Write decompressed data to output file
    with open(outputPath, "w", encoding="utf-8") as output_file:
        output_file.write(decompressed_data.decode("utf-8"))


def decompress_png(inputPath, outputPath):
    # Read compressed data from input file
    with open(inputPath, "rb") as input_file:
        compressed_data = input_file.read()
    print(compressed_data)

    compressed_data, width, height, type = compressed_data.split(bytes('<.>', 'utf-8'))[:]
    print(type)
    step = 2
    match type:
        case 'H':
            step = 2
        case 'I':
            step = 4
        case 'Q':
            step = 8

    # Decompress data using LZW
    dictionary = {i: bytes([i]) for i in range(256)}
    code = 256
    decompressed_data = b""
    current_sequence = b""
    


    for i in range(0, len(compressed_data), step):
        value = struct.unpack(type, compressed_data[i:i+step])[0]
        if value in dictionary:
            sequence = dictionary[value]
            # print("In dic", sequence[0])
            # print("sequence", sequence)
        elif value == code:
            sequence = current_sequence + bytes([current_sequence[0]])
        else:
            raise ValueError("Invalid compressed data")
        decompressed_data += sequence
        # print("Decompress", decompressed_data)
        if current_sequence:
            # print("in if current", sequence[0])
            dictionary[code] = current_sequence + bytes([sequence[0]])
            code += 1
            # print("Code", code)
        current_sequence = sequence

    width = int(width)
    height = int(height)

    # width, height = original_image.size
    print('Width', width, 'Height', height)
    img = Image.frombytes("RGB", (width, height), decompressed_data)
    img.save(outputPath, format="PNG")

def compress_lzw_utf8(inputPath, type="H"):
    try:
        startTime = time.time()
        outputFolderPath, outputFolderName, dicPath, compressedPath = setUpPathToCom(inputPath)

        inputSize, compressedSize, dictionary = compress_txt(inputPath,compressedPath, type=type)

        saveDicttoCSV(dicPath,dictionary)
        packZip(outputFolderName, outputFolderPath)

        # Check if compressed file is smaller than input file
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        compression_ratio =  '{:.2%}'.format(inputSize /compressedSize)
        avarageLength = '{:.2} bits/symbol'.format(float(compressedSize) / float(inputSize / 8))

        response ={
            'inputSize': inputSize,
            'compressedSize': compressedSize,
            'elapsedTime': elapsedTime,
            'cr': compression_ratio,
            'avarageLength':avarageLength,
            'outputFolderPath': outputFolderPath
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error Compress Txt File"
        }
        responseJson = json.dumps(response)
    
    return status, responseJson

def decompress_lzw_utf8(inputPath):
    try:
        startTime = time.time()
        toDecompressedPath, decompressedPath, outputFolderPath, comparePath = setUpPathToDecom(inputPath)
        unpackZip(inputPath ,outputFolderPath)
        decompress_txt(toDecompressedPath, decompressedPath)

        diff = calDiffTxt(comparePath,decompressedPath)
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        print("Eslapsed time: ", elapsedTime)
        message = "Decompression Txt Successful"

        response ={
            'elapsedTime': elapsedTime,
            'diff': diff,
            'outputFolderPath': outputFolderPath,
            'message':message
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error Decompress Txt file"
        }
        responseJson = json.dumps(response)
    return status, responseJson



In [6]:
inputPath = 'D:\Project\DPT\LzwCompressor\input.comTxt.zip'
decompress_lzw_utf8(inputPath)

b"T\x00\xc3\x00\xb4\x00i\x00\n\x00\n\x00U\x00n\x00l\x00o\x00c\x00k\x00 \x00Y\x00o\x00u\x00r\x00 \x00V\x00e\x00r\x00t\x00i\x00c\x00a\x00l\x00 \x00P\x00o\x00t\x00e\x00n\x00\x15\x01\x18\x01:\x00 \x00M\x00a\x00s\x00\x1d\x01\x10\x01t\x00h\x00e\x00 \x00A\x00\x14\x01 \x00o\x00f\x00 \x00E\x00x\x00p\x00\x08\x01s\x00i\x00v\x00+\x01J\x00u\x00m\x00p\x00i\x00n\x00g\x00!\x00\x04\x01F\x00o\x00r\x00g\x00e\x00t\x00 \x00a\x00b\x00\x0e\x01I\x01\x1e\x01d\x00l\x00e\x00s\x00s\x00 \x00\x15\x01p\x00T\x01a\x00n\x00d\x00 \x00?\x01e\x00f\x00f\x00e\x00c\x00\x15\x019\x01 \x00m\x00H\x01h\x00o\x00d\x00s\x00.\x00 \x00W\x00e\x00'\x00d\x01g\x00\x1c\x01 \x00y\x00\x0e\x01 \x00c\x00o\x009\x01r\x00e\x00[\x01w\x00i\x00)\x01U\x01*\x01 \x00u\x00l\x00\x15\x01m\x00a\x00\x1d\x01 \x00s\x00o\x00l\x00u\x00\x15\x01o\x00n\x00U\x01o\x00\x8a\x01k\x00y\x00r\x00\t\x01k\x00H\x01t\x01\x0e\x01\x10\x01z\x01\x15\x01\x17\x01\x19\x01j\x00<\x01p\x00l\x01\n\x00S\x00\x1d\x01p\x00 \x00b\x00y\x00\x8a\x01\xa8\x01,\x00 \x00w\x00o\x01l\x00\x19\x01g\x00

(200,
 '{"elapsedTime": "0.0079954s", "outputFolderPath": "D:\\\\Project\\\\DPT\\\\LzwCompressor\\\\input.decomTxt", "message": "Decompression successful"}')

# Compress and decompress png file

In [54]:
from PIL import Image
import struct
import os    

def compress_lzw_png(inputPath, type="H"):
    try:
        startTime = time.time()
        outputFolderPath, outputFolderName, dicPath, compressedPath = setUpPathToCom(inputPath, opt='png')

        inputSize, compressedSize, dictionary = compress_png(inputPath,compressedPath, type=type)

        saveDicttoCSV(dicPath,dictionary)
        packZip(outputFolderName, outputFolderPath)

        # Check if compressed file is smaller than input file
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        print("Eslapsed time: ", elapsedTime)
        compression_ratio =  '{:.2%}'.format(inputSize /compressedSize)
        avarageLength = '{:.2} bits/symbol'.format(float(compressedSize) / float(inputSize / 8))

        response ={
            'inputSize': inputSize,
            'compressedSize': compressedSize,
            'elapsedTime': elapsedTime,
            'cr': compression_ratio,
            'avarageLength':avarageLength,
            'outputFolderPath': outputFolderPath
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error Compress Png"
        }
        responseJson = json.dumps(response)
    return status, responseJson


def calDiffPng(firstPath, secondPath):

    with Image.open(firstPath) as img:
        dataFirst = img.convert("RGB").tobytes()

    with Image.open(secondPath) as img:
        dataSecond = img.convert("RGB").tobytes()

    print(len(dataFirst))
    print(len(dataSecond))

    diff = 0
    for i in range(len(dataFirst)):
        if dataFirst[i] != dataSecond[i]:
            diff +=1

    return diff

    

def decompress_lzw_png(inputPath):
    try: 
        startTime = time.time()
        toDecompressedPath, decompressedPath, outputFolderPath, comparePath = setUpPathToDecom(inputPath, opt='png')
        unpackZip(inputPath ,outputFolderPath)
        decompress_png(toDecompressedPath, decompressedPath)

        diff = calDiffPng(comparePath,decompressedPath)
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        print("Eslapsed time: ", elapsedTime)
        message = "Decompression Png Successful"

        response ={
            'elapsedTime': elapsedTime,
            'diff': diff,
            'outputFolderPath': outputFolderPath,
            'message':message
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error Decompress Png"
        }
        responseJson = json.dumps(response)
    return status, responseJson


In [14]:
inputPath = 'D:\Project\DPT\LzwCompressor\output.comPng.zip'
decompress_lzw_png(inputPath)

b'\x1c\x00#\x00(\x00\x1d\x00$\x00)\x00\x18\x00\x06\x01\x07\x01\x08\x01\t\x01\n\x01\x0b\x01\x0c\x01\r\x01\x0e\x01\x0f\x01\x10\x01\x11\x01\x12\x01\x13\x01\x14\x01\x15\x01\x16\x01\x17\x01\x18\x01\x19\x01\x1a\x01\x1b\x01\x1c\x01\x1d\x01\x1e\x01\x1f\x01 \x01!\x01"\x01#\x01$\x01%\x01&\x01\'\x01(\x01)\x01*\x01+\x01,\x01-\x01.\x01/\x010\x011\x012\x013\x014\x015\x016\x017\x018\x019\x01:\x01;\x01<\x01=\x01"\x01\x00\x00x\x00\xd7\x00@\x01B\x01A\x01C\x01F\x01E\x01H\x01D\x01J\x01G\x01K\x01I\x01L\x01O\x01N\x01Q\x01M\x01S\x01P\x01T\x01R\x01U\x01X\x01W\x01R\x01\x1f\x00\\\x01]\x01^\x01_\x01`\x01a\x01b\x01c\x01d\x01e\x01f\x01g\x01h\x01i\x01j\x01k\x01l\x01m\x01n\x01o\x01p\x01q\x01r\x01s\x01t\x01u\x01v\x01w\x01x\x01y\x01z\x01{\x01l\x01>\x01~\x01\x7f\x01\x80\x01\x81\x01\x82\x01\x83\x01\x84\x01\x85\x01\x86\x01\x87\x01\x88\x01\x89\x01\x8a\x01\x8b\x01\x8c\x01\x8d\x01\x8e\x01\x8f\x01\x90\x01\x91\x01\x92\x01\x93\x01\x94\x01\x16\x01\x03\x01)\x00\x97\x01\x95\x01\x9a\x01\x9b\x01\x9c\x01\x9d\x01\x9e\x01\x9f\x01\xa0\

(200,
 '{"elapsedTime": "34.339s", "diff": 0, "outputFolderPath": "D:\\\\Project\\\\DPT\\\\LzwCompressor\\\\output.decomPng", "message": "Decompression Png Successful"}')

In [20]:
print('{:.2}'.format(100 /50))

2.0


In [55]:
import cv2
import numpy as np
import struct


def setUpPathToComV2(inputPath):
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)

    outputFolderName = inputName + '.comPngv2'
    outputFolderPath = os.path.join(inputParentPath,outputFolderName)

    dicPaths = [os.path.join(outputFolderPath, 'dictonary({}).csv'.format(i)) for i in range(3)]
    
    compressedPath = [os.path.join(outputFolderPath, '{}({}).comPngv2.bin'.format(inputName,i)) for i in range(3)]


    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)

    return outputFolderPath, outputFolderName, dicPaths, compressedPath


def compress_pngv2(inputPath, compressedPath, type="H"):

    img = cv2.imread(inputPath)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    w,h,_ = img.shape

    dictionarys = []

    inputSize = 0
    compressedSize = 0

    # print(np.array(img[:,:,0]).flatten())
    for chanel in range(3):
        type = type
        imgArr = np.array(img[:,:,chanel])
        imgFla = imgArr.flatten()

        inputSize += len(imgFla)*8

        # print(max(imgFla))
        # print(len(imgFla))

        dictionary = {str(i): i for i in range(256)}
        print("Dictionary", dictionary)

        code = 256
        compressed_data = bytearray()
        current_sequence = ""
        output_data = []

        for byte in imgFla:
            # print("Byte: ", byte)
            if current_sequence:
                sequence = current_sequence + "+" + str(byte)
            else:
                sequence = str(byte)
            # print("Current sequence:", current_sequence)
            # print("Sequence", sequence)
            if sequence in dictionary:
                # print("In dictionary")
                current_sequence = sequence
            else:
                # print('What is struct:', dictionary[current_sequence])
                compressedSize += findRangePow(dictionary[current_sequence])
                compressed_data += struct.pack(type, dictionary[current_sequence])
                output_data.append(dictionary[current_sequence])
                dictionary[sequence] = code
                code += 1
                current_sequence = str(byte)

            # print(compressed_data)

        if current_sequence:
            compressedSize += findRangePow(dictionary[current_sequence])
            compressed_data += struct.pack(type, dictionary[current_sequence])
            output_data.append(dictionary[current_sequence])

        dictionarys.append(dictionary)

        if chanel == 2:
            compressed_data = compressed_data + bytes('<.>{}'.format(w), 'utf-8')
            compressed_data = compressed_data + bytes('<.>{}'.format(h), 'utf-8')
        elif chanel == 0:
            compressed_data = compressed_data + bytes('<.>{}'.format(type), 'utf-8')
        # print("Final compressed data: ", compressed_data)
        # print("Final output data: ", output_data[:])
        # print("Final output data: ", imgFla[:])
        # print("Dictionary", dictionary)

        with open(compressedPath[chanel], "wb") as output_file:
            output_file.write(compressed_data)
    
    return inputSize, compressedSize, dictionarys

def compress_lzw_png_v2(inputPath, type='H'):
    try:
        startTime = time.time()
        
        outputFolderPath, outputFolderName, dicPaths, compressedPath = setUpPathToComV2(inputPath)

        inputSize, compressedSize, dictionarys = compress_pngv2(inputPath,compressedPath, type=type)

        for i, dict in enumerate(dictionarys):
            saveDicttoCSV(dicPaths[i],dict)

        packZip(outputFolderName, outputFolderPath)

        # Check if compressed file is smaller than input file
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        compression_ratio =  '{:.2%}'.format(inputSize /compressedSize)
        avarageLength = '{:.2} bits/symbol'.format(float(compressedSize) / float(inputSize / 8))

        response ={
            'inputSize': inputSize,
            'compressedSize': compressedSize,
            'elapsedTime': elapsedTime,
            'cr': compression_ratio,
            'avarageLength':avarageLength,
            'outputFolderPath': outputFolderPath
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error for compress png v2"
        }
        responseJson = json.dumps(response)
    return status, responseJson


In [25]:
compress_lzw_png_v2('heeh.png', type='Q')

Dictionary {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, '10': 10, '11': 11, '12': 12, '13': 13, '14': 14, '15': 15, '16': 16, '17': 17, '18': 18, '19': 19, '20': 20, '21': 21, '22': 22, '23': 23, '24': 24, '25': 25, '26': 26, '27': 27, '28': 28, '29': 29, '30': 30, '31': 31, '32': 32, '33': 33, '34': 34, '35': 35, '36': 36, '37': 37, '38': 38, '39': 39, '40': 40, '41': 41, '42': 42, '43': 43, '44': 44, '45': 45, '46': 46, '47': 47, '48': 48, '49': 49, '50': 50, '51': 51, '52': 52, '53': 53, '54': 54, '55': 55, '56': 56, '57': 57, '58': 58, '59': 59, '60': 60, '61': 61, '62': 62, '63': 63, '64': 64, '65': 65, '66': 66, '67': 67, '68': 68, '69': 69, '70': 70, '71': 71, '72': 72, '73': 73, '74': 74, '75': 75, '76': 76, '77': 77, '78': 78, '79': 79, '80': 80, '81': 81, '82': 82, '83': 83, '84': 84, '85': 85, '86': 86, '87': 87, '88': 88, '89': 89, '90': 90, '91': 91, '92': 92, '93': 93, '94': 94, '95': 95, '96': 96, '97': 97, '98': 98, '99': 99, '100': 1

(200,
 '{"inputSize": 4680, "compressedSize": 2040, "elapsedTime": "0.12658s", "cr": "229.41%%", "avarageLength": "8.0 bits/symbol", "outputFolderPath": "heeh.comPngv2"}')

In [16]:
for chanel in range(3,):
    print(chanel)

0
1
2


In [46]:
def setUpPathToDecomv2(inputPath, opt='png'):
    # Check valid
    if inputPath.endswith('.comPngv2.zip'):
        message = "Invalid file"
    
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)

    outputFileName = inputName + '.png'
    comparePath = os.path.join(inputParentPath,outputFileName)

    outputFolderName = inputName + '.decomPngv2'
    outputFolderPath = os.path.join(inputParentPath,outputFolderName)

    

    toDecompressedPaths = [os.path.join(outputFolderPath, '{}({}).comPngv2.bin'.format(inputName,i)) for i in range(3)]

    decompressedPath = os.path.join(outputFolderPath, '{}.decom.png'.format(inputName))

    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)
    
    return toDecompressedPaths, decompressedPath, outputFolderPath, comparePath


def decompress_pngv2(inputPaths, outputPath, type="H"):

    image = []
    for chanel in range(3):
        with open(inputPaths[chanel], "rb") as input_file:
            compressed_data = input_file.read()
            print(compressed_data)

        if chanel == 2:
            compressed_data, width, height = compressed_data.split(bytes('<.>', 'utf-8'))[:]
            width = int(width)
            height = int(height)
        elif chanel == 0:
            compressed_data, type = compressed_data.split(bytes('<.>', 'utf-8'))[:]
        
        print(type)
        step = 2
        match type:
            case b'H':
                step = 2
            case b'I':
                step = 4
            case b'Q':
                step = 8
        
        print(step)
        dictionary = {str(i): i for i in range(256)}
        code = 256
        decompressed_data = []

        def takeSequence(sequence):
            result = [int(i) for i in sequence.split('+')]
            return result


        current_sequence = ""
        for i in range(0, len(compressed_data), step):
            value = str(struct.unpack(type, compressed_data[i:i+step])[0])
            # print("value", value)
            if value in dictionary:
                sequence = str(dictionary[value])
                # print('In dic', sequence)
            elif value == str(code):
                # print("Value = code")
                sequence = current_sequence + "+" + str(takeSequence(current_sequence)[0])
            else:
                raise ValueError("Invalid compressed data")
            for i in takeSequence(sequence):
                decompressed_data.append(i) 
            
            if current_sequence:
                # print("New", str(current_sequence + "+" +str(takeSequence(sequence)[0])))
                dictionary[str(code)] = str(current_sequence + "+" + str(takeSequence(sequence)[0]))
                code += 1
                # print('Code', code)
            current_sequence = sequence
            # print('Current', current_sequence)
            # print("Output",decompressed_data)
            # print("Dic",dictionary)
        image.append(decompressed_data)

    # print(np.array(image).shape)
    image_merge = cv2.merge([np.array(image[2]).reshape(width,height).astype(np.uint8), np.array(image[1]).reshape(width,height).astype(np.uint8), np.array(image[0]).reshape(width,height).astype(np.uint8)])
    print(np.array(image[0]).reshape(width,height).astype(np.uint8))
    cv2.imwrite(outputPath, image_merge)


def calDiffPngv2(firstPath, secondPath):
    firstImg = cv2.imread(firstPath)
    secondImg = cv2.imread(secondPath)

    diff = 0
    w, h, _ = firstImg.shape
    for i in range(3):
        for j in range(w):
            for k in range(h):
                if firstImg[j,k,i] != secondImg[j,k,i]:
                    diff += 1
    return diff

    

def decompress_lzw_png_v2(inputPath, type="H"):
    try:
        startTime = time.time()
        toDecompressedPaths, decompressedPath, outputFolderPath, comparePath = setUpPathToDecomv2(inputPath, opt='png')
        unpackZip(inputPath ,outputFolderPath)
        decompress_pngv2(toDecompressedPaths, decompressedPath, type=type)

        diff = calDiffPngv2(decompressedPath,comparePath)
        elapsedTime = '{:.5}s'.format(time.time() - startTime)
        print("Eslapsed time: ", elapsedTime)
        message = "Decompression Png v2 Successful"

        response ={
            'elapsedTime': elapsedTime,
            'diff': diff,
            'outputFolderPath': outputFolderPath,
            'message':message
        }
        status = 200
        responseJson = json.dumps(response)
    except:
        status = 400
        response ={
            'message':"Error Decompress Png v2"
        }
        responseJson = json.dumps(response)
    return status, responseJson
    

In [33]:
img = cv2.imread('D:\Project\DPT\LzwCompressor\output.png')
print(img[:,:,0].shape)

(768, 1366)


In [37]:
inputPath = 'D:\Project\DPT\LzwCompressor\heeh.comPngv2.zip'
decompress_lzw_png_v2(inputPath)

b'\xff\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\x03\x01\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x00\x00\x00\x00\x05\x01\x00\x00\x00\x00\x00\x00\x06\x01\x00\x00\x00\x00\x00\x00\x07\x01\x00\x00\x00\x00\x00\x00\x08\x01\x00\x00\x00\x00\x00\x00\x03\x01\x00\x00\x00\x00\x00\x00\xfb\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x00\x00\x00\x00\x00\xfb\x00\x00\x00\x00\x00\x00\x00\r\x01\x00\x00\x00\x00\x00\x00\x0e\x01\x00\x00\x00\x00\x00\x00\xf3\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\xfb\x00\x00\x00\x00\x00\x00\x00\xda\x00\x00\x00\x00\x00\x00\x00\xf7\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\xda\x00\x00\x00\x00\x00\x00\x00\x18\x01\x00\x00\x00\x00\x00\x00\xda\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\xde\x00\x00\x00\x00\x00\x00\x00\xff\x00\x00\x00\x00\x00\x00\x00\x12\x01\x00\x00\x00\x00\x00\x00\xd4\x00

(200,
 '{"elapsedTime": "0.05597s", "diff": 0, "outputFolderPath": "D:\\\\Project\\\\DPT\\\\LzwCompressor\\\\heeh.decomPngv2", "message": "Decompression Png v2 Successful"}')

# GUI

In [None]:
from tkinter import *
from tkinter import ttk
from tkinter import filedialog as fd
import os
import time
import threading
import customtkinter

from PIL import Image
import struct
import os

customtkinter.set_appearance_mode("System")  # Modes: system (default), light, dark
customtkinter.set_default_color_theme("blue")  # Themes: blue (default), dark-blue, green

root = customtkinter.CTk()
root.title("Compress and Decompress Text File/ Image")

# Example text
filePath = 'Nhập đường dẫn ở đây ...'


# Define function
def com(opt='com', type='txt'):
    my_progress.start()
    root.update_idletasks()

    time.sleep(1)
    pathToCompress = feet.get()

            #     'inputSize': inputSize,
            # 'compressedSize': compressedSize,
            # 'elapsedTime': elapsedTime,
            # 'cr': compression_ratio,
            # 'outputFolderPath': outputFolderPath,
            # 'message':message
    if(opt=='com'):
        match type:
            case 'txt':
                status, responseJson = compress_lzw_utf8(pathToCompress)
            case 'png':
                status, responseJson = compress_lzw_png(pathToCompress)
            case 'pngv2':
                status, responseJson = compress_lzw_png_v2(pathToCompress)

        if(status == 200):
            response = json.loads(responseJson)                
            outFile = response['outputFolderPath'].replace('\\', '/') + '.zip'
            out.set(value=str(outFile))
            infoFileOutText.set(value=str('Average Length: {}\nBefore compression: {} bits\n After compressed {} bits\nExecution time: {}\nCompress ratio: {}'.format(response['avarageLength'], response['inputSize'], response['compressedSize'], response['elapsedTime'],response['cr'])))
            print("Finish compressed")
        else:
            response = json.loads(responseJson)
            infoFileOutText.set(response['message'])

    else:

        
        # response ={
        #     'elapsedTime': elapsedTime,
        #     'outputFolderPath': outputFolderPath,
        #     'message':message
        # }
        if pathToCompress.endswith('.comTxt.zip'):
            status, responseJson = decompress_lzw_utf8(pathToCompress)
        elif pathToCompress.endswith('.comPng.zip'):
            status, responseJson = decompress_lzw_png(pathToCompress)
        else:
            status, responseJson = decompress_lzw_png_v2(pathToCompress)

        if(status == 200):
            response = json.loads(responseJson)                
            outFile = response['outputFolderPath'].replace('\\', '/')
            out.set(value=str(outFile))
            infoFileOutText.set(value=str('Difference: {}\nTime: {}\n Message: {}'.format(response['diff'], response['elapsedTime'],response['message'])))
            print("Finish decompressed")
        else:
            response = json.loads(responseJson)
            infoFileOutText.set(response['message'])


    my_progress.stop()
    my_progress['value'] = 0

def stop():
    my_progress.stop()
    my_progress['value'] = 0

def convert_bytes(num):
    """
    this function will convert bytes to MB.... GB... etc
    """
    for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
        if num < 1024.0:
            return "%3.1f %s" % (num, x)
        num /= 1024.0
 
def file_size(file_path):
    """
    this function will return the file size
    """
    if os.path.isfile(file_path):
        file_info = os.stat(file_path)
        return convert_bytes(file_info.st_size)

def select_file():
    filetypes = (
        ('text files', '*.txt'),
        ('text files', '*.zip'),
        ('All files', '*.*')
    )

    filename = fd.askopenfilename(
        title='Open a file',
        initialdir='/',
        filetypes=filetypes)

    feet.set(value=str(filename))
    infoFileText.set(value=str(file_size(filename)))

    print(infoFile)

def select_image():
    filetypes = (
    ('png images', '*.png'),
    ('jpg images', '*.jpg'),
    ('All files', '*.*')
    )
    filename = fd.askopenfilename(
    title='Open a file',
    initialdir='/',
    filetypes=filetypes)
    
    feet.set(value=str(filename))
    infoFileText.set(value=str(file_size(filename)))

    print(infoFile)


def compress():
    threading.Thread(target=com('com', 'txt')).start()

def compressPng():
    threading.Thread(target=com('com', 'png')).start()

def compressPngv2():
    threading.Thread(target=com('com', 'pngv2')).start()

def decompress():
    threading.Thread(target=com(opt='des')).start()


mainframe = customtkinter.CTkFrame(root)
# mainframe = customtkinter.CTkFrame(root, padding="3 3 12 12")
mainframe.grid(column=0, row=0, sticky=(N, W, E, S))
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)

feet = StringVar(mainframe)
feet.set('Nhập đường dẫn ở đây ...')
feet_entry = customtkinter.CTkEntry(mainframe, width=100, textvariable=feet)
feet_entry.grid(column=1, row=1, sticky=(W, E))

out = StringVar(mainframe)
out.set('')
out_entry = customtkinter.CTkEntry(mainframe, width=100, textvariable=out)
out_entry.grid(column=3, row=1, sticky=(W, E))

infoFileText = StringVar(mainframe)
infoFileText.set('')
infoFile = customtkinter.CTkLabel(mainframe, textvariable=infoFileText).grid(column=1, row=2, sticky=(W,E))

infoFileOutText = StringVar(mainframe)
infoFileOutText.set('')
infoFileOut = customtkinter.CTkLabel(mainframe, textvariable=infoFileOutText).grid(column=3, row=2, sticky=(W,E))

my_progress = customtkinter.CTkProgressBar(mainframe, orientation='horizontal', mode='indeterminate')
# my_progress = customtkinter.CTkProgressBar(mainframe, orientation=HORIZONTAL,length=100, mode='indeterminate')
my_progress.grid(column=2, row=7)


meters = StringVar()
customtkinter.CTkLabel(mainframe, text=meters).grid(column=2, row=2, sticky=(N, S))

customtkinter.CTkButton(mainframe, text="Compress", command=compress).grid(column=2, row=3, sticky=(W,E))
customtkinter.CTkButton(mainframe, text="Compress PNG", command=compressPng).grid(column=2, row=4, sticky=(W,E))
customtkinter.CTkButton(mainframe, text="Compress PNGv2", command=compressPngv2).grid(column=2, row=5, sticky=(W,E))

customtkinter.CTkButton(mainframe, text="Decompress", command=decompress).grid(column=2, row=6, sticky=(W,E))

customtkinter.CTkButton(mainframe, text="Open file", command=select_file).grid(column=2, row=1, sticky=(W,E))
customtkinter.CTkButton(mainframe, text="Open PNG", command=select_image).grid(column=2, row=2, sticky=(W,E))
# ttk.Label(mainframe, text="is equivalent to").grid(column=1, row=2, sticky=E)
# ttk.Label(mainframe, text="meters").grid(column=3, row=2, sticky=W)

for child in mainframe.winfo_children(): 
    child.grid_configure(padx=5, pady=5)

feet_entry.focus()

# Short cut
# root.bind("<Return>", calculate)

root.mainloop()

In [19]:
inputPath = 'D:\Project\DPT\LzwCompressor\input.comTxt.zip'
status, response = decompress_lzw_utf8(inputPath)

b"T\x00\xc3\x00\xb4\x00i\x00\n\x00\n\x00U\x00n\x00l\x00o\x00c\x00k\x00 \x00Y\x00o\x00u\x00r\x00 \x00V\x00e\x00r\x00t\x00i\x00c\x00a\x00l\x00 \x00P\x00o\x00t\x00e\x00n\x00\x15\x01\x18\x01:\x00 \x00M\x00a\x00s\x00\x1d\x01\x10\x01t\x00h\x00e\x00 \x00A\x00\x14\x01 \x00o\x00f\x00 \x00E\x00x\x00p\x00\x08\x01s\x00i\x00v\x00+\x01J\x00u\x00m\x00p\x00i\x00n\x00g\x00!\x00\x04\x01F\x00o\x00r\x00g\x00e\x00t\x00 \x00a\x00b\x00\x0e\x01I\x01\x1e\x01d\x00l\x00e\x00s\x00s\x00 \x00\x15\x01p\x00T\x01a\x00n\x00d\x00 \x00?\x01e\x00f\x00f\x00e\x00c\x00\x15\x019\x01 \x00m\x00H\x01h\x00o\x00d\x00s\x00.\x00 \x00W\x00e\x00'\x00d\x01g\x00\x1c\x01 \x00y\x00\x0e\x01 \x00c\x00o\x009\x01r\x00e\x00[\x01w\x00i\x00)\x01U\x01*\x01 \x00u\x00l\x00\x15\x01m\x00a\x00\x1d\x01 \x00s\x00o\x00l\x00u\x00\x15\x01o\x00n\x00U\x01o\x00\x8a\x01k\x00y\x00r\x00\t\x01k\x00H\x01t\x01\x0e\x01\x10\x01z\x01\x15\x01\x17\x01\x19\x01j\x00<\x01p\x00l\x01\n\x00S\x00\x1d\x01p\x00 \x00b\x00y\x00\x8a\x01\xa8\x01,\x00 \x00w\x00o\x01l\x00\x19\x01g\x00

In [20]:
import os
import struct
import csv
import time
import shutil

def findRangePow(x):
    i = 0
    while pow(2,i) - 1 < x:i+=1
    return i

def packZip(outputFolderName, inputParentPath):
    shutil.make_archive(outputFolderName, 'zip', inputParentPath)

def unpackZip(inputPath, inputParentPath):
    shutil.unpack_archive(inputPath ,inputParentPath, 'zip')

def setUpPathToCom(inputPath, opt='png'):
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)
    if opt == 'png':
        outputFolderName = inputName + '.comPng'

        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        dicPath = os.path.join(outputFolderPath, 'dictonary.csv')
        compressedPath = os.path.join(outputFolderPath, '{}.comPng.bin'.format(inputName))
    else:
        outputFolderName = inputName + '.comTxt'

        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        dicPath = os.path.join(outputFolderPath, 'dictonary.csv')
        compressedPath = os.path.join(outputFolderPath, '{}.comTxt.bin'.format(inputName))
        
    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)

    return outputFolderPath, outputFolderName, dicPath, compressedPath

def setUpPathToDecom(inputPath, opt='txt'):
    # Check valid
    if inputPath.endswith('.comTxt.zip') or inputPath.endswith('.comPng.zip'):
        message = "Invalid file"
    
    inputName = os.path.basename(inputPath).split('.')[0]
    inputParentPath = os.path.dirname(inputPath)



    if opt == 'txt':
        outputFolderName = inputName + '.decomTxt'
        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        toDecompressedPath = os.path.join(outputFolderPath, '{}.comTxt.bin'.format(inputName))
        decompressedPath = os.path.join(outputFolderPath, '{}.decom.txt'.format(inputName))
    else:
        outputFolderName = inputName + '.decomPng'
        outputFolderPath = os.path.join(inputParentPath,outputFolderName)

        toDecompressedPath = os.path.join(outputFolderPath, '{}.comPng.bin'.format(inputName))

        decompressedPath = os.path.join(outputFolderPath, '{}.decomPng.png'.format(inputName))

    if not os.path.exists(outputFolderPath):
        os.mkdir(outputFolderPath)
    
    return toDecompressedPath, decompressedPath, outputFolderPath
    


def saveDicttoCSV(dicPath, dictionary):
    headers = ['CodeWord', 'Code']
    with open(dicPath, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(headers)
        for key, value in dictionary.items():
            # print(key, value)
            writer.writerow([key,value])

def compress_txt(inputPath, outputPath, type='H'):
    # Set up parameter
    code = 256
    dictionary = {bytes([i]): i for i in range(code)}
    compressed_data = bytearray()
    current_sequence = b""

    # Read input file
    with open(inputPath, "r", encoding="utf-8") as input_file:
        data = input_file.read().encode("utf-8")

    inputSize = len(data)*8
    compressedSize = 0

    # Compress data using LZW
    for byte in data:
        sequence = current_sequence + bytes([byte])
        if sequence in dictionary:
            current_sequence = sequence
        else:
            compressedSize += findRangePow(dictionary[current_sequence])
            compressed_data += struct.pack(type, dictionary[current_sequence])
            dictionary[sequence] = code
            code += 1
            current_sequence = bytes([byte])

    if current_sequence:
        compressedSize += findRangePow(dictionary[current_sequence])
        compressed_data += struct.pack(type, dictionary[current_sequence])

    #Save type in compressed data
    compressed_data = compressed_data + bytes('<.>{}'.format(type), 'utf-8')
    
    # Write compressed data to output file
    with open(outputPath, "wb") as output_file:
        output_file.write(compressed_data)

    return inputSize, compressedSize, dictionary

def compress_png(inputPath, outputPath, type='H'):
    # Set up parameter
    code = 256
    dictionary = {bytes([i]): i for i in range(code)}
    compressed_data = bytearray()
    current_sequence = b""

    # Open image and convert to raw pixel data
    with Image.open(inputPath) as img:
        data = img.convert("RGB").tobytes()
        width, height = img.size
        print(width, height)

    inputSize = len(data)*8
    compressedSize = 0

    # Compress data using LZW
    for byte in data:
        sequence = current_sequence + bytes([byte])
        if sequence in dictionary:
            current_sequence = sequence
        else:
            compressedSize += findRangePow(dictionary[current_sequence])
            compressed_data += struct.pack(type, dictionary[current_sequence])
            dictionary[sequence] = code
            code += 1
            current_sequence = bytes([byte])

    if current_sequence:
        compressedSize += findRangePow(dictionary[current_sequence])
        compressed_data += struct.pack(type, dictionary[current_sequence])

    #Save type in parameter image and type
    compressed_data = compressed_data + bytes('<.>{}'.format(width), 'utf-8')
    compressed_data = compressed_data + bytes('<.>{}'.format(height), 'utf-8')
    compressed_data = compressed_data + bytes('<.>{}'.format(type), 'utf-8')

    print(compressed_data)
    
    # Write compressed data to output file
    with open(outputPath, "wb") as output_file:
        output_file.write(compressed_data)

    return inputSize, compressedSize, dictionary

def decompress_txt(inputPath, outputPath):
    # Read compressed data from input file
    with open(inputPath, "rb") as input_file:
        compressed_data = input_file.read()
    print(compressed_data)

    compressed_data, type = compressed_data.split(bytes('<.>', 'utf-8'))[:]
    step = 2
    match type:
        case 'H':
            step = 2
        case 'I':
            step = 4
        case 'Q':
            step = 8

    # Decompress data using LZW
    dictionary = {i: bytes([i]) for i in range(256)}
    code = 256
    decompressed_data = bytearray()
    current_sequence = bytes()
    

    for i in range(0, len(compressed_data), step):
        value = struct.unpack(type, compressed_data[i:i+step])[0]
        if value in dictionary:
            sequence = dictionary[value]
            # print("In dic", sequence[0])
            # print("sequence", sequence)
        elif value == code:
            sequence = current_sequence + bytes([current_sequence[0]])
        else:
            raise ValueError("Invalid compressed data")
        decompressed_data += sequence
        # print("Decompress", decompressed_data)
        if current_sequence:
            print("in if current", sequence[0])
            dictionary[code] = current_sequence + bytes([sequence[0]])
            code += 1
            # print("Code", code)
        current_sequence = sequence

    # Write decompressed data to output file
    with open(outputPath, "w", encoding="utf-8") as output_file:
        output_file.write(decompressed_data.decode("utf-8"))


def decompress_png(inputPath, outputPath):
    # Read compressed data from input file
    with open(inputPath, "rb") as input_file:
        compressed_data = input_file.read()
    print(compressed_data)

    compressed_data, width, height, type = compressed_data.split(bytes('<.>', 'utf-8'))[:]
    print(type)
    step = 2
    match type:
        case 'H':
            step = 2
        case 'I':
            step = 4
        case 'Q':
            step = 8

    # Decompress data using LZW
    dictionary = {i: bytes([i]) for i in range(256)}
    code = 256
    decompressed_data = b""
    current_sequence = b""
    


    for i in range(0, len(compressed_data), step):
        value = struct.unpack(type, compressed_data[i:i+step])[0]
        if value in dictionary:
            sequence = dictionary[value]
            # print("In dic", sequence[0])
            # print("sequence", sequence)
        elif value == code:
            sequence = current_sequence + bytes([current_sequence[0]])
        else:
            raise ValueError("Invalid compressed data")
        decompressed_data += sequence
        # print("Decompress", decompressed_data)
        if current_sequence:
            # print("in if current", sequence[0])
            dictionary[code] = current_sequence + bytes([sequence[0]])
            code += 1
            # print("Code", code)
        current_sequence = sequence

    width = int(width)
    height = int(height)

    # width, height = original_image.size
    print('Width', width, 'Height', height)
    img = Image.frombytes("RGB", (width, height), decompressed_data)
    img.save(outputPath, format="PNG")

def compress_lzw_utf8(inputPath, type="H"):

    startTime = time.time()
    outputFolderPath, outputFolderName, dicPath, compressedPath = setUpPathToCom(inputPath)

    inputSize, compressedSize, dictionary = compress_txt(inputPath,compressedPath, type=type)

    saveDicttoCSV(dicPath,dictionary)
    packZip(outputFolderName, outputFolderPath)

    # Check if compressed file is smaller than input file
    elapsedTime = '{:.5}s'.format(time.time() - startTime)
    print("Eslapsed time: ", elapsedTime)
    compression_ratio = compressedSize / inputSize
    if compression_ratio >= 1:
        message = "WARNING: Compressed file is not smaller than input file"
    else:
        message = "Compression successful: {:.2%} reduction in file size".format(1 - compression_ratio)
    print(message)

    return outputFolderPath

def decompress_lzw_utf8(inputPath):

    startTime = time.time()
    toDecompressedPath, decompressedPath, outputFolderPath = setUpPathToDecom(inputPath, opt='png')
    unpackZip(inputPath ,outputFolderPath)
    decompress_png(toDecompressedPath, decompressedPath)
    print("Decompression successful")

    return outputFolderPath



In [21]:
imagePath = 'heeh.png'

img = cv2.imread(imagePath)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
w,h,_ = img.shape

# print(np.array(img[:,:,0]).flatten())
for chanel in range(3):
    imgArr = np.array(img[:,:,chanel])

    imgFla = imgArr.flatten()

    print(max(imgFla))
    print(len(imgFla))


    dictionary = {str(i): i for i in range(256)}
    print("Dictionary", dictionary)

    code = 256
    compressed_data = bytearray()
    current_sequence = ""
    output_data = []

    for byte in imgFla:
        # print("Byte: ", byte)
        if current_sequence:
            sequence = current_sequence + "+" + str(byte)
        else:
            sequence = str(byte)
        # print("Current sequence:", current_sequence)
        # print("Sequence", sequence)
        if sequence in dictionary:
            # print("In dictionary")
            current_sequence = sequence
        else:
            # print('What is struct:', dictionary[current_sequence])
            compressed_data += struct.pack("H", dictionary[current_sequence])
            output_data.append(dictionary[current_sequence])
            dictionary[sequence] = code
            code += 1
            current_sequence = str(byte)

        # print(compressed_data)

    if current_sequence:
        compressed_data += struct.pack("H", dictionary[current_sequence])
        output_data.append(dictionary[current_sequence])

    if chanel == 2:
        compressed_data = compressed_data + bytes('<.>{}'.format(w), 'utf-8')
        compressed_data = compressed_data + bytes('<.>{}'.format(h), 'utf-8')
    # print("Final compressed data: ", compressed_data)
    # print("Final output data: ", output_data[:])
    # print("Final output data: ", imgFla[:])
    # print("Dictionary", dictionary)

    output_path = 'anhnen{}.bin'.format(chanel)
    with open(output_path, "wb") as output_file:
        output_file.write(compressed_data)

255
195
Dictionary {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, '10': 10, '11': 11, '12': 12, '13': 13, '14': 14, '15': 15, '16': 16, '17': 17, '18': 18, '19': 19, '20': 20, '21': 21, '22': 22, '23': 23, '24': 24, '25': 25, '26': 26, '27': 27, '28': 28, '29': 29, '30': 30, '31': 31, '32': 32, '33': 33, '34': 34, '35': 35, '36': 36, '37': 37, '38': 38, '39': 39, '40': 40, '41': 41, '42': 42, '43': 43, '44': 44, '45': 45, '46': 46, '47': 47, '48': 48, '49': 49, '50': 50, '51': 51, '52': 52, '53': 53, '54': 54, '55': 55, '56': 56, '57': 57, '58': 58, '59': 59, '60': 60, '61': 61, '62': 62, '63': 63, '64': 64, '65': 65, '66': 66, '67': 67, '68': 68, '69': 69, '70': 70, '71': 71, '72': 72, '73': 73, '74': 74, '75': 75, '76': 76, '77': 77, '78': 78, '79': 79, '80': 80, '81': 81, '82': 82, '83': 83, '84': 84, '85': 85, '86': 86, '87': 87, '88': 88, '89': 89, '90': 90, '91': 91, '92': 92, '93': 93, '94': 94, '95': 95, '96': 96, '97': 97, '98': 98, '99': 99, 

In [22]:
imgFla = np.array([1, 7, 7, 7, 5, 88, 88, 88, 88, 88, 88, 9])

print(max(imgFla))
print(len(imgFla))


dictionary = {str(i): i for i in range(256)}
print("Dictionary", dictionary)

code = 256
compressed_data = bytearray()
current_sequence = ""
output_data = []

for byte in imgFla:
    # print("Byte: ", byte)
    if current_sequence:
        sequence = current_sequence + "+" + str(byte)
    else:
        sequence = str(byte)
    # print("Current sequence:", current_sequence)
    # print("Sequence", sequence)
    if sequence in dictionary:
        # print("In dictionary")
        current_sequence = sequence
    else:
        # print('What is struct:', dictionary[current_sequence])
        compressed_data += struct.pack("H", dictionary[current_sequence])
        output_data.append(dictionary[current_sequence])
        dictionary[sequence] = code
        code += 1
        current_sequence = str(byte)

    # print(compressed_data)

if current_sequence:
    compressed_data += struct.pack("H", dictionary[current_sequence])
    output_data.append(dictionary[current_sequence])

print("Final compressed data: ", compressed_data)
print("Final output data: ", output_data[:])
print("Final output data: ", imgFla[:])
print("Dictionary", dictionary)

88
12
Dictionary {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, '10': 10, '11': 11, '12': 12, '13': 13, '14': 14, '15': 15, '16': 16, '17': 17, '18': 18, '19': 19, '20': 20, '21': 21, '22': 22, '23': 23, '24': 24, '25': 25, '26': 26, '27': 27, '28': 28, '29': 29, '30': 30, '31': 31, '32': 32, '33': 33, '34': 34, '35': 35, '36': 36, '37': 37, '38': 38, '39': 39, '40': 40, '41': 41, '42': 42, '43': 43, '44': 44, '45': 45, '46': 46, '47': 47, '48': 48, '49': 49, '50': 50, '51': 51, '52': 52, '53': 53, '54': 54, '55': 55, '56': 56, '57': 57, '58': 58, '59': 59, '60': 60, '61': 61, '62': 62, '63': 63, '64': 64, '65': 65, '66': 66, '67': 67, '68': 68, '69': 69, '70': 70, '71': 71, '72': 72, '73': 73, '74': 74, '75': 75, '76': 76, '77': 77, '78': 78, '79': 79, '80': 80, '81': 81, '82': 82, '83': 83, '84': 84, '85': 85, '86': 86, '87': 87, '88': 88, '89': 89, '90': 90, '91': 91, '92': 92, '93': 93, '94': 94, '95': 95, '96': 96, '97': 97, '98': 98, '99': 99, '1

# PNG


In [23]:
image = []
for chanel in range(3):
    with open('anhnen{}.bin'.format(chanel), "rb") as input_file:
        compressed_data = input_file.read()
        print(compressed_data)
    if chanel == 2:
        compressed_data, width, height = compressed_data.split(bytes('<.>', 'utf-8'))[:]
        width = int(width)
        height = int(height)
    dictionary = {str(i): i for i in range(256)}
    code = 256
    decompressed_data = []

    def takeSequence(sequence):
        result = [int(i) for i in sequence.split('+')]
        return result


    current_sequence = ""
    for i in range(0, len(compressed_data), 2):
        value = str(struct.unpack("H", compressed_data[i:i+2])[0])
        # print("value", value)
        if value in dictionary:
            sequence = str(dictionary[value])
            # print('In dic', sequence)
        elif value == str(code):
            # print("Value = code")
            sequence = current_sequence + "+" + str(takeSequence(current_sequence)[0])
        else:
            raise ValueError("Invalid compressed data")
        for i in takeSequence(sequence):
            decompressed_data.append(i) 
        
        if current_sequence:
            # print("New", str(current_sequence + "+" +str(takeSequence(sequence)[0])))
            dictionary[str(code)] = str(current_sequence + "+" + str(takeSequence(sequence)[0]))
            code += 1
            # print('Code', code)
        current_sequence = sequence
        # print('Current', current_sequence)
        # print("Output",decompressed_data)
        # print("Dic",dictionary)
    image.append(decompressed_data)

print(np.array(image).shape)
image_merge = cv2.merge([np.array(image[2]).reshape(width,height).astype(np.uint8), np.array(image[1]).reshape(width,height).astype(np.uint8), np.array(image[0]).reshape(width,height).astype(np.uint8)])
print(np.array(image[0]).reshape(w,h).astype(np.uint8))
cv2.imwrite('hahah.png', image_merge)


b'\xff\x00\x00\x01\x01\x01\x02\x01\x03\x01\x04\x01\x05\x01\x06\x01\x07\x01\x08\x01\x03\x01\xfb\x00\x02\x01\xfb\x00\r\x01\x0e\x01\xf3\x00\x00\x01\xfb\x00\xda\x00\xf7\x00\x00\x01\xe4\x00\xd4\x00\xda\x00\x18\x01\xda\x00\xd4\x00\xde\x00\xff\x00\x12\x01\xd4\x00\xf1\x00\x00\x01\xde\x00\x17\x01\x1a\x01\x1a\x01\x1c\x01\xf7\x00\xd4\x00\xd4\x00\xea\x00!\x01#\x01\x18\x01\x1b\x01\x1d\x01\x1a\x01\xe4\x00/\x01\x19\x01-\x01\x1c\x01\x1e\x01\x1c\x01\xf3\x00,\x014\x01/\x01.\x01\xee\x009\x01%\x01;\x01\xda\x00\xff\x00\xea\x00\x17\x01\xde\x00\xde\x00:\x01\x12\x01\x18\x01\xff\x00\x16\x01\xda\x00\xe4\x00F\x01?\x01\x12\x01'
b'\xff\x00\x00\x01\x01\x01\x02\x01\x03\x01\x04\x01\x05\x01\x06\x01\x07\x01\x08\x01\x03\x01\xe6\x00\x01\x01\xfe\x00\xe6\x00\x0e\x01\x0f\x01\xf1\x00\x00\x01\xe6\x00"\x00\xd8\x00\x00\x01\x83\x00\x06\x00"\x00\x19\x01"\x00\x06\x00Z\x00\xff\x00\x13\x01\x06\x00\xba\x00\x00\x01Z\x00\x18\x01\x1b\x01\x1b\x01\x1d\x01\xd8\x00\x06\x00\x06\x00\x98\x00\x00\x01<\x00$\x01\x19\x01\x1c\x01\x1e\x01\x1b\x01p\x

True

In [24]:
total = 0
for index, value in enumerate(decompressed_data):
    if value != imgFla[index]:
        total +=1
        # print(index)

print(total)

IndexError: index 12 is out of bounds for axis 0 with size 12

In [None]:
outImgFla = np.array(decompressed_data)
root = outImgFla.reshape(w,h)
root = root.astype(np.uint8)
print(root)
cv2.imshow('haha',root)

[[28 29 24 ... 24 24 24]
 [29 29 24 ... 24 24 24]
 [25 29 24 ... 24 24 24]
 ...
 [26 26  0 ... 24 24 24]
 [26 27  0 ... 24 24 24]
 [26 25  0 ... 24 24 24]]


: 

In [None]:
dictionary = {str(i): i for i in range(256)}
print("Dictionary:" ,dictionary)
for key, value in dictionary.items():
    print(key, value)
        