# In [None]:
'''
The code of this inference stage is largely based on the code in
https://github.com/johnmartinsson/bird-species-classification

This cell demonstrates testing for the dual input mode.
'''

import glob
import os

import librosa
import numpy as np
import skimage
import skimage.transform
from sklearn import metrics
from tensorflow.keras.callbacks import Callback, ModelCheckpoint
from tensorflow.keras.optimizers import SGD, Adam

import config
#from models.resnet import ResNetBuilder
from models.cnn import CNNBaseline, CNNDual

def load_test_dual(directory, target_size, audio_mode1, audio_mode2):
    """Load the test set as two parallel feature sets for a dual-input model.

    Every immediate sub-directory of ``directory`` is treated as one class;
    the sorted directory names define the class indices.  The ``*.wav`` files
    of a class are grouped with ``group_segments`` and each group is converted
    twice with ``load_segments``, once per audio mode.

    Args:
        directory: Root directory containing one sub-directory per class.
        target_size: Passed through unchanged to ``load_segments``.
        audio_mode1: Feature mode used for the first model input.
        audio_mode2: Feature mode used for the second model input.

    Returns:
        A tuple ``(X_test1, X_test2, Y_test, files)``: the per-group features
        for each mode, the one-hot labels (one row per group), and the list
        of segment-path groups in the same order.

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError("data filepath is invalid")

    # Sorting makes the class -> index mapping deterministic.
    classes = [
        entry for entry in sorted(os.listdir(directory))
        if os.path.isdir(os.path.join(directory, entry))
    ]
    nb_classes = len(classes)

    X_test1 = []
    X_test2 = []
    Y_test = []
    sample_files = []
    for class_index, subdir in enumerate(classes):
        class_segments = glob.glob(os.path.join(directory, subdir, "*.wav"))
        print("group segments ... ")
        for sample in group_segments(class_segments):
            sample_files.append(sample)
            # Use the function's own parameters; the previous code read the
            # undefined names ``inputmode1`` / ``inputmode2`` here.
            data1 = load_segments(sample, target_size, audio_mode1)
            data2 = load_segments(sample, target_size, audio_mode2)
            X_test1.append(data1)
            X_test2.append(data2)
            y = np.zeros(nb_classes)
            y[class_index] = 1.0
            Y_test.append(y)
    # NOTE(review): if groups contain differing numbers of segments the
    # per-group arrays are ragged; recent NumPy releases refuse to build a
    # regular array from ragged input -- confirm against the real data.
    return np.asarray(X_test1), np.asarray(X_test2), np.asarray(Y_test), sample_files

def load_test_single(directory, target_size, inputmode):
    """Load the test set for a single-input model.

    Each immediate sub-directory of ``directory`` is one class (indices follow
    the sorted directory names).  The ``*.wav`` files of a class are grouped
    with ``group_segments`` and every group is converted with
    ``load_segments`` using ``inputmode``.

    Args:
        directory: Root directory containing one sub-directory per class.
        target_size: Passed through unchanged to ``load_segments``.
        inputmode: Feature mode forwarded to ``load_segments``.

    Returns:
        A tuple ``(X_test, Y_test, files)``: per-group features, one-hot
        labels (one row per group) and the segment-path groups, all in the
        same order.

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError("data filepath is invalid")

    class_names = [
        entry for entry in sorted(os.listdir(directory))
        if os.path.isdir(os.path.join(directory, entry))
    ]
    n_classes = len(class_names)

    features, labels, sample_files = [], [], []
    for class_idx, class_name in enumerate(class_names):
        wav_paths = glob.glob(os.path.join(directory, class_name, "*.wav"))
        print("group segments ... ")
        for group in group_segments(wav_paths):
            sample_files.append(group)
            features.append(load_segments(group, target_size, inputmode))
            # One-hot label for the class this group belongs to.
            one_hot = np.zeros(n_classes)
            one_hot[class_idx] = 1.0
            labels.append(one_hot)
    return np.asarray(features), np.asarray(labels), sample_files

