# Task for Cuetessa, Inc. – Predicting Valence of Pop Songs

## Overview
The aim of this task is to develop a Python-based module to predict the valence of newly released pop songs.  Two approaches are to use as input 
1. the audio data (e.g., .wav files) of songs  
2. the lyrics of songs.  Publicly available datasets can be used for training and testing. 


### Data Description 

DEAM dataset (DEAM dataset - The MediaEval Database for Emotional Analysis of Music) consists of 1802 excerpts and full songs annotated with valence and arousal values both continuously (per-second) and over the whole song. The metadata describing the audio excerpts (their duration, genre, folksonomy tags).

- Annotations Data: The annotated dataset comes from Soleymani et al. (2013) (http://cvml.unige.ch/databases/emoMusic/). It consists of 45-s clips of 744 songs from the Free Music Archive (https://freemusicarchive.org/) that span a variety of popular genres
    - Annotations are made available in csv format. There are six csv files in this database, four containing
average and standard deviation of arousal and valence continuous annotation for each song.
- Metadata: 
    - including, song title, genre and artist is also provided.
    
    
### Feature Extraction
In this version of the project, the approach of the following research paper will be implemented: [Measuring national mood with music](https://link.springer.com/article/10.3758/s13428-021-01747-7). Specifically, the same [set of features](https://static-content.springer.com/esm/art%3A10.3758%2Fs13428-021-01747-7/MediaObjects/13428_2021_1747_MOESM1_ESM.pdf) will be tried to extract. These features are:
- [x] Spectral Centroid;
- [x] Spectral Rolloff;
- [x] Spectral Contrast — ~~7 bands~~ *(kept the default 6 bands instead)*;
- [x] Mel-Frequency Cepstrum Coefficients (MFCC) — 24 coefficients;
- [x] Zero Crossing Rate;
- [x] Chroma Energy Normalized Statistics (CENS) — 12 chroma;
- [x] Beat Per Minute (BPM);
- [x] Root Mean Square (RMS);

## Initialization

In [None]:
# Initialization 

import pandas as pd
import numpy as np

# statistical visualization
import matplotlib.pyplot as plt
import seaborn as sns

import librosa as librosa
import librosa.display
import IPython.display as ipd

from scipy import stats as st

import os
import re
from tqdm import tqdm


# import module for splitting and cross-validation using gridsearch
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.preprocessing import StandardScaler

# import machine learning module from the sklearn library
from sklearn.svm import SVR
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor


from imblearn.pipeline import Pipeline, make_pipeline
from tqdm import tqdm
from time import time
from datetime import date



In [None]:
# set up some parameters for plots in this notebook
plt.style.use("seaborn-paper")
sns.set_style("white")
sns.set_palette("flare")

plt.rcParams["figure.figsize"] = (10, 4)
%config InlineBackend.figure_format = "retina"



In [None]:
seed = 12345

### Data Preprocessing

In [None]:
# function to determine if columns in file have null values
def get_percent_of_na(df, num):
    count = 0
    df = df.copy()
    s = (df.isna().sum() / df.shape[0])
    for column, percent in zip(s.index, s.values):
        num_of_nulls = df[column].isna().sum()
        if num_of_nulls == 0:
            continue
        else:
            count += 1
        print('Column {} has {:.{}%} percent of Nulls, and {} of nulls'.format(column, percent, num, num_of_nulls))
    if count != 0:
        print("\033[1m" + 'There are {} columns with NA.'.format(count) + "\033[0m")
    else:
        print()
        print("\033[1m" + 'There are no columns with NA.' + "\033[0m")
        
# function to display general information about the dataset
def get_info(df):
    """
    This function uses the head(), info(), describe(), shape() and duplicated() 
    methods to display the general information about the dataset.
    """
    print("\033[1m" + '-'*100 + "\033[0m")
    print('Head:')
    print()
    display(df.head())
    print('-'*100)
    print('Info:')
    print()
    display(df.info())
    print('-'*100)
    print('Describe:')
    print()
    for column in df:
        if df[column].dtype == 'object':
            display(df.describe(include='object'))
        else:
            display(df.describe())
    print('-'*100)
    print()
    print('Columns with nulls:')
    display(get_percent_of_na(df, 4))  # check this out
    print('-'*100)
    print('Shape:')
    print(df.shape)
    print('-'*100)
    print('Duplicated:')
    print("\033[1m" + 'We have {} duplicated rows.\n'.format(df.duplicated().sum()) + "\033[0m")

In [None]:
# Load the data
try:
    annotations = pd.read_csv('/Users/gguillau/Desktop/Practicum/Cuetessa Project/archive/DEAM_Annotations/annotations/annotations averaged per song/song_level/static_annotations_averaged_songs_1_2000.csv')
    metadata = pd.read_csv('/Users/gguillau/Desktop/Practicum/Cuetessa Project/archive/metadata_1_2000.csv')
except:
    annotations = pd.read_csv("datasets/static_annotations_averaged_songs_1_2000.csv")
    metadata = pd.read_csv('datasets/metadata_1_2000.csv')

print('Data has been read correctly!')




In [None]:
print('General information about the contract dataset')
get_info(annotations)


In [None]:
annotations.sample(5)

In [None]:
# the column names contain empty spaces, fix that
annotations.columns = [col.replace(" ","") for col in annotations.columns]
# drop extra columns
annotations.drop(columns=["valence_std","arousal_std"], inplace=True)
# shortent the column names
annotations.rename(columns={"valence_mean":"valence", "arousal_mean":"arousal"}, inplace=True)
annotations.sample(3)

In [None]:
# show general info
metadata.info()

In [None]:
print('General information about the contract dataset')
get_info(metadata)


In [None]:
# print a sample of the meta set
metadata.sample(3)

In [None]:
# tidy up the column names
metadata.columns = ["song_id","file_name","artist","song_title","segment_start","segment_end", "genre"]
# fill in missing song titles
metadata["song_title"].fillna("unknown", inplace=True)

In [None]:
import re

# remove artifacts
for col in ["artist","song_title","genre"]:
    metadata[col] = [re.sub(r"\t", "", string) for string in metadata[col]]

# constract new file_name column
metadata["file_name"] = metadata["song_id"].astype(str) + ".mp3"

# print sample to check
metadata.sample(3)

In [None]:
# construct new file_name column and write file path
metadata["file_name"] = metadata["song_id"].astype(str) + ".mp3"
metadata["file_path"] = "/Users/gguillau/Desktop/Practicum/Cuetessa Project/archive/DEAM_audio/MEMD_audio/" 
+ metadata["file_name"]

metadata.sample(5)

In [None]:
# Merge datasets
df = annotations.merge(metadata, on="song_id",how="outer")
df.info()

### Exploratory Data Analysis

In [None]:
# plot the valence and arousal values distribution
plt.scatter(df["valence"], df["arousal"], alpha=.3)
plt.title("Valence vs arousal scatter plot", fontsize=14)
plt.xlim([1,9])
plt.ylim([1,9])
plt.xlabel("Valence")
plt.ylabel("Arousal");

In [None]:
# count the number of genres present in the dataset
nunique_genres = df["genre"].nunique()
print("Unique genres in the dataset:  {}\n".format(nunique_genres))

# select the top 10
top10_genres = df["genre"].value_counts()[:10]
top10_genres.loc["Other"] = df["genre"].value_counts()[10:].sum()

# plot a pie chart with genres ratio
plt.figure(figsize=(6,6))
labels = top10_genres.index
plt.pie(top10_genres, labels=labels)
plt.title("Dataset Song Genres", fontsize=14);

## Audio Features
Investigate the audio features available with the librosa library.



In [None]:
random_idx = np.random.randint(0, 1744)

# select a random song from the dataset
song = df.loc[random_idx, :]

# load the file and print its sampling rate 
file_path = "/Users/gguillau/Desktop/Practicum/Cuetessa Project/archive/DEAM_audio/MEMD_audio/" + song["file_name"]
audio, sample_rate = librosa.load(file_path)
# print info about this song
print(f"Sampling rate: {sample_rate}")
print(song)

# plot the wavefrom
plt.figure(figsize=(12,4))
librosa.display.waveshow(audio, sr=sample_rate) # plot a waveform and play the file
plt.title(f'"{song.song_title[:15]}" by {song.artist}, {song.genre}')
plt.legend([f"song id {song.song_id}"])
plt.xlabel("Duration in seconds")

# output the audio
ipd.Audio(file_path)


### Spectral Centroid

The spectral centroid is a measure of the center of gravity of a sound. This can be used to classify the timbre of a sound, and also to identify different types of sounds. It can be used in applications such as Notion AI, where it can be used to identify particular sounds in a recording.


In [None]:
# derive the feature from the audio file
centroid = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)

# obtain other data for plotting
S, phase = librosa.magphase(librosa.stft(y=audio))
times = librosa.times_like(centroid)

fig, ax = plt.subplots()
ax.set(title=f"Spectral Centroid for song id {song.song_id}")

# show spectrogram
librosa.display.specshow(librosa.amplitude_to_db(S, ref=np.max),
                         y_axis="log", x_axis="time", ax=ax)
# plot spectral centroid
ax.plot(times, centroid.T, label="Spectral centroid",  color="w")
ax.legend(loc="upper right");


### Spectral Rolloff

Spectral rolloff is a feature of audio signal analysis which measures how quickly the power of a signal decreases as the frequency increases. It can be used to identify the tonal components of a signal and detect the presence of harmonic content.



In [None]:
# approximate minimum frequencies with roll_percent=0.95
rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.95)
# approximate minimum frequencies with roll_percent=0.50
rolloff_middle = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.5)
# approximate minimum frequencies with roll_percent=0.01
rolloff_min = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.01)

