Get the song list from the Million Song Subset data.
Get the track metadata and join it with the song list.

Use the artist and artist-similarity data to train the model on similarity.

Use Last.fm to fetch additional data for each song and augment the dataset with it.

In [None]:
# imports
import pandas as pd
import h5py
import os
from sqlalchemy import create_engine
import requests
import time
from dotenv import load_dotenv

In [None]:
# Let pandas show up to 100 rows before it truncates a displayed frame.
pd.options.display.max_rows = 100

# Loading Data

## Loading million song subset data

In [None]:
def read_song_features(file_path):
    """Return the song id stored in one HDF5 track file.

    Only ``song_id`` is read here; the remaining fields are joined in later
    from the metadata database.

    Args:
        file_path: Path of the ``.h5`` file to open (read-only).

    Returns:
        A dict with the single key ``'song_id'`` holding the first entry of
        ``metadata/songs/song_id`` decoded as UTF-8.
    """
    with h5py.File(file_path, 'r') as h5_file:
        songs_table = h5_file['metadata']['songs']
        raw_song_id = songs_table['song_id'][0]
        return {'song_id': raw_song_id.decode('utf-8')}


# process all files in a directory into a df
def process_all_files_to_dataframe(root_dir):
    """Walk ``root_dir`` and collect the song id of every ``.h5`` file found.

    Progress is printed for the root directory, every scanned sub-directory
    and every processed file.

    Args:
        root_dir: Directory that is searched recursively.

    Returns:
        A DataFrame with one ``song_id`` column and one row per ``.h5`` file.
        The column is present even when no file was found, so a later merge
        on ``song_id`` still works on an empty result.
    """
    data = []
    print(f"Checking directory: {root_dir}")

    for subdir, _dirs, files in os.walk(root_dir):
        print(f"Currently scanning {subdir} with {len(files)} files")
        for file in files:
            if not file.endswith('.h5'):
                continue
            file_path = os.path.join(subdir, file)
            print(f"Processing file: {file_path}")
            data.append(read_song_features(file_path))

    if not data:
        print("No data to process.")

    # Name the column explicitly: pd.DataFrame([]) alone has no columns at
    # all, which would break any code that expects 'song_id' to exist.
    return pd.DataFrame(data, columns=['song_id'])

In [None]:
# Directory that is scanned recursively for .h5 track files.
root_dir = 'data/MillionSongSubset'
# One row per .h5 file found; the frame holds only the song_id at this point.
df = process_all_files_to_dataframe(root_dir)

### Loading million song subset metadata from sqlite db

In [None]:
# load metadata from sqlite
def load_data_from_sqlite(db_path, table_name):
    """Read every row of one table from a SQLite database file.

    Args:
        db_path: Path of the SQLite file; it is placed in a ``sqlite:///`` URL.
        table_name: Table to read. The name is interpolated into the SQL
            text as-is, so it must come from a trusted source.

    Returns:
        A DataFrame holding the result of ``SELECT *`` on the table.
    """
    sqlite_engine = create_engine(f'sqlite:///{db_path}')
    return pd.read_sql_query(f"SELECT * FROM {table_name}", sqlite_engine)

# load metadata and merge with song data
db_path3 = 'data/MillionSongSubsetMetadata/track_metadata.db'
df3 = load_data_from_sqlite(db_path3, 'songs')
# Left join on song_id: every song found on disk is kept, and songs without
# a metadata row get NaN in the metadata columns.
df = pd.merge(df, df3, how='left', on='song_id')


In [None]:
# Identifier columns that are removed from the frame.
columns_to_drop = ['track_id', 'artist_id', 'song_id', 'artist_mbid', 'track_7digitalid', 'shs_perf', 'shs_work']

# Only drop the columns that are actually present, so a missing one is not
# an error.
present_columns = [name for name in columns_to_drop if name in df.columns]
df.drop(columns=present_columns, inplace=True)

In [None]:
# Display the column index of df (notebook cell output).
df.columns

In [None]:
# Display the first rows of df (notebook cell output).
df.head()

## Loading Last.fm data

