In [3]:
import os
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from File_Reader import File_Reader
from DelsysFile import DelsysFile
from Component import Component
from Channel import Channel

In [9]:
class EMGProcessing:
    """Load EMG recordings through ``File_Reader`` and build scaled envelopes.

    The pipeline run by :meth:`process_emg` is: band-pass filter ->
    rectification (absolute value) -> low-pass smoothing -> per-component
    min-max scaling. Every stage is kept as a dict keyed by component name
    so that each step can be plotted or inspected afterwards.
    """

    def __init__(self, dll_path):
        """Store the DLL path and create the file reader; no data is loaded.

        Args:
            dll_path: Path handed to ``File_Reader``.
        """
        self.dll_path = dll_path
        self.fileReader = File_Reader(dll_path)
        self.file = None
        self._reset_state()

    def _reset_state(self):
        """Clear every per-file container so results of two files never mix."""
        self.component_data = {}
        self.sample_rates = {}
        self.units = {}
        self.filtered_data = None
        self.rectified_data = None
        self.smoothed_data = None
        self.scaled_data = None

    def file_loader(self, file_path):
        """Read ``file_path`` and run the whole processing pipeline on it.

        Any data from a previously loaded file is discarded first.

        Args:
            file_path: Path of the recording to read.
        """
        self.fileReader.readFile(file_path)
        self.file = self.fileReader.ParsedFile()
        self._reset_state()

        # Collect the EMG channels first, then derive every processed stage.
        self.process_components()
        self.process_emg()

    def process_components(self):
        """Store data, sample rate and units of every channel named "*EMG*".

        Results are keyed by the component name. If one component exposes
        several channels whose name contains "EMG", the last one wins.
        """
        for i in range(self.file.ComponentCount()):
            component = self.file.Component(i)
            comp_name = component.Name()
            for j in range(component.ChannelCount()):
                channel = component.Channel(j)
                if "EMG" in channel.Name():
                    self.component_data[comp_name] = channel.Data()
                    self.sample_rates[comp_name] = channel.SampleRate()
                    self.units[comp_name] = channel.Units()

    def _zero_phase_butter(self, comp_name, data, cutoffs_hz, btype, order):
        """Design a Butterworth filter for one component and apply filtfilt.

        Raises:
            ValueError: If a cutoff is not strictly between 0 and the
                Nyquist frequency of the component.
        """
        fs = self.sample_rates[comp_name]
        nyq = 0.5 * fs
        wn = np.asarray(cutoffs_hz, dtype=float) / nyq
        # butter() rejects these values as well; failing here lets the error
        # name the offending component and its sample rate.
        if np.any(wn <= 0) or np.any(wn >= 1):
            raise ValueError(
                f"Cutoff {cutoffs_hz} Hz is outside (0, {nyq}) Hz for "
                f"component '{comp_name}' sampled at {fs} Hz."
            )
        b, a = butter(order, wn, btype=btype)
        return filtfilt(b, a, data)

    def apply_bandpass_filter(self, raw_data, lowcut=20, highcut=450, order=4):
        """Band-pass filter every component with a Butterworth filter.

        Args:
            raw_data: Mapping of component name to samples.
            lowcut: Lower cutoff frequency in Hz.
            highcut: Upper cutoff frequency in Hz.
            order: Order passed to ``scipy.signal.butter``.

        Returns:
            dict: Component name -> filtered samples (``filtfilt`` output).

        Raises:
            ValueError: If a cutoff does not fit below the Nyquist frequency.
        """
        return {
            comp_name: self._zero_phase_butter(
                comp_name, data, [lowcut, highcut], 'band', order
            )
            for comp_name, data in raw_data.items()
        }

    def rectify(self, data):
        """Return the absolute value of every component's samples."""
        return {comp_name: np.abs(d) for comp_name, d in data.items()}

    def apply_lowpass_filter(self, data, cutoff=3, order=4):
        """Low-pass filter (smooth) every component with a Butterworth filter.

        Args:
            data: Mapping of component name to samples.
            cutoff: Cutoff frequency in Hz.
            order: Order passed to ``scipy.signal.butter``.

        Returns:
            dict: Component name -> smoothed samples (``filtfilt`` output).

        Raises:
            ValueError: If the cutoff does not fit below the Nyquist frequency.
        """
        return {
            comp_name: self._zero_phase_butter(comp_name, d, cutoff, 'low', order)
            for comp_name, d in data.items()
        }

    def scale(self, data):
        """Min-max scale every component to the range 0..1.

        A component whose samples are all equal has no range to scale by; it
        is mapped to all zeros instead of dividing by zero and yielding NaN.

        Args:
            data: Mapping of component name to samples.

        Returns:
            dict: Component name -> scaled float array.
        """
        scaled_data = {}
        for comp_name, d in data.items():
            values = np.asarray(d, dtype=float)
            min_val = np.min(values)
            span = np.max(values) - min_val
            if span == 0:
                scaled_data[comp_name] = np.zeros_like(values)
            else:
                scaled_data[comp_name] = (values - min_val) / span
        return scaled_data

    def process_emg(self):
        """Run all processing stages on ``component_data``.

        Nothing is returned; each stage is stored on the instance as
        ``filtered_data``, ``rectified_data``, ``smoothed_data`` and
        ``scaled_data``.
        """
        # 1. Band-pass filter
        self.filtered_data = self.apply_bandpass_filter(self.component_data)

        # 2. Rectification
        self.rectified_data = self.rectify(self.filtered_data)

        # 3. Low-pass filter (smoothing)
        self.smoothed_data = self.apply_lowpass_filter(self.rectified_data)

        # 4. Scaling
        self.scaled_data = self.scale(self.smoothed_data)

    def get_scaled_data(self):
        """Return the scaled data as a DataFrame with a leading "time" column.

        The time axis is derived from the length and sample rate of the first
        component; all components are required to have that same length.

        Returns:
            pandas.DataFrame: "time" plus one column per component.

        Raises:
            ValueError: If nothing has been processed yet, if the loaded file
                contained no EMG channel, or if component lengths differ.
        """
        # "not" also covers an empty dict (file without EMG channels), which
        # would otherwise surface as an IndexError further down.
        if not self.scaled_data:
            raise ValueError("Scaled data is not available. Run 'process_emg' first.")

        first_name = next(iter(self.scaled_data))
        n_samples = len(self.scaled_data[first_name])
        for comp_name, values in self.scaled_data.items():
            if len(values) != n_samples:
                raise ValueError(
                    f"Component '{comp_name}' has {len(values)} samples, "
                    f"expected {n_samples} (length of '{first_name}')."
                )

        columns = {"time": np.arange(n_samples) / self.sample_rates[first_name]}
        columns.update(self.scaled_data)
        # Build the frame in one go instead of inserting column by column.
        return pd.DataFrame(columns)

    @staticmethod
    def _find_shpf_files(input_folder, recursive):
        """Return sorted paths, relative to ``input_folder``, of .shpf files."""
        if recursive:
            found = [
                os.path.relpath(os.path.join(root, name), input_folder)
                for root, _dirs, files in os.walk(input_folder)
                for name in files
                if name.endswith(".shpf")
            ]
        else:
            found = [
                name
                for name in os.listdir(input_folder)
                if name.endswith(".shpf")
                and os.path.isfile(os.path.join(input_folder, name))
            ]
        return sorted(found)

    def export_csv(self, input_folder="Data", output_folder="ProcessedData",
                   recursive=False):
        """Process every .shpf file in ``input_folder`` and export it as CSV.

        Each file is loaded with :meth:`file_loader`, so after the call the
        instance holds the data of the last file that was loaded. A file that
        fails is reported on stdout and skipped; the batch continues.

        Args:
            input_folder: Folder that is searched for ``.shpf`` files.
            output_folder: Folder that receives the CSV files; created when
                missing.
            recursive: When True, sub-folders are searched as well and their
                relative layout is reproduced below ``output_folder``.
        """
        os.makedirs(output_folder, exist_ok=True)

        for rel_path in self._find_shpf_files(input_folder, recursive):
            file_path = os.path.join(input_folder, rel_path)
            try:
                self.file_loader(file_path)
                scaled_data = self.get_scaled_data()

                # Swap only the extension; str.replace would also rewrite a
                # ".shpf" that appears in the middle of the name.
                stem, _ext = os.path.splitext(rel_path)
                output_file_path = os.path.join(output_folder, stem + ".csv")
                output_dir = os.path.dirname(output_file_path)
                if output_dir:
                    os.makedirs(output_dir, exist_ok=True)
                scaled_data.to_csv(output_file_path, index=False)

                print(f"Processed and exported: {output_file_path}")
            except Exception as e:
                # Deliberately broad: one unreadable recording must not abort
                # the whole batch, and the reader's error types are unknown.
                print(f"Failed to process {rel_path}: {e}")

    def plot_emg(self, data, title="EMG Data"):
        """Plot one stage of the pipeline, one subplot row per component.

        Args:
            data: Mapping of component name to samples.
            title: Figure title. When it contains "scale" (case-insensitive)
                the y axes are labelled "Scaled <component>", otherwise with
                the component's units.

        Raises:
            ValueError: If ``data`` is empty or None.
        """
        if not data:
            raise ValueError("No EMG data to plot. Load a file first.")

        n_rows = len(data)
        use_scaled_label = "scale" in title.lower()
        fig = make_subplots(rows=n_rows, cols=1, shared_xaxes=True, vertical_spacing=0.05)
        for row, (comp_name, d) in enumerate(data.items(), start=1):
            time = np.arange(len(d)) / self.sample_rates[comp_name]
            fig.add_trace(go.Scatter(x=time, y=d, mode='lines', name=comp_name), row=row, col=1)
            if use_scaled_label:
                title_text = f"Scaled {comp_name}"
            else:
                title_text = self.units[comp_name]
            fig.update_yaxes(title_text=title_text, row=row, col=1)

        # Only the bottom subplot carries the x-axis label.
        fig.update_xaxes(title_text="Time (s)", row=n_rows, col=1)
        fig.update_layout(
            title=title,
            height=1200,
            width=1200
        )
        fig.show()

    def plot_emg_processing(self):
        """Show one figure per processing stage, from raw to scaled data."""
        stages = (
            (self.component_data, "Raw EMG Data"),
            (self.filtered_data, "Filtered EMG Data"),
            (self.rectified_data, "Rectified EMG Data"),
            (self.smoothed_data, "Smoothed EMG Data"),
            (self.scaled_data, "Scaled EMG Data"),
        )
        for stage_data, stage_title in stages:
            self.plot_emg(stage_data, title=stage_title)

