In [7]:
# Start from a clean workspace: remove the `sample_data` folder (presumably the
# samples bundled with a hosted runtime) and make sure the working folders exist.
!rm -rf sample_data
# -p keeps this cell re-runnable: no error when the folders already exist.
!mkdir -p output
!mkdir -p tmp
# The yt-dlp requirement must be quoted. Unquoted, the shell treats ">=2025.11.12"
# as an output redirection: pip then installs an unconstrained yt-dlp and its
# output is written to a file literally named "=2025.11.12".
# %pip (instead of !pip) installs into the environment of the running kernel.
%pip install demucs==4.0.1 librosa==0.11.0 numpy==1.26.4 "yt-dlp>=2025.11.12" pydub==0.25.1



## Downloading, Track separation & Processing
_All project modules (download, drum separation, export, analysis) are inlined below so that this notebook is self-contained._

In [2]:
import shutil
import warnings
from pathlib import Path

import demucs.separate
import librosa
import numpy as np
import torch

# Silence two known, noisy Torchaudio notices so they do not flood the cell output.
# TODO: fix at the source; perhaps coming from htdemucs or librosa
warnings.filterwarnings(
    action="ignore",
    message="Torchaudio's I/O functions now support per-call backend dispatch",
)
warnings.filterwarnings(
    action="ignore",
    message=".*this function's implementation will be changed to use torchaudio.save_with_torchcodec.*",
)


def read_audio_file(input_file_path: Path) -> tuple[np.ndarray, np.ndarray, float]:
    """Load an audio file as a mono signal together with its time axis.

    Args:
        input_file_path: Path of the audio file handed to ``librosa.load``.

    Returns:
        ``(time, samples, sample_rate)``: ``samples`` is the mono signal cast
        to ``float32``, ``time`` holds each sample's index divided by the
        sample rate, and ``sample_rate`` is the rate reported by
        ``librosa.load``.
    """
    samples, sample_rate = librosa.load(input_file_path, mono=True)

    # One timestamp per sample: index / rate.
    sample_positions = np.arange(len(samples))
    timestamps = sample_positions / sample_rate

    mono_float32 = samples.astype(np.float32)
    return timestamps, mono_float32, sample_rate


def extract_drums(
    input_file_path: Path, skip_cache=False
) -> tuple[tuple[np.ndarray, np.ndarray, float], Path]:
    """Isolate the drum stem of a song with Demucs and load it.

    The stem is stored next to the input as ``<stem>_drums.wav`` and is reused
    on later calls unless ``skip_cache`` is true.

    Args:
        input_file_path: Audio file to separate.
        skip_cache: When true, run the separation even if the drum file
            already exists.

    Returns:
        ``(read_audio_file(drum_file), drum_file)``: the loaded drum track
        and the path it was read from.

    Raises:
        FileNotFoundError: If ``input_file_path`` does not exist.
        RuntimeError: If a separation has to run and CUDA cannot be
            initialised.
    """
    if not input_file_path.exists():
        raise FileNotFoundError(
            f"The input file {input_file_path.as_posix()} does not exist."
        )

    extracted_drums_file_path = (
        input_file_path.parent / f"{input_file_path.stem}_drums.wav"
    )

    if skip_cache or not extracted_drums_file_path.exists():
        # The GPU is only needed for the separation itself; reusing an already
        # extracted drum file must keep working on a machine without CUDA.
        try:
            torch.cuda.init()
        except Exception as exc:
            # Chain the original error so the real cause stays visible.
            raise RuntimeError(
                "CUDA initialization failed. You dont wanna run this on CPU mode!!"
            ) from exc

        print("Extracting drums")
        # Location where the separated stem is expected after the Demucs run
        # below (output root / "htdemucs" / <song stem> / "drums.wav").
        demucs_song_dir = input_file_path.parent / "htdemucs" / f"{input_file_path.stem}"
        temp_file_path = demucs_song_dir / "drums.wav"

        print("Isolating drums with Demucs...")
        try:
            demucs.separate.main(
                [
                    "--two-stems",
                    "drums",
                    "--device",
                    "cuda",
                    "-o",
                    f"{input_file_path.parent}",
                    input_file_path.as_posix(),
                ]
            )
            shutil.copy(temp_file_path, extracted_drums_file_path)
        finally:
            # Always drop the intermediate stems and release GPU memory, even
            # when the separation or the copy failed. ignore_errors keeps a
            # missing folder from masking the original exception.
            shutil.rmtree(demucs_song_dir, ignore_errors=True)
            torch.cuda.empty_cache()

    return read_audio_file(extracted_drums_file_path), extracted_drums_file_path

