In [1]:

import numpy as np
import os
import txt_module, json_module, dirs_module

# Directory that is scanned for the per-measurement .npy files (and that also holds index.txt).
data_dir = r'D:\FINKI\40_diploma_thesis\data\datasets_numpy'

# List read from index.txt; entry i is used as the feature name of the i-th array in each loaded file.
index_txt_path = data_dir + r'\index.txt'
arrays_index = txt_module.read_list_from_txt(index_txt_path)

# Looked up by feature name; its values are used to truncate each feature array to a common length.
min_lengths_json_path = r'D:\FINKI\40_diploma_thesis\metadata\min_measurement_lengths.json'
min_measurement_lengths = json_module.read_json(min_lengths_json_path)



In [11]:
# Length, in seconds, of every generated sub-signal.
NEW_SIGNAL_DURATION_SECONDS = 1
# Step, in seconds, between the starts of consecutive sub-signals
# (one value for damaged bearings, one for healthy bearings).
OFFSET_SECONDS_DAMAGED = 0.05
OFFSET_SECONDS_HEALTHY = 0.01

# The output directory name encodes the three settings with the decimal point removed
# (0.05 -> "005"). NOTE(review): this encoding is ambiguous (1.5 and 15 both give "15").
duration_tag, healthy_tag, damaged_tag = (
    str(seconds).replace(".", "")
    for seconds in (NEW_SIGNAL_DURATION_SECONDS, OFFSET_SECONDS_HEALTHY, OFFSET_SECONDS_DAMAGED)
)
write_data_dir = (
    r'G:\s3_test_datasets_numpy_'
    + f'{duration_tag}S_'
    + f'OffsetHealthy{healthy_tag}_'
    + f'OffsetDamaged{damaged_tag}'
)
dirs_module.create_directory(write_data_dir)

In [12]:
# When filter_regime is True, only files whose name contains `regime` are processed.
regime = 'N15_M07_F10'
filter_regime = False

# s3
healthy_train = [
    'K001',
    'K002',
    'K003',
]
real_damage_train = [
    'KA04', 'KA15', 'KA22', 'KA30',
    'KB23', 'KB27',
    'KI04', 'KI17',
]
artificial_damage_train = [
    'KA01', 'KA05', 'KA07',
    'KI01', 'KI03',
]

#s9
# healthy_train = ['K001']
# real_damage_train = ['KB24']
# artificial_damage_train = []

# Files whose bearing code appears in this list are skipped by the generation loop.
train_bearing_codes = [*healthy_train, *artificial_damage_train, *real_damage_train]

# Only the features left uncommented here are written to the generated files.
features_to_generate = [
    # 'Mech_4kHz_Data',
    # 'HostService_64kHz_Data',
    # 'Temp_1Hz_Data',
    # 'Mech_4kHz_force',
    # 'HostService_64kHz_phase_current_1',
    # 'HostService_64kHz_phase_current_2',
    # 'Mech_4kHz_speed',
    # 'Temp_1Hz_temp_2_bearing_module',
    # 'Mech_4kHz_torque',
    'HostService_64kHz_vibration_1',
]

In [13]:
def number_of_samples_per_second_from_string(hz_string):
    """Parse the sampling rate, in samples per second, out of a feature name.

    The rate is read from the second underscore-separated token of the name,
    e.g. ``'Mech_4kHz_force'`` -> 4000 and ``'Temp_1Hz_Data'`` -> 1. A trailing
    ``Hz`` is optional; a trailing ``k`` on the number multiplies it by 1000.

    Args:
        hz_string: Feature name such as ``'HostService_64kHz_vibration_1'``.

    Returns:
        int: Number of samples per second.

    Raises:
        IndexError: If the name contains no underscore.
        ValueError: If the rate token is not an integer with an optional
            ``k`` and an optional ``Hz`` suffix.
    """
    rate_token = hz_string.split('_')[1]
    # Drop the unit as a real suffix. str.strip('Hz') treats its argument as a set
    # of characters and trims them from BOTH ends, so malformed tokens such as
    # 'zH4kHz' used to be accepted silently.
    if rate_token.endswith('Hz'):
        rate_token = rate_token[:-len('Hz')]
    multiplier = 1
    if rate_token.endswith('k'):
        multiplier = 1000
        rate_token = rate_token[:-1]
    return int(rate_token) * multiplier

