In [1]:
import os
import csv
import numpy as np
from scipy.signal import butter, filtfilt
from scipy.fftpack import fft
import pywt
import pandas as pd
from datetime import datetime

In [3]:
def load_metadata(txt_path, encoding=None):
    """Read a metadata text file and return its content as a single string.

    Parameters
    ----------
    txt_path : str or os.PathLike
        Path of the text file to read.
    encoding : str, optional
        Text encoding handed to ``open``. The default ``None`` keeps the
        platform's default encoding (the previous behaviour). Pass an explicit
        value such as ``'utf-8'`` when the file contains non-ASCII text, so the
        result does not depend on the machine the notebook runs on.

    Returns
    -------
    str
        The whole file content with leading and trailing whitespace removed.
    """
    with open(txt_path, 'r', encoding=encoding) as f:
        # Read everything at once and trim surrounding whitespace/newlines.
        return f.read().strip()

In [5]:
def butter_bandpass(lowcut, highcut, fs, order=4):
    """Design a Butterworth band-pass filter.

    Both cut-off frequencies are divided by the Nyquist frequency (half the
    sampling rate) before being passed to ``scipy.signal.butter``.

    Parameters
    ----------
    lowcut, highcut : float
        Lower and upper cut-off frequency, in the same unit as ``fs``.
    fs : float
        Sampling rate of the signal the filter will be applied to.
    order : int, optional
        Order passed to ``butter`` (default 4).

    Returns
    -------
    tuple
        ``(b, a)`` numerator and denominator coefficients of the filter.
    """
    nyquist = 0.5 * fs  # Nyquist frequency
    normalized_band = [edge / nyquist for edge in (lowcut, highcut)]
    numerator, denominator = butter(order, normalized_band, btype='band')
    return numerator, denominator

In [7]:
def clean_filter_data(data, fs, lowcut=0.5, highcut=40, order=4, n_sigma=3):
    """Band-pass filter a signal and drop samples that are statistical outliers.

    Steps:
      1. NaN values are replaced by 0 and infinities by very large finite
         numbers (``np.nan_to_num`` defaults) so the filter does not propagate
         NaN through the whole signal.
      2. A zero-phase Butterworth band-pass (``filtfilt``) is applied.
      3. Only samples within ``mean ± n_sigma * std`` of the filtered signal
         are kept.

    Parameters
    ----------
    data : array_like
        Raw signal samples.
    fs : float
        Sampling rate of ``data``.
    lowcut, highcut : float, optional
        Pass-band edges handed to ``butter_bandpass`` (defaults 0.5 and 40).
        ``highcut`` must stay below ``fs / 2``.
    order : int, optional
        Filter order handed to ``butter_bandpass`` (default 4).
    n_sigma : float, optional
        Half-width of the accepted range in standard deviations (default 3).

    Returns
    -------
    numpy.ndarray
        Filtered samples that passed the outlier test. Because rejected
        samples are removed rather than replaced, the result can be shorter
        than the input.

    Raises
    ------
    ValueError
        Propagated from scipy, e.g. when the signal is too short for
        ``filtfilt`` or the cut-offs are invalid for ``fs``.
    """
    data = np.nan_to_num(data)
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    filtered = filtfilt(b, a, data)  # forward-backward filtering: no phase shift

    mean = np.mean(filtered)
    std = np.std(filtered)
    # Inclusive bounds: with strict comparisons a perfectly flat signal
    # (std == 0, e.g. an all-zero recording) would lose every sample and the
    # empty result would break the feature extraction that follows.
    mask = (filtered >= mean - n_sigma * std) & (filtered <= mean + n_sigma * std)
    return filtered[mask]

