In [None]:
# !pip install billboard.py

In [None]:
# !pip install git+https://github.com/johnwmillr/LyricsGenius.git

In [None]:
from datetime import datetime
import numpy as np
import pandas as pd
import re
import os
import time
import billboard
import multiprocessing as mp
import lyricsgenius
# Read the Genius API token from the GENIUS_ACCESS_TOKEN environment variable so
# the secret never has to be saved inside the notebook. The original placeholder
# is kept as the fallback for readers who prefer to paste a token in by hand.
client_access_token = os.environ.get('GENIUS_ACCESS_TOKEN', '<YOUR-GENIUS-API-SECRET>')
genius = lyricsgenius.Genius(client_access_token)
# Search/formatting options applied to every lookup made through `genius`.
genius.excluded_terms = ["(Remix)", "(Live)", "(Cover)"]
genius.remove_section_headers = True
genius.verbose = False

In [None]:
# Show the charts that billboard.py makes available; one of these chart names
# is what gets passed to billboard.ChartData in the scraping cell further down.
billboard.charts()

In [None]:
# Terms that separate a lead artist from featured artists in an artist credit.
ft_terms = ['featuring', 'feature', 'ft.', 'ft', 'feat.', 'feat']

# Build one alternation that matches the terms only as standalone tokens:
#   * re.escape keeps the '.' in 'ft.' / 'feat.' literal instead of "any character";
#   * the leading \b (and a trailing \b after terms that end in a letter) stops
#     matches inside names such as "Daft Punk" or "Defeater";
#   * longest-first ordering lets 'featuring' win over 'feat' and 'ft.' over 'ft'.
# The group is non-capturing, so regex.split() returns only the text between terms.
expression = r"\b(?:{})".format("|".join(
    re.escape(term) + (r"\b" if term[-1].isalnum() else "")
    for term in sorted(ft_terms, key=len, reverse=True)
))
regex = re.compile(expression, re.IGNORECASE)

def _split_artist_credit(credit):
    """Split an artist credit on the featuring terms into clean, lowercase names.

    ``regex.split`` can yield ``None`` (unmatched capture groups) as well as the
    matched separator itself, so non-strings, blanks and bare featuring terms
    are dropped.
    """
    names = []
    for part in regex.split(credit):
        if not isinstance(part, str):
            continue
        name = part.strip().lower()
        if name and name not in ft_terms:
            names.append(name)
    return names


def _genius_artist_names(genius_song):
    """Return the lowercase primary and featured artist names of a Genius result."""
    names = _split_artist_credit(genius_song.artist)
    # Normalise featured names the same way as the primary credit; otherwise
    # "Drake" never equals the lowercased Billboard name "drake".
    names.extend(ft_artist['name'].strip().lower() for ft_artist in genius_song.featured_artists)
    return names


def _artists_match(genius_artists, billboard_artist, threshold=0.5):
    """Tell whether at least ``threshold`` of the Billboard artists appear on Genius."""
    wanted = set(billboard_artist)
    if not wanted:
        return False
    return len(wanted.intersection(genius_artists)) / len(wanted) >= threshold


def _chart_record(chart, song, genius_song=None):
    """Build one output row; lyrics fields stay ``None`` when no Genius song is given."""
    found = genius_song is not None
    return {
        'billboardChartDate': chart.date,
        'title': song.title,
        'artist': song.artist,
        'releaseDate': genius_song._body['release_date'] if found else None,
        'rank': song.rank,
        # Higher-ranked songs (smaller rank number) receive a larger weight.
        'weight': np.sqrt(len(chart) - song.rank + 1),
        'text': genius_song.lyrics if found else None,
    }


def job(chart):
    """Fetch lyrics for every entry of one chart and append them to the lyrics CSV.

    For each chart entry the song is searched on Genius once per credited
    artist; a result is accepted when at least half of the Billboard artists
    show up among the Genius primary/featured artists. If no artist-specific
    search is accepted, a title-only search is tried under the same rule.

    Accepted songs are appended (without header) to ``lyrics_destination_fpath``;
    entries that could not be resolved, or whose lookup raised, are appended to
    ``failed_query``. Both names, together with ``genius``, ``regex`` and
    ``ft_terms``, are module-level globals that must exist before this runs.

    Args:
        chart: iterable, sized chart object exposing ``date`` and whose entries
            expose ``title``, ``artist`` and ``rank``.

    Returns:
        None.
    """
    songs = []

    for i, song in enumerate(chart):
        try:
            billboard_artist = _split_artist_credit(song.artist)
            if not billboard_artist:
                raise LookupError("no artist name found in credit")

            genius_song = None
            for search_artist in billboard_artist:
                candidate = genius.search_song(song.title, search_artist)
                # A search without a hit returns None: move on to the next
                # artist instead of abandoning the whole song.
                if candidate is None:
                    continue
                if _artists_match(_genius_artist_names(candidate), billboard_artist):
                    genius_song = candidate
                    break

            if genius_song is not None:
                print(i, '-', re.sub(r'\s+', ' ', genius_song.lyrics[:100]))
            else:
                # Last resort: search by title only and apply the same artist check.
                candidate = genius.search_song(song.title)
                if candidate is None or not _artists_match(_genius_artist_names(candidate), billboard_artist):
                    raise LookupError("no matching Genius song")
                genius_song = candidate

            songs.append(_chart_record(chart, song, genius_song))

        except Exception:
            # Deliberately broad: any lookup/parsing failure only marks this
            # entry as failed so the rest of the chart is still processed.
            failed_query.append(_chart_record(chart, song))

    if songs:
        df = pd.DataFrame(songs)
        df.to_csv(lyrics_destination_fpath, mode='a', header=False, index=False)

        print("Finished downloading chart data ({})".format(chart.date))

In [None]:
# Create objects for multiprocessing management.
manager = mp.Manager()
# Shared list the worker processes append unresolved chart entries to.
failed_query = manager.list()

lyrics_destination_fpath = './rapLyrics.csv'
chart_name = 'r-b-hip-hop-songs'

# Latest chart as of today (start_date = None),
# or specify a date as a starting point to get all the historical charts in the next step.
start_date = None
chart = billboard.ChartData(chart_name, date=start_date)
# Walk backwards through the charts until one dated before this year is reached.
to_year = 2008

que = []
n_cores = 4

# Write the CSV header once; the workers only ever append rows without a header.
if not os.path.exists(lyrics_destination_fpath):
    with open(lyrics_destination_fpath, 'w') as f:
        f.write('billboardChartDate,title,artist,releaseDate,rank,weight,text\n')

# The context manager terminates the workers if anything below raises, so an
# interrupted or failed run does not leave orphaned processes behind.
with mp.Pool(n_cores) as pool:
    while datetime.strptime(chart.date, '%Y-%m-%d').year >= to_year:
        que.append(chart)

        # Process the charts in batches of one chart per worker.
        if len(que) == n_cores:
            pool.map(job, que)
            # clear the que
            que = []

        # Without a previous date, ChartData(date=None) would fetch the latest
        # chart again and this loop would never end, so stop here instead.
        if not chart.previousDate:
            break

        chart = billboard.ChartData(chart_name, date=chart.previousDate)

    # Process the last, partially filled batch.
    if que:
        pool.map(job, que)

    pool.close()
    pool.join()

In [None]:
# Manual cleanup: stop the pool's worker processes right away, e.g. after the
# scraping cell above was interrupted before it reached pool.close()/pool.join().
pool.terminate()