In [1]:
import h5py
from pathlib import Path


def explore_h5_structure(base_folder):
    """
    Explore the structure of .ha files in the dataset
    Find the first H5 file in the dataset and its internal structure
    """
    base_path = Path(base_folder)
    first_work = next(base_path.iterdir())
    print(f"Examining first work folder: {first_work.name}")

    # Get first .h5 file
    first_file = next(first_work.glob("*.h5"))
    print(f"Examining file: {first_file.name}")

    def print_h5_structure(name, item):
        """
        Helper function to recursively print H5 structure
        """
        if isinstance(item, h5py.Dataset):
            print(f"\nDataset: {name}")
            print(f"  Shape: {item.shape}")
            print(f"  Type: {item.dtype}")
            print(f"  Attributes: {list(item.attrs.keys())}")
        elif isinstance(item, h5py.Group):
            print(f"\nGroup: {name}")
            print(f"  Attributes: {list(item.attrs.keys())}")
    try:
        with h5py.File(first_file, "r") as f:
            print("\nFile structure:")
            print(f"Top-level keys: {list(f.keys())}")
            f.visititems(print_h5_structure)

    except Exception as e:
        print(f"Error reading file: {e}")


def validate_multiple_files(base_folder, num_files=3):
    """
    Check multiple .h5 files to ensure consistent structure
    """
    base_path = Path(base_folder)
    structures = []
    print("\nValidating multiple files...")
    for work_folder in base_path.iterdir():
        if len(structures) >= num_files:
            break

        for h5_file in work_folder.glob("*.h5"):
            if len(structures) >= num_files:
                break
            try:
                with h5py.File(h5_file, "r") as f:
                    structure = {"file": h5_file.name, "keys": list(f.keys()), "shapes": {k: f[k].shape for k in f.keys() if isinstance(f[k], h5py.Dataset)}}
                    structures.append(structure)
                    print(f"\nFile: {h5_file.name}")
                    print(f"Keys: {structure['keys']}")
                    print(f"Shapes: {structure['shapes']}")
            except Exception as e:
                print(f"Error with file {h5_file}: {e}")
                
    return structures


base_folder = r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp"
print("Exploring single file structure:")
explore_h5_structure(base_folder)

print("\nValidating multiple files:")
structures = validate_multiple_files(base_folder)

Exploring single file structure:
Examining first work folder: W_1002_hpcp
Examining file: P_1002_hpcp.h5

File structure:
Top-level keys: ['hpcp']

Dataset: hpcp
  Shape: (19493, 12)
  Type: float32
  Attributes: ['CLASS', 'TITLE', 'VERSION']

Validating multiple files:

Validating multiple files...

File: P_1002_hpcp.h5
Keys: ['hpcp']
Shapes: {'hpcp': (19493, 12)}

File: P_122525_hpcp.h5
Keys: ['hpcp']
Shapes: {'hpcp': (17405, 12)}

File: P_129091_hpcp.h5
Keys: ['hpcp']
Shapes: {'hpcp': (18803, 12)}


In [2]:
import numpy as np
from scipy.spatial.distance import cdist


