In [2]:
from process_wav import read_wav_file, brainwire_quant
from arithmetic import load_probabilities, build_cdf, arithmetic_decode, arithmetic_encode

In [3]:
# Load model
# Read the n-gram probability table from disk and turn it into the CDF that the
# arithmetic coder consumes. Both helpers come from the local `arithmetic`
# module. `probs` and `cdf` are notebook-level names read by later cells.
probs = load_probabilities("ngram_prob_python.txt")
cdf = build_cdf(probs)

In [4]:
# Sanity check: number of entries in the CDF and its container type.
len(cdf), type(cdf)

(1427664, dict)

In [5]:
# Load and quantize a test file
# NOTE(review): hard-coded relative path — the notebook has to be started from
# the directory that contains ./data for this to resolve.
some_wav = './data/d1768885-813b-42b4-9b90-150a8b47e1db.wav'
samples = read_wav_file(some_wav)
# Map every sample through brainwire_quant (project helper). Presumably this
# reduces each sample to a small integer code; the values printed further down
# are small ints — verify against process_wav.py.
quantized = [brainwire_quant(s) for s in samples]

In [6]:
# Form n-grams
n = 3
# Overlapping windows with stride 1: zipping the sequence against copies of
# itself shifted by 0..n-1 yields one n-tuple per start index, and zip stops
# at the shortest (most shifted) copy, giving len(quantized) - n + 1 tuples.
symbols = list(zip(*(quantized[shift:] for shift in range(n))))

In [22]:
# len(symbols), symbols[:10]

In [8]:
# Encode
# NOTE(review): the whole symbol sequence is collapsed into one float. A 64-bit
# double can take at most 2**64 distinct values, so it cannot uniquely identify
# a sequence of len(symbols) n-grams. That is the likely reason the round-trip
# check in the next cell does not pass yet; a lossless coder needs an
# arbitrary-precision / bitstream output — confirm against arithmetic.py.
code = arithmetic_encode(symbols, cdf) # this is a single float

# Decode
# The symbol count is passed explicitly so the decoder knows how many symbols
# to emit.
decoded_symbols = arithmetic_decode(code, cdf, len(symbols))

In [23]:
# TODO: check that the decoded symbols are the same as the original.
# The round trip does not reproduce the input yet:
# symbols[:10], decoded_symbols[:10] # not the same yet

In [13]:
import struct

# On-disk layout shared by write_compressed() and read_compressed():
#   bytes 0-7   little-endian IEEE-754 double  -> the arithmetic code
#   bytes 8-11  little-endian unsigned 32-bit  -> number of encoded symbols
# The "<" prefix pins byte order and standard sizes (no native padding), so a
# file written on one machine can be read on another. On little-endian hosts
# this is byte-identical to the previous native "d" + "I" layout.
_COMPRESSED_LAYOUT = struct.Struct("<dI")


def write_compressed(code, total_symbols, output_path):
    '''Save the encoded value and the symbol count to a binary file.

    Args:
        code: the encoded value; stored as a 64-bit float.
        total_symbols: number of symbols that were encoded (the decoder needs
            it to know when to stop); must fit in an unsigned 32-bit integer.
        output_path: destination file; created or overwritten.

    Raises:
        struct.error: if a value cannot be packed (for example a negative or
            too large ``total_symbols``).
    '''
    # Pack before opening the file so that a packing error cannot leave a
    # truncated file behind.
    payload = _COMPRESSED_LAYOUT.pack(code, total_symbols)
    with open(output_path, "wb") as f:
        f.write(payload)


def read_compressed(input_path):
    '''Load the encoded value and the symbol count from a binary file.

    Args:
        input_path: file previously produced by ``write_compressed``.

    Returns:
        A ``(code, total_symbols)`` tuple: a float and an int.

    Raises:
        struct.error: if the file holds fewer bytes than the fixed record.
    '''
    with open(input_path, "rb") as f:
        payload = f.read(_COMPRESSED_LAYOUT.size)
    if len(payload) != _COMPRESSED_LAYOUT.size:
        # Same exception type that struct.unpack raised before, but with a
        # message that names the file and the byte counts.
        raise struct.error(
            f"{input_path}: expected {_COMPRESSED_LAYOUT.size} bytes, "
            f"got {len(payload)}"
        )
    code, total_symbols = _COMPRESSED_LAYOUT.unpack(payload)
    return code, total_symbols

