# 0. SparkSession

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *


spark = SparkSession.builder \
            .appName("Data Processing") \
            .getOrCreate()

sc = SparkContext.getOrCreate()

spark

# 1. DataFrame
Un DataFrame en el contexto de Apache Spark es una estructura de datos distribuida que organiza los datos en forma de columnas, similar a una tabla en una base de datos relacional. Sin embargo, los DataFrames en Spark presentan algunas características distintivas que los hacen especialmente adecuados para el procesamiento de grandes volúmenes de datos en clústeres:

* **Distribución y paralelismo:** Los DataFrames en Spark se dividen en particiones, lo que permite que los datos se distribuyan de manera eficiente en un clúster de máquinas. Cada partición se procesa en paralelo en diferentes nodos del clúster, lo que mejora significativamente el rendimiento y la escalabilidad del procesamiento de datos.

* **Optimización de consultas:** Spark utiliza su motor de optimización Catalyst para mejorar el rendimiento de las operaciones en DataFrames. Catalyst optimiza el plan de ejecución de las consultas, reorganizando y reescribiendo las operaciones para minimizar la cantidad de datos que se transfieren a través de la red y maximizar la eficiencia de la computación distribuida.

* **Tipado de datos:** A diferencia de los RDDs (Resilient Distributed Datasets), los DataFrames en Spark tienen un esquema de datos definido. Esto significa que cada columna tiene un tipo de dato asociado, lo que facilita la detección temprana de errores y permite que Spark realice optimizaciones de rendimiento específicas basadas en el tipo de datos.

* **Lenguajes de programación compatibles:** Los DataFrames en Spark están disponibles en varios lenguajes de programación, como Scala, Java, Python y R. Esto facilita la adopción y el uso de Spark por parte de una amplia variedad de desarrolladores.

* **Integración con fuentes de datos diversas:** Spark ofrece conectores para una amplia gama de fuentes de datos, incluyendo sistemas de almacenamiento como HDFS, bases de datos relacionales, fuentes de datos en tiempo real como Kafka y sistemas de almacenamiento en la nube como AWS S3 y Azure Blob Storage. Esto permite que los DataFrames en Spark sean una herramienta versátil para el procesamiento de datos desde diversas fuentes.

* **API rica y funcionalidades avanzadas:** Los DataFrames en Spark proporcionan una API rica con una amplia gama de operaciones de transformación y acción que permiten realizar tareas de procesamiento de datos complejas, como filtrado, agrupación, ordenación y agregación, de manera eficiente.

Esto convierte los DataFrame de Spark en una herramienta esencial para el análisis y procesamiento de datos a gran escala en clústeres de Spark.

## 1.1 Crear un DataFrame
Existen dos formas de crear un DataFrame:  a través de un RDD (Resilient Distributed Dataset) existente o mediante la lectura de datos desde un archivo externo, como un archivo CSV, JSON, Parquet u otros formatos compatibles.
### 1.1.1. RDD
Un RDD es una estructura de datos fundamental en Spark que representa una colección distribuida de elementos. Puedes crear un DataFrame a partir de un RDD existente utilizando la función createDataFrame() en Spark. Esto es útil cuando ya tienes datos en forma de RDD y deseas aprovechar las ventajas adicionales que ofrece un DataFrame, como la optimización de consultas y el esquema de datos.

En el siguiente código, crearemos un conjunto de datos llamado 'datos' que consiste en una lista de tuplas. Cada tupla representa información sobre un empleado, incluyendo su ID, nombre, edad, género, ocupación y salario. Luego de esto, emplearemos Spark para paralelizar este conjunto de datos en un RDD llamado 'empleadosRDD'. Finalmente, se ejecuta la operación 'collect()' en 'empleadosRDD' para recuperar todos los elementos del RDD y mostrarlos como una lista en la salida. Esto permitirá ver los datos de los empleados.

In [4]:
datos=[(0, "Miguel", 21, "Masculino", "Chef", 6000000),\
       (1,"Juan",33,"Masculino","Ingeniero",4500000),\
       (2,"Ana",38,"Femenino","Arquitecta",6200000),\
       (3,"Carmen",52,"Femenino","Abogada",7500000)]
empleadosRDD=sc.parallelize(datos)

print('\nempleadosRDD type:', type(empleadosRDD))

