In [91]:
%load_ext dotenv
%dotenv

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [92]:
import os
import sqlite3
import pandas as pd
import requests
from typing import Optional
# Last.fm credentials are read from the process environment.
# NOTE: wrapping the lookup in str() means an unset variable becomes the
# literal string "None" rather than None, so a missing key is not detected here.
API_KEY = str(os.environ.get("LASTFM_KEY"))
API_SECRET = str(os.environ.get("LASTFM_SECRET"))

# HTTP headers sent with every Last.fm request.
headers = {'user-agent': "S&DS 230"}

In [93]:
from tqdm.auto import tqdm
tqdm.pandas()

In [94]:
# Location of the SQLite database, taken from the PLAYBACK_FILE environment variable.
SQL_FILEPATH = os.environ.get("PLAYBACK_FILE")

# The connection stays open: a later cell writes the enriched table back through it.
lyrics_db = sqlite3.connect(SQL_FILEPATH)

# Load the complete "tracks" table into a DataFrame.
df = pd.read_sql_query(sql="SELECT * FROM tracks", con=lyrics_db)

In [95]:
from difflib import SequenceMatcher

# Modified from: https://stackoverflow.com/a/17388505
def match(orig_name, found_name):
    """Return the difflib similarity ratio of two names.

    The result is a float in [0, 1]; 1.0 means the two sequences are
    identical. No junk heuristic is supplied to the matcher.
    """
    matcher = SequenceMatcher(isjunk=None, a=orig_name, b=found_name)
    similarity = matcher.ratio()
    return similarity

In [96]:
# Sanity check: a collaboration credit compared with the primary artist alone.
# The stored output (~0.36) is above the 0.25 similarity cut-off used in find_track.
match("B.o.B & Hayley Williams", "B.o.B")

0.35714285714285715

In [97]:
# TODO: also consider hits whose Last.fm artist is similar to the searched
# artist but has a higher listener count than the exact-artist hit.
def find_track(track: str, artist: Optional[str] = None) -> Optional[dict]:
    """Look up a track on Last.fm and return the most plausible search hit.

    Up to two ``track.search`` queries are issued through ``query_lastfm``:
    one by title only and, when ``artist`` is given and the title-only query
    found something, a second one restricted to that artist.

    Selection rules, in order:
      1. The artist-restricted hit, if it has more than ``MIN_LISTEN`` listeners.
      2. The title-only hit, if it has more than ``MIN_LISTEN`` listeners and
         its artist name is similar enough to ``artist`` according to
         ``match`` (e.g. "B.o.B" vs "B.o.B & Hayley Williams"). When no
         artist was supplied there is nothing to compare against, so the
         similarity check is skipped.
      3. Otherwise ``None``.

    Args:
        track: Track title to search for.
        artist: Optional artist name used to narrow and validate the search.

    Returns:
        The dict produced by ``query_lastfm`` for the chosen hit, or ``None``
        when no acceptable hit exists.
    """
    MIN_LISTEN = 100
    MIN_ARTIST_SIMILARITY = .25

    payload = {
        'api_key': API_KEY,
        'method': 'track.search',
        'format': 'json',
        'track': track,
    }

    no_artist = query_lastfm(payload)
    if not no_artist:
        # The artist-restricted query is only attempted after a title-only hit.
        return None

    if artist:
        # Build a new dict so the title-only payload is not mutated.
        w_artist = query_lastfm({**payload, 'artist': artist})
        if w_artist and w_artist["last_fm_listeners"] > MIN_LISTEN:
            return w_artist

    # Fall back to the title-only hit. It is returned itself (not the rejected
    # artist-restricted hit) once it passes the popularity and similarity checks.
    popular_enough = no_artist["last_fm_listeners"] > MIN_LISTEN
    artist_ok = (not artist) or match(no_artist["artist"], artist) > MIN_ARTIST_SIMILARITY
    if popular_enough and artist_ok:
        return no_artist
    return None

In [98]:
def query_lastfm(payload, timeout: float = 10):
    """Send one request to the Last.fm API and summarise the first track match.

    This is a best-effort lookup: any failure yields ``None`` instead of an
    exception, so a single bad row does not abort a batch of lookups.

    Args:
        payload: Query parameters passed to the API (sent as ``params``).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block indefinitely.

    Returns:
        ``{"last_fm_url", "last_fm_listeners", "artist"}`` for the first entry
        of ``results.trackmatches.track``, or ``None`` when the request fails,
        the status code is not 200, no results are reported, or the response
        body does not have the expected structure.
    """
    try:
        r = requests.get(
            'https://ws.audioscrobbler.com/2.0/',
            headers=headers,
            params=payload,
            timeout=timeout,
        )
        if r.status_code != 200:
            return None

        # Decode the body once and reuse it for every field.
        results = r.json()["results"]
        if int(results["opensearch:totalResults"]) <= 0:
            return None

        top_hit = results["trackmatches"]["track"][0]
        return {
            "last_fm_url": top_hit["url"],
            "last_fm_listeners": int(top_hit["listeners"]),
            "artist": top_hit["artist"],
        }
    except requests.exceptions.RequestException:
        # Connection problems, timeouts and similar transport-level failures.
        return None
    except (ValueError, KeyError, IndexError, TypeError):
        # Body was not valid JSON, or lacked the expected keys / entries / numeric fields.
        return None

In [102]:
def find_track_series(series):
    """Row-wise adapter around ``find_track`` for ``DataFrame.apply(axis=1)``.

    Reads ``series["track"]`` and ``series["artist"]``, then stores the lookup
    result in ``series["last_fm_url"]`` and ``series["last_fm_listeners"]``.
    Both fields are set to ``None`` when ``find_track`` returns nothing.
    The same (modified) series object is returned.
    """
    hit = find_track(series["track"], series["artist"])
    if hit:
        url, listeners = hit["last_fm_url"], hit["last_fm_listeners"]
    else:
        url, listeners = None, None
    series["last_fm_url"] = url
    series["last_fm_listeners"] = listeners
    return series

In [None]:
# Enrich every row with Last.fm data. Each row triggers HTTP requests, so this cell is slow.
# "level_0" / "index" are presumably stale index columns picked up from earlier
# round-trips through the database (TODO confirm); errors="ignore" keeps the cell
# re-runnable when either column is absent instead of raising KeyError.
df = (
    df.progress_apply(find_track_series, axis=1)
      .drop(columns=["level_0", "index"], errors="ignore")
)

  0%|          | 0/3595 [00:00<?, ?it/s]

In [9]:
# Overwrite the "tracks" table with the enriched DataFrame.
# NOTE(review): `index` is not passed, so pandas' default applies and the
# DataFrame index is written as an extra column — confirm this is intended.
df.to_sql(name="tracks", con=lyrics_db, if_exists="replace")

3595

In [73]:
# Spot check: a numeric-looking title paired with a real artist credit.
# In the stored output the title-only hit is by a different artist with only
# 35 listeners, and the call evaluates to None.
find_track("37623", "Gloria Estefan & Miami Sound Machine")

{'last_fm_url': 'https://www.last.fm/music/I.F.K./_/37623', 'last_fm_listeners': 35, 'artist': 'I.F.K.'}
None
