In [1]:
from __future__ import unicode_literals
from yt_dlp import YoutubeDL
import librosa
from madmom.features.beats import DBNBeatTrackingProcessor, RNNBeatProcessor
from madmom.features.downbeats import DBNDownBeatTrackingProcessor, RNNDownBeatProcessor
import soundfile as sf
import numpy as np
import os
from pydub import AudioSegment
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
import openl3
from madmom.features.chords import DeepChromaChordRecognitionProcessor, majmin_targets_to_chord_labels
from madmom.evaluation.chords import encode as encode_chords, merge_chords, reduce_to_triads
from madmom.audio.chroma import DeepChromaProcessor
import csv
import pretty_midi



%load_ext autoreload
%autoreload 2

In [2]:
def download_mp3(url, out_path="./%(title)s.%(ext)s"):
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": out_path,
        "ffmpeg_location": r"C:\FFmpeg\bin",
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
                "preferredquality": "192",
        }],
        "quiet": False,
        "no_warnings": True, 
    }

    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

In [3]:
def get_beats_and_downbeats(y, sr, tempo=0):

    if tempo == 0:
        tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
    #beat_tracker = BeatNet(1, mode="online", inference_model="PF", thread=False)
    #beats_np = beat_tracker.process(y)
    #print(beats_np)
    print(tempo)
    sf.write("test.wav",y,sr)
    act = RNNDownBeatProcessor()("test.wav")

    dbn = DBNDownBeatTrackingProcessor(beats_per_bar=4, fps=100, min_bpm = tempo * 0.8, max_bpm = tempo * 1.2)

    downbeats = dbn(act)

    
    beats = np.array([time for time, beat in downbeats])
    downbeats = np.array([time for time, beat in downbeats if beat == 1])
    print(60/np.mean(np.diff(beats)))
    os.remove("test.wav")
    #print(beats_fixed)
    #return beats
    return (beats, downbeats, act, tempo)

def load(path):
    return librosa.load(path)

def get_clicks(beats, sr):
    return librosa.clicks(times=beats, sr=sr)

def overlay(y, clicks, sr, path):
    sf.write("y.wav", y, sr, subtype='PCM_16')
    sf.write("clicks.wav", clicks, sr, subtype='PCM_16')

   
    y_audio = AudioSegment.from_wav("y.wav")
    clicks_audio = AudioSegment.from_wav("clicks.wav")
    audio = y_audio.overlay(clicks_audio)

    os.remove("y.wav")
    os.remove("clicks.wav")


    audio.export(path, format="wav")

def trim_silence(y):
    y_t, index = librosa.effects.trim(y, top_db=40)
    return y_t

In [4]:
def midi_to_chroma(pm, sr, hop_length):
    chroma = pm.get_chroma(fs=sr / hop_length)
    return librosa.util.normalize(chroma + 1e-6, axis=0)

def audio_to_chroma(y, sr, hop_length):
    C = librosa.feature.chroma_stft(y=y, sr=sr, hop_length=hop_length)
    return C
    #return librosa.util.normalize(C, axis=0)

In [10]:
def slice_midi_segment(pm: pretty_midi.PrettyMIDI, times):
    t0, t1 = times
    seg = pretty_midi.PrettyMIDI()
    for instr in pm.instruments:
        instr_seg = pretty_midi.Instrument(program=instr.program,
                                           is_drum=instr.is_drum)
        for note in instr.notes:
            if note.start >= t0 and note.start < t1:
                n = pretty_midi.Note(
                    velocity=note.velocity,
                    pitch=note.pitch,
                    start=note.start - t0,
                    end  =min(note.end - t0, t1 - t0)
                )
                instr_seg.notes.append(n)
        seg.instruments.append(instr_seg)
    return seg

