##### Establecemos una conexión segura con el Azure Data Lake Storage mediante la creación de un secret scope.

In [0]:

dbutils.secrets.listScopes()

In [0]:
val secretScope = "secret-scope-proyecto"
val adlsName = "adlseu2dsrpd01cv"
val adlsAccessKey = dbutils.secrets.get(scope = secretScope, key = "key-adlseu2dsrpd01cv")

In [0]:
spark.conf.set(s"fs.azure.account.key.${adlsName}.dfs.core.windows.net", adlsAccessKey)

In [0]:
dbutils.fs.ls("abfss://data@adlseu2dsrpd01cv.dfs.core.windows.net/archivoscsv")

##### Leyendo archivos .csv desde el Azure Data Lake Storage

In [0]:
// Tipo y ubicación del archivo
val file_location1 = "abfss://data@adlseu2dsrpd01cv.dfs.core.windows.net/archivoscsv/Alumnos.csv"
val file_location2 = "abfss://data@adlseu2dsrpd01cv.dfs.core.windows.net/archivoscsv/Cursos.csv"
val file_location3 = "abfss://data@adlseu2dsrpd01cv.dfs.core.windows.net/archivoscsv/Notas.csv"
val file_type = "csv"

// Opciones CSV
val infer_schema = "true"
val first_row_is_header = "true"
val delimiter = ","

// Lectura del archivo csv con las opciones definidas file_location1
val alumnos = spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location1)

// Lectura del archivo csv con las opciones definidas file_location2
val cursos = spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location2)

// Lectura del archivo csv con las opciones definidas file_location3
val notas = spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location3)

##### Convertimos los DF generados en vistas temporales para trabajar con SQL

In [0]:
alumnos.createOrReplaceTempView("v_alumnos")
cursos.createOrReplaceTempView("v_cursos")
notas.createOrReplaceTempView("v_notas")

In [0]:
%sql
select concat(nombre, " ", apellido) as NombreAlumno, nombreCurso, nota
from v_alumnos
inner join v_notas on v_alumnos.idAlumno = v_notas.idAlumno
inner join v_cursos on v_cursos.idCurso = v_notas.idCurso

NombreAlumno,nombreCurso,nota
Carlos Valladares,Fundamentos,8
Carlos Valladares,Spark,10
Carlos Valladares,Cloud,10
Maria Robles,Fundamentos,5
Maria Robles,Spark,9
Maria Robles,Cloud,8
Pedro Martinez,Fundamentos,5
Pedro Martinez,Spark,3
Pedro Martinez,Cloud,5
Josefa Calzadilla,Fundamentos,2


##### Creamos un dataFrame con la consulta anterior para luego convertirlo en tabla delta

In [0]:
%python
notas_estudiantes = spark.sql("""
    SELECT concat(nombre, ' ', apellido) AS NombreAlumno, nombreCurso, nota 
    FROM v_alumnos 
    INNER JOIN v_notas ON v_alumnos.idAlumno = v_notas.idAlumno 
    INNER JOIN v_cursos ON v_cursos.idCurso = v_notas.idCurso
""")

In [0]:
%python
notas_estudiantes.write.format("delta").mode("overwrite").saveAsTable("delta_notas_estudiantes")