In [17]:
import numpy as np
import ccdm
import numpy as np
from functools import reduce
import itertools
import copy
from typing import List

In [18]:
def cumulative_prob(P):
    """Build the cumulative probability interval of every symbol.

    Parameters
    ----------
    P : dict
        Mapping from symbol to probability.

    Returns
    -------
    dict
        Mapping from symbol to a ``(lower, upper)`` pair. Symbols are laid
        out back to back in sorted order, starting at 0.0, so the upper bound
        of one symbol is the lower bound of the next.
    """
    intervals = {}
    lower = 0.0
    for symbol, prob in sorted(P.items()):
        upper = lower + prob
        intervals[symbol] = (lower, upper)
        # The next symbol starts where this one ends.
        lower = upper
    return intervals

def update_src_interval(src_interval, src_probability, src_symbols):
    """Narrow the source interval in place according to one source symbol.

    The interval is cut at ``lowerBound + width * src_probability[0]``
    (truncated to ``np.uint32``). Symbol 0 keeps the part below the cut,
    any other symbol keeps the part above it.

    Parameters
    ----------
    src_interval : object
        Object exposing mutable ``lowerBound`` and ``upperBound`` attributes.
    src_probability : sequence
        Only element 0 is read; it is used as the fraction of the width
        assigned to symbol 0.
    src_symbols : int
        The source symbol being processed.

    Returns
    -------
    None
        ``src_interval`` is modified in place.
    """
    width = np.uint32(src_interval.upperBound - src_interval.lowerBound)
    split = np.uint32(src_interval.lowerBound + width * src_probability[0])
    if src_symbols == 0:
        src_interval.upperBound = split
        return
    src_interval.lowerBound = split

def matching_canditade_intervals(symbol, source_interval, cum):
    """Look for a candidate interval that fully contains the source interval.

    Parameters
    ----------
    symbol : hashable
        Source symbol under test; it is echoed back unchanged on a match.
    source_interval : tuple
        ``(low, high)`` bounds of the source interval.
    cum : dict
        Mapping from candidate key to its ``(low, high)`` bounds.

    Returns
    -------
    tuple
        ``(True, symbol, cand_low, cand_high)`` for the first candidate (in
        iteration order) whose bounds enclose the source interval, otherwise
        ``(False, None, None, None)``.
    """
    # NOTE(review): the second element returned on a match is the caller's
    # ``symbol``, not the key of the matching candidate - confirm intent.
    src_low, src_high = source_interval
    for key in cum:
        cand_low, cand_high = cum[key]
        encloses = src_low >= cand_low and src_high <= cand_high
        if encloses:
            return True, symbol, cand_low, cand_high
    return False, None, None, None

def matching_canditade_intervals_finalize(symbol, source_interval, cum):
    """Pick the candidate intervals that bracket the source interval.

    Two candidates are selected from ``cum``:

    * among the candidates starting at or below the source lower bound, the
      one with the largest lower bound;
    * among the candidates ending at or above the source upper bound, the
      one with the smallest lower bound.

    Parameters
    ----------
    symbol : hashable
        Not used by this function.
    source_interval : tuple
        ``(low, high)`` bounds of the source interval.
    cum : dict
        Mapping from candidate key to its ``(low, high)`` bounds.

    Returns
    -------
    dict
        ``{key: (low, high)}`` holding the selected candidates; a single
        entry when both selections are the same candidate.

    Raises
    ------
    ValueError
        If either selection has no eligible candidate (raised by
        ``max``/``min`` on an empty list).
    """
    src_low, src_high = source_interval
    starts_below = []
    ends_above = []
    for key in cum:
        cand_low, cand_high = cum[key]
        entry = (key, (cand_low, cand_high))
        if src_low >= cand_low:
            starts_below.append(entry)
        if src_high <= cand_high:
            ends_above.append(entry)

    def lower_bound(entry):
        return entry[1][0]

    left_key, left_bounds = max(starts_below, key=lower_bound)
    right_key, right_bounds = min(ends_above, key=lower_bound)
    # Same result as merging two one-entry dicts: left entry first, and the
    # right entry wins if both keys coincide.
    selected = {left_key: left_bounds}
    selected[right_key] = right_bounds
    return selected

