In [1]:
from pyspark import RDD
import json
from pyspark.mllib.feature import IDF
import numpy as np
from pyspark.mllib.linalg import Vectors

In [2]:
class StoringTF(object):
    def fit(self, vacabulary):
        self.word_idx = vacabulary.distinct().zipWithIndex().collectAsMap()
        self.numFeatures = len(self.word_idx)
        
    def indexOf(self, term):
        """ Returns the index of the input term. """
        return self.word_idx[term]

    def transform(self, document):
        """
        Transforms the input document (list of terms) to term frequency
        vectors, or transform the RDD of document to RDD of term
        frequency vectors.
        """
        if isinstance(document, RDD):
            return document.map(self.transform)

        freq = {}
        for term in document:
            i = self.indexOf(term)
            freq[i] = freq.get(i, 0) + 1.0
        return Vectors.sparse(self.numFeatures, freq.items())

In [12]:
raw_data = sc.textFile('hdfs://master:54310/exp_2/ml_data'). \
    map(lambda line: json.loads(line))

In [13]:
word_idx = raw_data.flatMap(lambda doc: set(doc['Features'])).distinct().zipWithIndex().collectAsMap()

In [14]:
word_idx_inv = dict([(v,k) for k,v in word_idx.items()])

In [6]:
with open('/home/hadoop/data/w_i_ex_2.json', 'w+') as fp:
    json.dump(word_idx, fp)
with open('/home/hadoop/data/w_i_inv_ex_2.json', 'w+') as fp:
    json.dump(word_idx_inv, fp)

In [4]:
with open('/home/hadoop/data/w_i_ex_2.json', 'r') as fp:
    word_idx = json.load(fp)
with open('/home/hadoop/data/w_i_inv_ex_2.json', 'r') as fp:
    word_idx_inv = json.load(fp)

In [15]:
word_idx_br = sc.broadcast(word_idx)
word_idx_inv_br = sc.broadcast(word_idx_inv)

In [16]:
def vect_f(doc: dict):
    doc['Features'] = [word_idx_br.value[w] for w in doc['Features']]
    return doc

vect_raw_data = raw_data.map(vect_f)

In [20]:
not_excl_unigramms_br = sc.broadcast(set(vect_raw_data.flatMap(lambda x: set(x['Features'])). \
    map(lambda f: (f, 1)). \
    reduceByKey(int.__add__). \
    filter(lambda x: x[1] > 14).map(lambda x: x[0]).collect()))

In [22]:
def filter_unigramms(doc: dict):
    unigramms = [word_idx_inv_br.value[ug] for ug in doc['Features'] if ug in not_excl_unigramms_br.value]
    return {'Id': doc['Id'], 'Unigramms': unigramms}

In [23]:
vect_raw_data.map(filter_unigramms).map(json.dumps).repartition(6).saveAsTextFile('hdfs://master:54310/exp_2/unigramms')

In [9]:
def get_bigramms(doc: dict):
    words = doc['Features']
    bigramms = [(w_1, w_2) for w_1, w_2 in zip(words[:-1], words[1:])]
    return {'Id': doc['Id'], 'Bigramms': bigramms}

def get_trigramms(doc: dict):
    words = doc['Features']
    trigramms = [(w_1, w_2, w_3) for w_1, w_2, w_3 in zip(words[:-2], words[1:-1], words[2:])]
    return {'Id': doc['Id'], 'Trigramms': trigramms}

In [10]:
bigramms = vect_raw_data.map(get_bigramms)

In [19]:
trigramms = vect_raw_data.map(get_trigramms)

In [27]:
trigramms.count()

112807

In [15]:
not_excl_bigramms = set(bigramms.flatMap(lambda doc: set(doc['Bigramms'])). \
    map(lambda f: (f, 1)). \
    reduceByKey(int.__add__). \
    filter(lambda fc: fc[1] > 69). \
    map(lambda fc: fc[0]).collect())

not_excl_bigramms_br = sc.broadcast(not_excl_bigramms)

In [16]:
len(not_excl_bigramms)

49054

In [20]:
not_excl_trigramms_br = sc.broadcast(set(trigramms.flatMap(lambda doc: set(doc['Trigramms'])). \
    map(lambda f: (f, 1)). \
    reduceByKey(int.__add__). \
    filter(lambda fc: fc[1] > 14). \
    map(lambda fc: fc[0]).collect()))

In [21]:
len(not_excl_trigramms_br.value)

38606

In [17]:
def filter_bigramms(doc: dict):
    doc['Bigramms'] = ['{} {}'.format(word_idx_inv_br.value[bg[0]], word_idx_inv_br.value[bg[1]])
                       for bg in doc['Bigramms'] if bg in not_excl_bigramms_br.value]
    
    return doc

