<a href="https://colab.research.google.com/github/ibe16/cc2-sparkML/blob/master/Spark_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Cloud Computing Assignment 4

## *Processing large volumes of data with Spark*

![PySpark on Google Colab](https://joserzapata.github.io/es/post/pyspark-google-colab/featured.png)



# Installing Spark

Since Colab runs in a Linux environment, Spark can be installed with terminal commands.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

Set the required environment variables

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

Initialize findspark, which makes the Spark installation in `SPARK_HOME` importable from Python

In [0]:
import findspark
findspark.init()

# Importing the data

Once the dataset columns have been selected, we import them into Colab so we can work with them.
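The column selection itself is done by `createDataset`, defined in the script further down: it reads the `@inputs` line of the header file to name the CSV columns, appends `class`, and keeps seven columns (six features plus the label). The plain-Python sketch below mirrors that logic so it can be checked without Spark. The header lines and the `Extra_A`/`Extra_B` columns are hypothetical placeholders, since the real ECBDL14 header file is not reproduced here.

```python
# Hypothetical header in the "@inputs ..." layout that createDataset expects.
# Extra_A and Extra_B are made-up columns standing in for the ones that get dropped.
SAMPLE_HEADER = [
    "@inputs Extra_A, PSSM_r2_3_V, AA_freq_global_L, Extra_B, PSSM_central_-1_W, "
    "PSSM_r2_1_H, PSSM_central_0_Y, PSSM_r1_3_N",
    "@outputs class",
]

# The seven columns kept by createDataset's select(...)
SELECTED = ["PSSM_r2_3_V", "AA_freq_global_L", "PSSM_central_-1_W",
            "PSSM_r2_1_H", "PSSM_central_0_Y", "PSSM_r1_3_N", "class"]


def parse_header(lines):
    """Mirror createDataset: take the '@inputs' line, drop the tag, append 'class'."""
    header_list = None
    for line in lines:
        if "@inputs" in line:
            header_list = [x.replace(",", "").strip() for x in line.split()]
    if header_list is None:
        raise ValueError("no '@inputs' line found in the header")
    del header_list[0]           # remove the '@inputs' token itself
    header_list.append("class")  # the label is the last CSV column
    return header_list


def select_columns(row, header_list, wanted):
    """Name the positional CSV values and keep only the wanted columns, in order."""
    named = dict(zip(header_list, row))
    return [named[c] for c in wanted]


if __name__ == "__main__":
    cols = parse_header(SAMPLE_HEADER)
    print(cols)
    print(select_columns([str(i) for i in range(len(cols))], cols, SELECTED))
```

Unlike the original function, this sketch raises a clear error when the header has no `@inputs` line instead of failing later on an undefined `header_list`.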

The data is stored in a Google Drive folder. To read it, Google Drive has to be mounted as a regular file system; the data can then be accessed from Python.


In [3]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
root_dir = "/content/gdrive/My Drive/CC2/"

Mounted at /content/gdrive


The files can also be uploaded from the local computer to Colab's local storage.

In [0]:
# Upload files from the local computer
from google.colab import files
files.upload()

# Python script

With the dataset available, we can now use the Python script developed for this assignment.


## Functions

In [0]:
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf, SQLContext

from pyspark.sql.types import StructType, StructField, FloatType, IntegerType

from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml import Pipeline

from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import GBTClassifier, LogisticRegression, RandomForestClassifier





def createContext(appName= "Practica 4: Irene Bejar Maldonado"):
    # create Spark context with Spark configuration  
    conf = SparkConf().setAppName(appName)  
    sc = SparkContext(conf=conf)
    # Set Log info to display only error messages
    sc.setLogLevel("ERROR")
    return sc





def createDataset(sc, path_data, path_header):
    # Read header file
    headers = sc.textFile(path_header).collect()
    for line in headers:
        if "@inputs" in line:
            header_list = [x.replace(",","").strip() for x in line.split()]

    # Delete '@inputs' from the list
    del header_list[0]
    # Append column 'class' at the end
    header_list.append("class")

    # Read data file
    sqlContext = SQLContext(sc)
    df = sqlContext.read.csv(path_data, header=False, sep=",", inferSchema=True)
    print(df.schema)
    # Rename columns
    for i, colname in enumerate(df.columns):
        df = df.withColumnRenamed(colname, header_list[i])

    df = df.select("PSSM_r2_3_V", "AA_freq_global_L", "PSSM_central_-1_W", "PSSM_r2_1_H", "PSSM_central_0_Y", "PSSM_r1_3_N", "class")
    print(df.schema)

    # Save the new small dataframe in a file
    df.write.csv('./filteredC.small.training', header=True)





def loadSmallDataset(sc, path='./filteredC.small.training'):
    sqlContext = SQLContext(sc)

    # Prepare data schema
    data_schema = StructType([
        StructField("PSSM_r2_3_V", FloatType(), True),
        StructField("AA_freq_global_L", FloatType(), True),
        StructField("PSSM_central_-1_W", FloatType(), True),
        StructField("PSSM_r2_1_H", FloatType(), True),
        StructField("PSSM_central_0_Y", FloatType(), True),
        StructField("PSSM_r1_3_N", FloatType(), True),
        StructField("class", IntegerType(), True)
    ])

    # Read the data using the schema
    df = sqlContext.read.csv(path, header=True, sep=",", schema=data_schema)

    print ("--------------------------------------------------------------\n" + 
            "Dataset loaded\n" +
            "--------------------------------------------------------------\n")

    return df





def preprocessingData(sc, dataset):
    # Transform all the feature columns into a single vector column
    assemblerInputs = ["PSSM_r2_3_V", "AA_freq_global_L", "PSSM_central_-1_W", "PSSM_r2_1_H", "PSSM_central_0_Y", "PSSM_r1_3_N"]
    vec_assembler = VectorAssembler(inputCols =assemblerInputs, outputCol='features')

    # Add class column
    # StringIndexer maps the values of a column to an index column (similar to a factor column in R);
    # by default the most frequent value gets index 0
    indexer = StringIndexer(inputCol="class", outputCol="label")

    # Normalization 
    scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

    # Apply transformations
    pipeline = Pipeline (stages=[vec_assembler, indexer, scaler])
    final_data = pipeline.fit(dataset).transform(dataset).select("scaled_features", "label")

    print ("--------------------------------------------------------------\n" + 
            "Preprocessed data\n")
    final_data.printSchema()
    print ("--------------------------------------------------------------\n")

    return final_data





def RUS_data(dataset):
    # The data is unbalanced. The ratio for each class is:
    #  BEFORE Class 0=1375458 Class 1=687729 ratio=2
    #  AFTER  Class 0=688517 Class 1=687729 ratio=1

    class_0 = dataset.filter(dataset["label"] == 0)
    class_1 = dataset.filter(dataset["label"] == 1)
    ratio = class_0.count() / float(class_1.count())

    # The data has to be balanced: random undersampling (RUS) of the majority class
    sampled_majority = class_0.sample(False, 1.0 / ratio)
    final_data = sampled_majority.union(class_1)

    print ("--------------------------------------------------------------\n" + 
            "Dataset balanced using RUS\n" +
            "--------------------------------------------------------------\n")

    return final_data





def split_dataset(dataset, train_size, test_size):
    class_0 = dataset.filter(dataset["label"] == 0)
    class_1 = dataset.filter(dataset["label"] == 1)

    train_0, test_0 = class_0.randomSplit(weights=[train_size, test_size], seed=16)
    train_1, test_1 = class_1.randomSplit(weights=[train_size, test_size], seed=16)

    train = train_0.union(train_1)
    test = test_0.union(test_1)

    return train, test





def tune_model(estimator, param_grid, evaluator, train_data):
    tvs = TrainValidationSplit(estimator=estimator,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8,
                           seed=16)
    
    model = tvs.fit(train_data)
    
    # print results for each combination
    for i, item in enumerate(model.getEstimatorParamMaps()):
        grid = ["%s: %s" % (p.name, str(v)) for p, v in item.items()]
        print(grid, model.getEvaluator().getMetricName(),
              model.validationMetrics[i])


    return model.bestModel





def train_and_validation(estimator, train_data, test_data):
    model = estimator.fit(train_data)

    evaluator = BinaryClassificationEvaluator()
    auc = evaluator.evaluate(model.transform(test_data))

    print ("--------------------------------------------------------------\n" + 
            "Validation Result\n")
    print("AUC sobre el conjunto de test: ", auc)
    print ("--------------------------------------------------------------\n")





def train_logisticRegresion(train_data, file_name):
    # LogisticRegression has no seed parameter, so none is passed here
    lr = LogisticRegression(labelCol="label", featuresCol="scaled_features")

    paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.5, 1.0])\
    .addGrid(lr.maxIter, [10, 15])\
    .build()

    evaluator = BinaryClassificationEvaluator(labelCol="label")

    best_model = tune_model(lr, paramGrid, evaluator, train_data)
    # Models are saved through their writer: write().overwrite().save(path)
    best_model.write().overwrite().save(file_name)
    return best_model





def train_randomForest(train_data, file_name):
    rf = RandomForestClassifier(labelCol="label", featuresCol="scaled_features", seed=16)

    paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [10, 15, 20]) \
    .addGrid(rf.maxDepth, [3, 5, 10])\
    .build()

    evaluator = BinaryClassificationEvaluator(labelCol="label")

    best_model = tune_model(rf, paramGrid, evaluator, train_data)
    best_model.write().overwrite().save(file_name)
    return best_model





def train_GBT(train_data, file_name):
    gbt = GBTClassifier(labelCol="label", featuresCol="scaled_features", seed=16)

    paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxIter, [5, 10, 15]) \
    .addGrid(gbt.maxDepth, [2, 3, 10])\
    .build()

    evaluator = BinaryClassificationEvaluator(labelCol="label")

    best_model = tune_model(gbt, paramGrid, evaluator, train_data)
    best_model.write().overwrite().save(file_name)
    return best_model
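`preprocessingData` chains three Spark transformers. As a rough plain-Python illustration (not Spark code), the sketch below reproduces the label-indexing and scaling rules as documented for `StringIndexer` (default `frequencyDesc` ordering) and `MinMaxScaler` (default output range [0, 1]); `VectorAssembler`, which only packs the six feature columns into one vector, is left out. With the class counts listed in `RUS_data`, class 0 is the more frequent one, so it becomes label 0.0, which is what the `label == 0` filters rely on.

```python
from collections import Counter


def string_index(values):
    """Frequency-ordered label indexing, following StringIndexer's default
    'frequencyDesc' rule: the most frequent value gets index 0.0.
    Values are compared as strings, since StringIndexer casts numeric input."""
    counts = Counter(str(v) for v in values)
    order = [v for v, _ in counts.most_common()]
    mapping = {v: float(i) for i, v in enumerate(order)}
    return [mapping[str(v)] for v in values]


def min_max_scale(rows, new_min=0.0, new_max=1.0):
    """Per-column rescaling with MinMaxScaler's documented formula:
    (x - col_min) / (col_max - col_min) * (new_max - new_min) + new_min.
    A constant column is mapped to the midpoint 0.5 * (new_min + new_max)."""
    columns = list(zip(*rows))
    col_min = [min(c) for c in columns]
    col_max = [max(c) for c in columns]
    scaled = []
    for row in rows:
        out = []
        for x, lo, hi in zip(row, col_min, col_max):
            if hi == lo:
                out.append(0.5 * (new_min + new_max))
            else:
                out.append((x - lo) / (hi - lo) * (new_max - new_min) + new_min)
        scaled.append(out)
    return scaled


if __name__ == "__main__":
    print(string_index([0, 0, 1, 0, 1]))  # class 0 is the majority here
    print(min_max_scale([[0.0, 5.0], [10.0, 5.0], [5.0, 5.0]]))
```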


## Preparación de los datos

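The dataset is loaded, preprocessed and balanced with RUS (random undersampling).
It is then split into two sets, training and test, splitting each class separately so both sets keep the class proportions.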
También se divide en dos conjuntos: entrenamiento y validación.
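In `RUS_data` the majority class (label 0) is sampled without replacement with fraction `1/ratio`. With the counts recorded in its comments, ratio = 1,375,458 / 687,729 = 2 exactly, so the expected size of the sampled majority class is 687,729. The 688,517 rows actually kept differ by 788 because `DataFrame.sample` keeps each row independently (Bernoulli sampling) and does not guarantee an exact count; the standard deviation is √(1,375,458 · 0.5 · 0.5) ≈ 586 rows, so the gap is about 1.3 standard deviations. The sketch below reproduces the idea in plain Python, with the `random` module standing in for Spark's sampler (so the exact rows kept will differ), and assumes labels are only 0 (majority) or 1 (minority).

```python
import random


def random_undersample(rows, label_of, seed=16):
    """RUS as in RUS_data: keep every minority row (label 1) and keep each
    majority row (label 0) independently with probability 1 / ratio."""
    n0 = sum(1 for r in rows if label_of(r) == 0)
    n1 = sum(1 for r in rows if label_of(r) == 1)
    ratio = n0 / n1                 # majority-to-minority ratio
    rng = random.Random(seed)       # stand-in for Spark's Bernoulli sampler
    return [r for r in rows
            if label_of(r) == 1 or rng.random() < 1.0 / ratio]


if __name__ == "__main__":
    data = [(i, 0) for i in range(2000)] + [(i, 1) for i in range(1000)]
    balanced = random_undersample(data, lambda r: r[1])
    kept0 = sum(1 for r in balanced if r[1] == 0)
    print(kept0, len(balanced) - kept0)  # roughly 1000 majority rows, exactly 1000 minority rows
```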

In [0]:
spark_context = createContext() # Start the Spark context

In [0]:
data = "/user/datasets/ecbdl14/ECBDL14_IR2.data"
header = "/user/datasets/ecbdl14/ECBDL14_IR2.header"

path_to_load = root_dir+f'small_dataset.csv'

# createDataset(sc=spark_context, path_data=data, path_header=header)
df = loadSmallDataset(sc=spark_context, path=path_to_load)
df = preprocessingData(spark_context, df)
df = RUS_data(df)
train, test = split_dataset(df, 70.0, 30.0)
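`split_dataset` calls `randomSplit` on each class separately, which is a simple form of stratified splitting: both sets end up with roughly the same class mix as the input. `randomSplit` normalizes its weights, so `70.0, 30.0` behaves like 0.7/0.3. The plain-Python sketch below shows the same per-class idea with per-row random assignment; it only supports two weights and uses Python's `random` module as a stand-in, so the rows it picks are not the ones Spark would pick.

```python
import random


def stratified_split(rows, label_of, weights=(70.0, 30.0), seed=16):
    """Split each class separately, as split_dataset does, so train and test keep
    the class proportions. The two weights are normalized to fractions."""
    train_frac = weights[0] / float(sum(weights))
    rng = random.Random(seed)
    train, test = [], []
    for cls in sorted({label_of(r) for r in rows}):
        for r in (row for row in rows if label_of(row) == cls):
            # Each row goes to train with probability train_frac, else to test
            (train if rng.random() < train_frac else test).append(r)
    return train, test


if __name__ == "__main__":
    rows = [(i, i % 2) for i in range(2000)]
    tr, te = stratified_split(rows, lambda r: r[1])
    print(len(tr), len(te))  # roughly 1400 and 600
```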

## Hyperparameter search

The models are trained with several hyperparameter configurations to find the one that performs best.
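Each training function expands its `ParamGridBuilder` grid into the Cartesian product of the listed values, and `TrainValidationSplit` scores every combination on a held-out 20% of the training data. The sketch below only enumerates those combinations in plain Python (`itertools.product` standing in for `ParamGridBuilder.build()`) to show how many models each search fits. For Random Forest it assumes `maxDepth` values `[3, 5, 10]`, the ones reported in the results further down.

```python
from itertools import product

# Grids taken from train_logisticRegresion, train_randomForest and train_GBT
GRIDS = {
    "lr": {"regParam": [0.1, 0.01], "elasticNetParam": [0.5, 1.0], "maxIter": [10, 15]},
    "rf": {"numTrees": [10, 15, 20], "maxDepth": [3, 5, 10]},
    "gbt": {"maxIter": [5, 10, 15], "maxDepth": [2, 3, 10]},
}


def expand_grid(grid):
    """One dict per combination; the first parameter varies slowest."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


if __name__ == "__main__":
    for model, grid in GRIDS.items():
        print(model, len(expand_grid(grid)), "combinations")
```

That gives 8 + 9 + 9 = 26 combinations, each fitted once on the 80% split. The GBT order produced here (`maxIter` varying slowest) matches the order of the GBT results table.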

In [0]:
# Logistic regression
lr = train_logisticRegresion(train, "lr.model")

In [0]:
# Random Forest
rf = train_randomForest(train, "rf.model")

In [0]:
# GBT
gbt = train_GBT(train, "gbt.model")

## Training results

Validation results (area under the ROC curve) obtained for each configuration during the hyperparameter search.

### Logistic regression

```python
(['regParam: 0.1', 'elasticNetParam: 0.5', 'maxIter: 10'], 'areaUnderROC', 0.5)
(['regParam: 0.1', 'elasticNetParam: 0.5', 'maxIter: 15'], 'areaUnderROC', 0.5)
(['regParam: 0.1', 'elasticNetParam: 1.0', 'maxIter: 10'], 'areaUnderROC', 0.5)
(['regParam: 0.1', 'elasticNetParam: 1.0', 'maxIter: 15'], 'areaUnderROC', 0.5)
(['regParam: 0.01', 'elasticNetParam: 0.5', 'maxIter: 10'], 'areaUnderROC', 0.5255320922773915)
(['regParam: 0.01', 'elasticNetParam: 0.5', 'maxIter: 15'], 'areaUnderROC', 0.5269092508177255)
(['regParam: 0.01', 'elasticNetParam: 1.0', 'maxIter: 10'], 'areaUnderROC', 0.5341054461968381)
(['regParam: 0.01', 'elasticNetParam: 1.0', 'maxIter: 15'], 'areaUnderROC', 0.5352423263189457)
```
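Every configuration with `regParam = 0.1` scores exactly 0.5, which is the AUC of a classifier that gives every example the same score. One possible explanation, not checked in the notebook, is that this much elastic-net regularization shrinks every coefficient to zero, leaving only the intercept. The sketch below computes AUC with the rank-based (Mann-Whitney) formulation, counting ties as one half, to show why constant scores land on 0.5. It is an O(n²) illustration, not how `BinaryClassificationEvaluator` computes `areaUnderROC` internally.

```python
def auc(scores, labels):
    """Probability that a random positive outscores a random negative,
    with ties counted as one half (rank-based AUC)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


if __name__ == "__main__":
    print(auc([0.3, 0.3, 0.3, 0.3], [0, 1, 0, 1]))  # constant scores
    print(auc([0.1, 0.9, 0.2, 0.8], [0, 1, 0, 1]))  # perfect ranking
```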

### Random Forest

```python
(['maxDepth: 3', 'numTrees: 10'], 'areaUnderROC', 0.5604672226801575)
(['maxDepth: 3', 'numTrees: 15'], 'areaUnderROC', 0.5624006470644223)
(['maxDepth: 3', 'numTrees: 20'], 'areaUnderROC', 0.5599379950693529)
(['maxDepth: 5', 'numTrees: 10'], 'areaUnderROC', 0.5701677290767797)
(['maxDepth: 5', 'numTrees: 15'], 'areaUnderROC', 0.5698462502705504)
(['maxDepth: 5', 'numTrees: 20'], 'areaUnderROC', 0.568444386126271)
(['maxDepth: 10', 'numTrees: 10'], 'areaUnderROC', 0.5849019161955297)
(['maxDepth: 10', 'numTrees: 15'], 'areaUnderROC', 0.5852199319212533)
(['maxDepth: 10', 'numTrees: 20'], 'areaUnderROC', 0.5857663904229639)
```


### Gradient-Boosted Trees

```python
['maxIter: 5', 'maxDepth: 2'] areaUnderROC 0.5599631877122353
['maxIter: 5', 'maxDepth: 3'] areaUnderROC 0.5656066571484597
['maxIter: 5', 'maxDepth: 10'] areaUnderROC 0.5849454688788238
['maxIter: 10', 'maxDepth: 2'] areaUnderROC 0.5640069604974786
['maxIter: 10', 'maxDepth: 3'] areaUnderROC 0.5704346140656521
['maxIter: 10', 'maxDepth: 10'] areaUnderROC 0.5911215374107248
['maxIter: 15', 'maxDepth: 2'] areaUnderROC 0.567284795231468
['maxIter: 15', 'maxDepth: 3'] areaUnderROC 0.5741910996336299
['maxIter: 15', 'maxDepth: 10'] areaUnderROC 0.5946650969503753
```



# Model training and validation

The best-performing model is GBT, with a validation AUC of 0.595 using the following configuration:

* maxIter = 15 
* maxDepth = 10
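Choosing this configuration amounts to an argmax over the validation AUCs. The sketch below copies the nine GBT values from the results table and picks the largest; the best Logistic Regression (0.535) and Random Forest (0.586) scores are both lower, so GBT wins overall.

```python
# Validation AUCs copied from the GBT results table: (maxIter, maxDepth) -> AUC
GBT_RESULTS = {
    (5, 2): 0.5599631877122353,
    (5, 3): 0.5656066571484597,
    (5, 10): 0.5849454688788238,
    (10, 2): 0.5640069604974786,
    (10, 3): 0.5704346140656521,
    (10, 10): 0.5911215374107248,
    (15, 2): 0.567284795231468,
    (15, 3): 0.5741910996336299,
    (15, 10): 0.5946650969503753,
}


def best_config(results):
    """Return the (params, auc) pair with the highest validation AUC."""
    params = max(results, key=results.get)
    return params, results[params]


if __name__ == "__main__":
    params, best_auc = best_config(GBT_RESULTS)
    print(f"maxIter={params[0]}, maxDepth={params[1]}, AUC={best_auc:.3f}")
```

Rounded to three decimals, the winning validation AUC is 0.595.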

We now train it with this configuration on the whole training set and evaluate it on the test set.


In [19]:
gbt = GBTClassifier(labelCol="label", featuresCol="scaled_features", maxIter=15, maxDepth=10, seed=16)
train_and_validation(gbt, train, test)

--------------------------------------------------------------
Validation Result

AUC on the test set:  0.5971556448620811
--------------------------------------------------------------



In [0]:
spark_context.stop() # Stop the Spark context

The final result on the test set is an **AUC of 0.597**.