def rescale_source_interval(source_interval, new_low, new_high):
    """Express the source interval relative to ``[new_low, new_high]``.

    Both source bounds are mapped with ``(x - new_low) / (new_high - new_low)``
    and the rescaled interval is then cut at its midpoint.

    Parameters
    ----------
    source_interval : tuple
        ``(low, high)`` bounds of the source interval.
    new_low, new_high : float
        Bounds of the reference interval.

    Returns
    -------
    dict
        ``{0: (scaled_low, mid), 1: (mid, scaled_high)}``.
    """
    span = new_high - new_low
    scaled_low, scaled_high = ((bound - new_low) / span for bound in source_interval)
    # The midpoint split gives both symbols equal width.
    midpoint = (scaled_low + scaled_high) / 2
    return {0: (scaled_low, midpoint), 1: (midpoint, scaled_high)}

def rescale_source_code(P_code, new_low, new_high, P=0.5):
    """Split ``[new_low, new_high]`` into two intervals at fraction ``P``.

    Parameters
    ----------
    P_code : any
        Accepted for signature compatibility; its value is not read.
    new_low, new_high : float
        Bounds of the interval to split.
    P : float, optional
        Fraction of the width given to symbol 0 (default 0.5).

    Returns
    -------
    dict
        ``{0: (new_low, split), 1: (split, new_high)}``.
    """
    split = new_low + P * (new_high - new_low)
    return {0: (new_low, split), 1: (split, new_high)}

def update_code_interval(code_intervals, P):
    """Split each code interval of a two-symbol partition into two parts.

    The upper bound of symbol 0 is taken as the pivot separating the two
    top-level intervals. Every interval is then cut once, and the pieces are
    returned from left to right.

    NOTE: a later cell of this notebook defines another function with the
    same name that works on ``CodeCandidateList`` objects; this version works
    on plain dicts.

    Parameters
    ----------
    code_intervals : dict
        Mapping from symbol to ``(lower, upper)``; must contain key 0.
    P : dict
        Mapping from symbol to probability.

    Returns
    -------
    list
        ``(lower, upper)`` tuples, two per entry of ``code_intervals``.
    """
    _, pivot = code_intervals[0]
    pieces = []
    for key in code_intervals:
        lower, upper = code_intervals[key]
        complement = 1 - P[key]
        if key == 0:
            # Cut the interval that ends at the pivot.
            cut = pivot - complement * (pivot - lower)
            pieces.extend([(lower, cut), (cut, pivot)])
        else:
            # Cut the interval that starts at the pivot.
            cut = pivot + complement * (upper - pivot)
            pieces.extend([(pivot, cut), (cut, upper)])
    return pieces


