# Data Sources and Spark SQL

Los objetivos de este notebook son los siguientes:
* Conectar a una tabla de Hive y de MySQL con el Data Source API
* Usar las **User Defined Functions** (UDF)
* Utilizar los operadores comunes para trabajar con datos de tipos complejos

## 1. Sobre el Spark SQL

Apache Spark nos permite ejecutar consultas SQL sobre las tablas en diversas fuentes de datos. La **Language de Consulta Estructurada** (SQL) es el dialecto que nos permite expresar relaciones entre los datos almacenados en todas las bases de datos relacionales, y en algunas bases de datos no relacionales.

En Spark, estas consultas adoptan el estándar **ANSI SQL:2003**, que es comúnmente adoptado en la mayoría de las bases de datos existentes en el mercado.

## 2. Spark SQL vs DataFrames

Casi todas las consultas que pueden ser hechas mediante SQL pueden ser expresadas de manera similar mediante DataFrames, y vice-versa. Ambas formas de hacer transformaciones en los datos pasan por el mismo **Spark SQL Engine**, y arrojarían los mismos resultados con la misma performance. Eso conlleva a una interoperabilidad única entre los APIs, lo que permitiría que diferentes roles de datos (Analistas, Ingenieros e Científicos de Datos) saquen provecho de una única herramienta.

## 3. Catalog

## 4. Trabajando con tablas de Hive

Spark SQL fue originado de un proyecto denominado **Shark**. Shark utilizaba códigos de **Apache Hive** para permitir trabajar mediante consultas SQL sobre tablas de Hive Metastore dentro de la interfaz de Spark. Esta fuerte relación con Hive nos permite sacar ventajas de los dos mundos: del procesamiento distribuído *in-memory* de Spark, y del almacenamiento distribuido de Hive/Hadoop.

Para trabajar con tablas de Hive, Spark SQL nos permite directamente realizar consultar con SQL (HiveQL) sin tener que configurar el JDBC Driver, mediante su interfaz programática de SQL como en el siguiente ejemplo:

In [None]:
# Crear SparkSession with Hive Context
from os.path import abspath

warehouse_location = abspath('spark-warehouse')

spark = SparkSession.builder \
    .master("local") \
    .appName("Chapter 9 and 10 - Data Sources and Spark SQL")
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
# Extraer la tabla externa de transacciones en Hive
transactions = spark.sql("SELECT * FROM transactions")
transactions.toPandas()

Unnamed: 0,transactionid,customerid,merchant,product,totalamount,transactiondate
0,,,Merchant,Product,TotalAmount,
1,1.0,11.0,amazon.com.uk,shirt;shoes,40;155,2021-04-08
2,2.0,12.0,marksandspencer.com,short;shirt;jumpsuit,50;35;65,2021-04-08
3,3.0,14.0,amazon.com.uk,smartphone;charger,450;50,2021-04-09
4,4.0,12.0,tesco.com,fruits;meat;wholegrains,34;52;28,2021-04-09
5,5.0,13.0,apple.com.uk,charger;headphone,40;25,2021-04-09
6,6.0,17.0,e.leclerc,smartphone,550,2021-04-10
7,7.0,14.0,zalando.com,shirt;shoes,45;105,2021-04-11
8,8.0,13.0,zalando.com,handbag;jumpsuit,130;55,2021-04-13
9,9.0,15.0,amazon.com,books;shoes,75;125,2021-04-13


También se puede hacer consulta de varias líneas:

In [None]:
transactions = spark.sql("""SELECT * 
                            FROM transactions""")
transactions.toPandas()

También es posible crear y eliminar databases/tablas de Hive mediante consultas SQL:

In [4]:
# Crear un nuevo database
spark.sql("""CREATE DATABASE customers""")

# Usar el nuevo database
spark.sql("""USE customers""");

In [None]:
# Crear nueva tabla en Hive
spark.sql("""CREATE TABLE IF NOT EXISTS customertypes 
             (type STRING, 
             pointspertransaction INT) 
             USING hive""");

