## Imports

In [4]:
import sys
sys.path.insert(0, '..')
import os
import copy
from multiprocessing import Pool
from typing_extensions import Unpack
from abc import ABC
from typing import List, TypedDict, overload
import numpy as np
from sklearn.preprocessing import normalize
from db.vector_db import Item, VectorDB
from db.qdrant import Qdrant
os.environ["UNIVERSE_DIR"] = ("/mnt/disk1/ecad_database/features/65k/universe")

## DataLoader from CIA-EV-IDT

In [None]:
# from identificationWorker.utils import file_utils
import os
from tqdm import tqdm
import deepdish as dd
import glob


def get_unique_by_order(data):
    """Return the distinct items of ``data`` as a list, in first-seen order."""
    # Plain dicts keep insertion order, so their keys give an ordered de-duplication.
    return list(dict.fromkeys(data))


class DataLoader:
    """
    Index and load the .h5 feature files stored under a "universe" directory.

    Files are looked up as ``<universe_dir>/<work>/<track>.h5``. Every indexed
    file becomes one mapping entry ``{"obra": <work>, "fonograma": <track>}``;
    the position of an entry in ``universe_mapping`` is its universe index.

    Args:
        universe_dir (str): The directory path where the universe data is stored.

    """

    def __init__(self, universe_dir):
        self.universe_mapping = []
        self.universe_features = []
        self.universe_context_idx = []
        self.ready = False
        self.universe_dir = universe_dir
        self.timestamp_dataloader = None
        self.universe_selection = []

    def get_status(self) -> bool:
        """Return True once a non-empty universe has been set via ``set_universe``."""
        return self.ready

    def set_status(self, st: bool):
        """Force the readiness flag to ``st``."""
        self.ready = st

    def get_features_path(self, selection: list[str] = None) -> list[str]:
        """
        Retrieves the paths of the .h5 feature files.

        Args:
            selection (list[str], optional): Directory names (relative to the
                universe directory) to restrict the search to. Defaults to
                None, which searches every sub-directory.

        Returns:
            list[str]: A list of paths to the .h5 feature files. An empty
            ``selection`` list yields an empty result without raising.

        Raises:
            ValueError: If no .h5 files are found in the specified directory/directories.
        """
        if selection is None:
            # glob is called without recursive=True, so "**" behaves like "*":
            # only files exactly one directory below universe_dir are matched.
            h5_list = glob.glob(os.path.join(self.universe_dir, "**/*.h5"))
            if not h5_list:
                raise ValueError(
                    f"No .h5 files found in the universe directory {self.universe_dir}"
                )
            return h5_list

        h5_list = []
        for dir_name in selection:
            h5_files = glob.glob(os.path.join(self.universe_dir, dir_name, "*.h5"))
            if not h5_files:
                raise ValueError(f"No .h5 files found in directory {dir_name}")
            h5_list.extend(h5_files)
        return h5_list

    def load_h5_features(self, file: str) -> "dict | None":
        """
        Load one .h5 feature file.

        Args:
            file (str): Path of the .h5 file.

        Returns:
            The first element of the file's ``"segments"`` entry when that key
            is present, otherwise the loaded content as-is. ``None`` when the
            file cannot be loaded (a message is printed instead of raising).
        """
        try:
            # dd -> python2, 3 compatibility for h5 files
            file_data = dd.io.load(file)

            if "segments" in file_data:
                return file_data["segments"][0]
            return file_data
        except Exception:
            # Deliberate best-effort: callers receive None and must check for it.
            print(f"Error: {file} not found or corrupt data.")
            return None

    def get_h5data_dict(self, work_dict):
        """Load the features of a mapping entry (a dict with "obra" and "fonograma" keys)."""
        return self.get_h5data(work_dict["obra"], work_dict["fonograma"])

    def get_h5data(self, work_id, track_id):
        """Load the features stored at ``<universe_dir>/<work_id>/<track_id>.h5``."""
        file = os.path.join(self.universe_dir, work_id, track_id + ".h5")
        return self.load_h5_features(file)

    def batch_load_h5data(self, list_files):
        """Placeholder: batch loading is not implemented yet (returns None)."""
        pass

    def load_universe(self, selection: list[str] = None) -> bool:
        """
        Index the universe files and append them to the current mapping.

        For every feature file found, one ``{"obra", "fonograma"}`` entry is
        appended to ``universe_mapping`` and the work id is appended to
        ``universe_selection``. Existing entries are NOT cleared, so calling
        this twice duplicates them; use ``set_universe`` for a fresh state.

        Args:
            selection (list[str], optional): Directory names to index.
                Defaults to None (index everything).

        Returns:
            bool: True when the files were indexed.

        Raises:
            ValueError: If no universe files data is found.
        """
        universe_features_path: list[str] = self.get_features_path(selection)

        # Reachable for an empty selection list, which get_features_path accepts.
        if not universe_features_path:
            raise ValueError("No universe files data found.")

        for file in universe_features_path:
            # os.path keeps this independent of the path separator, and
            # splitext strips only the final extension, so a track id that
            # itself contains dots round-trips through get_h5data.
            work_id = os.path.basename(os.path.dirname(file))
            track_id, _extension = os.path.splitext(os.path.basename(file))
            self.universe_mapping.append({"obra": work_id, "fonograma": track_id})
            self.universe_selection.append(f"{work_id}")

        return True

    def set_universe(self, selection: list[str] = None) -> None:
        """
        Reset the loader state, index the universe and update the readiness flag.

        ``ready`` ends up True only when at least one file was indexed.

        Args:
            selection (list[str], optional): List of universe selections. Defaults to None.

        Returns:
            None. Query the outcome with ``get_status``.

        Raises:
            ValueError: Propagated from ``load_universe`` when no files are found.
        """
        self.ready = False
        self.universe_mapping = []
        self.universe_features = []
        self.universe_context_idx = []
        self.universe_selection = []
        if self.load_universe(selection) and self.universe_mapping:
            self.ready = True

    def get_universe(self):
        """Return the list of mapping entries (same object as ``get_universe_mapping``)."""
        return self.universe_mapping

    def get_universe_idx(self):
        """Return a ``range`` over every universe index."""
        return range(len(self.universe_mapping))

    def get_universe_mapping(self):
        """Return the list of ``{"obra", "fonograma"}`` mapping entries."""
        return self.universe_mapping

    def _search_restricted_universe(self, universe_context=None):
        """
        Store and return the indices whose "obra" is contained in ``universe_context``.

        A ``None`` context selects nothing (empty list) instead of raising.
        """
        if universe_context is None:
            self.universe_context_idx = []
        else:
            self.universe_context_idx = [
                idx
                for idx, entry in enumerate(self.universe_mapping)
                if entry["obra"] in universe_context
            ]
        return self.universe_context_idx

    def get_restricted_universe(
        self, universe_context_by_id=None, universe_context_by_idx=None
    ):
        """
        Restrict the universe either by work id or by explicit indices.

        Args:
            universe_context_by_id: Container of work ids ("obra") to keep.
                Takes precedence when both arguments are given.
            universe_context_by_idx: Iterable of universe indices to keep.

        Returns:
            tuple: ``(indices, mapping_entries)``. With no restriction the
            full index range and the full mapping are returned.
        """
        if universe_context_by_id is None and universe_context_by_idx is None:
            return (
                self.get_universe_idx(),
                self.universe_mapping,
            )

        if universe_context_by_id is not None:
            self.universe_context_idx = [
                idx
                for idx, entry in enumerate(self.universe_mapping)
                if entry["obra"] in universe_context_by_id
            ]
        else:
            self.universe_context_idx = list(universe_context_by_idx)

        universe_restricted_mapping = [
            self.universe_mapping[idx] for idx in self.universe_context_idx
        ]

        return (
            self.universe_context_idx,
            universe_restricted_mapping,
        )

    def get_restricted_universe_idx(self, universe_context=None):
        """
        Return the restricted indices, falling back to the whole universe.

        When ``universe_context`` is given the restriction is recomputed first;
        otherwise the last stored restriction is used. An empty restriction
        yields every universe index.
        """
        if universe_context is not None:
            self._search_restricted_universe(universe_context=universe_context)

        if not self.universe_context_idx:
            return self.get_universe_idx()

        return self.universe_context_idx

    def get_work_from_idx(self, list_idx):
        """Return the distinct work ids ("obra") of the given indices, in first-seen order."""
        works = [self.universe_mapping[idx]["obra"] for idx in list_idx]
        return get_unique_by_order(works)

    def get_fonograma_from_idx(self, list_idx):
        """Return the track id ("fonograma") of each given index, duplicates included."""
        return [self.universe_mapping[idx]["fonograma"] for idx in list_idx]

