# Computación Avanzada y sus Aplicaciones a Ingeniería

### Máster Universitario en Ingeniería Informática

# Práctica 2 - Parte II - Introducción a SparkSQL

En esta práctica introducimos las operaciones básicas para trabajar con los DataFrames de Spark. Este primer notebook no es más que una guía de todas las operaciones que puedes realizar en SparkSQL con DataFrames. Sigue detenidamente todos los bloques y prueba a cambiar los valores establecidos para comprobar su funcionamiento.

Ten en cuenta que una vez tengas en marcha Spark, podrás visualizar la evolución de cada trabajo de Spark en  <http://localhost:4040>

## **Uso básico de los notebooks y su integración con Python**

### Utilización de un Notebook

Un notebook está compuesto por una serie de celdas. Estas celdas pueden contener texto explicativo o código, pero nunca se mezclan ambas en la misma celda. Cuando ejecutamos una celda de texto, lo que hemos escrito con el lenguaje de markdown se renderiza como texto, imágenes y links (como si fuera HTML). El texto que estás leyendo ahora mismo es parte de una celda de este tipo. Las celdas con código Python te permiten ejecutar comandos de Python como si estuvieras en la consola de Python. Coloca el cursos dentro de la celda de más abajo y presiona "Shift + Enter" para ejecutar el código y avanzar a la siguiente celda. También puedes utilizar "Ctrl + Enter" para ejecutar el código y mantenerte en la misma celda. Estos comandos funcionan tanto en celdas de código como en celdas de texto.

In [None]:
# Esto es una celda ed Python. Puedes ejecutar código Python en estas celdas
print('La suma de 1 y 1 es {0}'.format(1+1))

In [None]:
# Esta es otra celda Python, utiliza una variable x y un if
x = 28
if x > 18:
    print('x es mayor que 18')

### Estado de un Notebook

Cuando trabajas con un notebook es importante ejecutar todas las celdas con código. El notebook tiene estado, lo que quiere decir que las variables y sus valores se mantienen hasta el que el kernel del notebook se reinicia. Si no ejecutas todas las celdas de código a lo largo del notebook, las variables pueden no estar correctamente inicializadas y pueden fallar celdas de código posteriores. También necesitarás reejecutar cualquier celda que hayas modificado para que los cambios estén disponibles en otras celdas

In [None]:
# Esta celda utiliza la variable x que hemos definido en una celda anterior
# Si no ejecutamos la celda anaterior este código fallará
print(x * 2)

# Introducción a SparkSQL

En caso de estar utilizando pySpark, **NO** es necesario inicializar el `SparkSession`, es decir, **no** ejecutar la siguiente celda

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Ejemplo pySparkSQL") \
    .getOrCreate()

sc = spark.sparkContext


## API de SparkSQL
<http://spark.apache.org/docs/latest/api/python/pyspark.sql.html>

## Creación de un DataFrame
Podemos crear un DataFrame de diferente formas

1. Infiriendo el esquema automáticamente a partir de los datos
1. Infiriendo el esquema automáticamente a partir de los metadatos
1. Definiendo explícitamente el esquema

A su vez, al igual que con los RDDs podemos crearlo a partir de dos fuentes diferentes de datos

1. Cargando un conjunto de datos almacenado en un medio externo
2. Distribuyendo una colección de objetos existente

### Inferencia del esquema a partir de los datos

In [2]:
tuplas = [('Alice', 1), ('Bob', 4), ('Juan', 10), ('Pepe', 25), ('Panchito', 15)]

In [3]:
tuplas

[('Alice', 1), ('Bob', 4), ('Juan', 10), ('Pepe', 25), ('Panchito', 15)]

In [4]:
df = spark.createDataFrame(tuplas)

In [5]:
df

DataFrame[_1: string, _2: bigint]

In [6]:
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



In [7]:
df.show(4)

+-----+---+
|   _1| _2|
+-----+---+
|Alice|  1|
|  Bob|  4|
| Juan| 10|
| Pepe| 25|
+-----+---+
only showing top 4 rows



In [8]:
df.collect()

[Row(_1='Alice', _2=1),
 Row(_1='Bob', _2=4),
 Row(_1='Juan', _2=10),
 Row(_1='Pepe', _2=25),
 Row(_1='Panchito', _2=15)]

In [11]:
df = spark.createDataFrame(tuplas, ['name', 'age'])

In [12]:
df.show()

+--------+---+
|    name|age|
+--------+---+
|   Alice|  1|
|     Bob|  4|
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [16]:
l = [(1, ), (2, ), (3,), (4,)]

In [17]:
spark.createDataFrame(l)

DataFrame[_1: bigint]

In [9]:
print("Sin nombre para las columnas: " + str(spark.createDataFrame(tuplas).collect()))
print("Con nombre para las columnas: " + str(spark.createDataFrame(tuplas, ['name', 'age']).collect()))

Sin nombre para las columnas: [Row(_1='Alice', _2=1), Row(_1='Bob', _2=4), Row(_1='Juan', _2=10), Row(_1='Pepe', _2=25), Row(_1='Panchito', _2=15)]
Con nombre para las columnas: [Row(name='Alice', age=1), Row(name='Bob', age=4), Row(name='Juan', age=10), Row(name='Pepe', age=25), Row(name='Panchito', age=15)]


In [18]:
rdd = sc.parallelize(tuplas)

In [21]:
rdd.toDF(['name', 'age'])

DataFrame[name: string, age: bigint]

In [23]:
spark.createDataFrame(rdd, ['name', 'age'])

DataFrame[name: string, age: bigint]

In [None]:
print("Sin nombre para las columnas: " + str(spark.createDataFrame(rdd).collect()))
print("Con nombre para las columnas: " + str(spark.createDataFrame(rdd, ['name', 'age']).collect()))

In [24]:
from pyspark.sql import Row

In [26]:
p = Row(name='mikel', age=36)

In [28]:
p['name']

'mikel'

In [29]:
p.name

'mikel'

In [30]:
p.name = 'pepe'

RuntimeError: Row is read-only

In [31]:
Person = Row('name', 'age') 

In [33]:
Person('mikel', 36)

Row(name='mikel', age=36)

In [40]:
df = rdd.map(lambda t: Person(*t)).toDF()

In [None]:
from pyspark.sql import Row

Person = Row('name', 'age') # Creamos una Row con los índices para poder crear filas con datos utilizándola
person = rdd.map(lambda r: Person(*r)) # con *r lo que hacemos es pasar la tupla como parámetro a la fila y crearla directamente con dichos datos
df2 = spark.createDataFrame(person)
df2.collect()

In [41]:
df.toPandas()

Unnamed: 0,name,age
0,Alice,1
1,Bob,4
2,Juan,10
3,Pepe,25
4,Panchito,15


In [None]:
pandaDF = df2.toPandas()
print("DataFrame de Panda obtenido de un DataFrame Spark:")
print(pandaDF)
print("DataFrame de Spark creado a partir del Panda:")
print(spark.createDataFrame(pandaDF).collect())

### Especificación del esquema

In [42]:
from pyspark.sql.types import *

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])



df3 = spark.createDataFrame(rdd, schema)
df3.collect()

