# Tutorial 1: Configuración y carga de datos en PySpark SEP 2023

##Paso 1: Configuración del entorno de PySpark en Colab

In [41]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q https://downloads.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

#!tar xf spark-3.2.2-bin-hadoop3.2.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz

#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop3.2"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connected to cloud.r-project.or[0m                                                                               Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
[33m0% [Waiting for headers] [2 InRelease 14.2 kB/110 kB 13%] [Waiting for headers][0m                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
[33m0% [Waiting for headers] [2 InRelease 14.2 kB/110 kB 13%] [Connecting to ppa.la[0m                                                                               Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.lau

'/content/spark-3.5.0-bin-hadoop3'

##Paso 2: Selección y vista de los datos

In [2]:
import pandas as pd
data = pd.read_csv('sample_data/california_housing_test.csv')

In [3]:
data

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-122.05,37.37,27.0,3885.0,661.0,1537.0,606.0,6.6085,344700.0
1,-118.30,34.26,43.0,1510.0,310.0,809.0,277.0,3.5990,176500.0
2,-117.81,33.78,27.0,3589.0,507.0,1484.0,495.0,5.7934,270500.0
3,-118.36,33.82,28.0,67.0,15.0,49.0,11.0,6.1359,330000.0
4,-119.67,36.33,19.0,1241.0,244.0,850.0,237.0,2.9375,81700.0
...,...,...,...,...,...,...,...,...,...
2995,-119.86,34.42,23.0,1450.0,642.0,1258.0,607.0,1.1790,225000.0
2996,-118.14,34.06,27.0,5257.0,1082.0,3496.0,1036.0,3.3906,237200.0
2997,-119.70,36.30,10.0,956.0,201.0,693.0,220.0,2.2895,62000.0
2998,-117.12,34.10,40.0,96.0,14.0,46.0,14.0,3.2708,162500.0


## Paso 3: Crear la sesión de trabajo de Spark

Ya seleccionado y visto el conjunto de datos comencemos a trabajar con PySpark. Para comenzar a trabajar con PySpark, debemos iniciar la sesión de Spark. Para esto realizaremos lo siguiente:




1.   Importar SparkSession
2.   Crear la sesión



In [4]:
#Verificar la funcionalidad de Pyspark
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('PySpark_LuisTrejo1').getOrCreate()
spark_session

La SparkSession contiene los siguiente elementos:


*   Version: La versión de Spark
*   Master: Como estamos trabajando en un sistema en la nube pero no distribuido nos devuelve local, sin embargo, si tuvieramos un sistema distribuido aquí entonces podríamos tener diferentes clústeres, así como primero habrá un maestro y luego una estructura similar a un árbol (cluster_1, cluster_2 ... cluster_n).
*   AppName: Nombre de la aplicación.

##Paso 4: Cargar los datos para manipularlos dentro de Spark

In [5]:
df_spark = spark_session.read.csv('sample_data/california_housing_train.csv')
df_spark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

En el caso de PySpark para visualizar los datos tenemos la función *show()* ques similar a *head()* de pandas con algunas diferencias como:


1.   Mostrar 20 registro en lugar de 5
2.   La apariencia de los datos
3.   En lugar de tomar la primera fila como encabezados la incluye como un registro y coloca _c1 a _cn como nombre de la columna.



In [6]:
df_spark.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|    

Si queremos integrar la primera fila como los nombres de las columnas hay que agregar una opción a la hora de cargar los datos en el DataFrame.

In [7]:
#La opción option('header','true')
df_spark_col  = spark_session.read.option('header', 'true').csv('sample_data/california_housing_train.csv')
df_spark_col

DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [8]:
df_spark_col.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

Como comparativa entre Pandas y PySpark ambos manejan la información dentro de DataFrames pero la función *show()* solo es aplicable en Spark mientras que *head()* funciona en ambos

In [9]:
print(type(df_spark_col))
print(type(data))

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


La función *head()* muestra por cada columna el valor que tiene, sin embargo muestra la información por fila utilizando este formato mencionado

In [10]:
df_spark_col.head(10)

[Row(longitude='-114.310000', latitude='34.190000', housing_median_age='15.000000', total_rooms='5612.000000', total_bedrooms='1283.000000', population='1015.000000', households='472.000000', median_income='1.493600', median_house_value='66900.000000'),
 Row(longitude='-114.470000', latitude='34.400000', housing_median_age='19.000000', total_rooms='7650.000000', total_bedrooms='1901.000000', population='1129.000000', households='463.000000', median_income='1.820000', median_house_value='80100.000000'),
 Row(longitude='-114.560000', latitude='33.690000', housing_median_age='17.000000', total_rooms='720.000000', total_bedrooms='174.000000', population='333.000000', households='117.000000', median_income='1.650900', median_house_value='85700.000000'),
 Row(longitude='-114.570000', latitude='33.640000', housing_median_age='14.000000', total_rooms='1501.000000', total_bedrooms='337.000000', population='515.000000', households='226.000000', median_income='3.191700', median_house_value='73400

Si queremos saber información acerca de los datos utilizamos la función *printSchema()* la cual muestra el nombre de cada columna, su tipo de dato y si permite valores nulos

In [11]:
df_spark_col.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



#Tutorial 2: Consultas en DataFrame dentro de PySpark

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('OperacionesFiltrado').getOrCreate()
spark

Para cargar la información usaremos solamente la función *csv()* pero agregando parámetros de configuración para que tome el primer registro como los nombres de las columnas y tambien que a partir de los datos de entrada infiera el tipo de dato. Si no colocamos esto automáticamente considera de tipo *string* las columnas.

In [13]:
df_filter_pyspark = spark.read.csv('part2.csv', header = True, inferSchema=True)
df_filter_pyspark.show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|        Oscar|           NULL|                 NULL|                 40000|
|         NULL|             34|                   10|                 38000|
|         NULL|             36|                 NULL|                  NULL|
+-------------+---------------+---------------------+----------------------+

In [15]:
df_filter_pyspark.printSchema()

root
 |-- Employee Name: string (nullable = true)
 |-- Age of Employee: integer (nullable = true)
 |-- Experience (in years): integer (nullable = true)
 |-- Salary (per month - $): integer (nullable = true)



Si se necesita podemos renombrar las columnas para referirnos a ellas de una forma más sencilla o simplificada con la función *withColumnRenamed()*.

## Filtros y selección

In [16]:
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Salary (per month - $)","EmpSalary")
df_filter_pyspark.show()

+-------------+---------------+---------------------+---------+
|Employee Name|Age of Employee|Experience (in years)|EmpSalary|
+-------------+---------------+---------------------+---------+
|       Oliver|             31|                   10|    30000|
|        Harry|             30|                    8|    25000|
|       George|             29|                    4|    20000|
|         Jack|             24|                    3|    20000|
|        Jacob|             21|                    1|    15000|
|          Leo|             23|                    2|    18000|
|        Oscar|           NULL|                 NULL|    40000|
|         NULL|             34|                   10|    38000|
|         NULL|             36|                 NULL|     NULL|
+-------------+---------------+---------------------+---------+




La función *filter()* nos permite filtrar la información a través de condiciones. Por ejemplo, vamos a mostrar unicamente aquellos empleados que tengan un salario menor o igual a $25,000.00.

In [17]:
df_filter_pyspark.filter("EmpSalary<=25000").show()

+-------------+---------------+---------------------+---------+
|Employee Name|Age of Employee|Experience (in years)|EmpSalary|
+-------------+---------------+---------------------+---------+
|        Harry|             30|                    8|    25000|
|       George|             29|                    4|    20000|
|         Jack|             24|                    3|    20000|
|        Jacob|             21|                    1|    15000|
|          Leo|             23|                    2|    18000|
+-------------+---------------+---------------------+---------+



Como pudieron observar hay una cierta similitud de la función *filter()* con SELECT de SQL. Es por eso que se pueden utilizar consultas SQL y tratar los DataFrames como tablas o vistas de un modelo relacional. La función *createOrReplaceTempView()* registra el DataFrame como una vista temporal dentro de la sesión que puede ejecutar consutlas SQL.

In [18]:
df_filter_pyspark.createOrReplaceTempView("empleados")
sqlDF = spark.sql("SELECT * FROM empleados")
sqlDF.show()

+-------------+---------------+---------------------+---------+
|Employee Name|Age of Employee|Experience (in years)|EmpSalary|
+-------------+---------------+---------------------+---------+
|       Oliver|             31|                   10|    30000|
|        Harry|             30|                    8|    25000|
|       George|             29|                    4|    20000|
|         Jack|             24|                    3|    20000|
|        Jacob|             21|                    1|    15000|
|          Leo|             23|                    2|    18000|
|        Oscar|           NULL|                 NULL|    40000|
|         NULL|             34|                   10|    38000|
|         NULL|             36|                 NULL|     NULL|
+-------------+---------------+---------------------+---------+



Si la vista temporal que se produce quieren que sea utilizada en multiples sesiones entonces hay que utilizar la función *createGlobalTempView()*. El unico detalle es que la vista quedará anclada a una base de datos llamada *global_temp*.

In [19]:
df_filter_pyspark.createGlobalTempView("g_empleados")
sqlDF = spark.sql("SELECT * FROM global_temp.g_empleados")
sqlDF.show()

+-------------+---------------+---------------------+---------+
|Employee Name|Age of Employee|Experience (in years)|EmpSalary|
+-------------+---------------+---------------------+---------+
|       Oliver|             31|                   10|    30000|
|        Harry|             30|                    8|    25000|
|       George|             29|                    4|    20000|
|         Jack|             24|                    3|    20000|
|        Jacob|             21|                    1|    15000|
|          Leo|             23|                    2|    18000|
|        Oscar|           NULL|                 NULL|    40000|
|         NULL|             34|                   10|    38000|
|         NULL|             36|                 NULL|     NULL|
+-------------+---------------+---------------------+---------+



Como mencionamos la vista perdura en otras sesiones.

In [20]:
spark.newSession().sql("SELECT * FROM global_temp.g_empleados").show()

+-------------+---------------+---------------------+---------+
|Employee Name|Age of Employee|Experience (in years)|EmpSalary|
+-------------+---------------+---------------------+---------+
|       Oliver|             31|                   10|    30000|
|        Harry|             30|                    8|    25000|
|       George|             29|                    4|    20000|
|         Jack|             24|                    3|    20000|
|        Jacob|             21|                    1|    15000|
|          Leo|             23|                    2|    18000|
|        Oscar|           NULL|                 NULL|    40000|
|         NULL|             34|                   10|    38000|
|         NULL|             36|                 NULL|     NULL|
+-------------+---------------+---------------------+---------+



Se puede de igual forma cambiar el nombre de multiples columnas al mismo tiempo.

In [21]:
#Cambiamos el nombre de multiples columnas
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Age of Employee","EmpAge").withColumnRenamed("Employee Name","EmpName")
df_filter_pyspark.show()

+-------+------+---------------------+---------+
|EmpName|EmpAge|Experience (in years)|EmpSalary|
+-------+------+---------------------+---------+
| Oliver|    31|                   10|    30000|
|  Harry|    30|                    8|    25000|
| George|    29|                    4|    20000|
|   Jack|    24|                    3|    20000|
|  Jacob|    21|                    1|    15000|
|    Leo|    23|                    2|    18000|
|  Oscar|  NULL|                 NULL|    40000|
|   NULL|    34|                   10|    38000|
|   NULL|    36|                 NULL|     NULL|
+-------+------+---------------------+---------+



Al igual que en SQL se pueden seleccionar las columnas que serán mostradas dentro de la consulta acompañando a la función *filter()* con la función *select()*.

In [22]:
df_filter_pyspark.filter("EmpSalary<=25000").select(['EmpName','EmpAge']).show()

+-------+------+
|EmpName|EmpAge|
+-------+------+
|  Harry|    30|
| George|    29|
|   Jack|    24|
|  Jacob|    21|
|    Leo|    23|
+-------+------+



Otra manera de filtrar la información de los registros es utilizar un estilo similar a Pandas.

In [23]:
df_filter_pyspark.filter(df_filter_pyspark['EmpSalary']<=25000).select(['EmpName','EmpAge']).show()

+-------+------+
|EmpName|EmpAge|
+-------+------+
|  Harry|    30|
| George|    29|
|   Jack|    24|
|  Jacob|    21|
|    Leo|    23|
+-------+------+



## Operadores Lógicos

Los operadores lógicos disponibles son AND (&), OR (|) y NOT (~).

Ejemplo con AND: Los empleados que su salario sea menor o igual a \$30,000.00 y que sea mayor o igual a \$18,000.00.

In [26]:
df_filter_pyspark.filter((df_filter_pyspark['EmpSalary']<=30000)
                          & (df_filter_pyspark['EmpSalary']>=18000)).show()

+-------+------+---------------------+---------+
|EmpName|EmpAge|Experience (in years)|EmpSalary|
+-------+------+---------------------+---------+
| Oliver|    31|                   10|    30000|
|  Harry|    30|                    8|    25000|
| George|    29|                    4|    20000|
|   Jack|    24|                    3|    20000|
|    Leo|    23|                    2|    18000|
+-------+------+---------------------+---------+



In [27]:
#Cambiamos el nombre de la columna experiencia
df_filter_pyspark= df_filter_pyspark.withColumnRenamed("Experience (in years)","EmpExperience")
df_filter_pyspark.show()

+-------+------+-------------+---------+
|EmpName|EmpAge|EmpExperience|EmpSalary|
+-------+------+-------------+---------+
| Oliver|    31|           10|    30000|
|  Harry|    30|            8|    25000|
| George|    29|            4|    20000|
|   Jack|    24|            3|    20000|
|  Jacob|    21|            1|    15000|
|    Leo|    23|            2|    18000|
|  Oscar|  NULL|         NULL|    40000|
|   NULL|    34|           10|    38000|
|   NULL|    36|         NULL|     NULL|
+-------+------+-------------+---------+



Ejemplo con OR: Los empleados que su salario sea menor o igual a \$30,000.00 ó que su experiencia laboral sea mayor o igual a 3 años.

In [28]:
df_filter_pyspark.filter((df_filter_pyspark['EmpSalary']<=30000)
                          | (df_filter_pyspark['EmpExperience']>=3)).show()

+-------+------+-------------+---------+
|EmpName|EmpAge|EmpExperience|EmpSalary|
+-------+------+-------------+---------+
| Oliver|    31|           10|    30000|
|  Harry|    30|            8|    25000|
| George|    29|            4|    20000|
|   Jack|    24|            3|    20000|
|  Jacob|    21|            1|    15000|
|    Leo|    23|            2|    18000|
|   NULL|    34|           10|    38000|
+-------+------+-------------+---------+



Ejemplo NOT: Los empleandos que su edad no sea mayor o igual a 30 años.

In [29]:
df_filter_pyspark.filter(~(df_filter_pyspark['EmpAge']>=30)).show()

+-------+------+-------------+---------+
|EmpName|EmpAge|EmpExperience|EmpSalary|
+-------+------+-------------+---------+
| George|    29|            4|    20000|
|   Jack|    24|            3|    20000|
|  Jacob|    21|            1|    15000|
|    Leo|    23|            2|    18000|
+-------+------+-------------+---------+



# Tutorial 3: Manejo de valores nulos

In [30]:
#Creamos una sesión de PySpark
from pyspark.sql import SparkSession

null_spark = SparkSession.builder.appName('ValoresNulos').getOrCreate()
null_spark

In [31]:
#Cargamos los datos dentro de un DataFrame de la sesión
df_null_pyspark = null_spark.read.csv('part2.csv', header = True, inferSchema = True)
df_null_pyspark

DataFrame[Employee Name: string, Age of Employee: int, Experience (in years): int, Salary (per month - $): int]

In [32]:
#Visualizamos la información
df_null_pyspark.show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|        Oscar|           NULL|                 NULL|                 40000|
|         NULL|             34|                   10|                 38000|
|         NULL|             36|                 NULL|                  NULL|
+-------------+---------------+---------------------+----------------------+

In [33]:
# Nuevamente vemos la estructura de la información
# recordando que cuando tenemos nullable = true significa que esa columna permite
# valores nulos
df_null_pyspark.printSchema()

root
 |-- Employee Name: string (nullable = true)
 |-- Age of Employee: integer (nullable = true)
 |-- Experience (in years): integer (nullable = true)
 |-- Salary (per month - $): integer (nullable = true)



La función para eliminar los valores nulos dentro de la información es *na.drop()*. Esta función elimina completamente los registros que tiene algún valor nulo.



In [34]:
df_null_pyspark.na.drop().show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
+-------------+---------------+---------------------+----------------------+



Si queremos controlar el como se eliminan los registros la función tiene un parámetro llamado *how* con dos posibles valores:


*   ALL: Elimina la tupla siempre y cuando todos los valores asociados a cada columna sean nulos.
*   ANY: Elimina la tupla si alguno de los valores asociados a cada columns es nulo. Esta es la configuración por default.



In [35]:
df_null_pyspark.na.drop(how="all").show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|        Oscar|           NULL|                 NULL|                 40000|
|         NULL|             34|                   10|                 38000|
|         NULL|             36|                 NULL|                  NULL|
+-------------+---------------+---------------------+----------------------+

In [36]:
df_null_pyspark.na.drop(how="any").show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
+-------------+---------------+---------------------+----------------------+



Tambien hay forma de especificar el número mínimo de valores nulos aceptables con el parámetro *thresh*. En el ejemplo se puede observar que elimina solo una tupla que tenia tres valores nulos asociados.

In [37]:
df_null_pyspark.na.drop(thresh=2).show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|        Oscar|           NULL|                 NULL|                 40000|
|         NULL|             34|                   10|                 38000|
+-------------+---------------+---------------------+----------------------+



De igual forma podemos cambiar el parámetro *how* con *subset* para indicarle las columnas donde nos interesan detectar valores nulos en las tuplas y eliminarlas.

In [38]:
df_null_pyspark.na.drop(how='any', subset=['Experience (in years)']).show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|         NULL|             34|                   10|                 38000|
+-------------+---------------+---------------------+----------------------+



Podemos rellenar los valores nulos con algún valor en especifico utilizando la función *na.fill()* indicando el valor y la columna.

In [39]:
df_null_pyspark.na.fill('LT NA values', 'Employee Name').show()

+-------------+---------------+---------------------+----------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|
+-------------+---------------+---------------------+----------------------+
|       Oliver|             31|                   10|                 30000|
|        Harry|             30|                    8|                 25000|
|       George|             29|                    4|                 20000|
|         Jack|             24|                    3|                 20000|
|        Jacob|             21|                    1|                 15000|
|          Leo|             23|                    2|                 18000|
|        Oscar|           NULL|                 NULL|                 40000|
| LT NA values|             34|                   10|                 38000|
| LT NA values|             36|                 NULL|                  NULL|
+-------------+---------------+---------------------+----------------------+

Otra alternativa para rellenar los valores faltantes es utilizando el método de imputación de datos utilizando la media. Para esto hay que utilizar la clase *Imputer* especificando las columnas de entrada y las de salida que se agregaran al DataFrame así como la estrategia en este caso utilizar la media. Después, se utiliza la función *fit()* y *transform()* para integrar las columnas imputadas.

In [40]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)'],
    outputCols = ["{}_imputed".format(a) for a in ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)']]
).setStrategy("mean")
imputer.fit(df_null_pyspark).transform(df_null_pyspark).show()


+-------------+---------------+---------------------+----------------------+-----------------------+-----------------------------+------------------------------+
|Employee Name|Age of Employee|Experience (in years)|Salary (per month - $)|Age of Employee_imputed|Experience (in years)_imputed|Salary (per month - $)_imputed|
+-------------+---------------+---------------------+----------------------+-----------------------+-----------------------------+------------------------------+
|       Oliver|             31|                   10|                 30000|                     31|                           10|                         30000|
|        Harry|             30|                    8|                 25000|                     30|                            8|                         25000|
|       George|             29|                    4|                 20000|                     29|                            4|                         20000|
|         Jack|             

# Tutorial 4: Manejo de DataFrames en PySpark

In [None]:
#Creamos la sesión de trabajo
from pyspark.sql import SparkSession
data_spark = SparkSession.builder.appName('DataFrame_article').getOrCreate()
data_spark

In [None]:
#Cargamos los datos e imprimimos la descripción del schema
df_pyspark = data_spark.read.option('header','true').csv('/content/sample_data/california_housing_train.csv', inferSchema=True)
df_pyspark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
#Visualizamos la información
df_pyspark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

En caso de querer cambiar el tipo de dato de alguna columna lo podemos hacer con las funciones *withColumn()* y *cast()*.

In [None]:
from pyspark.sql.functions import column
df_pyspark=df_pyspark.withColumn("housing_median_age",column("housing_median_age").cast("int"))

Con el atributo *dtypes* podemos saber el tipo de dato por columna

In [None]:
df_pyspark.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'int'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

Si queremos saber el nombre de las columnas utilizamos el atributo *columns*

In [None]:
df_pyspark.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

También, se puede seleccionar todos los datos de una columna en particular con la función *select()*.

In [None]:
df_pyspark.select('total_rooms').show()

+-----------+
|total_rooms|
+-----------+
|     5612.0|
|     7650.0|
|      720.0|
|     1501.0|
|     1454.0|
|     1387.0|
|     2907.0|
|      812.0|
|     4789.0|
|     1497.0|
|     3741.0|
|     1988.0|
|     1291.0|
|     2478.0|
|     1448.0|
|     2556.0|
|     1678.0|
|       44.0|
|     1388.0|
|       97.0|
+-----------+
only showing top 20 rows



O en caso de querer seleccionar varias columnas tambien se puede lograr enviando una lista con el nombre de las columnas como parámetro.

In [None]:
df_pyspark.select(['total_rooms', 'total_bedrooms', 'median_income']).show()

+-----------+--------------+-------------+
|total_rooms|total_bedrooms|median_income|
+-----------+--------------+-------------+
|     5612.0|        1283.0|       1.4936|
|     7650.0|        1901.0|         1.82|
|      720.0|         174.0|       1.6509|
|     1501.0|         337.0|       3.1917|
|     1454.0|         326.0|        1.925|
|     1387.0|         236.0|       3.3438|
|     2907.0|         680.0|       2.6768|
|      812.0|         168.0|       1.7083|
|     4789.0|        1175.0|       2.1782|
|     1497.0|         309.0|       2.1908|
|     3741.0|         801.0|       2.6797|
|     1988.0|         483.0|        1.625|
|     1291.0|         248.0|       2.1571|
|     2478.0|         464.0|        3.212|
|     1448.0|         378.0|       0.8585|
|     2556.0|         587.0|       1.6991|
|     1678.0|         322.0|       2.9653|
|       44.0|          33.0|       0.8571|
|     1388.0|         386.0|       1.2049|
|       97.0|          24.0|       1.2656|
+----------

Si queremos saber algunas medidas de tendencia central de los datos para los análisis estadísticos se puede utilizar la función *describe()* similar a Pandas.

In [None]:
df_pyspark.describe().show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

De igual manera se pueden agregar columnas directamente al DataFrame si se requiere.

In [None]:
df_pyspark = df_pyspark.withColumn('Updated_medianhousevalue', df_pyspark['median_house_value']*2)
df_pyspark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|Updated_medianhousevalue|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------------+
|  -114.31|   34.19|                15|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|                133800.0|
|  -114.47|    34.4|                19|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|                160200.0|
|  -114.56|   33.69|                17|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|                171400.0|
|  -114.57|   33.64|                14|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400

De igual forma se pueden eliminar con la función *drop()*

In [None]:
df_pyspark.drop('Updated_medianhousevalue').show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|                15|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|                19|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|                17|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|                14|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|                20|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

# Tutorial 5: Agregación y agrupamientos

Agrupar los datos es una de las habilidades más esenciales cuando trabajamos con Big Data dado que estamos tratando con una gran cantidad de datos y si no somos capaces de segmentar esos datos, entonces será mucho más difícil analizarlos y usarlos para obtener información relevante

La regla de oro es recordar que la función *groupBy()* y la función de agregación van de la mano, es decir, no podemos usar groupBy sin la función agregada como SUM, COUNT, AVG, MAX, MIN, etc.

In [None]:
from pyspark.sql import SparkSession

spark_aggregate = SparkSession.builder.appName('Aggregate and GroupBy').getOrCreate()
spark_aggregate

In [None]:
spark_aggregate_data = spark_aggregate.read.csv('part4.csv', header = True, inferSchema = True)
spark_aggregate_data.show()


+------+------------+------+
|  Name|  Departmens|salary|
+------+------------+------+
|Oliver|Data Science| 10000|
|Oliver|         IOT|  5000|
| Johny|    Big Data|  4000|
|Oliver|    Big Data|  4000|
| Johny|Data Science|  3000|
|Mathew|Data Science| 20000|
|Mathew|         IOT| 10000|
|Mathew|    Big Data|  5000|
| Jacob|Data Science| 10000|
| Jacob|    Big Data|  2000|
+------+------------+------+



Si llegamos a ejecutar unicamente la función *groupBy()* la respuesta será la ubicación de los datos agrupados lo cual no es relevante

In [None]:
spark_aggregate_data.groupBy('Name')

<pyspark.sql.group.GroupedData at 0x7f5510f6a290>

## Funciones de agregación

Algunas de las funciones más comunes son:



*   AVG: devuelve el conjunto de resultados agrupando la columna según el promedio del conjunto de valores.
*   COUNT: devolverá el número total de conjuntos de valores en una columna particular correspondiente a la función groupBy.
*   MIN: devuelve el valor mínimo o más pequeño entre todo el conjunto de valores en toda la fila.
*   MAX: el funcionamiento y el enfoque de usar la función agregada MAX es el mismo que la función agregada MIN, solo que la principal diferencia es que devolverá el valor máximo entre el conjunto de valores en la fila.
*   SUM: devolverá la suma de todos los valores numéricos correspondientes a la columna agrupada



Si ejecutamos la función de agrupamiento y agregación el resultado será la descripción del DataFrame por lo que si queremos visualizar la información hay que utilizar la función *show()*.

In [None]:
spark_aggregate_data.groupBy('Name').sum()

DataFrame[Name: string, sum(salary): bigint]

Ejemplo: Conocer la cantidad de dinero total que le pago la compañia a cada empleado agrupando por nombre

In [None]:
spark_aggregate_data.groupBy('Name').sum().show()

+------+-----------+
|  Name|sum(salary)|
+------+-----------+
| Jacob|      12000|
| Johny|       7000|
|Mathew|      35000|
|Oliver|      19000|
+------+-----------+



Ejemplo: Conocer la cantidad de dinero total que pago cada departamento a sus empleados

In [None]:
spark_aggregate_data.groupBy('Departmens').sum().show()

+------------+-----------+
|  Departmens|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



Ejemplo: Conocer el salario promedio que se le pago a los empleados por departamento

In [None]:
spark_aggregate_data.groupBy('Departmens').mean().show()

+------------+-----------+
|  Departmens|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



Ejemplo: Saber el número de pagos que recibio cada empleado

In [None]:
spark_aggregate_data.groupBy(['Name']).count().show()

+------+-----+
|  Name|count|
+------+-----+
| Jacob|    2|
| Johny|    2|
|Mathew|    3|
|Oliver|    3|
+------+-----+



In [None]:
spark_aggregate_data.groupBy('Name').count().show()

+------+-----+
|  Name|count|
+------+-----+
| Jacob|    2|
| Johny|    2|
|Mathew|    3|
|Oliver|    3|
+------+-----+



Extracción de valores numércios
Referencia
https://www.geeksforgeeks.org/get-value-of-a-particular-cell-in-pyspark-dataframe/

In [None]:
spark_aggregate_data.show()


+------+------------+------+
|  Name|  Departmens|salary|
+------+------------+------+
|Oliver|Data Science| 10000|
|Oliver|         IOT|  5000|
| Johny|    Big Data|  4000|
|Oliver|    Big Data|  4000|
| Johny|Data Science|  3000|
|Mathew|Data Science| 20000|
|Mathew|         IOT| 10000|
|Mathew|    Big Data|  5000|
| Jacob|Data Science| 10000|
| Jacob|    Big Data|  2000|
+------+------------+------+



Extraer el primer renglón

In [None]:
spark_aggregate_data.collect()[0]

Row(Name='Oliver', Departmens='Data Science', salary=10000)

Extarer su Salario

In [None]:
Salario = spark_aggregate_data.collect()[0][2]
Salario
Salario = Salario * 1.5
Salario

15000.0

# Tutorial 6: Usando ML en PySpark

In [None]:
from pyspark.sql import SparkSession

df_ml = SparkSession.builder.appName('EjemploML').getOrCreate()
df_ml

In [None]:
#Cargamos la información en un DataFrame
training_dataset  = df_ml.read.csv('UserCarDataExample.csv', header=True, inferSchema=True)
training_dataset

DataFrame[age: int, selling_price: int, km_driven: int, mileage: double, engine: int, max_power: double, seats: int]

In [None]:
#Mostramos la información
training_dataset.show()

+---+-------------+---------+-------+------+---------+-----+
|age|selling_price|km_driven|mileage|engine|max_power|seats|
+---+-------------+---------+-------+------+---------+-----+
|  8|       450000|   145500|   23.4|  1248|     74.0|    5|
|  8|       370000|   120000|  21.14|  1498|   103.52|    5|
| 16|       158000|   140000|   17.7|  1497|     78.0|    5|
| 12|       225000|   127000|   23.0|  1396|     90.0|    5|
| 15|       130000|   120000|   16.1|  1298|     88.2|    5|
|  5|       440000|    45000|  20.14|  1197|    81.86|    5|
| 15|        96000|   175000|   17.3|  1061|     57.5|    5|
| 21|        45000|     5000|   16.1|   796|     37.0|    4|
| 11|       350000|    90000|  23.59|  1364|     67.1|    5|
|  9|       200000|   169000|   20.0|  1399|     68.1|    5|
|  8|       500000|    68000|  19.01|  1461|   108.45|    5|
| 17|        92000|   100000|   17.3|   993|     60.0|    5|
| 13|       280000|   140000|   19.3|  1248|     73.9|    5|
| 13|       180000|    9

In [None]:
#Visualizamos el esquema de la base de datos
training_dataset.printSchema()

root
 |-- age: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: double (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: double (nullable = true)
 |-- seats: integer (nullable = true)



In [None]:
#Visualizamos el nombre de las columnas
training_dataset.columns

['age',
 'selling_price',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats']

Para trabajar con modelos de regresión tenemos que utilizar *VectorAssembler* para convertir las variables independientes en un vector que las incluya

In [None]:
from pyspark.ml.feature import VectorAssembler

featassembler = VectorAssembler(inputCols=['age',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats'], outputCol = "Independent Features" )
featassembler

VectorAssembler_bf6225cdeb5d

Posteriormente se integran al conjunto de datos que ya estaba cargado utilizando la función *transform()*.

In [None]:
result = featassembler.transform(training_dataset)
result.show()

+---+-------------+---------+-------+------+---------+-----+--------------------+
|age|selling_price|km_driven|mileage|engine|max_power|seats|Independent Features|
+---+-------------+---------+-------+------+---------+-----+--------------------+
|  8|       450000|   145500|   23.4|  1248|     74.0|    5|[8.0,145500.0,23....|
|  8|       370000|   120000|  21.14|  1498|   103.52|    5|[8.0,120000.0,21....|
| 16|       158000|   140000|   17.7|  1497|     78.0|    5|[16.0,140000.0,17...|
| 12|       225000|   127000|   23.0|  1396|     90.0|    5|[12.0,127000.0,23...|
| 15|       130000|   120000|   16.1|  1298|     88.2|    5|[15.0,120000.0,16...|
|  5|       440000|    45000|  20.14|  1197|    81.86|    5|[5.0,45000.0,20.1...|
| 15|        96000|   175000|   17.3|  1061|     57.5|    5|[15.0,175000.0,17...|
| 21|        45000|     5000|   16.1|   796|     37.0|    4|[21.0,5000.0,16.1...|
| 11|       350000|    90000|  23.59|  1364|     67.1|    5|[11.0,90000.0,23....|
|  9|       2000

Para construir nuestro modelo de regresión debemos selecccionar la columna que integró los valores de las columnas en un vector y la columna que representa la variable dependiente.

In [None]:
final_data = result.select("Independent features", "selling_price")
final_data.show()

+--------------------+-------------+
|Independent features|selling_price|
+--------------------+-------------+
|[8.0,145500.0,23....|       450000|
|[8.0,120000.0,21....|       370000|
|[16.0,140000.0,17...|       158000|
|[12.0,127000.0,23...|       225000|
|[15.0,120000.0,16...|       130000|
|[5.0,45000.0,20.1...|       440000|
|[15.0,175000.0,17...|        96000|
|[21.0,5000.0,16.1...|        45000|
|[11.0,90000.0,23....|       350000|
|[9.0,169000.0,20....|       200000|
|[8.0,68000.0,19.0...|       500000|
|[17.0,100000.0,17...|        92000|
|[13.0,140000.0,19...|       280000|
|[13.0,90000.0,18....|       180000|
|[6.0,40000.0,18.1...|       400000|
|[6.0,70000.0,24.5...|       778000|
|[10.0,53000.0,23....|       500000|
|[20.0,80000.0,19....|       150000|
|[6.0,100000.0,22....|       680000|
|[11.0,100000.0,21...|       174000|
+--------------------+-------------+
only showing top 20 rows



Del conjunto total de datos se puede generar un división de conjunto de datos entre entrenamiento y prueba con la función *randomSplit*.

In [None]:
train_data, test_data = final_data.randomSplit([0.75, 0.25])

Ahora bien hay que importar *LinearRegression* de la biblioteca de machine learning de PySpark. Especificando cuales son la variables independientes y cual es la dependiente.

In [None]:
from pyspark.ml.regression import LinearRegression

model = LinearRegression(featuresCol = 'Independent features', labelCol='selling_price')
model = model.fit(train_data)

Podemos imprimir la matriz de correlación para verificar la congruencia del modelo.

In [None]:
from pyspark.ml.stat import Correlation

matrix = Correlation.corr(final_data, 'Independent features')
cor_np = matrix.collect()[0][matrix.columns[0]].toArray()
cor_np



array([[ 1.        ,  0.42854848, -0.32854385, -0.0182631 , -0.2265978 ,
         0.00792303],
       [ 0.42854848,  1.        , -0.17298035,  0.20603073, -0.03815852,
         0.22725939],
       [-0.32854385, -0.17298035,  1.        , -0.57640787, -0.37462089,
        -0.45170047],
       [-0.0182631 ,  0.20603073, -0.57640787,  1.        ,  0.70397453,
         0.61110339],
       [-0.2265978 , -0.03815852, -0.37462089,  0.70397453,  1.        ,
         0.19199918],
       [ 0.00792303,  0.22725939, -0.45170047,  0.61110339,  0.19199918,
         1.        ]])

Obtener el valor del intercepto.

In [None]:
model.intercept

-173769.9227155356

Los coeficientes por cada variable independiente

In [None]:
model.coefficients

DenseVector([-44971.9493, -1.2105, 6639.738, 108.4468, 15387.114, -78662.7525])

Así como los p-values para determinar la transendencia de cada variable dentro del modelo.

In [None]:
model.summary.pValues

[0.0,
 0.0,
 0.002742261743691632,
 3.523048235209991e-05,
 0.0,
 6.661338147750939e-16,
 0.04900845799491815]

Obtener indicadores de desempeño como la $r^2$ ajustada, dado que es un problema multivariado. Que nos sirve para indicar el porcentaje de la variabilidad de la variable dependiente explicada por el modelo.

In [None]:
model.summary.r2adj

0.6321404714030512

Podemos realizar predicciones para evaluar el modelo obtenido.

In [None]:
prediction_result = model.evaluate(test_data)
prediction_result.predictions.show()

+--------------------+-------------+------------------+
|Independent features|selling_price|        prediction|
+--------------------+-------------+------------------+
|[2.0,1000.0,20.3,...|       500000| 641722.6632724255|
|[2.0,1000.0,21.21...|       445000| 871067.4907851031|
|[2.0,1600.0,13.96...|      1900000|1760665.7369542336|
|[2.0,2136.0,22.5,...|       350000| 629055.9157200744|
|[2.0,5000.0,0.0,2...|       679000| 923617.4918622139|
|[2.0,5000.0,21.01...|       630000| 864897.6729820727|
|[2.0,5000.0,21.21...|       570000| 866225.6205875105|
|[2.0,5000.0,21.21...|       600000| 866225.6205875105|
|[2.0,5500.0,26.8,...|      2125000|1503128.7539815935|
|[2.0,10000.0,18.7...|       737000| 1202851.782968651|
|[2.0,10000.0,19.0...|       450000| 618780.7713926989|
|[2.0,15000.0,23.2...|       550000| 970826.0720435185|
|[2.0,15000.0,24.3...|       810000| 983262.1843195695|
|[2.0,20000.0,23.2...|       700000| 964773.7342965277|
|[2.0,40000.0,10.7...|      1500000| 1940968.776

Mostrar algunos indicadores de desempeño utiles.

In [None]:
prediction_result.meanAbsoluteError, prediction_result.meanSquaredError

(288271.49654614413, 225886958021.10565)

In [None]:
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import DecisionTreeModel

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [None]:
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(sc, 'spark-3.2.2-bin-hadoop3.2/data/mllib/sample_libsvm_data.txt')


In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [None]:
data.take(10)

[LabeledPoint(0.0, (692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50.0,48.0,238.0,252.0,252.0,252.0,237.0,54.0,227.0,253.0,252.0,239.0,233.0,252.0,57.0,6.0,10.0,60.0,224.0,252.0,253.0,252.0,202.0,84.0,252.0,253.0,122.0,163.0,252.0,252.0,252.0,253.0,252.0,252.0,96.0,189.0,253.0,167.0,51.0,238.0,253.0,253.0,190.0

In [None]:
numClasses = 2
categoricalFeaturesInfo = {}
impurity = "gini"

model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  impurity)

In [None]:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

Test Error = 0.06060606060606061
Learned classification tree model:
DecisionTreeModel classifier of depth 1 with 3 nodes
  If (feature 406 <= 22.0)
   Predict: 0.0
  Else (feature 406 > 22.0)
   Predict: 1.0



In [None]:
!./bin/pyspark

/bin/bash: ./bin/pyspark: No such file or directory


In [None]:
model.save(sc, "myDecisionTreeClassificationModel.dt")
sameModel = DecisionTreeModel.load(sc, "myDecisionTreeClassificationModel.dt")

Py4JJavaError: ignored