In [None]:
def fetch_data(api_key, method, params, timeout=10):
    """Call one method of the audioscrobbler 2.0 web service and return its JSON.

    Args:
        api_key: API key sent as the ``api_key`` query parameter.
        method: Name of the API method, e.g. ``'track.getInfo'``.
        params: Method-specific query parameters. The mapping is not
            modified; ``api_key``, ``method`` and ``format`` are added to a
            copy and override entries of the same name.
        timeout: Seconds passed to ``requests.get`` as its timeout, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``requests.get`` or ``response.json()`` raise (for example
        on a network failure, a timeout, or a non-JSON body).
    """
    base_url = "http://ws.audioscrobbler.com/2.0/"
    # Build a fresh dict instead of writing into the caller's one.
    query = {
        **(params or {}),
        'api_key': api_key,
        'method': method,
        'format': 'json',
    }
    response = requests.get(base_url, params=query, timeout=timeout)
    return response.json()


def get_artist_info(api_key, artist_name):
    """Fetch the ``artist.getInfo`` response for ``artist_name``.

    Returns whatever ``fetch_data`` returns for that request.
    """
    return fetch_data(api_key, 'artist.getInfo', {'artist': artist_name})


def get_track_info(api_key, artist_name, track_name):
    """Fetch the ``track.getInfo`` response for one artist/track pair.

    Returns whatever ``fetch_data`` returns for that request.
    """
    query = dict(artist=artist_name, track=track_name)
    return fetch_data(api_key, 'track.getInfo', query)


def batch_fetch_data(api_key, items, fetch_function, sleep_time=1):
    """Call ``fetch_function`` once per item and collect the results in order.

    Args:
        api_key: Passed as the first argument of every call.
        items: Iterable of argument sequences; each one is unpacked after
            ``api_key``.
        fetch_function: Callable invoked as ``fetch_function(api_key, *item)``.
        sleep_time: Accepted for API compatibility but currently unused; no
            pause is made between calls.

    Returns:
        A list with one result per item.
    """
    return [fetch_function(api_key, *call_args) for call_args in items]

In [None]:
# load LASTFM_API_KEY from .env
import requests
load_dotenv()
# api_key is None when the variable is not set.
api_key = os.environ.get('LASTFM_API_KEY')


def fetch_lastfm_data(api_key, artist_name, track_name, timeout=10):
    """Fetch the ``track.getInfo`` JSON for one track, or ``None`` on failure.

    This is a best-effort lookup used while iterating over many rows, so no
    failure of a single request is allowed to abort the caller.

    Args:
        api_key: API key sent with the request.
        artist_name: Value of the ``artist`` query parameter.
        track_name: Value of the ``track`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The decoded JSON body when the response has status 200, a non-blank
        body and valid JSON; otherwise ``None`` (also when the request
        itself fails or times out).
    """
    base_url = "http://ws.audioscrobbler.com/2.0/"
    params = {
        'method': 'track.getInfo',
        'api_key': api_key,
        'artist': artist_name,
        'track': track_name,
        'format': 'json'
    }
    try:
        response = requests.get(base_url, params=params, timeout=timeout)
    except requests.RequestException:
        # Connection problems and timeouts count as "no data for this track".
        return None

    if response.status_code != 200 or not response.text.strip():
        return None

    try:
        return response.json()
    except ValueError:
        # A 200 response whose body is not valid JSON.
        return None


def parse_lastfm_data(data):
    """Extract listeners, playcount and tag names from a track response.

    Args:
        data: Decoded response, or a falsy value when nothing was fetched.

    Returns:
        ``None`` when ``data`` is falsy or has no ``'track'`` entry.
        Otherwise a dict with ``'listeners'`` and ``'playcount'`` (as found
        in the response, defaulting to the string ``'0'``) and ``'tags'``,
        the names under ``toptags -> tag`` joined with ``', '``.
    """
    if not data or 'track' not in data:
        return None

    track_info = data['track']
    parsed = {
        'listeners': track_info.get('listeners', '0'),
        'playcount': track_info.get('playcount', '0'),
    }
    tag_entries = track_info.get('toptags', {}).get('tag', [])
    parsed['tags'] = ', '.join(entry['name'] for entry in tag_entries)
    return parsed

In [18]:
from tqdm import tqdm
# Register progress_apply on pandas objects.
tqdm.pandas()

load_dotenv()
api_key = os.getenv('LASTFM_API_KEY')
# Work on an independent copy of the first 1000 rows: a 'lastfm_data' column
# is assigned to subset_df further down, and assigning to a slice of df
# would be a chained assignment (SettingWithCopyWarning).
subset_df = df.head(1000).copy()

# Number of rows for which no usable Last.fm data was obtained.
tracks_skipped = 0