In [None]:
def arithmetic_encode(sequence, P, P_source):
    """Encode a source sequence into code symbols (prototype encoder).

    For every source symbol, its source interval is compared with the
    candidate (code) intervals derived from ``P``. Whenever the source
    interval lies inside a candidate interval, a symbol is emitted and the
    source partition is rescaled. After the last symbol, a closing step
    selects the candidates that bracket the final source interval, splits
    each of them once more and appends the key of the widest sub-interval
    that lies strictly inside the source interval. Progress is reported with
    ``print``.

    This function relies on the dict-based helpers defined in the same cell
    group (``cumulative_prob``, ``matching_canditade_intervals``,
    ``rescale_source_interval``, ``update_code_interval``,
    ``matching_canditade_intervals_finalize``). A later cell of this notebook
    redefines two of those names with different signatures, so this function
    must be called before that cell is executed.

    Parameters
    ----------
    sequence : iterable
        Source symbols. Each one must be a key of the current source
        partition (keys 0 and 1 once a rescale has happened).
    P : dict
        Code distribution, symbol -> probability. Must contain keys 0 and 1;
        ``P[0]`` is also the fraction used for the final subdivision.
    P_source : dict
        Source distribution, symbol -> probability.

    Returns
    -------
    list
        The emitted code symbols.

    Raises
    ------
    UnboundLocalError
        If ``sequence`` is empty or no symbol ever produced a match, because
        the closing step then has no refined code intervals to work on.
    ValueError
        If no sub-interval fits strictly inside the final source interval.
    """
    cum_candidate = cumulative_prob(P)
    cum_source = cumulative_prob(P_source)
    out = []
    print("Intervalo Fonte:", cum_source)
    print("Intervalo Candidato:", cum_candidate)

    for symbol in sequence:
        print("\n--> Símbolo em teste", symbol)
        source_interval = cum_source[symbol]
        success, new_sym, new_low, new_high = matching_canditade_intervals(symbol, source_interval, cum_candidate)
        # NOTE(review): nothing inside this loop changes source_interval,
        # cum_candidate or P, so if the re-check at the bottom succeeds once
        # it will succeed on every following pass - confirm termination.
        while success:
            cum_source = rescale_source_interval(source_interval, new_low, new_high)
            print("\nIntervalo encontrado")
            print("Intervalo Fonte:", cum_source)
            print("Intervalo Candidato:", cum_candidate)
            out.append(new_sym)

            # Refine the candidate partition and label every piece with its
            # bit tuple, e.g. (0, 0), (0, 1), (1, 0), (1, 1).
            new_interval = update_code_interval(cum_candidate, P)
            n_bits = np.log2(len(new_interval)).astype(int)
            chaves = list(itertools.product([0, 1], repeat=n_bits))
            new_code_interval = dict(zip(chaves, new_interval))
            print("\nIntervalo Fonte:", cum_source)
            print("Intervalo Candidato:", new_code_interval)
            success, new_sym, new_low, new_high = matching_canditade_intervals(symbol, source_interval, new_code_interval)

    # Closing step. NOTE(review): new_code_interval may come from an earlier
    # symbol than the last one, since it is only assigned inside the loop.
    new_code_interval = matching_canditade_intervals_finalize(symbol, source_interval, new_code_interval)

    # Split each bracketing candidate once more. The fraction given to bit 0
    # is the code probability of symbol 0 (previously a hard-coded 0.4, which
    # only matched P = {0: 0.4, 1: 0.6}).
    split_fraction = P[0]
    subdivididos = {}
    for chave, (inicio, limite) in new_code_interval.items():
        ponto_meio = inicio + split_fraction * (limite - inicio)
        subdivididos[(chave, 0)] = (inicio, ponto_meio)
        subdivididos[(chave, 1)] = (ponto_meio, limite)

    # Keep only the sub-intervals lying strictly inside the source interval.
    low_source, high_source = source_interval
    contidos = [
        {chave: (low_code, high_code)}
        for chave, (low_code, high_code) in subdivididos.items()
        if low_source < low_code and high_source > high_code
    ]

    print(contidos)
    maior = max(contidos, key=lambda d: list(d.values())[0][1] - list(d.values())[0][0])
    print(maior)

    # Keys look like ((b1, b2), b3); flatten them into the output list.
    for chave in maior.keys():
        for elemento in chave:
            if isinstance(elemento, tuple):
                out.extend(elemento)
            else:
                out.append(elemento)

    print("CC:", out)

    return out



In [20]:
# Example: encode the two-symbol source sequence [1, 0].
# P_code is the code (candidate) distribution and P_source the source
# distribution; both map a binary symbol to its probability.
P_code = {0: 0.4, 1: 0.6}
P_source = {0: 0.5, 1: 0.5}
sequence = [1, 0]

# arithmetic_encode prints its intermediate intervals and returns the list of
# emitted code symbols.
out = arithmetic_encode(sequence, P_code, P_source)


Intervalo Fonte: {0: (0.0, 0.5), 1: (0.5, 1.0)}
Intervalo Candidato: {0: (0.0, 0.4), 1: (0.4, 1.0)}

--> Símbolo em teste 1

Intervalo encontrado
Intervalo Fonte: {0: (0.16666666666666663, 0.5833333333333333), 1: (0.5833333333333333, 1.0)}
Intervalo Candidato: {0: (0.0, 0.4), 1: (0.4, 1.0)}

Intervalo Fonte: {0: (0.16666666666666663, 0.5833333333333333), 1: (0.5833333333333333, 1.0)}
Intervalo Candidato: {(0, 0): (0.0, 0.16000000000000003), (0, 1): (0.16000000000000003, 0.4), (1, 0): (0.4, 0.64), (1, 1): (0.64, 1.0)}

--> Símbolo em teste 0
[{((0, 1), 1): (0.256, 0.4)}, {((1, 0), 0): (0.4, 0.496)}]
{((0, 1), 1): (0.256, 0.4)}
CC: [1, 0, 1, 1]


