# Initialize

In [2]:
from pyspark import SparkContext, SparkConf
import os, string, math
import numpy as np

In [3]:
# Reuse the already-running SparkContext when this cell is executed again;
# constructing a second SparkContext in the same kernel raises an error.
conf = SparkConf().setAppName("wordCounter")
sc = SparkContext.getOrCreate(conf=conf)
# Handle on the first document; Stage 1 below opens every file again itself.
doc = sc.textFile("datafiles/f1.txt")

In [4]:
# Load the stopword list (one entry per line) and keep it as a set for fast
# membership tests. The context manager closes the file handle once the
# contents have been read.
with open('stopwords.txt', 'r') as f:
    stoplist = f.read().splitlines()
stops = set(stoplist)

In [5]:
def rm_punctuation(text):
    """Lowercase ``text``, replace every ``--`` with a single space and delete
    each character listed in ``string.punctuation``.

    Returns the cleaned string.
    """
    strip_table = str.maketrans('', '', string.punctuation)
    return text.lower().replace('--', ' ').translate(strip_table)

# Stage 1

1. Clean each row in the document and split it into words
2. Filter out the stopwords
3. Filter out the short words (3 characters or fewer)
4. Append the document identifier to the end of each word and create a (key, value) pair
5. Count the occurrences of each word with `reduceByKey`

In [6]:
# Directory that holds the corpus; every entry in it is counted as a document.
docs_dir = 'datafiles/'
doc_names = list(os.listdir(docs_dir))

doc_names

['f7.txt',
 'f6.txt',
 'f9.txt',
 'f4.txt',
 'f3.txt',
 'f8.txt',
 'f5.txt',
 'f1.txt',
 'f2.txt',
 'f10.txt']

In [7]:
def count_words(doc_index):
    """Build the (word@d<doc_index>, count) RDD for document ``f<doc_index>.txt``.

    Each line is cleaned with ``rm_punctuation`` and split into words; words
    found in ``stops`` or with 3 characters or fewer are dropped; the
    remaining words are tagged with the document identifier and counted.
    """
    # Build the tag eagerly so the lazily evaluated lambdas below do not
    # depend on the current value of a loop variable.
    doc_tag = '@d' + str(doc_index)
    lines = sc.textFile(os.path.join(docs_dir, 'f' + str(doc_index) + '.txt'))
    return lines.flatMap(lambda line: rm_punctuation(line).split()) \
                .filter(lambda word: word not in stops) \
                .filter(lambda word: len(word) > 3) \
                .map(lambda word: (word + doc_tag, 1)) \
                .reduceByKey(lambda a, b: a + b)


# Documents are addressed as f1.txt .. fN.txt, where N is the number of
# entries found in the corpus directory.
word_counts = count_words(1)
for i in range(2, len(doc_names) + 1):
    word_counts = word_counts.union(count_words(i))
    # Running total of distinct word@doc keys after adding document i.
    print(word_counts.count())

1191
1771
2734
4371
4839
5019
5183
5452
6167


# Stage 2
Stage 2 passes through 5 states:<br/>
(word@doc, freq) => (word, doc=freq) => (word, (doc=freq)) => (word, (doc=tfidf)) => (word@doc, tfidf)

In [8]:
# Corpus size, taken from the directory listing.
no_of_docs = len(doc_names)
# Stage 2 starts from the per-document term frequencies built in Stage 1.
term_freq = word_counts
print(term_freq.count())
term_freq.take(5)

6167


[('treated@d1', 1),
 ('naturally@d1', 1),
 ('dentists@d1', 1),
 ('tedious@d1', 2),
 ('williss@d1', 2)]

In [9]:
def compute_tfidf(val, n_docs=10):
    """Turn one word's per-document counts into TF-IDF scores.

    Args:
        val: iterable of ``'<doc>=<count>'`` strings, one per document that
            contains the word.
        n_docs: total number of documents in the corpus, used for the
            inverse document frequency. Defaults to 10, the value that was
            previously hard-coded.

    Returns:
        A list of ``'<doc>=<tfidf>'`` strings in input order, where tfidf is
        ``(1 + ln(tf)) * ln(n_docs / df)`` formatted to three decimals and
        ``df`` is the number of entries in ``val``. An empty input yields an
        empty list.
    """
    freqs = list(val)
    # Measure the materialised list: ``val`` may be a one-shot iterator that
    # does not support ``len()``.
    df = len(freqs)
    if df == 0:
        return []

    # The inverse document frequency is the same for every entry of this word.
    idf = np.log(n_docs / df)
    result = []
    for freq in freqs:
        parts = freq.split('=')
        doc, tf = parts[0], int(parts[1])
        tfidf = format((1 + np.log(tf)) * idf, '.3f')
        result.append(doc + '=' + tfidf)

    return result

In [10]:
def restore_docid(rdd):
    """Expand one ``(word, ['<doc>=<tfidf>', ...])`` record into a list of
    ``('<word>@<doc>', tfidf)`` pairs, with tfidf parsed as a float."""
    word, docs = rdd

    def to_pair(entry):
        # entry looks like '<doc>=<tfidf>'
        fields = entry.split('=')
        return ('{0}@{1}'.format(word, fields[0]), float(fields[1]))

    return [to_pair(entry) for entry in docs]