[Row(name='Alice', age=1),
 Row(name='Bob', age=4),
 Row(name='Juan', age=10),
 Row(name='Pepe', age=25),
 Row(name='Panchito', age=15)]

In [43]:
print(spark.createDataFrame(rdd, "a: string, b: int").collect())

rdd2 = rdd.map(lambda row: row[1])
print(spark.createDataFrame(rdd2, "int").collect())

[Row(a='Alice', b=1), Row(a='Bob', b=4), Row(a='Juan', b=10), Row(a='Pepe', b=25), Row(a='Panchito', b=15)]
[Row(value=1), Row(value=4), Row(value=10), Row(value=25), Row(value=15)]


### Ejemplo

In [52]:
import datetime
data = [
    Row(
        name="Charlie",
        num_pets=1,
        paid_in_full=True,
        preferences={},
        registered_on=datetime.datetime(2016, 5, 1, 12,0)
    ),

    Row(
        name="Alex",
        num_pets=3,
        paid_in_full=True,
        preferences={
            "preferred_vet": "Dr. Smith",
            "preferred_appointment_day": "Monday"
        },
        registered_on=datetime.datetime(2015, 1, 1, 12,0),
        visits=[
            datetime.datetime(2015, 2, 1, 11, 0),
            datetime.datetime(2015, 2, 2, 10, 45),
        ],
    ), 


    Row(
        name="Beth",
        num_pets=2,
        paid_in_full=False,
        preferences={
            "preferred_vet": "Dr. Travis",
        },
        registered_on=datetime.datetime(2013, 1, 1, 12,0),
        visits=[
            datetime.datetime(2015, 1, 15, 12, 15),
            datetime.datetime(2015, 2, 1, 11, 15),
        ],
    ),
]

In [53]:
data

[Row(name='Charlie', num_pets=1, paid_in_full=True, preferences={}, registered_on=datetime.datetime(2016, 5, 1, 12, 0)),
 Row(name='Alex', num_pets=3, paid_in_full=True, preferences={'preferred_vet': 'Dr. Smith', 'preferred_appointment_day': 'Monday'}, registered_on=datetime.datetime(2015, 1, 1, 12, 0), visits=[datetime.datetime(2015, 2, 1, 11, 0), datetime.datetime(2015, 2, 2, 10, 45)]),
 Row(name='Beth', num_pets=2, paid_in_full=False, preferences={'preferred_vet': 'Dr. Travis'}, registered_on=datetime.datetime(2013, 1, 1, 12, 0), visits=[datetime.datetime(2015, 1, 15, 12, 15), datetime.datetime(2015, 2, 1, 11, 15)])]

### Inferencia del esquema a partir de los datos

In [69]:
# Creamos un RDD con los datos de ejemplo
dataRDD = sc.parallelize(data)

# Creamos un DataFrame a partir del RDD, 
# infiriendo el esquema a partir de la primera Row
print("RDD: Esquema inferido a partir de la primera fila.")
# Por defecto samplingRatio=None
dataDF = spark.createDataFrame(dataRDD)
dataDF.printSchema()

# Creamos un DataFrame a partir del RDD,
# infiriendo el esquema a partir de un sampling de las Rows
print("RDD: Esquema inferido de un sampling aleatorio.")
dataDF = spark.createDataFrame(dataRDD, samplingRatio=0.6)
dataDF.printSchema()

RDD: Esquema inferido a partir de la primera fila.
root
 |-- name: string (nullable = true)
 |-- num_pets: long (nullable = true)
 |-- paid_in_full: boolean (nullable = true)
 |-- preferences: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- registered_on: timestamp (nullable = true)
 |-- visits: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)

RDD: Esquema inferido de un sampling aleatorio.
root
 |-- name: string (nullable = true)
 |-- num_pets: long (nullable = true)
 |-- paid_in_full: boolean (nullable = true)
 |-- preferences: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- registered_on: timestamp (nullable = true)
 |-- visits: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)



### Especificación del esquema

In [49]:
from pyspark.sql.types import ArrayType, BooleanType, DateType, \
    IntegerType, MapType, StringType, TimestampType, StructField, StructType
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("num_pets", IntegerType(), True),
        StructField("paid_in_full", BooleanType(), True),
        StructField("preferences", MapType(StringType(), StringType(), True), True),
        StructField("registered_on", DateType(), True),
        StructField("visits", ArrayType(TimestampType(), True), True),
    ]
)

# Crear un DataFrame a partir de un RDD, 
# especificando el esquema
print("RDD: Esquema espcificado explícitamente.")
dataDF = spark.createDataFrame(data, schema)
dataDF.printSchema()

RDD: Esquema espcificado explícitamente.
root
 |-- name: string (nullable = true)
 |-- num_pets: integer (nullable = true)
 |-- paid_in_full: boolean (nullable = true)
 |-- preferences: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- registered_on: date (nullable = true)
 |-- visits: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)



### Lectura del DataFrame desde un fichero JSON

In [67]:
# Crear un DataFrame a partir de un fichero JSON,
# infiriendo el esquema a partir de todas las filas
print("JSON: Esquema inferido a partir de todas las filas.")
dataDF = spark.read.option("samplingRatio", 0.333).json("data/data.json")
dataDF.printSchema()

JSON: Esquema inferido a partir de todas las filas.
root
 |-- name: string (nullable = true)
 |-- num_pets: long (nullable = true)
 |-- paid_in_full: boolean (nullable = true)
 |-- registered_on: string (nullable = true)



In [68]:
# Crear un DataFrame a partir de un fichero JSON,
# especificando el esquema
print("JSON: Esquema especificado explícitamente.")
dataDF = spark.read.json("data/data.json", schema)
dataDF.printSchema()

JSON: Esquema especificado explícitamente.
root
 |-- name: string (nullable = true)
 |-- num_pets: integer (nullable = true)
 |-- paid_in_full: boolean (nullable = true)
 |-- preferences: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- registered_on: date (nullable = true)
 |-- visits: array (nullable = true)
 |    |-- element: timestamp (containsNull = true)



## Acceso a las columnas de un DataFrame

In [None]:
print(df2.age)
print(df2["age"])
print((df2.age + 1).alias("cumpleaños"))

In [71]:
df.show()

+--------+---+
|    name|age|
+--------+---+
|   Alice|  1|
|     Bob|  4|
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [72]:
df.age

Column<'age'>

In [73]:
df["age"]

Column<'age'>

In [75]:
import pyspark.sql.functions as sql_f

In [76]:
sql_f.col('age')

Column<'age'>

# Operaciones con DataFrames

Podemos realizar dos tipos de operaciones sobre los DataFrames al igual que con los RDDs.
1. Transformaciones: Crean un nuevo DataFrame a partir de otro - **EVALUACIÓN VAGA (LAZY)** - hasta que no se ejecuta una acción no se realiza la transformación
2. Acciones: Utilizan el DataFrame para lograr un resultado que es recibido por el driver (o escriben el DataFrame a disco)

## Transformaciones sobre DataFrames

### Transformaciones básicas


