# Sesión 2 Databricks - Introducción a Apache Spark: RDDs, DataFrames, Datasets y Spark SQL


Todo el contenido de esta sesión puede ser amplicado con: 
- https://docs.databricks.com/aws/en/
- https://spark.apache.org/docs/latest/api/python/index.html
- https://learn.microsoft.com/en-us/azure/databricks/notebooks/best-practices

## Tipos de  celdas

En Databricks, podemos crear celdas de lops siguientes tipos:
* Markdown
* Python
* R
* Scala
* SQL
* Bash

In [0]:
# %scala
# object HelloWorld {
#   def main(args: Array[String]): Unit = {
#     println("Hello, World!")
#   }
# }

In [0]:
%sh
ls -la /

In [0]:
# %r
# x <- sqrt(18)
# print(x)

In [0]:
print("Hola mundo!")

## dbutils

dbutils es una utilidad interna de Databricks que proporciona comandos para interactuar con el entorno de ejecución de notebooks. Es muy útil para tareas como manipular archivos en DBFS, trabajar con secretos, ejecutar notebooks dentro de otros notebooks y más.

In [0]:
dbutils.help()

### dbutils.secrets
Para usar secretos, por detrás se puede almacenar en 
* Azure Key Vault.
* Base de datos administrada por Databricks.
* AWS Secrets Manager (IAM Rol configurado para poder acceder).

In [0]:
dbutils.secrets.help()

### dbutils.fs
Para interactuar con el sistema de archivos, utilizamos *dbutils.fs*, comandos de interés:
* dbutils.fs.cp
* dbutils.fs.ls
* dbutils.fs..mv
* dbutils.fs.rm

In [0]:
dbutils.fs.help()

In [0]:
#dbutils.fs.mounts()

In [0]:
dbutils.fs.ls("/")

### dbutils.notebook
Para poder interactuar con otros notebooks, utilizamos *dbutils.notebook*

In [0]:
%python
dbutils.notebook.help()

### dbutils.widgets
Para poder paramterizar los notebooks, se utilizan los widgets, que hay de 4 tipos:
* text
* dropdown
* combobox
* multiselect

In [0]:
dbutils.widgets.text("schema_name", "databricks_david_schema")
# dbutils.widgets.dropdown("state", "CA", ["CA", "IL", "MI", "NY", "OR", "VA"])
# dbutils.widgets.combobox(
#   name='fruits_combobox',
#   defaultValue='banana',
#   choices=['apple', 'banana', 'coconut', 'dragon fruit'],
#   label='Fruits'
# )
# dbutils.widgets.multiselect(
#   name='days_multiselect',
#   defaultValue='Tuesday',
#   choices=['Monday', 'Tuesday', 'Wednesday', 'Thursday',
#     'Friday', 'Saturday', 'Sunday'],
#   label='Days of the Week'
# )


Para obtener el valor del parámetro establecido:

In [0]:
dbutils.widgets.get("schema_name") 

Para obtener como un diccionario en Python, todos los parámetros establecidos:


In [0]:
dbutils.widgets.getAll()

Para eliminar uno o todos los widgets:

In [0]:

#dbutils.widgets.remove('schema_name')
#dbutils.widgets.removeAll()

## Catalog

En Databricks, un Catalog es la capa más alta dentro del sistema de namespaces para organizar datos. Introducido con Unity Catalog, permite gestionar catálogos, esquemas y tablas de manera centralizada y segura, especialmente útil en entornos multiusuario y con integración de seguridad.

La jerarquía de objetos es:

Catalog > Schema (Database) > Table / View / Function

Cada catálogo puede contener múltiples esquemas, y cada esquema puede tener múltiples tablas o vistas.
Puedes usar SQL para crear un catálogo nuevo de esta forma:

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS sesion2;
-- CREATE CATALOG IF NOT EXISTS customer_cat COMMENT 'This is customer catalog';
-- CREATE CATALOG customer_cat MANAGED LOCATION 's3://depts/finance';
-- CREATE FOREIGN CATALOG postgresql_catalog USING CONNECTION postgresql_connection OPTIONS (database = 'my_db');