def concat_midi(midis):
    merged = pretty_midi.PrettyMIDI()
    merged.instruments = [instr for instr in midis[0].instruments]
    current_time = midis[0].get_end_time()

    # Iterate over the rest
    for pm in midis[1:]:
        # For each instrument in this segment
        for instr in pm.instruments:
            # Copy the instrument and shift its notes/events
            instr_copy = pretty_midi.Instrument(
                program=instr.program,
                is_drum=instr.is_drum,
                name=instr.name
            )
            # Shift each note
            for note in instr.notes:
                instr_copy.notes.append(pretty_midi.Note(
                    velocity=note.velocity,
                    pitch=note.pitch,
                    start=note.start + current_time,
                    end=note.end   + current_time
                ))
            # Shift any control changes or pitch bends if you care to preserve them:
            for cc in instr.control_changes:
                instr_copy.control_changes.append(pretty_midi.ControlChange(
                    number=cc.number,
                    value=cc.value,
                    time=cc.time + current_time
                ))
            for pb in instr.pitch_bends:
                instr_copy.pitch_bends.append(pretty_midi.PitchBend(
                    pitch=pb.pitch,
                    time=pb.time + current_time
                ))
            merged.instruments.append(instr_copy)

        # Advance the time offset by this segment’s length
        current_time += pm.get_end_time()

    return merged


def dtw_align_m(y_s_harmonic, y_s, midi_p, downbeats_s, downbeats_p, sr, hop_length=512, n_beats=8, sim_metric='cosine', cost_threshold=1e3):
    aligned = []
    seg_s = []
    seg_p = []
    seg_p_list = []
    i = 0
    print(len(downbeats_s), len(downbeats_p))
    while i < len(downbeats_s) or i < len(downbeats_p):
        if i + n_beats >= min(len(downbeats_s), len(downbeats_p)) - 1:
            break

        t0_p, t1_p = downbeats_p[i], downbeats_p[i+n_beats]
        t0_s, t1_s = downbeats_s[i], downbeats_s[i+n_beats]

        mid_chunk_p = slice_midi_segment(midi_p, (t0_p, t1_p))
        s0_s, s1_s = int(t0_s * sr), int(t1_s * sr)

        y_chunk_s_harmonic = y_s_harmonic[s0_s: s1_s]
        y_chunk_s = y_s[s0_s: s1_s]

        C_s = audio_to_chroma(y_chunk_s_harmonic, sr, hop_length)
        C_p = midi_to_chroma(mid_chunk_p, sr, hop_length)

        D_feat = cdist(C_p.T, C_s.T, metric=sim_metric)

        alpha = 0.03
        idx1 = np.arange(D_feat.shape[0])[:,None]
        idx2 = np.arange(D_feat.shape[1])[None,:]
        D_time = alpha * np.abs(idx1 - idx2)

        Cost = D_feat + D_time

        D, wp = librosa.sequence.dtw(C=Cost)
        cost = D[-1, -1]

        print(i, cost)
        if cost < cost_threshold:
            aligned.append((t0_p, t1_p, t0_s, t1_s, cost))
            seg_p.append(mid_chunk_p)
            seg_s.append(y_chunk_s)
            #j += 8
            i += 8
        else:
            break
            #if len(downbeats_p) < len(downbeats_s):
            #    i += 1
            #else:
            #   j += 1

    return aligned, seg_p, seg_s