empleadosRDD.collect()


empleadosRDD type: <class 'pyspark.rdd.RDD'>


[(0, 'Miguel', 21, 'Masculino', 'Chef', 6000000),
 (1, 'Juan', 33, 'Masculino', 'Ingeniero', 4500000),
 (2, 'Ana', 38, 'Femenino', 'Arquitecta', 6200000),
 (3, 'Carmen', 52, 'Femenino', 'Abogada', 7500000)]

Ahora crearemos un DataFrame de Spark llamado 'empleadosDF' a partir del RDD empleando diferentes métodos como ejemplo. La función createDataFrame() de Spark toma un RDD como entrada y crea un DataFrame con una estructura de columnas basada en los datos del RDD. Al igual que en el caso anterior, el collect() en 'empleadosDF' nos permitirá recuperar todos los datos del DataFrame y mostrarlos en la salida.

**Ejemplo 1**

In [5]:
empleadosDF=spark.createDataFrame(empleadosRDD)

print('\nempleadosDF type:', type(empleadosDF))

empleadosDF.collect()


empleadosDF type: <class 'pyspark.sql.dataframe.DataFrame'>


[Row(_1=0, _2='Miguel', _3=21, _4='Masculino', _5='Chef', _6=6000000),
 Row(_1=1, _2='Juan', _3=33, _4='Masculino', _5='Ingeniero', _6=4500000),
 Row(_1=2, _2='Ana', _3=38, _4='Femenino', _5='Arquitecta', _6=6200000),
 Row(_1=3, _2='Carmen', _3=52, _4='Femenino', _5='Abogada', _6=7500000)]

---
La salida que observamos en la anterior celda no es la más amigable para la vista, sin embargo, los DataFrame tienen algunos métodos que facilitan la operación y visualización de los datos como mostraremos a continuación.

In [6]:
empleadosDF.show()

+---+------+---+---------+----------+-------+
| _1|    _2| _3|       _4|        _5|     _6|
+---+------+---+---------+----------+-------+
|  0|Miguel| 21|Masculino|      Chef|6000000|
|  1|  Juan| 33|Masculino| Ingeniero|4500000|
|  2|   Ana| 38| Femenino|Arquitecta|6200000|
|  3|Carmen| 52| Femenino|   Abogada|7500000|
+---+------+---+---------+----------+-------+



---
**Ejemplo 2**

Para comprender mejor los datos podemos indicar el nombre de las columnas al momento de crear el DataFrame.

In [8]:
empleadosDF=spark.createDataFrame(empleadosRDD, schema=["Id","Nombre","Edad","Sexo","Profesion","Salario"])

display(empleadosDF.collect())

empleadosDF.show()

[Row(Id=0, Nombre='Miguel', Edad=21, Sexo='Masculino', Profesion='Chef', Salario=6000000),
 Row(Id=1, Nombre='Juan', Edad=33, Sexo='Masculino', Profesion='Ingeniero', Salario=4500000),
 Row(Id=2, Nombre='Ana', Edad=38, Sexo='Femenino', Profesion='Arquitecta', Salario=6200000),
 Row(Id=3, Nombre='Carmen', Edad=52, Sexo='Femenino', Profesion='Abogada', Salario=7500000)]

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  3|Carmen|  52| Femenino|   Abogada|7500000|
+---+------+----+---------+----------+-------+



Veamos qué tipo de dato se adoptó para cada variable