## Schemas


Un **Schema** (también conocido como Database en otros entornos) es un contenedor lógico dentro de un catálogo que agrupa tablas, vistas, funciones y otros objetos relacionados.  

In [0]:
%sql
-- En el esquema por defecto
CREATE SCHEMA IF NOT EXISTS schema_in_default_catalog

Puedes hacer tu notebook más flexible usando widgets para capturar el nombre del schema como parámetro:

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS ${schema_name}

También puedes capturar el valor del widget en Python y construir dinámicamente la consulta SQL, lo cual, es mucho más flexible:

In [0]:
# Obtener el valor del widget
custom_schema = dbutils.widgets.get("schema_name")

# Crear la sentencia SQL con el nombre del schema
query = f"""
    CREATE SCHEMA IF NOT EXISTS {custom_schema}
"""

# Ejecutar el SQL dinámico
spark.sql(query)

Esto es útil cuando:
* Quieres hacer validaciones antes de ejecutar.
* El nombre del esquema depende de lógica adicional.
* Estás creando múltiples objetos de forma programática.

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS sesion2.${schema_name} COMMENT 'This is customer catalog';
-- CREATE SCHEMA customer_sc MANAGED LOCATION 's3://depts/finance';

Ahora vamos a obtener la información del esquema:

In [0]:
%sql
DESCRIBE SCHEMA EXTENDED ${catalog_name}.${schema_name};

## Volumes

Un **Volume** es un espacio de almacenamiento en un esquema (schema) de Unity Catalog que se utiliza para guardar archivos como CSV, JSON, Parquet, imágenes, etc.  
Es una forma estructurada y segura de trabajar con archivos dentro del entorno de Databricks, usando controles de acceso unificados (ACLs).

---

### 🔹 ¿Qué es un Volume?

- Son directorios montados dentro de Unity Catalog.
- Pueden ser **internos** (gestionados por Databricks) o **externos** (en un bucket S3 o ADLS).
- Útiles para manejar datos no tabulares o datos crudos que aún no han sido ingeridos en tablas.

In [0]:
%sql
-- CREATE VOLUME my_another_volume
CREATE VOLUME IF NOT EXISTS ${catalog_name}.${schema_name}.landing;
-- Create an external volume on the specified location with comment
-- CREATE EXTERNAL VOLUME my_catalog.my_schema.my_external_volume
-- LOCATION 's3://my-bucket/my-location/my-path'
-- COMMENT 'This is my example external volume on S3'

Ahora vamos a descargar un fichero CSV dentro del volumen, para ello, vamos a hacer uso de CURL:

In [0]:
%sh
curl -L https://raw.githubusercontent.com/dvddepennde/crops_training_school/refs/heads/main/nutrients_csvfile.csv -o /Volumes/sesion2/databricks_david_schema/landing/nutrients_csvfile.csv

In [0]:
%sql
SELECT *
FROM csv.`/Volumes/${catalog_name}/${schema_name}/landing/nutrients_csvfile.csv`

## Ejercicios dbutils & catalog & schema & volumes

1. Crea parámetros que indiquen el nombre del catálogo, nombre de schema y nombre de volumen. Asignales un valor que gustes, es el que utilizaremos para posteriores ejercicios.

2. Obten el valor de cada uno de los widgets y muestralos en una sola linea con el formato "Nombre de catálogo: {}, Nombre del esquema: {}, Nombre del volumen: {}"

3. Crea el catálogo, dentro de este, el esquema, y por último, dentro del esquema, el volumen. Haz las sentencias tanto en celdas SQL como en Python.

*IMPORTANTE*: Debes utilizar variables y no poner directamente el valor del parámetro en la sentencia

4. Descarga el archivo de la URL proporcionada dentro del volumen creado previamente. Una vez creado, deberás verificar que existe.

