In [1]:
import pickle
import numpy as np
from statsmodels.regression.linear_model import burg
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold

In [2]:
def get_df_data(dataset_name, class_idx, num_train_domains):
    """Return one class's samples whose domain index is below a split point.

    Reads ``data/{dataset_name}.pkl`` (unpacked as an ``(x, y, k)`` triple)
    and ``data/{dataset_name}_fs.pkl`` (a per-sample flag array). Samples
    whose flag is not 0 are dropped first; of the remainder, only those with
    ``y == class_idx`` and ``k < num_train_domains`` are returned.

    Args:
        dataset_name: Stem of the pickle files inside the ``data/`` directory.
        class_idx: Label value to select.
        num_train_domains: Exclusive upper bound on the domain index ``k``.

    Returns:
        Tuple ``(x_, y_, k_)`` holding the selected samples, their labels and
        their domain indices.
    """
    data_path = f'data/{dataset_name}.pkl'
    flag_path = f'data/{dataset_name}_fs.pkl'

    # NOTE(review): pickle.load can execute arbitrary code; these files are
    # assumed to be trusted, locally produced artefacts.
    with open(data_path, 'rb') as data_file:
        samples, labels, domains = pickle.load(data_file)

    with open(flag_path, 'rb') as flag_file:
        finetune_flags = pickle.load(flag_file)

    # Keep only flag == 0, i.e. discard the samples reserved for finetuning.
    keep = finetune_flags == 0
    samples, labels, domains = samples[keep], labels[keep], domains[keep]

    # A single mask is applied to all three arrays so they stay aligned.
    selected = (labels == class_idx) & (domains < num_train_domains)

    return samples[selected], labels[selected], domains[selected]


def get_dp_data(dataset_name, class_idx, num_train_domains):
    """Return one class's samples whose domain index is at or above a split point.

    Reads ``data/{dataset_name}.pkl`` (unpacked as an ``(x, y, k)`` triple)
    and ``data/{dataset_name}_fs.pkl`` (a per-sample flag array). Samples
    whose flag is not 0 are dropped first; of the remainder, only those with
    ``y == class_idx`` and ``k >= num_train_domains`` are returned.

    Args:
        dataset_name: Stem of the pickle files inside the ``data/`` directory.
        class_idx: Label value to select.
        num_train_domains: Inclusive lower bound on the domain index ``k``.

    Returns:
        Tuple ``(x_, y_, k_)`` holding the selected samples, their labels and
        their domain indices shifted down by ``num_train_domains`` so that
        they start at 0.
    """
    data_path = f'data/{dataset_name}.pkl'
    flag_path = f'data/{dataset_name}_fs.pkl'

    # NOTE(review): pickle.load can execute arbitrary code; these files are
    # assumed to be trusted, locally produced artefacts.
    with open(data_path, 'rb') as data_file:
        samples, labels, domains = pickle.load(data_file)

    with open(flag_path, 'rb') as flag_file:
        finetune_flags = pickle.load(flag_file)

    # Keep only flag == 0, i.e. discard the samples reserved for finetuning.
    keep = finetune_flags == 0
    samples, labels, domains = samples[keep], labels[keep], domains[keep]

    # A single mask is applied to all three arrays so they stay aligned.
    selected = (labels == class_idx) & (domains >= num_train_domains)

    # Rebase the domain indices so the first returned domain is 0.
    rebased_domains = domains[selected] - num_train_domains

    return samples[selected], labels[selected], rebased_domains



def safe_burg(x, order=4):
    """Estimate Burg AR coefficients, falling back to zeros for flat input.

    Args:
        x: Series handed to ``burg``.
        order: Order passed to ``burg``; also the length of the fallback.

    Returns:
        The first element of ``burg(x, order)`` when the standard deviation
        of ``x`` exceeds 1e-6, otherwise ``np.zeros(order)``.
    """
    has_variation = np.std(x) > 1e-6

    # A (near-)constant series carries no autoregressive structure to fit,
    # so hand back an all-zero feature vector instead of calling burg.
    if not has_variation:
        return np.zeros(order)

    coefficients = burg(x, order)[0]
    return coefficients
    

