In [0]:
def save_table(df, table_name, layer):
    (
        df.write.mode("append").format('delta')
        .option("mergeSchema", "true")    
        .option("delta.columnMapping.mode", "name")    
        .saveAsTable(f"{layer}_layer.{table_name}")
    )

In [0]:
def limpar_nome_coluna(nome):

    # Remoção de acentos e demais caracteres especiais
    nome = re.sub(r'[áàâãä]', 'a', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[éèêë]', 'e', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[íìîï]', 'i', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[óòôõö]', 'o', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[úùûü]', 'u', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[ç]', 'c', nome, flags=re.IGNORECASE)
    nome = re.sub(r'[^\w\s]', '', nome)

    # Removendo underlines
    nome = nome.replace('_', '')

    # Troca espaços por underline
    nome = nome.replace(' ', '_')

    # Retorna o nome da coluna ajustado e em caracteres maiúsculos
    return nome.upper()

In [0]:
def analise_nulos(df_teste):

    total_linhas = df_teste.count()
    print(f">>> Total de linhas: {total_linhas}")

    # Total de nulos por coluna
    nulos_por_coluna = df_teste.select([
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df_teste.columns
    ])

    nulos_dict = nulos_por_coluna.first().asDict()

    for coluna, qtd_nulos in nulos_dict.items():
        if(qtd_nulos > 0):
            print(f"{coluna}: {qtd_nulos} registros nulos ({round(qtd_nulos/total_linhas*100, 2)}%)")

In [0]:
def ingestao_dados(dados, table_name, schema):
    
    # Leitura dos dados da tabela existente (caso já tinha sido salva anteriormente)
    dados_existentes = spark.read.table(f"{schema}_layer.{table_name}");
    
    # Verificando se os dados importados já existem
    verifica_tabela = spark.catalog.tableExists(f"{schema}_layer.{table_name}");

    # Se a tabela já existir, realiza filtragem no dataframe importado para inserir apenas os dados novos
    if verifica_tabela:         

        colunas = dados.columns

        condicoes = [dados[c] == dados_existentes[c] for c in colunas]

        df_filtrado = dados.join(dados_existentes, on=condicoes, how="left_anti")
        
        df_filtrado = df_filtrado.dropDuplicates()

        print('>> Camada: Silver')
        print(f'>> Caminho da tabela bronze: bronze_layer.{table_name}')
        print('>>> Executando carga de dados...', end=" ")

        try:        
            save_table(df_filtrado, table_name, 'silver')
            print('CARGA FINALIZADA COM SUCESSO! \n')
        except:
            print('ERRO NA CARGA DE DADOS... \n')    

    # Caso a tabela não exista, realiza a carga dos dados pela primeira vez
    else:        
        
        print('>> Camada: Silver')
        print(f'>> Caminho da tabela bronze: bronze_layer.{table_name}')
        print('>>> Executando carga de dados...', end=" ")

        dados = dados.dropDuplicates()

        try:        
            save_table(dados, table_name, 'silver')
            print('CARGA FINALIZADA COM SUCESS! \n')
        except:
            print('ERRO NA CARGA DE DADOS... \n')   


In [0]:
def analise_qualidade_dados(df):

    # Filtro de registros com ano inválido
    print('>>> A coluna ANO_DO_EXERCICIO não pode estar fora do intervalo entre 1900 e o ano atual. Verificando...\n')
    
    print(f'Qtd de registros com ANO_DO_EXERCICIO inválido: {df.filter((col('ANO_DO_EXERCICIO') < 1900) | (col('ANO_DO_EXERCICIO') > year(current_date()))).count()}')

    print('Filtrando registros com ANO_DO_EXERCICIO fora do intervalo entre 1900 e o ano atual...', end=" ")

    df = df.filter(~((col('ano do exercício') < 1900) | (col('ano do exercício') > year(current_date()))))
    print('OK!\n')

    # Filtro de registros com CEP inválido
    print('>>> Para ser válido, o CEP não pode ter valor nulo e deve possuir 8 dígitos. Verificando...\n')
    
    print(f'Qtd de registros com CEP inválido: {df.filter(col('CEP').isNotNull() & (col('CEP') < 10000000) | (col('CEP') > 99999999)).count()}')

    print('Filtrando registros com CEP inválido...', end=" ")

    df = df.filter(~(col('CEP').isNotNull() & (col('CEP') < 10000000) | (col('CEP') > 99999999)))

    print('OK!\n')

    # Filtro de registros em que o valor do IPTU é maior do que o valor do imóvel (dados inconsistentes)
    print('>>> O IPTU do imóvel não pode ser maior do que o valor do próprio imóvel. Verificando...\n')
    
    print(f'Qtd de registros com IPTU maior do que o valor do imóvel: {df.filter(col('VALOR_COBRADO_DE_IPTU') > col('VALOR_TOTAL_DO_IMOVEL_ESTIMADO')).count()}')

    print('Filtrando registros em que o valor do IPTU é maior do que o valor do imóvel (dados inconsistentes)...', end=" ")

    df = df.filter(col('VALOR_COBRADO_DE_IPTU') < col('VALOR_TOTAL_DO_IMOVEL_ESTIMADO'))

    print('OK!\n')

    # Filtro de registros com inconsistência no preenchimento da coluna ESTADO
    print('>>> A coluna que tem a informação do ESTADO deve possuir apenas 2 caracteres com letras. Verificando...\n')

    print(f'Qtd de registros com inconsistência na coluna ESTADO: {df.filter(col('ESTADO').isNotNull() & (col('ESTADO').rlike("^[A-Z]{2}$") == False)).count()}')

    print('Filtrando registros com inconsistências no preenchimento da coluna ESTADO...', end=" ")

    df = df.filter(col('ESTADO').isNotNull() & (col('ESTADO').rlike("^[A-Z]{2}$") == True))

    print('OK!\n')

    # Filtro de registros com valor de IPTU menor ou igual a zero
    print('>>> O valor do IPTU do imóvel deve ser maior do que zero. Verificando...\n')

    print(f'Qtd de registros com valor de IPTU menor ou igual a zero: {df.filter(col('VALOR_COBRADO_DE_IPTU') <= 0).count()}')

    print('Filtrando registros com valor de IPTU menor ou igual a zero...', end=" ")
    df = df.filter(col('VALOR_COBRADO_DE_IPTU') > 0)

    print('OK!')

    return df