<h1><b>COMP 432 - Machine Learning</b> | <b>Final Project</b> (W2025)</h1>
<h3>Machine Learning Model to detect Copyright Infringement in Music</h3>
Written by Sisahga Phimmasone - 40210015<br>
April 20th, 2025


<h3><b>1. Data Preparation and Cleaning</b></h3>

The code cells below download the YouTube videos for each pair of songs in the list of historically known copyright infringement cases and convert their audio to MP3.

https://en.wikipedia.org/wiki/List_of_songs_subject_to_plagiarism_disputes

See the 'copyright_cases.csv' file for the song pairs and their YouTube links.
The yt_dlp library is used to download each YouTube URL and convert its audio to .mp3.

<h4><b>1.1 Convert YouTube URLs to MP3 with yt_dlp</b></h4>

In [None]:
import yt_dlp
import pandas as pd
import os
import re

In [None]:
# Directory that receives the downloaded MP3 files; created here if it does not exist yet.
mp3_song_dir = "songs_mp3"
os.makedirs(mp3_song_dir, exist_ok=True)
# Table of song pairs; the download loop below reads the song, artist and YouTube link columns from it.
df = pd.read_csv("copyright_cases.csv")

In [None]:
# YT-DLP options
def get_opts(output_path):
    """Builds the option dictionary handed to yt_dlp.YoutubeDL for one download.

    PARAMETERS
    ----------
        output_path: str
            Output template for the downloaded file (stored under the 'outtmpl' key).
    RETURNS
    -------
        dict:
            Options requesting the best audio stream, extracted to MP3 at quality '192',
            with playlists disabled and console output left on.
    """
    # Post-processing step that converts the downloaded audio to MP3 via FFmpeg
    mp3_extractor = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }

    options = dict(
        format='bestaudio/best',
        outtmpl=output_path,
        postprocessors=[mp3_extractor],
        quiet=False,
        noplaylist=True,
    )
    return options

In [None]:
# Creates a clean filename for the mp3 files
def clean_filename(s):
    """Returns `s` with spaces turned into underscores and the characters \\ / * ? : " < > | removed."""
    # One translation pass: map ' ' -> '_' and delete every character in the third argument
    table = str.maketrans(" ", "_", '\\/*?:"<>|')
    return s.translate(table)

In [None]:
# Download Loop, record by record
for index, row in df.iterrows():
    # Sanitise the four text fields first, in the same order as the CSV columns are read
    first_title, first_artist, later_title, later_artist = (
        clean_filename(row[column])
        for column in ('Original Song', 'Original Artist', 'Second Song', 'Second Artist')
    )
    first_link = row['Original Song YouTube Link']
    later_link = row['Second Song Youtube Link']

    # Zero-padded, 1-based identifier shared by both files of a pair (e.g. "001")
    prefix = f"{index+1:03d}"

    # One job per song of the pair: (role used in the filename and messages, title, artist, url)
    jobs = (
        ("original", first_title, first_artist, first_link),
        ("second", later_title, later_artist, later_link),
    )

    for role, title, artist, link in jobs:
        # yt_dlp fills in %(ext)s itself
        template = os.path.join(
            mp3_song_dir,
            f"{prefix}_{role}_{title}_by_{artist}.%(ext)s"
        )
        # Best effort: a failed download is reported and the loop moves on to the next song
        try:
            with yt_dlp.YoutubeDL(get_opts(template)) as ydl:
                print(f"Downloading {role}: {title} by {artist}")
                ydl.download([link])
        except Exception as e:
            print(f"Failed to download {role} song at {link}: {e}")

In [None]:
# Check how many files were downloaded (should be 176)
# The extension test is case-insensitive; a lower count means some downloads above failed.
mp3_files = [f for f in os.listdir(mp3_song_dir) if f.lower().endswith(".mp3")]
print(f"Total MP3 files downloaded: {len(mp3_files)}")

<h4><b>1.2 Create Data Structure for known copyrighted song pairings</b></h4>

In [None]:
import numpy as np
import librosa
import pickle

In [None]:
# Librosa Configuration
AUDIO_DIR = mp3_song_dir  # directory holding the downloaded MP3 files
SR = 16000  # target sample rate in Hz used when resampling
DURATION = 10  # length of one audio chunk, in seconds
TARGET_LEN = DURATION * SR  # number of samples per chunk at the target sample rate

