In [243]:
# Path configuration: every file this notebook reads or writes.
originalFilePath = "originalfile.txt"  # text that gets compressed
encodedFilePath = "encodedfile.txt"    # header + bit string written by Compressor.toFile
huffmanFilePath = "huffmanfile.txt"    # tree dump written by print_tree
codewordbookPath = "codewordbook.txt"  # symbol/code listing written by print_codingTalbe
analysisPath = "analysis.txt"          # size report written by print_analysis
decodedPath = "decodedfile.txt"        # text restored by Decompressor.decode

In [244]:
from dataclasses import dataclass
import numpy as np

@dataclass
class Node:
    """One slot of an array-backed binary tree.

    ``lc``, ``rc`` and ``parent`` are indices into the list that holds the
    tree; ``-1`` means "none". A node with ``ch != -1`` is a leaf carrying the
    code point ``ch``; ``freq`` is its occurrence count.

    The numpy annotations are documentation only: dataclasses do not convert
    values, so fields hold whatever is passed in (plain ``int`` by default).
    """
    freq:np.uint32=0
    ch:np.int32=-1
    lc:np.int32=-1
    rc:np.int32=-1
    parent:np.int32=-1
    def __lt__(self,obj):
        """Order nodes by frequency only, so ``heapq`` pops the rarest first.

        Always returns a real ``bool`` (the previous version fell through and
        returned ``None`` when the node was not smaller).
        """
        if not isinstance(obj,Node):
            return NotImplemented
        return bool(self.freq<obj.freq)

@dataclass
class CompressInfo:
    """Size counters for one compression run; both start at zero."""
    originalSize: np.uint64 = 0  # size of the input, accumulated by the encoder
    compressSize: np.uint64 = 0  # sum of code lengths over all encoded symbols
@dataclass
class CodingTableValue:
    """Coding-table entry for one symbol: how often it occurs and its code."""
    freq: np.uint32 = 0  # occurrence count of the symbol
    code: str = ""       # code word as a string of '0'/'1' characters

In [245]:
from typing import List,Dict,Tuple
from collections import defaultdict
import os
class Compressor:
    """Builds a prefix-code table for a UTF-8 text file and writes the encoded form.

    Typical use: ``Compressor().encode(src).toFile(dest)``.

    Attributes:
        nodes: array-backed code tree produced by ``build_tree`` (root at index 0).
        codingTable: code point -> ``CodingTableValue`` (frequency and code word).
        info: original and compressed sizes, both in bits.
        encoded: True once ``encode`` has completed successfully.
        srcPath: path of the file most recently passed to ``encode``.
    """
    nodes:List[Node]
    codingTable:defaultdict[int,CodingTableValue]
    info:CompressInfo
    encoded:bool=False
    srcPath:str
    def __init__(self):
        self.info=CompressInfo()
        self.codingTable=defaultdict(lambda:CodingTableValue())
    def encode(self,filepath:str):
        """Count symbol frequencies in ``filepath`` and derive the code table.

        Args:
            filepath: path of a UTF-8 text file.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: if the file is empty.
        """
        self.srcPath=filepath
        # Edge case: nothing to encode. A bare string cannot be raised in
        # Python 3 (it produces a TypeError), so raise a real exception.
        if os.stat(filepath).st_size==0:raise ValueError('empty string not allow')

        # Start from a clean slate so a second encode() on the same instance
        # does not add to the previous run's frequencies and sizes.
        self.encoded=False
        self.info=CompressInfo()
        self.codingTable=defaultdict(lambda:CodingTableValue())

        with open(filepath,mode='r',encoding='utf-8') as file:
            # Read in chunks; iter() stops at the '' that marks end of file.
            for chunk in iter(lambda:file.read(65536),''):
                for ch in chunk:
                    self.codingTable[ord(ch)].freq+=1
        self.nodes=[Node(ch=ch,freq=value.freq) for ch,value in self.codingTable.items()]
        build_tree(self.nodes)

        # Walk the tree with an explicit stack (a deep tree would otherwise hit
        # the recursion limit), appending "0" for left and "1" for right.
        stack=[(0,"")]
        while stack:
            n,code=stack.pop()
            node=self.nodes[n]
            if node.ch!=-1:
                # A leaf sitting at the root would get the empty code and the
                # encoded stream would carry no information; give it one bit.
                self.codingTable[node.ch].code=code or "0"
                continue
            if node.rc!=-1:stack.append((node.rc,code+"1"))
            if node.lc!=-1:stack.append((node.lc,code+"0"))

        for ch,item in self.codingTable.items():
            self.info.compressSize+=item.freq*len(item.code)
            # The source is UTF-8, so a character may occupy more than one byte.
            self.info.originalSize+=item.freq*8*len(chr(ch).encode('utf-8'))
        self.encoded=True
        return self
    def toFile(self,filepath:str):
        """Write the frequency header followed by the encoded bit string.

        Output layout: first line is the number of distinct symbols, then one
        ``"<code point> <frequency>"`` line per symbol, then the code words of
        the source text concatenated as '0'/'1' characters.

        Args:
            filepath: destination path (overwritten).

        Returns:
            ``self``, so calls can be chained.

        Raises:
            RuntimeError: if ``encode`` has not completed on this instance.
        """
        if not self.encoded:raise RuntimeError('encode() must be called before toFile()')
        # Open the source first so a missing source does not truncate the destination.
        with open(self.srcPath,mode='r',encoding='utf-8') as src:
            with open(filepath,mode='w',encoding='utf-8') as dest:
                dest.write(f'{len(self.codingTable)}\n')
                for key,value in self.codingTable.items():
                    dest.write(f'{key} {value.freq}\n')
                for chunk in iter(lambda:src.read(65536),''):
                    dest.write(''.join(self.codingTable[ord(ch)].code for ch in chunk))
        return self

