In [1]:
import math

import numpy as np

# Part0 Config
# Module-level state shared by the mapper/reducer functions below.
averages = []  # per-column means; assigned by standardize()
col_sum = []  # per-column sums; assigned by standardize()
stds = []  # per-column standard deviations; assigned by standardize()
N_COLS = 57  # number of feature columns per row (the label is the 58th value)
N_ROWS = 0  # row count; assigned by readFile() and again by train()

# Path to the input data file.
fileName ='../data/spam.data'

#########################################
#Part1 readFile()
#########################################   

# Parse each line into 57 feature values and 1 label.
def inputMapper(line):
    """Parse one whitespace-separated text line into ``(features, label)``.

    Every token is converted to ``float``. The first 57 values form the
    feature list and the value at index 57 is the label.
    """
    values = list(map(float, line.split()))
    features = values[:57]
    label = values[57]
    return (features, label)
    
def readFile(fileName):
    """Load ``fileName`` as an RDD of ``(features, label)`` rows.

    Side effect: stores the number of lines of the file in the global
    ``N_ROWS``. The returned RDD is marked for caching.

    ``sc`` is not defined in this file; presumably it is the SparkContext
    provided by the notebook environment.
    """
    global N_ROWS
    lines = sc.textFile(fileName)
    N_ROWS = lines.count()
    parsed = lines.map(inputMapper)
    return parsed.cache()
    
#########################################
#Part2 standardize
#########################################
def calculate_averages(n_cols,n_rows,col_sum):
    """Return the per-column means for the first ``n_cols`` columns.

    Each mean is ``col_sum[i] / n_rows``.
    """
    return [col_sum[idx] / n_rows for idx in range(n_cols)]


def sumReducer(x,y):
    """Reducer: element-wise sum of the feature vectors of two rows.

    ``x`` and ``y`` are ``(features, label)`` pairs. Only the first
    ``N_COLS`` features are added; the second slot of the result is a
    placeholder ``0``.
    """
    left, right = x[0], y[0]
    totals = [left[k] + right[k] for k in range(N_COLS)]
    return (totals, 0)

def residuesMapper(x):
    """Mapper: squared deviation of every feature from its column mean.

    ``x`` is a ``(features, label)`` pair. Returns ``(squares, label)`` where
    ``squares[i] == (features[i] - averages[i]) ** 2`` for the first
    ``N_COLS`` features. Reads the global ``averages``.
    """
    features, label = x[0], x[1]
    # math.pow replaces np.math.pow: np.math was an undocumented alias of the
    # stdlib module and is no longer available in recent NumPy releases.
    squares = [math.pow(features[k] - averages[k], 2) for k in range(N_COLS)]
    return (squares, label)

def calculate_stds(n_cols,n_rows,sigmas):
    """Return the standard deviation of each of the first ``n_cols`` columns.

    ``sigmas[i]`` is the sum of squared residues of column ``i``; the result
    is ``sqrt(sigmas[i] / n_rows)``, i.e. the divisor is ``n_rows`` itself.
    """
    # math.sqrt replaces np.math.sqrt: np.math was an undocumented alias of the
    # stdlib module and is no longer available in recent NumPy releases.
    return [math.sqrt(sigmas[k] / n_rows) for k in range(n_cols)]
    
def standardizeMapper(x):
    """Mapper: rescale every feature to ``(value - mean) / std``.

    ``x`` is a ``(features, label)`` pair; the label is passed through
    unchanged. Reads the globals ``averages`` and ``stds``.
    """
    features, label = x[0], x[1]
    scaled = []
    for k in range(N_COLS):
        spread = stds[k]
        if spread == 0:
            # A zero standard deviation means the column is constant, so every
            # value equals the mean; emit 0.0 instead of dividing by zero.
            scaled.append(0.0)
        else:
            scaled.append((features[k] - averages[k]) / spread)
    return (scaled, label)
  


def standardize (RDD_Xy):
    """Standardize every feature column of ``RDD_Xy``.

    Updates the globals ``col_sum``, ``averages`` and ``stds`` and returns a
    new RDD of ``(standardized_features, label)`` rows. Relies on the global
    ``N_ROWS`` already holding the row count.
    """
    global col_sum, averages, stds
    # Pass 1: column sums -> means.
    col_sum = RDD_Xy.reduce(sumReducer)[0]
    averages = calculate_averages(N_COLS, N_ROWS, col_sum)
    # Pass 2: sums of squared residues -> standard deviations.
    squared_residue_sums = RDD_Xy.map(residuesMapper).reduce(sumReducer)[0]
    stds = calculate_stds(N_COLS, N_ROWS, squared_residue_sums)
    standardized = RDD_Xy.map(standardizeMapper).cache()
    # Run an action right away so the mapped RDD is evaluated here.
    standardized.count()
    return standardized