In [None]:
def preprocess_audio(audio_path):
    """Loads an audio file, trims silence, resamples it to SR and cuts it into fixed-size chunks.

    PARAMETERS
    ----------
        audio_path: str
            Path to audio file
    RETURNS
    -------
        list:
            A list of numpy arrays, each holding exactly TARGET_LEN samples (DURATION seconds at SR).
            The final chunk is zero-padded at the end when the song does not divide evenly.
            The list is empty when no samples remain after trimming.
    """

    # Load at the file's native sample rate; resampling happens after trimming
    waveform, native_sr = librosa.load(audio_path, sr=None)

    # Trim silence using a 20 dB threshold
    waveform, _ = librosa.effects.trim(waveform, top_db=20)

    # Bring the signal to the common sample rate (SR) if it is not there already
    if native_sr != SR:
        waveform = librosa.resample(waveform, orig_sr=native_sr, target_sr=SR)

    # Walk through the signal in TARGET_LEN steps, padding the last chunk with zeros for uniformity
    chunks = []
    total_samples = len(waveform)
    start = 0
    while start < total_samples:
        chunk = waveform[start: start + TARGET_LEN]
        shortfall = TARGET_LEN - len(chunk)
        if shortfall > 0:
            chunk = np.pad(chunk, (0, shortfall), mode='constant')
        chunks.append(chunk)
        start += TARGET_LEN

    return chunks

In [None]:
def get_pair_id(audio_filename):
    """Extracts the three-digit pair ID that prefixes an audio file name.

    PARAMETERS
    ----------
    audio_filename: str
        File name (not a full path) expected to start with three digits followed by
        an underscore, e.g. "001_original_Song_by_Artist.mp3"

    RETURNS
    -------
        str or None:
            The pair ID (i.e. "001"), or None when the name does not start with that prefix
    """
    # re.match anchors at the start of the string, so a leading directory would prevent a match
    found = re.match(r"(\d{3})_", audio_filename)
    if found is None:
        return None
    return found.group(1)

In [None]:
# Group song mp3 files by pair ID
# Result: {pair_id: [file names that start with that ID]}
pairs = {}
for fname in os.listdir(mp3_song_dir):
    # Ignore anything that is not an .mp3 file
    if not fname.endswith(".mp3"):
        continue
    pair_id = get_pair_id(fname)
    # Ignore files whose name does not carry a pair ID prefix
    if not pair_id:
        continue
    pairs.setdefault(pair_id, []).append(fname)

In [None]:
# Build dataset from the pairs dictionary initialized in the cell above, preprocess them, and save to pickle file
preprocessed_output_dir = "preprocessed_pairs"
os.makedirs(preprocessed_output_dir, exist_ok=True)
dataset = []

for pair_id, files in pairs.items():
    file_count = len(files)
    if file_count < 2:
        # Incomplete pair (presumably a failed download): nothing to compare against
        print(f"Skipping {pair_id} expected 2 files but got {file_count}")
        continue

    # With the "<id>_original_..." / "<id>_second_..." naming used by the download loop,
    # the original song sorts first and the second song comes right after it
    ordered = sorted(files)
    og_song, copy_song = (os.path.join(mp3_song_dir, name) for name in ordered[:2])

    print(f"Processing pair {pair_id}...")
    original_segments = preprocess_audio(og_song)
    print(f"Preprocessed original song segments: {len(original_segments)}")
    copied_segments = preprocess_audio(copy_song)
    print(f"Preprocessed copied song segments: {len(copied_segments)}")

    # The label is always True since every pair comes from the list of known copyright infringement cases
    pair_data = {
        "pair_id": pair_id,
        "original_song_segments": original_segments,
        "copied_song_segments": copied_segments,
        "label": True,
    }

    # One pickle per pair so the preprocessing can be reloaded later without recomputing it
    pickle_path = os.path.join(preprocessed_output_dir, f"{pair_id}.pkl")
    with open(pickle_path, "wb") as f:
        pickle.dump(pair_data, f)
    print(f"Saved pair {pair_id} to pickle.")

    dataset.append(pair_data)

