In [1]:
%matplotlib inline
#%load_ext autoreload
#%autoreload 2
%reload_ext autoreload
import numpy as np
import matplotlib.pyplot as plt
import math, sys, os
from numpy.random import randn
from sklearn.datasets import make_blobs
from copy import deepcopy

# setup pyspark for IPython_notebooks
spark_home = os.environ.get('SPARK_HOME', None)
data_home = os.environ.get('DATA_HOME', None)
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Python version 2.7.10 (default, Jul 13 2015 12:05:58)
SparkContext available as sc, HiveContext available as sqlContext.


In [2]:
def add_paths_from_directory(paths,cwd):
    for f in os.listdir(cwd):
        if f.endswith(".txt"):
            paths.append(cwd + "/" + f )

def get_text_file_paths():
    paths = []
    add_paths_from_directory(paths, data_home + "/text/gutenberg")
    add_paths_from_directory(paths, data_home + "/text/movie_reviews/neg")
    add_paths_from_directory(paths, data_home + "/text/movie_reviews/pos")
    return paths


def clean_line(line):
    remove_chs = ["\n",".",",",";",":","/","!","[","]","?"]
    for ch in remove_chs:
        line = line.replace(ch,"")
    return line.lower()


def get_local_filename(file_path): 
    return file_path.split("/")[-1].replace(".txt","")


def tokenize_line(line, doc_name = False):
    line = clean_line(line)
    if len(line) == 0: 
        return []
    if doc_name:
        return [((doc_name,word),1) for word in line.split(" ")]
    else:
        return [(word,1) for word in line.split(" ")]


def tokenize_text_file(file_path, with_doc_name = False):
    tokens = []
    if with_doc_name:
        doc_name = get_local_filename(file_path)
    else:
        doc_name = False
    tokens = []
    for line in open(file_path,"r+"):
        tokens.extend(tokenize_line(line, doc_name))
    
    return tokens


def make_document_word_counter(file_path):
    counter_dict = {"word_counts":{},
                               "file_path": file_path,
                               "document_name": get_local_filename(file_path),
                               "total_count": 0.0,
                               "max_count": 0.0}
    return counter_dict


def count_word(counter_dict, word):
    if word in counter_dict["word_counts"]:
        counter_dict["word_counts"][word] += 1.0
    else:
        counter_dict["word_counts"][word] = 1.0
    counter_dict["total_count"] += 1.0
    counter_dict["max_count"] = max(counter_dict["max_count"], counter_dict["word_counts"][word])


def open_doc_count_words(file_path, counter_dict):
    for line in open(file_path, "r+"):
        line = clean_line(line)
        if len(line) == 0: 
            continue
        for word in line.split(" "):
            if word.isalpha():
                count_word(counter_dict, word)
    return counter_dict

    
def load_and_count_doc_words(file_path):
    return open_doc_count_words(file_path, make_document_word_counter(file_path))

def get_counts_only(A):
    return dict([(word,1) for word in A["word_counts"].items()])

def reduce_to_document_frequency(A, B):
    """ reduce function to get corpus counts """
    for word in A.keys():
        if word in B:
            B[word] += A[word]
        else:
            B[word] = A[word]
    return B

In [3]:
# Term Frequency Metrics

In [4]:
def tf_binary(count):
    if count > 0:
        return 1
    else:
        return 0

def tf_raw(doc_word_count):
    return float(doc_word_count)

def tf_log_norm(doc_word_count):
    return 1 + np.log(doc_word_count)

def tf_double_norm(doc_word_count, doc_max_word_count, scale = 0.5):
    return scale + ((1.0 - scale) * (doc_word_count / float(doc_max_word_count)))

In [5]:
# IDF metrics

In [6]:
def idf_unary():
    return 1

def idf(n_docs_with_word, N_docs):
    return np.log(N / float(n_docs_with_word))

def idf_smooth(n_docs_with_word, N_docs):
    return np.log(1 + (N / float(n_docs_with_word)))

def idf_max(n_docs_with_word, max_n_docs_with_word):
    return np.log(1 + (max_n_docs_with_word / float(n_docs_with_word)))

def idf_p(n_docs_with_word, N_docs):
    return np.log((N_docs - n_docs_with_word) / float(n_docs_with_word))

idf_funcs = {"idf_unary": idf_unary, "idf": idf, "idf_smooth": idf_smooth, "idf_max": idf_max, "idf_p": idf_p}

In [7]:
# read docs, count words, broadcast necessary variables

In [8]:
paths = get_text_file_paths()

N_docs = sc.broadcast(len(paths))

doc_word_counts_RDD = sc.parallelize(paths).map(load_and_count_doc_words).cache()

document_frequency_dict = sc.broadcast(doc_word_counts_RDD.map(get_counts_only).reduce(reduce_to_document_frequency))

words = sc.broadcast(document_frequency_dict.value.keys())

d = sc.broadcast(len(words.value))

max_n_docs_with_word = sc.broadcast(max(document_frequency_dict.value.values()))

In [9]:
print d.value

120661


In [None]:
# TF-IDF

In [41]:
def compute_tf_idf(doc_word_counter):
    if_idf = np.zeros(d.value)
    N_words = doc_word_counter["total_count"]
    doc_max_word_count = doc_word_counter["max_count"]

    for idx, word in enumerate(words.value):
        n_docs_with_word = document_frequency_dict.value[word]
        word_count = float(doc_word_counter["word_counts"][word]) if word in doc_word_counter["word_counts"] else 0.0
        # compute
        tf = tf_double_norm(word_count, doc_max_word_count, scale = 0.9)
        idf = idf_max(n_docs_with_word, max_n_docs_with_word.value)
        if_idf[idx] = tf * idf

    return if_idf

In [42]:
TF_IDF_vecs = doc_word_counts_RDD.map(compute_tf_idf).collect()
array = np.stack(TF_IDF_vecs)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 8.0 failed 1 times, most recent failure: Lost task 3.0 in stage 8.0 (TID 35, localhost): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
	at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
array[:,39430:]