# Artist Features
## Access Country, Genres, Career Start Year, etc. using the MusicBrainz API and Wikidata
### Insert into MongoDB

In [15]:
# install/import packages
!pip install musicbrainzngs requests pandas tqdm pymongo
import musicbrainzngs
import requests
import pandas as pd
from tqdm import tqdm
import time
import os
from pymongo import MongoClient

# identify this client to MusicBrainz: application name, version, and a blank contact
musicbrainzngs.set_useragent(app="ArtistDataCollector", version="1.0", contact="")

In [16]:
# MongoDB connection settings.
# The MONGODB_URI / MONGODB_DB environment variables are read first so that
# credentials do not have to be pasted into (and saved with) the notebook.
# When they are unset, the placeholders below are used unchanged: replace
# CONNECTION_STRING with the connection string for your MongoDB connection.
connection_URI = os.environ.get("MONGODB_URI", "CONNECTION_STRING")
client = MongoClient(connection_URI, authSource="admin")

# replace NAME with the name of your MongoDB database (or set MONGODB_DB)
db_name = os.environ.get("MONGODB_DB", "NAME")

db = client[db_name]

# collection the artist records are inserted into
ARTISTS = db.ARTISTS

In [17]:
# read cleaned songs csv file into dataframe
all_songs_clean = pd.read_csv("all_songs_clean.csv")

# unique artist names as strings, with missing values dropped.
# Sorted so the list is identical on every run: a plain set has no stable
# order for strings, which made the fetch order (and the row order of the
# output csv) change from one kernel session to the next.
artists = sorted(all_songs_clean["artist"].dropna().astype(str).unique())

# number of unique artists
len(artists)

4322

In [3]:
# look up an artist on musicbrainz and keep a handful of basic fields
def get_musicbrainz_artist(artist_name):
    """Return basic MusicBrainz fields for the first search hit of `artist_name`.

    The result is a dictionary with the keys ``name``, ``type``, ``country``,
    ``begin_date`` and ``end_date``; a value is None when the search hit does
    not carry that field. An empty dictionary is returned when the search
    yields no artist, or when any exception is raised during the lookup (the
    error is printed rather than propagated).
    """
    try:
        # request a single result: only the first match is ever used
        matches = musicbrainzngs.search_artists(artist=artist_name, limit=1)["artist-list"]
        if not matches:
            return {}

        top_hit = matches[0]
        return dict(
            name=top_hit.get("name"),
            type=top_hit.get("type"),
            country=top_hit.get("country"),
            # begin/end live in the nested "life-span" mapping, which may be absent
            begin_date=top_hit.get("life-span", {}).get("begin"),
            end_date=top_hit.get("life-span", {}).get("end"),
        )
    except Exception as e:
        print(f"MusicBrainz error for {artist_name}: {e}")
        return {}

# get artist info from wikidata using a sparql query
def get_wikidata_artist(artist_name):
    """Query Wikidata for genres, countries and an English description of an artist.

    Items are matched by an exact English label equal to `artist_name`; the
    values of every matching item are merged.

    Parameters
    ----------
    artist_name : str
        Artist name used as the English label to look up. It is escaped
        before being placed in the query, so names containing quotes or
        backslashes are handled.

    Returns
    -------
    dict
        ``{"genres": [...], "country_wikidata": [...], "bio": str | None}``
        with both lists de-duplicated and sorted, and ``bio`` set to the
        first non-empty description found. An empty dictionary is returned
        when the request or the response parsing fails (the error is printed
        rather than propagated).
    """
    # Escape the characters that would otherwise end or corrupt the quoted
    # SPARQL literal (for example the quotes in '"Weird Al" Yankovic');
    # without this such names produce a malformed query and no data.
    label = (
        str(artist_name)
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )

    # sparql query to find artist, genre, country, and bio in english
    # NOTE(review): the match is on label only, so unrelated items that share
    # the name contribute their values too — confirm this is acceptable.
    query = f"""
    SELECT ?item ?itemLabel ?genreLabel ?countryLabel ?bio WHERE {{
      ?item rdfs:label "{label}"@en.
      OPTIONAL {{ ?item wdt:P136 ?genre. }}
      OPTIONAL {{ ?item wdt:P27 ?country. }}
      OPTIONAL {{ ?item schema:description ?bio. FILTER(LANG(?bio) = "en") }}
      SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en". }}
    }}
    """
    # wikidata sparql endpoint
    url = "https://query.wikidata.org/sparql"
    try:
        # send request to wikidata; the timeout keeps one stalled connection
        # from blocking the whole collection run
        r = requests.get(
            url,
            params={"query": query, "format": "json"},
            headers={"User-Agent": "ArtistDataCollector/1.0"},
            timeout=30,
        )
        # surface HTTP errors (e.g. rate limiting) as such instead of as a
        # JSON decoding failure on the error page
        r.raise_for_status()
        data = r.json()

        # initialize containers for unique values
        genres = set()
        countries = set()
        bio = None

        # loop through returned rows and collect data
        for item in data["results"]["bindings"]:
            if "genreLabel" in item:
                genres.add(item["genreLabel"]["value"])
            if "countryLabel" in item:
                countries.add(item["countryLabel"]["value"])
            if "bio" in item and not bio:
                bio = item["bio"]["value"]

        # sorted so the same response always yields the same lists
        return {
            "genres": sorted(genres),
            "country_wikidata": sorted(countries),
            "bio": bio
        }
    except Exception as e:
        print(f"Wikidata error for {artist_name}: {e}")
        return {}

