# Tutorial 1: Configuración y carga de datos en PySpark

##Paso 1: Configuración del entorno de PySpark en Colab

In [None]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.2.2-bin-hadoop3.2.tgz  
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

from google.colab import drive
drive.mount("/content/gdrive")  
%cd "/content"

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done
Building dependency tree       
Reading state infor

##Paso 2: Selección y vista de los datos

In [None]:
import pandas as pd
data = pd.read_csv('sample_data/california_housing_test.csv')

In [None]:
data

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-122.05,37.37,27.0,3885.0,661.0,1537.0,606.0,6.6085,344700.0
1,-118.30,34.26,43.0,1510.0,310.0,809.0,277.0,3.5990,176500.0
2,-117.81,33.78,27.0,3589.0,507.0,1484.0,495.0,5.7934,270500.0
3,-118.36,33.82,28.0,67.0,15.0,49.0,11.0,6.1359,330000.0
4,-119.67,36.33,19.0,1241.0,244.0,850.0,237.0,2.9375,81700.0
...,...,...,...,...,...,...,...,...,...
2995,-119.86,34.42,23.0,1450.0,642.0,1258.0,607.0,1.1790,225000.0
2996,-118.14,34.06,27.0,5257.0,1082.0,3496.0,1036.0,3.3906,237200.0
2997,-119.70,36.30,10.0,956.0,201.0,693.0,220.0,2.2895,62000.0
2998,-117.12,34.10,40.0,96.0,14.0,46.0,14.0,3.2708,162500.0


## Paso 3: Crear la sesión de trabajo de Spark

Ya seleccionado y visto el conjunto de datos comencemos a trabajar con PySpark. Para comenzar a trabajar con PySpark, debemos iniciar la sesión de Spark. Para esto realizaremos lo siguiente:




1.   Importar SparkSession 
2.   Crear la sesión 



In [None]:
#Verificar la funcionalidad de Pyspark 
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('PySpark_prueba1').getOrCreate()
spark_session

La SparkSession contiene los siguiente elementos:


*   Version: La versión de Spark
*   Master: Como estamos trabajando en un sistema en la nube pero no distribuido nos devuelve local, sin embargo, si tuvieramos un sistema distribuido aquí entonces podríamos tener diferentes clústeres, así como primero habrá un maestro y luego una estructura similar a un árbol (cluster_1, cluster_2 ... cluster_n).
*   AppName: Nombre de la aplicación.

##Paso 4: Cargar los datos para manipularlos dentro de Spark

In [None]:
df_spark = spark_session.read.csv('sample_data/california_housing_train.csv')
df_spark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

En el caso de PySpark para visualizar los datos tenemos la función *show()* ques similar a *head()* de pandas con algunas diferencias como:


1.   Mostrar 20 registro en lugar de 5
2.   La apariencia de los datos
3.   En lugar de tomar la primera fila como encabezados la incluye como un registro y coloca _c1 a _cn como nombre de la columna.



In [None]:
df_spark.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|    

Si queremos integrar la primera fila como los nombres de las columnas hay que agregar una opción a la hora de cargar los datos en el DataFrame.

In [None]:
#La opción option('header','true')
df_spark_col  = spark_session.read.option('header', 'true').csv('sample_data/california_housing_train.csv')
df_spark_col

DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [None]:
df_spark_col.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

Como comparativa entre Pandas y PySpark ambos manejan la información dentro de DataFrames pero la función *show()* solo es aplicable en Spark mientras que *head()* funciona en ambos

In [None]:
print(type(df_spark_col))
print(type(data))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


La función *head()* muestra por cada columna el valor que tiene, sin embargo muestra la información por fila utilizando este formato mencionado

In [None]:
df_spark_col.head(10)

