In [1]:
import os
import torch
import numpy as np
import pickle as pkl
import sys

sys.path.insert(1, '../')
from src.fsd_meta import fsd50k_select_ids, fsd50k_blacklist, FSD50K_MetaContainer

In [2]:
# Relative paths handed to FSD50K_MetaContainer as `as_taxonomy_path` and
# `fsd_vocabulary_path` respectively.
# NOTE(review): paths are relative to the working directory the notebook runs in — confirm.
taxonomy_path = './taxonomy/ontology.json'
vocabulary_path = './taxonomy/ground_truth/vocabulary.csv'

In [3]:
# Flatten every id list stored in `fsd50k_select_ids` into one list of selected ids.
all_ids = [_id for _group in fsd50k_select_ids.values() for _id in _group]

print(len(all_ids))

143


### Create a mapping from id to label

In [4]:
container = FSD50K_MetaContainer(
    as_taxonomy_path=taxonomy_path,
    fsd_vocabulary_path=vocabulary_path,
)

# The vocabulary is keyed by mid; invert it into two lookups keyed by class index.
index2label = {_entry['index']: _entry['label'] for _entry in container.curr_vocabulary.values()}
index2mid = {_entry['index']: _mid for _mid, _entry in container.curr_vocabulary.items()}

# Show the number of classes and the label / mid registered for index 0.
print(len(index2label), index2label[0], index2mid[0], sep='\n')

200
Accelerating_and_revving_and_vroom
/m/07q2z82


### Filter redundant labels in the container vocabulary

In [5]:
# Translate the selected class indices into their mids.
all_mids = [index2mid[_id] for _id in all_ids]

# Every vocabulary entry whose mid was not selected is handed to `remove_class`.
_unselected_mids = [mid for mid in container.curr_vocabulary.keys() if mid not in all_mids]
container.curr_vocabulary = container.remove_class(container.curr_vocabulary, _unselected_mids)

print(len(container.curr_vocabulary))  # check the num of classes in the current vocabulary

143


### Re-construct tree and taxonomy as per vocabulary

In [6]:
# Restrict the taxonomy and the tree to the classes left in the current vocabulary.
container.curr_taxonomy = container.filter_taxonomy(
    container.curr_taxonomy,
    container.curr_vocabulary,
)
container.curr_tree = container.filter_tree(
    container.curr_tree,
    container.curr_vocabulary,
)

# Total number of entries summed over every level of the tree.
cnt = sum(len(_lvl) for _lvl in container.curr_tree)

print(cnt)
print(len(container.curr_taxonomy))

143
143


### Find the paths to the root for each class in our taxonomy

In [None]:
class AudioSetTaxonomy(object):
    """Leaf-to-root paths over a taxonomy and path-based distances between its classes.

    Attributes:
        taxonomy: mapping ``mid -> attributes`` taken from the container's ``as_taxonomy``.
            Only the ``'child_mid'`` entry (the list of child mids) is read by this class.
        paths: list of paths produced by :meth:`traverse_paths`; each path is ordered
            ``[leaf, ..., topmost ancestor]``.
    """

    def __init__(self, taxonomy_path: str, vocabulary_path: str) -> None:
        """Load the taxonomy through ``FSD50K_MetaContainer`` and pre-compute all paths.

        Args:
            taxonomy_path: forwarded to the container as ``as_taxonomy_path``.
            vocabulary_path: forwarded to the container as ``fsd_vocabulary_path``.
        """
        container = FSD50K_MetaContainer(as_taxonomy_path=taxonomy_path, fsd_vocabulary_path=vocabulary_path)
        self.taxonomy = container.as_taxonomy
        self.paths = self.traverse_paths()

    def traverse_paths(self):
        """Enumerate every path that starts at a leaf and climbs parent by parent.

        A leaf is an entry whose ``'child_mid'`` list is empty. In each round every open
        path is extended by each parent of its last node (a node with several parents
        forks the path); a path that cannot be extended any further is complete.

        Returns:
            list of paths, each a list of mids ordered ``[leaf, ..., topmost ancestor]``.
        """
        complete = list()

        # Seed one single-node path per leaf; `heads` maps the last node of each open
        # path to the indices (into `frontier`) of the paths currently ending there.
        frontier = [[mid] for mid, attr in self.taxonomy.items() if attr['child_mid'] == []]
        heads = dict()
        for path_id, path in enumerate(frontier):
            heads.setdefault(path[-1], []).append(path_id)

        while True:
            extended, extended_heads, consumed = list(), dict(), set()
            for mid, attr in self.taxonomy.items():
                children = attr['child_mid']
                # `mid` is a parent of every open-path head listed among its children.
                for node, path_ids in heads.items():
                    if node in children:
                        for path_id in path_ids:
                            extended_heads.setdefault(mid, []).append(len(extended))
                            extended.append([*frontier[path_id], mid])
                            consumed.add(path_id)

            # Paths that no parent picked up have reached their topmost ancestor.
            complete.extend(path for path_id, path in enumerate(frontier) if path_id not in consumed)

            if not extended:
                print("Paths are searched completely.")
                break

            frontier, heads = extended, extended_heads

        return complete

    def get_taxonomy_distance(self, mids):
        """Return the smallest path distance between two classes.

        For each class, every stored path containing it is cut so that it starts at
        that class; the result is the minimum of :meth:`path_distance` over all
        combinations of the two sets of cut paths.

        Args:
            mids: pair ``(m1, m2)`` of class mids.

        Returns:
            The minimum distance (0 when both mids are the same class).

        Raises:
            ValueError: if either mid does not occur in any stored path.
        """
        m1, m2 = mids
        p1, p2 = list(), list()

        for path in self.paths:
            for lvl, node in enumerate(path):
                if node == m1:
                    p1.append(path[lvl:])

                if node == m2:
                    p2.append(path[lvl:])

        if not p1 or not p2:
            # Fail with a readable message instead of numpy's "zero-size array" error.
            missing = [m for m, found in ((m1, p1), (m2, p2)) if not found]
            raise ValueError(f"No taxonomy path contains: {missing}")

        distances = [self.path_distance(_p1, _p2) for _p1 in p1 for _p2 in p2]

        return np.min(distances)

    @staticmethod
    def path_distance(a, b):
        """Count the nodes that two paths do not have in common.

        Both paths are ordered ``[node, ..., topmost ancestor]``. They are aligned at
        their last elements and compared position by position; every matching position
        counts as one shared node. The result is ``len(a) + len(b) - 2 * n_shared``,
        e.g. ``['a', 'p', 'root']`` vs ``['b', 'p', 'root']`` gives 2.

        Aligning from the end needs no padding value, so no node id (such as ``0``)
        can be mistaken for padding.

        Args:
            a: array-like path.
            b: array-like path.

        Returns:
            int: number of unshared nodes across both paths.
        """
        a, b = list(a), list(b)
        n_shared_nodes = sum(1 for x, y in zip(reversed(a), reversed(b)) if x == y)

        return len(a) + len(b) - n_shared_nodes * 2

