In [0]:
%run ./env

In [0]:
# Storage account name and SAS token read by mount_adls() and by the /mnt paths built later in this notebook.
# NOTE(review): ACCOUNT_NAME and SAS_TOKEN are not defined in this notebook; presumably they are
# brought into scope by `%run ./env` — confirm.
storageAccountName = ACCOUNT_NAME
sasToken = SAS_TOKEN

def mount_adls(blobContainerName):
    """Mount one blob container of the configured storage account under /mnt.

    The container is mounted through `dbutils.fs.mount` from its `wasbs://`
    URL to `/mnt/<storageAccountName>/<blobContainerName>`, authenticating
    with the module-level `sasToken`.

    Args:
        blobContainerName: Name of the blob container to mount.

    Returns:
        None. Prints "OK!" on success. Any `Exception` is caught, printed
        after the word "Falha", and not re-raised.
    """
    try:
        # Per-container Hadoop configuration key that carries the SAS token.
        sas_config_key = ".".join(
            ("fs.azure.sas", blobContainerName, storageAccountName, "blob.core.windows.net")
        )
        container_url = f"wasbs://{blobContainerName}@{storageAccountName}.blob.core.windows.net"
        target_mount_point = "/mnt/{}/{}".format(storageAccountName, blobContainerName)

        dbutils.fs.mount(
            source=container_url,
            mount_point=target_mount_point,
            extra_configs={sas_config_key: sasToken},
        )
        print("OK!")
    except Exception as error:
        # Best-effort: report the failure and let the notebook keep running.
        print("Falha", error)

In [0]:
# Mount the bronze, silver and gold containers, skipping any whose mount point
# is already registered. The mount list is fetched once, before any mounting.
mounts = dbutils.fs.mounts()
mounted_points = {mount.mountPoint for mount in mounts}

for container in ("bronze", "silver", "gold"):
    if f"/mnt/{ACCOUNT_NAME}/{container}" not in mounted_points:
        mount_adls(container)

In [0]:
from datetime import datetime
from pyspark.sql.functions import lit

# Renames applied to every table: bronze lineage columns receive the DL_ prefix.
# NOTE(review): the bronze preview saved in this notebook lists these columns as
# `data_hora_bronze` and `nome_arquivo`. The first presumably matches only through
# Spark's default case-insensitive column resolution; the second does not match
# `NOME_ARQUIVO_BRONZE`, so that rename looks like a silent no-op (the saved silver
# preview still shows `nome_arquivo`). Confirm the bronze schema before changing
# the source name, because doing so alters the schema of the silver tables.
_BRONZE_LINEAGE_RENAMES = (
    ("DATA_HORA_BRONZE",    "DL_DATA_HORA_BRONZE"),
    ("NOME_ARQUIVO_BRONZE", "DL_NOME_ARQUIVO_BRONZE"),
)

# Table name -> ordered (bronze column, silver column) renames specific to that table.
# The dict order is also the order in which tables are read, transformed and written.
_SILVER_COLUMN_RENAMES = {
    "clientes": (
        ("cliente_id",    "ID"),
        ("nome",          "NOME"),
        ("email",         "EMAIL"),
        ("telefone",      "TELEFONE"),
        ("data_cadastro", "DATA_CADASTRO"),
    ),
    "categorias": (
        ("categoria_id",   "ID"),
        ("nome_categoria", "NOME"),
    ),
    "produtos": (
        ("produto_id",     "ID"),
        ("categoria_id",   "ID_CATEGORIA"),
        ("nome_produto",   "NOME"),
        ("preco_unitario", "PRECO_UNITARIO"),
    ),
    "pedidos": (
        ("pedido_id",   "ID"),
        ("cliente_id",  "ID_CLIENTE"),
        ("data_pedido", "DATA"),
        ("valor_total", "VALOR_TOTAL"),
    ),
    "itens_pedido": (
        ("item_id",        "ID"),
        ("pedido_id",      "ID_PEDIDO"),
        ("produto_id",     "ID_PRODUTO"),
        ("quantidade",     "QUANTIDADE"),
        ("preco_unitario", "PRECO_UNITARIO"),
    ),
    "enderecos": (
        ("endereco_id", "ID"),
        ("cliente_id",  "ID_CLIENTE"),
        ("rua",         "RUA"),
        ("cidade",      "CIDADE"),
        ("estado",      "ESTADO"),
        ("cep",         "CEP"),
    ),
    "pagamentos": (
        ("pagamento_id",     "ID"),
        ("pedido_id",        "ID_PEDIDO"),
        ("data_pagamento",   "DATA_PAGAMENTO"),
        ("metodo_pagamento", "METODO_PAGAMENTO"),
        ("valor_pago",       "VALOR_PAGO"),
    ),
    "estoque": (
        ("produto_id",            "ID"),
        ("quantidade_em_estoque", "QUANTIDADE_EM_ESTOQUE"),
    ),
}


