# Yarowsky Algorithm: PySpark Implementation

In [None]:
# Notebook setup: locate the Spark installation and create the SparkContext
# `sc` used by the cells below.
# The import of findspark and random was commented out although both names
# are used in this cell, which made the cell fail with NameError.
import random

import findspark

# Kept ahead of the pyspark import, as in the original cell.
findspark.init("/u/cs451/packages/spark")

from pyspark import SparkContext, SparkConf

# The UI port is drawn at random from [4000, 5000)
# (presumably to avoid clashing with other local Spark sessions -- confirm).
sc = SparkContext(appName="YourTest", master="local[2]", conf=SparkConf().set('spark.ui.port', random.randrange(4000,5000)))

In [2]:
from math import exp
from operator import add
import numpy as np
import re

In [3]:
# Path of the text file that sc.textFile() loads in the next cell.
dataset='bank_final.txt'

In [20]:
# Parse every input line into (text, label): the text is the line without its
# last three characters and the label is derived from the final character
# ('+' -> 1, '-' -> -1, anything else -> 0, i.e. unlabelled).
sents = sc.textFile(dataset).map(
    lambda line: (
        line[:-3],
        1 if line[-1:] == "+" else (-1 if line[-1:] == '-' else 0),
    )
)

# Pair each sentence with its position so the split below is deterministic.
sents_indexed = sents.zipWithIndex()

# Test set: labelled sentences whose index is a multiple of 5.
sents_test = sents_indexed\
    .filter(lambda pair: pair[0][1] != 0 and pair[1] % 5 == 0)\
    .map(lambda pair: pair[0])

# Training set: everything else (all unlabelled sentences plus the remaining
# labelled ones).
sents_train = sents_indexed\
    .filter(lambda pair: not (pair[0][1] != 0 and pair[1] % 5 == 0))\
    .map(lambda pair: pair[0])

# Output locations handed to run() / parameter_search().
result_path = 'result/group_x_result'
model_path = 'model/group_x_model'

In [None]:
def simple_tokenize(s):
    """Lower-case ``s`` and return the list of word tokens found in it.

    A token is a run of the letters a-z, optionally followed by a single
    apostrophe and another run of letters (for example ``don't``).
    Every other character acts as a separator.
    """
    word_pattern = r"[a-z]+(?:'[a-z]+)?"
    lowered = s.lower()
    return re.findall(word_pattern, lowered)

In [None]:
def spamminess(f: list, w: dict):
    """Return the total weight of the features in ``f`` under weights ``w``.

    Each occurrence of a feature contributes its weight from ``w``;
    features that are missing from ``w`` contribute nothing.
    """
    total = 0
    for key in f:
        if key in w:
            total = total + w[key]
    return total

In [9]:
def extractNGram(sents,m,N):
    """Pick the ``m`` most informative word N-grams of the labelled sentences.

    Args:
        sents: RDD of ``(text, label)`` pairs; only pairs whose label is
            non-zero are used (label > 0 counts as positive, < 0 as negative).
        m: number of N-grams to keep.
        N: number of consecutive tokens per N-gram.

    Returns:
        dict mapping each selected N-gram string to its rank (0 = best).

    Each N-gram is scored by the negated entropy of the label after
    splitting the labelled sentences into "contains the N-gram" and "does
    not contain it", weighted by the size of each side.  The entropy before
    the split is the same for every N-gram, so ranking by this score is the
    same as ranking by information gain.

    Fixes relative to the previous version:
      * the second entropy term used ``prob1`` where ``prob2`` was meant;
      * ``0 * log(0)`` is now treated as 0 instead of producing NaN, which
        used to push perfectly discriminative N-grams to the bottom (-inf);
      * the empty "does not contain" side is guarded on its total count;
      * keys are plain ``str`` and the ranking is built on the driver
        without going through the global SparkContext.
    """
    def distinct_ngrams(text):
        # A sentence counts once per N-gram, however often it repeats it.
        words = simple_tokenize(text)
        return {' '.join(words[i:i + N]) for i in range(len(words) - N + 1)}

    def tag_by_label(sent):
        # Emit (ngram, (1, 0)) for a positive sentence, (ngram, (0, 1)) otherwise.
        counts = (1, 0) if sent[1] > 0 else (0, 1)
        return [(ngram, counts) for ngram in distinct_ngrams(sent[0])]

    def add_pairs(left, right):
        return (left[0] + right[0], left[1] + right[1])

    def plogp(p):
        # p * log2(p) with the usual convention 0 * log(0) = 0.
        return float(p * np.log2(p)) if p > 0 else 0.0

    def neg_entropy(p):
        return plogp(p) + plogp(1.0 - p)

    def score(item):
        ngram, (pos_with, neg_with) = item
        n_with = float(pos_with + neg_with)
        n_without = float(Positives + Negatives) - n_with
        weight_with = n_with / (Positives + Negatives)
        weight_without = 1 - weight_with
        prob_with = pos_with / n_with
        # When every labelled sentence contains the N-gram the other side is
        # empty; its weight is 0, so the value chosen here does not matter.
        prob_without = (Positives - pos_with) / n_without if n_without > 0 else 0.0
        value = weight_with * neg_entropy(prob_with) \
            + weight_without * neg_entropy(prob_without)
        return (ngram, value)

    labelled = sents.filter(lambda x: x[1] != 0)
    Positives = sents.filter(lambda x: x[1]>0).count()
    Negatives = sents.filter(lambda x: x[1]<0).count()
    ranked = labelled.flatMap(tag_by_label)\
        .reduceByKey(add_pairs)\
        .map(score)\
        .top(m, key=lambda x: x[1])
    # top() returns the items best-first, so the position is the rank.
    return {ngram: rank for rank, (ngram, _) in enumerate(ranked)}


