# Definitions, function utilities and constants

In [1]:
import struct #Used in load_cs8, using bb data type to read binaries
import scipy.signal as sig
import numpy as np #Uses fft, and ffftshift
import matplotlib.pyplot as plt
import math
import cmath
import os

def load_cs8(filename):
    """Load a binary .cs8 file into separate I and Q sample lists.

    The file is read as a flat stream of signed 8-bit integers in which the
    samples are interleaved: I0, Q0, I1, Q1, ...

    Args:
        filename: Path of the .cs8 file to read.

    Returns:
        A tuple ``(I, Q)`` of two equally long lists of Python ints in the
        range [-128, 127]. Both lists are empty for an empty file.

    Note:
        If the file holds an odd number of bytes (for example a truncated
        capture), the last byte has no Q partner and is ignored instead of
        aborting the whole load with ``struct.error``.
    """
    with open(filename, 'rb') as f:
        data = f.read()

    # Keep only complete (I, Q) pairs.
    usable = len(data) - (len(data) % 2)

    # Decode every byte in a single call instead of one struct.unpack per
    # sample; 'b' is a signed char, repeated `usable` times.
    samples = struct.unpack(f'{usable}b', data[:usable])

    # Even positions carry I, odd positions carry Q.
    return list(samples[0::2]), list(samples[1::2])

## Welch's Method for Power Spectral Density Estimation

Welch's method is a technique for estimating the power spectral density of a signal. It's an improvement over the basic periodogram method as it reduces noise in the PSD estimate by averaging multiple modified periodograms.

---

### 1. Windowing Function

A **window function** is applied to each segment of the signal to reduce spectral leakage. Common window functions include Hamming, Hann, and Rectangular.
The code defines $w[n]$ based on the `window_type` parameter.

* **Hamming Window:**
    $$w[n] = 0.54 - 0.46 \cos\left(\frac{2\pi n}{M-1}\right), \quad 0 \le n \le M-1$$
* **Hann Window** (selected in the code with `window_type='hanning'`):
    $$w[n] = 0.5 - 0.5 \cos\left(\frac{2\pi n}{M-1}\right), \quad 0 \le n \le M-1$$
* **Rectangular Window:**
    $$w[n] = 1, \quad 0 \le n \le M-1$$

Where $M$ is the `segment_length`.

---

### 2. Normalization Factor ($U$)

The **normalization factor** $U$ accounts for the power of the window function. It ensures that the PSD estimate has the correct units.

$$U = \sum_{n=0}^{M-1} w[n]^2$$

---

### 3. Segmenting the Signal

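The input signal is divided into $K$ overlapping segments. The overlap between consecutive segments is set by `overlap`, expressed as a fraction of the segment length (for example, `0.5` means 50% overlap).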

* **Number of segments ($K$):**
    $$K = \left\lfloor \frac{N - M}{\text{step}} \right\rfloor + 1$$
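    where $N$ is the total length of the signal, $M$ is the `segment_length`, and `step` is the number of samples between the starts of consecutive segments, rounded down to an integer:
    $$\text{step} = \left\lfloor M \times (1 - \text{overlap}) \right\rfloor$$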

* **Individual Segment ($s_k[n]$):**
    Each segment $k$ is extracted and multiplied by the window function:
    $$s_k[n] = x[n + k \cdot \text{step}] \cdot w[n], \quad 0 \le n \le M-1$$
    where $x[n]$ is the original signal.

---

### 4. Discrete Fourier Transform (DFT) of Each Segment

For each windowed segment $s_k[n]$, its **Discrete Fourier Transform (DFT)** is computed. This is done using the Fast Fourier Transform (FFT) algorithm.

$$X_k[f] = \sum_{n=0}^{M-1} s_k[n] \cdot e^{-j 2\pi f n / M}$$

---

### 5. Periodogram of Each Segment ($P_k[f]$)

The **periodogram** for each segment is calculated from the magnitude squared of its DFT, normalized by the sampling frequency ($f_s$) and the window normalization factor ($U$).

$$P_k[f] = \frac{1}{f_s \cdot U} |X_k[f]|^2$$

---

### 6. Averaging the Periodograms

The final Welch PSD estimate is obtained by **averaging the periodograms** of all $K$ segments.

$$P_{\text{welch}}[f] = \frac{1}{K} \sum_{k=0}^{K-1} P_k[f]$$

---

### 7. Frequency Vector ($f$)

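The corresponding **frequency values** for the PSD are generated in standard FFT order, with a bin spacing of $\Delta f = \frac{1}{M \cdot \Delta t} = \frac{f_s}{M}$:

$$f_i = \begin{cases} i \cdot \dfrac{f_s}{M}, & 0 \le i < M/2 \\[6pt] (i - M) \cdot \dfrac{f_s}{M}, & M/2 \le i \le M-1 \end{cases}$$
where $\Delta t = 1/f_s$. Applying an FFTShift to both $f$ and $P_{\text{welch}}$ afterwards reorders them so that the frequencies run from $-f_s/2$ upwards, which is the form used for plotting.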

In [2]:
def fft_scratch(x):
    """
    Recursive radix-2 (Cooley-Tukey) FFT written without library FFT calls.

    The input is split into its even- and odd-indexed samples, each half is
    transformed recursively, and the two results are merged with the
    twiddle factors exp(-2j*pi*k/N).

    Inputs of length 0 or 1 are returned unchanged. Any other length must be
    a power of 2, otherwise a ValueError is raised. The result is a list
    with as many entries as the input.
    """
    size = len(x)

    # Base case: nothing left to combine.
    if size <= 1:
        return x

    # A power of two has a single bit set, so size & (size - 1) is zero.
    if size & (size - 1):
        raise ValueError("FFT from scratch only works for signal lengths that are powers of 2.")

    half = size // 2
    even_part = fft_scratch(x[0::2])
    odd_part = fft_scratch(x[1::2])

    # Odd-sample spectrum rotated by the twiddle factor of each bin.
    rotated = [
        cmath.exp(-2j * cmath.pi * k / size) * odd_part[k]
        for k in range(half)
    ]

    # Butterfly: first half of the spectrum adds, second half subtracts.
    lower = [e + r for e, r in zip(even_part, rotated)]
    upper = [e - r for e, r in zip(even_part, rotated)]
    return lower + upper


def _build_window(window_type, length):
    """Return the symmetric window of `length` points named by `window_type`."""
    if window_type == 'rectangular':
        return [1.0] * length
    if window_type not in ('hamming', 'hanning', 'hann'):
        raise ValueError("Unsupported window type")
    if length == 1:
        # The cosine formulas divide by (length - 1); a one-point window is 1.
        return [1.0]

    denom = length - 1
    if window_type == 'hamming':
        return [0.54 - 0.46 * math.cos(2 * math.pi * n / denom) for n in range(length)]
    # 'hanning' and its alias 'hann'
    return [0.5 * (1 - math.cos(2 * math.pi * n / denom)) for n in range(length)]


def welch_psd_scratch(signal, fs=1.0, segment_length=256, overlap=0.5, window_type='hamming'):
    """
    Estimate the Power Spectral Density (PSD) with Welch's method, from scratch.

    The signal is cut into overlapping segments, each segment is windowed and
    transformed with the custom `fft_scratch`, and the resulting periodograms
    |X_k|^2 / (fs * U) are averaged, where U is the sum of the squared window
    coefficients.

    Args:
        signal: Sequence of samples (real or complex) supporting slicing.
        fs: Sampling frequency used to scale the PSD and the frequency axis.
        segment_length: Number of samples per segment; must be a positive
            power of 2 because of the custom FFT.
        overlap: Fraction of a segment shared with the next one. The hop
            between segments is int(segment_length * (1 - overlap)).
        window_type: 'hamming', 'hanning' (alias 'hann') or 'rectangular'.
            The cosine windows are symmetric (denominator segment_length - 1).

    Returns:
        A tuple ``(f, P_welch)`` of two lists of length `segment_length`.
        Both are in FFT order: index 0 is frequency 0, followed by the
        positive frequencies and then the negative ones.

    Raises:
        ValueError: if `segment_length` is not a positive power of 2, if
            `overlap` leaves a hop smaller than one sample, if the signal is
            shorter than one segment, or if `window_type` is not supported.
    """
    if segment_length < 1 or (segment_length & (segment_length - 1)) != 0:
        raise ValueError("For this scratch implementation, segment_length must be a power of 2.")

    # A hop below one sample would divide by zero (or go backwards) below.
    step = int(segment_length * (1 - overlap))
    if step < 1:
        raise ValueError("overlap must leave a step of at least one sample between segments.")

    # With fewer samples than one segment there is nothing to average.
    N = len(signal)
    if N < segment_length:
        raise ValueError("signal must contain at least segment_length samples.")

    window = _build_window(window_type, segment_length)

    # Window power, used to normalise each periodogram.
    U = sum(w ** 2 for w in window)

    # Number of complete segments that fit in the signal (at least 1 here).
    K = (N - segment_length) // step + 1

    # Loop-invariant periodogram scale factor.
    scale = 1 / (fs * U)

    P_welch = [0.0] * segment_length
    for k in range(K):
        start = k * step
        segment = signal[start:start + segment_length]

        windowed_segment = [s * w for s, w in zip(segment, window)]
        X_k = fft_scratch(windowed_segment)

        # Accumulate this segment's periodogram.
        for i, x in enumerate(X_k):
            P_welch[i] += scale * abs(x) ** 2

    # Average over segments.
    P_welch = [p / K for p in P_welch]

    # Frequency axis in FFT order: bins at or above the midpoint are negative.
    half = segment_length / 2
    f = [
        (i if i < half else i - segment_length) * fs / segment_length
        for i in range(segment_length)
    ]

    return f, P_welch

In [3]:
# Filesystem locations used by the notebook
db_path = "/home/javastral/DATABASE-ANE/"
current_dir = os.getcwd()
exchange_dir = os.path.join(current_dir, "exchange")

# Take the first entry listed in the database folder, then the first file
# inside it. os.listdir returns entries in arbitrary order, so which sample
# gets picked depends on the filesystem.
dir_88_108_raw = os.path.join(db_path, os.listdir(db_path)[0])
test_sample = os.path.join(dir_88_108_raw, os.listdir(dir_88_108_raw)[0])

print(f"Exchange-dir: {exchange_dir}")
print(f"Sample-dir: {test_sample}")


# Constants

FS = 20e6            # Sampling frequency passed to the Welch estimators (Hz)
NPERSEG = 4096       # Samples per Welch segment (a power of 2, as welch_psd_scratch requires)
OVERLAP = 0.5        # Overlap between consecutive segments, as a fraction
WINDOW = 'hamming'   # Window type applied to each segment
CENT_FREQ = 98e6     # Centre frequency of the capture - presumably Hz; confirm
BW = 20e6            # Total bandwidth around CENT_FREQ

# Band edges sit half a bandwidth on each side of the centre, giving
# 88 MHz and 108 MHz, which agrees with the 88-108 naming of the dataset.
# Using the full BW on each side produced a 40 MHz span (78-118 MHz).
INF_FREQ = CENT_FREQ - BW / 2
SUP_FREQ = CENT_FREQ + BW / 2

Exchange-dir: /home/javastral/GIT/GCPDS--trabajos-/welch-ane2/PSD-Estimation-final/exchange
Sample-dir: /home/javastral/DATABASE-ANE/DATA-88-108-noUSRP/88-108-88.cs8


### Scratch vs Scipy

In [4]:
"""
I, Q = load_cs8(test_sample)

signal = np.array(I) + 1j*np.array(Q)

f_scratch, Pxx_scratch = welch_psd_scratch(signal, fs=FS, segment_length=NPERSEG, overlap=OVERLAP, window_type=WINDOW)
f_sci, Pxx_sci = sig.welch(signal, fs=FS, nperseg=NPERSEG, noverlap=OVERLAP, window=WINDOW)

f_scratch, f_sci = np.fft.fftshift(f_scratch), np.fft.fftshift(f_sci)
Pxx_scratch, Pxx_sci = np.fft.fftshift(Pxx_scratch), np.fft.fftshift(Pxx_sci) 
"""

'\nI, Q = load_cs8(test_sample)\n\nsignal = np.array(I) + 1j*np.array(Q)\n\nf_scratch, Pxx_scratch = welch_psd_scratch(signal, fs=FS, segment_length=NPERSEG, overlap=OVERLAP, window_type=WINDOW)\nf_sci, Pxx_sci = sig.welch(signal, fs=FS, nperseg=NPERSEG, noverlap=OVERLAP, window=WINDOW)\n\nf_scratch, f_sci = np.fft.fftshift(f_scratch), np.fft.fftshift(f_sci)\nPxx_scratch, Pxx_sci = np.fft.fftshift(Pxx_scratch), np.fft.fftshift(Pxx_sci) \n'

In [5]:
"""
plt.figure(figsize=(20, 10))
plt.semilogy(f_scratch, Pxx_scratch, label='Scratch')
plt.semilogy(f_sci, Pxx_sci, label='Scipy')
plt.title('Welch Scratch vs scipy')
plt.xlabel('F(Hz)')
plt.ylabel('Pxx(dB)')
plt.grid(True)
plt.legend()
#plt.xlim(-1,1)
plt.show()
"""

"\nplt.figure(figsize=(20, 10))\nplt.semilogy(f_scratch, Pxx_scratch, label='Scratch')\nplt.semilogy(f_sci, Pxx_sci, label='Scipy')\nplt.title('Welch Scratch vs scipy')\nplt.xlabel('F(Hz)')\nplt.ylabel('Pxx(dB)')\nplt.grid(True)\nplt.legend()\n#plt.xlim(-1,1)\nplt.show()\n"