In [9]:
def extract_features(data, fs):
    """Summarise a signal with time-, frequency- and wavelet-domain features.

    Parameters
    ----------
    data : numpy.ndarray
        Signal samples.
    fs : float
        Sampling rate. Accepted for a uniform call signature; the current
        implementation does not use it.

    Returns
    -------
    dict
        Keys, in insertion order: ``mean``, ``std``, ``rms``, ``freq_mean``,
        ``freq_max``, followed by one ``wavelet_energy_level_<i>`` entry per
        coefficient array returned by the level-4 ``db4`` decomposition.
    """
    # Time domain.
    time_domain = {
        'mean': np.mean(data),
        'std': np.std(data),
        'rms': np.sqrt(np.mean(data**2)),
    }

    # Frequency domain: statistics of the FFT magnitude spectrum.
    magnitude = np.abs(fft(data))
    frequency_domain = {
        'freq_mean': np.mean(magnitude),
        'freq_max': np.max(magnitude),
    }

    # Time-frequency domain: energy of each wavelet coefficient band.
    wavelet_domain = {
        f'wavelet_energy_level_{level}': np.sum(band**2)
        for level, band in enumerate(pywt.wavedec(data, 'db4', level=4))
    }

    return {**time_domain, **frequency_domain, **wavelet_domain}

In [11]:
def process_folder(folder_path, fs=1000):
    """Extract features from every ``.npy`` recording in a folder.

    The folder is listed once and the entries are sorted, because
    ``os.listdir`` returns names in arbitrary order; sorting makes both the
    chosen metadata file and the order of the returned features reproducible.

    Parameters
    ----------
    folder_path : str or os.PathLike
        Directory containing ``.npy`` recordings and, optionally, ``.txt``
        metadata files.
    fs : float, optional
        Sampling rate passed to the filtering and feature-extraction steps
        (default 1000). NOTE(review): confirm this matches the recordings.

    Returns
    -------
    tuple
        ``(metadata, all_features)`` where ``metadata`` is the content of the
        first ``.txt`` file in sorted order (``None`` when there is none) and
        ``all_features`` is a list with one feature dict per ``.npy`` file,
        each carrying the file name under the ``'file'`` key.
    """
    entries = sorted(os.listdir(folder_path))

    txt_files = [name for name in entries if name.endswith('.txt')]
    metadata = None
    if txt_files:
        metadata = load_metadata(os.path.join(folder_path, txt_files[0]))

    all_features = []
    for npy_file in (name for name in entries if name.endswith('.npy')):
        data = np.load(os.path.join(folder_path, npy_file))
        cleaned_data = clean_filter_data(data, fs)
        features = extract_features(cleaned_data, fs)
        features['file'] = npy_file  # remember which recording the row came from
        all_features.append(features)

    return metadata, all_features


In [13]:
def save_features_to_csv(metadata, features_list, csv_path):
    """Write a list of feature dicts to a UTF-8 CSV file, one row per dict.

    The header is ``file``, ``metadata`` and then the keys of the first
    feature dict (except ``file``) in their original order. Every row repeats
    the same ``metadata`` text; an empty or missing value is written as an
    empty cell, as is any feature a later dict lacks.

    Parameters
    ----------
    metadata : str or None
        Text stored in the ``metadata`` column of every row.
    features_list : list of dict
        Feature dicts to write. When empty, a message is printed and no file
        is created.
    csv_path : str or os.PathLike
        Destination file; it is overwritten if it exists.
    """
    if not features_list:
        print("没有特征数据可写入。")
        return

    fieldnames = ['file', 'metadata'] + [
        name for name in features_list[0].keys() if name != 'file'
    ]
    # Columns whose value is copied straight from each feature dict.
    copied_columns = [name for name in fieldnames if name not in ('file', 'metadata')]
    metadata_cell = metadata or ''

    with open(csv_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {
                'file': record.get('file', ''),
                'metadata': metadata_cell,
                **{name: record.get(name, '') for name in copied_columns},
            }
            for record in features_list
        )
    print(f"特征已保存到 {csv_path}")

In [15]:
# Folder holding the .npy recordings and the optional .txt metadata file.
# The default is a machine-specific absolute path; set the BEARING_DATA_DIR
# environment variable to run the notebook elsewhere without editing this cell.
folder = os.environ.get(
    'BEARING_DATA_DIR',
    'C:/Users/yil/Desktop/graduation-project/SKF-6204-outer fault',
)
# CSV file the extracted features are written to (relative to the working directory).
output_csv = '输出特征.csv'

In [17]:
# Run the pipeline on every .npy recording in `folder`. The sampling rate is
# left at process_folder's default (fs=1000) — confirm this matches the data.
metadata, features_list = process_folder(folder_path=folder)

In [19]:
# Write one CSV row per processed recording to `output_csv`.
save_features_to_csv(metadata, features_list, csv_path=output_csv)

特征已保存到 输出特征.csv