In [21]:
class CodeCandidate:
    """
    Represents a candidate for arithmetic coding with associated probability interval and symbol sequence.

    The annotations follow the original ``np.uint32`` design, but nothing is
    type-checked at runtime: ``decode`` in this notebook builds candidates
    with plain float bounds.

    Attributes
    ----------
    lowerBound : np.uint32
        Lower bound of the probability interval.
    upperBound : np.uint32
        Upper bound of the probability interval.
    probability : np.uint32
        Width of the probability interval.
    symbols : List[int]
        Symbol sequence associated with this candidate.
    """
    def __init__(self, lowerBound: np.uint32 = np.uint32(0),
                 upperBound: np.uint32 = np.uint32(0),
                 probability: np.uint32 = np.uint32(0),
                 symbols: "List[int] | None" = None):
        self.lowerBound = lowerBound
        self.upperBound = upperBound
        self.probability = probability
        # A fresh list per instance: a literal ``[]`` default would be created
        # once and shared (and mutated) by every candidate built without an
        # explicit ``symbols`` argument. A list passed by the caller is
        # stored as-is, not copied.
        self.symbols = [] if symbols is None else symbols

    def print(self):
        """Write bounds, probability and symbols to standard output."""
        print(f"Upper Bound: {self.upperBound}")
        print(f"Lower Bound: {self.lowerBound}")
        print(f"Probability: {self.probability}")
        print(f"Symbols: {self.symbols}")

class Interval:
    """
    Current encoding or decoding interval of the source.

    Attributes
    ----------
    lowerBound : np.uint32
        Lower bound of the interval.
    upperBound : np.uint32
        Upper bound of the interval.
    """
    def __init__(self, lowerBound: np.uint32 = np.uint32(0),
                 upperBound: np.uint32 = np.uint32(0)):
        # Values are stored as given; no conversion or validation is applied.
        self.lowerBound, self.upperBound = lowerBound, upperBound

    def print(self):
        """Write the upper bound and then the lower bound to standard output."""
        print(f"Upper Bound: {self.upperBound}")
        print(f"Lower Bound: {self.lowerBound}")

class CodeCandidateList:
    """
    Maintains a list of code candidates used during encoding or decoding.

    Attributes
    ----------
    list : List[CodeCandidate]
        The list of code candidates.
    k : int
        Alphabet size.
    """
    def __init__(self, k: int):
        self.list = []
        self.k = k

    def print(self):
        """Write ``k`` and every stored candidate to standard output."""
        print(f"k: {self.k}")
        print("List of Candidates:")
        for i, candidate in enumerate(self.list, start=1):
            print(f"Candidate {i}:")
            # ``print`` is a bound method that takes no extra argument;
            # passing the candidate again raised TypeError.
            candidate.print()

In [42]:
def update_code_interval(code_intervals, P):
    """Split every code candidate into its two child candidates.

    Each candidate ``[lower, upper)`` is cut at
    ``lower + P[0] * (upper - lower)``. The lower part becomes the child for
    bit 0 and the upper part the child for bit 1. Children carry the
    parent's symbol sequence extended by their own bit and, matching how
    ``decode`` seeds the initial candidates, their own width as
    ``probability``.

    The previous implementation reused one ``CodeCandidate`` object for all
    children (so every list entry aliased the same object), stored the
    ``None`` returned by ``list.extend`` as the symbol sequence, and mutated
    the parents' symbol lists. This version builds a fresh object per child
    and leaves the input untouched.

    Parameters
    ----------
    code_intervals : CodeCandidateList
        Current candidates; not modified.
    P : dict
        Code distribution for a binary alphabet; ``P[0]`` is the fraction of
        each interval assigned to bit 0.

    Returns
    -------
    CodeCandidateList
        New list with ``k`` doubled and two children per input candidate,
        ordered parent by parent with the bit-0 child first.
    """
    refined = CodeCandidateList(2 * code_intervals.k)
    split_fraction = P[0]
    for cc in code_intervals.list:
        split = cc.lowerBound + split_fraction * (cc.upperBound - cc.lowerBound)
        # The outer bounds are reused unchanged so the children tile the
        # parent interval exactly.
        for bit, lower, upper in ((0, cc.lowerBound, split), (1, split, cc.upperBound)):
            refined.list.append(CodeCandidate(lowerBound=lower,
                                              upperBound=upper,
                                              probability=upper - lower,
                                              symbols=list(cc.symbols) + [bit]))
    return refined