def fetch_and_parse(row):
    """Fetch and parse the Last.fm data for one row.

    Reads ``row['artist_name']`` and ``row['title']``. The module-level
    ``tracks_skipped`` counter is incremented whenever the fetch or the
    parse step yields ``None``.

    Returns:
        The parsed dict, or ``None`` when the row was skipped.
    """
    global tracks_skipped
    raw = fetch_lastfm_data(api_key, row['artist_name'], row['title'])
    # Parsing is only attempted when the fetch returned something.
    parsed = None if raw is None else parse_lastfm_data(raw)
    if parsed is None:
        tracks_skipped += 1
    return parsed


# Use progress_apply instead of apply
subset_df['lastfm_data'] = subset_df.progress_apply(fetch_and_parse, axis=1)

# Keep only the rows that received Last.fm data and renumber them from 0 so
# they line up with the rows produced by json_normalize below.
has_lastfm_data = subset_df['lastfm_data'].notna()
subset_df = subset_df[has_lastfm_data].reset_index(drop=True)

# Expand each parsed dict into its own columns and put them next to the
# original song columns.
track_details_df = pd.json_normalize(subset_df['lastfm_data'])
song_columns_df = subset_df.drop(columns=['lastfm_data'])
mixed = pd.concat([song_columns_df, track_details_df], axis=1)

print(f"Tracks skipped: {tracks_skipped}")

mixed.to_csv('data/music_data_small.csv', index=False)

 10%|█         | 101/1000 [00:14<02:02,  7.36it/s]

## Data processing

In [2]:
import pandas as pd

# Load the prepared dataset and discard every row with a missing value.
# NOTE(review): this reads 'data/music_data.csv', while the fetch step above
# writes 'data/music_data_small.csv' - confirm which file is intended.
df = pd.read_csv('data/music_data.csv').dropna()

In [3]:
import pandas as pd
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.model_selection import train_test_split
import torch.optim as optim

# Encode categorical data
label_encoders = {}
unknown_label = 'unknown'  # Fallback class for values not seen during fitting

for column in ['artist_name', 'tags', 'title']:
    le = LabelEncoder()

    # Convert to str once and use the same values for fit and transform.
    # Fitting on the raw values while transforming their str() form fails
    # as soon as a column contains a non-string entry.
    column_values = df[column].astype(str)

    # Get unique categories plus an 'unknown' category
    unique_categories = column_values.unique().tolist()
    unique_categories.append(unknown_label)

    # Fit the LabelEncoder to these categories and replace the column by
    # its integer codes
    le.fit(unique_categories)
    df[column] = le.transform(column_values)

    # Store the encoder so the same mapping can be reused later
    label_encoders[column] = le


# Normalize numerical features
numeric_columns = ['duration', 'listeners', 'playcount']
scaler = MinMaxScaler()
# The fitted scaler is kept at module level; the same three columns, in the
# same order, are passed to scaler.transform at recommendation time.
df[numeric_columns] = scaler.fit_transform(df[numeric_columns])

# Split data into features and target
feature_columns = ['artist_name', 'tags', 'duration', 'listeners', 'playcount']
X = df[feature_columns]
y = df['title']

# Split the dataset into training and testing sets (80% / 20%, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2)

