First, we install `lyricsgenius`, a Python client for the Genius API:

In [1]:
!pip install lyricsgenius



We import the libraries and define a path template for the artist input files (one CSV file per page):

In [1]:
from lyricsgenius import Genius
import json
import csv
import multiprocess
import queue
import logging

# Paths are built with os.path so the separators are right on any OS.
import os 
# Template for the per-page artist CSV files; '%s' is replaced with the page number.
CSV_PATH = os.path.join(os.path.curdir, 'artists', '10000-MTV-Music-Artists-page-%s.csv')

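The artist dataset consists of the top 10,000 artists listed on MTV, stored as paged CSV files. It is read by the `get_artists` function in the cell below, which puts each artist name on a work queue rather than into an array.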

We set up a lyricsgenius token, and use the API to pull the lyrics data for each artist in the dataset:

In [None]:
# Shared state for the scraping run

# Collected rows; a manager-backed list that the worker code appends to.
final_ = multiprocess.Manager().list()

# Artist names that get_artists should skip.
checked_artists = set()

# Number of worker processes: two per reported CPU.
process_number = 2 * multiprocess.cpu_count()

# Pull out artists
def get_artists(queue):
    """Put every artist that still needs processing onto ``queue``.

    Reads pages 1 through 4 of the ``CSV_PATH`` template, skips each file's
    header row and enqueues the first column of every remaining row unless
    that name is already in ``checked_artists``.

    queue: any object with a ``put`` method (a ``queue.Queue`` or a manager
        queue proxy). Inside this function the parameter shadows the
        ``queue`` module.

    Raises FileNotFoundError (from ``open``) when a page file is missing.
    """
    for page in range(1, 5):
        path = CSV_PATH % str(page)
        # newline="" leaves line-ending handling to the csv module.
        with open(path, encoding="UTF-8", newline="") as csvfile:
            reader = csv.reader(csvfile)

            # Skip the header; the default stops an empty file from raising StopIteration.
            next(reader, None)
            for row in reader:
                # csv.reader yields [] for blank lines, and row[0] would raise
                # IndexError; a blank name is not worth searching for either.
                if not row or not row[0].strip():
                    continue
                artist = row[0]
                # Skip artists whose data has already been collected.
                if artist not in checked_artists:
                    queue.put(artist)
                      


# File management
def write_to_csv(data, columns = ["artist", "song", "data"], file_name="song_data.csv"):
    """
    Write the collected rows to ``./data/<file_name>`` as CSV, replacing any
    existing file of that name.

    data: list of dictionaries {artist, song, data}
    columns: header row and field order; a row containing a key that is not in
        ``columns`` makes ``csv.DictWriter`` raise ValueError.
    file_name: name of the file inside the ``data`` directory.
    """
    data_dir = os.path.join(os.path.curdir, 'data')
    # The output directory may not exist yet on a first run.
    os.makedirs(data_dir, exist_ok=True)

    csv_path = os.path.join(data_dir, file_name)
    # newline='' keeps the csv module's own line endings from being translated
    # a second time (blank rows on Windows). UTF-8 keeps non-ASCII artist names
    # and lyrics writable whatever the platform's default encoding is.
    with open(csv_path, 'w', newline='', encoding='utf-8') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames = columns)

        # Header row first, then one row per dictionary.
        writer.writeheader()
        writer.writerows(data)
        

def read_csv(file_name="song_lyrics.csv"):
    """Load previously saved rows from ``./data/<file_name>`` so a run can resume.

    Every row is appended to the global ``final_`` and its ``artist`` value is
    added to the global ``checked_artists``, which ``get_artists`` consults to
    skip artists that are already covered. A missing file is not an error; it
    only means there is nothing to resume from.

    NOTE(review): the default name here is "song_lyrics.csv" while
    ``write_to_csv`` defaults to "song_data.csv" - confirm which file a
    resumed run is meant to read.
    """
    global final_, checked_artists

    csv_path = os.path.join(os.path.curdir, 'data', file_name)

    try:
        # UTF-8 matches the encoding used for the artist CSV files;
        # newline='' leaves line-ending handling to the csv module.
        with open(csv_path, mode='r', newline='', encoding='utf-8') as file:
            for entry in csv.DictReader(file):
                # The names used to go into a local set that was thrown away,
                # so no artist was ever skipped on a later run.
                checked_artists.add(entry["artist"])
                final_.append(entry)

    except FileNotFoundError:
        # No earlier output yet.
        pass
    

# Run genius search
def search_genius(args):
    """Worker loop: take artist names off a queue and collect their lyrics from Genius.

    args: ``(artist_queue, num)`` tuple. ``artist_queue`` yields artist names
        followed by a ``None`` sentinel that tells this worker to stop;
        ``num`` only labels the log lines.

    For every song Genius returns, one ``{"artist", "song", "data"}`` dict is
    appended to the global ``final_`` - the row shape ``write_to_csv``
    documents. The Genius API token is read from the ``GENIUS_ACCESS_TOKEN``
    environment variable.

    Raises RuntimeError when that variable is not set.
    """
    # Imported locally, like the original local Genius import - presumably so
    # the names resolve when this function runs in a spawned worker process.
    import os
    from lyricsgenius import Genius

    artist_queue, num = args

    def log(message):
        """Print one worker-tagged line; the newline is part of the string and end='' is used."""
        print("[{num}] {message}\n".format(num=num, message=message), end='')

    def clean_data(data):
        """Replace newlines and commas in the lyrics with spaces."""
        return data.replace("\n", " ").replace(",", " ")

    def process_song(song):
        """Return the cleaned lyrics of ``song``; missing lyrics become an empty string."""
        return clean_data(song.lyrics or "")

    def build_entry(artist, song, data):
        """Build one CSV row as a plain dict.

        The row used to be wrapped in a list and carried the Song object
        itself; csv.DictWriter needs a mapping per row, and the song column
        should hold the title.
        """
        return {"artist": artist, "song": song.title, "data": data}

    def genius_setup():
        """Create the Genius client with the options used for this scrape."""
        log("Setting up genius")
        # NOTE(review): a token was hard-coded here before; it is now read from
        # the environment, and the old token should be revoked.
        token = os.environ.get("GENIUS_ACCESS_TOKEN")
        if not token:
            raise RuntimeError(
                "Set the GENIUS_ACCESS_TOKEN environment variable to a Genius API token"
            )
        genius = Genius(token, retries=2)

        genius.verbose = False
        genius.remove_section_headers = True
        genius.skip_non_songs = True
        genius.excluded_terms = ["(Remix)", "(Live)"]

        return genius

    def collect_artist(artist):
        """Search Genius for one artist and append a row per song to ``final_``."""
        name = artist.strip()
        log("Searching {artist}".format(artist=name))

        # Pull data for the artist from Genius.
        genius_artist = genius.search_artist(artist, per_page=50, get_full_info=False)
        # This message previously lacked its newline and ran into the next line.
        log("Done")
        if genius_artist is None:
            log("{artist} not found".format(artist=name))
            return

        log("{artist} number of songs: {song_num}".format(
            artist=name, song_num=len(genius_artist.songs)))

        for song in genius_artist.songs:
            # The artist is stored exactly as it appears in the input CSV so a
            # later run can match it against the same column.
            final_.append(build_entry(artist, song, process_song(song)))

    log("Starting")
    genius = genius_setup()

    while True:
        try:
            artist = artist_queue.get()
        except Exception as e:
            # The queue itself failed; this worker cannot fetch more work.
            log("Something went wrong: {error}".format(error=e))
            return

        if artist is None:
            log("Done")
            return

        try:
            collect_artist(artist)
        except Exception as e:
            # One failing artist no longer ends the whole worker loop.
            log("Something went wrong: {error}".format(error=e))
    
    
def run(multi_core=False):
    """Scrape lyrics for every queued artist and write the collected rows to CSV.

    multi_core: when True, ``process_number`` spawned worker processes consume
        one shared manager queue; when False, the single worker runs in this
        process.

    Whatever is in ``final_`` is written by ``write_to_csv`` in the
    ``finally`` clause, so results are saved after a KeyboardInterrupt or an
    error as well.
    """
    # Load in any previous data
    print("Reading previous")
    read_csv()

    try:
        if multi_core:
            multiprocess.log_to_stderr().setLevel(logging.DEBUG)
            print("Multiprocessing with {process_number} processes".format(process_number=process_number))

            # The manager runs a helper process. The with-block shuts it down
            # when the pool is finished instead of leaving it running.
            with multiprocess.Manager() as manager:
                artist_queue = manager.Queue()
                get_artists(artist_queue)

                # One stop sentinel per worker.
                for _ in range(process_number):
                    artist_queue.put(None)

                print(artist_queue.qsize())
                # Each task is a worker loop that runs until it reads a sentinel.
                with multiprocess.get_context("spawn").Pool(process_number) as pool:
                    args = [(artist_queue, x) for x in range(process_number)]
                    pool.map(search_genius, args)
                    pool.close()
                    pool.join()

        else:
            print("Running single core")
            artist_queue = queue.Queue()
            get_artists(artist_queue)
            artist_queue.put(None)
            print(artist_queue.qsize())
            search_genius((artist_queue, 0))

    except KeyboardInterrupt:
        # Leaving the with-blocks above has already terminated the pool and
        # shut the manager down, so only the message is left to print.
        print("KeyboardInterrupt: Writing results")

    finally:
        write_to_csv(list(final_))


# Start the scrape with the multi-process worker pool; run() writes the rows via write_to_csv when it finishes.
run(multi_core=True) 




Reading previous
Multiprocessing with 32 processes
8376
