## PySpark

Librerias

In [37]:
# Importing SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.functions import col

1. Creación SparkSession

Para trabajar con PySpark, primero necesitas crear SparkSession. SparkSession es un punto de entrada a la funcionalidad de PySpark.

In [2]:
spark=SparkSession.builder.appName('example').getOrCreate()

2. Lectura de la base de datos

PySpark ofrece dos estructuras principales para almacenar datos cuando se realizan manipulaciones: El RDD y el DataFrame. 
<br>

Puedes pensar en el RDD como una colección distribuida de objetos (o filas). Puedes pensar en el DataFrame como si fuera una tabla.

In [3]:
df = spark.read.csv('./data/diabetes.csv', header=True, inferSchema=True)

3. Conociendo la base de datos

In [4]:
# looking at the first rows the dataset
df.show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [5]:
# Columns of dataframe
df.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

Puede utilizar el método **count** para obtener el número total de registros en el Dataframe. 
<br>

El método **len** permite ver el número de columnas del DataFrame. Echemos un vistazo a la forma de nuestro conjunto de datos con los métodos count y len.

In [6]:
# Shape of dataset
print((df.count(),len(df.columns)))

(768, 9)


Para obtener la información del esquema del conjunto de datos, puede utilizar el método **printSchema** que a menudo se utiliza para comprender los datos con el método **show** en el análisis de datos.

In [7]:
# printSchema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



Puede utilizar **describe().show()** para echar un vistazo a las estadísticas de descripción del conjunto de datos. Voy a utilizar el parámetro truncar para ver sólo 8 caracteres.

In [8]:
# description statistics
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

In [9]:
df.describe().show(truncate=8)

+-------+-----------+--------+-------------+-------------+--------+--------+------------------------+--------+--------+
|summary|Pregnancies| Glucose|BloodPressure|SkinThickness| Insulin|     BMI|DiabetesPedigreeFunction|     Age| Outcome|
+-------+-----------+--------+-------------+-------------+--------+--------+------------------------+--------+--------+
|  count|        768|     768|          768|          768|     768|     768|                     768|     768|     768|
|   mean|   3.845...|120.8...|     69.10...|     20.53...|79.79...|31.99...|                0.471...|33.24...|0.348...|
| stddev|   3.369...|31.97...|     19.35...|     15.95...|115.2...|7.884...|                0.331...|11.76...|0.476...|
|    min|          0|       0|            0|            0|       0|     0.0|                   0.078|      21|       0|
|    max|         17|     199|          122|           99|     846|    67.1|                    2.42|      81|       1|
+-------+-----------+--------+----------

4. Selección de columnas

Puede utilizar el método **select** para seleccionar columnas específicas. Tomemos las columnas 'Pregnancies' y 'Age' del conjunto de datos con el método select.

In [10]:
# Selecting
df.select("Pregnancies", "Age").show(10)

+-----------+---+
|Pregnancies|Age|
+-----------+---+
|          6| 50|
|          1| 31|
|          8| 32|
|          1| 21|
|          0| 33|
|          5| 30|
|          3| 26|
|         10| 29|
|          2| 53|
|          8| 54|
+-----------+---+
only showing top 10 rows



También puedes utilizar la función col del módulo **pyspark.sql.functions** para seleccionar columnas. Permítanme mostrarles.

In [11]:
# Selecting the col method
df.select(F.col("Pregnancies"), F.col("Age")).show(10)

+-----------+---+
|Pregnancies|Age|
+-----------+---+
|          6| 50|
|          1| 31|
|          8| 32|
|          1| 21|
|          0| 33|
|          5| 30|
|          3| 26|
|         10| 29|
|          2| 53|
|          8| 54|
+-----------+---+
only showing top 10 rows



5. Data Filtering

Para limpiar el conjunto de datos y mantener sólo los registros que desea, puede realizar para filtrar los registros basados en condiciones. Existen dos métodos para filtrar datos: **filter()** y **where()**. 

Vamos a filtrar los datos en los que el valor de la columna 'Age' es inferior a 40 con el método **filter**.

In [12]:
# Filtering
df.filter(df['Age'] < 40).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|
|         10|    115|            0|            0|      0|35.3|                   0.134| 29|      0|


Puede realizar más filtros utilizando el método de selección para ver sólo columnas específicas.

In [13]:
# Filtering for two columns
df.filter(df['age'] < 40).select('Insulin','Outcome').show(10)

+-------+-------+
|Insulin|Outcome|
+-------+-------+
|      0|      0|
|      0|      1|
|     94|      0|
|    168|      1|
|      0|      0|
|     88|      1|
|      0|      0|
|      0|      0|
|      0|      1|
|      0|      1|
+-------+-------+
only showing top 10 rows



También puede aplicar filtros a los registros en función de las condiciones. Busquemos registros con edad superior a 60 años y personas que sólo estén enfermas.

In [14]:
# Filtering with multiple conditions
df.filter(df['age'] > 60).filter(df['Outcome'] == '1').show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          4|    146|           92|            0|      0|31.2|                   0.539| 61|      1|
|          0|    105|           84|            0|      0|27.9|                   0.741| 62|      1|
|          2|    158|           90|            0|      0|31.6|                   0.805| 66|      1|
|          4|    146|           78|            0|      0|38.5|                    0.52| 67|      1|
|          2|    197|           70|           99|      0|34.7|                   0.575| 62|      1|
|          4|    145|           82|           18|      0|32.5|                   0.235| 70|      1|
|          6|    190|           92|            0|      0|35.5|                   0.278| 66|      1|


