## MANIPULACIÓN DE DATOS (SPARK SQL EN DATABRICKS)

In [0]:
from pyspark.sql import SparkSession # Puerta de entrada para trabajar con spark <-- SIEMPRE DEBEMOS IMPORTAR LA LLAVE MAESTRA QUE INICIA TODO.
from pyspark.sql.functions import *  # Funciones propias del módulo SQL de Spark, para trabajar sobre Dataframes.

spark = SparkSession.builder.appName("DatabricksSparkSQL").getOrCreate()

### DATAFRAMES (Versión Databricks Free Edition)

#### Link de [Databrick Free Edition](https://dbc-89f542f8-2df6.cloud.databricks.com/?o=758509963140561)

Para abordar el tema de **Manipulación de Datos**, utilizaremos una de las principales herramientas que ofrece **Databricks: Spark SQL**.  
Esta herramienta nos permite aplicar conceptos del lenguaje SQL sobre conjuntos de datos estructurados, ya sea que provengan directamente de archivos o estén gobernados a través del servicio **Unity Catalog**.

Gracias a Spark SQL, podremos consultar, transformar y analizar datos de forma eficiente, aprovechando tanto la potencia del motor distribuido de Spark como la organización lógica que brinda Unity Catalog.


##### FASE 1. FUENTES DE DATOS

In [0]:
#### ➡️ Como se venía explicando, Unity Catalog trabaja bajo una jerarquía de Catálagos, Esquemas, Volumenes y Tablas.
####     Por ende, es diferente la manera de acceder a su contenido.

#====== 📌 LEER ARCHIVO CSV 
##📝 SINTAXIS:
#   dataset_nombre = spark.read.option("header","true").option("inferSchema","true").csv("/Volumes/NombreCatalago/NombreEsquema/NombreArchivo")

#---> spark.read : Permite leer un archivo basado en una ruta. Siempre instanciando primero SparkSession.
#---> .option("header","true||false") : Permite establecer si tomará en cuenta los encabezados del dataset. true(mostrar) | false(ocultar)
#---> .option("inferSchema","true||false") : Permite reconocer el tipo de dato de cada columna del dataset. true(mostrar) | false(ocultar)
#---> .csv(RutaArchivoCSV) : Tipo de archivo a leer (En este caso un CSV).

## 📝 EJEMPLO:
df_titanic_csv = spark.read.option("header","true").option("inferSchema","true").csv("/Volumes/explicacion_unity_catalog/ejemplos/csv_titanic")
# df_titanic_csv.show(5)

#====== 📌 LEER ARCHIVO PARQUET
##📝 SINTAXIS:
#   dataset_nombre = spark.read.parquet("/Volumes/NombreCatalago/NombreEsquema/NombreArchivoPARQUET")

#---> spark.read : Permite leer un archivo basado en una ruta. Siempre instanciando primero SparkSession.
#---> .parquet(RutaArchivoPARQUET) : Tipo de archivo a leer (En este caso un PARQUET).

## 📝 EJEMPLO
df_titanic_parquet = spark.read.parquet("/Volumes/explicacion_unity_catalog/ejemplos/parquet_titanic")
# df_titanic_parquet.show(5)

#====== 📌 LEER ARCHIVO DELTA
## 📝 SINTAXIS:
#   dataset_nombre = spark.read.format("delta").load("/Volumes/NombreCatalago/NombreEsquema/NombreArchivoDELTA")

#---> spark.read : Permite leer un archivo basado en una ruta. Siempre instanciando primero SparkSession.
#---> .format("delta") : stablece el tipo de archivo a leer (En este caso aplica siempre para DELTA)
#---> .load(RutaArchivoDelta) : Aplica solo a los archivos DELTA.

## 📝 EJEMPLO:
df_titanic_archivo_delta = spark.read.format("delta").load("/Volumes/explicacion_unity_catalog/ejemplos/archivo_delta_titanic")
# df_titanic_archivo_delta.show(5)

#====== 📌 LEER TABLA DELTA (RECOMENDADO Y NATIVO EN DATABRICKS-UNITY-CATALOG)
## 📝 SINTAXIS:
#   dataset_nombre = spark.sql("SELECT * FROM NombreCatalago.NombreEsquema.NombreTablaDelta")

#---> spark.sql() : Permite leer mediante lenguaje SQL una tabla delta. Siempre instanciando primero SparkSession.
#---> "NombreCatalago.NombreEsquema.NombreTablaDelta" : Ruta aplicable solo para las tablas delta y gobernadas por Unity Catalog

## 📝 EJEMPLO
df_titanic_delta = spark.sql("SELECT * FROM explicacion_unity_catalog.ejemplos.delta_titanic")
# df_titanic_delta.show(5)

##### FASE 2. EXPLORACIÓN INICIAL

Utilizaremos de ejemplo el dataset de titanic ( en formato tabla delta y CSV)

In [0]:
#====== A). Quitar encabezados originales provenientes del Dataset (Archivos CSV).
df_titanic_archivo_delta = spark.read.csv("/Volumes/explicacion_unity_catalog/ejemplos/csv_titanic") ## ⬅️ Archivo CSV
df_titanic_archivo_delta.show(3)
#### 🧠💡 Importante: Cuando leamos un archivo CSV, si no establecemos .option("header","true")
####                   el dataframe mostrará las columnas con un prefijo c_ seguido de una posición asignada.
####                   (Y los nombres de columnas serán parte de los registros del dataset)

In [0]:
#====== B). Mostrar los N primeros registros de un dataset.

df_titanic_delta = spark.sql("SELECT * FROM explicacion_unity_catalog.ejemplos.delta_titanic") ## ⬅️ Tabla delta

#### 📝 SINTAXIS: dataset_nombre.show(NúmeroDeRegistros)

#### 📝 EJEMPLO 1:
# df_titanic_delta.show(10) ## ⬅️ Mostrar los 10 primeros registros.

#### 📝 EJEMPLO 2:
# df_titanic_delta.show(5) ## ⬅️ Mostrar los 5 primeros registros.

#### 📝 EJEMPLO 3:
df_titanic_delta.show(3) ## ⬅️ Mostrar los 3 primeros registros.

In [0]:
#====== C). Mostrar los N últimos registros de un dataset (Retornado en una lista).

df_titanic_delta = spark.sql("SELECT * FROM explicacion_unity_catalog.ejemplos.delta_titanic") ## ⬅️ Tabla delta

#### 📝 SINTAXIS: dataset_nombre.tail(NúmeroDeRegistros)

#### 📝 EJEMPLO 1:
# df_titanic_delta.tail(10) ## ⬅️ Mostrar los 10 primeros últimos.

#### 📝 EJEMPLO 2:
# df_titanic_delta.tail(5) ## ⬅️ Mostrar los 5 primeros últimos.

#### 📝 EJEMPLO 3:
df_titanic_delta.tail(3) ## ⬅️ Mostrar los 3 primeros últimos.

