## Transmission Competition

### Imports

In [None]:
from typing import Optional
import numpy as np
import os
import matplotlib.pyplot as plt
from transmission_competition.HuffmanCoder import HuffmanCoder
from transmission_competition.LempelZivCoder import LempelZivCoder
from transmission_competition.HammingCoder74 import HammingCoder74
from transmission_competition.EntropyCalculator import EntropyCalculator
from transmission_competition.PSKModulator import PSKModulator
from transmission_competition.CSSModulator import CSSModulator
from transmission_competition.AudioInterface import AudioInterface

### Input Definition
Here we can change our input string.


In [None]:
file_path = "text.txt"
file_handle = open(file_path, "r")
input_text = file_handle.read()
file_handle.close()

print(f"Input text: {input_text}")

### Entropy Calculations
Here we can calculate a couple of interesting values.

In [None]:
entropy_calculator = EntropyCalculator(input_text)
print(f"Value entropy H: {entropy_calculator.H}")
print(f"Value max entropy H0: {entropy_calculator.H0}")
print(f"Value absolute redundancy R: {entropy_calculator.R}")
print(f"Value relative redundancy r: {entropy_calculator.r}")

### Soruce Coding
We implemented Huffman and Lempel Ziv source coding.
Both of them have a encode and a decode method. We convert from a string to an np.ndarray and vice versa.

In [None]:
huffman_coder = HuffmanCoder()
lempel_ziv_coder = LempelZivCoder()

# Huffman Coding
huffman_encoded = huffman_coder.encode(input_text)
huffman_decoded = huffman_coder.decode(huffman_encoded)

print("***** HUFFMAN CODING **********************************************")
print(f"Original Text: {input_text!r}")
print(f"Original Text Length: {len(input_text)} characters")
print(f"Encoded (Huffman): {huffman_encoded}")
print(f"Encoded Length: {len(huffman_encoded)} bits")
print(f"Decoded (Huffman): {huffman_decoded!r}")
print("Is Decoded Equal to Original?:", huffman_decoded == input_text)
print("*******************************************************************\n")

# Lempel-Ziv Coding
lempelziv_encoded = lempel_ziv_coder.encode(input_text)
lempelziv_decoded = lempel_ziv_coder.decode(lempelziv_encoded)

print("***** LEMPEL-ZIV CODING *******************************************")
print(f"Original Text: {input_text!r}")
print(f"Original Text Length: {len(input_text)} characters")
print(f"Encoded (Lempel-Ziv): {lempelziv_encoded}")
print(f"Encoded Length: {len(lempelziv_encoded)} bits")
print(f"Decoded (Lempel-Ziv): {lempelziv_decoded!r}")
print("Is Decoded Equal to Original?:", lempelziv_decoded == input_text)
print("*******************************************************************")

### Channel Coding
We implemented (7,4) Hamming channel coding.
The input as well as the output is obviously a np.ndarray. The channel coded message is expected to be 7/4 times larger than the source coded version.

In [None]:
hamming_coder_74 = HammingCoder74()

test_sequence = np.array([0, 1, 0, 0, 1, 0, 1, 1])

# Hamming Coding
hamming_encoded = hamming_coder_74.encode(test_sequence)

# Simulate channel errors
per_bit_error_rate = 0.00  # Example error rate; adjust as needed
transmitted, error_desc = hamming_coder_74.channel_simulator(hamming_encoded, per_bit_error_rate)

# Decode the transmitted (possibly erroneous) code
hamming_decoded = hamming_coder_74.decode(transmitted)

print("***** HAMMING CODING **********************************************")
print(f"Original Sequence: {test_sequence}")
print(f"Encoded (Hamming): {hamming_encoded}")
print(f"Transmitted (after channel simulation): {transmitted}")
print(f"Error Description: {error_desc}")
print(f"Decoded (Hamming): {hamming_decoded}")
print("Is Decoded Equal to Original?:", np.array_equal(hamming_decoded, test_sequence))
print("*******************************************************************\n")


### Modulation

We have implemented two types of modulation:

- PSK
- Chirp Modulation

