# Analogy Creation

In [None]:
import itertools
import os

# Project root; the analogy folder and the output file are located relative to it.
main_path = "F:\\BDRAD MiniProject\\NLP"

# Folder holding the analogy component files.
# Each component file lists analogous a:b pairings, one pairing per line.
analogy_main_path = os.path.join(main_path, "analogies")

# Full path of every file found anywhere below analogy_main_path.
# Folders without files simply contribute nothing to the list.
analogy_file_paths = [
    os.path.join(folder, name)
    for folder, _subfolders, names in os.walk(analogy_main_path, topdown=True)
    for name in names
]


In [None]:
import csv

def analogy_creator(filepath):
    """
    Build a,b,c,d analogy rows from one analogy component file.

    filepath: path to a single csv file with one "a,b" pairing per line, where
        every pairing in the file expresses the same/similar relationship.

    Every unordered combination of two lines is joined into one row
    [a, b, c, d] (read as a:b::c:d). Lines are combined in file order, so the
    earlier line always supplies a,b; the reversed row c,d,a,b is not produced.
    Rows in which any word repeats are dropped.

    Returns a list of rows, each row a list of strings.
    """
    # newline='' is what the csv module expects for files handed to csv.reader
    with open(filepath, newline='') as file:
        # csv.reader yields [] for a blank line; keeping those would pair a real
        # entry with nothing and emit a 2-field row that is not an analogy
        data = [row for row in csv.reader(file) if row]

    analogies_formatted = []
    for first_pair, second_pair in itertools.combinations(data, 2):
        format_analogy = [*first_pair, *second_pair]
        #only keep rows whose words are all unique (the evaluator later will suppress identical output)
        if len(set(format_analogy)) == len(format_analogy):
            analogies_formatted.append(format_analogy)

    return analogies_formatted

def csv_write(row_text,output_file_path,append=False):
    """
    Write rows to a csv file encoded as utf-8.

    row_text: iterable of rows, each row an iterable of fields
    output_file_path: path of the csv file to write
    append: when True the rows are added to the end of the file,
        otherwise the file is overwritten
    """
    mode = 'a' if append else 'w'
    with open(output_file_path, mode, newline='', encoding='utf-8') as file:
        #note the csv will have a trailing newline which may or may not be a problem
        csv.writer(file).writerows(row_text)
            

# Master file that collects the analogies of every category.
analogy_output_master_path = os.path.join(main_path,"med_analogy.csv")

# NOTE: the file is opened in append mode, so running this cell again adds a
# second copy of every section to an existing med_analogy.csv.
with open(analogy_output_master_path,'a',newline='',encoding='utf-8') as file:
    writer = csv.writer(file)
    for filepath in analogy_file_paths:
        file_name = os.path.basename(filepath)
        # Optional per-category output (disabled): write analogy_creator(filepath)
        # with csv_write to main_path/<file_name with ".csv" -> "_analogy.csv">.

        # Section header row: "#" followed by the file name without ".csv"
        writer.writerow(["#" + file_name.replace(".csv","")])
        # followed by that file's a,b,c,d rows
        writer.writerows(analogy_creator(filepath))


Process word embeddings from text file. 

This requires an array of word vectors and a word2idx dictionary that maps each word to its row index in that array.

A second, normalized copy of the vectors is also required; it is saved under the same file prefix with the suffix "_n".

See: https://medium.com/@martinpella/how-to-use-pre-trained-word-embeddings-in-pytorch-71ca59249f76

# Analogy Evaluation

In [None]:
import csv
import os
import pickle
import bcolz
import numpy as np
import pandas as pd