In [0]:
#====== D). Mostrar el volúmen del dataset (Cantidad de filas y columnas respectivamente)

df_titanic_delta = spark.sql("SELECT * FROM explicacion_unity_catalog.ejemplos.delta_titanic") ## ⬅️ Tabla delta

#### 📝 SINTAXIS: 
#       dataset_nombre.count() ⬅️ Cantidad de filas
#       len([i for i in dataframe_nombre.columns]) ⬅️ Cantidad de columnas

#### 📝 EJEMPLO 1:
print(f"Cantidad de filas: {df_titanic_delta.count()}") ## ⬅️ Cantidad de filas.

#### 📝 EJEMPLO 2:
print(f"Cantidad de columnas: {len([i for i in df_titanic_delta.columns])}") ## ⬅️ Cantidad de columnas.

##### FASE 3. EXPLORACIÓN INICIAL

###### FASE 3.1. DATOS CUALITATIVOS

In [0]:
"""
    A). CONVERTIR A MAYÚSCULAS LOS DATOS CUALITATIVOS DE UNA COLUMNA EN UN DATASET
    
    📝 SINTAXIS:

        dataframe_nombre = dataframe_nombre.withColumn(
            "NombreColumnaNueva",
            upper(col("NombreColumnaCualitativa")) ⬅️ Función upper() integrada en Spark SQL.
        )
    ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe
    ### 💡 Importante: Siempre .withColumn() trabajará con al menos una columna del dataset.
"""

# 💡 EJEMPLO:
df_titanic_delta = df_titanic_delta.withColumn(
    "embark_town",
    upper(col("embark_town")) ## ⬅️ Convertimos a mayúsculas los datos de la columna cualitativa ""embark_town""
)
df_titanic_delta.show(3)

In [0]:
"""
    B). CONVERTIR A MINÚSCULAS LOS DATOS CUALITATIVOS DE UNA COLUMNA EN UN DATASET
    
    📝 SINTAXIS:

        dataframe_nombre = dataframe_nombre.withColumn(
            "NombreColumnaNueva",
            lower(col("NombreColumnaCualitativa")) ⬅️ Función lower() integrada en Spark SQL.
        )
    ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe
    ### 💡 Importante: Siempre .withColumn() trabajará con al menos una columna del dataset.
"""

# 💡 EJEMPLO:
df_titanic_delta = df_titanic_delta.withColumn(
    "embark_town",
    lower(col("embark_town")) ## ⬅️ Convertimos a minúsculas los datos de la columna cualitativa ""embark_town""
)
df_titanic_delta.show(3)

In [0]:
"""
    C). CONVERTIR A MAYÚSCULA LA PRIMERA LETRA DE CADA DATO CUALITATIVOS DE UNA COLUMNA EN UN DATASET
    
    📝 SINTAXIS:

        dataframe_nombre = dataframe_nombre.withColumn(
            "NombreColumnaNueva",
            initcap(col("NombreColumnaCualitativa")) ⬅️ Función initcap() integrada en Spark SQL.
        )
    ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe
    ### 💡 Importante: Siempre .withColumn() trabajará con al menos una columna del dataset.
"""

# 💡 EJEMPLO:
df_titanic_delta = df_titanic_delta.withColumn(
    "embark_town",
    initcap(col("embark_town")) ## ⬅️ Convertimos la primera letra en mayúscula de cada dato en la columna cualitativa ""embark_town""
)
df_titanic_delta.show(3)

In [0]:
"""
    D). EXTRAER DE CADA DATO CUALITATIVO UNA CADENA EN ESPECÍFICO MEDIANTE >>EXPRESIONES REGULARES<< EN UN DATASET
    
    📝 SINTAXIS:

        dataframe_nombre = dataframe_nombre.withColumn(
            "NombreColumnaNueva",
                regexp_extract(col("embark_town"),r'([A-Z])',1) ⬅️ Función regexp_extract() integrada en Spark SQL.
        )
    ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe
    ### 💡 Importante: Siempre .withColumn() trabajará con al menos una columna del dataset.
"""

# 💡 EJEMPLO 1: ⬅️ Extraemos primera letra en mayúscula 
# df_titanic_delta = df_titanic_delta.withColumn(
#     "PrimeraLetraEnMayúscula",
#     regexp_extract(col("embark_town"),r'([A-Z])',1)
# )
# df_titanic_delta.show(3)

# 💡 EJEMPLO 2: ⬅️ Extraemos primera letra en minúscula 
# df_titanic_delta = df_titanic_delta.withColumn(
#     "PrimeraLetraEnMinúscula",
#     regexp_extract(col("embark_town"),r'([a-z])',0)
# )
# df_titanic_delta.show(3)

# 💡 EJEMPLO 3: ⬅️ Retornamos todo después de una letra en mayúscula 
# df_titanic_delta = df_titanic_delta.withColumn(
#     "TodoDespuesDeUnaMayúscula",
#     regexp_extract(col("embark_town"),r'[A-Z]([a-z]+)', 1)
# )
# df_titanic_delta.show(3)

# 💡 EJEMPLO 4: ⬅️ Retornamos todo después de una letra en minúscula 
# df_titanic_delta = df_titanic_delta.withColumn(
#     "TodoDespuesDeUnaMinúscula",
#     regexp_extract(col("embark_town"),r'[a-z]([a-z]+)', 1)
# )
# df_titanic_delta.show(3)

# 💡 EJEMPLO 5: ⬅️ Retornamos todo después de un número
# df_titanic_delta = df_titanic_delta.withColumn(
#     "TodoDespuesDeUnNúmero",
#     regexp_extract(col("embark_town"),r'([0-9]+)', 1)
# )
# df_titanic_delta.show(3)


In [0]:
"""
    E). REEMPLAZAR VALORES DE DATOS CUALITATIVOS MEDIANTE >>EXPRESIONES REGULARES<< EN UN DATASET
    
    📝 SINTAXIS:
    
    
    ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe
    ### 💡 Importante: Siempre .withColumn() trabajará con al menos una columna del dataset.
"""
### DATASET DE PRUEBA PARA ESTE EJEMPLO
datos_sucios = {
    "nombre": [
        "  JUAN  pérez  ", 
        "MARÍA@@LÓPEZ", 
        "Ana--Martínez", 
        "carlos_rodríguez ", 
        "Sofía123 García"
    ],
    "categoria_producto": [
        "ELECTRÓNICA#1", 
        "ropa--Mujer", 
        "hogar_y_DECORACIÓN", 
        "LIBROS@@", 
        "Juguetes  niños"
    ],
    "codigo": [
        "ID-0001", 
        "ID-0023", 
        "CL-1234", 
        "ID-abc123", 
        "id-9999"
    ]
}
## Dataset de prueba (sucio)
df_test = spark.createDataFrame(data=list(zip(*datos_sucios.values())),schema=["nombre","categoria_producto","codigo"])
# df_test.show()

