# Spark DataFrames / SQL

En el notebook anterior hemos introducido Spark y el uso de RDD para interactuar con los datos. Tal como comentamos, los RDD permiten trabajar a bajo nivel, siendo más cómodo y eficiente hacer uso de DataFrames y el lenguaje SQL.

## DATAFRAMES

Un DataFrame es una estructura equivalente a una tabla de base de datos relacional, con un motor bien optimizado para el trabajo en un clúster. Los datos se almacenan en filas y columnas y ofrece un conjunto de operaciones para manipular los datos.

El trabajo con DataFrames es más sencillo y eficiente que el procesamiento con RDD, por eso su uso es predominante en los nuevos desarrollos con Spark.

A continuación veremos cómo podemos obtener y persistir DataFrames desde diferentes fuentes y formatos de datos

### Creando Dataframes

El caso más básico es crear un DataFrame a partir de un RDD mediante `toDF`:

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() # SparkSession de forma programativa
# Creamos un RDD
datos = [("Aitor", 182), ("Pedro", 178), ("Marina", 161)]
rdd = spark.sparkContext.parallelize(datos)
# Creamos un DataFrame y mostramos su esquema
dfRDD = rdd.toDF()
dfRDD.printSchema()
# mediante 'printSchema'  obtenemos un resumen del esquema del DataFrame , donde para cada columna se indica el nombre, el tipo y si admite valores nulos.

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)



Podemos ver como los nombres de las columnas son _1 y _2. Para asignarle un nombre adecuado podemos pasarle una lista con los nombres a la hora de crear el DataFrame:

In [0]:
columnas = ["nombre","altura"]
dfRDD = rdd.toDF(columnas)
dfRDD.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- altura: long (nullable = true)



Si queremos mostrar sus datos, haremos uso del método `show`:

In [0]:
dfRDD.show()
# Obtenemos una vista de los datos en forma de tabla:

+------+------+
|nombre|altura|
+------+------+
| Aitor|   182|
| Pedro|   178|
|Marina|   161|
+------+------+



También podemos crear un DataFrame directamente desde una `SparkSession` sin crear un RDD previamente mediante el método `createDataFrame`:

In [0]:
dfDesdeDatos = spark.createDataFrame(datos, columnas)
dfDesdeDatos.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- altura: long (nullable = true)



### Mostrando los datos

Para los siguientes apartados, supongamos que queremos almacenar ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

In [0]:
clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000), 
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000)
]
columnas = ["nombre","apellidos", "ciudad", "sueldo"]
df = spark.createDataFrame(clientes, columnas)

Para mostrar los datos podemos utilizar el método `show`, al cual le podemos indicar o no la cantidad de registros a recuperar, así como si queremos que los datos se trunquen o no, o si los queremos mostrar en vertical:

In [0]:
df.show(2)

+------+---------+------+------+
|nombre|apellidos|ciudad|sueldo|
+------+---------+------+------+
| Aitor|  Medrano| Elche|  3000|
| Pedro|    Casas| Elche|  4000|
+------+---------+------+------+
only showing top 2 rows



In [0]:
df.show(truncate=False)

+------+---------+----------+------+
|nombre|apellidos|ciudad    |sueldo|
+------+---------+----------+------+
|Aitor |Medrano  |Elche     |3000  |
|Pedro |Casas    |Elche     |4000  |
|Laura |García   |Elche     |5000  |
|Miguel|Ruiz     |Torrellano|6000  |
|Isabel|Guillén  |Alicante  |7000  |
+------+---------+----------+------+



In [0]:
df.show(3, vertical=True)

-RECORD 0------------
 nombre    | Aitor   
 apellidos | Medrano 
 ciudad    | Elche   
 sueldo    | 3000    
-RECORD 1------------
 nombre    | Pedro   
 apellidos | Casas   
 ciudad    | Elche   
 sueldo    | 4000    
-RECORD 2------------
 nombre    | Laura   
 apellidos | García  
 ciudad    | Elche   
 sueldo    | 5000    
only showing top 3 rows



Si sólo queremos recuperar unos pocos datos, podemos hacer uso de `head` o `first` los cuales devuelven objetos `Row`:

In [0]:
df.first()

Out[10]: Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)

In [0]:
df.head()

Out[11]: Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)

In [0]:
df.head(3)

Out[12]: [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
 Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000)]

Si queremos obtener un valor en concreto, una vez recuperada una fila, podemos acceder a sus columnas:

In [0]:
nom1 = df.first()[0]           # 'Aitor'
nom1

Out[13]: 'Aitor'

In [0]:
nom2 = df.first()["nombre"]    # 'Aitor'
nom2

Out[14]: 'Aitor'

También podemos obtener un sumario de los datos (igual que con Pandas) mediante `describe`:

In [0]:
df.describe()

Out[15]: DataFrame[summary: string, nombre: string, apellidos: string, ciudad: string, sueldo: string]

In [0]:
df.describe().show()

+-------+------+---------+----------+------------------+
|summary|nombre|apellidos|    ciudad|            sueldo|
+-------+------+---------+----------+------------------+
|  count|     5|        5|         5|                 5|
|   mean|  null|     null|      null|            5000.0|
| stddev|  null|     null|      null|1581.1388300841897|
|    min| Aitor|    Casas|  Alicante|              3000|
|    max| Pedro|     Ruiz|Torrellano|              7000|
+-------+------+---------+----------+------------------+



Si únicamente nos interesa saber cuantas filas tiene nuestro DataFrame, podemos hacer uso de `count`:

In [0]:
df.count()  # 5

Out[17]: 5

Por último, como un DataFrame por debajo es un RDD, podemos usar `collect` y `take` conforme necesitemos y recuperar objetos de tipo `Row`:

In [0]:
df.collect()

Out[17]: [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
 Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000),
 Row(nombre='Miguel', apellidos='Ruiz', ciudad='Torrellano', sueldo=6000),
 Row(nombre='Isabel', apellidos='Guillén', ciudad='Alicante', sueldo=7000)]

In [0]:
df.take(2)

Out[18]: [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
 Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000)]

In [0]:
nom = df.collect()[0][0]        # 'Aitor'
nom

Out[19]: 'Aitor'

### Cargando diferentes formatos

Lo más usual es cargar los datos desde una archivo externo. Para ello, mediante el API de `DataFrameReader` cargaremos los datos directamente en un Dataframe mediante diferentes métodos dependiendo del formato (admite tanto el nombre de un recurso como una ruta de una carpeta).

Para cada formato, existe un método corto que se llama como el formato en sí, y un método general donde mediante format indicamos el formato y que finaliza con el método `load` siempre dentro de `spark.read`:

**CSV**
```<python>
dfCSV = spark.read.csv("datos.csv")
dfCSV = spark.read.csv("datos/*.csv")   # Una carpeta entera
dfCSV = spark.read.option("sep", ";").csv("datos.csv")
dfCSV = spark.read.option("header", "true").csv("datos.csv")
dfCSV = spark.read.option("header", True).option("inferSchema", True).csv("datos.csv")
dfCSV = spark.read.options(sep=";", header=True, inferSchema=True).csv("pdi_sales.csv")
dfCSV = spark.read.format("csv").load("datos.csv") 
dfCSV = spark.read.load(path="datos.csv", format="csv", header="true", sep=";", inferSchema="true")

``` 



