# DataFrames en Spark

In [23]:
import findspark

findspark.init()

from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql.functions import col, countDistinct, desc, asc
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.master("local[*]").getOrCreate()

sc: SparkContext = spark.sparkContext

In [None]:
rdd = sc.parallelize([i for i in range(1000)]).map(lambda x: (x, x ** 2))
rdd.collect()

## Crear DF a partir de un RDD

In [None]:
# Crear un DF a partir de un RDD
df = rdd.toDF(["value", "value_squared"])
df.printSchema()

In [None]:
# Crear un DF especificando el esquema
rdd_names = sc.parallelize([(1, "John", 23.4), (2, "Mary", 18.3), (3, "Peter", 32.8), (4, "Ann", 68.5)])

# Forma 1

schema1 = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("balance", FloatType(), True),
])

df1 = spark.createDataFrame(rdd_names, schema=schema1)
df1.printSchema()

In [None]:
# Forma2
schema2 = "id INT, name STRING, balance FLOAT"
df2 = spark.createDataFrame(rdd_names, schema=schema2)
df2.printSchema()

## Crear DF a partir de diferentes fuentes de datos

In [None]:
# Archivo de texto

df3 = spark.read.text(paths=r"../../data/s7_data/dataTXT.txt")
df3.show(truncate=False)

In [None]:
# Archivo CSV

# df4 = spark.read.csv(path=r"../../data/s7_data/dataCSV.csv", header=True)
# O
df4 = spark.read.option("header", True).csv(path=r"../../data/s7_data/dataCSV.csv")
df4.printSchema()
df4.show()

In [None]:
# Archivo de texto con delimitador
df5 = spark.read.option("header", True).option("delimiter", "|").csv(path=r"../../data/s7_data/dataTab.txt")
df5.show()

In [None]:
# A partir de JSON proporcionando el esquema

schema_json = StructType([
    StructField("pais", StringType(), False),
    StructField("edad", StringType(), True),
    StructField("fecha", DateType(), True),
    StructField("color", StringType(), True),
])
df6 = spark.read.schema(schema_json).json(path=r"../../data/s7_data/dataJSON.json")
df6.show()

In [None]:
# A partir de un archivo Parquet

# df7 = spark.read.parquet(r"../../data/s7_data/dataParquet.parquet")
# O
df7 = spark.read.format("parquet").load(r"../../data/s7_data/dataParquet.parquet")
df7.show()

## Trabajo con columnas
Las operaciones estructuradas están diseñadas para ser más relacionales.

Las operaciones estructuradas **NO** son lazy (perezosas), es decir, se ejecutan inmediatamente.

Al igual que las operaciones con RDD, las operaciones estructuradas se dividen en dos categorías:

* **Transformaciones**: crean un nuevo DF a partir de uno existente.
* **Acciones**: devuelven un valor al programa controlador después de realizar un cálculo en el DF.

In [None]:
df = spark.read.parquet(r"../../data/s7_data/dataParquet_2.parquet")
df.show()

In [None]:
# Primera alternativa para trabajar con columnas
df.select("title").show()

In [None]:
# Segunda alternativa para trabajar con columnas
df.select(col("title")).show()

# Transformaciones

## Funciones select y selectExpr

* **select**: toma como argumentos una o varias columnas del dataframe y devuelve un nuevo dataframe con solo esas columnas seleccionadas.
* **selectExpr**: permite seleccionar columnas utilizando una sintaxis similar a la de SQL. Con esta función, podemos seleccionar columnas y realizar operaciones en ellas, como cálculos matemáticos o transformaciones de cadenas.

In [None]:
# select
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")
df.printSchema()

In [None]:
df.select(col("video_id")).show()

In [None]:
df.select("video_id", "trending_date").show()

In [None]:
# Esto arrojará un error
df.select(
    "likes",
    "dislikes",
    ("likes" - "dislikes").alias("acceptance"),
).show()

In [None]:
# Aquí la ventaja de utilizar la función col
df.select(
    col("likes"),
    col("dislikes"),
    (col("likes") - col("dislikes")).alias("acceptance"),
).show()

