In [9]:
%run ../../../ingestion/ingestion.ipynb
%run ../../../data\ quality/quality.ipynb
%run ../../../write/write.ipynb
%run ../../../alerts/webhook.ipynb

25/05/07 22:01:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/05/07 22:01:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [10]:
from pyspark.sql import functions as F

In [11]:
# Cria uma instância da classe WebhookTeams utilizando o arquivo de configuração localizado em ../../config/teams.json
webhook = WebhookTeams("../../../config/teams.json")

In [12]:
try:
    # Cria um leitor de CSV com delimitador de linha definido como ","
    reader = CSVReader(line_delimiter=",")
    
    # Executa a leitura do arquivo CSV e retorna um DataFrame
    df = client_code(reader, "../../../layer/bronze/economia/bd_agricultura.csv")
except:
    # Em caso de erro durante a leitura, envia uma notificação via Teams
    webhook.send_teams_notification("Erro de leitura!")


In [13]:
df.show()

+----+----------------+------------------------+---------------------------+---------------------------+
|Data|Codigo_municipio|area_colhida_em_hectares|quantidade_produzida_em_ton|valor_producao_em_mil_reais|
+----+----------------+------------------------+---------------------------+---------------------------+
|2013|         3500501|                     100|                        800|                        306|
|2014|         3500501|                     100|                        570|                        227|
|2015|         3500501|                     100|                        650|                        270|
|2016|         3500501|                     100|                        696|                        464|
|2021|         3500709|                     100|                        600|                        898|
|2014|         3500808|                     100|                        345|                        111|
|2022|         3501509|                     100|       

In [None]:
try:
    # Cria um leitor de CSV com delimitador de linha definido como ","
    reader = ParquetReader()
    
    # Executa a leitura do arquivo CSV e retorna um DataFrame
    df_din = client_code(reader, "../../../layer/prata/dimensao/dimensao_ibge/*.parquet")
    df_din = df_din.select("cod_ibge", "municipios", "reg_administrativa")
except:
    # Em caso de erro durante a leitura, envia uma notificação via Teams
    webhook.send_teams_notification("Erro de leitura!")


In [15]:
# Instancia a classe de utilitários de qualidade de dados com o DataFrame original
dq = DataQualityUtils(df)

# Encadeia as transformações de qualidade de dados no DataFrame
df_tratado = (
    dq.remove_espacos_em_colunas()  # Remove espaços extras dos nomes das colunas
      .trim_strings()  # Remove espaços em branco no início e fim das strings de todas as colunas
      #.remover_caracteres_especiais(['localidades'])  # (Comentado) Remove caracteres especiais da coluna 'localidades'
      .remover_duplicatas()  # Remove registros duplicados do DataFrame
      .preencher_nulos({  # Preenche valores nulos com zero nas colunas especificadas
          'area_colhida_em_hectares': 0,
          'quantidade_produzida_em_ton': 0,
          'valor_producao_em_mil_reais': 0
      })
      .converter_para_numerico(['area_colhida_em_hectares', 'quantidade_produzida_em_ton',
                                'valor_producao_em_mil_reais'])
      .get_df()  # Retorna o DataFrame tratado
)


In [16]:
df_din.show()

+--------+--------------------+--------------------+
|cod_ibge|          municipios|  reg_administrativa|
+--------+--------------------+--------------------+
| 3500000|Estado sem especi...|Estado sem especi...|
| 3500105|          Adamantina|RA de Presidente ...|
| 3500204|              Adolfo|RA de São José do...|
| 3500303|               Aguaí|      RA de Campinas|
| 3500402|      Águas da Prata|      RA de Campinas|
| 3500501|    Águas de Lindóia|      RA de Campinas|
| 3500550|Águas de Santa Bá...|      RA de Sorocaba|
| 3500600|  Águas de São Pedro|      RA de Campinas|
| 3500709|              Agudos|         RA de Bauru|
| 3500758|            Alambari|      RA de Sorocaba|
| 3500808|   Alfredo Marcondes|RA de Presidente ...|
| 3500907|              Altair|      RA de Barretos|
| 3501004|         Altinópolis|RA de Ribeirão Preto|
| 3501103|         Alto Alegre|     RA de Araçatuba|
| 3501152|            Alumínio|      RA de Sorocaba|
| 3501202|    Álvares Florence|RA de São José 

In [17]:
df_tratado.show()

[Stage 8:>                                                          (0 + 1) / 1]

+----+----------------+------------------------+---------------------------+---------------------------+
|Data|Codigo_municipio|area_colhida_em_hectares|quantidade_produzida_em_ton|valor_producao_em_mil_reais|
+----+----------------+------------------------+---------------------------+---------------------------+
|2019|         3508207|                   200.0|                     1200.0|                      605.0|
|2021|         3526001|                   200.0|                      840.0|                     1260.0|
|2013|         3516002|                   500.0|                     1950.0|                      742.0|
|2019|         3522000|                   500.0|                     2880.0|                     1715.0|
|2021|         3542107|                    50.0|                      250.0|                      313.0|
|2018|         3546900|                   400.0|                     1860.0|                     1042.0|
|2015|         3547601|                   250.0|       

                                                                                

In [19]:
# Faz o relacionamento com a dimensão de códigos do IBGE
df_tratado = df_tratado.alias("tratado").join(
    df_din.alias("din"),
    F.col("tratado.Codigo_municipio") == F.col("din.cod_ibge"),
    how="left"
)

In [20]:
# Exibe as primeiras linhas do DataFrame tratado para visualização dos dados após as transformações
df_tratado.show()

+----+----------------+------------------------+---------------------------+---------------------------+--------+--------------------+--------------------+
|Data|Codigo_municipio|area_colhida_em_hectares|quantidade_produzida_em_ton|valor_producao_em_mil_reais|cod_ibge|          municipios|  reg_administrativa|
+----+----------------+------------------------+---------------------------+---------------------------+--------+--------------------+--------------------+
|2019|         3508207|                   200.0|                     1200.0|                      605.0| 3508207|           Buritizal|        RA de Franca|
|2021|         3526001|                   200.0|                      840.0|                     1260.0| 3526001|      Junqueirópolis|RA de Presidente ...|
|2013|         3516002|                   500.0|                     1950.0|                      742.0| 3516002|    Flórida Paulista|RA de Presidente ...|
|2019|         3522000|                   500.0|                

In [None]:
try:
    # Cria uma instância da classe responsável por exportar em formato Parquet
    exporter = ParquetExport()

    # Exporta o DataFrame para o caminho local especificado
    exporter.export(df_tratado, "../../../layer/prata/economia/bd_agricultura/", destination="local")
except:
    # Em caso de erro durante a exportação, envia uma notificação via Teams
    webhook.send_teams_notification("Erro de escrita!")

                                                                                