In [10]:
# Path to the FileReader DLL, looked up in the current working directory
dll_path = os.path.join(os.getcwd(), "FileReader.dll")

# Create the EMG processing object; its constructor builds a File_Reader from dll_path
EMG_Processor = EMGProcessing(dll_path)

In [11]:
# Load one recording from the current working directory; file_loader also
# collects the EMG channels and runs the processing pipeline on them
file_path = os.path.join(os.getcwd(), "FTSTS_000.shpf")
EMG_Processor.file_loader(file_path)

In [12]:
# Fetch the final scaled data as a DataFrame ("time" column plus one column
# per component); the bare expression on the last line displays it in the notebook
scaled_data = EMG_Processor.get_scaled_data()
scaled_data

Unnamed: 0,time,L_GL,L_TA,L_VM,L_VL,R_GL,R_TA,R_VM,R_VL
0,0.000000,0.000000,0.043533,0.045212,0.034234,0.000000,0.000000,0.074270,0.083479
1,0.000466,0.000428,0.043681,0.045362,0.034417,0.000329,0.000330,0.074341,0.083538
2,0.000931,0.000857,0.043829,0.045512,0.034601,0.000658,0.000661,0.074412,0.083596
3,0.001397,0.001285,0.043977,0.045662,0.034784,0.000987,0.000991,0.074483,0.083654
4,0.001862,0.001714,0.044125,0.045812,0.034968,0.001316,0.001322,0.074554,0.083712
...,...,...,...,...,...,...,...,...,...
53297,24.810672,0.071857,0.073121,0.065465,0.062913,0.058362,0.075212,0.085838,0.090250
53298,24.811138,0.071857,0.073121,0.065465,0.062913,0.058362,0.075212,0.085838,0.090250
53299,24.811603,0.071857,0.073121,0.065465,0.062913,0.058362,0.075212,0.085838,0.090250
53300,24.812069,0.071857,0.073121,0.065465,0.062913,0.058362,0.075212,0.085838,0.090250