In [16]:
song_links = []
hop_length = 512
with open("links.csv", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        song_links.append(row["audio_link"])

for i in range(len(song_links)):
    if i > 63:
        download_mp3(song_links[i], f"song/{i}")

        y_s, sr = librosa.load(f"song/{i}.mp3")
        y_s = trim_silence(y_s)

        y_s_harmonic, y_s_percussive = librosa.effects.hpss(y_s)
            #y_p_harmonic, y_p_percussive = librosa.effects.hpss(y_p)

        pm = pretty_midi.PrettyMIDI(f"midi/{i}.mid")

        C_s = audio_to_chroma(y_s_harmonic, sr, hop_length)
        C_p = midi_to_chroma(pm, sr, hop_length)

        
        downbeats_p = pm.get_downbeats()
        beats_p = pm.get_beats()

        tempo_p = 60 / np.mean(np.diff(beats_p))

        beats_s, downbeats_s, act_s, tempo_s = get_beats_and_downbeats(y_s, sr, tempo=tempo_p)

        if tempo_p / 1.8 > tempo_s:
            beats_s, downbeats_s, act_s, tempo_s = get_beats_and_downbeats(y_s, sr, tempo=tempo_p)

        if tempo_s / 1.8 > tempo_p:
            beats_s, downbeats_s, act_s, tempo_s = get_beats_and_downbeats(y_s, sr, tempo=tempo_p)


        aligned, pm_list, y_s_list = dtw_align_m(y_s_harmonic, y_s, pm, downbeats_s, downbeats_p, sr, hop_length=512, n_beats=8, sim_metric='cosine', cost_threshold=500)

        os.remove(f"song/{i}.mp3")

        if len(y_s_list) != 0:
            for j in range(len(y_s_list)):
                pm_list[j].write(f"midi_done/{i}_{j}.mid")
                sf.write(f"song/{i}_{j}.mp3",  y_s_list[j], sr, bitrate_mode='VARIABLE', compression_level=0)
        


[youtube] Extracting URL: https://www.youtube.com/watch?v=dIxAEusj2bk
[youtube] dIxAEusj2bk: Downloading webpage
[youtube] dIxAEusj2bk: Downloading tv client config
[youtube] dIxAEusj2bk: Downloading tv player API JSON
[youtube] dIxAEusj2bk: Downloading ios player API JSON
[youtube] dIxAEusj2bk: Downloading m3u8 information
[info] dIxAEusj2bk: Downloading 1 format(s): 251
[download] Destination: song\64
[download] 100% of    3.26MiB in 00:00:00 at 4.20MiB/s   
[ExtractAudio] Destination: song\64.mp3
Deleting original file song\64 (pass -k to keep)




121.71234026389882


  return array(a, dtype, copy=False, order=order)


123.040182743225
99 103
0 438.3889112795598
8 477.2026475096992
16 452.0804793983694
24 371.28914963519105
32 381.501618089245
40 422.9468254690411
48 430.3549184022059
56 370.4823476356861
64 367.28687130344053
72 428.4311459699791
80 366.2449873021282
88 367.62923809010584
[youtube] Extracting URL: https://www.youtube.com/watch?v=9WpBf2O7whw
[youtube] 9WpBf2O7whw: Downloading webpage
[youtube] 9WpBf2O7whw: Downloading tv client config
[youtube] 9WpBf2O7whw: Downloading tv player API JSON
[youtube] 9WpBf2O7whw: Downloading ios player API JSON
[youtube] 9WpBf2O7whw: Downloading m3u8 information
[info] 9WpBf2O7whw: Downloading 1 format(s): 251
[download] Destination: song\65
[download] 100% of    2.93MiB in 00:00:03 at 929.27KiB/s   
[ExtractAudio] Destination: song\65.mp3
Deleting original file song\65 (pass -k to keep)




117.99990166674812


  return array(a, dtype, copy=False, order=order)


117.11446881062967
83 82
0 426.87802240894445
8 400.3972000929896
16 407.0377737867254
24 349.1430805826463
32 357.3305658812588
40 363.4317856599559
48 391.74250219959964
56 338.72179614819686
64 272.0463360989768
72 381.39600010835716
[youtube] Extracting URL: https://www.youtube.com/watch?v=M2M5Tm64swM
[youtube] M2M5Tm64swM: Downloading webpage
[youtube] M2M5Tm64swM: Downloading tv client config
[youtube] M2M5Tm64swM: Downloading tv player API JSON
[youtube] M2M5Tm64swM: Downloading ios player API JSON
[youtube] M2M5Tm64swM: Downloading m3u8 information
[info] M2M5Tm64swM: Downloading 1 format(s): 251
[download] Destination: song\66
[download] 100% of    2.76MiB in 00:00:00 at 5.97MiB/s   
[ExtractAudio] Destination: song\66.mp3
Deleting original file song\66 (pass -k to keep)




94.99984166693015


  return array(a, dtype, copy=False, order=order)


94.98392673015103
66 66
0 399.37213677165624
8 355.55311382126627
16 413.68279518120215
24 397.8657304114511
32 334.4769167498529
40 408.3734566678995
48 337.2516983299544
56 364.53677821327864
[youtube] Extracting URL: https://www.youtube.com/watch?v=4Pe20y_-32c
[youtube] 4Pe20y_-32c: Downloading webpage
[youtube] 4Pe20y_-32c: Downloading tv client config
[youtube] 4Pe20y_-32c: Downloading tv player API JSON
[youtube] 4Pe20y_-32c: Downloading ios player API JSON
[youtube] 4Pe20y_-32c: Downloading m3u8 information
[info] 4Pe20y_-32c: Downloading 1 format(s): 251
[download] Destination: song\67
[download] 100% of    3.48MiB in 00:00:01 at 2.74MiB/s   
[ExtractAudio] Destination: song\67.mp3
Deleting original file song\67 (pass -k to keep)


FileNotFoundError: [Errno 2] No such file or directory: 'midi/67.mid'