# Sampling rate (samples per second) of every indexed feature, parsed from the feature name.
samples_per_second_dict = dict(
    (name, number_of_samples_per_second_from_string(name)) for name in arrays_index
)

# Window step expressed in samples, per feature. The values are floats
# (rate * seconds); the generation loop truncates them with int().
healthy_offset_samples_dict = {
    name: rate * OFFSET_SECONDS_HEALTHY for name, rate in samples_per_second_dict.items()
}
damaged_offset_samples_dict = {
    name: rate * OFFSET_SECONDS_DAMAGED for name, rate in samples_per_second_dict.items()
}

In [14]:
# Slice every non-training measurement file into overlapping fixed-length windows and
# save each window as its own .npy file in write_data_dir.
for file in os.listdir(data_dir):
    if file.startswith('index'): continue
    # The 4th underscore-separated token of the file name is used as the bearing code.
    bearing_code = file.split('_')[3]
    if bearing_code in train_bearing_codes: continue
    if filter_regime and regime not in file: continue
    # print(file)
    # NOTE: allow_pickle=True unpickles arbitrary objects; only load trusted files.
    file_ndarray = np.load(f'{data_dir}/{file}', allow_pickle = True)

    # Truncate and cast the selected features ONCE per file. This work does not depend
    # on the window position, so repeating it for every window only copied the whole
    # signal again and again.
    selected_feature_arrays = []
    for feature_position, feature_array in enumerate(file_ndarray):
        feature = arrays_index[feature_position]
        # keep only the features explicitly requested in features_to_generate
        if feature not in features_to_generate: continue
        trimmed = feature_array[:min_measurement_lengths[feature]].astype(np.float32)
        selected_feature_arrays.append((feature, trimmed))

    # Bearing codes starting with 'K00' use the healthy step, all others the damaged step.
    if bearing_code.startswith('K00'):
        offset_samples_dict = healthy_offset_samples_dict
    else:
        offset_samples_dict = damaged_offset_samples_dict

    # Remove the extension properly: str.strip(".npy") strips the characters
    # '.', 'n', 'p', 'y' from BOTH ends of the name, not the ".npy" suffix.
    output_stem = os.path.splitext(file)[0]

    current_start_position = {feature: 0 for feature in arrays_index}

    new_signal_id = 0
    more_signals_to_generate = True
    while more_signals_to_generate:
        new_signal_id += 1

        # generating subsignals
        new_file_list_to_ndarray = list()
        for feature, feature_array in selected_feature_arrays:
            start = int(current_start_position[feature])
            end = int(current_start_position[feature] + samples_per_second_dict[feature] * NEW_SIGNAL_DURATION_SECONDS)
            new_file_list_to_ndarray.append(feature_array[start:end])

        # new start position (fractional sample offsets are truncated by int())
        current_start_position = {feature: int(current_start_pos + offset_samples_dict[feature])
                                  for feature, current_start_pos in current_start_position.items()}

        # break condition: stop once the NEXT window would run past the usable length of
        # any indexed feature (all features in arrays_index are checked, not only the
        # generated ones). The current window is still saved below.
        more_signals_to_generate = not any(
            new_start_position + samples_per_second_dict[feature] * NEW_SIGNAL_DURATION_SECONDS > min_measurement_lengths[feature]
            for feature, new_start_position in current_start_position.items()
        )

        np.save(f'{write_data_dir}/{output_stem}_{new_signal_id}.npy', np.array(new_file_list_to_ndarray))


KeyboardInterrupt: 

In [15]:
%reset