[![img/pythonista.png](img/pythonista.png)](https://www.pythonista.io)

# Introducción a *Dataframes*.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Intro a Dataframes").getOrCreate()
ct = spark.sparkContext

Los *dataframes* son un concepto compartido entre plataformas como *R*, *Pandas* y *Scala*. Son estructuras tabulares en las que todos los datos de una columna comparten el mismo tipado. Cada columna tiene un título y cada renglón tiene un índice. A la descripción de los tipos de datos de cada columna de un *dataframe* se le conoce como esquema (*schema*).

*PySPark* tiene la capacidad de poder manejar *dataframes* tanto de *Pandas* como de *Spark* e incluso cuenta con una [*API*](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) que optimiza la interacción entre ambos tipos de *dataframes*.

## Los *dataframes* de *Spark*.

Un *dataframe* de *Spark* es un objeto instanciado de la clase [```pyspark.sql.DataFrame```](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html).

### Creación de *dataframes*.

La función [```spark.createDataFrame()```](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html) permite crear *dataframes* a partir de objetos que se ingresan como argumentos.


```pyspark
df = spark.createDataFrame(data=<obj>, <títulos columnas>, schema=<esquema>)
```

Donde:

* ```<obj>``` es un objeto que represente una estructura tabular el cual puede ser:
    * Una colección de *Python*.
    * Un *dataframe* de *Pandas*.
    * Un *RDD* de *Spark*.
* ```<titulos columnas>``` es una lista de cadenas de caracteres que corresponden al título de cada columna.
* ```<esquema>``` es una estructura de tipos de datos de *Spark* que describe el tipado de cada columna.
    
Cabe hacer notar que los *dataframes* de *Spark* se construyen de forma perezosa, por lo que aún cuando sean definidos, estos no serán creados hasta que sean requeridos.
    
Por convención se utiliza el nombre ```df``` para designar un dataframe genérico.

### Los objetos ```Row``` y ```Column```.

Los *dataframes* de *Spark* están compuestos por 2 tipos de objetos.

* Los objetos de tipo ```pyspark.sql.Row``` que representan a cada renglón del *dataframe*.
* Los objetos de tipo ```pyspark.sql.Column``` que representan a cada columna del *dataframe*.

## Diferencias entre *RDD* y *Dataframes*.

Los *RDD* de *Apache Spark*. Son colecciones de datos que pueden ser crudos (incluyendo binarios), semiestructurados (*JSON*, *XML*, *YAML*) o estructurados. Al final de cuentas los *RDD* funcionan de forma similar a un *data lake*, mientras que los *dataframes* siempre tiene una estructura de tabla.

Los *RDD* son creados desde el contexto de *Spark*, mientras que los dataframes son creados usando la *API* de *SQL*.
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html

### El método ```df.show()```.

El método ```df.show()``` muestra los primeros ```n``` números de un *dataframe* de *Spark*.

```
df.show(<n>)
```

Donde:

* ```<n>``` es el número de renglones desplegados. El valor por defecto es ```20```.

**Ejemplos:**

* Las siguientes celdas crearán un *dataframe* de *Spark* a partir de un *dataframe* de *Pandas*.

In [None]:
import pandas as pd

* Se creará el *dataframe* de *Pandas* llamado ```pandas_df```.

In [None]:
pandas_df = pd.DataFrame({'Dirección':('Sur', 'Norte', 'Sur', 'Este'),
              'Rumbo':('Este', 'Noroeste', 'Norte', 'Norte'),
             'Pasajeros':(12, 24, 32, 5),
             'Documentado':(True, None, True, False) })

In [None]:
pandas_df

In [None]:
pandas_df.dtypes

* Se creará el *dataframe* de *Spark* a partir de ```pandas_df```.

In [None]:
df = spark.createDataFrame(pandas_df)

In [None]:
df.show()

In [None]:
df.show(2)

In [None]:
df.schema

* Las siguientes celdas creará un *dataframe* de *Spark* a partir dde un *RDD* que contiene una colección de objetos ```Row```.

In [None]:
from pyspark.sql import Row

* La siguiente celda creará un *RDD* de *Spark* que contiene una sucesión de objetos tipo ```Row```.
    * El objeto ```rdd``` tiene una estructura que puede ser convertida en una tabla.

In [None]:
rdd = ct.parallelize((Row('Sur', 'Este', '12', True),
                     Row('Norte', 'Noroeste', '24', None),
                     Row('Sur', 'Norte', '32', True),
                     Row('Este', 'Norte', '5', False)))

In [None]:
rdd_2 = ct.parallelize((Row('Sur', 'Este', '12', True),
                     1, 2, 3, 4,
                     Row('Sur', 'Norte', '32', True),
                     Row('Este', 'Norte', '5', False),
                      'uno', 'dos', b'121312412413523513462156725624567'))

* La siguiente celda creará un *dataset* a partir del *RDD* y además se ingresará como argumento el nombre de cada columna.

In [None]:
df = spark.createDataFrame(rdd, 
                ['Dirección', 'Rumbo', 'Pasajeros', 'Documentado'])

In [None]:
df.show()

* La siguiente celda creará un *dataframe* al que se le asignarán los nombres de las columnas automáticamente.

In [None]:
df_1 = spark.createDataFrame(rdd)
df_1.show()

### Selección de una columna de un *dataframe*.

Es posible acceder a una columna de un *dataframe* usando su nombre o usando su número de índice consecutivo iniciando desde ```0```. 

```
df.<Nombre Columna>
```

```
df[<n>]
```


In [None]:
df.Rumbo

In [None]:
df[1]

### El atributo ```df.schema```.

El atributo ```df.schema``` contiene la estructura del *schema* del *dataframe* como una instancia de la clase ```pyspark.sql.types.StructType``` que contiene una colección de instancias de tipo ```pyspark.sql.types.StructField```.

**Ejemplo:**

In [None]:
df.schema

### El método ```df.printSchema()```.

El método ```df.printSchema()``` despliega una cadena de caractéres describiendo el *schema* del *dataframe*.




In [None]:
df.printSchema()

## Tipado de *Spark*.

*Spark* cuenta con distintos tipos de datos que extienden a los tipos nativos de *Scala*, *Python* y *R*. El módulo ```pyspark.sql.types``` contiene a todas las clases correspondientes a dichos tipos.

El siguente enlace apunta a la referncia de tipos de datos de *Spark*.

https://spark.apache.org/docs/latest/sql-ref-datatypes.html

* La siguiente celda importará todos los tipos de datos de *PySpark* al entorno de esta *notebook*.

In [None]:
from pyspark.sql.types import *

## Definición de *schemas* .

Los *schemas* describen estructuras de tipos de datos en *Spark*. Estas estructuras no sólo describen tablas.

### El tipo ```pyspark.api.types.StructField```.

El tipo [```pyspark.api.types.StructField```](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.types.StructField.html) permite definir esquemas tanto para describir las columnas de un *dataframe* como para describir *schemas* complejos como los que se pueden encontrar en *YAML* o *JSON*.

```
StructField(<identificador>, <tipo>, <nulificable>)
```
Donde:

* ```<identificador>``` es el nombre del campo.
* ```<tipo>``` es el tipo de dato del campo.
* ```<nulificable>``` es un valor booleano que indica si el campo puede aceptar valores nulos.


Las estructuras complejas que pueden crearse en los *schemas* son contenidas dentro de un objeto de tipo [```pyspark.api.types.StructType```](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.types.StructType.html).

```
StructType(<campo 1>, <campo 2>, ... , <campo n>)
```

Donde:

* ```<campo i>``` es un objeto de tipo ```StructField```.

**Ejemplo:**

* La siguiente celda describe un *schema* para el *dataframe* ```df``` en el que la columna ```Pasajeros``` es un entero que va de ```-128```a ```127```.

In [None]:
schema = StructType([StructField('Dirección', StringType(), True), 
                     StructField('Rumbo', StringType(), True), 
                     StructField('Pasajeros', ByteType(), True), 
                     StructField('Documentado', BooleanType(), True)])

* La siguiente celda creará una *dataframe* a partir de ```pandas_df``` con el *schema* correspondiente a ```schema```.

In [None]:
pandas_df.dtypes

In [None]:
df = spark.createDataFrame(data=pandas_df, schema=schema)

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
df.schema

## Lectura y escritura de archivos para *dataframes*.

https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

### Lectura de datos hacia un *dataframe*.

En el caso de *PySpark*  es posible leer y crear dataframes a partir de distintos formatos de archivo que describan estructuras tabulares. La propiedad [```spark.read```](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html) contiene una familia de métodos que pueden importar y convertir en *dataframes* diversos tipos de documento y de diversas fuentes.

```
df = spark.read.<método>(<ruta>)
```

### Escritura de datos desde un *dataframe*.

En el caso de *PySpark*  es posible leer y crear dataframes a partir de distintos formatos de archivo que describan estructuras tabulares. Las siguientes propiedades de los *dataframes* permiten exportar dataframes a distintos formatos y destinos.

* [```df.write```](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.write.html) perimte crear y escribir una serie de documentos en el fromato indicado.
* [```df.writeTo```](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeTo.html) permite añadir (*append*) datos a un documento o estructura existente.
* [```df.writeStream```](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.writeStream.html) permite enviar los datos del *dataframe* a un flujo.

```
df.write.<método>(<ruta>)
```
La propiedad ```df.write``` es una implementación del objeto [```DataFrameWriter```](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.html#pyspark.sql.DataFrameWriter).


### Fuentes de datos compatibles para los *dataframes*.

https://spark.apache.org/docs/latest/sql-data-sources.html

 ### El método ```spark.read.parquet()```.
 
 [*Apache Parquet*](https://parquet.apache.org/) es un formato binario capaz de almacenar estructuras de datos y conservar sus *schemas*.
 
 El siguiente enlace apunta a la documentación de la lectura/escritura de documentos en formato *parquet* para *Spark*.
 
 https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
 
 

**Ejemplo:**

* La siguiente celda importará los datos del archivo ```data/data_covid.parquet```.

In [None]:
df = spark.read.parquet('data/data_covid.parquet')

In [None]:
df

In [None]:
df.show()

In [None]:
df.show(1)

### El método ```df.toPandas()```.

El método ```df.toPandas()``` permite crear un *dataframe* de *Pandas* a partir de un *dataframe* de *Apache Spark*.

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.toPandas.html

* A continuación se usará el método ```df.toPandas()``` para mostrar el *dataframe* ```df``` de mejor manera.

In [None]:
df.toPandas()

 ### El método ```spark.read.csv()```.
 
 Los documentos *CSV* *(comma separated values)* son documentos de texto en los que se ingresan datos usando un caracter como separador. Por lo general, este separador es el caracter ```,```. Los documento CSV no conservan el tipo de dato y todos los elmentos son cadenas de caracteres. *Spark* puede leer este tipo de formatos, pero es necesario indicarle el *schema* que utilizará.
 
 El siguiente enlace apunta a la documentación de la lectura/escritura de documentos en formato *CSV* para *Spark*.
 
 https://spark.apache.org/docs/latest/sql-data-sources-csv.html

**Ejemplos:**

* La siguiente celda leerá los datos dentro del archivo [```data/data_covid.csv```](data/data_covid.csv). En este caso sólo se indica la opción de que existe un encabezado en el documento.

In [None]:
df = spark.read.option("header","true").csv('data/data_covid.csv')
df.toPandas()

In [None]:
df.printSchema()

* La siguiente celda leerá los datos dentro del archivo [```data/data_covid.csv```](data/data_covid.csv). En este caso se indican las opciones de que existe un encabezado en el documento y que se inferirá el *schema* de las columnnas.

In [None]:
df = spark.read.option("header","true").option("inferSchema", "true").csv('data/data_covid.csv')
df.toPandas()

In [None]:
df.printSchema()

* La siguiente celda definirá un *schema* para el *dataframe* resultante de extraer el archivo *CSV*.

In [None]:
schema = StructType([StructField('index', DateType(), True), StructField('AGUASCALIENTES', LongType(), True), StructField('BAJA CALIFORNIA', LongType(), True), StructField('BAJA CALIFORNIA SUR', LongType(), True), StructField('CAMPECHE', LongType(), True), StructField('CHIAPAS', LongType(), True), StructField('CHIHUAHUA', LongType(), True), StructField('DISTRITO FEDERAL', LongType(), True), StructField('COAHUILA', LongType(), True), StructField('COLIMA', LongType(), True), StructField('DURANGO', LongType(), True), StructField('GUANAJUATO', LongType(), True), StructField('GUERRERO', LongType(), True), StructField('HIDALGO', LongType(), True), StructField('JALISCO', LongType(), True), StructField('MEXICO', LongType(), True), StructField('MICHOACAN', LongType(), True), StructField('MORELOS', LongType(), True), StructField('NAYARIT', LongType(), True), StructField('NUEVO LEON', LongType(), True), StructField('OAXACA', LongType(), True), StructField('PUEBLA', LongType(), True), StructField('QUERETARO', LongType(), True), StructField('QUINTANA ROO', LongType(), True), StructField('SAN LUIS POTOSI', LongType(), True), StructField('SINALOA', LongType(), True), StructField('SONORA', LongType(), True), StructField('TABASCO', LongType(), True), StructField('TAMAULIPAS', LongType(), True), StructField('TLAXCALA', LongType(), True), StructField('VERACRUZ', LongType(), True), StructField('YUCATAN', LongType(), True), StructField('ZACATECAS', LongType(), True), StructField('Nacional', LongType(), True)])

In [None]:
df = spark.read.option("header","true").schema(schema).csv('data/data_covid.csv')
df.toPandas()

In [None]:
df.printSchema()

* Las siguientes celda escribirán los datos del *dataframe* en diversos formatos.

In [None]:
df.write.csv('datos')

In [None]:
df.write.parquet('datos_parquet')

In [None]:
df.write.json('datos_json')

In [None]:
rm -rf datos_json

In [None]:
df.write.option('encoding', 'UTF-8').json('datos_json')

In [None]:
df_nuevo = spark.read.option('encoding', 'UTF-8').json('datos_json')

In [None]:
df.toPandas()

## Ejemplo de funciones de los *dataframes*.

https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html

In [None]:
df.select('index','AGUASCALIENTES').where(df.AGUASCALIENTES >1000).show()

In [None]:
from datetime import date

In [None]:
df.select('index','AGUASCALIENTES').where(df.index == date(2020,12,20)).show()

In [None]:
df.select('index','AGUASCALIENTES').filter(df.index >= date(2020,12,1)).show()

In [None]:
df.select('index').count()

In [None]:
df.select('index', 'AGUASCALIENTES').sort('AGUASCALIENTES').toPandas()

In [None]:
df.select('index', 'AGUASCALIENTES').sort('AGUASCALIENTES', ascending=False).limit(12).toPandas()

In [None]:
df.select('index', 'AGUASCALIENTES', 'Nacional').sort('AGUASCALIENTES', ascending=False).limit(10).toPandas()

In [None]:
df.select('index', 'AGUASCALIENTES', 'Nacional').sort('Nacional', ascending=False).limit(10).toPandas()

In [None]:
dias_ags = df.select('index', 'AGUASCALIENTES').sort('AGUASCALIENTES', ascending=False).limit(10)
dias_nacional = df.select('index', 'Nacional').sort('Nacional', ascending=False).limit(10)

In [None]:
dias_ags.join(dias_nacional, 'index').show()

In [None]:
dias_ags.join(dias_nacional, 'index').count()

In [None]:
dias_ags.join(dias_nacional, 'index', how='full').show()

In [None]:
dias_ags.join(dias_nacional, 'index').select('index').show()

In [None]:
spark.stop()

<p style="text-align: center"><a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Licencia Creative Commons" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/80x15.png" /></a><br />Esta obra está bajo una <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Licencia Creative Commons Atribución 4.0 Internacional</a>.</p>
<p style="text-align: center">&copy; José Luis Chiquete Valdivieso. 2023.</p>