### Check occurrence of test words in corpus

This notebook (1) generates the training corpora and (2) counts how often the words used in the tests occur in each corpus. The tests are:

- training monitor tests: these contain upper- and lower-case words as well as inflected forms (e.g., the toefl and syn-sem tests also include plurals and past tenses)

- WEAT test: here all the words correspond to their lemmas (all lower-cased)

This means that the two sets of words are different. To count the occurrences of the words in each set, I build two versions of each corpus: one keeping the words in their original inflection (used for the training monitor tests) and one lemmatizing all the words (used for the WEAT test). Counts are computed for each decade and finally aggregated, so that occurrences can be assessed both per decade and in the whole corpus.
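
The two-vocabulary idea can be sketched on toy data as follows. This is only an illustration under stated assumptions, not the notebook's pipeline: the `TOY_LEMMAS` lookup stands in for the spaCy lemmatizer, and `songs_by_decade` and `count_forms` are hypothetical names introduced here.

```python
from collections import Counter

# Hypothetical stand-in for a lemmatizer: maps inflected forms to lemmas.
TOY_LEMMAS = {"cars": "car", "danced": "dance", "dancing": "dance"}

# Hypothetical toy corpus: decade -> list of tokenized, lower-cased songs.
songs_by_decade = {
    "1960": [["she", "danced", "all", "night"]],
    "1970": [["fast", "cars", "and", "dancing"], ["my", "car"]],
}

def count_forms(tokens):
    """Return (surface-form counts, lemma counts) for one token list."""
    surface = Counter(tokens)
    lemma = Counter(TOY_LEMMAS.get(t, t) for t in tokens)
    return surface, lemma

per_decade = {}
total_surface, total_lemma = Counter(), Counter()
for decade, songs in songs_by_decade.items():
    dec_surface, dec_lemma = Counter(), Counter()
    for tokens in songs:
        s, l = count_forms(tokens)
        dec_surface += s
        dec_lemma += l
    per_decade[decade] = (dec_surface, dec_lemma)
    # aggregate the decade counts into the whole-corpus counts
    total_surface += dec_surface
    total_lemma += dec_lemma

print(total_surface["dance"], total_lemma["dance"])  # 0 2
print(per_decade["1970"][1]["car"])                  # 2
```

In this toy corpus the lemma `dance` never occurs as a surface form, which is why a lemma-based test such as WEAT is counted against the lemmatized corpus.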

We create three corpora:
- song lyrics of all person artists
- song lyrics of male person artists
- song lyrics of female person artists
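
A minimal sketch of how one pass over the songs can fill the three corpora. The records and the `"Other"` gender value are hypothetical; the field names only mirror the columns used later in the notebook.

```python
# Hypothetical records; the field names mirror the notebook's columns.
songs = [
    {"song_id": "s1", "gender": "Male", "line": "love me do"},
    {"song_id": "s2", "gender": "Female", "line": "respect"},
    {"song_id": "s3", "gender": "Other", "line": "hello"},
]

corpora = {"all": [], "male": [], "female": []}
for song in songs:
    # every song goes into the "all" corpus
    corpora["all"].append(song["line"])
    # only the two gender labels that are checked get their own corpus
    if song["gender"] == "Male":
        corpora["male"].append(song["line"])
    elif song["gender"] == "Female":
        corpora["female"].append(song["line"])

print({name: len(lines) for name, lines in corpora.items()})
# {'all': 3, 'male': 1, 'female': 1}
```

With this routing, a song whose gender label is neither `Male` nor `Female` lands only in the "all" corpus, so that corpus can be larger than the other two combined.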

In [1]:
import os, json, re
from collections import defaultdict
import pandas as pd

import nltk
import spacy

In [2]:
nlp = spacy.load("en_core_web_sm", disable=['parser', 'ner'])
print("Spacy version: ", spacy.__version__)
print("Spacy pipeline path (for version): ", nlp.path)

Spacy version:  3.2.0
Spacy pipeline path (for version):  /Users/lorenzo/miniconda3/envs/env_lyrics/lib/python3.9/site-packages/en_core_web_sm/en_core_web_sm-3.2.0


In [3]:
def read_words_ws(file):
    '''
    This returns all the unique words of the ws test files.
    '''
    
    words_all = set()
    with open(file, 'rt') as rr:
        for line in rr:
            words = line.strip().split("\t")[:-1]
            words_all.update(words)
            
    return words_all

def read_words_tfl(file):
    '''
    This returns all the unique words of the tfl test file.
    '''
    
    # read the whole file, closing the handle afterwards
    with open(file, 'rt') as rr:
        tfl_test = rr.read()
    words_all = set([w for w in re.split(r"\n|\t", tfl_test) 
                     if w!='' and '.' not in w and not w[0].isdigit() and len(w)>1])
    
    return words_all

In [4]:
# initialize dict with test words
all_test_words = {}

In [5]:
# ws tests (three tests)
# these words are lemmas (no inflections in this test); some contain upper-case letters
# example of test (scale of relatedness):
# computer	internet	7.58

# all these tests contain the same set of words! -> No need to store all of them
ws353_words = read_words_ws("../data/evaluation_tests_word_embedding/ws/ws353.txt")
all_test_words['ws353'] = ws353_words

ws353_relatedness_words = read_words_ws("../data/evaluation_tests_word_embedding/ws/ws353_relatedness.txt")
#all_test_words['ws353-relatedness'] = ws353_words

ws353_similarity_words = read_words_ws("../data/evaluation_tests_word_embedding/ws/ws353_similarity.txt")
#all_test_words['ws353-similarity'] = ws353_words

In [6]:
# toefl test 
# contains inflected words, no upper cases
# example of test (multiple choice):
#1.	enormously
#a.	appropriately
#b.	uniquely
#c.	tremendously
#d.	decidedly
#c

tfl_words = read_words_tfl("../data/evaluation_tests_word_embedding/tfl/toefl.txt")
all_test_words['toefl'] = tfl_words
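
To see what the filter in `read_words_tfl` keeps, the example question above can be run through the same split-and-filter expression. The `sample` string is typed in by hand here and assumes the tab-separated layout shown in the comments.

```python
import re

# One TOEFL-style question: numbered prompt, four options, answer letter.
sample = (
    "1.\tenormously\n"
    "a.\tappropriately\n"
    "b.\tuniquely\n"
    "c.\ttremendously\n"
    "d.\tdecidedly\n"
    "c\n"
)

words = {
    w for w in re.split(r"\n|\t", sample)
    # drop empty pieces, labels such as '1.' or 'a.', and the one-letter answer key
    if w != "" and "." not in w and not w[0].isdigit() and len(w) > 1
}
print(sorted(words))
# ['appropriately', 'decidedly', 'enormously', 'tremendously', 'uniquely']
```

The `len(w) > 1` condition is what removes the answer letter; it would also drop any genuine one-letter test word.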

In [7]:
# different tests based on syntactic and semantic properties
# contains inflected words
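with open("../data/evaluation_tests_word_embedding/syn_sem/questions-words.txt", 'rt') as rr:
    syn_sem_tests = rr.read()
# split on the section headers (": test-name"); the capture group keeps the names in the result
syn_sem_tests = re.split(r": (\w+(?:-\w+)*)\n", syn_sem_tests)[1:]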
syn_sem_tests = {test_name:words.strip() for test_name, words in zip(syn_sem_tests[::2], syn_sem_tests[1::2])}

print('All tests (with example):')
for test_name, words in syn_sem_tests.items():
    print('- ', test_name)
    print('\t', words.split("\n")[0])
    
    all_test_words[f'syn_sem-{test_name}'] = set([w.strip() 
                                                  for line in words.strip().split("\n") 
                                                  for w in line.split()])

All tests (with example):
-  capital-common-countries
	 Athens Greece Baghdad Iraq
-  capital-world
	 Abuja Nigeria Accra Ghana
-  currency
	 Algeria dinar Angola kwanza
-  city-in-state
	 Chicago Illinois Houston Texas
-  family
	 boy girl brother sister
-  gram1-adjective-to-adverb
	 amazing amazingly apparent apparently
-  gram2-opposite
	 acceptable unacceptable aware unaware
-  gram3-comparative
	 bad worse big bigger
-  gram4-superlative
	 bad worst big biggest
-  gram5-present-participle
	 code coding dance dancing
-  gram6-nationality-adjective
	 Albania Albanian Argentina Argentinean
-  gram7-past-tense
	 dancing danced decreasing decreased
-  gram8-plural
	 banana bananas bird birds
-  gram9-plural-verbs
	 decrease decreases describe describes


In [8]:
# WEAT test words
# contains lemmas
weat_weat_file = "../data/Data_WEAT/weat_attrib_target.json"
with open(weat_weat_file) as rr:
    weat_associations = json.load(rr)
all_weat_tests = [k for k, v in weat_associations.items() 
                  if type(v) is dict and 'method' in v.keys() and v['method']=='weat']
#all_wefat_tests = [k for k, v in weat_associations.items() if type(v) is dict and 'method' in v.keys() 
#              and v['method']=='wefat']

print('All tests (with example):')
for which_test in all_weat_tests:
    print('- WEAT test name: ', which_test)
    
    A_key = weat_associations[which_test]['A_key']
    A_words = weat_associations[which_test][A_key]

    B_key = weat_associations[which_test]['B_key']
    B_words = weat_associations[which_test][B_key]

    X_key = weat_associations[which_test]['X_key']
    X_words = weat_associations[which_test][X_key]

    Y_key = weat_associations[which_test]['Y_key']
    Y_words = weat_associations[which_test][Y_key]
    
    print("\tAttributes (A and B):")
    print(f"\t\t{A_key.upper()}: {', '.join(A_words)}")
    print()
    print(f"\t\t{B_key.upper()}: {', '.join(B_words)}")
    print()
    print("\tTargets (X and Y):")
    print(f"\t\t{X_key.upper()}: {', '.join(X_words)}")
    print()
    print(f"\t\t{Y_key.upper()}: {', '.join(Y_words)}")
    
    print()
    
    all_words_ = set(A_words + B_words + X_words + Y_words)
    all_test_words[f'WEAT-{which_test}'] = all_words_

All tests (with example):
- WEAT test name:  EuropeanAmerican_AfricanAmerican_Pleasant_Unpleasant
	Attributes (A and B):
		PLEASANT: caress, freedom, health, love, peace, cheer, friend, heaven, loyal, pleasure, diamond, gentle, honest, lucky, rainbow, diploma, gift, honor, miracle, sunrise, family, happy, laughter, paradise, vacation

		UNPLEASANT: abuse, crash, filth, murder, sickness, accident, death, grief, poison, stink, assault, disaster, hatred, pollute, tragedy, bomb, divorce, jail, poverty, ugly, cancer, evil, kill, rotten, vomit

	Targets (X and Y):
		EUROPEAN AMERICAN NAMES: adam, chip, harry, josh, roger, alan, frank, ian, justin, ryan, andrew, fred, jack, matthew, stephen, brad, greg, jed, paul, todd, brandon, hank, jonathan, peter, wilbur, amanda, courtney, heather, melanie, sara, amber, crystal, katie, meredith, shannon, betsy, donna, kristin, nancy, stephanie, bobbie-sue, ellen, lauren, peggy, sue-ellen, colleen, emily, megan, rachel, wendy

		AFRICAN AMERICAN NAMES: alo

In [9]:
# WEAT2 test words
# contains lemmas
weat2_weat_file = "../data/Data_WEAT/weat_attrib_target_2.json"
with open(weat2_weat_file) as rr:
    weat2_associations = json.load(rr)
all_weat2_tests = [k for k, v in weat2_associations.items() 
                  if type(v) is dict and 'method' in v.keys() and v['method']=='weat']
#all_wefat_tests = [k for k, v in weat_associations.items() if type(v) is dict and 'method' in v.keys() 
#              and v['method']=='wefat']

print('All tests (with example):')
for which_test in all_weat2_tests:
    print('- WEAT test name: ', which_test)
    
    A_key = weat2_associations[which_test]['A_key']
    A_words = weat2_associations[which_test][A_key]

    B_key = weat2_associations[which_test]['B_key']
    B_words = weat2_associations[which_test][B_key]

    X_key = weat2_associations[which_test]['X_key']
    X_words = weat2_associations[which_test][X_key]

    Y_key = weat2_associations[which_test]['Y_key']
    Y_words = weat2_associations[which_test][Y_key]
    
    print("\tAttributes (A and B):")
    print(f"\t\t{A_key.upper()}: {', '.join(A_words)}")
    print()
    print(f"\t\t{B_key.upper()}: {', '.join(B_words)}")
    print()
    print("\tTargets (X and Y):")
    print(f"\t\t{X_key.upper()}: {', '.join(X_words)}")
    print()
    print(f"\t\t{Y_key.upper()}: {', '.join(Y_words)}")
    
    print()
    
    all_words_ = set(A_words + B_words + X_words + Y_words)
    all_test_words[f'WEAT2-{which_test}'] = all_words_

All tests (with example):
- WEAT test name:  Career_Family_Male_Female
	Attributes (A and B):
		MALE ATTRIBUTES: male, man, boy, brother, he, him, his, son, father, uncle, grandfather

		FEMALE ATTRIBUTES: female, woman, girl, sister, she, her, hers, daughter, mother, aunt, grandmother

	Targets (X and Y):
		CAREER WORDS: executive, management, professional, corporation, salary, office, business, career

		FAMILY WORDS: home, parents, children, family, cousins, marriage, wedding, relatives

- WEAT test name:  Math_Arts_Male_Female
	Attributes (A and B):
		MALE ATTRIBUTES: male, man, boy, brother, he, him, his, son, father, uncle, grandfather

		FEMALE ATTRIBUTES: female, woman, girl, sister, she, her, hers, daughter, mother, aunt, grandmother

	Targets (X and Y):
		MATH WORDS: math, algebra, geometry, calculus, equations, computation, numbers, addition

		ARTS WORDS: poetry, art, shakespeare, dance, literature, novel, symphony, drama

- WEAT test name:  Science_Arts_Male_Female
	Attrib

In [10]:
# WEAT3 test words
# contains lemmas
weat3_weat_file = "../data/Data_WEAT/weat_attrib_target_3.json"
with open(weat3_weat_file) as rr:
    weat3_associations = json.load(rr)
all_weat3_tests = [k for k, v in weat3_associations.items() 
                  if type(v) is dict and 'method' in v.keys() and v['method']=='weat']
#all_wefat_tests = [k for k, v in weat_associations.items() if type(v) is dict and 'method' in v.keys() 
#              and v['method']=='wefat']

print('All tests (with example):')
for which_test in all_weat3_tests:
    print('- WEAT test name: ', which_test)
    
    A_key = weat3_associations[which_test]['A_key']
    A_words = weat3_associations[which_test][A_key]

    B_key = weat3_associations[which_test]['B_key']
    B_words = weat3_associations[which_test][B_key]

    X_key = weat3_associations[which_test]['X_key']
    X_words = weat3_associations[which_test][X_key]

    Y_key = weat3_associations[which_test]['Y_key']
    Y_words = weat3_associations[which_test][Y_key]
    
    print("\tAttributes (A and B):")
    print(f"\t\t{A_key.upper()}: {', '.join(A_words)}")
    print()
    print(f"\t\t{B_key.upper()}: {', '.join(B_words)}")
    print()
    print("\tTargets (X and Y):")
    print(f"\t\t{X_key.upper()}: {', '.join(X_words)}")
    print()
    print(f"\t\t{Y_key.upper()}: {', '.join(Y_words)}")
    
    print()
    
    all_words_ = set(A_words + B_words + X_words + Y_words)
    all_test_words[f'WEAT3-{which_test}'] = all_words_

All tests (with example):
- WEAT test name:  Flowers_Insects_Pleasant_Unpleasant
	Attributes (A and B):
		PLEASANT: family, honest, gift, wonderful, vacation, miracle, loyal, pleasure, gentle, rainbow, love, peace, lucky, honor, freedom, happy, health, friend, laughter, cheer, joy, heaven, diploma, paradise, diamond, caress, sunrise

		UNPLEASANT: tragedy, awful, pollute, failure, agony, poison, terrible, poverty, grief, disaster, nasty, filth, murder, horrible, ugly, evil, rotten, jail, vomit, assault, sickness, accident, crash, divorce, cancer, hatred, kill, abuse, bomb, death, prison, war, stink

	Targets (X and Y):
		FLOWERS: lilac, bluebell, violet, zinnia, peony, crocus, buttercup, iris, rose, pansy, tulip, aster, daisy, azalea, marigold, hyacinth, daffodil, orchid, petunia, carnation, magnolia, lily, gladiola, poppy, clover

		INSECTS: cockroach, wasp, bedbug, ant, beetle, gnat, flea, centipede, tarantula, blackfly, fly, locust, horsefly, caterpillar, spider, roach, mosquito, dr

In [11]:
# functions to create linesentence files from lyrics corpus

def load_contractions(file):
    '''
    Load a dictionary of contraction expansions (e.g., he'll: he will / he shall).
    Contractions are the keys and the first listed expansion is the value.
    Each entry is stored both in lower case and capitalized.
    
    NOTE: pay attention to the apostrophe character: the one used in the file
    must match the one used in the text.
    
    Input:
    file : str, path of the file listing the contractions
    
    Returns:
    CONTRACTIONS : dict of strings (contraction: expansion)
    '''
    
    CONTRACTIONS = {}
    with open(file, 'rt') as rr:
        for line in rr:
            line = line.split(':')
            contraction = line[0].strip()
            expansion = line[1].split('/')[0].strip()
            CONTRACTIONS[contraction.lower()] = expansion
            CONTRACTIONS[contraction.capitalize()] = expansion.capitalize()
    
    return CONTRACTIONS


def expand_contractions(text, contraction_dict):
    '''
    Given a text, expand all its contracted forms.
    
    Inputs:
    text: str, a text
    contraction_dict : dict of strings, (contraction:expansion)
    
    Returns:
    text : str, the text with expanded contractions
    '''
    
    #contraction_in_text = re.findall("\w+[']\w+", text)
        
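    # escape the keys so that any regex metacharacter in a contraction is matched literally
    contraction_in_text = re.findall(r"\b(?:{})\b".format("|".join(re.escape(c) for c in contraction_dict.keys())),
                                    text)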
    
    if len(contraction_in_text)==0:
        return text
    
    expansion_pairs = [(contraction, contraction_dict[contraction]) for contraction in contraction_in_text]
    
    for con, exp in expansion_pairs:
        #text = text.replace(con, exp)
        text = re.sub(r"\b{}\b".format(re.escape(con)), exp, text)
        
    return text


def remove_consecutive_repeated_lines(lyric):
    '''
    Remove consecutive repeated lines across the lyric. 
    
    Preserve the stanza structure.
    '''
    
    lines = lyric.strip().split('\n')
    new_lines = [lines[0]]
    for line in lines[1:]:
        
        if line==new_lines[-1]:
            # it is a repeated line
            continue 
        else:
            new_lines.append(line)
        
    lyric_new = '\n'.join(new_lines)
    return lyric_new

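def spacy_lemmatizer_single_text(text, nlp):
    
    doc = nlp(text)
    tokens = [spacy_get_lemma(token) for token in doc]
    
    return tokens
    
def spacy_get_lemma(token):
    # '-PRON-' is the placeholder lemma that spaCy 2.x assigns to pronouns;
    # in that case keep the original token text
    lemma = token.lemma_
    lemma = lemma if lemma!='-PRON-' else token.text
    return lemma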
    
    
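def process_lyrics(text, lower=False, lemmatize=False):
    '''
    Clean and tokenize one lyric and count its tokens.
    
    Relies on the module-level `nlp` pipeline and `contraction_dict`.
    
    Returns (vocab_count, line), or (vocab_count, vocab_count_lemma, line, line_lemma)
    when lemmatize=True.
    '''
    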
    vocab_count = defaultdict(int)
    if lemmatize:
        vocab_count_lemma = defaultdict(int)
    
    text = text.replace("`", "'").replace("’", "'")
    text = expand_contractions(text, contraction_dict)
    text = remove_consecutive_repeated_lines(text)
    
    # remove newline chars and remove additional spaces
    text = text.replace("\n", " ")
    text = re.sub(r"\s{2,}", " ", text)
    
    # tokenize
    doc = nlp(text)
    tokens = [token.text for token in doc if token.is_punct==False and token.is_alpha]

    if lemmatize:
        tokens_lemma = [spacy_get_lemma(token) for token in doc if token.is_punct==False and token.is_alpha]

    if lower:
        tokens = [t.lower() for t in tokens]
        if lemmatize:
            tokens_lemma = [t.lower() for t in tokens_lemma]
            
    # remove consecutive repeated tokens
    # (tokens[:1] instead of [tokens[0]] avoids an IndexError when no token survives the filter)
    tokens = tokens[:1] + [token_post for token_pre, token_post in zip(tokens[:-1], tokens[1:]) 
                          if token_pre!=token_post]
    if lemmatize:
        tokens_lemma = tokens_lemma[:1] + [token_post for token_pre, token_post in zip(tokens_lemma[:-1], tokens_lemma[1:]) 
                          if token_pre!=token_post]
        
    # count occurrences of tokens
    for token in tokens:
        vocab_count[token] += 1
    if lemmatize:
        for token in tokens_lemma:
            vocab_count_lemma[token] += 1

    line = " ".join(tokens)
    if lemmatize:
        line_lemma = " ".join(tokens_lemma)

    if lemmatize:
        return vocab_count, vocab_count_lemma, line, line_lemma
    else:
        return vocab_count, line
            
            
            

In [12]:
contraction_dict = load_contractions("../data/contractions_eng.txt")
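
The cleaning steps applied by `process_lyrics` (expand contractions, drop consecutive repeated lines, drop consecutive repeated tokens) can be sketched without spaCy. Everything below is a simplified assumption: `toy_contractions` is a hypothetical two-entry table, whitespace splitting stands in for the spaCy tokenizer, and `expand` and `drop_consecutive_duplicates` are illustrative helpers rather than the notebook's functions.

```python
import re
from itertools import groupby

# Hypothetical two-entry contraction table (the real one is loaded from file).
toy_contractions = {"i'm": "i am", "don't": "do not"}

def expand(text, table):
    # re.escape guards against regex metacharacters in the keys.
    pattern = re.compile(r"\b(?:{})\b".format("|".join(map(re.escape, table))))
    return pattern.sub(lambda m: table[m.group(0)], text)

def drop_consecutive_duplicates(items):
    # groupby collapses each run of equal neighbours into a single element.
    return [key for key, _ in groupby(items)]

lyric = "i'm gone\ni'm gone\nno no no don't go"

# 1) expand contractions, 2) drop repeated lines, 3) drop repeated tokens
lines = drop_consecutive_duplicates(expand(lyric, toy_contractions).split("\n"))
tokens = drop_consecutive_duplicates(" ".join(lines).split())
print(tokens)
# ['i', 'am', 'gone', 'no', 'do', 'not', 'go']
```

Only adjacent repetitions are removed, so a chorus that returns after a verse is still counted each time it reappears.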

In [13]:
# create the folders where the corpora are saved
os.makedirs('../data/corpora', exist_ok=True)
os.makedirs('../data/corpora_lemmatized', exist_ok=True)
    
decades = ['1960', '1970', '1980', '1990', '2000']
lower = True

vocab_count_all_person = defaultdict(int)
vocab_count_male_person = defaultdict(int)
vocab_count_female_person = defaultdict(int)

vocab_count_all_person_lemma = defaultdict(int)
vocab_count_male_person_lemma = defaultdict(int)
vocab_count_female_person_lemma = defaultdict(int)

# open files
out_file_corpus_all_person = open("../data/corpora/all_person_artist_lyrics.cor", 'wt')
out_file_corpus_male_person = open("../data/corpora/male_person_artist_lyrics.cor", 'wt')
out_file_corpus_female_person = open("../data/corpora/female_person_artist_lyrics.cor", 'wt')

out_file_corpus_all_person_lemma = open("../data/corpora_lemmatized/all_person_artist_lyrics.cor", 'wt')
out_file_corpus_male_person_lemma = open("../data/corpora_lemmatized/male_person_artist_lyrics.cor", 'wt')
out_file_corpus_female_person_lemma = open("../data/corpora_lemmatized/female_person_artist_lyrics.cor", 'wt')

out_file_song_ids_all_person = open("../data/info_copus_lyrics_all_person.txt", 'wt')
out_file_song_ids_male_person = open("../data/info_copus_lyrics_male_person.txt", 'wt')
out_file_song_ids_female_person = open("../data/info_copus_lyrics_female_person.txt", 'wt')

for decade in decades:
    
    print(f'Processing decade {decade}...')
    
    # load all the lyrics of the decade (only person)
    lyrics_artist = pd.read_json(f"../data/dataset_10_no_duplicates/data_lyrics_person_decades/lyrics_{decade}.json.gz",
                                orient='records', lines=True)
    lyrics_artist = lyrics_artist[['artist_id', 'song_id', 'lyrics', 'other_artist_info']]
    lyrics_artist.loc[:, 'gender'] = lyrics_artist.other_artist_info.apply(lambda a_info: a_info['gender'])
    lyrics_artist.loc[:, 'artist_type'] = 'Person'
    
    lyrics_df = lyrics_artist.reset_index(drop=True)
    
    for idx, row in lyrics_df.iterrows():
        
        song_id = row.song_id
        artist_gender = row.gender
        artist_type = row.artist_type
        lyrics = row.lyrics
        
        # process lyrics
        vocab_count, vocab_count_lemma, line, line_lemma = process_lyrics(lyrics, 
                                                                          lower=lower, 
                                                                          lemmatize=True)
        
        if line.strip()=='' or line_lemma.strip()=='':
            print(f"Song ID {song_id} has empty lyrics..")
            continue
        
        # update vocab counts
        for w, count in vocab_count.items():
            vocab_count_all_person[w] += count
            if artist_gender=='Male':
                vocab_count_male_person[w] += count
            elif artist_gender=='Female':
                vocab_count_female_person[w] += count
                
        for w, count in vocab_count_lemma.items():
            vocab_count_all_person_lemma[w] += count
            if artist_gender=='Male':
                vocab_count_male_person_lemma[w] += count
            elif artist_gender=='Female':
                vocab_count_female_person_lemma[w] += count 
                
        # write lines
        out_file_corpus_all_person.write(line+'\n')
        out_file_corpus_all_person_lemma.write(line_lemma+'\n')
        out_file_song_ids_all_person.write(song_id+'\n')
        
        if artist_gender=='Male':
            out_file_corpus_male_person.write(line+'\n')
            out_file_corpus_male_person_lemma.write(line_lemma+'\n')
            out_file_song_ids_male_person.write(song_id+'\n')
        elif artist_gender=='Female':
            out_file_corpus_female_person.write(line+'\n')
            out_file_corpus_female_person_lemma.write(line_lemma+'\n')
            out_file_song_ids_female_person.write(song_id+'\n')
            
            
# close all files
out_file_corpus_all_person.close()
out_file_corpus_male_person.close()
out_file_corpus_female_person.close()

out_file_corpus_all_person_lemma.close()
out_file_corpus_male_person_lemma.close()
out_file_corpus_female_person_lemma.close()

out_file_song_ids_all_person.close()
out_file_song_ids_male_person.close()
out_file_song_ids_female_person.close()
    
    
# now count words for each test
word_tests_occurrence = []
for test_name, word_set in all_test_words.items():

    # split e.g. 'syn_sem-gram8-plural' into name 'syn_sem' and type 'gram8-plural';
    # names without '-' get an empty type
    test_name, _, test_type = test_name.partition('-')

    if lower:
        word_set = set([w.lower() for w in word_set]) 
        
        
    # get how many times the test words occur in the corpora
    test_word_count_all_person = {w:vocab_count_all_person[w] for w in word_set}
    test_word_count_all_person_lemma = {w:vocab_count_all_person_lemma[w] for w in word_set}
    
    test_word_count_male_person = {w:vocab_count_male_person[w] for w in word_set}
    test_word_count_male_person_lemma = {w:vocab_count_male_person_lemma[w] for w in word_set}
    
    test_word_count_female_person = {w:vocab_count_female_person[w] for w in word_set}
    test_word_count_female_person_lemma = {w:vocab_count_female_person_lemma[w] for w in word_set}
    
    # append all
    word_tests_occurrence.append({
        'test_name':test_name,
        'test_type':test_type,
        'test_word_count_all_person':test_word_count_all_person,
        'test_word_count_all_person_lemmatized':test_word_count_all_person_lemma,
        'test_word_count_male_person':test_word_count_male_person,
        'test_word_count_male_person_lemmatized':test_word_count_male_person_lemma,
        'test_word_count_female_person':test_word_count_female_person,
        'test_word_count_female_person_lemmatized':test_word_count_female_person_lemma,
    })

word_tests_occurrence = pd.DataFrame(word_tests_occurrence)
word_tests_occurrence.to_json('../data/occurrence_test_words_in_person_corpora.json')
print('*-'*20+'\n')

Processing decade 1960...
Processing decade 1970...
Processing decade 1980...
Processing decade 1990...
Processing decade 2000...
*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-



In [14]:
# save all the word counts (one JSON file per vocabulary, closing each file after writing)
vocab_counts_to_save = {
    "vocab_count_all_person": vocab_count_all_person,
    "vocab_count_male_person": vocab_count_male_person,
    "vocab_count_female_person": vocab_count_female_person,
    "vocab_count_all_person_lemma": vocab_count_all_person_lemma,
    "vocab_count_male_person_lemma": vocab_count_male_person_lemma,
    "vocab_count_female_person_lemma": vocab_count_female_person_lemma,
}
for name, counts in vocab_counts_to_save.items():
    with open(f"../data/{name}.json", 'wt') as ww:
        json.dump(counts, ww)
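
Once the counts are saved, a natural follow-up is to check how many words of a test actually occur in a corpus. The sketch below is hypothetical: `coverage`, `toy_counts` and `toy_test` are illustrative names and values, not part of the notebook. Note that a vocabulary reloaded with `json.load` is a plain `dict`, so missing words are looked up with `.get` instead of relying on `defaultdict` behaviour.

```python
def coverage(test_words, vocab_count, min_count=1):
    """Share of test words seen at least `min_count` times, plus the missing ones."""
    missing = sorted(w for w in test_words if vocab_count.get(w, 0) < min_count)
    share = 1 - len(missing) / len(test_words)
    return share, missing

# Hypothetical counts and test set, for illustration only.
toy_counts = {"love": 120, "peace": 15, "diploma": 0}
toy_test = {"love", "peace", "diploma", "zinnia"}

share, missing = coverage(toy_test, toy_counts)
print(share, missing)
# 0.5 ['diploma', 'zinnia']
```

Raising `min_count` gives a stricter view, which may be useful when deciding whether a test has enough support in a small (e.g. single-gender) corpus.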