class AnalogyEvaluate:
    """
    Scores word embeddings on an analogy file.

    The analogy file is a csv whose rows are either a section header (first
    field starts with "#") or an analogy a,b,c,d. For each analogy the words
    closest to b + c - a (3CosAdd) are looked up and the analogy counts as
    correct when d is among the top 3.

    Embeddings are loaded from <glove_path>/<file_prefix> using three files:
    ".dat" (vectors), "_n.dat" (normalized vectors) and "_idx.pkl" (word -> row).
    """

    def __init__(self, dimensions, glove_path, analogy_path):
        """
        dimensions: embedding sizes that bulk_eval iterates over (e.g. ["50","100"])
        glove_path: folder containing the embedding files
        analogy_path: csv file of section headers and a,b,c,d rows
        """
        self.glove_path = glove_path
        self.analogy_path = analogy_path
        # minimum count a candidate word needs to be returned; 0 disables the filter
        self.freq_threshold = 2
        self.dimensions = dimensions
        self.analogies = self.analogy_generate(analogy_path)
        # NOTE(review): bulk_eval uses the "25customv2." prefix while this default
        # is "25custom." - confirm which file set is intended here.
        self.file_prefix = "25custom.50d"

        self.load_word_vectors()

        # word -> count lookup consulted by most_similar when freq_threshold is set.
        # NOTE: pickle executes code on load; only point this at trusted files.
        with open("./maxemerling/counts.pkl", "rb") as file:
            self.count_dict = pickle.load(file)

    def load_word_vectors(self):
        """Load vectors, normalized vectors and the word/index maps for self.file_prefix."""
        root = os.path.join(f'{self.glove_path}', f'{self.file_prefix}')

        self.word_embedding = bcolz.open(root + ".dat")[:]
        self.word_embedding_n = bcolz.open(root + "_n.dat")[:]

        # use a context manager so the index file is closed right after loading
        with open(root + "_idx.pkl", 'rb') as file:
            self.word2idx = pickle.load(file)
        #create reverse lookup dictionary
        self.id2word = {idx: word for word, idx in self.word2idx.items()}

    def most_similar(self, positive, negative, topn=10):
        """
        Return up to topn (word, cosine similarity) tuples, best first.

        The query vector is the sum of the normalized vectors of `positive`
        minus those of `negative`, rescaled to unit length. Words that appear
        in `positive` or `negative` are never returned. When freq_threshold is
        non-zero, words whose count is below it are dropped as well.

        Raises KeyError when a query word is missing from word2idx, or when
        the frequency filter is active and a candidate has no count entry.
        """
        vectors = self.word_embedding_n
        row_of = self.word2idx

        cumulative_vecs = [vectors[row_of[word]] for word in positive]
        cumulative_vecs.extend(-1 * vectors[row_of[word]] for word in negative)

        cumulative_vector = np.array(cumulative_vecs).sum(axis=0)
        cumulative_vector = cumulative_vector / np.linalg.norm(cumulative_vector)

        #when word vector and cumulative_vector are normalized, cosine similarity reduces to a dot product
        cos_sim = np.dot(vectors, cumulative_vector)

        # over-fetch so that enough candidates survive the filtering below
        best = np.argsort(cos_sim)[::-1][:topn + len(positive) + len(negative) + 100]

        query_words = set(positive) | set(negative)
        candidates = [(self.id2word[i], cos_sim[i]) for i in best]
        if self.freq_threshold:
            result = [(word, sim) for word, sim in candidates
                      if self.count_dict[word] >= self.freq_threshold and word not in query_words]
        else:
            result = [(word, sim) for word, sim in candidates if word not in query_words]

        return result[:topn]

    def analogy_generate(self, filepath):
        """Yield the rows of the analogy csv at filepath, each as a list of strings."""
        # the notebook writes this file as utf-8 with newline='', so read it the same way
        with open(filepath, "r", newline='', encoding='utf-8') as file:
            data = list(csv.reader(file))
        yield from data

    def eval_analogy(self):
        """
        Evaluate every analogy in self.analogy_path with the loaded embedding.

        Returns (sub_category_list, correct, total, skipped, indiv_correctness):
          * sub_category_list: the header labels in file order, then "Total"
          * correct / total / skipped: one count per category, then their sum
          * indiv_correctness: per category, a list with 1 (correct) or 0 for
            every analogy in that category (no entry for "Total")

        An analogy is skipped (and recorded as 0) when it has fewer than four
        fields or when the lookup raises KeyError. Blank rows are ignored.
        """
        self.analogies = self.analogy_generate(self.analogy_path)
        correct_analogies_subcategory = 0
        total_analogies_subcategory = 0
        skipped_subcategory = 0
        indiv_subcategory_correctness = []

        sub_category_list = []
        correct_list = []
        total_list = []
        skipped_list = []
        indiv_correctness_list = []

        for analogy in self.analogies:
            if not analogy:
                # blank line in the csv: nothing to evaluate
                continue

            if analogy[0].startswith("#"):
                # a header closes the running category; the very first header
                # closes an empty placeholder that is stripped before returning
                sub_category_list.append(analogy[0])
                correct_list.append(correct_analogies_subcategory)
                total_list.append(total_analogies_subcategory)
                skipped_list.append(skipped_subcategory)
                indiv_correctness_list.append(indiv_subcategory_correctness)

                correct_analogies_subcategory = 0
                total_analogies_subcategory = 0
                skipped_subcategory = 0
                indiv_subcategory_correctness = []
                print(analogy)
                continue

            total_analogies_subcategory += 1

            if len(analogy) < 4:
                # malformed row: record exactly one "incorrect" entry
                print(analogy, "Error - skipping")
                skipped_subcategory += 1
                indiv_subcategory_correctness.append(0)
                continue

            try:
                most_similar_vec = self.most_similar([analogy[1], analogy[2]], [analogy[0]], topn=3)
            except KeyError:
                #will enter this block if word is not found in vocab
                print(analogy, "Error - skipping")
                skipped_subcategory += 1
                indiv_subcategory_correctness.append(0)
                continue

            #give credit if target word is in topn results
            is_correct = 0
            for candidate_word in most_similar_vec:
                #candidate_word is a tuple with the word and its cossim with 3CosAdd vector of input
                print(analogy, " ", candidate_word)
                if candidate_word[0] == analogy[3]:
                    is_correct = 1
                    #stop printing once correct, saves space
                    break

            correct_analogies_subcategory += is_correct
            indiv_subcategory_correctness.append(is_correct)

        #append results of last category
        sub_category_list.append("Total")
        correct_list.append(correct_analogies_subcategory)
        total_list.append(total_analogies_subcategory)
        skipped_list.append(skipped_subcategory)
        indiv_correctness_list.append(indiv_subcategory_correctness)

        correct_list.append(sum(correct_list))
        total_list.append(sum(total_list))
        skipped_list.append(sum(skipped_list))

        #don't return initial placeholder
        return sub_category_list, correct_list[1:], total_list[1:], skipped_list[1:], indiv_correctness_list[1:]

    def _eval_embedding(self, prefix, freq_threshold, banner, indiv_dict):
        """Load the embedding `prefix`, evaluate it, record per-analogy results in indiv_dict and return its summary frame."""
        self.file_prefix = prefix
        self.load_word_vectors()
        self.freq_threshold = freq_threshold

        print(banner)

        category_list, correct_list, total_list, skipped_list, indiv_correctness_list = self.eval_analogy()

        #no individual list for "total"
        for category, correctness in zip(category_list[:-1], indiv_correctness_list):
            indiv_dict[prefix + category] = correctness

        frame = pd.DataFrame(data={'Category': [prefix + "_" + s for s in category_list],
                                   'Correct': correct_list,
                                   "Total Analogy": total_list,
                                   "Skipped": skipped_list})
        print('@', '***' * 25)
        return frame

    def bulk_eval(self):
        """
        Evaluate the custom and the pretrained embedding for every dimension.

        For each d in self.dimensions the "25customv2.<d>d" embedding is
        evaluated with freq_threshold 2 and the "glove.6B.<d>d" embedding with
        the frequency filter disabled.

        Returns (df_master, indiv_dict):
          * df_master: DataFrame with columns Category, Correct,
            Total Analogy, Skipped - one row per category plus "Total"
            for every evaluated embedding
          * indiv_dict: "<prefix><category>" -> list of 1/0 per analogy

        Afterwards the "25customv2.50d" embedding is reloaded and
        freq_threshold is set back to 2.
        """
        indiv_dict = {}
        frames = []

        for d in self.dimensions:
            frames.append(self._eval_embedding(f"25customv2.{d}d", 2,
                                               f"Radiopaedia embedding {d} dimensions", indiv_dict))
            frames.append(self._eval_embedding(f"glove.6B.{d}d", 0,
                                               f"Pretrained embedding {d} dimensions", indiv_dict))

        #reset to smaller dimension afterwards, with the threshold used for the custom embedding
        self.file_prefix = "25customv2.50d"
        self.freq_threshold = 2
        self.load_word_vectors()

        if frames:
            # DataFrame.append was removed in pandas 2.0; concat keeps each frame's own index
            df_master = pd.concat(frames)
        else:
            df_master = pd.DataFrame(columns=['Category', 'Correct', "Total Analogy", "Skipped"])

        return df_master, indiv_dict

