In [None]:
# 为了加深对频域的理解，手写一个fft的代码

In [5]:
import numpy as np
import time
from scipy import signal as sp_signal

def corrected_direct_rfft(x, fs=1.0):
    """
    Corrected direct implementation of Real-valued FFT with proper scaling
    """
    N = len(x)
    num_freqs = N // 2 + 1
    X = np.zeros(num_freqs, dtype=complex)
    
    for k in range(num_freqs):
        basis = np.exp(-2j * np.pi * k * np.arange(N) / N)
        X[k] = np.dot(x, basis)
    
    return X

def corrected_welch_psd(x, fs=1.0, nperseg=1024, noverlap=512):
    """
    Corrected Welch PSD using direct DFT computation with proper normalization
    """
    step = nperseg - noverlap
    w = np.hanning(nperseg)
    
    # Proper window normalization
    U = np.mean(w**2)  # Use mean instead of sum for better normalization
    
    acc = None
    K = 0
    
    for i in range(0, len(x)-nperseg+1, step):
        seg = x[i:i+nperseg] * w
        
        # Use direct RFFT
        E = corrected_direct_rfft(seg)
        
        # Proper PSD normalization
        # For real signals, we need to consider:
        # 1. Window effect
        # 2. Frequency resolution (df = fs/nperseg)
        # 3. Single-sided spectrum (multiply by 2 for non-DC, non-Nyquist)
        
        P = np.zeros_like(E)
        df = fs / nperseg  # Frequency resolution
        
        for k in range(len(E)):
            if k == 0 or k == len(E)-1:  # DC and Nyquist components
                scaling = 1.0
            else:  # Other frequencies (multiply by 2 for single-sided)
                scaling = 2.0
            
            # Proper PSD calculation
            P[k] = (np.abs(E[k])**2) * scaling / (fs * U * nperseg)
        
        acc = P if acc is None else acc + P
        K += 1
    
    psd = acc / K
    freqs = np.fft.rfftfreq(nperseg, 1/fs)
    
    return freqs, psd

def analyze_normalization_issues():
    """
    Analyze and demonstrate the normalization differences
    """
    print("=== Normalization Analysis ===")
    
    # Create test signal
    fs = 1000
    t = np.linspace(0, 1, 1000, endpoint=False)
    signal = np.sin(2 * np.pi * 100 * t)  # Pure 100Hz sine wave
    
    # Theoretical PSD value for a sine wave
    # For a sine wave: x(t) = A*sin(2πf₀t), the theoretical PSD at f₀ should be A²/2
    theoretical_psd_100hz = 1.0**2 / 2  # A=1, so PSD = 0.5
    
    print(f"Theoretical PSD at 100Hz: {theoretical_psd_100hz}")
    
    # Test different methods
    methods = []
    
    # 1. SciPy Welch (reference)
    f_sp, psd_sp = sp_signal.welch(signal, fs, nperseg=256, noverlap=128)
    idx_100hz = np.argmin(np.abs(f_sp - 100))
    methods.append(("SciPy Welch", psd_sp[idx_100hz]))
    
    # 2. Corrected direct method
    f_direct, psd_direct = corrected_welch_psd(signal, fs, nperseg=256, noverlap=128)
    idx_100hz_direct = np.argmin(np.abs(f_direct - 100))
    methods.append(("Corrected Direct", psd_direct[idx_100hz_direct]))
    
    # 3. Simple periodogram for comparison
    f_periodogram, psd_periodogram = simple_periodogram(signal, fs)
    idx_100hz_periodogram = np.argmin(np.abs(f_periodogram - 100))
    methods.append(("Simple Periodogram", psd_periodogram[idx_100hz_periodogram]))
    
    print("\n--- PSD at 100Hz Comparison ---")
    for name, value in methods:
        error = abs(value - theoretical_psd_100hz) / theoretical_psd_100hz * 100
        print(f"{name:20s}: {value:.6f} (error: {error:.2f}%)")
    
    return methods

def simple_periodogram(x, fs=1.0):
    """
    Simple periodogram for comparison
    """
    N = len(x)
    X = np.fft.rfft(x)
    
    # Simple periodogram normalization
    psd = (np.abs(X)**2) / (fs * N)
    
    # Convert to single-sided (except DC and Nyquist)
    psd[1:-1] *= 2
    
    freqs = np.fft.rfftfreq(N, 1/fs)
    return freqs, psd