def load_segments(segments, target_size, input_data_mode):
    """Turn a list of wave-file paths into an array of feature "images".

    Each segment is read with ``utils.read_wave_file`` and converted according
    to ``input_data_mode``:

    * ``"mfcc"``           -- MFCCs, resized, with a single channel axis.
    * ``"mfcc_delta"``     -- MFCCs plus first-order deltas of width 3, 11
      and 19, each resized and stacked as four channels.
    * ``"spectrogram"``    -- ``wave_to_sample_spectrogram`` output, resized,
      with a single channel axis.
    * ``"melspectrogram"`` -- mel spectrogram (librosa default parameters)
      converted to dB, resized, with a single channel axis.

    Args:
        segments: Iterable of wave-file paths.
        target_size: Output size handed to ``skimage.transform.resize``;
            ``target_size[0]`` is also used as the number of MFCCs.
        input_data_mode: One of the mode strings listed above.

    Returns:
        ``np.ndarray`` holding one feature array per segment, in input order.

    Raises:
        ValueError: If ``input_data_mode`` is not a supported mode and there
            is at least one segment to convert.
    """
    print(segments, target_size, input_data_mode)
    supported_modes = ("mfcc", "mfcc_delta", "spectrogram", "melspectrogram")
    data = []
    for segment in segments:
        # Checked per segment (rather than up front) so that an empty segment
        # list still yields an empty array whatever the mode, as before.  An
        # unknown mode previously surfaced as a NameError on ``sample``.
        if input_data_mode not in supported_modes:
            raise ValueError(
                "unsupported input_data_mode {!r}; expected one of {}".format(
                    input_data_mode, ", ".join(supported_modes)))

        (fs, signal) = utils.read_wave_file(segment)
        if input_data_mode == "mfcc":
            # y= / sr= are passed by keyword: recent librosa releases no
            # longer accept the signal and sample rate positionally.
            sample = librosa.feature.mfcc(y=signal, sr=fs, n_mfcc=target_size[0])
            sample = skimage.transform.resize(sample, target_size)
            sample = sample.reshape((sample.shape[0],
                                     sample.shape[1], 1))
        elif input_data_mode == "mfcc_delta":
            mfcc = librosa.feature.mfcc(y=signal, sr=fs, n_mfcc=target_size[0])
            # Deltas are derived from the un-resized MFCC matrix.
            channels = [mfcc] + [
                librosa.feature.delta(mfcc, width=width, order=1)
                for width in (3, 11, 19)
            ]
            # atleast_3d lets the channel slice work whether resize returns a
            # 2-D (rows, cols) or a 3-D (rows, cols, channels) array; slicing
            # a third axis on a 2-D result used to raise IndexError.
            channels = [
                np.atleast_3d(skimage.transform.resize(channel, target_size))[:, :, 0:1]
                for channel in channels
            ]
            sample = np.concatenate(channels, axis=2)
        elif input_data_mode == "spectrogram":
            sample = wave_to_sample_spectrogram(signal, fs)
            sample = skimage.transform.resize(sample, target_size)
            sample = sample.reshape((sample.shape[0], sample.shape[1], 1))
        else:  # "melspectrogram"
            sample = librosa.feature.melspectrogram(y=signal, sr=fs)
            sample = librosa.power_to_db(sample)
            sample = skimage.transform.resize(sample, target_size)
            sample = sample.reshape((sample.shape[0], sample.shape[1], 1))

        data.append(sample)

    return np.asarray(data)

def group_segments(segments):
    """Group segment paths that share the same sample identifier.

    The identifier is the third ``'_'``-separated token (index 2) of the
    segment string.  Groups are returned in order of first appearance and
    segments keep their input order within a group.

    NOTE(review): the split is applied to the whole string as given, so
    underscores in directory names shift which token is used -- confirm this
    matches the naming of the real segment files.

    Args:
        segments: Iterable of segment path strings.

    Returns:
        A list of lists of segment paths, one inner list per identifier.

    Raises:
        IndexError: If a segment has fewer than three ``'_'``-separated tokens.
    """
    # Single pass with an insertion-ordered dict instead of rescanning the
    # whole segment list once per identifier.
    grouped = {}
    for segment in segments:
        sample_id = segment.split('_')[2]
        grouped.setdefault(sample_id, []).append(segment)
    return list(grouped.values())

def average_prediction(modelx, X1, X2):
    """Return the dual-input model's prediction averaged over axis 0.

    ``modelx.predict`` is called once with the two-element list ``[X1, X2]``
    and the resulting scores are averaged along the first axis.
    """
    return np.mean(modelx.predict([X1, X2]), axis=0)