In [2]:
#########################################
#Part3 train
#########################################
# Global model state. train() re-initialises w and b; predict() and accuracy()
# overwrite them with the parameters they are given.
w = np.ones(N_COLS)  # weight vector, one entry per feature column
b = 0  # bias term
dw = np.zeros(N_COLS)  # gradient placeholder; train() assigns its own local dw
db = 0  # bias-gradient placeholder; train() assigns its own local db
cost_list = []  # train() appends the cost of every iteration here

def mlog(x):
    """Natural logarithm with a finite stand-in for ``log(0)``.

    Returns ``-100000`` when ``x == 0`` and ``math.log(x)`` otherwise.
    Negative ``x`` still raises ``ValueError``.
    """
    if x == 0:
        return -100000
    # math.log replaces np.math.log: np.math was an undocumented alias of the
    # stdlib module and is no longer available in recent NumPy releases.
    return math.log(x)

def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))`` with overflow guards.

    Inputs above 700 return ``1`` and inputs below -700 return ``0`` so that
    ``exp`` is never called with an argument large enough to overflow.
    """
    if x > 700:
        return 1
    if x < -700:
        return 0
    # math.exp replaces np.math.exp: np.math was an undocumented alias of the
    # stdlib module and is no longer available in recent NumPy releases.
    return 1 / (1 + math.exp(-x))
    
def calculate_t(x,w,b):
    """Return the linear score ``sum(x[i] * w[i]) + b`` over ``N_COLS`` features."""
    score = 0
    for idx in range(N_COLS):
        score += x[idx] * w[idx]
    return score + b


def y_estimated_mapper(x):
    """Mapper: attach the model's estimate to a ``(features, label)`` row.

    Uses the global weights ``w`` and bias ``b`` and returns
    ``(features, estimate, label)`` where ``estimate`` is the sigmoid of the
    linear score.
    """
    global w, b
    features, label = x[0], x[1]
    score = calculate_t(features, w, b)
    return (features, sigmoid(score), label)

def loss_mapper(RDD_Xzy):
    """Mapper: log-likelihood of one ``(features, estimate, label)`` row.

    Returns ``y*log(z) + (1-y)*log(1-z)``; the logarithm is ``mlog``, which
    substitutes a large negative constant for ``log(0)``.
    """
    z, y = RDD_Xzy[1], RDD_Xzy[2]
    positive_term = y * mlog(z)
    negative_term = (1 - y) * mlog(1 - z)
    return positive_term + negative_term

def loss_reducer(x,y):
    """Reducer: add two partial loss values."""
    combined = x + y
    return combined

def calculate_model_complexity(W,n_rows,lambda_reg):
    """Return the regularisation penalty ``lambda_reg * sum(w_i^2) / n_rows``."""
    squared_norm = 0
    for weight in W:
        squared_norm += weight * weight
    return squared_norm * lambda_reg / n_rows


def calculate_cost(RDD_Xzy,lambda_reg):
    """Return the regularised cost of the current model.

    The cost is the negated summed log-likelihood divided by ``N_ROWS``,
    plus the penalty from ``calculate_model_complexity`` on the global ``w``.
    """
    global w
    total_log_likelihood = RDD_Xzy.map(loss_mapper).reduce(loss_reducer)
    penalty = calculate_model_complexity(w, N_ROWS, lambda_reg)
    return -total_log_likelihood / N_ROWS + penalty

def gradient_mapper(RDD_Xzy):
    """Mapper: per-row gradient contribution ``x[i] * (z - y)`` for each feature.

    ``RDD_Xzy`` is one ``(features, estimate, label)`` row.
    """
    features, z, y = RDD_Xzy[0], RDD_Xzy[1], RDD_Xzy[2]
    error = z - y
    return [features[k] * error for k in range(N_COLS)]

def gradient_reducer(x,y):
    """Reducer: element-wise sum of two gradient vectors of length ``N_COLS``."""
    return [x[k] + y[k] for k in range(N_COLS)]

def calculate_gradients(RDD_Xzy,lambda_reg):
    """Return the gradient of the regularised cost with respect to the weights.

    ``RDD_Xzy`` holds ``(features, estimate, label)`` rows. The per-row
    contributions ``x[i] * (z - y)`` are summed and then divided by
    ``N_ROWS``, matching calculate_cost(), which averages the loss over
    ``N_ROWS``. Previously the raw sum was used, which made every step
    ``N_ROWS`` times larger than the gradient of the reported cost.

    Reads the globals ``w``, ``N_ROWS`` and ``N_COLS``.
    """
    global w
    summed = RDD_Xzy.map(gradient_mapper).reduce(gradient_reducer)
    # NOTE(review): the penalty term keeps the original lambda * w / N_ROWS;
    # the exact derivative of calculate_model_complexity() carries an extra
    # factor of 2 - confirm which convention is intended.
    return [(summed[k] + lambda_reg * w[k]) / N_ROWS for k in range(N_COLS)]


def calculate_db(RDD_Xzy):
    """Return the gradient of the cost with respect to the bias.

    This is the mean of ``estimate - label`` over all rows of ``RDD_Xzy``
    (rows are ``(features, estimate, label)``). Previously the raw sum was
    returned, which is inconsistent with calculate_cost(), where the loss is
    averaged over the rows.
    """
    # Carry a row counter through the same reduce so the mean needs no
    # second pass over the data.
    total_error, n_rows = (
        RDD_Xzy
        .map(lambda row: (row[1] - row[2], 1))
        .reduce(lambda u, v: (u[0] + v[0], u[1] + v[1]))
    )
    return total_error / n_rows

def update_weights(w,b,dw,db,learning_rate):
    """Apply one gradient-descent step and return ``(w, b)``.

    ``w`` is updated in place for its first ``N_COLS`` entries; the new bias
    is returned as a fresh value.
    """
    for idx in range(N_COLS):
        w[idx] -= learning_rate * dw[idx]
    return w, b - learning_rate * db

In [3]:
def train (RDD_Xy, iterations, learning_rate, lambda_reg):
    """Fit the logistic-regression model with batch gradient descent.

    Resets the globals ``w`` (all ones) and ``b`` (zero), stores the row
    count of ``RDD_Xy`` in the global ``N_ROWS``, and runs ``iterations``
    update steps. The cost of every step is appended to the global
    ``cost_list``. Returns the final ``(w, b)``.
    """
    global w, b, N_ROWS
    N_ROWS = RDD_Xy.count()
    w = np.ones(N_COLS)
    b = 0
    for step in range(iterations):
        print('iteration',step)
        # Forward pass: attach the current estimate to every row.
        RDD_Xzy = RDD_Xy.map(y_estimated_mapper)
        # Record the cost before the parameters change.
        cost_list.append(calculate_cost(RDD_Xzy, lambda_reg))
        # Backward pass: gradients for the weights and the bias.
        dw = calculate_gradients(RDD_Xzy, lambda_reg)
        db = calculate_db(RDD_Xzy)
        # Gradient-descent update.
        w, b = update_weights(w, b, dw, db, learning_rate)
    return w,b


In [4]:
def predict_test(x, threshold=0.8):
    """Turn an estimated probability into a hard 0/1 prediction.

    Returns ``1`` when ``x`` is strictly greater than ``threshold`` and ``0``
    otherwise. ``threshold`` defaults to the previously hard-coded ``0.8``,
    so existing single-argument calls behave as before.
    """
    return 1 if x > threshold else 0

def predictMapper(RDD_Xy):
    """Mapper: turn a ``(features, label)`` row into ``(prediction, label)``.

    The estimate from ``y_estimated_mapper`` is converted to 0/1 by
    ``predict_test``.
    """
    estimated = y_estimated_mapper(RDD_Xy)
    return (predict_test(estimated[1]), estimated[2])

def predict(w_, b_, RDD_Xy):
    """Return an RDD of ``(prediction, label)`` for every row of ``RDD_Xy``.

    The given parameters are stored in the globals ``w`` and ``b``, which is
    where the mappers read them from.
    """
    global w, b
    w, b = w_, b_
    return RDD_Xy.map(predictMapper)

def accuracyMapper(RDD_Zy):
    """Mapper: absolute error of one ``(prediction, label)`` pair, as a float."""
    z, y = RDD_Zy[0], RDD_Zy[1]
    # Builtin abs replaces np.math.fabs: np.math was an undocumented alias of
    # the stdlib module and is no longer available in recent NumPy releases.
    # float() keeps the float return type that fabs always produced.
    return float(abs(z - y))

def accuracy(w_, b_, RDD_Xy):
    """Return the fraction of rows of ``RDD_Xy`` that the model gets right.

    Computed as ``1 - (sum of absolute prediction errors) / row count``.
    Stores ``w_`` and ``b_`` in the globals ``w`` and ``b``.
    """
    global w, b
    w, b = w_, b_
    n_rows = RDD_Xy.count()
    errors = predict(w_, b_, RDD_Xy).map(accuracyMapper)
    total_error = errors.reduce(lambda p, q: p + q)
    return 1 - total_error / n_rows


In [5]:
def shuffle_transfrom(RDD_Xy,k_fold):
    """Randomly split ``RDD_Xy`` into ``k_fold`` equally weighted folds.

    Previously ``k_fold`` was ignored and ten folds were always produced.
    The split still uses the fixed seed ``6``, so it is reproducible.

    Raises:
        ValueError: if ``k_fold`` is smaller than 1.
    """
    n_folds = int(k_fold)
    if n_folds < 1:
        raise ValueError("k_fold must be a positive integer, got %r" % (k_fold,))
    weights = [1.0] * n_folds
    return RDD_Xy.randomSplit(weights, 6)


def get_block_data(RDD_folds,i):
    """Return ``(test_fold, train_rdd)`` for cross-validation round ``i``.

    Fold ``i`` is the test set; all other folds are unioned into the
    training set.

    Raises:
        IndexError: if ``i`` is not in ``range(len(RDD_folds))``. Negative
            indices are rejected because they would select a test fold that
            is also unioned into the training set.
        ValueError: if fewer than two folds are given, since no fold would be
            left for training.
    """
    n_folds = len(RDD_folds)
    if not 0 <= i < n_folds:
        raise IndexError("fold index %r out of range for %d folds" % (i, n_folds))
    if n_folds < 2:
        raise ValueError("need at least 2 folds to build a training set")
    RDD_test = RDD_folds[i]
    RDD_train = None
    for k, fold in enumerate(RDD_folds):
        if k == i:
            continue
        RDD_train = fold if RDD_train is None else RDD_train.union(fold)
    # Kept from the original: run an action on the unioned RDD before returning.
    RDD_train.count()
    return RDD_test,RDD_train

In [6]:
def calculate_avgerage_accuracy(acus):
    """Return the arithmetic mean of the accuracies in ``acus``."""
    total = 0
    for value in acus:
        total = total + value
    return total / len(acus)

def cross_validation(k_fold = 10,iterations=10, learning_rate=0.7, lambda_reg=0.4):
    """Run k-fold cross-validation of the logistic-regression model.

    Reads and standardizes the data set, splits it into folds, then trains
    on all but one fold and measures accuracy on the held-out fold, once per
    fold. Prints the per-fold accuracy and finally the average accuracy.

    Args:
        k_fold: number of folds requested from ``shuffle_transfrom``.
        iterations: gradient-descent steps per fold.
        learning_rate: gradient-descent step size.
        lambda_reg: regularisation strength.
    """
    RDD_Xy = readFile('../data/spam.data')
    # NOTE(review): the standardisation statistics come from the full data
    # set, so held-out folds influence the training features - confirm this
    # is acceptable.
    RDD_SXy = standardize(RDD_Xy)
    RDD_folds = shuffle_transfrom(RDD_SXy,k_fold)
    acus = []
    # Iterate over the folds actually produced so that every fold serves as
    # the test set exactly once.
    for i in range(len(RDD_folds)):
        print('cv',i)
        RDD_test,RDD_train = get_block_data(RDD_folds,i)
        print(RDD_test.count(),RDD_train.count())
        # Fix: the bias used to be unpacked into an unused name `d`, and the
        # global `b` was handed to accuracy() instead of train()'s result.
        w_fold, b_fold = train(RDD_train,iterations,learning_rate,lambda_reg)
        acu = accuracy(w_fold, b_fold, RDD_test)
        acus.append(acu)
        print('accuracy',acu)
    avg_acu = calculate_avgerage_accuracy(acus)
    print(avg_acu)

In [7]:
# Run 10-fold cross-validation: 30 iterations, learning rate 0.7, lambda 0.4.
cross_validation(10,30,0.7,0.4)

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 37848)
Traceback (most recent call last):
  File "/usr/lib/python3.4/socketserver.py", line 305, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.4/socketserver.py", line 331, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.4/socketserver.py", line 673, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 263, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 238, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 251, in authenticate_and_accum_updates
    received_token = self.rfile.read(len(auth_token))
TypeError: 

In [8]:
# Write the costs collected in cost_list to the text file 'p_cv_cost'.
np.savetxt('p_cv_cost',cost_list)