In [1]:
import os
import wave
from scipy.io import wavfile
import numpy as np
import struct
from functools import partial

In [2]:
def read_wav_header(input_file):
    """ Reads and extracts WAV file header information. """
    header_info = input_file.read(44)
    header_fields = {
        "Chunk Size": struct.unpack('<L', header_info[4:8])[0],
        "Format": header_info[8:12],
        "Subchunk Size": struct.unpack('<L', header_info[16:20])[0],
        "Audio Format": struct.unpack('<H', header_info[20:22])[0],
        "Number of Channels": struct.unpack('<H', header_info[22:24])[0],
        "Sample Rate": struct.unpack('<L', header_info[24:28])[0],
        "Byte Rate": struct.unpack('<L', header_info[28:32])[0],
        "Block Align": struct.unpack('<H', header_info[32:34])[0],
        "Bits per sample": struct.unpack('<H', header_info[34:36])[0],
        "File Size": struct.unpack('<L', header_info[40:44])[0]
    }
    for key, value in header_fields.items():
        print(f"{key}: {value}")
    return header_info

In [3]:
def encode_rice(sample, k):
    """ Applies Rice encoding to a single sample. """
    M = 2**k
    sign_bit = '1' if sample < 0 else '0'
    s = abs(sample)
    R = bin(s & (M - 1))[2:].zfill(k)  # Extract remainder bits
    Q = '1' * (s >> k) + '0'  # Unary quotient encoding
    return sign_bit + Q + R

In [4]:
def rice_encoding_compression(filename, k):
    """ Compresses WAV file samples using Rice encoding. """
    try:
        input_file = open(filename, 'rb')
        encoded_file = open(f"{filename.split('.')[0]}_Enc.ex2", "w")
        header_info = read_wav_header(input_file)
        
        compressed_bytes = 0
        uncompressed_bytes = 0
        M = 2**k
        
        for samplebyte in iter(partial(input_file.read, 2), b''):
            sample = struct.unpack('<h', samplebyte)[0]
            
            if -128 < sample < 127:
                encoded_code = encode_rice(sample, k)
                hex_obj = hex(int(encoded_code, 2))[2:]
                encoded_file.write(f"/{hex_obj}")
                compressed_bytes += 1
            else:
                encoded_file.write(f"\\{samplebyte.hex()}")
                uncompressed_bytes += 1

        print(f"Compressed bytes: {compressed_bytes}")
        print(f"Uncompressed bytes: {uncompressed_bytes}")
        
    except Exception as e:
        print("An error occurred:", e)
    finally:
        input_file.close()
        encoded_file.close()

In [5]:
def write_wav_header_info(decoded_file):
    """ Sets header parameters for decoded WAV file. """
    decoded_file.setnchannels(1)
    decoded_file.setsampwidth(2)
    decoded_file.setframerate(44100)

In [6]:
def convert_hex_to_decoded_data(hex_string, is_compressed, M):
    """ Converts hex-encoded data back into WAV format. """
    if is_compressed:
        integer_value = int(hex_string, 16)
        encoded_code = bin(integer_value)[2:]
        
        sign_bit = encoded_code[0] if len(encoded_code) > 1 else '0'
        encoded_code = encoded_code[1:] if len(encoded_code) > 1 else encoded_code
        
        Q = encoded_code.count('1')
        encoded_code = encoded_code[Q:]
        R = int(encoded_code[-4:], 2) if encoded_code else 0
        s = Q * M + R
        
        if sign_bit == '1':
            s = -s
        return struct.pack('<h', s)
    else:
        return bytearray.fromhex(hex_string)


In [7]:
def rice_compression_decoding(filename, k):
    """ Decompresses a Rice encoded file back into a WAV file. """
    try:
        M = 2**k
        decoded_file = wave.open(f"{filename.split('.')[0]}Dec.wav", "wb")
        write_wav_header_info(decoded_file)
        
        with open(filename, 'r') as input_file:
            hex_string = ""
            following_data_is_compressed = False
            for c in iter(partial(input_file.read, 1), ''):
                if c == "/":
                    if hex_string:
                        data = convert_hex_to_decoded_data(hex_string, following_data_is_compressed, M)
                        decoded_file.writeframes(data)
                    hex_string = ""
                    following_data_is_compressed = True
                elif c == "\\":
                    if hex_string:
                        data = convert_hex_to_decoded_data(hex_string, following_data_is_compressed, M)
                        decoded_file.writeframes(data)
                    hex_string = ""
                    following_data_is_compressed = False
                else:
                    hex_string += c
        print("Decompression complete.")
    except Exception as e:
        print("An error occurred during decompression:", e)
    finally:
        decoded_file.close()

In [8]:
def analyze_compression(original, encoded, decoded):
    """ Analyzes compression efficiency. """
    print("Compression Analysis:")
    print(f"Original file size: {original} bytes")
    print(f"Encoded file size: {encoded} bytes")
    print(f"Decoded file size: {decoded} bytes")
    print(f"Compression: {round((original - encoded) / original * 100, 2)}%")

# Sound 1