### Instance

In [5]:
# The universe root comes from the UNIVERSE_DIR environment variable
# (set in the imports cell); the loader only stores the path at this point.
UNIVERSE_DIR = os.getenv("UNIVERSE_DIR")
dl = DataLoader(UNIVERSE_DIR)

### Load Universe

In [6]:
# Index every .h5 file under the universe directory. Entries are appended to
# dl.universe_mapping, so re-running this cell duplicates them.
dl.load_universe()

True

## Qdrant

### Instance

In [7]:
# Vector-DB client built with its default settings; the same instance is
# handed to every repository created further down.
qdrant = Qdrant()

## Repositories

### Abstract

In [None]:
class Filter(TypedDict):
    """Typed filter accepted by ``FeatureRepository.get`` in place of an id."""

    # Same keys as the DataLoader mapping entries: "obra" holds the work id
    # and "fonograma" the track id (both taken from the .h5 file path).
    obra: str
    fonograma: str


class FeatureRepository(ABC):
    """
    Base repository that stores and queries one feature type in a vector DB.

    Every operation is forwarded to ``vector_db`` together with
    ``collection_name``. Subclasses are expected to set ``collection_name``
    and ``dim``; both are ``None`` on the base class.
    """

    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = None
        self.dim = None

    def try_create_collection(self):
        """Ask the vector DB to create this repository's collection with ``dim``."""
        self.vector_db.try_create_collection(self.collection_name, self.dim)

    # TODO: Add basic logic to process the feature
    def process(self, feature: np.ndarray):
        """Return ``feature`` unchanged (placeholder for future processing)."""
        return feature

    def add(self, features: list[np.ndarray], payloads=None):
        """
        Insert every row of every feature array into the collection.

        Args:
            features: One array per item; each row of an array is stored as a
                separate vector.
            payloads: Optional sequence aligned with ``features``. The payload
                of item ``i`` is attached to every vector of ``features[i]``.
        """
        for i, feature in enumerate(features):
            vectors = list(feature)
            payload = payloads[i] if payloads else None
            if payload:
                # One (shared) payload object per vector of this item.
                payload = [payload] * len(vectors)

            self.vector_db.add(self.collection_name, vectors, payload)

    def search(self, queries: np.ndarray, top_k=5, filter: dict = None):
        """
        Lazily search the collection, one query item at a time.

        This is a generator: nothing is sent to the vector DB until it is
        iterated. Each yielded value is the vector DB's result for all rows
        of one element of ``queries``.

        Args:
            queries: Iterable of arrays whose rows are the query vectors.
            top_k: Forwarded to the vector DB as ``top_k``.
            filter: Forwarded to the vector DB as ``filter``.
        """
        for feature in queries:
            vectors = list(feature)
            yield self.vector_db.search(
                self.collection_name, vectors, top_k=top_k, filter=filter
            )

    # The overload stubs use the implementation's parameter name so that the
    # keyword form they advertise is one the method actually accepts.
    @overload
    def get(self, id_or_filter: str) -> List[Item]: ...

    @overload
    def get(self, id_or_filter: Filter, top_k: int = 5) -> List[Item]: ...

    def get(self, id_or_filter: "str | Filter" = None, top_k=5):
        """
        Fetch items either by id or by a ``Filter``.

        Args:
            id_or_filter: An item id or a ``Filter`` dict; passed through to
                the vector DB unchanged.
            top_k: Forwarded to the vector DB alongside ``id_or_filter``.

        Returns:
            Whatever ``vector_db.get`` returns (typed as a list of ``Item``).
        """
        return self.vector_db.get(self.collection_name, id_or_filter, top_k)

    def delete(self, id: str):
        """Delete the item with the given id from the collection."""
        self.vector_db.delete(self.collection_name, id)

    def delete_collection(self):
        """Delete this repository's whole collection from the vector DB."""
        self.vector_db.delete_collection(self.collection_name)