def matching_canditade_intervals(source_interval, code_interval_list):
    """Find the first code candidate that fully contains the source interval.

    Parameters
    ----------
    source_interval : object
        Exposes ``lowerBound`` and ``upperBound``.
    code_interval_list : object
        Exposes ``list``, an iterable of candidates with ``lowerBound`` and
        ``upperBound`` attributes.

    Returns
    -------
    tuple
        ``(True, candidate)`` for the first enclosing candidate in list
        order, or ``(False, None)`` when there is none.
    """
    enclosing = (
        candidate
        for candidate in code_interval_list.list
        if source_interval.lowerBound >= candidate.lowerBound
        and source_interval.upperBound <= candidate.upperBound
    )
    hit = next(enclosing, None)
    return hit is not None, hit

def find_code_interval_from_candidates(cc_list, searchList):
    """
    Return the interval of the candidate whose symbols equal ``searchList``.

    Every candidate in ``cc_list.list`` is inspected in order. On a match the
    candidate's bounds are copied into the result and ``searchList`` is
    emptied in place, so the caller's list is modified. The scan does not
    stop at the first match.

    Parameters
    ----------
    cc_list : CodeCandidateList
        Candidates to search.
    searchList : list
        Symbol sequence to look for; cleared when a match is found.

    Returns
    -------
    Interval
        Bounds of the matching candidate, or a default-constructed
        ``Interval`` when nothing matched.
    """
    found = Interval()
    for candidate in cc_list.list:
        if candidate.symbols != searchList:
            continue
        found.lowerBound, found.upperBound = candidate.lowerBound, candidate.upperBound
        searchList.clear()
    return found

def output_and_rescale(source_interval, code_intervals, P_code):
    """Emit code symbols while the source interval fits inside a candidate.

    As long as some candidate encloses ``source_interval``, the source
    interval is rescaled in place relative to that candidate, the
    candidate's symbols are collected, and the candidate list is refined
    with ``update_code_interval`` before the next check. Each match is
    reported with ``print``.

    Parameters
    ----------
    source_interval : Interval
        Current source interval; modified in place.
    code_intervals : CodeCandidateList
        Current code candidates.
    P_code : dict
        Code distribution forwarded to ``update_code_interval``.

    Returns
    -------
    tuple
        ``(symbols, source_interval)``: the collected symbols and the same
        (possibly rescaled) interval object.
    """
    emitted = []
    while True:
        found, candidate = matching_canditade_intervals(source_interval, code_intervals)
        if not found:
            break

        # Map the source bounds into the candidate's own coordinate system.
        width = candidate.upperBound - candidate.lowerBound
        source_interval.lowerBound = (source_interval.lowerBound - candidate.lowerBound) / width
        source_interval.upperBound = (source_interval.upperBound - candidate.lowerBound) / width

        print("\nIntervalo encontrado")
        print("Intervalo Fonte:")
        source_interval.print()
        print("Intervalo Candidato:")
        candidate.print()
        emitted.extend(candidate.symbols)

        code_intervals = update_code_interval(code_intervals, P_code)

    return emitted, source_interval