In [9]:
rice_encoding_compression('Sound1.wav', 4)

Chunk Size: 1002080
Format: b'WAVE'
Subchunk Size: 16
Audio Format: 1
Number of Channels: 1
Sample Rate: 44100
Byte Rate: 88200
Block Align: 2
Bits per sample: 16
File Size: 1002044
Compressed bytes: 275532
Uncompressed bytes: 225490


In [10]:
rice_compression_decoding('Sound1_Enc.ex2', 4)

Decompression complete.


In [11]:
original_size = os.path.getsize("Sound1.wav")
encoded_file_size = os.path.getsize("Sound1_Enc.ex2")
decoded_file_size = os.path.getsize("Sound1_EncDec.wav")

print("Where k = 4:")
print(f"original file size: {original_size}")
print(f"encoded file size: {encoded_file_size}")
print(f"decoded file size: {decoded_file_size}")
print(f"compression: {round((original_size - encoded_file_size)/original_size * 100, 2)}%")

Where k = 4:
original file size: 1002088
encoded file size: 1875237
decoded file size: 1002086
compression: -87.13%


In [12]:
rice_encoding_compression('Sound1.wav', 2)

Chunk Size: 1002080
Format: b'WAVE'
Subchunk Size: 16
Audio Format: 1
Number of Channels: 1
Sample Rate: 44100
Byte Rate: 88200
Block Align: 2
Bits per sample: 16
File Size: 1002044
Compressed bytes: 275532
Uncompressed bytes: 225490


In [13]:
rice_compression_decoding('Sound1_Enc.ex2', 2)

Decompression complete.


In [14]:
original_size = os.path.getsize("Sound1.wav")
encoded_file_size = os.path.getsize("Sound1_Enc.ex2")
decoded_file_size = os.path.getsize("Sound1_EncDec.wav")

print("Where k = 2:")
print(f"original file size: {original_size}")
print(f"encoded file size: {encoded_file_size}")
print(f"decoded file size: {decoded_file_size}")
print(f"compression: {round((original_size - encoded_file_size)/original_size * 100, 2)}%")

Where k = 2:
original file size: 1002088
encoded file size: 1994540
decoded file size: 1002086
compression: -99.04%


# Sound 2

In [15]:
rice_encoding_compression('Sound2.wav', 4)

Chunk Size: 1008036
Format: b'WAVE'
Subchunk Size: 16
Audio Format: 1
Number of Channels: 1
Sample Rate: 44100
Byte Rate: 88200
Block Align: 2
Bits per sample: 16
File Size: 1008000
Compressed bytes: 13992
Uncompressed bytes: 490008


In [16]:
rice_compression_decoding('Sound2_Enc.ex2', 4)

Decompression complete.


In [17]:
original_size = os.path.getsize("Sound2.wav")
encoded_file_size = os.path.getsize("Sound2_Enc.ex2")
decoded_file_size = os.path.getsize("Sound2_EncDec.wav")

print("Where k = 4:")
print(f"original file size: {original_size}")
print(f"encoded file size: {encoded_file_size}")
print(f"decoded file size: {decoded_file_size}")
print(f"compression: {round((original_size - encoded_file_size)/original_size * 100, 2)}%")

Where k = 4:
original file size: 1008044
encoded file size: 2499947
decoded file size: 1008042
compression: -148.0%


In [18]:
rice_encoding_compression('Sound2.wav', 2)

Chunk Size: 1008036
Format: b'WAVE'
Subchunk Size: 16
Audio Format: 1
Number of Channels: 1
Sample Rate: 44100
Byte Rate: 88200
Block Align: 2
Bits per sample: 16
File Size: 1008000
Compressed bytes: 13992
Uncompressed bytes: 490008


In [19]:
rice_compression_decoding('Sound2_Enc.ex2', 2)

Decompression complete.


In [20]:
original_size = os.path.getsize("Sound2.wav")
encoded_file_size = os.path.getsize("Sound2_Enc.ex2")
decoded_file_size = os.path.getsize("Sound2_EncDec.wav")

print("Where k = 2:")
print(f"original file size: {original_size}")
print(f"encoded file size: {encoded_file_size}")
print(f"decoded file size: {decoded_file_size}")
print(f"compression: {round((original_size - encoded_file_size)/original_size * 100, 2)}%")

Where k = 2:
original file size: 1008044
encoded file size: 2535876
decoded file size: 1008042
compression: -151.56%


|  | Original size | Rice (K = 4 bits) | Rice (K = 2 bits) | % Compression (K = 4 bits) | % Compression (K = 2 bits) |
| --- | --- | --- | --- | --- | --- |
| Sound1.wav | 1002088 | 1875237 | 1994540 | -87.13% | -99.04% |
| Sound2.wav | 1008044 | 2499947 | 2535876 | -148.0% | -151.56% |


# Further Development

In [21]:
from bitarray import bitarray
from tabulate import tabulate

In [22]:
def adaptive_k_selection(samples):
    """ Determines optimal k based on signal variance, with a cap at k = 4. """
    std_dev = np.std(samples)
    return 2 if std_dev < 32 else 4  # Maximum k capped at 4


