# **Semiparametric SVM training using subgradients in Spark**

#### bla, bla, bla. 

#### We will benchmark the algorithms with data files from UCI:

* **Ripley**, the Ripley dataset
* **Kwok**, the Kwok dataset
* **Twonorm**, the Twonorm dataset
* **Waveform**, the Waveform dataset
* **Covertype**, the Covertype dataset


In [1]:
# Define the user variable and conditionally run any code that depends on the execution context.
# Do not delete code; simply run it or skip it with an "if".
# Each user keeps their own branch of the "if" up to date and does not touch the other user's branch.

# Selects which environment-specific setup branch below is executed.
user = 'navia'
#user = 'roberto'
# When True, the SGMA + IRWLS demo section further down in this cell is executed.
modeloSGMA_IRWLS = True

#modelo = 'hybrid' 
#modelo = 'kernelgrad' 
# True: re-run and overwrite a result even when its result file already exists.
# False: skip any experiment whose result file already exists.
overwrite_results = False # overwrites results even when the result file exists, skips the execution otherwise

if user == 'roberto':
    # Create the SparkContext `sc` (local master with 4 threads, 2g executor memory).
    import findspark
    findspark.init()
    from pyspark import SparkConf, SparkContext
    conf = (SparkConf().setMaster("local[4]").setAppName("My app").set("spark.executor.memory", "2g"))
    sc = SparkContext(conf = conf)
    # NOTE(review): the project modules are imported through the `common.lib`
    # package here and are not shipped with sc.addPyFile; on a real cluster the
    # executors would need `common.lib` importable as well -- confirm.
    import common.lib.svm_utils as SVM_UTILS
    import common.lib.quadtree_utils as QUADTREE
    import numpy as np
    from pyspark.mllib.regression import LabeledPoint
    import pickle
    from math import sqrt
    # IPython magic (not plain Python): render matplotlib figures inline in the notebook.
    %matplotlib inline
    # Import the algorithm and reporting functions.
    from common.lib.SGMAUtils import SGMA
    from common.lib.IRWLSUtils import IRWLS
    from common.lib.ResultsUtils import show_results



if user == 'navia':
    # `sc` is not created in this branch -- presumably it is provided by the
    # notebook kernel / pyspark shell; TODO confirm.
    # Register the project modules with the SparkContext so that tasks run on
    # the executors can import them.
    sc.addPyFile("file:///export/usuarios01/navia/spark/SVM_spark/common/lib/svm_utils.py")
    sc.addPyFile("file:///export/usuarios01/navia/spark/SVM_spark/common/lib/IRWLSUtils.py")   
    sc.addPyFile("file:///export/usuarios01/navia/spark/SVM_spark/common/lib/KernelUtils.py")   
    sc.addPyFile("file:///export/usuarios01/navia/spark/SVM_spark/common/lib/ResultsUtils.py")   
    sc.addPyFile("file:///export/usuarios01/navia/spark/SVM_spark/common/lib/SGMAUtils.py")   
    # The modules are imported by their flat names (no `common.lib` prefix),
    # matching the file names registered above.
    import svm_utils as SVM_UTILS
    from SGMAUtils import SGMA
    from IRWLSUtils import IRWLS
    from ResultsUtils import show_results
    import numpy as np
    from pyspark.mllib.regression import LabeledPoint
    import pickle
    from math import sqrt
    # IPython magic (not plain Python): render matplotlib figures inline in the notebook.
    %matplotlib inline

    
if modeloSGMA_IRWLS:

    # This function should eventually be moved to a library.
    # mllib has a function that does this, but for some reason everything
    # runs very slowly afterwards.

    from sklearn.datasets import load_svmlight_file
    from pyspark.mllib.regression import LabeledPoint

    def loadFile(filename, sc, dimensions, numSlices=12):
        """Load a svmlight/libsvm file into an RDD of dense LabeledPoints.

        Parameters:
            filename: path of the svmlight-format file to read.
            sc: SparkContext used to distribute the rows.
            dimensions: number of features per row, forwarded to
                load_svmlight_file as n_features.
            numSlices: number of partitions requested from sc.parallelize.

        Returns:
            An RDD in which every element is LabeledPoint(label, features).
        """
        # n_features is passed by keyword: newer scikit-learn releases accept
        # it only as a keyword argument.
        X, Y = load_svmlight_file(filename, n_features=dimensions)
        X = X.toarray()
        # One row per sample: label in column 0, features in the remaining columns.
        rows = np.concatenate((Y.reshape((len(Y), 1)), X), axis=1)
        # The partition count belongs to parallelize(); the second argument of
        # RDD.map() is preservesPartitioning, so passing 12 there never set it.
        return sc.parallelize(rows, numSlices).map(lambda x: LabeledPoint(x[0], x[1:]))

    # Load the training and test sets.
    n_features = 123
    XtrRDD = loadFile('data/a9a', sc, n_features)
    XtstRDD = loadFile('data/a9a.t', sc, n_features)

    # Declare the variables.
    datasetSize = XtrRDD.count()
    NC = 50  # presumably the number of centroids/bases requested from SGMA -- confirm
    sigma = np.sqrt(n_features)
    gamma = 1.0 / (sigma * sigma)
    C = 1000
    # Sample roughly 1000 training points, or the whole set if it is smaller.
    samplingRate = min(1.0, 1000.0 / datasetSize)

    # Run the algorithm.
    Bases = SGMA(XtrRDD, NC, gamma, samplingRate)
    Pesos = IRWLS(XtrRDD, Bases, C, gamma)

    show_results(XtstRDD, Bases, Pesos, gamma)
    