## Dataset de prueba (limpio)
df_test_clean = df_test 
# df_test_clean.show()

## 💡 EJEMPLO 1: ⬅️ Eliminar caractéres especiales (@@,#,--,_)
df_test_clean = df_test_clean.withColumns({
    "categoria_producto":
    regexp_replace(col("categoria_producto"),r'[^A-Za-z0-9ÁÉÍÓÚáéíóúÑñÜü]'," "),
    "nombre":
    regexp_replace(col("nombre"),r'[^A-Za-z0-9ÁÉÍÓÚáéíóúÑñÜü]',' ')
})
# df_test_clean.show()

## 💡 EJEMPLO 2: ⬅️ Normalizar espacios (Varios espacios en blanco a uno solo)
df_test_clean = df_test_clean.withColumns({
    "categoria_producto":
    regexp_replace(col("categoria_producto"),r'\s+'," "),
    "nombre":
    regexp_replace(col("nombre"),r'\s+',' ')
})
# df_test_clean.show()

## 💡 EJEMPLO 3: ⬅️ Reemplazar guiones por espacios en blanco
df_test_clean = df_test_clean.withColumn(
    "nombre",
    regexp_replace(col("nombre"),r'[-_]',' ')
)
# df_test_clean.show()

## 💡 EJEMPLO 4: ⬅️ Eliminar prefijos (ID- || CL- || id- || Letras)
df_test_clean = df_test_clean.withColumn(
    "codigo",
    regexp_replace(col("codigo"),r'^ID-|^CL-|^id-|[a-zA-Z]','')
)
# df_test_clean.show()

## 💡 EJEMPLO 5: ⬅️ Eliminar números dentro de cadenas
df_test_clean = df_test_clean.withColumns({
    "nombre":
    regexp_replace(initcap(col("nombre")),r'[0-9]',''),
    "categoria_producto":
    regexp_replace(initcap(col("categoria_producto")),r'[0-9]','')
})
df_test_clean.show()


In [0]:
"""
    F). ELIMINAR VALORES DUPLICADOS EN DATOS CUALITATIVOS DE UN DATASET
    
    📝 SINTAXIS:
    
        dataset_nombre.dropDuplicates(subset=[NombreColumna1,NombreColumnaN])
        
        subset: Establece el o los nombres de las columnas a eliminar duplicados,
                en caso no especifiquemos, los duplicados serán eliminados de la
                totalidadl de dataset.
        
    ### 🧠 En este caso, debemos trabajar sobre el mismo dataset/dataframe para eliminar los duplicados.
"""
### ✅🗃️ Dataset a utilizar: Diamonds
df_diamonds = spark.sql("SELECT * FROM workspace.exercises.df_diamonds")
# df_diamonds.show()
# df_diamonds.count() ## ⬅️ Cantidad de datos inicial: 53940

## 💡 EJEMPLO 1: ELIMINAR VALORES DUPLICADOS DE TODO EL DATASET
# df_diamonds = df_diamonds.dropDuplicates()
# df_diamonds.count() ## ⬅️ Cantidad de datos inicial: 53794

##💡 EJEMPLO 2: ELIMINAR VALORES DUPLICADOS DE UNA COLUMNA ESPECÍFICA
# df_diamonds[["carat"]].count() ## ⬅️ Cantidad inicial de datos: 53940
# df_diamonds = df_diamonds.dropDuplicates(subset=["carat"]) 
# df_diamonds[["carat"]].count() ## ⬅️ Cantidad de datos restantes: 273   

###### FASE 3.2. DATOS CUANTITATIVOS

In [0]:
df_penguins = spark.sql("SELECT * FROM workspace.exercises.penguins")
df_penguins.show(3)

In [0]:
"""
    A). EXPLORACIÓN BÁSICA Y RESUMEN ESTADÍSTICO
        
        📝 Para los datos cuantitativos podemos obtener su estadística básica
            mediante .decribe() el cuál retornará el valor de las columnas float o int.
            En los campos cualitativos se visualizará información no creíble.
"""
df_penguins.describe().show()                            ## ⬅️ Estadística simple - Todas las columnas
df_penguins.select(col("body_mass_g")).describe().show() ## ⬅️ Estadística simple - Columna body_mass_g
df_penguins.select(mean(col("body_mass_g"))).show()      ## ⬅️ Media estadística - Columna body_mass_g
df_penguins.select(median(col("body_mass_g"))).show()    ## ⬅️ Mediana estadística - Columna body_mass_g
df_penguins.select(min(col("body_mass_g"))).show()       ## ⬅️ Valor mínimo - Columna body_mass_g
df_penguins.select(max(col("body_mass_g"))).show()       ## ⬅️ Valor máximo (IMC Gramos)
df_penguins.select(sum(col("body_mass_g"))).show()       ## ⬅️ Suma total - Columna body_mass_g
df_penguins.select(stddev(col("body_mass_g"))).show()    ## ⬅️ Desviación estándar - Columna body_mass_g
df_penguins.select(variance(col("body_mass_g"))).show()  ## ⬅️ Varianza - Columna body_mass_g



In [0]:
"""
    B). VERIFICAR VALORES NULOS EN LA(S) COLUMNA(AS) DEL DATASET
"""

## 💡 EJEMPLO 1: VERIFICAR CANTIDAD DE VALORES NULOS EN TODAS LAS COLUMNAS DEL DATASET
## ⬅️ Retorna la cantidad de datos nulos por columna.
cantidad_nulos = df_penguins.select([
    sum(when(col(i).isNull(),1).otherwise(0)).alias(i)
    for i in df_penguins.columns
])
# cantidad_nulos.show()

## 💡 EJEMPLO 2: VERIFICAR CANTIDAD DE VALORES NULOS EN UNA COLUMNA ESPECÍFICA DEL DATASET
##⬅️ Retorna la cantidad de datos nulos de una columna.
cantidad_nulos_bmg = df_penguins.select([
    sum(when(col(i).isNull(),1).otherwise(0)).alias(i)
    for i in df_penguins.columns
    if i=="body_mass_g"
])
cantidad_nulos_bmg.show()

In [0]:
"""
    C). FILTRANDO VALORES EN COLUMNAS CUANTITATIVAS
"""

print("Total de datos registrados: ",df_penguins.count()) ## ⬅️ Cantidad de datos del dataset : 344

## 💡 EJEMPLO 1: FILTRAR VALORES DE COLUMNA "body_mass_g" MAYOR A 3000
df_penguins_bmg_mayor_3000 = df_penguins.filter(
    col("body_mass_g")>3000
)
# df_penguins_bmg_mayor_3000.show(3)
print("Total de datos filtro 1: ",df_penguins_bmg_mayor_3000.count()) ## ⬅️ Cantidad de datos del dataset : 331

