In [1]:
import math, time, random
import numpy as np
from random import shuffle
from pyspark import SparkContext, SparkConf, rdd

In [2]:
file_object  = open('spam.data', 'r')
lines = file_object.readlines()
file_object.close()
    
total_size = len(lines)

conf = SparkConf().setAppName("Spam Filter").setMaster("local[1]").set("spark.hadoop.validateOutputSpecs", "false");
sc = SparkContext(conf=conf)

In [3]:
# Creating RDD
master_rdd = sc.parallelize(lines)
master_rdd = master_rdd.map(lambda x: [float(item) for item in x.split('\n')[0].split(" ")])

In [4]:
# Normalize values
def normalize_data(data):
    max_min = data.flatMap(lambda x: [ (index_key, x[index_key]) for index_key in range(len(x)-1)]) #Last position is label
    max__list = sorted(max_min.reduceByKey(lambda x,y: x if x > y else y).collect())
    min__list = sorted(max_min.reduceByKey(lambda x,y: x if x < y else y).collect())
    mean_list = sorted([ value[1]/data.count() for value in max_min.reduceByKey(lambda x,y: x + y).collect()])
    
    return data.map(lambda x: [(float(x[index]) - min__list[index][1])/(max__list[index][1] - min__list[index][1]) if index != len(x)-1 else x[index] for index in range(len(x))] )
    

    
master_norm_rdd = normalize_data(master_rdd)

In [5]:
# given a tuple of sub-rdds and the cross-validation iteration index,
#  this method returns a tuple containing training and validation rdds
def get_train_validation_rdds(sub_rdds, k, indices=list(range(0, 4))):
    
    # the validation set is the k-th sub-rdd
    validation_rdd = sub_rdds[indices.pop(k)]
    
    # initialize the train rdd with the first sub-rdd left
    train_rdd = sub_rdds[indices.pop(0)]
    
    # append all the remaining sub-rdds to the train-rdd
    for i in indices:
        train_rdd = train_rdd.union(sub_rdds[i])
    
    # save train and validation set in a file
    validation_rdd.saveAsTextFile('spam.validation' + str(k+1) + '.norm.data')
    train_rdd.saveAsTextFile('spam.train' + str(k+1) + '.data')
    
    validation_rdd = validation_rdd.cache()
    train_rdd = train_rdd.cache()
    
    return train_rdd, validation_rdd

In [6]:
import time

# divide the original rdd in non-test and test rdds
non_test_rdd, test_rdd = master_norm_rdd.randomSplit([0.8, 0.2])
non_test_rdd = non_test_rdd.cache()
test_rdd = test_rdd.cache()

# save test set in a file
test_rdd.saveAsTextFile('spam.test.set')

# divide the non-test rdd in 4 sub-rdds
sub_rdds = non_test_rdd.randomSplit([0.25, 0.25, 0.25, 0.25])

# k-fold iterations
for k in range(0, 4):
    # for every iteration get a different train and validation sets
    train_rdd, validation_rdd = get_train_validation_rdds(sub_rdds, k, indices=list(range(0,4)))

In [7]:
# x is the features vector without label
# w is the weights vector
# b is the bias
def predict(w, x, b):
    return (1 / (1 + math.exp(-(np.dot(w, x)+b))))

def get_cost_upd(x_y_yhat):
    x, y, yhat = x_y_yhat
    return y * math.log(yhat) + (1-y) * math.log(1-yhat)

def get_weight_upd(x_y_yhat, j):
    x, y, yhat = x_y_yhat
    return (yhat - y) * x[j]

In [8]:
train_rdd = train_rdd.cache()

# compute useful constants for further computations
m = train_rdd.count()
alpha = 0.1
lambdareg = 0 

# initialize the true labels vector
n_features = len(train_rdd.first()) - 1 # do not consider the true label
print("#Features: ", n_features)

# initialize the weights vector (one weight per feature) and bias
new_weights = np.zeros(n_features)
new_bias = 0

