# Modelos de Clasificación de PySpark

In [1]:
import os
import sys

# Forzar a Spark a usar el Python de tu venv
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('Models') \
    .getOrCreate()

### Regresión logística binomial

La regresión logística es un método popular para predecir una respuesta categórica. Es un caso especial de modelos lineales generalizados que predice la probabilidad de los resultados. 

En spark.ml la regresión logística se puede utilizar para predecir un **resultado binario** mediante el uso de la regresión logística binomial, o se puede utilizar para predecir un **resultado multiclase** mediante el uso de la regresión logística multinomial. 

Use el parámetro **family** para seleccionar entre estos dos algoritmos, o déjelo sin configurar y Spark deducirá la variante correcta.

In [3]:
from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, 
                        regParam=0.3, 
                        elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, 
                         elasticNetParam=0.8, 
                         family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Predict with the test dataset
predictions = mlrModel.transform(training)

Coefficients: (692,[272,300,323,350,351,378,379,405,406,407,428,433,434,435,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.52068987138421e-05,-8.115773146847101e-05,3.814692771846369e-05,0.0003776490540424337,0.00034051483661944103,0.0005514455157343105,0.0004085386116096913,0.000419746733274946,0.0008119171358670028,0.0005027708372668751,-2.3929260406601844e-05,0.000574504802090229,0.0009037546426803721,7.818229700244018e-05,-2.1787551952912764e-05,-3.4021658217896256e-05,0.0004966517360637634,0.0008190557828370367,-8.017982139522704e-05,-2.7431694037836214e-05,0.0004810832226238988,0.00048408017626778765,-8.926472920011488e-06,-0.00034148812330427335,-8.950592574121486e-05,0.00048645469116892167,-8.478698005186209e-05,-0.0004234783215831763,-7.29653577763134e-05])
Intercept: -0.5991460286401435


### Árbol de decisión

Los árboles de decisión para la clasificación son uno de los algoritmos de aprendizaje automático más antiguos y sencillos. Son populares debido a su **fácil interpretación visual**. Sin embargo, en la práctica, tienden a **sobreajustar** los datos de entrenamiento, lo que conduce a un rendimiento deficiente al intentar hacer predicciones 

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", 
                            featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]