def decode(sequence_code, P_source, P_code, len_original_sequence):
    """Decode a code-symbol sequence back into source symbols (prototype).

    The decoder narrows a code interval by reading code symbols and, each
    time that interval falls entirely on one side of the source split point,
    emits the corresponding source symbol and narrows the source interval.
    After each emitted symbol the encoder step ``output_and_rescale`` is
    replayed on deep copies to find out how many code symbols the encoder
    would have emitted and how the source interval would have been rescaled.
    Progress is reported with ``print``.

    Parameters
    ----------
    sequence_code : sequence
        Code symbols (0/1) produced by the encoder.
    P_source : dict
        Source distribution; only ``P_source[0]`` is read.
    P_code : dict
        Code distribution, symbol -> probability.
    len_original_sequence : int
        Number of source symbols to recover; decoding returns as soon as this
        many symbols have been produced.

    Returns
    -------
    list
        The decoded source symbols. If the code sequence is exhausted first,
        the symbols decoded so far are returned.
    """
    # Seed the candidate list with one single-symbol candidate per code symbol.
    code_intervals = cumulative_prob(P_code)
    code_interval_list = CodeCandidateList(len(code_intervals))
    for code_symbol, (lower, upper) in code_intervals.items():
        code_interval_list.list.append(CodeCandidate(lowerBound=lower,
                                                     upperBound=upper,
                                                     probability=upper - lower,
                                                     symbols=[code_symbol]))
    len_sequence = len(sequence_code)
    sequence_decoded = []
    source_interval = Interval(0.0, 1.0)

    symbol_point_decoder_iterator = 0
    symbol_point_iterator = 0
    current_symbol = []
    # Strictly less than: with ``<=`` the index below ran one past the end of
    # sequence_code and raised IndexError once the input was exhausted.
    while symbol_point_decoder_iterator < len_sequence:
        current_symbol.append(sequence_code[symbol_point_decoder_iterator])
        print(f'\nSímbolo Atual {current_symbol}:')
        # NOTE: on a match this call also clears current_symbol in place.
        code_interval = find_code_interval_from_candidates(code_interval_list, current_symbol)
        symbol_point_iterator = symbol_point_decoder_iterator + 1

        print("Intervalo Candidato:")
        code_interval.print()
        print("Intervalo Fonte:")
        source_interval.print()

        # NOTE(review): this loop ends only through the early return or the
        # encoder-simulation branch below - confirm it cannot spin forever.
        search = True
        while search:
            new_border = source_interval.lowerBound + (source_interval.upperBound - source_interval.lowerBound) * P_source[0]
            # A source symbol is decidable while the code interval does not
            # straddle the split point of the source interval.
            while code_interval.lowerBound >= new_border or code_interval.upperBound < new_border:
                if code_interval.lowerBound >= new_border and code_interval.upperBound < source_interval.upperBound:
                    sequence_decoded.append(1)
                    source_interval.lowerBound = new_border
                elif code_interval.upperBound < new_border and code_interval.lowerBound >= source_interval.lowerBound:
                    source_interval.upperBound = new_border
                    sequence_decoded.append(0)
                else:  # Not needed for the final code
                    break
                if len(sequence_decoded) >= len_original_sequence:
                    print(sequence_decoded)
                    return sequence_decoded

                new_border = source_interval.lowerBound + (source_interval.upperBound - source_interval.lowerBound) * P_source[0]

                # Replay the encoder on copies so the live state is only
                # replaced when the encoder would actually have rescaled.
                k_interval = copy.deepcopy(source_interval)
                code_interval_list_copy = copy.deepcopy(code_interval_list)
                print("\n-------------------------")
                print("Simulação do Encoder")
                checkcode, k_interval = output_and_rescale(k_interval, code_interval_list_copy, P_code)
                print("-------------------------")
                if k_interval.lowerBound != source_interval.lowerBound or k_interval.upperBound != source_interval.upperBound:
                    # Skip the code symbols the encoder emitted and continue
                    # from the rescaled source interval.
                    symbol_point_decoder_iterator = symbol_point_decoder_iterator + len(checkcode)
                    source_interval = k_interval
                    search = False
                    break

            if symbol_point_iterator >= len_sequence:
                # No code symbols left: keep shrinking the code interval from
                # above. NOTE(review): the fraction used here is P_source[0],
                # not P_code[0] - confirm.
                code_interval.upperBound = code_interval.lowerBound + (code_interval.upperBound - code_interval.lowerBound) * P_source[0]

            else:
                probability = P_code[0]
                symbol = sequence_code[symbol_point_iterator]
                print(f"\nSímbolo Atual {[symbol]}")
                # Narrow the code interval by the next code symbol: bit 0
                # keeps the lower part, bit 1 keeps the upper part.
                range_ = code_interval.upperBound - code_interval.lowerBound
                newMidBound = code_interval.lowerBound + range_ * probability
                if symbol == 0:
                    code_interval.upperBound = newMidBound
                else:
                    code_interval.lowerBound = newMidBound
                print("Intervalo Candidato:")
                code_interval.print()
                print("Intervalo Fonte:")
                source_interval.print()

            symbol_point_iterator += 1

    return sequence_decoded

In [41]:
# Example: decode the code sequence [1, 0, 1, 1] back into a source sequence
# of len(sequence) symbols, using the same distributions as the encoding
# example.
P_code = {0: 0.4, 1: 0.6}
P_source = {0: 0.5, 1: 0.5}
sequence = [1, 0]
# The code word is written out by hand here rather than taken from a call to
# arithmetic_encode in this cell.
out = [1, 0, 1, 1]
decode(out, P_source, P_code, len(sequence))


