In [1]:
from pyspark import SparkContext
from operator import add
from math import log
import numpy as np

In [2]:
# getOrCreate() reuses an already-running context, so re-running this cell does not
# raise the way constructing a second SparkContext() would.
sc = SparkContext.getOrCreate()

In [3]:
# One RDD element per input line; every later cell treats a line as
# "<doc_id> <term> <term> ...".
input_path = './project2_demo.txt'
rdd = sc.textFile(input_path)

### Total Terms

In [4]:
def extract_terms(line):
    """Return every space-separated token after the leading doc id.

    Empty tokens produced by repeated spaces are kept; callers filter them out
    with ``filter_empty_strings``.
    """
    _doc_id, *terms = line.split(' ')
    return terms

def filter_empty_strings(term):
    """Predicate for ``RDD.filter``: True for a non-empty string token."""
    return bool(term)

def extract_terms_with_doc_id(line):
    """Encode each non-empty term of a "<doc_id> <term> ..." line as "term,,doc_id".

    Returns a list with one encoded string per non-empty term occurrence
    (duplicates are kept).
    """
    doc_id, *terms = line.split(' ')
    return [",,".join((term, doc_id)) for term in terms if term]

def split_term_doc_id_pair(pair):
    """Decode a "term,,doc_id" string into ``(term, [doc_id])``.

    Splits on the LAST ",," so that a term ending with or containing a comma
    (e.g. the token "growth," taken from "... growth, ...") is recovered intact,
    instead of leaking commas into the doc id or producing too many pieces.
    Assumes the doc id itself contains no ",," (NOTE(review): confirm for the data).

    Raises:
        ValueError: if ``pair`` contains no ",," separator.
    """
    term, doc_id = pair.rsplit(",,", 1)
    return (term, [doc_id])

## Word Count per Document (keyed by doc id)

In [5]:
def _term_doc_pairs(line):
    """Return ``((term, doc_id), 1)`` for every non-empty term of a "<doc_id> <term> ..." line."""
    doc_id, *terms = line.split(' ')
    return [((term, doc_id), 1) for term in terms if term != '']


# Key on a (term, doc_id) tuple instead of the string term + "_" + doc_id: splitting
# that string on "_" returns the wrong pieces whenever a term or doc id contains "_".
# Output records keep the original shape: [doc_id, [term, count_in_doc]].
frequency_by_document = rdd.flatMap(_term_doc_pairs)\
    .reduceByKey(add)\
    .map(lambda kv: [kv[0][1], [kv[0][0], kv[1]]])

## Term Frequency (First Hundred Entries)

In [6]:
# Document length counts only non-empty terms, matching the per-term counts in
# frequency_by_document (a raw len() of the split would also count the empty
# tokens produced by repeated spaces and deflate every TF value).
# After the join each record is (doc_id, (total_terms, [term, count])); it is
# reshaped to [term, [doc_id, count, total_terms]] so it can be joined on term.
term_frequency = rdd.map(lambda line: [line.split(' ')[0],
                                       len(list(filter(filter_empty_strings, extract_terms(line))))])\
                    .join(frequency_by_document)\
                    .map(lambda pair: [pair[1][1][0], [pair[0], pair[1][1][1], pair[1][0]]])

## Inverse Document Frequency

### Total Documents

In [7]:
# Corpus size used as the numerator of the IDF ratio log(total_documents / count).
# Every line of the input counts as one document, blank lines included.
total_documents = rdd.count()

### IDF

In [8]:
# IDF(t) = log(N / df(t)), where df(t) is the number of documents containing t.
# The set() makes each document contribute at most 1 per term; counting every
# occurrence would give collection frequency, which can exceed N and push the
# log below zero. Despite its name, word_counts holds [term, idf] pairs.
word_counts = rdd \
    .flatMap(lambda line: [(term, 1) for term in set(extract_terms(line)) if filter_empty_strings(term)]) \
    .reduceByKey(add) \
    .map(lambda x: [x[0], log(total_documents / x[1])])

## Combined TF-IDF (RDDs)

In [9]:
def transform_tf_idf(joined_pair):
    """Turn one ``term_frequency.join(word_counts)`` record into a TF-IDF triple.

    ``joined_pair`` has the form ``(term, ([document_id, tf_doc, doc_total_terms], idf))``.
    Returns ``[term, document_id, idf * (tf_doc / doc_total_terms)]``, i.e. the
    raw count normalised by document length, weighted by the term's IDF.
    """
    term, joined = joined_pair[0], joined_pair[1]
    tf_record, idf = joined[0], joined[1]
    document_id, tf_doc, doc_total_terms = tf_record[0], tf_record[1], tf_record[2]
    normalised_tf = tf_doc / doc_total_terms
    return [term, document_id, idf * normalised_tf]