import json
import re
import uuid
from pathlib import Path

from yt_dlp import YoutubeDL


def download_from_youtube_as_mp3(url: str) -> tuple[bool, Path | None]:
    if not re.match(r"(https?://)?(www\.)?(youtube\.com|youtu\.be)/", url):
        raise ValueError("The provided URL is not a valid YouTube video URL.")

    output_folder = Path("./tmp")
    output_folder.mkdir(exist_ok=True)

    temp_name = str(uuid.uuid4())
    temp_path = str(output_folder / f"{temp_name}.%(ext)s")

    opts = {
        "format": "bestaudio/best",
        "extractaudio": True,
        "audioformat": "mp3",
        "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}],
        "outtmpl": temp_path,
        "noplaylist": True,
        "quiet": False,
    }

    try:
        with YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=True)

            if info is None:
                print("Failed to download video.")
                return False, None

            title = re.sub(r'[<>:"/\\|?*]', " ", info.get("title", temp_name))
            final_path = output_folder / f"{title}.mp3"
            downloaded_path = output_folder / f"{temp_name}.mp3"

            if downloaded_path.exists():
                downloaded_path.rename(final_path)

            return True, final_path

    except Exception as e:
        print(f"Error: {e}")
        return False, None

import json
import re
import uuid
from pathlib import Path

from yt_dlp import YoutubeDL

def download_from_youtube_as_mp3(url: str) -> tuple[bool, Path | None]:
    if not re.match(r"(https?://)?(www\.)?(youtube\.com|youtu\.be)/", url):
        raise ValueError("The provided URL is not a valid YouTube video URL.")


    output_folder = Path("./tmp")
    output_folder.mkdir(exist_ok=True)

    temp_name = str(uuid.uuid4())
    temp_path = str(output_folder / f"{temp_name}.%(ext)s")

    opts = {
        "format": "bestaudio/best",
        "extractaudio": True,
        "audioformat": "mp3",
        "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}],
        "outtmpl": temp_path,
        "noplaylist": True,
        "quiet": False,
    }

    try:
        with YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=True)

            if info is None:
                print("Failed to download video.")
                return False, None

            title = re.sub(r'[<>:"/\\|?*]', " ", info.get("title", temp_name))
            final_path = output_folder / f"{title}.mp3"
            downloaded_path = output_folder / f"{temp_name}.mp3"

            if downloaded_path.exists():
                downloaded_path.rename(final_path)

            return True, final_path

    except Exception as e:
        print(f"Error: {e}")
        return False, None


import json
from pathlib import Path
from zipfile import ZipFile

import numpy as np
from pydub import AudioSegment

# Folder used by save_result() for its zip archives when no output_dir is given.
default_output_dir = "./output"


def compress_to_mp3(wav_path: Path, bitrate: str = "192k") -> Path:
    """Encode a WAV file as MP3 next to the original.

    Args:
        wav_path: WAV file to convert.
        bitrate: Bitrate handed to the pydub exporter.

    Returns:
        Path of the written MP3: ``wav_path`` with its suffix replaced by
        ``.mp3``.
    """
    target = wav_path.with_suffix(".mp3")
    AudioSegment.from_wav(wav_path).export(target, format="mp3", bitrate=bitrate)
    return target