Símbolo Atual [1]:
Intervalo Candidato:
Upper Bound: 1.0
Lower Bound: 0.4
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.0

Símbolo Atual [0]
Intervalo Candidato:
Upper Bound: 0.64
Lower Bound: 0.4
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.0

Símbolo Atual [1]
Intervalo Candidato:
Upper Bound: 0.64
Lower Bound: 0.496
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.0

Símbolo Atual [1]
Intervalo Candidato:
Upper Bound: 0.64
Lower Bound: 0.5536
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.0
-------------------------

Simulação do Encoder

Intervalo encontrado
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.16666666666666663
Intervalo Candidato:
Upper Bound: 1.0
Lower Bound: 0.4
Probability: 0.6
Symbols: [1]
-------------------------

Símbolo Atual [0]:
Intervalo Candidato:
Upper Bound: 0.4
Lower Bound: 0.0
Intervalo Fonte:
Upper Bound: 1.0
Lower Bound: 0.16666666666666663

Símbolo Atual [1]
Intervalo Candidato:
Upper Bound: 0.4
Lower Bound: 0.16000000000000003
Intervalo F

[1, 0]

In [None]:
# Scratch check of the form lower + (upper - lower) * p for the interval
# [0, 1] and p = 0.5 (presumably mirrors the new_border expression in decode).
0 + (1-0)*0.5

0.5

In [None]:
def real_para_binario(numero, precisao=10):
    """Convert a real number to its binary representation as a string.

    Parameters
    ----------
    numero : int or float
        Value to convert. Negative values are rendered as a leading ``-``
        followed by the representation of the magnitude.
    precisao : int, optional
        Maximum number of fractional binary digits (default 10). Digits are
        truncated, not rounded, and generation stops early once the
        fractional part becomes exactly zero.

    Returns
    -------
    str
        ``"<integer bits>.<fraction bits>"``, or only the integer bits when
        no fractional digit was produced.
    """
    # Work on the magnitude: bin() of a negative int yields '-0b...', which
    # the old '[2:]' slice mangled, and a negative fraction produced negative
    # "bits".
    sinal = "-" if numero < 0 else ""
    magnitude = abs(numero)

    # Separate the integer part from the fractional part.
    parte_inteira = int(magnitude)
    parte_fracionaria = magnitude - parte_inteira

    bin_inteira = format(parte_inteira, "b")

    # Repeated doubling: the integer part of each doubled value is the next
    # fractional bit.
    bits = []
    while precisao > 0 and parte_fracionaria != 0:
        parte_fracionaria *= 2
        bit = int(parte_fracionaria)
        bits.append(str(bit))
        parte_fracionaria -= bit
        precisao -= 1

    bin_fracionaria = "".join(bits)
    if bin_fracionaria:
        return f"{sinal}{bin_inteira}.{bin_fracionaria}"
    return f"{sinal}{bin_inteira}"


In [None]:
# Scratch cell for a symbol with probability p0 whose interval starts at f0.
# NOTE(review): the two formulas below look like the Shannon-Fano-Elias
# construction (interval midpoint and code length) - confirm.
p0 = 0.4
f0 = 0
# Midpoint of the symbol's interval [f0, f0 + p0).
Tx = f0 + (1/2)*p0
print(Tx)
# ceil(log2(1/p0) + 1); presumably the number of bits to keep from Tx.
trunc = np.ceil(np.log2(1/p0)+1)
print(trunc)
# Binary expansion of the midpoint, limited to 10 fractional bits.
binario = real_para_binario(Tx, precisao=10)
print(f"{Tx} em binário é: {binario}")

0.2
3.0
0.2 em binário é: 0.0011001100


In [None]:
# Same computation as the previous cell, for a symbol with probability p
# whose interval starts at f.
p = 0.6
f = 0.4
# Midpoint of the symbol's interval [f, f + p).
Tx = f + (1/2)*p
print(Tx)
# ceil(log2(1/p) + 1); presumably the number of bits to keep from Tx.
trunc = np.ceil(np.log2(1/p)+1)
print(trunc)
# Binary expansion of the midpoint, limited to 10 fractional bits.
binario = real_para_binario(Tx, precisao=10)
print(f"{Tx} em binário é: {binario}")

0.7
2.0
0.7 em binário é: 0.1011001100