In [10]:
def accuracy(f_classified, w):
    """Return the fraction of examples whose score has the sign of the label.

    Args:
        f_classified: RDD of ``(features, label)`` pairs.
        w: dict of feature weights passed to ``spamminess``.

    Returns:
        Number of examples with ``spamminess(features, w) * label > 0``
        divided by the number of examples; ``0.0`` for an empty RDD.
        A score of exactly 0 is counted as wrong.

    The previous version raised IndexError / ZeroDivisionError on an empty
    RDD and needed two Spark jobs; hits and total are now folded in one pass.
    """
    hits, total = f_classified.map(
        lambda x: (1 if spamminess(x[0], w) * x[1] > 0 else 0, 1)
    ).fold((0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1]))
    if total == 0:
        return 0.0
    return hits / total

In [15]:

def trainer(f_classified, n_itr, alpha, delta,N):
    """Fit logistic-regression feature weights by batch gradient steps.

    Args:
        f_classified: RDD of ``(features, label)`` pairs; a label > 0 is the
            positive class, anything else the negative class.
        n_itr: maximum number of passes over the data.
        alpha: learning rate applied to every per-example error.
        delta: stop as soon as the sum of absolute weight updates of a pass
            falls below this value.
        N: unused; kept so existing callers keep working.

    Returns:
        dict mapping feature -> weight (empty when there are no features).

    Changes relative to the previous version:
      * the sigmoid is evaluated in a form that cannot overflow ``exp``;
      * the per-feature updates are collected once per pass instead of being
        evaluated by two separate Spark jobs;
      * an RDD without any feature no longer raises IndexError.
    """
    def sigmoid(z):
        # exp() is only ever called on a non-positive argument.
        if z >= 0:
            return 1 / (1 + exp(-z))
        ez = exp(z)
        return ez / (1 + ez)

    def make_step(weights):
        # Bind this pass's weights by value.
        def step(example):
            features, label = example
            target = 1 if label > 0 else 0
            update = (target - sigmoid(spamminess(features, weights))) * alpha
            # Every occurrence of a feature receives the example's update.
            return [(feature, update) for feature in features]
        return step

    w = {}
    for _ in range(0, n_itr):
        d_w = f_classified.flatMap(make_step(w)).reduceByKey(add).collectAsMap()
        w = {feature: update + w.get(feature, 0) for feature, update in d_w.items()}
        if sum(abs(update) for update in d_w.values()) < delta:
            break
    return w

