### 1.1 Load the Data

In [None]:
# Set Partitions to 8 (parallelism of 8 / multiple executors)
from pyspark.sql import SQLContext, Row

# Read every file under the tech folder as (path, content) pairs, requesting 8
# partitions, and wrap each pair in a Row whose title is the path with the
# folder prefix removed.
# NOTE(review): later cells treat column "doc" as the article text and column
# "text" as the title (see the label swap in the display cell) -- presumably
# because Row orders keyword fields alphabetically before toDF renames them
# by position; confirm on the Spark version in use.
tech_text = (
    sc.wholeTextFiles("/mnt/dataset/public/bbcnews/tech/", 8)
    .map(lambda path_and_body: Row(
        title=path_and_body[0].replace('dbfs:/mnt/dataset/public/bbcnews/tech/', ''),
        text=path_and_body[1]))
    .toDF(["doc", "text"])
)

In [None]:
# Preview the first 10 rows, exchanging the two column labels for display.
display(
    tech_text
    .selectExpr("text as doc", "doc as text")
    .limit(10)
)

### 1.2 Compute N (Number Of Documents in Corpus)
- This count is used in IDF computation.
- We do this first in order to trigger evaluation (and any caching) of the RDD above, so that subsequent calls to the RDD can be faster.

In [None]:
# Count the rows of tech_text; this is the N passed to the TF-IDF computation.
# Note the parallelism of 8: the count runs over the 8 partitions requested
# when the files were loaded.
number_of_docs = tech_text.count()
number_of_docs

### 1.3 Compute Term Frequencies

In [None]:
# tokenize text to words.
import re
def tokenize(s):
    """Split text into lower-case word tokens.

    A token is a maximal run of regex word characters (letters, digits,
    underscore); everything else acts as a separator.

    Args:
        s: the text to tokenize.

    Returns:
        A list of tokens in order of appearance. Unlike splitting on the
        separators, this never yields empty strings, so text that starts or
        ends with punctuation/whitespace (or is empty) produces no '' token.
    """
    return re.findall(r"\w+", s.lower())


# Build (title, tokens) pairs: the first field of each row is tokenized and
# the second field becomes the key.
# NOTE(review): this relies on the row's field order being (text, title) --
# confirm against how tech_text is built.
# NOTE(review): DataFrame.map is only available on older Spark releases;
# newer ones presumably need tech_text.rdd.map -- verify before upgrading.
tokenized_text = tech_text.map(lambda row: (row[1], tokenize(row[0])))

In [None]:
# Term frequencies per document: expand each (title, tokens) pair into one
# (title, token) pair per occurrence, then count identical pairs. The result
# maps (title, token) -> number of occurrences.
term_frequency = (
    tokenized_text
    .flatMapValues(lambda tokens: tokens)
    .countByValue()
)
# Peek at the first 20 entries.
list(term_frequency.items())[:20]

### 1.4 Compute Document Frequency

In [None]:
# Document frequency: for every word, the number of documents it appears in.
document_frequency = (
    tokenized_text
    .flatMapValues(lambda tokens: tokens)
    .distinct()                              # one (title, word) pair per document
    .filter(lambda pair: pair[1] != '')      # ignore empty tokens
    .map(lambda pair: (pair[1], pair[0]))    # re-key by word
    .countByKey()
)
# Peek at the first 10 entries.
list(document_frequency.items())[:10]

### 1.5 Compute TF-IDF

In [None]:
import numpy as np
from __future__ import division
def tf_idf(N, tf, df):
    """Compute a TF-IDF score for every (document, term) pair.

    Args:
        N: number of documents in the corpus.
        tf: mapping of (doc, term) -> number of occurrences of term in doc.
        df: mapping of term -> number of documents containing the term.

    Returns:
        A list of dicts with keys "doc", "term" and "score", where
        score = tf * ln(N / df). Pairs whose term has no positive document
        frequency are skipped, because their IDF is undefined.
    """
    result = []
    for (doc, term), count in tf.items():
        # .get() avoids inserting missing terms when df is a defaultdict.
        doc_count = df.get(term, 0)
        if doc_count <= 0:
            continue
        # float() keeps this a true division regardless of whether
        # `from __future__ import division` is in effect.
        score = float(count) * np.log(float(N) / doc_count)
        result.append({"doc": doc, "term": term, "score": score})
    return result

In [None]:
# Score every (document, term) pair of the corpus, then peek at the first 10.
tf_idf_output = tf_idf(
    N=number_of_docs,
    tf=term_frequency,
    df=document_frequency,
)
tf_idf_output[:10]

### 2. Performing Search

In [None]:
# The search function

# The corpus with TF-IDF scores, distributed as (term, (doc, score)) pairs so
# entries can be selected by term.
tfidf_RDD = sc.parallelize(tf_idf_output).map(
    lambda entry: (entry['term'], (entry['doc'], entry['score']))
)

def search(query, topN):
    """Rank documents against a free-text query using the corpus TF-IDF scores.

    For each document, the TF-IDF scores of its entries whose term occurs in
    the query are summed; the sum is multiplied by the number of such entries
    and divided by the number of distinct query tokens.

    Args:
        query: free-text query; it is split with tokenize().
        topN: maximum number of results to return.

    Returns:
        A list of up to topN (score, doc) tuples, highest score first.
        Empty when the query yields no tokens.
    """
    # The query is small, so build the distinct-token lookup locally instead
    # of launching a Spark job for it.
    tokens = dict.fromkeys(tokenize(query), 1)
    if not tokens:
        # Nothing to match, and it would mean dividing by zero below.
        return []
    bcTokens = sc.broadcast(tokens)

    # Keep only the (term, (doc, score)) entries whose term is in the query.
    matched = tfidf_RDD.filter(lambda kv: kv[0] in bcTokens.value)

    # Per document: (sum of matched scores, number of matched entries).
    scount = matched.map(lambda kv: kv[1]).aggregateByKey(
        (0, 0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
    )

    num_tokens = float(len(tokens))
    scores = scount.map(
        lambda kv: (kv[1][0] * kv[1][1] / num_tokens, kv[0])
    ).top(topN)

    return scores

In [None]:
# Example: look up the 5 best-matching documents for an article excerpt.
search(query='Ink helps drive democracy in Asia The Kyrgyz Republic, a small, mountainous state of the former Soviet republic, is using invisible ink and ultraviolet readers in the country', topN=5)

- Source: https://www.linkedin.com/pulse/understanding-tf-idf-first-principle-computation-apache-asimadi/
- And: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6052175677058526/3537626382528910/5364082293869370/latest.html