# Descarga de archivos para utilizar Spark en Colab

In [None]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
# install findspark using pip
!pip install -q findspark

In [None]:
# Point this process at the Java and Spark installations.
# Assumes the Spark archive was extracted under /content (Colab's working directory).
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# findspark makes the pyspark package under SPARK_HOME importable.
import findspark
findspark.init()

from pyspark import SparkContext
# getOrCreate() reuses an already running context, so re-running this cell no
# longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

Algunas librerías que se usan durante estos ejemplos

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.mllib.tree import DecisionTreeModel, DecisionTree
from pyspark.mllib.classification import NaiveBayesModel, NaiveBayes

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
# NOTE(review): despite its name, `spark` holds a bare SparkConf here, not a
# session. The visible cells never pass it to anything, and the name is rebound
# further down by `spark = SparkSession.builder...getOrCreate()`.
spark = SparkConf()

**Configuración para descargar un CSV para el ejemplo de Poker**

In [None]:
!wget personal.cimat.mx:8181/~alejandro.rosales/resource/susy-10k-tra.data
!wget personal.cimat.mx:8181/~alejandro.rosales/resource/susy-10k-tst.data
!wget personal.cimat.mx:8181/~alejandro.rosales/resource/poker.csv

# Poker

In [None]:
# Build the SparkSession used throughout the notebook.
spark_session = (
    SparkSession.builder
    .appName('Spark_session')
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# Read the CSV into a Spark DataFrame: the first line is the header and the
# column types are inferred from the data.
dataset_dataframe = spark_session.read.csv(
    'poker.csv',
    header=True,
    inferSchema=True,
)

`.show()` es bastante similar al método `.head()` de pandas

In [None]:
# Print the first rows of the DataFrame as a text table.
dataset_dataframe.show()

+---+---+---+---+---+---+---+---+---+---+-----+
| S1| C1| S2| C2| S3| C3| S4| C4| S5| C5|Class|
+---+---+---+---+---+---+---+---+---+---+-----+
|  3| 12|  3|  2|  3| 11|  4|  5|  2|  5|    1|
|  1|  9|  4|  6|  1|  4|  3|  2|  3|  9|    1|
|  1|  4|  3| 13|  2| 13|  2|  1|  3|  6|    1|
|  3| 10|  2|  7|  1|  2|  2| 11|  4|  9|    0|
|  1|  3|  4|  5|  3|  4|  1| 12|  4|  6|    0|
|  2|  6|  4| 11|  2|  3|  4|  9|  1|  7|    0|
|  3|  2|  4|  9|  3|  7|  4|  3|  4|  5|    0|
|  4|  4|  3| 13|  1|  8|  3|  9|  3| 10|    0|
|  1|  9|  3|  8|  4|  4|  1|  7|  3|  5|    0|
|  4|  7|  3| 12|  1| 13|  1|  9|  2|  6|    0|
|  2| 12|  1|  3|  2| 11|  2|  7|  4|  8|    0|
|  4|  2|  2|  9|  2|  7|  1|  5|  3| 11|    0|
|  1| 13|  2|  6|  1|  6|  2| 11|  3|  5|    1|
|  3|  8|  2|  7|  1|  9|  3|  6|  2|  3|    0|
|  2| 10|  1| 11|  1|  9|  3|  1|  1| 13|    0|
|  4|  2|  4| 12|  2| 12|  2|  7|  3| 10|    1|
|  4|  5|  2|  2|  4|  9|  1|  5|  4|  1|    1|
|  2|  3|  3|  9|  2|  1|  2|  6|  4| 10

De nuevo, `.describe()` es similar al método `.describe()` de pandas

In [None]:
# Print summary statistics (count, mean, ...) for every column.
dataset_dataframe.describe().show()

+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|                S1|               C1|                S2|               C2|                S3|                C3|                S4|                C4|                S5|               C5|             Class|
+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|           1025009|          1025009|           1025009|          1025009|           1025009|           1025009|           1025009|           1025009|           1025009|          1025009|           1025009|
|   mean|2.5006960914489533|6.997867335799003|2.4998424404078405|7.006288725269729|2.5010999903415483| 6.999248787083821|2.5002843877468

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

# Column layout: every column but the last is a predictor, the last one is the target.
column_names = dataset_dataframe.columns
features, label = column_names[:-1], column_names[-1]

# VectorAssembler packs the separate predictor columns into a single vector column.
featuresCol = VectorAssembler(inputCols=features, outputCol='features')
assembled = featuresCol.transform(dataset_dataframe)

# The target gets the analogous treatment: a StringIndexer is fitted on the
# target column and writes its indexed values to 'label'.
labelCol = StringIndexer(inputCol=label, outputCol='label').fit(assembled)

# Apply the label indexing and keep only the two columns produced above.
dataset_dataframe = labelCol.transform(assembled).select('features', 'label')

## Decision Tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Configure the tree (the interface resembles scikit-learn's) and fit it in a
# single expression; `decision_tree` ends up bound to the fitted model.
decision_tree = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
).fit(dataset_dataframe)