# Persist the encoded value together with the symbol count needed for decoding.
write_compressed(code, len(symbols), "compressed.bin")


In [14]:
# compute compression ratio
# NOTE(review): this ratio is not meaningful yet. write_compressed always emits
# an 8-byte double plus a 4-byte count, so compressed.bin is 12 bytes whatever
# the input is, and the decoded symbols do not yet match the originals.
import os
original_size = os.path.getsize(some_wav)
compressed_size = os.path.getsize("compressed.bin")
print(f"Compression ratio: {original_size / compressed_size:.2f}x")

Compression ratio: 16447.33x


In [17]:
import numpy as np

def fit_ar_model(signal, order=3):
    '''Fit autoregressive coefficients to `signal` by least squares.

    Each sample from index `order` onward is regressed on the `order` samples
    immediately before it; no intercept term is included.

    Args:
        signal: sequence of numeric samples.
        order: number of preceding samples used as predictors.

    Returns:
        NumPy array of `order` coefficients, oldest lag first: entry j
        multiplies signal[t - order + j] when predicting signal[t].
    '''
    lagged_columns = []
    for lag in range(order):
        # lag - order is negative, so the slice drops the trailing samples
        # that have no target `order - lag` steps ahead of them.
        lagged_columns.append(signal[lag:lag - order])
    design = np.column_stack(lagged_columns)
    targets = signal[order:]
    solution = np.linalg.lstsq(design, targets, rcond=None)
    # lstsq returns (coefficients, residuals, rank, singular values).
    return solution[0]

def compute_residuals(signal, coeffs):
    '''Return the prediction errors of an AR model over `signal`.

    The model order is len(coeffs). For every index t >= order the prediction
    is the weighted sum of the `order` preceding samples (oldest first, in the
    same order as `coeffs`), and the residual is signal[t] minus that value.

    Args:
        signal: sequence of numeric samples.
        coeffs: AR coefficients, oldest lag first.

    Returns:
        List with len(signal) - order residuals (empty if the signal is not
        longer than the model order).
    '''
    order = len(coeffs)
    errors = []
    for t in range(order, len(signal)):
        history = signal[t - order:t]
        predicted = sum(weight * past for weight, past in zip(coeffs, history))
        errors.append(signal[t] - predicted)
    return errors

def quantize_residuals(residuals, scale=16):
    '''Quantize residuals to integers.

    Each value is divided by `scale`, rounded with the built-in round() and
    converted to int.

    Args:
        residuals: iterable of numeric residuals.
        scale: quantization step size.

    Returns:
        List of ints, one per input residual.
    '''
    levels = []
    for value in residuals:
        levels.append(int(round(value / scale)))
    return levels



### Fit AR model

In [None]:
# Step 2: Fit AR model (e.g., order-3)
ar_order = 3
# Least-squares fit on the quantized samples (not on the raw waveform).
coeffs = fit_ar_model(quantized, order=ar_order)

# Step 3: Compute residuals
# One residual per sample from index ar_order onward, so the list is ar_order
# entries shorter than `quantized`.
residuals = compute_residuals(quantized, coeffs)

In [28]:
# Length check: residuals should be len(coeffs) entries shorter than the input.
len(quantized), len(residuals), len(coeffs)

(98662, 98659, 3)

In [30]:
# Peek at the first few inputs and residuals, plus the fitted coefficients.
quantized[:10], residuals[:10], coeffs

([8, 8, 11, 11, 8, 3, 6, 1, 11, 8],
 [1.0168287436426517,
  -2.348786679118005,
  -5.513288214462911,
  1.5109744104229677,
  -4.715165016297697,
  9.177349857433601,
  -0.890275365502454,
  -7.314322025729446,
  -2.293837268301899,
  -6.959172950986773],
 array([0.11989662, 0.12187181, 0.73172944]))

### Evaluate goodness of fit of the AR model

In [29]:
def mse(y_true, y_pred):
    '''Mean squared error between observed and predicted values.'''
    errors = np.array(y_true) - np.array(y_pred)
    return np.mean(errors ** 2)

def mae(y_true, y_pred):
    '''Mean absolute error between observed and predicted values.'''
    errors = np.array(y_true) - np.array(y_pred)
    return np.mean(np.abs(errors))

