First, we install the `lyricsgenius` package, a Python client for the Genius API:

In [1]:
!pip install lyricsgenius



We import the libraries and define a path template for the paginated input files of artists:

In [5]:
from lyricsgenius import Genius
import json
import csv
import multiprocess
import queue
import logging
import sys

# OS agnostic
import os 
# Path template for the artist CSV pages under ./artists; the '%s' placeholder
# is filled with the page number in get_artists.
CSV_PATH = os.path.join(os.path.curdir, 'artists', '10000-MTV-Music-Artists-page-%s.csv')

Next, we load the artist dataset, which consists of the top 10,000 artists listed on MTV. The loading code is the `get_artists` function in the cell below.

We set up a lyricsgenius token, and use the API to pull the lyrics data for each artist in the dataset:

In [21]:
# Genius setup
             
def genius_setup():
    """Create and configure the lyricsgenius client used for every search.

    The API token is read from the ``GENIUS_ACCESS_TOKEN`` environment
    variable so that no credential is stored in the notebook. Any token that
    was previously committed in this file should be treated as leaked and
    revoked.

    Returns:
        Genius: client created with ``retries=2`` and configured to be
        non-verbose, strip section headers, skip non-songs and exclude
        titles containing "(Remix)" or "(Live)".

    Raises:
        RuntimeError: if ``GENIUS_ACCESS_TOKEN`` is unset or blank.
    """
    token = os.environ.get("GENIUS_ACCESS_TOKEN", "").strip()
    if not token:
        raise RuntimeError(
            "Genius API token not found: set the GENIUS_ACCESS_TOKEN "
            "environment variable before running this notebook."
        )

    genius = Genius(token, retries=2)

    genius.verbose = False
    genius.remove_section_headers = True
    genius.skip_non_songs = True
    genius.excluded_terms = ["(Remix)", "(Live)"]

    return genius
# Number of worker processes: twice the CPU count reported by multiprocess.
process_number = int(multiprocess.cpu_count()) * 2

# Result list created through a multiprocess Manager; run() passes it to the
# workers, which append their song entries to it.
final_ = multiprocess.Manager().list()

# Artist names already present in a previous results file (filled by
# read_csv); get_artists skips these.
checked_artists = set()

# Pull out artists
def get_artists(queue, pages=range(1, 5)):
    """Queue every artist from the paginated CSV files that still needs a lookup.

    For each page number the file ``CSV_PATH % page`` is read, its header row
    is skipped, and the first column of every remaining row is put on
    ``queue`` unless that name is already in ``checked_artists``.

    Args:
        queue: any object with a ``put`` method (``queue.Queue`` or a manager
            queue proxy).
        pages: iterable of page numbers to read; defaults to pages 1-4.

    Raises:
        OSError: if one of the page files cannot be opened.
    """
    for page in pages:
        path = CSV_PATH % str(page)
        # newline="" is what the csv module expects for files it parses itself.
        with open(path, encoding="UTF-8", newline="") as csvfile:
            reader = csv.reader(csvfile)

            # Skip the header; the default keeps an empty file from raising
            # StopIteration.
            next(reader, None)
            for row in reader:
                # Blank lines come back as [] and would break row[0].
                if not row or not row[0].strip():
                    continue
                artist = row[0]
                # Skip artists whose data was already collected in a previous run.
                if artist not in checked_artists:
                    queue.put(artist)
                      