In [11]:
def _key_by_word(pair):
    """Re-key ``('<word>@<doc>', freq)`` as ``('<word>', '<doc>=<freq>')``."""
    key_parts = pair[0].split('@')
    return (key_parts[0], key_parts[1] + "=" + str(pair[1]))


# Gather each word's per-document counts, score them, then flatten back to
# one ('<word>@<doc>', tfidf) record per word/document combination.
tf_idfs = (
    word_counts.map(_key_by_word)
               .groupByKey()
               .mapValues(compute_tfidf)
               .flatMap(restore_docid)
)
tf_idfs.take(5)

[('volunteer@d5', 1.204),
 ('volunteer@d10', 2.039),
 ('volunteer@d6', 1.204),
 ('charities@d10', 1.609),
 ('charities@d5', 1.609)]

# Stage 3
(word@doc, tfidf) => (doc, word=tfidf) => (doc, (word=tfidf)) => (doc, (word=norm-tfidf)) => (word@doc, norm-tfidf)

In [15]:
def norm_tfidfs(val):
    """Scale one document's TF-IDF scores to unit Euclidean length.

    Args:
        val: iterable of ``'<word>=<tfidf>'`` strings belonging to a single
            document.

    Returns:
        A list of ``'<word>=<normalised>'`` strings in input order, where each
        score is divided by the square root of the sum of squared scores and
        formatted to four decimals. If that norm is zero the scores are left
        unscaled rather than divided by zero, which would emit ``nan``.
    """
    words = []
    scores = []
    sum_of_squares = 0
    for word_tfidf in val:
        # Split each entry once and keep both halves.
        parts = word_tfidf.split('=')
        score = float(parts[1])
        words.append(parts[0])
        scores.append(score)
        sum_of_squares += score ** 2

    norm = math.sqrt(sum_of_squares)
    if norm == 0:
        # Every score is zero (or the input is empty): nothing to rescale.
        norm = 1.0

    return [word + '=' + format(score / norm, '.4f')
            for word, score in zip(words, scores)]

In [16]:
def restore_word(rdd):
    """Expand one ``(docid, ['<word>=<score>', ...])`` record into a list of
    ``('<word>@<docid>', score)`` pairs, with score parsed as a float."""
    docid, scores = rdd

    def to_pair(entry):
        # entry looks like '<word>=<score>'
        fields = entry.split('=')
        return ('{0}@{1}'.format(fields[0], docid), float(fields[1]))

    return [to_pair(entry) for entry in scores]

In [17]:
def _key_by_doc(pair):
    """Re-key ``('<word>@<doc>', tfidf)`` as ``('<doc>', '<word>=<tfidf>')``."""
    key_parts = pair[0].split('@')
    return (key_parts[1], '{0}={1}'.format(key_parts[0], pair[1]))


# NOTE: the assignment below rebinds the name ``norm_tfidfs`` from the
# normalising function to the resulting RDD, so the function-definition cell
# has to be executed again before this cell can be run a second time.
norm_tfidfs = (
    tf_idfs.map(_key_by_doc)
           .groupByKey()
           .mapValues(norm_tfidfs)
           .flatMap(restore_word)
)
norm_tfidfs.take(5)

[('weeks@d8', 0.1073),
 ('head@d8', 0.0117),
 ('reproachful@d8', 0.1211),
 ('leave@d8', 0.0455),
 ('creepy@d8', 0.1211)]

# Stage 4
(word@doc, norm-tfidf) => (doc, word=norm-tfidf) => (doc, word=norm-tfidf) for query words only => (doc, (word=norm-tfidf)) => (doc, sum-of-norm-tfidfs)

In [18]:
# Read the whitespace-separated query terms. The context manager closes the
# file handle as soon as they have been loaded.
with open('query.txt', 'r') as query:
    query_words = query.read().split()
query_words

['happy', 'dinner']

In [19]:
def sum_norm_tfidfs(val):
    """Add up the numeric part of every ``'<word>=<score>'`` string in ``val``.

    Returns the running total, which is the integer 0 when ``val`` is empty.
    """
    total = 0
    for entry in val:
        # Only the part after '=' carries the score.
        total += float(entry.split('=')[1])
    return total

In [20]:
# Re-key every score by document, keep only the words that occur in the
# query, then add up what is left for each document.
relevance = (
    norm_tfidfs
    .map(lambda kv: (kv[0].split('@')[1], '{0}={1}'.format(kv[0].split('@')[0], kv[1])))
    .filter(lambda rec: (rec[1].split('=')[0]) in query_words)
    .groupByKey()
    .mapValues(sum_norm_tfidfs)
)
relevance.collect()

[('d8', 0.0482),
 ('d10', 0.0152),
 ('d2', 0.0183),
 ('d4', 0.0582),
 ('d5', 0.0239),
 ('d3', 0.0323),
 ('d1', 0.0189)]

# Stage 5
The 3 documents with the highest relevance scores are picked.

In [21]:
k = 3  # how many of the best-scoring documents to report
# takeOrdered sorts ascending by key, so negating the score puts the highest
# relevance first.
relevance.takeOrdered(k, key=lambda doc_score: -doc_score[1])

[('d4', 0.0582), ('d8', 0.0482), ('d3', 0.0323)]