In [None]:
df.select(countDistinct("video_id").alias("distinct_videos")).show()

In [None]:
# Con selectExpr
df.selectExpr(
    "likes",
    "dislikes",
    "likes - dislikes as acceptance",
).show()

In [None]:
df.selectExpr("count(distinct video_id) as distinct_videos").show()

## Funciones filter y where

* **filter**: toma como argumento una expresión booleana y devuelve un nuevo dataframe que contiene solo los registros que cumplen con esa expresión.

* **where**:  funciona de manera similar a filter(), tomando también una expresión booleana como argumento. La principal diferencia es que where() es una función de conveniencia que se utiliza comúnmente para escribir consultas SQL en PySpark.

In [None]:
df.filter(col("video_id") == "YVfyYrEmzgM").show()

In [None]:
df.where(col("video_id") == "YVfyYrEmzgM").show()

In [None]:
df.select(col("video_id"), col("likes")).filter(col("likes") >= 5000).show()

In [None]:
df.select("video_id", "likes").where("likes >= 5000").show()

In [None]:
df.select(
    col("video_id"),
    col("likes"),
    col("dislikes"),
).filter(
    (col("likes") >= 5000) &
    (col("dislikes") >= 100)
).show()

## Funciones distinct y dropDuplicates

* **distinct**: se utiliza para obtener los valores únicos de una o varias columnas de un dataframe.
* **dropDuplicates**: también se utiliza para eliminar los registros duplicados de un dataframe y devolver un nuevo dataframe con solo los registros únicos, pero a diferencia de distinct(), esta función permite especificar las columnas que se deben considerar para determinar si un registro es duplicado o no.

In [None]:
# distinct
df_distinct = df.distinct()
df_distinct.count()

In [None]:
# dropDuplicates
df.dropDuplicates().count()

In [None]:
df_duplication = spark.createDataFrame([
    (1, "blue", 100),
    (1, "blue", 101),
    (2, "red", 300),
    (2, "red", 301),
    (3, "green", 500),
    (3, "green", 501),
    (4, "yellow", 700),
    (4, "yellow", 701),
    (5, "black", 900),
    (5, "black", 901),
]).toDF("id", "color", "price")
df_duplication.show()

In [None]:
# Al ser el precio diferente, no se considera duplicado
df_duplication.dropDuplicates(
    subset=["id", "color", "price"]
).show()

In [None]:
# Al solo tener en cuenta el id y el color, se considera duplicado
df_duplication.dropDuplicates(
    subset=["id", "color"]
).show()

## Funciones sort y orderBy

* **sort**: se utiliza para ordenar los datos de un RDD en función de una o varias columnas.
* **orderBy**: es un alias de sort().

In [None]:
# sort
df.sort(col("likes")).show()

In [None]:
# orderBy
df.orderBy(col("likes")).show()

In [None]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet").select(
    col("video_id"),
    col("title"),
    col("likes"),
    col("dislikes"),
    col("views"),
).dropDuplicates(subset=["video_id"])
df.show()

In [None]:
# sort
df.sort(desc(col("likes"))).show()
# df.sort(col("likes").desc()).show()

In [None]:
# orderBy
df.orderBy("likes", ascending=False).show()

In [None]:
df_duplication = spark.createDataFrame([
    (1, "blue", 100),
    (1, "blue", 101),
    (2, "red", 300),
    (2, "red", 301),
    (3, "green", 500),
    (3, "green", 501),
    (4, "yellow", 700),
    (4, "yellow", 701),
    (5, "black", 900),
    (5, "black", 901),
]).toDF("id", "color", "price")
df_duplication.show()

In [None]:
df_duplication.orderBy(
    col("color").asc(),
    col("price"),
).show()

In [None]:
# Función limit: devuelve un nuevo dataframe con un número determinado de registros.

# Top 10 videos con más vistas
top_10_views = df.orderBy(
    col("views").desc(),
).limit(10)
top_10_views.show()

## Funciones withColumn y withColumnRenamed

