In [1]:
from lyricsgenius import Genius, artist
import json
import pandas as pd
import asyncio
import re
import os
from requests.exceptions import HTTPError
from IPython.display import clear_output
import nest_asyncio
import time

In [2]:
# Output file that accumulates the lyrics collected so far.
json_path = '../datasets/rank_1/lyrics.json'

# unique.csv supplies the 'Track ID' column that is attached to the query rows
# (pandas aligns the two frames on their row index).
idf = pd.read_csv('../datasets/rank_1/unique.csv')
df = pd.read_csv('../datasets/rank_1/query.csv')
df['track_id'] = idf['Track ID']

# Whatever is already stored in the lyrics file is loaded up front.
with open(json_path) as json_file:
    existing_data = json.load(json_file)

In [3]:
# Number of rows sent to Genius per batch.
batch_size = 20

# Whole batches already covered by the entries loaded into existing_data
# (assumes one stored entry per processed row -- TODO confirm).
batches_done = len(existing_data) // batch_size

# Consecutive slices of df, each at most batch_size rows long.
batched_dfs = [
    df.iloc[start:start + batch_size]
    for start in range(0, len(df), batch_size)
]
total_batches = len(batched_dfs)

In [4]:
# Load credentials from a JSON file that lives outside the notebook.
with open('../secrets.json') as f:
    secrets = json.load(f)

# The Genius client used by the search helpers is built from the stored access token.
token = secrets['GENIUS_ACCESS_TOKEN']
genius = Genius(token)

# NOTE(review): not read or written by any of the cells shown here.
artist_cache = {}

In [5]:
def _http_status(error):
    """Return the HTTP status code carried by ``error``, or ``None`` if it has none."""
    response = getattr(error, 'response', None)
    if response is not None:
        # Compare with None explicitly: a requests.Response evaluates to False
        # for 4xx/5xx statuses, so a plain truth test would skip every error response.
        return getattr(response, 'status_code', None)
    # NOTE(review): lyricsgenius appears to re-raise HTTPError(status_code, message)
    # without attaching a response, leaving the code in args[0] -- confirm against
    # the installed version.
    if error.args and isinstance(error.args[0], int):
        return error.args[0]
    return None


async def search_genius(search_func, retries, delay, *args, **kwargs):
    """Run a blocking search callable in a worker thread, retrying on transient HTTP errors.

    Args:
        search_func: Blocking callable to execute via ``asyncio.to_thread``.
        retries: Maximum number of attempts.
        delay: Seconds to sleep after a 429/503/504 response before the next attempt.
        *args: Positional arguments forwarded to ``search_func``.
        **kwargs: Keyword arguments forwarded to ``search_func``.

    Returns:
        Whatever ``search_func`` returns, or ``None`` when a non-HTTP error occurs,
        when an ``HTTPError`` carries no status code, or when every attempt failed.

    Raises:
        HTTPError: Re-raised when the status code is 403.
    """
    for _ in range(retries):
        try:
            # Keyword arguments must be forwarded too, otherwise options such as
            # max_songs=0 never reach the search function.
            return await asyncio.to_thread(search_func, *args, **kwargs)
        except Exception as e:
            print(f"Error: {e}")
            status = _http_status(e) if isinstance(e, HTTPError) else None
            if status is None:
                # Not an HTTP failure we can classify: give up on this lookup.
                return None
            if status == 403:
                # Bare raise keeps the original traceback.
                raise
            if status in {429, 503, 504}:
                await asyncio.sleep(delay)
            # Any other status falls through and is retried immediately.
    return None

In [6]:
async def search_artist(artist_name, retries, delay):
    """Look up ``artist_name`` with ``genius.search_artist`` via ``search_genius``.

    ``retries`` and ``delay`` are handed straight to ``search_genius``;
    ``max_songs=0`` is passed along as a keyword argument.
    Returns whatever ``search_genius`` returns.
    """
    return await search_genius(
        genius.search_artist,
        retries,
        delay,
        artist_name,
        max_songs=0,
    )

async def search_song(song_name, artist_name, genius_artist, retries, delay):
    """Find a song, preferring a lookup on an already-resolved artist object.

    When ``genius_artist`` is truthy its ``song`` method is searched with
    ``song_name``; otherwise ``genius.search_song`` is searched with both the
    song and artist names. Either way the call goes through ``search_genius``
    with the given ``retries`` and ``delay``, and its result is returned.
    """
    if not genius_artist:
        return await search_genius(genius.search_song, retries, delay, song_name, artist_name)
    return await search_genius(genius_artist.song, retries, delay, song_name)

In [7]:
async def get_results(batch):
    """Resolve every row of ``batch`` to a song search result.

    Runs in two concurrent phases: first one artist lookup per entry of
    ``batch['Artist']`` (retries=1, delay=50), then one song lookup per row
    (retries=2, delay=3) using ``batch['Song']``, ``batch['Artist']`` and the
    artist result found for that row. Returns the list of song results in
    row order.
    """
    artist_lookups = [
        search_artist(name, retries=1, delay=50)
        for name in batch['Artist']
    ]
    genius_artists = await asyncio.gather(*artist_lookups)

    song_lookups = [
        search_song(title, name, found_artist, retries=2, delay=3)
        for title, name, found_artist in zip(batch['Song'], batch['Artist'], genius_artists)
    ]
    return await asyncio.gather(*song_lookups)

