In [1]:
import pandas as pd
import wfdb
import os
import glob
from collections import Counter
import numpy as np
from scipy.interpolate import interp1d
import torch
from torch import tensor
import matplotlib.pyplot as plt
import pywt

In [2]:
def WTfilt_1d(sig):
    """
    对信号进行小波变换滤波
    :param sig: 输入信号，1-d array
    :return: 小波滤波后的信号，1-d array
    """
    coeffs = pywt.wavedec(sig, 'db6', level=9)
    coeffs[-1] = np.zeros(len(coeffs[-1]))
    coeffs[-2] = np.zeros(len(coeffs[-2]))
    coeffs[0] = np.zeros(len(coeffs[0]))
    sig_filt = pywt.waverec(coeffs, 'db6')
    return sig_filt

In [3]:
# def Z_ScoreNormalization(data):
#     mean = np.mean(data, axis=0)
#     std_dev = np.std(data, axis=0)
#     normalized_data = (data - mean) / std_dev
#     return normalized_data

def min_max_normalization(signal, new_min=0, new_max=1):
    signal = np.array(signal)
    min_val = np.min(signal)
    max_val = np.max(signal)

    if max_val == min_val:
        return np.zeros_like(signal)  # 避免除零错误，所有值相同则返回全零

    norm_signal = (signal - min_val) / (max_val - min_val)  # 归一化到 [0,1]
    norm_signal = norm_signal * (new_max - new_min) + new_min  # 缩放到 [new_min, new_max]

    return norm_signal

In [4]:
def Segmentation(sig, size, sample, aux_note):
    
    j=0;
    records_N = []
    records_AF = []

    for i in range(0, len(sample)-1):

        if (aux_note[i]=='(N' or aux_note[i]=='(J'):
            j = sample[i]
            while(sample[i+1]-j >= size):
                data = sig[np.arange(j, j+size)]
                if len(data) != 0:
                    data = min_max_normalization(data)
                    records_N.append(data)              
                j += size

        elif (aux_note[i]=='(AFIB' or aux_note[i]=='(AFL'):
            j = sample[i]
            while(sample[i+1]-j >= size):
                data = sig[np.arange(j, j+size)]
                if len(data) != 0:
                    data = min_max_normalization(data)
                    records_AF.append(data)
                j += size

    return np.array(records_N), np.array(records_AF)

In [5]:
"""
训练集划分
"""
path = './data/'
Test_file = [path + '08215.dat', path + '08219.dat', path + '08378.dat', path + '08405.dat', path + '08434.dat', path + '08455.dat']
files = set(glob.glob('./data/*.dat'))
files = [addr.replace('\\', '/') for addr in files]
print(files)

Train_file = sorted(list(set(files).difference(set(Test_file))))
print(Train_file)
Train_Records_N = []
Train_Records_AF = []
Train_Records_info = []

for f in Train_file:                                       
    signal, fields = wfdb.rdsamp(f[:-4], channels=[1])  
    ann = wfdb.rdann(f[:-4], 'atr')
    
    sig = signal.squeeze(1)
    sample = ann.sample
    aux_note = ann.aux_note
    
    # sig = WTfilt_1d(sig)
    
    data_N, data_AF = Segmentation(sig, 2500, sample, aux_note)
    
    if len(data_N)!=0:
        Train_Records_N.append(data_N)
    if len(data_AF)!=0:
        Train_Records_AF.append(data_AF)
    
    info = {'name': ann.record_name, 'N_num': len(data_N), 'AF_num': len(data_AF)}
    Train_Records_info.append(info)

X_Train_N = np.concatenate(Train_Records_N, axis=0)
X_Train_AF = np.concatenate(Train_Records_AF, axis=0)

indices = np.arange(len(X_Train_N))
sel_indices = np.random.choice(indices, size=len(X_Train_AF), replace=False)
X_Train_N = X_Train_N[sel_indices]
print("N: ", X_Train_N.shape,'\n','AF: ', X_Train_AF.shape)
train_label_N = np.zeros(X_Train_N.shape[0])
train_label_AF = np.ones(X_Train_AF.shape[0])

Data_train = np.concatenate((X_Train_N,  X_Train_AF), axis=0)
Label_train = np.concatenate((train_label_N,  train_label_AF), axis=0)

['./data/04015.dat', './data/06426.dat', './data/07859.dat', './data/05091.dat', './data/07910.dat', './data/08378.dat', './data/04048.dat', './data/07879.dat', './data/08215.dat', './data/04936.dat', './data/04908.dat', './data/07162.dat', './data/05121.dat', './data/04746.dat', './data/08405.dat', './data/05261.dat', './data/04126.dat', './data/06453.dat', './data/08219.dat', './data/04043.dat', './data/08455.dat', './data/08434.dat', './data/06995.dat']
['./data/04015.dat', './data/04043.dat', './data/04048.dat', './data/04126.dat', './data/04746.dat', './data/04908.dat', './data/04936.dat', './data/05091.dat', './data/05121.dat', './data/05261.dat', './data/06426.dat', './data/06453.dat', './data/06995.dat', './data/07162.dat', './data/07859.dat', './data/07879.dat', './data/07910.dat']
N:  (13584, 2500) 
 AF:  (13584, 2500)


In [6]:
np.save('./afdb/train_data.npy', Data_train)
np.save('./afdb/train_label.npy', Label_train)

In [7]:
"""
测试集划分
"""
Test_Records_N = []
Test_Records_AF = []
Test_Records_info = []

for f in list(Test_file):                                        
    signal, fields = wfdb.rdsamp(f[:-4], channels=[1]) 
    ann = wfdb.rdann(f[:-4], 'atr')
    
    sig = signal.squeeze(1)
    sample = ann.sample
    aux_note = ann.aux_note
    
    # sig = WTfilt_1d(sig)
    
    data_N, data_AF = Segmentation(sig, 2500, sample, aux_note)
    
    if len(data_N)!=0:
        Test_Records_N.append(data_N)
    if len(data_AF)!=0:
        Test_Records_AF.append(data_AF)
    
    info = {'name': ann.record_name, 'N_num': len(data_N), 'AF_num': len(data_AF)}
    Test_Records_info.append(info)
    
X_Test_N = np.concatenate(Test_Records_N, axis=0)
X_Test_AF = np.concatenate(Test_Records_AF, axis=0)

indices = np.arange(len(X_Test_N))
sel_indices = np.random.choice(indices, size=len(X_Test_AF), replace=False)
X_Test_N = X_Test_N[sel_indices]
print("N: ", X_Test_N.shape,'\n','AF: ', X_Test_AF.shape)
test_label_N = np.zeros(X_Test_N.shape[0])
test_label_AF = np.ones(X_Test_AF.shape[0])

Data_test = np.concatenate((X_Test_N,  X_Test_AF), axis=0)
Label_test = np.concatenate((test_label_N,  test_label_AF), axis=0)

N:  (2419, 2500) 
 AF:  (2419, 2500)


In [8]:
np.save('./afdb/test_data.npy', Data_test)
np.save('./afdb/test_label.npy', Label_test)

In [9]:
import time
print(time.ctime())

Mon Mar 17 14:09:59 2025