In [None]:
def load_dataset_from_pickles(pickle_dir):
    """
    Handy function to load all preprocessed song pair .pkl files from a directory.

    Files are loaded in sorted file-name order so that the returned list is the same
    on every run and every platform (os.listdir alone returns names in arbitrary order).

    Parameters
    ----------
    pickle_dir : str
        Directory where the pickle files are stored.

    Returns
    -------
    list
        One unpickled object per .pkl file. For files written by the preprocessing
        cell of this notebook, each object is a dictionary containing:
            - 'pair_id': str
            - 'original_song_segments': list of numpy arrays (segments)
            - 'copied_song_segments': list of numpy arrays (segments)
            - 'label': bool
    """
    pickle_names = sorted(
        name for name in os.listdir(pickle_dir) if name.endswith(".pkl")
    )

    loaded_dataset = []
    for pkl_fname in pickle_names:
        path = os.path.join(pickle_dir, pkl_fname)
        # NOTE: pickle.load can execute arbitrary code; only point this at
        # directories whose files were produced by this notebook.
        with open(path, "rb") as f:
            loaded_dataset.append(pickle.load(f))

    return loaded_dataset

<h4><b>1.3 Transform preprocessed raw waveforms into feature vector representations with Librosa</b></h4>

In [None]:
import librosa.feature as librosa_feature
from tqdm import tqdm

