In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

Definición de rutas de almacenamiento para las diferentes capas de procesamiento de datos
 
 1. **landing_path**: Ruta donde se almacenan los datos crudos que se reciben desde las fuentes externas, antes de cualquier transformación o limpieza.
 2. **bronze_path**: Ruta donde se almacenan los datos transformados mínimamente, a menudo llamados "datos de bronce". En esta capa se realizan las primeras limpiezas y validaciones.
 3. **silver_path**: Ruta para almacenar los datos que han sido limpiados y transformados en una forma más estructurada, listos para análisis más complejos.
 4. **gold_path**: Ruta donde se almacenan los datos más refinados, listos para ser utilizados en análisis avanzados, visualizaciones o para la toma de decisiones.

In [0]:
landing_path = '/mnt/project/Landing/'
bronze_path = '/mnt/project/Bronze/'
silver_path = '/mnt/project/Silver/'
gold_path = '/mnt/project/gold/'

El proposito es construir dinámicamente la cadena de conexión JDBC a una base de datos SQL Server usando valores almacenados de manera segura en Azure Databricks Secrets sto asegura que las credenciales de acceso al servidor y a la base de datos no se expongan directamente en el código, mejorando la seguridad.

1.  **Obtención de credenciales de Azure Databricks Secrets:**

- `dbutils.secrets.get(scope = "sc-adls", key = "ServerName"):` Este comando obtiene el valor del secreto **ServerName** desde el almacén de secretos de Azure Databricks con el scope sc-adls. Esto proporciona el nombre del servidor SQL al que se conectará.
- `dbutils.secrets.get(scope = "sc-adls", key = "DatabaseName"):` De manera similar, este comando obtiene el valor del secreto DatabaseName desde el mismo almacén de secretos, proporcionando el nombre de la base de datos a la que se va a acceder.

2.  **Creación de la cadena de conexión JDBC:**

- `url = "jdbc:sqlserver://{0};database={1}".format(server_name, database_name):` Este comando formatea una cadena JDBC para conectar a una base de datos SQL Server, reemplazando `{0}` por el nombre del servidor (**`server_name`**) y `{1}` por el nombre de la base de datos (**`database_name`**). El resultado es una cadena de conexión completa que permite a la aplicación conectarse al servidor SQL y a la base de datos especificada.

In [0]:
# Define the Azure SQL Server details
server_name = dbutils.secrets.get(scope = "sc-adls", key = "ServerName")
database_name = dbutils.secrets.get(scope = "sc-adls", key = "DatabaseName")
url = "jdbc:sqlserver://{0};database={1}".format(server_name, database_name)

obtener de forma segura las credenciales de usuario (nombre de usuario y contraseña) almacenadas en el almacén de secretos de Azure Databricks.

- `user = dbutils.secrets.get(scope = "sc-adls", key = "UserName"):` Este comando obtiene el valor del secreto UserName desde el almacén de secretos de Azure Databricks, con el scope sc-adls. Este secreto contiene el nombre de usuario que se utilizará para conectarse a la base de datos o a otros recursos protegidos.
Obtención de la contraseña:

- `password = dbutils.secrets.get(scope = "sc-adls", key = "PasswordDatabase"):` De manera similar, este comando obtiene el valor del secreto PasswordDatabase desde el mismo almacén de secretos, proporcionando la contraseña asociada con el nombre de usuario previamente recuperado.

In [0]:
# Define the Azure SQL Server authentication details
user = dbutils.secrets.get(scope = "sc-adls", key = "UserName")
password = dbutils.secrets.get(scope = "sc-adls", key = "PasswordDatabase")

Las consultas se generan dinámicamente, permitiendo leer de manera flexible las tablas Delta desde el directorio `silver_path`, con cada nombre de tabla proporcionado en las variables `statement1`, `statement2`, `statement3` y `statement4`. Este enfoque permite mantener la flexibilidad al ejecutar consultas sobre diferentes conjuntos de datos dentro del mismo entorn

In [0]:
query_1 = """select 
                *
           from delta.`{}/{}`""".format(silver_path,'statement1')

query_2 = """select 
                *
           from delta.`{}/{}`""".format(silver_path,'statement2')

query_3 = """select 
                *
           from delta.`{}/{}`""".format(silver_path,'statement3')

query_4 = """select 
                *
           from delta.`{}/{}`""".format(silver_path,'statement4')



Ejecucion de las consultas Creadas

In [0]:
statement1 = spark.sql(query_1)
statement2 = spark.sql(query_2)
statement3 = spark.sql(query_3)
statement4 = spark.sql(query_4)

Define los nombres de las Tablas

In [0]:
# Define the table name
table_name_1 = "statement1"
table_name_2 = "statement2"
table_name_3 = "statement3"
table_name_4 = "statement4"

### Guardar Datos en Azure SQL Database
El siguiente código define una función save_to_azure_sql que guarda un DataFrame de PySpark en una base de datos SQL de Azure. Esta función utiliza el conector JDBC para escribir los datos de manera eficiente en la tabla correspondiente de la base de datos SQL de Azure.

Función save_to_azure_sql:
La función save_to_azure_sql toma un DataFrame de PySpark y el nombre de la tabla en la base de datos SQL de Azure como argumentos y guarda los datos en dicha tabla. El proceso se realiza mediante la conexión JDBC.

In [0]:
## SAVE DATA AZURE SQL DATABASE
def save_to_azure_sql(df, table_name):
    """
    Saves a PySpark DataFrame to Azure SQL Database with improved efficiency.
 
    Args:
        df (pyspark.sql.DataFrame): The DataFrame to save.
        table_name (str): The name of the Azure SQL table to write to.
    """
 
    df.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", user) \
        .option("password", password) \
        .mode("overwrite") \
        .save()
 
# Save each DataFrame using the function
save_to_azure_sql(statement1, table_name_1)
save_to_azure_sql(statement2, table_name_2)
save_to_azure_sql(statement3, table_name_3)
save_to_azure_sql(statement4, table_name_4)