**txt**
```<Python>
dfTXT = spark.read.text("datos.txt")
# cada fichero se lee entero como un registro
dfTXT = spark.read.option("wholetext", true).text("datos/")

dfTXT = spark.read.format("txt").load("datos.txt")
```

**JSON**
```<Python>
dfJSON = spark.read.json("datos.json")
dfJSON = spark.read.format("json").load("datos.json")
```

**parquet**
```<Python>
dfParquet = spark.read.parquet("datos.parquet")
dfParquet = spark.read.format("parquet").load("datos.parquet")
```

Mas información en la [documentación](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html) oficial

**AVRO**
La fuente de datos en formato Avro se incluye como un módulo externo, y por lo tanto, para poder leer o escribir datos en dicho formato, previamente hemos de cargar una librería.
Para ello, al arrancar PySpark, le pasaremos como parámetro `--packages org.apache.spark:spark-avro_2.12:3.3.1`:  

`pyspark --packages org.apache.spark:spark-avro_2.12:3.3.1`  

Una vez arrancado, ya podemos leer y escribir datos en formato Avro de forma similar al resto:  

```
df = spark.read.format("avro").load("datos.avro")
df.write.format("avro").save("archivo.avro")
```  

La librería también nos permite convertir columnas y estructuras de datos con las operaciones `to_avro()` y `from_avro()`. Más información en la [documentación oficial](https://spark.apache.org/docs/latest/sql-data-sources-avro.html).

**Actividad:** Con un fichero cualquiera en `.avro` probar el código anterior e imprimir el contenido de dicho fichero

### Persistiendo diferentes formatos  

Si lo que queremos es persistir los datos, en vez de `read`, utilizaremos `write` (de manera que obtenemos un `DataFrameWriter`) y si usamos la forma general usaremos el método `save`:

**CSV**
```<python>
dfCSV.write.csv("datos.csv")
dfCSV.write.format("csv").save("datos.csv")
dfCSV.write.format("csv").mode("overwrite").save("datos.csv")
```

**TXT**
```<python>
dfTXT.write.text("datos.txt")
dfTXT.write.option("lineSep",";").text("datos.txt")
dfTXT.write.format("txt").save("datos.txt")
```

**JSON**
```<python>
dfJSON.write.json("datos.json")
dfJSON.write.format("json").save("datos.json")
```

**Parquet**
```<python>
dfParquet.write.parquet("datos.parquet")
dfParquet.write.mode("overwrite").partitionBy("fecha").parquet("datos/")
dfParquet.write.format("parquet").save("datos.parquet")
```

Más información en la[ documentación oficial](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)

> Por cada partición, Spark generará un archivo de salida. Recuerda que podemos reducir el `número de particiones` mediante `coalesce` o `repartition`.

Una vez vista la sintaxis, vamos a ver un ejemplo completo de lectura de un archivo CSV (el archivo `pdi_sales.csv`) que está almacenado en HDFS y que tras leerlo, lo guardamos como JSON de nuevo en HDFS:

> **Usar DBFS en vez de HDFS**. El siguiente código esta hecho para HDFS. Modificarlo para que pueda ser ejecutado en DBFS. 

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframe-csv").getOrCreate()

# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("delimiter",";").option("header", "true").csv("dbfs:/FileStore/Notebook/pdi_sales.csv")

# df.printSchema()

df.write.json("dbfs:/FileStore/Notebook/pdi_sales_json")

### Comprimiendo los datos

Para configurar el algoritmo de compresión, si los datos están en Parquet o Avro, a nivel de la sesión de Spark, podemos realizar su configuración:

```<python>
spark.setConf("spark.sql.parquet.compression.codec","snappy")
spark.setConf("spark.sql.parquet.compression.codec","none")
spark.setConf("spark.sql.avro.compression.codec","snappy")
```

Si sólo queremos hacerlo para una operación en particular, para cada lectura/escritura le añadimos `.option("compression", "algoritmo")`. Por ejemplo:

In [0]:
dfVentas = spark.read.option("compression", "snappy").option("delimiter",";").option("header", "true").csv("dbfs:/FileStore/Notebook/pdi_sales.csv")
#dfClientes = spark.read.option("compression", "snappy").parquet("clientes.parquet")
#dfVentas.write.option("compression", "snappy").format("avro").save("ventas.avro")

## DATOS Y ESQUEMAS 

El esquema completo de un DataFrame se modela mediante un `StructType`, el cual contiene una colección de objetos `StructField`. Así pues, cada columna de un DataFrame de Spark se modela mediante un objeto `StructField` indicando su nombre, tipo y gestión de los nulos.

Hemos visto que al crear un DataFrame desde un archivo externo, podemos inferir el esquema. Si queremos crear un DataFrame desde un esquema propio utilizaremos los tipos `StructType`, `StructField`, así como `StringType`, `IntegerType` o el tipo necesario para cada columna. Para ello, primero hemos de importarlos (como puedes observar, estas clases pertenecen a las librerías SQL de PySpark):

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

> **Tipos**.  
> Además de cadenas y enteros, flotantes (`FloatType`) o dobles (`DoubleType`), tenemos tipos booleanos (`BooleanType`), de fecha (`DateType` y `TimestampType`), así como tipos complejos como `ArrayType`, `MapType` y `StructType`. Para más información, consultar la [documentación oficial](https://spark.apache.org/docs/latest/sql-ref-datatypes.html).

Volvamos al ejemplo anterior donde tenemos ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

In [0]:
clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000), 
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000)
]

Para esta estructura, definiremos un esquema con los campos, indicando para cada uno de ellos su nombre, tipo y si admite valores nulos:

In [0]:
esquema = StructType([
    StructField("nombre", StringType(), False),
    StructField("apellidos", StringType(), False),
    StructField("ciudad", StringType(), True),
    StructField("sueldo", IntegerType(), False)
])

A continuación ya podemos crear un DataFrame con datos propios que cumplen un esquema haciendo uso del método `createDataFrame`:

In [0]:
df = spark.createDataFrame(data=clientes, schema=esquema)
df.printSchema()

root
 |-- nombre: string (nullable = false)
 |-- apellidos: string (nullable = false)
 |-- ciudad: string (nullable = true)
 |-- sueldo: integer (nullable = false)



In [0]:
df.show(truncate=False)

+------+---------+----------+------+
|nombre|apellidos|ciudad    |sueldo|
+------+---------+----------+------+
|Aitor |Medrano  |Elche     |3000  |
|Pedro |Casas    |Elche     |4000  |
|Laura |García   |Elche     |5000  |
|Miguel|Ruiz     |Torrellano|6000  |
|Isabel|Guillén  |Alicante  |7000  |
+------+---------+----------+------+



Si lo que queremos es asignarle un esquema a un DataFrame que vamos a leer desde una fuente de datos externa, hemos de emplear el método `schema`:

```<Python>
dfClientes = spark.read.option("header", True).schema(esquema).csv("clientes.csv")
```

