# Download and process the dataset

In [None]:
# Install required packages
!pip install spotdl
!pip install ffmpeg-python

# Install the FFmpeg system binary that spotdl uses for audio conversion
!apt-get install -y ffmpeg

In [None]:
import ast
import os
import subprocess

import pandas as pd
from google.colab import drive
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# import ffmpeg
# import spotdl
# from spotdl import Spotdl

In [None]:
# Mount Google Drive at /content/gdrive so the dataset files stored there are
# reachable; force_remount=True re-mounts even if a mount already exists.
drive.mount('/content/gdrive', force_remount = True)

Mounted at /content/gdrive


In [None]:
# Locations on the mounted Drive: dataset root, the raw song metadata table,
# and the target handed to spotdl via --output.
database_path = r'/content/gdrive/MyDrive/Musicoset/'
path = r'/content/gdrive/MyDrive/Musicoset/songs.csv'
save_path = r'/content/gdrive/MyDrive/Musicoset/songs/%(title)s.%(ext)s'

# The previous check compared the function object os.path.exists with False,
# which is never true, so the directory was never created. Call the function
# on the path instead.
# NOTE(review): save_path looks like a filename template, yet a path of the
# same shape is later listed with os.listdir as a directory - confirm intent.
if not os.path.exists(save_path):
  os.makedirs(save_path)

In [None]:
# Load the tab-separated song table and tally the explicit / non-explicit rows.

data = pd.read_csv(path, sep="\t")
explicit_true_count = int((data['explicit'] == True).sum())
explicit_false_count = int((data['explicit'] == False).sum())
print(explicit_true_count, explicit_false_count)

In [None]:
# Balance the classes: keep every explicit song and pair it with an equally
# sized random sample of non-explicit songs.
true_explicit = data[data['explicit']==True]
false_explicit = data[data['explicit']== False]
# Draw from the non-explicit subset only. Sampling from the full table could
# re-draw explicit rows, producing duplicates and an unbalanced result.
# n is capped because sampling without replacement cannot exceed the pool size.
false_explicit_sample = false_explicit.sample(
    n=min(len(true_explicit), len(false_explicit)),
    replace=False,
    random_state=1,  # fixed seed keeps the selection reproducible
).reset_index(drop=True)
balanced_data = pd.concat([true_explicit, false_explicit_sample]).reset_index(drop=True)
# (Disabled) presumably used to resume an interrupted run from row 1159 - TODO confirm
# balanced_data = balanced_data.iloc[1159:, :]

In [None]:
# add song name to the dataset
def _billboard_to_song_name(raw):
  """Turn a stringified billboard entry into an "artist - song" label.

  `raw` is the text of one `billboard` cell; element 0 is read as the song and
  element 1 as the artist. ast.literal_eval only accepts Python literals, so
  unlike eval it cannot execute code that happens to be stored in the CSV.
  """
  entry = ast.literal_eval(raw)
  song, artist = entry[0], entry[1]
  return f"{artist} - {song}"

# Build the whole column in one assignment so the result does not depend on
# the frame's index labels matching the loop counter.
balanced_data['song_name'] = [
  _billboard_to_song_name(val) for val in balanced_data['billboard']
]

In [None]:
# Persist the balanced table under the dataset root; an existing file is left untouched.
csv_save_path = os.path.join(database_path, 'balanced_data.csv')
csv_missing = not os.path.exists(csv_save_path)
if csv_missing:
  balanced_data.to_csv(csv_save_path)



In [None]:
# scraping: download every track in the balanced table with spotdl

for i, n in enumerate(balanced_data['song_id']):
  track_link = 'https://open.spotify.com/track/{}'.format(n)
  # Pass the arguments as a list so no shell parses the id or the output path
  # (both come from data / configuration rather than from this cell).
  result = subprocess.run(['spotdl', track_link, '--output', save_path])
  # Report success only when spotdl actually exited cleanly; previously the
  # exit status was ignored and every track was announced as saved.
  if result.returncode == 0:
    print(f"track no. {i} is saved")
  else:
    print(f"track no. {i} failed (spotdl exit code {result.returncode})")


In [None]:
# The file names produced by scraping differ from the song names in the dataset,
# so a TF-IDF cosine-similarity match pairs each scraped file with its closest
# dataset entry and renames that entry to the (lower-cased) file name.

def similarity(sentence1, sentence2, vectorizer):
    """Return the cosine similarity between two sentences.

    Both sentences are projected with the already-fitted ``vectorizer``
    (anything exposing ``transform``); the single entry of the resulting
    1x1 similarity matrix is returned.
    """
    vectors = vectorizer.transform([sentence1, sentence2])
    first_vec = vectors[0:1]
    second_vec = vectors[1:2]
    score_matrix = cosine_similarity(first_vec, second_vec)
    return score_matrix[0][0]

def get_similar_sentences(df, queries):
    """Match each query to the closest ``song_name`` in ``df`` by TF-IDF cosine similarity.

    Args:
        df: DataFrame with a ``song_name`` column of strings.
        queries: iterable of strings (the caller passes directory entries).

    Returns:
        ``(results, df_copy)``. ``results`` maps each lower-cased query to a
        one-element list ``[(matched original song_name, score)]``.
        ``df_copy`` is a copy of ``df`` in which the ``song_name`` of every
        matched row is replaced by the lower-cased query. Matches are always
        looked up in the unmodified ``df``, so several queries may hit the
        same row; the last one wins.
    """
    lowered_names = df['song_name'].str.lower()
    vectorizer = TfidfVectorizer().fit(lowered_names)
    # Vectorise the dataset names once instead of once per (query, row) pair.
    name_matrix = vectorizer.transform(lowered_names)
    name_col = df.columns.get_loc('song_name')
    df_copy = df.copy()
    results = {}

    for i, query in enumerate(queries):
        query = query.lower()
        scores = cosine_similarity(vectorizer.transform([query]), name_matrix)[0]
        # Work with row positions throughout. The earlier code took a label
        # from idxmax() and used it with .iloc, which only agrees when the
        # frame has a default 0..n-1 index.
        best_pos = int(scores.argmax())
        best_score = scores[best_pos]
        results[query] = [(df.iloc[best_pos, name_col], best_score)]
        df_copy.iloc[best_pos, name_col] = query
        print(i)
        print(query)
        print(results[query])

    return results, df_copy