### Implementation

In [None]:
class CQTNetRepository(FeatureRepository):
    """Repository bound to the ``cqtnet_test`` collection, created with ``dim = 300``."""

    def __init__(self, vector_db: VectorDB):
        # Let the base class set up the shared state, then specialise it.
        super().__init__(vector_db)
        self.collection_name = "cqtnet_test"  # TODO: change to the correct name
        self.dim = 300

class CQTNetV1Repository(FeatureRepository):
    """Repository bound to the ``cqtnet_v1_test`` collection, created with ``dim = 300``."""

    def __init__(self, vector_db: VectorDB):
        # Let the base class set up the shared state, then specialise it.
        super().__init__(vector_db)
        self.collection_name = "cqtnet_v1_test"  # TODO: change to the correct name
        self.dim = 300

class FusionRepository(FeatureRepository):
    """Repository bound to the ``fusion_test`` collection, created with ``dim = 300``."""

    def __init__(self, vector_db: VectorDB):
        # Let the base class set up the shared state, then specialise it.
        super().__init__(vector_db)
        self.collection_name = "fusion_test"  # TODO: change to the correct name
        self.dim = 300

class CoverhunterRepository(FeatureRepository):
    """Repository bound to the ``coverhunter_test`` collection, created with ``dim = 128``."""

    def __init__(self, vector_db: VectorDB):
        # Let the base class set up the shared state, then specialise it.
        super().__init__(vector_db)
        self.collection_name = "coverhunter_test"  # TODO: change to the correct name
        self.dim = 128
        