URL: https://raw.githubusercontent.com/dvddepennde/luxury-watches-analysis/refs/heads/main/data/Luxury%20watch.csv

Puede que la celda no sea necesario que sea en python...

5. Sube un CSV al volumen, de forma programatica o manual, para posterior análisis. Podéis buscar en https://www.kaggle.com/datasets . Una vez lo subas, consulta su contenido con SparkSQL.

6. Crea un notebook y llámalo sesion2_notebook_called, el cual, debe recibir por parámetro:
* db_name: Simulamos nombre de base de datos, si no sabéis que poner, poned "public".
* table_name: Simulamos nombre de tabla, si no sabéis que poner, poned vuestro nombre.
* num: Generar un número entero de forma aleatoria utilizando random.

El cual obtenga los valores, valide que "num" es un número y devolver:
* En caso de que la validación de num no sea correcta, un mensaje como el siguiente: "{'status': 'FAILED', 'custom_message': '<REPLACE_WITH_CUSTOM_MSJ>'}". El notebook no deberá continuar.
* En caso de que la validación sea correcta, un mensaje "Simulando operaciones..." y que devuelva: "{'status': 'OK', custom_message: '<REPLACE_WITH_CUSTOM_MSJ>', ... }" donde ... corresponde con devolver los mismos parámetros que recibió.

tip: Para validar si es o no entero, podéis usar este fragmento de código, o cualquiera que os parezca mejor:

```python3
try:
    # Intentamos convertirlo a entero
    num = int(num_str)
    # Código en caso de que SÍ sea entero

except ValueError:
    # Código si no es numérico (entero)
```


# Spark

Cuando PySpark lee un archivo (por ejemplo, un archivo CSV), puede intentar adivinar los tipos de datos de cada columna, basándose en los primeros registros del archivo. Este proceso se llama inferencia del esquema.

In [0]:
path_luxury_watches = f"/Volumes/{catalog}/{schema}/{volume}/luxury_watches.csv"
df = spark.read.csv(
    path=path_luxury_watches,
    header=True,     # Utiliza la primera fila como nombres de columnas
    inferSchema=True ## INFERENCIA DE ESQUEMA HABILITADA
)
display(df)