## 💡 EJEMPLO 2: FILTRAR VALORES DE COLUMNA "body_mass_g" ENTRE 1000 y 3000
df_penguins_bmg_entre_1000_3000 = df_penguins.filter(
    (col("body_mass_g")>=1000) &
    (col("body_mass_G")<=3000)
)
# df_penguins_bmg_entre_1000_3000.show(3)
print("Total de datos filtro 2: ",df_penguins_bmg_entre_1000_3000.count()) ## ⬅️ Cantidad de datos del dataset : 11

## 💡 EJEMPLO 3: FILTRAR VALORES NULOS EN COLUMNA "body_mass_g"
df_penguins_nulos_body_mass_g = df_penguins.filter(
    col("body_mass_g").isNull()
)
# df_penguins_nulos_body_mass_g.show(3)
print("Total de datos filtro 3: ",df_penguins_nulos_body_mass_g.count()) ## ⬅️ Cantidad de datos del dataset : 2

## 💡 EJEMPLO 4: FILTRAR VALORES NO NULOS EN COLUMNA "body_mass_g"
df_penguins_no_nulos_body_mass_g = df_penguins.filter(
    col("body_mass_g").isNotNull()
)
# df_penguins_no_nulos_body_mass_g.show(3)
print("Total de datos filtro 4: ",df_penguins_no_nulos_body_mass_g.count()) ## ⬅️ Cantidad de datos del dataset : 342


In [0]:
"""
    D). RELLENANDO DATOS Null DE LAS COLUMNAS DE UN DATASET
    📝 SINTAXIS:
        df_penguins = df_penguins.fillna(value=ValorARellenar,subset=NombreColumna)
        ### 🧠 Importante: En este caso, los valores null deben rellenarse con datos
                            del mismo tipo que la columna. Por ende, debemos llenar 
                            individualmante.
"""
### ➡️ Verificar datos nulos
cantidad_nulos = df_penguins.select([
    sum(when(col(i).isNull(),1).otherwise(0)).alias(i)
    for i in df_penguins.columns
])
# cantidad_nulos.show()

### ➡️ Verificar datos nulos que han sido rellenados.
df_penguins = df_penguins.fillna(value=15.55,subset=["body_mass_g"])
# df_penguins.show()

cantidad_nulos.show() ##  ✅Datos nulos rellanados correctamente.


##### FASE 4. AGRUPAMIENTO DE INFORMACIÓN

El agrupamiento de la información nos brinda el resumen de la misma proveniente 
de un dataset, logrando encontrar ciertos patrones en cada grupo de información

In [0]:
"""
    📝 SINTAXIS:

        dataset_nuevo_agrupamiento= dataset_nombre.groupBy(["NombreColumna1","NombreColumna2]).agg(
            funciónAgregación(col("NombreColumna")).alias("NuevoNombreColumna")
        )
        
        ### 🧠 En este caso, debemos almacenar en una variable los cambios a realizar en el dataframe.

❎ En el agrupamiento de información, podemos utilizar diversas funciones de agregación, tales como:

💡.min():    ⬅️ Permite obtener el mínimo valor de la información agrupada.
💡.max():    ⬅️ Permite obtener el máximo valor de la información agrupada.
💡.sum():    ⬅️ Permite sumar la información de una columna cuantitativa por la información agrupada.
💡.count():  ⬅️ Permite contar la cantidad de información de una columna por la información agrupada.
💡.mean():   ⬅️ Permite obtener la media de una columna cuantitativa por la información agrupada.
💡.median(): ⬅️ Permite obtener la mediana de una columna cuantitativa por la información agrupada.
"""
###✔️ Usaremos el dataset de "penguins"
df_penguins = spark.sql("SELECT * FROM workspace.exercises.penguins") ## ⬅️ Tabla Delta
# df_penguins.show() 

In [0]:
## 💡 EJEMPLO 1: (AGRUPANDO POR UNA COLUMNA) 
df_agrupado_uno = df_penguins.groupBy(["species"]).agg(
    round(mean(col("body_mass_g")),2).alias("PromedioBMG") ## ⬅️ Hallamos la media de la columna "body_mass_g"
)
# df_agrupado_uno.show()

## 💡 EJEMPLO 2: (AGRUPAR POR DOS COLUMNAS)
df_agrupado_dos = df_penguins.groupBy(["species","sex"]).agg(
    round(mean(col("body_mass_g")),2).alias("Species_Sex_PromedioBMG")
)
# df_agrupado_dos.show()

## 💡 EJEMPLO 3: (AGRUPAR LA INFORMACIÓN POR LA CANTIDAD DE LA MISMA COLUMNA)

df_agrupado_tres = df_penguins.groupBy(["species"]).agg(
    count(col("species")).alias("Cantidad") ## ⬅️ .count() : función clave para traer la cantidad de datos de la columna.
)
df_agrupado_tres.show()

##### FASE 4. CÁLCULOS MÓVILES EN PYSPARK - DATABRICKS 

Así como SQL SERVER permite realizar cálculos móviles basados en funciones de agregación
(min,max,mean,entre otros), Pyspark lo realiza mediante el módulo de pypspark.sql.function 
simulando el lenguaje T-SQL en su estructura.

In [0]:
"""
    💡 Importante: Para realizar el cálculo movil utilizaremos la función .select() para
                    crear la columna móvil. Además, en Pyspark debemos definir una configuración
                    de la ventana deslizante en el caso de cálculos móviles mediante: 
                    >>> from pyspark.sql.window import Window <<< para utilizarlo en .over()
    📝 SINTAXIS: 
        1. Cálculo Móvil sin Particionamiento: Permite el cálculo móvil de una columna que no es afectada por otra(s).

            from pyspark.sql.window import Window
            dataset_nombre_calculos_moviles = dataset_nombre_original ### ⬅️ Copiamos el dataset original
            ⬇️ Configuración de la ventana 
            window_configuracion = Window.orderBy(NombreColumna1,NombreColumnaN) ### ⬅️ Ordenamos por una columna
            dataset_nombre_calculos_moviles = dataset_nombre_calculos_moviles.select( ### ⬅️ Función .select()
                col(NombreColumna1),col(NombreColumna2), ### ⬅️ Seleccionamos las columnas necesarias
                funcionAgregacion(col(NombreColumnaCalculo)).over(window_configuracion).alias(NombreColumnaCalculoMovil)
            )

        2. Cálculo Móvil con Particionamiento: Permite el cálculo móvil de una columna que es afectada por otra(s).

            from pyspark.sql.window import Window
            dataset_nombre_calculos_moviles = dataset_nombre_original ### ⬅️ Copiamos el dataset original
            ⬇️ Configuración de la ventana 
            window_configuracion = Window.partitionBy(NombreColumna1,NombreColumnaN) ### ⬅️ Particionamos por una columna
                                         .orderBy(NombreColumna1,NombreColumnaN) ### ⬅️ Ordenamos por una columna
                                         .rowsBetween(CantidadFilasAtras,FilaActual) ### ⬅️ Filas que formarán
                                                                                            parte del cálculo móvil.
                ### EXPLICACIÓN DE ROWSBETWEEN
                .rowsBetween(-2,0): En este caso, el 0 indica utilizar la fina actual.
                .rowsBetween(-2,1): Cualquier otro número que no sea 0 indica que se utilizará la fina actual.

            dataset_nombre_calculos_moviles = dataset_nombre_calculos_moviles.select( ### ⬅️ Función .select()
                col(NombreColumna1),col(NombreColumna2), ### ⬅️ Seleccionamos las columnas necesarias
                funcionAgregacion(col(NombreColumnaCalculo)).over(window_configuracion).alias(NombreColumnaCalculoMovil)
            )
"""
### 💹 USAREMOS EL SIGUIENTE DATASET DE EJEMPLO:
df_ventas = spark.sql("SELECT * FROM workspace.default.ventas_sac")
df_ventas.show(3)


