In [2]:
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import *
from pyspark.sql.functions import split, col, struct, udf
from pyspark.sql import Row
import sys
#import google_compute_engine
import gensim
from gensim import corpora,models,similarities
from gensim.matutils import softcossim 
from gensim.utils import simple_preprocess
import numpy as np
from scipy.sparse import csr_matrix

## load Spark

In [None]:
# Build (or reuse) the Spark session for this notebook.
# The builder's getOrCreate() keeps this cell safe to re-run in a live kernel;
# constructing SparkContext directly fails when a context is already active.
conf = SparkConf().setAppName("Final")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

 ## load word embeddings model

In [None]:
# Register the embeddings file with Spark, resolve its SparkFiles path,
# and load it as word2vec-format KeyedVectors.
sc.addFile("gs://wiki_final/subword.vec")
embeddings_path = SparkFiles.get("subword.vec")
model = gensim.models.KeyedVectors.load_word2vec_format(embeddings_path)

## load wiki dataset

In [None]:
# Read the Wikipedia dump through the 'xml' data source, one row per <page>.
# Alternative input kept for reference:
#xml = spark.read.format('xml').options(rowTag="page").load('gs://wiki_final/big_data.xml.bz2')
xml = (
    spark.read
    .format('xml')
    .options(rowTag="page")
    .load('gs://wiki_final/Wikipedia-test-SUBSET.xml')
)

## pre process data

In [None]:
#function used in map function
def getText(row):
    """Convert one page row into a Row of title, tokenised text and page id.

    The article body is read from ``row.revision.text._VALUE`` and tokenised
    with ``prepText``; ``row.id`` is carried along as ``id_`` so results can
    be joined back to the page later.
    """
    raw_text = row.revision.text._VALUE
    tokens = prepText(raw_text)
    return Row(title=row.title, text=tokens, id_=row.id)

def prepText(s):
    """Lower-case ``s``, replace punctuation with spaces and return word tokens.

    Args:
        s: The text to tokenise. Non-string values are converted with
            ``str()``; ``None`` (a page without text) is treated as empty.

    Returns:
        A list of non-empty, lower-cased tokens. Tokens are separated on any
        whitespace, so tabs and carriage returns never end up inside a token.
    """
    # A missing body must not become the literal token "none" via str(None).
    if s is None:
        return []
    punc = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~\n'
    lowercased_str = str(s).lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, ' ')
    # split() with no argument splits on every whitespace run and drops
    # empty strings, so no separate empty-token filter is required.
    return lowercased_str.split()
                            
def rmSp(x):
    """Return the items of ``x`` as a new list, leaving out empty strings."""
    return [word for word in x if word != '']

# Select the needed columns and map getText over the underlying RDD,
# giving an RDD of Rows (title, text, id_) where text is a token list.
textRDD = (
    xml.select('revision', 'id', 'title')
    .rdd
    .map(getText)
)

# Go back to a DataFrame so the ML stop-word transformer can be applied.
t = textRDD.toDF()
remover = StopWordsRemover(inputCol="text", outputCol="filtered")
stop_removed = remover.transform(t)

# Preview the result: id_, title, text plus the new 'filtered' column.
stop_removed.show()

## create dictionary
## create similarity matrix

In [None]:
# Collect every article's filtered token list on the driver and build the
# gensim Dictionary from them.
dt = corpora.Dictionary(
    stop_removed.select('filtered')
    .rdd
    .map(lambda row: row.filtered)
    .collect()
)

# Term-similarity matrix over the dictionary, produced by the embeddings model.
similarity_matrix = model.similarity_matrix(
    dt,
    tfidf=None,
    threshold=0.0,
    exponent=2.0,
    nonzero_limit=100,
)
print("similarity mat created")

## convert articles to bag of words

In [None]:
# Encode each article's filtered tokens with dt.doc2bow, keeping the page id
# alongside so every bag-of-words vector can be traced back to its article.
sent_dt = (
    stop_removed.select('filtered', 'id_')
    .rdd
    .map(lambda page: Row(id_=page.id_, text=dt.doc2bow(page.filtered)))
)

## for each article:
* calculate cosine similarities
* sort similarities
* save top 10 similarities 

In [None]:
# sent_dt is scanned once per article below; cache it so the upstream
# parsing/tokenising pipeline is not re-executed on every pass.
sent_dt.cache()

# One result Row per article, gathered on the driver and parallelised once.
# (Unioning a fresh RDD per article builds an ever-deeper lineage and piles
# up partitions.)
results = []
for article in sent_dt.collect():
    # Soft-cosine similarity of this article against every article, keeping
    # only the 10 highest scores (returned in descending order). The scan
    # includes the article itself.
    recs = sent_dt.map(
        lambda other: Row(
            id_=other.id_,
            cos=softcossim(article.text, other.text, similarity_matrix),
        )
    ).top(10, key=lambda rec: rec.cos)
    results.append(Row(id_=article.id_, recs=recs))

# parallelize() iterates its argument, so it must be given a list of Rows;
# passing a bare Row would scatter that Row's fields as separate elements.
rdd1 = sc.parallelize(results)

# Save the results RDD to the bucket.
rdd1.saveAsTextFile("gs://wiki_final/rec_id")