# File management
def write_to_csv(data, file_name="song_data.csv"):
    """
    Write the collected song entries to ``./data/<file_name>`` as CSV.

    data: list of dictionaries {artist, song, data}; the keys of the first
        entry are used as the header row.
    file_name: name of the output file inside the ``data`` directory.

    When ``data`` is empty nothing is written, so an existing results file is
    left untouched instead of being truncated. The ``data`` directory is
    created if it does not exist yet.

    NOTE(review): the file is opened with the platform default encoding, the
    same as read_csv; change both together if an explicit encoding is wanted.
    """
    print("Entries: {num}".format(num=len(data)))
    if not data:
        return

    csv_dir = os.path.join(os.path.curdir, 'data')
    os.makedirs(csv_dir, exist_ok=True)
    csv_path = os.path.join(csv_dir, file_name)

    # newline='' lets the csv module control line endings (avoids blank rows
    # between records on Windows).
    with open(csv_path, 'w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()))

        # header row first, then one row per entry
        writer.writeheader()
        writer.writerows(data)
        

def read_csv(file_name="song_data.csv"):
    """Load results saved by an earlier run from ``./data/<file_name>``.

    Every row is appended to the module-level ``final_`` list and its
    ``artist`` value is added to ``checked_artists``. A FileNotFoundError
    (for example, no earlier results file) is ignored silently.
    """
    global final_, checked_artists

    previous_results = os.path.join(os.path.curdir, 'data', file_name)

    try:
        with open(previous_results, mode='r') as handle:
            for record in csv.DictReader(handle):
                checked_artists.add(record["artist"])
                final_.append(record)

        found = len(checked_artists)
        print("Number of artists already found {num}".format(num=found))
    except FileNotFoundError:
        # Nothing to resume from.
        return
    

# Run genius search
def search_genius(args):
    """Worker loop: take artists off a queue and collect their song lyrics.

    Args:
        args: tuple ``(artist_queue, num, genius, final_)`` where
            artist_queue yields artist names and a ``None`` sentinel that
                stops the worker,
            num is the worker number used as the log prefix,
            genius is the client whose ``search_artist`` method is called,
            final_ is the list that receives one
                ``{"artist", "song", "data"}`` dict per song.

    A failure while processing one artist is logged and the worker moves on
    to the next artist. The entries of an artist are added in a single
    ``extend`` call, so an artist that fails halfway leaves nothing behind
    and is retried on the next run. KeyboardInterrupt is not caught here.
    """
    # Kept as a function-local import, as in the original; presumably so that
    # `sys` resolves when the function runs inside a spawned worker process.
    import sys
    artist_queue, num, genius, final_ = args

    def log(string):
        # Single print call with an explicit newline, flushed immediately.
        print("[{num}] ".format(num=num) + string + "\n", end='')
        sys.stdout.flush()

    def clean_data(data):
        # Newlines and commas are replaced with spaces so the lyrics stay on
        # one CSV line; a missing lyrics value becomes an empty string.
        return (data or "").replace("\n", " ").replace(",", " ")

    log("Starting")

    while True:
        try:
            artist = artist_queue.get()
        except Exception as e:
            # The queue itself is unusable; stop this worker.
            log("Something went wrong: {error}".format(error=e))
            return

        if artist is None:
            log("Done")
            return

        try:
            name = artist.strip()
            log("Searching {artist}".format(artist=name))

            # Pull data for artist from genius
            genius_artist = genius.search_artist(artist, per_page=50, get_full_info=False)

            log("Finished {artist}".format(artist=name))
            if genius_artist is None:
                log("{artist} not found".format(artist=name))
                continue

            songs = genius_artist.songs
            log("{artist} number of songs: {song_num}".format(artist=name, song_num=len(songs)))

            entries = [
                {"artist": artist, "song": song.title, "data": clean_data(song.lyrics)}
                for song in songs
            ]
            # Add to final list in one step (all songs of the artist or none).
            final_.extend(entries)

        except Exception as e:
            # Only this artist is lost; keep the worker alive.
            log("Something went wrong: {error}".format(error=e))
    
    
def run(multi_core=False):
    """Collect lyrics for every pending artist and save them to CSV.

    Builds the Genius client, loads earlier results, fills a queue with the
    remaining artists and then processes it either in this process or in a
    pool of ``process_number`` spawned workers. Whatever is in ``final_`` is
    written out at the end, including after a KeyboardInterrupt.

    multi_core: when true, use the worker pool; otherwise run single core.
    """
    # Setup Genius
    client = genius_setup()

    # Load in any previous data
    print("Reading previous")
    read_csv()

    pool = None
    try:
        if not multi_core:
            print("Running single core")
            artist_queue = queue.Queue()
            get_artists(artist_queue)
            # Single sentinel for the single worker.
            artist_queue.put(None)
            print(artist_queue.qsize())
            search_genius((artist_queue, 0, client, final_))

        else:
            print("Multiprocessing with {process_number} processes".format(process_number=process_number))

            artist_queue = multiprocess.Manager().Queue()
            get_artists(artist_queue)

            # One stop sentinel per worker.
            for _ in range(process_number):
                artist_queue.put(None)

            print(artist_queue.qsize())

            spawn_context = multiprocess.get_context("spawn")
            with spawn_context.Pool(process_number) as pool:
                worker_args = []
                for worker_id in range(process_number):
                    worker_args.append((artist_queue, worker_id, client, final_))
                pool.map(search_genius, worker_args)
                pool.close()
                pool.join()

    except KeyboardInterrupt:
        if pool:
            pool.close()
            pool.terminate()
            pool.join()
        print("KeyboardInterrupt: Writing results")

    finally:
        # Copy the shared list into a plain list before writing.
        write_to_csv(list(final_))


# Start the scrape using the multiprocessing path of run().
run(multi_core=True) 




Reading previous
Number of artists already found 27
Multiprocessing with 32 processes
Number of artists total 2999
Number of artists total 2999
Number of artists total 1329
Number of artists total 1017
8349
[1] Starting
[1] Searching Justin Bieber
[0] Starting
[0] Searching Peer van Mladen
[2] Starting
[2] Searching Drake
[3] Starting
[3] Searching Ed Sheeran
[4] Starting
[4] Searching Taylor Swift
[7] Starting
[5] Starting
[7] Searching Chris Brown
[5] Searching Nicki Minaj
[8] Starting
[8] Searching Eminem
[9] Starting
[9] Searching Beyoncé
[6] Starting
[6] Searching Fetty Wap
[11] Starting
[10] Starting
[11] Searching Rihanna
[10] Searching Elvis Presley
[12] Starting
[12] Searching Ariana Grande
[13] Starting
[13] Searching JD Shelburne
[14] Starting
[14] Searching Lil Wayne
[17] Starting
[17] Searching Wiz Khalifa
[18] Starting
[18] Searching DJ Khaled
[20] Starting
[15] Starting
[20] Searching Katy Perry
[15] Searching Miley Cyrus
[16] Starting
[16] Searching Tim McGraw
[21] Star