bigramms.map(filter_bigramms). \
    map(json.dumps). \
    repartition(6). \
    saveAsTextFile('hdfs://master:54310/exp_2/bigramms')

In [22]:
def filter_trigramms(doc: dict):
    doc['Trigramms'] = ['{} {} {}'.format(word_idx_inv_br.value[tg[0]],
                                          word_idx_inv_br.value[tg[1]], word_idx_inv_br.value[tg[2]])
                       for tg in doc['Trigramms'] if tg in not_excl_trigramms_br.value]
    
    return doc

trigramms.map(filter_trigramms). \
    map(json.dumps). \
    repartition(6). \
    saveAsTextFile('hdfs://master:54310/exp_2/trigramms')

## Merge

In [24]:
unigramms = sc.textFile('hdfs://master:54310/exp_2/unigramms'). \
    map(json.loads).map(lambda x: (x['Id'], x['Unigramms']))
bigramms = sc.textFile('hdfs://master:54310/exp_2/bigramms'). \
    map(json.loads).map(lambda x: (x['Id'], x['Bigramms']))
trigramms = sc.textFile('hdfs://master:54310/exp_2/trigramms'). \
    map(json.loads).map(lambda x: (x['Id'], x['Trigramms']))

In [25]:
features = sc.union([unigramms, bigramms, trigramms]). \
    reduceByKey(list.__add__)

In [26]:
labels = raw_data.map(lambda x: (x['Id'],x['Labels']))
data = labels.join(features).map(lambda x: {'Id':x[0],'Labels':x[1][0],'Features':x[1][1]})
data.map(json.dumps).repartition(6). \
    saveAsTextFile('hdfs://master:54310/exp_2/clean_data')

In [3]:
data = sc.textFile('hdfs://master:54310/new_lables'). \
    map(json.loads)

In [4]:
top_1000_labels = dict(data.flatMap(lambda x: x['Labels']). \
    map(lambda l: (l, 1)). \
    reduceByKey(int.__add__). \
    sortBy(lambda lc: lc[1], ascending=False). \
    take(800))

top_1000_labels_br = sc.broadcast(top_1000_labels)

In [5]:
def filter_labels(doc: dict):
    doc['Labels'] = [l for l in doc['Labels'] if l in top_1000_labels_br.value]
    return doc

In [6]:
clean_data = data.map(filter_labels).filter(lambda x: len(x['Labels']) > 2)

In [7]:
documents = clean_data.map(lambda x: x['Features'])
vocabulary = documents.flatMap(lambda d: d).distinct()
storingTF = StoringTF()
storingTF.fit(vocabulary)
tf = storingTF.transform(documents)

In [8]:
idf = IDF(minDocFreq=3).fit(tf)
tfidf = idf.transform(tf)

In [9]:
t = tfidf.take(1)[0]
t.size

64781

In [10]:
labels = clean_data.map(lambda x: x['Labels'])

In [11]:
new_data = tfidf.zip(labels)

In [12]:
from collections import namedtuple, Counter
from pyspark.mllib.linalg import Vectors
import json
from pyspark import RDD
import numpy as np
sc.addPyFile('/home/hadoop/spark/lib/sparse.py')
sc.addPyFile('/home/hadoop/spark/lib/model.py')
from sparse import sparse_vector
from model import MLNaiveBayesModel
def shuffle_and_split(data: RDD, fold_n: int, seed = 0):
    fold_weights = [1 / fold_n] * fold_n
    return data.randomSplit(fold_weights)

def hold_out(sc, data: RDD, k: int, model_builder, metrics: list):
    folds = shuffle_and_split(data, k)
    for i in range(k):
        test = folds[i]
        training = sc.union(folds[:i] + folds[i + 1:])
        model = model_builder(training)
        model_broadcast = sc.broadcast(model)
        lables_and_predictions = test.map(lambda x: (x['lables'],
                                      model_broadcast.value.predict_all(x['features'])))
        for metric in metrics:
            metric.evaluate(lables_and_predictions)
    return metrics

class Metric:
    def __init__(self, name: str, verbose=False):
        self._name = name
        self._results = []
        self._verbose = verbose
        
    @property
    def name(self):
        return self._name
    
    @property
    def results(self):
        return self._results
    
    @property
    def avg(self):
        return np.average(self._results)
    
    def evaluate(self, lables, predictions):
        pass

