#### Dependencies

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import librosa
import time
import multiprocessing as mp
from dotenv import load_dotenv
from tqdm.notebook import tqdm
import sys
from contextlib import closing
import psutil
import tracemalloc
import threading
import gc
from guppy import hpy

from essentia.standard import (
    MonoLoader,
    Danceability,
    Spectrum,
    FrameCutter,
    Loudness,
    RhythmExtractor2013,
    KeyExtractor,
    Energy,
    TonalExtractor,
    Inharmonicity,
    MFCC,
    OnsetRate,
    SpectralCentroidTime,
    DynamicComplexity,
    SpectralPeaks,
    NoveltyCurve,
    Spectrum,
    FrameGenerator,
    Windowing,
    MelBands,
    BeatsLoudness,
    Beatogram,
    Meter,
)

#### Global Constants

In [2]:
load_dotenv()
DOWNLOAD_FOLDER = os.getenv('DOWNLOAD_FOLDER')
CPU_THREADS = int(os.getenv('CPU_THREADS'))
BATCH_SIZE = 20

#### Data

In [3]:
songs_data = pd.read_csv('data/songs_final.csv')

#### Feature Extraction Functions

In [4]:
def create_spectrogram_image(spectrogram_db, sample_rate):
    plt.figure(figsize=(10, 4))
    librosa.display.specshow(spectrogram_db, sr=sample_rate, x_axis='time', y_axis='mel', fmax=11025)
    plt.colorbar(format='%+2.0f dB')
    plt.title(f"Mel-Spectrogram")
    plt.tight_layout()
    plt.show()
    plt.close()

In [5]:
def mp3_to_spectrogram(audio_path, sample_rate, create_image=False):
    mp3, _ = librosa.load(audio_path, sr=sample_rate)
    spectrogram = librosa.feature.melspectrogram(y=mp3, sr=sample_rate, n_mels=128, fmax=11025)
    spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max)

    if create_image:
        create_spectrogram_image(spectrogram_db, sample_rate)

    del mp3, spectrogram
    gc.collect()
    return spectrogram_db

In [6]:
def get_mel_bands(audio):
    spectrum = Spectrum()
    frame_generator = FrameGenerator(audio, frameSize=2048, hopSize=1024)
    window = Windowing(type='hann')

    mel_bands = MelBands(numberBands=40)
    mel_band_energies = []

    for frame in frame_generator:
        spec = spectrum(window(frame))
        mel_band_energies.append(mel_bands(spec))

    mel_band_energies = np.array(mel_band_energies)

    del spectrum, frame_generator, window, mel_bands
    gc.collect()
    return mel_band_energies

In [7]:
def run_essentia_algorithms(audio44k, audio16k):
    _, mfcc_coeffs = MFCC(inputSize=len(audio16k))(audio16k)
    danceability_score = Danceability()(audio44k)
    loudness_score = Loudness()(audio16k)
    bpm, beat_positions, _, _, _ = RhythmExtractor2013(method="multifeature")(audio44k)
    key, scale, _ = KeyExtractor()(audio44k)
    energy_score = Energy()(audio16k)

    ### Chord Significances
    _, _, _, _, chords, _, _, _, _, _, _, _ = TonalExtractor()(audio44k)
    unique_chords, counts = np.unique(chords, return_counts=True)
    chords_significance = {chord: significance for (chord, significance) in zip(unique_chords, counts)}

    ### Inharmonicity
    frames = []
    frameCutter = FrameCutter()
    while True:
        frame = frameCutter(audio44k)
        if not len(frame):
            break
        frames.append(frame)
        
    spectrum_magnitudes = []
    for frame in frames:
        spectrum_magnitudes_frame = Spectrum()(frame)
        spectrum_magnitudes.append(spectrum_magnitudes_frame)
    spectrum_magnitudes = np.array(spectrum_magnitudes).flatten()
    
    frequencies, magnitudes = SpectralPeaks()(audio44k)
    hnr_score = None
    if frequencies[0]: 
        hnr_score = Inharmonicity()(frequencies, magnitudes)
    ###
    
    onset_rate_score = OnsetRate()(audio44k)
    brightness_score = SpectralCentroidTime()(audio44k)
    dynamic_complexity_score, _ = DynamicComplexity()(audio16k)
    
    mel_bands = get_mel_bands(audio44k)
    novelty_curve = NoveltyCurve()(mel_bands)
    novelty_score = np.median(np.abs(np.diff(novelty_curve)))
    
    beats_loudness, beats_loudness_band_ratio = BeatsLoudness(beats=beat_positions)(audio44k)
    beatogram = Beatogram()(beats_loudness, beats_loudness_band_ratio)
    time_signature = Meter()(beatogram)

    features = {
        'Danceability': danceability_score[0],
        'Loudness': loudness_score,
        'BPM': bpm,
        'Key': key,
        'Key Scale': scale,
        'Energy': energy_score,
        'Chords Significance': chords_significance,
        'Inharmonicity': hnr_score,
        'Timbre': np.mean(mfcc_coeffs),
        'Onset Rate': onset_rate_score[1],
        'Brightness': brightness_score,
        'Dynamic Complexity': dynamic_complexity_score,
        'Novelty': novelty_score,
        'Time Signature': time_signature,
    }

    del mfcc_coeffs, danceability_score, loudness_score, bpm, beat_positions, key, scale, energy_score
    del chords, unique_chords, counts, chords_significance, frames, frameCutter, spectrum_magnitudes
    del frequencies, magnitudes, hnr_score, onset_rate_score, brightness_score, dynamic_complexity_score
    del mel_bands, novelty_curve, novelty_score, beats_loudness, beats_loudness_band_ratio, beatogram, time_signature
    gc.collect()
    return features