In [0]:
### 💡 EJEMPLO 1: CÁLCULO MÓVIL ➡️ VENTAS ACUMULATIVAS (SUMA ACUMULADA SIN PARTICIONES)
from pyspark.sql.window import Window
df_ventas_acumuladas = df_ventas
window_ejemplo_1 = Window.orderBy("nro_venta") ## ⬅️ Como no particionamos información, necesitamos ordenarlas.
df_ventas_acumuladas = df_ventas_acumuladas.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(sum(col("total")).over(window_ejemplo_1),2).alias("ventas_acumuladas")
)
# df_ventas_acumuladas.show(6)

### 💡 EJEMPLO 2: CÁLCULO MÓVIL ➡️ VENTAS ACUMULATIVAS (SUMA ACUMULADA CADA 3 DIAS) (✅ INCLUYE FILA ACTUAL)
df_ventas_acumuladas_3_dias = df_ventas
window_ejemplo_2 = Window.orderBy("nro_venta").rowsBetween(-2,0) ###⬅️ Configuración de la ventana a crear.
df_ventas_acumuladas_3_dias = df_ventas_acumuladas_3_dias.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(sum(col("total")).over(window_ejemplo_2),2).alias("ventas_acumuladas_3_dias")
) 
# df_ventas_acumuladas_3_dias.show(6) ### (✅ Incluye la fiLa actual - ❌No particionamos).

### 💡 EJEMPLO 3: CÁLCULO MÓVIL ➡️ VENTAS ACUMULATIVAS (SUMA ACUMULADA CADA 3 DIAS) (❌ NO INCLUYE FILA ACTUAL)
df_ventas_acumuladas_3_dias_sin_fila_actual = df_ventas
window_ejemplo_3 = Window.orderBy("nro_venta").rowsBetween(-3,-1) ###⬅️ Configuración de la ventana a crear.
df_ventas_acumuladas_3_dias_sin_fila_actual = df_ventas_acumuladas_3_dias_sin_fila_actual.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(sum(col("total")).over(window_ejemplo_3),2).alias("ventas_acumuladas_3_dias")
) 
# df_ventas_acumuladas_3_dias_sin_fila_actual.show(6) ### (❌ No incluye la fiLa actual - ❌No particionamos).

### 💡 EJEMPLO 4: CÁLCULO MÓVIL ➡️ VENTAS ACUMULATIVAS (SUMA ACUMULADA CON PARTICIONES)
df_ventas_acumuladas_particionadas = df_ventas
window_ejemplo_4 = Window.partitionBy("cliente").rowsBetween(Window.unboundedPreceding,0) ## ⬅️ Configuración ventana ✅
##window_ejemplo_4=Window.partitionBy("cliente") ## ⬅️ Configuración ventana ❌ (Arroja el total final acumulado en cada fila)
df_ventas_acumuladas_particionadas = df_ventas_acumuladas_particionadas.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(sum(col("total")).over(window_ejemplo_4),2).alias("ventas_acumuladas")
)
# df_ventas_acumuladas_particionadas.show(5) ## ✅ .over() permite el particionamiento junto a la window_especificación

#==========================================================================================================================
### 💡 EJEMPLO 5: CÁLCULO MÓVIL ➡️ PROMEDIO MÓVIL (PROMEDIO CADA 3 DÍAS) (✅ INCLUYE FILA ACTUAL)
df_ventas_promedio_3_dias = df_ventas
window_ejemplo_5 = Window.orderBy("nro_venta").rowsBetween(-2,0)
df_ventas_promedio_3_dias = df_ventas_promedio_3_dias.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(avg(col("total")).over(window_ejemplo_5),2).alias("promedio_ventas_3_dias")
)
# df_ventas_promedio_3_dias.show(5)

### 💡 EJEMPLO 6: CÁLCULO MÓVIL ➡️ PROMEDIO MÓVIL (PROMEDIO CADA 3 DÍAS) (❌ NO INCLUYE FILA ACTUAL)
df_ventas_promedio_3_dias_sin_fila_actual = df_ventas
window_ejemplo_6 = Window.orderBy("nro_venta").rowsBetween(-3,-1)
df_ventas_promedio_3_dias_sin_fila_actual = df_ventas_promedio_3_dias_sin_fila_actual.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    round(avg(col("total")).over(window_ejemplo_6),2).alias("promedio_ventas_3_dias")
)
# df_ventas_promedio_3_dias_sin_fila_actual.show(5)

#========================================================================================================================
### 💡 EJEMPLO 7: CÁLCULO MÓVIL ➡️ MÁXIMO MÓVIL (VALOR MÁXIMO CADA 3 DÍAS) (✅ INCLUYE FILA ACTUAL)
df_ventas_maximo_3_dias = df_ventas
window_ejemplo_7 = Window.orderBy("nro_venta").rowsBetween(-2,0)
df_ventas_maximo_3_dias = df_ventas_maximo_3_dias.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    max(col("total")).over(window_ejemplo_7).alias("valor_maximo_3_dias")
)
# df_ventas_maximo_3_dias.show(5)
### 💡 EJEMPLO 8: CÁLCULO MÓVIL ➡️ MÁXIMO MÓVIL (VALOR MÁXIMO CADA 3 DÍAS) (❌ NO INCLUYE FILA ACTUAL)
df_ventas_maximo_3_dias_sin_fila_actual = df_ventas
window_ejemplo_8 = Window.orderBy("nro_venta").rowsBetween(-3,-1)
df_ventas_maximo_3_dias_sin_fila_actual = df_ventas_maximo_3_dias_sin_fila_actual.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    max(col("total")).over(window_ejemplo_8).alias("valor_maximo_3_dias")
)
# df_ventas_maximo_3_dias_sin_fila_actual.show(5)