def save_result(
    time: np.ndarray,
    ranges_to_highlight: list[tuple[int, int]],
    snare_frequency: float,
    bass_drum_frequency: float,
    filepath: Path,
    drumtrack_path: Path,
    output_dir: str = default_output_dir,
) -> str:
    """Bundle the detection result, the song and its drum track into a zip.

    The archive contains a JSON report (blast-beat intervals plus the two
    estimated drum frequencies), the original file and an MP3 version of the
    drum track.

    Args:
        time: Time stamp of every sample; used to turn indices into seconds.
        ranges_to_highlight: ``(start, end)`` sample-index pairs; ``end`` is
            exclusive, so the reported end time is ``time[end - 1]``.
        snare_frequency: Estimated snare frequency stored in the report.
        bass_drum_frequency: Estimated bass drum frequency stored in the report.
        filepath: The analysed song; added to the archive under its own name.
        drumtrack_path: WAV drum track; compressed to MP3 before archiving.
        output_dir: Folder for the archive; created when missing.

    Returns:
        Path of the written zip file, as a string.
    """
    output = {
        "blast_beats": [
            {"start_time": float(time[start]), "end_time": float(time[end - 1])}
            for start, end in ranges_to_highlight
        ],
        # float() keeps the report serialisable whatever numeric type the
        # frequency estimate has.
        "snare_frequency": float(snare_frequency),
        "bass_drum_frequency": float(bass_drum_frequency),
    }

    mp3_drumtrack_path = compress_to_mp3(drumtrack_path)

    # ZipFile does not create missing parent folders.
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Spaces and dashes are replaced so archive and report share a tidy name.
    archive_stem = filepath.stem.replace(" ", "_").replace("-", "_")
    zip_path = f"{output_dir}/{archive_stem}.zip"
    with ZipFile(zip_path, "w") as zipf:
        zipf.writestr(f"{archive_stem}.json", json.dumps(output, indent=4))
        zipf.write(filepath, arcname=filepath.name)
        zipf.write(mp3_drumtrack_path, arcname=mp3_drumtrack_path.name)

    print(f"Exported results to: {zip_path}")
    return zip_path

from pathlib import Path
from typing import NamedTuple

import numpy as np
from numpy.fft import fft


class LabeledSection(NamedTuple):
    """Detection result for one analysis window of the audio signal."""

    # Sample index at which the window starts.
    start_idx: int
    # Exclusive end of the window, as a sample index.
    end_idx: int
    # True when a peak was found around the snare frequency in this window.
    snare_present: bool
    # True when a peak was found around the bass drum frequency in this window.
    bass_drum_present: bool


def get_frequency_and_intensity_arrays(
    audio_data: np.ndarray, sample_rate: float
) -> tuple[np.ndarray, np.ndarray]:
    """Compute the magnitude spectrum of a signal for the lower half of its bins.

    Args:
        audio_data: Samples to transform.
        sample_rate: Sampling rate of ``audio_data``.

    Returns:
        ``(frequencies, magnitudes)``, both of length ``len(audio_data) // 2``:
        the frequency of each retained FFT bin and the absolute value of the
        spectrum at that bin.
    """
    spectrum = fft(audio_data)
    bin_count = len(spectrum)

    # Bin k corresponds to k / duration, with duration = samples / rate.
    duration = bin_count / sample_rate

    # Keep only the first half of the bins (the positive frequencies).
    half = bin_count // 2
    frequencies = np.arange(half) / duration
    magnitudes = np.abs(spectrum[:half])

    return frequencies, magnitudes


def is_peak_present_around_frequency(
    freq_to_find: float,
    frequencies: np.ndarray,
    intensities: np.ndarray,
    peak_detection_band_width: float,
    peak_detection_min_area_threshold: float,
) -> bool:
    """Tell whether the spectrum carries enough energy around a frequency.

    The intensities of all bins within ``freq_to_find`` +/-
    ``peak_detection_band_width`` (bounds included) are summed as a proxy for
    the area under the curve; a peak is reported when that sum is strictly
    greater than ``peak_detection_min_area_threshold``.

    Args:
        freq_to_find: Centre of the band to inspect.
        frequencies: Frequency of each spectrum bin.
        intensities: Magnitude of each spectrum bin, aligned with ``frequencies``.
        peak_detection_band_width: Half-width of the inspected band.
        peak_detection_min_area_threshold: Minimum summed intensity for a peak.

    Returns:
        ``False`` when no bin falls inside the band, otherwise the result of
        the threshold comparison.
    """
    band_low = freq_to_find - peak_detection_band_width
    band_high = freq_to_find + peak_detection_band_width

    in_band = (frequencies >= band_low) & (frequencies <= band_high)
    if not in_band.any():
        return False

    band_energy = np.sum(intensities[in_band])
    return band_energy > peak_detection_min_area_threshold