[Row(longitude='-114.310000', latitude='34.190000', housing_median_age='15.000000', total_rooms='5612.000000', total_bedrooms='1283.000000', population='1015.000000', households='472.000000', median_income='1.493600', median_house_value='66900.000000'),
 Row(longitude='-114.470000', latitude='34.400000', housing_median_age='19.000000', total_rooms='7650.000000', total_bedrooms='1901.000000', population='1129.000000', households='463.000000', median_income='1.820000', median_house_value='80100.000000'),
 Row(longitude='-114.560000', latitude='33.690000', housing_median_age='17.000000', total_rooms='720.000000', total_bedrooms='174.000000', population='333.000000', households='117.000000', median_income='1.650900', median_house_value='85700.000000'),
 Row(longitude='-114.570000', latitude='33.640000', housing_median_age='14.000000', total_rooms='1501.000000', total_bedrooms='337.000000', population='515.000000', households='226.000000', median_income='3.191700', median_house_value='73400

Si queremos saber información acerca de los datos utilizamos la función *printSchema()* la cual muestra el nombre de cada columna, su tipo de dato y si permite valores nulos

In [None]:
df_spark_col.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



#Tutorial 2: Consultas en DataFrame dentro de PySpark 

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('OperacionesFiltrado').getOrCreate()
spark

Para cargar la información usaremos solamente la función *csv()* pero agregando parámetros de configuración para que tome el primer registro como los nombres de las columnas y tambien que a partir de los datos de entrada infiera el tipo de dato. Si no colocamos esto automáticamente considera de tipo *string* las columnas.

In [None]:
df_filter_pyspark = spark.read.csv('part2.csv', header = True, inferSchema=True)
df_filter_pyspark.show()

AnalysisException: ignored

In [None]:
df_filter_pyspark.printSchema()

Si se necesita podemos renombrar las columnas para referirnos a ellas de una forma más sencilla o simplificada con la función *withColumnRenamed()*.

## Filtros y selección

In [None]:
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Salary (per month - $)","EmpSalary")
df_filter_pyspark.show()


La función *filter()* nos permite filtrar la información a través de condiciones. Por ejemplo, vamos a mostrar unicamente aquellos empleados que tengan un salario menor o igual a $25,000.00.

In [None]:
df_filter_pyspark.filter("EmpSalary<=25000").show()

Como pudieron observar hay una cierta similitud de la función *filter()* con SELECT de SQL. Es por eso que se pueden utilizar consultas SQL y tratar los DataFrames como tablas o vistas de un modelo relacional. La función *createOrReplaceTempView()* registra el DataFrame como una vista temporal dentro de la sesión que puede ejecutar consutlas SQL.

In [None]:
df_filter_pyspark.createOrReplaceTempView("empleados")
sqlDF = spark.sql("SELECT * FROM empleados")
sqlDF.show()

Si la vista temporal que se produce quieren que sea utilizada en multiples sesiones entonces hay que utilizar la función *createOrReplaceTempView()*. El unico detalle es que la vista quedará anclada a una base de datos llamada *global_temp*.

In [None]:
df_filter_pyspark.createGlobalTempView("g_empleados")
sqlDF = spark.sql("SELECT * FROM global_temp.g_empleados")
sqlDF.show()

Como mencionamos la vista perdura en otras sesiones.

In [None]:
spark.newSession().sql("SELECT * FROM global_temp.g_empleados").show()

Se puede de igual forma cambiar el nombre de multiples columnas al mismo tiempo.

In [None]:
#Cambiamos el nombre de multiples columnas 
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Age of Employee","EmpAge").withColumnRenamed("Employee Name","EmpName")
df_filter_pyspark.show()

Al igual que en SQL se pueden seleccionar las columnas que serán mostradas dentro de la consulta acompañando a la función *filter()* con la función *select()*.

In [None]:
df_filter_pyspark.filter("EmpSalary<=25000").select(['EmpName','EmpAge']).show()

Otra manera de filtrar la información de los registros es utilizar un estilo similar a Pandas.

In [None]:
df_filter_pyspark.filter(df_filter_pyspark['EmpSalary']<=25000).select(['EmpName','EmpAge']).show()

## Operadores Lógicos

Los operadores lógicos disponibles son AND (&), OR (|) y NOT (~).

Ejemplo con AND: Los empleados que su salario sea menor o igual a \$30,000.00 y que sea mayor o igual a \$18,000.00.

In [None]:
df_filter_pyspark.filter((df_filter_pyspark['EmpSalary']<=30000)
                          & (df_filter_pyspark['EmpSalary']>=18000)).show()

In [None]:
#Cambiamos el nombre de la columna experiencia
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Experience (in years)","EmpExperience")
df_filter_pyspark.show()

Ejemplo con OR: Los empleados que su salario sea menor o igual a \$30,000.00 ó que su experiencia laboral sea mayor o igual a 3 años.

In [None]:
df_filter_pyspark.filter((df_filter_pyspark['EmpSalary']<=30000)
                          | (df_filter_pyspark['EmpExperience']>=3)).show()

Ejemplo NOT: Los empleandos que su edad no sea mayor o igual a 30 años.

In [None]:
df_filter_pyspark.filter(~(df_filter_pyspark['EmpAge']>=30)).show()

# Tutorial 3: Manejo de valores nulos

In [None]:
#Creamos una sesión de PySpark
from pyspark.sql import SparkSession
#spark.stop()
null_spark = SparkSession.builder.appName('ValoresNulos').getOrCreate()
null_spark

In [None]:
#Cargamos los datos dentro de un DataFrame de la sesión
df_null_pyspark = null_spark.read.csv('part2.csv', header = True, inferSchema = True)
df_null_pyspark

In [None]:
#Visualizamos la información
df_null_pyspark.show()

In [None]:
# Nuevamente vemos la estructura de la información
# recordando que cuando tenemos nullable = true significa que esa columna permite 
# valores nulos
df_null_pyspark.printSchema()

El función para eliminar los valores nulos dentro de la información es *na.drop()*. Esta función elimina completamente los registros que tiene algún valor nulo.



In [None]:
df_null_pyspark.na.drop().show()

Si queremos controlar el como se eliminan los registros la función tiene un parámetro llamado *how* con dos posibles valores:


*   ALL: Elimina la tupla siempre y cuando todos los valores asociados a cada columna sean nulos.
*   ANY: Elimina la tupla si alguno de los valores asociados a cada columns es nulo. Esta es la configuración por default.



In [None]:
df_null_pyspark.na.drop(how="all").show()

In [None]:
df_null_pyspark.na.drop(how="any").show()

Tambien hay forma de especificar el número mínimo de valores nulos aceptables con el parámetro *thresh*. En el ejemplo se puede observar que elimina solo una tupla que tenia tres valores nulos asociados.

In [None]:
df_null_pyspark.na.drop(thresh=2).show()

De igual forma podemos combiar el parámetro *how* con *subset* para indicarle las columnas donde nos interesan detectar valores nulos en las tuplas y eliminarlas.

In [None]:
df_null_pyspark.na.drop(how='any', subset=['Experience (in years)']).show()

Podemos rellenar los valores nulos con algún valor en especifico utilizando la función *na.fill()* indicando el valor y la columna.

In [None]:
df_null_pyspark.na.fill('NA values', 'Employee Name').show()

Otra alternativa para rellenar los valores faltantes es utilizando el método de imputación de datos utilizando la media. Para esto hay que utilizar la clase *Imputer* especificando las columnas de entrada y las de salida que se agregaran al DataFrame así como la estrategia en este caso utilizar la media. Después, se utiliza la función *fit()* y *transform()* para integrar las columnas imputadas.

In [None]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)'],
    outputCols = ["{}_imputed".format(a) for a in ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)']]
).setStrategy("mean")
imputer.fit(df_null_pyspark).transform(df_null_pyspark).show()