In [8]:
def clean_lyrics(raw_lyrics):
    """Reduce raw lyrics text to a single line of plain words.

    Args:
        raw_lyrics: Lyrics string as returned by the lyrics source.

    Returns:
        The text with the leading "<n> Contributors..." prefix, everything up to
        the last "Lyrics" on the first line, bracketed sections, punctuation
        (except apostrophes) and the trailing "<digits>Embed" footer removed,
        and all whitespace collapsed to single spaces.
    """
    # Leading "<n> Contributors<...> " prefix.
    cleaned = re.sub(r'^\d+\s+Contributors\S*\s+', '', raw_lyrics)
    # Title header: everything on the first line up to the last "Lyrics".
    cleaned = re.sub(r"^.*Lyrics", '', cleaned)
    # Bracketed annotations such as [Verse 1].
    cleaned = re.sub(r"\[.*?\]", "", cleaned)
    # Keep only letters, digits, whitespace and apostrophes.
    cleaned = re.sub(r"[^a-zA-Z0-9\s']", "", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    # Strip the footer only at the very end and with all of its digits.
    # An unanchored single-digit pattern would leave part of a multi-digit
    # counter behind and would cut the text at the first word that merely
    # starts with "Embed" (e.g. "Embedded").
    cleaned = re.sub(r"\d*Embed$", "", cleaned).strip()
    return cleaned

In [9]:
def clean_and_extract(track_ids, song_results):
    """Pair each track id with the cleaned lyrics of its search result.

    Args:
        track_ids: Track ids in the same order as ``song_results``. When this is
            a pandas Series, its index labels are used as the record keys.
        song_results: Search results; an entry may be falsy or lack usable
            ``lyrics``, in which case the placeholder text ``'None'`` is stored.

    Returns:
        A dict mapping a record key to ``{"track_id": ..., "lyrics": ...}``.
        Keys are the Series index labels when ``track_ids`` is a Series,
        otherwise the positions 0..n-1.
    """
    lyrics = []
    for song in song_results:
        text = getattr(song, 'lyrics', None) if song else None
        # A result whose lyrics attribute is None would crash the regex
        # cleaning, so it gets the same placeholder as a missing result.
        lyrics.append('None' if text is None else text)
    lyrics_clean = [clean_lyrics(text) for text in lyrics]

    if isinstance(track_ids, pd.Series):
        # Row labels of a batch slice are distinct across batches, whereas
        # positional keys restart at 0 for every batch and make each batch
        # overwrite the previous one when merged into a single mapping.
        # tolist() yields plain Python scalars, which json can serialise as keys.
        keys = track_ids.index.tolist()
    else:
        keys = range(len(lyrics_clean))

    return {
        key: {"track_id": track_id, "lyrics": lyric}
        for key, track_id, lyric in zip(keys, track_ids, lyrics_clean)
    }

In [10]:
def append_to_file(data):
    """Merge ``data`` into the JSON object stored at ``json_path`` and rewrite the file.

    Args:
        data: Mapping of record key to record. Keys are converted with ``str``
            before merging (integer and string keys are what callers in this
            notebook produce).
    """
    with open(json_path, 'r') as json_file:
        existing_data = json.load(json_file)

    # Keys loaded from JSON are always strings. Merging integer keys as-is would
    # keep both "0" and 0 in the dict, and json.dump would then emit the same
    # object key twice instead of replacing the stored record.
    existing_data.update({str(key): value for key, value in data.items()})

    with open(json_path, 'w') as json_file:
        json.dump(existing_data, json_file)

In [11]:
async def process_batches():
    """Interactively work through the remaining batches, one confirmation at a time.

    Before each batch the user is asked 'Continue? (y/n): '; any answer other
    than 'y' (case-insensitive) stops the loop. For a batch with results, the
    cleaned records are shown through a second ``input`` call (which waits for
    Enter), written with ``append_to_file``, and the module-level
    ``batches_done`` counter is advanced. An empty result list prints a failure
    message and stops the loop.
    """
    global batches_done
    while batches_done < total_batches:
        if input('Continue? (y/n): ').lower() != 'y':
            break

        pending = batched_dfs[batches_done]
        fetched = await get_results(pending)
        if not fetched:
            print(f'Failed to process batch {batches_done+1}')
            break

        records = clean_and_extract(pending['track_id'], fetched)
        # The records are used as the prompt text, so they are displayed and
        # the loop pauses until the user presses Enter.
        input(records)
        append_to_file(records)
        print(f'Processed batch {batches_done+1}/{total_batches}')
        batches_done += 1

In [None]:
# Patch asyncio before calling run_until_complete below.
# NOTE(review): presumably needed because the notebook kernel already runs an
# event loop -- confirm.
nest_asyncio.apply()
start_time = time.time()
# Snapshot of the counter so this run's progress can be reported separately.
init_batches_done = batches_done
asyncio.get_event_loop().run_until_complete(process_batches())
round_batches_done = batches_done - init_batches_done
# Wipe the prompts and per-batch messages so only the summary remains in the cell output.
clear_output()
print(f'Batches processed: {round_batches_done}')
print(f"Execution Time: {time.time() - start_time:.2f} seconds")