# 02 - Software Digital Down-Conversion

In [None]:
import sys
sys.path.insert(0, '..')

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import welch
from matplotlib import rcParams
rcParams['figure.figsize'] = (14, 5)
rcParams['figure.dpi'] = 100

from osmium.utils.constants import (
    DDC_OUTPUT_RATE, TARGET_BASEBAND_RATE, ATSC_SYMBOL_RATE,
    ATSC_RRC_ALPHA, ATSC_CHANNEL_BW,
)
from osmium.capture.adc_capture import ADCCapture
from osmium.ddc.software_ddc import SoftwareDDC

## 1. Load Captured IQ Data

In [None]:
CAPTURE_PATH = '../tests/fixtures/capture_ch36.npz'

try:
    iq_samples, metadata = ADCCapture.load_from_file(CAPTURE_PATH)
    input_rate = float(metadata.get('sample_rate', DDC_OUTPUT_RATE))
    print(f'Loaded {len(iq_samples)} samples at {input_rate/1e6:.1f} MSPS')
except FileNotFoundError:
    print(f'{CAPTURE_PATH} not found. Generating synthetic test signal...')
    input_rate = DDC_OUTPUT_RATE
    N = 2**20
    t = np.arange(N) / input_rate
    # 6 MHz wide band-limited noise + pilot
    from scipy.signal import firwin, lfilter
    noise = np.random.randn(N) + 1j * np.random.randn(N)
    bw_filt = firwin(127, 3e6, fs=input_rate)
    iq_samples = lfilter(bw_filt, 1.0, noise).astype(np.complex64)
    iq_samples += 0.1 * np.exp(2j * np.pi * 309440 * t).astype(np.complex64)
    metadata = {'channel': 36, 'sample_rate': input_rate}
    print(f'Generated {len(iq_samples)} synthetic samples at {input_rate/1e6:.1f} MSPS')

## 2. Apply Software Decimation

In [None]:
ddc = SoftwareDDC(input_rate=input_rate, output_rate=TARGET_BASEBAND_RATE)
print(ddc)
print(f'Resampling ratio: {ddc.up} / {ddc.down}')
print(f'Actual output rate: {ddc.actual_output_rate/1e6:.6f} MSPS')
print(f'Target output rate: {TARGET_BASEBAND_RATE/1e6:.6f} MSPS')
print(f'Rate error: {abs(ddc.actual_output_rate - TARGET_BASEBAND_RATE):.2f} Hz')

In [None]:
%%time
baseband = ddc.process(iq_samples)
output_rate = ddc.actual_output_rate

print(f'Input:  {len(iq_samples)} samples at {input_rate/1e6:.1f} MSPS')
print(f'Output: {len(baseband)} samples at {output_rate/1e6:.4f} MSPS')
print(f'Decimation factor: {len(iq_samples)/len(baseband):.2f}x')
print(f'Samples per symbol: {output_rate/ATSC_SYMBOL_RATE:.4f}')

## 3. Spectrum Comparison: Before vs After Decimation

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(16, 5))

# Before decimation
nperseg_in = min(4096, len(iq_samples) // 4)
f_in, psd_in = welch(iq_samples, fs=input_rate, nperseg=nperseg_in,
                     return_onesided=False, scaling='density')
idx = np.argsort(f_in)
axes[0].plot(f_in[idx]/1e6, 10*np.log10(psd_in[idx]+1e-20), linewidth=0.5)
axes[0].set_xlabel('Frequency (MHz)')
axes[0].set_ylabel('PSD (dB/Hz)')
axes[0].set_title(f'Before Decimation ({input_rate/1e6:.1f} MSPS)')
axes[0].grid(True, alpha=0.3)
axes[0].axvspan(-3, 3, alpha=0.1, color='green', label='ATSC channel')
axes[0].legend()

# After decimation
nperseg_out = min(4096, len(baseband) // 4)
f_out, psd_out = welch(baseband, fs=output_rate, nperseg=nperseg_out,
                       return_onesided=False, scaling='density')
idx = np.argsort(f_out)
axes[1].plot(f_out[idx]/1e6, 10*np.log10(psd_out[idx]+1e-20), linewidth=0.5)
axes[1].set_xlabel('Frequency (MHz)')
axes[1].set_ylabel('PSD (dB/Hz)')
axes[1].set_title(f'After Decimation ({output_rate/1e6:.4f} MSPS)')
axes[1].grid(True, alpha=0.3)
axes[1].axvline(x=0.309, color='g', linestyle='--', alpha=0.7, label='Pilot')
axes[1].legend()

plt.tight_layout()
plt.show()

## 4. Verify Anti-Alias Filtering

In [None]:
signal_bw = ATSC_SYMBOL_RATE / 2.0 * (1.0 + ATSC_RRC_ALPHA)  # ~6 MHz
nyquist_out = output_rate / 2.0

print(f'ATSC signal bandwidth: {signal_bw/1e6:.3f} MHz')
print(f'Output Nyquist freq:   {nyquist_out/1e6:.3f} MHz')
print(f'Guard band:            {(nyquist_out - signal_bw)/1e6:.3f} MHz')

in_band = np.abs(f_out[idx]) < signal_bw
out_band = (np.abs(f_out[idx]) > signal_bw) & (np.abs(f_out[idx]) < nyquist_out * 0.95)

in_power = np.mean(psd_out[idx][in_band])
out_power = np.mean(psd_out[idx][out_band]) if np.any(out_band) else 1e-20

suppression = 10 * np.log10(in_power / out_power)
print(f'\nIn-band / out-band ratio: {suppression:.1f} dB')
if suppression > 30:
    print('Anti-alias filtering looks excellent.')
elif suppression > 20:
    print('Anti-alias filtering is adequate.')
else:
    print('Warning: possible aliasing. Check filter design.')

## 5. Save Baseband IQ for Next Stage

In [None]:
BASEBAND_PATH = '../tests/fixtures/baseband_ch36.npz'

ADCCapture.save_to_file(
    baseband, BASEBAND_PATH,
    metadata={
        'channel': metadata.get('channel', 36),
        'sample_rate': output_rate,
        'samples_per_symbol': output_rate / ATSC_SYMBOL_RATE,
    }
)
print(f'Saved {len(baseband)} baseband samples to {BASEBAND_PATH}')