# 1. Mount Azure Data Lake using Service Principal
#### Steps to follow
1. Get client_id, tenant_id and client_secret from key vault
2. Set Spark Config with App/ Client Id, Directory/ Tenant Id & Secret
3. Call file system utlity mount to mount the storage
4. Explore other file system utlities related to mount (list all mounts, unmount)

Obtenemos las credenciales necesarias para montar un almacenamiento en Microsoft Data Lake

In [0]:
#Get client_id, tenant_id and client_secret from key vault
client_id = dbutils.secrets.get(scope = 'mlops-scope', key = 'mlops-app-client-id')
tenant_id = dbutils.secrets.get(scope = 'mlops-scope', key = 'mlops-app-tenant-id')
client_secret = dbutils.secrets.get(scope = 'mlops-scope', key = 'mlops-app-client-secret')

Asignamos las variables necesariamos para montar algun Data Lake

In [0]:
#Set Spark Config with App/ Client Id, Directory/ Tenant Id & Secret
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": client_id,
          "fs.azure.account.oauth2.client.secret": client_secret,
          "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}

Montamos el almacenamiento de Data Lake que deseamos

In [0]:
#Call file system utlity mount to mount the storage
dbutils.fs.mount(
  source = "abfss://raw@datalakemlopsd4m.dfs.core.windows.net/",
  mount_point = "/mnt/datalakemlopsd4m/raw/",
  extra_configs = configs)

In [0]:
#Verificar que contenedores estan montados en el azure datalake
#%fs mounts

#Desmontar el Contenedor especifico
#dbutils.fs.unmount("/mnt/datalakemlopsd4m/demo/")

#Ver cuales son los archivos que estan en el Directorio
#display(dbutils.fs.ls("/mnt/datalakemlopsd4m/raw/marcobre/turno1/"))

#Revisar el el bd cargado apartir de spark
#display(spark.read.csv("/mnt/datalakemlopsd4m/raw/marcobre/turno1/datos_turno1_vf.csv", header=True))

# **2. Comprensión de los Datos(EDA) y Preprocesamiento de Datos**

### *2.1 Cargamos los datos desde el Storage (RAW) para realizar la comprension y preparacion de datos*

In [0]:
import pandas as pd
# Cargar el archivo CSV en un DataFrame de Spark
datos_turno1 = spark.read.csv("/mnt/datalakemlopsd4m/raw/marcobre/turno1/datos_turno1_vf.csv", header=True)
datos_turno2 = spark.read.csv("/mnt/datalakemlopsd4m/raw/marcobre/turno1/datos_turno1_vf.csv", header=True)

# Convertir DataFrame de Spark a DataFrame de Pandas
df_turno1 = datos_turno1.toPandas()
df_turno2 = datos_turno2.toPandas()

# Agregar columna "turno" con valor 1 y 2 al DataFrame df_turno1
df_turno1['turno'] = 1
df_turno2['turno'] = 2

#Consolidar los datos del turno 1 y turno 2
datos = pd.concat([df_turno1, df_turno2], ignore_index=True)

# Ahora puedes manejar el DataFrame con Pandas
# Por ejemplo, puedes usar funciones de Pandas como head(), describe(), etc.
datos.head()

#### Paso Final (el DF final de pandas, debes convertirlo a un DF de SPARK, para que los datos limipios sean reflejados)

1. Convertimos el df-pandas a un df-spark, para poder guardarlo en el Data Storage

In [0]:
#Guardar luego de convertirlo a un DataFrame de Spark
spark_datos = spark.createDataFrame(datos)

2. Convertimos el tipo de datos Void a String, para poder guardar en un archivo CSV (Data Storage)

In [0]:
from pyspark.sql.functions import when, col

# Lista de todas las columnas
columnas = spark_datos.columns

# Convertir valores 'void' a cadena vacía ('') en todas las columnas que contienen 'void'
for columna in columnas:
    spark_datos = spark_datos.withColumn(
        columna,
        when(col(columna) == 'void', '').otherwise(col(columna))
    )

3. Guardamos el df-spark en la ruta del Data Storage de Microsoft Azure

In [0]:
# Ruta donde quieres guardar el archivo CSV en tu Azure Data Lake Storage
ruta_guardado = "/mnt/datalakemlopsd4m/raw/marcobre/datosraw/datostotalmarcobre.csv"

# Guardar el DataFrame en formato CSV en la ruta especificada
spark_datos.write.csv(ruta_guardado, header=True, mode="overwrite")
#spark_datos.repartition(1).write.csv(ruta_guardado, header=True, mode="overwrite") #Sirve para guardar solo en 1 partition csv

4. Cargamos el CSV(Carpeta Spark) del Data Storage, para verificar que todo este OK (Tambien se convierte de dfSpark a dfPandas)

In [0]:
# Lee el archivo CSV en un DataFrame de Spark
ruta_carpeta_csv = "/mnt/datalakemlopsd4m/raw/marcobre/datosraw/datostotalmarcobre.csv"
df_spark = spark.read.option("header", "true").csv(ruta_carpeta_csv)

# Muestra los primeros registros del DataFrame de Spark Convertido a un DataFrame Pandas
df_pandas = df_spark.toPandas()
df_pandas.head()

In [0]:
df_pandas.shape

## **3. Guardar los Datos procesados(spark df) apartir del storage RAW, en una tabla DELTA (hacia storage PROCESSED)**

Montamos el almacenamiento de Data Lake donde gaurdaremos estos datos si no esta (Processed)

In [0]:
#Call file system utlity mount to mount the storage
dbutils.fs.mount(
  source = "abfss://processed@datalakemlopsd4m.dfs.core.windows.net/",
  mount_point = "/mnt/datalakemlopsd4m/processed/",
  extra_configs = configs)

Creamos la BD donde almacenaremos las Tables DELTA (Verificar si ya esta creada)

In [0]:
# Listar todas las bases de datos
#spark.sql("SHOW DATABASES").show()

In [0]:
# Crear la base de datos si no existe en el almacenamiento de PROCESSED
#spark.sql("CREATE DATABASE IF NOT EXISTS processed_db LOCATION '/mnt/datalakemlopsd4m/processed/'")

### **Guardamos el df preprocesado en la tabla DELTA (datos_turno1_deltavf) dentro de la BD (processed_db)**

In [0]:
spark_datos.write.format("delta").mode("overwrite").saveAsTable("processed_db.datos_processed_delta")

Verificamos que se creo la Tabla DELTA en la BD adecuada

In [0]:
# Listar todas las bases de datos
#spark.sql("SHOW DATABASES").show()

# Listar las tablas en una base de datos específica (por ejemplo, processed_db)
spark.sql("SHOW TABLES IN processed_db").show()