# Parallel Logistic Regression Classifier on Spark

Author: **Giorgio Polla**  
Date: **27/11/2019**  

Implementation of a Logistic Regression classifier on Spark, with Python.  
The algorithm is tested using cross-validation on the _Spam_ dataset available at https://web.stanford.edu/~hastie/ElemStatLearn/data.html.


### Libraries and constants

Import of the following libraries:  
* `pyspark` to utilize *Spark*;  
* `time` to track the time performance of the implementation;  
* `numpy` to easily operate with number arrays;  
* `random` to provide the random functionalities used in the cross-validation dataset split;  
* `matplotlib` to plot graphs;  
* `sklearn` to easily compute some evaluation metrics.  

Moreover, some operative constants used in the following are defined, and finally the Spark context is intialised.

In [6]:
import pyspark
import time
import numpy as np
import random
import matplotlib.pyplot as plt
from random import randrange
from scipy.interpolate import make_interp_spline
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score

FILE_PATH = '../data/spam.txt'
OUT_PATH = '../out/'
IMG_FORMAT = 'png'
N_WORKERS = 8
EPSILON = 1e-10

In [7]:
if 'sc' in locals():
    print('Restart Spark Context')
    sc.stop()
else:
    print('Initialise Spark Context')
    
# conf = SparkConf().setAppName("lol").setMaster("local[8]")
# conf.set('spark.scheduler.mode', 'FAIR')
# conf.set('spark.dynamicAllocation.enabled', 'true')
# conf.set('spark.shuffle.service.enabled', 'true')
# sc = SparkContext(conf=conf) 

sc = pyspark.SparkContext('local[' + str(N_WORKERS) + ']')

Initialise Spark Context


### File reading
The spam dataset is loaded as an RDD, using the `textFile()` function.  
The RDD is then manipulated in order to have each record as a tuple **(X, y)** , where:  
**X** is an array containing the features of the example (57 float numbers);  
**y** is an integer containing the label of the example (0 or 1).  

The result is then quickly tested.

In [55]:
def read_file(file_path):
    rdd = sc.textFile(file_path, N_WORKERS)
    
    rdd = rdd.map(
        lambda x: (
            np.array([float(el) for el in x[:-1].split()]),
            int(x[-1])
        )
    )
    
    return rdd

In [65]:
rdd = read_file(FILE_PATH)
rdd.take(1)[0][0][56] # test -> 278.0

278.0

### Standardisation
Each row of the RDD is standardised; this is done by first obtaining the mean for each column, then obtaining the standard deviation for them, and finally applying the standardisation formula.  

The result is then again quickly tested.

In [66]:
def standardise(rdd):    
    n_rows = rdd.count()
    
    col_sum = rdd.map(
        lambda x:
            x[0]
    ).reduce(
        lambda x, y:
            x + y
    )
    mean = col_sum / n_rows
    
    variance = rdd.map(
        lambda x: np.square(x[0] - mean)
    ).reduce(
        lambda x, y:
            x + y
    )
    std_dev = np.sqrt(variance / n_rows)
    
    rdd = rdd.map(
        lambda x: (
            np.array([1] + list((x[0] - mean) /  std_dev)),
            x[1]
        )
    )
    
    return rdd

In [67]:
rdd = standardise(rdd)
rdd.first()[0][57] # test -> -0.008724

-0.008724133882501232

### Training
The training phase follows the typical Logistic Regression fashion, with some adjustments to exploit the Spark parallelisation.  
It is worth noting that the notation is consistent, and in particular:  
* **rdd** is an RDD containing the records on which to train the model;
* **x** is a data point feature array (57 float numbers);  
* **y** is a data point label (integer, either 0 or 1);  
* **b** is the regression bias term (float number);  
* **w** is the regression weights array (57 float numbers);  
* **p** is the predicted class probability for of data point (float number between 0 and 1);

In the first section there are the support functions for the proper training phase.  
* `sigmoid()` simply represents the Sigmoid function;  
* `predict_probability()` calculates and returns the predicted class probability **y** for each record, exploting the `sigmoid()` function;  
* `cross_entropy()` is the Cross Entropy function, utilised to obtain the training error of the model;  
* `gradient()` calculates the gradient for the bias term (**gradient_b**),  the gradient for the weights, in a single array (**gradient_w**), and the sum of the cross entropy error for each record.

In [84]:
def sigmoid(z):
    return 1 / (1 + np.exp(-z))


