# TF-IDF

In this task, Hadoop Streaming is used to process a dump of Wikipedia articles (/data/wiki/en_articles_part).

The purpose of this task is to calculate tf*idf for each pair (word, article) from the Wikipedia dump. Apply the stop words filter to speed up calculations. Term frequency (tf) is a function depending on a term (word) and a document (article):

tf(term, doc_id) = Nt/N,

where Nt is the number of occurrences of the term in the document and N is the total number of terms in the document (excluding stop words).

Inverse document frequency (idf) is a function of a term:

idf(term) = 1/log(1 + Dt),

where Dt is the number of documents in the dataset that contain the term.
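
As a quick sanity check of the two formulas, here is a minimal sketch in plain Python. The function names and the numbers in the example are hypothetical and chosen only for illustration; the natural logarithm is assumed, which matches the `math.log` call used in the reducer of this notebook.

```python
import math

def tf(term_count, total_terms):
    """tf = Nt / N, where N is counted after stop-word removal."""
    return float(term_count) / total_terms

def idf(doc_count):
    """idf = 1 / log(1 + Dt), using the natural logarithm (assumption)."""
    return 1.0 / math.log(1 + doc_count)

def tf_idf(term_count, total_terms, doc_count):
    """Product of the two values defined above."""
    return tf(term_count, total_terms) * idf(doc_count)

# Hypothetical numbers: a term that occurs 3 times in an article of
# 100 words and appears in 9 documents of the dataset.
example = tf_idf(3, 100, 9)
```

Since Dt is at least 1 for any term that occurs in the dataset, `log(1 + Dt)` is at least `log(2)`, so the division is always defined.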

You can find more information at https://en.wikipedia.org/wiki/Tf–idf, but use only the formulas given above.

Dataset location: /data/wiki/en_articles_part

The stop words list is in the '/datasets/stop_words_en.txt' file.

Format: article_id <tab> article_text
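
To make the record format concrete, the sketch below parses one line and filters out stop words. The helper names, the sample line and the tiny stop-word set are hypothetical. The tokenizer is a simplification that splits on every non-word character, so it may treat punctuation inside words differently from the mapper in this notebook.

```python
import re

def parse_record(line):
    """Split one input line into (article_id, article_text)."""
    article_id, text = line.rstrip("\n").split("\t", 1)
    return int(article_id), text

def tokenize(text, stop_words):
    """Lowercase the text, extract runs of word characters, drop stop words."""
    words = re.findall(r"\w+", text.lower(), flags=re.UNICODE)
    return [w for w in words if w not in stop_words]

# Hypothetical record and stop-word set, for illustration only.
sample_line = "12\tThe labor market and the labor force\n"
sample_stop_words = {"the", "and"}

article_id, text = parse_record(sample_line)
tokens = tokenize(text, sample_stop_words)
```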

## Mapper

The idea is simple: build a list of the distinct words of each article. While going through the article, look each word up in that list (`list.index` inside a try/except block). If the word is already there, increment its count in a parallel list of counts; if it is new, append it. This is not the most efficient solution, because the linear scan of the list gets slower as articles grow, but it yields everything we need in a single pass. For every distinct word the mapper emits a tab-separated record: word, count, total number of words in the article, tf, article_id.
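
For comparison, the same per-article counting could be done with a dictionary, which avoids the linear scan. This is an alternative sketch, not the approach used in the mapper of this notebook; the function name and the sample word list are hypothetical.

```python
from collections import Counter

def mapper_records(article_id, words):
    """Return (word, count, total, tf, article_id) tuples for one article.

    `words` is the list of tokens left after stop-word removal.
    """
    counts = Counter(words)   # word -> number of occurrences
    total = len(words)        # N in the tf formula
    return [(word, count, total, float(count) / total, article_id)
            for word, count in sorted(counts.items())]

# Hypothetical tokens of article 12.
records = mapper_records(12, ["labor", "market", "labor", "force"])
```

Dictionary lookups take constant time on average, so the cost per article grows linearly with its length instead of quadratically.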

In [None]:
%%writefile mapper.py
#!/usr/bin/env python2.7
import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8') # required to convert to unicode

path = 'stop_words_en.txt'

def read_stop_words(file_path):
    return set(word.strip().lower() for word in open(file_path))

stop_words = read_stop_words(path)

words_in_article = []
cnt_in_article = []
id_in_list = 0
total_cnt_in_article = 0
word_cnt = 0

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words_in_article = []
    cnt_in_article = []
    id_in_list = 0
    word_cnt = 0
    total_cnt_in_article = 0
    text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
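    for word in words:
        word = word.lower()
        # Skip empty tokens and stop words
        if not word or word in stop_words:
            continue
        word_cnt = word_cnt + 1  # N: words in the article, stop words excluded
        try:
            # Word already seen: increment its counter in the parallel list
            id_in_list = words_in_article.index(word)
            cnt_in_article[id_in_list] = cnt_in_article[id_in_list] + 1
        except ValueError:
            # First occurrence of the word in this article
            words_in_article.append(word)
            cnt_in_article.append(1)
            total_cnt_in_article = total_cnt_in_article + 1  # distinct words so far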
    for i in range(0,len(words_in_article)):
        print "%s\t%f\t%d\t%f\t%d" % (words_in_article[i], float(cnt_in_article[i]), word_cnt, float(cnt_in_article[i]) / float(word_cnt), int(article_id)) 

## Reducer Script

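The idea is simple: the mapper output arrives sorted by word (so that, for example, all "labor" records come one after another). We run through the records of a word, counting them and saving them in an intermediate list. Because the mapper emits one record per (word, article) pair, this count is Dt, the number of articles containing the word.

Once the records of a word are exhausted, we go through the saved list once more and output tf and idf for each article separately. The parser script further down multiplies the two values to get tf*idf.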
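
The grouping step can also be sketched with `itertools.groupby`, which collects consecutive records that share the same word. This is an illustrative alternative under the assumption that the input is already sorted by word; the function name and the sample records are hypothetical, and the sample uses the same five-column layout as the mapper output.

```python
import math
from itertools import groupby

def reduce_lines(sorted_lines):
    """Yield (word, article_id, tf, idf, tf*idf) from lines sorted by word."""
    rows = (line.rstrip("\n").split("\t") for line in sorted_lines)
    for word, group in groupby(rows, key=lambda row: row[0]):
        group = list(group)  # one row per article containing the word
        idf = 1.0 / math.log(1 + len(group))  # len(group) plays the role of Dt
        for _, _, _, tf, article_id in group:
            yield word, int(article_id), float(tf), idf, float(tf) * idf

# Hypothetical mapper output, already sorted by word.
sample = [
    "force\t1.0\t4\t0.25\t12\n",
    "labor\t2.0\t4\t0.5\t12\n",
    "labor\t1.0\t10\t0.1\t15\n",
]
results = list(reduce_lines(sample))
```

Here "labor" appears in two articles, so its idf is `1 / log(3)`, while "force" appears in one and gets `1 / log(2)`.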

In [None]:
%%writefile reducer.py
#!/usr/bin/env python2.7
import sys
import math

current_key = None
word_sum = 0
lines_list = []
IDF = 0.0
articles_list = set()

for line in sys.stdin:
    try:
        word, cnt, all_cnt, tf, article_id = line.strip().split('\t', 4)
        tf = float(tf)
        all_cnt = int(all_cnt)
    except ValueError:
        # Skip malformed lines without discarding the records already collected
        continue
    if current_key != word:
        if current_key:
            IDF = float(float(1.0) / float(math.log(1 + word_sum)))
            for i in range(0, len(lines_list)):
                w, c, all_, t, a = lines_list[i].strip().split('\t', 4)
                print "%s\t%f\t%f\t%f\t%d\t%d" % (w, float(c), float(t), IDF, int(a), word_sum) 
            lines_list = []
        word_sum = 0
        articles_list.clear()
        current_key = word
    word_sum += 1
    lines_list.append(line)
    articles_list.add(article_id)

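if current_key:
    # Flush the records of the last word, in the same format as in the loop
    IDF = 1.0 / math.log(1 + word_sum)
    for i in range(0, len(lines_list)):
        w, c, all_, t, a = lines_list[i].strip().split('\t', 4)
        print "%s\t%f\t%f\t%f\t%d\t%d" % (w, float(c), float(t), IDF, int(a), word_sum)
    lines_list = []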

In [None]:
%%writefile parser.py
#!/usr/bin/env python2.7
import sys

result=0.0

for line in sys.stdin:
    try:
        # The reducer emits six tab-separated fields, i.e. five separators
        word, cnt, tf, idf, article_id, word_sum = line.strip().split('\t', 5)
        tf = float(tf)
        idf = float(idf)
        article_id = int(article_id)
    except ValueError as e:
        continue
    if word == 'labor' and article_id == 12:
        result = float(tf) * float(idf)

print >> sys.stdout, "%f" % result

## Script

In [None]:
%%bash

OUT_DIR="Week6_task1_"$(date +"%s%6N")
NUM_REDUCERS=8
LOGS="stderr_logs.txt"

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=1 \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D map.output.key.field.separator \
    -D mapreduce.partition.keycomparator.options=-k1 \
    -files mapper.py,reducer.py,/datasets/stop_words_en.txt \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR}  > /dev/null
    
hdfs dfs -cat ${OUT_DIR}/part-* > result.txt
cat result.txt | grep -w 'labor' | grep -w '12' | python2 parser.py