def get_sections_labeled_by_percussion_content_from_audio(
    time: np.ndarray,
    data: np.ndarray,
    sample_rate: float,
    bass_drum_freq: float,
    snare_drum_freq: float,
    step_size_in_seconds: float,
    peak_detection_band_width: float,
    peak_detection_min_area_threshold: float,
) -> list[LabeledSection]:
    """Split the signal into fixed-size windows and label each one.

    Every window is transformed to the frequency domain and checked for a
    peak around the snare and around the bass drum frequency.

    Args:
        time: Time stamp of every sample; its length defines how many samples
            are scanned.
        data: Audio samples, aligned with ``time``.
        sample_rate: Sampling rate of ``data``.
        bass_drum_freq: Frequency to look for as bass drum.
        snare_drum_freq: Frequency to look for as snare.
        step_size_in_seconds: Window length; converted to whole samples.
        peak_detection_band_width: Half-width of the band inspected around
            each frequency.
        peak_detection_min_area_threshold: Minimum summed intensity for a peak.

    Returns:
        One ``LabeledSection`` per window, in order. ``end_idx`` is exclusive
        and never exceeds ``len(time)``.

    Raises:
        ValueError: If the window is shorter than one sample.
    """
    step_size_in_samples = int(step_size_in_seconds * sample_rate)
    if step_size_in_samples < 1:
        # Without this, range() fails with an unrelated-looking message.
        raise ValueError(
            "step_size_in_seconds is too small: the analysis window must span "
            "at least one sample."
        )

    total_samples = len(time)
    results = []

    for start_idx in range(0, total_samples, step_size_in_samples):
        # The last window may be shorter; clamp so end_idx stays a valid
        # exclusive bound for indexing `time`.
        end_idx = min(start_idx + step_size_in_samples, total_samples)
        data_range = data[start_idx:end_idx]

        freq, fft_magnitude = get_frequency_and_intensity_arrays(
            data_range, sample_rate
        )

        snare_present = is_peak_present_around_frequency(
            snare_drum_freq,
            freq,
            fft_magnitude,
            peak_detection_band_width,
            peak_detection_min_area_threshold,
        )
        bass_drum_present = is_peak_present_around_frequency(
            bass_drum_freq,
            freq,
            fft_magnitude,
            peak_detection_band_width,
            peak_detection_min_area_threshold,
        )

        results.append(
            LabeledSection(
                start_idx,
                end_idx,
                snare_present=snare_present,
                bass_drum_present=bass_drum_present,
            )
        )

    return results


def identify_blastbeats(
    sections: list[LabeledSection], min_hits: int
) -> list[tuple[int, int]]:
    """Find runs of consecutive windows that contain both snare and bass drum.

    A run counts as a blast beat once it spans ``min_hits`` consecutive
    sections. Long runs are split: every ``min_hits`` sections a new interval
    is emitted, so one long blast beat is counted several times.

    Args:
        sections: Labeled windows in chronological order.
        min_hits: Number of consecutive matching sections required.

    Returns:
        ``(start, end)`` sample-index pairs. ``start`` is the ``start_idx`` of
        the first section of the run; ``end`` is the ``start_idx`` of the
        section at which the run was closed.
    """
    blastbeat_start_idx = 0
    hits = 0
    results = []

    # Caveman approach: consider it as blast beat if a given number of
    # consecutive labeled sections contain snare & bassdrum.
    for i, section in enumerate(sections):
        # Close a run as soon as it is long enough (checked before looking at
        # the current section) so that a single long blast beat section is
        # counted several times instead of once.
        if hits >= min_hits:
            results.append((sections[blastbeat_start_idx].start_idx, section.start_idx))
            hits = 0

        if section.snare_present and section.bass_drum_present:
            if hits == 0:
                blastbeat_start_idx = i
            hits += 1
        else:
            hits = 0

    # A run that reaches the required length on the very last section was
    # previously dropped, because closing only happens on the *next* section.
    # The final section's start is used as a conservative end: it is always a
    # valid sample index, whereas its end_idx may extend past the signal.
    if hits > 0 and hits >= min_hits:
        tail_start = sections[blastbeat_start_idx].start_idx
        tail_end = sections[-1].start_idx
        if tail_end > tail_start:
            results.append((tail_start, tail_end))

    return results