In [42]:
def run (sents0, sents_test, model_path, result_path, save, N, m, niter, threshold, alpha, delta):
    """Self-train a sentence classifier and return its test accuracy.

    Args:
        sents0: RDD of ``(text, label)`` with label 1, -1 or 0 (unlabelled).
        sents_test: RDD of labelled ``(text, label)`` pairs used for scoring.
        model_path: output directory for ``(ngram, weight)`` pairs.
        result_path: output directory for the relabelled sentences.
        save: when truthy, write the results and the model.
        N: N-gram length.
        m: number of N-gram features selected by ``extractNGram``.
        niter, alpha, delta: forwarded to ``trainer``.
        threshold: an unlabelled sentence becomes positive when its score is
            above ``threshold`` and negative when below ``-threshold``.

    Returns:
        ``accuracy`` of the last trained weights on ``sents_test``.

    Procedure: select features once from ``sents0``; then repeatedly train on
    the currently labelled sentences and relabel the unlabelled ones, until
    no unlabelled sentence is left or a round labels nothing new.

    Fixes relative to the previous version:
      * the model is trained even when ``sents0`` has no unlabelled sentence
        (previously the loop was skipped and the test step hit a NameError);
      * saving the model called ``.map`` on a dict; the pairs are now built
        on the driver and written through ``sents0.context``;
      * unlabelled sentences are saved as their text without a label suffix
        instead of the integer 0;
      * the relabelling function receives its weights as an argument and the
        relabelled RDD is cached.  NOTE(review): RDDs are evaluated lazily, so
        a closure over the reassigned variable ``w`` would presumably be
        re-run with whichever weights are current at that time -- confirm;
      * duplicated N-gram helpers are merged, unused locals removed.
    """
    f_map = extractNGram(sents0,m,N)

    def featurize(text):
        # Ids of the selected N-grams occurring in text, in order of occurrence.
        words = simple_tokenize(text)
        ids = (f_map.get(' '.join(words[i:i + N])) for i in range(len(words) - N + 1))
        return [i for i in ids if i is not None]

    def with_features(sent):
        return (featurize(sent[0]), sent[1])

    def make_relabel(weights):
        # Factory so that each round's function owns its own weights.
        def relabel(sent):
            text, label = sent
            if label == 0:
                score = spamminess(featurize(text), weights)
                if score > threshold:
                    label = 1
                elif score < -threshold:
                    label = -1
            return (text, label)
        return relabel

    def unlabelled_count(rdd):
        return rdd.filter(lambda x: x[1] == 0).count()

    sents = sents0
    n_unclassified = unlabelled_count(sents0)
    w = {}
    while True:
        f_classified = sents.filter(lambda x: x[1]!=0).map(with_features)
        w = trainer(f_classified, niter, alpha, delta,N)
        if n_unclassified == 0:
            # Nothing to relabel (only possible on the first round).
            break

        sents_new = sents.map(make_relabel(w)).cache()
        n_unclassified_new = unlabelled_count(sents_new)

        # The previous round's cache is no longer needed once the new one
        # has been materialised by the count above.
        if sents is not sents0:
            sents.unpersist()
        sents = sents_new

        if n_unclassified_new == 0 or n_unclassified_new == n_unclassified:
            break
        n_unclassified = n_unclassified_new

    #calculate test accuracy
    test_acc =  accuracy(sents_test.map(with_features), w)

    #save model and file
    if save:
        sents.map(lambda x: (x[0]+'+' if x[1]==1 else x[0]+'-' if x[1]==-1 else x[0]))\
        .saveAsTextFile(result_path)
        model = [(str(ngram), w.get(idx, 0.0)) for ngram, idx in f_map.items()]
        sents0.context.parallelize(model).saveAsTextFile(model_path)

    if sents is not sents0:
        sents.unpersist()
    return test_acc


In [46]:
import itertools
#find the optimal parameters
def parameter_search(
    sents_train, sents_test, model_path, result_path,n_start, n_end, n_step, 
    m_start, m_end, m_step, niter_start, niter_end, niter_step, 
    threshold_start, threshold_end, threshold_step, 
    alpha_start, alpha_end, alpha_step,  
    delta_start, delta_end, delta_step):
    """Exhaustively search the hyper-parameter grid of ``run``.

    Every ``*_start, *_end, *_step`` triple is expanded with ``np.arange``
    and ``run`` is called (with saving disabled) for every combination.

    Returns:
        The tuple ``(N, m, niter, threshold, alpha, delta)`` of the first
        combination reaching the highest accuracy; when no combination
        scores above 0.0 the tuple of the ``*_start`` values is returned.
    """
    axes = (
        np.arange(n_start, n_end, n_step),
        np.arange(m_start, m_end, m_step),
        np.arange(niter_start, niter_end, niter_step),
        np.arange(threshold_start, threshold_end, threshold_step),
        np.arange(alpha_start, alpha_end, alpha_step),
        np.arange(delta_start, delta_end, delta_step),
    )

    best = (n_start, m_start, niter_start, threshold_start, alpha_start, delta_start)
    acc_best = 0.0

    for candidate in itertools.product(*axes):
        N, m, niter, threshold, alpha, delta = candidate
        acc = run(sents_train, sents_test, model_path, result_path, 0, N, m, niter, threshold, alpha, delta)
        # Strict comparison: ties keep the earlier combination.
        if acc > acc_best:
            acc_best = acc
            best = candidate

    return best


In [None]:
# Run the grid search. After the four data/path arguments, the values are
# (start, end, step) triples, in order: N, m, niter, threshold, alpha, delta.
# Each triple is expanded with np.arange inside parameter_search.
parameter_search(sents_train, sents_test, model_path, result_path, 1,6,1,100,1000,100,1,101,10,0.5,0.85,0.05,0.01,0.2,0.01,0.01,0.2,0.01)