# Ensure no zero values in the entropy calculation
def entropy_safe(x):
    """Compute ``-sum(v * log(v))`` along axis 2 with values floored at 1e-6.

    The floor keeps ``log`` away from zero and negative inputs. The values
    are used as given: nothing here normalises them to sum to one.

    Args:
        x: Array with at least three dimensions; axis 2 is summed out.

    Returns:
        Array with axis 2 removed.
    """
    floored = np.clip(x, 1e-6, None)  # lower bound only, no upper limit
    weighted_logs = floored * np.log(floored)
    return -np.sum(weighted_logs, axis=2)


def extract_features_all(x):
    """Build a per-sample feature matrix from a 3-D array.

    All statistics reduce over axis 2, and axis 1 is treated as the channel
    axis (assumed layout: ``(samples, channels, time)``).

    The output columns are, in order: mean, std, variance, min, max, third
    and fourth central moments, skewness, kurtosis, median absolute
    deviation, sum of absolute values, energy, interquartile range, the
    three quartiles and the ``entropy_safe`` value (each contributing one
    column per channel), followed by four Burg AR coefficients per channel.

    Every channel's AR coefficients go through ``safe_burg``, so a
    (near-)constant channel yields zeros instead of an unguarded ``burg``
    call on flat data.

    Args:
        x: 3-D numeric array.

    Returns:
        2-D array with one row per sample and ``21 * x.shape[1]`` columns.
    """
    ar_order = 4

    mean = np.mean(x, axis=2)
    centred = x - mean[:, :, None]

    std = np.std(x, axis=2)
    var = np.var(x, axis=2)
    # Local names avoid shadowing the builtins min() and max().
    minimum = np.min(x, axis=2)
    maximum = np.max(x, axis=2)

    thirdmoment = np.mean(centred**3, axis=2)
    fourthmoment = np.mean(centred**4, axis=2)
    # The small epsilon keeps the ratios finite when a channel has zero spread.
    skewness = thirdmoment / ((std + 1e-6)**3)
    kurtosis = fourthmoment / ((std + 1e-6)**4)

    mad = np.median(np.abs(x - np.median(x, axis=2, keepdims=True)), axis=2)
    sma = np.sum(np.abs(x), axis=2)
    energy = np.sum(x**2, axis=2)

    # Compute the three quartiles in a single pass; the IQR reuses them.
    firstquartile, secondquartile, thirdquartile = np.percentile(x, [25, 50, 75], axis=2)
    iqr = thirdquartile - firstquartile

    entropy = entropy_safe(x)

    # AR coefficients for every (sample, channel) pair. Flattening the last
    # two axes lays the columns out as [channel 0 coeffs, channel 1 coeffs, ...].
    n_samples, n_channels = x.shape[0], x.shape[1]
    autocorr = np.zeros((n_samples, n_channels, ar_order))
    for sample_idx in range(n_samples):
        for channel_idx in range(n_channels):
            autocorr[sample_idx, channel_idx] = safe_burg(
                x[sample_idx, channel_idx, :], order=ar_order)
    autocorr = autocorr.reshape(n_samples, n_channels * ar_order)

    return np.concatenate([mean, std, var, minimum, maximum, thirdmoment, fourthmoment,
                           skewness, kurtosis, mad, sma, energy, iqr, firstquartile,
                           secondquartile, thirdquartile, entropy,
                           autocorr], axis=1)

def extract_temporal_features(x):
    """Clip the input to ``[0, 1]`` and run ``extract_features_all`` on it.

    NOTE(review): the clip presumably reflects data normalised to the unit
    range -- confirm against the dataset preparation.
    """
    bounded = np.clip(x, 0, 1)
    features = extract_features_all(bounded)
    return features


def extract_spectral_features(x):
    """Run ``extract_features_all`` on the magnitude of the real FFT along axis 2."""
    magnitude = np.abs(np.fft.rfft(x, axis=2))
    return extract_features_all(magnitude)


def extract_features(x):
    """Concatenate the temporal and spectral feature blocks column-wise.

    Returns:
        2-D array whose columns are the temporal features followed by the
        spectral features.
    """
    feature_blocks = (
        extract_temporal_features(x),
        extract_spectral_features(x),
    )
    return np.concatenate(feature_blocks, axis=1)


