In [14]:
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer
from nltk import download, sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from pyspark.sql import SparkSession
import math


In [4]:
import pyspark

print("PySpark version:", pyspark.__version__)

PySpark version: 3.2.1


In [5]:
spark = SparkSession.builder.appName("Wiki_Parser").getOrCreate()
sc = spark.sparkContext    # to read input files in RDD


In [8]:
# parse (analyze) the header of Wikipedia article

def parseHeader(line):
    try:
        s = line[line.index("id=\"") + 4:]
        article_id = s[:s.index("\"")]
        s = s[s.index("url=\"") + 5:]
        url = s[:s.index("\"")]
        s = s[s.index("title=\"") + 7:]
        title = s[:s.index("\"")]
        return article_id, url, title
    except Exception as e:
        return "", "", ""

# parse Wikipedia article
def parse(lines):
    docs = []
    title = ""
    content = ""
    for line in lines:
        try:
            if line.startswith("<doc "):
                title = parseHeader(line)[2]
                content = ""
            elif line.startswith("</doc>"):
                if title and content:
                    docs.append((title, content))
            else:
                content += line + "\n"
        except Exception as e:
            content = ""
    return docs


In [6]:
sampleSize = 0.01  # 1 percent of available files. change to 1.0 for full experiment
numTerms = 5000    # change to 50000 for full experiment
k = 250            # number of latent concepts in the reduced matrix


In [7]:
textFiles = sc.wholeTextFiles("../Data/enwiki-articles/*/*").sample(False, sampleSize)
numFiles = textFiles.count()

print("Number of files:", numFiles) 


[Stage 0:>                                                          (0 + 2) / 2]

Number of files: 9




In [10]:
# to parse and flatMap the text.

def parse_flatMap(uri_text):
    uri, text = uri_text
    return parse(text.split("\n"))

plainText = textFiles.flatMap(parse_flatMap)  # Assuming parse is a function defined elsewhere

plainText.cache()
numDocs = plainText.count()
bNumDocs = sc.broadcast(numDocs)

# to check if a string contains only letters?

def isOnlyLetters(s):
    return s.isalpha()

# read stopwords and broadcast it.

stw = set(sc.textFile("../Data/stopwords.txt").collect())
bStopWords = sc.broadcast(stw)

                                                                                

In [11]:
# Download necessary resources for NLTK
download('punkt')
download('averaged_perceptron_tagger')
download('wordnet')

stop_words_set = set(bStopWords.value)

def createNLPPipeline():
    return WordNetLemmatizer()

def plainTextToLemmas(text, pipeline):
    lemmatizer = WordNetLemmatizer()

    sentences = sent_tokenize(text)            # tokenization

    lemmas = []
    for sentence in sentences:
        tokens = word_tokenize(sentence)              # word tokenization
        for token in tokens:
            lemma = lemmatizer.lemmatize(token.lower())    # lemmatization
            
            #  including lemma?
            if len(lemma) > 2 and lemma not in stop_words_set and lemma.isalpha():
                lemmas.append(lemma)

    return lemmas


[nltk_data] Downloading package punkt to /home/amir/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/amir/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package wordnet to /home/amir/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [12]:
# to perform lemmatization
def lemmatize_text(title_contents):
    pipeline = createNLPPipeline()
    title, contents = title_contents
    return (title, plainTextToLemmas(contents, pipeline))


# register lemmatization on each partition
lemmatized = plainText.mapPartitions(lambda it: map(lemmatize_text, it))

# calculate term frequencies
def calculate_term_freqs(title_terms):
    title, terms = title_terms
    termFreqs = {}
    for term in terms:
        termFreqs[term] = termFreqs.get(term, 0) + 1
    return (title, termFreqs)

docTermFreqs = lemmatized.map(calculate_term_freqs)

# RDD
docTermFreqs.cache()
num_docs = docTermFreqs.count()
print(num_docs)




283


                                                                                

In [15]:
# bring out document IDs and zip with unique IDs
docIds = docTermFreqs.map(lambda x: x[0]).zipWithUniqueId().map(lambda x: (x[1], x[0])).collectAsMap()

#  document frequencies
docFreqs = docTermFreqs.flatMap(lambda x: x[1].keys()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y, numPartitions = 24)

ordering = lambda x: x[1]
topDocFreqs = docFreqs.top(numTerms, key = ordering) # sort by frequency

# IDFs
idfs = {term: math.log(num_docs / count) for term, count in topDocFreqs}
idTerms = dict(zip(idfs.keys(), range(len(idfs)))) # term IDs

# reverse mapping of term IDs
termIds = {v: k for k, v in idTerms.items()}
bIdfs = sc.broadcast(idfs).value        # broadcast IDF 
bIdTerms = sc.broadcast(idTerms).value  # broadcast term mappings 


                                                                                

In [16]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import SingularValueDecomposition
from typing import List, Tuple
from pyspark.mllib.linalg import Vectors, Matrices
from scipy.sparse import csr_matrix
import numpy as np


In [17]:
# this part maps term frequencies to Vectors.

vecs = docTermFreqs.map(lambda x: x[1]).map(lambda termFreqs: 
                                              Vectors.sparse(len(bIdTerms), 
                                                             [(bIdTerms[term], bIdfs[term] * termFreqs[term] / sum(termFreqs.values())) 
                                                              for term in termFreqs.keys() 
                                                              if term in bIdTerms]))

vecs.cache()
vecs.count()

mat = RowMatrix(vecs)                     # RowMatrix from vecs
svd = mat.computeSVD(k, computeU = True)  # to compute SVD