# Tutorial 4: Manejo de DataFrames en PySpark

In [None]:
#Creamos la sesión de trabajo
from pyspark.sql import SparkSession

data_spark = SparkSession.builder.appName('DataFrame_article').getOrCreate()
data_spark

In [None]:
#Cargamos los datos e imprimimos la descripción del schema
df_pyspark = data_spark.read.option('header','true').csv('/content/sample_data/california_housing_train.csv', inferSchema=True)
df_pyspark.printSchema()

In [None]:
#Visualizamos la información
df_pyspark.show()

En caso de querer cambiar el tipo de dato de alguna columna lo podemos hacer con las funciones *withColumn()* y *cast()*.

In [None]:
from pyspark.sql.functions import column
df_pyspark=df_pyspark.withColumn("housing_median_age",column("housing_median_age").cast("int"))

Con el atributo *dtypes* podemos saber el tipo de dato por columna

In [None]:
df_pyspark.dtypes

Si queremos saber el nombre de las columnas utilizamos el atributo *columns*

In [None]:
df_pyspark.columns

También, se puede seleccionar todos los datos de una columna en particular con la función *select()*.

In [None]:
df_pyspark.select('total_rooms').show()

O en caso de querer seleccionar varias columnas tambien se puede lograr enviando una lista con el nombre de las columnas como parámetro.

