# MusicBrainz artist lookup

To see this analysis live, check out my article ["Analyzing Last.fm Listening History"](http://geoffboeing.com/2016/05/analyzing-lastfm-history/)

Get artist information, including place name, for each artist that has a MusicBrainz ID in my data set generated by the [lastfm_downloader](lastfm_downloader.ipynb). The script uses a local JSON cache to store artist and area details from the MusicBrainz API, keyed by unique ID. If an ID already appears in the cache, the script uses the local copy instead of requesting it from the API, which makes repeat runs very fast.

Documentation:
 - Web service: https://wiki.musicbrainz.org/Development/XML_Web_Service/Version_2/Search
 - Artist entities: https://musicbrainz.org/doc/Artist
 - Area entities: https://musicbrainz.org/doc/Area

Sample queries:
 - Artist: https://musicbrainz.org/ws/2/artist/d4659efb-b8eb-4f03-95e9-f69ce35967a9
 - Area: https://musicbrainz.org/ws/2/area/0a70f24b-1263-4341-8d70-17b8df84154f?inc=area-rels

In [4]:
# pandas and requests are third-party; the remaining modules are standard library
import pandas as pd, requests, time, json, os.path
import logging as lg, datetime as dt
# from keys import mb_user_agent
# User-Agent string sent with every MusicBrainz request (it is placed in `headers` below).
# NOTE(review): MusicBrainz asks API clients to identify themselves with an
# application name/version plus contact details, and its rate-limiting page
# appears to single out generic agents like this one for stricter throttling --
# confirm, and consider restoring the import from keys above.
mb_user_agent="python-musicbrainz/0.7.3"

In [5]:
# seconds to sleep before every API request (read by make_request)
pause_standard = 1.1
# extra seconds to sleep when the API says the rate limit was exceeded (read by make_request)
pause_exceeded_rate = 2

# where to save the csv output
csv_filename = 'data/mb.csv'

# configure URLs and user-agent header
# each URL is a str.format template with one placeholder:
#   artist_name_url -> artist name to search for
#   artist_id_url   -> musicbrainz artist ID
#   area_id_url     -> musicbrainz area ID (also asks for the area's relations)
artist_name_url = 'https://musicbrainz.org/ws/2/artist/?query=artist:{}&fmt=json'
artist_id_url = 'https://musicbrainz.org/ws/2/artist/{}?fmt=json'
area_id_url = 'https://musicbrainz.org/ws/2/area/{}?inc=area-rels&fmt=json'
headers = {'User-Agent':mb_user_agent}

In [8]:
# configure local caching
area_cache_filename = 'data/area_cache.js'
artist_cache_filename = 'data/artist_cache.js'
cache_save_frequency = 10  # write a cache to disk once per this many API lookups
area_requests_count = 0
artist_requests_count = 0


def _load_cache(filename):
    """Return the JSON cache stored at *filename*, or an empty dict if the file does not exist.

    The file is opened inside a ``with`` block so its handle is closed as soon
    as the content has been parsed, and it is read as UTF-8, the same encoding
    save_cache_to_disk uses when writing.

    Raises:
        ValueError: if the file exists but does not contain valid JSON. This is
            deliberately not swallowed, so a damaged cache is noticed instead
            of being silently replaced by an empty one.
    """
    if not os.path.isfile(filename):
        return {}
    with open(filename, encoding='utf-8') as cache_file:
        return json.load(cache_file)


# both caches map a musicbrainz ID to the details dict extracted for it
area_cache = _load_cache(area_cache_filename)
artist_cache = _load_cache(artist_cache_filename)

In [9]:
# create a logger to capture progress
log = lg.getLogger('mb')
# handler_set is a marker attribute stored on the logger object itself, so
# re-running this cell does not attach a second file handler
if not getattr(log, 'handler_set', None):
    todays_date = dt.datetime.today().strftime('%Y_%m_%d_%H_%M_%S')
    log_filename = 'logs/mb_{}.log'.format(todays_date)
    # FileHandler opens the file immediately and does not create missing
    # directories, so make sure the log folder exists first
    os.makedirs(os.path.dirname(log_filename), exist_ok=True)
    handler = lg.FileHandler(log_filename, encoding='utf-8')
    formatter = lg.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.setLevel(lg.INFO)
    log.handler_set = True

## Define functions

In [10]:
# make an http request to the musicbrainz api and return the result
def make_request(url, headers=headers, attempt_count=1, max_attempts=None, timeout=30):
    """GET *url* and return its decoded JSON, retrying for as long as the server answers 503.

    Args:
        url: fully formatted request URL.
        headers: HTTP headers to send; the same headers are reused on every retry.
        attempt_count: number of the first attempt (only used in log messages
            and for the max_attempts check).
        max_attempts: give up and return None once this many attempts have
            failed with status 503. None (the default) retries indefinitely.
        timeout: seconds passed to requests.get as its timeout, so a stalled
            connection raises instead of hanging forever.

    Returns:
        {'status_code': 200, 'json': <decoded body>} on success, or None when
        the server answers with any status other than 200/503 (or when
        max_attempts is exhausted).

    Raises:
        Whatever requests.get raises (connection error, timeout, ...): it is
        logged and re-raised so the caller can count the row as failed.
        ValueError: if a 200 response does not contain valid JSON.
    """
    while True:
        # always pause before hitting the API to stay under its rate limit
        time.sleep(pause_standard)
        log.info('request: {}'.format(url))
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
        except Exception as e:
            # there is no response object to inspect here, so log the exception only
            log.error('requests.get failed: {} {}'.format(type(e), e))
            raise

        if response.status_code == 200: #if status OK
            return {'status_code':response.status_code, 'json':response.json()}

        # error bodies are not guaranteed to be JSON; fall back to the raw text for logging
        try:
            body = response.json()
        except ValueError:
            body = response.text

        if response.status_code != 503:
            #if other status code, log info and return None for caller to handle
            log.error('make_request failed: status_code {} {}'.format(response.status_code, body))
            return None

        # status 503: server busy or rate limit exceeded
        error_text = body.get('error', '') if isinstance(body, dict) else ''
        if 'exceeding the allowable rate limit' in str(error_text):
            log.warning('exceeded allowable rate limit, pause_standard is now {} seconds'.format(pause_standard))
            log.warning('details: {}'.format(body))
            time.sleep(pause_exceeded_rate)

        if max_attempts is not None and attempt_count >= max_attempts:
            log.error('request failed with status_code 503 after {} attempts, giving up'.format(attempt_count))
            return None

        attempt_count += 1
        log.warning('request failed with status_code 503, so we will try it again with attempt #{}'.format(attempt_count))

In [11]:
# query the musicbrainz api for an artist's name and return the resulting id
def get_artist_id_by_name(name):
    """Search the API for *name* and return the ID of the first matching artist.

    Args:
        name: artist name to search for.

    Returns:
        The 'id' of the first entry in the response's 'artists' list, or None
        if the request failed or the response contained no such entry.
    """
    # imported here because the file's import cell does not provide urllib
    from urllib.parse import quote

    # percent-encode the name so characters such as '&' or '#' stay inside the
    # query value instead of being read as URL syntax
    encoded_name = quote(str(name), safe='')
    response = make_request(artist_name_url.format(encoded_name))
    if response is None:
        return None
    try:
        return response['json']['artists'][0]['id']
    except (KeyError, IndexError, TypeError):
        # no 'artists' key, an empty result list, or an unexpected response shape
        log.error('get_artist_id_by_name error: {}'.format(response))
        return None

In [12]:
# parse the details of an artist from the API response
def extract_artist_details_from_response(response):
    """Flatten an artist API response into the dict of fields this notebook stores.

    Args:
        response: the dict returned by make_request ({'status_code', 'json'}),
            or None if the request failed.

    Returns:
        A dict with keys id, name, type, gender, country, begin_date, end_date,
        area_id, area_name, begin_area_id, begin_area_name, place_id and place.
        Only 'id' and 'name' are required in the response; every other field is
        None when the API did not supply it. 'place'/'place_id' hold the begin
        area if there is one, otherwise the area, otherwise None.
        Returns None if response is None or is not shaped like an artist.
    """
    if response is None:
        return None

    try:
        result = response['json']
        artist_details = {'id':result['id'],
                          'name':result['name'],
                          'type':result.get('type'),
                          'gender':result.get('gender'),
                          'country':result.get('country'),
                          'begin_date':None,
                          'end_date':None,
                          'area_id':None,
                          'area_name':None,
                          'begin_area_id':None,
                          'begin_area_name':None,
                          'place_id':None,
                          'place':None}

        # dates are taken independently, so a missing 'end' does not discard 'begin'
        life_span = result.get('life-span') or {}
        artist_details['begin_date'] = life_span.get('begin')
        artist_details['end_date'] = life_span.get('end')

        # an area is only used when it carries both an id and a name
        area = result.get('area') or {}
        if 'id' in area and 'name' in area:
            artist_details['area_id'] = area['id']
            artist_details['area_name'] = area['name']

        # NOTE(review): the API is believed to expose this field as both
        # 'begin_area' and 'begin-area' -- accept either; confirm against the
        # MusicBrainz JSON docs
        begin_area = result.get('begin_area') or result.get('begin-area') or {}
        if 'id' in begin_area and 'name' in begin_area:
            artist_details['begin_area_id'] = begin_area['id']
            artist_details['begin_area_name'] = begin_area['name']

        # populate place with begin_area_name if it's not null, else area_name if it's not null, else None
        if artist_details['begin_area_name'] is not None:
            artist_details['place'] = artist_details['begin_area_name']
            artist_details['place_id'] = artist_details['begin_area_id']
        elif artist_details['area_name'] is not None:
            artist_details['place'] = artist_details['area_name']
            artist_details['place_id'] = artist_details['area_id']

        return artist_details

    except (KeyError, TypeError, AttributeError):
        # missing 'json'/'id'/'name', or a payload that is not a dict
        log.error('extract_artist_details_from_response error: {}'.format(response))
        return None

In [13]:
# get an artist object from the musicbrainz api by the musicbrainz artist id
def get_artist_by_id(artist_id):
    """Return the details dict for *artist_id*, from the local cache if possible.

    On a cache miss the API is queried and a successful result is added to
    artist_cache. Failed lookups (None) are NOT cached, and a None already
    stored in the cache is treated as a miss; otherwise a later retry would
    just read the failure back from the cache without asking the API again.

    Args:
        artist_id: musicbrainz artist ID.

    Returns:
        The dict produced by extract_artist_details_from_response, or None if
        the lookup failed. A cache hit returns the cached dict object itself,
        so mutating it also changes the cache entry.
    """
    global artist_requests_count

    cached_details = artist_cache.get(artist_id)
    if cached_details is not None:
        # if we've looked up this ID before, get it from the cache
        log.info('retrieving artist details from cache for ID {}'.format(artist_id))
        return cached_details

    # if we haven't looked up this ID before, look it up from API now
    response = make_request(artist_id_url.format(artist_id))
    artist_details = extract_artist_details_from_response(response)

    if artist_details is not None:
        # add this artist to the cache so we don't have to ask the API for it again
        artist_cache[artist_id] = artist_details
        log.info('adding artist details to cache for ID {}'.format(artist_id))
    else:
        log.warning('no artist details retrieved for ID {}, not caching it so it can be retried'.format(artist_id))

    # save the artist cache to disk once per every cache_save_frequency API requests
    artist_requests_count += 1
    if artist_requests_count % cache_save_frequency == 0:
        save_cache_to_disk(artist_cache, artist_cache_filename)

    return artist_details

In [14]:
# build (or extend) a dataframe of artist details, one row per artist ID
def make_artists_df(artist_ids, row_labels=None, df=None, csv_save_frequency=100):
    """Look up every ID in *artist_ids* and store its details as a row of a dataframe.

    Args:
        artist_ids: sequence of musicbrainz artist IDs.
        row_labels: index label to use for each ID's row, paired with
            artist_ids by position; defaults to 0..len(artist_ids)-1.
        df: existing dataframe to add rows to / overwrite rows in; a new empty
            one is created when this is not a DataFrame.
        csv_save_frequency: the csv is written whenever a row label is a
            multiple of this number.

    Returns:
        The dataframe. IDs whose lookup or row assignment raised are logged and
        left without a row. The csv at csv_filename is written once more after
        the loop.
    """
    columns = ['id', 'name', 'type', 'gender', 'country', 'begin_date', 'end_date',
               'begin_area_id', 'begin_area_name', 'area_id', 'area_name', 'place_id', 'place']

    # fall back to positional labels when the caller supplied none
    labels = range(len(artist_ids)) if row_labels is None else row_labels

    # start from an empty frame unless the caller handed one in
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(columns=columns)

    started = time.time()
    for artist_id, label in zip(artist_ids, labels):
        try:
            details = get_artist_by_id(artist_id)

            # writing through .loc either adds the row or replaces an existing one
            df.loc[label] = [details[column] for column in columns]
            log.info('successfully got artist details #{:,}: artist_id={}'.format(label, artist_id))

            # periodic checkpoint so a crash does not lose everything
            if label % csv_save_frequency == 0:
                df.to_csv(csv_filename, index=False, encoding='utf-8')

        except Exception as error:
            # a failed row is only logged; the loop moves on to the next artist
            log.error('row #{} failed: {}'.format(label, error))

    df.to_csv(csv_filename, index=False, encoding='utf-8')
    elapsed = round(time.time() - started, 2)
    message = 'processed {:,} artists in {:,} seconds and saved csv'.format(len(artist_ids), elapsed)
    log.info(message)
    print(message)

    return df

In [15]:
# parse the details of an area object from the API response
def extract_area_details_from_response(response):
    """Pull an area's name and (if present) its parent area out of an API response.

    Args:
        response: the dict returned by make_request ({'status_code', 'json'}),
            or None if the request failed.

    Returns:
        {'name': ...} plus 'parent_id' and 'parent_name' when the area has a
        relation with direction 'backward' and type 'part of' (if several
        match, the last one wins). Returns None if the response is None or is
        missing an expected key.
    """
    try:
        result = response['json']
        area_details = {'name': result['name']}
        if 'relations' in result:
            for relation in result['relations']:
                if relation['direction']=='backward' and relation['type']=='part of':
                    area_details['parent_id'] = relation['area']['id']
                    area_details['parent_name'] = relation['area']['name']
        else:
            # an area without relations is still valid: it simply has no parent
            log.warning('area returned no relations: {}'.format(result))
        return area_details
    except (KeyError, TypeError):
        # response is None, or an expected key is absent from the payload
        log.error('extract_area_details_from_response failed: {}'.format(response))
        return None

In [16]:
# get details of an 'area' from the musicbrainz api by area id
def get_area(area_id, full_area_str=''):
    """Look up one area and append its parent's name to the place string built so far.

    This function does not recurse: it handles a single level of the area
    hierarchy and returns the parent's ID so the caller can call it again.

    On a cache miss the API is queried and a successful result is added to
    area_cache. Failed lookups (None) are NOT cached, and a None already stored
    in the cache is treated as a miss, so a failure can be retried later.

    Args:
        area_id: musicbrainz area ID.
        full_area_str: the comma-separated place name accumulated so far; when
            empty, it is started with this area's own name.

    Returns:
        (parent_id, full_area_str). parent_id is None when the area has no
        parent or when its details could not be obtained.
    """
    global area_requests_count

    # first, get the area details either from the cache or from the API
    area_details = area_cache.get(area_id)
    if area_details is not None:
        # if we've looked up this ID before, get it from the cache
        log.info('retrieving area details from cache for ID {}'.format(area_id))
    else:
        # if we haven't looked up this ID before, look it up from API now
        response = make_request(area_id_url.format(area_id))
        area_details = extract_area_details_from_response(response)

        if area_details is not None:
            # add this area to the cache so we don't have to ask the API for it again
            area_cache[area_id] = area_details
            log.info('adding area details to cache for ID {}'.format(area_id))
        else:
            log.warning('no area details retrieved for ID {}, not caching it so it can be retried'.format(area_id))

        # save the area cache to disk once per every cache_save_frequency API requests
        area_requests_count += 1
        if area_requests_count % cache_save_frequency == 0:
            save_cache_to_disk(area_cache, area_cache_filename)

    if area_details is None:
        # nothing to add; hand back whatever has been built so far
        return None, full_area_str

    # now that we have the area details...
    try:
        if full_area_str == '':
            full_area_str = area_details['name']
        if 'parent_name' in area_details and 'parent_id' in area_details:
            full_area_str = '{}, {}'.format(full_area_str, area_details['parent_name'])
            # the caller continues the walk with the parent's ID
            return area_details['parent_id'], full_area_str
        # if no parents exist, we're done
        return None, full_area_str
    except (KeyError, TypeError) as e:
        # e.g. a cache entry without a 'name'
        log.error('get_area error: {}'.format(e))
        return None, full_area_str

In [17]:
# build the full, comma-separated place name for an area ID
# walks up the area hierarchy one get_area call at a time until no parent ID comes back
def get_place_full_name_by_area_id(area_id):
    """Return the place name accumulated by calling get_area on *area_id* and each parent in turn.

    Returns '' when area_id is None, because get_area is then never called.
    """
    full_name = ''
    next_id = area_id
    while True:
        if next_id is None:
            return full_name
        # each call appends one more level and yields the next ID to visit
        next_id, full_name = get_area(next_id, full_name)

In [18]:
# take a list of place IDs and return a dict linking each to its constructed full name
def get_place_full(unique_place_ids):
    """Build the full place name for every ID in *unique_place_ids*.

    Args:
        unique_place_ids: sized iterable of musicbrainz area IDs.

    Returns:
        A dict mapping each place ID to its full name, or to None when no name
        could be constructed for it.
    """
    start_time = time.time()
    message = 'we will attempt to get place full names for {:,} place IDs'.format(len(unique_place_ids))
    log.info(message)
    print(message)

    place_ids_names = {}
    for n, place_id in enumerate(unique_place_ids, start=1):
        try:
            # get_place_full_name_by_area_id returns '' when the very first
            # area lookup fails; store that as None so it counts as missing
            place_name = get_place_full_name_by_area_id(place_id) or None
        except Exception as e:
            # catch Exception rather than everything, so Ctrl-C can still
            # interrupt this long-running loop
            place_name = None
            log.error('failed to create place #{:,} from place ID "{}": {}'.format(n, place_id, e))

        place_ids_names[place_id] = place_name
        if place_name is not None:
            log.info('successfully created place #{:,}: "{}" from place ID "{}"'.format(n, place_name, place_id))
        else:
            log.warning('no full name could be created for place #{:,} with place ID "{}"'.format(n, place_id))

    message = 'finished getting place full names from place IDs in {:.2f} seconds'.format(time.time()-start_time)
    log.info(message)
    print(message)
    return place_ids_names

In [19]:
# look a place ID up in the module-level place_ids_names dict (built by get_place_full)
def get_place_full_from_dict(place_id):
    """Return the full name stored for *place_id*, or None if the lookup fails for any reason."""
    try:
        full_name = place_ids_names[place_id]
    except:
        # any failure at all is reported as "no name"
        full_name = None
    return full_name

In [20]:
# write an in-memory cache object to disk as JSON
def save_cache_to_disk(cache, filename):
    """Serialize *cache* to JSON, overwrite *filename* with it, and log how many items were saved."""
    with open(filename, mode='w', encoding='utf-8') as out:
        serialized = json.dumps(cache)
        out.write(serialized)
    item_count = len(cache.keys())
    log.info('saved {:,} cached items to {}'.format(item_count, filename))

## Test it with a demo

In [22]:
# where is david bowie from?
# end-to-end smoke test: name search -> artist details -> full place name
name = 'david bowie'
start_time = time.time()
artist_id = get_artist_id_by_name(name)
artist = get_artist_by_id(artist_id)
# note: get_artist_by_id returns the same dict object it keeps in artist_cache,
# so adding 'place_full' here also adds it to the cached entry
artist['place_full'] = get_place_full_name_by_area_id(artist['place_id'])
message = 'demo test finished in {:.2f} seconds'.format(time.time()-start_time)
log.info(message)
print(message)
# last expression of the cell, so the notebook displays it
artist['place_full']

demo test finished in 1.24 seconds


'Brixton, Lambeth, London, England, United Kingdom'

## Now run it

In [23]:
# mark the start of the full run in the log file
log.info('musicbrainz downloader script started')

In [24]:
# load the artist IDs from the lastfm scrobble history data set
scrobbles = pd.read_csv('data/lastfm_scrobbles.csv', encoding='utf-8')
# drop rows without an mbid and de-duplicate, so each artist is looked up once
# (the commented-out slice at the end limits the run to a few artists for testing)
artist_ids = scrobbles['artist_mbid'].dropna().unique()#[1000:1005]
message = 'there are {:,} unique artists to get details for'.format(len(artist_ids))
log.info(message)
print(message)

there are 2,490 unique artists to get details for


In [25]:
# get details for each unique artist and turn results into dataframe
# rows are labelled 0..len(artist_ids)-1; artists whose lookup failed get no row
df = make_artists_df(artist_ids)

processed 2,490 artists in 3,235.74 seconds and saved csv


## Re-try any failed rows one more time

In [26]:
# collect every positional label 0..len(artist_ids)-1 that has no row in the df
# (a row is absent when an error prevented its creation)
missing_row_labels = list(filter(lambda label: label not in df.index, range(len(artist_ids))))

# pair each missing label, in ascending order, with its artist mbid
row_labels_to_retry = sorted(missing_row_labels)
artist_ids_to_retry = [ artist_ids[position] for position in row_labels_to_retry ]

message = '{} artists to retry'.format(len(artist_ids_to_retry))
log.info(message)
print(message)

5 artists to retry


In [27]:
# get details for each artist to re-try, and turn results into dataframe
# passing the existing df and the original row labels fills the gaps in place
df = make_artists_df(artist_ids_to_retry, row_labels_to_retry, df)

processed 5 artists in 1.27 seconds and saved csv


In [28]:
# save to csv and show the head
# (make_artists_df already wrote the csv when it finished; this repeats the save)
df.to_csv(csv_filename, index=False, encoding='utf-8')
df[['name', 'place_id', 'place']].head()

Unnamed: 0,name,place_id,place
0,Anne Akiko Meyers,82f3a697-ba65-404d-a1ed-360147af7d10,San Diego
1,Camille Saint‐Saëns,dc10c22b-e510-4006-8b7f-fecb4f36436e,Paris
2,Ludwig van Beethoven,b86b7e97-c4e2-4ec2-942b-5a6cd8eea1da,Bonn
3,Remo Giazotto,c6500277-9a3d-349b-bf30-41afdbf42add,Italy
4,Tomaso Giovanni Albinoni,a6e08cd9-a712-4152-8808-9485c75e1196,Venice


## Now get full place name for each unique place ID

In [29]:
# create a dict where keys are area IDs and values are full place names from MB API
# place_ids_names is read as a global by get_place_full_from_dict
unique_place_ids = df['place_id'].dropna().unique()
place_ids_names = get_place_full(unique_place_ids)

we will attempt to get place full names for 875 place IDs
finished getting place full names from place IDs in 3466.13 seconds


In [30]:
# for each row in dataframe, pull place_full from the place_ids_names dict by place_id
# rows whose place_id is not a key in the dict get None
df['place_full'] = df['place_id'].map(get_place_full_from_dict)
df[['name', 'place_id', 'place', 'place_full']].head()

Unnamed: 0,name,place_id,place,place_full
0,Anne Akiko Meyers,82f3a697-ba65-404d-a1ed-360147af7d10,San Diego,"San Diego, San Diego County, California, Unite..."
1,Camille Saint‐Saëns,dc10c22b-e510-4006-8b7f-fecb4f36436e,Paris,"Paris, Île-de-France, France"
2,Ludwig van Beethoven,b86b7e97-c4e2-4ec2-942b-5a6cd8eea1da,Bonn,"Bonn, Nordrhein-Westfalen, Germany"
3,Remo Giazotto,c6500277-9a3d-349b-bf30-41afdbf42add,Italy,Italy
4,Tomaso Giovanni Albinoni,a6e08cd9-a712-4152-8808-9485c75e1196,Venice,"Venice, Venezia, Veneto, Italy"


In [31]:
# for some reason MB constructs Irish places' country as "Ireland, Ireland" - so clean up the duplicate
# NOTE(review): whether str.replace treats the pattern as a regex by default depends on the
# pandas version; this pattern has no regex metacharacters, so the result should be the same either way
df['place_full'] = df['place_full'].str.replace('Ireland, Ireland', 'Ireland')

## All done - wrap up

In [32]:
# final sanity check: count the artists that did not make it into the dataframe
# rows that have a place_id but ended up without a place_full
mask = df['place_full'].isnull() & df['place_id'].notnull()
rows_missing_place_full = list(df.loc[mask].index)

# positional labels with no row at all (an error prevented the row's creation)
missing_row_labels = [ position for position in range(len(artist_ids)) if position not in df.index ]

# report both counts to the log and to the notebook output
for message in ('{} row labels are missing in the df'.format(len(missing_row_labels)),
                '{} rows are missing place_full but have place_id'.format(len(rows_missing_place_full))):
    log.info(message)
    print(message)

4 row labels are missing in the df
0 rows are missing place_full but have place_id


In [33]:
# finish by saving the csv and cache files to disk
# the caches are otherwise only written every cache_save_frequency lookups,
# so this flushes whatever was added since the last periodic save
df.to_csv(csv_filename, index=False, encoding='utf-8')
save_cache_to_disk(area_cache, area_cache_filename)
save_cache_to_disk(artist_cache, artist_cache_filename)