24/02/14 18:26:44 WARN RowMatrix: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
24/02/14 18:31:15 WARN RowMatrix: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.


## Query the Latent Semantic Index

In [18]:
def topTermsInTopConcepts(svd: SingularValueDecomposition, numConcepts: int, numTerms: int) -> List[List[Tuple[str, float]]]:
    v = svd.V
    topTerms = []
    arr = v.toArray()
    
    for i in range(numConcepts):
        
        offs = i * v.numRows
                
        termWeights = [(arr[offs + j], j) for j in range(v.numRows) if offs + j < len(arr)]
        
        # Correct the sorting to consider the actual score of each term
        sorted_terms = sorted(termWeights, key = lambda x: -x[0][0])
        
        topTerms.append([(bIdTerms.get(idx, ("", -1))[0], score[0]) for score, idx in sorted_terms[:numTerms]])
    
    return topTerms

def topDocsInTopConcepts(svd: SingularValueDecomposition, numConcepts: int, numDocs: int) -> List[List[Tuple[str, float]]]:
    u = svd.U
    topDocs = []
    
    for i in range(numConcepts):
        docWeights = [(score, doc_id) for score, doc_id in zip(u.rows.map(lambda row: row.toArray()[i]).collect(), range(u.numRows()))]
        sorted_docs = sorted(docWeights, key = lambda x: -x[0])
        topDocs.append([(docIds.get(idx, ""), score) for score, idx in sorted_docs[:numDocs]])
    
    return topDocs


top_concept_terms = topTermsInTopConcepts(svd, 12, 12)
top_concept_docs = topDocsInTopConcepts(svd, 12, 12)

for terms, docs in zip(top_concept_terms, top_concept_docs):
    print("Concept terms: " + ", ".join([term for term, _ in terms]))
    print("Concept docs: " + ", ".join([doc for doc, _ in docs]))
    print()


                                                                                

Concept terms: , , , , , , , , , , , 
Concept docs: Red blood cell, , Beastie Boys, Bluetooth, Skyscraper, Bourbon, , Jellyfish, Desegregation, Holy Roman Emperor, Spinel, Biotin

Concept terms: 
Concept docs: Beastie Boys, British Isles, Alkanna tinctoria, Māori language, Polymer, Transphobia, Pope Liberius, Anzac biscuit, Bhangra (music), Berlin Wall, , Carl Lewis

Concept terms: 
Concept docs: Bluetooth, Red blood cell, , Skyscraper, Bourbon, , Jellyfish, Carl Lewis, Sonja Henie, Martian meteorite, Andaman Sea, Salvation

Concept terms: 
Concept docs: Red blood cell, , Skyscraper, Bourbon, , Jellyfish, Desegregation, Holy Roman Emperor, Spinel, Biotin, Apollonius of Tyana, Sonja Henie

Concept terms: 
Concept docs: Holy Roman Emperor, Pulitzer Prize for History, Income tax, Philosophy of religion, Preliminary hearing, Black Holes and Baby Universes and Other Essays, Panama, Louis de Broglie, William McGonagall, Ritual Entertainment, Battle of Stoke Field, Wyoming

Concept terms: 
Co

##  Keyword Queries

In [19]:

def termsToQueryVector(terms, idTerms, idfs):
    indices = [idTerms[term] for term in terms]
    values = [idfs[term] for term in terms]
    return csr_matrix((values, (indices, [0]*len(indices))), shape=(len(idTerms), 1))

def topDocsForTermQuery(US, V, query):
    term_row_arr = np.dot(V.toArray().T, query.toarray()).flatten()
    term_row_vec = Matrices.dense(len(term_row_arr), 1, term_row_arr)
    doc_scores = US.multiply(term_row_vec)
    all_doc_weights = doc_scores.rows.zipWithUniqueId().map(lambda x: (x[0].toArray()[0], x[1]))
    return sorted(all_doc_weights.collect(), key=lambda x: -x[0])[:10]

def multiplyByDiagonalRowMatrix(mat, diag):
    s_arr = diag.toArray()
    return RowMatrix(mat.rows.map(lambda vec: Vectors.dense(np.multiply(vec.toArray(), s_arr))))

US = multiplyByDiagonalRowMatrix(svd.U, svd.s)

terms = ["serious", "incident"]
queryVec = termsToQueryVector(terms, idTerms, idfs)
topDocsForTermQuery(US, svd.V, queryVec)


                                                                                

[(0.09522831958479537, 94),
 (0.017633439482589663, 207),
 (0.013101517516039005, 64),
 (0.01198477444436551, 209),
 (0.01160014620839545, 55),
 (0.010327259323650608, 271),
 (0.007201631651629089, 305),
 (0.006516759976356174, 4),
 (0.0058830638907972806, 37),
 (0.005283032294726741, 24)]

In [21]:
# dimensions of matrix US
num_rows_US = US.numRows()
num_cols_US = US.numCols()

# dimensions of matrix V
num_rows_V = svd.V.numRows
num_cols_V = svd.V.numCols

print("Dimensions of matrix US:")
print(f"Number of rows: {num_rows_US}")
print(f"Number of columns: {num_cols_US}")

print("\nDimensions of matrix V:")
print(f"Number of rows: {num_rows_V}")
print(f"Number of columns: {num_cols_V}")


Dimensions of matrix US:
Number of rows: 283
Number of columns: 250

Dimensions of matrix V:
Number of rows: 5000
Number of columns: 250