# summary only
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[123,124,125...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[124,125,126...|
|       1.0|         1.0|(692,[127,128,129...|
|       1.0|         1.0|(692,[127,128,129...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.047619 
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_24c33605fdbe, depth=1, numNodes=3, numClasses=2, numFeatures=692


### Random Forest

Estos combinan muchos (cientos o miles) de árboles de decisión, donde tomamos muestras aleatorias de nuestras observaciones y predictores para formar nuevos árboles.
Los random forest **reducen en gran medida la posibilidad de sobreajuste** (es decir, reduce la varianza) promediando varios árboles; esto a veces se denomina sabiduría de las multitudes.

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", 
                            featuresCol="indexedFeatures", 
                            numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           0.0|  0.0|(692,[123,124,125...|
|           0.0|  0.0|(692,[124,125,126...|
|           0.0|  0.0|(692,[126,127,128...|
|           0.0|  0.0|(692,[126,127,128...|
|           0.0|  0.0|(692,[126,127,128...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0
RandomForestClassificationModel: uid=RandomForestClassifier_724bccf4ffbd, numTrees=10, numClasses=2, numFeatures=692


### Gradient-boosted tree classifier

Gradient Boosting es una mejora en Random Forest. Ahora, en lugar de simplemente ajustar árboles agregados con bootstrap, tenemos en cuenta el **rendimiento de los árboles intermedios**. 

Usando estos árboles intermedios, ajustamos nuestros pesos de árboles futuros.
Se cogen los residuos donde fallaban los modelos y ajustan el próximo árbol a estos residuos y así sucesivamente para ir mejorando donde se obtenian malos resultados. 

"Gradient Boosting hereda todas las buenas características de los árboles (selección de variables, datos faltantes, predictores mixtos) y mejora las características débiles, como el rendimiento de la predicción". 


In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[95,96,97,12...|
|       1.0|         1.0|(692,[98,99,100,1...|
|       1.0|         1.0|(692,[100,101,102...|
|       1.0|         1.0|(692,[121,122,123...|
|       1.0|         1.0|(692,[123,124,125...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.0322581
GBTClassificationModel: uid = GBTClassifier_9ab59c416fa8, numTrees=10, numClasses=2, numFeatures=692


### Clasificador de perceptrón multicapa

El clasificador de perceptrones multicapa (MLPC) es un clasificador basado en la **red neuronal artificial feedforward** . MLPC consta de múltiples capas de nodos. Cada capa está completamente conectada a la siguiente capa de la red.

In [8]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm")\
    .load("data/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)

predictionAndLabels = result.select("prediction", "label")

evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.9523809523809523


### Linear Support Vector Machine

Construye un **hiperplano o un conjunto de hiperplanos** en un espacio de dimensión alta o infinita, que se puede utilizar para clasificación, regresión u otras tareas. Intuitivamente, se logra una buena separación por el hiperplano que tiene la mayor distancia a los puntos de datos de entrenamiento más cercanos de cualquier clase (el llamado margen funcional), ya que en general, cuanto mayor es el margen, menor es el error de generalización del clasificador. 

In [9]:
from pyspark.ml.classification import LinearSVC

# Load training data
training = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

lsvc = LinearSVC(maxIter=10, 
                 regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(training)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.00015154081891400172,-3.4357432628275696e-05,6.8868723771654e-05,0.0005825396368790324,0.0002658667437974877,-5.4448990232205866e-06,-0.000410876298911309,-0.00023771334401618933,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0015051243197375345,0.0005056741785679464,0.0007739871946118443,-7.439317729362511e-05,2.2395429551533054e-07,2.1502767568913162e-05,4.000155795906807e-05,2.841045988826015e-05,1.2172703998609027e-05,-1.4702408529921009e-05,-4.00596456869388e-05,3.0693747761902103e-06,0.00015395475863074347,0.00015205858963404883,-0.00021785419667457335,0.0,0.0,0.0,0.0,0.

### One-vs-Rest classifier

Los algoritmos como Perceptron, Regresión logística y LSVC se diseñaron para la clasificación binaria y no admiten de forma nativa tareas de clasificación con más de dos clases.

Un enfoque para usar algoritmos de clasificación binaria para problemas de clasificación múltiple es dividir el conjunto de datos de clasificación de clases múltiples en múltiples conjuntos de datos de clasificación binaria y ajustar un modelo de clasificación binaria en cada uno. 

Un ejemplo de este enfoque es la estrategia One-vs-Rest classifier

In [10]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# load data file.
inputData = spark.read.format("libsvm") \
    .load("data/sample_multiclass_classification_data.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0714286


### Naive Bayes

Es uno de los algoritmos más simples y poderosos para la clasificación basado en el **Teorema de Bayes** con una suposición de independencia entre los predictores. Naive Bayes es fácil de construir y particularmente **útil para conjuntos de datos muy grandes**.

Ventajas:
- Es fácil y rápido predecir la clase de conjunto de datos de prueba. También funciona bien en la predicción multiclase.
- Cuando se mantiene la suposición de independencia, un clasificador Naive - Bayes funciona mejor en comparación con otros modelos como la Regresión Logística y **se necesitan menos datos de entrenamiento**.

In [11]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm") \
    .load("data/sample_libsvm_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, 
                modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+-----+--------------------+--------------------+-----------+----------+
|label|            features|       rawPrediction|probability|prediction|
+-----+--------------------+--------------------+-----------+----------+
|  0.0|(692,[95,96,97,12...|[-172664.79564650...|  [1.0,0.0]|       0.0|
|  0.0|(692,[98,99,100,1...|[-176279.15054306...|  [1.0,0.0]|       0.0|
|  0.0|(692,[122,123,124...|[-189600.55409526...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-274673.88337431...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-183393.03869049...|  [1.0,0.0]|       0.0|
|  0.0|(692,[125,126,127...|[-256992.48807619...|  [1.0,0.0]|       0.0|
|  0.0|(692,[126,127,128...|[-210411.53649773...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-170627.63616681...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-212157.96750469...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-183253.80108550...|  [1.0,0.0]|       0.0|
|  0.0|(692,[128,129,130...|[-246528.93739632...|  

In [12]:
spark.stop()