class AccuracyMetric(Metric):
    def __init__(self, pred_n: int, intersect_n: int):
        self._pred_n = pred_n
        self._intersect_n = intersect_n
        super(AccuracyMetric, self).__init__(name='Accuracy', verbose=False)
        
    def evaluate(self, lables_and_predictions: RDD):
        TP = lables_and_predictions.map(lambda x:
                                    (set(x[0]), set([p for p,w in x[1][:self._pred_n]]))). \
                                    filter(lambda x:
                                           len(x[0].intersection(x[1])) >= self._intersect_n)
        accuracy = 100.0 * TP.count() / lables_and_predictions.count()
        if self._verbose:
            print('accuracy: ', accuracy)
        self._results.append(accuracy)
        return accuracy

from pyspark.mllib.classification import NaiveBayesModel
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.linalg import Vectors
from pyspark import RDD
import numpy as np
import math


# RDD (labels) (features)
def train_model(data, l = 1.0):
    aggreagated = data.flatMap(lambda x: [(l, x['features']) for l in x['lables']]). \
        combineByKey(lambda v: (1, v),
                 lambda c, v: (c[0] + 1, c[1] + v),
                 lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1])). \
        sortBy(lambda x: x[0]). \
        collect()
    num_labels = len(aggreagated)
    num_documents = data.count()
    num_features = aggreagated[0][1][1].size
    labels = np.zeros(num_labels)
    pi = np.zeros(num_labels, dtype=int)
    theta = np.zeros((num_labels, num_features))
    pi_log_denom = math.log(num_documents + num_labels * l)
    i = 0
    for (label, (n, sum_term_freq)) in aggreagated:
        labels[i] = label
        pi[i] = math.log(n + l) - pi_log_denom
        sum_term_freq_dense = sum_term_freq.toarray()
        theta_log_denom = math.log(sum_term_freq.sum() + num_features * l)
        theta[i,:] = np.log(sum_term_freq_dense + l) - theta_log_denom
        i += 1  
    return MLNaiveBayesModel(labels, pi, theta)

In [13]:
labels_idx = dict([(l, i) for i,l in enumerate(top_1000_labels.keys())])
label_idx_br = sc.broadcast(labels_idx)

In [14]:
def transform(x):
    doc = x[0]
    labels = x[1]
    features = sparse_vector(doc.toArray())
    labels = [label_idx_br.value[l] for l in labels]
    return {'lables':labels, 'features':features}

In [3]:
class AccuracyMetricWithStat(Metric):
    def __init__(self, pred_n: int, intersect_n: int):
        self._pred_n = pred_n
        self._intersect_n = intersect_n
        self.stats = []
        super(AccuracyMetric, self).__init__(name='Accuracy', verbose=False)
    
    def evaluate(self, lables_and_predictions: RDD):
        TP = lables_and_predictions.map(lambda x:
                                    (set(x[0]), set([p for p,w in x[1][:self._pred_n]]))). \
                                    filter(lambda x:
                                           len(x[0].intersection(x[1])) >= self._intersect_n)
        accuracy = 100.0 * TP.count() / lables_and_predictions.count()
        
        stat = lables_and_predictions.map(lambda x:
                                    (set(x[0]), set([p for p,w in x[1][:self._pred_n]]))). \
                                    map(lambda x:
                                           (len(x[0].intersection(x[1])), len(x[0]))). \
            map(lambda x: _calc_sim(x[0], x[1], self._pred_n)).collect()
        self.stats.append(stat)
            
        if self._verbose:
            print('accuracy: ', accuracy)
        self._results.append(accuracy)
        return accuracy
    
    def _calc_sim(interselction, label_count, pred_n):
        return intersection / (label_count if label_count <= pred_n else pred_n)

NameError: name 'Metric' is not defined

In [15]:
vect_data = new_data.map(transform)

In [17]:
metric_3 = AccuracyMetric(3, 1)
metric_5 = AccuracyMetric(5, 1)

In [18]:
res = hold_out(sc, vect_data, 4, train_model, [metric_3, metric_5])

In [25]:
res[1].results

[87.56090285373372, 87.14550238768152, 86.57375601138483, 87.7895752895753]

In [16]:
res[0].results

[80.4757872045577, 79.31957186544342, 80.21952539228569, 80.44751464183811]

In [16]:
vect_data.cache()

PythonRDD[53] at RDD at PythonRDD.scala:43

In [17]:
res_3 = hold_out(sc, vect_data, 4, train_model, [AccuracyMetric(3, 1)])

In [18]:
res_3[0].results

[78.56844159464734, 79.10375732506033, 78.5027329966097, 78.61372344130965]

In [19]:
sum([78.56844159464734, 79.10375732506033, 78.5027329966097, 78.61372344130965]) / 4

78.69716383940676

In [20]:
sum([79.21065341889188, 78.67745339430179, 77.92459356624005, 79.13822819717339]) / 4

78.73773214415178