S, phase = librosa.magphase(librosa.stft(audio))

fig, ax = plt.subplots()
ax.set(title=f"Spectral Roll-off for song id {song.song_id}")

librosa.display.specshow(librosa.amplitude_to_db(S, ref=np.max),
                         y_axis="log", x_axis="time", ax=ax)

ax.plot(librosa.times_like(rolloff), rolloff[0], label="Roll-off freq 0.95", color="c")
ax.plot(librosa.times_like(rolloff), rolloff_middle[0], label="Roll-off freq 0.50", color="w")
ax.plot(librosa.times_like(rolloff), rolloff_min[0], label="Roll-off freq 0.01", color="y")
ax.legend(loc='center right');

### Spectral Contrast
Spectral contrast, as implemented in the librosa library, is used to highlight the differences in frequency content of a signal, and can be used to detect and identify individual instruments, as well as providing general information about the signal. The process of spectral contrast works by measuring the overall average energy of each frequency band, then subtracting this average from the original signal. This reveals the differences in the signal which would otherwise be difficult to detect. By doing this, spectral contrast can provide an understanding of the structure of a sound, and can even be used to detect and identify individual instruments.


In [None]:
S = np.abs(librosa.stft(y=audio))
contrast = librosa.feature.spectral_contrast(S=S, sr=sample_rate, n_bands=6)

