![IBGE LOGO](https://s3-sa-east-1.amazonaws.com/arrematearte-farm/cristinagoston/lot_photos/11920/e2b04faba30f14f269a12c88c1dffc839e637495_ml.jpg)



# CAMADA BRONZE

In [0]:
from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pytz
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.functions import *
from datetime import datetime

import requests
import json

In [0]:
try:
  time_file = datetime.now(pytz.timezone('America/Sao_Paulo')).strftime('%Y%m%d_%H%M%S')
  # DeltaTable.isDeltaTable(spark, bronze_delta_path)

  #### Nome  e caminho onde será feita a escrita da da tabela Delta  ##########
  db = 'bronze.'
  table = 'ibge_news'
  delta_table = f'{db}{table}'
  bronze_delta_path = 'dbfs:/mnt/bronze/'

  ### Sistema de origem ####
  source_system_path = 'dbfs:/mnt/raw_3/'

  ### caminho para gravação do histórico ###
  historic_path = 'dbfs:/mnt/historic/bronze/'
  
  print("##----------------------------##")
  print(f"Data e Horário de execusão             ===> {time_file} \n")
  print(f"Tabela bronze a ser criada             ===>  {delta_table}")
  print(f"Caminho da tabela Bronze a ser criada  ===> {bronze_delta_path} \n")
  print("##----------------------------##")
  print(f"Sistema de origem                      ===> {source_system_path}")
  print(f"Caminho para gravação do histórico     ===> {historic_path}")
except Exception as e:
  print(e)

##----------------------------##
Data e Horário de execusão             ===> 20241020_100532 

Tabela bronze a ser criada             ===>  bronze.ibge_news
Caminho da tabela Bronze a ser criada  ===> dbfs:/mnt/bronze/ 

##----------------------------##
Sistema de origem                      ===> dbfs:/mnt/raw_3/
Caminho para gravação do histórico     ===> dbfs:/mnt/historic/bronze/


### Listando arquivos camada RAW

### Separando Arquivos de 10 em 10

### Criando Classe de Ingestao na Bronze

In [0]:
class BronzeIngestion:

  def __init__(self,db,table,bronze_delta_path,source_system_path,historic_path):
    self.db                 = db
    self.table              = table
    self.delta_table        = f"{self.db}.{self.table}"
    self.bronze_delta_path  = bronze_delta_path
    self.source_system_path = source_system_path
    self.historic_path      = historic_path

  ### Listando arquivos camada RAW   ####
  def list_all_files(self):
    try:
      #### Faz a comparação entre Raw e histórico caso ja existam arquivos no Historico  ####
      ################################################################################
      
      #### Lista de arquivos da Raw ####
      lst_folder_date = [lst_date.name.replace('/','') for lst_date in dbutils.fs.ls(f'{self.source_system_path}') if lst_date.name.replace('/','').isnumeric() ]
      lst_folder_date.sort()
      raw_set = {files.path.split('/')[-2] for files in dbutils.fs.ls(f'{self.source_system_path}{lst_folder_date[-1]}/')} 
      ### Lista de arquivos Historico   ####
      lst_folder_date_historic = [lst_date.name.replace('/','') for lst_date in dbutils.fs.ls(f'{self.historic_path}') if lst_date.name.replace('/','').isnumeric()  ]
      lst_folder_date_historic.sort()
      historic_set = {files.path.split('/')[-2]  for files in dbutils.fs.ls(f'{self.historic_path}{lst_folder_date_historic[-1]}/')}

      all_files = [f'{self.source_system_path}{lst_folder_date[-1]}/{files}'  for files in list(raw_set.symmetric_difference(historic_set))]
      print(f'Número de arquivos para ingestão == > {len(all_files)}')
      return all_files

    except:
      try:
        #### Caso naõa existm arquivos no histórico é listado todos arquivos na Raw ####
        ################################################################################
        ## Verifica a pasta data mais recente ###
        lst_folder_date = [lst_date.name.replace('/','') for lst_date in dbutils.fs.ls(f'{self.source_system_path}')]
        lst_folder_date.sort()
        ### self.source_system_path => sistema de origem   ####
        all_files = []
        for files in dbutils.fs.ls(f'{self.source_system_path}{lst_folder_date[-1]}/'):
          all_files.append([files.path,files.path.split("_")[-1].replace("/","")])
        all_files.sort(key = lambda pos:pos[1])
        # print(all_files)
        all_files_found = [all_files[i][0] for i in range(0,len(all_files))]
        print(f'Número de arquivos para ingestão == > {len(all_files_found)}')
        return all_files_found
      except Exception as error:
        print(f"{error}")       


  
  def separate_files(self):
    try:
      ### Separando arquivos a cada 10 items ou a quantidade que for..
      ### caso seja menor do que 10
      all_files = self.list_all_files()
      return  [ all_files[i:i+10] for i in range(0,len(all_files),10)]
    
    except Exception as error:
      print(f"{error}") 
  
  ### Salvando Dados na camada Bronze ###
  def save_files_delta(self):
    try:
      print(" Inicinado método list_all_files().... \n Inicinado método list_all_files() .... \n Inicinado método save_files_delta() ....\n")
      files_to_record = self.separate_files()
      for files in files_to_record:
        # print(files,len(files),"\n")
        df =spark.read.json(files)
        #### Adicionando data de ingestao ####
        df = df.withColumn('DTPROC',lit(datetime.now(pytz.timezone('America/Sao_Paulo')).strftime('%Y%m%d_%H%M%S')))
        print(f" Salvando dados da API no caminho ===> {self.bronze_delta_path}  com o nome ===>  {self.delta_table} ")
        df.write.mode('append').format('delta').save(self.bronze_delta_path)
      print("\n Dados salvos em formato delta. \n")
    except Exception as error:
      print(f"{error}")

  ##### Copiando arquivos para Historico  ####
  def copy_to_historic(self):
    try:
      ## Verifica a pasta data mais recente ###
      lst_folder_date = [lst_date.name.replace('/','') for lst_date in dbutils.fs.ls(f'{self.source_system_path}')]
      lst_folder_date.sort()
      print(f"\n Copiando arquivos de ==> {self.source_system_path}{lst_folder_date[-1]}/ para ==>  {self.historic_path}{lst_folder_date[-1]}/ \n ")
      dbutils.fs.cp(f'{self.source_system_path}{lst_folder_date[-1]}/',f'{self.historic_path}{lst_folder_date[-1]}/',recurse = True)
      print(" Copia realizada com sucesso.\n")
    except Exception as error:
      print(f"{error}")
  
  
  ### Criando tabela no hive_metastore (catalogo Databricks)  ####
  def create_delta_table_hive(self):
    try:
      sql = f""" CREATE DATABASE IF NOT EXISTS {self.db} """
      spark.sql(sql)
      print(sql,"\n")

      sql_drop = f""" DROP TABLE IF EXISTS {self.delta_table}  """
      spark.sql(sql_drop)
      print(sql_drop,"\n") 

      sql_table = f""" CREATE TABLE IF NOT EXISTS {self.delta_table} USING DELTA LOCATION '{self.bronze_delta_path}' """
      spark.sql(sql_table)
      print(sql_table)

    except Exception as error:
      print(f"{error}")

  

  #### Executando  ####
  def bronze_run(self):
    print("Inicio do processo de ingestão na bronze... \n")
    # self.list_all_files()
    # self.separate_files()
    self.save_files_delta()
    self.copy_to_historic()
    self.create_delta_table_hive()
    print("Processo finalizado!..")

### Ajustar Copy to historic.. para copiar apenas lista de arquivos vrificadas

In [0]:
bronze = BronzeIngestion('bronze','ibge_news','dbfs:/mnt/bronze/','dbfs:/mnt/raw_3/','dbfs:/mnt/historic/bronze/')
bronze.bronze_run()

Inicio do processo de ingestão na bronze... 

 Inicinado método list_all_files().... 
 Inicinado método list_all_files() .... 
 Inicinado método save_files_delta() ....

Número de arquivos para ingestão == > 1
 Salvando dados da API no caminho ===> dbfs:/mnt/bronze/  com o nome ===>  bronze.ibge_news 

 Dados salvos em formato delta. 


 Copiando arquivos de ==> dbfs:/mnt/raw_3/202410/ para ==>  dbfs:/mnt/historic/bronze/202410/ 
 
 Copia realizada com sucesso.

 CREATE DATABASE IF NOT EXISTS bronze  

 DROP TABLE IF EXISTS bronze.ibge_news   

 CREATE TABLE IF NOT EXISTS bronze.ibge_news USING DELTA LOCATION 'dbfs:/mnt/bronze/' 
Processo finalizado!..


In [0]:
%sql
select count(*) from delta.`dbfs:/mnt/bronze/`
-- select * from bronze.ibge_news

count(1)
30954


In [0]:
# display(dbutils.fs.ls('dbfs:/mnt/historic/bronze/202410/'))

## apagar pastas
# dbutils.fs.rm('dbfs:/mnt/raw/',True)
# dbutils.fs.rm('dbfs:/mnt/bronze/',True)
# dbutils.fs.rm('dbfs:/mnt/historic/bronze/',True)

## criar pasta
# dbutils.fs.mkdirs('dbfs:/mnt/raw/')
# dbutils.fs.mkdirs('dbfs:/mnt/raw_2/')
# dbutils.fs.mkdirs('dbfs:/mnt/raw_3/')

In [0]:
# ### é Possivel fazer a Leitura de varios arquivos de forma Multipla, passando a referencia deles dentro de uma lista  ##########

# spark.read.json(files).display()