In [10]:
empleadosDF.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Edad: long (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Profesion: string (nullable = true)
 |-- Salario: long (nullable = true)



---
**Ejemplo 3**

Como obervamos en el esquema anterior "Id", "Edad" y "Salario" los estamos usando con el mismo tipo de dato (long), pero podríamos optimizar el entorno si modificamos este tipo para las variables. Para esto, podemos definir explícitamente el esquema de datos del DataFrame utilizando la estructura 'StructType' y 'StructField'. Esto se hace para controlar el tipo de datos de cada columna en el DataFrame.

La estructura 'StructType' se utiliza para definir el esquema del DataFrame, especificando las columnas y sus tipos de datos correspondientes. En este caso, se definieron las seis columnas ("Id", "Nombre", "Edad", "Sexo", "Profesion" y "Salario") con sus respectivos tipos de datos.

Ver más sobre tipo de datos en: [Supported Data Types](https://spark.apache.org/docs/latest/sql-ref-datatypes.html)

In [8]:
empleadosDF=spark.createDataFrame(empleadosRDD, \
          StructType([ \
                      StructField("Id", ByteType(),False), \
                      StructField("Nombre", StringType(),False), \
                      StructField("Edad", ByteType(),False), \
                      StructField("Sexo", StringType(),False), \
                      StructField("Profesion", StringType(),False), \
                      StructField("Salario", IntegerType(),False)]))

In [9]:
empleadosDF.printSchema()
empleadosDF.show()

root
 |-- Id: byte (nullable = false)
 |-- Nombre: string (nullable = false)
 |-- Edad: byte (nullable = false)
 |-- Sexo: string (nullable = false)
 |-- Profesion: string (nullable = false)
 |-- Salario: integer (nullable = false)

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  3|Carmen|  52| Femenino|   Abogada|7500000|
+---+------+----+---------+----------+-------+



Comparando con el esquema previamente mostrado (**Ejemplo 2**), se observa que la diferencia principal es que aquí se ha definido explícitamente el tipo de datos de las columnas "Id" y "Edad" como 'ByteType', y "Salario" como 'IntegerType', en lugar de 'long'. Esto permite un mayor control sobre el esquema y potencialmente ahorra espacio de almacenamiento si los valores de estas columnas son de un tamaño menor a los que se manejan en 'long'.

### 1.1.2. Archivo Externo

**Cargar archivo csv**

Ahora emplearemos la función spark.read.csv() para cargar datos desde un archivo CSV y crear el DataFrame.

In [10]:
empleadosDF = spark.read.csv("data/empleados.csv")

print('\nempleadosDF type:', type(empleadosDF))

empleadosDF


empleadosDF type: <class 'pyspark.sql.dataframe.DataFrame'>


DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string]

In [11]:
empleadosDF.show(5)

+---+------+----+---------+----------+-------+
|_c0|   _c1| _c2|      _c3|       _c4|    _c5|
+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



Como se observa, el DataFrame está creado correctamente pero no le hemos indicado que la primera línea es el header.

In [12]:
empleadosDF = spark.read.option("header",True).csv("data/empleados.csv")

empleadosDF.show(5)

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



In [13]:
empleadosDF.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Edad: string (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Profesion: string (nullable = true)
 |-- Salario: string (nullable = true)



Al igual que el ejemplo presentado con anterioridad, podemos ver y asignar el formato a cada variable al momento de crear el DataFrame.

In [14]:
schema = StructType() \
      .add("Id",ByteType(),True) \
      .add("Nombre",StringType(),True) \
      .add("Edad",ByteType(),True) \
      .add("Sexo",StringType(),True) \
      .add("Profesion",StringType(),True) \
      .add("Salario",IntegerType(),True)

empleadosDF = spark.read.format("csv") \
      .option("header", True) \
      .option("delimiter", ",") \
      .schema(schema) \
      .load("data/empleados.csv")

empleadosDF.printSchema()
empleadosDF.show()

root
 |-- Id: byte (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Edad: byte (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Profesion: string (nullable = true)
 |-- Salario: integer (nullable = true)

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



**Cargar archivo JSON**

La función spark.read.json() nos permite leer un archivo JSON en un DataFrame de Spark. Observaremos el tipo y el esquema del JSON cargado y asignaremos el formato a las variables como en los casos anteriores.

In [15]:
empleadosDF = spark.read.json("data/empleados.json")

print('\nempleadosDF type:', type(empleadosDF), '\n')

empleadosDF.printSchema()
empleadosDF.show()


empleadosDF type: <class 'pyspark.sql.dataframe.DataFrame'> 

root
 |-- Edad: long (nullable = true)
 |-- Id: long (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Profesion: string (nullable = true)
 |-- Salario: long (nullable = true)
 |-- Sexo: string (nullable = true)

+----+---+------+----------+-------+---------+
|Edad| Id|Nombre| Profesion|Salario|     Sexo|
+----+---+------+----------+-------+---------+
|  33|  1|  Juan| Ingeniero|4500000|Masculino|
|  38|  2|   Ana|Arquitecta|6200000| Femenino|
+----+---+------+----------+-------+---------+



In [16]:
empleadosDF = spark.read.json("data/empleados.json", \
            StructType([ \
                      StructField("Id", IntegerType(),False), \
                      StructField("Nombre", StringType(),False), \
                      StructField("Edad", ByteType(),False), \
                      StructField("Sexo", StringType(),False), \
                      StructField("Profesion", StringType(),False), \
                      StructField("Salario", IntegerType(),False)]))

print('\nempleadosDF type:', type(empleadosDF), '\n')

empleadosDF.printSchema()

empleadosDF.show()


empleadosDF type: <class 'pyspark.sql.dataframe.DataFrame'> 

root
 |-- Id: integer (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- Edad: byte (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Profesion: string (nullable = true)
 |-- Salario: integer (nullable = true)

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



Mas información sobre carga de datos en: [Data Sources Spark](https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html)

## 1.2 Acceso a los datos
Una gran ventaja de trabajar con DataFrames es la capacidad de acceder a cada variable o columna de manera intuitiva y conveniente. En general, los DataFrames están diseñados para imitar la funcionalidad de las tablas en una base de datos relacional y esto significa que puedes acceder a las columnas ¡de la misma manera que accederías a los campos de una tabla de base de datos: a través del nombre del DataFrame, seguido del carácter punto (.) y el nombre de la variable.

In [17]:
empleadosDF.Salario

Column<'Salario'>

## 1.3. Operaciones
Los DataFrames en Spark permiten realizar una amplia variedad de operaciones, que se pueden dividir en dos categorías principales: propiedades y métodos. 
### 1.3.1. Propiedades
Las propiedades son atributos o características de un DataFrame que se pueden consultar para obtener información sobre su estructura y contenido. Algunas propiedades comunes incluyen:

##### columns

Esta propiedad devuelve una lista de los nombres de las columnas en el DataFrame. Te permite conocer las columnas disponibles.

In [18]:
empleadosDF.columns

['Id', 'Nombre', 'Edad', 'Sexo', 'Profesion', 'Salario']

##### dtypes
Muestra los tipos de datos de cada columna en el DataFrame.

In [19]:
empleadosDF.dtypes

[('Id', 'int'),
 ('Nombre', 'string'),
 ('Edad', 'tinyint'),
 ('Sexo', 'string'),
 ('Profesion', 'string'),
 ('Salario', 'int')]

##### rdd
Convierte un DataFrame a un RDD

In [20]:
empRDD = empleadosDF.rdd
type(empRDD)

pyspark.rdd.RDD


### 1.3.2. Métodos
Los métodos son funciones que se aplican al DataFrame para realizar operaciones de transformación, filtrado, agregación y otras tareas de procesamiento de datos. Observemos cómo se emplean métodos comunes en el siguiente DataFrame

In [21]:
empleadosDF.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



##### agg(funciones_agregacion)

Este método realiza una agregación sobre el data frame, es decir que a partir del data frame original, devuelve un nuevo data frame que contiene el resultado de las funciones de agregación realizadas sobre el conjunto de datos.

Estas funciones de agregación son especificadas por el usuario y hacen referencia a operaciones básicas sobre los datos. Estas son algunas de las funciones de agregación disponibles:

* first: entrega el primer elemento
* last: entrega el último elemento
* min: entrega el valor mínimo de una variable específica
* max: entrega el valor máximo de una variable específica
* sum: entrega la suma de los valores de una variable específica
* avg: entrega el promedio de los valores de una variable específica

Usemos el método agg para conocer el promedio de los salarios de los empleados

In [22]:
salario_mean = empleadosDF.agg({"Salario": "avg"}).take(1)

print(type(salario_mean))

salario_mean

<class 'list'>


[Row(avg(Salario)=5350000.0)]

In [23]:
salario_mean=empleadosDF.agg({"Salario": "avg"})

print('Type salario_mean: ', type(salario_mean))

salario_mean.show()

Type salario_mean:  <class 'pyspark.sql.dataframe.DataFrame'>
+------------+
|avg(Salario)|
+------------+
|   5350000.0|
+------------+



También podemos utilizar las funciones disponibles en la clase functions de la librería pyspark.sql. Por ejemplo, empleemos algunas de las funciones para hallar el máximo, el mínimo y el promedio de "Edad".

In [24]:
from pyspark.sql import functions

edad_stats = empleadosDF.agg( \
    functions.min(empleadosDF.Edad), \
    functions.max(empleadosDF.Edad), \
    functions.avg(empleadosDF.Edad))

print('Type edad_stats: ', type(edad_stats))

edad_stats.show()

Type edad_stats:  <class 'pyspark.sql.dataframe.DataFrame'>
+---------+---------+---------+
|min(Edad)|max(Edad)|avg(Edad)|
+---------+---------+---------+
|       33|       38|     35.5|
+---------+---------+---------+



En este caso utilizamos tres funciones de agregación, el resultado es un nuevo DataFrame de una sola fila y tres columnas (una por cada función de agregación)
##### corr(var1,var2)
Este método se utiliza para calcular la correlación entre un par de variables del DataFrame

In [25]:
empleadosDF.corr("Edad", "Salario")

1.0

##### count()
Se utiliza para conocer el número de registros (filas) en el DataFrame

In [26]:
empleadosDF.count()

2

##### union(other)
Este método crea un nuevo DataFrame a partir de la unión de dos DataFrames. Para ejemplificar el método, actualicemos nuestro DataFrame a uno nuevo (emp_act) con el registro de otros empleados.

In [27]:
nuevos_emp = spark.createDataFrame([ \
    (0, "Miguel", 21, "Masculino", "Chef", 6000000),\
      (4,"Ana",40,"Femenino","Docente",7600000),\
      (5,"Luis",62,"Masculino","Panadero",None),\
    (6,"Daniel",30,"Masculino","Empresario",9000000),\
    (2,"Ana",38,"Femenino","Arquitecta",6200000)])

nuevos_emp.show()

+---+------+---+---------+----------+-------+
| _1|    _2| _3|       _4|        _5|     _6|
+---+------+---+---------+----------+-------+
|  0|Miguel| 21|Masculino|      Chef|6000000|
|  4|   Ana| 40| Femenino|   Docente|7600000|
|  5|  Luis| 62|Masculino|  Panadero|   NULL|
|  6|Daniel| 30|Masculino|Empresario|9000000|
|  2|   Ana| 38| Femenino|Arquitecta|6200000|
+---+------+---+---------+----------+-------+



In [28]:
emp_act = empleadosDF.union(nuevos_emp)

emp_act.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



##### distinct()
Crea un nuevo DataFrame a partir de los registros (filas) que sean diferentes en el DataFrame inicial

In [29]:
emp_dist = emp_act.distinct()
emp_dist.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
+---+------+----+---------+----------+-------+



##### drop(var)
Toma el DataFrame inicial, elimina la variable (columna) especificada y entrega un nuevo DataFrame con el resultado

In [30]:
emp_drop = emp_dist.drop("Id")
emp_drop.show()

+------+----+---------+----------+-------+
|Nombre|Edad|     Sexo| Profesion|Salario|
+------+----+---------+----------+-------+
|  Juan|  33|Masculino| Ingeniero|4500000|
|   Ana|  38| Femenino|Arquitecta|6200000|
|Miguel|  21|Masculino|      Chef|6000000|
|   Ana|  40| Femenino|   Docente|7600000|
|Daniel|  30|Masculino|Empresario|9000000|
|  Luis|  62|Masculino|  Panadero|   NULL|
+------+----+---------+----------+-------+



##### dropDuplicates(var)
Este método devuelve un nuevo DataFrame en el que ha eliminado los registros (filas) duplicadas. Esto ya los hacía el método distinct(), la diferencia es que en dropDuplicates() se puede especificar las columnas a evaluar para considerar que dos registros son duplicados

In [31]:
emp_drop_id = emp_act.dropDuplicates(["Id"])
emp_drop_id.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
+---+------+----+---------+----------+-------+



##### fillna(valor,col[ ])
Este método devuelve un nuevo DataFrame luego de sustituir los valores nulos detectados por el valor especificado. Adicionalmente, es posible indicarlo cuales son las columnas sobre las que va a operar

In [32]:
emp_fna = emp_act.fillna(1000000)
emp_fna.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|1000000|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



##### dropna(_cuantas, _umbral, _columnas)
Este método devuelve un nuevo DataFrame resultado de eliminar los registros (filas) que contienen nulos en el DataFrame inicial.

In [33]:
emp_dna = emp_act.dropna("any")
emp_dna.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



##### filter(condicion)
Funciona similar al filter de RDD. Devuelve un nuevo DataFrame que contiene los registros para los cuales la condición arroja como resultado True.

In [34]:
empf = emp_act.filter(emp_act.Salario>5000000)
empf.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+



##### GroupBy(var)
Genera un nuevo DataFrame donde agrupa los registros que coincidan en la variable especificada. Luego de agrupar es posible realizar agregaciones para indicar que se desea realizar con los valores de los registros agrupados


In [35]:
emp_avg = emp_act.groupBy("Nombre").avg()
emp_avg_sal = emp_act.groupBy("Nombre").agg({"Salario":"avg"})

print("DataFrame Inicial\n")
emp_act.show()

print("DataFrame promediando todas las variables\n")
emp_avg.show()

print("DataFrame promediando una variable específica\n")
emp_avg_sal.show()

DataFrame Inicial

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+

DataFrame promediando todas las variables

+------+------------------+------------------+-----------------+
|Nombre|           avg(Id)|         avg(Edad)|     avg(Salario)|
+------+------------------+------------------+-----------------+
|   Ana|2.6666666666666665|38.666666666666664|6666666.666666667|
|  Juan|               1.0|              33.0|        4500000.0|
|Miguel|               0.0|              21.0|        6000000.0|
|  Luis|               5.0|   

##### orderBy(var)
Entrega un DataFrame ordenado de acuerdo a la variable indicada

In [36]:
emp_or_sal = emp_act.orderBy(emp_act.Salario.desc())

print("DataFrame Inicial\n")
emp_act.show()
print("DataFrame ordenado por salario\n")
emp_or_sal.show()

DataFrame Inicial

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
+---+------+----+---------+----------+-------+

DataFrame ordenado por salario

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  6|Daniel|  30|Masculino|Empresario|9000000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  5|  L

##### select(lista_var)
Genera un nuevo DataFrame que incluye únicamente las variables indicadas, es decir, se selecciona un grupo de columnas.

In [37]:
emp_sel = emp_act.select("Nombre","Sexo","Salario")

emp_sel.show()

+------+---------+-------+
|Nombre|     Sexo|Salario|
+------+---------+-------+
|  Juan|Masculino|4500000|
|   Ana| Femenino|6200000|
|Miguel|Masculino|6000000|
|   Ana| Femenino|7600000|
|  Luis|Masculino|   NULL|
|Daniel|Masculino|9000000|
|   Ana| Femenino|6200000|
+------+---------+-------+



##### selectExpr(expresiones)
Funciona similar al método select(), pero permite crear nuevas variables (columnas) a partir de la evaluación de las expresiones indicadas

In [38]:
emps = emp_act.selectExpr("Nombre","Sexo","Salario", "Salario > 6000000")

emps.show()

+------+---------+-------+-------------------+
|Nombre|     Sexo|Salario|(Salario > 6000000)|
+------+---------+-------+-------------------+
|  Juan|Masculino|4500000|              false|
|   Ana| Femenino|6200000|               true|
|Miguel|Masculino|6000000|              false|
|   Ana| Femenino|7600000|               true|
|  Luis|Masculino|   NULL|               NULL|
|Daniel|Masculino|9000000|               true|
|   Ana| Femenino|6200000|               true|
+------+---------+-------+-------------------+



##### withcolumn(nombre,expresion)
Permite adicionar columnas con el resultado de la evaluación de una expresión y se puede asignar el nombre de las columnas adicionadas

In [39]:
emp_wc = emp_act.withColumn("Mayor a 50 años", emp_act.Edad > 50)

emp_wc.show()

+---+------+----+---------+----------+-------+---------------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|Mayor a 50 años|
+---+------+----+---------+----------+-------+---------------+
|  1|  Juan|  33|Masculino| Ingeniero|4500000|          false|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|          false|
|  0|Miguel|  21|Masculino|      Chef|6000000|          false|
|  4|   Ana|  40| Femenino|   Docente|7600000|          false|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|           true|
|  6|Daniel|  30|Masculino|Empresario|9000000|          false|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|          false|
+---+------+----+---------+----------+-------+---------------+



# 2. Consultas SQL
Spark proporciona una funcionalidad adicional que permite realizar operaciones en DataFrames utilizando SQL, similar a cómo se trabaja con bases de datos relacionales. Esto es posible gracias a la clase SparkContext, que permite ejecutar consultas SQL en DataFrames. 

En general, el lenguaje SQL nos permite operar con DataFrames en Apache Spark mediante el SparkContext y los métodos descritos anteriormente permiten operar los datos para crear registros, leerlos, actualizarlos e incluso eliminarlos, las operaciones básicas de un CRUD. Como observamos con anterioridad, estos métodos son útiles para manipular y transformar datos de manera eficiente.

Para utilizar SQL con DataFrames, primero debes registrar un DataFrame como una "tabla temporal" en el motor SQL de Spark. Esto se hace utilizando el método createOrReplaceTempView() de un DataFrame, que permite darle un nombre a la tabla temporal. Una vez registrada, la tabla se puede utilizar en consultas SQL como si fuera una tabla en una base de datos.

In [40]:
emp_drop_id.createOrReplaceTempView("Empleados")

De esta manera cada que requiere utilizar una consulta SQL lo realizará sobre la tabla Empleados

**Lectura de datos**

In [41]:
q1 = spark.sql("SELECT * FROM Empleados")
q1.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  2|   Ana|  38| Femenino|Arquitecta|6200000|
|  4|   Ana|  40| Femenino|   Docente|7600000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
+---+------+----+---------+----------+-------+



Filtremos los registros de los empleados masculinos

In [42]:
q2 = spark.sql('SELECT * FROM Empleados WHERE Sexo = "Masculino"')
q2.show()

+---+------+----+---------+----------+-------+
| Id|Nombre|Edad|     Sexo| Profesion|Salario|
+---+------+----+---------+----------+-------+
|  0|Miguel|  21|Masculino|      Chef|6000000|
|  1|  Juan|  33|Masculino| Ingeniero|4500000|
|  5|  Luis|  62|Masculino|  Panadero|   NULL|
|  6|Daniel|  30|Masculino|Empresario|9000000|
+---+------+----+---------+----------+-------+



Leamos solo las variables de interés

In [43]:
q3 = spark.sql("""
		SELECT Nombre, Edad, Sexo 
		FROM Empleados
	""")
q3.show()

+------+----+---------+
|Nombre|Edad|     Sexo|
+------+----+---------+
|Miguel|  21|Masculino|
|  Juan|  33|Masculino|
|   Ana|  38| Femenino|
|   Ana|  40| Femenino|
|  Luis|  62|Masculino|
|Daniel|  30|Masculino|
+------+----+---------+



**Funciones definidas por el usuario**

Además de las consultas básicas que proporciona SQL, el usuario puede definir sus propias funciones e incorporarlas dentro de una consulta SQL. Esto es de gran ayuda para personalizar las acciones que deseamos realizar sobre los datos. Primero debemos definir la función a utilizar:

In [44]:
def funcionEdad(edad):
	if edad < 50:
		return "Menor de 50"
	else:
		return "Mayor de 50"

Ahora debemos registrar la función

In [45]:
spark.udf.register("funcionEdad",funcionEdad)

<function __main__.funcionEdad(edad)>

Ahora si podemos empezar a usar nuestra función

In [46]:
q4 = spark.sql("""
		SELECT Nombre, Sexo, Salario, funcionEdad(Edad) AS Edad_Mayor_50
		FROM Empleados
		""")
q4.show()

+------+---------+-------+-------------+
|Nombre|     Sexo|Salario|Edad_Mayor_50|
+------+---------+-------+-------------+
|Miguel|Masculino|6000000|  Menor de 50|
|  Juan|Masculino|4500000|  Menor de 50|
|   Ana| Femenino|6200000|  Menor de 50|
|   Ana| Femenino|7600000|  Menor de 50|
|  Luis|Masculino|   NULL|  Mayor de 50|
|Daniel|Masculino|9000000|  Menor de 50|
+------+---------+-------+-------------+



# Recursos

Los siguientes enlaces corresponden a sitios en donde encontrará información muy útil para profundizar en el conocimiento y manejo de dataframes y SQL en Spark:


* [Pyspark By Examples](https://sparkbyexamples.com/pyspark-tutorial/)
* [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
* [Spark SQL, Built-in Functions](https://spark.apache.org/docs/latest/api/sql/index.html)
* [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html)