fig, ax = plt.subplots()

# plot spectral contrast
img2 = librosa.display.specshow(contrast, x_axis="time", ax=ax)
fig.colorbar(img2, ax=ax)
ax.set(ylabel="Frequency bands", title=f"Spectral contrast for song id {song.song_id}");

### Mel-Frequency Cepstrum Coefficients
MEL-Frequency Cepstrum Coefficients (MFCCs) are a powerful technique for recognizing patterns in audio signals. They are used to represent audio signals in a form that is more easily analyzed by a machine. MFCCs are derived from a Fourier transform of a signal and are used in speech recognition and music recognition applications. MFCCs are also used in audio fingerprinting and speaker recognition.

 

In [None]:
# generate mfccs from the audio file
mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=24)

fig, ax = plt.subplots()

img = librosa.display.specshow(mfccs, x_axis='time', ax=ax)
fig.colorbar(img, ax=ax)
ax.set(title=f"Mel-Frequency Cepstrum Coefficients for song id {song.song_id}");


### Onset Strength
The onset strength is used to measure the intensity of different sounds in audio files. It is an important measure to consider when analyzing audio signals, as it can help determine the loudness and intensity of a sound in comparison to other sounds. The OS algorithm is also used to identify and differentiate between different types of sounds, making it particularly useful for audio classification.



In [None]:
# locate note onset events by picking peaks in an onset strength envelope
# select only a slice for better visualisation
onset_env = librosa.onset.onset_strength(y=audio, sr=sample_rate)[750:1000]
onset_frames = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sample_rate)
times = librosa.times_like(onset_env, sr=sample_rate)

fig, ax = plt.subplots()
ax.set(title=f"The Onset Strength for the song id {song.song_id}", xlabel="Time")