def remap_labels(y):
    """Re-encode labels as consecutive integers starting at 0.

    The integer assigned to a label is its position in ``np.unique(y)``,
    i.e. its rank among the distinct values in sorted order.

    Args:
        y: Iterable of hashable labels.

    Returns:
        Array with one integer index per element of ``y``.
    """
    index_of = {}
    for position, label in enumerate(np.unique(y)):
        index_of[label] = position

    return np.array([index_of[label] for label in y])


def calculate_tstr_score(x_df, y_df, x_dp, y_dp):
    """Train a LightGBM classifier on one split and score it on another.

    Features come from ``extract_features``. Labels of both splits are
    encoded with a single map built from the training labels, so a given
    index always denotes the same original class. Test labels that never
    occur in the training split are encoded as -1 and therefore always count
    as misclassified.

    Args:
        x_df: Training samples, passed to ``extract_features``.
        y_df: Training labels.
        x_dp: Evaluation samples, passed to ``extract_features``.
        y_dp: Evaluation labels.

    Returns:
        Fraction of evaluation samples whose predicted class matches the label.
    """
    # Extract features
    features_train = extract_features(x_df)
    features_test = extract_features(x_dp)

    # Encode both label sets with the SAME mapping. Remapping each split on
    # its own would shift the indices whenever the label sets differ.
    train_classes = np.unique(y_df)
    label_map = {clss: i for i, clss in enumerate(train_classes)}
    labels_train = np.array([label_map[clss] for clss in y_df])
    labels_test = np.array([label_map.get(clss, -1) for clss in y_dp])

    num_classes = len(train_classes)
    is_multiclass = num_classes > 2

    params = {
        'objective': 'multiclass' if is_multiclass else 'binary',
        'num_class': num_classes if is_multiclass else 1,
        'metric': 'multi_logloss' if is_multiclass else 'binary_logloss',
        'seed': 2710,
        'verbosity': -1
    }

    train_data = lgb.Dataset(features_train, label=labels_train)
    model = lgb.train(params, train_data)
    y_pred = model.predict(features_test)

    if is_multiclass:
        # One probability column per class: pick the most likely one.
        predicted = np.argmax(y_pred, axis=1)
    else:
        # The binary objective yields a 1-D array of positive-class
        # probabilities, so argmax over axis 1 would fail; threshold instead.
        predicted = (y_pred > 0.5).astype(int)

    acc = np.mean(predicted == labels_test)

    return acc

In [3]:
dataset = 'realworld'

# Per-dataset configuration. num_df_domains is the split point handed to both
# loaders below: domain indices below it feed the training set, the remaining
# ones the evaluation set. num_dp_domains and num_classes are not used in
# this cell.
if dataset == 'realworld':
    dataset_name = 'realworld_128_3ch_4cl'
    num_df_domains = 10
    num_dp_domains = 5
    num_classes = 4
    class_names = ['WAL', 'RUN', 'CLD', 'CLU']

elif dataset == 'cwru':
    dataset_name = 'cwru_256_3ch_5cl'
    num_df_domains = 4
    num_dp_domains = 4
    num_classes = 5
    class_names = ['IR', 'Ball', 'OR_centred', 'OR_orthogonal', 'OR_opposite']

else:
    # Fail fast: without this branch a typo in `dataset` would either raise a
    # NameError further down or silently reuse configuration left in the
    # kernel by a previously executed cell.
    raise ValueError(f'Unknown dataset: {dataset!r}')

# Class name -> integer label used in the pickled data (position in class_names).
classes_dict = {clss: i for i, clss in enumerate(class_names)}