def average_prediction0(modelx, X):
    """Return the single-input model's prediction averaged over axis 0.

    ``modelx.predict`` is called once with ``X`` and the resulting scores are
    averaged along the first axis.
    """
    return np.mean(modelx.predict(X), axis=0)

def mean_average_precision(y_trues, y_scores):
    """Compute the mean of the per-sample average precision scores.

    y_trues  : [nb_samples, nb_classes] ground-truth label rows
    y_scores : [nb_samples, nb_classes] predicted score rows
    returns  : float, mean of ``metrics.average_precision_score`` over the
               (truth, score) row pairs
    """
    per_sample_ap = [
        metrics.average_precision_score(truth_row, score_row)
        for truth_row, score_row in zip(y_trues, y_scores)
    ]
    return np.mean(np.array(per_sample_ap))

    
# ---------------------------------------------------------------------------
# Inference script: build the configured model, load its best weights, run it
# on the dual-input test set and print MAP, confusion matrix and the
# per-class classification report.
# ---------------------------------------------------------------------------
# NOTE(review): nb_classes, input_shape, input_shape1, input_shape2 and
# inputmode are not defined in the visible code -- presumably they come from
# an earlier notebook cell or from config; confirm before running.
model_name = config.model_name
print("loading model {} ... ".format(model_name))

resnet_names = ('resnet_18', 'resnet_34', 'resnet_50', 'resnet_101', 'resnet_152')

model = None
if model_name == 'cnn_baseline':
    model = CNNBaseline(nb_classes, input_shape)
elif model_name == 'cnn_dual':
    model = CNNDual(nb_classes, input_shape1, input_shape2)
elif model_name in resnet_names:
    # Imported lazily because the module-level import of ResNetBuilder is
    # commented out; without this the ResNet branches raise NameError.
    from models.resnet import ResNetBuilder
    # The builder methods are named build_<model_name>.
    model = getattr(ResNetBuilder, 'build_' + model_name)(input_shape, nb_classes)
else:
    # Format the message; passing several arguments to ValueError renders it
    # as a tuple.
    raise ValueError("Can not find model {}.".format(model_name))

model.load_weights(config.best_weight_file_path)
model.compile(loss="categorical_crossentropy", optimizer=config.optimizer)

print("loading test data ... ")
if inputmode == 'dual':
    # A missing comma between X_tests2 and Y_tests made this a syntax error.
    (X_tests1, X_tests2, Y_tests, training_files) = load_test_dual(config.test_path,
                                                                   config.target_size,
                                                                   config.audio_mode1,
                                                                   config.audio_mode2)
else:
    # Everything below consumes the two dual-mode inputs, so fail here with
    # a clear message instead of a NameError further down.
    raise ValueError(
        "only the dual input mode is supported by this script, got {!r}".format(inputmode))

y_trues = Y_tests
print("running predictions ... ")
# One averaged score vector per grouped recording.
y_scores = [average_prediction(model, X_t1, X_t2)
            for X_t1, X_t2 in zip(X_tests1, X_tests2)]

print(mean_average_precision(y_trues, y_scores))

# Collapse scores and one-hot labels to class indices.
y_preds = np.argmax(y_scores, axis=1)
y_tests = np.argmax(Y_tests, axis=1)

# Print the confusion matrix
print(metrics.confusion_matrix(y_tests, y_preds))

# Print the precision and recall, among other metrics
print(metrics.classification_report(y_tests, y_preds, digits=4))

# In [None]:
# An example of the mel-spectrogram visualization: load one test recording,
# compute its mel spectrogram in dB and display it.
import matplotlib.pyplot as plt
import librosa.display
import librosa

# NOTE(review): the path contains a space after 'test/' -- looks like a typo;
# confirm against the actual directory name before changing it.
fname = 'data/test/ Otus_angelinae/Otusangelinae141909-extract3.wav'
y, sr = librosa.load(fname)
S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
plt.figure(figsize=(20, 5))
# Express power relative to the loudest bin, in decibels.
S_dB = librosa.power_to_db(S, ref=np.max)
# fmax must match the value used to build the mel filter bank above,
# otherwise the mel axis is labelled for the default upper frequency.
librosa.display.specshow(S_dB, x_axis='time',
                         y_axis='mel', sr=sr, fmax=8000)
#plt.colorbar(format='%+2.0f dB')
plt.title('Mel-frequency spectrogram')
#plt.tight_layout()
plt.show()