Transformación | Descripción
------------- | -------------
*select(*\**cols)* | Devuelve un nuevo DataFrame proyectando una serie de expresiones que pueden ser nombres de columnas o expresiones de tipo Column. Si se utiliza "\*", todas las columnas del DataFrame se proyectan al nuevo DataFrame
*selectExpr(*\**expr)* | Variante de select que admite expresiones SQL
*filter(condition) / where(condition)* | Filtra las filas usando la condición especificada
*orderBy(*\**cols, ascending)* | Devuelve un nuevo DataFrame ordenado por las columnas especificadas. Por defecto en orden ascendente
*sort(*\**cols, *\*\**kwargs)* | Devuelve un DataFrame ordenado por las columnas especificadas. La condición debe ser tipo Column o expresión SQL

In [77]:
# DataFrame de ejemplo
tuplas = [('Alice', 1), ('Bob', 4), ('Juan', 10), ('Pepe', 25), ('Panchito', 15)]
df = spark.createDataFrame(tuplas, ['name', 'age'])
df.show()

+--------+---+
|    name|age|
+--------+---+
|   Alice|  1|
|     Bob|  4|
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [79]:
df.select("name").show()

+--------+
|    name|
+--------+
|   Alice|
|     Bob|
|    Juan|
|    Pepe|
|Panchito|
+--------+



In [82]:
(df["age"] * 2).alias('age2')

Column<'(age * 2) AS age2'>

In [83]:
df.select(df["name"], (df["age"] * 2).alias('age2')).show()

+--------+----+
|    name|age2|
+--------+----+
|   Alice|   2|
|     Bob|   8|
|    Juan|  20|
|    Pepe|  50|
|Panchito|  30|
+--------+----+



### `select(*cols)`

In [None]:
print(df.select('*').collect())

print(df.select('name', 'age').collect())

print(df.select(df.name, (df.age + 10).alias('age')).collect())

### `selectExpr(*expr)`

In [86]:
df.select('age * 2 as age2').show()

AnalysisException: Column '`age * 2 as age2`' does not exist. Did you mean one of the following? [age, name];
'Project ['age * 2 as age2]
+- LogicalRDD [name#349, age#350L], false


In [87]:
df.selectExpr('exp(age * 2) as age2').show()

+--------------------+
|                age2|
+--------------------+
|    7.38905609893065|
|  2980.9579870417283|
| 4.851651954097903E8|
|5.184705528587072E21|
|1.068647458152446...|
+--------------------+



In [None]:
df.selectExpr("age * 2", "abs(age)").collect()

### `filter(condition) = where(condition)`

In [88]:
df.filter("age > 8").show()

+--------+---+
|    name|age|
+--------+---+
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [89]:
df.filter(df["age"] > 8).show()

+--------+---+
|    name|age|
+--------+---+
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [None]:
# Modo programación orientada a objetos (POO)
print("Modo POO, edad > 18: " + str( df.filter(df.age > 18).collect() ))

print("Modo POO, edad == 1: " + str( df.where(df.age == 1).collect() ))

# Modo SQL
print("Modo SQL, edad > 18: " + str( df.filter("age > 18").collect() ))

print("Modo SQL, edad = 1: " + str( df.where("age = 1").collect() ))

### `orderBy(*cols, ascending)` y `sort(*cols, **kwargs)`

Tienen diferentes formas de recibir los argumentos, pero el resultado es el mismo en ambas. En ambas se reciben primero las columnas por las que ordenar el DataFrame y después la forma en la que se quiere ordenar.

In [92]:
df.sort("age", ascending=False).show()

+--------+---+
|    name|age|
+--------+---+
|    Pepe| 25|
|Panchito| 15|
|    Juan| 10|
|     Bob|  4|
|   Alice|  1|
+--------+---+



In [96]:
df.sort(df.age.desc()).show()

+--------+---+
|    name|age|
+--------+---+
|    Pepe| 25|
|Panchito| 15|
|    Juan| 10|
|     Bob|  4|
|   Alice|  1|
+--------+---+



In [95]:
print("Ordenar por edad descendente:")
print(df.sort(df.age.desc()).collect())
print(df.sort("age", ascending=False).collect())
print(df.orderBy(df.age.desc()).collect())

from pyspark.sql.functions import *

print("Ordenar por edad ascendente:")
print(df.sort(asc("age")).collect())

print("Ordenar por edad descendente y nombre ascendente:")
print(df.orderBy(desc("age"), "name").collect())
print(df.orderBy(["age", "name"], ascending=[0, 1]).collect())

Ordenar por edad descendente:
[Row(name='Pepe', age=25), Row(name='Panchito', age=15), Row(name='Juan', age=10), Row(name='Bob', age=4), Row(name='Alice', age=1)]
[Row(name='Pepe', age=25), Row(name='Panchito', age=15), Row(name='Juan', age=10), Row(name='Bob', age=4), Row(name='Alice', age=1)]
[Row(name='Pepe', age=25), Row(name='Panchito', age=15), Row(name='Juan', age=10), Row(name='Bob', age=4), Row(name='Alice', age=1)]
Ordenar por edad ascendente:
[Row(name='Alice', age=1), Row(name='Bob', age=4), Row(name='Juan', age=10), Row(name='Panchito', age=15), Row(name='Pepe', age=25)]
Ordenar por edad descendente y nombre ascendente:
[Row(name='Pepe', age=25), Row(name='Panchito', age=15), Row(name='Juan', age=10), Row(name='Bob', age=4), Row(name='Alice', age=1)]
[Row(name='Pepe', age=25), Row(name='Panchito', age=15), Row(name='Juan', age=10), Row(name='Bob', age=4), Row(name='Alice', age=1)]


### Transformaciones adicionales


Transformación | Descripción
------------- | -------------
*distinct()* | Devuelve un nuevo DataFrame con las filas únicas del original
*dropDuplicates(*\**cols)* | Devuelve un nuevo DataFrame sin filas duplicadas considerando las columnas especificadas
*withColumn(colName, col)* | Devuelve un nuevo DataFrame añadiendo una nueva columna o reemplazando la columna existente con el mismo nombre
*withColumnRenamed(existing, new)* | Devuelve un DataFrame renombrando una columna existente
*drop(col)* | Devuelve un nuevo DataFrame con la columna especificada eliminada
*limit(num)* | Limita el número de filas obtenidas como resultado
*cache()* | Mantiene el DataFrame almacenado en memoria para ser reusado


### `distinct()` y `dropDuplicates(*cols)`
La diferencia entre ambos está en que en `dropDuplicates` podemos especificar los campos por los cuales decidimos que dos filas están repetidas.

In [97]:
from pyspark.sql import Row
df2 = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=10, height=80)]).toDF()
print("DataFrame:")
df2.show()

print("distinct(): ")
df2.distinct().show()

print("dropDuplicates(): ")
df2.dropDuplicates().show() 

print("dropDuplicates('name', 'height')")
df2.dropDuplicates(['name', 'height']).show()


DataFrame:
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+

distinct(): 
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+

dropDuplicates(): 
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+

dropDuplicates('name', 'height')
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
+-----+---+------+



### `drop(col) `
Recordad que drop elimina la columna pero devuelve un nuevo DataFrame, no modifica el que tenemos (son inmutables)

In [None]:
print("Eliminamos la columna age:")
print(df.drop('age').collect())
print(df.drop(df.age).collect())