class Decompressor:
    """Restores text from a file made of a frequency header plus a '0'/'1' bit string.

    Attributes:
        nodes: code tree rebuilt by the latest ``decode`` call (root at index 0).
        srcPath: path of the encoded file.
    """
    nodes:List[Node]
    srcPath:str
    def __init__(self,srcPath:str):
        self.srcPath=srcPath
        # Per-instance list: a class-level list would be shared by every
        # Decompressor and keep growing across decode() calls.
        self.nodes=[]
    def decode(self,destPath:str):
        """Decode ``self.srcPath`` into ``destPath``.

        Expected input: a first line with the symbol count, then that many
        ``"<code point> <frequency>"`` lines, then the bit string. Characters
        other than '0' and '1' in the bit string are ignored.

        Args:
            destPath: path of the UTF-8 text file to write (overwritten).

        Raises:
            ValueError: if the header is malformed or a bit selects a branch
                that does not exist in the tree.
        """
        with open(self.srcPath,'r',encoding='utf-8') as src:
            size=int(src.readline())
            nodes=[]
            for _ in range(size):
                ch,freq=map(int,src.readline().split())
                nodes.append(Node(freq=freq,ch=ch))
            build_tree(nodes)
            self.nodes=nodes

            # A tree that is just one leaf has no branches to follow: every
            # bit stands for that single symbol.
            rootIsLeaf=bool(nodes) and nodes[0].ch!=-1
            cur=0
            with open(destPath,'w',encoding='utf-8')as dest:
                # Read in chunks; iter() stops at the '' that marks end of file.
                for chunk in iter(lambda:src.read(65536),''):
                    out=[]
                    for bit in chunk:
                        if bit not in "01":continue
                        if rootIsLeaf:
                            out.append(chr(nodes[0].ch))
                            continue
                        nxt=nodes[cur].lc if bit=="0" else nodes[cur].rc
                        # -1 (or any out-of-range index) means the stream does
                        # not match the tree; do not wrap around to nodes[-1].
                        if not 0<=nxt<len(nodes):
                            raise ValueError('corrupted bit stream: no such branch in the code tree')
                        cur=nxt
                        if nodes[cur].ch!=-1:
                            out.append(chr(nodes[cur].ch))
                            cur=0
                    dest.write(''.join(out))