# merge the musicbrainz and wikidata lookups for one artist
def get_artist_metadata(artist_name):
    """Return one dictionary with the MusicBrainz and Wikidata fields for `artist_name`.

    MusicBrainz fields come first; Wikidata fields are layered on top and win
    on any shared key. The queried name is stored under the ``artist`` key.
    """
    musicbrainz_fields = get_musicbrainz_artist(artist_name)
    wikidata_fields = get_wikidata_artist(artist_name)
    return {**musicbrainz_fields, **wikidata_fields, "artist": artist_name}

In [None]:
# Fetch metadata for every artist not yet present in the output csv, writing a
# checkpoint every BATCH_SIZE artists so an interrupted run can be resumed.
out_csv = "all_artists.csv"
BATCH_SIZE = 100


def _append_records(frame, new_records):
    """Return `frame` extended by the list of dicts `new_records`."""
    if not new_records:
        return frame
    new_rows = pd.DataFrame(new_records)
    # nothing collected so far: the new rows are the whole result, so there is
    # no need to concatenate with an empty placeholder frame
    if frame.empty:
        return new_rows
    return pd.concat([frame, new_rows], ignore_index=True)


# load previous results if the output csv already exists
if os.path.exists(out_csv):
    existing = pd.read_csv(out_csv)
    processed_artists = set(existing["artist"].astype(str))
    results_df = existing
    print(f"Resuming: {len(processed_artists)} artists already processed.")
else:
    processed_artists = set()
    results_df = pd.DataFrame()

# only the artists still to be fetched, so the progress bar reflects real work
pending_artists = [name for name in artists if name not in processed_artists]

# records fetched since the last checkpoint
records = []

try:
    for artist_name in tqdm(pending_artists, desc="Fetching artist metadata"):
        # get metadata for the current artist
        records.append(get_artist_metadata(artist_name))

        # save records in batches
        if len(records) >= BATCH_SIZE:
            results_df = _append_records(results_df, records)
            results_df.to_csv(out_csv, index=False)
            records = []
            print("Progress saved.")

        # delay requests
        time.sleep(0.2)
finally:
    # Runs on normal completion and also on an interrupt or error, so records
    # fetched since the last checkpoint are written out instead of being lost.
    results_df = _append_records(results_df, records)
    results_df.to_csv(out_csv, index=False)

print("Artist metadata saved to:", out_csv)

In [18]:
# read the artist metadata csv into a dataframe
csv_file = "all_artists.csv"
df = pd.read_csv(csv_file)

# convert to list of dictionaries, replacing missing values (NaN after the csv
# read) with None so they are stored as null rather than as a NaN number
data_dict = [
    {field: (None if pd.isna(value) else value) for field, value in row.items()}
    for row in df.to_dict(orient='records')
]

# NOTE(review): list-valued fields (genres, country_wikidata) were turned into
# text when written to csv, so they are inserted as strings here, not arrays —
# confirm that is what downstream readers expect.
# NOTE(review): insert_many always adds new documents; re-running this cell
# inserts the same rows again.

# insert list of dictionaries into MongoDB
if data_dict:
    result = ARTISTS.insert_many(data_dict)
    print(f"Inserted {len(result.inserted_ids)} records into ARTISTS collection")
else:
    print("No data to insert")

Inserted 4322 records into ARTISTS collection