ax.plot(times, onset_env, label='Onset strength')
ax.vlines(times[onset_frames], 0, onset_env.max(), color='r', alpha=0.3, linestyle='--', label='Onsets')
ax.legend();


### Zero Crossing Rate
The Zero Crossing Rate (ZCR) is a measure used in signal processing to identify the number of times a signal crosses the zero axis. It is often used to analyze audio signals, as it can provide an indication of the perceptual loudness of the audio. ZCR can be calculated by counting the number of times the signal crosses the zero axis in a given time interval.



In [None]:
zcrs = librosa.feature.zero_crossing_rate(audio)

plt.plot(zcrs[0])
plt.title(f"Zero Crossing Rates for song id {song.song_id}");


### Chroma Energy Normalized Statistics
CENS is calculated by first computing the chroma vectors for each audio signal. The chroma vector is a 12-dimensional vector composed of the energy in each of the 12 semitones of an octave. The chroma vectors of each signal are then normalized to have unit energy. Finally, the normalized chroma vectors are compared using a similarity measure such as cosine distance.



In [None]:
chroma_cens = librosa.feature.chroma_cens(y=audio, sr=sample_rate)

fig, ax = plt.subplots()
img = librosa.display.specshow(chroma_cens, y_axis="chroma", x_axis="time")
fig.colorbar(img)
ax.set(title=f"Chroma CENS for song id {song.song_id}");

### Beats Per Minute: Dynamic Tempo
The tempo of a song can be defined as its speed, or how quickly the music moves. It can range from slow and gentle to fast and intense, depending on the genre and style of the music. In a dynamic tempo, the speed of the music changes throughout the song, creating a sense of excitement and anticipation. This can be done by increasing the tempo gradually or by sudden changes in speed.



In [None]:
onset_env = librosa.onset.onset_strength(y=audio, sr=sample_rate)
dtempo = librosa.beat.tempo(onset_envelope=onset_env, sr=sample_rate, aggregate=None)

fig, ax = plt.subplots()
tg = librosa.feature.tempogram(onset_envelope=onset_env, sr=sample_rate, hop_length=512)
librosa.display.specshow(tg, x_axis="time", y_axis="tempo", cmap="magma", ax=ax)

ax.plot(librosa.times_like(dtempo), dtempo, color="w", linewidth=1.5, label="Tempo estimate")

ax.set(title=f"Dynamic tempo estimation for song id {song.song_id}")
ax.legend();

### Root Mean Square
RMS is a measure of the average power of a signal over time. It is useful for comparing the loudness of different audio signals, as well as for analyzing the spectral content of signals.


In [None]:

rms = librosa.feature.rms(y=audio)[0]

fig, ax = plt.subplots()
times = librosa.times_like(rms)
ax.set(title=f"RMS energy for each frame for song id {song.song_id}", xlabel="Time")
ax.semilogy(times, rms, label='RMS Energy');

In [None]:
rms = librosa.feature.rms(y=audio)[0]

fig, ax = plt.subplots()
times = librosa.times_like(rms)
ax.set(title=f"RMS energy for each frame for song id {song.song_id}", xlabel="Time")
ax.semilogy(times, rms, label='RMS Energy');

In [None]:
# import essentia

### MACHINE LEARNING

In [None]:
def get_stats(array):
    """
    Takes an array and gives back its mean, standard deviation, 
    first-order difference mean, and first-order difference 
    standard deviation — in this exact order.
    """
    mean = array.mean()
    var = array.var()
    
    diff_mean = np.diff(array).mean()
    diff_var = np.diff(array).var()
    
    return [mean, var, diff_mean, diff_var]

