# LZ78 Sequential Probability Assignment: Python Implementation
This code is associated with the paper [A Family of LZ78-based Universal Sequential Probability Assignments](https://arxiv.org/abs/2410.06589).

This codebase is a Python counterpart to the Rust implementation. Because Python is more widely used than Rust, it lets users experiment more comfortably with implementation modifications, at the cost of slower runtime (roughly 10x).

## Setup

1. (Optional) Set up and activate a virtual environment for this project.
2. Install the `lz78_python` package: `pip install --editable .`. Note that the `--editable` option lets your implementation modifications propagate to the package without having to rerun `pip install .`.

You should be all set! This tutorial walks through the functionality that the Python codebase offers, in parallel with the functionality of the Rust codebase.

## Imports

In [None]:
!pip install lorem

In [1]:
from lz78_python.utils.CharacterMap import CharacterMap
from lz78_python.naive.encoder import LZ78_encode
from lz78_python.naive.decoder import LZ78_decode
from lz78_python.streamed.encoder import BlockLZ78Encoder
from lz78_python.spa.encoder import LZ78SPA
import lorem, bitarray
from os import makedirs
import numpy as np

### 1. Sequences

A dedicated sequence class does not exist in this version of the codebase. We can directly use Python lists for integer sequences and Python strings for character sequences.

#### 1.1 Example: Integer Sequence

We will not go into depth with this example, since you should be able to recreate the same behavior with a plain Python list.

A Python list has no built-in method for counting unique symbols (i.e., the alphabet size), but `len(set(lst))` provides that functionality.
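
As a minimal sketch of the point above, the snippet below treats a plain list as an integer sequence. The variable names are illustrative only and are not part of `lz78_python`. It also assumes symbols are 0-indexed, in which case `len(set(lst))` counts only the symbols that actually appear and can be smaller than the alphabet size an encoder would need.

```python
# An integer sequence is just a list of non-negative integer symbols.
int_seq = [0, 1, 2, 2, 2, 1, 0, 1]

# Number of distinct symbols that appear in the sequence.
observed_alphabet_size = len(set(int_seq))

# Assumption: symbols are 0-indexed, so the smallest alphabet that can
# hold every symbol has max(symbol) + 1 entries.
min_alphabet_size = max(int_seq) + 1

# The two counts differ when some symbols never occur.
sparse_seq = [0, 5, 5, 0]
sparse_observed = len(set(sparse_seq))
sparse_min_alphabet = max(sparse_seq) + 1

print(observed_alphabet_size, min_alphabet_size)
print(sparse_observed, sparse_min_alphabet)
```

When the sequence uses every symbol at least once, the two counts agree; otherwise it is safer to pass the alphabet size explicitly.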

#### 1.2 `CharacterMap`



In [None]:
# generate some dummy data and make a character map
s = " ".join(([lorem.paragraph() for _ in range(10)]))
charmap = CharacterMap(s)

In [None]:
charmap.encode("lorem ipsum")

In [None]:
charmap.encode("hello world")

In [None]:
charmap.filter_string("hello world. Lorem ipsum! @#$%^&*()")

In [None]:
charmap.decode(charmap.encode("lorem ipsum"))

In [None]:
charmap.alphabet_size()

#### 1.3 Example: Character Sequence

In [None]:
charmap = CharacterMap("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ. ?,")
charseq = " ".join(([lorem.paragraph() for _ in range(1000)]))

In [None]:
charmap.encode(charseq[100:130])

In [None]:
print(charseq[100:130])  # print explicitly; a notebook cell only displays its last expression
charmap.decode(charmap.encode(charseq[100:130]))

### 2. LZ78 Compression

In [None]:
data = " ".join(([lorem.paragraph() for _ in range(10_000)]))
charmap = CharacterMap(data)

In [None]:
encoded = LZ78_encode(data, custom_char_map=charmap)

In [None]:
encoded.compression_ratio

In [None]:
# reading bitarray from file may include additional 0's because of padding
# so must also track length and apply it when loading the bits
encoded_bitlength = len(encoded.bits)
makedirs("test_data", exist_ok=True)
with open("test_data/saved_encoded_sequence.bin", 'wb') as file:
    encoded.bits.tofile(file)

In [None]:
bits = bitarray.bitarray()
with open("test_data/saved_encoded_sequence.bin", 'rb') as file:
    bits.fromfile(file)
bits = bits[:encoded_bitlength]
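
The padding issue mentioned in the comments above can be illustrated with a pure-Python sketch. This is a stand-in, not how `bitarray` is implemented: `pack_bits` and `unpack_bits` are hypothetical helpers, and the most-significant-bit-first order within each byte is an assumption made for the example. The point is only that a byte-oriented file holds a multiple of 8 bits, so the original bit length has to be stored separately.

```python
def pack_bits(bits):
    """Pack a list of 0/1 ints into bytes, zero-padding the last byte."""
    padded = bits + [0] * (-len(bits) % 8)
    out = bytearray()
    for i in range(0, len(padded), 8):
        byte = 0
        for b in padded[i:i + 8]:
            byte = (byte << 1) | b  # most significant bit first (assumed)
        out.append(byte)
    return bytes(out)


def unpack_bits(data):
    """Expand bytes back into a list of 0/1 ints, padding included."""
    return [(byte >> shift) & 1 for byte in data for shift in range(7, -1, -1)]


bits = [1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1]   # 11 bits
stored = pack_bits(bits)                    # 2 bytes on disk
loaded = unpack_bits(stored)                # 16 bits, 5 of them padding
restored = loaded[:len(bits)]               # truncate to the saved length

print(len(bits), len(loaded), restored == bits)
```

Without the truncation step, a decoder would see five extra zero bits at the end of the stream.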

In [None]:
decoded = LZ78_decode(
    bits,
    alphabet_size=charmap.alphabet_size(), 
    return_str=True,
    custom_char_map=charmap,
)

In [None]:
assert decoded == data

#### 2.3 Block-Wise Compression

In [None]:
charmap = CharacterMap("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ. ,?")

In [None]:
encoder = BlockLZ78Encoder(
    alphabet_size=charmap.alphabet_size(),
    input_is_string=True,
    custom_char_map=charmap
)

In [None]:
for _ in range(1000):
    encoder.encode_block(lorem.paragraph())

In [None]:
encoder.encode_block([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

In [None]:
print(encoder.get_encoded_sequence())
encoder.compression_ratio()

In [None]:
decoded = LZ78_decode(
    encoder.get_encoded_sequence(),
    alphabet_size=charmap.alphabet_size(), 
    return_str=True,
    custom_char_map=charmap,
)
print(decoded[376:400])
charmap.encode(decoded[376:400])

### 3. LZ78 Sequential Probability Assignment (SPA)

#### 3.1 Example: LZ78 SPA on Markov Data

In [2]:
# Helper methods for generating data; feel free to run the cell without
# reading the code
def sample_index_from_dist(probabilities):
    cdf = np.cumsum(probabilities)
    cdf[-1] = 1 # in case of FP error
    return int(np.where(np.random.random() < cdf)[0][0])

def entropy(probs):
    return sum([-x * np.log2(x) for x in probs if x > 0])

def get_stationary_dist(transition_probabilities):
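    eigvals, eigvecs = np.linalg.eig(transition_probabilities.T)
    # all eigenvalues have magnitude <= 1, and one equals 1; its eigenvector
    # (real up to numerical error) gives the stationary distribution
    stationary_dist = np.real(eigvecs[:, np.argmax(np.real(eigvals))])
    return stationary_dist / stationary_dist.sum()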

def entropy_rate(transition_probabilities):
    stationary_dist = get_stationary_dist(transition_probabilities)
    return sum([prob * entropy(transition_probabilities[i]) 
                for i, prob in enumerate(stationary_dist)])
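
As a quick sanity check on the `entropy_rate` helper, here is a worked example for the symmetric case, written without NumPy. It assumes a transition matrix with `peak_prob` on the diagonal and the remaining mass spread evenly off the diagonal. Every row of such a matrix has the same entropy, so the entropy rate equals the entropy of any one row, whatever the stationary distribution is. The lowercase names are local to this sketch.

```python
import math

peak_prob = 0.9
alphabet_size = 2

# One row of the symmetric transition matrix: peak_prob for staying put,
# the rest split evenly among the other symbols.
off_diag = (1 - peak_prob) / (alphabet_size - 1)
row = [peak_prob] + [off_diag] * (alphabet_size - 1)

# Entropy of the row in bits; equal to the entropy rate in this
# symmetric setting because all rows share the same entropy.
row_entropy = -sum(p * math.log2(p) for p in row if p > 0)

print(round(row_entropy, 3))  # 0.469
```

If the log loss is reported in bits per symbol, this value is a natural baseline to compare against: a well-trained predictor on such data should approach it from above.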

In [3]:
## You can change these
ALPHABET_SIZE = 2
PEAK_PROB = 0.9
K = 5
N = 1_000_000
N_TEST = 10_000

In [4]:
# Build data array; feel free to ignore this code and just run the cell
transition_probabilities = np.eye(ALPHABET_SIZE) * PEAK_PROB + \
    (np.ones((ALPHABET_SIZE, ALPHABET_SIZE)) - np.eye(ALPHABET_SIZE)) * (1 - PEAK_PROB) / (ALPHABET_SIZE - 1)
start_prob = np.ones(ALPHABET_SIZE) / ALPHABET_SIZE

data = np.zeros(N, dtype=int)
for i in range(K):
    data[i] = sample_index_from_dist(start_prob)
for i in range(K,N):
    data[i] = sample_index_from_dist(transition_probabilities[data[i-K]])

In [5]:
spa = LZ78SPA()
spa.train_on_block(data[:-N_TEST])

In [6]:
spa.compute_scaled_log_loss_using_fixed_spa(
    data[-N_TEST:], 
    len(data[-N_TEST:]),
    include_prev_context=True
) / N_TEST

TypeError: LZ78SPA.compute_scaled_log_loss_using_fixed_spa() missing 1 required positional argument: 'length'