### Instance

In [None]:
# One repository per feature type; all of them share the same Qdrant client
# and differ only in collection name and vector dimension.
cqtnet_repository = CQTNetRepository(qdrant)
coverhunter_repository = CoverhunterRepository(qdrant)
cqtnetv1_repository = CQTNetV1Repository(qdrant)
fusion_repository = FusionRepository(qdrant)

### Initialize collections

In [11]:
# Ask the vector DB to create each repository's collection with its `dim`.
# NOTE(review): presumably a no-op when the collection already exists
# ("try_" prefix) — confirm in db.vector_db.
cqtnet_repository.try_create_collection()
coverhunter_repository.try_create_collection()
cqtnetv1_repository.try_create_collection()
fusion_repository.try_create_collection()



## FeaturesDbLoader

In [None]:
class PreProcessOptions(TypedDict, total=False):
    """
    Optional keyword arguments that switch on feature pre-processing.

    Declared with ``total=False`` because the consumer reads both keys with
    ``.get(..., default)`` and may be called with none, one or both of them;
    with the default ``total=True`` a type checker would demand both keywords
    on every ``**kwargs: Unpack[PreProcessOptions]`` call.
    """

    # Row selection mode; "all" is used when the key is omitted.
    granularity: str
    # Whether vectors are scaled by the confidence values; True when omitted.
    # The key really is spelled "with_confidency" — callers must match it.
    with_confidency: bool