In [None]:
# Project root folder.
main_path = "F:\\BDRAD MiniProject\\NLP"
# Embedding sizes to evaluate, kept as strings because bulk_eval builds file prefixes from them.
dim = [str(size) for size in (50, 100, 200, 300)]
# Folder that holds the embedding files.
GLOVE_PATH = os.path.join(main_path, 'glove_pretrain')
# Analogy file; each analogy a:b::c:d is stored as the row a,b,c,d.
ANALOGY_PATH = os.path.join(main_path, "med_analogy.csv")

analogy = AnalogyEvaluate(dimensions=dim, glove_path=GLOVE_PATH, analogy_path=ANALOGY_PATH)

In [None]:
# df: per-category summary counts; indiv_dict: per-analogy 1/0 results keyed by embedding prefix + category
df, indiv_dict = analogy.bulk_eval()

# Prepare in format for R script

In [None]:
import re
dim = [50, 100, 200, 300]

# Category name = everything after the first "#" of each indiv_dict key.
# Keys without a "#" (or with nothing after it) carry no category and are ignored.
categories = set()
for key in indiv_dict:
    _prefix, marker, category = key.partition("#")
    if marker and category:
        categories.add(category)

# Collect, category by category and dimension by dimension, the keys to compare.
# The pairing step that follows reads this list two entries at a time, so each
# (category, dimension) is expected to contribute exactly two keys.
comparison_keys = list()