def predict_probability(x, ws):
    z = np.dot(x, ws[1:]) + ws[0]
    return sigmoid(z)


def cross_entropy(p, y):    
    if y == 1:
        error_loss = -np.log(p + EPSILON)
    else:
        error_loss = -np.log(1 - p + EPSILON)
    
    return error_loss


def g_help(x, ws):
    p = predict_probability(x[0][1:], ws)
    
    g_part = (p - x[1]) * x[0]
    ce_part = cross_entropy(p, x[1])
    
    return g_part, ce_part


def gradient(rdd, ws):
    gradient_ws, error_loss = rdd.map(
        lambda x: 
            g_help(x, ws)
    ).reduce(
        lambda x, y: (
            x[0] + y[0],
            x[1] + y[1]
        )
    )
    
    return gradient_ws, error_loss

In [88]:
b, w, _, _ = train(rdd,
                   iterations=100,
                   learning_rate=10,
                   lambda_reg=1
                  )

a = evaluate(rdd, b, w, plot=False)['Accuracy']
a

It.    0	|	Loss: 0.6931	|	Time: 0.32 s
It.   20	|	Loss: 0.2189	|	Time: 2.97 s
It.   40	|	Loss: 0.2148	|	Time: 5.97 s
It.   60	|	Loss: 0.2132	|	Time: 8.83 s
It.   80	|	Loss: 0.2124	|	Time: 11.62 s

