# 2. FG Dataset

In [1]:
import pandas as pd
import numpy as np

sampling_rate = 20*1000 # 20kHz
motor_speed = 3000 # 3000 RPM fixed

## 2.1 개별 데이터 확인
- 먼저 개별 데이터(파일)이 어떤 형태로 존재하는 지 확인해 본다.
- 확인 후, 데이터를 센서 단위로 잘라서 np.float32로 바꾸는 작업까지 해둔다.
> 개별 파일에서 y_pulley, x_disk, y_disk 를 추출하는 작업

![Sample Image](figs/2_sensor.png )

In [None]:
# .npy 파일은 논문의 벤치마크를 위해 특징값을 추출해 둔것임(원 데이터 아님)

file_path = '/home/lilmae/Desktop/Rotray_Machine_Data/dataset/FG/0D.csv'
data_pd = pd.read_csv(file_path)
# 1. 'Measured_RPM' 칼럼을 int 형으로 변경
data_pd['Measured_RPM'] = data_pd['Measured_RPM'].astype(int)
# 2. 'Measured_RPM'이 100 이하인 행 제거
data_pd = data_pd[data_pd['Measured_RPM'] > 100]

print(data_pd.head())

x_disk_np = data_pd['Vibration_1'].to_numpy('float32')
y_disk_np = data_pd['Vibration_2'].to_numpy('float32')
y_pulley_np = data_pd['Vibration_3'].to_numpy('float32')
motor_speed = data_pd['Measured_RPM'].to_numpy('float32')

## 2.2 데이터 interpolation
- 서로 다른 sampling rate 를 가지는 데이터를 활용하기 때문에 이 sampling rate 를 맞추어 주는 것이 필요하다.
- target sampling rate 를 기준으로 데이터를 interpolation 할 수 있도록 한다.

In [23]:
from scipy.interpolate import interp1d

signal_np = x_disk_np # 센서 하나씩 interpolation할 수 있도록 한다 : 예시 x-sensor

time_np = np.arange(len(signal_np))/sampling_rate
end = time_np[-1]
target_sampling = 10*1000 # 기준으로 정한 sampling rate : 예시 10kHz
target_time = np.linspace(0, end, target_sampling, endpoint=False) # target sampling rate에 맞도록 시간축 생성

interpolator = interp1d(time_np, signal_np, kind='linear')
interpolated_signal = interpolator(target_time)

## 2.3 전체 데이터를 순회해서 접근
- 전체 데이터를 순회하여 접근하고 데이터 파일과 클래스 정보를 확인하는 작업을 해준다.
- 이를 파일 이름, 파일 경로, 클래스 정보 로 데이터베이스화 한다.

In [24]:
import os

fg_root = os.path.join(os.getcwd(), 'dataset', 'FG')

data ={
    'file_name' : [],
    'file_path' : [],
    'class' : [],
    'severity' : [],
    'sampling_rate' : sampling_rate,
}
severity_dict = {
    '0' : None,
    '1' : 45.9,
    '2' : 60.7,
    '3' : 75.5,
    '4' : 152.1
}

files = [f for f in os.listdir(fg_root) if f.endswith('.csv')]
for file_name in files:
    
    file_path = os.path.join(fg_root, file_name)
    
    class_name = 'normal' if file_name[0]=='0' else 'unbalance'
    severity = severity_dict[file_name[0]]
    
    data['file_name'].append(file_name)
    data['file_path'].append(file_path)
    data['class'].append(class_name)
    data['severity'].append(severity)

data_pd = pd.DataFrame(data)

In [None]:
data_pd.head()

## 2.4 전체 함수화
1. `generate_fg_df` : 전체 데이터를 순회해 pd.DataFrame 으로 만드는 함수
2. `open_fg_file` : 개별 데이터를 열어 x_disk ,y_dixk , y_pulley, motor_speed 의 np.array 로 추출해 주는 함수
3. `interpol_fg` :센서 데이터를 지정하는 sampling rate로 interpolation 해주는 함수

In [3]:
import os
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d

# 1. generate_fg_df : 전체 데이터를 순회해 pd.DataFrame 으로 만드는 함수
def generate_fg_df(fg_root):
    
    sampling_rate = 20*1000 # 20kHz
    data ={
        'file_name' : [],
        'file_path' : [],
        'class' : [],
        'severity' : [],
        'sampling_rate' : sampling_rate,
    }
    severity_dict = {
        '0' : None,
        '1' : 45.9,
        '2' : 60.7,
        '3' : 75.5,
        '4' : 152.1
    }

    files = [f for f in os.listdir(fg_root) if f.endswith('.csv')]
    for file_name in files:
        
        file_path = os.path.join(fg_root, file_name)
        
        class_name = 'normal' if file_name[0]=='0' else 'unbalance'
        severity = severity_dict[file_name[0]]
        
        data['file_name'].append(file_name)
        data['file_path'].append(file_path)
        data['class'].append(class_name)
        data['severity'].append(severity)

    data_pd = pd.DataFrame(data)

    return data_pd