In [None]:
def extract_features(audio_segment, sr=16000):
    """
    Extracts a set of audio features for music similarity detection using librosa such as:
        - Spectral Features (Timbre, the unique quality of a song's sound)
        - Rhythmic Features
        - Harmonic and Melodic Features
        - Structural Features

    Each feature matrix is reduced to summary statistics (mean and, for some features,
    standard deviation), so one segment yields one fixed set of dictionary entries.

    NOTE: the insertion order of the keys matters. prepare_data_for_cnn flattens the
    dictionary values in iteration order, so reordering the statements below would
    change the layout of the resulting feature vectors.

    Parameters:
    -----------
    audio_segment : numpy.ndarray
        Raw audio waveform (preprocessed above)
    sr : int
        Sample rate of the audio waveform

    Returns:
    --------
    dict
        Dictionary that holds the aforementioned audio features, with keys (in order):
        mfcc_mean, mfcc_std, spectral_contrast_mean, spectral_centroid_mean,
        spectral_centroid_std, spectral_rolloff_mean, spectral_bandwidth_mean, tempo,
        beat_intervals_mean, beat_intervals_std, onset_count, tempogram_mean,
        chroma_stft_mean, chroma_stft_std, chroma_cq_mean, chroma_cq_std,
        tonnetz_mean, tonnetz_std, mel_spec_mean, mel_spec_std, zcr_mean, zcr_std,
        rms_mean, rms_std
    """

    features = {}
    # 1. SPECTRAL FEATURES

    # MFCC - captures timbre characteristics (timbre is the unique quality of a song's sound)
    # 20 coefficients are requested; statistics are taken along axis 1 (presumably the time frames)
    mfccs = librosa_feature.mfcc(y=audio_segment, sr=sr, n_mfcc=20)
    features['mfcc_mean'] = np.mean(mfccs, axis=1)
    features['mfcc_std'] = np.std(mfccs, axis=1)

    # Spectral contrast: captures timbral contrast between peaks and valleys
    contrast = librosa_feature.spectral_contrast(y=audio_segment, sr=sr)
    features['spectral_contrast_mean'] = np.mean(contrast, axis=1)

    # Spectral centroid: brightness of sound
    # [0] selects the first row of the returned array; the statistics are scalars
    centroid = librosa_feature.spectral_centroid(y=audio_segment, sr=sr)[0]
    features['spectral_centroid_mean'] = np.mean(centroid)
    features['spectral_centroid_std'] = np.std(centroid)

    # Spectral rolloff: frequency below which most energy is contained
    rolloff = librosa_feature.spectral_rolloff(y=audio_segment, sr=sr)[0]
    features['spectral_rolloff_mean'] = np.mean(rolloff)

    # Width of spectrum
    bandwidth = librosa_feature.spectral_bandwidth(y=audio_segment, sr=sr)[0]
    features['spectral_bandwidth_mean'] = np.mean(bandwidth)

    # 2. RHYTHM FEATURES

    # Tempo and beat strength
    # The onset envelope is computed once and reused for beat tracking and the tempogram below
    onset_env = librosa.onset.onset_strength(y=audio_segment, sr=sr)
    tempo, beat_frames = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr)
    # NOTE(review): tempo is stored exactly as returned; whether it is a scalar or an
    # array may depend on the installed librosa version - confirm
    features['tempo'] = tempo

    # Beat intervals if beats were detected
    # At least two beats are needed to form one interval; otherwise both statistics default to 0
    if len(beat_frames) > 1:
        beat_times = librosa.frames_to_time(beat_frames, sr=sr)
        beat_intervals = np.diff(beat_times)
        features['beat_intervals_mean'] = np.mean(beat_intervals)
        features['beat_intervals_std'] = np.std(beat_intervals)
    else:
        features['beat_intervals_mean'] = 0
        features['beat_intervals_std'] = 0

    # Onset detection: where new notes begin
    # Only the number of detected onsets is kept, not their positions
    onsets = librosa.onset.onset_detect(y=audio_segment, sr=sr)
    features['onset_count'] = len(onsets)

    # Rhythm patterns using tempogram
    tempogram = librosa_feature.tempogram(onset_envelope=onset_env, sr=sr)
    features['tempogram_mean'] = np.mean(tempogram, axis=1)

    # 3. HARMONIC AND MELODIC FEATURES

    # Separate source audio segment into harmonic and percussive parts
    # Only the harmonic part is used below; `percussive` is not referenced again in this function
    harmonic, percussive = librosa.effects.hpss(audio_segment)

    # Evaluates melodic similarities with harmonic audio part
    chroma_stft = librosa_feature.chroma_stft(y=harmonic, sr=sr)
    features['chroma_stft_mean'] = np.mean(chroma_stft, axis=1)
    features['chroma_stft_std'] = np.std(chroma_stft, axis=1)

    # Extract pitch related features with harmonic audio part (gets the tonal content)
    chroma_cq = librosa_feature.chroma_cqt(y=harmonic, sr=sr)
    features['chroma_cq_mean'] = np.mean(chroma_cq, axis=1)
    features['chroma_cq_std'] = np.std(chroma_cq, axis=1)

    # Tonnetz to measure harmonic relations (like how close 2 chords are on the circle of fifths)
    tonnetz = librosa_feature.tonnetz(y=harmonic, sr=sr)
    features['tonnetz_mean'] = np.mean(tonnetz, axis=1)
    features['tonnetz_std'] = np.std(tonnetz, axis=1)

    # 4. STRUCTURAL FEATURES

    # Mel spectrogram (log scale)
    # Represents the spectral energy across frequency bands, trying to get the overall feel of a song
    # 128 mel bands are requested; the power values are converted to dB before taking statistics
    mel_spec = librosa_feature.melspectrogram(y=audio_segment, sr=sr, n_mels=128)
    log_mel_spec = librosa.power_to_db(mel_spec)
    features['mel_spec_mean'] = np.mean(log_mel_spec, axis=1)
    features['mel_spec_std'] = np.std(log_mel_spec, axis=1)

    # Zero-crossing rate (identifies signal crossing the 0 amplitude line)
    # High ZCR: noisy, cymbals, Low ZCR: smooth, tonal sounds like vocals or flute for ex.
    zcr = librosa_feature.zero_crossing_rate(audio_segment)[0]
    features['zcr_mean'] = np.mean(zcr)
    features['zcr_std'] = np.std(zcr)

    # RMS energy (perceived loudness over time, measures energy dynamics)
    rms = librosa_feature.rms(y=audio_segment)[0]
    features['rms_mean'] = np.mean(rms)
    features['rms_std'] = np.std(rms)

    return features

In [None]:
def _extract_song_features(segments, pair_id, role):
    """Extracts features for every segment of one song ("original" or "copied") of a pair."""
    collected = []
    checkpoint_path = f"processed_features/pair_{pair_id}_{role}_temp.pkl"
    progress = tqdm(segments, desc=f"Processing {role} song {pair_id}", leave=False)

    for i, segment in enumerate(progress):
        try:
            collected.append(extract_features(segment))

            # Save intermediate results every 5 segments
            if (i + 1) % 5 == 0:
                with open(checkpoint_path, "wb") as f:
                    pickle.dump(collected, f)

        except Exception as e:
            # Best effort: report the failing segment and carry on with the next one
            print(f"Error processing segment {i} in {role} song {pair_id}: {str(e)}")

    return collected