In [None]:
def extract_features(file_path):
    """
    Takes path to an audio file and returns statistics (mean, standard deviation, 
    first-order difference mean, and first-order difference standard deviation)
    for each output array of the following methods:
    
    - Spectral Centroid (1)
    - Spectral Rolloff (3)
    - Spectral Contrast (7)
    - MFCC (24)
    – Onset Strength (1)
    – Zero Crossing Rate (1)
    – CENS (12)
    – BPM Dynamic (1)
    – RMS (1)
    
      204 values output in total.
      
    """
    features = [] # empty list for storing features
    cnt = 0 # counter for keeping track of features number
    
    # load the audio file 
    audio, sample_rate = librosa.load(file_path)
    
    # Spectral Centroid
    cent = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)
    features.append(get_stats(cent))
    cnt += 1
    
    # Spectral Rolloff
    rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.95)
    features.append(get_stats(rolloff))
    cnt += 1
    
    rolloff_middle = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.5)
    features.append(get_stats(rolloff_middle))
    cnt += 1
    
    rolloff_min = librosa.feature.spectral_rolloff(y=audio, sr=sample_rate, roll_percent=0.01)
    features.append(get_stats(rolloff_min))
    cnt += 1
    
    # Spectral Contrast
    S = np.abs(librosa.stft(y=audio))
    contrast = librosa.feature.spectral_contrast(S=S, sr=sample_rate)
    for band in contrast:
        features.append(get_stats(band))
        cnt += 1
        
    # MFCCs
    mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=24)
    for mfcc in mfccs:
        features.append(get_stats(mfcc))
        cnt += 1
    
    # Onset strength
    onset_env = librosa.onset.onset_strength(y=audio, sr=sample_rate)
    features.append(get_stats(onset_env))
    cnt += 1
    
    # ZCR
    zcrs = librosa.feature.zero_crossing_rate(audio)
    features.append(get_stats(zcrs))
    cnt += 1
    
    # CENS
    chroma_cens = librosa.feature.chroma_cens(y=audio, sr=sample_rate)
    for chroma in chroma_cens:
        features.append(get_stats(chroma))
        cnt += 1
        
    # BPM
    dtempo = librosa.beat.tempo(onset_envelope=onset_env, sr=sample_rate, aggregate=None)
    features.append(get_stats(dtempo))
    cnt += 1
    
    # RMS
    rms = librosa.feature.rms(y=audio)[0]
    features.append(get_stats(rms))
    cnt += 1
    
    features = np.array(features).reshape(cnt * 4)
    
    return features


In [None]:
features_df = []
for i in tqdm(range(len(df))):
    
     file_path = df.loc[i, "file_path"]
     features_df.append(extract_features(file_path))

In [None]:
features_df = pd.DataFrame(np.array(features_df))

In [None]:
features_df.to_csv("features_df1.csv")


In [None]:
features_df = pd.read_csv("/Users/gguillau/Desktop/Practicum/Cuetessa Project/features_df1.csv", index_col=0)


In [None]:
# create a featuers amount lookup table
lkp_dict = dict(centroid=1, rolloff_high=1, rolloff_mid=1, rolloff_min=1, 
              contrast=7, mfcc=24, onset=1, zcr=1, cens=12, bpm=1, rms=1)

In [None]:
column_names = []
statistics = ["mean","var","diff_mean","diff_var"]

for feature in lkp_dict:
    for i in range(1, lkp_dict[feature] + 1, 1):
        for statistic in statistics:
            if lkp_dict[feature] != 1:
                column_names.append(feature + "_" + str(i) + "_" + statistic)
            else:
                column_names.append(feature + "_" + statistic)


In [None]:
features_df = pd.DataFrame(np.array(features_df), columns=column_names) 


In [None]:
X = features_df.values
y = df.valence.values

# split the data into the train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.2, random_state=seed)

print(X_train.shape)
print(X_test.shape)

In [None]:
# creata a function for models evaluation
def eval_model(model, X=X_train, y=y_train, show_metrics=1):
    
    # create a pipline to avoid possible target leakage
    pipe = make_pipeline(StandardScaler(), model)
    
    scores = cross_validate(pipe, X, y, cv=5, scoring=("r2", "neg_mean_absolute_error"), n_jobs=-1)
    
    r2 = np.average(scores["test_r2"])
    mae = abs(np.average(scores["test_neg_mean_absolute_error"]))
    fit_time = np.average(scores["fit_time"])
    score_time = np.average(scores["score_time"])
    
    if show_metrics == 1:
        print("Fit time: {:.5f}".format(fit_time))
        print("Score time: {:.5f}".format(score_time))
        print("R2: {:.4f}".format(r2))
        print("MAE {:.4f}".format(mae))
    else:
        return r2, mae

In [None]:
eval_model(SVR())


In [None]:
eval_model(KNeighborsRegressor())

In [None]:
eval_model(LinearRegression())