In [10]:
# Join TF records with IDF on term and collect the [term, doc_id, tf_idf] triples
# to the driver: a coordinate-style sparse representation of the matrix.
tf_idf_sparse_matrix = (
    term_frequency
    .join(word_counts)
    .map(transform_tf_idf)
    .collect()
)

### Present the m×n matrix, using dictionaries that map document ids and terms to row and column indices

Each `[term, document_id, tf-idf]` triple above can now be written into a dense matrix by looking up the row index of its document and the column index of its term.

In [11]:
# Vocabulary: every distinct non-empty term in the corpus.
all_terms = (
    rdd
    .flatMap(extract_terms)
    .filter(filter_empty_strings)
    .distinct()
)

In [12]:
# Document id (first space-separated token) of every line; duplicates are kept.
all_doc_ids = rdd.map(lambda line: line.split(' ', 1)[0])

In [13]:
# term -> column index; the terms are distinct, so indices run densely 0..k-1.
terms_map = dict(all_terms.zipWithIndex().collect())

In [None]:
# doc_id -> row index. If a doc_id repeats, the later index wins, so the indices
# are dense (0..n-1) only when every document id is unique.
doc_map = dict(all_doc_ids.zipWithIndex().collect())

In [None]:
def make_matrix(data, term_map, document_map):
    """Scatter (term, doc_id, value) triples into a dense documents-by-terms array.

    Args:
        data: iterable of ``(term, doc_id, value)`` triples.
        term_map: mapping term -> column index.
        document_map: mapping doc_id -> row index.

    Returns:
        ``np.ndarray`` of shape ``(max row index + 1, max column index + 1)``,
        with zeros wherever no triple supplied a value.

    Raises:
        KeyError: if a triple's term or doc_id is missing from its map.
    """
    # Size each axis by the largest index + 1 rather than len(map). The two agree
    # for dense 0..k-1 indices, but an index map built with zipWithIndex over
    # non-distinct ids (later index wins) has gaps, and len(map) is then too small.
    n_rows = max(document_map.values(), default=-1) + 1
    n_cols = max(term_map.values(), default=-1) + 1
    matrix = np.zeros(shape=(n_rows, n_cols))
    for term, doc_id, value in data:
        matrix[document_map[doc_id], term_map[term]] = value
    return matrix

In [None]:
# Dense documents x terms TF-IDF matrix built from the collected triples.
mat = make_matrix(data=tf_idf_sparse_matrix, term_map=terms_map, document_map=doc_map)

In [None]:
# Terms x documents view (one row per term) for the term-term similarity below.
# Rebuilt from the collected triples instead of transposing `mat` in place, so
# re-running this cell cannot flip the matrix back to documents x terms.
mat = make_matrix(tf_idf_sparse_matrix, terms_map, doc_map).transpose()

In [None]:
# Query term; the cells below compare its row of `mat` with every other term's row.
term = 'growth'
if term not in terms_map:
    raise KeyError('query term %r does not occur in the corpus' % (term,))
term_index = terms_map[term]

In [None]:
# Euclidean (L2) norm of each term's row of `mat`, as a plain list indexed by term index.
n = [np.sqrt(np.square(mat[row_idx]).sum()) for row_idx in range(len(terms_map))]

In [None]:
# Dot product of the query term's TF-IDF row with every other term's row (the query
# term itself is skipped). np.multiply would yield an element-wise vector per term
# rather than the scalar that cosine similarity divides by the two row norms.
y = [np.dot(mat[term_index], mat[i]) for i in range(len(n)) if i != term_index]

In [None]:
# Cosine similarity between the query term and every other term, in term-index order
# (query term skipped): dot(a, b) / (|a| * |b|), computed directly from `mat` and the
# row norms in `n`, so the cell does not depend on, or overwrite itself from, an earlier `y`.
# A term whose TF-IDF row is all zeros (possible when its IDF is log(1) = 0) has zero
# norm; report 0.0 for it instead of dividing 0 by 0.
y = [
    np.dot(mat[term_index], mat[other_term_index]) / (n[term_index] * n[other_term_index])
    if n[term_index] * n[other_term_index] != 0
    else 0.0
    for other_term_index in range(len(n))
    if other_term_index != term_index
]