### Document Generation
We download the text file `5000-8.txt` from the Dropbox link provided in the lecture and upload it to HDFS.

In [None]:
# hadoop

# Dropbox share links serve an HTML preview page unless dl=1 is set;
# -O keeps the local file name free of the query string.
wget -O 5000-8.txt "https://www.dropbox.com/s/2f3nt4rn7t4wee1/5000-8.txt?dl=1"

# -p creates missing parents and does not fail when a directory already
# exists, so this cell can be re-run safely.
hdfs dfs -mkdir -p /user/hadoop/proj/raw /user/hadoop/proj/input

# -f overwrites a copy left behind by a previous run instead of failing.
hdfs dfs -put -f 5000-8.txt /user/hadoop/proj/raw

### Document preparation
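We sample lines from the raw text, lower-case them and split them into words using a regular expression; the resulting words are then sampled into five documents of increasing size. Word occurrences are counted in the next section.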

In [None]:
# spark
# Draw a reproducible sample of 100 raw lines (no replacement, seed 1) from
# the source text; the five word-level documents are built from it below.

source_lines = sc.textFile("hdfs:///user/hadoop/proj/raw/5000-8.txt")
sampled_lines = source_lines.takeSample(withReplacement=False, num=100, seed=1)
doc = sc.parallelize(sampled_lines)
import re

def process(doc):
    """Lower-case one line of text and split it into word tokens.

    The line is split on runs of non-word characters and on underscores.
    The empty strings that ``re.split`` yields at the edges are dropped.

    Returns:
        list of str: the tokens, in their original order.
    """
    # re.UNICODE: in Python 2, \W is ASCII-only without this flag, so accented
    # letters (e.g. u"\xe9") would act as separators and break words apart.
    # In Python 3 it is already the default for str patterns.
    tokens = re.split(r"\W+|_", doc.lower(), flags=re.UNICODE)
    return [token for token in tokens if token]

# Tokenise every sampled line and flatten the results into one RDD of words.
vocab = doc.flatMap(process)

In [None]:
# Scalability test documents: five samples of doubling size (100, 200, 400,
# 800 and 1600 *words*, i.e. tokens rather than lines), drawn with
# replacement from `vocab`; seeds 1-5 make each sample reproducible.
t1, t2, t3, t4, t5 = (
    vocab.takeSample(True, size, seed)
    for seed, size in enumerate([100, 200, 400, 800, 1600], 1)
)

In [None]:
# Write each sample to its own HDFS directory as a single part file
# (part-00000). coalesce(1) merges the partitions without the full shuffle
# that repartition(1) triggers; line order is irrelevant for TF-IDF.
# NOTE: saveAsTextFile fails if the target directory already exists.
for name, sample in [("t1", t1), ("t2", t2), ("t3", t3), ("t4", t4), ("t5", t5)]:
    sc.parallelize(sample).coalesce(1).saveAsTextFile(
        "hdfs:///user/hadoop/proj/%s.6" % name)

# Leave the pyspark shell; the next cell runs HDFS shell commands.
exit()

In [None]:
# hadoop

# Move each single-partition output t<i>.6/part-00000 into the input
# folder as d<i>.txt, for i = 1..5.
for i in 1 2 3 4 5; do
    hdfs dfs -mv "/user/hadoop/proj/t${i}.6/part-00000" "/user/hadoop/proj/input/d${i}.txt"
done

### Compute the term frequency, document size and document frequency in Spark

In [None]:
# Read every document in the input folder as (path, content) pairs.
texts = sc.wholeTextFiles("hdfs:///user/hadoop/proj/input/")

# Number of documents in the corpus.
num_of_sets = texts.count()

# (file_name, [word, ...]), keeping only the file name from the HDFS path.
# Index-based lambdas replace `lambda (k, v)`: tuple parameters are a
# SyntaxError in Python 3 (PEP 3113); this form runs on Python 2 and 3.
# str.split() with no argument already ignores surrounding whitespace.
terms = texts.map(lambda kv: (kv[0].split("/")[-1], kv[1].split()))

# One (file_name, word) pair per token. Cached because the three counts
# below all start from it; uncached, each action would re-read the files.
word_pairs = terms.flatMapValues(lambda words: words).cache()

# (file_name, word) -> number of occurrences of word in that file.
term_occur = word_pairs.countByValue()

# file_name -> total number of words in that file.
term_size = word_pairs.countByKey()

# word -> number of files containing it.
doc_freq = word_pairs.distinct().map(lambda kv: (kv[1], kv[0])).countByKey()

### Compute TF-IDF
We define a function that takes the number of documents, the term occurrence counts, the document sizes and the document frequencies, and returns the TF-IDF score of every (term, document) pair.

In [None]:
import numpy as np

# define the function to calculate the tf_idf score
def tf_idf(num_of_sets, term_occur, term_size, doc_freq, scale=100000):
    """Compute a TF-IDF score for every (file, term) pair.

    Args:
        num_of_sets: total number of documents N in the corpus.
        term_occur: mapping (file_name, term) -> occurrences of term in file.
        term_size: mapping file_name -> total number of terms in that file.
        doc_freq: mapping term -> number of files that contain the term.
        scale: multiplier applied to the term frequency. The default of
            100000 keeps the magnitude of earlier results; pass 1 for plain
            TF-IDF.

    Returns:
        list of (term, file_name, score) tuples, where
        score = scale * count / size * ln(N / df).
    """
    result = []
    for (file_name, term), count in term_occur.items():
        # Convert to float before dividing: under Python 2, int / int
        # truncates, which zeroed the TF (the original reason for the
        # 100000 factor) and rounded N / df down before taking the log.
        tf = float(count) * scale / term_size[file_name]
        idf = np.log(float(num_of_sets) / doc_freq[term])
        result.append((term, file_name, tf * idf))
    return result

# TF-IDF score for every (term, file) pair in the corpus.
tfidf = tf_idf(
    num_of_sets=num_of_sets,
    term_occur=term_occur,
    term_size=term_size,
    doc_freq=doc_freq,
)