In [None]:
# Plot every processing stage (raw, filtered, rectified, smoothed, scaled) as its own figure
EMG_Processor.plot_emg_processing()

In [13]:
# Process the .shpf files found in "Data" and export each one as CSV into "ProcessedData"
EMG_Processor.export_csv(input_folder="Data", output_folder="ProcessedData")

Processed and exported: ProcessedData\Subject000\RAW\FTSTS_000\FTSTS_000.csv
Processed and exported: ProcessedData\Subject000\RAW\FTSTS_001\FTSTS_001.csv
Processed and exported: ProcessedData\Subject000\RAW\FTSTS_002\FTSTS_002.csv
Processed and exported: ProcessedData\Subject000\RAW\PICK_000\PICK_000.csv
Processed and exported: ProcessedData\Subject000\RAW\PICK_001\PICK_001.csv
Processed and exported: ProcessedData\Subject000\RAW\PICK_002\PICK_002.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_L_000\STEP_L_000.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_L_001\STEP_L_001.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_L_002\STEP_L_002.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_R_000\STEP_R_000.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_R_001\STEP_R_001.csv
Processed and exported: ProcessedData\Subject000\RAW\STEP_R_002\STEP_R_002.csv
Processed and exported: ProcessedData\Subject000\RAW\STS_000\STS_000.c