* **withColumn**: se utiliza para agregar una nueva columna a un DataFrame existente o para reemplazar una columna existente.
* **withColumnRenamed**: es una función que se utiliza para cambiar el nombre de una columna en un DataFrame de PySpark.

In [None]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")

In [None]:
# withColumn
df.withColumn("acceptance", col("likes") - col("dislikes")).show()

In [None]:
new_df = (
    df.
    withColumn("acceptance", col("likes") - col("dislikes")).
    withColumn("likes_percentage_relative_views", col("likes") / col("views") * 100)
)
new_df.select(
    col("video_id"),
    col("title"),
    col("acceptance"),
    col("likes_percentage_relative_views"),
).show()

In [None]:
# withColumnRenamed
df.withColumnRenamed("video_id", "video_code").show()

## Funciones drop, sample y randomSplit

* **drop**: se utiliza para eliminar una o varias columnas de un DataFrame.
* **sample**: se utiliza para obtener una muestra aleatoria de un DataFrame.
* **randomSplit**:  se utiliza para dividir un DataFrame en dos o más DataFrames de forma aleatoria, según un conjunto de pesos o fracciones.

In [None]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")

In [None]:
# drop
clean_df = df.drop("tags", "thumbnail_link", "video_error_or_removed")
clean_df.columns

In [None]:
# sample
df.sample(fraction=0.1).count()

In [None]:
df.sample(fraction=0.1, seed=123).count()

In [None]:
# withReplacement: indica si se permite que un mismo registro se seleccione más de una vez.
df.sample(fraction=0.1, seed=123, withReplacement=False).count()

In [None]:
# randomSplit
df_1, df_2 = df.randomSplit([0.7, 0.3], seed=123)  # 70% y 30% de los datos respectivamente (aproximadamente)
print(f"original size: {df.count()}")
print(f"df_1 size: {df_1.count()}")
print(f"df_2 size: {df_2.count()}")
print(f"df_1 + df_2 size: {df_1.count() + df_2.count()}")

## Trabajo con datos faltantes o incorrectos

Las dos formas más habituales de tratar los datos faltantes o incorrectos son:
* Eliminar las filas que tienen valores perdidos en una o más columnas.
* Llenar los valores perdidos con un valor determinado.


In [None]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")
df.count()

In [None]:
# Eliminar las filas.
# df.na.drop().count()
df.dropna(how="any").count()  # cuando al menos una columna tiene un valor perdido

In [None]:
df.dropna(how="all").count()  # cuando todas las columnas tienen un valor perdido

In [None]:
df.dropna(subset=["tags"]).count()  # cuando la columna tags tiene un valor perdido

In [None]:
# Llenar los valores perdidos con un valor determinado.
filled_df = df.orderBy(col("views").asc()).select(
    col("video_id"),
    col("title"),
    col("views"),
    col("likes"),
    col("dislikes"),
)

In [None]:
# Todas las columnas
filled_df.fillna(0).show()

In [None]:
# Ciertas columnas
filled_df.fillna(
    value=0,
    subset=[
        "views",
        "likes",
        "dislikes",
    ]
).show()

# Acciones sobre DataFrames en PySpark

* **show**: muestra los primeros n registros del DataFrame (20 por defecto).
* **take**: devuelve los primeros n registros del DataFrame en forma de lista.
* **head**: devuelve los primeros n registros del DataFrame en forma de lista.

In [None]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")

# Recordemos que las acciones son operaciones que devuelven un valor, no un DataFrame como en el caso de las transformaciones.

In [None]:
# show
df.show(
    n=5,
    truncate=False,
)

In [None]:
# take
df.take(
    num=5,
)

for row in df.take(5):
    row: Row
    print(f"video_id: {row.video_id}\ntitle: {row.title}\nlikes: {row.likes}\nviews: {row.views}\n")

In [None]:
# head
df.head(
    n=5
)

In [None]:
# collect: si los datos son muy grandes, puede provocar un error de memoria (OutOfMemoryError)
df.select(col("likes")).collect()

# Escritura de DataFrames en PySpark