> **Rendimiento y esquema**  
> La inferencia de los tipos de los datos es un proceso computacionalmente costoso. Por ello, si nuestro conjunto de datos es grande, es muy recomendable crear el esquema de forma programativa y configurarlo en la carga de datos. Se recomienda la lectura del artículo [Using schemas to speed up reading into Spark DataFrames](https://t-redactyl.io/blog/2020/08/using-schemas-to-speed-up-reading-into-spark-dataframes.html).

> **Actividad** En otro notebook hacer el ejemplo que se muestra en el artículo que se sugiere arriba. La dataset se encuentra en [UCI Machine Learning Repository](https://archive.ics.uci.edu/dataset/339/taxi+service+trajectory+prediction+challenge+ecml+pkdd+2015) con el nombre `train.csv.zip`

In [0]:
# Importar librerias necesarias
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, BooleanType
from time import time

# Definir ubicacion del archivo csv
data_location = "dbfs:/FileStore/Notebook2/train.csv"

In [0]:


# Lectura de datos sin esquema
t1 = time()
data_inferred = spark.read.csv(data_location, header=True, inferSchema=True)
t2 = time()
print('Lectura completada en %s segundos.' % (str(t2 - t1)))


Lectura completada en 35.34104084968567 segundos.


In [0]:
# Definir un esquema explicito
schema = StructType([
    StructField("trip_id", StringType(), True),
    StructField("call_type", StringType(), True),
    StructField("origin_call", IntegerType(), True),
    StructField("origin_stand", IntegerType(), True),
    StructField("taxi_id", LongType(), True),
    StructField("timestamp", LongType(), True),
    StructField("day_type", StringType(), True),
    StructField("missing_data", BooleanType(), True),
    StructField("polyline", StringType(), True)
])

#Lectura de Datos con esquema definido
t1 = time()
data_schema = spark.read.csv(data_location, header=False, schema=schema)
t2 = time()
print('Lectura con esquema completada en %s segundos.' % (str(t2 - t1)))

Lectura con esquema completada en 0.39853787422180176 segundos.


Respecto al esquema, tenemos diferentes propiedades como columns, dtypes y schema con las que obtener su información:

In [0]:
df.columns

Out[33]: ['nombre', 'apellidos', 'ciudad', 'sueldo']

In [0]:
df.dtypes

Out[34]: [('nombre', 'string'),
 ('apellidos', 'string'),
 ('ciudad', 'string'),
 ('sueldo', 'int')]

In [0]:
df.schema

Out[35]: StructType([StructField('nombre', StringType(), False), StructField('apellidos', StringType(), False), StructField('ciudad', StringType(), True), StructField('sueldo', IntegerType(), False)])

Si una vez hemos cargado un DataFrame queremos cambiar el tipo de una de sus columnas, podemos hacer uso del método `withColumn`:

In [0]:
# Forma larga
from pyspark.sql.types import DoubleType
df = df.withColumn("sueldo", df.sueldo.cast(DoubleType()))

In [0]:
# Forma corta
df = df.withColumn("sueldo", df.sueldo.cast("double"))


In [0]:
#df = df.withColumn("fnac", to_date(df.Date, "M/d/yyy"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1839072699364289>:1[0m
[0;32m----> 1[0m df [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mfnac[39m[38;5;124m"[39m, [43mto_date[49m(df[38;5;241m.[39mDate, [38;5;124m"[39m[38;5;124mM/d/yyy[39m[38;5;124m"[39m))

[0;31mNameError[0m: name 'to_date' is not defined

> **Errores al leer datos**  
> Si tenemos un error al leer un dato que contiene un tipo no esperado, por defecto, Spark lanzará una excepción y se detendrá la lectura.  
> Si queremos que asigne los tipos a los campos pero que no los valide, podemos pasarle el parámetro extra verifySchema a False al crear un DataFrame mediante spark.createDataFrame o enforceSchema también a False al cargar desde una fuente externa mediante spark.read, de manera que los datos que no concuerden con el tipo se quedarán nulos, vacíos o con valor 0, dependiendo del tipo de dato que tiene asignada la columna en el esquema.  
`dfClientes = spark.read.option("header", True).option("enforceSchema",False).schema(esquema).csv("clientes.csv")`

## DATAFRAME API

Una vez tenemos un DataFrame podemos trabajar con los datos mediante un conjunto de operaciones estructuradas, muy similares al lenguaje relacional. Estas operaciones también se clasifican en transformaciones y acciones, recordando que las transformaciones utilizan una evaluación perezosa.  
Es muy importante tener en cuenta que todas las operaciones que vamos a realizar a continuación son immutables, es decir, nunca van a modificar el DataFrame sobre el que realizamos la transformación. Así pues, realizaremos encadenamiento de transformaciones (transformation chaining) o asignaremos el resultado a un nuevo DataFrame.

> **Preparación**
> Para los siguientes apartados, vamos a trabajar sobre el siguiente DataFrame con el fichero de [ventas](https://tajamar365.sharepoint.com/:x:/s/3405-MasterIA2024-2025/EW5mbeDvxFpKkePpayR4A6gBmrh_dpG54CxcqODEnQ7hMw?e=RixWQy) que se ha utilizado en casos anteriores:

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframes-api").getOrCreate()
# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("sep",";").option("header", "true").option("inferSchema", "true").csv("dbfs:/FileStore/Notebook/pdi_sales_small.csv")
df.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Country: string (nullable = true)



### Proyectando

La operación [select](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html) permite indicar las columnas a recuperar pasándolas como parámetros:

In [0]:
# Consulta de columnas
df.select("ProductID","Revenue").show(3)

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
+---------+-------+
only showing top 3 rows



También podemos realizar cálculos (referenciando a los campos con `nombreDataframe.nombreColumna`) sobre las columnas y crear un alias (operación asociada a un campo):

In [0]:
# Calculo y creación del alias
df.select(df.ProductID,(df.Revenue+10).alias("VentasMas10")).show(3)

+---------+-----------+
|ProductID|VentasMas10|
+---------+-----------+
|      725|      125.5|
|      787|      324.9|
|      788|      324.9|
+---------+-----------+
only showing top 3 rows



Si tenemos un DataFrame con un gran número de columnas y queremos recuperarlas todas a excepción de unas pocas, es más cómodo utilizar la transformación [drop](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.drop.html), la cual funciona de manera opuesta a `select`, es decir, indicando las columnas que queremos quitar del resultado:

In [0]:
# Obtenemos el mismo resultado
df.select("ProductID", "Date", "Zip")
df.drop("Units", "Revenue", "Country")

Out[48]: DataFrame[ProductID: int, Date: date, Zip: string]

### Trabajando con columnas

Para acceder a las columnas, debemos crear objetos [Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html). Para ello, podemos seleccionarlos a partir de un DataFrame como una propiedad o mediante la función `col`:

In [0]:
# nomCliente = df.nombre
# nomCliente = df["ProductID"]
# nomCliente = col("ProductID")

Así pues, podemos recuperar ciertas columnas de un DataFrame con cualquier de las siguientes expresiones:

In [0]:
from pyspark.sql.functions import col

df.select("ProductID", "Revenue").show()
df.select(df.ProductID, df.Revenue).show()
df.select(df["ProductID"], df["Revenue"]).show()
df.select(col("ProductID"), col("Revenue")).show()

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
|      940|  687.7|
|      396|  857.1|
|      734|  330.7|
|      769|  257.2|
|      499|  846.3|
|     2254|   57.7|
|       31|  761.2|
|      475|  970.2|
|      510|  837.1|
|      499|  883.0|
|      289|  866.0|
|      702|  286.1|
|      910|  414.7|
|      901|  818.9|
|      550|  404.0|
|      559|  585.6|
|      767|  105.0|
+---------+-------+
only showing top 20 rows

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
|      940|  687.7|
|      396|  857.1|
|      734|  330.7|
|      769|  257.2|
|      499|  846.3|
|     2254|   57.7|
|       31|  761.2|
|      475|  970.2|
|      510|  837.1|
|      499|  883.0|
|      289|  866.0|
|      702|  286.1|
|      910|  414.7|
|      901|  818.9|
|      550|  404.0|
|      559|  585.6|
|      767|  105.0|
+---------+-------+
only showing t

### col vs expr

En ocasiones se confunde el uso de la función [col ](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.col.html)con [expr](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html). Aunque podemos referenciar a una columna haciendo uso de `expr`, su uso provoca que se parseé la cadena recibida para interpretarla.

Para el siguiente ejemplo, supongamos que tenemos un DataFrame con datos de clientes. Utilizaremos también la función [concat_ws](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat_ws.html) para concatenar textos utilizado un separador.

In [0]:
df.printSchema()


root
 |-- ProductID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Zip: string (nullable = true)
 |-- Units: integer (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, concat_ws, expr

# Seleccionar columnas existentes y realizar operaciones
df.select(
    concat_ws(" ", col("Country"), col("Zip")).alias("CountryZip"),  # Concatenar Country y Zip
    col("Revenue"),                                                 # Mantener la columna Revenue
    expr("Revenue * 1.1").alias("AdjustedRevenue")                  # Calcular nueva columna Revenue ajustada
).show()


+--------------------+-------+------------------+
|          CountryZip|Revenue|   AdjustedRevenue|
+--------------------+-------+------------------+
|Germany 41540    ...|  115.5|127.05000000000001|
|Germany 41540    ...|  314.9|            346.39|
|Germany 41540    ...|  314.9|            346.39|
|Germany 22587    ...|  687.7| 756.4700000000001|
|Germany 22587    ...|  857.1| 942.8100000000001|
|Germany 22587    ...|  330.7|363.77000000000004|
|Germany 22587    ...|  257.2|            282.92|
|Germany 12555    ...|  846.3| 930.9300000000001|
|Germany 40217    ...|   57.7|63.470000000000006|
|Germany 40217    ...|  761.2| 837.3200000000002|
|Germany 13583    ...|  970.2|           1067.22|
|Germany 22337    ...|  837.1| 920.8100000000001|
|Germany 22337    ...|  883.0| 971.3000000000001|
|Germany 13587    ...|  866.0|             952.6|
|Germany 13587    ...|  286.1|314.71000000000004|
|Germany 13587    ...|  414.7|            456.17|
|Germany 13587    ...|  818.9| 900.7900000000001|


### Añadiendo columnas

Una vez tenemos un DataFrame, podemos añadir columnas mediante el método [withColumn](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html):

In [0]:
dfNuevo = df.withColumn("total", df.Units * df.Revenue)
dfNuevo.show()

+---------+----------+---------------+-----+-------+-------+------+
|ProductID|      Date|            Zip|Units|Revenue|Country| total|
+---------+----------+---------------+-----+-------+-------+------+
|      725|1999-01-15|41540          |    1|  115.5|Germany| 115.5|
|      787|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
|      788|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
|      940|1999-01-15|22587          |    1|  687.7|Germany| 687.7|
|      396|1999-01-15|22587          |    1|  857.1|Germany| 857.1|
|      734|2003-04-10|22587          |    1|  330.7|Germany| 330.7|
|      769|1999-02-15|22587          |    1|  257.2|Germany| 257.2|
|      499|1999-01-15|12555          |    1|  846.3|Germany| 846.3|
|     2254|1999-01-15|40217          |    1|   57.7|Germany|  57.7|
|       31|2002-05-31|40217          |    1|  761.2|Germany| 761.2|
|      475|1999-02-15|13583          |    1|  970.2|Germany| 970.2|
|      510|1999-01-15|22337          |    1|  83

> **withColumn**
> Anteriormente hemos utilizado el método [withColumn](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html) para cambiarle el tipo a un campo ya existente. Así pues, si referenciamos a una columna que ya existe, en vez de crearla, la sustituirá.

Otra forma de añadir una columna con una expresión es mediante la transformación [selectExpr](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.selectExpr.html). Por ejemplo, podemos conseguir el mismo resultado que en el ejemplo anterior de la siguiente manera:

In [0]:
df.selectExpr("*", "Units * Revenue as total").show()

+---------+----------+---------------+-----+-------+-------+------+
|ProductID|      Date|            Zip|Units|Revenue|Country| total|
+---------+----------+---------------+-----+-------+-------+------+
|      725|1999-01-15|41540          |    1|  115.5|Germany| 115.5|
|      787|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
|      788|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
|      940|1999-01-15|22587          |    1|  687.7|Germany| 687.7|
|      396|1999-01-15|22587          |    1|  857.1|Germany| 857.1|
|      734|2003-04-10|22587          |    1|  330.7|Germany| 330.7|
|      769|1999-02-15|22587          |    1|  257.2|Germany| 257.2|
|      499|1999-01-15|12555          |    1|  846.3|Germany| 846.3|
|     2254|1999-01-15|40217          |    1|   57.7|Germany|  57.7|
|       31|2002-05-31|40217          |    1|  761.2|Germany| 761.2|
|      475|1999-02-15|13583          |    1|  970.2|Germany| 970.2|
|      510|1999-01-15|22337          |    1|  83

Aunque más adelante veremos como realizar transformaciones con agregaciones, mediante `selectExpr` también podemos realizar analítica de datos aprovechando la potencia de SQL:

In [0]:
df.selectExpr("count(distinct(ProductID)) as productos","count(distinct(Country)) as paises").show()

+---------+------+
|productos|paises|
+---------+------+
|      799|     5|
+---------+------+



**Cambiando el nombre**  
Si por algún extraño motivo necesitamos cambiarle el nombre a una columna (por ejemplo, vamos a unir dos DataFrames que tienen columnas con el mismo nombre pero en posiciones diferentes, o que al inferir el esquema tenga un nombre críptico o demasiado largo y queremos que sea más legible) podemos utilizar la transformación [withColumnRenamed](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumnRenamed.html):

In [0]:
df.withColumnRenamed("Zip", "PostalCode").show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|     PostalCode|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      725|1999-01-15|41540          |    1|  115.5|Germany|
|      787|2002-06-06|41540          |    1|  314.9|Germany|
|      788|2002-06-06|41540          |    1|  314.9|Germany|
|      940|1999-01-15|22587          |    1|  687.7|Germany|
|      396|1999-01-15|22587          |    1|  857.1|Germany|
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



### Filtrando

Si queremos eliminar filas, usaremos el método [filter](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html):

In [0]:
df.filter(df.Country=="Germany").show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      725|1999-01-15|41540          |    1|  115.5|Germany|
|      787|2002-06-06|41540          |    1|  314.9|Germany|
|      788|2002-06-06|41540          |    1|  314.9|Germany|
|      940|1999-01-15|22587          |    1|  687.7|Germany|
|      396|1999-01-15|22587          |    1|  857.1|Germany|
|      734|2003-04-10|22587          |    1|  330.7|Germany|
|      769|1999-02-15|22587          |    1|  257.2|Germany|
|      499|1999-01-15|12555          |    1|  846.3|Germany|
|     2254|1999-01-15|40217          |    1|   57.7|Germany|
|       31|2002-05-31|40217          |    1|  761.2|Germany|
|      475|1999-02-15|13583          |    1|  970.2|Germany|
|      510|1999-01-15|22337          |    1|  837.1|Germany|
|      499|2002-06-05|22337          |    1|  883.0|Germany|
|      289|1999-02-15|13

Por similitud con SQL, podemos utilizar también `where` como un alias de `filter`:

In [0]:
df.where(df.Units>20).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|     2091|1999-05-15|9739           |   24| 3652.7|Mexico |
|     2091|1999-06-15|40213          |   41| 6240.1|Germany|
|     2091|1999-10-15|40213          |   41| 6347.7|Germany|
|     2091|1999-12-15|40213          |   23| 3560.9|Germany|
+---------+----------+---------------+-----+-------+-------+



Podemos utilizar los operadores lógicos (`&` para conjunción y `|` para la disyunción) para crear condiciones compuestas (recordad rodear cada condición entre paréntesis):

In [0]:
df.filter((df.Country=="Germany") & (df.Units>20)).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2091|1999-06-15|40213          |   41| 6240.1|Germany|
|     2091|1999-10-15|40213          |   41| 6347.7|Germany|
|     2091|1999-12-15|40213          |   23| 3560.9|Germany|
+---------+----------+---------------+-----+-------+-------+



In [0]:
df.filter((df.ProductID==2314) | (df.ProductID==1322)).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2314|1999-05-15|46045          |    1|   13.9|Germany|
|     1322|2000-01-06|75593 CEDEX 12 |    1|  254.5|France |
+---------+----------+---------------+-----+-------+-------+



Un caso particular de filtrado es la eliminación de los registros repetidos, lo cual lo podemos hacer de dos maneras:  

- Haciendo uso del método [distinct](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.distinct.html) tras haber realizado alguna transformación.  
- Utilizando [dropDuplicates](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropDuplicates.html) sobre un DataFrame:

In [0]:
df.select("Country").distinct().show()

+-------+
|Country|
+-------+
|Germany|
|France |
|Canada |
|Mexico |
| France|
+-------+



In [0]:
df.dropDuplicates(["Country"]).select("Country").show()

+-------+
|Country|
+-------+
|Germany|
|France |
|Canada |
|Mexico |
| France|
+-------+



### **Ordenando**  

Una vez recuperados los datos deseados, podemos ordenarlos mediante [sort](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sort.html) u [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) (son operaciones totalmente equivalentes):

In [0]:
df.select("ProductID","Revenue").sort("Revenue").show(5)

+---------+-------+
|ProductID|Revenue|
+---------+-------+
|     2314|   13.9|
|     1974|   52.4|
|     1974|   52.4|
|     1974|   52.4|
|     1974|   52.4|
+---------+-------+
only showing top 5 rows



In [0]:
df.sort("Revenue").show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2314|1999-05-15|46045          |    1|   13.9|Germany|
|     1974|1999-03-15|R3B            |    1|   52.4|Canada |
|     1974|1999-04-15|R3H            |    1|   52.4|Canada |
|     1974|1999-03-15|R3H            |    1|   52.4|Canada |
|     1974|1999-01-15|R3S            |    1|   52.4|Canada |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
df.sort("Revenue", ascending=True).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2314|1999-05-15|46045          |    1|   13.9|Germany|
|     1974|1999-03-15|R3B            |    1|   52.4|Canada |
|     1974|1999-04-15|R3H            |    1|   52.4|Canada |
|     1974|1999-03-15|R3H            |    1|   52.4|Canada |
|     1974|1999-01-15|R3S            |    1|   52.4|Canada |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
df.sort(df.Revenue.asc()).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|     2314|1999-05-15|46045          |    1|   13.9|Germany|
|     1974|1999-03-15|R3B            |    1|   52.4|Canada |
|     1974|1999-04-15|R3H            |    1|   52.4|Canada |
|     1974|1999-03-15|R3H            |    1|   52.4|Canada |
|     1974|1999-01-15|R3S            |    1|   52.4|Canada |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
# Ordenación descendente
df.sort(df.Revenue.desc()).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
df.sort("Revenue", ascending=False).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc
df.sort(desc("Revenue")).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
# Ordenación diferente en cada columna
df.sort(df.Revenue.desc(), df.Units.asc()).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



In [0]:
df.sort(["Revenue","Units"], ascending=[0,1]).show(5)

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows



Normalmente, tras realizar una ordenación, es habitual quedarse con un subconjunto de los datos. Para ello, podemos utilizar la transformación [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html).  
Por ejemplo, la siguiente transformación es similar al ejemplo anterior, sólo que ahora al driver únicamente le llegan 5 registros, en vez de traerlos todos y sólo mostrar 5:

In [0]:
df.sort(df.Revenue.desc(), df.Units.asc()).limit(5).show()

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1|France |
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0|France |
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8|France |
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0|France |
+---------+----------+---------------+-----+-------+-------+



### **Añadiendo filas**  

a única manera de añadir filas a un DataFrame es creando uno nuevo que sea el resultado de unir dos DataFrames que compartan el mismo esquema (mismo nombres de columnas y en el mismo orden). Para ello, utilizaremos la transformación [union](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html) que realiza la unión por el orden de las columnas:

In [0]:
nuevasVentas = [
    (6666, "2022-03-24", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-25", "03206", 22, 2222.22, "Spain"),
]

# Creamos un nuevo DataFrame con las nuevas ventas
nvDF = spark.createDataFrame(nuevasVentas)


In [0]:
# Unimos los dos DataFrames
dfUpdated = df.union(nvDF)

> **Trabajando con conjuntos**  
> Considerando dos DataFrames como dos conjuntos, podemos emplear las operaciones [union](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html), [intersect](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersect.html), [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) (mantiene los duplicados), [exceptAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.exceptAll.html) (mantiene los duplicados) y [subtract](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.subtract.html) .

### Cogiendo muestras  
Si necesitamos recoger un subconjunto de los datos, ya sea para preparar los datos para algún modelo de machine learning como para una muestra aleatoria de los mismos, podemos utilizar las siguientes transformaciones:  
- [sample](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.sample.html) permite obtener una muestra a partir de un porcentaje (no tiene porqué obtener una cantidad exacta). También admite un semilla e indicar si queremos que pueda repetir los datos.

In [0]:
df.count()                  # 120239

Out[74]: 120239

In [0]:
muestra = df.sample(0.10)

In [0]:
muestra.count()             # 11876

Out[76]: 12162

In [0]:
muestraConRepetidos = df.sample(True, 0.10)

In [0]:
muestraConRepetidos.count() # 11923

Out[78]: 12189

- [randomSplit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.randomSplit.html) recupera diferentes DataFrames cuyos tamaños en porcentaje se indican como parámetros (si no suman uno, los parámetros se normalizan):

In [0]:
dfs = df.randomSplit([0.8, 0.2])
dfEntrenamiento = dfs[0]

In [0]:
dfPrueba = dfs[1]

In [0]:
dfEntrenamiento.count()     # 96194

Out[81]: 96220

In [0]:
dfPrueba.count()            # 24045

Out[82]: 24019

## Trabajando con datos sucios

Hay tres formas de gestionar la suciedad de los datos o la omisión completa de los mismos:  
1. Eliminar las filas que tienen valores vacíos en una o más columnas.  
2. Rellenar los valores nulos con valores que definimos nosotros.  
3. Sustituir los datos erróneos por algún valor que sepamos cómo gestionarlo.

Vamos a ver cada uno de estos casos a partir del siguiente dataset:
  

In [0]:
malasVentas = [
    (6666, "2022-03-22", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-22", None, 33, 3333.33, "Spain"),
    (6666, "2022-03-23", "03206", None, 2222.22, "Spain"),
    (6666, "2022-03-24", "03206", None, None, "Espain"),
    (None, None, None, None, None, None)
]
malDF = spark.createDataFrame(malasVentas, ["ProductID", "Date", "Zip", "Units", "Revenue" , "Country"])
malDF.show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| null|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null| Espain|
|     null|      null| null| null|   null|   null|
+---------+----------+-----+-----+-------+-------+



Si queremos saber si una columna contiene nulos, podemos hacer un filtrado utilizando el método [isNull](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isNull.html) sobre los campos deseados (también podemos utilizar [isNotNull](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.isNotNull.html) si queremos el caso contrario):

In [0]:
malDF.filter(malDF.Zip.isNull()).show()

+---------+----------+----+-----+-------+-------+
|ProductID|      Date| Zip|Units|Revenue|Country|
+---------+----------+----+-----+-------+-------+
|     6666|2022-03-22|null|   33|3333.33|  Spain|
|     null|      null|null| null|   null|   null|
+---------+----------+----+-----+-------+-------+



Para trabajar con las filas que contengan algún dato nulo, podemos acceder a la propiedad `na`, la cual devuelve un [DataFrameNaFunctions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameNaFunctions.html) sobre la que podemos indicarle:  
- que la elimine mediante el método [drop](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameNaFunctions.drop.html) / [dropna](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropna.html). Puede recibir `"any"` (borrará las filas que contengan algún nulo) o `"all"` (borrará las filas que todas sus columnas contengan nulos) y una lista con las columnas a considerar:

In [0]:
# Elimina todos los nulos
malDF.na.drop().show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
+---------+----------+-----+-----+-------+-------+



In [0]:
# Elimina las filas que todas sus columnas son nulas
malDF.na.drop("all").show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| null|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null| Espain|
+---------+----------+-----+-----+-------+-------+



In [0]:
# Elimina las filas que tienen el Zip nulo
malDF.na.drop(subset=["Zip"]).show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null| Espain|
+---------+----------+-----+-----+-------+-------+



También podemos indicar la cantidad de valores no nulos que ha de contener cada fila para eliminarla mediante el parámetro `thresh`:

In [0]:
# Elimina las filas que tengan menos de 3 valores rellenados
malDF = malDF.dropna(thresh = 3)
malDF.show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| null|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null| Espain|
+---------+----------+-----+-----+-------+-------+



- que la rellene mediante el método [fill](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameNaFunctions.fill.html) / [fillna](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.fillna.html), indicando el valor y si queremos, sobre qué columnas aplicar la modificación:

In [0]:
# Rellenamos los zips vacíos por 99999
malDF.na.fill("99999", subset=["Zip"]).show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22|99999|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null| Espain|
+---------+----------+-----+-----+-------+-------+



- que la sustituya mediante el método [replace](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameNaFunctions.replace.html)

In [0]:
# Cambiamos Espain por Spain
malDF.na.replace("Espain", "Spain").show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|     6666|2022-03-22|03206|   33|3333.33|  Spain|
|     6666|2022-03-22| null|   33|3333.33|  Spain|
|     6666|2022-03-23|03206| null|2222.22|  Spain|
|     6666|2022-03-24|03206| null|   null|  Spain|
+---------+----------+-----+-----+-------+-------+



> **na.replace vs replace**  

La función `replace` se puede emplear dentro de la propiedad `na` o a partir del propio Dataframe.

Otro caso muy común es realizar una operación sobre una columna para transformar su valor, por ejemplo, pasar todo el texto a minúsculas o dividir una columna entre 100 para cambiar la escala.  
En nuestro caso, vamos a modificar las columnas Zip y Country para realizar un `trim` y borrar los espacios en blanco:

In [0]:
from pyspark.sql.functions import col, trim
df = df.withColumn("Country", trim(col("Country"))).withColumn("Zip", trim(col("Zip")))
df.show()

+---------+----------+-----+-----+-------+-------+
|ProductID|      Date|  Zip|Units|Revenue|Country|
+---------+----------+-----+-----+-------+-------+
|      725|1999-01-15|41540|    1|  115.5|Germany|
|      787|2002-06-06|41540|    1|  314.9|Germany|
|      788|2002-06-06|41540|    1|  314.9|Germany|
|      940|1999-01-15|22587|    1|  687.7|Germany|
|      396|1999-01-15|22587|    1|  857.1|Germany|
|      734|2003-04-10|22587|    1|  330.7|Germany|
|      769|1999-02-15|22587|    1|  257.2|Germany|
|      499|1999-01-15|12555|    1|  846.3|Germany|
|     2254|1999-01-15|40217|    1|   57.7|Germany|
|       31|2002-05-31|40217|    1|  761.2|Germany|
|      475|1999-02-15|13583|    1|  970.2|Germany|
|      510|1999-01-15|22337|    1|  837.1|Germany|
|      499|2002-06-05|22337|    1|  883.0|Germany|
|      289|1999-02-15|13587|    1|  866.0|Germany|
|      702|1999-02-15|13587|    1|  286.1|Germany|
|      910|1999-03-15|13587|    1|  414.7|Germany|
|      901|1999-02-15|13587|   

## Usando SQL

En la era del big data SQL es la lengua franca, permitiendo a perfiles con pocos conocimientos de programación trabajar de forma eficiente con los datos (siempre poniendo el foco en la analítica de datos, no en el procesamiento transaccional). Spark soporta el ANSI SQL 2003, ampliamente establecido en el mundo de las bases de datos. Para correr SQL en Spark podemos hacerlo a través de:  
- El cliente SQL, es cual se ofrece como un comando en `./bin/spark-sql`  
- Mediante un servidor ODBC/JDBC  
- De forma programativa mediante aplicaciones Spark.  

Las dos primeras opciones se integran con Apache Hive para utilizar su metastore. Ahora nos vamos a centrar en la última.

### Vistas temporales  
Ya hemos visto que los DataFrames tienen una estructura similar a una tabla de una base de datos relacional. Para poder realizar consultas, necesitaremos crear vistas temporales mediante el método [createTempView](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.createTempView.html) o [createOrReplaceTempView](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.createOrReplaceTempView.html) para posteriormente realizar una consulta sobre la vista creada a través de [spark.sql](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html):

In [0]:
# 1. definimos la vista
df.createOrReplaceTempView("ventas")

In [0]:
# 2. realizamos la consulta
ventasCanada = spark.sql("select * from ventas where trim(Country)='Canada'")
ventasCanada.show(3)

+---------+----------+---+-----+-------+-------+
|ProductID|      Date|Zip|Units|Revenue|Country|
+---------+----------+---+-----+-------+-------+
|      725|1999-01-15|H1B|    1|  115.4| Canada|
|     2235|1999-01-15|H1B|    2|  131.1| Canada|
|      713|1999-01-15|H1B|    1|  160.1| Canada|
+---------+----------+---+-----+-------+-------+
only showing top 3 rows



### Vistas globales  
Las vistas temporales tienen un alcance de SparkSession, de manera que desaparecen una vez finalice la sesión que ha creado la vista. Si necesitamos tener una vista que se comparta entre todas las sesiones y que permanezca viva hasta que la aplicación Spark finalice, podemos crear una vista temporal global mediante [createOrReplaceGlobalTempView](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.createOrReplaceGlobalTempView.html)

Estas vistas se almacenan en la base de datos `global_temp` y en las consultas es necesario poner el prefijo `global_temp` para acceder a sus vistas.

In [0]:
# 1. definimos la vista global
df.createOrReplaceGlobalTempView("ventasg")

In [0]:
# 2. realizamos la consulta
ventasCanadaG = spark.sql("select * from global_temp.ventasg where trim(Country)='Canada'")
ventasCanadaG.show(3)

+---------+----------+---+-----+-------+-------+
|ProductID|      Date|Zip|Units|Revenue|Country|
+---------+----------+---+-----+-------+-------+
|      725|1999-01-15|H1B|    1|  115.4| Canada|
|     2235|1999-01-15|H1B|    2|  131.1| Canada|
|      713|1999-01-15|H1B|    1|  160.1| Canada|
+---------+----------+---+-----+-------+-------+
only showing top 3 rows



In [0]:
# Creamos otra sesión y vemos como funciona
spark.newSession().sql("select count(*) from global_temp.ventasg where trim(Country)='Canada'").show()

+--------+
|count(1)|
+--------+
|   30060|
+--------+



### Eliminando vistas  
Para borrar una vista que hayamos creado, necesitamos acceder al Spark Catalog que veremos en una [sesión posterior(catalog#spark-sql-catalog), y utilizar el método [dropTempView](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.dropTempView.html) o [dropGlobalTempView](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Catalog.dropGlobalTempView.html) dependiendo del tipo de vista:

In [0]:
spark.catalog.dropTempView("ventas")
spark.catalog.dropGlobalTempView("ventasg")

Out[97]: True

## Trabajando con Databricks y SQL  
Vamos a cargar en `DataBase Tables` el csv `pdi_sales_small.csv`. Para ellos vamos a crear primero la tabla. Click en Catalog

![](https://community.cloud.databricks.com/files/Notebook2/1.png)

![](https://community.cloud.databricks.com/files/Notebook2/2.png)

![](https://community.cloud.databricks.com/files/Notebook2/3.png)

![](https://community.cloud.databricks.com/files/Notebook_2/4.png)

![](https://community.cloud.databricks.com/files/Notebook_2/5.png)

Para que funcione correctamente con nuestro datos, vamos a modificar el código:

```<Python>
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"
```

Y tras cargar el dataset, antes de crear la vista, vamos a limpiar los países:

```<python>
from pyspark.sql.functions import trim
df = df.withColumn("Country", trim(df.Country))
```

### Datos visuales

Si volvemos a ejecutar el cuaderno, ahora sí que cargará correctamente los datos. Si nos vamos a la celda que realiza una consulta sobre todos los datos, podemos ver en la parte superior derecha como el lenguaje empleado en la celda es SQL, por ello la primera línea comienza con %sql, y a continuación ya podemos introducir directamente código SQL, teniendo la opción de visualizar los datos tanto en modo texto como mediante gráficos:

![](https://community.cloud.databricks.com/files/Notebook_2/6.png)

![](https://community.cloud.databricks.com/files/Notebook_2/7.png)

![](https://community.cloud.databricks.com/files/Notebook_2/8.png)

![](https://community.cloud.databricks.com/files/Notebook_2/9.png)

![](https://community.cloud.databricks.com/files/Notebook_2/10.png)

![](https://community.cloud.databricks.com/files/Notebook_2/11.png)

### Cuadro de mandos

Además, con las tablas y/o gráficos que generamos dentro de Databricks, podemos generar un sencillo cuadro de mandos.

Vamos a crear un par de consultas, una para obtener las ventas medias por país:

```<sql>
%sql
select Country, avg(Revenue) as ventas
from pdi_sales_small_csv
group by Country
order by ventas desc
```

Y otra para las unidas pedidas por cada país:

```<sql>
%sql
select Country, sum(Units) as pedidos
from pdi_sales_small_csv
group by Country
order by pedidos desc
```

Si pulsamos sobre el icono del `+` , podemos añadir el resultado de la celda a un dashboard:

![](https://community.cloud.databricks.com/files/Notebook_2/12.png)

![](https://community.cloud.databricks.com/files/Notebook_2/13.png)

![](https://community.cloud.databricks.com/files/Notebook_2/14.png)

![](https://community.cloud.databricks.com/files/Notebook_2/15.png)

![](https://community.cloud.databricks.com/files/Notebook_2/16.png)

![](https://community.cloud.databricks.com/files/Notebook_2/17.png)

![](https://community.cloud.databricks.com/files/Notebook_2/18.png)

![](https://community.cloud.databricks.com/files/Notebook_2/19.png)

![](https://community.cloud.databricks.com/files/Notebook_2/20.png)

![](https://community.cloud.databricks.com/files/Notebook_2/21.png)

## Actividades

En las siguientes actividades vamos a familiarizarnos con el uso del API de DataFrames de Spark.  
1. A partir del archivo [nombres.json](https://tajamar365.sharepoint.com/:u:/s/3405-MasterIA2024-2025/ESw3v7IfZAFNqa7Kb05kuu0Bz7hbNQOvX2ZsbtGskFjuJA?e=UUJrbF), crea un DataFrame y realiza las siguientes operaciones:  


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date, year, monotonically_increasing_id

# Crear una sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Cargar el archivo JSON en un DataFrame
df = spark.read.json("dbfs:/FileStore/Notebook2/nombres.json")


a. Crea una nueva columna (columna `Mayor30`) que indique si la persona es mayor de 30 años.  


In [0]:
df = df.withColumn("Mayor30", col("Edad") > 30)


b. Crea una nueva columna (columna `FaltanJubilacion`) que calcule cuantos años le faltan para jubilarse (supongamos que se jubila a los 67 años).  


In [0]:
df = df.withColumn("FaltanJubilacion", lit(67) - col("Edad"))


c. Crea una nueva columna (columna `Apellidos`) que contenga XYZ (puedes utilizar la función [lit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lit.html))  


In [0]:
df = df.withColumn("Apellidos", lit("XYZ"))


d. Elimina las columna `Mayor30` y `Apellidos`.  


In [0]:
df = df.drop("Mayor30", "Apellidos")


e. Crea una nueva columna (columna `AnyoNac`) con el año de nacimiento de cada persona (puedes utilizar la función [current_date](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.current_date.html)).  


In [0]:
df = df.withColumn("AnyoNac", year(current_date()) - col("Edad"))


f. Añade un id incremental para cada fila (campo `Id`) y haz que al hacer un `show` se vea en primer lugar (puedes utilizar la función [monotonically_increasing_id](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.monotonically_increasing_id.html)) seguidos del `Nombre`, `Edad`, `AnyoNac`, `FaltaJubilacion` y `Ciudad`

In [0]:
df = df.withColumn("Id", monotonically_increasing_id())

# Reordenar las columnas para que 'Id' esté en primer lugar
columnas_ordenadas = ["Id", "Nombre", "Edad", "AnyoNac", "FaltanJubilacion", "Ciudad"]
df = df.select(columnas_ordenadas)


Al realizar los seis pasos, el resultado del DataFrame será similar a :

In [0]:
df.show()


+---+------+----+-------+----------------+--------+
| Id|Nombre|Edad|AnyoNac|FaltanJubilacion|  Ciudad|
+---+------+----+-------+----------------+--------+
|  0| Aitor|  45|   1979|              22|   Elche|
|  1|Marina|  14|   2010|              53|Alicante|
|  2| Laura|  19|   2005|              48|   Elche|
|  3| Sonia|  45|   1979|              22|    Aspe|
|  4| Pedro|null|   null|            null|   Elche|
+---+------+----+-------+----------------+--------+



![](https://community.cloud.databricks.com/files/Notebook_2/77.png)

2. A partir del archivo [VentasNulos.csv](https://tajamar365.sharepoint.com/:x:/s/3405-MasterIA2024-2025/EUP8X8fQMpNJv5DNBzu9cXUBNy5gSjYkjiFb5yyAfyvtfw?e=eruozp):  



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, min, lit

# Crear una sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Cargar el archivo CSV en un DataFrame
df = spark.read.option("header", True).option("inferSchema", True).csv("dbfs:/FileStore/Notebook2/VentasNulos.csv")


a. Elimina las filas que tengan al menos 4 nulos.  


In [0]:
df = df.dropna(thresh=4)


b. Con las filas restantes, sustituye:  
  i. Los nombres nulos por Empleado  
  

In [0]:
df = df.fillna({"Nombre": "Empleado"})


ii. Las ventas nulas por la media de las ventas de los compañeros (redondeado a entero).
      > **Agrupando:** Para obtener la media, aunque lo veremos en la próxima sesión, debes agrupar y luego obtener la media de la columna:  
      > `valor = df.groupBy().avg('Ventas')`   

    

In [0]:
# Calcular la media de las ventas
valor_ventas_media = df.groupBy().avg('Ventas').collect()[0][0]
valor_ventas_media_redondeado = int(round(valor_ventas_media))

# Rellenar los valores nulos en la columna 'Ventas' con la media redondeada
df = df.fillna({"Ventas": valor_ventas_media_redondeado})


iii. Los euros nulos por el valor del compañero que menos € ha ganado. (tras agrupar, puedes usar la función `min`)  
      

In [0]:
# Calcular el valor mínimo de la columna 'Euros'
valor_euros_minimo = df.groupBy().min('Euros').collect()[0][0]

# Rellenar los valores nulos en la columna 'Euros' con el valor mínimo
df = df.fillna({"Euros": valor_euros_minimo})


iv. La ciudad nula por `C.V`. y el identificador nulo por `XYZ`.  

Para los pasos ii) y iii)  puedes crear un DataFrame que obtenga el valor a asignar y luego pasarlo como parámetro al método para rellenar los nulos.

In [0]:
df = df.fillna({"Ciudad": "C.V.", "Identificador": "XYZ"})


Mostrar resultados

In [0]:
df.show()


+--------+------+-----+-----------+-------------+
|  Nombre|Ventas|Euros|     Ciudad|Identificador|
+--------+------+-----+-----------+-------------+
|    Pepe|     4|  200|      Elche|          X21|
|   Pedro|     1|   30|   Valencia|          R23|
|  Marina|     3|  350|       Aspe|          V55|
|Empleado|    10|  500|Crevillente|          AMV|
|     Ana|    10| 2300|   Alicante|          B89|
+--------+------+-----+-----------+-------------+



3.  A partir del archivo [movies.tsv](https://tajamar365.sharepoint.com/:u:/s/3405-MasterIA2024-2025/Ef-LbTw0tw1JgYAXrl-zdQQBw73LyG0oknmFzxbs01CL8w?e=AC8asA), crea una esquema de forma declarativa con los campos:
  
a. interprete de tipo string

b. pelicula de tipo string

c. anyo de tipo int



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Crear una sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Definir el esquema para el archivo movies.tsv
esquema = StructType([
    StructField("interprete", StringType(), True),
    StructField("pelicula", StringType(), True),
    StructField("anyo", IntegerType(), True)
])

# Cargar los datos en un DataFrame
df = spark.read.option("header", False).option("sep", "\t").schema(esquema).csv("dbfs:/FileStore/Notebook2/movies.tsv")


Cada fila del fichero implica que el actor/actriz ha trabajado en dicha película en el año indicado.  
a. Una vez creado el esquema, carga los datos en un DataFrame.  


In [0]:
# Cargar el archivo TSV en un DataFrame utilizando el esquema definido
df = spark.read.option("header", False).option("sep", "\t").schema(esquema).csv("dbfs:/FileStore/Notebook2/movies.tsv")


A continuación, mediante el DataFrame API:  
b. Muestra las películas en las que ha trabajado Murphy, Eddie (I).  


In [0]:
# Filtrar las películas en las que ha trabajado Murphy, Eddie (I)
peliculas_murphy = df.filter(df.interprete == "Murphy, Eddie (I)").select("pelicula")

# Mostrar el resultado
peliculas_murphy.show()


+--------------------+
|            pelicula|
+--------------------+
|            Showtime|
|              Norbit|
|Hot Tub Time Machine|
|Nutty Professor I...|
|Beverly Hills Cop II|
|      Trading Places|
|      Daddy Day Care|
|      Dr. Dolittle 2|
| Shrek Forever After|
|   Beverly Hills Cop|
|               Shrek|
| The Haunted Mansion|
|   Coming to America|
|             Shrek 2|
|     Doctor Dolittle|
| The Nutty Professor|
|               Mulan|
|         Tower Heist|
|          Dreamgirls|
|           Bowfinger|
+--------------------+
only showing top 20 rows



c. Muestra los intérpretes que aparecen tanto en Superman como en Superman II.

In [0]:
# Filtrar los intérpretes que han trabajado en Superman
interpretes_superman = df.filter(df.pelicula == "Superman").select("interprete")

# Filtrar los intérpretes que han trabajado en Superman II
interpretes_superman_ii = df.filter(df.pelicula == "Superman II").select("interprete")

# Encontrar los intérpretes que aparecen en ambas películas utilizando intersect
interpretes_comunes = interpretes_superman.intersect(interpretes_superman_ii)

# Mostrar el resultado
interpretes_comunes.show()


+------------------+
|        interprete|
+------------------+
|  O'Halloran, Jack|
|   Tucker, Burnell|
|  Hollis, John (I)|
|       Beatty, Ned|
|    Stamp, Terence|
|Ratzenberger, John|
|     Hackman, Gene|
|    Fielder, Harry|
|  Perrine, Valerie|
| McClure, Marc (I)|
|   Donner, Richard|
+------------------+