def r_squared(y_true, y_pred):
    '''Coefficient of determination: 1 - SS_res / SS_tot.

    SS_res is the sum of squared prediction errors and SS_tot the sum of
    squared deviations of the observations from their mean.
    '''
    observed = np.array(y_true)
    residual_ss = np.sum((observed - np.array(y_pred)) ** 2)
    total_ss = np.sum((observed - np.mean(y_true)) ** 2)
    return 1 - residual_ss / total_ss


# One-step-ahead predictions: each sample is predicted from the ar_order
# samples before it, weighted by the fitted coefficients (oldest first).
preds = []
for i in range(ar_order, len(quantized)):
    window = quantized[i - ar_order:i]
    pred = sum(weight * past for weight, past in zip(coeffs, window))
    preds.append(pred)

# The first ar_order samples have no prediction, so drop them from the targets.
true = quantized[ar_order:]
for label, metric in (("MSE:", mse), ("MAE:", mae), ("R^2:", r_squared)):
    print(label, metric(true, preds))



MSE: 27.708680093847015
MAE: 4.178020697221381
R^2: 0.8642295288734263


### Fit arithmetic coder on AR residuals

In [35]:
def process_file(residuals, stats, ngram_size=3, scale=16):
    '''Quantize residuals, form overlapping n-grams and count them.

    Args:
        residuals: sequence of numeric residuals for one file.
        stats: mutable mapping from n-gram tuple to occurrence count. It is
            updated in place, so counts accumulate across calls. A plain dict
            works as well as a defaultdict/Counter.
        ngram_size: number of consecutive quantized residuals per n-gram.
        scale: quantization step passed to quantize_residuals (default 16,
            the value that was previously hard-coded).

    Returns:
        List of n-gram tuples in order of appearance (stride 1), containing
        len(residuals) - ngram_size + 1 entries, or an empty list when there
        are fewer residuals than `ngram_size`.
    '''
    quantized_residuals = quantize_residuals(residuals, scale=scale)

    window_count = len(quantized_residuals) - ngram_size + 1
    residual_symbols = [tuple(quantized_residuals[i:i + ngram_size])
                        for i in range(window_count)]

    # Count from the list that was just built instead of slicing every window
    # a second time.
    for ngram in residual_symbols:
        stats[ngram] = stats.get(ngram, 0) + 1

    return residual_symbols

from collections import defaultdict

# Count residual n-grams for this file, then normalise the counts into
# probabilities ordered from most to least frequent.
stats = defaultdict(int)
residual_symbols = process_file(residuals, stats)

total_count = sum(stats.values())
sorted_probs = sorted(
    [(ngram, count / total_count) for ngram, count in stats.items()],
    key=lambda entry: entry[1],
    reverse=True,
)


In [32]:
prob_file_name = "residuals_ngram_prob.txt"

# Write a one-line header followed by one line per n-gram, most probable first.
# Presumably this is the text format load_probabilities parses — verify against
# arithmetic.py before changing the wording or the number of decimals.
with open(prob_file_name, "w") as f:
    f.write("Symbol Probabilities:\n")
    for symbol, prob in sorted_probs:
        f.write(f"Symbol: {list(symbol)} | Probability: {prob:.20f}\n")

# Sanity checks: the probabilities should sum to about 1.0; also report how
# many distinct n-grams were seen.
print(f"\nSum of all probabilities: {sum(prob for _, prob in sorted_probs)}")
print(f"Number of unique symbols: {len(sorted_probs)}")


Sum of all probabilities: 1.0
Number of unique symbols: 30


In [37]:
# Load model
# NOTE: this rebinds `probs` and `cdf`, replacing the n-gram model loaded at
# the top of the notebook; any cell run after this one sees the residual model.
probs = load_probabilities(prob_file_name)
cdf = build_cdf(probs)
len(cdf), type(cdf)

(30, dict)

In [38]:
# Encode the residual n-grams with the residual model and store the result.
# NOTE: rebinds `code`, which previously held the encoding of the raw symbols.
code = arithmetic_encode(residual_symbols, cdf)
write_compressed(code, len(residual_symbols), "compressed_resid.bin")

In [39]:
# compute compression ratio
# NOTE(review): compressed_resid.bin has the same fixed layout as
# compressed.bin (one 8-byte double plus one 4-byte count), so this ratio is
# identical to the earlier one and says nothing about how well the residuals
# compress.
original_size = os.path.getsize(some_wav)
compressed_size = os.path.getsize("compressed_resid.bin")
print(f"Compression ratio: {original_size / compressed_size:.2f}x")

Compression ratio: 16447.33x
