In [1]:
#import library

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import IDF
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
import re
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml.feature import Normalizer
from pyspark.sql import Row
from pyspark.ml import Pipeline

In [2]:
#function to clean the data

def clean_text(x):
    y = x.lower()
    y = re.sub(r'[^0-9a-z ]', '', y)
    y = re.sub('\s+', ' ', y)
    return y

In [3]:
# Define UDF's used in the process

clean_text_udf = F.udf(clean_text, StringType())
dot_udf = F.udf(lambda x,y: float(x.dot(y)), DoubleType()) 

In [4]:
# Function to read and process the file

def get_docs(path):
    textFiles = sc.wholeTextFiles(path)
    tmpFilesRDD = textFiles.map(lambda docs:(docs[0][docs[0].rfind('/')+1:], docs[1]))
    
    # Read all the docs in a dataframe. Clean the text of the docs
    textFilesDF = tmpFilesRDD.toDF(["doc_name", "text"])
    textFilesDF = textFilesDF.withColumn('text',clean_text_udf(F.col('text')))
    return textFilesDF

In [5]:
train = get_docs('datafiles/')

In [6]:
train.show()

+--------+--------------------+
|doc_name|                text|
+--------+--------------------+
|  f6.txt|project gutenberg...|
|  f7.txt|there is one that...|
|  f5.txt|gwendolen still s...|
|  f4.txt|jack a severe chi...|
|  f1.txt|the project guten...|
|  f3.txt|algernon have you...|
|  f2.txt|lady bracknell th...|
|  f9.txt|my darling said h...|
|  f8.txt|john says i musnt...|
| f10.txt|i often wonder if...|
+--------+--------------------+



In [7]:
# Make all the pipeline elements
# Tokenize, remove stopwords

tokenizer = Tokenizer().setInputCol("text").setOutputCol("tokenized")
stopwords = [clean_text(x.strip()) for x in open('stopwords.txt','r')]
stop_words_remover = StopWordsRemover(inputCol='tokenized',outputCol='tokenized_clean',stopWords=stopwords)

train = tokenizer.transform(train)
train = stop_words_remover.transform(train)

In [8]:
train.show()

+--------+--------------------+--------------------+--------------------+
|doc_name|                text|           tokenized|     tokenized_clean|
+--------+--------------------+--------------------+--------------------+
|  f6.txt|project gutenberg...|[project, gutenbe...|[project, gutenbe...|
|  f7.txt|there is one that...|[there, is, one, ...|[commands, road, ...|
|  f5.txt|gwendolen still s...|[gwendolen, still...|[gwendolen, stand...|
|  f4.txt|jack a severe chi...|[jack, a, severe,...|[jack, severe, ch...|
|  f1.txt|the project guten...|[the, project, gu...|[project, gutenbe...|
|  f3.txt|algernon have you...|[algernon, have, ...|[algernon, told, ...|
|  f2.txt|lady bracknell th...|[lady, bracknell,...|[lady, bracknell,...|
|  f9.txt|my darling said h...|[my, darling, sai...|[darling, beg, sa...|
|  f8.txt|john says i musnt...|[john, says, i, m...|[john, musnt, los...|
| f10.txt|i often wonder if...|[i, often, wonder...|[wonder, windows,...|
+--------+--------------------+-------

In [9]:
# Make TF-IDF
vectorizer = CountVectorizer(inputCol='tokenized_clean', outputCol='tf').fit(train)
train = vectorizer.transform(train)
idf = IDF(inputCol="tf", outputCol="tfidf").fit(train)
train = idf.transform(train)

In [10]:
train.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|doc_name|                text|           tokenized|     tokenized_clean|                  tf|               tfidf|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  f6.txt|project gutenberg...|[project, gutenbe...|[project, gutenbe...|(5375,[0,2,4,6,11...|(5375,[0,2,4,6,11...|
|  f7.txt|there is one that...|[there, is, one, ...|[commands, road, ...|(5375,[2,11,14,17...|(5375,[2,11,14,17...|
|  f5.txt|gwendolen still s...|[gwendolen, still...|[gwendolen, stand...|(5375,[0,1,2,3,4,...|(5375,[0,1,2,3,4,...|
|  f4.txt|jack a severe chi...|[jack, a, severe,...|[jack, severe, ch...|(5375,[1,2,3,4,5,...|(5375,[1,2,3,4,5,...|
|  f1.txt|the project guten...|[the, project, gu...|[project, gutenbe...|(5375,[0,1,2,3,5,...|(5375,[0,1,2,3,5,...|
|  f3.txt|algernon have you...|[algernon, have, ...|[algernon, told, ...

In [11]:
# Compute normalized TF-IDF
normalizer = Normalizer(inputCol="tfidf", outputCol="tfidf_norm")
train = normalizer.transform(train)

In [12]:
train.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|doc_name|                text|           tokenized|     tokenized_clean|                  tf|               tfidf|          tfidf_norm|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  f6.txt|project gutenberg...|[project, gutenbe...|[project, gutenbe...|(5375,[0,2,4,6,11...|(5375,[0,2,4,6,11...|(5375,[0,2,4,6,11...|
|  f7.txt|there is one that...|[there, is, one, ...|[commands, road, ...|(5375,[2,11,14,17...|(5375,[2,11,14,17...|(5375,[2,11,14,17...|
|  f5.txt|gwendolen still s...|[gwendolen, still...|[gwendolen, stand...|(5375,[0,1,2,3,4,...|(5375,[0,1,2,3,4,...|(5375,[0,1,2,3,4,...|
|  f4.txt|jack a severe chi...|[jack, a, severe,...|[jack, severe, ch...|(5375,[1,2,3,4,5,...|(5375,[1,2,3,4,5,...|(5375,[1,2,3,4,5,...|
|  f1.txt|the project guten...|[the, proj

In [13]:
# Transform the query 
query = get_docs('query.txt')
pipleine = Pipeline(stages=[tokenizer,stop_words_remover,vectorizer,idf,normalizer])
query = pipleine.fit(query).transform(query)
query = query.selectExpr("tfidf_norm as tfidf_query")
query = train.crossJoin(query)

In [14]:
query.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|doc_name|                text|           tokenized|     tokenized_clean|                  tf|               tfidf|          tfidf_norm|         tfidf_query|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  f6.txt|project gutenberg...|[project, gutenbe...|[project, gutenbe...|(5375,[0,2,4,6,11...|(5375,[0,2,4,6,11...|(5375,[0,2,4,6,11...|(5375,[325,560],[...|
|  f7.txt|there is one that...|[there, is, one, ...|[commands, road, ...|(5375,[2,11,14,17...|(5375,[2,11,14,17...|(5375,[2,11,14,17...|(5375,[325,560],[...|
|  f5.txt|gwendolen still s...|[gwendolen, still...|[gwendolen, stand...|(5375,[0,1,2,3,4,...|(5375,[0,1,2,3,4,...|(5375,[0,1,2,3,4,...|(5375,[325,560],[...|
|  f4.txt|jack a severe chi...|[jack, a, severe,...|

In [15]:
# Cosine similarity
query = query.withColumn('similariy', dot_udf("tfidf_query", "tfidf_norm"))
query= query.filter("similariy > 0").orderBy('similariy', ascending=False).select("doc_name")
query.show()

+--------+
|doc_name|
+--------+
|  f4.txt|
|  f2.txt|
|  f9.txt|
|  f5.txt|
+--------+

