In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
from scipy.io import loadmat
import torch
import urllib.request
import zipfile

# Set Seed

In [2]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    The same value is handed, in turn, to PyTorch (``manual_seed`` plus the
    two CUDA seeding calls), NumPy's global generator and Python's
    ``random`` module.

    Args:
        seed: Seed value passed unchanged to each library.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)


set_seed(42)

# Download and extract

In [3]:
# UO bearings dataset
# Maps an archive base name to the URL of the zip file to download for it.
# The download loop below uses the key both as the name of the saved archive
# (`<key>.zip`) and as the name of the sub-folder the archive is extracted into.
UO_links = {
    'UO': 'https://prod-dcd-datasets-cache-zipfiles.s3.eu-west-1.amazonaws.com/v43hmbwxpm-1.zip'
}

def download_and_extract(file_name, url, folder_path, dtype, extract_function, max_attempts=3):
    """Download an archive and extract it, retrying a bounded number of times.

    The archive is saved as ``<folder_path>/<file_name><dtype>`` and then
    handed to ``extract_function(folder_path, file_name)``. A failure in either
    step triggers a fresh attempt (download included), so a truncated download
    that only surfaces as an extraction error is fetched again.

    Args:
        file_name: Base name (without extension) of the archive on disk.
        url: Location the archive is downloaded from.
        folder_path: Existing directory that receives the archive.
        dtype: File extension including the leading dot, e.g. ``'.zip'``.
        extract_function: Callable ``(folder_path, file_name)`` that unpacks
            the downloaded archive.
        max_attempts: Maximum number of download+extract attempts before
            giving up. Must be at least 1.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
        RuntimeError: If every attempt failed; chained to the last error.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    archive_path = os.path.join(folder_path, f'{file_name}{dtype}')
    last_error = None
    # Bounded retry: an unbounded loop would hang the notebook forever on a
    # permanent failure (dead URL, no network, corrupt upstream archive).
    for _ in range(max_attempts):
        try:
            print(f"Downloading {url}")
            urllib.request.urlretrieve(url, archive_path)
            print(f'Extracting {file_name}{dtype}')
            extract_function(folder_path, file_name)
            return  # Success: stop retrying
        except Exception as e:
            last_error = e
            print(f"Failed to download {url}: {e}")

    raise RuntimeError(
        f"Giving up on {url} after {max_attempts} failed attempt(s)"
    ) from last_error
            
def extract_zip(folder, file_name):
    """Unpack ``<folder>/<file_name>.zip`` into the directory ``<folder>/<file_name>``.

    Args:
        folder: Directory that holds the archive and receives the extracted folder.
        file_name: Archive base name, without the ``.zip`` extension.
    """
    archive_path = os.path.join(folder, f'{file_name}.zip')
    target_dir = os.path.join(folder, file_name)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(path=target_dir)
        
# Fetch every archive listed in UO_links into ./UO, skipping any archive whose
# extraction folder is already present from an earlier run.
folder_path = os.path.join(os.getcwd(), 'UO')
os.makedirs(folder_path, exist_ok=True)
for file_name, url_link in UO_links.items():
    already_extracted = os.path.exists(os.path.join(folder_path, file_name))
    if already_extracted:
        continue
    download_and_extract(file_name, url_link, folder_path, '.zip', extract_zip)

Downloading https://prod-dcd-datasets-cache-zipfiles.s3.eu-west-1.amazonaws.com/v43hmbwxpm-1.zip
Extracting UO.zip


In [4]:
# File-name prefix -> integer class label, in the order the classes are read.
# NOTE(review): label 2 is never used (labels are 0, 1, 3). Presumably it is
# left free so the label space lines up with other datasets - confirm before
# training a model whose output size is derived from the number of classes.
_PREFIX_TO_LABEL = {'H': 0, 'I': 1, 'O': 3}


def _files_by_label(domain):
    """Build ``{label: [file names]}`` for one domain letter (three .mat files per label)."""
    return {
        label: [f'{prefix}-{domain}-{run}.mat' for run in (1, 2, 3)]
        for prefix, label in _PREFIX_TO_LABEL.items()
    }


domain_A = _files_by_label('A')
domain_B = _files_by_label('B')
domain_C = _files_by_label('C')
domain_D = _files_by_label('D')

# Directory the .mat files are read from
folder_path = os.path.join(os.getcwd(), "UO", "UO")

def read_dict(mat_dict):
    """Load every file listed in a ``{label: [file names]}`` mapping.

    Args:
        mat_dict: Mapping from a class label to the list of .mat file names
            that belong to that label.

    Returns:
        Tuple ``(x, y)``: the sample and label tensors that ``read_list``
        returns for each label, concatenated along dimension 0 in the
        mapping's iteration order.
    """
    per_label = [read_list(files, label) for label, files in mat_dict.items()]
    samples = torch.cat([pair[0] for pair in per_label], dim=0)
    labels = torch.cat([pair[1] for pair in per_label], dim=0)
    return samples, labels
        
def read_list(file_list, label, data_dir=None, window_size=1024, step=1024):
    """Load .mat recordings and cut them into labelled fixed-size windows.

    For each file the ``'Channel_1'`` and ``'Channel_2'`` arrays are squeezed,
    stacked into a two-row array and split with ``sliding_window_subsample``.

    Args:
        file_list: Names of the .mat files to read (relative to ``data_dir``).
        label: Integer class label assigned to every window of these files.
        data_dir: Directory containing the files. Defaults to the module-level
            ``folder_path`` so existing two-argument calls keep working.
        window_size: Number of samples per window.
        step: Offset between the starts of consecutive windows.

    Returns:
        Tuple ``(x, y)`` where ``x`` holds the windows of all files
        concatenated along dimension 0 and ``y`` is a ``torch.long`` tensor
        with one copy of ``label`` per window.
    """
    if data_dir is None:
        # Resolved at call time so a later reassignment of folder_path is honoured.
        data_dir = folder_path

    samples, labels = [], []
    for file_name in file_list:
        data = loadmat(os.path.join(data_dir, file_name))
        # squeeze() drops the singleton dimension so the two channels can be
        # stacked as rows of one 2-D array.
        channels = np.stack(
            (data['Channel_1'].squeeze(), data['Channel_2'].squeeze()), axis=0
        )
        windows = sliding_window_subsample(
            torch.tensor(channels), window_size=window_size, step=step
        )
        samples.append(windows)
        # Explicit integer dtype: class labels must not depend on dtype inference.
        labels.append(torch.full((windows.shape[0],), label, dtype=torch.long))

    return torch.cat(samples, dim=0), torch.cat(labels, dim=0)
        
def sliding_window_subsample(tensor, window_size=1024, step=1024):
    """Cut a ``(channels, length)`` tensor into windows along its second dimension.

    Windows start every ``step`` elements; a trailing remainder shorter than
    ``window_size`` does not produce a window.

    Args:
        tensor: Input tensor whose dimension 1 is split into windows.
        window_size: Number of elements per window.
        step: Offset between the starts of consecutive windows.

    Returns:
        Tensor of shape ``(n_windows, channels, window_size)``.
    """
    # unfold yields (channels, n_windows, window_size); swap the first two
    # dimensions so that each window becomes one sample.
    windows = tensor.unfold(1, window_size, step)
    return windows.transpose(0, 1)

# Load all four domains first, then report their shapes in the same A-D order
domain_tensors = [read_dict(domain) for domain in (domain_A, domain_B, domain_C, domain_D)]
(A_x, A_y), (B_x, B_y), (C_x, C_y), (D_x, D_y) = domain_tensors

for domain_x, domain_y in domain_tensors:
    print(domain_x.shape, domain_y.shape)

torch.Size([17577, 2, 1024]) torch.Size([17577])
torch.Size([17577, 2, 1024]) torch.Size([17577])
torch.Size([17577, 2, 1024]) torch.Size([17577])
torch.Size([17577, 2, 1024]) torch.Size([17577])


In [5]:
def train_test_split(x, y, train_frac=0.6, val_frac=0.2, generator=None):
    """Randomly split paired tensors into training, validation and test sets.

    ``x`` and ``y`` are wrapped in one ``TensorDataset`` so that both are
    shuffled and split with the same indices.

    Args:
        x: Sample tensor; dimension 0 indexes the samples.
        y: Label tensor with the same size in dimension 0 as ``x``.
        train_frac: Fraction of the samples used for training.
        val_frac: Fraction of the samples used for validation. The test set
            receives every sample that is left over.
        generator: Optional ``torch.Generator`` that drives the shuffle. When
            omitted the global torch RNG is used, so the result then depends
            on how much randomness was consumed earlier in the session.

    Returns:
        Tuple ``(train, val, test)`` of the dicts produced by ``split_xy``.

    Raises:
        ValueError: If a fraction is negative or the two fractions sum to
            more than 1.
    """
    if train_frac < 0 or val_frac < 0 or train_frac + val_frac > 1:
        raise ValueError(
            f"train_frac and val_frac must be non-negative and sum to at most 1, "
            f"got {train_frac} and {val_frac}"
        )

    dataset = torch.utils.data.TensorDataset(x, y)  # Combine x and y to ensure both are split in the same way

    total_size = len(dataset)
    train_size = int(train_frac * total_size)
    val_size = int(val_frac * total_size)
    test_size = total_size - train_size - val_size  # Remainder, so the three sizes always add up

    # Only forward the generator when one was given, so the default call is
    # exactly the original random_split(dataset, lengths).
    split_kwargs = {} if generator is None else {"generator": generator}
    subsets = torch.utils.data.random_split(
        dataset, [train_size, val_size, test_size], **split_kwargs
    )

    # Convert each subset into the {"samples", "labels"} dict format
    train, val, test = (split_xy(subset) for subset in subsets)

    return train, val, test

# Split x and y to stay consistent with the format of the other datasets
def split_xy(dataset):
    """Collect a dataset of ``(x, y)`` pairs into two stacked tensors.

    The shapes of both stacked tensors are printed as a quick sanity check.

    Args:
        dataset: Iterable yielding ``(x_tensor, y_tensor)`` pairs.

    Returns:
        Dict with the stacked inputs under ``"samples"`` and the stacked
        targets under ``"labels"``.
    """
    pairs = list(dataset)
    samples = torch.stack([sample for sample, _ in pairs])
    labels = torch.stack([target for _, target in pairs])
    print(samples.shape, labels.shape)

    return {"samples": samples, "labels": labels}

# Split each domain in A-D order; the order is kept because every call draws
# from the same torch random state.
domain_splits = [
    train_test_split(domain_x, domain_y)
    for domain_x, domain_y in ((A_x, A_y), (B_x, B_y), (C_x, C_y), (D_x, D_y))
]
(
    (train_A, val_A, test_A),
    (train_B, val_B, test_B),
    (train_C, val_C, test_C),
    (train_D, val_D, test_D),
) = domain_splits

torch.Size([10546, 2, 1024]) torch.Size([10546])
torch.Size([3515, 2, 1024]) torch.Size([3515])
torch.Size([3516, 2, 1024]) torch.Size([3516])
torch.Size([10546, 2, 1024]) torch.Size([10546])
torch.Size([3515, 2, 1024]) torch.Size([3515])
torch.Size([3516, 2, 1024]) torch.Size([3516])
torch.Size([10546, 2, 1024]) torch.Size([10546])
torch.Size([3515, 2, 1024]) torch.Size([3515])
torch.Size([3516, 2, 1024]) torch.Size([3516])
torch.Size([10546, 2, 1024]) torch.Size([10546])
torch.Size([3515, 2, 1024]) torch.Size([3515])
torch.Size([3516, 2, 1024]) torch.Size([3516])


In [6]:
# Get parent directory
parent_dir = os.path.dirname(os.getcwd())
uo_dir = os.path.join(parent_dir, "dataset", "UO")
# torch.save does not create missing directories, so make sure the target exists
os.makedirs(uo_dir, exist_ok=True)

# Save the datasets as <split>_<tag>.pt, with domains A-D stored under tags U1-U4
splits_by_tag = {
    'U1': (train_A, val_A, test_A),
    'U2': (train_B, val_B, test_B),
    'U3': (train_C, val_C, test_C),
    'U4': (train_D, val_D, test_D),
}
for domain_tag, tagged_splits in splits_by_tag.items():
    for split_name, split_data in zip(('train', 'val', 'test'), tagged_splits):
        torch.save(split_data, os.path.join(uo_dir, f'{split_name}_{domain_tag}.pt'))