#=======================================================================================================================
### 💡 EJEMPLO 9: CÁLCULO MÓVIL ➡️ MÍNIMO MÓVIL (VALOR MÍNIMO CADA 3 DÍAS) (✅ INCLUYE FILA ACTUAL)
df_ventas_minimo_3_dias = df_ventas
window_ejemplo_9 = Window.orderBy("nro_venta").rowsBetween(-2,0)
df_ventas_minimo_3_dias = df_ventas_minimo_3_dias.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    min(col("total")).over(window_ejemplo_9).alias("valor_minimo_3_dias")
)
# df_ventas_minimo_3_dias.show(5)

### 💡 EJEMPLO 10: CÁLCULO MÓVIL ➡️ MÍNIMO MÓVIL (VALOR MÍNIMO CADA 3 DÍAS) (❌ NO INCLUYE FILA ACTUAL)
df_ventas_minimo_3_dias_sin_fila_actual = df_ventas
window_ejemplo_10 = Window.orderBy("nro_venta").rowsBetween(-3,-1)
df_ventas_minimo_3_dias_sin_fila_actual = df_ventas_minimo_3_dias_sin_fila_actual.select(
    "*", ## ⬅️ De esta manera seleccionamos todas las columnas
    min(col("total")).over(window_ejemplo_10).alias("valor_minimo_3_dias")
)
df_ventas_minimo_3_dias_sin_fila_actual.show(5)


##### FASE 5. COMBINAR Y UNIR DATASETS/DATAFRAMES

A diferencia de Pandas y Polars, PySpark permite la unificación de datasets/dataframes mediante tres formas: 

    - Utilizando Join  (Unión columnar - Horizontalmente)
    - Utilizando union (Unión a nivel de filas - vertifcalmente) ✅ Más riguroso
    - Utilizando unionByName (Unión a nivel de filas - vertifcalmente) ✅ Más flexible

###### FUNCIÓN JOIN

In [0]:
"""
    📝 SINTAXIS: 

        dataset_unificado_nombre=dataset_nombre_1.join(other=dataset_nombre_2,on='ColumnaEnComun',
                                how='left || rigth || inner || full || semi || anti || cross')                                
                                
        ==================================================================================================  

    ### 💡Importante: Join permite unificar los datasets/dataframes a nivel columnar, es decir,
                       unifica horizontalmente las columnas de un dataset/dataframe A; con las columnas
                       de un dataset/dataframe B; gracias a una columna en común que tienen ambos 
                       conjuntos de datos.  
"""
### ✅ Utilizaremos este dataset de ejemplo:
import numpy as np

diccionario_uno = {
    "ID_Cliente":[1,2,3,4,5],
    "Nombre": ["Pepito","Juanito","Pedrito","Brayan","Carlos"],
    "Departamento":["LAS QUINTANAS","EL GOLF","BUENOS AIRES","SAN ANDRÉS","CALIFORNIA"]
}
diccionario_dos = {
    "ID_Cliente":[1,1,2,2,5,4,1,None],
    "Ventas":np.random.uniform(low=1450.25,high=1980.30,size=8).round(2).tolist()
}

df_uno = spark.createDataFrame(data=list(zip(*diccionario_uno.values())),schema=list(diccionario_uno.keys()))
df_dos = spark.createDataFrame(data=list(zip(*diccionario_dos.values())),schema=list(diccionario_dos.keys()))

diccionario_tres = {
    "Cliente_ID":[1,1,2,2,5,4,1,None],
    "Ventas":np.random.uniform(low=1450.25,high=1980.30,size=8).round(2).tolist()
}
df_tres = spark.createDataFrame(data=list(zip(*diccionario_tres.values())),schema=list(diccionario_tres.keys()))

In [0]:

#### 💡 EJEMPLO 1 (UNIFICACIÓN DE DATAFRAMES [INNER]) 
df_ejemplo_1 = df_uno.join(other=df_dos,on="ID_Cliente",how="inner")
# df_ejemplo_1.count() ## Cantidad de registros: 7
# df_ejemplo_1.show(7)
## [NO SE ENCUENTRA EL CLIENTE 3] ➡️ INNER MANTIENE LA INFORMACIÓN RELACIONADA Y EXISTENTE ENTRE AMBOS DATASETS

#### 💡 EJEMPLO 2 (UNIFICACIÓN DE DATAFRAMES [LEFT])
df_ejemplo_2 = df_uno.join(other=df_dos,on="ID_Cliente",how="left")
# df_ejemplo_2.count() ## Cantidad de registros: 8
# df_ejemplo_2.show(8)
## [SE ENCUENTRA EL CLIENTE 3] ➡️ LEFT MANTIENE LA INFORMACIÓN DEL LADO IZQUIERDO (df_uno) 
## [ASÍ NO EXISTA ALGÚN REGISTRO EN EL LADO DERECHO (df_dos)]

#### 💡 EJEMPLO 3 (UNIFICACIÓN DE DATAFRAMES [RIGHT])
df_ejemplo_3 = df_uno.join(other=df_dos,on="ID_Cliente",how="right")
# df_ejemplo_3.count() ## Cantidad de registros: 8
# df_ejemplo_3.show(8)
## [SE ENCUENTRA EL CLIENTE 3] ➡️ RIGHT MANTIENE LA INFORMACIÓN DEL LADO DERECHO (df_dos) 
## [ASÍ NO EXISTA ALGÚN REGISTRO EN EL LADO IZQUIERDO (df_uno)]

#### 💡 EJEMPLO 4 (UNIFICACIÓN DE DATAFRAMES [FULL] o [OUTER]) ⬅️ AMBOS REALIZAN LO MISMO
df_ejemplo_4 = df_uno.join(other=df_dos,on="ID_Cliente",how="full")
# df_ejemplo_4.count() ## Cantidad de registros: 9
# df_ejemplo_4.show(9)
## ➡️ FULL u OUTER MANTIENE LA INFORMACIÓN DE AMBOS DATAFRAMES.

#### 💡 EJEMPLO 5 (UNIFICACIÓN DE DATAFRAMES [CROSS])
df_ejemplo_5 = df_uno.join(other=df_dos,how="cross")
# df_ejemplo_5.count() ## Cantidad de registros: 40
# df_ejemplo_5.show()
## ➡️ CROSS GENERA TODAS LAS COMBINACIONES POSIBLES DE LAS FILAS DEL df_uno CON LAS FILAS DEL df_dos. 
##     (YA NO NECESITAMOS UTILIZAR EL PARÁMETRO "on=")

# #### 💡 EJEMPLO 6 (UNIFICACIÓN DE DATAFRAMES [INNER] CON DIFERENTES NOMBRES)
##---- PREVIAMENTE DEBEMOS CAMBIAR EL NOMBRE DE LA COLUMNA DE UNO DE LOS DATAFRAMES
#      PARA MANTENER LA INTEGRIDAD
# df_tres = df_tres.withColumnRenamed(existing="Cliente_ID",new="ID_Cliente")
# df_tres.show(5) ⬅️ NOMBRE CAMBIADO CORRECTAMENTE
df_ejemplo_6 = df_uno.join(other=df_tres,how="inner",on="ID_Cliente")
# df_ejemplo_6.count() ## Cantidad de registros: 7
# df_ejemplo_6.show(7)