# Settings for the experiment sweep that follows.
# NOTE: kdatasets, folds and NCs are assigned twice in this section; the
# second assignment (further down) is the one in effect when the sweep runs.
kdatasets = [1, 2, 3, 4]
folds = [1, 2, 3, 4, 5]

#Niter = 2
# Number of partitions passed to sc.parallelize when the RDDs are built.
Nnodes = 32
# Forwarded unchanged as the last argument of the SVM_UTILS.train_* calls.
Samplefraction = 0.05

# NOTE: this scalar Niter is superseded by the `for Niter in Niters` loop of
# the sweep, which rebinds Niter on every iteration.
Niter = 150
NCs = [5, 10, 25, 50, 100, 200]

#Niter = 300
#NCs = [50, 100, 200]


# Effective sweep grid: these assignments override the ones above.
kdatasets = [1, 2, 3, 4]
folds = [1, 2, 3, 4, 5]
Niters = [50, 100, 200]
NCs = [5, 10, 25, 50, 100, 200]
modelos = ['hybrid', 'kernelgrad']
#modelos = ['kernelgrad']

import os  # local to this section: used for the result-file checks below

results_dir = './results'
# Create the output directory up front so that a finished training run is not
# lost because its pickle cannot be written.
if not os.path.isdir(results_dir):
    os.makedirs(results_dir)


def _to_labeled_rdd(labels, features):
    """Stack labels and features column-wise and distribute them as LabeledPoints."""
    stacked = np.hstack((labels, features))
    return sc.parallelize(stacked, Nnodes).map(lambda x: LabeledPoint(x[0], x[1:]))


# Sweep over model x dataset x fold x Niter x NC. Every combination stores
# [auc_val, auc_tst, exe_time] in its own pickle file under ./results.
for modelo in modelos:
    # Fail early on an unknown model name instead of hitting a NameError (or
    # silently pickling the previous model's results) after the training step.
    if modelo not in ('hybrid', 'kernelgrad'):
        raise ValueError("Unknown modelo: %r" % (modelo,))
    for kdataset in kdatasets:
        for kfold in folds:
            # The data and the RDDs depend only on (kdataset, kfold), so they
            # are built once here rather than once per Niter value.
            x_tr, y_tr, x_val, y_val, x_tst, y_tst, name_dataset = SVM_UTILS.load_data(kdataset, kfold)
            NI = x_tr.shape[1]
            sigma = sqrt(NI)
            C = 10.0
            XtrRDD = _to_labeled_rdd(y_tr, x_tr)
            XvalRDD = _to_labeled_rdd(y_val, x_val)
            XtstRDD = _to_labeled_rdd(y_tst, x_tst)
            for Niter in Niters:
                for NC in NCs:
                    # Parenthesised single-argument form: valid in Python 2 and 3.
                    print("Dataset = %s, modelo = %s, kfold = %d, Niter = %d, NC = %d" % (name_dataset, modelo, kfold, Niter, NC))
                    filename = results_dir + '/dataset_' + str(kdataset) + '_modelo_' + modelo + '_NC_' + str(NC) + '_Niter_' + str(Niter) + '_kfold_' + str(kfold) + '.pkl'

                    # Skip combinations that already have a result file unless
                    # overwriting was explicitly requested.
                    if os.path.isfile(filename) and not overwrite_results:
                        continue

                    if modelo == 'hybrid':
                        auc_val, auc_tst, exe_time = SVM_UTILS.train_hybridSVM(XtrRDD, XvalRDD, XtstRDD, sigma, C, NC, name_dataset, Niter, Samplefraction)
                    else:  # modelo == 'kernelgrad' (validated above)
                        auc_val, auc_tst, exe_time = SVM_UTILS.train_kernelgrad(XtrRDD, XvalRDD, XtstRDD, sigma, C, NC, name_dataset, Niter, Samplefraction)

                    # Pickle data is binary: write the file in binary mode.
                    with open(filename, 'wb') as f:
                        pickle.dump([auc_val, auc_tst, exe_time], f)


Centroid 1 : Taking candidates, Evaluating ED,

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 5.0 failed 4 times, most recent failure: Lost task 2.3 in stage 5.0 (TID 64, node84.cluster.tsc.uc3m.es): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named common.lib.SGMAUtils

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/export/workdir/mesos/work/slaves/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/frameworks/b7816db4-74d6-4d70-8fe3-b397efddd83b-0077/executors/b7816db4-74d6-4d70-8fe3-b397efddd83b-S31/runs/9060bdcd-f55a-460d-bf04-fc20f57e1830/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named common.lib.SGMAUtils

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
