In [None]:
# Mount Google Drive at /content/drive so later cells can use paths under it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch the notebook's working directory to the project folder on the mounted Drive.
%cd /content/drive/MyDrive/Information_Retrieval

In [None]:
# Clone the CLMR repository into the current directory, then enter it.
# NOTE(review): the trailing `&& cd clmr` runs inside the `!` subshell and does not
# change the notebook's working directory; the `%cd` line below is what does.
# NOTE(review): re-running this cell after a successful clone will presumably make
# `git clone` fail because ./clmr already exists — confirm before Run All.
!git clone https://github.com/spijkervet/clmr.git && cd clmr
%cd ./clmr

In [None]:
# Print the version of the `python` executable found on the shell PATH.
!python --version

In [None]:
# Before installing, edit requirements.txt:
#   torch==1.11.0
#   change `sklearn` to `scikit-learn`
#   pytorch-lightning==1.9.0
# !sudo apt install python3-pip
!pip install -r requirements.txt
# !pip install torch==1.13.0

# Feature extraction

In [None]:
import librosa
import numpy as np
import soundfile as sf
import os

def preprocessing(input_array, group_size=59049):
    """Cut a 1-D waveform into fixed-length, non-overlapping segments.

    The waveform is zero-padded at the end up to the next multiple of
    ``group_size`` and then split into consecutive segments.

    Args:
        input_array (numpy.ndarray): 1-D array of audio samples.
        group_size (int): Number of samples per segment. Defaults to 59049.

    Returns:
        torch.Tensor: Tensor of shape ``(num_segments, 1, group_size)`` with the
        same dtype as ``input_array``. An input shorter than ``group_size``
        (including an empty one) yields a single zero-padded segment.
    """
    samples = np.asarray(input_array)

    # Ceil division; always emit at least one segment, even for very short clips.
    num_segments = max(1, -(-len(samples) // group_size))

    # Copy into a zero-initialised buffer: the tail of the last segment becomes
    # silence and the returned tensor does not alias the caller's array.
    padded = np.zeros(num_segments * group_size, dtype=samples.dtype)
    padded[:len(samples)] = samples

    return torch.from_numpy(padded).reshape(num_segments, 1, group_size)

In [None]:
def load_audio(path):
  """Load an audio file at 22050 Hz and return it segmented by ``preprocessing``.

  Args:
    path: Path of the audio file handed to ``librosa.load``.

  Returns:
    Whatever ``preprocessing`` returns for the decoded waveform.
  """
  # The sample rate reported by librosa is not needed afterwards.
  waveform, _ = librosa.load(path, sr=22050)
  return preprocessing(waveform)

In [None]:
import torch.nn as nn
class Identity(nn.Module):
    """Pass-through module: ``forward`` hands back its argument untouched.

    No custom constructor is needed; ``nn.Module``'s own initialiser is used.
    """

    def forward(self, x):
        # Return the very same object that was passed in.
        return x

In [None]:
def load_encoder(checkpoint_path):
  """Build a SampleCNN, load checkpoint weights, and replace its ``fc`` layer.

  Args:
    checkpoint_path: Path handed to ``load_encoder_checkpoint``.

  Returns:
    The ``SampleCNN`` instance with the loaded weights and with ``fc`` swapped
    for an ``Identity`` module.
  """
  # Output size used both to build the model and to load the checkpoint.
  n_classes = 50
  encoder = SampleCNN(
      strides=[3, 3, 3, 3, 3, 3, 3, 3, 3],
      supervised=0,
      out_dim=n_classes,
  )

  state_dict = load_encoder_checkpoint(checkpoint_path, n_classes)
  encoder.load_state_dict(state_dict)

  # Replace the final layer with a pass-through; presumably this makes the
  # forward pass return the features that used to feed `fc` — TODO confirm
  # against SampleCNN.forward.
  encoder.fc = Identity()

  return encoder

In [None]:
def load_arg(config_path):
  """Assemble run arguments from Trainer options plus a YAML config file.

  Every key of the config returned by ``yaml_config_hook`` becomes a
  ``--key`` option whose default and type come from the config value.
  Unknown command-line arguments are ignored. The global seed is set from
  ``args.seed`` and ``args.accelerator`` is forced to ``None``.

  Args:
    config_path: Path of the YAML config passed to ``yaml_config_hook``.

  Returns:
    The parsed argument namespace.

  Raises:
    FileNotFoundError: If ``args.checkpoint_path`` does not exist on disk.
  """
  parser = Trainer.add_argparse_args(argparse.ArgumentParser(description="SimCLR"))

  for key, default in yaml_config_hook(config_path).items():
      parser.add_argument(f"--{key}", default=default, type=type(default))

  # Arguments the parser does not recognise are deliberately dropped.
  args, _ = parser.parse_known_args()
  pl.seed_everything(args.seed)
  args.accelerator = None

  if os.path.exists(args.checkpoint_path):
    return args
  raise FileNotFoundError("That checkpoint does not exist")

In [None]:
import os
import argparse
import pytorch_lightning as pl
import torch
import torch.nn as nn
import json
from glob import glob

from tqdm import tqdm
from torch.utils.data import DataLoader
from torchaudio_augmentations import Compose, RandomResizedCrop
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

from clmr.datasets import get_dataset
from clmr.data import ContrastiveDataset
from clmr.evaluation import evaluate
from clmr.models import SampleCNN
from clmr.modules import ContrastiveLearning, LinearEvaluation
from clmr.utils import (
    yaml_config_hook,
    load_encoder_checkpoint,
    load_finetuner_checkpoint,
)

def dataset_feature_extraction(dataset_path, output_path):
  """Encode every ``*.mp3`` directly under ``dataset_path`` and save the features.

  Each new file gets the next integer index; its encoder output is saved as
  ``<index>.pt`` in ``output_path`` and the name-to-index mapping is written to
  ``index_mapping.json`` after every file, so an interrupted run can resume.
  Files whose name is already in the mapping are skipped.

  Args:
    dataset_path: Folder containing the ``.mp3`` files.
    output_path: Folder for the ``.pt`` files and ``index_mapping.json``;
      created if it does not exist.

  Returns:
    dict: Mapping from file name (text before the first ``.``) to index.
  """
  args = load_arg("./config/config.yaml")

  encoder = load_encoder(args.checkpoint_path)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print("Running on:", device)
  encoder = encoder.to(device)
  encoder.eval()

  # The pattern has no `**`, so only files directly inside dataset_path match.
  dataset_files = sorted(glob(os.path.join(dataset_path, "*.mp3")))

  os.makedirs(output_path, exist_ok=True)

  index_mapping = {}
  index_mapping_path = os.path.join(output_path, 'index_mapping.json')
  if os.path.exists(index_mapping_path):
    with open(index_mapping_path, 'r') as f:
      index_mapping = json.load(f)

  # Continue numbering after the entries of a previous run.
  cnt = len(index_mapping)
  for file_path in tqdm(dataset_files):
    file_name = file_path.split('/')[-1].split('.')[0]
    if file_name in index_mapping:
      continue

    output_file_path = os.path.join(output_path, f'{cnt}.pt')

    audio = load_audio(file_path).to(device)
    with torch.no_grad():
      features = encoder(audio)
    # Features are written straight to disk; nothing is accumulated in memory.
    torch.save(features, output_file_path)

    index_mapping[file_name] = cnt
    # Persist after every file so progress survives an interrupted session.
    with open(index_mapping_path, 'w') as f:
      json.dump(index_mapping, f, indent=4)
    cnt += 1

  return index_mapping

In [None]:
# Extract features for the dataset folder and keep the returned name-to-index mapping;
# get_ground_truth reads this `index_mapping` variable as a global.
index_mapping = dataset_feature_extraction("/content/drive/MyDrive/Information_Retrieval/clmr/test/dataset/", "./test/features/")

In [None]:
print(index_mapping)

In [None]:
# from glob import glob

# glob("/content/drive/MyDrive/Information_Retrieval/clmr/test/dataset/*.mp3",
#     recursive=True,
#      )
# # print(os.path.join(
# #     "test", "**", "*{}".format(".mp3")))

# Query

In [None]:
import torchaudio

In [None]:
# from glob import glob

# mp3_files = glob("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries/**/*.mp3",
#     recursive=True)

# mp3_files.sort()
# output_folder = "/content/drive/MyDrive/Information_Retrieval/clmr/test/queries"

# for mp3_file in mp3_files:
#   process_audio(mp3_file, output_folder)

In [None]:
print(index_mapping)

In [None]:
from pprint import pprint
def get_ground_truth(query_folder, mapping=None):
  """Derive the expected dataset index for every query file in a folder.

  The ``*.mp3`` files directly under ``query_folder`` are taken in sorted
  order. For each one, the text before the first ``.`` and then before the
  first space of the file name is used as the song name to look up.

  Args:
    query_folder: Folder containing the query ``.mp3`` files.
    mapping: Optional dict from song name to index. When omitted, the
      notebook-level ``index_mapping`` variable is used.

  Returns:
    list: One index per query file, in sorted file order.

  Raises:
    KeyError: If a derived song name is missing from the mapping.
  """
  if mapping is None:
    mapping = index_mapping

  queries_path = sorted(glob(os.path.join(query_folder, "*.mp3")))
  print(queries_path)
  print(len(queries_path))

  queries_name = [os.path.basename(path) for path in queries_path]
  print(queries_name)

  ground_truth_name = [name.split(".")[0].split(" ")[0] for name in queries_name]

  # Report every unknown song at once instead of failing on the first one.
  missing = sorted({name for name in ground_truth_name if name not in mapping})
  if missing:
    raise KeyError(f"Songs not found in index mapping: {missing}")

  ground_truth = [mapping[name] for name in ground_truth_name]
  print(ground_truth)
  return ground_truth

In [None]:
# Expected dataset indices for the queries in the humming folder.
humming_ground_truth = get_ground_truth("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries/humming/")

In [None]:
# Expected dataset indices for the queries in the example folder.
example_ground_truth = get_ground_truth("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries/example/")

In [None]:
# print(queries_path[3])

In [None]:
# ground_truth = [102, 102, 102, 103, 55, 55, 55, 55, 99, 103, 102, 103, 103, 103, 103]

In [None]:
from pickle import encode_long
def query_feature_extraction(queries_path):
  """Encode a single query audio file with the checkpointed encoder.

  Args:
    queries_path: Path of one query audio file.

  Returns:
    The encoder output for the segmented query, computed without gradient
    tracking, on CUDA when available and on CPU otherwise.
  """
  # NOTE(review): the config and checkpoint are reloaded on every call.
  args = load_arg("./config/config.yaml")
  encoder = load_encoder(args.checkpoint_path)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  encoder = encoder.to(device)
  encoder.eval()

  # Same loading and segmentation path as the dataset files.
  audio = load_audio(queries_path).to(device)

  # Inference only: do not build an autograd graph for the query.
  with torch.no_grad():
    query_features = encoder(audio)
  return query_features


In [None]:
# query_features = query_feature_extraction(queries_path[0])
# print(query_features)

# Search

In [None]:
# query_features = torch.tensor([[1] for i in range(512)])

In [None]:
# Folder holding the saved dataset feature files; `evaluation` reads this variable as a global.
data_features_path = "./test/features"



In [None]:
from torch.nn.functional import cosine_similarity

def calc_similarity(query_features, audio_features):
  """Best sliding-window cosine similarity between a query and one track.

  A window with as many rows as the query is slid over ``audio_features`` one
  row at a time. The query and each window are flattened to a single vector
  and compared with cosine similarity; the largest value is returned.

  Args:
    query_features: Tensor whose first dimension indexes query segments.
    audio_features: Tensor (or tensor-like) whose first dimension indexes
      track segments.

  Returns:
    The highest window similarity as a float. The result is never below 0:
    negative similarities are floored at 0, and 0 is returned when the track
    has fewer segments than the query (no window fits).
  """
  # No copy and no warning when the input is already a tensor.
  audio_features = torch.as_tensor(audio_features)

  query_length = len(query_features)
  flat_query = query_features.reshape(1, -1)

  similarity = 0
  # Only start positions where a full-length window fits.
  for start in range(len(audio_features) - query_length + 1):
    window = audio_features[start:start + query_length].reshape(1, -1)
    cur_similarity = float(cosine_similarity(flat_query, window, dim=1))
    similarity = max(similarity, cur_similarity)

  return similarity

In [None]:
def retrieval(query_path, data_features_path):
  """Rank every stored ``.pt`` feature file against one query.

  Args:
    query_path: Path of the query audio file.
    data_features_path: Folder containing the saved ``.pt`` feature files.

  Returns:
    list: ``(score, file_name)`` tuples sorted in descending order.
  """
  query_features = query_feature_extraction(query_path)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  ranked = []
  for entry in tqdm(os.listdir(data_features_path)):
    # Skip anything that is not a saved feature tensor (e.g. the JSON mapping).
    if not entry.endswith(".pt"):
      continue
    stored_features = torch.load(os.path.join(data_features_path, entry), map_location=torch.device(device))
    ranked.append((calc_similarity(query_features, stored_features), entry))

  # Tuples compare by score first, then by file name.
  ranked.sort(reverse=True)
  return ranked



In [None]:
# results = []
# for i in range(len(queries_path)):
#   results.append(retrieval(queries_path[i], data_features_path))

In [None]:
print(humming_ground_truth)

In [None]:
# print(results[9])

In [None]:
# 0: top 1 1 1 1
# 1: top 3 5 6 5
# 2: top 2 2 1 1
# 3: top 1 1 1 1
# 4: top 2 2 1 1
# 5: top 2 2 1 1

In [None]:
# !python preprocess.py --dataset audio --dataset_dir ./test/dataset/


In [None]:
# !python main.py --dataset audio --dataset_dir  ./test/dataset/


# Evaluation

In [None]:
def average_precision(retrieval_result):
  """Average precision of one ranked relevance list.

  Args:
    retrieval_result: Sequence of flags in rank order; an entry equal to 1
      marks a relevant item.

  Returns:
    The mean of precision-at-rank over the positions holding a 1, or 0 when
    the list contains no 1.
  """
  hits = 0
  precision_sum = 0
  for rank, flag in enumerate(retrieval_result, start=1):
    if flag != 1:
      continue
    hits += 1
    # Precision at this rank = relevant items seen so far / items seen so far.
    precision_sum += hits / rank

  return precision_sum / hits if hits else precision_sum

In [None]:
print(average_precision([1,1,0,1,1,0,1,0,0,1]))

In [None]:
def mean_average_precision(retrieval_results):
  """Mean of ``average_precision`` over several ranked relevance lists.

  Args:
    retrieval_results: Sequence of relevance lists, one per query.

  Returns:
    The sum of the per-query average precisions divided by the number of
    queries.
  """
  total = sum(average_precision(ranking) for ranking in retrieval_results)
  return total / len(retrieval_results)

In [None]:
print(mean_average_precision( [[1,1,0], [1,0,1], [1,1,1]]))

In [None]:
def mean_reciprocal_rank(retrieval_results):
  """Mean reciprocal rank of the first relevant item across queries.

  Args:
    retrieval_results: Sequence of relevance lists in rank order; an entry
      equal to 1 marks a relevant item.

  Returns:
    The sum of ``1 / rank`` of each list's first 1 (lists without a 1 add
    nothing) divided by the number of lists.
  """
  reciprocal_sum = 0
  for ranking in retrieval_results:
    # 1-based position of the first relevant item, if there is one.
    first_hit = next(
        (position for position, flag in enumerate(ranking, start=1) if flag == 1),
        None,
    )
    if first_hit is not None:
      reciprocal_sum += 1 / first_hit
  return reciprocal_sum / len(retrieval_results)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, average_precision_score
%matplotlib inline

def draw_precision_recall_curve(retrieval_results):
  """Plot a pooled, interpolated precision-recall curve for several queries.

  For each rank ``j`` the number of relevant items found in the top ``j + 1``
  positions is summed over all queries. Precision is that count divided by
  ``(j + 1) * num_queries``; recall is the count divided by ``num_queries``.
  Precision is then made non-increasing by replacing each value with the
  maximum of itself and all values at later ranks.

  Args:
    retrieval_results: List of relevance lists (1 = relevant), one per query.
      Every list is indexed up to the length of the first one, so they are
      expected to be equally long.

  Returns:
    None. The figure is shown with ``plt.show()``.
  """
  num_queries = len(retrieval_results)
  num_ranks = len(retrieval_results[0])

  precision = []
  recall = []
  cur_relevant = 0  # relevant items found so far, pooled over all queries
  for j in range(num_ranks):
    cur_relevant += sum(1 for ranking in retrieval_results if ranking[j] == 1)
    precision.append(cur_relevant / (j + 1) / num_queries)
    # NOTE(review): dividing by the number of queries presumably assumes one
    # relevant item per query — confirm.
    recall.append(cur_relevant / num_queries)

  # Interpolate: each precision becomes the best precision at this or any later rank.
  max_precision = 0
  for j in reversed(range(num_ranks)):
    max_precision = max(precision[j], max_precision)
    precision[j] = max_precision

  plt.figure(figsize=(8, 6))
  plt.plot(recall, precision, label='PR Curve', color='b', lw=2)
  plt.fill_between(recall, precision, alpha=0.2, color='blue', label='Area Under Curve')
  plt.xlabel('Recall', fontsize=12)
  plt.ylabel('Precision', fontsize=12)
  plt.title('Precision-Recall Curve', fontsize=14)
  plt.legend(loc='lower left', fontsize=10)
  plt.grid(alpha=0.3)
  plt.show()


In [None]:
# Quick visual check of the plotting function on three hand-written relevance lists.
draw_precision_recall_curve(
    retrieval_results=[[1,0,0],[0,1,0],[0,1,0]]
)

In [None]:
def evaluation(query_folder, ground_truth):
  """Run retrieval for every query under a folder and score the rankings.

  Query ``.mp3`` files are collected recursively and sorted; the i-th query is
  compared against ``ground_truth[i]``. Stored features are read from the
  notebook-level ``data_features_path``.

  Args:
    query_folder: Root folder searched for ``.mp3`` query files.
    ground_truth: Expected dataset index for each query, in sorted file order.

  Returns:
    tuple: ``(mAP, MRR, isRelevant)`` where ``isRelevant`` holds one 0/1 list
    per query, in ranking order.
  """
  queries_path = sorted(glob(os.path.join(query_folder, "**/*.mp3"), recursive=True))

  rankings = [retrieval(path, data_features_path) for path in queries_path]

  # Feature files are named "<index>.pt"; recover the integer index of each hit.
  predict_results = [
      [int(entry[1].split('.')[0]) for entry in ranking]
      for ranking in rankings
  ]

  isRelevant = [
      [1 if predicted == ground_truth[query_idx] else 0 for predicted in predictions]
      for query_idx, predictions in enumerate(predict_results)
  ]
  return mean_average_precision(isRelevant), mean_reciprocal_rank(isRelevant), isRelevant




In [None]:
# Evaluate the humming queries against their ground truth and report mAP and MRR.
humming_mAP, humming_mrr, humming_isRelevant = evaluation("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries/humming", humming_ground_truth)
print(f"\nmAP: {humming_mAP}, mrr: {humming_mrr}")

In [None]:
# Evaluate the example queries against their ground truth and report mAP and MRR.
example_mAP, example_mrr, example_isRelevant = evaluation("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries/example/", example_ground_truth)
print(f"\nmAP: {example_mAP}, mrr: {example_mrr}")

# MFCC

In [None]:
import librosa
import numpy as np
from scipy.spatial.distance import euclidean
from librosa.sequence import dtw
from tqdm import tqdm

# Function to extract MFCC and pitch features
def extract_features(input_array, sr=22050, n_mfcc=20, feature_type="mfcc"):
    """
    Extract MFCC or pitch features from an input audio array.

    Args:
        input_array (numpy.ndarray): Audio time-series array.
        sr (int): Sampling rate of the audio. Default is 22050.
        n_mfcc (int): Number of MFCC features to extract (if feature_type="mfcc"). Default is 20.
        feature_type (str): Type of feature to extract ("mfcc" or "pitch"). Default is "mfcc".

    Returns:
        numpy.ndarray: For "mfcc", the transposed output of
        ``librosa.feature.mfcc`` (frames as rows). For "pitch", the first value
        returned by ``librosa.pyin``, left 1-D and not NaN-filled.

    Raises:
        ValueError: If ``feature_type`` is neither "mfcc" nor "pitch".
    """
    if feature_type == "mfcc":
        mfcc = librosa.feature.mfcc(y=input_array, sr=sr, n_mfcc=n_mfcc)
        return mfcc.T  # Transpose to get frames as rows
    elif feature_type == "pitch":
        # Pass `sr` through so pitch tracking uses the caller's sampling rate
        # instead of pyin's own default.
        pitch, _, _ = librosa.pyin(
            input_array,
            fmin=librosa.note_to_hz('C2'),
            fmax=librosa.note_to_hz('C7'),
            sr=sr,
        )
        return pitch
    else:
        raise ValueError("Invalid feature_type. Choose 'mfcc' or 'pitch'.")

# Function to compute Dynamic Time Warping (DTW) distance
def perform_dtw(query_features, song_features):
    """
    Score how well a query matches a song using Dynamic Time Warping.

    Each input is transposed when it has more rows than columns, so the longer
    axis ends up second before the two are passed to ``dtw`` with the
    euclidean metric.

    Args:
        query_features (numpy.ndarray): 2-D feature matrix of the query.
        song_features (numpy.ndarray): 2-D feature matrix of the song.

    Returns:
        float: Last entry of the matrix returned by ``dtw``.
    """
    def _longer_axis_last(features):
        # Heuristic orientation: transpose only when rows outnumber columns.
        n_rows, n_cols = features.shape[0], features.shape[1]
        if n_rows > n_cols:
            return features.T
        return features

    query_oriented = _longer_axis_last(query_features)
    song_oriented = _longer_axis_last(song_features)

    cost_matrix, _ = dtw(query_oriented, song_oriented, metric='euclidean')
    return cost_matrix[-1, -1]


In [None]:
def mfcc_dataset_feature_extraction(dataset_path, output_path):
  """Extract features for every ``*.mp3`` directly under ``dataset_path``.

  Each new file gets the next integer index; the output of
  ``extract_features`` is saved as ``<index>.npy`` in ``output_path`` and the
  name-to-index mapping is written to ``index_mapping.json`` after every file.
  Files whose name is already in the mapping are skipped.

  Args:
    dataset_path: Folder containing the ``.mp3`` files.
    output_path: Folder for the ``.npy`` files and ``index_mapping.json``;
      created if it does not exist.

  Returns:
    dict: Mapping from file name (text before the first ``.``) to index.
  """
  # The pattern has no `**`, so only files directly inside dataset_path match.
  dataset_files = sorted(glob(os.path.join(dataset_path, "*.mp3")))
  os.makedirs(output_path, exist_ok=True)

  index_mapping = {}
  index_mapping_path = os.path.join(output_path, 'index_mapping.json')
  if os.path.exists(index_mapping_path):
    with open(index_mapping_path, 'r') as f:
      index_mapping = json.load(f)

  # Continue numbering after the entries of a previous run.
  cnt = len(index_mapping)
  for file_path in tqdm(dataset_files):
    file_name = file_path.split('/')[-1].split('.')[0]

    if file_name in index_mapping:
      # Report the file that actually holds this song, not the next free slot.
      existing_path = os.path.join(output_path, f'{index_mapping[file_name]}.npy')
      print(f"{existing_path} already exists")
      continue

    output_file_path = os.path.join(output_path, f'{cnt}.npy')

    audio, _ = librosa.load(file_path)
    features = extract_features(audio)

    # Features go straight to disk; nothing is accumulated in memory.
    np.save(output_file_path, features)
    print(f"Saved: {file_name}")

    index_mapping[file_name] = cnt
    # Persist after every file so progress survives an interrupted session.
    with open(index_mapping_path, 'w') as f:
      json.dump(index_mapping, f, indent=4)
    cnt += 1

  return index_mapping

In [None]:
# Extract and store MFCC features for the dataset folder; keep the name-to-index mapping.
mfcc_index = mfcc_dataset_feature_extraction("/content/drive/MyDrive/Information_Retrieval/clmr/test/dataset/",  "./test/mfcc_features/")

In [None]:
def mfcc_retrieval(query_path, data_features_path):
  """Rank every stored ``.npy`` feature file against one query by DTW cost.

  Args:
    query_path: Path of the query audio file.
    data_features_path: Folder containing the saved ``.npy`` feature files.

  Returns:
    list: ``(score, file_name)`` tuples sorted in ascending order, so the
    lowest DTW cost comes first.
  """
  waveform, _ = librosa.load(query_path)
  query_features = extract_features(waveform)

  ranked = []
  for entry in os.listdir(data_features_path):
    # Ignore anything that is not a saved feature array (e.g. the JSON mapping).
    if not entry.endswith(".npy"):
      continue
    stored_features = np.load(os.path.join(data_features_path, entry))
    ranked.append((perform_dtw(query_features, stored_features), entry))

  ranked.sort()
  return ranked

# Run MFCC retrieval for every path in `queries_path`.
# NOTE(review): `queries_path` is not assigned at notebook level in the visible cells
# (it only appears as a local inside functions and in commented-out code), so this
# loop presumably relies on leftover kernel state — confirm or define it in this cell.
mfcc_results = []
for i in tqdm(range(len(queries_path))):
  mfcc_results.append(mfcc_retrieval(queries_path[i], "./test/mfcc_features"))

In [None]:
print(mfcc_results[0])

In [None]:
# NOTE(review): the only notebook-level assignment of `ground_truth` in the visible
# cells is commented out, so on a fresh kernel this presumably raises NameError — confirm.
print(ground_truth)

In [None]:
# Hand-written expected dataset indices passed to mfcc_evaluation.
# NOTE(review): presumably one entry per query selected inside mfcc_evaluation
# (which picks five paths) — verify the order matches that selection.
mfcc_ground_truth = [102, 103, 1, 15, 102]

In [None]:
def mfcc_evaluation(query_folder, ground_truth):
  """Score MFCC + DTW retrieval on a fixed subset of the queries.

  Query ``.mp3`` files are collected recursively under ``query_folder`` and
  sorted so that the selection below is deterministic. The files at sorted
  positions 0, 2, 3, 4 and 5 are evaluated; the i-th selected query is
  compared against ``ground_truth[i]``.

  Args:
    query_folder: Root folder searched for ``.mp3`` query files.
    ground_truth: Expected dataset index for each selected query.

  Returns:
    float: Mean average precision over the selected queries.

  Raises:
    IndexError: If fewer than six query files are found.
  """
  queries_path = glob(os.path.join(query_folder, "**/*.mp3"), recursive=True)
  # glob returns files in arbitrary order; sort before picking by position so
  # the pairing with ground_truth does not depend on the filesystem.
  queries_path.sort()
  queries_path = [queries_path[position] for position in (0, 2, 3, 4, 5)]

  results = [
      mfcc_retrieval(query, "./test/mfcc_features")
      for query in tqdm(queries_path)
  ]

  # Feature files are named "<index>.npy"; recover the integer index of each hit.
  predict_results = [
      [int(entry[1].split('.')[0]) for entry in ranking]
      for ranking in results
  ]

  isRelevant = [
      [1 if predicted == ground_truth[query_idx] else 0 for predicted in predictions]
      for query_idx, predictions in enumerate(predict_results)
  ]

  return mean_average_precision(isRelevant)

# Evaluate the MFCC baseline on the queries folder and show its mean average precision.
mfcc_mAP = mfcc_evaluation("/content/drive/MyDrive/Information_Retrieval/clmr/test/queries", mfcc_ground_truth)
print(mfcc_mAP)