In [None]:
taxonomy = AudioSetTaxonomy(taxonomy_path=taxonomy_path, vocabulary_path=vocabulary_path)

# Example pair; mids noted while testing: /m/07phhsh, /m/07qh7jl, /m/05n1m
_example_pair = ['/m/07phhsh', '/m/05n1m']
print(f"The result: {taxonomy.get_taxonomy_distance(_example_pair)}")

### Measure distance between two classes by counting the intermediate nodes between them

In [None]:
# Pairwise taxonomy distances between all classes of the vocabulary.
# Row i / column j hold the distance between the classes with indices i and j.
# The size comes from the index -> mid mapping instead of a hard-coded 200; indices are
# expected to run contiguously from 0 (as the original range(200) loops assumed).
n_classes = len(index2mid)

# -1 is the placeholder for a distance that is not available.
distance_matrix = np.full((n_classes, n_classes), -1, dtype=int)
for row in range(n_classes):
    # The distance does not depend on the order of the pair, so each unordered pair is
    # computed once and written to both mirrored cells.
    for col in range(row, n_classes):
        _distance = taxonomy.get_taxonomy_distance([index2mid[row], index2mid[col]])
        distance_matrix[row, col] = _distance
        distance_matrix[col, row] = _distance

with open('./fsd50k_distance_matrix.pkl', 'wb') as f:
    pkl.dump(distance_matrix, f)

print(distance_matrix)

In [None]:
def create_embedding_layer(weight_path: str, beta: float, slt_classes: list, trainable: bool = False):
    """Build a ``torch.nn.Embedding`` whose weights are a pickled 2-D matrix.

    Args:
        weight_path: path of the pickle file holding the weight matrix. Row ``i`` of the
            matrix becomes the embedding vector of index ``i``.
        beta: currently unused; kept so existing callers keep working.
        slt_classes: currently unused; kept so existing callers keep working.
        trainable: when False (default) the weights are frozen (``requires_grad=False``).

    Returns:
        torch.nn.Embedding of shape ``(n_rows, n_cols)`` with float32 weights.
    """
    # NOTE(security): pickle.load can execute arbitrary code; only load files you created.
    with open(weight_path, 'rb') as f:
        weight_matrix = pkl.load(f)

    # Embedding weights must be floating point; the stored matrix may hold integers.
    weight_matrix = torch.as_tensor(weight_matrix, dtype=torch.float32)

    return torch.nn.Embedding.from_pretrained(weight_matrix, freeze=not trainable)

In [None]:
# create_embedding_layer returns the layer only, so it cannot be unpacked into three
# names; the dimensions are read from the layer's own attributes instead.
emb_layer = create_embedding_layer(weight_path='./fsd50k_distance_matrix.pkl', beta=30, slt_classes=[])
num_embeddings, embedding_dim = emb_layer.num_embeddings, emb_layer.embedding_dim

print(emb_layer)
print(num_embeddings)
print(embedding_dim)

In [None]:
# Sanity check: count the positions at which two equal-length arrays agree (3 here).
a, b = np.array([0, 1, 2, 3]), np.array([4, 1, 2, 3])
np.count_nonzero(a == b)

In [None]:
# Scratch check: number of element-wise matches between two small arrays (3 here).
a = np.array([0, 1, 2, 3])
b = np.array([4, 1, 2, 3])
_matches = (a == b)
np.count_nonzero(_matches)