In [106]:
import string
import json
import re
import numpy as np
from nltk.stem.wordnet import WordNetLemmatizer
import pickle as pkl
from pyspark.mllib.feature import HashingTF, IDF, Word2Vec
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

In [2]:
# Read the news dataset from S3 as an RDD of text lines; each line is parsed
# as a JSON record by extract_tuple further down.
# The bracketed tokens in the URL are placeholders for AWS credentials.
# NOTE(review): `sc` is not defined in the visible cells -- presumably the
# SparkContext injected by the PySpark shell/kernel; confirm.
# NOTE(review): prefer supplying the keys through the environment or Hadoop
# configuration rather than pasting real keys into this URL.
data_raw = sc.textFile('s3n://[YOUR_AWS_ACCESS_KEY_ID]:[YOUR_AWS_SECRET_ACCESS_KEY]@sparkdatasets/news.txt')

In [5]:
# Only the last expression of a cell is echoed, so this first partition count
# is computed but not displayed.
data_raw.getNumPartitions()
# Rebind data_raw to the same data redistributed over 4 partitions.
data_raw = data_raw.repartition(4)
# Echo the partition count after repartitioning (the recorded output is 4).
data_raw.getNumPartitions()

4

In [10]:
def extract_tuple(x, keys):
    """Decode one JSON-encoded record and pick out the requested fields.

    Args:
        x: A JSON document (string) whose top level is an object.
        keys: Iterable of field names to read, in the order they should
            appear in the result.

    Returns:
        A tuple holding the value of each requested field, in `keys` order.

    Raises:
        KeyError: If a requested field is missing from the record.
        ValueError: If `x` is not valid JSON.
    """
    record = json.loads(x)
    values = []
    for key in keys:
        values.append(record[key])
    return tuple(values)

# (label, label_name) pairs, one per raw JSON line.
label_name_rdd = data_raw.map(
    lambda record: extract_tuple(record, ['label', 'label_name']))

# (label, text) pairs, one per raw JSON line; input to the text pipeline.
text_rdd = data_raw.map(
    lambda record: extract_tuple(record, ['label', 'text']))


In [41]:
# Keep the stop words in a real set: stopwords.words() returns a list, and the
# per-token `not in` test below would otherwise be a linear scan per token.
stopset = set(stopwords.words('english'))
# Matches any single character from string.punctuation.
regex = re.compile('[%s]' % re.escape(string.punctuation))
stemmer = PorterStemmer()

# Per-document text pipeline (mapValues leaves the label key untouched):
#   1. lower-case and drop every non-ASCII character; decoding back to text
#      keeps the str-pattern regex usable on Python 3, where .encode() alone
#      would leave bytes,
#   2. replace punctuation with spaces,
#   3. tokenize and discard stop words,
#   4. Porter-stem each remaining token.
preprocessed_text_rdd = text_rdd\
    .mapValues(lambda x: x.lower().encode('ascii', 'ignore').decode('ascii'))\
    .mapValues(lambda x: regex.sub(' ', x))\
    .mapValues(lambda x: [word for word in nltk.word_tokenize(x)
                    if word not in stopset])\
    .mapValues(lambda x: [stemmer.stem(w) for w in x])

In [45]:
# Gather every distinct token seen in any document onto the driver.
vocab = preprocessed_text_rdd.flatMap(lambda pair: pair[1]).distinct().collect()
# Token -> position of that token in `vocab`.
reverse_vocab = {token: position for position, token in enumerate(vocab)}

def get_tf(word_list, vocab_index=None):
    """Build a dense term-frequency vector for one tokenized document.

    Args:
        word_list: Iterable of tokens belonging to a single document.
        vocab_index: Optional mapping from token to its slot in the output
            vector. When omitted, the notebook-level `reverse_vocab` mapping
            is used, so existing one-argument callers are unaffected.

    Returns:
        A list with one integer count per vocabulary entry; slots of tokens
        that do not occur in `word_list` stay 0.

    Raises:
        KeyError: If `word_list` contains a token missing from the mapping.
    """
    if vocab_index is None:
        # Fall back to the mapping built next to `vocab` in this notebook.
        vocab_index = reverse_vocab
    tf_vector = [0] * len(vocab_index)
    # Counter.items() yields each distinct token once with its count, which
    # avoids a second lookup per token.
    for word, count in Counter(word_list).items():
        tf_vector[vocab_index[word]] = count
    return tf_vector