def identify_bass_and_snare_frequencies(
    audio_data: np.ndarray,
    sample_rate: float,
    bass_drum_range: tuple[int, int],
    snare_range: tuple[int, int],
    debug_song_name: str | None = None,
) -> tuple[float, float]:
    """Estimate the dominant bass drum and snare frequencies of a track.

    Simple approach: one FFT over the whole signal, then the strongest bin
    inside each of the two frequency ranges (bounds included) is picked.

    Args:
        audio_data: Samples of the drum track.
        sample_rate: Sampling rate of ``audio_data``.
        bass_drum_range: ``(low, high)`` range searched for the bass drum.
        snare_range: ``(low, high)`` range searched for the snare.
        debug_song_name: Not used by this function.

    Returns:
        ``(bass_drum_freq, snare_freq)``.

    Raises:
        ValueError: If no spectrum bin falls inside one of the ranges.
    """
    freq, intensities = get_frequency_and_intensity_arrays(audio_data, sample_rate)

    def strongest_frequency_in(band):
        # Frequency of the highest-magnitude bin inside `band`, or None when
        # the band contains no bin at all.
        candidates = np.flatnonzero((freq >= band[0]) & (freq <= band[1]))
        if candidates.size == 0:
            return None
        return freq[candidates[np.argmax(intensities[candidates])]]

    bass_drum_freq = strongest_frequency_in(bass_drum_range)
    if bass_drum_freq is None:
        print("Warning: No bass drum frequency found in the specified range.")

    snare_freq = strongest_frequency_in(snare_range)
    if snare_freq is None:
        print("Warning: No snare frequency found in the specified range.")

    if bass_drum_freq is None or snare_freq is None:
        raise ValueError(
            "Could not identify bass drum or snare frequencies. Please check the audio file."
        )
    return bass_drum_freq, snare_freq


def process_song(
    file_path: Path,
    peak_detection_band_width=10.0,
    peak_detection_min_area_threshold=37.6,
    step_size_in_seconds=0.15,
    bass_drum_range=(10, 100),
    snare_range=(170, 600),
    min_consecutive_hits=8,
):
    """Run the complete blast-beat detection on one song and export the result.

    Steps: separate the drum track, estimate the bass drum and snare
    frequencies, label fixed-size windows by percussion content, group the
    windows into blast-beat intervals and write everything with
    ``save_result``.

    Args:
        file_path: Song to analyse.
        peak_detection_band_width: Half-width of the band inspected around
            each drum frequency.
        peak_detection_min_area_threshold: Minimum summed intensity for a peak.
        step_size_in_seconds: Length of one analysis window.
        bass_drum_range: Frequency range searched for the bass drum.
        snare_range: Frequency range searched for the snare.
        min_consecutive_hits: Consecutive matching windows required for a
            blast beat.
    """
    print("Separating drum track...")
    drum_audio, drumtrack_path = extract_drums(file_path)
    time, audio_data, sample_rate = drum_audio

    # TODO: debug_song_name=file_path.stem may be passed here while debugging;
    # it is left out otherwise.
    estimated = identify_bass_and_snare_frequencies(
        audio_data, sample_rate, bass_drum_range, snare_range
    )
    bass_drum_freq, snare_freq = estimated
    print(
        f"Estimated frequencies -- Bass drum: {bass_drum_freq} Hz; Snare drum: {snare_freq} Hz"
    )

    print("Identifying blast beats...")
    labeled_sections = get_sections_labeled_by_percussion_content_from_audio(
        time=time,
        data=audio_data,
        sample_rate=sample_rate,
        bass_drum_freq=bass_drum_freq,
        snare_drum_freq=snare_freq,
        step_size_in_seconds=step_size_in_seconds,
        peak_detection_band_width=peak_detection_band_width,
        peak_detection_min_area_threshold=peak_detection_min_area_threshold,
    )
    blastbeat_intervals = identify_blastbeats(
        sections=labeled_sections, min_hits=min_consecutive_hits
    )

    save_result(
        time=time,
        ranges_to_highlight=blastbeat_intervals,
        snare_frequency=snare_freq,
        bass_drum_frequency=bass_drum_freq,
        filepath=file_path,
        drumtrack_path=drumtrack_path,
    )