def silver():
    """Load every bronze Delta table, add silver metadata, rename columns and save to silver.

    The work runs in four stages, each completed for all tables before the
    next one starts, so a failure in an earlier stage happens before anything
    is written:

    1. Read each table from `/mnt/<storageAccountName>/bronze/<table>`.
    2. Add `DL_DATA_HORA_SILVER` (value of `datetime.now().timestamp()`,
       evaluated once per table) and `DL_NOME_ARQUIVO_SILVER` (the table name).
    3. Rename columns according to `_SILVER_COLUMN_RENAMES` followed by
       `_BRONZE_LINEAGE_RENAMES`.
    4. Write each table in Delta format, mode "overwrite", to
       `/mnt/<storageAccountName>/silver/<table>`.

    Returns:
        None.

    Raises:
        Exception: Whatever a stage raised, re-raised unchanged after a
            stage-specific message has been printed.
    """
    # Stage 1: fetch the bronze data.
    try:
        frames = {
            table: spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/{table}")
            for table in _SILVER_COLUMN_RENAMES
        }
    except Exception:
        print("Erro ao recuperar arquivos do bronze")
        raise

    # Stage 2: silver metadata columns.
    try:
        frames = {
            table: frame
                .withColumn("DL_DATA_HORA_SILVER", lit(datetime.now().timestamp()))
                .withColumn("DL_NOME_ARQUIVO_SILVER", lit(table))
            for table, frame in frames.items()
        }
    except Exception:
        print("Erro ao adicionar metadados silver")
        raise

    # Stage 3: rename columns. withColumnRenamed does nothing when the source
    # column is absent, so a misspelled source name does not raise here.
    try:
        for table, table_renames in _SILVER_COLUMN_RENAMES.items():
            frame = frames[table]
            for bronze_name, silver_name in (*table_renames, *_BRONZE_LINEAGE_RENAMES):
                frame = frame.withColumnRenamed(bronze_name, silver_name)
            frames[table] = frame
    except Exception:
        print("Erro ao alterar nomes das colunas")
        raise

    # Stage 4: overwrite the silver tables.
    try:
        for table, frame in frames.items():
            frame.write.format('delta').mode("overwrite").save(f"/mnt/{storageAccountName}/silver/{table}")
    except Exception:
        print("Erro ao salvar dados na silver")
        raise
    

In [0]:
# Run the bronze -> silver load; any stage failure is re-raised and stops this cell.
silver()

In [0]:
# Preview the first 10 rows of the bronze `clientes` Delta table.
spark.read.format('delta').load(f'/mnt/{storageAccountName}/bronze/clientes').limit(10).display()

cliente_id,nome,email,telefone,data_cadastro,data_hora_bronze,nome_arquivo
1,Srta. Clara Pires,heloisamartins@example.org,55(61)907041699,2017-07-20,1732890502.315252,clientes.csv
2,Isis Sampaio,carlos-eduardo15@example.com,55(48)919799599,2004-12-02,1732890502.315252,clientes.csv
3,Felipe Araújo,heloisasa@example.org,5594943369883,2023-01-26,1732890502.315252,clientes.csv
4,Thomas Nascimento,jose-miguel94@example.net,55(22)961758696,1979-01-10,1732890502.315252,clientes.csv
5,Daniel Cunha,pereirarodrigo@example.net,55(045)95869794,1974-07-12,1732890502.315252,clientes.csv
6,Lorenzo Freitas,cporto@example.com,55(58)915004879,1994-09-02,1732890502.315252,clientes.csv
7,Gustavo Henrique das Neves,hpires@example.net,55(031)92075737,2008-11-04,1732890502.315252,clientes.csv
8,Pietra Albuquerque,alexandremonteiro@example.org,5599902980159,1998-07-25,1732890502.315252,clientes.csv
9,Leandro Carvalho,bnogueira@example.org,5569902732667,2012-11-14,1732890502.315252,clientes.csv
10,Marina Almeida,rcorreia@example.net,55(21)977264737,2001-11-05,1732890502.315252,clientes.csv


In [0]:
# Preview the first 10 rows of the silver `pedidos` Delta table.
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/pedidos').limit(10).display()

ID,ID_CLIENTE,DATA_PEDIDO,VALOR_TOTAL,DL_DATA_HORA_BRONZE,nome_arquivo,DL_DATA_HORA_SILVER,DL_NOME_ARQUIVO_SILVER
1,8489,1991-08-19,48843.51,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
2,8156,1986-02-22,12775.57,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
3,3787,1970-11-09,27226.85,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
4,1246,1973-01-22,88888.45,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
5,2884,1990-02-01,4522.46,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
6,7132,1972-02-11,90845.62,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
7,4494,1979-10-09,60962.24,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
8,6776,2014-05-09,22211.63,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
9,579,2002-12-02,32398.46,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
10,8170,1980-12-18,5061.28,1732890502.346638,pedidos.csv,1732898380.493592,pedidos