In [8]:
def extract_audio_features(audio_file):
    # Load the audio file
    audio44k = MonoLoader(filename=audio_file)()
    audio16k = MonoLoader(filename=audio_file, sampleRate=16000)()

    # Run algorithms
    spectrogram = mp3_to_spectrogram(audio_file, 22050)
    algorithm_features = run_essentia_algorithms(audio44k, audio16k) | {'Spectrogram': spectrogram}

    del audio44k, audio16k, spectrogram

    gc.collect()
    # Merge results
    return algorithm_features

#### Main Code

In [9]:
# Class constructed from song path
# Song path must follow this format: /some/path/(int)^(video id)^(title).mp3
#                               e.g  /some/path/0^LlWGt_84jpg^Special Breed.mp3
class SongPath:
    def __init__(self, song_path: str):
        self.path = song_path
        self.filename = os.path.basename(song_path)

        song_filename_split = self.filename.split('^')
        if len(song_filename_split) != 3:
            raise Exception("The song's filename doesn't follow the correct format: /some/path/(int)^(video id)^(title).mp3")
        
        self.index, self.video_id, self.title_with_extension = song_filename_split

        self.index = int(self.index)
        self.title = os.path.splitext(self.title_with_extension)[0]

    def __str__(self):
        return f"Idx: {self.index},  videoID: {self.video_id}, title: {self.title_with_extension}"

In [10]:
# Returns memory usage of main and child processes
def get_total_memory_usage(process):
    memory_summary = {'Process': process.memory_info().rss / (1024 * 1024)}
    for index, child in enumerate(process.children(recursive=True)):
        memory_summary = memory_summary | {f'Child Process {index}': child.memory_info().rss / (1024 * 1024)}
    return memory_summary

def print_memory_usage(process):
    print(get_total_memory_usage(process))
    snapshot = tracemalloc.take_snapshot()
    print(f"Consumer {os.getpid()}: {snapshot.statistics('lineno')[:1]}")

def monitor_memory_usage(process, song_results, kill_thread, interval=30):
    while True:
        if kill_thread.value:
            print("KILL THREAD")
            return
        try:
            print_memory_usage(process)
            #print(f"Songs Processed: {len(song_results)}")
            gc.collect()
        except Exception as e:
            print(f"Thread ERROR: {e}")
            return
        time.sleep(interval)

In [11]:
def process_song(song_path, conn):
    song = SongPath(song_path)
    song_features = extract_audio_features(song.path)
    conn.send((song.index, song_features))
    conn.close()
    gc.collect()

def process_song(song_path):
    song = SongPath(song_path)
    song_features = extract_audio_features(song.path)
    print(f"Child Process: {os.getpid()}")
    print_memory_usage(psutil.Process(os.getpid()))
    gc.collect()
    return song.index, song_features

In [12]:
from contextlib import ExitStack
import multiprocessing.connection as mpc