import time
start = time.time()
for epoch in range(400):
    
    weights = new_weights
    bias = new_bias

    #FIRST STEP: compute the predictions for the given weights
    xs_ys_yhats_rdd = train_rdd\
    .map(lambda x: (x[:-1], x[-1], predict(weights, x[:-1], bias)))\
    .cache()

    #SECOND STEP: compute the total cost for the computed predictions
    cost = xs_ys_yhats_rdd\
    .map(lambda x_y_yhat: get_cost_upd(x_y_yhat))\
    .reduce(lambda c1, c2: c1+c2)

    # (regularization)
    cost_reg_term = lambdareg/(2*m) + sum([w**2 for w in weights])
    cost = -1/m * cost + cost_reg_term
    
    if (epoch % 50 == 0)
        print("(", epoch, ") Cost: ", cost)

    #THIRD STEP: update all the weights simoultaneously
    # 3.1. get the updating term for all the weights
    weights_upds = xs_ys_yhats_rdd\
    .flatMap(lambda x_y_yhat: [(j, get_weight_upd(x_y_yhat, j))
                               for j in range(n_features)])\
    .reduceByKey(lambda u1, u2: u1+u2)\
    .sortByKey(True)\
    .map(lambda j_weightsumupds: - alpha / m * j_weightsumupds[1])\
    .collect()
    
    # 3.2. update the old weights (with regularization)
    weight_reg_term = (1 - alpha * lambdareg / m)
    new_weights = [weight * weight_reg_term + weight_upd 
                   for weight, weight_upd in zip(weights, weights_upds)]
    
end = time.time()
print()
print("Cost: ", cost)
print("Weights: ", weights)
print("> Total elapsed time: ", ((end-start)/60), "mins")



#Features:  57
( 0 ) Cost:  0.69314718056
( 1 ) Cost:  0.693033173087
( 2 ) Cost:  0.692944596023
( 3 ) Cost:  0.692881415797
( 4 ) Cost:  0.692843598882
( 5 ) Cost:  0.692831111799
( 6 ) Cost:  0.692843921114
( 7 ) Cost:  0.692881993441
( 8 ) Cost:  0.692945295442
( 9 ) Cost:  0.693033793822
( 10 ) Cost:  0.693147455336
( 11 ) Cost:  0.693286246784
( 12 ) Cost:  0.693450135014
( 13 ) Cost:  0.69363908692
( 14 ) Cost:  0.693853069442
( 15 ) Cost:  0.694092049569
( 16 ) Cost:  0.694355994336
( 17 ) Cost:  0.694644870823
( 18 ) Cost:  0.69495864616
( 19 ) Cost:  0.69529728752
( 20 ) Cost:  0.695660762128
( 21 ) Cost:  0.696049037251
( 22 ) Cost:  0.696462080207
( 23 ) Cost:  0.696899858357
( 24 ) Cost:  0.697362339112
( 25 ) Cost:  0.69784948993
( 26 ) Cost:  0.698361278313
( 27 ) Cost:  0.698897671814
( 28 ) Cost:  0.69945863803
( 29 ) Cost:  0.700044144606
( 30 ) Cost:  0.700654159235
( 31 ) Cost:  0.701288649655
( 32 ) Cost:  0.701947583654
( 33 ) Cost:  0.702630929063
( 34 ) Cost:  0

KeyboardInterrupt: 

In [85]:
sum([w**2 for w in weights])

6.657297925675477e-05

In [65]:


print(weights_upds)
#print(" 1st el (", type(f[0]), "): ", f[0])
#print(" 2nd el (", type(f[1]), "): ", f[1])
#print("Keys: ", len(set(weights_upds.keys().collect())))

[-0.11011574921452866, -0.08202888396314452, -0.2646418668477493, -0.007716386559429697, -0.14480315529986587, -0.07913141511726622, -0.06712535394819681, -0.041916883523459604, -0.07737248899535557, -0.06196593646912925, -0.10884987416294134, -0.2707127262802904, -0.0826885506083501, -0.028096957310303174, -0.04754759266570256, -0.05815633544087493, -0.09125081895242591, -0.09436598446613169, -0.4223064595109707, -0.022491868572623263, -0.35287101216875605, -0.027346082066765354, -0.09155955555383025, -0.03089527263324255, -0.13328976292967826, -0.07967519334763604, -0.1193444574469707, -0.07170660575625132, -0.03342924566348147, -0.09220464241546922, -0.026291505097520138, -0.05433271936024353, -0.025002606428841154, -0.05490243631059169, -0.02740413556437993, -0.06601693713837695, -0.09449137613593567, -0.004501834910314092, -0.03349729115736168, -0.06989412781137365, -0.03217378569361468, -0.042834758457191646, -0.06398311330810647, -0.015424224002466482, -0.06643515137504843, -0.0

In [12]:

.reduceByKey(lambda xj_y_yhat1, xj_y_yhat2: 
             get_weight_upd(xj_y_yhat1, xj_y_yhat2))\

.map(lambda j_weightsupds: - l_rate_over_size * j_weightsupds[1])\
.collect()

new_weights = [sum(_) for _ in zip(weights, weights_upds)]

#Labels:  2793
#Features:  57


In [49]:
len(train_rdd.first())

57