## 1. Primeros pasos con SparkSQL

Spark SQL es un módulo de Spark para el procesamiento de datos estructurados. A diferencia de la API de Spark RDD, las interfaces proporcionadas por Spark SQL brindan a Spark más información sobre la estructura tanto de los datos, como de los cálculos que se están realizando.

Internamente, Spark SQL usa esta información extra para realizar optimizaciones adicionales. Hay varias formas de interactuar con Spark SQL, incluidas SQL y la DataFrame API. Al calcular un resultado, se utiliza el mismo motor de ejecución, independientemente de qué API / lenguaje esté utilizando para expresar el cálculo. Esta unificación significa que los desarrolladores pueden alternar fácilmente entre diferentes API en función de cuál proporciona la forma más natural de expresar unas transformaciones determinadas.

SparkSQL se sustenta sobre el concepto de DataFrame, la cual es una colección de datos tabular, que al igual que los RDDs, se encuentra distribuida en los diferentes nodos de un cluster.

### 1.1. SQLContext

El punto de entrada a las funcionalidades de SparkSQL es la clase SparkSesion. Para crear una SparkSession básica:

In [10]:
# Importar librerías y dependencias
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
# SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("spark.app.name", "myApp") \
    .getOrCreate()

22/05/20 10:28:42 WARN Utils: Your hostname, gmachin resolves to a loopback address: 127.0.1.1; using 192.168.0.19 instead (on interface wlp3s0)
22/05/20 10:28:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 10:28:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/20 10:28:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
spark

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'myApp'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.0.19'),
 ('spark.sql.warehouse.dir',
  'file:/home/gmachin/git/project_exploring_spark/spark-warehouse'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '38563'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.id', 'local-1653035325796'),
 ('spark.app.startTime', '1653035323512')]

### 1.2. DataFrame

Se puede crear un DataFrame a partir de un RDD, una tabla de Hive, o otro tipo de datos de Python como listas o DataFrames de Pandas. Para crear un DataFrame podemos utilizas el método `spark.createDataFrame(data)`, o leyendo datos externos con `spark.read.csv()`, `spark.read.json()`...etc.

Orígenes posibles de datos:

- RDDs
- Hive
- Spark sources: parquet (defecto), json, jdbc, orc, libsvm, csv, text

In [41]:
# Crear una lista de tuplas
import random

random.seed(7)
ids = range(5)
species = random.choices(['capibara', 'naked mole rat', 'axolotl', 'flying fox'], k=5)

data = list(zip(ids, species))
data

[(0, 'naked mole rat'),
 (1, 'capibara'),
 (2, 'axolotl'),
 (3, 'capibara'),
 (4, 'axolotl')]

In [42]:
# Crear un schema
schema = StructType([
    StructField('uuid', IntegerType(), True),
    StructField('name', StringType(), True)
])

# Crear un DataFrame a partir de los datos anteriores
df = spark.createDataFrame(
    data=data,
    schema=schema,
)

df

DataFrame[uuid: int, name: string]

In [43]:
# Ver el esquema de datos en formato árbol
df.printSchema()

root
 |-- uuid: integer (nullable = true)
 |-- name: string (nullable = true)



In [44]:
df.collect()

[Row(uuid=0, name='naked mole rat'),
 Row(uuid=1, name='capibara'),
 Row(uuid=2, name='axolotl'),
 Row(uuid=3, name='capibara'),
 Row(uuid=4, name='axolotl')]

In [45]:
# Visualizar el contenido del DataFrame
df.show()

+----+--------------+
|uuid|          name|
+----+--------------+
|   0|naked mole rat|
|   1|      capibara|
|   2|       axolotl|
|   3|      capibara|
|   4|       axolotl|
+----+--------------+



En este caso, vamos a leer datos de un fichero externo:

In [20]:
df_quijote = spark.read.text("ficheros/extracto-quijote.txt")
df_quijote

DataFrame[value: string]

In [21]:
df_quijote.printSchema()

root
 |-- value: string (nullable = true)



In [22]:
df_quijote.show()

+--------------------+
|               value|
+--------------------+
|En un lugar de la...|
|No ha mucho tiemp...|
|astillero, adarga...|
+--------------------+



In [11]:
# Crear un RDD vacío
emp_RDD = spark.sparkContext.emptyRDD()
 
# Crear un schema
columns = StructType([
    StructField('Name', StringType(), True),
    StructField('Age', StringType(), True),
    StructField('Gender', StringType(), True)
])
 
# Aplicar schema al RDD
df = spark.createDataFrame(
    data = emp_RDD,
    schema = columns
)
 
# Mostrar DataFrame
print('Dataframe :')
df.show()

Dataframe :
+----+---+------+
|Name|Age|Gender|
+----+---+------+
+----+---+------+



In [12]:
# Mostrar Esquema
print('Schema :')
df.printSchema()

Schema :
root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)



### 1.3. Operaciones con DataFrames

En Pyhton, es posible acceder a las diferentes columnas de los DataFrames, bien por atributos (`df.age`), o bien por índices (`df['age']`). Mientras que la primera opción es más conveniente para un análisis exploratorio e interactivo, se recomienda utilizar la segunda forma, que está preparada para el futuro y no se romperá con los nombres de columna que también son atributos de la clase DataFrame.

In [48]:
# Creamos un DataFrame a partir de un fichero json
df = spark.read.json("ficheros/people.json")

print(df.printSchema())
df.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

None
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [49]:
# Seleccionar la columna name
df.select(df['name']).orderBy('name').show()

+-------+
|   name|
+-------+
|   Andy|
| Justin|
|Michael|
+-------+



In [50]:
# Filtrar aquellos registros cuya edad sea superior a 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



Además de los métodos proporcionados por la clase DataFrame, Spark permite mediante la función sql de la SparkSession ejecutar querys de tipo SQL de forma programática. Para ello necesitaremos registrar el DataFrame que queramos explorar en la SparkSession para que sea accesible por el motor SQL.

In [32]:
# Registrar el DtaFrame como una vista temporal
df.createOrReplaceTempView("people")

sql_df = spark.sql(
    """
    select *
    from people
    where age is not null
      and name like 'A%'
    """
)

sql_df.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



## 2. Bibliografía

- https://spark.apache.org/docs/latest/rdd-programming-guide.html
- https://spark.apache.org/docs/latest/sql-programming-guide.html
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#dataframe-apis
- Libro: Introducción a Apache Spark 