Una instancia de la clase DataFrameWriter se puede obtener a partir de un DataFrame mediante el método write. Esta clase proporciona métodos para escribir los datos de un DataFrame en un formato determinado.
El patrón común para escribir un DataFrame es el siguiente:

    `df.write.format("formato").option("clave", "valor").partitionBy("columna").bucketBy(n, "columna").sortBy("columna").save("ruta")`

Donde:
* **format**("formato"): especifica el formato en el que se va a guardar el DataFrame. El argumento "formato" debe ser una cadena de texto que corresponda a uno de los formatos de archivo soportados por PySpark, como por ejemplo "parquet", "csv", "json", etc.

* **option**("clave", "valor"): permite establecer una opción específica para el formato de archivo seleccionado en el método anterior. Por ejemplo, si el formato es "parquet", una opción común es "compression" para especificar el tipo de compresión a utilizar. La "clave" es el nombre de la opción y el "valor" es el valor que se le asigna.

* **partitionBy**("columna"): divide el DataFrame en particiones de acuerdo con los valores únicos de una columna especificada. Esto es útil si se desea guardar los datos en diferentes directorios o archivos según los valores de una columna específica.

* **bucketBy**(n, "columna"): organiza los datos en "buckets" (o cubetas) de acuerdo con los valores de una columna. Esto es útil si se desea particionar los datos de una manera más eficiente para mejorar el rendimiento de ciertas consultas. El argumento "n" es el número de cubetas que se desea crear.

* **sortBy**("columna"): ordena los datos de acuerdo con los valores de una columna. Esto es útil si se desea que los datos estén ordenados de alguna manera específica antes de guardarlos.

* **save**("ruta"): finalmente, guarda el DataFrame en el sistema de archivos en la ruta especificada. La ruta puede ser un directorio o un archivo, dependiendo del formato seleccionado.

Existen varios modos de guardado:

* **append**: añade los datos al final de un archivo existente.
* **overwrite**: sobrescribe el archivo existente.
* **ignore**: no hace nada.
* **error**: lanza un error.
* **errorifexists**: lanza un error si el archivo existe.
* **default**: sobrescribe el archivo existente.



In [2]:
df = spark.read.parquet(r"../../data/s7_data/datos.parquet")

In [8]:
df.repartition(2)  # Establece el número de particiones del DataFrame

# El número de archivos generados es igual al número de particiones del DataFrame

# df.write.csv(r"../../stuff/save_csv.csv", sep="|")
# O
# df.write.format("csv").option("sep", "|").save(r"../../stuff/save2_csv.csv")

df.coalesce(1).write.format("csv").option("sep", "|").save(r"../../stuff/save3_csv.csv")

In [22]:
# Guardar un DF a la vez que se particiona
clean_df = df.filter(col("comments_disabled").isin("True", "False"))

clean_df.write.partitionBy("comments_disabled").parquet(r"../../stuff/save_parquet")

# Persistencia de DataFrames en PySpark

Al igual que los RDDs, los DataFrames también se pueden persistir en memoria o en disco para mejorar el rendimiento de las consultas que se realicen sobre ellos. Persistir un DF consume menos espacio que persistir un RDD.

Para ello, se utiliza el método persist, que acepta como argumento el nivel de persistencia que se desea utilizar. Los niveles de persistencia disponibles son:

* **MEMORY_ONLY**: persiste el DataFrame en memoria.
* **MEMORY_AND_DISK**: persiste el DataFrame en memoria y en disco.
* **MEMORY_ONLY_SER**: persiste el DataFrame en memoria utilizando serialización.
* **MEMORY_AND_DISK_SER**: persiste el DataFrame en memoria y en disco utilizando serialización.
* **DISK_ONLY**: persiste el DataFrame en disco.
* **MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.**: persiste el DataFrame en dos o más nodos.
* **OFF_HEAP**: persiste el DataFrame en memoria fuera del heap de Java.
* **NONE**: elimina la persistencia del DataFrame.

In [24]:
df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

df.persist(StorageLevel.MEMORY_ONLY)

df.show()

+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+



In [25]:
# Retirar la persistencia
df.unpersist()

DataFrame[id: bigint, value: string]