def process_songs():
    tracemalloc.start()
    # Get song paths
    song_paths = np.array([os.path.join(DOWNLOAD_FOLDER, song_filename) for song_filename in os.listdir(DOWNLOAD_FOLDER)])
    songs_data_lower, songs_data_higher = [0, len(song_paths)//6]
    song_paths = song_paths[songs_data_lower:songs_data_higher]

    # Start a background thread to monitor memory usage
    main_process = psutil.Process(os.getpid())
    memory_thread = threading.Thread(target=monitor_memory_usage, args=(main_process,))
    memory_thread.daemon = True  # Ensure the thread will exit when the main program exits
    memory_thread.start()

    songs_data_full = songs_data.copy(deep=True)

    with mp.Pool(processes=CPU_THREADS, maxtasksperchild=1) as pool:
        # Use ExitStack to handle pipe cleanup automatically
        with ExitStack() as stack:
            parent_conns, child_conns = zip(*[mp.Pipe() for _ in range(len(song_paths))])
            parent_conns = [stack.enter_context(conn) for conn in parent_conns]  # Enter context for each parent pipe

            # Async function to process songs
            [pool.apply_async(process_song, args=(song_path, child_conn)) for song_path, child_conn in zip(song_paths, child_conns)]

            completed_tasks = 0
            total_tasks = len(song_paths)

            while completed_tasks < total_tasks:
                ready_conns = mpc.wait(parent_conns)

                for conn in ready_conns:
                    song_index, song_features = conn.recv()
                    completed_tasks += 1

                    for feature, value in song_features.items():
                        if feature not in songs_data_full.columns and isinstance(value, (tuple, set, list, np.ndarray, dict)):
                            songs_data_full[feature] = np.nan
                            songs_data_full[feature] = songs_data_full[feature].astype(object)
                        songs_data_full.at[song_index, feature] = value

                    conn.close()  # Ensure parent connection is closed
                    parent_conns.remove(conn)
                    del conn
                del ready_conns

        pool.close()
        pool.join()

    return songs_data_full


In [13]:
def process_songs():
    tracemalloc.start()

    song_paths = np.array([os.path.join(DOWNLOAD_FOLDER, song_filename) for song_filename in os.listdir(DOWNLOAD_FOLDER)])

    songs_data_lower, songs_data_higher = [0, 120]
    song_paths = song_paths[songs_data_lower:songs_data_higher]

    main_process = psutil.Process(os.getpid())
    memory_thread = threading.Thread(target=monitor_memory_usage, args=(main_process,))
    memory_thread.daemon = True  # Ensure the thread will exit when the main program exits
    memory_thread.start()
    
    song_results = []
    with mp.Pool(processes=CPU_THREADS, maxtasksperchild=1) as pool:
        with tqdm(total=len(song_paths), desc="Processing songs") as pbar:
            for song_paths_batch in np.array_split(song_paths, CPU_THREADS):
                song_batch_results = pool.map(process_song, song_paths_batch)
                song_results.extend(song_batch_results)
                pbar.update(len(song_batch_results))
                gc.collect()

    pool.terminate()
    gc.collect()

    # Aggregate results in the pandas dataframe
    songs_data_full = songs_data.copy(deep=True)
    for song_index, song_features in song_results:
        for feature, value in song_features.items():
            if feature not in songs_data_full.columns and isinstance(value, (tuple, set, list, np.ndarray, dict)):
                songs_data_full[feature] = np.nan
                songs_data_full[feature] = songs_data_full[feature].astype(object)
            songs_data_full.at[song_index, feature] = value

    return songs_data_full

In [14]:
def process_song_batch(song_batch, song_results):
    for song_path in song_batch:
        song = SongPath(song_path)
        song_features = extract_audio_features(song.path)
        song_results.append(song_features)
        print(f"Child Process: {os.getpid()}")
        print_memory_usage(psutil.Process(os.getpid()))
        gc.collect()

In [15]:
def process_songs():
    tracemalloc.start()

    # Load song paths
    song_paths = np.array([os.path.join(DOWNLOAD_FOLDER, song_filename) for song_filename in os.listdir(DOWNLOAD_FOLDER)])

    # Define lower and upper data range for processing
    songs_data_lower, songs_data_higher = [0, len(song_paths)//6]
    song_paths = song_paths[songs_data_lower:songs_data_higher]

    # Start memory monitoring thread
    main_process = psutil.Process(os.getpid())
    memory_thread = threading.Thread(target=monitor_memory_usage, args=(main_process,))
    memory_thread.daemon = True  # Ensure the thread will exit when the main program exits
    memory_thread.start()

    # Split the song paths into 6 batches
    batches = np.array_split(song_paths, 6)

    # Use Manager to handle shared data across processes
    with mp.Manager() as manager:
        song_results = manager.list()  # This will hold results from all processes

        processes = []
        for batch in batches:
            # Create a new process for each batch, passing the shared song_results list
            p = mp.Process(target=process_batch, args=(batch, song_results))
            processes.append(p)
            p.start()

        # Wait for all processes to finish
        for p in processes:
            p.join()

        gc.collect()

        # Convert shared list back to regular list before returning
        song_results = list(song_results)

        # Aggregate results in the pandas dataframe
        songs_data_full = songs_data.copy(deep=True)
        for song_index, song_features in song_results:
            for feature, value in song_features.items():
                if feature not in songs_data_full.columns and isinstance(value, (tuple, set, list, np.ndarray, dict)):
                    songs_data_full[feature] = np.nan
                    songs_data_full[feature] = songs_data_full[feature].astype(object)
                songs_data_full.at[song_index, feature] = value

        return songs_data_full


def process_batch(batch, song_results):
    """Process a batch of song paths and append the results to song_results."""
    for song_path in batch:
        result = process_song(song_path)  # Assuming process_song returns (index, features)
        song_results.append(result)

In [16]:
def process_song(args):
    song_path, song_results = args
    song = SongPath(song_path)
    song_features = extract_audio_features(song.path)
    print(f"Child Process: {os.getpid()}")
    print_memory_usage(psutil.Process(os.getpid()))
    song_results.append((song.index, song_features))
    gc.collect()

In [17]:
def process_songs():                   
    tracemalloc.start()

    song_paths = np.array([os.path.join(DOWNLOAD_FOLDER, song_filename) for song_filename in os.listdir(DOWNLOAD_FOLDER)])

    songs_data_lower, songs_data_higher = [0, len(song_paths)//6]
    song_paths = song_paths[songs_data_lower:songs_data_higher]
    
    with mp.Manager() as manager:
        kill_thread = manager.Value('b', False)
        shared_song_results = manager.list()
        main_process = psutil.Process(os.getpid())
        memory_thread = threading.Thread(target=monitor_memory_usage, args=(main_process, shared_song_results, kill_thread))
        memory_thread.daemon = True  # Ensure the thread will exit when the main program exits
        memory_thread.start()
        with mp.Pool(processes=CPU_THREADS, maxtasksperchild=1) as pool: # TODO: maxtasksperchild=1
            with tqdm(total=len(song_paths), desc="Processing songs") as pbar:
                args = [(song_path, shared_song_results) for song_path in song_paths]
                for _ in pool.imap(process_song, args): # TODO: chuknsize
                    pbar.update(1)
                    gc.collect()
        kill_thread.value = True
        song_results = list(shared_song_results)

    gc.collect()

    # Aggregate results in the pandas dataframe
    songs_data_full = songs_data.copy(deep=True)
    for song_index, song_features in song_results:
        for feature, value in song_features.items():
            if feature not in songs_data_full.columns and isinstance(value, (tuple, set, list, np.ndarray, dict)):
                songs_data_full[feature] = np.nan
                songs_data_full[feature] = songs_data_full[feature].astype(object)
            songs_data_full.at[song_index, feature] = value

    return songs_data_full

In [18]:
songs_data_full = process_songs()
songs_data_full

{'Main Process': 365.59765625, 'Child Process 0': 1737.8046875, 'Child Process 1': 824.765625, 'Child Process 2': 805.0, 'Child Process 3': 651.328125, 'Child Process 4': 523.5234375, 'Child Process 5': 380.9296875, 'Child Process 6': 353.3125}
Consumer 265511: [<Statistic traceback=<Traceback (<Frame filename='/tmp/ipykernel_265511/1304482496.py' lineno=4>,)> size=57827640 count=3>]
Child Process: 282553
{'Main Process': 797.5625}
Consumer 282553: [<Statistic traceback=<Traceback (<Frame filename='/tmp/ipykernel_265511/1304482496.py' lineno=4>,)> size=57827640 count=3>]
Child Process: 282950
{'Main Process': 672.296875}
Child Process: 282695
{'Main Process': 806.56640625}
Consumer 282950: [<Statistic traceback=<Traceback (<Frame filename='/tmp/ipykernel_265511/1304482496.py' lineno=4>,)> size=57827640 count=3>]
Consumer 282695: [<Statistic traceback=<Traceback (<Frame filename='/tmp/ipykernel_265511/1304482496.py' lineno=4>,)> size=57827640 count=3>]
Child Process: 282891
{'Main Proce

KeyboardInterrupt: 

Exception in thread Thread-4 (monitor_memory_usage):
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/alean/stuff/popcast-ai/.venv/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_265511/3370002723.py", line 15, in monitor_memory_usage
  File "/usr/lib/python3.10/multiprocessing/managers.py", line 1137, in get
    return self._callmethod('get')
  File "/usr/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)


In [55]:
songs_data_full.to_csv('data/songs_data_full_1.csv', index=True)

In [None]:
songs_data_full.dropna(subset=['Danceability'])