Total time: 14.36 s
Iters frequency: 6.97 Hz



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 929.0 failed 1 times, most recent failure: Lost task 0.0 in stage 929.0 (TID 7131, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-80-5a3bf784263a>", line 12, in <lambda>
TypeError: predict_probability() takes 2 positional arguments but 3 were given

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-80-5a3bf784263a>", line 12, in <lambda>
TypeError: predict_probability() takes 2 positional arguments but 3 were given

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


The train function considerds the additional parameters:  
* **iterations**, the number of iterations to repeat in the training phase;
* **learning_rate**, the learing rate in the training phase;
* **lambda_reg**, the regularization parameter lambda, used to control the Ridge-like regularization adopted;
* **verbose**, controlling whether or not to stamp additional information.  

The training phase proceeds inside the main iteration loop by calculating each time the gradient for the bias **b** and the weights **w**, applying the gradient descent technique with appropriate regularization, and recording the total loss (cross-entropy loss **err_loss** plus regularization loss **reg_loss**).  
At the end of the training phase the function returns the trained bias and weights, as well as the array **loss_history** containing the records of the total loss during the whole process.

In [79]:
def train(rdd, iterations=10, learning_rate=10, 
          lambda_reg=0.1, verbose=True):
    start_time = time.time()
    n_rows = rdd.count()
    n_features = len(rdd.first()[0])
    ws = np.zeros(n_features)
    alpha = learning_rate / n_rows
    
    loss_history = []
    
    for it in range(iterations):
        gradient_ws, err_loss = gradient(rdd, ws)
        
        reg_loss = lambda_reg * np.sum(np.square(ws[1:])) / 2
        regularization = np.array([0] + list(lambda_reg * ws[1:]))
        
        ws = ws - alpha * (gradient_ws + regularization)
        
        total_loss = (err_loss + reg_loss) / n_rows
        loss_history.append(total_loss)
        
        if verbose and it % (iterations / 5) == 0:
            print("It. %4d\t|\tLoss: %0.4f\t|\tTime: %0.2f s" %  
                  (it, total_loss, (time.time() - start_time)))
    total_time = time.time() - start_time
    if verbose:
        print('\nTotal time: %0.2f s\nIters frequency: %0.2f Hz' % 
              (total_time, iterations / total_time))
    print()
    return b, w, loss_history, total_time

### Performance Evaluation
The evaluation is entirely managed by the follwing two functions, following the usual parameter naming conventions.  
* `classify()` is a support function that returns wether a data point has been classified as a _True Positve_ (**tp**), a _True Negative_ (**tn**), a _False Negative_ (**fn**) or a _False Positve_ (**fp**);  
* `evaluate()` computes the traditional evaluation metric for classification, and in particular the **Accuracy**, **Precision**, **Recall**, **Specificity**, **F1-Score**, final **Cost Error**, area under the ROC curve (**AUC**) and precision-recall score (**P-R Score**), and returns them packed in a dictionary. If the parameter **plot** is positively flagged, the two plots of the ROC and precision-recall curves are displayed.

In [80]:
def classify(p, y, threshold=0.5):
    if (p >= threshold) == y:
        return 'tp' if y==1 else 'tn'
    else:
        return 'fn' if y==1 else 'fp'
            
        
def evaluate(rdd, b, w, loss_history=[None], plot=True):
    
    rdd_pred = rdd.map(
        lambda x: (
            predict_probability(x[0], b, w),
            x[1]
        )
    )
    
    predictions = rdd_pred.map(
        lambda x:
            x[0]
    ).collect()
    
    corrects = rdd_pred.map(
        lambda x:
            x[1]
    ).collect()
    
    c_m = rdd_pred.map(
        lambda x: (
            classify(x[0], x[1]),
            1
        )
    ).reduceByKey(
        lambda x, y:
            x+y
    )
    c_m = dict(c_m.collect())
    
    roc1, roc2, _ = roc_curve(corrects, predictions)
    
    results = {}
    
    results['Confusion Matrix'] = c_m
    
    results['Accuracy'] = (c_m['tp'] + c_m['tn']) \
        / (c_m['tp'] + c_m['tn'] + c_m['fp'] + c_m['fn'])
    
    results['Precision'] = c_m['tp'] / (c_m['tp'] + c_m['fp'])
    
    results['Recall'] = c_m['tp'] / (c_m['tp'] + c_m['fn'])
    
    results['Specificity'] = c_m['tn'] / (c_m['tn'] + c_m['fp'])
    
    results['F1-Score'] = 2 * results['Precision'] * results['Recall'] \
        / (results['Precision'] + results['Recall'])
    
    results['Cost Error'] = loss_history[-1]
    
    results['AUC'] = auc(roc1, roc2)
    
    results['PR-Score'] = average_precision_score(corrects, predictions)
    
    if plot:
        plt.title('ROC Curve')
        plt.plot(roc1, roc2, color='darkorange', 
                 label='ROC curve (area %0.2f)' % results['AUC'])
        plt.plot([0, 1], [0, 1], color='navy', linestyle = '--')
        plt.legend(loc='lower right')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.show()

        pr1, pr2, _ = precision_recall_curve(corrects, predictions)

        plt.title('Precision-Recall Curve')
        plt.step(pr2, pr1, color='black', alpha=0.2,
                 label='PR curve (area %0.2f)' % results['PR-Score'])
        plt.fill_between(pr2, pr1, alpha=0.2, color='black')
        plt.legend(loc='lower right')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.show()
    
    return(results)

### Cross-Validation
The traditional cross-validation technique is here implemented, with some tweakings to exploit the Spark parallelisation.  
* `transform_c_v()` transforms the RDD to allow the cross-validation split: each record is randomly assigned to one fold (which range depends on the **folds** number chosen) by adding a label assigned by the `randrange()` functions;  
* `get_block_data()` takes the original train RDD and splits it in **rdd_train** and **rdd_test** according to the labels of each element and the training fold extracted **fold_train**;

In [81]:
def transform_cv(rdd, folds):
    random.seed()
    rdd_cv = rdd.map(
        lambda x: (
            x[0],
            x[1],
            randrange(folds)
        )
    )
    return rdd_cv


def get_block_data(rdd, fold_train):
    rdd_train = rdd.filter(
        lambda x:
            x[2] != fold_train
    ).map(
        lambda x: (
            x[0],
            x[1]
        )
    )
    
    rdd_test = rdd.filter(
        lambda x:
            x[2] == fold_train
    ).map(
        lambda x: (
            x[0],
            x[1]
        )
    )
    return rdd_train, rdd_test

The proper cross-validation process is here executed: given an RDD and a number of folds **folds** in which to split the dataset (and the usual `train()` parameters, to pass to that function when called), it iterates the training and testing phases selecting each time the proper **rdd_test** and **rdd_train** split, according to the appropriate training fold **fold** selected.  

The accuracy of the trained model (obtained using the previously seen evaluating functions) is recorded for each fold iteration and then finally averaged in **average_accuracy**.  
The implementation also provides additional information about the time elapsed and the fold or training iteration frequency in the computation: these information can be displayed or not by acting on the **verbose** and **verbose_train** flags (respectively regarding information about the cross-validation process and the specific training iterations).

In [82]:
b, w, _, _ = train(rdd,
                   iterations=100,
                   learning_rate=10,
                   lambda_reg=1
                  )

a = evaluate(rdd, b, w, plot=False)['Accuracy']
a

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 514.0 failed 1 times, most recent failure: Lost task 5.0 in stage 514.0 (TID 3851, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/rdd.py", line 839, in func
    initial = next(iterator)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-78-fc08e8ae9810>", line 31, in <lambda>
  File "<ipython-input-78-fc08e8ae9810>", line 20, in g_help
  File "<ipython-input-78-fc08e8ae9810>", line 6, in predict_probability
IndexError: invalid index to scalar variable.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/rdd.py", line 839, in func
    initial = next(iterator)
  File "/home/giorgio/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-78-fc08e8ae9810>", line 31, in <lambda>
  File "<ipython-input-78-fc08e8ae9810>", line 20, in g_help
  File "<ipython-input-78-fc08e8ae9810>", line 6, in predict_probability
IndexError: invalid index to scalar variable.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [16]:
def cross_validation(rdd, folds=10, iterations=20,
                     learning_rate=10, lambda_reg=0.1,
                     verbose=True, verbose_train=False):
    start_time = time.time()
        
    total_accuracy = 0
    rdd_cv = transform_cv(rdd, folds)

    for fold in range(folds):
        rdd_train, rdd_test = get_block_data(rdd_cv, fold)
        
        if verbose:
            tot_train = rdd_train.count()
            tot_test = rdd_test.count()
            perc_test = tot_test / (tot_test + tot_train) * 100
            print('Fold %2d of %2d' % (fold + 1, folds))
            print('Train %4d\tTest %4d\t|\t%0.1f %% test' %
                  (tot_train, tot_test, perc_test)
            )
        
        b, w, loss_history, _ = train(
            rdd_train,
            iterations=iterations,
            lambda_reg=lambda_reg,
            verbose=verbose_train
        )
        
        results = evaluate(rdd_test, b, w, plot=False)
        accuracy = results['Accuracy']
        total_accuracy += accuracy
        if verbose:
            print('Accuracy: %0.4f\n' % accuracy)

    average_accuracy = total_accuracy / folds
    total_time = time.time() - start_time
    if verbose:
        print('Average accuracy: %0.4f' % average_accuracy)
        print('Total time: %0.2f s\nFold frequency: %0.2f Hz' % 
              (total_time, (folds / total_time))
        )
          
    return average_accuracy, total_time

In [17]:
avg_acc, tot_time = cross_validation(rdd, folds=10, iterations=20, 
                 learning_rate=10, lambda_reg=0.001,
                 verbose_train=False)

Fold  1 of 10
Train 4131	Test  470	|	10.2 % test

Accuracy: 0.9234

Fold  2 of 10
Train 4205	Test  396	|	8.6 % test

Accuracy: 0.9318

Fold  3 of 10
Train 4044	Test  557	|	12.1 % test

Accuracy: 0.9102

Fold  4 of 10
Train 4170	Test  431	|	9.4 % test

Accuracy: 0.9142

Fold  5 of 10
Train 4209	Test  392	|	8.5 % test

Accuracy: 0.9184

Fold  6 of 10
Train 4099	Test  502	|	10.9 % test

Accuracy: 0.9382

Fold  7 of 10
Train 4148	Test  453	|	9.8 % test

Accuracy: 0.9183

Fold  8 of 10
Train 4175	Test  426	|	9.3 % test

Accuracy: 0.9131

Fold  9 of 10
Train 4052	Test  549	|	11.9 % test

Accuracy: 0.9271

Fold 10 of 10
Train 4176	Test  425	|	9.2 % test

Accuracy: 0.9153

Average accuracy: 0.9210
Total time: 32.02 s
Fold frequency: 0.31 Hz


In [14]:
def experiment():
    learning_rate_list = [0.1, 1.0, 10.0, 100.0]
    reg_list = [0.01, 0.1, 1.0]    
    res = []
    i = 0
    
    for lr in learning_rate_list:
        for reg in reg_list:
            acc, _ = cross_validation(rdd, folds=4, iterations=100,
                                     learning_rate = lr, lambda_reg = reg,
                                     verbose=False)
            res.append((acc, lr, reg))
            i += 1
            print('It %d of 16. Acc %0.3f\t|\t' % (i, acc))
    return res

In [15]:
r = experiment()

0.921583850931677
0.8818722139673105

0.9271575613618369
0.8966309341500766

0.9295987887963664
0.883453237410072

0.9310344827586207
0.8890479599141017

It 1 of 16. Acc 0.925	|	
0.9274809160305344
0.8901098901098901

0.92467332820907
0.8826118855465884

0.934748427672956
0.8960060286360211

0.9272175890826384
0.8836705202312138

It 2 of 16. Acc 0.924	|	
0.9296754250386399
0.8845588235294117

0.9301611665387567
0.885317750182615

0.9224204809930179
0.8807407407407407

0.9289533995416348
0.8941176470588236

It 3 of 16. Acc 0.923	|	
0.9243421052631579
0.8808777429467085

0.9287856071964018
0.893294881038212

0.9265246142542248
0.889280677009873



KeyboardInterrupt: 

In [None]:
def experiment():
    learning_rate_list = [10]
    reg_list = [0.01, 0.1, 1.0]    
    res = []
    i = 0
    
    for lr in learning_rate_list:
        for reg in reg_list:
            train(rdd, iterations=100, learning_rate= lr, lambda_reg=reg, verbose=False)
    return res

In [None]:
res_a = []
res_lr = []
for i in r:
    if i[2] == 0.01:
        res_lr.append(i[1])
        res_a.append(i[0])
plt.plot(res_lr, res_a)
plt.xscale('log')

r

### Performance and Speed Up Curves
The main code below re-runs multiple times the main custering process for a standard of 10 initialisations, recording the time required with a different number of workers assigned, while maintaining a total of 8 partitions in the **rdd**.

In [None]:
worker_list = list(range(1, N_WORKERS + 1))
time_list = []
n_folds = 10

for workers in worker_list:
    print('Starting with %d worker(s)' % workers, end ='')
    if 'sc' in locals():
        sc.stop()
    sc = pyspark.SparkContext('local[' + str(workers) + ']')

    rdd = read_file(FILE_PATH)
    # rdd = rdd.sample(True, 10)
    rdd = standardise(rdd)

    _, _, _, tmp_time = train(
        rdd,
        iterations=10,
        verbose=False
    )
    
    time_list.append(tmp_time)
    print('\t|\tFinished in %0.2f s\n-----' % tmp_time)
print('Ended.')

This functions plots and eventually saves the performance curve, given the array of times required previously obtained. The parameter **interpolate** allows to manage the line interpolation.  

The function is then executed.

In [None]:
time_list

In [None]:
def performance_curve(time_array, n_init, save=False, interpolate=False):
    t = np.array(time_array.copy())
    x_axis = [x for x in range(1, N_WORKERS + 1)]
    
    if interpolate: 
        spl = make_interp_spline(x_axis, t, k=3)
        x_axis = np.linspace(1, N_WORKERS, 300)
        t = spl(x_axis)

    plt.style.use('seaborn-darkgrid')
    plt.plot(x_axis, t, linewidth=2)
    plt.gcf().subplots_adjust(bottom=0.1)
    plt.title('Performance Curve')
    plt.xlabel('Workers')
    plt.ylabel('Time (s)')
    if save:
        if interpolate:
            name = 'performance_curve_int.'
        else:
            name = 'performance_curve.'
        plt.savefig(OUT_PATH + str(n_init)  + 'it-' + name + IMG_FORMAT)
    plt.show()

In [None]:
performance_curve(time_list, n_folds)

This functions plots and eventually saves the speed up curve, given the array of times required previously obtained. The parameter **interpolate** allows to manage the line interpolation.

The function in then run.

In [None]:
def speed_up_curve(time_array, n_init, save=False, interpolate=False):
    t = [time_array[0]/x for x in time_array]
    x_axis = [x for x in range(1, N_WORKERS + 1)]
    
    if interpolate: 
        spl = make_interp_spline(x_axis, t, k=3)
        x_axis = np.linspace(1, N_WORKERS, 300)
        t = spl(x_axis)

    plt.style.use('seaborn-darkgrid')
    plt.plot(x_axis, t, linewidth=2)
    plt.gcf().subplots_adjust(bottom=0.1)
    plt.title('Speed Up Curve')
    plt.xlabel('Workers')
    plt.ylabel('Time ratio')
    if save:
        if interpolate:
            name = 'speed_up_curve_int-'
        else:
            name = 'speed_up_curve-'
        plt.savefig(OUT_PATH + str(n_init)  + 'it-' + name + IMG_FORMAT)
    plt.show()

In [None]:
speed_up_curve(time_list, n_folds)