In [None]:
df_pyspark.select(['total_rooms', 'total_bedrooms', 'median_income']).show()

Si queremos saber algunas medidas de tendencia central de los datos para los análisis estadísticos se puede utilizar la función *describe()* similar a Pandas.

In [None]:
df_pyspark.describe().show()

De igual manera se pueden agregar columnas directamente al DataFrame si se requiere.

In [None]:
df_pyspark = df_pyspark.withColumn('Updated_medianhousevalue', df_pyspark['median_house_value']*2)
df_pyspark.show()

De igual forma se pueden eliminar con la función *drop()*

In [None]:
df_pyspark.drop('Updated_medianhousevalue').show()

# Tutorial 5: Agregación y agrupamientos

Agrupar los datos es una de las habilidades más esenciales cuando trabajamos con Big Data dado que estamos tratando con una gran cantidad de datos y si no somos capaces de segmentar esos datos, entonces será mucho más difícil analizarlos y usarlos para obtener información relevante

La regla de oro es recordar que la función *groupBy()* y la función de agregación van de la mano, es decir, no podemos usar groupBy sin la función agregada como SUM, COUNT, AVG, MAX, MIN, etc.

In [None]:
from pyspark.sql import SparkSession

spark_aggregate = SparkSession.builder.appName('Aggregate and GroupBy').getOrCreate()
spark_aggregate

In [None]:
spark_aggregate_data = spark_aggregate.read.csv('part4.csv', header = True, inferSchema = True)
spark_aggregate_data.show()

Si llegamos a ejecutar unicamente la función *groupBy()* la respuesta será la ubicación de los datos agrupados lo cual no es relevante

In [None]:
spark_aggregate_data.groupBy('Name')

## Funciones de agregación

Algunas de las funciones más comunes son:



*   AVG: devuelve el conjunto de resultados agrupando la columna según el promedio del conjunto de valores.
*   COUNT: devolverá el número total de conjuntos de valores en una columna particular correspondiente a la función groupBy.
*   MIN: devuelve el valor mínimo o más pequeño entre todo el conjunto de valores en toda la fila.
*   MAX: el funcionamiento y el enfoque de usar la función agregada MAX es el mismo que la función agregada MIN, solo que la principal diferencia es que devolverá el valor máximo entre el conjunto de valores en la fila.
*   SUM: devolverá la suma de todos los valores numéricos correspondientes a la columna agrupada



Si ejecutamos la función de agrupamiento y agregación el resultado será la descripción del DataFrame por lo que si queremos visualizar la información hay que utilizar la función *show()*.

In [None]:
spark_aggregate_data.groupBy('Name').sum()

Ejemplo: Conocer la cantidad de dinero total que le pago la compañia a cada empleado agrupando por nombre

In [None]:
spark_aggregate_data.groupBy('Name').sum().show()

Ejemplo: Conocer la cantidad de dinero total que pago cada departamento a sus empleados

In [None]:
spark_aggregate_data.groupBy('Departmens').sum().show()

Ejemplo: Conocer el salario promedio que se le pago a los empleados por departamento

In [None]:
spark_aggregate_data.groupBy('Departmens').mean().show()

Ejemplo: Saber el número de pagos que recibio cada empleado

In [None]:
spark_aggregate_data.groupBy(['Name']).count().show()

# Tutorial 6: Usando ML en PySpark