# 2. open_vbl_file : 개별 데이터를 열어 time,x,y,z 의 np.array 로 추출해 주는 함수
def open_fg_file(file_path):
    
    data_pd = pd.read_csv(file_path)

    # 1. 'Measured_RPM' 칼럼을 int 형으로 변경
    data_pd = data_pd.dropna(subset=['Measured_RPM'])
    data_pd['Measured_RPM'] = data_pd['Measured_RPM'].astype(int)
    # 2. 'Measured_RPM'이 100 이하인 행 제거
    data_pd = data_pd[data_pd['Measured_RPM'] > 100]
    
    x_disk_np = data_pd['Vibration_1'].to_numpy('float32')
    y_disk_np = data_pd['Vibration_2'].to_numpy('float32')
    y_pulley_np = data_pd['Vibration_3'].to_numpy('float32')
    motor_speed = data_pd['Measured_RPM'].to_numpy('float32')
    
    return x_disk_np, y_disk_np, y_pulley_np, motor_speed

# 3. interpol_vbl :센서 데이터를 지정하는 sampling rate로 interpolation 해주는 함수
def interpol_fg(signal_np, target_sampling):
    
    sampling_rate = 20*1000 # 20kHz
    signal_np = x_disk_np # 센서 하나씩 interpolation할 수 있도록 한다 : 예시 x-sensor

    time_np = np.arange(len(signal_np))/sampling_rate
    end = time_np[-1]
    target_sampling = 10*1000 # 기준으로 정한 sampling rate : 예시 10kHz
    target_time = np.linspace(0, end, target_sampling, endpoint=False) # target sampling rate에 맞도록 시간축 생성

    interpolator = interp1d(time_np, signal_np, kind='linear')
    interpolated_signal = interpolator(target_time)
    
    return interpolated_signal

# 함수동작 확인

In [4]:
import os

fault_fg_root = os.path.join(os.getcwd(), 'dataset', 'FG')
target_sampling = 10*1000

data_pd = generate_fg_df(fault_fg_root)

for file_path in data_pd['file_path']:
    
    x_disk_np, y_disk_np, y_pulley_np, motor_speed = open_fg_file(file_path)
    
    interpolated_signal = interpol_fg(x_disk_np, target_sampling)
    
print('NO Problem')

NO Problem


In [7]:
print(f'class : {data_pd["class"].unique()}')

class : ['unbalance' 'normal']


In [None]:
import torch
from torch.utils.data import Dataset

class FG_Dataset(Dataset):
    def __init__(self, data_root, resample_rate, window_size, hop_size,
                 classes=['Looseness', 'Unbalance', 'Normal Condition', 'Misalignment']):
        # 회전 주파수 (Hz)
        self.sampling_rate = 10 * 1000  # 20kHz
        self.motor_speed = 1238  # 3000 RPM fixed
        self.classes = classes

        data_pd = generate_dxai_df(fault_dxai_root)
        x_list = []
        y_list = []
        global_indices = []

        for file_idx, (file_path, class_name) in enumerate(zip(data_pd['file_path'], data_pd['class'])):
            if class_name not in classes:
                continue

            y_pulley_np, x_pulley_np, y_disk_np, x_disk_np = open_dxai_file(file_path)

            y_pulley_np = interpol_vbl(y_pulley_np, resample_rate)
            x_pulley_np = interpol_vbl(x_pulley_np, resample_rate)
            y_disk_np = interpol_vbl(y_disk_np, resample_rate)
            x_disk_np = interpol_vbl(x_disk_np, resample_rate)

            data_len = len(y_pulley_np)
            max_len = (data_len - window_size) // hop_size * hop_size + window_size
            num_sample = (data_len - window_size) // hop_size

            y_pulley_np = y_pulley_np[:max_len]
            x_pulley_np = x_pulley_np[:max_len]
            y_disk_np = y_disk_np[:max_len]
            x_disk_np = x_disk_np[:max_len]

            x_set = {
                'y_pulley_np': y_pulley_np,
                'x_pulley_np': x_pulley_np,
                'y_disk_np': y_disk_np,
                'x_disk_np': x_disk_np
            }

            x_list.append(x_set)
            y_list.append(class_name)

            # 전역 인덱스 생성
            global_indices.extend([(file_idx, sample_idx) for sample_idx in range(num_sample)])

        self.x_list = x_list
        self.y_list = y_list
        self.global_indices = global_indices
        self.window_size = window_size
        self.hop_size = hop_size

    def __len__(self):
        return len(self.global_indices)

    def __getitem__(self, idx):
        # 전역 인덱스를 활용해 파일 인덱스와 샘플 인덱스를 가져옴
        file_idx, sample_idx = self.global_indices[idx]

        x_set = self.x_list[file_idx]
        class_name = self.y_list[file_idx]

        # 슬라이싱 범위 계산
        start_idx = sample_idx * self.hop_size
        end_idx = start_idx + self.window_size

        windowed_data = {
            'y_pulley_np': x_set['y_pulley_np'][start_idx:end_idx],
            'x_pulley_np': x_set['x_pulley_np'][start_idx:end_idx],
            'y_disk_np': x_set['y_disk_np'][start_idx:end_idx],
            'x_disk_np': x_set['x_disk_np'][start_idx:end_idx]
        }

        # 클래스 이름을 인덱스로 변환
        class_idx = self.classes.index(class_name)

        return windowed_data, class_idx
        
    
    def get_minimum_window(self, num_cycles = 10):

        rotation_frequency = self.motor_speed / 60

        # 윈도우 크기 (샘플 수)
        window_size = int(sampling_rate * num_cycles / rotation_frequency)
        window_time = window_size/sampling_rate

        print(f"회전 주파수: {rotation_frequency} Hz")
        print(f"윈도우 크기: {window_size} 샘플, {window_time} 초")

        return window_size