def demonstrate_calculation_steps():
    """
    Show step-by-step calculation to understand the normalization
    """
    print("\n=== Step-by-step PSD Calculation ===")
    
    # Simple example: 4-point sine wave
    fs = 4
    t = np.array([0, 0.25, 0.5, 0.75])  # 4 samples at 4Hz
    signal = np.sin(2 * np.pi * 1 * t)  # 1Hz sine wave
    
    print(f"Signal: {signal}")
    print(f"Sampling frequency: {fs} Hz")
    print(f"Signal length: {len(signal)}")
    
    # Step 1: Direct RFFT
    X = corrected_direct_rfft(signal)
    print(f"\nStep 1 - RFFT result: {X}")
    
    # Step 2: Raw periodogram
    raw_periodogram = (np.abs(X)**2) / len(signal)
    print(f"Step 2 - Raw periodogram: {raw_periodogram}")
    
    # Step 3: Apply scaling for single-sided spectrum
    scaled_psd = raw_periodogram.copy()
    scaled_psd[1:-1] *= 2  # Double non-DC, non-Nyquist components
    print(f"Step 3 - After single-sided scaling: {scaled_psd}")
    
    # Step 4: Normalize by frequency resolution
    df = fs / len(signal)  # Frequency resolution
    final_psd = scaled_psd / (fs)  # Divide by sampling frequency
    print(f"Step 4 - After frequency normalization: {final_psd}")
    
    freqs = np.fft.rfftfreq(len(signal), 1/fs)
    print(f"Frequencies: {freqs}")
    
    return freqs, final_psd

# Run the analysis
if __name__ == "__main__":
    # Analyze normalization issues
    methods = analyze_normalization_issues()
    
    # Demonstrate step-by-step calculation
    demonstrate_calculation_steps()
    
    # Test with your original signal
    print("\n=== Testing with Original Signal ===")
    
    # Recreate your test signal
    fs = 1000
    t = np.linspace(0, 2, 2000, endpoint=False)
    signal_psd = (np.sin(2 * np.pi * 100 * t) + 
                  0.5 * np.sin(2 * np.pi * 150 * t) +
                  0.2 * np.random.normal(0, 1, 2000))
    
    # Compare methods
    f_sp, psd_sp = sp_signal.welch(signal_psd, fs, nperseg=256, noverlap=128)
    f_direct, psd_direct = corrected_welch_psd(signal_psd, fs, nperseg=256, noverlap=128)
    
    # Find 100Hz component
    idx_100hz_sp = np.argmin(np.abs(f_sp - 100))
    idx_100hz_direct = np.argmin(np.abs(f_direct - 100))
    
    print(f"PSD at 100Hz:")
    print(f"  SciPy Welch:    {psd_sp[idx_100hz_sp]:.6f}")
    print(f"  Corrected Direct: {psd_direct[idx_100hz_direct]:.6f}")
    print(f"  Ratio: {psd_direct[idx_100hz_direct]/psd_sp[idx_100hz_sp]:.3f}")
    
    # Also check 150Hz component
    idx_150hz_sp = np.argmin(np.abs(f_sp - 150))
    idx_150hz_direct = np.argmin(np.abs(f_direct - 150))
    
    print(f"\nPSD at 150Hz:")
    print(f"  SciPy Welch:    {psd_sp[idx_150hz_sp]:.6f}")
    print(f"  Corrected Direct: {psd_direct[idx_150hz_direct]:.6f}")
    print(f"  Ratio: {psd_direct[idx_150hz_direct]/psd_sp[idx_150hz_sp]:.3f}")

=== Normalization Analysis ===
Theoretical PSD at 100Hz: 0.5

--- PSD at 100Hz Comparison ---
SciPy Welch         : 0.069271 (error: 86.15%)
Corrected Direct    : 0.069114+0.000000j (error: 86.18%)
Simple Periodogram  : 0.500000 (error: 0.00%)

=== Step-by-step PSD Calculation ===
Signal: [ 0.0000000e+00  1.0000000e+00  1.2246468e-16 -1.0000000e+00]
Sampling frequency: 4 Hz
Signal length: 4

Step 1 - RFFT result: [2.22044605e-16+0.0000000e+00j 1.22464680e-16-2.0000000e+00j
 1.11022302e-16+2.4492936e-16j]
Step 2 - Raw periodogram: [1.23259516e-32 1.00000000e+00 1.80790857e-32]
Step 3 - After single-sided scaling: [1.23259516e-32 2.00000000e+00 1.80790857e-32]
Step 4 - After frequency normalization: [3.08148791e-33 5.00000000e-01 4.51977143e-33]
Frequencies: [0. 1. 2.]

=== Testing with Original Signal ===
PSD at 100Hz:
  SciPy Welch:    0.071514
  Corrected Direct: 0.071349+0.000000j
  Ratio: 0.998+0.000j

PSD at 150Hz:
  SciPy Welch:    0.017317
  Corrected Direct: 0.017277+0.000000j
 