# Apache Spark: SQL/DataFrames

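Un DataFrame es una **representación relacional de los datos**: una colección distribuida de filas organizada en columnas con nombre. Los DataFrames son similares a las tablas de una base de datos relacional o a los DataFrames de pandas en Python, aunque Spark aplica sobre ellos muchas optimizaciones que se ejecutan de manera transparente ("oculta") para el usuario.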

### PySpark

In [1]:
# Import PySpark (shared by Examples 1, 2 and 3).
# findspark.init() makes a local Spark installation importable by adding it
# to sys.path, so it has to run before `import pyspark`.
import findspark
findspark.init()

import pyspark

### Utilidades

In [2]:
# Utilities
def display(
    dfs: "pyspark.sql.DataFrame",
    n: int = 20,
    truncate: "bool | int" = True,
    vertical: bool = False,
) -> None:
    """
    Print the schema and then the rows of a PySpark DataFrame.

    NOTE: this name shadows IPython's own ``display`` in the notebook
    namespace; after this cell runs, ``display`` only works on objects that
    provide ``printSchema()`` and ``show()`` (i.e. PySpark DataFrames).

    Parameters
    ----------
    dfs : pyspark.sql.DataFrame
        DataFrame to inspect.
    n : int, default 20
        Maximum number of rows to print; forwarded to ``DataFrame.show``.
    truncate : bool or int, default True
        Forwarded to ``DataFrame.show``: ``True`` truncates strings longer
        than 20 characters, ``False`` prints them in full, and an int greater
        than one truncates to that length.
    vertical : bool, default False
        Forwarded to ``DataFrame.show``; ``True`` prints rows vertically
        (one line per column value).

    Returns
    -------
    None
        Everything is written to stdout.
    """
    dfs.printSchema()
    # The defaults mirror DataFrame.show's own defaults, so display(dfs)
    # prints exactly what a bare dfs.show() would, after the schema.
    dfs.show(n=n, truncate=truncate, vertical=vertical)

### Ejemplo 1
Con datos estructurados

In [3]:
# Import the PySpark modules used in Example 1.
from pyspark.sql import SparkSession
# NOTE(review): Example 1 only needs SparkSession; nothing from this wildcard
# import is used. It also rebinds Python builtins such as sum, max, min, abs
# and round to their pyspark.sql.functions versions for the rest of the
# kernel session. Prefer `from pyspark.sql import functions as F` when these
# functions are actually needed.
from pyspark.sql.functions import *

In [4]:
# Create (or reuse) a Spark session with the builder's default settings.
spark = (
    SparkSession
        .builder
        .getOrCreate()
)

In [5]:
# Employee rows in the order (id, nombre, dept, salary);
# two employees per department level.
emp = [
    (1, "aa", "level1", 2000), (2, "bb", "level1", 2200),
    (3, "cc", "level2", 5000), (4, "dd", "level2", 6300),
    (5, "ff", "level3", 7200), (6, "gg", "level3", 9500),
]

# Build the DataFrame. With a plain list of column names as the schema,
# Spark infers each column's type from the Python values.
dfs = spark.createDataFrame(data=emp, schema=["id", "nombre", "dept", "salary"])

In [6]:
# Print the rows only (unlike the display() helper, the schema is not printed).
dfs.show()

+---+------+------+------+
| id|nombre|  dept|salary|
+---+------+------+------+
|  1|    aa|level1|  2000|
|  2|    bb|level1|  2200|
|  3|    cc|level2|  5000|
|  4|    dd|level2|  6300|
|  5|    ff|level3|  7200|
|  6|    gg|level3|  9500|
+---+------+------+------+



In [7]:
# Stop the session; Example 2 then builds a new one with its own settings.
spark.stop()

### Ejemplo 2
Con datos anidados (semiestructurados: columnas de tipo arreglo y mapa)

In [8]:
# importar modulos de pyspark
from pyspark.sql import SparkSession

from pyspark.sql.functions import (
    explode as Fexplode,
    explode_outer as Fexplode_outer,
    posexplode as Fposexplode,
    posexplode_outer as Fposexplode_outer,
    flatten as Fflatten,
    col as Fcol
)

In [9]:
# Spark session for Example 2: run locally with a single thread ("local[1]")
# under its own application name.
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PruebaSpark.com") \
    .getOrCreate()

In [10]:
# Sample rows: (nombre, list of known languages, physical-description dict).
# The rows cover the cases the explode variants below treat differently:
# a None element in the list and a None dict value (Michael), empty strings
# (Robert), a None list and a None dict (Washington), and an empty dict
# (Jefferson).
datos = [
    ("James",      ["Java", "Scala"],       {"hair": "black", "eye": "brown"}),
    ("Michael",    ["Spark", "Java", None], {"hair": "brown", "eye": None}),
    ("Robert",     ["CSharp", ""],          {"hair": "red",   "eye": ""}),
    ("Washington", None,                    None),
    ("Jefferson",  ["1", "2"],              {}),
]

In [11]:
# Build the DataFrame; Spark infers an array column from the Python lists
# and a map column from the dicts.
dfs = spark.createDataFrame(datos, ["nombre", "conocimiento_lenguajes", "descripcion_fisica"])

In [12]:
# Show the inferred schema (array and map columns) followed by the rows.
display(dfs)

root
 |-- nombre: string (nullable = true)
 |-- conocimiento_lenguajes: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- descripcion_fisica: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+----------------------+--------------------+
|    nombre|conocimiento_lenguajes|  descripcion_fisica|
+----------+----------------------+--------------------+
|     James|         [Java, Scala]|{eye -> brown, ha...|
|   Michael|   [Spark, Java, null]|{eye -> null, hai...|
|    Robert|            [CSharp, ]|{eye -> , hair ->...|
|Washington|                  null|                null|
| Jefferson|                [1, 2]|                  {}|
+----------+----------------------+--------------------+



In [13]:
# explode, case 1 (array): one output row per array element, in a column
# named "col". A null array yields no rows (Washington is absent), while a
# null element inside an array is kept (Michael).
dfs2 = dfs.select(
    dfs["nombre"],
    Fexplode(dfs["conocimiento_lenguajes"]),
)
display(dfs2)

root
 |-- nombre: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|   nombre|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



In [14]:
# explode, case 2 (map): one output row per map entry, split into "key" and
# "value" columns. A null map (Washington) and an empty map (Jefferson)
# yield no rows.
dfs3 = dfs.select(
    dfs["nombre"],
    Fexplode(dfs["descripcion_fisica"]),
)
display(dfs3)

root
 |-- nombre: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
| nombre| key|value|
+-------+----+-----+
|  James| eye|brown|
|  James|hair|black|
|Michael| eye| null|
|Michael|hair|brown|
| Robert| eye|     |
| Robert|hair|  red|
+-------+----+-----+



In [15]:
# explode_outer, case 1 (array): like explode, but a null array still yields
# one row with col = null (Washington appears here).
dfs.select(dfs["nombre"], Fexplode_outer(dfs["conocimiento_lenguajes"])).show()

+----------+------+
|    nombre|   col|
+----------+------+
|     James|  Java|
|     James| Scala|
|   Michael| Spark|
|   Michael|  Java|
|   Michael|  null|
|    Robert|CSharp|
|    Robert|      |
|Washington|  null|
| Jefferson|     1|
| Jefferson|     2|
+----------+------+



In [16]:
# explode_outer, case 2 (map): like explode, but a null map (Washington) or
# an empty map (Jefferson) still yields one row with key and value null.
dfs.select(dfs["nombre"], Fexplode_outer(dfs["descripcion_fisica"])).show()

+----------+----+-----+
|    nombre| key|value|
+----------+----+-----+
|     James| eye|brown|
|     James|hair|black|
|   Michael| eye| null|
|   Michael|hair|brown|
|    Robert| eye|     |
|    Robert|hair|  red|
|Washington|null| null|
| Jefferson|null| null|
+----------+----+-----+



In [17]:
# posexplode, case 1 (array): like explode, plus a 0-based "pos" column with
# each element's position in its array. Null arrays yield no rows.
dfs.select(dfs["nombre"], Fposexplode(dfs["conocimiento_lenguajes"])).show()

+---------+---+------+
|   nombre|pos|   col|
+---------+---+------+
|    James|  0|  Java|
|    James|  1| Scala|
|  Michael|  0| Spark|
|  Michael|  1|  Java|
|  Michael|  2|  null|
|   Robert|  0|CSharp|
|   Robert|  1|      |
|Jefferson|  0|     1|
|Jefferson|  1|     2|
+---------+---+------+



In [18]:
# posexplode, case 2 (map): one row per entry with "pos", "key" and "value"
# columns. Null or empty maps yield no rows.
dfs.select(dfs["nombre"], Fposexplode(dfs["descripcion_fisica"])).show()

+-------+---+----+-----+
| nombre|pos| key|value|
+-------+---+----+-----+
|  James|  0| eye|brown|
|  James|  1|hair|black|
|Michael|  0| eye| null|
|Michael|  1|hair|brown|
| Robert|  0| eye|     |
| Robert|  1|hair|  red|
+-------+---+----+-----+



In [19]:
# posexplode_outer, case 1 (array): like posexplode, but a null array still
# yields one row with pos and col null (Washington).
dfs.select(dfs["nombre"], Fposexplode_outer(dfs["conocimiento_lenguajes"])).show()

+----------+----+------+
|    nombre| pos|   col|
+----------+----+------+
|     James|   0|  Java|
|     James|   1| Scala|
|   Michael|   0| Spark|
|   Michael|   1|  Java|
|   Michael|   2|  null|
|    Robert|   0|CSharp|
|    Robert|   1|      |
|Washington|null|  null|
| Jefferson|   0|     1|
| Jefferson|   1|     2|
+----------+----+------+



In [20]:
# posexplode_outer, case 2 (map): like posexplode, but a null map
# (Washington) or an empty map (Jefferson) still yields one row with pos,
# key and value null.
dfs.select(dfs["nombre"], Fposexplode_outer(dfs["descripcion_fisica"])).show()

+----------+----+----+-----+
|    nombre| pos| key|value|
+----------+----+----+-----+
|     James|   0| eye|brown|
|     James|   1|hair|black|
|   Michael|   0| eye| null|
|   Michael|   1|hair|brown|
|    Robert|   0| eye|     |
|    Robert|   1|hair|  red|
|Washington|null|null| null|
| Jefferson|null|null| null|
+----------+----+----+-----+



In [21]:
# Stop the Example 2 session; Example 3 builds its own.
spark.stop()

### Ejemplo 3
A partir de un archivo JSON cuyos registros ocupan múltiples líneas (opción `multiline`), consultado después con Spark SQL

In [22]:
# importar modulos de pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DoubleType,
    StringType
)

In [23]:
# Spark session for Example 3: run locally with a single thread ("local[1]")
# under its own application name.
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PruebaSparkJson.com") \
    .getOrCreate()

In [24]:
# Explicit schema for the JSON records (skips schema inference).
# Every field is declared nullable. Spark's file-based readers, including
# spark.read.json, turn every column of a user-supplied schema into a
# nullable one: the schema printed after the read shows "nullable = true"
# for every field. Declaring nullable=False here would therefore enforce
# nothing and only mislead the reader.
# NOTE(review): zip_code is an integer, so it cannot keep leading zeros.
# The output shows 501/544/601, presumably the 5-digit codes 00501/00544/
# 00601. Switch to StringType if the textual form matters.
json_array_schema = StructType([
    StructField("zip_code", IntegerType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("county", StringType(), True),
])

In [25]:
# Create the DataFrame by reading the JSON file with json_array_schema.
# The "multiline" option lets a single JSON record span several lines.
dfs = (
    spark.read
        .schema(json_array_schema)
        .option("multiline", "true")
        .json("data/zipcodesUSDummy.json")
)

In [26]:
# Show the schema the reader actually applied, followed by the loaded rows.
display(dfs)

root
 |-- zip_code: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- county: string (nullable = true)

+--------+---------+----------+----------+-----+---------+
|zip_code| latitude| longitude|      city|state|   county|
+--------+---------+----------+----------+-----+---------+
|     501|40.922326|-72.637078|Holtsville|   NY|  Suffolk|
|     544|40.922326|-72.637078|Holtsville|   NY|  Suffolk|
|     601|18.165273|-66.722583|  Adjuntas|   PR| Adjuntas|
|     602|18.393103|-67.180953|    Aguada|   PR|   Aguada|
|     603|18.455913| -67.14578| Aguadilla|   PR|Aguadilla|
|     604| 18.49352|-67.135883| Aguadilla|   PR|Aguadilla|
|     605|18.465162|-67.141486| Aguadilla|   PR|Aguadilla|
|     606|18.172947|-66.944111|   Maricao|   PR|  Maricao|
|     610|18.288685|-67.139696|    Anasco|   PR|   Anasco|
+--------+---------+----------+----------+-----+---------+

In [27]:
# Count the rows per state with the DataFrame API.
# NOTE(review): the schema field is named "state"; "State" resolves only
# because Spark SQL column resolution is case-insensitive by default
# (spark.sql.caseSensitive=false). The result column keeps the "State"
# spelling.
col_target = "State"
(
    dfs.groupBy(col_target)
       .count()
       .show()
)

+-----+-----+
|State|count|
+-----+-----+
|   NY|    2|
|   PR|    7|
+-----+-----+



In [28]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL.
# The view name lives in one variable, which is used both here and in the
# FROM clause of the query, so the two cannot drift apart.
tb_codigo_zip = "tb_codigo_zip"
dfs.createOrReplaceTempView(tb_codigo_zip)

In [29]:
# SQL equivalent of the groupBy(col_target).count() cell. Both the selected
# column and the GROUP BY key come from col_target, so changing col_target
# keeps the query valid. A non-aggregated column must appear in GROUP BY.
query = f"""
SELECT 
    t.{col_target},
    COUNT(0) AS count
FROM {tb_codigo_zip} AS t
GROUP BY t.{col_target}
"""

In [30]:
# Print the generated SQL text so the interpolated names can be checked.
print(query)


SELECT 
    t.State,
    COUNT(0) AS count
FROM tb_codigo_zip AS t
GROUP BY t.State



In [31]:
# Run the SQL query against the temporary view; the result is a DataFrame.
qdfs = spark.sql(query)

In [32]:
# Show the query result; the counts match the groupBy().count() output.
display(qdfs)

root
 |-- State: string (nullable = true)
 |-- count: long (nullable = false)

+-----+-----+
|State|count|
+-----+-----+
|   NY|    2|
|   PR|    7|
+-----+-----+



In [33]:
# Stop the Example 3 session.
spark.stop()