In [None]:
import sys
import os
sys.path.insert(1, os.path.join(sys.path[0], '..'))

from utils.utilities import move_data_to_device

from interpretability_utilities import plot_layer_attribution_importance, plot_frame_attributions
from interpretability_utilities import load_workspace_file, plot_audio_attributions

import pickle

import numpy as np
import librosa

import librosa.display
from scipy.io import wavfile
from scipy.stats import ttest_ind

import mlflow
import torch

import matplotlib.pyplot as plt
import seaborn as sns

from captum.attr import DeepLift, LayerDeepLift, LayerIntegratedGradients
from captum.concept import Concept, TCAV

from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA

from captum.concept._utils.data_iterator import dataset_to_dataloader, CustomIterableDataset
from captum.concept._utils.common import concepts_to_str

## Settings and Utils

In [None]:
# Adjust according to your experiment
ref_fold = "8"  # fold passed to load_workspace_file and embedded in the concept file names
run_id = ""  # MLflow run that provides the model and the tags read below
tracking_server = ""  # tracking server address; ":5000" is appended below
workspace_file = ""
dataset_dir = ""
device = 'cuda' if torch.cuda.is_available() else 'cpu'

mlflow.set_tracking_uri(f"{tracking_server}:5000")
logged_model = mlflow.pytorch.load_model(f"runs:/{run_id}/models")
# Switch to evaluation mode before computing attributions.
logged_model = logged_model.eval()

# The STFT parameters are read from the tags logged with the run.
client = mlflow.MlflowClient()
run = client.get_run(run_id)
run_data = run.data
tags = run_data.tags
sr = int(tags["sample_rate"])
n_fft = int(tags["window_size"])
hop_length = int(tags["hop_size"])
# Same tag as n_fft: both names hold the same value.
window_size = int(tags["window_size"])

In [None]:
# Load the workspace data for `ref_fold`; the fourth returned value is not used here.
inp_data, indexes, labels, _, lb_to_idx = load_workspace_file(workspace_file, ref_fold,
                                dataset_dir, device)

# Inverse mapping: class index -> label name.
idx_to_label = {idx: label for label, idx in lb_to_idx.items()}
# Indices of every class whose label starts with "albilora".
target = [idx for label, idx in lb_to_idx.items() if label.startswith("albilora")]

# Enable gradient tracking on the inputs (in place).
inp_data.requires_grad_()

In [None]:
# Seeded generator shared by the audio sampling and the noise references/concepts below.
rng = np.random.default_rng(135)

Up to this run, only the 'others' class was predicted. I'll take five random audios and define concepts from each one for TCAV.

In [None]:
# Draw five distinct input indices and keep the corresponding audios as references.
audios_ref_indices = rng.choice(inp_data.size()[0], replace=False, size=5)
ref_audios = inp_data[audios_ref_indices]

# Show the class index of each sampled audio next to the index -> label mapping.
print(np.argmax(labels[audios_ref_indices], axis=1), idx_to_label)

In [None]:
def filter_audio_frequencies(audio, fmin, fmax, sr, n_fft, hop_size):
    """Keep only the [fmin, fmax] band of `audio` by zeroing the STFT bins outside it.

    Args:
        audio: time-domain signal handed to `librosa.stft` (assumed 1-D).
        fmin: lowest frequency, in Hz, to keep.
        fmax: highest frequency, in Hz, to keep.
        sr: sampling rate in Hz, used to map STFT bins to frequencies.
        n_fft: FFT size; also used as the window length.
        hop_size: hop length, in samples, between STFT frames.

    Returns:
        The band-limited signal, resynthesised with the original phase and
        with `len(audio)` samples.
    """
    spectrum = librosa.stft(audio, n_fft=n_fft, win_length=n_fft,
                            hop_length=hop_size, center=True)
    magnitude, phase = librosa.magphase(spectrum)

    # One centre frequency per STFT row; rows outside the band are silenced.
    bin_frequencies = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
    outside_band = np.logical_or(bin_frequencies < fmin, bin_frequencies > fmax)
    magnitude[outside_band] = 0.0

    return librosa.istft(magnitude * phase, hop_length=hop_size,
                         win_length=n_fft, window="hann", center=True,
                         length=len(audio))

References to use

- 2kHz-4kHz frequency range of an audio
- 4kHz-6kHz frequency range of an audio
- 6kHz-8kHz frequency range of an audio
- 8kHz-10kHz frequency range of an audio
- Gaussian noise
- Impulsive noise

## Shrikumar et al. (2017) - DeepLift - Rescale Rule

In [None]:
def set_reference(method, audio, **kwargs):
    """Build a reference (baseline) version of `audio`.

    Args:
        method: "gaussian", "impulsive", or a "<fmin>-<fmax>" band in kHz (e.g. "2-4").
        audio: NumPy array with the time-domain signal; it is copied, not modified.
        **kwargs: `cycles` (gaussian only, defaults to 1); `sr`, `n_fft` and
            `hop_size` (required by the band method).

    Returns:
        The modified copy of `audio`.

    Raises:
        AssertionError: if `method` is not a noise name and does not split into
            exactly two parts on "-".
    """
    reference = audio.copy()

    if method == "gaussian":
        kwargs.setdefault('cycles', 1)
        # Each cycle adds the difference of two independent standard-normal draws.
        for _ in range(kwargs["cycles"]):
            first_draw = rng.normal(size=len(reference))
            second_draw = rng.normal(size=len(reference))
            reference += first_draw - second_draw  # in place, so the dtype is kept
        return reference

    if method == "impulsive":
        noise = rng.normal(size=len(reference))
        # Only samples whose noise draw exceeds 0.85 are perturbed.
        return np.where(noise > 0.85, reference + noise, reference)

    band = method.split('-')
    assert len(band) == 2, "Invalid values for min frequency and max frequency"
    fmin, fmax = float(band[0])*1000, float(band[1])*1000

    return filter_audio_frequencies(reference, fmin, fmax, kwargs["sr"],
                                    kwargs["n_fft"], kwargs["hop_size"])

In [None]:
# Reference set: every input audio restricted to the 2-4 kHz band.
baseline_2k = move_data_to_device(
    np.array([
        set_reference("2-4", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

In [None]:
# Reference set: every input audio restricted to the 4-6 kHz band.
baseline_4k = move_data_to_device(
    np.array([
        set_reference("4-6", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

In [None]:
# Reference set: every input audio restricted to the 6-8 kHz band.
baseline_6k = move_data_to_device(
    np.array([
        set_reference("6-8", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

In [None]:
# Reference set: every input audio restricted to the 8-10 kHz band.
baseline_8k = move_data_to_device(
    np.array([
        set_reference("8-10", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

In [None]:
# Reference set: every input audio with Gaussian noise added (default: one cycle).
baseline_gaussian = move_data_to_device(
    np.array([
        set_reference("gaussian", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

In [None]:
# Reference set: every input audio with impulsive noise added.
baseline_impulsive = move_data_to_device(
    np.array([
        set_reference("impulsive", waveform.cpu().detach().numpy(),
                      sr=sr, n_fft=n_fft, hop_size=hop_length)
        for waveform in inp_data
    ]),
    device,
)

### Model attribution

In [None]:
# DeepLift explainer for the whole model; reused by every model-level attribution below.
dl = DeepLift(logged_model)

In [None]:
def model_importance(dl, inp_data, baseline, target, title="Average frames importance using DeepLift"):
    """Attribute `inp_data` against `baseline`, plot the result and return it.

    Args:
        dl: attribution object exposing `attribute(inputs, baselines, target=...)`
            (a Captum `DeepLift` instance in this notebook).
        inp_data: inputs to explain.
        baseline: reference inputs the attributions are measured against.
        target: target forwarded unchanged to `dl.attribute`.
        title: title given to the frame-attribution plot.

    Returns:
        Whatever `dl.attribute` returned.
    """
    frame_attributions = dl.attribute(inp_data, baseline, target=target)
    plot_frame_attributions(frame_attributions, title=title)

    return frame_attributions

#### Silent audio

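Baseline: an all-zero tensor (digital silence). No baseline is passed to `attribute` below, so Captum falls back to zeros.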

Compute attributions for S. albilora call and syllables

In [None]:
# Model-level DeepLift attributions; no baseline is passed, so Captum's default zero baseline applies.
gen_attribution = dl.attribute(inp_data, target=np.argmax(labels, axis=1))

In [None]:
# Plot the zero-baseline attributions with the project helper.
plot_frame_attributions(gen_attribution,
        title="Atribuição média do modelo para cada frame do conjunto de validação usando DeepLift")

In [None]:
fig, ax = plt.subplots(figsize=(10, 6))

# Mean attribution of every input position, averaged over all inputs.
ax.bar(np.arange(gen_attribution.size()[1]), np.mean(gen_attribution.cpu().detach().numpy(), axis=0))

ax.set_xlabel("Tempo [s]")
ax.set_ylabel("Atribuição")

# One tick every int(sr/10) samples (about 0.1 s). The labels are derived from the tick
# positions so both lists always have the same length, whatever the audio duration
# (the previous hard-coded 0-0.75 s labels only matched one specific length).
tick_positions = np.arange(0, gen_attribution.size()[1], int(sr/10))
ax.set_xticks(tick_positions)
ax.set_xticklabels([f"{position / sr:.1f}" for position in tick_positions])

ax.set_title("Atribuição média do modelo para cada frame do conjunto de validação usando DeepLift")

# plt.show() is used, as in the other plotting code of this notebook.
plt.show()

In [None]:
# Delete the attributions and, on CUDA, empty PyTorch's cache before the next experiment.
del gen_attribution

if device == 'cuda':
        torch.cuda.empty_cache()

#### Frequency range and noise

2kHz-4kHz activity

In [None]:
# Model-level DeepLift attributions against the 2-4 kHz reference; model_importance
# also plots them through plot_frame_attributions.
activity_2kHz_4kHz_attributions = model_importance(dl, inp_data, baseline_2k,
        np.argmax(labels, axis=1), "Atribuição média- Atividades entre 2kHz-4kHz")

# Only this reference gets the additional plot_audio_attributions figure.
plot_audio_attributions(activity_2kHz_4kHz_attributions, "Atribuição média- Atividades entre 2kHz-4kHz")
# Delete the attributions and, on CUDA, empty PyTorch's cache.
del activity_2kHz_4kHz_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

4kHz-6kHz activity

In [None]:
# Model-level DeepLift attributions against the 4-6 kHz reference (plotted by model_importance).
activity_4kHz_6kHz_attributions = model_importance(dl, inp_data, baseline_4k,
        np.argmax(labels, axis=1), "Atribuição média- Atividades entre 4kHz-6kHz")

# Delete the attributions and, on CUDA, empty PyTorch's cache.
del activity_4kHz_6kHz_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

6kHz-8kHz activity

In [None]:
# Model-level DeepLift attributions against the 6-8 kHz reference (plotted by model_importance).
activity_6kHz_8kHz_attributions = model_importance(dl, inp_data, baseline_6k,
        np.argmax(labels, axis=1), "Atribuição média- Atividades entre 6kHz-8kHz")

# Delete the attributions and, on CUDA, empty PyTorch's cache.
del activity_6kHz_8kHz_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

8kHz-10kHz activity

In [None]:
# Model-level DeepLift attributions against the 8-10 kHz reference (plotted by model_importance).
activity_8kHz_10kHz_attributions = model_importance(dl, inp_data, baseline_8k,
        np.argmax(labels, axis=1), "Atribuição média- Atividades entre 8kHz-10kHz")

# Delete the attributions and, on CUDA, empty PyTorch's cache.
del activity_8kHz_10kHz_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Gaussian noise

In [None]:
# Model-level DeepLift attributions against the Gaussian-noise reference (plotted by model_importance).
gaussian_attributions = model_importance(dl, inp_data, baseline_gaussian,
        np.argmax(labels, axis=1), "Atribuição média- Atividades com ruído gaussiano")

# Delete the attributions and, on CUDA, empty PyTorch's cache.
del gaussian_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Impulsive noise

In [None]:
# Model-level DeepLift attributions against the impulsive-noise reference (plotted by model_importance).
impulsive_attributions = model_importance(dl, inp_data, baseline_impulsive,
        np.argmax(labels, axis=1), "Atribuição média- Atividades com ruído impulsivo")

# Delete the attributions and, on CUDA, empty PyTorch's cache.
del impulsive_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

### Block attribution

#### First block

In [None]:
# Layer-level DeepLift explainer attached to the model's conv_block1.
dl_first_conv_block = LayerDeepLift(logged_model, logged_model.base.conv_block1)

##### Silent audio

In [None]:
# conv_block1 attributions; no baseline is passed, so Captum's default zero baseline applies.
dl_1st_block_attributions = dl_first_conv_block.attribute(inp_data, target=np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the zero baseline.
plot_layer_attribution_importance(dl_1st_block_attributions,
    "1st block (silence) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_attributions, f_out)

del dl_1st_block_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

##### Frequency range and noise

2kHz-4kHz activity

In [None]:
# conv_block1 attributions against the 2-4 kHz reference.
dl_1st_block_2k_activity_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_2k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the 2-4 kHz reference.
plot_layer_attribution_importance(dl_1st_block_2k_activity_attributions,
        "1st block (2kHz-4kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_2k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_2k_activity_attributions, f_out)

del dl_1st_block_2k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

4kHz-6kHz activity

In [None]:
# conv_block1 attributions against the 4-6 kHz reference.
dl_1st_block_4k_activity_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_4k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the 4-6 kHz reference.
plot_layer_attribution_importance(dl_1st_block_4k_activity_attributions,
        "1st block (4kHz-6kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_4k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_4k_activity_attributions, f_out)

del dl_1st_block_4k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

6kHz-8kHz activity

In [None]:
# conv_block1 attributions against the 6-8 kHz reference.
dl_1st_block_6k_activity_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_6k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the 6-8 kHz reference.
plot_layer_attribution_importance(dl_1st_block_6k_activity_attributions,
        "1st block (6kHz-8kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_6k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_6k_activity_attributions, f_out)

del dl_1st_block_6k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

8kHz-10kHz activity

In [None]:
# conv_block1 attributions against the 8-10 kHz reference.
dl_1st_block_8k_activity_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_8k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the 8-10 kHz reference.
plot_layer_attribution_importance(dl_1st_block_8k_activity_attributions,
        "1st block (8kHz-10kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_8k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_8k_activity_attributions, f_out)

del dl_1st_block_8k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Gaussian noise

In [None]:
# conv_block1 attributions against the Gaussian-noise reference.
dl_1st_block_gaussian_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_gaussian, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the Gaussian-noise reference.
plot_layer_attribution_importance(dl_1st_block_gaussian_attributions,
        "1st block (gaussian noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_gaussian_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_gaussian_attributions, f_out)

del dl_1st_block_gaussian_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Impulsive noise

In [None]:
# conv_block1 attributions against the impulsive-noise reference.
dl_1st_block_impulsive_attributions = dl_first_conv_block.attribute(inp_data,
        baseline_impulsive, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block1 attributions for the impulsive-noise reference.
plot_layer_attribution_importance(dl_1st_block_impulsive_attributions,
        "1st block (impulsive noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_1st_block_impulsive_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_1st_block_impulsive_attributions, f_out)

del dl_1st_block_impulsive_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

#### 2nd block

In [None]:
# Layer-level DeepLift explainer attached to the model's conv_block2.
dl_second_conv_block = LayerDeepLift(logged_model, logged_model.base.conv_block2)

##### Silent audio

In [None]:
# conv_block2 attributions; no baseline is passed, so Captum's default zero baseline applies.
dl_2nd_block_attributions = dl_second_conv_block.attribute(inp_data,
        target=np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the zero baseline.
plot_layer_attribution_importance(dl_2nd_block_attributions,
        "2nd block (silence) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_attributions, f_out)

del dl_2nd_block_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

2kHz-4kHz activity

In [None]:
# conv_block2 attributions against the 2-4 kHz reference.
dl_2nd_block_2k_activity_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_2k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the 2-4 kHz reference.
plot_layer_attribution_importance(dl_2nd_block_2k_activity_attributions,
        "2nd block (2kHz-4kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_2k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_2k_activity_attributions, f_out)

del dl_2nd_block_2k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

4kHz-6kHz activity

In [None]:
# conv_block2 attributions against the 4-6 kHz reference.
dl_2nd_block_4k_activity_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_4k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the 4-6 kHz reference.
plot_layer_attribution_importance(dl_2nd_block_4k_activity_attributions,
        "2nd block (4kHz-6kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_4k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_4k_activity_attributions, f_out)

del dl_2nd_block_4k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

6kHz-8kHz activity

In [None]:
# conv_block2 attributions against the 6-8 kHz reference.
dl_2nd_block_6k_activity_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_6k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the 6-8 kHz reference.
plot_layer_attribution_importance(dl_2nd_block_6k_activity_attributions,
        "2nd block (6kHz-8kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_6k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_6k_activity_attributions, f_out)

del dl_2nd_block_6k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

8kHz-10kHz activity

In [None]:
# conv_block2 attributions against the 8-10 kHz reference.
dl_2nd_block_8k_activity_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_8k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the 8-10 kHz reference.
plot_layer_attribution_importance(dl_2nd_block_8k_activity_attributions,
        "2nd block (8kHz-10kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_8k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_8k_activity_attributions, f_out)

del dl_2nd_block_8k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Gaussian noise

In [None]:
# conv_block2 attributions against the Gaussian-noise reference.
dl_2nd_block_gaussian_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_gaussian, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the Gaussian-noise reference.
plot_layer_attribution_importance(dl_2nd_block_gaussian_attributions,
        "2nd block (gaussian noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_gaussian_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_gaussian_attributions, f_out)

del dl_2nd_block_gaussian_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Impulsive noise

In [None]:
# conv_block2 attributions against the impulsive-noise reference.
dl_2nd_block_impulsive_attributions = dl_second_conv_block.attribute(inp_data,
        baseline_impulsive, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block2 attributions for the impulsive-noise reference.
plot_layer_attribution_importance(dl_2nd_block_impulsive_attributions,
        "2nd block (impulsive noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_2nd_block_impulsive_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_2nd_block_impulsive_attributions, f_out)

del dl_2nd_block_impulsive_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

#### 5th block

In [None]:
# Layer-level DeepLift explainer attached to the model's conv_block5.
dl_fifth_conv_block = LayerDeepLift(logged_model, logged_model.base.conv_block5)

##### Silent audio

In [None]:
# conv_block5 attributions; no baseline is passed, so Captum's default zero baseline applies.
dl_5th_block_attributions = dl_fifth_conv_block.attribute(inp_data,
        target=np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the zero baseline.
plot_layer_attribution_importance(dl_5th_block_attributions,
        "5th block (silence) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_attributions, f_out)

del dl_5th_block_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

2kHz-4kHz activity

In [None]:
# conv_block5 attributions against the 2-4 kHz reference.
dl_5th_block_2k_activity_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_2k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the 2-4 kHz reference.
plot_layer_attribution_importance(dl_5th_block_2k_activity_attributions,
        "5th block (2kHz-4kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_2k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_2k_activity_attributions, f_out)

del dl_5th_block_2k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

4kHz-6kHz activity

In [None]:
# conv_block5 attributions against the 4-6 kHz reference.
dl_5th_block_4k_activity_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_4k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the 4-6 kHz reference.
plot_layer_attribution_importance(dl_5th_block_4k_activity_attributions,
        "5th block (4kHz-6kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_4k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_4k_activity_attributions, f_out)

del dl_5th_block_4k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

6kHz-8kHz activity

In [None]:
# conv_block5 attributions against the 6-8 kHz reference.
dl_5th_block_6k_activity_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_6k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the 6-8 kHz reference.
plot_layer_attribution_importance(dl_5th_block_6k_activity_attributions,
        "5th block (6kHz-8kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_6k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_6k_activity_attributions, f_out)

del dl_5th_block_6k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

8kHz-10kHz activity

In [None]:
# conv_block5 attributions against the 8-10 kHz reference.
dl_5th_block_8k_activity_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_8k, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the 8-10 kHz reference.
plot_layer_attribution_importance(dl_5th_block_8k_activity_attributions,
        "5th block (8kHz-10kHz) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_8k_activity_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_8k_activity_attributions, f_out)

del dl_5th_block_8k_activity_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Gaussian noise

In [None]:
# conv_block5 attributions against the Gaussian-noise reference.
dl_5th_block_gaussian_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_gaussian, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the Gaussian-noise reference.
plot_layer_attribution_importance(dl_5th_block_gaussian_attributions,
        "5th block (gaussian noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_gaussian_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_gaussian_attributions, f_out)

del dl_5th_block_gaussian_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

Impulsive noise

In [None]:
# conv_block5 attributions against the impulsive-noise reference.
dl_5th_block_impulsive_attributions = dl_fifth_conv_block.attribute(inp_data,
        baseline_impulsive, np.argmax(labels, axis=1))

In [None]:
# Plot the conv_block5 attributions for the impulsive-noise reference.
plot_layer_attribution_importance(dl_5th_block_impulsive_attributions,
        "5th block (impulsive noise) average frame importance using DeepLift")

In [None]:
# Save the attributions to disk, then delete the variable and, on CUDA, empty PyTorch's cache.
with open("data/dl_5th_block_impulsive_attributions.pkl", "wb") as f_out:
        pickle.dump(dl_5th_block_impulsive_attributions, f_out)

del dl_5th_block_impulsive_attributions

if device == 'cuda':
        torch.cuda.empty_cache()

## Kim et al. (2018) - TCAV

For this method we obtain a dictionary with the attributions for each concept. I will not split the analysis by type of audio. See https://captum.ai/api/concept.html

In [None]:
# Layers probed by TCAV: the same three conv blocks analysed with LayerDeepLift above.
layers = ["base.conv_block1", "base.conv_block2", "base.conv_block5"]
# The layer passed to LayerIntegratedGradients is None here, as in the Captum TCAV
# tutorial linked further down in this notebook.
audio_tcav = TCAV(model=logged_model, layers=layers,
                layer_attr_method = LayerIntegratedGradients(
                    logged_model, None, multiply_by_inputs=False
                ))

Auxiliary functions that define how concepts are created and assembled

In [None]:
def create_concept(name, audios, ids, concepts_path, force_save=False, **kwargs):
    """Generate the audio files of one TCAV concept and save them as WAV.

    Every audio is transformed according to `name` and written to
    `<concepts_path>/<concept folder>/fold_<ref_fold>_ind<id>.wav`. Nothing is
    written when the concept folder already exists, unless `force_save` is set;
    in that case a message is printed instead.

    Args:
        name: "gaussian", "impulsive", or a "<fmin>-<fmax>" band in kHz (e.g. "2-4").
        audios: iterable of NumPy arrays with the time-domain signals.
        ids: iterable with one identifier per audio, used in the file names.
        concepts_path: directory that holds one sub-folder per concept.
        force_save: regenerate the files even if the concept folder exists.
        **kwargs: `sr` (sampling rate, required whenever a file is written);
            `n_fft` and `hop_size` (band concepts only); `cycles` (gaussian
            only, defaults to 1).

    Raises:
        AssertionError: if `name` is not a noise name and does not split into
            exactly two parts on "-".
    """
    # Each branch only decides the target folder and how one audio is transformed;
    # the existence check and the saving loop are shared below.
    if name == "gaussian":
        kwargs.setdefault('cycles', 1)
        save_path = os.path.join(concepts_path, "gaussian_noise")

        def transform(audio):
            # Each cycle adds the difference of two independent standard-normal draws.
            noisy = audio.copy()
            for _ in range(kwargs["cycles"]):
                noisy += rng.normal(size=len(audio)) - rng.normal(size=len(audio))
            return noisy

    elif name == "impulsive":
        save_path = os.path.join(concepts_path, "impulsive_noise")

        def transform(audio):
            # Only samples whose noise draw exceeds 0.85 are perturbed.
            noise = rng.normal(size=len(audio))
            return np.where(noise > 0.85, audio + noise, audio)

    else:
        freqs = name.split('-')
        assert len(freqs) == 2, "Invalid values for min frequency and max frequency"
        fmin, fmax = float(freqs[0])*1000, float(freqs[1])*1000
        save_path = os.path.join(concepts_path, f"{fmin}_{fmax}Hz")

        def transform(audio):
            return filter_audio_frequencies(audio, fmin, fmax, kwargs["sr"],
                                            kwargs["n_fft"], kwargs["hop_size"])

    if not force_save and os.path.exists(save_path):
        print(f"Method called without saving concetps. Check force_save parameter: {force_save}",
                f" or concepts_path parameter for {name}: {save_path} passed.")
        return

    # exist_ok=True: with force_save the folder may already exist, and a plain
    # os.makedirs would raise FileExistsError instead of regenerating the files.
    os.makedirs(save_path, exist_ok=True)

    for audio_id, audio in zip(ids, audios):
        wavfile.write(os.path.join(save_path, f"fold_{ref_fold}_ind{audio_id}.wav"),
                      kwargs["sr"], transform(audio))

In [None]:
# See https://github.com/pytorch/captum/blob/master/tutorials/TCAV_Image.ipynb
def get_tensor_from_filename(filename):
    """Load the samples of one concept audio file.

    `sr=None` asks librosa to keep the file's own sampling rate; the rate
    returned by `librosa.load` is discarded and only the samples are returned.
    """
    samples = librosa.load(filename, sr=None)[0]
    return samples

def assemble_concept(name, id, concepts_path="data/tcav/concepts/"):
    """Wrap the audio files saved for one concept into a Captum `Concept`.

    Args:
        name: "<fmin>-<fmax>" band in kHz, or a noise name such as "gaussian".
        id: concept identifier handed to `Concept`.
        concepts_path: directory that holds one sub-folder per concept.

    Returns:
        A `Concept` named after the concept folder, whose `data_iter` loads the
        folder's files through `get_tensor_from_filename`.
    """
    parts = name.split("-")
    if len(parts) == 2:
        # Band concept folders are named after the band edges converted to Hz.
        folder = f"{float(parts[0])*1000}_{float(parts[1])*1000}Hz"
    else:
        folder = f"{name}_noise"

    # The dataset receives the folder path with a trailing separator.
    concept_dir = os.path.join(concepts_path, folder) + os.sep
    data_iter = dataset_to_dataloader(
        CustomIterableDataset(get_tensor_from_filename, concept_dir)
    )

    return Concept(id=id, name=folder, data_iter=data_iter)

Preprocessing

In [None]:
# Root folder that holds one sub-folder of WAV files per concept.
concepts_path = os.path.normpath("data/tcav/concepts")

In [None]:
kwargs = {"sr": sr, "n_fft": n_fft, "hop_size": hop_length}

# create_concept works on NumPy audios (it calls `.copy()` / librosa on each one) and
# zips `ids` with `audios` to name the files, so it needs the reference audios as an
# array plus one id per audio: their indices in the input data. The STFT/sample-rate
# kwargs are required by every concept, including the frequency bands.
ref_audios_np = ref_audios.cpu().detach().numpy()

for low_khz in range(2, 10, 2):
    create_concept(f"{low_khz}-{low_khz+2}", ref_audios_np, audios_ref_indices,
                   concepts_path, **kwargs)

create_concept("gaussian", ref_audios_np, audios_ref_indices, concepts_path, **kwargs)

create_concept("impulsive", ref_audios_np, audios_ref_indices, concepts_path, **kwargs)

In [None]:
# Concept ids 0-3: the 2-4, 4-6, 6-8 and 8-10 kHz bands (int(i/2-1) maps 2,4,6,8 -> 0,1,2,3).
frequency_concepts = [assemble_concept(f"{i}-{i+2}", int(i/2-1))
                    for i in range(2, 10, 2)]

# Concept ids 4 and 5: the two noise concepts.
noise_concepts = [assemble_concept("gaussian", 4), assemble_concept("impulsive", 5)]

In [None]:
# One experimental set list per frequency band: the band concept is paired once with
# the Gaussian-noise concept and once with the impulsive-noise concept.
# 2-4 kHz
experimental_set_rand_1 = [[frequency_concepts[0], noise_concepts[0]],
                        [frequency_concepts[0], noise_concepts[1]]]

# 4-6 kHz
experimental_set_rand_2 = [[frequency_concepts[1], noise_concepts[0]],
                        [frequency_concepts[1], noise_concepts[1]]]

# 6-8 kHz
experimental_set_rand_3 = [[frequency_concepts[2], noise_concepts[0]],
                        [frequency_concepts[2], noise_concepts[1]]]

# 8-10 kHz
experimental_set_rand_4 = [[frequency_concepts[3], noise_concepts[0]],
                        [frequency_concepts[3], noise_concepts[1]]]

Visualize TCAV scores

In [None]:
# See https://github.com/pytorch/captum/blob/master/tutorials/TCAV_Image.ipynb
def format_float(f):
    """Round `f` for display and return it as a float.

    Values with magnitude of at least 0.0005 are rounded to three decimals;
    smaller ones keep three decimals of their scientific-notation mantissa.
    """
    if abs(f) >= 0.0005:
        text = f"{f:.3f}"
    else:
        text = f"{f:.3e}"
    return float(text)

def plot_tcav_scores(experimental_sets, tcav_scores):
    """Draw one grouped bar chart of 'sign_count' TCAV scores per experimental set.

    Args:
        experimental_sets: list of concept lists; each one gets its own subplot.
        tcav_scores: scores keyed by `concepts_to_str(concepts)`, then by layer,
            each holding a 'sign_count' sequence with one entry per concept.

    Bars are grouped by the entries of the notebook-level `layers` list, with
    one bar per concept inside each group.
    """
    n_sets = len(experimental_sets)
    _, axes = plt.subplots(1, n_sets, figsize = (25, 7))

    bar_width = 1 / (len(experimental_sets[0]) + 1)

    for set_index, concepts in enumerate(experimental_sets):
        layer_scores = tcav_scores[concepts_to_str(concepts)]
        # plt.subplots returns a bare Axes (not an array) when there is a single set.
        current_ax = axes[set_index] if n_sets > 1 else axes

        # The first concept sits on the integer positions; each following concept
        # is shifted one bar width to the right of the previous one.
        positions = np.arange(len(layers))
        for concept_index, concept in enumerate(concepts):
            if concept_index > 0:
                positions = [(x + bar_width) for x in positions]
            heights = [format_float(per_layer['sign_count'][concept_index])
                       for per_layer in layer_scores.values()]
            current_ax.bar(positions, heights, width=bar_width, edgecolor='white',
                           label=concept.name)

        # Add xticks on the middle of the group bars
        current_ax.set_xlabel('Experimento {}'.format(str(set_index+1)), fontweight='bold', fontsize=16)
        current_ax.set_xticks([r + 0.3 * bar_width for r in range(len(layers))])
        current_ax.set_xticklabels(layers, fontsize=16)

        # Create legend & Show graphic
        current_ax.legend(fontsize=16, loc="lower right")

    plt.show()

### 2kHz-4kHz experiment

In [None]:
# TCAV scores for the 2-4 kHz concept versus each noise concept.
tcav_scores_2k = audio_tcav.interpret(inputs=inp_data,
                                    experimental_sets=experimental_set_rand_1,
                                    target=np.argmax(labels, axis=1),
                                    n_steps=5)

In [None]:
# Bar chart of the 2-4 kHz experiment scores per layer.
plot_tcav_scores(experimental_set_rand_1, tcav_scores_2k)

### 4kHz-6kHz experiment

In [None]:
# TCAV scores for the 4-6 kHz concept versus each noise concept.
tcav_scores_4k = audio_tcav.interpret(inputs=inp_data,
                                    experimental_sets=experimental_set_rand_2,
                                    target=np.argmax(labels, axis=1),
                                    n_steps=5)

In [None]:
# Bar chart of the 4-6 kHz experiment scores per layer.
plot_tcav_scores(experimental_set_rand_2, tcav_scores_4k)

### 6kHz-8kHz experiment

In [None]:
# TCAV scores for the 6-8 kHz concept versus each noise concept.
tcav_scores_6k = audio_tcav.interpret(inputs=inp_data,
                                    experimental_sets=experimental_set_rand_3,
                                    target=np.argmax(labels, axis=1),
                                    n_steps=5)

In [None]:
# Bar chart of the 6-8 kHz experiment scores per layer.
plot_tcav_scores(experimental_set_rand_3, tcav_scores_6k)

### 8kHz-10kHz experiment

In [None]:
# TCAV scores for the 8-10 kHz concept versus each noise concept.
# The target is each input's own class, exactly as in the other three experiments and
# in the statistical-significance run. This cell previously passed `target[0]` (the
# first "albilora*" class index), which made its scores not comparable with the rest.
tcav_scores_8k = audio_tcav.interpret(inputs=inp_data,
                                    experimental_sets=experimental_set_rand_4,
                                    target=np.argmax(labels, axis=1),
                                    n_steps=5)

In [None]:
# Bar chart of the 8-10 kHz experiment scores per layer.
plot_tcav_scores(experimental_set_rand_4, tcav_scores_8k)

### Statistical Significance

In [None]:
# Every ordered pair of distinct frequency concepts, grouped by the first concept
# (4 concepts -> 12 pairs, three consecutive pairs per first concept)...
experimental_sets = [[frequency_concepts[i], frequency_concepts[j]]
                    for i in range(len(frequency_concepts))
                    for j in range(len(frequency_concepts))
                    if i != j]

# ...followed by every (frequency concept, noise concept) pair, two per frequency concept.
experimental_sets.extend([[frequency_concept, noise_concept]
                    for frequency_concept in frequency_concepts
                    for noise_concept in noise_concepts])

experimental_sets

In [None]:
def assemble_scores(scores, experimental_sets, idx, score_layer, score_type):
    """Collect one concept's score from every experimental set.

    Args:
        scores: mapping keyed by the "-"-joined concept ids of each set, then
            by layer and by score type.
        experimental_sets: list of concept lists; each concept exposes `.id`.
        idx: position of the concept, inside each set, whose score is read.
        score_layer: layer key to read.
        score_type: score key to read (e.g. 'sign_count').

    Returns:
        List with one score per experimental set, in the given order.
    """
    return [
        scores["-".join(str(concept.id) for concept in concepts)][score_layer][score_type][idx]
        for concepts in experimental_sets
    ]

In [None]:
def get_pval(scores, experimental_sets, score_layer, score_type, alpha=0.05, print_ret=False):
    """Run a t-test between the scores of the first and second concept of each set.

    Args:
        scores: nested TCAV score mapping, as consumed by `assemble_scores`.
        experimental_sets: list of concept pairs to compare.
        score_layer: layer key to read from `scores`.
        score_type: score key to read (e.g. 'sign_count').
        alpha: significance level; p-values below it are labelled "Disjoint".
        print_ret: when True, print the summary statistics, p-value and label.

    Returns:
        Tuple `(P1, P2, pval, relation)`: the scores of the first and of the
        second concept of every set, the p-value rounded by `format_float`,
        and either "Disjoint" or "Overlap".
    """
    first_scores = assemble_scores(scores, experimental_sets, 0, score_layer, score_type)
    second_scores = assemble_scores(scores, experimental_sets, 1, score_layer, score_type)

    if print_ret:
        print('P1[mean, std]: ', format_float(np.mean(first_scores)), format_float(np.std(first_scores)))
        print('P2[mean, std]: ', format_float(np.mean(second_scores)), format_float(np.std(second_scores)))

    _, pval = ttest_ind(first_scores, second_scores)

    # A p-value below alpha (5% by default) means the two score sets differ significantly.
    relation = "Disjoint" if pval < alpha else "Overlap"

    if print_ret:
        print("p-values:", format_float(pval))
        print(relation)

    return first_scores, second_scores, format_float(pval), relation

In [None]:
def show_boxplots(layer, experimental_sets, scores, n=3, n_plots=2, other_name="noise", metric='sign_count'):
    """Draw stacked boxplots comparing the scores of the two concepts in each group of sets.

    The experimental sets are consumed in consecutive groups of `n`; each group
    gets its own subplot with one box for the first concept of every set and
    one box for the second, plus the t-test result from `get_pval` in the title.

    Args:
        layer: layer key to read from `scores`.
        experimental_sets: list of concept pairs, ordered so that each group of
            `n` consecutive sets belongs together.
        scores: TCAV scores, as consumed by `get_pval`.
        n: number of experimental sets per subplot.
        n_plots: number of subplots (groups) to draw; 1 is supported.
        other_name: tick label used for the second box.
        metric: score type to plot (e.g. 'sign_count').
    """
    # TODO: adapt this function to show boxplots for different references.
    def format_label_text(experimental_sets):
        # First box: name of the first concept of the group; second box: `other_name`.
        return [concept.name if position == 0 else other_name
                for position, concept in enumerate(experimental_sets[0])]

    # squeeze=False always returns a 2-D array of axes, so indexing also works
    # when n_plots == 1 (plt.subplots would otherwise return a bare Axes).
    _, axes = plt.subplots(n_plots, 1, figsize = (25, 7 * n_plots), squeeze=False)
    fs = 18
    for i in range(n_plots):
        panel = axes[i, 0]
        esl = experimental_sets[i * n : (i+1) * n]
        P1, P2, pval, relation = get_pval(scores, esl, layer, metric)

        panel.set_ylim([0, 1])
        panel.set_title(layer + "-" + metric + " (pval=" + str(pval) + " - " + relation + ")", fontsize=fs)
        panel.boxplot([P1, P2], showfliers=True)

        panel.set_xticklabels(format_label_text(esl), fontsize=fs)

    plt.show()


In [None]:
# TCAV scores for every frequency-frequency and frequency-noise pair built above.
scores = audio_tcav.interpret(inp_data, experimental_sets,
        np.argmax(labels, axis=1), n_steps=5)

#### Frequency range comparison

First block

In [None]:
# conv_block1: one subplot per frequency concept, using the default n=3 pairs per subplot.
show_boxplots(layers[0], experimental_sets, scores, n_plots=4, other_name="other Frequencies")

Second block

In [None]:
# conv_block2: one subplot per frequency concept, using the default n=3 pairs per subplot.
show_boxplots(layers[1], experimental_sets, scores, n_plots=4, other_name="other Frequencies")

Fifth block

In [None]:
# conv_block5: one subplot per frequency concept, using the default n=3 pairs per subplot.
show_boxplots(layers[2], experimental_sets, scores, n_plots=4, other_name="other Frequencies")

In [None]:
#### Frequency x noise comparison

#### Frequency x noise comparison

First block

In [None]:
# conv_block1: skip the 12 frequency-frequency pairs; the rest are the frequency-noise
# pairs, two per frequency concept (n=2).
show_boxplots(layers[0], experimental_sets[12:], scores, n=2, n_plots=4, other_name="noises")

Second block

In [None]:
# conv_block2: skip the 12 frequency-frequency pairs; the rest are the frequency-noise
# pairs, two per frequency concept (n=2).
show_boxplots(layers[1], experimental_sets[12:], scores, n=2, n_plots=4, other_name="noises")

Fifth block

In [None]:
# conv_block5: skip the 12 frequency-frequency pairs; the rest are the frequency-noise
# pairs, two per frequency concept (n=2).
show_boxplots(layers[2], experimental_sets[12:], scores, n=2, n_plots=4, other_name="noises")

## Feature visualization

In [None]:
# Fifth of the randomly sampled reference audios.
# NOTE(review): the name assumes this sample is an "albilora-CA" example; confirm
# against the labels printed when the reference audios were drawn.
albilora_ca = inp_data[audios_ref_indices[4]]

In [None]:
# Magnitude spectrogram of the unmodified reference audio.
_, axis = plt.subplots(figsize=(6, 3))

librosa.display.specshow(np.abs(
    librosa.stft(albilora_ca.cpu().detach().numpy(),
        n_fft=window_size, win_length=window_size, hop_length=hop_length, center=True)
    ),
    sr=sr, x_axis="time", y_axis="linear", hop_length=hop_length,
    fmin=int(tags["fmin"]), fmax=int(tags["fmax"]), ax=axis
)

axis.set_title(f"Espectrograma- albilora-CA", {'fontsize': 11})

axis.set_xlabel("Tempo [s]");

In [None]:
# Magnitude spectrogram of the same audio taken from the Gaussian-noise reference set.
_, axis = plt.subplots(figsize=(6, 3))

librosa.display.specshow(np.abs(
    librosa.stft(baseline_gaussian[audios_ref_indices[4]].cpu().detach().numpy(),
        n_fft=window_size, win_length=window_size, hop_length=hop_length, center=True)
    ),
    sr=sr, x_axis="time", y_axis="linear", hop_length=hop_length,
    fmin=int(tags["fmin"]), fmax=int(tags["fmax"]), ax=axis
)

axis.set_title(f"Espectrograma- albilora-CA com ruído gaussiano", {'fontsize': 11})

axis.set_xlabel("Tempo [s]");

In [None]:
# Magnitude spectrogram of the same audio taken from the impulsive-noise reference set.
_, axis = plt.subplots(figsize=(6, 3))

librosa.display.specshow(np.abs(
    librosa.stft(baseline_impulsive[audios_ref_indices[4]].cpu().detach().numpy(),
        n_fft=window_size, win_length=window_size, hop_length=hop_length, center=True)
    ),
    sr=sr, x_axis="time", y_axis="linear", hop_length=hop_length,
    fmin=int(tags["fmin"]), fmax=int(tags["fmax"]), ax=axis
)

axis.set_title(f"Espectrograma- albilora-CA com ruído impulsivo", {'fontsize': 11})

axis.set_xlabel("Tempo [s]");

In [None]:
# Magnitude spectrogram of the same audio taken from the 2-4 kHz reference set.
_, axis = plt.subplots(figsize=(6, 3))

librosa.display.specshow(np.abs(
    librosa.stft(baseline_2k[audios_ref_indices[4]].cpu().detach().numpy(),
        n_fft=window_size, win_length=window_size, hop_length=hop_length, center=True)
    ),
    sr=sr, x_axis="time", y_axis="linear", hop_length=hop_length,
    fmin=int(tags["fmin"]), fmax=int(tags["fmax"]), ax=axis
)

axis.set_title(f"Espectrograma- albilora-CA: baseline2k", {'fontsize': 11})

axis.set_xlabel("Tempo [s]");

Analyze the label distribution of inputs 0-150, 150-200, 200-350 and 350 onward

In [None]:
# Label name of every input, obtained from the position of the maximum of its label row.
str_labels = [idx_to_label[int(i.cpu().detach().numpy())] for i in np.argmax(labels, axis=1)]
# Consecutive slices of the inputs, wrapped as {"label": [...]} for the count plots below.
first_150_labels = {"label": str_labels[:150]}
labels_inputs_150_200 = {"label": str_labels[150:200]}
labels_inputs_200_350 = {"label": str_labels[200:350]}
labels_inputs_350_tot = {"label": str_labels[350:]}

In [None]:
# Label counts for inputs 0-150.
sns.countplot(x=first_150_labels["label"]);

In [None]:
# Label counts for inputs 150-200.
sns.countplot(x=labels_inputs_150_200["label"]);

In [None]:
# Label counts for inputs 200-350.
sns.countplot(x=labels_inputs_200_350["label"]);

In [None]:
# Label counts for inputs 350 onward.
sns.countplot(x=labels_inputs_350_tot["label"]);

In [None]:
# Inference only: `inp_data` has gradient tracking enabled, so without no_grad this
# forward pass would build (and keep alive) an autograd graph for the whole input set.
# The outputs are only converted to NumPy afterwards, so no gradients are needed.
with torch.no_grad():
    evaluated_input = logged_model(inp_data)

In [None]:
# Project the model outputs onto two principal components, used only for the scatter plots.
pca = PCA(n_components=2)
pca_evaluated_input = pca.fit_transform(evaluated_input.cpu().detach().numpy())

Kmeans

In [None]:
# Cluster the full model outputs (not the PCA projection); the number of clusters is
# left at scikit-learn's default.
kmeans = KMeans(random_state=135).fit(evaluated_input.cpu().detach().numpy())
kmeans.labels_

In [None]:
# PCA projection of the outputs, coloured by K-means cluster.
sns.scatterplot(x=pca_evaluated_input[:, 0], y=pca_evaluated_input[:, 1],
        hue=kmeans.labels_, palette="deep");

DBSCAN

In [None]:
# Density-based clustering of the full model outputs; the label -1 marks noise points.
dbscan = DBSCAN(eps=0.15, min_samples=2).fit(evaluated_input.cpu().detach().numpy())
dbscan.labels_

In [None]:
# PCA projection of the outputs, coloured by DBSCAN cluster.
sns.scatterplot(x=pca_evaluated_input[:, 0], y=pca_evaluated_input[:, 1],
        hue=dbscan.labels_, palette="deep");