# NOTE(review): these are absolute local Windows paths, while the other cells
# use Colab Drive paths - confirm where this cell is meant to run.
pth = r'D:/NLP final project'
path_label = r'D:/NLP final project/balanced_data.csv'
path = r'D:/NLP final project\songs\whisper\whisper_dataset_np'
df = pd.read_csv(r'D:/NLP final project/balanced_data.csv')


# Every entry of the directory is used as a query string.
queries = list(os.listdir(path))
similar_sentences, updated_df = get_similar_sentences(df, queries)
output_csv = os.path.join(pth, 'balanced_data_new.csv')
updated_df.to_csv(output_csv, index=False)



# Data processing (with background music)

In [None]:
!pip install --upgrade numba



In [None]:
import math
import librosa
import numpy as np
import os
from google.colab import drive
import pandas as pd

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; force_remount=True re-mounts even if a
# mount already exists. The paths in the following cells are rooted here.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# csv_path: table read into `df`; audio_path: location listed for song files;
# ast_np_path: output directory for the saved arrays (created when missing).
csv_path = r'/content/drive/MyDrive/Musicoset/df_final.csv'
audio_path = r'/content/drive/MyDrive/Musicoset/songs/%(title)s.%(ext)s'
ast_np_path = r'/content/drive/MyDrive/Musicoset/ast_dataset_np_correct'
if not os.path.exists(ast_np_path):
  os.makedirs(ast_np_path)

In [None]:
df = pd.read_csv(csv_path)
# Drop the last four characters of every song name
# (presumably a three-letter file extension plus the dot - TODO confirm).
name_list = []
for raw_name in df['song_name']:
  name_list.append(raw_name[:-4])

In [None]:
def audio_sampling(audio_path, song, sampling_rate, n_mels,cut_rate, model='ast'):
  """Load one audio file and cut its log-mel spectrogram into fixed-length chunks.

  Args:
    audio_path: directory that contains the file.
    song: file name inside ``audio_path``.
    sampling_rate: rate in Hz the audio is resampled to on load; the same
      rate is used for the mel spectrogram.
    n_mels: number of mel bands.
    cut_rate: number of spectrogram frames per chunk.
    model: when ``'ast'``, the last two axes of every chunk are swapped.

  Returns:
    A 3-D array whose first axis indexes the chunks. Each chunk is
    ``(n_mels, cut_rate)``, or ``(cut_rate, n_mels)`` when ``model == 'ast'``.
    The final chunk is right-padded with zeros along the frame axis.
  """
  # Resample to the requested rate while loading. Without sr=..., librosa.load
  # resamples to its own 22050 Hz default, so the spectrogram below would be
  # computed with a rate that does not match the samples.
  y, _ = librosa.load(os.path.join(audio_path, song), sr=sampling_rate)

  # Mel power spectrogram converted to dB relative to its own maximum.
  sample = librosa.feature.melspectrogram(y=y, n_mels=n_mels, sr=sampling_rate, n_fft=400, hop_length=160, pad_mode='constant')
  sample_log = librosa.power_to_db(sample, ref=np.max)

  # Number of chunks needed to cover every frame.
  n_frames = sample_log.shape[-1]
  n_chunks = math.ceil(n_frames / cut_rate)

  # Right-pad the frame axis so it divides evenly into n_chunks pieces.
  # NOTE(review): with ref=np.max a value of 0 dB is the loudest level, so the
  # zero padding is not "silence" - confirm this is intended.
  missing = n_chunks * cut_rate - n_frames
  if missing > 0:
    width = [(0, 0)] * (sample_log.ndim - 1) + [(0, missing)]
    sample_log = np.pad(sample_log, width, mode="constant")

  # Stack the chunks on a new leading axis to simplify downstream processing.
  chunks = np.stack(np.hsplit(sample_log, n_chunks), 0)

  # The Audio Spectrogram Transformer input swaps the second and third axes.
  if model == 'ast':
    chunks = np.swapaxes(chunks, 1, 2)

  return chunks

def save_audio(audio_path, name, files):
  """Write ``files`` with ``np.save`` to ``<audio_path>/<name>.npy``."""
  target = os.path.join(audio_path, "".join([name, '.npy']))
  np.save(target, files)



In [None]:
# Example of songs with AST: convert every listed song that appears in
# name_list into stacked log-mel chunks and save them as .npy files.

sampling_rate = 16000
cut_rate = 3000
n_mels = 128
count = 0
for song in os.listdir(audio_path):
  filename, filetype = os.path.splitext(song)
  filename = filename.lower()
  # Guard clause: skip anything whose lower-cased stem is not a known name.
  if filename not in name_list:
    print(f'song {filename} is not in name_list')
    continue
  x = audio_sampling(audio_path, song, sampling_rate, n_mels, cut_rate, model='ast')
  print(x.shape)
  print(count)
  count += 1
  save_audio(ast_np_path, filename, x)
print('process finished')