In [None]:
# Insertar valores
spark.sql("""INSERT INTO customertypes
             VALUES 
             ('Bronze', 1),
             ('Silver', 3),
             ('Gold', 7),
             ('Diamond',15)""");

In [None]:
# Hacer consulta de la tabla creada
spark.sql("""SELECT * 
             FROM customertypes""").show()

In [None]:
# Eliminar la tabla y la base de datos en Hive
spark.sql("""DROP TABLE customertypes""");
spark.sql("""DROP DATABASE customers""");

Para ver los metadatos de cada elemento de la base de datos, se debe hacer uso de las funcionalidades del **Catalog**:

In [7]:
# Listar databases
spark.catalog.listDatabases()

[Database(name='customers', description='', locationUri='file:/mnt/notebooks/spark-warehouse/customers.db'),
 Database(name='default', description='Default Hive database', locationUri='hdfs://namenode:8020/user/hive/warehouse')]

In [8]:
# Listar todas las tablas de un database
spark.sql("""USE default""");
spark.catalog.listTables()

[Table(name='transactions', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

In [9]:
# Listar todas las columnas de la tabla "transactions"
spark.catalog.listColumns("transactions")

[Column(name='transactionid', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='customerid', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='merchant', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='product', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='totalamount', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='transactiondate', description=None, dataType='date', nullable=True, isPartition=False, isBucket=False)]

Al igual de los DataFrames, se puede hacer un **cache**/**persist** en la tabla dentro de la consulta de SQL:

In [10]:
# Hacer un cache a la tabla 'transactions'
spark.sql("CACHE TABLE transactions");

In [11]:
# Hacer un uncache a la tabla 'transactions'
spark.sql("UNCACHE TABLE IF EXISTS transactions");

Antes de seguir, aplicamos algunas transformaciones en nuestro DataFrame de transacciones para limpiar los datos:

In [12]:
from pyspark.sql.functions import split, col

# Convertir el tipo de la columna "Product" a array
transactions = transactions.withColumn("product", split(col("product"), ";").cast("array<string>")) \
                           .withColumn("totalamount", split(col("totalamount"), ";").cast("array<int>"))

# Eliminar la primera fila de la tabla
transactions = transactions.na.drop(how="any")

transactions.toPandas()

Unnamed: 0,transactionid,customerid,merchant,product,totalamount,transactiondate
0,1,11,amazon.com.uk,"[shirt, shoes]","[40, 155]",2021-04-08
1,2,12,marksandspencer.com,"[short, shirt, jumpsuit]","[50, 35, 65]",2021-04-08
2,3,14,amazon.com.uk,"[smartphone, charger]","[450, 50]",2021-04-09
3,4,12,tesco.com,"[fruits, meat, wholegrains]","[34, 52, 28]",2021-04-09
4,5,13,apple.com.uk,"[charger, headphone]","[40, 25]",2021-04-09
5,6,17,e.leclerc,[smartphone],[550],2021-04-10
6,7,14,zalando.com,"[shirt, shoes]","[45, 105]",2021-04-11
7,8,13,zalando.com,"[handbag, jumpsuit]","[130, 55]",2021-04-13
8,9,15,amazon.com,"[books, shoes]","[75, 125]",2021-04-13


## 5 Trabajando con otras fuentes de datos

El **DataFrameReader** nos permite leer desde distintas fuentes y formatos de datos. La siguiente celda nos muestra la forma de leer un tabla almacenada en MySQL mediante el JDBC Driver:

In [13]:
with open("/mnt/notebooks/secret/secret.txt") as secret:
    user = secret.readline().strip("\n")
    password = secret.readline()

# Cargar los datos de la tabla 'customers' en MySQL
url = "jdbc:mysql://database/customers"
customers = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", url) \
    .option("dbtable", "customers") \
    .option("user", user) \
    .option("password", password) \
    .load()

# Mostrar los datos de la tabla 'customers'
customers.toPandas()

Unnamed: 0,Id,CustomerType,Firstname,MiddleName,LastName
0,12,diamond,elena,,moore
1,13,bronze,paul,williams,page
2,14,bronze,albert,,thomas
3,15,bronze,sandra,elizabeth,faith
4,16,gold,robert,,alexander


Ya que Spark SQL es interoperable con los DataFrames, también es posible realizar consultas SQL sobre un mismo DataFrame:

In [14]:
# Crear una vista temporal de las tablas
transactions.createOrReplaceTempView("transacciones")
customers.createOrReplaceTempView("clientes")

# Realizar una operación de inner join
compras = spark.sql("""SELECT transacciones.*, clientes.*
                       FROM transacciones
                       INNER JOIN clientes
                       ON transacciones.customerid == clientes.id""")
compras.toPandas()

Unnamed: 0,transactionid,customerid,merchant,product,totalamount,transactiondate,Id,CustomerType,Firstname,MiddleName,LastName
0,4,12,tesco.com,"[fruits, meat, wholegrains]","[34, 52, 28]",2021-04-09,12,diamond,elena,,moore
1,2,12,marksandspencer.com,"[short, shirt, jumpsuit]","[50, 35, 65]",2021-04-08,12,diamond,elena,,moore
2,8,13,zalando.com,"[handbag, jumpsuit]","[130, 55]",2021-04-13,13,bronze,paul,williams,page
3,5,13,apple.com.uk,"[charger, headphone]","[40, 25]",2021-04-09,13,bronze,paul,williams,page
4,7,14,zalando.com,"[shirt, shoes]","[45, 105]",2021-04-11,14,bronze,albert,,thomas
5,3,14,amazon.com.uk,"[smartphone, charger]","[450, 50]",2021-04-09,14,bronze,albert,,thomas
6,9,15,amazon.com,"[books, shoes]","[75, 125]",2021-04-13,15,bronze,sandra,elizabeth,faith


Podemos escribir tablas al MySQL accediendo al **DataFrameWriter**, y su sintaxe es similar al de DataFrameReader

In [None]:
compras.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", "compras") \
    .option("user", user) \
    .option("password", password).save() \

In [None]:
# Mostrar los datos escritos a compras
spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "compras") \
    .option("user", user) \
    .option("password", password) \
    .load().toPandas()

## 6. Definiendo y utilizando funciones de transformación propias

Spark SQL tiene una amplia diversidad de funciones de transformación de datos. No obstante, podemos realizar transformaciones personalizadas mediante un **User Defined Functions** (UDF).

In [15]:
spark.sql("""SELECT *
             FROM clientes""").toPandas()

Unnamed: 0,Id,CustomerType,Firstname,MiddleName,LastName
0,12,diamond,elena,,moore
1,13,bronze,paul,williams,page
2,14,bronze,albert,,thomas
3,15,bronze,sandra,elizabeth,faith
4,16,gold,robert,,alexander


In [None]:
spark.sql("""SELECT capitalize(CustomerType)
             FROM clientes""").toPandas()

Definimos una función en Python denominada **capitalize** que extrae la primera letra de cada dato de entrada. Luego, lo registramos como UDF.

In [16]:
from pyspark.sql.types import *

# Definición de la función
def capitalize(string):
    if string == None:
        return None
    else:
        return string.capitalize()

# Registro de la función como UDF
spark.udf.register("capitalize", capitalize, StringType());

Ahora, hacemos una consulta de los datos aplicando la transformación al ```CustomerType```, ```Firstname```, ```MiddleName``` y ```LastName``` dentro del SELECT gracias al UDF.

In [17]:
# Hacemos una consulta
spark.sql("""SELECT Id, 
             capitalize(CustomerType) as customertype,
             capitalize(Firstname) as firstname,
             capitalize(MiddleName) as middlename,
             capitalize(LastName) as lastname
             FROM clientes""").toPandas()

Unnamed: 0,Id,customertype,firstname,middlename,lastname
0,12,Diamond,Elena,,Moore
1,13,Bronze,Paul,Williams,Page
2,14,Bronze,Albert,,Thomas
3,15,Bronze,Sandra,Elizabeth,Faith
4,16,Gold,Robert,,Alexander


## 7.0 Trabajando con datos de tipos complejos (arrays)

### 7.1 Funciones básicas

#### 7.1.1 EXPLODE

La función EXPLODE crea una nueva fila para cada elemento de un array.

*Ejemplo*: ver el merchant y la fecha de compra de cada producto.

In [18]:
spark.sql("""SELECT EXPLODE(product) as Product, Merchant, TransactionDate
             FROM transacciones""").toPandas()

Unnamed: 0,Product,Merchant,TransactionDate
0,shirt,amazon.com.uk,2021-04-08
1,shoes,amazon.com.uk,2021-04-08
2,short,marksandspencer.com,2021-04-08
3,shirt,marksandspencer.com,2021-04-08
4,jumpsuit,marksandspencer.com,2021-04-08
5,smartphone,amazon.com.uk,2021-04-09
6,charger,amazon.com.uk,2021-04-09
7,fruits,tesco.com,2021-04-09
8,meat,tesco.com,2021-04-09
9,wholegrains,tesco.com,2021-04-09


#### 7.1.2 ARRAY_CONTAINS

La función ARRAY_CONTAINS inspecciona los valores de cada array y retorna TRUE si un determinado valor existe, y FALSE si no existe.

*Ejemplo*: verificar si un cliente se compró una camisa

In [19]:
spark.sql("""SELECT ARRAY_CONTAINS(product, "shirt") as Bought_Shirt
             FROM transacciones""").toPandas()

Unnamed: 0,Bought_Shirt
0,True
1,True
2,False
3,False
4,False
5,False
6,True
7,False
8,False


#### 7.1.2 Otras funciones

Spark SQL provee un serie de funciones útiles para manipulación de arrays, entre los que se destacarían:
* array_distinct: para remover valores duplicados
* array_join: concatena todos los elementos de un array con un delimitador determinado
* array_mix/array_max: retorna el valor mínimo/máximo de un array
* array_sort: ordena los valores de cada array
* flatten: retorna un array luego de juntar un array de un array

### 7.2 Higher-Order Functions

Las Higher-Order Functions son funciones que se aplican a los tipos complejos, y que toman como argumento una otra función a ser aplicado en cada elemento. Estas funciones fueron introducidas en PySpark a partir de la versión 3.1.1, pero es posible usarlos en expresiones SQL en Spark 2.4.

La idea de las Higher-Order Functions es introducir constructos de la programación funcional para facilitar la manipulación de los datos en los arrays.

Antes del Spark 2.4, había varios enfoques para manipular datos del tipo complejo, pero todas eran ineficientes o muy complejas. Las más comunes eran:
* Através de funciones integradas (built-in)
* Aplicar un EXPLODE a los arrays, hacer las transformaciones y usar collect_list or collect_set para reconstruir los arrays, cómo en el siguiente ejemplo:
```SQL
SELECT key,
values,
collect_list(value + 1) AS values_plus_one
FROM nested_data
LATERAL VIEW explode(values) T AS value
GROUP BY key,
values
```

* Aplicar una UDF a la columna con datos del tipo complejo

#### 7.2.1 TRANSFORM

La función TRANSFORM simplemente aplica una función anónima a cada elemento de un array y retorna un array con los elementos transformados.

El esquema básico del TRANSFORM funciona de manera similar que en otras Higher-Order Functions que veremos a seguir.

Ejemplo: aplicar la función **initcap** para capitalizar la primera letra de cada valor en ```Product```

In [20]:
spark.sql("""SELECT product, TRANSFORM(product, element -> initcap(element)) AS product_capitalized 
             FROM transacciones""").toPandas()

Unnamed: 0,product,product_capitalized
0,"[shirt, shoes]","[Shirt, Shoes]"
1,"[short, shirt, jumpsuit]","[Short, Shirt, Jumpsuit]"
2,"[smartphone, charger]","[Smartphone, Charger]"
3,"[fruits, meat, wholegrains]","[Fruits, Meat, Wholegrains]"
4,"[charger, headphone]","[Charger, Headphone]"
5,[smartphone],[Smartphone]
6,"[shirt, shoes]","[Shirt, Shoes]"
7,"[handbag, jumpsuit]","[Handbag, Jumpsuit]"
8,"[books, shoes]","[Books, Shoes]"


#### 7.2.2 FILTER

La función FILTER funciona de manera similar con respecto a TRANSFORM, pero retorna apenas los valores del registro que cumplan con un predicado.

Ejemplo: mostrar apenas *shirt* entre los valores del ```Product```

In [21]:
spark.sql("""SELECT product, FILTER(product, element -> element = "shirt") AS product_capitalized 
             FROM transacciones""").toPandas()

Unnamed: 0,product,product_capitalized
0,"[shirt, shoes]",[shirt]
1,"[short, shirt, jumpsuit]",[shirt]
2,"[smartphone, charger]",[]
3,"[fruits, meat, wholegrains]",[]
4,"[charger, headphone]",[]
5,[smartphone],[]
6,"[shirt, shoes]",[shirt]
7,"[handbag, jumpsuit]",[]
8,"[books, shoes]",[]


#### 7.2.3 EXISTS

La función EXISTS funciona de manera similar con respecto a TRANSFORM, pero retorna *true* si un determinado predicado se cumple, y *false* si no se cumple.

*Ejemplo*: evaluar si cada transacción incluye un *jumpsuit* como producto

In [22]:
spark.sql("""SELECT product, EXISTS(product, element -> element = "jumpsuit") AS product_capitalized 
             FROM transacciones""").toPandas()

Unnamed: 0,product,product_capitalized
0,"[shirt, shoes]",False
1,"[short, shirt, jumpsuit]",True
2,"[smartphone, charger]",False
3,"[fruits, meat, wholegrains]",False
4,"[charger, headphone]",False
5,[smartphone],False
6,"[shirt, shoes]",False
7,"[handbag, jumpsuit]",True
8,"[books, shoes]",False


#### 7.2.4 AGGREGATE/REDUCE

La función AGGREGATE toma como input dos funciones anónimas, y es más compleja que las demás funciones.

La idea es reducir los elementos de un array a un único valor, aplicando una función anónima, y aplica una otra función al resultado de la primera transformación.

La función REDUCE es un alias del AGGREGATE, y produce los mismos resultados con los mismos inputs.

*Ejemplo*: realizar la suma de los montos gastados en todos los productos

In [23]:
spark.sql("""SELECT TotalAmount, AGGREGATE(TotalAmount, 0, (element, element2) -> element + element2) AS sum_totalamount 
             FROM transacciones""").toPandas()

Unnamed: 0,TotalAmount,sum_totalamount
0,"[40, 155]",195
1,"[50, 35, 65]",150
2,"[450, 50]",500
3,"[34, 52, 28]",114
4,"[40, 25]",65
5,[550],550
6,"[45, 105]",150
7,"[130, 55]",185
8,"[75, 125]",200


In [24]:
spark.sql("""SELECT TotalAmount, AGGREGATE(totalamount, 0, (element, element2) -> element + element2, element3 -> element3 - 200) AS sum_totalamount_minus_200 
             FROM transacciones""").toPandas()

Unnamed: 0,TotalAmount,sum_totalamount_minus_200
0,"[40, 155]",-5
1,"[50, 35, 65]",-50
2,"[450, 50]",300
3,"[34, 52, 28]",-86
4,"[40, 25]",-135
5,[550],350
6,"[45, 105]",-50
7,"[130, 55]",-15
8,"[75, 125]",0
