# Understanding Wikipedia with Latent Semantic Analysis

This notebook is a PySpark implementation of chapter 6 of the book "Advanced Analytics with Spark: Patterns for Learning from Data at Scale" (Uri Laserson, Sean Owen, Sandy Ryza, Josh Wills), which was originally implemented in Scala. The goal is to apply the LSA (Latent Semantic Analysis) algorithm to a corpus of Wikipedia articles. To do this, we use the Wikipedia data dumps (available <a href=https://dumps.wikimedia.org/>here</a>).

LSA is an NLP technique that seeks to uncover the relationships between the words in a set of documents through the concepts they express. With this technique we do not compare the words themselves, but the concepts attached to them. To do so, we use linear algebra techniques such as SVD (Singular Value Decomposition) to map words and documents into a new semantic space, and compare concepts in that space.

</div>

## Contents <br>

1. Pre Processing<br>
1.1. Data Structures<br>
1.2. Reading the dataset<br>
2. TF-IDF<br>
2.1. Term Frequency<br>
2.2. Document-Term Frequency <br>
2.3. Inverse-Document Frequency<br>
2.4. Complete TF-IDF<br>
2.5. Spark TF-IDF<br>
3. LSA (SVD)<br>
4. Finding Important Concepts<br>
5. Term-Term Relevance<br>
5.1. Local Implementation<br>
5.2. Distributed Implementation<br>
6. Document-Document Relevance<br>
7. Term-Document Relevance<br>
8. Multiple-Term Queries<br>



In [1]:
import gensim
import re
import numpy as np
import time
import math
from operator import add
from nltk.corpus import stopwords
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import IndexedRow
from sklearn.preprocessing import normalize
from pyspark.mllib.linalg.distributed import *
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import *
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.clustering import LDA, LDAModel


## 1. Pre Processing

To extract as much information as possible from the text, we first pre-process the dataset. We perform the following steps:<br>
•	Remove stop words and function words;<br>
•	Remove punctuation;<br>
•	Remove words that are too short (length <= 2);<br>
•	Remove numbers;<br>
•	Perform stemming (e.g., the words "cat" and "cats" will be considered the same word);<br>
•	Remove any trace of the original XML format of the dataset.<br><br>

In [2]:
special_chars = re.compile("[@$/#.:&*+=\\[\\]?!()){},\\'\">_<;%\\s]+")
stop_words = stopwords.words("english")

function_words = "a about above after again against ago ahead all almost along already also although always am among an and any are aren't around as at away backward backwards be because before behind below beneath beside between both but by can cannot can't cause 'cos could couldn't despite did didn't do does doesn't don't down during each either even ever every except for forward from had hadn't has hasn't have haven't he her here hers herself him himself his how however if in inside inspite instead into is isn't it its itself just 'll least less like 'm many may mayn't me might mightn't mine more most much must mustn't my myself near need needn't needs neither never no none nor not now of off often on once only onto or ought oughtn't our ours ourselves out outside over past perhaps quite 're rather 's seldom several shall shan't she should shouldn't since so some sometimes soon than that the their theirs them themselves then there therefore these they this those though through thus till to together too towards under unless until up upon us used usedn't usen't usually 've very was wasn't we well were weren't what when where whether which while who whom whose why will with without won't would wouldn't yet you your yours yourself yourselves"

function_words = function_words.split()

junk_words = 'disambiguation article refer ref related onlyinclude'
junk_words = junk_words.split()


def remove_function_words(text):
    return ' '.join([word for word in text.split() if word not in function_words])

def remove_junk_words(text):
    return ' '.join([word for word in text.split() if word not in junk_words])

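def pre_process(text):
    # Normalise case, then strip markup, punctuation and digits.
    text = text.lower()
    text = gensim.parsing.preprocessing.strip_tags(text)
    text = gensim.parsing.preprocessing.strip_punctuation(text)
    text = gensim.parsing.preprocessing.strip_numeric(text)

    # Drop function words, corpus-specific junk words and gensim's stop words.
    text = remove_function_words(text)
    text = remove_junk_words(text)
    text = gensim.parsing.preprocessing.remove_stopwords(text)

    # NOTE: PorterStemmer.stem() treats its argument as a single word, so on a
    # whole document it leaves almost every token unstemmed (as the outputs
    # below show). stem_sentence() would stem each word, but it would also
    # change the vocabulary used in the rest of this notebook.
    stemmer = gensim.parsing.porter.PorterStemmer()
    text = stemmer.stem(text)
    # Remove words shorter than 3 characters.
    text = gensim.parsing.preprocessing.strip_short(text, minsize=3)

    return text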

### 1.1. Data Structures

RDDs (Resilient Distributed Datasets) are a better choice than DataFrames when: <br>
* The data is unstructured (e.g., text data);
* The data does not follow a schema, or we do not need to access it by attribute (column or name);
* We need to apply low-level transformations and keep fine-grained control over the data.

For these reasons, we structure the data using RDDs in this notebook. <a href=https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html>More information</a>

###  1.2. Reading the dataset <br>

The dataset is split into 24 files, which result from parsing the original Wikipedia data dump with the <a href=https://github.com/attardi/wikiextractor>WikiExtractor Tool</a>. Each file contains numerous Wikipedia articles, delimited by &lt;doc&gt; tags:<br>
&lt;doc id="12" url="https://en.wikipedia.org/wiki?curid=12" title="Anarchism"&gt;<br>
Anarchism<br>
Anarchism is a political philosophy that advocates (…) <br>
&lt;/doc&gt;<br>

We use the <b><i>newAPIHadoopFile()</i></b> method to read the articles into an RDD. This method lets us configure the document format as XML and specify the tags that enclose each article.

In the cell below we create 3 RDDs: <br></div>
* doc_rdd: An RDD in which each line has the pre-processed text of each article;
* doc_ids: An RDD in which each line has the ID of each article;
* doc_titles: An RDD in which each line has the title of an article.

In [3]:
#path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
#read articles
doc_rdd = sc.newAPIHadoopFile(
    'hdfs://master:9000/home/ABD/datasets/ch6/wiki_02',
    'com.databricks.spark.xml.XmlInputFormat',
    'org.apache.hadoop.io.IntWritable',
    'org.apache.hadoop.io.Text',
    conf={
        'xmlinput.start':'<doc>',
        'xmlinput.end':'</doc>',
        'xmlinput.encoding': 'utf-8'})

doc_ids = doc_rdd.map(lambda x: x[1]).map(lambda x: x.split('doc id="')[1].split('"')[0])
doc_titles = doc_rdd.map(lambda x: x[1]).map(lambda x: x.split('title="')[1].split('"')[0])

doc_rdd = doc_rdd.map(lambda x: x[1]).map(lambda x: re.sub(r'\<doc id=.+>', '', x)).\
    map(lambda x: re.sub(r'</doc>', '', x)).filter(lambda x: len(x) > 0).map(pre_process)
    
num_docs = doc_rdd.count()

doc_rdd.take(1)

['australian white ibis australian white ibis threskiornis molucca wading bird ibis family threskiornithidae widespread australia predominantly white plumage bare black head long downcurved black legs sister species sacred ibis historically rare urban areas australian white ibis immigrated urban areas east coast increasing numbers late commonly seen wollongong sydney melbourne gold coast brisbane townsville recent years bird increasing common perth western australia surrounding towns south western australia populations disappeared natural breeding areas macquarie marshes north western new south wales management plans introduced control problematic urban populations sydney initially described georges cuvier ibis molucca considered superspecies complex sacred ibis aethiopicus africa black headed ibis melanocephalus asia status complex vacillated years older guidebooks referred bird species molucca comprehensive review plumage patterns holyoak holyoak noted species similarities australian
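The header parsing used in the cell above can be tried locally on a single record, without Spark. The following is a minimal sketch: the `record` string reuses the example article shown earlier, and `parse_record` is a hypothetical helper name introduced only for this illustration.

```python
import re

# Sample record in the WikiExtractor layout described above (abridged).
record = (
    '<doc id="12" url="https://en.wikipedia.org/wiki?curid=12" title="Anarchism">\n'
    'Anarchism\n'
    'Anarchism is a political philosophy that advocates (...)\n'
    '</doc>'
)

def parse_record(raw):
    """Return (doc_id, title, body) for one <doc>...</doc> record."""
    # Same string splits as the doc_ids / doc_titles RDDs.
    doc_id = raw.split('doc id="')[1].split('"')[0]
    title = raw.split('title="')[1].split('"')[0]
    # Drop the opening tag (non-greedy, so only the header is removed) and the closing tag.
    body = re.sub(r'<doc id=.+?>', '', raw, count=1)
    body = body.replace('</doc>', '').strip()
    return doc_id, title, body

doc_id, title, body = parse_record(record)
print(doc_id, title)
```

Note that the sketch uses a non-greedy pattern for the opening tag, which is a small variation on the regular expression in the notebook cell.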

## 2. TF-IDF

Next we compute the TF-IDF of the dataset, a feature vectorization technique used in text mining to represent the importance of a term to a document in a corpus. In the TF-IDF matrix built in this notebook, each row i represents a document, each column j represents a term, and each entry M(i, j) captures the importance of term j to document i. TF-IDF captures two intuitions: (i) the more often a term occurs in a document, the more important it is to that document, and (ii) a term that occurs in few documents of the corpus is more informative than a term that appears in many of them.
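Written out, the weighting computed in the subsections that follow can be summarized as below. This is a restatement of the notebook's code, not a formula taken from the book; the symbols are introduced here for illustration: $f_{t,d}$ is the number of occurrences of term $t$ in document $d$, $\mathrm{df}_t$ is the number of documents containing $t$, and $N$ is the number of documents.

```latex
\begin{equation*}
\mathrm{tfidf}(t, d) = \frac{f_{t,d}}{\sum_{t'} f_{t',d}} \cdot \log_{10}\frac{N}{\mathrm{df}_t}
\end{equation*}
```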

### 2.1. Term Frequency


Firstly, we count the occurrences of each term in each document. We end up with an RDD in which each entry is a dictionary of word occurrences for one article.

In [4]:
def count(text):
    # Map each distinct word in the token list to its number of occurrences
    # (single pass over the tokens).
    res_dict = {}
    for word in text:
        res_dict[word] = res_dict.get(word, 0) + 1
    return res_dict

#term freq
tf = doc_rdd.map(lambda x: count(x.split()))
tf.take(1)


[{'absent': 1,
  'accepted': 1,
  'activities': 1,
  'adult': 1,
  'aethiopicus': 3,
  'africa': 1,
  'age': 1,
  'alternate': 1,
  'altricial': 1,
  'american': 1,
  'animals': 1,
  'anxiety': 1,
  'aquatic': 1,
  'areas': 8,
  'asia': 1,
  'assessment': 1,
  'attains': 1,
  'august': 1,
  'australia': 5,
  'australian': 10,
  'authorities': 1,
  'bald': 1,
  'bankstown': 2,
  'bare': 1,
  'beak': 1,
  'behaviour': 1,
  'big': 1,
  'bills': 1,
  'bin': 2,
  'bins': 1,
  'bird': 6,
  'birds': 6,
  'birth': 1,
  'black': 6,
  'body': 2,
  'botanic': 1,
  'breeding': 6,
  'brisbane': 2,
  'brown': 1,
  'build': 1,
  'captive': 1,
  'centennial': 1,
  'central': 1,
  'chicken': 1,
  'chook': 1,
  'chromosome': 1,
  'city': 1,
  'clutch': 1,
  'coast': 4,
  'colloquial': 1,
  'colony': 2,
  'come': 1,
  'common': 2,
  'commonly': 2,
  'community': 2,
  'compared': 1,
  'comparison': 1,
  'competition': 1,
  'complex': 2,
  'comprehensive': 1,
  'consider': 1,
  'considered': 3,
  'control'

### 2.2. Document-Term Frequency

The chapter proposes two ways of computing the document-term frequency (for each term, the number of documents in which it appears): <br>
* <i>aggregate()</i>
* <i>reduce()</i>

In this notebook we use the reduce approach. The document-term frequencies are computed in a distributed manner using <i>reduceByKey()</i>:

In [7]:
#document term freq
doc_term_freq = tf.flatMap(lambda x: x.keys()).map(lambda word: (word, 1)).reduceByKey(add)
doc_term_freq.count()

723375

Next, we filter out the less frequent terms: the current number of terms is too high, and many of them are of little use because they appear only once in the entire corpus. We keep only the N most frequent terms.


In [7]:
#TAKE TOP 50000 TERMS
N = 50000
doc_term_freq = sc.parallelize(doc_term_freq.top(N, key=lambda x: x[1]))
doc_term_freq.cache()

ParallelCollectionRDD[14] at parallelize at PythonRDD.scala:475
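To make the two steps above concrete, here is a minimal local sketch of the same logic on a toy corpus. The data in `toy_tf` is hypothetical, and `Counter` and `heapq.nlargest` stand in for `reduceByKey(add)` and `RDD.top()`.

```python
from collections import Counter
import heapq

# Toy per-document term counts (hypothetical data, same shape as the `tf` RDD entries).
toy_tf = [
    {"ibis": 3, "bird": 2, "australia": 1},
    {"bird": 1, "wing": 4},
    {"australia": 2, "bird": 1, "sydney": 1},
]

# Document frequency: each document contributes 1 per distinct term,
# which is what flatMap(keys) -> map((word, 1)) -> reduceByKey(add) computes.
doc_freq = Counter()
for doc in toy_tf:
    doc_freq.update(doc.keys())

# Keep the N terms that occur in the most documents, like RDD.top(N, key=...).
top_terms = heapq.nlargest(2, doc_freq.items(), key=lambda kv: kv[1])
print(top_terms)
```

The within-document counts (for example, `"wing": 4`) do not matter here: only the number of documents containing each term is counted.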

### 2.3. Inverse-Document Frequency

The IDF captures how informative a term is across the whole corpus: if a word occurs many times in a document but also appears in most other documents, it is probably just a frequent word rather than an important one.

In [8]:
# float() forces true division, so the ratio is not truncated under Python 2.
idfs = doc_term_freq.map(lambda x: (x[0], math.log(float(num_docs) / x[1], 10)))
l_idfs = idfs.collect()
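As a quick sanity check of the formula, the sketch below evaluates the same base-10 IDF for a few document frequencies. The corpus size `n` and the helper name `idf` are assumptions made for this example only.

```python
import math

def idf(num_docs, doc_freq):
    """Base-10 inverse document frequency, as in the cell above."""
    return math.log10(float(num_docs) / doc_freq)

n = 1000  # hypothetical corpus size
for df in (1000, 100, 10, 1):
    # A term present in every document scores 0; rarer terms score higher.
    print(df, idf(n, df))
```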

To pass this data to MLlib, the dictionary keys cannot be strings. Therefore we map each term to a numeric ID:

In [9]:
term_ids = idfs.keys().zipWithIndex().collectAsMap()
inverse_term_ids = idfs.keys().zipWithIndex().map(lambda x : (x[1], x[0])).collectAsMap()

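# Broadcast variables keep a read-only copy cached on each machine rather than shipping it with every task.
# Note: reading .value here, on the driver, yields plain dicts; the functions below use those dicts directly.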
broadcasted_terms_ids = sc.broadcast(term_ids).value
reverse_broadcasted_ter_ids = sc.broadcast(inverse_term_ids).value

print(term_ids.get('functionalism'))
print(inverse_term_ids.get(43923))

43923
functionalism
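The mapping above can be pictured with a small local example. This is only a sketch: the `vocabulary` list is hypothetical, and a dictionary comprehension over `enumerate()` plays the role of `zipWithIndex().collectAsMap()`.

```python
# Hypothetical vocabulary; in the notebook it comes from idfs.keys().
vocabulary = ["bird", "australia", "ibis"]

# Same result as zipWithIndex().collectAsMap(): term -> position.
toy_term_ids = {term: i for i, term in enumerate(vocabulary)}
# Inverse lookup: position -> term.
toy_inverse_term_ids = {i: term for term, i in toy_term_ids.items()}

# A bag-of-words dict keyed by strings becomes one keyed by integer ids.
counts = {"ibis": 3, "bird": 2}
counts_by_id = {toy_term_ids[t]: c for t, c in counts.items()}
print(counts_by_id)
```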


### 2.4. Complete TF-IDF

We can now calculate the final TF-IDF RDD:

In [10]:
def get_tfidf(doc):
    entries = []
    # total number of term occurrences in the document
    total_num_terms = sum(doc.values())
    for term, term_count in doc.items():
        term_id = broadcasted_terms_ids.get(term)
        if term_id is None:
            # term was filtered out of the top-N vocabulary
            continue
        idf = l_idfs[term_id][1]
        # term tf-idf
        entries.append((term_id, idf * term_count / total_num_terms))

    return SparseVector(len(l_idfs), entries)
        

tf_idf = tf.map(get_tfidf)
tf_idf.take(1)

[SparseVector(50000, {0: 0.0013, 2: 0.0015, 3: 0.0039, 15: 0.0011, 19: 0.0011, 20: 0.0058, 22: 0.0012, 26: 0.0012, 27: 0.0036, 35: 0.0012, 38: 0.0013, 40: 0.0064, 45: 0.0013, 46: 0.0027, 51: 0.0013, 59: 0.0014, 64: 0.0014, 69: 0.0014, 73: 0.0014, 76: 0.0043, 81: 0.0014, 84: 0.0014, 92: 0.0029, 94: 0.0015, 100: 0.0015, 101: 0.0015, 104: 0.0015, 107: 0.0031, 112: 0.0015, 117: 0.0016, 121: 0.0016, 138: 0.0049, 141: 0.0016, 143: 0.0033, 145: 0.0017, 148: 0.01, 151: 0.0217, 152: 0.0017, 153: 0.0017, 159: 0.0017, 162: 0.0017, 175: 0.0017, 179: 0.0052, 180: 0.0139, 185: 0.007, 190: 0.0018, 193: 0.0106, 194: 0.0018, 196: 0.0018, 211: 0.0018, 214: 0.0018, 226: 0.0018, 240: 0.0018, 244: 0.0037, 246: 0.0018, 253: 0.0019, 256: 0.0037, 266: 0.0019, 269: 0.0019, 274: 0.0019, 287: 0.0019, 289: 0.0019, 304: 0.0019, 306: 0.0019, 321: 0.0039, 328: 0.002, 336: 0.002, 342: 0.002, 347: 0.002, 365: 0.002, 380: 0.002, 391: 0.0041, 417: 0.0042, 423: 0.0042, 433: 0.0021, 473: 0.0021, 475: 0.0021, 485: 0.0043, 

Additionally, we can query this structure: given a search term, we retrieve the 10 documents with the highest TF-IDF score for that term.

In [11]:
def query_tfidf(term):

    term_id = broadcasted_terms_ids.get(term)

    # extract the TF*IDF score for the term's id into
    # a new RDD for each document:
    term_relevance = tf_idf.map(lambda x: x[term_id])
    # zip with the document names so we can see which is which:
    zippedResults = term_relevance.zip(doc_titles)
    
    return zippedResults.top(10, lambda x: x[0])

start = time.time()
print(query_tfidf('university'))
print(str(time.time() - start) +'s')

[(0.36996278515018993, 'Lincoln University'), (0.36996278515018993, 'University of Leuven'), (0.36996278515018993, 'University Heights'), (0.29597022812015195, 'University Park'), (0.26906384374559267, 'University College'), (0.24664185676679329, 'Lingnan University'), (0.24664185676679329, 'University of New England'), (0.22197767109011396, 'Taiwan University System'), (0.22197767109011396, 'University System of Formosa'), (0.22197767109011396, 'Jiaotong University')]
399.71733713150024s


### 2.5. Spark TF-IDF

As an alternative to the previous code, Spark has a <a href=https://spark.apache.org/docs/2.0.2/mllib-feature-extraction.html>TF-IDF implementation</a> that relies on the hashing trick, in which each feature is mapped to an index by applying a hash function. This approach has two drawbacks: (i) the inverse transform (from indices to strings) cannot be computed, and (ii) there can be collisions (multiple terms mapped to the same index).
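The hashing trick and its collisions can be illustrated without Spark. The sketch below is not Spark's implementation: `hashed_index` and `hashed_tf` are hypothetical helpers, and CRC32 is used only as a convenient stand-in hash function.

```python
import zlib

def hashed_index(term, num_features):
    """Map a term to a bucket in [0, num_features). CRC32 is an illustrative stand-in hash."""
    return zlib.crc32(term.encode("utf-8")) % num_features

def hashed_tf(tokens, num_features):
    """Term-frequency vector as {bucket: count}; colliding terms share a bucket."""
    vec = {}
    for tok in tokens:
        idx = hashed_index(tok, num_features)
        vec[idx] = vec.get(idx, 0) + 1
    return vec

tokens = "australian white ibis australian white ibis wading bird".split()
# 5 distinct terms but only 4 buckets: at least one collision is guaranteed.
small = hashed_tf(tokens, num_features=4)
print(small)
```

Because only bucket indices are stored, there is no way to recover which term produced a given count, which is drawback (i) above.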

In [12]:
# Map each document (a list of terms) to its term frequencies using the hashing trick.
# HashingTF expects an iterable of terms per document, so we split each string into words.
hashingTF = HashingTF()
hashing_tf = hashingTF.transform(doc_rdd.map(lambda x: x.split()))
hashing_tf.cache()

# Compute the inverse document frequencies and rescale the hashed term frequencies.
hashing_idf = IDF().fit(hashing_tf)
hashing_tfidf = hashing_idf.transform(hashing_tf)

## 3. LSA (SVD)

SVD (<i>Singular Value Decomposition</i>) is a matrix decomposition algorithm. Given an m × n matrix M, the algorithm produces three matrices whose product is approximately equal to M:

\begin{equation*}
M \approx USV^T,
\end{equation*}

<p> where U is an m × k matrix in which each row represents a document and each column a concept, S is a k × k diagonal matrix in which each diagonal entry corresponds to the strength of a concept, and V^T is a k × n matrix in which each column represents a term and each row a concept.
In the text mining context, applying SVD in this way is called LSA (Latent Semantic Analysis).
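The shapes involved can be checked on a toy matrix with numpy. This is a local sketch under stated assumptions: the matrix values are hypothetical, and `np.linalg.svd` is used here in place of the distributed computation performed later in the notebook.

```python
import numpy as np

# Toy document-term matrix: 4 documents (rows) x 5 terms (columns), hypothetical values.
M = np.array([
    [2.0, 1.0, 0.0, 0.0, 0.0],
    [1.0, 2.0, 0.0, 0.0, 1.0],
    [0.0, 0.0, 3.0, 1.0, 0.0],
    [0.0, 0.0, 1.0, 2.0, 0.0],
])

# Thin SVD: U is 4x4, s holds 4 singular values (descending), Vt is 4x5.
U_full, s_full, Vt_full = np.linalg.svd(M, full_matrices=False)

k = 2  # number of concepts to keep
U_k, S_k, Vt_k = U_full[:, :k], np.diag(s_full[:k]), Vt_full[:k, :]

M_k = U_k @ S_k @ Vt_k  # rank-k approximation of M
full_error = np.linalg.norm(M - U_full @ np.diag(s_full) @ Vt_full)
rank_k_error = np.linalg.norm(M - M_k)
print(U_k.shape, S_k.shape, Vt_k.shape, rank_k_error)
```

Keeping all singular values reproduces M up to rounding, while keeping only k of them gives an approximation whose error comes from the discarded singular values.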

At the time of writing, PySpark does not expose the computeSVD method, which is available only in the Scala and Java APIs. We use the wrapper proposed <a href=https://stackoverflow.com/a/33500704/3415409>here</a>.

In [13]:
class SVD(JavaModelWrapper):
    """Wrapper around the SVD scala case class"""
    @property
    def U(self):
        """ Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
        u = self.call("U")
        if u is not None:
            return RowMatrix(u)

    @property
    def s(self):
        """Returns a DenseVector with singular values in descending order."""
        return self.call("s")

    @property
    def V(self):
        """ Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
        return self.call("V")
    
def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
    """
    Computes the singular value decomposition of the RowMatrix.
    The given row matrix A of dimension (m X n) is decomposed into U * s * V' where
    * s: DenseVector consisting of the square roots of the eigenvalues (singular values) in descending order.
    * U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
    * V: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
    :param k: number of singular values to keep. We might return less than k if there are numerically zero singular values.
    :param computeU: Whether or not to compute U. If set to be True, then U is computed by A * V * sigma^-1
    :param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
    :returns: SVD object
    """
    java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
    return SVD(java_model)

The code below creates a RowMatrix from the TF-IDF matrix and passes it to the computeSVD method.

In [14]:
start = time.time()
# 100 ?
NUM_CONCEPTS = 50

tf_idf.cache()
mat = RowMatrix(tf_idf)

svd = computeSVD(mat, NUM_CONCEPTS, True)

print(str(time.time() - start) + 's')

2293.676204919815s


We can obtain each of the three matrices. Note that s and V are stored locally in memory, while U is a distributed matrix.

In [15]:
u = svd.U # rows.collect()
s = svd.s
v = svd.V

print(type(u))
print(type(s))
print(type(v))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>
<class 'pyspark.mllib.linalg.DenseVector'>
<class 'pyspark.mllib.linalg.DenseMatrix'>


In the rest of this notebook, we perform distributed computations using MLlib and the equivalent local computations using numpy arrays. The code below converts the matrices to the formats used in later cells.

In [16]:
# distributed
S_distributed =  sc.parallelize(np.diag(s.toArray())).zipWithIndex()
V_distributed =  sc.parallelize(v.toArray()).zipWithIndex()
S_distributed = IndexedRowMatrix( \
    S_distributed \
    .map(lambda row: IndexedRow(row[1], row[0])) \
    ).toBlockMatrix()


V_distributed = IndexedRowMatrix( \
    V_distributed \
    .map(lambda row: IndexedRow(row[1], row[0])) \
    ).toBlockMatrix()


SV_distributed = V_distributed.multiply(S_distributed)

# local
V = v.toArray()
S = np.diag(s.toArray())

SV = np.dot(V, S)
SV_normalized = normalize(SV, 'l2')

aux = u.rows.map(lambda row: row.toArray())
U = np.array(np.array(aux.collect()))

US = np.dot(U, S)
US_normalized = normalize(US, 'l2')


## 4. Finding Important Concepts

We can inspect the terms and documents most relevant to each concept.

* <b>top_terms_in_concepts(N)</b>: Returns the most relevant terms for the first N concepts. As the V matrix is local, these computations are performed locally.
* <b>top_docs_in_concepts(N)</b>: Returns the most relevant documents for the first N concepts. As the U matrix (which holds the relationship between documents and concepts) is distributed, these computations are performed in a distributed manner.

In [17]:
# local
def top_terms_in_concepts(num_results):
    top_terms = []

    for compNum in range(num_results):
        comp = V.T[compNum]
        # Sort the term weights of this concept and get the indices
        indices = np.argsort(comp).tolist()
        # Reverse the indices, so we have the largest weights first.
        indices.reverse()
        # Keep the same number of terms and weights (previously hard-coded to 10 for the weights)
        top = indices[:num_results]
        terms = [reverse_broadcasted_ter_ids.get(i) for i in top]
        weights = comp[top]
        top_terms.append(list(zip(terms, weights)))

    return top_terms

# distributed
def top_docs_in_concepts(num_results):
    top_docs = []
    # Document-concept weights: one array per document (row of U)
    doc_weights = u.rows.map(lambda i : i.toArray())
    for num in range(num_results):
        doc_weights_for_concept = doc_weights.map(lambda i : i[num]).zip(doc_titles)
        top_docs_concept = doc_weights_for_concept.top(num_results, lambda i : i[0])
        top_docs.append(top_docs_concept)

    return top_docs
    

In [18]:
# num results
N = 10
top_concept_terms = top_terms_in_concepts(N)
top_concept_docs = top_docs_in_concepts(N)

for i in range(N):
    print('Concept terms: ')
    print(top_concept_terms[i])
    print('\nConcept Docs')
    print(top_concept_docs[i])
    print('\n------------------------------------')

Concept terms: 
[('countout', -1.4197141405214752e-07), ('chromed', -1.5358158327644816e-07), ('rearm', -1.5507924207199048e-07), ('teleporting', -1.6619557592672089e-07), ('unmask', -1.7496211247549191e-07), ('teleported', -1.788225940199725e-07), ('starches', -1.7981636203898323e-07), ('armband', -1.9541219844880322e-07), ('concussive', -1.9599618682463553e-07), ('reassessed', -1.9968197834944072e-07)]

Concept Docs
[(0.0, 'Omoikane'), (0.0, 'Naxi'), (0.0, 'Tojo'), (0.0, 'Nikolaes Heinsius'), (0.0, 'Morus'), (0.0, 'UWC'), (0.0, '2060'), (0.0, 'Uniontown'), (0.0, 'Phylactery'), (0.0, 'HSP')]

------------------------------------
Concept terms: 
[('rowspan', 0.047212926855510304), ('senators', 0.0031371715317751723), ('senator', 0.0017092443780995603), ('class', 0.0014898894365866476), ('die', 0.0011524894199377266), ('elects', 0.00037413143165120278), ('republicans', 0.0002520801338609358), ('admitted', 0.0001888174181935986), ('recent', 0.000166364552987889), ('secession', 0.00011055

## 5. Term-Term Relevance

LSA interprets the relationship between two terms as the cosine similarity between the columns corresponding to these terms in the reconstructed low-rank matrix. This representation is also more useful than the original one because (i) it accounts for synonymy, (ii) it accounts for polysemy, and (iii) it removes noise.

The cosine similarity between two columns of the reconstructed matrix is equal to the cosine similarity between the corresponding columns of the SV^T matrix (that is, the rows of VS). Therefore, to find the terms most relevant to a given term, we normalize the rows of the VS matrix (called SV in the code) and multiply it by the row that corresponds to the term. Each entry of the resulting array is the cosine similarity between that term and one of the terms in the vocabulary.
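The equality claimed above can be verified numerically on a small random matrix. This is a sketch under stated assumptions: the matrix is random toy data, `cosine_matrix` is a hypothetical helper, and `np.linalg.svd` replaces the distributed SVD.

```python
import numpy as np

def cosine_matrix(rows):
    """Pairwise cosine similarities between the rows of a 2-D array."""
    unit = rows / np.linalg.norm(rows, axis=1, keepdims=True)
    return unit @ unit.T

rng = np.random.RandomState(0)
M = rng.rand(6, 5)  # hypothetical 6 documents x 5 terms
U, s, Vt = np.linalg.svd(M, full_matrices=False)

k = 3
U_k, S_k, V_k = U[:, :k], np.diag(s[:k]), Vt[:k, :].T  # V_k is terms x concepts
M_k = U_k @ S_k @ V_k.T  # rank-k reconstruction

sim_from_reconstruction = cosine_matrix(M_k.T)  # term columns of M_k
sim_from_VS = cosine_matrix(V_k @ S_k)          # rows of V S (columns of S V^T)
print(np.allclose(sim_from_reconstruction, sim_from_VS))
```

The two results agree because the columns of U are orthonormal, so multiplying by U changes neither the lengths of the term vectors nor the angles between them.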

### 5.1.  Local Implementation

We need two structures: the normalized SV matrix and the row of that matrix which corresponds to the query term. Multiplying them yields an array with the cosine similarities between the query term and every term in the vocabulary.

We implement two functions that do the same job in different ways: the first reads the term's row directly, while the second rebuilds it from an IDF-weighted one-hot vector.

In [19]:

def relevant_terms(term):
    # dict.get() returns None (it does not raise) when the term is unknown.
    index = broadcasted_terms_ids.get(term)
    if index is None:
        print("Term doesn't exist")
        return

    cosine_sim = np.dot(SV_normalized, SV_normalized[index])
    indeces = np.argsort(cosine_sim).tolist()
    indeces.reverse()

    terms = []
    for i in indeces[:10]:
        related_term = inverse_term_ids.get(i)
        terms.append(related_term)
    return list(zip(terms, cosine_sim[indeces[:10]]))
    

    
def relevant_terms2(term): 
    # dict.get() returns None (it does not raise) when the term is unknown.
    index = broadcasted_terms_ids.get(term)
    if index is None:
        print('Term does not exist')
        return
    
    #vector with one non-zero entry (term idf)
    term_vector = np.zeros((len(l_idfs),))
    term_vector[index] = l_idfs[index][1]
    
    #get original row from V
    row = np.dot(term_vector, V)
    #get row multiplied by S
    row = np.dot(row, S)

    row = normalize([row], 'l2') 
    cosine_sim = np.dot(SV_normalized, row.reshape(NUM_CONCEPTS,))
    
    indeces = np.argsort(cosine_sim).tolist()
    indeces.reverse()
    
    terms = []
    for i in indeces[:10]:
        related_term = inverse_term_ids.get(i) 
        terms.append(related_term)
        
    return(list(zip(terms, cosine_sim[indeces])))

print('Method 1:')
print(relevant_terms('obama'))
print('-----------')
print('Method 2:')
print(relevant_terms2('obama'))


Method 1:
[('obama', 0.99999999999999978), ('nativist', 0.99908273401611714), ('oversight', 0.99890863500119087), ('medicaid', 0.99886963690024222), ('hillary', 0.99879282507608658), ('vouchers', 0.99877638001720426), ('barack', 0.99877066093904077), ('allegiances', 0.99876433677028809), ('watchdog', 0.99872863379154408), ('congresswoman', 0.99868737406575148)]
-----------
Method 2:
[('obama', 0.99999999999999989), ('nativist', 0.99908273401611736), ('oversight', 0.99890863500119131), ('medicaid', 0.99886963690024244), ('hillary', 0.9987928250760868), ('vouchers', 0.99877638001720459), ('barack', 0.99877066093904099), ('allegiances', 0.99876433677028842), ('watchdog', 0.99872863379154431), ('congresswoman', 0.9986873740657517)]


### 5.2. Distributed Implementation

This section exists only as an example to show that we can find the most relevant terms for a term in a distributed manner, using different MLlib matrix types. MLlib provides two general kinds of matrices: local and distributed. There are two implementations of local matrices (DenseMatrix and SparseMatrix) and four implementations of distributed matrices (CoordinateMatrix, IndexedRowMatrix, RowMatrix and BlockMatrix). Of these, the only one that supports multiplication by another distributed matrix is the BlockMatrix. <br>

Therefore, to perform these computations in a distributed manner, we have to carry out the following steps:<br>
* Find the index of the term passed as an argument;
* Normalize the SV matrix. This is a problem because we need SV to be a BlockMatrix (the only type that allows this multiplication), and a BlockMatrix has no "rows" attribute (only a "blocks" attribute) through which we could access and modify the matrix rows in a distributed manner. Therefore, we need to:
    * Convert this matrix into an IndexedRowMatrix (a collection of IndexedRows, where each IndexedRow is a tuple with the row index and a vector);
    * Normalize this matrix; the result is an RDD;
    * Transform the result back into a BlockMatrix;
* Find the term array (i.e., the row of the SV matrix that corresponds to the term and holds the weight of each concept for that term);
* Transform the array into a BlockMatrix;
* Compute the cosine similarity of this term with all other terms by multiplying the normalized SV matrix by the term array. The result is a BlockMatrix;
* Transform the cosine similarity BlockMatrix into an IndexedRowMatrix in order to access the rows;
* Sort the values and return the N most similar.

<br>
This is a substantial workaround, and therefore the rest of this notebook is implemented to run locally.

In [28]:
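def normalize_distributed_matrix(row):
    index = row.index
    # sklearn's normalize() expects a 2-D array, so treat the row vector as a 1 x k matrix.
    vector = row.vector.toArray().reshape(1, -1)
    v = DenseVector(normalize(vector, 'l2').reshape(NUM_CONCEPTS,))
    return (v, index)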


def relevant_terms_distributed(term, N):
    
    # dict.get() returns None (it does not raise) when the term is unknown.
    index = broadcasted_terms_ids.get(term)
    if index is None:
        print('Term does not exist')
        return
    
    # normalize SV
    SV_rdd = SV_distributed.toIndexedRowMatrix().rows.map(normalize_distributed_matrix).sortBy(lambda x: x[1])
    
    #transform SV back to a BlockMatrix
    SV_normalized = IndexedRowMatrix( \
        SV_rdd \
        .map(lambda row:  IndexedRow(row[1], row[0])) \
        ).toBlockMatrix()
    
    # get term Array (index term)
    term_array = SV_normalized.toIndexedRowMatrix().rows.filter(lambda r: r.index == index).\
        map(lambda x: x.vector).first()
   
    # transform term array to a Block Matrix
    term_array = sc.parallelize([term_array.toArray()])

    term_array = term_array.zipWithIndex()

    term_array = IndexedRowMatrix(term_array \
            .map(lambda row: IndexedRow(row[1], row[0]))).toBlockMatrix()
    # Multiply the normalized SV matrix by the term array
    cosine_sim = SV_normalized.multiply(term_array.transpose())
    a = cosine_sim.toIndexedRowMatrix().rows.top(N, lambda x: x.vector[0])
    
    results = []
    for ele in a:
        results.append((inverse_term_ids.get(ele.index), ele.vector[0]))
    return results

relevant_terms_distributed('obama', 10)
    

[('obama', 0.99999999999999956),
 ('nativist', 0.99908273401611747),
 ('oversight', 0.9989086350011912),
 ('medicaid', 0.99886963690024222),
 ('hillary', 0.99879282507608647),
 ('vouchers', 0.99877638001720448),
 ('barack', 0.99877066093904088),
 ('allegiances', 0.99876433677028809),
 ('watchdog', 0.99872863379154408),
 ('congresswoman', 0.99868737406575148)]

## 6. Document-Document Relevance

We can use the technique from the previous section to compute document-document relevance. Let u1 be the feature vector of document 1 (its row in US). To find the similarity between this document and all the others, we compute (US) × u1, where US has been row-normalized. The remaining computations are the same as in section 5.1.
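The operation described above can be sketched on a few hand-written vectors. The values in `US_toy` are hypothetical document-concept weights, and `top_k_similar` is a helper name introduced only for this illustration.

```python
import numpy as np

def top_k_similar(vectors, index, k):
    """Indices and cosine similarities of the k rows most similar to row `index`."""
    unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    sims = unit @ unit[index]
    order = np.argsort(sims)[::-1][:k]  # descending similarity
    return list(zip(order.tolist(), sims[order].tolist()))

# Hypothetical document-concept weights (rows of U S) for four documents.
US_toy = np.array([
    [1.0, 0.1],
    [0.9, 0.2],
    [0.0, 1.0],
    [-1.0, 0.0],
])
result = top_k_similar(US_toy, index=0, k=2)
print(result)
```

As in the notebook's output, the query document itself comes first with a similarity of 1 (up to rounding), followed by the document whose concept weights point in the most similar direction.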

In [29]:
def relevant_docs(doc):
    
    try:
        index = np.where(np_titles == doc)[0][0]
        #print(index)
    except IndexError:
        print('No such document')
        return
    
    cosine_sim = np.dot(US_normalized, US_normalized[index])

    indeces = np.argsort(cosine_sim).tolist()
    indeces.reverse()
    
    print(list(zip(np_titles[indeces[:10]], cosine_sim[indeces])))

In [30]:
np_titles = np.array(doc_titles.collect())
relevant_docs('Modulation (music)')

[('Modulation (music)', 1.0000000000000002), ('Dynamic range compression', 0.99411443174973124), ('Mystic chord', 0.99374833542782792), ('Thirteenth', 0.99281379406060255), ('Gudok', 0.99265466923047119), ('Semitone', 0.99262905480767816), ('Secondary dominant', 0.99226633118166496), ('Jostein Hasselgård', 0.99165677191222212), ('Ring modulation', 0.99147842907840711), ('Enrique Iglesias', 0.99140837634422774)]


## 7. Term-Document Relevance

As in the previous sections, if v1 is the feature vector of a term (its row in V), we obtain the relevance of every document to this term by computing (US) × v1.
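One way to read this product is that it recovers the term's column in the low-rank reconstruction of the matrix. The sketch below checks this on random toy data; the matrix, the value of `k` and the chosen `term` index are all assumptions made for the example.

```python
import numpy as np

rng = np.random.RandomState(1)
M = rng.rand(5, 4)  # hypothetical 5 documents x 4 terms
U_full, s_full, Vt_full = np.linalg.svd(M, full_matrices=False)

k = 2
U_k, S_k, V_k = U_full[:, :k], np.diag(s_full[:k]), Vt_full[:k, :].T
M_k = U_k @ S_k @ V_k.T  # rank-k reconstruction

term = 3  # index of the query term
scores = U_k @ (S_k @ V_k[term])  # one score per document
best_doc = int(np.argmax(scores))
print(np.allclose(scores, M_k[:, term]), best_doc)
```

These scores are unnormalized, so they reflect the reconstructed weight of the term in each document rather than a cosine similarity.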

In [31]:
def topDocsForTerm(term):
    
    # dict.get() returns None (it does not raise) when the term is unknown.
    index = broadcasted_terms_ids.get(term)
    if index is None:
        print("Term doesn't exist")
        return
    term_row = V[index]
    
    cosine_sim = np.dot(U, np.dot(S, term_row))
    indeces = np.argsort(cosine_sim).tolist()
    indeces.reverse()
    
    return list(zip(np_titles[indeces[:10]], cosine_sim[indeces]))

In [32]:
print(topDocsForTerm('obama'))
print('----')
print(topDocsForTerm('canada'))

[('Partial', 0.00079606926244068794), ('Republican Party', 0.00065511300383191127), ('Unionist Party', 0.00064655768542568244), ('Progressive Party', 0.00064260817895718362), ('Union Party', 0.00064004578222817091), ('Liberty Party', 0.00061416126895609167), ('Freedom Party', 0.00060772873133703182), ('New Party', 0.00060357535639102679), ('American Party', 0.0006018677331747779), ('Democratic Socialist Party', 0.00059126595399951448)]
----
[('1899 in Canada', 0.51613812534439163), ('1890 in Canada', 0.51613812534439163), ('1882 in Canada', 0.51613812534439163), ('1884 in Canada', 0.51613812534439163), ('1977 in Canada', 0.51613812534439163), ('1944 in Canada', 0.51613812534439163), ('1933 in Canada', 0.51613812534439163), ('1894 in Canada', 0.51613812534439163), ('1956 in Canada', 0.51613812534439163), ('1891 in Canada', 0.51613812534439163)]


## 8. Multiple-Term Queries

Another way to find the relevance of one term to all the others is to build an array, set the value at the index of that term to 1 and multiply that array by V. We can extend this method to handle multiple terms: the only change is that we set one entry per query term (here, two). To preserve the relative weights of the terms, the value set at each term's index is not 1 but the IDF value of that term.
Then, to compute the document scores, we multiply the resulting array by S and then by U.
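Because every step is a matrix product, a multi-term query scores documents exactly as the sum of its IDF-weighted single-term queries. The sketch below checks this on random toy data; the matrix, the `idf` weights and the `query_scores` helper are hypothetical and introduced only for this illustration.

```python
import numpy as np

rng = np.random.RandomState(2)
M = rng.rand(6, 5)  # hypothetical 6 documents x 5 terms
U, s, Vt = np.linalg.svd(M, full_matrices=False)
k = 3
U_k, S_k, V_k = U[:, :k], np.diag(s[:k]), Vt[:k, :].T

idf = np.array([0.3, 1.2, 0.7, 2.0, 0.5])  # hypothetical IDF weight per term

def query_scores(term_indices):
    """Score every document against a query made of the given term indices."""
    q = np.zeros(V_k.shape[0])
    q[term_indices] = idf[term_indices]  # IDF weight at each query term
    concept_vec = S_k @ (q @ V_k)        # project the query into concept space
    return U_k @ concept_vec

both = query_scores([1, 3])
separate = query_scores([1]) + query_scores([3])
print(np.allclose(both, separate))
```

A consequence of this linearity is that a term with a large IDF weight can dominate the ranking of a two-term query.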

In [33]:
def topDocsFromTermQuery(term1, term2):
    # dict.get() returns None (it does not raise) when a term is unknown.
    index1 = broadcasted_terms_ids.get(term1)
    if index1 is None:
        print("Term 1 doesn't exist")
        return

    index2 = broadcasted_terms_ids.get(term2)
    if index2 is None:
        print('Term 2 does not exist')
        return

    # vector with two non-zero entries (the IDF of each term)
    term_vector = np.zeros((len(l_idfs),))
    term_vector[index1] = l_idfs[index1][1]
    term_vector[index2] = l_idfs[index2][1]

    # project the query into concept space
    row = np.dot(term_vector, V)

    # scale by the singular values
    row = np.dot(S, row)

    # score every document against the query
    cosine_sim = np.dot(U, row)

    indices = np.argsort(cosine_sim).tolist()
    indices.reverse()

    return list(zip(np_titles[indices[:10]], cosine_sim[indices]))
    

In [34]:
print(topDocsFromTermQuery('obama', 'states'))
print('-----')
print(topDocsFromTermQuery('factorization', 'matrix'))

[('USS', 0.038413293745060272), ('CDP', 0.02713847552955647), ('USS Adirondack', 0.021318501488897919), ('USS Randolph', 0.020609249300116855), ('USS Intrepid', 0.020413701949279266), ('USS Eel', 0.020282411561892772), ('USS Norfolk', 0.020177801124196849), ('USS Pueblo', 0.020109936796160504), ('USS Raleigh', 0.020083393053348698), ('USS Savannah', 0.020065702228895768)]
-----
[('Bilinear', 0.024044916207816316), ('Bilinear transformation', 0.012579352824499916), ('Denham', 0.0054376893418000144), ('Partial', 0.0043001705367694629), ('Republican Party', 0.0035980139092924826), ('Unionist Party', 0.0035364045740243722), ('Union Party', 0.0035311054704769613), ('John Denham', 0.0035296391200278182), ('Progressive Party', 0.0035236510674512326), ('Liberty Party', 0.0033634673320921031)]