In [246]:
import heapq
def build_tree(nodes:List[Node]):
    """Rearrange ``nodes`` in place into an array-backed Huffman tree.

    On entry ``nodes`` holds one leaf per symbol (``ch`` and ``freq`` set).
    On return the list holds the whole tree with the root at index 0; each
    internal node has ``ch == -1`` and refers to its children through the
    indices ``lc`` / ``rc``, and every non-root node has ``parent`` set.

    Special cases:
        * no leaves: the list becomes a single childless placeholder root;
        * one leaf: the list becomes ``[root, leaf]`` with the leaf as the
          root's left child, so the symbol gets a one-bit code.

    Args:
        nodes: list of leaf nodes; modified in place.
    """
    # Sort by code point so the encoder and the decoder feed the heap in the
    # same order and therefore rebuild the identical tree.
    nodes.sort(key=lambda x:x.ch)
    size=len(nodes)

    # Edge cases
    if size==0:
        nodes[:]=[Node()]
        return
    if size==1:
        leaf=nodes[0]
        leaf.parent=0
        nodes.insert(0,Node(freq=leaf.freq,lc=1))
        return

    q=nodes.copy()
    heapq.heapify(q)
    tree=[None]*(2*size-1)

    def adopt(i:int):
        # Point the children of tree[i] back at index i.
        for child in (tree[i].lc,tree[i].rc):
            if child!=-1:tree[child].parent=i

    # Fill the array from the back: each step places the two rarest subtrees
    # at i and i-1 and pushes their parent, whose final index is only known
    # once it is popped in a later step.
    for i in range(2*size-2,0,-2):
        tree[i]=heapq.heappop(q)
        tree[i-1]=heapq.heappop(q)
        # The parent's weight is the sum of its children; leaving it at 0
        # would make it the next pop every time and degrade the tree to a chain.
        heapq.heappush(q,Node(freq=tree[i].freq+tree[i-1].freq,lc=i-1,rc=i))
        adopt(i)
        adopt(i-1)

    tree[0]=heapq.heappop(q)
    adopt(0)
    nodes[:]=tree

def print_tree(nodes:List[Node],destPath:str):
    """Write an indented drawing of the tree in ``nodes`` to ``destPath``.

    One line per node in pre-order (node, left subtree, right subtree).
    Internal nodes are shown as ``none``; a leaf is shown as its character
    when that character is alphanumeric, otherwise as ``special-<code point>``.

    Args:
        nodes: array-backed tree with the root at index 0; ``-1`` child
            indices mean "no child". An empty list produces an empty file.
        destPath: path of the UTF-8 text file to write (overwritten).
    """
    with open(destPath,'w',encoding='utf-8') as dest:
        def print_node(n:int):
            if nodes[n].ch==-1:dest.write('none\n')
            else:
                # Test the character itself; str() of the code point is a
                # digit string and would always count as alphanumeric.
                ch=chr(nodes[n].ch)
                if ch.isalnum():dest.write(ch+'\n')
                else:dest.write(f'special-{nodes[n].ch}\n')

        # Explicit stack instead of recursion so deep trees cannot hit the
        # recursion limit.
        stack=[("",0,False)] if nodes else []
        while stack:
            prefix,n,isLeft=stack.pop()
            if n==-1:continue
            dest.write(prefix)
            dest.write("├──" if isLeft else "└──")
            print_node(n)
            childPrefix=prefix+("│   " if isLeft else "    ")
            # Push right first so the left subtree is popped and printed first.
            stack.append((childPrefix,nodes[n].rc,False))
            stack.append((childPrefix,nodes[n].lc,True))
def print_codingTalbe(codingTable:defaultdict[np.int32,CodingTableValue],destPath:str):
    """Write the code book to ``destPath``, one ``"<key> <code>"`` line per entry.

    Entries are written in the table's iteration order.
    """
    with open(destPath,'w',encoding='utf-8') as book:
        book.writelines(
            f'{symbol} {entry.code}\n'
            for symbol,entry in codingTable.items()
        )
def print_analysis(info:CompressInfo,destPath:str):
    """Write a three-line size report to ``destPath``.

    The lines are the original size, the compressed size, and the compressed
    size as a percentage of the original size.
    """
    with open(destPath,'w',encoding='utf-8') as report:
        before,after=info.originalSize,info.compressSize
        report.write(f'原始大小:{before}\n')
        report.write(f'压缩大小:{after}\n')
        # The ratio is computed only after the two size lines have been written.
        percent=after/before*100
        report.write(f'压缩率:{percent}%\n')



In [247]:
# Compress the input and write the encoded file.
compressor=Compressor()
compressor.encode(originalFilePath)
compressor.toFile(encodedFilePath)

# Dump the tree, the code book and the size report.
print_tree(compressor.nodes,huffmanFilePath)
print_codingTalbe(compressor.codingTable,codewordbookPath)
print_analysis(compressor.info,analysisPath)

# Decode the encoded file back into text.
decompressor=Decompressor(encodedFilePath)
decompressor.decode(decodedPath)