In [98]:
df.drop('age')

DataFrame[name: string]

### `limit(num)`

In [None]:
print("Limit(1):" + str( df.limit(1).collect() ))
print("Limit(0):" + str( df.limit(0).collect() ))

### `withColumn(colName, col) `

In [99]:
print("Añadimos una columna denominada age2 obtenida a partir de la columna age y sumando un 2:")
df.withColumn('age2', df.age + 2).show()

Añadimos una columna denominada age2 obtenida a partir de la columna age y sumando un 2:
+--------+---+----+
|    name|age|age2|
+--------+---+----+
|   Alice|  1|   3|
|     Bob|  4|   6|
|    Juan| 10|  12|
|    Pepe| 25|  27|
|Panchito| 15|  17|
+--------+---+----+



In [102]:
df.select('*', (df.age + 2).alias('age2'), (df.age + 3).alias('age3')).show()

+--------+---+----+----+
|    name|age|age2|age3|
+--------+---+----+----+
|   Alice|  1|   3|   4|
|     Bob|  4|   6|   7|
|    Juan| 10|  12|  13|
|    Pepe| 25|  27|  28|
|Panchito| 15|  17|  18|
+--------+---+----+----+



### `withColumnRenamed(existing, new) `

In [None]:
print("Renombramos la columna age a age2 y obtenemos un nuevo DataFrame con el cambio:")
df.withColumnRenamed('age', 'age2').collect()

### Transformaciones: Operaciones con columnas


Operación | Descripción
------------- | -------------
*alias(*\**alias)* | Devuelve la columna con un nuevo nombre
*between(lowerBound, upperBound)* | True si el valor está entre los dos valores
*isNull() / isNotNull()* | True si el valor es nulo y viceversa
*when(condition, value) / otherwise(value)* | Evalúa una lista de condiciones y en base a estas devuelve un valor u otro
*Startswith(other), substring(startPos, len), like(otheR)* | Funciones para operar con strings
*isin(*\**cols)* | True si el valor está en la lista de argumentos
*explode(col)* | Devuelve una nueva fila para cada elemento en el array
*lit(value)* | Crea una columna con el valor literal
*length(col)* | Longitud de la columna

Estas operaciones por si mismas solo devuelve una columna (expresión SQL). Para poder aplicalas al DataFrame se deben utilizar en conjunto con una transformación tipo `select`.

### `alias(*alias)`

In [103]:
df['age'].alias('edad')

Column<'age AS edad'>

In [105]:
df.select(df['age'].alias('edad')).show()

+----+
|edad|
+----+
|   1|
|   4|
|  10|
|  25|
|  15|
+----+



In [None]:
print("Columna cambio de nombre: ")
print(df.name.alias("hola"))

print("\nDataFrame con columna nombre renombrada a hola: ")
df.select(df.name.alias("hola")).show() # lógicamente solo aparece la columna seleccionada!

### `between(lowerBound, upperBound) `

In [106]:
print("Columna con condición between: ")
print(df.age.between(18, 65))
print("\nDataFrame con nombre y columna con True o False según se cumple la condición")
df.select(df.name, df.age.between(2, 4)).show()

Columna con condición between: 
Column<'((age >= 18) AND (age <= 65))'>

DataFrame con nombre y columna con True o False según se cumple la condición
+--------+---------------------------+
|    name|((age >= 2) AND (age <= 4))|
+--------+---------------------------+
|   Alice|                      false|
|     Bob|                       true|
|    Juan|                      false|
|    Pepe|                      false|
|Panchito|                      false|
+--------+---------------------------+



In [107]:
df.filter(df.age.between(2, 4)).show()

+----+---+
|name|age|
+----+---+
| Bob|  4|
+----+---+



### `isNull() / isNotNull() `

In [None]:
print("Expresión Columna con isNull()/isNotNull():")
print(df.age.isNull())
print(df.age.isNotNull())

print("\nDataFrame con filas que tienen el nombre nulo")
df.filter(df.name.isNull()).show()

### `when(condition, value) / otherwise(value) `

In [None]:
from pyspark.sql import functions as F
print("Condición when/otherwise:")
print(F.when(df.age > 18, "adulto").otherwise("peque"))

print("\nEjemplos when/otherwise")
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()

In [109]:
from pyspark.sql import functions as sql_f

df.filter(sql_f.when(df.age > 3, True).otherwise(False)).show()

+--------+---+
|    name|age|
+--------+---+
|     Bob|  4|
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



### `Startswith(other), substring(startPos, len), like(otheR) `

In [None]:
print("Columna like: ")
print(df.name.like("M*"))

print("\nDataFrame con primeras tres letras de los nombres")
df.select(df.name.substr(1, 3).alias("Inicial")).collect()

### `isin(*cols) `

In [None]:
print("Columna con expresión isin()")
print(df.name.isin("Mikel", "Pepe"))


# Ojo! Estamos usando la operación para filtrar las filas, para ello podemos usar df[condición]!
print("\nFiltrar filas con nombres Bob y Mikel")
print(df[df.name.isin("Bob", "Mikel")].collect())
print("Filtrar filas con edades 1, 2 o 3")
print(df[df.age.isin([1, 2, 3])].collect())

### `explode(col) `
Explode puede utilizarse para simular el flatMap de los RDDs, ya que crea tantas filas como valores tenga un array o map

In [None]:
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
print("DataFrame original")
print(eDF.collect())

print("\nDataFrame con filas expandidas según el array de enteros")
eDF.select(F.explode(eDF.intlist).alias("anInt")).show()
print("DataFrame con filas expandidas según el map de caracteres")
eDF.select('*', F.explode(eDF.mapfield).alias("key", "value")).show()

In [110]:
df3 = spark.createDataFrame([
    Row(name='Isaac', hobbies=["Lifting", "Running", "Reading"], 
        avatar={"hair": "black","hair_length": "short"}),
    Row(name='Mikel', hobbies=["Triathlon", "Running", "Cycling"], 
        avatar={"hair": "blond","hair_length": "very short"}),
    Row(name='Daniele', hobbies=["Singing", "Dancing"], 
        avatar={"hair": "brown","hair_length": "large"}),
    Row(name='Anne', hobbies=["Running", "Music"], 
        avatar={"hair": "red","hair_length": "middle"})
])
df3.show()

