# 신경 신호 데이터 전처리 및 특성 추출

이 노트북은 적응형 신경 전기자극 시스템에서 사용되는 신경 신호 데이터의 전처리 및 특성 추출 과정을 다룹니다. 효과적인 전기자극을 위해서는 신경 신호에서 유의미한 정보를 추출하는 것이 중요합니다. 이를 통해 신경 손상 정도와 재생 상태를 더 정확하게 평가할 수 있습니다.

## 1. 라이브러리 임포트

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy.fft import fft, fftfreq
import pywt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import os
import sys
import glob

# 스타일 설정
plt.style.use('seaborn-whitegrid')
sns.set_theme(style="whitegrid")

# 디렉토리 설정
sys.path.append('..')
from utils.data_utils import load_neural_data, save_processed_data

# 랜덤 시드 설정
np.random.seed(42)

## 2. 데이터 로드

In [None]:
# 데이터 경로 설정
data_path = '../data/neural_recordings/'

# 데이터 파일 목록 확인
data_files = glob.glob(os.path.join(data_path, '*.csv'))
print(f"발견된 데이터 파일: {len(data_files)}")
for file in data_files:
    print(f" - {os.path.basename(file)}")

# 데이터 로드 함수 (실제로는 utils에 구현)
def load_sample_data():
    # 샘플 데이터 생성 (실제 데이터가 없는 경우)
    np.random.seed(42)
    
    # 3개의 서로 다른 신경 상태 시뮬레이션
    n_samples = 1000
    time = np.linspace(0, 10, n_samples)
    
    # 정상 상태 (주파수가 높고 규칙적인 신호)
    normal_signal = np.sin(2 * np.pi * 5 * time) + 0.5 * np.sin(2 * np.pi * 10 * time)
    normal_signal += np.random.normal(0, 0.2, n_samples)
    
    # 손상 상태 (불규칙적이고 진폭이 낮은 신호)
    damaged_signal = 0.5 * np.sin(2 * np.pi * 2 * time) + 0.2 * np.sin(2 * np.pi * 7.5 * time)
    damaged_signal += np.random.normal(0, 0.5, n_samples)
    
    # 재생 상태 (규칙성이 회복되고 있는 신호)
    recovery_signal = 0.8 * np.sin(2 * np.pi * 4 * time) + 0.3 * np.sin(2 * np.pi * 8.5 * time)
    recovery_signal += np.random.normal(0, 0.3, n_samples)
    
    # 여러 채널의 신호 결합
    signals = np.column_stack([
        normal_signal, damaged_signal, recovery_signal,
        normal_signal + np.random.normal(0, 0.1, n_samples),
        damaged_signal + np.random.normal(0, 0.1, n_samples),
        recovery_signal + np.random.normal(0, 0.1, n_samples)
    ])
    
    # 레이블 생성 (0: 정상, 1: 손상, 2: 재생)
    labels = np.zeros(n_samples, dtype=int)
    labels[n_samples//3:(2*n_samples)//3] = 1  # 손상 상태
    labels[(2*n_samples)//3:] = 2  # 재생 상태
    
    return {
        'signals': signals,
        'labels': labels,
        'time': time,
        'channel_names': ['채널1', '채널2', '채널3', '채널4', '채널5', '채널6']
    }

# 데이터 로드
try:
    data = load_neural_data(data_path)
    print(f"실제 데이터를 성공적으로 로드했습니다.")
except Exception as e:
    print(f"실제 데이터 로드 실패: {e}\n샘플 데이터를 대신 생성합니다.")
    data = load_sample_data()

# 데이터 기본 정보 출력
print(f"데이터 형태: {data['signals'].shape}")
print(f"채널 수: {data['signals'].shape[1]}")
print(f"샘플 수: {data['signals'].shape[0]}")
print(f"레이블 클래스: {np.unique(data['labels'])}")
print(f"레이블 분포:\n{pd.Series(data['labels']).value_counts()}")

## 3. 데이터 시각화 및 탐색

In [None]:
# 신호 시각화 함수
def plot_signals(signals, time, labels, channel_names, n_channels=3, window_size=200):
    # 클래스별 색상 정의
    colors = ['#2ca02c', '#d62728', '#1f77b4']
    class_names = ['정상', '손상', '재생']
    
    # 시각화할 채널 선택 (최대 n_channels)
    n_vis_channels = min(n_channels, signals.shape[1])
    selected_channels = range(n_vis_channels)
    
    fig, axes = plt.subplots(n_vis_channels, 1, figsize=(15, 3*n_vis_channels), sharex=True)
    if n_vis_channels == 1:
        axes = [axes]
        
    for i, channel in enumerate(selected_channels):
        # 배경색으로 레이블 구분
        for label_value in np.unique(labels):
            mask = (labels == label_value)
            axes[i].fill_between(
                time[mask], 
                np.min(signals[:window_size, channel])*1.1, 
                np.max(signals[:window_size, channel])*1.1, 
                color=colors[label_value], 
                alpha=0.2, 
                label=class_names[label_value] if i == 0 else None
            )
        
        # 신호 플롯 (처음 window_size 샘플만)
        axes[i].plot(time[:window_size], signals[:window_size, channel], 'k-', lw=1)
        axes[i].set_ylabel(f'{channel_names[channel]}')
        axes[i].set_xlim(time[0], time[window_size-1])
        
    axes[-1].set_xlabel('시간 (초)')
    plt.tight_layout()
    
    # 범례 추가 (첫 번째 차트에만)
    if n_vis_channels > 0:
        handles, labels = axes[0].get_legend_handles_labels()
        fig.legend(handles, labels, loc='upper center', bbox_to_anchor=(0.5, 1.02), ncol=3)
        
    plt.show()

# 신호 시각화
plot_signals(data['signals'], data['time'], data['labels'], data['channel_names'])

In [None]:
# 신호의 스펙트럼 분석
def plot_spectrum(signals, sampling_rate, labels, channel_names, n_channels=3):
    # 각 클래스별 대표 구간 선택
    class_indices = {}
    for label in np.unique(labels):
        indices = np.where(labels == label)[0]
        class_indices[label] = indices[len(indices)//2]  # 각 클래스의 중간 지점
    
    # 시각화할 채널 선택
    n_vis_channels = min(n_channels, signals.shape[1])
    selected_channels = range(n_vis_channels)
    
    # 각 채널별, 클래스별 스펙트럼 시각화
    fig, axes = plt.subplots(n_vis_channels, len(class_indices), figsize=(15, 3*n_vis_channels), sharey='row')
    
    # 단일 채널 또는 단일 클래스인 경우 축 조정
    if n_vis_channels == 1 and len(class_indices) == 1:
        axes = np.array([[axes]])  
    elif n_vis_channels == 1:
        axes = np.array([axes])
    elif len(class_indices) == 1:
        axes = np.array([[ax] for ax in axes])
    
    class_names = ['정상', '손상', '재생']
    window_size = 256  # FFT 윈도우 크기
    
    for i, channel in enumerate(selected_channels):
        for j, (label, index) in enumerate(class_indices.items()):
            # 클래스별 대표 구간에서 데이터 추출
            segment = signals[index:index+window_size, channel]
            
            # FFT 계산
            fft_result = fft(segment)
            fft_mag = np.abs(fft_result[:window_size//2])
            freqs = fftfreq(window_size, 1/sampling_rate)[:window_size//2]
            
            # 스펙트럼 플롯
            axes[i, j].plot(freqs, fft_mag, 'k-')
            axes[i, j].set_title(f'{channel_names[channel]} - {class_names[label]}')
            axes[i, j].set_xlabel('주파수 (Hz)')
            
            # 첫 번째 채널의 첫 번째 클래스에만 y축 레이블 추가
            if j == 0:
                axes[i, j].set_ylabel('진폭')
                
            # x축 범위 설정
            axes[i, j].set_xlim(0, sampling_rate/2)  # 나이퀴스트 주파수까지
    
    plt.tight_layout()
    plt.show()

# 스펙트럼 시각화 (샘플링 레이트는 예시)
sampling_rate = 100  # 가정: 100 Hz 샘플링
plot_spectrum(data['signals'], sampling_rate, data['labels'], data['channel_names'])

## 4. 신호 전처리

In [None]:
# 주요 전처리 함수들
def preprocess_signals(signals, sampling_rate):
    """신경 신호에 대한 기본 전처리 수행"""
    # 채널 수 확인
    n_samples, n_channels = signals.shape
    processed_signals = np.zeros_like(signals)
    
    for ch in range(n_channels):
        # 1. 기준선 제거 (고역 통과 필터)
        b, a = signal.butter(4, 0.5/(sampling_rate/2), 'highpass')
        baseline_removed = signal.filtfilt(b, a, signals[:, ch])
        
        # 2. 노이즈 제거 (60Hz 노치 필터 - 전원 노이즈)
        b, a = signal.iirnotch(60, 30, sampling_rate)
        notch_filtered = signal.filtfilt(b, a, baseline_removed)
        
        # 3. 대역 통과 필터 (관심 주파수 대역 추출 예: 0.5-100Hz)
        b, a = signal.butter(4, [0.5/(sampling_rate/2), 100/(sampling_rate/2)], 'bandpass')
        bandpass_filtered = signal.filtfilt(b, a, notch_filtered)
        
        processed_signals[:, ch] = bandpass_filtered
    
    return processed_signals

# 신호 전처리 적용
processed_signals = preprocess_signals(data['signals'], sampling_rate)

# 전처리 전/후 비교
plt.figure(figsize=(15, 6))

# 원본 신호
plt.subplot(2, 1, 1)
window_size = 200  # 처음 200개 샘플만 표시
for ch in range(min(3, data['signals'].shape[1])):
    plt.plot(data['time'][:window_size], data['signals'][:window_size, ch], 
             label=f'{data["channel_names"][ch]} (원본)')
plt.legend()
plt.title('원본 신호')
plt.ylabel('진폭')

# 전처리된 신호
plt.subplot(2, 1, 2)
for ch in range(min(3, processed_signals.shape[1])):
    plt.plot(data['time'][:window_size], processed_signals[:window_size, ch], 
             label=f'{data["channel_names"][ch]} (전처리됨)')
plt.legend()
plt.title('전처리된 신호')
plt.xlabel('시간 (초)')
plt.ylabel('진폭')

plt.tight_layout()
plt.show()

## 5. 특성 추출

In [None]:
# 시간 도메인 특성 추출
def extract_time_features(signals, window_size=128, step=64):
    """시간 도메인 특성 추출"""
    n_samples, n_channels = signals.shape
    n_windows = (n_samples - window_size) // step + 1
    
    # 각 윈도우의 시작 인덱스
    window_starts = [i * step for i in range(n_windows)]
    
    # 특성 저장 배열
    features = np.zeros((n_windows, n_channels * 5))  # 채널당 5개 특성
    
    for i, start in enumerate(window_starts):
        window = signals[start:start+window_size, :]
        
        for ch in range(n_channels):
            ch_data = window[:, ch]
            
            # 평균
            features[i, ch*5 + 0] = np.mean(ch_data)
            # 표준편차
            features[i, ch*5 + 1] = np.std(ch_data)
            # 첨도 (Kurtosis)
            features[i, ch*5 + 2] = np.mean((ch_data - np.mean(ch_data))**4) / (np.std(ch_data)**4)
            # 왜도 (Skewness)
            features[i, ch*5 + 3] = np.mean((ch_data - np.mean(ch_data))**3) / (np.std(ch_data)**3)
            # 제로 교차율 (Zero-crossing rate)
            features[i, ch*5 + 4] = np.sum(np.abs(np.diff(np.signbit(ch_data)))) / (2 * len(ch_data))
    
    # 특성 이름 생성
    feature_names = []
    for ch in range(n_channels):
        channel_name = f'채널{ch+1}'
        feature_names.extend([
            f'{channel_name}_평균',
            f'{channel_name}_표준편차',
            f'{channel_name}_첨도',
            f'{channel_name}_왜도',
            f'{channel_name}_제로교차율'
        ])
    
    # 윈도우 레이블 (각 윈도우의 중간점에 해당하는 레이블 사용)
    window_labels = np.array([data['labels'][start + window_size//2] for start in window_starts])
    
    return features, window_labels, feature_names, window_starts

# 주파수 도메인 특성 추출
def extract_frequency_features(signals, sampling_rate, window_size=128, step=64):
    """주파수 도메인 특성 추출"""
    n_samples, n_channels = signals.shape
    n_windows = (n_samples - window_size) // step + 1
    
    # 각 윈도우의 시작 인덱스
    window_starts = [i * step for i in range(n_windows)]
    
    # 주파수 대역 정의 (Hz)
    bands = {
        'delta': (0.5, 4),
        'theta': (4, 8),
        'alpha': (8, 13),
        'beta': (13, 30),
        'gamma': (30, 100)
    }
    
    # 특성 저장 배열
    features = np.zeros((n_windows, n_channels * len(bands)))  # 채널당 5개 대역 특성
    
    for i, start in enumerate(window_starts):
        window = signals[start:start+window_size, :]
        
        for ch in range(n_channels):
            ch_data = window[:, ch]
            
            # FFT 계산
            fft_result = fft(ch_data)
            fft_mag = np.abs(fft_result[:window_size//2])
            freqs = fftfreq(window_size, 1/sampling_rate)[:window_size//2]
            
            # 주파수 대역별 파워 계산
            for j, (band_name, (low_freq, high_freq)) in enumerate(bands.items()):
                band_mask = (freqs >= low_freq) & (freqs <= high_freq)
                if np.any(band_mask):  # 해당 대역에 주파수가 있는지 확인
                    band_power = np.sum(fft_mag[band_mask] ** 2)
                    features[i, ch * len(bands) + j] = band_power
    
    # 특성 이름 생성
    feature_names = []
    for ch in range(n_channels):
        channel_name = f'채널{ch+1}'
        for band_name in bands.keys():
            feature_names.append(f'{channel_name}_{band_name}')
    
    # 윈도우 레이블 (각 윈도우의 중간점에 해당하는 레이블 사용)
    window_labels = np.array([data['labels'][start + window_size//2] for start in window_starts])
    
    return features, window_labels, feature_names, window_starts

# 특성 추출
time_features, time_labels, time_feature_names, window_starts = extract_time_features(processed_signals)
freq_features, freq_labels, freq_feature_names, _ = extract_frequency_features(processed_signals, sampling_rate)

# 특성 결합
all_features = np.hstack((time_features, freq_features))
all_feature_names = time_feature_names + freq_feature_names

print(f"시간 도메인 특성: {time_features.shape} (특성 수: {len(time_feature_names)})")
print(f"주파수 도메인 특성: {freq_features.shape} (특성 수: {len(freq_feature_names)})")
print(f"결합된 특성: {all_features.shape} (총 특성 수: {len(all_feature_names)})")

## 6. 특성 분석 및 시각화

In [None]:
# 특성 중요도 시각화 (상위 10개 특성)
def plot_feature_importance(features, labels, feature_names, n_top=10):
    """특성 중요도 계산 및 시각화"""
    from sklearn.ensemble import RandomForestClassifier
    
    # 랜덤 포레스트 모델 훈련
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(features, labels)
    
    # 특성 중요도 정렬
    importance = rf.feature_importances_
    indices = np.argsort(importance)[::-1][:n_top]
    
    # 시각화
    plt.figure(figsize=(12, 6))
    plt.title(f'상위 {n_top}개 특성 중요도')
    plt.bar(range(n_top), importance[indices], align='center')
    plt.xticks(range(n_top), [feature_names[i] for i in indices], rotation=45, ha='right')
    plt.tight_layout()
    plt.show()
    
    # 상위 특성 목록 반환
    return [(feature_names[i], importance[i]) for i in indices]

# 특성 중요도 시각화
top_features = plot_feature_importance(all_features, time_labels, all_feature_names)
print("\n상위 특성 목록:")
for name, importance in top_features:
    print(f"{name}: {importance:.4f}")

In [None]:
# PCA를 통한 차원 축소 및 시각화
def plot_pca_visualization(features, labels, feature_names):
    """PCA를 통한 특성 차원 축소 및 시각화"""
    # 데이터 정규화
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    
    # PCA 적용
    pca = PCA(n_components=2)
    features_pca = pca.fit_transform(features_scaled)
    
    # 결과 시각화
    plt.figure(figsize=(10, 8))
    
    # 클래스별 색상 및 마커 정의
    colors = ['#2ca02c', '#d62728', '#1f77b4']
    markers = ['o', 's', '^']
    class_names = ['정상', '손상', '재생']
    
    for i, label in enumerate(np.unique(labels)):
        plt.scatter(
            features_pca[labels == label, 0],
            features_pca[labels == label, 1],
            c=colors[i],
            marker=markers[i],
            alpha=0.7,
            label=class_names[i]
        )
    
    plt.title('PCA 시각화')
    plt.xlabel(f'PC1 (설명 분산: {pca.explained_variance_ratio_[0]:.2%})')
    plt.ylabel(f'PC2 (설명 분산: {pca.explained_variance_ratio_[1]:.2%})')
    plt.legend()
    plt.grid(True)
    plt.show()
    
    return features_pca, pca

# PCA 시각화
features_pca, pca = plot_pca_visualization(all_features, time_labels, all_feature_names)

## 7. 웨이블릿 변환을 통한 시간-주파수 분석

In [None]:
def plot_wavelet_analysis(signal, sampling_rate, channel_name):
    """웨이블릿 변환을 통한 시간-주파수 분석"""
    # 분석할 데이터 (한 채널만)
    window_size = 512  # 분석 윈도우 크기
    data = signal[:window_size]
    time = np.arange(len(data)) / sampling_rate
    
    # 웨이블릿 변환을 위한 스케일 설정
    scales = np.arange(1, 128)
    
    # 웨이블릿 변환 수행 (Morlet 웨이블릿 사용)
    coefficients, frequencies = pywt.cwt(data, scales, 'morl', 1/sampling_rate)
    
    # 결과 시각화
    plt.figure(figsize=(12, 8))
    
    # 원본 신호
    plt.subplot(2, 1, 1)
    plt.plot(time, data)
    plt.title(f'{channel_name} 원본 신호')
    plt.xlabel('시간 (초)')
    plt.ylabel('진폭')
    
    # 웨이블릿 변환 결과 (스칼로그램)
    plt.subplot(2, 1, 2)
    plt.imshow(np.abs(coefficients), 
               extent=[time.min(), time.max(), frequencies[-1], frequencies[0]], 
               aspect='auto', 
               cmap='jet')
    plt.colorbar(label='진폭')
    plt.title(f'{channel_name} 웨이블릿 변환 (스칼로그램)')
    plt.xlabel('시간 (초)')
    plt.ylabel('주파수 (Hz)')
    plt.ylim([0, 50])  # 주요 주파수 영역만 표시
    
    plt.tight_layout()
    plt.show()

# 웨이블릿 분석 (첫 번째 채널 데이터 사용)
plot_wavelet_analysis(processed_signals[:, 0], sampling_rate, data['channel_names'][0])

## 8. 신경 상태별 특성 분포 분석

In [None]:
# 신경 상태별 주요 특성 분포 시각화
def plot_feature_distributions(features, labels, feature_names, top_n=5):
    """상위 특성들의 클래스별 분포 시각화"""
    from sklearn.ensemble import RandomForestClassifier
    
    # 특성 중요도 계산
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(features, labels)
    importance = rf.feature_importances_
    top_indices = np.argsort(importance)[::-1][:top_n]
    top_features = [feature_names[i] for i in top_indices]
    
    # 클래스 이름 정의
    class_names = ['정상', '손상', '재생']
    
    # 각 상위 특성별 분포 시각화
    plt.figure(figsize=(15, 3*top_n))
    
    for i, (idx, feature_name) in enumerate(zip(top_indices, top_features)):
        plt.subplot(top_n, 1, i+1)
        
        # 클래스별 분포 (바이올린 플롯)
        sns.violinplot(
            x=[class_names[label] for label in labels],
            y=features[:, idx],
            palette=['#2ca02c', '#d62728', '#1f77b4'],
            inner='quartile'
        )
        
        plt.title(f'{feature_name} 분포 (중요도: {importance[idx]:.4f})')
        plt.xlabel('신경 상태')
        plt.ylabel('특성값')
        plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

# 상위 5개 특성의 분포 시각화
plot_feature_distributions(all_features, time_labels, all_feature_names, top_n=5)

## 9. 특성 상관관계 분석

In [None]:
# 특성 상관관계 시각화
def plot_feature_correlations(features, feature_names, n_top=10):
    """상위 특성들 간의 상관관계 시각화"""
    # 상위 특성들만 선택
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.preprocessing import MinMaxScaler
    
    # 특성 중요도로 상위 특성 선택
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(features, time_labels)
    importance = rf.feature_importances_
    top_indices = np.argsort(importance)[::-1][:n_top]
    
    # 상위 특성 추출
    top_features = features[:, top_indices]
    top_feature_names = [feature_names[i] for i in top_indices]
    
    # 특성 정규화
    scaler = MinMaxScaler()
    top_features_scaled = scaler.fit_transform(top_features)
    
    # 상관관계 계산
    corr_matrix = np.corrcoef(top_features_scaled.T)
    
    # 시각화 (히트맵)
    plt.figure(figsize=(12, 10))
    sns.heatmap(
        corr_matrix, 
        annot=True, 
        cmap='coolwarm', 
        xticklabels=top_feature_names,
        yticklabels=top_feature_names,
        vmin=-1, vmax=1
    )
    plt.title('상위 특성 간 상관관계')
    plt.tight_layout()
    plt.show()

# 상위 10개 특성간 상관관계 시각화
plot_feature_correlations(all_features, all_feature_names, n_top=10)

## 10. 전처리된 데이터 저장

In [None]:
# 전처리된 특성 데이터 저장
def save_processed_features(features, labels, feature_names, output_path):
    """전처리된 특성 데이터 저장"""
    # 디렉토리 생성
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    # 특성과 레이블을 DataFrame으로 변환
    df = pd.DataFrame(features, columns=feature_names)
    df['label'] = labels
    
    # CSV 파일로 저장
    df.to_csv(output_path, index=False)
    print(f"전처리된 특성 데이터가 {output_path}에 저장되었습니다.")
    
    return df

# 전처리된 특성 데이터 저장
output_path = '../data/processed/neural_features.csv'
feature_df = save_processed_features(all_features, time_labels, all_feature_names, output_path)

## 11. 결론 및 다음 단계

이 노트북에서는 신경 신호 데이터에 대한 전처리와 특성 추출 과정을 수행했습니다. 주요 단계는 다음과 같습니다:

1. **데이터 로드 및 시각화**: 원본 신호를 확인하고 기본적인 특성을 분석했습니다.
2. **신호 전처리**: 기준선 제거, 노이즈 제거, 대역 통과 필터링을 통해 신호 품질을 향상시켰습니다.
3. **특성 추출**: 시간 및 주파수 도메인 특성을 추출하여 각 신경 상태를 정량적으로 표현했습니다.
4. **특성 분석 및 시각화**: 특성 중요도, PCA, 웨이블릿 분석, 특성 분포 등을 통해 데이터의 패턴을 분석했습니다.
5. **데이터 저장**: 전처리된 특성 데이터를 저장하여 후속 모델링에 활용할 수 있도록 했습니다.

### 다음 단계:

1. **머신러닝 모델 개발**: 추출된 특성을 사용하여 신경 상태 분류 모델을 개발합니다.
2. **실시간 처리 구현**: 전처리 및 특성 추출 과정을 실시간으로 처리할 수 있는 파이프라인을 구축합니다.
3. **특성 선택 최적화**: 모델 성능 향상을 위해 특성 선택 방법을 개선합니다.
4. **고급 시간-주파수 분석**: 보다 복잡한 신호 패턴을 포착하기 위한 고급 분석 기법을 적용합니다.
5. **신경 상태 변화 추적**: 시간에 따른 신경 상태 변화를 추적하고 분석하는 방법을 개발합니다.

이러한 분석은 적응형 신경 전기자극 시스템의 정확성과 효과를 향상시키는 데 중요한 역할을 할 것입니다.