class MemoryEfficientDTW:
    def __init__(self, window_size=None, target_length=2000):
        """
        DTW comparator, made in a memory-efficient way
        """
        self.window_size = window_size #Size of the Sakoe-Chiba window for constrained DTW
        self.target_length = target_length # after downsampling

    def load_features(self, h5_path):
        """
        Load and preprocess chromagram features 
        """
        with h5py.File(h5_path, "r") as f:
            features = np.array(f["hpcp"], dtype=np.float32) # from float64 to float32

            # Normalize features 
            row_norms = np.sqrt(np.sum(features**2, axis=1, keepdims=True))
            features = np.divide(features, row_norms, where=row_norms > 1e-10)

            return features.astype(np.float32)

    def downsample_sequence(self, sequence):
        """
        Downsample a sequence to reduce memory usage
        """
        # Calculate downsample factor based on sequence length
        downsample_factor = max(1, len(sequence) // self.target_length)
        return sequence[::downsample_factor]

    def compute_dtw_distance(self, seq1, seq2):
        """
        Takes two sequences as input and computes their DTW distance
        """
        seq1 = seq1.astype(np.float32)
        seq2 = seq2.astype(np.float32)

        # Downsample sequences
        seq1_ds = self.downsample_sequence(seq1)
        seq2_ds = self.downsample_sequence(seq2)

        N, M = len(seq1_ds), len(seq2_ds)

        # Use two rows instead of full matrix
        previous_row = np.full(M, np.inf, dtype=np.float32)
        current_row = np.full(M, np.inf, dtype=np.float32)
        # first cell
        current_row[0] = cdist(seq1_ds[0].reshape(1, -1), seq2_ds[0].reshape(1, -1))[0, 0]
        # first row
        for j in range(1, M):
            current_row[j] = current_row[j - 1] + cdist(seq1_ds[0].reshape(1, -1), seq2_ds[j].reshape(1, -1))[0, 0]
        # rest of the sequences
        for i in range(1, N):
            previous_row, current_row = current_row, previous_row
            # window boundaries
            if self.window_size:
                j_start = max(0, i - self.window_size)
                j_end = min(M, i + self.window_size + 1)
            else:
                j_start, j_end = 0, M
            # first column
            current_row[0] = previous_row[0] + cdist(seq1_ds[i].reshape(1, -1), seq2_ds[0].reshape(1, -1))[0, 0]
            # current row
            for j in range(max(1, j_start), j_end):
                cost = max(1e-10, cdist(seq1_ds[i].reshape(1, -1), seq2_ds[j].reshape(1, -1))[0, 0])
                current_row[j] = cost + min(previous_row[j], current_row[j - 1], previous_row[j - 1]) 

        
        print("DTW matrix calculated")
        return np.log1p(current_row[-1]) / np.sqrt(N * M) # Return normalized distance using log transform 

    def compare_performances(self, perf1_path, perf2_path):
        """
        Compare two performances using all key rotations and find the min DTW distance with the optimal key shift.
        """
        chroma1 = self.load_features(perf1_path)
        chroma2 = self.load_features(perf2_path)

        min_distance = float("inf")
        best_shift = 0

        for shift in range(12): # all possible key shifts
            print(f"Calculate distance with key shift on key {shift}")
            shifted_chroma2 = np.roll(chroma2, shift, axis=1)
            # find DTW distance
            distance = self.compute_dtw_distance(chroma1, shifted_chroma2)

            if distance < min_distance: #update if needed
                min_distance = distance
                best_shift = shift

        return min_distance, best_shift

In [3]:
import numpy as np
from pathlib import Path
from itertools import combinations
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

class MemoryEfficientDatasetProcessor:
    def __init__(self, base_folder, dtw_comparator, max_files_per_work=5):
        """
        Main data processing class. 
        Takes the folder and the DTW distance function as inputs.
        Includes functions for comparing performances between the same or different works,
        and plotting the results.
        """
        self.base_folder = Path(base_folder)
        self.comparator = dtw_comparator
        self.max_files_per_work = max_files_per_work
        self.works = self._get_works()

    def _get_works(self):
        """
        Get all work folders
        """
        return [d for d in self.base_folder.iterdir() if d.is_dir()]

    def _get_performances(self, work_path):
        """
        Get a subset of a work's performances
        """
        all_performances = list(work_path.glob("*.h5"))
        if len(all_performances) > self.max_files_per_work:
            return np.random.choice(all_performances, self.max_files_per_work, replace=False).tolist()
        return all_performances

    def evaluate_single_work(self, work_path):
        """
        Compare perfomances of the same work
        """
        performances = self._get_performances(work_path)
        scores = []
        print(f"Processing work: {work_path.name}")
        print(f"Comparing {len(list(combinations(performances, 2)))} combinations")
        for perf1, perf2 in tqdm(list(combinations(performances, 2)), desc="Comparing performances"):
            try:
                distance, shift = self.comparator.compare_performances(str(perf1), str(perf2))
                scores.append({"work": work_path.name, "perf1": perf1.name, "perf2": perf2.name, "distance": distance, "key_shift": shift, "same_work": True})
            except Exception as e:
                print(f"Error processing {perf1.name} and {perf2.name}: {str(e)}")

        return scores

    def evaluate_between_works(self, work1_path, work2_path, num_samples=2):
        """
        Compare perfomances of the different works
        """
        perfs1 = self._get_performances(work1_path)[:num_samples]
        perfs2 = self._get_performances(work2_path)[:num_samples]
        scores = []
        print(f"Comparing {work1_path.name} with {work2_path.name}")
        for perf1 in perfs1:
            for perf2 in perfs2:
                try:
                    distance, shift = self.comparator.compare_performances(str(perf1), str(perf2))
                    scores.append({"work1": work1_path.name, "work2": work2_path.name, "perf1": perf1.name, "perf2": perf2.name, "distance": distance, "key_shift": shift, "same_work": False})
                except Exception as e:
                    print(f"Error comparing {perf1.name} and {perf2.name}: {str(e)}")

        return scores

    def analyze_dataset(self, num_works=None, num_between_samples=2):
        """
        Analyze a subset of the dataset by using the comparing functions
        """
        all_scores = []
        # works to analyze
        selected_works = self.works
        if num_works is not None:
            selected_works = np.random.choice(self.works, min(num_works, len(self.works)), replace=False)

        print("Analyzing within-work comparisons...")
        for work in selected_works:
            scores = self.evaluate_single_work(work)
            all_scores.extend(scores)

            if len(all_scores) > 1000:
                temp_df = pd.DataFrame(all_scores)
                all_scores = [temp_df]

        print("Analyzing between-work comparisons...")
        for work1, work2 in combinations(selected_works, 2):
            scores = self.evaluate_between_works(work1, work2, num_between_samples)
            all_scores.extend(scores)

        # Combine results in a results_df
        if isinstance(all_scores[0], pd.DataFrame):
            final_df = pd.concat(all_scores)
        else:
            final_df = pd.DataFrame(all_scores)

        return final_df

    def plot_distance_distribution(self, results_df, save_path=None):
        """
        Plots the overall distribution of distances (histogram) and save it.
        Proposes suggested threshold
        """
        plt.figure(figsize=(15, 12))

        # Process data in chunks for memory efficiency
        chunk_size = 1000
        for chunk in np.array_split(results_df, max(1, len(results_df) // chunk_size)):
            sns.histplot(data=chunk, x="distance", hue="same_work", bins=30, alpha=0.6, stat="count")  

        plt.title("Distribution of DTW Distances Between Performances")
        plt.xlabel("DTW Distance (smaller = more similar)")
        plt.ylabel("Number of Comparisons")
        plt.legend(["Different Works", "Same Work"])
        plt.tight_layout()

        with open(save_path/'comparison_logs.txt', "w", encoding="utf-8") as f:
            # Print the actual distances for detailed inspection
            f.write("\nDetailed Distance Values:")
            f.write("-" * 50)
            for idx, row in results_df.sort_values("distance").iterrows():
                work_type = "Same Work" if row["same_work"] else "Different Works"
                f.write(f"Distance: {row['distance']:.4f} | {work_type}")
                if "work" in row:
                    f.write(f"    Work: {row['work']}")
                    f.write(f"    Performances: {row['perf1']} vs {row['perf2']}")
                else:
                    f.write(f"    Work 1: {row['work1']} | Perf: {row['perf1']}")
                    f.write(f"    Work 2: {row['work2']} | Perf: {row['perf2']}")

        if save_path:
            plt.savefig(save_path/"distance_distribution.png", bbox_inches="tight", dpi=300)
            plt.close()

        # Filter distances < 1 do exclude exploding distance comparisons
        filtered_df = results_df[results_df["distance"] < 1]

        # Calculate and print suggested threshold
        same_work_distances = filtered_df[filtered_df["same_work"]]["distance"]
        diff_work_distances = filtered_df[~filtered_df["same_work"]]["distance"]

        if not (same_work_distances.empty or diff_work_distances.empty):
            suggested_threshold = (same_work_distances.mean() + diff_work_distances.mean()) / 2
            print(f"\nSuggested threshold for cover detection: {suggested_threshold:.4f}")
            with open(save_path/'comparison_logs.txt', "a") as f:
                f.write(f"\nSuggested threshold for cover detection: {suggested_threshold:.4f}")
        return suggested_threshold
        

    def plot_chromagram(self, file_path, downsample_factor=10, save_path=None):
        """
        Plot chromagram after downsampling
        """
        features = self.comparator.load_features(str(file_path))
        features = features[::downsample_factor]

        plt.figure(figsize=(15, 5))
        plt.imshow(features.T, aspect="auto", origin="lower", interpolation="nearest", cmap="Blues")
        plt.colorbar(label="Magnitude")
        plt.ylabel("Pitch Class")
        plt.xlabel(f"Time (downsampled by factor of {downsample_factor})")
        plt.title(f"Chromagram: {Path(file_path).name}")

        if save_path:
            plt.savefig(save_path / "chromagram_example.png")
            plt.close()

In [4]:
import numpy as np
from pathlib import Path

def DTW_analysis(croma_folder, num_works=3, max_files_per_work=5):
    """
    Full DTW analysis of a subset of the dataset by combining the previous functions
    """
    dtw_comparator = MemoryEfficientDTW(window_size=200, target_length=1000)
    processor = MemoryEfficientDatasetProcessor(croma_folder, dtw_comparator, max_files_per_work=max_files_per_work)

    works_list = list(Path(croma_folder).glob("W_*"))
    if not works_list:
        raise ValueError(f"No work folders found in {croma_folder}") #check if folder is empty

    selected_works = np.random.choice(works_list, min(num_works, len(works_list)), replace=False)

    # Create visualization directory
    viz_folder = Path("visualization_results")
    viz_folder.mkdir(exist_ok=True)

    print("Starting analysis...")
    try:
        # Analyze the dataset
        print("Processing selected works...")
        results = processor.analyze_dataset(num_works=len(selected_works), num_between_samples=2)
        # Create visualizations
        print("\nGenerating visualizations...")
        # Plot example chromagram
        print("Plotting chromagram...")
        sample_file = next(selected_works[0].glob("*.h5"))
        processor.plot_chromagram(sample_file, downsample_factor=200, save_path=viz_folder)
        # Plot distance distribution
        print("Plotting distance distribution...")
        suggested_threshold = processor.plot_distance_distribution(results, save_path=viz_folder)
        return results ,suggested_threshold

    except Exception as e:
        print(f"Error during analysis: {str(e)}")
        raise


In [5]:
# perform the analysis
results,suggested_threshold = DTW_analysis(croma_folder=r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp", num_works=5, max_files_per_work=2)

Starting analysis...
Processing selected works...
Analyzing within-work comparisons...
Processing work: W_112314_hpcp
Comparing 0 combinations


Comparing performances: 0it [00:00, ?it/s]


Processing work: W_76417_hpcp
Comparing 0 combinations


Comparing performances: 0it [00:00, ?it/s]


Processing work: W_126546_hpcp
Comparing 0 combinations


Comparing performances: 0it [00:00, ?it/s]


Processing work: W_5071_hpcp
Comparing 1 combinations


Comparing performances:   0%|          | 0/1 [00:00<?, ?it/s]

Calculate distance with key shift on key 0
DTW matrix calculated
Calculate distance with key shift on key 1
DTW matrix calculated
Calculate distance with key shift on key 2
DTW matrix calculated
Calculate distance with key shift on key 3
DTW matrix calculated
Calculate distance with key shift on key 4
DTW matrix calculated
Calculate distance with key shift on key 5
DTW matrix calculated
Calculate distance with key shift on key 6
DTW matrix calculated
Calculate distance with key shift on key 7
DTW matrix calculated
Calculate distance with key shift on key 8
DTW matrix calculated
Calculate distance with key shift on key 9
DTW matrix calculated
Calculate distance with key shift on key 10
DTW matrix calculated
Calculate distance with key shift on key 11


Comparing performances: 100%|██████████| 1/1 [00:33<00:00, 33.76s/it]


DTW matrix calculated
Processing work: W_132171_hpcp
Comparing 0 combinations


Comparing performances: 0it [00:00, ?it/s]

Analyzing between-work comparisons...
Comparing W_112314_hpcp with W_76417_hpcp





Calculate distance with key shift on key 0
DTW matrix calculated
Calculate distance with key shift on key 1
DTW matrix calculated
Calculate distance with key shift on key 2
DTW matrix calculated
Calculate distance with key shift on key 3
DTW matrix calculated
Calculate distance with key shift on key 4
DTW matrix calculated
Calculate distance with key shift on key 5
DTW matrix calculated
Calculate distance with key shift on key 6
DTW matrix calculated
Calculate distance with key shift on key 7
DTW matrix calculated
Calculate distance with key shift on key 8
DTW matrix calculated
Calculate distance with key shift on key 9
DTW matrix calculated
Calculate distance with key shift on key 10
DTW matrix calculated
Calculate distance with key shift on key 11
DTW matrix calculated
Comparing W_112314_hpcp with W_126546_hpcp
Calculate distance with key shift on key 0
DTW matrix calculated
Calculate distance with key shift on key 1
DTW matrix calculated
Calculate distance with key shift on key 2
DT

  return bound(*args, **kwds)



Suggested threshold for cover detection: 0.0064


In [6]:
results

Unnamed: 0,work,perf1,perf2,distance,key_shift,same_work,work1,work2
0,W_5071_hpcp,P_265944_hpcp.h5,P_141597_hpcp.h5,0.006384,0,True,,
1,,P_117251_hpcp.h5,P_105327_hpcp.h5,0.006367,0,False,W_112314_hpcp,W_76417_hpcp
2,,P_117251_hpcp.h5,P_525310_hpcp.h5,0.006441,3,False,W_112314_hpcp,W_126546_hpcp
3,,P_117251_hpcp.h5,P_141597_hpcp.h5,0.006535,5,False,W_112314_hpcp,W_5071_hpcp
4,,P_117251_hpcp.h5,P_24449_hpcp.h5,0.006461,8,False,W_112314_hpcp,W_5071_hpcp
5,,P_117251_hpcp.h5,P_268142_hpcp.h5,0.00652,0,False,W_112314_hpcp,W_132171_hpcp
6,,P_105327_hpcp.h5,P_525310_hpcp.h5,0.006325,3,False,W_76417_hpcp,W_126546_hpcp
7,,P_105327_hpcp.h5,P_42203_hpcp.h5,0.006202,6,False,W_76417_hpcp,W_5071_hpcp
8,,P_105327_hpcp.h5,P_5071_hpcp.h5,0.006305,8,False,W_76417_hpcp,W_5071_hpcp
9,,P_105327_hpcp.h5,P_268142_hpcp.h5,0.006449,0,False,W_76417_hpcp,W_132171_hpcp


In [7]:
def determine_cover_relationship(perf1_path, perf2_path, threshold=0.065):
    """
    Decide if two musical performances are covers of each other using DTW distance.
    Use the threshold that you found from dataset analysis
    """
    comparator = MemoryEfficientDTW(window_size=200, target_length=1000)  #can use more detailed comparison now that we work only on one pair
    try:
        distance, key_shift = comparator.compare_performances(perf1_path, perf2_path)
        is_cover = distance < threshold
        result = {"is_cover": is_cover, "distance": distance, "key_shift": key_shift, "explanation": get_comparison_explanation(distance, threshold, key_shift)}
        return result

    except Exception as e:
        raise Exception(f"Error comparing performances: {str(e)}")


def get_comparison_explanation(distance, threshold, key_shift):
    """
    Print explanation of the comparison results.
    """
    if distance < threshold:
        distance_level = "very similar" if distance < threshold / 2 else "similar"
        return f"The performances appear to be covers ({distance_level}). " f"They differ by {key_shift} semitones and have a distance of {distance:.3f}."
    else:
        return f"The performances do not appear to be covers. " f"The distance ({distance:.3f}) exceeds the threshold ({threshold:.3f})."

In [8]:
# random covers explanations
perf1_path = r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp\W_4726_hpcp\P_34522_hpcp.h5"
perf2_path = r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp\W_4726_hpcp\P_788651_hpcp.h5"
determine_cover_relationship(perf1_path, perf2_path, threshold=suggested_threshold)

Calculate distance with key shift on key 0
DTW matrix calculated
Calculate distance with key shift on key 1
DTW matrix calculated
Calculate distance with key shift on key 2
DTW matrix calculated
Calculate distance with key shift on key 3
DTW matrix calculated
Calculate distance with key shift on key 4
DTW matrix calculated
Calculate distance with key shift on key 5
DTW matrix calculated
Calculate distance with key shift on key 6
DTW matrix calculated
Calculate distance with key shift on key 7
DTW matrix calculated
Calculate distance with key shift on key 8
DTW matrix calculated
Calculate distance with key shift on key 9
DTW matrix calculated
Calculate distance with key shift on key 10
DTW matrix calculated
Calculate distance with key shift on key 11
DTW matrix calculated


{'is_cover': True,
 'distance': 0.006232473045356484,
 'key_shift': 0,
 'explanation': 'The performances appear to be covers (similar). They differ by 0 semitones and have a distance of 0.006.'}

In [9]:
# random non-covers explanation
perf1_path = r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp\W_129526_hpcp\P_247734_hpcp.h5"
perf2_path = r"D:\TACOS\da-tacos_benchmark_subset_hpcp\da-tacos_benchmark_subset_hpcp\W_25511_hpcp\P_281569_hpcp.h5"
determine_cover_relationship(perf1_path, perf2_path, threshold=suggested_threshold)

Calculate distance with key shift on key 0
DTW matrix calculated
Calculate distance with key shift on key 1
DTW matrix calculated
Calculate distance with key shift on key 2
DTW matrix calculated
Calculate distance with key shift on key 3
DTW matrix calculated
Calculate distance with key shift on key 4
DTW matrix calculated
Calculate distance with key shift on key 5
DTW matrix calculated
Calculate distance with key shift on key 6
DTW matrix calculated
Calculate distance with key shift on key 7
DTW matrix calculated
Calculate distance with key shift on key 8
DTW matrix calculated
Calculate distance with key shift on key 9
DTW matrix calculated
Calculate distance with key shift on key 10
DTW matrix calculated
Calculate distance with key shift on key 11
DTW matrix calculated


{'is_cover': False,
 'distance': 0.00647328484839017,
 'key_shift': 11,
 'explanation': 'The performances do not appear to be covers. The distance (0.006) exceeds the threshold (0.006).'}