## Pipeline

In [15]:
import csv
from pathlib import Path
import os

# Sources to process: YouTube URLs or paths to local audio files.
songs = [
    "https://youtu.be/cPTJzeEqkFs", # Youtube URLs are supported on a best-effort basis (uses yt-dlp to download the audio).
    #"/content/DER WEG EINER FREIHEIT - Ewigkeit (Drumcam live at Summer Breeze 2017).mp3"  # example of how to point to a local file
]

for src in songs:
    file_path = None

    # Anything with an http(s) scheme is a URL. Checking only for a "youtu"
    # prefix would treat "https://www.youtube.com/..." as a local file path.
    if src.startswith(("http://", "https://")):
        print(f"Processing YouTube URL: {src}")
        try:
            success, file_path = download_from_youtube_as_mp3(src)
        except ValueError as e:
            # Raised for URLs that are not YouTube links; skip this entry
            # instead of aborting the whole batch.
            print(f"Error: {e}")
            success, file_path = False, None
        if not success or not file_path:
            print(f"Failed to download: {src}\n\nNB: Youtube downloads might get rate-limited or blocked at some point.\nIf this doesn't work, try uploading your songs directly to the notebook file system and indicate the path to it")
            print("-----")
            continue
    else:
        file_path = Path(src)
        if not file_path.exists():
            print(f"File does not exist: {file_path}")
            continue

    process_song(file_path)

    # clean up: remove only the drum-track files derived from this song
    # ("<stem>_drums.wav" / "<stem>_drums.mp3"), not every file in the folder
    # that happens to contain "_drums.".
    drum_file_prefix = f"{file_path.stem}_drums."
    for f in file_path.parent.iterdir():
        if f.is_file() and f.name.startswith(drum_file_prefix):
            f.unlink()

    print("-----")


Processing YouTube URL: https://youtu.be/cPTJzeEqkFs
[youtube] Extracting URL: https://youtu.be/cPTJzeEqkFs
[youtube] cPTJzeEqkFs: Downloading webpage




[youtube] cPTJzeEqkFs: Downloading android sdkless player API JSON
[youtube] cPTJzeEqkFs: Downloading web safari player API JSON


ERROR: [youtube] cPTJzeEqkFs: Sign in to confirm you’re not a bot. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies


Error: ERROR: [youtube] cPTJzeEqkFs: Sign in to confirm you’re not a bot. Use --cookies-from-browser or --cookies for the authentication. See  https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp  for how to manually pass cookies. Also see  https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies  for tips on effectively exporting YouTube cookies
Failed to download: https://youtu.be/cPTJzeEqkFs

NB: Youtube downloads might get rate-limited or blocked at some point.
If this doesn't work, try uploading your songs directly to the notebook file system and indicate the path to it
-----
Separating drum track...
Estimated frequencies -- Bass drum: 38.14778942937706 Hz; Snare drum: 214.47713835957984 Hz
Identifying blast beats...
Exported results to: ./output/DER_WEG_EINER_FREIHEIT___Ewigkeit_(Drumcam_live_at_Summer_Breeze_2017).zip
-----