In [23]:
def encode_golomb(sample, k):
    """ Applies Golomb encoding to a single sample. """
    M = 2**k
    sign_bit = '1' if sample < 0 else '0'
    s = abs(sample)
    R = bin(s & (M - 1))[2:].zfill(k)  # Extract remainder bits
    Q = s >> k  # Quotient calculation
    
    # Golomb encoding: quotient is now stored in binary instead of unary
    Q_bin = bin(Q + 1)[2:]  # +1 ensures nonzero encoding
    encoded_code = sign_bit + Q_bin + R
    return bitarray(encoded_code)  # Store as bitarray for efficient packing


In [24]:
def apply_delta_encoding(samples):
    """ Converts raw sample values into delta values. """
    delta_samples = [samples[0]]  # First sample remains the same
    for i in range(1, len(samples)):
        delta_samples.append(samples[i] - samples[i-1])
    return delta_samples


In [25]:
def read_wav_header(input_file):
    """ Reads and extracts WAV file header information. """
    header_info = input_file.read(44)
    header_fields = {
        "Chunk Size": struct.unpack('<L', header_info[4:8])[0],
        "Format": header_info[8:12],
        "Subchunk Size": struct.unpack('<L', header_info[16:20])[0],
        "Audio Format": struct.unpack('<H', header_info[20:22])[0],
        "Number of Channels": struct.unpack('<H', header_info[22:24])[0],
        "Sample Rate": struct.unpack('<L', header_info[24:28])[0],
        "Byte Rate": struct.unpack('<L', header_info[28:32])[0],
        "Block Align": struct.unpack('<H', header_info[32:34])[0],
        "Bits per sample": struct.unpack('<H', header_info[34:36])[0],
        "File Size": struct.unpack('<L', header_info[40:44])[0]
    }
    for key, value in header_fields.items():
        print(f"{key}: {value}")
    return header_info


In [26]:
def rice_encoding_compression(filename):
    """ Compresses WAV file samples using Delta + Golomb encoding with bit-packing. """
    try:
        input_file = open(filename, 'rb')
        encoded_file = open(f"{filename.split('.')[0]}_DeltaGolombPacked.bin", "wb")
        header_info = read_wav_header(input_file)
        
        samples = []
        for samplebyte in iter(partial(input_file.read, 2), b''):
            sample = struct.unpack('<h', samplebyte)[0]
            samples.append(sample)
        
        delta_samples = apply_delta_encoding(samples)  # Apply delta encoding
        k = adaptive_k_selection(delta_samples)  # Dynamically select k, capped at 4
        print(f"Using adaptive k = {k} (Delta + Golomb encoding + Bit-Packing)")
        
        compressed_bits = bitarray()
        compressed_bytes = 0
        uncompressed_bytes = 0
        M = 2**k
        
        for sample in delta_samples:
            if -128 < sample < 127:
                encoded_bits = encode_golomb(sample, k)
                compressed_bits.extend(encoded_bits)
                compressed_bytes += len(encoded_bits) // 8
            else:
                uncompressed_bytes += 2  # Each uncompressed sample is 2 bytes
                compressed_bits.frombytes(struct.pack('<h', sample))

        compressed_bits.tofile(encoded_file)
        print(f"Compressed bytes: {compressed_bytes}")
        print(f"Uncompressed bytes: {uncompressed_bytes}")
        
    except Exception as e:
        print("An error occurred:", e)
    finally:
        input_file.close()
        encoded_file.close()


In [27]:
def analyze_compression(original, encoded):
    """ Analyzes compression efficiency and prints a table. """
    data = [
        ["Original", original, "0%"],
        ["Encoded (Delta + Golomb + Bit-Packing, max 4)", encoded, f"{round((original - encoded) / original * 100, 2)}%"]
    ]
    
    print("\nCompression Analysis:")
    print(tabulate(data, headers=["File Type", "File Size (bytes)", "Compression (%)"], tablefmt="grid"))


In [28]:
original_size = os.path.getsize("Sound1.wav")
rice_encoding_compression('Sound1.wav')
delta_golomb_packed_size = os.path.getsize("Sound1_DeltaGolombPacked.bin")
analyze_compression(original_size, delta_golomb_packed_size)


Chunk Size: 1002080
Format: b'WAVE'
Subchunk Size: 16
Audio Format: 1
Number of Channels: 1
Sample Rate: 44100
Byte Rate: 88200
Block Align: 2
Bits per sample: 16
File Size: 1002044
Using adaptive k = 4 (Delta + Golomb encoding + Bit-Packing)
Compressed bytes: 83445
Uncompressed bytes: 172374

Compression Analysis:
+-----------------------------------------------+---------------------+-------------------+
| File Type                                     |   File Size (bytes) | Compression (%)   |
| Original                                      |             1002088 | 0%                |
+-----------------------------------------------+---------------------+-------------------+
| Encoded (Delta + Golomb + Bit-Packing, max 4) |              514443 | 48.66%            |
+-----------------------------------------------+---------------------+-------------------+