def extract_features_from_dataset(dataset):
    """
    Process all song pairs and extract features.

    For each pair, features are extracted segment by segment for the original song and
    then for the copied song. Results are written to the "processed_features" directory:
    a temporary checkpoint per song every 5 segments, and one final file per pair.
    A pair is dropped from the result when either song ends up with no features.

    Parameters:
    -----------
    dataset : list
        List of dictionaries containing pair_id, original_song_segments, copied_song_segments, and label

    Returns:
    --------
    list
        List of dictionaries with pair_id, original_features, copied_features, and label
    """
    os.makedirs("processed_features", exist_ok=True)
    processed_dataset = []

    for pair_data in tqdm(dataset, desc="Extracting features from song pairs"):
        pair_id = pair_data['pair_id']
        original_segments = pair_data['original_song_segments']
        copied_segments = pair_data['copied_song_segments']
        label = pair_data['label']

        original_features = _extract_song_features(original_segments, pair_id, "original")
        copied_features = _extract_song_features(copied_segments, pair_id, "copied")

        # Only keep the pair if both songs have features extracted
        if not (original_features and copied_features):
            continue

        processed_pair = {
            'pair_id': pair_id,
            'original_features': original_features,
            'copied_features': copied_features,
            'label': label
        }
        processed_dataset.append(processed_pair)

        # Save each processed pair individually
        with open(f"processed_features/pair_{pair_id}_features.pkl", "wb") as f:
            pickle.dump(processed_pair, f)

    return processed_dataset

In [None]:
def prepare_data_for_cnn(processed_dataset):
    """
    Convert processed features to vectors suitable for 1D CNN input.

    Parameters:
    -----------
    processed_dataset : list
        List of dictionaries with pair_id, original_features, copied_features, and label

    Returns:
    --------
    tuple
        (X_pairs, y) where X_pairs contains paired feature vectors and y contains labels
    """
    X_pairs = []
    y = []

    for pair_data in processed_dataset:
        original_features = pair_data['original_features']
        copied_features = pair_data['copied_features']
        label = pair_data['label']

        # For each pair of segments (one from original, one from copied) create a feature vector pair
        for orig_feat in original_features:
            for copy_feat in copied_features:
                # Convert dictionary features to vector
                orig_vector = []
                copy_vector = []

                # Add all numeric features to vectors
                for key, value in orig_feat.items():
                    if isinstance(value, np.ndarray):
                        orig_vector.extend(value.flatten())
                    elif isinstance(value, (int, float)):
                        orig_vector.append(value)

                for key, value in copy_feat.items():
                    if isinstance(value, np.ndarray):
                        copy_vector.extend(value.flatten())
                    elif isinstance(value, (int, float)):
                        copy_vector.append(value)

                # Ensure vectors are of equal length by padding if need be
                max_len = max(len(orig_vector), len(copy_vector))
                orig_vector = np.pad(orig_vector, (0, max_len - len(orig_vector)), 'constant')
                copy_vector = np.pad(copy_vector, (0, max_len - len(copy_vector)), 'constant')

                # Create pair representation
                X_pairs.append([np.array(orig_vector), np.array(copy_vector)])
                y.append(label)

    return np.array(X_pairs), np.array(y)

In [None]:
# Load Dataset
# Reload the preprocessed pairs from their pickle files, so this cell does not depend on
# the in-memory result of the preprocessing loop (e.g. after a kernel restart).
preprocessed_dataset = load_dataset_from_pickles(preprocessed_output_dir)
# Report the size of what was actually loaded
print(f"Loaded {len(preprocessed_dataset)} song pairs")

In [None]:
# Start Extracting Features
os.makedirs("features", exist_ok=True)
# Use the pairs loaded from the pickle files, so the preprocessing loop does not have to be
# re-run in the same session for this cell to work
feat_processed_dataset = extract_features_from_dataset(preprocessed_dataset)
# Keep one combined file with the features of every pair
with open("features/all_processed_features.pkl", "wb") as f:
    pickle.dump(feat_processed_dataset, f)

In [None]:
# Prepare data for 1D CNN
os.makedirs("prepared_data_cnn", exist_ok=True)
# Turn the per-segment feature dictionaries into paired feature vectors and their labels
X_pairs, y = prepare_data_for_cnn(feat_processed_dataset)
# Store both arrays together as a single (X_pairs, y) tuple
with open("prepared_data_cnn/cnn_ready_data.pkl", "wb") as f:
    pickle.dump((X_pairs, y), f)