# transform() is used the way predict() would be: it yields the input rows
# together with the prediction columns.
y = decision_tree.transform(dataset_dataframe)

In [None]:
# Show the first 5 predicted labels.
y.select("prediction").show(5)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 5 rows



In [None]:
# Show the first 15 rows of the prepared (features, label) DataFrame.
dataset_dataframe.show(15)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[3.0,12.0,3.0,2.0...|  1.0|
|[1.0,9.0,4.0,6.0,...|  1.0|
|[1.0,4.0,3.0,13.0...|  1.0|
|[3.0,10.0,2.0,7.0...|  0.0|
|[1.0,3.0,4.0,5.0,...|  0.0|
|[2.0,6.0,4.0,11.0...|  0.0|
|[3.0,2.0,4.0,9.0,...|  0.0|
|[4.0,4.0,3.0,13.0...|  0.0|
|[1.0,9.0,3.0,8.0,...|  0.0|
|[4.0,7.0,3.0,12.0...|  0.0|
|[2.0,12.0,1.0,3.0...|  0.0|
|[4.0,2.0,2.0,9.0,...|  0.0|
|[1.0,13.0,2.0,6.0...|  1.0|
|[3.0,8.0,2.0,7.0,...|  0.0|
|[2.0,10.0,1.0,11....|  0.0|
+--------------------+-----+
only showing top 15 rows



## Pipeline (Standard Scaler + Decision Tree)

Ejemplo de la función de estandarización

In [None]:
from pyspark.ml.feature import StandardScaler

# Configure a scaler that reads 'features' and writes 'scaledFeatures', with
# both mean-centering and unit-std scaling switched on, then fit it;
# `scaler` ends up bound to the fitted model.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
).fit(dataset_dataframe)

Crea un nuevo objeto con los atributos escalados

In [None]:
# Apply the fitted scaler; the result carries an extra `scaledFeatures` column.
data_scaled = scaler.transform(dataset_dataframe)
# Show the first 5 rows.
data_scaled.show(5)

+--------------------+-----+--------------------+
|            features|label|      scaledFeatures|
+--------------------+-----+--------------------+
|[3.0,12.0,3.0,2.0...|  1.0|[0.44670994050467...|
|[1.0,9.0,4.0,6.0,...|  1.0|[-1.3426208973049...|
|[1.0,4.0,3.0,13.0...|  1.0|[-1.3426208973049...|
|[3.0,10.0,2.0,7.0...|  0.0|[0.44670994050467...|
|[1.0,3.0,4.0,5.0,...|  0.0|[-1.3426208973049...|
+--------------------+-----+--------------------+
only showing top 5 rows



**Se crea un pipeline, que estandariza primero los atributos y finalmente ajusta el modelo**

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

# Stage 1: standardize 'features' into 'scaledFeatures'.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)
# Stage 2: decision tree that reads the scaled column.
decision_tree_std = DecisionTreeClassifier(
    featuresCol='scaledFeatures',
    labelCol='label',
)

# Chain both stages and fit them; `model` ends up bound to the fitted pipeline.
model = Pipeline(stages=[scaler, decision_tree_std]).fit(dataset_dataframe)

# Run the data through the fitted pipeline to obtain the predictions.
y = model.transform(dataset_dataframe)

In [None]:
# Show the first 5 labels predicted by the pipeline.
y.select('prediction').show(5)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 5 rows



# Iris

In [None]:
from pyspark.sql import SparkSession

# NOTE(review): the following cells call `spark_session.createDataFrame(...)`,
# so `spark` itself is not used in the visible cells; confirm which handle is intended.
spark = SparkSession.builder.appName("Pandas to Spark").getOrCreate()

In [None]:
import pandas as pd
from sklearn.datasets import load_iris

# Load the Iris data set, then pull out the feature matrix and the target vector.
dataset = load_iris()
X, y = dataset['data'], dataset['target']

In [None]:
# Wrap features and target in their own frames, then join them column-wise.
feature_frame = pd.DataFrame(X, columns=['x1', 'x2', 'x3', 'x4'])
target_frame = pd.DataFrame(y, columns=['class'])
data = pd.concat([feature_frame, target_frame], axis=1)
# Preview the first rows.
data.head()

Unnamed: 0,x1,x2,x3,x4,class
0,5.1,3.5,1.4,0.2,0
1,4.9,3.0,1.4,0.2,0
2,4.7,3.2,1.3,0.2,0
3,4.6,3.1,1.5,0.2,0
4,5.0,3.6,1.4,0.2,0


**Pasamos de un Pandas dataframe a un Spark Dataframe**

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame.
data_spark=spark_session.createDataFrame(data)

  for column, series in pdf.iteritems():


In [None]:
# Show the first 5 rows of the Spark DataFrame.
data_spark.show(5)

+---+---+---+---+-----+
| x1| x2| x3| x4|class|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    0|
|4.9|3.0|1.4|0.2|    0|
|4.7|3.2|1.3|0.2|    0|
|4.6|3.1|1.5|0.2|    0|
|5.0|3.6|1.4|0.2|    0|
+---+---+---+---+-----+
only showing top 5 rows



**Preparamos nuestros datos de forma similar a lo que se hizo en el ejemplo anterior**

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

# Every column but the last is a predictor; the last one is the target.
column_names = data_spark.columns
features, label1 = column_names[:-1], column_names[-1]

# Assembler for the predictor columns and indexer (fitted on the raw frame) for the target.
featuresCol = VectorAssembler(inputCols=features, outputCol='features')
labelCol = StringIndexer(inputCol=label1, outputCol='label').fit(data_spark)

# NOTE: this rebinds `dataset_dataframe`, which held the Poker data earlier on.
dataset_dataframe = (
    labelCol.transform(featuresCol.transform(data_spark))
    .select('features', 'label')
)

In [None]:
# Print the first rows of the prepared (features, label) DataFrame.
dataset_dataframe.show()

+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  0.0|
|[4.9,3.0,1.4,0.2]|  0.0|
|[4.7,3.2,1.3,0.2]|  0.0|
|[4.6,3.1,1.5,0.2]|  0.0|
|[5.0,3.6,1.4,0.2]|  0.0|
|[5.4,3.9,1.7,0.4]|  0.0|
|[4.6,3.4,1.4,0.3]|  0.0|
|[5.0,3.4,1.5,0.2]|  0.0|
|[4.4,2.9,1.4,0.2]|  0.0|
|[4.9,3.1,1.5,0.1]|  0.0|
|[5.4,3.7,1.5,0.2]|  0.0|
|[4.8,3.4,1.6,0.2]|  0.0|
|[4.8,3.0,1.4,0.1]|  0.0|
|[4.3,3.0,1.1,0.1]|  0.0|
|[5.8,4.0,1.2,0.2]|  0.0|
|[5.7,4.4,1.5,0.4]|  0.0|
|[5.4,3.9,1.3,0.4]|  0.0|
|[5.1,3.5,1.4,0.3]|  0.0|
|[5.7,3.8,1.7,0.3]|  0.0|
|[5.1,3.8,1.5,0.3]|  0.0|
+-----------------+-----+
only showing top 20 rows



## Decision Tree Classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Configure and fit the tree in one expression; `decision_tree` ends up bound
# to the fitted model.
decision_tree = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
).fit(dataset_dataframe)

# Score the same data the model was fitted on.
y = decision_tree.transform(dataset_dataframe)

In [None]:
# Print the first rows, including the rawPrediction, probability and prediction columns.
y.show()

+-----------------+-----+--------------+-------------+----------+
|         features|label| rawPrediction|  probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[5.1,3.5,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.9,3.0,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.7,3.2,1.3,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.6,3.1,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.0,3.6,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.4,3.9,1.7,0.4]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.6,3.4,1.4,0.3]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.0,3.4,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.4,2.9,1.4,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.9,3.1,1.5,0.1]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[5.4,3.7,1.5,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.8,3.4,1.6,0.2]|  0.0|[50.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|[4.8,3.0,

In [None]:
# Show the first 5 predicted labels.
y.select('prediction').show(5)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 5 rows



In [None]:
# Show up to 5 rows whose predicted label is 1.0.
y.where(y.prediction == 1.0).show(5)

+-----------------+-----+--------------+-------------+----------+
|         features|label| rawPrediction|  probability|prediction|
+-----------------+-----+--------------+-------------+----------+
|[7.0,3.2,4.7,1.4]|  1.0|[0.0,47.0,0.0]|[0.0,1.0,0.0]|       1.0|
|[6.4,3.2,4.5,1.5]|  1.0|[0.0,47.0,0.0]|[0.0,1.0,0.0]|       1.0|
|[6.9,3.1,4.9,1.5]|  1.0|[0.0,47.0,0.0]|[0.0,1.0,0.0]|       1.0|
|[5.5,2.3,4.0,1.3]|  1.0|[0.0,47.0,0.0]|[0.0,1.0,0.0]|       1.0|
|[6.5,2.8,4.6,1.5]|  1.0|[0.0,47.0,0.0]|[0.0,1.0,0.0]|       1.0|
+-----------------+-----+--------------+-------------+----------+
only showing top 5 rows



## Pipeline (Standard Scaler + Decision Tree)

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

# Stage 1: standardize 'features' into 'scaledFeatures'.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)
# Stage 2: decision tree that reads the scaled column.
decision_tree_std = DecisionTreeClassifier(
    featuresCol='scaledFeatures',
    labelCol='label',
)

# Chain both stages and fit them; `model` ends up bound to the fitted pipeline.
model = Pipeline(stages=[scaler, decision_tree_std]).fit(dataset_dataframe)

# Score the data with the fitted pipeline and show the first 5 predicted labels.
y = model.transform(dataset_dataframe)
y.select('prediction').show(5)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 5 rows



**Más algoritmos**

In [None]:
from pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier