[![img/pythonista.png](img/pythonista.png)](https://www.pythonista.io)

# Introducción a *Spark SQL*.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Intro a Spark SQL").getOrCreate()
sc = spark.sparkContext

Prácticamente todas las operaciones de *Apache Spark* para datos estructurados pueden ser realizadas mediante consultas de *SQL*. De forma similar a las *API*s de *Java*, *Python*, *R* y *Scala*, *Apache Spark* puede calcular la estrategia más conveniente para ejecutar una consulta de *SQL*.

Las consultas de *Spark SQL* siempre regresan [*dataframes* y *datasets*](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes).

*Apache Spark* extiende las funcionalidades de [*Apache Hive*](https://spark.apache.org/docs/latest/sql-programming-guide.html) y es capaz de [leer tablas de *Hive*](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html).

La referencia de programación de *Spark SQL* puede ser consultada en la siguiente liga:

* https://spark.apache.org/docs/latest/sql-ref.html

## Tablas temporales.

Una tabla temporal es una estructura tabular que se crea en memoria a partir de un *dataframe* y que se crea mediante el método ```df.createOrRepalceTempView()```.

``` python
df.createOrRepalceTempView(<nombre>)
```

Donde:

* ```<nombre>``` es el nombre de la tabla temporal a la que se podrá hacer referencia en la consulta tal como se hace en una tabla de una base de datos.

**Ejemplo:**

In [2]:
df = spark.read.parquet("data/data_covid.parquet")

In [3]:
df.toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,AGUASCALIENTES,BAJA CALIFORNIA,BAJA CALIFORNIA SUR,CAMPECHE,CHIAPAS,CHIHUAHUA,DISTRITO FEDERAL,COAHUILA,COLIMA,DURANGO,...,SINALOA,SONORA,TABASCO,TAMAULIPAS,TLAXCALA,VERACRUZ,YUCATAN,ZACATECAS,Nacional,index
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,2020-02-26
1,0,0,0,0,0,0,2,0,0,0,...,0,0,0,0,0,0,0,0,4,2020-02-27
2,0,0,0,0,0,0,0,1,0,0,...,0,0,0,0,0,0,0,0,2,2020-02-28
3,0,0,0,0,1,0,0,0,0,0,...,0,0,0,0,0,0,0,0,2,2020-02-29
4,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,2,2020-03-01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
790,25,23,24,1,1,8,163,4,9,8,...,6,12,4,5,4,79,38,3,583,2022-04-26
791,21,25,18,3,1,9,200,3,6,1,...,10,7,5,8,2,51,25,5,556,2022-04-27
792,33,17,12,4,3,9,134,2,2,2,...,11,4,10,8,0,34,24,1,456,2022-04-28
793,19,15,6,1,1,2,63,2,2,0,...,1,2,3,3,0,39,27,3,255,2022-04-29


In [None]:
df.createOrReplaceTempView("COVID_NACIONAL")

## Ejecución de consultas *SQL*.

El método ```spark.sql()``` pemite ejecutar una consulta sobre una o más tablas temporales.

```
spark.sql(<Consulta>)
```

In [None]:
spark.sql('SELECT Nacional from COVID_NACIONAL')

In [None]:
spark.sql('SELECT Nacional, index from COVID_NACIONAL').show()

## Las funciones de *Spark SQL*.

*Spark SQL* [es compatible](https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html) con el estándar *ANSI SQL* y además lo extiende mendiante un conjunto de funciones las cuales pueden ser consultadas desde la siguiente liga:

* https://spark.apache.org/docs/latest/api/sql/


**Ejemplo:**

* La siguiente celda regresará una tabla a partir de la tabla temporal ```COVID_NACIONAL``` con los casos de COVID-19 detectados en Aguascalientes entre durante  enero de 2021. 

In [None]:
spark.sql("""
           SELECT AGUASCALIENTES, index
           FROM COVID_NACIONAL 
           WHERE index 
              BETWEEN "2021-01-01"
                  AND "2021-01-31" 
           """).show(31)

* La siguiente celda creará la tabla temporal ```AGS_ENERO``` a partir de la tabla temporal ```COVID_NACIONAL``` con los casos de COVID-19 detectados en Aguascalientes entre durante  enero de 2021. 

In [None]:
spark.sql("""
           (SELECT AGUASCALIENTES, index
           FROM COVID_NACIONAL 
           WHERE index 
              BETWEEN timestamp"2021-01-01"
                  AND timestamp"2021-01-31")
           """).createOrReplaceTempView("AGS_ENERO");

* La siguiente celda calculará el promedio de casos diarios de COVID-19 detectados en Aguascalientes durante el mes de enero de 2021 a partir de la tabla temporal ```COVID_NACIONAl```.

In [None]:
spark.sql("""
           SELECT avg(AGUASCALIENTES)
           FROM
           (SELECT AGUASCALIENTES, index
           FROM COVID_NACIONAL 
           WHERE index 
              BETWEEN timestamp"2021-01-01"
                  AND timestamp"2021-01-31") 
           """).show()

* La siguiente celda calculará el promedio de casos diarios de COVID-19 detectados en Aguascalientes durante el mes de enero de 2021 a partir de la tabla temporal ```AGS_ENERO```.

In [None]:
spark.sql("""
           SELECT avg(AGUASCALIENTES)
           FROM AGS_ENERO 
           """).show()

In [None]:
spark.stop()

<p style="text-align: center"><a rel="license" href="http://creativecommons.org/licenses/by/4.0/"><img alt="Licencia Creative Commons" style="border-width:0" src="https://i.creativecommons.org/l/by/4.0/80x15.png" /></a><br />Esta obra está bajo una <a rel="license" href="http://creativecommons.org/licenses/by/4.0/">Licencia Creative Commons Atribución 4.0 Internacional</a>.</p>
<p style="text-align: center">&copy; José Luis Chiquete Valdivieso. 2023.</p>