# sorted() makes the output order reproducible between runs (set order is not)
for cat in sorted(categories):
    for d in dim:
        # The whole key must match:
        #  * \D before the dimension keeps 50 from matching e.g. "150d"
        #  * re.escape treats the category name literally
        #  * fullmatch stops category "x" from also capturing keys of category "xy"
        key_pattern = re.compile(r".*\D" + str(d) + "d#" + re.escape(cat))
        comparison_keys.extend(key for key in indiv_dict if key_pattern.fullmatch(key))

In [None]:
import numpy as np
import pandas as pd

def correct_compare(v1,v2):
    """
    Cross-tabulate two correctness vectors.

    v1, v2: equal-length sequences with 1 for correct and 0 for incorrect.

    Returns the tuple (cc, ic, ci, ii):
      cc - both correct
      ic - v1 incorrect, v2 correct
      ci - v1 correct, v2 incorrect
      ii - both incorrect
    When the lengths differ a message is printed and None is returned.
    """
    if len(v1) != len(v2):
        print("v1 and v2 must be same length")
        return

    first = np.asarray(v1)
    second = np.asarray(v2)

    # +1 marks the ci case, -1 the ic case, 0 means the two vectors agree
    dif_vec = first - second
    agree = dif_vec == 0
    first_correct = first == 1

    cc = int(np.count_nonzero(agree & first_correct))
    ii = int(np.count_nonzero(agree & ~first_correct))
    ci = int(np.count_nonzero(dif_vec == 1))
    ic = int(np.count_nonzero(dif_vec == -1))

    # any remaining position had a difference other than -1, 0 or 1
    for _ in range(len(dif_vec) - (cc + ii + ci + ic)):
        print("Vectors should be only 1's and 0's")

    return cc,ic,ci,ii

In [None]:
# One row per pair of embeddings: counts of both-correct, only-second-correct,
# only-first-correct and both-incorrect analogies.
column_names = ["c-c", "i-c", "c-i", "i-i"]
df_mcnemar = pd.DataFrame(columns=column_names)
title_list = []

# comparison_keys is consumed two entries at a time: entry i is compared with entry i+1
for i in range(0, len(comparison_keys), 2):
    first_key = comparison_keys[i]
    second_key = comparison_keys[i + 1]

    title = str(first_key + "_" + second_key)
    title_list.append(title)

    # correct_compare returns a tuple; store it as an array in the row labelled i/2
    output_list = np.asarray(list(correct_compare(indiv_dict[first_key], indiv_dict[second_key])))
    df_mcnemar.loc[i / 2] = output_list

# the pair titles become the first column
df_mcnemar.insert(0, "Category", title_list)