+-------+--------------------+--------------------+
|   name|             hobbies|              avatar|
+-------+--------------------+--------------------+
|  Isaac|[Lifting, Running...|{hair_length -> s...|
|  Mikel|[Triathlon, Runni...|{hair_length -> v...|
|Daniele|  [Singing, Dancing]|{hair_length -> l...|
|   Anne|    [Running, Music]|{hair_length -> m...|
+-------+--------------------+--------------------+



In [117]:
df3.select(sql_f.explode('hobbies')).show()

+---------+
|      col|
+---------+
|  Lifting|
|  Running|
|  Reading|
|Triathlon|
|  Running|
|  Cycling|
|  Singing|
|  Dancing|
|  Running|
|    Music|
+---------+



In [121]:
df3.select('*', sql_f.explode('hobbies').alias('hobbie'))\
    .select('*', sql_f.explode('avatar').alias('clave', 'valor')).show()

+-------+--------------------+--------------------+---------+-----------+----------+
|   name|             hobbies|              avatar|   hobbie|      clave|     valor|
+-------+--------------------+--------------------+---------+-----------+----------+
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Lifting|hair_length|     short|
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Lifting|       hair|     black|
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Running|hair_length|     short|
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Running|       hair|     black|
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Reading|hair_length|     short|
|  Isaac|[Lifting, Running...|{hair_length -> s...|  Reading|       hair|     black|
|  Mikel|[Triathlon, Runni...|{hair_length -> v...|Triathlon|hair_length|very short|
|  Mikel|[Triathlon, Runni...|{hair_length -> v...|Triathlon|       hair|     blond|
|  Mikel|[Triathlon, Runni...|{hair_length -> v...|  Running|hair

In [114]:
df3.groupBy('hobbies').count().show(truncate=False)

+-----------------------------+-----+
|hobbies                      |count|
+-----------------------------+-----+
|[Lifting, Running, Reading]  |1    |
|[Triathlon, Running, Cycling]|1    |
|[Singing, Dancing]           |1    |
|[Running, Music]             |1    |
+-----------------------------+-----+



### `lit(value)`

In [123]:
df.select('*', sql_f.lit(1)).show()

+--------+---+---+
|    name|age|  1|
+--------+---+---+
|   Alice|  1|  1|
|     Bob|  4|  1|
|    Juan| 10|  1|
|    Pepe| 25|  1|
|Panchito| 15|  1|
+--------+---+---+



In [None]:
from pyspark.sql import functions as F
print("Columna de unos: ")
print(F.lit(1))

print("\nDataFrame con una nueva columna con unos:")
df.select("*", F.lit(1)).show()

### `length(col) `

In [None]:
print("Columna para obtener longitud del texto: " )
print(F.length(df.name))

print("\nDataFrame con la longitud de cada nombre")
df.select(F.length(df.name).alias('len')).show()

### Transformaciones con pseudo-conjuntos
Aunque no los hemos visto en teoría, podemos seguir haciendo operaciones entre pseudo-conjuntos al igual que con los RDDs

Transformación | Descripción
------------- | -------------
*distinct()* | Devuelve el DataFrame sin elementos repetidos – ¡Cuidado! Requiere shuffle (enviar datos por red)
*union(rdd)* | Devuelve la unión de los elementos en los dos DataFrame  (se mantienen los duplicados)
*intersect(rdd)* | Devuelve la instersección de los elementos en los dos DataFrame (elimina los duplicados) – ¡Cuidado! Requiere shuffle (datos por red)
*subtract(rdd)* | Devuelve los elementos presentes en el primer DataFrame y no en el segundo – ¡Cuidado! También requiere de shuffle

In [None]:
df1 = spark.createDataFrame(["agua", "vino", "cerveza", "agua", "agua", "vino"], "string")
df2 = spark.createDataFrame(["cerveza", "cerveza", "agua", "agua", "vino", "coca-cola", "naranjada"], "string")

print("distinct: " + str(df1.distinct().collect()))
print("union: " + str(df1.union(df2).collect()))
print("intersect: " + str(df1.intersect(df2).collect()))
print("substract: " + str(df1.subtract(df2).collect()))

## Acciones sobre RDDs

### Acciones básicas


Acción | Descripción
------------- | -------------
*show(n=20, truncate=True)* | Imprime las primeras n filas del DataFrame. Truncate indica si se quiere truncar los strings demasiado largos
*count()* | Devuelve el número de filas en el DataFrame
*collect()* | Devuelve todas las filas del DataFrame como una lista de Rows **Cuidado: Debe de caber en memoria**
*first()* | Devuelve la primera fila del DataFrame
*take(n)* | Devuelve las primeras n filas del DataFrame como lista de Rows
*toPandas()* | Devuelve el contenido del DataFrame como un pandas.DataFrame **Cuidado: Debe de caber en memoria**
*columns* | Devuelve todos los nombres de columnas como una lista
*describe(*\**cols)* | Calcula estadísticas para las columnas numéricas  (count, mean, stddev, min, y max)
*explain(extended=False)* | Imprime los planes físicos y lógicos para debugging

In [124]:
df.take(1)

[Row(name='Alice', age=1)]

In [125]:
df.first()

Row(name='Alice', age=1)

In [126]:
df.columns

['name', 'age']

In [128]:
df.describe().show()

+-------+-----+-----------------+
|summary| name|              age|
+-------+-----+-----------------+
|  count|    5|                5|
|   mean| null|             11.0|
| stddev| null|9.513148795220223|
|    min|Alice|                1|
|    max| Pepe|               25|
+-------+-----+-----------------+



In [None]:
print("DataFrame show(): ")
df.show()

print("Dos primeras filas con take(2): " + str(df.take(2)))

print("\nPrimera fila con first(): " + str(df.first()))

print("\nDataFrame completo con collect(): " + str( df.collect() ))

print("\nNúmero de filas en el DataFrame con count(): " + str( df.count() ) )

print("\nDataFrame como panda: ")
print(df.toPandas())

print("\nColumnas en el DataFrame con columns: "+ str( df.columns ))

print("\nPlanes físicos y lógicos con explain:")
df.filter(df.age > 10).select(df.age).explain(True)

### Transformaciones para realizar agregaciones



Transformación | Descripción
------------- | -------------
*agg(*\**exprs)* | Realiza agregaciones sobre el DataFrame completo sin agrupar (equivalente a df.groupBy.agg()) – Agregaciones disponibles: avg, max, min, sum, count. exprs puede ser un diccionario clave (nombre columna) – valor (función de agregación) o una lista de expresiones de agregación de Columna
*groupBy(*\**cols)* | Agrupa el DataFrame usando las columnas especificadas para poder aplicar agregaciones sobre los grupos – GroupedData




### `agg(*exprs)`
Agrega columnas numéricas en todo el DataFrame, sin grupos

Agregaciones disponibles: avg, max, min, sum, count

\**exprs* puede ser

* Un diccionario clave (nombre columna) – valor (función de agregación)
+ Una lista de expresiones de agregación de Columna


In [132]:
df.agg(sql_f.mean('age'), sql_f.max('age')).show()

+--------+--------+
|avg(age)|max(age)|
+--------+--------+
|    11.0|      25|
+--------+--------+



In [136]:
{'age': 'max', 'age': 'mean'}

{'age': 'mean'}

In [135]:
df.agg().show()

+--------+
|avg(age)|
+--------+
|    11.0|
+--------+



In [None]:
print("Usando diccionario clave-valor: ")
df.agg({"age": "max"}).show()

from pyspark.sql import functions as F
print("Usando expresión de agregación en la columna:")
df.agg(F.min(df.age)).show()

### `groupBy(*cols)`
Agrupa filas del DataFrame en base a las columnas especificadas

Se crea un DataFrame GroupedData, posteriormente se trata de aplicar agregaciones por grupos (ver siguiente sección)

In [137]:
df.show()

+--------+---+
|    name|age|
+--------+---+
|   Alice|  1|
|     Bob|  4|
|    Juan| 10|
|    Pepe| 25|
|Panchito| 15|
+--------+---+



In [140]:
df.groupBy('name').agg(sql_f.mean('age'), sql_f.max('age')).show()

+--------+--------+--------+
|    name|avg(age)|max(age)|
+--------+--------+--------+
|   Alice|     1.0|       1|
|     Bob|     4.0|       4|
|    Juan|    10.0|      10|
|    Pepe|    25.0|      25|
|Panchito|    15.0|      15|
+--------+--------+--------+



In [None]:
print("Media usando groupBy().avg() - Se agrupa todo el DataFrame = usar agg() directamente")
print(df.groupBy().avg().collect())

print("\nAgrupamos por nombre y para cada nombre calculamos la edad media:")
print(sorted(df.groupBy('name').agg({'age': 'mean'}).collect()))
print(sorted(df.groupBy(df.name).avg().collect()))

print("\nAgrupamos por nombre y edad y contamos cuántos filas hay en cada grupo:")
sorted(df.groupBy(['name', df.age]).count().collect())

### Transformaciones sobre GroupedData (groupBy)


Transformación | Descripción
------------- | -------------
*avg(*\**cols) / mean(*\**cols)* | Calcula la media de los valores para cada grupo en cada columna numérica
*count()* | Cuenta el número de registros (filas) en cada grupo
*max(**\**cols)* | Calcula el máximo valor para cada grupo en cada columna numérica
*min(*\**cols)* | Calcula el mínimo valor para cada grupo en cada columna numérica
*sum(*\**cols)* | Calcula la suma de los valores para cada grupo en cada columna numérica
*pivot(pivot_col, values)* | Pivota sobre una columna del DataFrame para realizar después la agregación especificada. Values especifica los valores que aparecerán en las columnas. Si no se especifica lo calcula Spark (menos eficiente)
*agg(*\**exprs)* | Realiza agregaciones sobre cada grupo del DataFrame. Agregaciones disponibles: avg, max, min, sum, count. exprs puede ser un diccionario clave (nombre columna) – valor (función de agregación) o una lista de expresiones de agregación de Columna

### `avg(*cols) / mean(*cols), count(), max(*cols), min(*cols), sum(*cols)`
Podemos aplicar directamente cualquiera de estos métodos sobre GroupedData para obtener el resultado de la función correspondiente por grupos

In [None]:
df2 = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=10, height=80), \
    Row(name='Bob', age=5, height=90), \
    Row(name='Bob', age=10, height=100)]).toDF()

print("DataFrame ejemplo")
df2.show()

print("Edad media global con avg: " + str( df2.groupBy().avg('age').collect() ))
print("Edad y altura medias globales con avg: " + str( df2.groupBy().avg('age', 'height').collect() ))

print("Edad media global con mean" + str(df2.groupBy().mean('age').collect() ))
print("Edad y altura medias globales con mean" + str( df2.groupBy().mean('age', 'height').collect() ))


print("Media de edad y altura agrupoda por nombre: ")
df2.groupBy('name').avg('age', 'height').show()

print("Conteo de personas por edad:")
df2.groupBy(df2.age).count().show()

print("Edad máxima global" + str( df2.groupBy().max('age').collect() ) )
print("Edad y altura máximas globales" + str( df2.groupBy().max('age', 'height').collect() ))


print("Edad mínima global" + str( df2.groupBy().min('age').collect() ))
print("Edad y altura minima globales" + str( df2.groupBy().min('age', 'height').collect() ))

print("Suma global de edades" + str( df2.groupBy().sum('age').collect() ))
print("Suma global de edades y altura" + str( df2.groupBy().sum('age', 'height').collect() ))

In [None]:
print("Edad media agrupado por nombre con avg: " )
df2.groupBy('name').avg('age').show()

print("Altura media agrupado por edad con avg: " )
df2.groupBy('age').avg('height').show()

print("Conteo de personas por altura:")
df2.groupBy(df2.height).count().show()

print("Edad máxima por nombre" )
df2.groupBy('name').max('age').show()

print("Edad media por nombre y altura" )
df2.groupBy('name', 'height').avg('age').show()

### `agg(*exprs)`
Agrega columnas numéricas por grupos

Agregaciones disponibles: avg, max, min, sum, count

\**exprs* puede ser

* Un diccionario clave (nombre columna) – valor (función de agregación)
+ Una lista de expresiones de agregación de Columna


In [None]:
print("Conteo de personas con el mismo nombre:")
df2.agg({"*": "count"}).show()

In [None]:
from pyspark.sql import functions as F
print("Edad mínima para cada nombre:")
df2.groupBy(df2.name).agg(F.min(df2.age)).show()

In [None]:
data = [('Alice',1,6, 'Mate'), ('Bob',2,8, 'Mate'), ('Alice',3,9, 'Lengua'), ('Bob',4,7, 'Lengua')]
df = spark.createDataFrame(data, ['name', 'age', 'grade', 'subject'])

print("Media de edad y notas de todos los alumnos")
df.groupBy().avg().show()

print("Media de edad y notas de los alumnos con el mismo nombre")
df.groupBy('name').avg('age', 'grade').show()


print("Número de alumnos con el mismo nombre")
df1 = df.groupBy(df.name)
df1.agg({"*": "count"}).show() 
df.groupBy(df.name).count().show()

### `pivot(pivot_col, values)`

Pivota sobre una columna del DataFrame para realizar después la agregación especificada

*values* especifica los valores que aparecerán en las columnas
* Si no se especifica lo calcula Spark (menos eficiente)
* Similar a pivot table de pandas


In [6]:
data = [(2021,'Alice', 67, 60, 'COMP4008'),
        (2021,'Alice', 67, 25, 'COMP4103'),
        (2021,'Bob', 34, 70, 'COMP4008'),
        (2021,'Bob', 34, 95, 'COMP4103'),
        (2022,'Mikel', 67, 60, 'COMP4008'),
        (2022,'Mikel', 67, 25, 'COMP4103'),
        (2022,'Isaac', 34, 70, 'COMP4008'),
        (2022,'Isaac', 34, 95, 'COMP4103') ]

In [7]:
df = spark.createDataFrame(data, ['year', 'name', 'age',\
                                  'grade', 'module_name'])

In [143]:
df.show()

+----+-----+---+-----+-----------+
|year| name|age|grade|module_name|
+----+-----+---+-----+-----------+
|2021|Alice| 67|   60|   COMP4008|
|2021|Alice| 67|   25|   COMP4103|
|2021|  Bob| 34|   70|   COMP4008|
|2021|  Bob| 34|   95|   COMP4103|
|2022|Mikel| 67|   60|   COMP4008|
|2022|Mikel| 67|   25|   COMP4103|
|2022|Isaac| 34|   70|   COMP4008|
|2022|Isaac| 34|   95|   COMP4103|
+----+-----+---+-----+-----------+



In [145]:
df.groupBy('year').agg(sql_f.min('grade'), sql_f.max('grade'), sql_f.avg('grade')).show()

+----+----------+----------+----------+
|year|min(grade)|max(grade)|avg(grade)|
+----+----------+----------+----------+
|2021|        25|        95|      62.5|
|2022|        25|        95|      62.5|
+----+----------+----------+----------+



In [151]:
%time df.groupBy('year', 'module_name')\
            .agg(sql_f.min('grade').alias('min'), sql_f.max('grade').alias('maxi'), sql_f.avg('grade').alias('avg')).show()

+----+-----------+---+----+----+
|year|module_name|min|maxi| avg|
+----+-----------+---+----+----+
|2021|   COMP4103| 25|  95|60.0|
|2021|   COMP4008| 60|  70|65.0|
|2022|   COMP4103| 25|  95|60.0|
|2022|   COMP4008| 60|  70|65.0|
+----+-----------+---+----+----+

Wall time: 6.31 s


In [150]:
%time df.groupBy('year').pivot('module_name', ['COMP4008', 'COMP4103'])\
    .agg(sql_f.min('grade').alias('min'), sql_f.max('grade').alias('maxi'), sql_f.avg('grade').alias('avg')).show()

+----+------------+-------------+------------+------------+-------------+------------+
|year|COMP4008_min|COMP4008_maxi|COMP4008_avg|COMP4103_min|COMP4103_maxi|COMP4103_avg|
+----+------------+-------------+------------+------------+-------------+------------+
|2021|          60|           70|        65.0|          25|           95|        60.0|
|2022|          60|           70|        65.0|          25|           95|        60.0|
+----+------------+-------------+------------+------------+-------------+------------+

Wall time: 6.26 s


In [None]:
print("Notas medias por alumno y asignatura (modo eficiente especificando asignaturas): ")
df.groupBy("name").pivot("subject", ["Mate", "Lengua"]).avg("grade").show()

print("Notas medias por alumno y asignatura (modo no eficiente, dejando a Spark que busque los valores sobre los que pivotar): ")
df.groupBy("name").pivot("subject").avg("grade").show()

# Aspectos avanzados

## Funciones definidas por el usuario (User Defined Functions, UDF)

Podemos crear una función para trabajar sobre las columnas de los DataFrames mediante funciones definidas por el usuario. Sin embargo, debemos limitarnos a lo estríctamente necesario al ser mucho menos eficientes que las específicas de Spark.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [2]:
len('hola')

4

In [4]:
import pyspark.sql.functions as sql_f

In [5]:
sql_f.length('hola')

Column<'length(hola)'>

In [8]:
df.show()

+----+-----+---+-----+-----------+
|year| name|age|grade|module_name|
+----+-----+---+-----+-----------+
|2021|Alice| 67|   60|   COMP4008|
|2021|Alice| 67|   25|   COMP4103|
|2021|  Bob| 34|   70|   COMP4008|
|2021|  Bob| 34|   95|   COMP4103|
|2022|Mikel| 67|   60|   COMP4008|
|2022|Mikel| 67|   25|   COMP4103|
|2022|Isaac| 34|   70|   COMP4008|
|2022|Isaac| 34|   95|   COMP4103|
+----+-----+---+-----+-----------+



In [11]:
df.select(sql_f.length('name')).show()

+------------+
|length(name)|
+------------+
|           5|
|           5|
|           3|
|           3|
|           5|
|           5|
|           5|
|           5|
+------------+



In [12]:
from pyspark.sql.types import IntegerType

In [16]:
f_len = sql_f.udf(lambda x: len(x), IntegerType())

In [17]:
df.select(f_len('name')).show()

+--------------+
|<lambda>(name)|
+--------------+
|             5|
|             5|
|             3|
|             3|
|             5|
|             5|
|             5|
|             5|
+--------------+



In [18]:
@sql_f.udf('int')
def f_len(s):
    return len(s)

In [19]:
df.select(f_len('name')).show()

+-----------+
|f_len(name)|
+-----------+
|          5|
|          5|
|          3|
|          3|
|          5|
|          5|
|          5|
|          5|
+-----------+



In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

print("Longitud con función definida por el usuario (UDF):")
slen = udf(lambda s: len(s), IntegerType())
df.select(slen(df.name).alias('slen')).show()

print("Longitud con versión de SparkSQL, mucho más eficiente:")
from pyspark.sql.functions import length
df.select(length(df.name).alias('length')).show()

## SQL en Spark
Podemos trabajar directamente con SQL en Spark si registramos el DataFrame como tabla.

In [20]:
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-----+---+-----+-----------+
|year| name|age|grade|module_name|
+----+-----+---+-----+-----------+
|2021|Alice| 67|   60|   COMP4008|
|2021|Alice| 67|   25|   COMP4103|
|2021|  Bob| 34|   70|   COMP4008|
|2021|  Bob| 34|   95|   COMP4103|
|2022|Mikel| 67|   60|   COMP4008|
|2022|Mikel| 67|   25|   COMP4103|
|2022|Isaac| 34|   70|   COMP4008|
|2022|Isaac| 34|   95|   COMP4103|
+----+-----+---+-----+-----------+



In [22]:
spark.sql("SELECT * FROM people WHERE age > 2 AND module_name != 'COMP4008'").show()

+----+-----+---+-----+-----------+
|year| name|age|grade|module_name|
+----+-----+---+-----+-----------+
|2021|Alice| 67|   25|   COMP4103|
|2021|  Bob| 34|   95|   COMP4103|
|2022|Mikel| 67|   25|   COMP4103|
|2022|Isaac| 34|   95|   COMP4103|
+----+-----+---+-----+-----------+



## JOINs con SparkSQL

### Transformaciones Join tipo SQL

Transformación | Descripción
------------- | -------------
*join(other, on, how)*  | Joins entre dos DataFrames

* `other`: segunda tabla para el join
* `on`: nombre de la columna por la que se realiza la unión. Puede ser una lista o una expresión join (Column)
* `how`: tipo de unión entre inner, `outer, left_outer, right_outer, left_semi`

In [None]:
data = [Row(name=u'Alice', age=1), Row(name=u'Bob', age=2)]
data2 = [Row(name=u'Chris', height=80), Row(name=u'Bob', height=85)]
df = spark.createDataFrame(data, ['age', 'name'])
df2 = spark.createDataFrame(data2, ['height', 'name'])
print("df:")
df.show()
print("df2:")
df2.show()

### `inner join`

In [None]:
print("Join por name")
df.join(df2, 'name').show()
print("Join por name + select df.name y height")
df.join(df2, 'name').select(df.name, df2.height).show()

### `fullOuterJoin`

In [None]:
print("Full outer join por name")
df.join(df2, 'name', 'outer').show()
print("Full outer join por name + select df.name y height")
df.join(df2, 'name', 'outer').select('name', 'height').show()

### `leftOuterJoin`

In [None]:
print("Left outer join por name")
df.join(df2, 'name', 'left_outer').show()

### `rightOuterJoin`

In [None]:
print("Left outer join por name")
df.join(df2, 'name', 'right_outer').show()

### Más ejemplos

In [None]:
data = [('Alice',1,6, 'Mate'), \
        ('Bob',2,8, 'Mate'), \
        ('Alice',3,9, 'Lengua'), \
        ('Bob',4,7, 'Lengua')]
df = spark.createDataFrame(data, ['name', 'course', 'grade', 'subject'])
df2 = sc.parallelize([ \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=5, height=80), \
    Row(name='Alice', age=10, height=80), \
    Row(name='Bob', age=5, height=90), \
    Row(name='Bob', age=10, height=100), \
    Row(name='Juan', age=5, height=90), \
    Row(name='Jaimito', age=6, height=120)]).toDF()

print("df:")
df.show()
print("df2:")
df2.show()

In [None]:
print("Inner join usando name")
df.join(df2, 'name').show()

print("Outer join usando name + select name y height")
df.join(df2, 'name', 'outer').select('name', 'height').show()

print("Outer join usando df.name == df2.name + select df.name y height")
df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).show()

print("Outer join usando condición con name y age = grade (a pesar de no tener sentido) + select name y age")
cond = [df.name == df2.name, df2.age == df.grade]
df.join(df2, cond, 'outer').select(df.name, df2.age).show()

# Caché de DataFrames
Si se va a reusar un DataFrame es conveniente cachearlo para que no se recalcule cada vez

In [None]:
quijoteDF = spark.read.text("./datos/pg2000.txt")
palabrasQuijoteDF = quijoteDF.select(explode(split('value', ' ')).alias('word')).cache()
print("Cabeza aparece " + str( palabrasQuijoteDF.where(palabrasQuijoteDF.word.like("%cabeza%")).count() ) + " veces")
print("Lanza aparece " + str( palabrasQuijoteDF.where(palabrasQuijoteDF.word.like("%lanza%")).count() ) + " veces")

# Lectura y escritura de ficheros


## Lectura
Leer ficheros de texto, JSON o CVS es muy sencillo.

Hay dos formas de hacerlo:
* Usando `spark.read.tipo_fichero(fichero)`
* Usando `spark.read.format(tipo_fichero).options(opciones).load(fichero)`

In [None]:
dfText = spark.read.text("datos/pg2000.txt")
dfJSON = spark.read.json("datos/json.json") # infiere el esquema
dfCSV = spark.read.csv("datos/personas.csv", inferSchema=True, header=True)

print("Elementos en DataFrame a partir de datos/pg2000.txt: " + str(dfText.count()) + "\nEsquema: ")
print dfText.printSchema()
print("Elementos en DataFrame a partir de datos/json.json: " + str(dfJSON.count()) + "\nEsquema: ")
print dfJSON.printSchema()
print("Elementos en DataFrame a partir de datos/personas.csv: " + str(dfCSV.count()) + "\nEsquema: ")
print dfCSV.printSchema()

In [None]:
dfText = spark.read.format("text").load("datos/pg2000.txt")
dfJSON = spark.read.format("json").load("datos/json.json") # infiere el esquema
dfCSV = spark.read.format("csv").options(inferSchema=True, header=True).load("datos/personas.csv")

print("Elementos en DataFrame a partir de datos/pg2000.txt: " + str(dfText.count()) + "\nEsquema: ")
print(dfText.printSchema())
print("Elementos en DataFrame a partir de datos/json.json: " + str(dfJSON.count()) + "\nEsquema: ")
print(dfJSON.printSchema())
print("Elementos en DataFrame a partir de datos/personas.csv: " + str(dfCSV.count()) + "\nEsquema: ")
print(dfCSV.printSchema())

## Escritura
Escribir ficheros de texto, JSON o CVS es igual de fácil.

**Nota: El fichero de salida se toma como directorio**

Hay dos formas de hacerlo:
* Usando `DataFrame.write.tipo_fichero(fichero)`
* Usando `DataFrame.write.format(tipo_fichero).options(opciones).load(fichero)`

In [None]:
dfText.write.text("datos/salidaTXT1.txt")
dfJSON.write.json("datos/salidaJSON1.json")
dfCSV.write.csv("datos/salidaCSV1.csv", header=True)

print("Ver datos escritos en datos/salidaTXT1.txt")
print("Ver datos escritos en datos/salidaJSON1.json")
print("Ver datos escritos en datos/salidaCSV1.csv")

In [None]:
dfText.write.format('text').save("datos/salidaTXT2.txt")
dfJSON.write.format('json').save("datos/salidaJSON2.json")
dfCSV.write.format('csv').save("datos/salidaCSV2.csv", header=True)

print("Ver datos escritos en datos/salidaTXT2.txt")
print("Ver datos escritos en datos/salidaJSON2.json")
print("Ver datos escritos en datos/salidaCSV2.csv")

## Más opciones en: <http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter>

# Un ejemplo completo: WordCount simple con DataFrames

*value* es el nombre de la columna por defecto al leer texto

1. Aplicamos un select con explode que equivale a un flatMap
1. Dentro del select dividimos la línea por espacios con split
 * A la columna resultante la denominamos Word con alias
 * Filtramos palabras vacías
2. Agrupamos por palabra y contamos cuántas veces se repite cada una
3. Ordenamos de manera descendente
4. Mostramos el DataFrame obtenido


In [None]:
from pyspark.sql.functions import *
dfText = spark.read.format("text").load("data/pg2000.txt")
dfText.select(explode(split(dfText["value"], " ")).alias("word")) \
    .filter("word != ''") \
    .groupBy("word").count() \
    .sort(desc("count")) \
    .show()

## Por pasos

In [153]:
dfText = spark.read.format("text").load("data/pg2000.txt")

In [154]:
dfText.cache()

DataFrame[value: string]

In [162]:
dfText.select(sql_f.explode(sql_f.split('value', ' ')).alias('word'))\
        .groupBy('word').count()\
        .sort(sql_f.desc('count')).show()

+----+-----+
|word|count|
+----+-----+
| que|19429|
|  de|17988|
|   y|15894|
|  la|10200|
|   a| 9575|
|    | 9504|
|  el| 7957|
|  en| 7898|
|  no| 5611|
|  se| 4690|
| los| 4680|
| con| 4047|
| por| 3758|
| las| 3423|
|  lo| 3387|
|  le| 3382|
|  su| 3319|
| don| 2533|
| del| 2464|
|  me| 2344|
+----+-----+
only showing top 20 rows



#### Vemos lo que hay en el DataFrame

In [None]:
dfText.select("*").show(10, False)

#### Dividimos las líneas en palabras

In [None]:
dfText.select(split("value", " ")) \
    .show(10, False)

#### Usamos explode para crear una Row con cada elemento del array en la columna actual

In [None]:
dfText.select( \
      explode(split("value", " ")) \
    ).show(10)

#### Renombramos la nueva columna


In [None]:
dfText.select( \
        explode(split("value", " ")) \
        .alias("word") \
    ).show(10)

#### Filtramos palabras vacías


In [None]:
dfText.select( \
           explode(split("value", " ")) \
          .alias("word")) \
      .filter("word != ''") \
      .show(10)

#### Agrupamos por palabra y contamos el número de apariciones


In [None]:
dfText.select( \
              explode(split("value", " ")) \
              .alias("word")) \
        .filter("word != ''") \
        .groupBy("word").count() \
        .show(10)

#### Ordenamos de manera descendente por la cuenta


In [None]:
dfText.select( \
              explode(split("value", " ")) \
              .alias("word")) \
        .filter("word != ''") \
        .groupBy("word").count() \
        .sort(desc("count")) \
        .show(10)