Puede utilizar operadores como **&** y el signo **|** para aplicar múltiples condiciones de filtrado. 

Filtremos las personas enfermas y con 10 o más embarazos utilizando **&**.

In [15]:
# Filtering with multiple conditions
df.filter((df['Outcome']==1) & (df['Pregnancies'] >=9)).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|         10|    168|           74|            0|      0|38.0|                   0.537| 34|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
|         11|    143|           94|           33|    146|36.6|                   0.254| 51|      1|
|         10|    125|           70|           26|    115|31.1|                   0.205| 41|      1|
|          9|    102|           76|           37|      0|32.9|                   0.665| 46|      1|
|          9|    171|          110|           24|    240|45.4|                   0.721| 54|      1|
|         13|    126|           90|            0|      0|43.4|                   0.583| 42|      1|


Para obtener un recuento del número de registros después del filtrado, puede utilizar el método **count**.

In [16]:
# Counting after filtering
df.filter(df['age']>40).count()

194

Puedes filtrar datos el método **where** como el método **filter**. Déjame mostrarte.

In [17]:
# Filtering with the where method
df.where((df['Outcome']==1) & (df['Pregnancies'] >=9)).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|         10|    168|           74|            0|      0|38.0|                   0.537| 34|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
|         11|    143|           94|           33|    146|36.6|                   0.254| 51|      1|
|         10|    125|           70|           26|    115|31.1|                   0.205| 41|      1|
|          9|    102|           76|           37|      0|32.9|                   0.665| 46|      1|
|          9|    171|          110|           24|    240|45.4|                   0.721| 54|      1|
|         13|    126|           90|            0|      0|43.4|                   0.583| 42|      1|


In [20]:
# Filtering the where method
df.where(df['age']>40).count()

194

6. Añadir una nueva columna

Puedes utilizar el método **withColumn** para añadir una nueva columna. Vamos a crear una nueva columna utilizando la columna de edad. Para ello, voy a añadir los valores de edad a diez valor.


In [21]:
# Creating a new column
df.withColumn('New_Age',df['age']+10).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|New_Age|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|     60|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|     41|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|     42|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     31|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|     43|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|     40|
|          3|     78|       

7. Agrupamiento de los datos

Cuando trabajamos con grandes cantidades de datos, a menudo utilizamos el método **groupBy** para resumir los datos. Después de agrupar los datos, puede aplicar una función de agregación en cada uno de ellos. Veamos el número de suma de cada valor categórico de la columna resultado.

In [22]:
# Grouping data
df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



También puede utilizar los métodos **distinct** y **count** para encontrar valores distintos en una columna. 

In [23]:
# Finding distinct values
df.select('Pregnancies').distinct().count()

17

Puede utilizar otras funciones de agregación como la suma, la media o el mínimo. Busquemos la media de la edad después de agrupar la columna de resultados. Observe que se utiliza el método alias para dar nombre a la nueva columna.

In [24]:
# Applying the aggregate functions
df.groupBy('Outcome').agg(F.mean("age").alias("age_mean")).show(10)

+-------+-----------------+
|Outcome|         age_mean|
+-------+-----------------+
|      1|37.06716417910448|
|      0|            31.19|
+-------+-----------------+



8. Implementación de Funciones

In [25]:
# Creating a function
def diabete(case):
    if case == 1 :
        return "diabete"
    else:
        return 'no diabete'

Ahora vamos a declarar el UDF y su tipo de retorno (StringType en este ejemplo). Después de eso, se utilizará **withColumn** para crear una nueva columna y luego pasar la columna Dataframe relevante (Outcome):

In [None]:
diabete_udf = F.udf(diabete, StringType())
df.withColumn("diabete_case", diabete_udf(col("Outcome"))).show(10)

9. Eliminación de datos

Para borrar una columna o múltiples columnas, puedes usar el método **drop** en PySpark. 

In [27]:
# Deleting a column
df.drop('Insulin').show(10)

+-----------+-------+-------------+-------------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+----+------------------------+---+-------+
|          6|    148|           72|           35|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|31.0|                   0.248| 26|      1|
|         10|    115|            0|            0|35.3|                   0.134| 

Para eliminar los registros duplicados del Dataframe, puede utilizar el método **dropDuplicates**.

In [28]:
# Deleting the duplicate records
print("The number of records: ", df.count())
df=df.dropDuplicates()
print("The number of records after removing the duplicate : ", df.count())

The number of records:  768
The number of records after removing the duplicate :  768


10. Guardar datos

Después de manipular los datos, a menudo querrá exportar los resultados. Puede escribir el Dataframe limpio en una ubicación deseada en el formato requerido con el método **write**.

In [None]:
# Exporting data
df.write.csv("./data/my_dataset.csv",header=True, mode="overwrite")

Como puede ver, esta carpeta incluye muchas particiones. Para reducir el número de particiones, puede utilizar el método coalesce con el número deseado de particiones.

Usar coalesce(1) es una buena práctica cuando deseas guardar tu DataFrame en un solo archivo en lugar de múltiples archivos, que es el comportamiento predeterminado en PySpark.

In [None]:
# Exporting data
df.coalesce(1).write.csv("./data/my_single_partition.csv")