All of them take a numpy array (`np.ndarray`) as input and output a numpy array with float values representing the signal.

In [None]:
psk_modulator = PSKModulator()
css_modulator = CSSModulator()

# PSK (QPSK) requires EVEN number of bits (2 bits per symbol)
# CSS can handle any number of bits (1 bit per symbol)
test_sequence = np.array([0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1])

# Noise parameters
SNR_DB = 10.0  # Signal-to-Noise Ratio in dB

# PSK Modulation
psk_modulated = psk_modulator.PSK_modulate(test_sequence)
psk_noisy = psk_modulator.add_awgn_noise(psk_modulated, SNR_DB)
psk_demodulated = psk_modulator.PSK_demodulate(psk_noisy)

print("***** PSK MODULATION **********************************************")
print(f"Original Sequence: {test_sequence}")
print(f"Modulated Signal Shape: {psk_modulated.shape}")
print(f"SNR: {SNR_DB} dB")
print(f"Modulated Signal (first 20 samples): {psk_modulated[:20]}")
print(f"Noisy Signal (first 20 samples): {psk_noisy[:20]}")
print(f"Demodulated Sequence: {psk_demodulated}")
print("Is Demodulated Equal to Original?:", np.array_equal(psk_demodulated, test_sequence))
print("*******************************************************************\n")

# CSS (Chirp Spread Spectrum) Modulation
css_modulated = css_modulator.CSS_modulate(test_sequence)
css_noisy = css_modulator.add_awgn_noise(css_modulated, SNR_DB)
css_demodulated = css_modulator.CSS_demodulate(css_noisy)

print("***** CSS (CHIRP) MODULATION **************************************")
print(f"Original Sequence: {test_sequence}")
print(f"Modulated Signal Shape: {css_modulated.shape}")
print(f"SNR: {SNR_DB} dB")
print(f"Modulated Signal (first 20 samples): {css_modulated[:20]}")
print(f"Noisy Signal (first 20 samples): {css_noisy[:20]}")
print(f"Demodulated Sequence: {css_demodulated}")
print("Is Demodulated Equal to Original?:", np.array_equal(css_demodulated, test_sequence))
print("*******************************************************************\n")


In [None]:
# Visualization of Modulated Signals
N = 1000  # Number of first samples to plot

fig, axes = plt.subplots(3, 3, figsize=(18, 12))

# Row 1: Original Signal (centered in middle column)
axes[0, 0].axis('off')  # Turn off left subplot
axes[0, 1].stem(test_sequence, basefmt=' ', linefmt='blue', markerfmt='bo')
axes[0, 1].set_title('Original Bit Sequence', fontsize=14, fontweight='bold')
axes[0, 1].set_xlabel('Bit Index', fontsize=12)
axes[0, 1].set_ylabel('Bit Value', fontsize=12)
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].set_ylim(-0.5, 1.5)
axes[0, 2].axis('off')  # Turn off right subplot

# Row 2: PSK Modulation Chain
axes[1, 0].plot(psk_modulated[:N], linewidth=1.5, color='blue', label='PSK Modulated')
axes[1, 0].set_title('PSK Modulated Signal', fontsize=14, fontweight='bold')
axes[1, 0].set_xlabel('Sample Index', fontsize=12)
axes[1, 0].set_ylabel('Amplitude', fontsize=12)
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].set_xlim(0, N)
axes[1, 0].legend()

axes[1, 1].plot(psk_noisy[:N], linewidth=1.5, color='darkblue', alpha=0.7, label=f'PSK Noisy (SNR={SNR_DB} dB)')
axes[1, 1].set_title('PSK Noisy Signal', fontsize=14, fontweight='bold')
axes[1, 1].set_xlabel('Sample Index', fontsize=12)
axes[1, 1].set_ylabel('Amplitude', fontsize=12)
axes[1, 1].grid(True, alpha=0.3)
axes[1, 1].set_xlim(0, N)
axes[1, 1].legend()

