In [1]:
import numpy as np
import pywt
import pandas as pd


In [2]:
def wavelet_snr(srs: pd.Series, wavename: str):
    """
    使用小波分析将信号分解为趋势和冲击，计算冲击和趋势的能量比（信噪比）
    
    Returns:
    float: 冲击强度占趋势强度的比例
    """
    
    srs_vals = srs.values
    
    if np.sum(np.isnan(srs_vals)) > int(len(srs_vals) / 2):
        snr = np.nan
    else:
        srs_vals = srs_vals[~np.isnan(srs_vals)]
        try:
            cA, cD = pywt.dwt(srs_vals, wavename)
            ya = pywt.idwt(cA, None, wavename, 'smooth')  # approximated component
            yd = pywt.idwt(None, cD, wavename, 'smooth')  # detailed component
            snr = np.std(yd) / np.std(ya)
        except Exception:
            snr = np.nan
    return snr

In [5]:
import numpy as np
import pandas as pd
import pywt
import unittest

def wavelet_snr(srs: pd.Series, wavename: str):
    """
    使用小波分析将信号分解为趋势和冲击，计算冲击和趋势的能量比（信噪比）
    
    Returns:
    float: 冲击强度占趋势强度的比例
    """
    
    srs_vals = srs.values
    
    if np.sum(np.isnan(srs_vals)) > int(len(srs_vals) / 2):
        snr = np.nan
    else:
        srs_vals = srs_vals[~np.isnan(srs_vals)]
        try:
            cA, cD = pywt.dwt(srs_vals, wavename)
            ya = pywt.idwt(cA, None, wavename, 'smooth')  # approximated component
            yd = pywt.idwt(None, cD, wavename, 'smooth')  # detailed component
            snr = np.std(yd) / np.std(ya)
        except Exception:
            snr = np.nan
    return snr

In [6]:
# 测试小波名称
wavename = 'bior5.5'
signal = pd.Series(np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
# 计算信噪比
snr = wavelet_snr(signal, wavename)
print(snr)
signal = pd.Series(np.array([np.nan, np.nan, np.nan, 4, 5, 6, np.nan, 8, 9, 10]))
snr = wavelet_snr(signal, wavename)
print(snr)

0.029403140937550473
0.07186973462667981


In [None]:
def wavelet_multi_snr(srs: pd.Series, wavename: str, level_list=[1]):
    """
    使用小波分析将信号分解为趋势和冲击，计算冲击和趋势的能量比（信噪比）
    level_list必须是从小到大排列
    
    Returns:
    float: 冲击强度占趋势强度的比例
    """
    
    srs_vals = srs.values
    original_len = len(srs_vals)
    srs_vals = srs_vals[~np.isnan(srs_vals)]
    nna_len = len(srs_vals)
    
    max_level = 10
    
    if nna_len < int(original_len / 2):
        snr_list = ['8'] * len(level_list)
    else:
        coeffs = wavedec(srs_vals, wavename, level=max_level)
        snr_list = []
        for max_level in level_list:
            try:
                for k in range(max_level):
                    coeffs[max_level - k] = np.zeros_like(coeffs[max_level - k])
                denoised_series = waverec(coeffs, wavename)[:nna_len]
                noise_series = srs_vals - denoised_series
                snr = str(np.std(noise_series) / np.std(denoised_series))
            except Exception:
                snr = '8'
            snr_list.append(snr)
    snr_str = '_'.join(snr_list)
    return snr_str