# Leave one class out at a time and score a classifier on the remaining ones.
for src_class in class_names:
    trg_classes = [clss for clss in class_names if clss != src_class]

    x_df = []
    y_df = []
    k_df = []

    x_dp = []
    y_dp = []
    k_dp = []

    # Gather the training-side and evaluation-side samples of every kept class.
    for trg_class in trg_classes:
        x_df_, y_df_, k_df_ = get_df_data(dataset_name, classes_dict[trg_class], num_df_domains)
        x_dp_, y_dp_, k_dp_ = get_dp_data(dataset_name, classes_dict[trg_class], num_df_domains)

        x_df.append(x_df_)
        y_df.append(y_df_)
        k_df.append(k_df_)

        x_dp.append(x_dp_)
        y_dp.append(y_dp_)
        k_dp.append(k_dp_)

    x_df = np.concatenate(x_df, axis=0)
    y_df = np.concatenate(y_df, axis=0)
    k_df = np.concatenate(k_df, axis=0)

    x_dp = np.concatenate(x_dp, axis=0)
    y_dp = np.concatenate(y_dp, axis=0)
    k_dp = np.concatenate(k_dp, axis=0)

    # The domain indices (k_df, k_dp) are collected but not needed for scoring.
    acc = calculate_tstr_score(x_df, y_df, x_dp, y_dp)

    print(f'{src_class} -> {trg_classes}: {acc:.2f}')

WAL -> ['RUN', 'CLD', 'CLU']: 0.93
RUN -> ['WAL', 'CLD', 'CLU']: 0.76
CLD -> ['WAL', 'RUN', 'CLU']: 0.88
CLU -> ['WAL', 'RUN', 'CLD']: 0.89


In [4]:
dataset = 'cwru'

# Per-dataset configuration. num_df_domains is the split point handed to both
# loaders below: domain indices below it feed the training set, the remaining
# ones the evaluation set. num_dp_domains and num_classes are not used in
# this cell.
if dataset == 'realworld':
    dataset_name = 'realworld_128_3ch_4cl'
    num_df_domains = 10
    num_dp_domains = 5
    num_classes = 4
    class_names = ['WAL', 'RUN', 'CLD', 'CLU']

elif dataset == 'cwru':
    dataset_name = 'cwru_256_3ch_5cl'
    num_df_domains = 4
    num_dp_domains = 4
    num_classes = 5
    class_names = ['IR', 'Ball', 'OR_centred', 'OR_orthogonal', 'OR_opposite']

else:
    # Fail fast: without this branch a typo in `dataset` would either raise a
    # NameError further down or silently reuse configuration left in the
    # kernel by a previously executed cell.
    raise ValueError(f'Unknown dataset: {dataset!r}')

# Class name -> integer label used in the pickled data (position in class_names).
classes_dict = {clss: i for i, clss in enumerate(class_names)}

# Leave one class out at a time and score a classifier on the remaining ones.
for src_class in class_names:
    trg_classes = [clss for clss in class_names if clss != src_class]

    x_df = []
    y_df = []
    k_df = []

    x_dp = []
    y_dp = []
    k_dp = []

    # Gather the training-side and evaluation-side samples of every kept class.
    for trg_class in trg_classes:
        x_df_, y_df_, k_df_ = get_df_data(dataset_name, classes_dict[trg_class], num_df_domains)
        x_dp_, y_dp_, k_dp_ = get_dp_data(dataset_name, classes_dict[trg_class], num_df_domains)

        x_df.append(x_df_)
        y_df.append(y_df_)
        k_df.append(k_df_)

        x_dp.append(x_dp_)
        y_dp.append(y_dp_)
        k_dp.append(k_dp_)

    x_df = np.concatenate(x_df, axis=0)
    y_df = np.concatenate(y_df, axis=0)
    k_df = np.concatenate(k_df, axis=0)

    x_dp = np.concatenate(x_dp, axis=0)
    y_dp = np.concatenate(y_dp, axis=0)
    k_dp = np.concatenate(k_dp, axis=0)

    # The domain indices (k_df, k_dp) are collected but not needed for scoring.
    acc = calculate_tstr_score(x_df, y_df, x_dp, y_dp)

    print(f'{src_class} -> {trg_classes}: {acc:.2f}')

IR -> ['Ball', 'OR_centred', 'OR_orthogonal', 'OR_opposite']: 0.28
Ball -> ['IR', 'OR_centred', 'OR_orthogonal', 'OR_opposite']: 0.18
OR_centred -> ['IR', 'Ball', 'OR_orthogonal', 'OR_opposite']: 0.29
OR_orthogonal -> ['IR', 'Ball', 'OR_centred', 'OR_opposite']: 0.35
OR_opposite -> ['IR', 'Ball', 'OR_centred', 'OR_orthogonal']: 0.45
