# Author LCCN Download

This script will talk to id.loc.gov to find the LCCN for an authorized author name heading. 
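
As a rough sketch of what that exchange looks like, each lookup is a GET request to the `suggest2` endpoint with the heading passed as the `q` parameter. The endpoint, parameters and headers mirror the ones `add_lccn` uses in this notebook. The helper name `build_suggest_request` and the sample heading are made up for illustration, and nothing is sent over the network here.

```python
from urllib.parse import urlencode

SUGGEST_URL = "https://id.loc.gov/authorities/names/suggest2/"

def build_suggest_request(name, user_agent, count=5):
    """Return the URL, query parameters and headers for one name lookup (hypothetical helper)."""
    params = {"q": name, "count": count}
    headers = {"Accept": "application/json", "User-Agent": user_agent}
    return {"url": SUGGEST_URL, "params": params, "headers": headers}

req = build_suggest_request("Twain, Mark, 1835-1910", "YOUR PROJECT NAME HERE")
# Roughly the URL that requests.get(req["url"], params=req["params"], ...) would ask for
full_url = req["url"] + "?" + urlencode(req["params"])
print(full_url)
```

Keeping the request description separate from the call makes it easy to inspect what will be sent before spending any API calls.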

The script requires a full name heading from the MARC record, such as field 100. You can add these headings from HathiTrust data by running the `parse_hathi_add_auth_name` script.
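
Before a heading is looked up, the code in this notebook tidies it slightly: it drops one trailing period or comma and rewrites the `b. 1881` birth-year notation. A minimal sketch of that cleanup is below. The function name `clean_heading` is hypothetical, and the empty-string guard is an addition that the original code does not have.

```python
def clean_heading(name):
    """Tidy a MARC name heading before lookup (hypothetical helper)."""
    # Drop a single trailing period or comma; guard against empty strings
    if name and name[-1] in ".,":
        name = name[:-1]
    # "..., b. 1881" is a common notation for a birth year; rewrite it as "..., 1881"
    return name.replace(", b. 1", ", 1")

print(clean_heading("Hart, Frank W. (Frank William), b. 1881"))
```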

Name string matching can be error prone, so if your dataset includes work titles, configure `use_title_reconcilation` and `title_column` to get better results: the script then uses the work title during the reconciliation process.
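
The title check is a loose comparison: both strings are normalized and the dataset title only has to appear somewhere inside the candidate work label. The sketch below illustrates the idea and is not the exact `normalize_string` function from this notebook. The names `normalize_title` and `titles_match` are hypothetical, and this variant drops only the standalone word "the".

```python
import string

def normalize_title(s):
    """Lowercase, strip punctuation, drop the word 'the', collapse whitespace."""
    s = s.translate(str.maketrans("", "", string.punctuation)).lower()
    return " ".join(word for word in s.split() if word != "the")

def titles_match(dataset_title, candidate_label):
    """True when the normalized dataset title occurs inside the normalized label."""
    return normalize_title(dataset_title) in normalize_title(candidate_label)

print(titles_match("The Big Sleep", "Chandler, Raymond, 1888-1959. Big sleep"))
```

Because this is a substring test, very short titles can match unrelated works, so it is best treated as supporting evidence for a name match, not as proof.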

This script modifies the TSV file itself in batches. If the script times out or hits another error, you can rerun it and it will pick up where it left off. Always run it on a backup of your original data files.
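
One simple way to take that backup is to copy the file before the first run. The sketch below uses only the standard library. The helper name `backup_file` and the `.bak` suffix are assumptions, and the demo runs on a throwaway temporary file, not on real data.

```python
import shutil
import tempfile
from pathlib import Path

def backup_file(path, suffix=".bak"):
    """Copy `path` to a sibling file with `suffix` appended and return the new path."""
    src = Path(path)
    dest = src.with_name(src.name + suffix)
    shutil.copy2(src, dest)  # copy2 also preserves timestamps where possible
    return dest

# Demo on a temporary file so the sketch runs anywhere
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.tsv"
    sample.write_text("author_marc\tshorttitle\n")
    copy_path = backup_file(sample)
    backup_ok = copy_path.read_text() == sample.read_text()

print(copy_path.name, backup_ok)
```

With the notebook's own configuration this would be called as `backup_file(path_to_tsv)` before the processing cell is run.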

It creates two new columns in the file: `author_lccn`, which holds the LCCN value, and `author_authorized_heading`, which holds the authorized heading string.
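
To make the two columns concrete, the sketch below shows how a matching result could be written into a row: the LCCN is the last path segment of the result's `uri`, and the heading comes from its `aLabel`, which is how `add_lccn` reads them. The helper name `apply_hit` is hypothetical and the sample values (including `n00000000`) are placeholders, not real authority data.

```python
def apply_hit(row, hit):
    """Return a copy of `row` with the two output columns filled from `hit`."""
    row = dict(row)
    # The LCCN is the final segment of the authority URI
    row["author_lccn"] = hit["uri"].split("/")[-1]
    row["author_authorized_heading"] = hit["aLabel"]
    return row

# Placeholder values for illustration only
hit = {
    "uri": "http://id.loc.gov/authorities/names/n00000000",
    "aLabel": "Example, Author, 1900-1980",
}
row = apply_hit({"author_marc": "Example, Author, 1900-1980."}, hit)
print(row["author_lccn"], "|", row["author_authorized_heading"])
```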

In [114]:
import pandas as pd
import requests
import time
import string


## Config
Set the variables below based on your setup.

`path_to_tsv` - the path to the TSV file you want to run it on

`id_column_name` - the name of the column header that contains authorized author heading value

`user_agent` - this is the value put into the headers on each request; it is good practice to identify your client/project when working with free, open APIs

`pause_between_req` - number of seconds to wait between each API call

`use_title_reconcilation` - boolean (`True`/`False`): whether to use a title to help reconcile non-exact matches. If you have a title in your dataset, set this to `True` to get better results

`title_column` - the name of the column that has the title to use
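
Since a mistyped column name only surfaces once the run has started, it can help to check the TSV header against the configured names first. This is an optional sketch and not part of the original notebook. The helper name `missing_columns` and the sample header (including the `htid` column) are made up for illustration.

```python
import csv
import io

def missing_columns(tsv_file, required):
    """Return the required column names that are absent from the TSV header row."""
    header = next(csv.reader(tsv_file, delimiter="\t"), [])
    return [col for col in required if col not in header]

# In-memory sample standing in for the real file
sample = io.StringIO("htid\tauthor_marc\tshorttitle\nid1\tExample, Author\tA title\n")
print(missing_columns(sample, ["author_marc", "shorttitle"]))
```

Against the real file this could be run as `missing_columns(open(path_to_tsv, newline="", encoding="utf-8"), [id_column_name, title_column])`, assuming the file is UTF-8 encoded.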

In [115]:
path_to_tsv = "/Users/m/Downloads/data-tmp/hathitrust_post45fiction_metadata.tsv"
id_column_name = "author_marc"
user_agent = 'YOUR PROJECT NAME HERE'
pause_between_req = 1

use_title_reconcilation = True
title_column = "shorttitle"

cache = {}

In [116]:
def add_lccn(d):
    
    if type(d[id_column_name]) != str:     
        # no heading to use, skip
        print(d)
        return d
        
    # if there is already a value skip it
    if 'author_lccn' in d:
        if type(d['author_lccn']) == str:        
            print('Skip',d[id_column_name])
            return d


    name = d[id_column_name]

    # drop any trailing commas or periods
    if name[-1] == '.'  or name[-1] == ',':
        name = name[:-1]

    # there is a common notation for the birth year that can be fixed easily ex: "Hart, Frank W. (Frank William), b. 1881" to "Hart, Frank W. (Frank William), 1881"
    name = name.replace(', b. 1', ', 1')
    
    # keep a in memory cache to speed repeated requests up
    if name in cache:
        results = cache[name]
    else:
        params = {
            'q' : name,
            'count': 5
        }
        headers={'Accept': 'application/json', 'User-Agent': user_agent}
        url = f"https://id.loc.gov/authorities/names/suggest2/"

        r = requests.get(url,params=params,headers=headers)
        try:
            data = r.json()
        except:
            print("JSON decode error with:",d[id_column_name])
            return d            

        results = data['hits']
        cache[name] = data['hits']
    
    # loop through each result and test the name
    for hit in results:
        if hit['suggestLabel'] == name:
            d['author_lccn'] = hit['uri'].split('/')[-1]
            d['author_authorized_heading'] = hit['aLabel']
            return d
    # check the main variant label 
    for hit in results:
        if hit['vLabel'] == name:
            d['author_lccn'] = hit['uri'].split('/')[-1]
            d['author_authorized_heading'] = hit['aLabel']
            return d

    # if there is only one hit and it has unclosed life dates and the name partially matches then select it
    if name[-1] == '-':
        if len(results) == 1:
            if name in results[0]['aLabel'] or name in results[0]['vLabel']:
                d['author_lccn'] = results[0]['uri'].split('/')[-1]
                d['author_authorized_heading'] = results[0]['aLabel']
                return d

    # if we are here then no match, loop again and look at the titles if enabled
    if use_title_reconcilation == True:            
        for hit in results:
            url = 'https://id.loc.gov/resources/works/relationships/contributorto/'
            params = {
                'page': 0,
                'label':hit['suggestLabel']
            }
            headers={'Accept': 'application/json', 'User-Agent': user_agent}

            r = requests.get(url,params=params,headers=headers)
            try:
                title_data = r.json()
            except:
                print("JSON decode error with:",d[id_column_name])
                return d

            if title_data['results'] != None:
                # convert it to a list if it is a single result dictionary
                if type(title_data['results']) != list:
                    title_data['results'] = [title_data['results']]
                for title in title_data['results']:
                    if normalize_string(d[title_column]) in normalize_string(title['label']):
                        # we found the title hit, use this one
                        d['author_lccn'] = hit['uri'].split('/')[-1]
                        d['author_authorized_heading'] = hit['aLabel']

                        return d

        # often the wrong life dates are used but the main heading part is correct, so keep chopping off the end of the heading and check it
        # if we get a hit and then get a title match we can be confident it is correct, but it has to have at least 2 parts
        # for example:
        # "Gorham, Charles O. (Charles Orson), 1911-"
        # "Gorham, Charles O. (Charles Orson)"
        # "Gorham, Charles O. (Charles"
        # "Gorham, Charles O" <- hits a result
        for x in range(len(name.split())-1,1,-1):
            cropped_name = " ".join(name.split()[0:x])
            if cropped_name[-1] == '.'  or cropped_name[-1] == ',':
                cropped_name = cropped_name[:-1]
            

            params = {
                'q' : cropped_name,
                'count': 5
            }
            headers={'Accept': 'application/json', 'User-Agent': user_agent}
            url = f"https://id.loc.gov/authorities/names/suggest2/"

            r = requests.get(url,params=params,headers=headers)
            try:
                data = r.json()
            except:
                print("JSON decode error with:",d[id_column_name])
                return d
            
            if len(data['hits']) == 0:
                continue

            results = data['hits']
            for hit in results:
                url = 'https://id.loc.gov/resources/works/relationships/contributorto/'
                params = {
                    'page': 0,
                    'label':hit['suggestLabel']
                }
                headers={'Accept': 'application/json', 'User-Agent': user_agent}

                r = requests.get(url,params=params,headers=headers)
                try:
                    title_data = r.json()
                except:
                    print("JSON decode error with:",d[id_column_name])
                    return d

                if title_data['results'] != None:
                    # convert it to a list if it is a single result dictionary
                    if type(title_data['results']) != list:
                        title_data['results'] = [title_data['results']]
                    for title in title_data['results']:
                        if normalize_string(d[title_column]) in normalize_string(title['label']):
                            # we found the title hit, use this one
                            d['author_lccn'] = hit['uri'].split('/')[-1]
                            d['author_authorized_heading'] = hit['aLabel']
                            print("Found",name,"using cropped", cropped_name)
                            return d


        

    print("No results for ",d[id_column_name])
    
    time.sleep(pause_between_req)

    return d

def normalize_string(s):
    s = s.translate(str.maketrans('', '', string.punctuation))
    s = " ".join(s.split())
    s = s.lower()
    s = s.replace('the','')
    return s
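

The heading-cropping fallback in `add_lccn` can be hard to follow inline, so here is a small sketch of just the candidate generation, using the same `range` bounds and trailing-punctuation trim as the loop above. The function name `cropped_candidates` is hypothetical. In the notebook each candidate is queried in turn and only accepted if a title also matches.

```python
def cropped_candidates(name):
    """List progressively shorter versions of a heading, keeping at least two parts."""
    parts = name.split()
    candidates = []
    for x in range(len(parts) - 1, 1, -1):
        cropped = " ".join(parts[:x])
        # Trim one trailing period or comma left over from the cut
        if cropped[-1] in ".,":
            cropped = cropped[:-1]
        candidates.append(cropped)
    return candidates

for candidate in cropped_candidates("Gorham, Charles O. (Charles Orson), 1911-"):
    print(candidate)
```

A heading with only two parts yields no candidates, which matches the "at least 2 parts" rule described in the comments.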




In [117]:
# load the tsv
df = pd.read_csv(path_to_tsv, sep='\t', header=0, low_memory=False)

# we are going to split the dataframe into chunks so we can save our progress as we go, but we don't want to save the entire file on every record operation
n = 100  #chunk row size
list_df = [df[i:i+n] for i in range(0,df.shape[0],n)]

# loop through each chunk
for idx, df_chunk in enumerate(list_df):

    print("Working on chunk ", idx, 'of', len(list_df))

    # to skip ahead to a given row, set the number below (5500 here); set it to 0 to process every chunk
    if idx < 5500 / n:
        continue

    list_df[idx] = list_df[idx].apply(lambda d: add_lccn(d),axis=1 )  

    reformed_df = pd.concat(list_df)
    # index=False keeps pandas from adding an extra unnamed index column on every save
    reformed_df.to_csv(path_to_tsv, sep='\t', index=False)




Working on chunk  0 of 760
Working on chunk  1 of 760
Working on chunk  2 of 760
Working on chunk  3 of 760
Working on chunk  4 of 760
Working on chunk  5 of 760
Working on chunk  6 of 760
Working on chunk  7 of 760
Working on chunk  8 of 760
Working on chunk  9 of 760
Working on chunk  10 of 760
Working on chunk  11 of 760
Working on chunk  12 of 760
Working on chunk  13 of 760
Working on chunk  14 of 760
Working on chunk  15 of 760
Working on chunk  16 of 760
Working on chunk  17 of 760
Working on chunk  18 of 760
Working on chunk  19 of 760
Working on chunk  20 of 760
Working on chunk  21 of 760
Working on chunk  22 of 760
Working on chunk  23 of 760
Working on chunk  24 of 760
Working on chunk  25 of 760
Working on chunk  26 of 760
Working on chunk  27 of 760
Working on chunk  28 of 760
Working on chunk  29 of 760
Working on chunk  30 of 760
Working on chunk  31 of 760
Working on chunk  32 of 760
Working on chunk  33 of 760
Working on chunk  34 of 760
Working on chunk  35 of 760

In [None]:
# This last block just does QA on the data to see which rows, if any, were not populated

# df = pd.read_csv(path_to_tsv, sep='\t', header=0, low_memory=False)
# print("There are", df['author_lccn'].isnull().sum(), 'rows with no author_lccn column populated, here are their', id_column_name, 'values:')
# res = df.loc[df['author_lccn'].isnull(), id_column_name].tolist()
# print(res)