In [None]:
from pyspark.sql import SparkSession

df_ml = SparkSession.builder.appName('EjemploML').getOrCreate()
df_ml

In [None]:
#Cargamos la información en un DataFrame
training_dataset  = df_ml.read.csv('UserCarDataExample.csv', header=True, inferSchema=True)
training_dataset

In [None]:
#Mostramos la información
training_dataset.show()

In [None]:
#Visualizamos el esquema de la base de datos
training_dataset.printSchema()

In [None]:
#Visualizamos el nombre de las columnas
training_dataset.columns

Para trabajar con modelos de regresión tenemos que utilizar *VectorAssembler* para convertir las variables independientes en un vector que las incluya

In [None]:
from pyspark.ml.feature import VectorAssembler

featassembler = VectorAssembler(inputCols=['age',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats'], outputCol = "Independent Features" )
featassembler

Posteriormente se integran al conjunto de datos que ya estaba cargado utilizando la función *transform()*.

In [None]:
result = featassembler.transform(training_dataset)
result.show()

Para construir nuestro modelo de regresión debemos selecccionar la columna que integró los valores de las columnas en un vector y la columna que representa la variable dependiente.

In [None]:
final_data = result.select("Independent features", "selling_price")
final_data.show()

Del conjunto total de datos se puede generar un división de conjunto de datos entre entrenamiento y prueba con la función *randomSplit*.

In [None]:
train_data, test_data = final_data.randomSplit([0.75, 0.25])

Ahora bien hay que importar *LinearRegression* de la biblioteca de machine learning de PySpark. Especificando cuales son la variables independientes y cual es la dependiente.

In [None]:
from pyspark.ml.regression import LinearRegression

model = LinearRegression(featuresCol = 'Independent features', labelCol='selling_price')
model = model.fit(train_data)

Podemos imprimir la matriz de correlación para verificar la congruencia del modelo.

In [None]:
from pyspark.ml.stat import Correlation

matrix = Correlation.corr(final_data, 'Independent features')
cor_np = matrix.collect()[0][matrix.columns[0]].toArray()
cor_np

Obtener el valor del intercepto.

In [None]:
model.intercept

Los coeficientes por cada variable independiente

In [None]:
model.coefficients

Así como los p-values para determinar la transendencia de cada variable dentro del modelo.

In [None]:
model.summary.pValues

Obtener indicadores de desempeño como la $r^2$ ajustada, dado que es un problema multivariado. Que nos sirve para indicar el porcentaje de la variabilidad de la variable dependiente explicada por el modelo.

In [None]:
model.summary.r2adj

Podemos realizar predicciones para evaluar el modelo obtenido.

In [None]:
prediction_result = model.evaluate(test_data)
prediction_result.predictions.show()

Mostrar algunos indicadores de desempeño utiles.

In [None]:
prediction_result.meanAbsoluteError, prediction_result.meanSquaredError

#Tutorial 7: Utilizando MLlib en PySpark

MLlib es la biblioteca de aprendizaje automático (ML) de Spark. Su objetivo es hacer que el aprendizaje automático práctico sea escalable y fácil a un alto nivel. La biblioteca proporciona herramientas como:

Algoritmos de ML: algoritmos de aprendizaje comunes como clasificación, regresión, agrupamiento y filtrado colaborativo.
Características: extracción de características, transformación, reducción de dimensionalidad y selección
Pipelines: herramientas para construir, evaluar y ajustar ML Pipelines
Persistencia: guardar y cargar algoritmos, modelos y Pipelines
Utilidades: álgebra lineal, estadística, manejo de datos, etc.

In [None]:
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTreeModel

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [None]:
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(sc, 'spark-3.2.2-bin-hadoop3.2/data/mllib/sample_libsvm_data.txt')


In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [None]:
data.take(10)

In [None]:
numClasses = 2
categoricalFeaturesInfo = {}
impurity = "gini"

model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  impurity)

In [None]:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

In [None]:
model.save(sc, "myDecisionTreeClassificationModel.dt")
sameModel = DecisionTreeModel.load(sc, "myDecisionTreeClassificationModel.dt")