In [None]:
%pip install openl3

import numpy as np
import pandas as pd
from scipy.io import wavfile
import matplotlib.pyplot as plt
import os

from scipy.signal import spectrogram
from sklearn.metrics import accuracy_score

import openl3
import librosa
import librosa.display

In [None]:
# reading audio files from drive
# Machine types covered by the dataset (one sub-folder per type).
machine = ['fan', 'gearbox', 'pump', 'slider', 'ToyCar', 'ToyTrain', 'valve']

def load_audio_files(base_directory, dataset='train', loader=None):
    """Load every labelled .wav clip of one dataset split, for all machine types.

    Walks ``<base_directory>/<category>/<dataset>`` for each machine category
    and decodes the clips whose file name contains 'normal' or 'anomaly'.
    Directories that do not exist are skipped silently (``os.walk`` default).

    Args:
        base_directory: Root folder holding one sub-folder per machine category.
        dataset: Name of the split sub-folder to read (e.g. 'train').
        loader: Optional callable ``path -> (audio, sr)``. When omitted,
            ``librosa.load(path, sr=None)`` is used, i.e. the file is decoded
            at its native sampling rate.

    Returns:
        A list containing only ``(audio, sr, category, dataset, label, section)``
        tuples, where ``label`` is 'normal' or 'anomaly'.
    """
    categories = ['fan', 'gearbox', 'pump', 'slider', 'ToyCar', 'ToyTrain', 'valve']

    if loader is None:
        def loader(path):
            return librosa.load(path, sr=None)

    audio_files = []
    for category in categories:
        category_directory = base_directory + '/' + category + '/' + dataset
        for root, _, files in os.walk(category_directory):
            # Sorted so the record order is reproducible across runs/filesystems.
            for filename in sorted(files):
                if not filename.endswith('.wav'):
                    continue

                # 'normal' is tested first, as a name containing both words
                # has always been treated as normal.
                if 'normal' in filename:
                    label = 'normal'
                elif 'anomaly' in filename:
                    label = 'anomaly'
                else:
                    # Unlabelled clip: nothing to record, so do not decode it.
                    continue

                audio, sr = loader(os.path.join(root, filename))
                # Presumably the two-digit id in 'section_XX_...' file names
                # -- verify against the dataset's naming scheme.
                section = filename[8:10]
                audio_files.append((audio, sr, category, dataset, label, section))
    return audio_files

# Root folder of the dataset; load_audio_files looks for
# <root>/<category>/<split> underneath it.
base_directory = "/path/to/dataset"

# Decode the whole training split up front.
train_audio_files = load_audio_files(base_directory, 'train')
# Raw waveforms only, if needed:
# train_data_waveform = [x[0] for x in train_audio_files]

In [None]:
# Decode both evaluation splits, then merge them with the source clips first.
source_test_audio_files, target_test_audio_files = (
    load_audio_files(base_directory, dataset=split)
    for split in ('source_test', 'target_test')
)

test_audio_files = [*source_test_audio_files, *target_test_audio_files]

# Raw waveforms only, if needed:
# test_data_waveform = [x[0] for x in test_audio_files]

In [None]:
import pickle


def _read_pickle(path):
    """Return the object stored in the pickle file at `path`."""
    # pickle.load can execute code embedded in the file, so these paths
    # should only ever point at files produced by this project.
    with open(path, 'rb') as file:
        return pickle.load(file)


f1 = "/path/to/dataset"
train_data = _read_pickle(f1)

f2 = "/path/to/dataset"
train_label = _read_pickle(f2)

# get the unique pretrained embeddings
f3 = "/path/to/dataset"
train_emb = _read_pickle(f3)

# deduplicate: keep the first occurrence of every embedding, in input order
unique = []
for candidate in train_emb:
    is_duplicate = any(np.array_equal(kept, candidate) for kept in unique)
    if is_duplicate:
        continue
    unique.append(candidate)

pretrained = np.array(unique)

In [None]:
# Split the loaded records by position: 0 = waveform, 2 = machine type,
# 3 = dataset split, 4 = normal/anomaly label, 5 = section.
train_data = np.array([record[0] for record in train_audio_files])
train_machine, train_source, train_label, train_section = (
    [record[field] for record in train_audio_files] for field in (2, 3, 4, 5)
)

# One (machine, split, label, section) tuple per clip, aligned with train_data.
train_labels = list(zip(train_machine, train_source, train_label, train_section))

In [None]:
# Frame size and hop length in samples

def extract_mel_spectrogram(signal, sr, frame=0.064, hop=0.032):
    """Compute a 128-band log-mel spectrogram of `signal`.

    Args:
        signal: 1-D audio waveform.
        sr: Sampling rate used to size the analysis window and the mel bands.
        frame: Analysis window length in seconds (default 64 ms).
        hop: Step between consecutive windows in seconds (default 32 ms,
            i.e. 50% overlap with the default window).

    Returns:
        The mel power spectrogram converted to dB, referenced to the clip's
        own maximum power (so the loudest bin is 0 dB).
    """
    # Convert the window/hop durations from seconds to whole samples.
    frame_size = int(frame * sr)
    hop_length = int(hop * sr)

    # Extract the mel spectrogram
    mel_spectrogram = librosa.feature.melspectrogram(
        y=signal, sr=sr, n_fft=frame_size, hop_length=hop_length, n_mels=128
    )

    # Convert to log-mel spectrogram
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)

    return log_mel_spectrogram

# Concatenate 5 consecutive frames
def concatenate_frames(log_mel_spectrogram, P=5):
    """Stack every run of `P` consecutive spectrogram columns into one vector.

    For each start column ``i`` the slice ``[:, i:i + P]`` is flattened
    row by row, giving one feature vector per window position.

    Returns:
        A 2-D array with one row per window, or an empty 1-D array when the
        spectrogram has fewer than `P` columns.
    """
    window_count = log_mel_spectrogram.shape[1] - P + 1
    windows = [
        log_mel_spectrogram[:, start:start + P].flatten()
        for start in range(window_count)
    ]
    return np.array(windows)


In [None]:
def pickle_data(feat_list, label_lst, filename1):
    """Pickle features and labels to '<filename1>_data.pkl' / '<filename1>_label.pkl'.

    The features are converted to a NumPy array before being written; the
    labels are written unchanged.
    """
    import pickle

    payloads = (
        (f'{filename1}_data.pkl', np.array(feat_list)),
        (f'{filename1}_label.pkl', label_lst),
    )
    for target_path, payload in payloads:
        with open(target_path, 'wb') as handle:
            pickle.dump(payload, handle)


In [None]:
# Stacked log-mel features for every training clip.
# NOTE(review): the second argument is the clip's sample count
# (signal.shape[0]), not its sampling rate -- confirm this is intended.
features_list = [
    concatenate_frames(extract_mel_spectrogram(signal, signal.shape[0], 0.064, 0.032))
    for signal in train_data
]

In [None]:
base_directory = 'path/to/dataset'

# Decode both evaluation splits and merge them, source clips first.
source_test_audio_files = load_audio_files(base_directory, 'source_test')
target_test_audio_files = load_audio_files(base_directory, 'target_test')
test_audio_files = [*source_test_audio_files, *target_test_audio_files]

# Split the records by position: 0 = waveform, 2 = machine type,
# 3 = dataset split, 4 = normal/anomaly label, 5 = section.
test_data = np.array([record[0] for record in test_audio_files])
test_machine, test_source, test_label, test_section = (
    [record[field] for record in test_audio_files] for field in (2, 3, 4, 5)
)
test_labels = list(zip(test_machine, test_source, test_label, test_section))

# Stacked log-mel features for every test clip.
# NOTE(review): the second argument is the clip's sample count
# (signal.shape[0]), not its sampling rate -- confirm this is intended.
features_list = [
    concatenate_frames(extract_mel_spectrogram(signal, signal.shape[0], 0.064, 0.032))
    for signal in test_data
]

pickle_data(features_list, test_labels, 'test')

In [None]:
from sklearn.preprocessing import PowerTransformer


def _apply_power_transform(features, transformer, fit):
    """Run `transformer` over 3-D `features` and restore the 3-D shape.

    Args:
        features: Array of shape (clips, frames, dims).
        transformer: The PowerTransformer instance to use.
        fit: True to learn the transform parameters from `features` first;
            False to reuse the parameters the transformer already holds.

    Returns:
        The transformed features, reshaped back to `features.shape`.
    """
    batch_size, height, width = features.shape
    # The transformer works on 2-D input, so each clip is flattened to one row.
    flat = features.reshape(batch_size, -1)
    if fit:
        transformed = transformer.fit_transform(flat)
    else:
        transformed = transformer.transform(flat)
    restored = transformed.reshape(batch_size, height, width)

    # Print shapes to verify
    print("Original shape:", features.shape)
    print("Reshaped for transformation:", flat.shape)
    print("Transformed shape:", transformed.shape)
    print("Reshaped back to original shape:", restored.shape)
    return restored


# `features_list` was last filled by the test-set cell, so it cannot be used
# for the training data. The training features are rebuilt here from the raw
# training clips with the same call the feature-extraction cells use, which
# keeps train and test feature shapes identical.
# NOTE(review): like those cells, this passes the clip's sample count as `sr`
# -- if that is changed there, change it here as well.
train_data_old = np.array([
    concatenate_frames(
        extract_mel_spectrogram(record[0], record[0].shape[0], 0.064, 0.032)
    )
    for record in train_audio_files
])
test_data_old = np.array(features_list)

# Fit the Yeo-Johnson transform on the training data only and reuse the same
# fitted parameters for the test data, so both sets share one scaling.
pt = PowerTransformer(method='yeo-johnson', standardize=True)
train_data = _apply_power_transform(train_data_old, pt, fit=True)
test_data = _apply_power_transform(test_data_old, pt, fit=False)

In [None]:
def pickle_data(features_arr, label_lst, filename1):
    """Pickle features and labels to '<filename1>_data_pt.pkl' / '<filename1>_label_pt.pkl'.

    Both objects are written exactly as passed in. Note that this definition
    replaces any earlier `pickle_data` in the notebook namespace.
    """
    import pickle

    outputs = {
        f'{filename1}_data_pt.pkl': features_arr,
        f'{filename1}_label_pt.pkl': label_lst,
    }
    for target_path, payload in outputs.items():
        with open(target_path, 'wb') as handle:
            pickle.dump(payload, handle)

# Write the current train_data and train_labels to
# 'train_data_pt.pkl' and 'train_label_pt.pkl' in the working directory.
pickle_data(train_data, train_labels, 'train')

In [None]:
# get pretrained embeddings from raw waveform
# The OpenL3 model is loaded once here and passed to every embedding call.
model = openl3.models.load_audio_embedding_model(
    input_repr='mel128',
    content_type='env',
    embedding_size=512,
    frontend='librosa',
)


def pickle_emb_batch(feat_list, filename):
    """Convert `feat_list` to a NumPy array and pickle it to '<filename>.pkl'."""
    import pickle

    # Build the array before opening the file so a failed conversion
    # does not leave an empty file behind.
    stacked = np.array(feat_list)
    with open(f"{filename}.pkl", 'wb') as sink:
        pickle.dump(stacked, sink)

In [None]:
emb_list = list()
ts_list = list()

# Each record built by load_audio_files starts with (audio, sr, ...), so the
# raw waveform and the rate it was decoded at are taken from the same tuple
# instead of assuming a fixed 16 kHz.
test_data_waveform = [record[0] for record in test_audio_files]
test_sample_rates = [record[1] for record in test_audio_files]

for i, (audio, sr) in enumerate(zip(test_data_waveform, test_sample_rates)):
    emb, ts = openl3.get_audio_embedding(audio, sr, model=model,
                                         input_repr='mel128', frontend='librosa',
                                         verbose=1)
    emb_list.append(emb)
    ts_list.append(ts)

    if i % 500 == 0:
        print(f'{i} processed')
        if i != 0:
            # Checkpointing stays best-effort (a failed write must not abort
            # the run), but the failure is reported instead of being hidden.
            try:
                pickle_emb_batch(emb_list, f'emb_valve_test_batch{i+1}')
            except Exception as err:
                print(f'could not save checkpoint after clip {i}: {err!r}')