#### 💡 EJEMPLO ANTI (PERMITE DEVOLVER LAS FILAS DE df_uno QUE NO SE ENCUENTRAN EN df_dos)
df_ejemplo_anti = df_uno.join(other=df_dos,how="anti",on="ID_Cliente")
# df_ejemplo_anti.count() ## Cantidad de registros: 1
# df_ejemplo_anti.show()


#### 💡 EJEMPLO SEMI (PERMITE DEVOLVER LAS FILAS DE df_uno EXISTENTES EN df_dos)
####        PERO DEVOLVIENDO SOLO LAS FILAS DE df_uno
df_ejemplo_semi = df_uno.join(other=df_dos,how="semi",on="ID_Cliente")
# df_ejemplo_semi.count() ## Cantidad de registros: 1
# df_ejemplo_semi.show()


###### FUNCIÓN UNION, UNIONALL Y UNIONBYNAME

In [0]:
"""
    📝 SINTAXIS: 

        1. Union : Permite unificar datasets/dataframes a nivel de fila (Por posiciones de las columnas).

            📝 SINTAXIS:

                dataset_nombre_unificado_final = dataset_nombre_uno.union(other=dataset_nombre_dos) 

        2. UnionALL : Permite unificar datasets/dataframes a nivel de fila.

            📝 SINTAXIS:

                dataset_nombre_unificado_final = dataset_nombre_uno.unionAll(other=dataset_nombre_dos) 

        1. UnionByName : Permite unificar datasets/dataframes a nivel de fila (Por nombres de las columnas).

            📝 SINTAXIS:

                dataset_nombre_unificado_final = dataset_nombre_uno.unionByName(other=dataset_nombre_dos,
                                                 allowMissingColumns=True|False)
                💡 El parámetro "allowMissingColumns" si está en True, permite que las columnas de un 
                   dataset que no hagan match con las de otro dataset se vuelvan columnas independientes.
                   Sin embargo, si se establece en False ambos datasets deben tener la misma cantidad,
                   tipo y nombre en las columnas.
        ==================================================================================================  

    ### 💡Importante: Union,UnionAll y UnionByName permite unificar los datasets/dataframes a nivel fila,
                       es decir, unifica verticalmente las columnas de un dataset/dataframe A; con las 
                       columnas de un dataset/dataframe B; gracias a la(s) columna(s) en común que tienen
                       ambos conjuntos de datos.
                       
                       Debemos tener en cuenta que en ambos datasets/dataframes se debe tener la misma cantidad
                       tipos y orden en las columnas, evitando ciertas inconsistencias en el dataset final.
                       Además, en ninguna de as 3 funciones presentadas anteriormente elimina duplicados a menos
                       que utilicemos .distinc() o .dropDuplicates() según la casuística correspondiente. 
"""
### ✅ Utilizaremos estos dataset de ejemplo:
import numpy as np
diccionario_uno = {
    "Categorias":["ELECTRODOMÉSTICOS","TECNOLOGÍA","JUGUETES","DEPORTE","CULTURA"],
    "Año": [2022,2022,2023,2023,2023],
    "Ventas":[3565.16, 3723.17, 3614.21, 3552.18, 3572.76]
}

diccionario_dos = {
    "Categorias":["ELECTRODOMÉSTICOS","TECNOLOGÍA","JUGUETES","DEPORTE","CULTURA"],
    "Año": [2024,2024,2024,2025,2025],
    "Ventas":np.random.uniform(low=2150.25,high=4580.30,size=5).round(2).tolist()
}

diccionario_tres = {
    "Cat":["ELECTRODOMÉSTICOS","TECNOLOGÍA","JUGUETES","DEPORTE","CULTURA"],
    "Años": [2024,2024,2024,2025,2025],
    "Vent":np.random.uniform(low=2150.25,high=4580.30,size=5).round(2).tolist()
}

diccionario_cuatro = {
    "Cat":["ELECTRODOMÉSTICOS","TECNOLOGÍA","JUGUETES","DEPORTE","CULTURA"],
    "Años": [2024,2024,2024,2025,2025],
    "Vent":np.random.uniform(low=2150.25,high=4580.30,size=5).round(2).tolist()
}

diccionario_cinco = {
    "Categorias":["ELECTRODOMÉSTICOS","TECNOLOGÍA","JUGUETES","DEPORTE","CULTURA"],
    "Año": [2022,2022,2023,2023,2023],
    "Ventas":[3565.16, 3723.17, 3614.21, 3552.18, 3572.76]
}
df_uno = spark.createDataFrame(data=list(zip(*diccionario_uno.values())),schema=list(diccionario_uno.keys()))
df_dos = spark.createDataFrame(data=list(zip(*diccionario_dos.values())),schema=list(diccionario_dos.keys()))
df_tres = spark.createDataFrame(data=list(zip(*diccionario_tres.values())),schema=list(diccionario_tres.keys()))
df_cuatro = spark.createDataFrame(data=list(zip(*diccionario_cuatro.values())),schema=list(diccionario_cuatro.keys()))
df_cinco = spark.createDataFrame(data=list(zip(*diccionario_cinco.values())),schema=list(diccionario_cinco.keys()))

In [0]:

#### ➡️ UNION:
## ------ Datasets con las mismas características en las columnas (Nombre,Tipo,Cantidad) 
df_ejemplo_1 = df_uno.union(other=df_dos)
# df_ejemplo_1.show()

## ------ Datasets con las mismas características en Tipo y Cantidad (Nombres diferentes) 
df_ejemplo_2 = df_uno.union(other=df_tres)
# df_ejemplo_2.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_3 = df_uno.union(other=df_cinco) ## NO ELIMINAMOS DUPLICADOS
# df_ejemplo_3.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_4 = df_uno.union(other=df_cinco).dropDuplicates()
df_ejemplo_4.show() ## ELIMINAMOS DUPLICADOS


In [0]:

#### ➡️ UNIONALL:
## ------ Datasets con las mismas características en las columnas (Nombre,Tipo,Cantidad) 
df_ejemplo_1 = df_uno.unionAll(other=df_dos)
# df_ejemplo_1.show()

## ------ Datasets con las mismas características en Tipo y Cantidad (Nombres diferentes) 
df_ejemplo_2 = df_uno.unionAll(other=df_tres)
# df_ejemplo_2.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_3 = df_uno.unionAll(other=df_cinco) ## NO ELIMINAMOS DUPLICADOS
# df_ejemplo_3.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_4 = df_uno.unionAll(other=df_cinco).dropDuplicates()
# df_ejemplo_4.show() ## ELIMINAMOS DUPLICADOS

In [0]:

#### ➡️ UNIONBYNAME:
## ------ Datasets con las mismas características en las columnas (Nombre,Tipo,Cantidad) 
df_ejemplo_1 = df_uno.unionByName(other=df_dos,allowMissingColumns=True)
# df_ejemplo_1 = df_uno.unionByName(other=df_dos,allowMissingColumns=False) ## Mismo resultado por las caracteristicas de las columnas.
# df_ejemplo_1.show()

## ------ Datasets con las mismas características en Tipo y Cantidad (Nombres diferentes) 
# df_ejemplo_2 = df_uno.unionByName(other=df_tres,allowMissingColumns=False) ## Error por los nombres diferentes de columnas.
df_ejemplo_2 = df_uno.unionByName(other=df_tres,allowMissingColumns=True) ## Columnas diferentes se vuelven columnas independientes. 
# df_ejemplo_2.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_3 = df_uno.unionByName(other=df_cinco,allowMissingColumns=True) ## NO ELIMINAMOS DUPLICADOS
# df_ejemplo_3 = df_uno.unionByName(other=df_cinco,allowMissingColumns=False) ## Mismo resultado por las caracteristicas de las columnas.
# df_ejemplo_3.show()

## ------ Datasets con datos duplicados y mismas características en las columnas(Nombres,Tipo,Cantidad) 
df_ejemplo_4 = df_uno.unionByName(other=df_cinco,allowMissingColumns=True).dropDuplicates()

## Mismo resultado por las caracteristicas de las columnas.
# df_ejemplo_4 = df_uno.unionByName(other=df_cinco,allowMissingColumns=False).dropDuplicates() 
# df_ejemplo_4.show() ## ELIMINAMOS DUPLICADOS

##### FASE 6. EXPORTACIÓN DE DATASETS/DATAFRAMES

En PySpark dentro de Databricks, la exportación de datos cumple el mismo rol crítico: cerrar el ciclo de manipulación y transformación de datos asegurando que la información ya procesada quede disponible para su consumo, integración o almacenamiento.Sin embargo, la diferencia clave es que, bajo Unity Catalog, toda la gestión de datos se centraliza en un catálogo/esquema/volumen-tabla que permite una seguridad y gobernanza en la información. Los siguientes formatos para exportar los datasets serán: CSV, PARQUET, DELTA TABLE y ARCHIVO DELTA.

In [0]:
### 💡 UTLIZAREMOS DE EJEMPLO ESTE DATASET
data = {
    "id": [1, 2, 3, 3, None],
    "nombre": ["Ana", "Luis", "Karla", "Karla", "Pedro"],
    "edad": [23, 35, 29, 29, None]
}
df = spark.createDataFrame(data=list(zip(*data.values())),schema=list(data.keys()))
# df.show()

In [0]:
from pyspark.sql.types import IntegerType
### ✅ Realizaremos una Limpieza básica
df = df.dropna()                 # eliminar filas con valores nulos
df = df.drop_duplicates()        # eliminar duplicados
df = df.withColumn(
    "id",
    col("id").cast(dataType=IntegerType()) # castear columna a entero
)
df = df.withColumnsRenamed({"nombre":"Nombre","edad":"Edad"})

In [0]:
### 💡CREAREMOS UN CATALAGO y UN ESQUEMA PARA ALMACENAR LOS ARCHIVOS A EXPORTAR.
##----1. CREAMOS EL CATALAGO
spark.sql("CREATE CATALOG IF NOT EXISTS catalago_exportacion_archivos")
print("Catalago creado correctamente.")
##----2. CREAMOS EL ESQUEMA
spark.sql("CREATE SCHEMA IF NOT EXISTS catalago_exportacion_archivos.esquema_exportacion_archivos")  
print("Esquema creado correctamente.")

In [0]:
##---- ✅🗃️ EXPORTANDO A FORMATO CSV (COMMA SEPARATED VALUES) [CREAMOS UN VOLUMEN INDEPENDIENTE]
## 1. CREAMOS EL VOLUMEN:
spark.sql("CREATE VOLUME IF NOT EXISTS catalago_exportacion_archivos.esquema_exportacion_archivos.volumen_csv")
print("Volumen CSV creado correctamente.")
## 2. EXPORTAMOS/ALMACENAMOS EL ARCHIVO EN EL VOLUMEN:
df.write.option("header","true").option("inferSchema","true").mode('overwrite').csv("/Volumes/catalago_exportacion_archivos/esquema_exportacion_archivos/volumen_csv")
print("Se exportó el dataframe en modo CSV correctamente.")

In [0]:
##---- ✅🗃️ EXPORTANDO A FORMATO PARQUET (FORMATO COLUMNAR OPTIMIZADO) [CREAMOS UN VOLUMEN INDEPENDIENTE]
## 1. CREAMOS EL VOLUMEN:
spark.sql("CREATE VOLUME IF NOT EXISTS catalago_exportacion_archivos.esquema_exportacion_archivos.volumen_parquet")
print("Volumen PARQUET creado correctamente.")
## 2. EXPORTAMOS/ALMACENAMOS EL ARCHIVO EN EL VOLUMEN:
df.write.mode('overwrite').parquet("/Volumes/catalago_exportacion_archivos/esquema_exportacion_archivos/volumen_parquet")
print("Se exportó el dataframe en modo PARQUET correctamente.")

In [0]:
##---- ✅🗃️ EXPORTANDO A FORMATO ARCHIVO DELTA (ARCHIVO FÍSICO DE DATOS QUE NO ES GOBERNADO POR UNITY CATALOG COMO ENTIDAD) [CREAMOS UN VOLUMEN INDEPENDIENTE]
## 1. CREAMOS EL VOLUMEN:
spark.sql("CREATE VOLUME IF NOT EXISTS catalago_exportacion_archivos.esquema_exportacion_archivos.volumen_archivo_delta")
print("Volumen de ARCHIVO DELTA creado correctamente.")
## 2. EXPORTAMOS/ALMACENAMOS EL ARCHIVO EN EL VOLUMEN:
df.write.format('delta').mode("overwrite").save("/Volumes/catalago_exportacion_archivos/esquema_exportacion_archivos/volumen_archivo_delta")
print("Se exportó el dataframe en modo ARCHIVO DELTA correctamente.")

In [0]:
##---- ✅🗃️ EXPORTANDO A FORMATO DELTA TABLE (TABLA OPTIMIZADA PARA SU USO EFICIENTE EN LECTURA/ESCRITURA DE DATOS)
#  [CREAMOS DIRECTAMENTE LA TABLA RELACIONADA A UN CATÁLAGO Y ESQUEMA]
## 1. EXPORTAMOS/ALMACENAMOS EL ARCHIVO DIRECTAMENTE EN UNA TABLA
df.write.format("delta").mode("overwrite").saveAsTable("catalago_exportacion_archivos.esquema_exportacion_archivos.delta_table")
print("Se exportó el dataframe a una DELTA TABLE correctamente.")

###### PUEDES REVISAR LA IMAGEN OutPutExportacionDatosDatabricks.png PARA VER LA ESTRUCTURA DE ESTA FASE.