In [4]:
class SongRecommender(nn.Module):
    """Fully connected classifier that scores every encoded song title.

    The network is three hidden ReLU layers (128, 256, 128 units) followed
    by a linear output layer that produces one raw score (logit) per class.

    Args:
        num_features: Width of the input vector. Defaults to 5, the number
            of feature columns used in this notebook.
        num_classes: Number of output classes. When omitted it is taken from
            the fitted title encoder, so it always equals the number of
            labels the encoder can produce (including 'unknown').
    """

    def __init__(self, num_features=5, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Derive the size from the encoder rather than from
            # len(y.unique()) + 1: the two differ when 'unknown' is already
            # one of the titles, and an extra output index could not be
            # mapped back to a title.
            num_classes = len(label_encoders['title'].classes_)

        self.fc1 = nn.Linear(num_features, 128)
        self.fc2 = nn.Linear(128, 256)
        self.fc3 = nn.Linear(256, 128)
        self.output = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return the raw class scores for a batch ``x`` of feature vectors."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # No softmax here: the caller applies the loss / ranking to logits.
        return self.output(x)


# Network instance trained and queried in the cells below.
model = SongRecommender()
# Adam over all parameters of `model`, learning rate 0.001.
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Cross-entropy loss; it is applied to the model's raw output scores.
criterion = nn.CrossEntropyLoss()

In [5]:
def train_model(model, X_train, y_train, X_test, y_test, epochs=10,
                batch_size=50, opt=None, loss_fn=None):
    """Train ``model`` and print the mean training/validation loss per epoch.

    Args:
        model: The ``nn.Module`` to train.
        X_train, X_test: DataFrames of features; their values are converted
            to float.
        y_train, y_test: Integer class labels aligned with the feature rows.
        epochs: Number of passes over the training data.
        batch_size: Number of rows per batch for both loaders.
        opt: Optimizer to step. Defaults to the notebook-level ``optimizer``;
            pass one built from ``model.parameters()`` when training a model
            other than the notebook-level one.
        loss_fn: Loss applied to (outputs, labels). Defaults to the
            notebook-level ``criterion``.

    Returns:
        None. The model is left in evaluation mode.
    """
    # Fall back to the notebook-level objects only when nothing was supplied.
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn

    train_loader = DataLoader(
        list(zip(X_train.values.astype(float), y_train)),
        batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(
        list(zip(X_test.values.astype(float), y_test)),
        batch_size=batch_size, shuffle=False)

    for epoch in range(epochs):
        # Re-enter training mode every epoch; the validation phase below
        # switches the model to evaluation mode.
        model.train()
        train_loss = 0
        for features, labels in train_loader:
            opt.zero_grad()
            # The loader already yields tensors, so convert the dtype only
            # instead of wrapping them in torch.tensor() again.
            outputs = model(features.float())
            loss = loss_fn(outputs, labels.long())
            loss.backward()
            opt.step()
            train_loss += loss.item()

        # Validation phase: no parameter updates, so no gradients needed.
        model.eval()
        validation_loss = 0
        with torch.no_grad():
            for features, labels in test_loader:
                outputs = model(features.float())
                validation_loss += loss_fn(outputs, labels.long()).item()

        # max(..., 1) avoids dividing by zero when a split is empty.
        mean_train_loss = train_loss / max(len(train_loader), 1)
        mean_validation_loss = validation_loss / max(len(test_loader), 1)
        print(f'Epoch {epoch+1}, Training Loss: {mean_train_loss}, Validation Loss: {mean_validation_loss}')

In [6]:
# Train the notebook-level model on the split created above; the per-epoch
# losses are printed by train_model.
train_model(model, X_train, y_train, X_test, y_test)

  outputs = model(torch.tensor(features).float())
  loss = criterion(outputs, torch.tensor(labels).long())
  outputs = model(torch.tensor(features).float())
  loss = criterion(outputs, torch.tensor(labels).long())


Epoch 1, Training Loss: 15.853079501493477, Validation Loss: 8.523108754839216
Epoch 2, Training Loss: 8.47606591825132, Validation Loss: 8.582034428914389
Epoch 3, Training Loss: 8.462377866109213, Validation Loss: 8.63951537722633
Epoch 4, Training Loss: 8.449959908002688, Validation Loss: 8.69593588511149
Epoch 5, Training Loss: 8.438638180862238, Validation Loss: 8.751291093372163
Epoch 6, Training Loss: 8.428259943738396, Validation Loss: 8.805644398643857
Epoch 7, Training Loss: 8.418747219038599, Validation Loss: 8.859075591677712
Epoch 8, Training Loss: 8.40998915684076, Validation Loss: 8.91154280162993
Epoch 9, Training Loss: 8.401934929835944, Validation Loss: 8.96310533796038
Epoch 10, Training Loss: 8.394473370210624, Validation Loss: 9.013808522905622


In [8]:
def recommend_songs(model, input_features, top_k=5):
    """Return the titles the model scores highest for one described song.

    Args:
        model: Trained network; it is switched to evaluation mode.
        input_features: Mapping with the keys ``'artist_name'``, ``'tags'``,
            ``'duration'``, ``'listeners'`` and ``'playcount'``.
        top_k: Number of titles to return. It is capped at the number of
            model outputs.

    Returns:
        The array produced by the title encoder's ``inverse_transform`` for
        the ``top_k`` highest-scoring classes, best first.

    Raises:
        KeyError: If ``input_features`` lacks one of the required keys.
    """
    def encode_or_unknown(column):
        # Values the encoder was not fitted on map to the 'unknown' class.
        encoder = label_encoders[column]
        try:
            return encoder.transform([input_features[column]])[0]
        except ValueError:
            return encoder.transform(['unknown'])[0]

    model.eval()
    with torch.no_grad():
        artist_index = encode_or_unknown('artist_name')
        tags_index = encode_or_unknown('tags')

        # Build a DataFrame with the column names the scaler was fitted on.
        numeric_frame = pd.DataFrame(
            [[input_features['duration'], input_features['listeners'],
                input_features['playcount']]],
            columns=['duration', 'listeners', 'playcount']
        )
        scaled_features = scaler.transform(numeric_frame)[0]

        features = torch.tensor(
            [artist_index, tags_index, *scaled_features]).float().unsqueeze(0)
        predictions = model(features)

        # topk raises when k exceeds the number of classes, so cap it.
        k = min(top_k, predictions.shape[-1])
        _, top_indices = predictions.topk(k)
        # squeeze(0) removes only the batch dimension, so k == 1 still
        # produces a one-element list rather than a bare int.
        recommended_song_ids = top_indices.squeeze(0).tolist()

        return label_encoders['title'].inverse_transform(recommended_song_ids)

In [9]:
import requests


def fetch_song_data(api_key, artist_name, track_name, timeout=10):
    """Fetch the ``track.getInfo`` JSON for one track, or ``{}`` on failure.

    Args:
        api_key: API key sent with the request.
        artist_name: Value of the ``artist`` query parameter.
        track_name: Value of the ``track`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The decoded JSON body for a status-200 response. An empty dict is
        returned for any other status, when the request fails or times out,
        and when the body is not valid JSON.
    """
    url = "http://ws.audioscrobbler.com/2.0/"
    params = {
        'method': 'track.getInfo',
        'api_key': api_key,
        'artist': artist_name,
        'track': track_name,
        'format': 'json'
    }
    try:
        response = requests.get(url, params=params, timeout=timeout)
        if response.status_code != 200:
            return {}
        return response.json()
    except (requests.RequestException, ValueError):
        # Network errors, timeouts and undecodable bodies all fall back to
        # the same "nothing found" value as a non-200 status.
        return {}


def parse_song_data(song_data):
    """Turn a track response into the feature dict used for recommendations.

    Args:
        song_data: Decoded response, or a falsy value when nothing was found.

    Returns:
        ``{}`` when ``song_data`` is falsy or has no ``'track'`` entry.
        Otherwise a dict with ``artist_name``, ``tags`` (tag names joined
        with ``', '``), ``duration`` (float), ``listeners`` and ``playcount``
        (int) and ``album`` (album title, ``'Unknown'`` when absent).

    Raises:
        KeyError: If the track entry has no ``artist`` / ``name`` field.
    """
    if not song_data or 'track' not in song_data:
        return {}

    track_info = song_data['track']
    parsed = {'artist_name': track_info['artist']['name']}

    tag_entries = track_info.get('toptags', {}).get('tag', [])
    parsed['tags'] = ', '.join(entry['name'] for entry in tag_entries)

    # NOTE(review): duration is passed on in whatever unit the response
    # uses - confirm it matches the unit of the training 'duration' column.
    parsed['duration'] = float(track_info.get('duration', 0))
    parsed['listeners'] = int(track_info.get('listeners', 0))
    parsed['playcount'] = int(track_info.get('playcount', 0))
    parsed['album'] = track_info.get('album', {}).get('title', 'Unknown')
    return parsed

In [10]:
from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv('LASTFM_API_KEY')

# NOTE(review): 'Lagy Gaga' looks like a misspelling of the artist name -
# confirm whether it is intentional before changing the query.
artist_name = 'Lagy Gaga'
track_name = 'Poker Face'

# Fetch and parse song data
song_data = fetch_song_data(api_key, artist_name, track_name)
parsed_data = parse_song_data(song_data)

# if the song is not found, or the tags column is empty, print a message
if not parsed_data or not parsed_data['tags']:
    print("Song not found or tags not available.")

# parse_song_data returns {} when the song was not found; recommend_songs
# needs the feature keys and would raise KeyError on it, so only ask for
# recommendations when there is something to describe the song with.
recommendations = recommend_songs(model, parsed_data) if parsed_data else None
# Last expression of the cell, so the notebook displays the result.
recommendations

array(['Smile', 'Hey Joe', 'Intro', 'Macarena',
       'Skit #2 (Kanye West/Late Registration)'], dtype='<U97')