class FeaturesDbLoader:
    """
    Read feature vectors from the universe .h5 files and store them in the
    vector-DB collections of the injected repositories.

    Supported feature names are "cqtnet", "cqtnet_v1" and "coverhunter" (read
    directly from the loaded files) and "fusion" (the element-wise mean of the
    "cqtnet" and "cqtnet_v1" vectors).
    """

    # Names looked up directly as keys of a loaded feature dictionary.
    _DIRECT_FEATURES = ("cqtnet", "coverhunter", "cqtnet_v1")
    # Every name accepted by load_features / add_features.
    _SUPPORTED_FEATURES = _DIRECT_FEATURES + ("fusion",)

    def __init__(
        self,
        data_loader: DataLoader,
        cqtnet_repository: CQTNetRepository,
        cqtnetv1_repository: CQTNetV1Repository,
        coverhunter_repository: CoverhunterRepository,
        fusion_repository: FusionRepository,
    ):
        self.data_loader = data_loader
        self.cqtnet_repository = cqtnet_repository
        self.cqtnetv1_repository = cqtnetv1_repository
        self.coverhunter_repository = coverhunter_repository
        self.fusion_repository = fusion_repository

    def get_feature_vector(
        self, feature: dict, feature_name: str, **pre_process_options: Unpack[PreProcessOptions]
    ) -> np.ndarray:
        """
        Return the vectors of ``feature_name`` from one loaded feature dict.

        Without options the raw ``feature[feature_name]["features"]`` array is
        returned. When any option is given the vectors go through
        ``pre_process_features`` (missing options default to
        ``granularity="all"`` and ``with_confidency=True``).
        """
        if pre_process_options:
            return self.pre_process_features(
                feature,
                feature_name,
                pre_process_options.get("granularity", "all"),
                pre_process_options.get("with_confidency", True),
            )
        return feature[feature_name]["features"]

    def load_features(self, *features: str, **pre_process_options: Unpack[PreProcessOptions]):
        """
        Load every universe file and extract the requested features.

        Args:
            *features: Feature names to extract, in the order they should
                appear in the result.
            **pre_process_options: Forwarded to ``get_feature_vector``.

        Returns:
            tuple | None: One list of arrays per requested feature (one array
            per file), followed by the data loader's universe mapping as the
            last element. ``None`` when any requested name is unsupported.
        """
        # Reject unknown names before the expensive file loading starts.
        if any(name not in self._SUPPORTED_FEATURES for name in features):
            return None

        features_path = self.data_loader.get_features_path()
        with Pool(processes=4) as pool:
            # NOTE: load_h5_features returns None for unreadable files, which
            # makes the vector extraction below fail for that entry.
            features_list = pool.map(self.data_loader.load_h5_features, features_path)

        results = [
            self._process_feature_name(name, features_list, **pre_process_options)
            for name in features
        ]

        # Use the injected loader, not a notebook-level global.
        # NOTE(review): payloads are aligned with the vectors by position, so
        # the mapping must have been built from the same file listing — confirm.
        results.append(self.data_loader.get_universe_mapping())

        return tuple(results)

    def add_features(self, *features: str, **pre_process_options: Unpack[PreProcessOptions]):
        """
        Load the requested features and add them to their repositories.

        The universe mapping entry of each file is attached as payload to all
        vectors of that file.

        Raises:
            ValueError: If any requested feature name is unsupported.
        """
        features_payload = self.load_features(*features, **pre_process_options)
        if features_payload is None:
            raise ValueError(
                f"Unsupported feature name in {features!r}; "
                f"expected names from {self._SUPPORTED_FEATURES!r}"
            )

        *vectors, payloads = features_payload
        # Repositories injected through the constructor, not module globals.
        feature_repository_map: dict[str, FeatureRepository] = {
            "cqtnet": self.cqtnet_repository,
            "cqtnet_v1": self.cqtnetv1_repository,
            "coverhunter": self.coverhunter_repository,
            "fusion": self.fusion_repository,
        }

        for f, v in zip(features, vectors):
            feature_repository_map[f].add(v, payloads)

    def pre_process_features(
        self, feature: dict, feature_name: str, granularity: str, with_confidence=True
    ):
        """
        Select, L2-normalise and optionally confidence-weight feature vectors.

        Two file layouts are handled: nested (``entry["features"]`` holds the
        array) and flat (the entry is the array itself).

        Args:
            feature: One loaded feature dictionary.
            feature_name: Key of the vectors inside ``feature``.
            granularity: "all" keeps every row, "only_all_song" keeps only the
                last row, any other value keeps every row except the last.
                (Presumably the last row is the whole-song vector — confirm.)
            with_confidence: When True, rows are multiplied by the thresholded
                ``feature["confidence"]`` values; a weight of 1.0 is used for
                "only_all_song" and when the file carries no confidence.

        Returns:
            np.ndarray: float32 array of the processed vectors.
        """
        if with_confidence:
            # Look the key up safely so files without confidence are accepted.
            raw_confidence = feature["confidence"] if "confidence" in feature else None
            if raw_confidence is not None and self._is_nested(raw_confidence):
                vectors = feature[feature_name]["features"]
                raw_confidence = raw_confidence["features"]
            else:
                vectors = feature[feature_name]

            vectors = self._select_rows(vectors, granularity)
            if raw_confidence is None or granularity == "only_all_song":
                confidence = 1.0
            else:
                confidence = self._ret_confidence(
                    self._select_rows(raw_confidence, granularity)
                )
        else:
            entry = feature[feature_name]
            vectors = entry["features"] if self._is_nested(entry) else entry
            # A single-row array is kept whole, whatever the granularity.
            if vectors.shape[0] > 1:
                vectors = self._select_rows(vectors, granularity)

        vectors = normalize(vectors, norm="l2")

        if not with_confidence:
            return vectors.astype("float32")

        if np.ndim(confidence) == 0:
            # Scalar weight: it has no length and cannot be sliced.
            return (vectors * confidence).astype(np.float32)

        # Vectors and confidence may differ in row count; use the common prefix.
        size = min(vectors.shape[0], len(confidence))
        return (vectors[:size, :] * confidence[:size, :]).astype(np.float32)

    def _process_feature_name(self, feature_name, features_list, **pre_process_options):
        """Return one array per loaded file for ``feature_name``, or None if unsupported."""
        if feature_name == "fusion":
            vectors_cqtnet = [
                self.get_feature_vector(feature, "cqtnet", **pre_process_options)
                for feature in features_list
            ]
            vectors_cqtnet_v1 = [
                self.get_feature_vector(feature, "cqtnet_v1", **pre_process_options)
                for feature in features_list
            ]
            return [(v1 + v2) / 2 for v1, v2 in zip(vectors_cqtnet, vectors_cqtnet_v1)]

        if feature_name in self._DIRECT_FEATURES:
            return [
                self.get_feature_vector(feature, feature_name, **pre_process_options)
                for feature in features_list
            ]

        return None

    @staticmethod
    def _is_nested(entry):
        """Return True when ``entry`` is a container holding a "features" key."""
        # Arrays are excluded explicitly: `in` on an ndarray is an element-wise
        # comparison, not a key lookup.
        return not isinstance(entry, np.ndarray) and "features" in entry

    @staticmethod
    def _select_rows(array, granularity):
        """Apply the ``granularity`` row selection to a 2-D array."""
        if granularity == "all":
            return array
        if granularity == "only_all_song":
            return array[None, -1, :]
        return array[0:-1, :]

    def _ret_confidence(self, confidence, th=0.6):
        """Return a copy of ``confidence`` where every value above ``th`` is set to 1.0."""
        conf = copy.deepcopy(confidence)
        conf[confidence > th] = 1.0
        return conf