# Replace each document's token list with its dense term-count vector from
# get_tf, keeping the label key.
# NOTE(review): tf_rdd is not referenced in the other visible cells; the
# classifier below is trained on HashingTF features instead.
tf_rdd = preprocessed_text_rdd.mapValues(get_tf)

In [88]:
# Hash every token list into a term-frequency vector with 10000 buckets,
# keeping the label as the key.
htf = HashingTF(10000)
word_vecs_rdd = preprocessed_text_rdd.mapValues(htf.transform).cache()

# Index the (label, features) pair instead of unpacking it in the lambda
# signature: tuple parameters are Python 2-only syntax (removed by PEP 3113).
labeled_point_rdd = word_vecs_rdd.map(lambda pair: LabeledPoint(pair[0], pair[1]))
labeled_point_rdd.setName('labeled_point').persist()

# A fixed seed makes the 70/30 split -- and so the reported accuracy --
# repeatable from run to run.
train_rdd, test_rdd = labeled_point_rdd.randomSplit([70, 30], seed=42)

model = NaiveBayes.train(train_rdd)

# Collect each test label together with its prediction in a single pass so
# the two arrays are aligned element by element by construction, rather than
# relying on two separate collect() calls returning rows in the same order.
label_and_prediction = test_rdd\
    .map(lambda point: (point.label, model.predict(point.features)))\
    .collect()
y = np.array([pair[0] for pair in label_and_prediction])
y_pred = np.array([pair[1] for pair in label_and_prediction])

# Fraction of test documents classified correctly. The parenthesised call
# prints the same value under Python 2 and Python 3.
print((y == y_pred).mean())

In [107]:
# Reference the text8 corpus on S3.
# NOTE(review): `data` is not used by the fit below or by any other visible
# cell -- confirm whether the model was meant to be trained on it.
data = sc.textFile('s3n://[YOUR_AWS_ACCESS_KEY_ID]:[YOUR_AWS_SECRET_ACCESS_KEY]@sparkdatasets/text8_lines')
# Fit word embeddings on the token lists of the preprocessed news documents;
# `word2vec` ends up bound to the fitted model.
word2vec = Word2Vec().fit(preprocessed_text_rdd.map(lambda pair: pair[1]))

In [1]:
# vocab[100:400]

In [132]:
# Look up the vector the fitted model holds for the token 'neurologist';
# the recorded output is a DenseVector.
word2vec.transform('neurologist')

DenseVector([-0.0052, 0.0531, -0.0075, 0.004, -0.0089, 0.0484, 0.0201, 0.0092, 0.0032, -0.0249, -0.0277, 0.0174, -0.0896, -0.0166, 0.0233, -0.02, -0.0321, -0.0299, 0.042, 0.1446, 0.0206, 0.0009, 0.0218, -0.0082, -0.0165, -0.0187, -0.0502, -0.0431, 0.0476, 0.1121, -0.0419, 0.0265, -0.0564, 0.0503, 0.0781, -0.0062, -0.0451, 0.0308, 0.006, -0.0292, 0.0678, -0.0421, 0.0584, -0.035, 0.022, -0.024, 0.0134, -0.0717, -0.0062, -0.0593, 0.0607, -0.006, -0.0495, 0.0107, 0.0209, -0.0143, -0.0877, 0.0508, 0.0031, -0.0167, -0.0248, -0.0265, 0.013, -0.0132, -0.0197, -0.0167, 0.0526, 0.027, 0.0763, -0.046, 0.0436, 0.0112, -0.1059, 0.0299, 0.0222, -0.0504, 0.0137, -0.053, 0.1092, -0.0377, -0.0254, 0.0012, -0.1183, -0.0051, 0.0975, 0.0424, -0.0492, -0.0619, 0.0404, 0.0155, -0.0051, 0.0415, -0.0327, -0.0141, 0.0319, -0.0802, -0.1111, -0.0101, -0.0165, -0.0136])