In [1]:
import os
import shutil
import random

import numpy as np
import pandas as pd

import torch
# import torch.nn as nn
# import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
# import torch.nn.functional as F

In [None]:
# Directory the raw recordings are read from.
DATA_RAW_PATH = 'JNUData'
# Directory that receives the fixed-length sample windows cut from the raw files.
DATA_PATH = 'data'
# exist_ok=True keeps the cell safe to re-run and avoids the
# check-then-create race of `if not exists: mkdir`.
os.makedirs(DATA_PATH, exist_ok=True)

In [None]:
# Run only once: cuts every raw recording into fixed-length sample windows.
# The first character of a raw file name is used as the class code:
# n-healthy, t-rolling element fault, o-outer ring fault, i-inner ring fault

files_names = os.listdir(DATA_RAW_PATH)
SAMPLES_NUMBER = 50    # maximum number of windows taken from each raw file
SAMPLE_WINDOW = 10000  # number of rows in one window

# NOTE(review): the output name keeps only the first character of the raw file
# name, so two raw files that start with the same letter overwrite each other's
# windows -- confirm DATA_RAW_PATH holds a single file per class.
for file_name in files_names:
    file_path = os.path.join(DATA_RAW_PATH, file_name)
    data = pd.read_csv(file_path)
    # Write complete windows only: slicing past the end of a short recording
    # would silently produce truncated or empty sample files.
    full_windows = min(SAMPLES_NUMBER, len(data) // SAMPLE_WINDOW)
    for i in range(full_windows):
        sample = data.iloc[i*SAMPLE_WINDOW:(i+1)*SAMPLE_WINDOW]
        sample.to_csv(os.path.join(
            DATA_PATH, f'{file_name[0]}_{i}.csv'), index=False)

In [None]:
# Compute the FFT magnitude spectrum of every sample window and store it
# under the same file name in DATA_FFT_PATH.
files_names = os.listdir(DATA_PATH)
DATA_FFT_PATH = 'data_fft'
os.makedirs(DATA_FFT_PATH, exist_ok=True)

for file_name in files_names:
    file_path = os.path.join(DATA_PATH, file_name)
    data = pd.read_csv(file_path)
    # read_csv yields a 2-D (rows, columns) table and np.fft.fft transforms the
    # LAST axis by default, i.e. across columns; for a one-column signal that is
    # a length-1 transform which returns the input unchanged. axis=0 runs the
    # transform down the rows, along the recorded signal.
    fft_data = np.fft.fft(data.to_numpy(), axis=0)
    fft_data = np.abs(fft_data)
    # np.savetxt writes plain numbers with no header line.
    np.savetxt(os.path.join(DATA_FFT_PATH, file_name), fft_data)

In [None]:
# import matplotlib.pyplot as plt
# plt.plot(fft_data)

In [None]:
# Fraction of each class's files that is moved to the test split.
TEST_RATE = 0.2
# Both splits live in sub-folders of the FFT output directory.
TRAIN_SET_PATH, TEST_SET_PATH = (
    os.path.join(DATA_FFT_PATH, split_name) for split_name in ('train', 'test')
)

In [None]:
# Group the FFT files into one sub-folder per class; the class code is the
# first character of the file name.
# The files are listed from DATA_FFT_PATH itself instead of reusing a
# `files_names` left over from another cell, and sub-folders are skipped, so the
# cell does not depend on execution order and does nothing when re-run.
for file_name in os.listdir(DATA_FFT_PATH):
    file_path = os.path.join(DATA_FFT_PATH, file_name)
    if not os.path.isfile(file_path):
        continue
    class_folder = file_name[0]
    os.makedirs(os.path.join(DATA_FFT_PATH, class_folder), exist_ok=True)
    shutil.move(file_path, os.path.join(
        DATA_FFT_PATH, class_folder, file_name))

In [None]:
# Split every class folder under DATA_FFT_PATH into train/<class> and
# test/<class>, then remove the emptied class folder.
SPLIT_SEED = 42  # fixed seed so the split is the same on every run
split_rng = random.Random(SPLIT_SEED)

# Only genuine class folders are split. Without this filter a re-run would pick
# up the existing 'train' and 'test' folders as classes, move their contents and
# delete them.
classes_folder = sorted(
    entry for entry in os.listdir(DATA_FFT_PATH)
    if os.path.isdir(os.path.join(DATA_FFT_PATH, entry))
    and os.path.join(DATA_FFT_PATH, entry) not in (TRAIN_SET_PATH, TEST_SET_PATH)
)
os.makedirs(TRAIN_SET_PATH, exist_ok=True)
os.makedirs(TEST_SET_PATH, exist_ok=True)

for class_folder in classes_folder:
    class_path = os.path.join(DATA_FFT_PATH, class_folder)
    # os.listdir order is arbitrary; sort first so the seeded shuffle is
    # reproducible.
    files_names = sorted(os.listdir(class_path))
    split_rng.shuffle(files_names)
    testset_number = int(len(files_names) * TEST_RATE)
    testset_files_names = files_names[:testset_number]
    trainset_files_names = files_names[testset_number:]

    for target_root, split_files_names in (
            (TEST_SET_PATH, testset_files_names),
            (TRAIN_SET_PATH, trainset_files_names)):
        target_folder = os.path.join(target_root, class_folder)
        os.makedirs(target_folder, exist_ok=True)
        for split_file_name in split_files_names:
            shutil.move(os.path.join(class_path, split_file_name),
                        os.path.join(target_folder, split_file_name))

    # Every file must have been moved out of the old class folder by now.
    # os.rmdir only deletes an empty directory, so it raises instead of
    # discarding data if anything was left behind.
    os.rmdir(class_path)

In [None]:
# Integer class index for each single-letter class code
# (n-healthy, t-rolling element fault, o-outer ring fault, i-inner ring fault).
LABELS_MAP = dict(n=0, t=1, o=2, i=3)

In [None]:
# Quick check of the label lookup for the class codes 'n' and 'i'.
labbb = [LABELS_MAP[class_code] for class_code in ('n', 'i')]
labbb

In [None]:
class JNUDataset(Dataset):
    """In-memory dataset of the spectrum files stored under ``data_path``.

    Expects the layout ``<data_path>/<train|test>/<class code>/<file>``, where
    every class code is a key of ``LABELS_MAP``. All files of the chosen split
    are read once in ``__init__`` and kept as tensors.

    Args:
        data_path: Root folder that contains the ``train`` and ``test`` folders.
        is_train: Load the ``train`` split when true, otherwise ``test``.

    Raises:
        KeyError: If a class folder name is not a key of ``LABELS_MAP``.
    """

    def __init__(self, data_path, is_train=True):
        self.data_path = data_path
        path = os.path.join(data_path, 'train' if is_train else 'test')
        samples = []
        labels = []
        # Sorted listings give a stable sample order across runs and platforms.
        for class_name in sorted(os.listdir(path)):
            class_path = os.path.join(path, class_name)
            for file_name in sorted(os.listdir(class_path)):
                # The spectrum files are written by np.savetxt without a header
                # line; header=None stops pandas from consuming the first value
                # of every sample as a column name.
                frame = pd.read_csv(
                    os.path.join(class_path, file_name), header=None)
                samples.append(frame.values.squeeze().tolist())
                labels.append(LABELS_MAP[class_name])
        self.data = torch.tensor(samples, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample = self.data[idx]
        label = self.labels[idx]
        return sample, label

In [None]:
# Number of samples per mini-batch handed to the DataLoader.
BATCH_SIZE = 5

In [None]:
# Build the training loader and print every batch as a smoke test.
train_set = JNUDataset(DATA_FFT_PATH, is_train=True)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)

for idx, (data, target) in enumerate(train_loader):
    print(f'Batch {idx+1}:')
    print(f'Data shape: {data.shape}')
    # The original printed the label values under the caption "Target shape";
    # report the shape and the labels separately.
    print(f'Target shape: {target.shape}')
    print(f'Target labels: {target.tolist()}')


In [None]:
# class JNUDataset(Dataset):
#     def __init__(self, data_path, transform=None):
#         self.data_path = data_path
#         self.transform = transform
#         self.files_names = os.listdir(data_path)
#     def __len__(self):
#         return len(self.files_names)
    
#     def __getitem__(self, idx):
#         file_name = self.files_names[idx]
#         class_folder = file_name[0]
#         return self.transform(file_name, class_folder)