Ahora, en este caso, vamos a declarar nosotros el esquema que va a tener el dataframe:

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Definir el esquema manualmente
schema = StructType([
    # Nombre, Tipo de dato, Requerido
    StructField("Brand", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("Case Material", StringType(), True),
    StructField("Strap Material", StringType(), True),
    StructField("Movement Type", StringType(), True),
    StructField("Water Resistance", StringType(), True),
    StructField("Case Diameter (mm)", DoubleType(), True),
    StructField("Case Thickness (mm)", DoubleType(), True),
    StructField("Band Width (mm)", DoubleType(), True),
    StructField("Dial Color", StringType(), True),
    StructField("Crystal Material", StringType(), True),
    StructField("Complications", StringType(), True),
    StructField("Power Reserve", StringType(), True),
    StructField("Price (USD)", StringType(), True)
])

# Cargar el archivo CSV usando el esquema definido manualmente
df = spark.read.csv(path=path_luxury_watches, header=True, schema=schema)

# Mostrar los datos
display(df)


En cuanto a las diferencias, es importante tener en cuenta las siguientes características:


| Característica               | Inferir Esquema (automático)         | Definir Esquema (manual)         |
|------------------------------|--------------------------------------|----------------------------------|
| **Facilidad de uso**          | Muy fácil, no requiere intervención. | Requiere más trabajo inicial.   |
| **Precisión**                 | Puede ser impreciso con datos inconsistentes. | Alta precisión (100% control).  |
| **Rendimiento**               | Más lento en archivos grandes.       | Más rápido, ya que no necesita inferir. |
| **Flexibilidad**              | Ideal para datos que varían frecuentemente. | Menos flexible, requiere actualización manual. |
| **Robustez**                  | Puede fallar en ciertos casos (por ejemplo, valores no numéricos). | Es más robusto y seguro.       |


Pero...
- ¿Qué pasa si el esquema no coincide?: Si el esquema no coincide con el origen, Spark puede aceptar los datos dejando valores nulos (permissive), descartar las filas corruptas (dropMalformed) o fallar inmediatamente (failFast) según spark.read.option("mode", …).

- ¿Que ocurre si una de las columnas declaradas no existe?: Si el esquema declara una columna que no existe en los datos, Spark la crea y la rellena siempre con null (no lanza excepción)
- ¿Que ocurre si en el origen hay más columnas que las definidas en el esquema?: Si los datos tienen columnas extras que no están en el esquema, Spark las ignora y solo carga las definidas (dependiendo de enforceSchema).
- ¿Cuando se debería usar un esquema fijo y cuando inferirlo?: Se recomienda usar esquema fijo cuando se requiere alto rendimiento, consistencia y control estricto; usar inferSchema en etapas exploratorias o con datos muy variables para ganar flexibilidad.

Mostramos el esquema de las columnas:

In [0]:
df.printSchema()

Mostrar únicamente las N primeras filas:

In [0]:
display(df.show(5))

Obtener las dimensiones del DataFrame:

In [0]:
rows = df.count()
columns = len(df.columns)
print(f"Número de filas: {rows}, columnas: {columns}")


Obtener lista de columnas del DataFrame

In [0]:
print(df.columns)


Convertir el dataframe a dataframe de Pandas:

In [0]:
pandas_df = df.toPandas()
pandas_df.sample(4)

Ordenar por alguna columna, de forma ascendente/descendente:

In [0]:
display(df.orderBy("Case Diameter (mm)", ascending=True).show(5))
## Lo cual es idéntico a
# display(df.sort("Case Diameter (mm)", ascending=True).show(5)


Ordenar por varias columnas:

In [0]:
display(df.sort(['Band Width (mm)',"Case Diameter (mm)"], ascending=[False, False]).show(5))
## Lo cual es idéntico a
# display(df.orderBy(['Band Width (mm)',"Case Diameter (mm)"], ascending=[False, False]).show(5))

Renombramos columna:

In [0]:
df = df.withColumnRenamed("Price (USD)", "Price")
# from pyspark.sql.functions import col
# df = df.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
display(df)


Añadir columna como un literal:

In [0]:
from pyspark.sql.functions import lit
df = df.withColumn("Currency", lit("USD"))
df = df.withColumn("File", lit(f"/Volumes/{catalog}/{schema}/{volume}/luxury_watches.csv"))
display(df)

Eliminamos columna:

In [0]:
df = df.drop("File")
display(df.limit(3))

Obtenemos únicamente las columnas que nos resultan interesantes

In [0]:
display(df.select("Brand", "Model", "Price"))

Mostramos los valores únicos de una columna

In [0]:
df.select("Brand").distinct().show()

Unimos la columna Brand con el model:


In [0]:
from pyspark.sql.functions import concat_ws, col
df = df.withColumn(
    "Brand_Model",
    concat_ws(" ", col("Brand"), col("Model"))  # Concatenar 'Brand' y 'Model' con un espacio en el medio
)
display(df)

Convertimos la columna Price a numérico

In [0]:
from pyspark.sql.functions import col, regexp_replace
df_cleaned = df.withColumn(
    "Price",
    regexp_replace(col("Price"), r"[^\d.]", "")  # Eliminar todo lo que no sea dígito o punto
).withColumn(
    "Price",
    col("Price").cast("double")  # Convertir a tipo numérico (Double)
)
df_cleaned.printSchema()

Vamos ahora a ordenar por precio del reloj:

In [0]:
display(df_cleaned.orderBy("Price", ascending=False).limit(10))


### Filtros


Filtramos por un valor exacto de tipo String

In [0]:
# Filtrar los registros donde la columna 'Movement Type' tiene el valor 'Automatic'
df_filtered = df_cleaned.filter(df_cleaned['Movement Type'] == 'Automatic')

# Mostrar las primeras 5 filas
display(df_filtered.limit(5))

Filtramos con condición de cadena:

In [0]:
display(df_cleaned.filter(
  (df_cleaned['Case Material'].contains('Steel')) & 
  ~(df_cleaned['Strap Material'].contains('eather')) &
  (df_cleaned['Water Resistance'].contains('100'))
))

Filtramos con operador mayor que:

In [0]:
# Filtrar los registros donde la columna 'Price' es mayor que 98
df_filtered = df_cleaned.filter(df_cleaned['Price'] > 65000)
display(df_filtered)

Condiciones múltiples:

In [0]:
df_filtered = df_cleaned.filter(
  (df_cleaned['Brand'] == 'Rolex') & 
  (df_cleaned['Movement Type'] == 'Automatic') & 
  (df_cleaned['Power Reserve'] == '55 hours') & 
  (df_cleaned['Price'] > 65000)
)
display(df_filtered)


### Agrupaciones

Contar los relojes por marca

In [0]:
display(df_cleaned.groupBy('Brand').count().withColumnRenamed("count", "Nº de relojes").orderBy('Nº de relojes', ascending=False).show(5))

Media de precio por marca y modelo

In [0]:
#display(df_cleaned.groupBy(['Brand', 'Model']).avg())
#display(df_cleaned.select(['Brand', 'Model' , 'Price']).groupBy(['Brand', 'Model']).avg())
display(
    df_cleaned.select([
        'Brand', 'Model' , 'Price']
    ).groupBy(
        ['Brand', 'Model']
    ).avg().withColumnRenamed(
        "avg(Price)", "AVG Price"
    ).sort(['AVG Price', 'Brand'], ascending=[False, True]).show(5)
)

### Tabla

In [0]:
catalog = dbutils.widgets.get('catalog_name')
schema = dbutils.widgets.get('schema_name')
table_name = "luxury_watches"

## Renombramos columnas, ya que no se aceptan acentos, ni espacios, establecemos snake_case

df_cleaned = df_cleaned.withColumnRenamed("Case Material", "case_material").withColumnRenamed("Strap Material", "strap_material").withColumnRenamed("Movement Type", "movement_type").withColumnRenamed("Water Resistance", "water_resistance").withColumnRenamed("Case Diameter (mm)", "case_diameter").withColumnRenamed("Case Thickness (mm)", "case_thickness").withColumnRenamed("Band Width (mm)", "band_width").withColumnRenamed("Dial Color", "dial_color").withColumnRenamed("Crystal Material", "crystal_material").withColumnRenamed("Complications", "complications").withColumnRenamed("Power Reserve", "power_reserve").withColumnRenamed("Price", "price").withColumnRenamed("Currency", "currency").withColumnRenamed("Brand_Model", "brand_model")

df_cleaned.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{table_name}")
print(f"Tabla creada: {catalog}.{schema}.{table_name}")

Consultar la tabla creada:


In [0]:
data = spark.sql(f"""SELECT * FROM {catalog}.{schema}.{table_name}""")
display(data)

### Ejercicios Spark

1. Filtrar por la marca Rolex

2. Consulta la tabla samples.tpch.orders y realiza algún tipo de gráfico (O alguna similar de samples.tpch o del CSV extraído)


3. Utilizando el CSV propio que has subido al volumen, crea una tabla en el catálogo creado y consultala con SQL. Prueba a añadir columnas adicionales al CSV, como por ejemplo, refresh_date a la fecha actual, nombre del fichero...

# Control de versiones + Best practiques  + Modularization
En este punto, vamos a unir nuestro repositorio de Github: https://github.com/dvddepennde/databricks_notebook_bp al Workspace y trabajaremos con él, realizando una serie de operaciones y mostrando como debería ser estructurado.