### Instance

In [None]:
# Wire the data loader and the four repositories into a single loader object.
fdbl = FeaturesDbLoader(
    data_loader=dl,
    cqtnet_repository=cqtnet_repository,
    cqtnetv1_repository=cqtnetv1_repository,
    coverhunter_repository=coverhunter_repository,
    fusion_repository=fusion_repository,
)

### Load features

In [None]:
# 8min

In [None]:
# %timeit 1
# features = fdbl.load_features("cqtnet", "coverhunter", "fusion")
# print(features)

In [None]:
# Dry run: load and pre-process the vectors without writing to the vector DB.
# The result is one list of arrays per requested feature, then the mapping.
features_pre_processed = fdbl.load_features("cqtnet", "coverhunter", "fusion", granularity="all", with_confidency=True)
print(features_pre_processed)

### Add features in its collections

In [None]:
# Reloads the files from disk (the result of the previous cell is not reused)
# and inserts the vectors into the matching collections.
fdbl.add_features("cqtnet", "coverhunter", "fusion", granularity="all", with_confidency=True)

## Clean up

In [None]:
# Destructive: drops all four collections created above from the vector DB.
cqtnet_repository.delete_collection()
coverhunter_repository.delete_collection()
cqtnetv1_repository.delete_collection()
fusion_repository.delete_collection()