axes[1, 2].stem(psk_demodulated, basefmt=' ', linefmt='darkblue', markerfmt='o')
axes[1, 2].set_title('PSK Demodulated Bits', fontsize=14, fontweight='bold')
axes[1, 2].set_xlabel('Bit Index', fontsize=12)
axes[1, 2].set_ylabel('Bit Value', fontsize=12)
axes[1, 2].grid(True, alpha=0.3)
axes[1, 2].set_ylim(-0.5, 1.5)

# Row 3: CSS Modulation Chain
axes[2, 0].plot(css_modulated[:N], linewidth=1.5, color='green', label='CSS Modulated')
axes[2, 0].set_title('CSS (Chirp) Modulated Signal', fontsize=14, fontweight='bold')
axes[2, 0].set_xlabel('Sample Index', fontsize=12)
axes[2, 0].set_ylabel('Amplitude', fontsize=12)
axes[2, 0].grid(True, alpha=0.3)
axes[2, 0].set_xlim(0, N)
axes[2, 0].legend()

axes[2, 1].plot(css_noisy[:N], linewidth=1.5, color='darkgreen', alpha=0.7, label=f'CSS Noisy (SNR={SNR_DB} dB)')
axes[2, 1].set_title('CSS Noisy Signal', fontsize=14, fontweight='bold')
axes[2, 1].set_xlabel('Sample Index', fontsize=12)
axes[2, 1].set_ylabel('Amplitude', fontsize=12)
axes[2, 1].grid(True, alpha=0.3)
axes[2, 1].set_xlim(0, N)
axes[2, 1].legend()

axes[2, 2].stem(css_demodulated, basefmt=' ', linefmt='darkgreen', markerfmt='o')
axes[2, 2].set_title('CSS Demodulated Bits', fontsize=14, fontweight='bold')
axes[2, 2].set_xlabel('Bit Index', fontsize=12)
axes[2, 2].set_ylabel('Bit Value', fontsize=12)
axes[2, 2].grid(True, alpha=0.3)
axes[2, 2].set_ylim(-0.5, 1.5)

plt.tight_layout()
plt.show()


### Audio Interface
We implemented an AudioInterface class that converts modulated signals (numpy arrays) to audio files and back.
The class provides methods to:
- Write a numpy array to a WAV file
- Read a WAV file back to a numpy array
- Play audio files or signals directly

In [None]:
audio_interface = AudioInterface()

# Create a simple test signal (numpy array with some values)
# This simulates a modulated signal that we want to save/load via audio
test_signal = np.array([0.5, -0.3, 0.8, -0.1, 0.6, -0.7, 0.2, -0.9, 0.4, -0.5,
                       0.1, -0.2, 0.7, -0.6, 0.3, -0.8, 0.9, -0.4, 0.0, -0.1])

# Use a standard audio sample rate
sample_rate = 48000

print("***** AUDIO INTERFACE TEST ****************************************")
print(f"Test signal: {test_signal}")
print(f"Signal length: {len(test_signal)} samples")
print(f"Sample rate: {sample_rate} Hz")
print(f"Signal duration: {len(test_signal) / sample_rate:.6f} seconds")

# Write to audio file
audio_file_path = "test_audio_signal.wav"
audio_interface.write_audio(test_signal, sample_rate, audio_file_path)
print(f"Audio file written: {audio_file_path}")

# Read back from audio file
read_signal, read_sample_rate = audio_interface.read_audio(audio_file_path)
print(f"Audio file read: {len(read_signal)} samples at {read_sample_rate} Hz")

# Denormalize the read signal to match original amplitude
# (write_audio normalizes to [-1, 1], so we need to scale back)
max_original = np.max(np.abs(test_signal))
read_signal_scaled = read_signal * max_original

print(f"Read signal (scaled): {read_signal_scaled}")

# Check if they match exactly
signals_match = np.array_equal(test_signal, read_signal_scaled)
print(f"Original and read signals match exactly: {'✓' if signals_match else '✗'}")

