In [0]:
'''
Iniciando clean_bronze_data_countries
'''

'\nIniciando clean_bronze_data_countries\n'

Importando funções comuns para uso no notebook

In [0]:
%run
./shared_silver_functions

Inicializando sessão spark e importando dados do banco bronze

In [0]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import functions as F
import urllib

# Inicializa a sessão Spark
spark = SparkSession.builder.appName("accidents_analysis").getOrCreate()

# Pega os dados bronze
spark.catalog.listDatabases()
countries_bronze_df = spark.read.format("delta").load('dbfs:/user/hive/warehouse/bronze_database.db/bronze_countries')

countries_bronze_df.display()


country_name,country_code
Afghanistan,YA
Albania,ZA
Algeria,7T
American Samoa,Nas
Andorra,C3
Angola,D2
Anguilla,VP-A
Antigua and Barbuda,V2
Argentina,LV
Armenia,EK


## Processamento
Seção para realizar a limpeza padrão de dados:
- Fazer o trim para remover espaços antes e ao final dos valores
- Transformar valores vazios ou sem caracteres em null
- Dropar duplicatas
- Adicionar o "país" Europa no dataset

In [0]:
from functools import reduce

# Lista de colunas que desejamos fazer o trim
all_columns = countries_bronze_df.columns


# Função lambda para aplicar o trim em mais de uma coluna
countries_bronze_df = reduce(
    lambda df, col: df.withColumn(col, F.trim(df[col])),
    all_columns,
    countries_bronze_df,
)

# Valores vazios transformar em null
# Função lambda para deixar como null colunas que não possuam nem texto, nem números, ou seja que tenham apenas caracteres especiais
countries_bronze_df = reduce(
    lambda df, col: df.withColumn(
        col, F.when(F.regexp_like(col, F.lit("^[^\w]*$")), None).otherwise(F.col(col))
    ),
    all_columns,
    countries_bronze_df,
)

# Função lambda para substituir valores vazios por null em uma coluna
countries_bronze_df = reduce(
    lambda df, col: df.withColumn(
        col, F.when(F.col(col) == "", None).otherwise(F.col(col))
    ),
    all_columns,
    countries_bronze_df,
)

# Drop em duplicatas
countries_bronze_df = countries_bronze_df.dropDuplicates()

# Adicionando o "país" Europa com country_name: Europe e country_code: EU
new_country = spark.createDataFrame([("Europe","EU")],["country_name","country_code"])

countries_bronze_df = countries_bronze_df.union(new_country)

countries_bronze_df.display()

country_name,country_code
Sudan,ST
Sweden,SE
Afghanistan,YA
Albania,ZA
China,B
Georgia,4L
Iraq,YI
Caribbean Netherlands,PJbes
Cape Verde,D4
Greenland,OYg


## Avaliando a qualidade dos dados
- Testar se não há nulos em country_names
- Testar se não há nulos em country_codes

In [0]:
# Testando a qualidade dos dados
test_col_not_null(countries_bronze_df,"country_name")
test_col_not_null(countries_bronze_df,"country_code")


Avaliando a condição country_name não contem nulos
Avaliando a condição country_code não contem nulos


## Registrando dados no banco
Após aprovação, salvamos o dado sanitizado no banco e verificamos se tivemos sucesso

In [0]:
# Cria o banco de dados se ele não existir
database_name = 'silver_database'
table_name = 'silver_countries'


spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Salva o DataFrame como tabela Delta
countries_bronze_df.write.mode("overwrite").format("delta").option("mergeSchema", "true").saveAsTable(
    f"{database_name}.{table_name}"
)

In [0]:
%sql
SELECT * from silver_database.silver_countries

country_name,country_code
Sudan,ST
Sweden,SE
Afghanistan,YA
Albania,ZA
China,B
Georgia,4L
Iraq,YI
Caribbean Netherlands,PJbes
Cape Verde,D4
Greenland,OYg


In [0]:
'''
Finalizando clean_bronze_data_countries
'''

'\nFinalizando clean_bronze_data_countries\n'