if not signals_match:
    max_diff = np.max(np.abs(test_signal - read_signal_scaled))
    print(f"Max difference: {max_diff}")
    print("Note: This is expected due to 16-bit quantization in WAV format")
    print("The difference is very small and acceptable for audio applications")

    # Check if they are close within a reasonable tolerance
    tolerance = 1e-4  # 0.01% tolerance
    signals_close = np.allclose(test_signal, read_signal_scaled, atol=tolerance)
    print(f"Are signals close (tolerance {tolerance})? {'✓' if signals_close else '✗'}")

print("*******************************************************************")

# Entire Chain
Here we can observe the entire encoding and decoding chain.

In [None]:
# Complete Encoding/Decoding Chain with Audio Interface
import pandas as pd

SNR_DB_CHAIN = 12.0
USE_AUDIO_INTERFACE = True  # Set to True to save/load via audio file (CSS only)

# Test all 2 combinations: (Huffman/Lempel-Ziv) x (CSS)
# Note: Audio interface works with CSS (48kHz sample rate)
combinations = [
    ("Huffman + CSS", huffman_coder, css_modulator, "CSS"),
    ("Lempel-Ziv + CSS", lempel_ziv_coder, css_modulator, "CSS"),
]

results = []

for combo_name, source_coder, modulator, mod_name in combinations:
    # ENCODING CHAIN
    # 1. Source coding
    source_encoded = source_coder.encode(input_text)
    
    # 2. Channel coding (Hamming)
    channel_encoded = hamming_coder_74.encode(source_encoded)
    
    # 3. Modulation (CSS)
    modulated = modulator.CSS_modulate(channel_encoded)
    
    # 4. Audio Interface (write to file)
    audio_used = False
    if USE_AUDIO_INTERFACE:
        audio_file = f"transmission_{combo_name.replace(' + ', '_').replace('-', '')}.wav"
        sample_rate = int(modulator.fs)
        audio_interface.write_audio(modulated, sample_rate, audio_file)
        audio_used = True
    
    # 6. Add noise
    modulated_with_noise = modulator.add_awgn_noise(modulated, SNR_DB_CHAIN)
    
    # 6. Audio Interface (read from file with noise added before write)
    # For a realistic simulation, we add noise BEFORE writing to audio
    if USE_AUDIO_INTERFACE:
        # Write noisy signal to audio file
        audio_file_noisy = f"transmission_{combo_name.replace(' + ', '_').replace('-', '')}_noisy.wav"
        audio_interface.write_audio(modulated_with_noise, sample_rate, audio_file_noisy)
        
        # Read back from audio file
        received_signal, _ = audio_interface.read_audio(audio_file_noisy)
        
        # Scale back to original amplitude
        max_original = np.max(np.abs(modulated_with_noise))
        received_signal = received_signal * max_original
    else:
        received_signal = modulated_with_noise
    
    # DECODING CHAIN
    # 7. Demodulation (CSS)
    demodulated = modulator.CSS_demodulate(received_signal)
    
    # 8. Channel decoding (Hamming)
    channel_decoded = hamming_coder_74.decode(demodulated)
    
    # 9. Source decoding
    source_decoded = source_coder.decode(channel_decoded)
    
    # Store results
    results.append({
        'Combination': combo_name,
        'Input Text': input_text[:30] + '...' if len(input_text) > 30 else input_text,
        'Source Coding': f'{type(source_coder).__name__}',
        'Source Coded Bits': len(source_encoded),
        'Channel Coded Bits': len(channel_encoded),
        'Modulated Samples': len(modulated),
        'Audio Used': '✓' if audio_used else '✗',
        'Demodulated Bits': len(demodulated),
        'Decoded Text': source_decoded[:30] + '...' if len(source_decoded) > 30 else source_decoded,
        'Match': '✓' if source_decoded == input_text else '✗'
    })

# Create DataFrame
df = pd.DataFrame(results)

# Display with better formatting
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 40)

print("=" * 130)
print("COMPLETE TRANSMISSION CHAIN - RESULTS TABLE")
print(f"SNR: {SNR_DB_CHAIN} dB | Audio Interface: {'Enabled (CSS only)' if USE_AUDIO_INTERFACE else 'Disabled'}")
print("=" * 130)
print(df.to_string(index=False))
print("=" * 130)