In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, LongType, TimestampType 
from pyspark.sql.functions import concat_ws, col, regexp_replace, when, lit, desc, max, to_date, regexp_extract, count, to_timestamp, date_format, udf
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
import re

In [0]:
%run /Workspace/Admin/src/spark_performance

In [0]:
def carregando_ultima_partition(nome_tabela, coluna_particao):

    schema = StructType([
        StructField("id_envio", StringType(), nullable=True),
        StructField("corredor_de_armazenagem", StringType(), nullable=True),
        StructField("metodo_de_envio", StringType(), nullable=True),
        StructField("ligações_do_cliente", StringType(), nullable=True),
        StructField("avaliação_do_cliente", StringType(), nullable=True),
        StructField("preço", StringType(), nullable=True),
        StructField("qtd_itens", StringType(), nullable=True),
        StructField("importancia", StringType(), nullable=True),
        StructField("genero", StringType(), nullable=True),
        StructField("desconto", StringType(), nullable=True),
        StructField("peso_g", StringType(), nullable=True),
        StructField("Chegou_no_tempo", StringType(), nullable=True),
        StructField("Destino", StringType(), nullable=True),
        StructField("DataEnvio", StringType(), nullable=True),
        StructField("dataEntrega", StringType(), nullable=True),
        StructField("avaliacaoEntrega", StringType(), nullable=True),
        StructField("data_ref", StringType(), True)
    ])

    print(f"Tentando ler arquivo Delta de: {nome_tabela}")
    
    spark = SparkSession.builder.getOrCreate()
    dbutils = DBUtils(spark) 

    # 1- Listando arquivos Delta no Volume Bronze
    print(f"\n--- Verificação do caminho Volume Bronze ---")
    try:
        dbutils.fs.ls(nome_tabela)
        print(f"STATUS OK - Caminho '{nome_tabela}' existe. Tentando carregar como Delta.")
    except Exception as e:
        print(f"STATUS ALERTA - O caminho '{nome_tabela}' não existe ou não é acessível. Detalhe: {e}. Gerando um DataFrame vazio.")
        print("\nRetornando um dataframe vazio!")
        # Em caso de erro, retorne um dataframe vazio com o schema definido anteriormente
        return spark.createDataFrame([], schema=schema)
    
    # 2-  Recuperar partições existentes
    try:
        df = spark.read.format("delta").load(nome_tabela)
    except Exception as e:
        print(f"STATUS ALERTA - Erro ao acessar a tabela '{nome_tabela}': {e}")

    try:

        ultima_particao_df = df.select(max(coluna_particao).alias("ultima_particao"))
        
        ultima_particao = (
            ultima_particao_df.first()["ultima_particao"]
            if ultima_particao_df.first()
            else None
        )

    except Exception as e:
        print(f"STATUS ALERTA - Erro ao obter última partição da tabela '{nome_tabela}': {e}")

    # 3- Tentando ler o arquivo Delta do Volume Bronze
    print(f"\n--- Verificação da leitura do arquivo Delta ---")
    try:

        filtro = f"{coluna_particao} = '{ultima_particao}'"

        df = spark.read.format("delta").load(nome_tabela) \
                .where(filtro)
        print(f"STATUS OK - Tabela Delta '{nome_tabela}' carregada com sucesso pela partição: {ultima_particao}.")

    except Exception as e:
        # Em caso de erro, retorne um dataframe vazio com o schema definido anteriormente
        print(f"STATUS ALERTA - Erro ao carregar a tabela Delta '{nome_tabela}'. Provavelmente não é uma tabela Delta válida ou não contém dados. Detalhe do erro: {e}")
        print("\nRetornando um dataframe vazio!")
        return spark.createDataFrame([], schema=schema)
    
    # 4- Verificando se o dataframe está vazio
    # Em caso positivo, retorne um dataframe vazio com o schema definido anteriormente
    print(f"\n--- Verificação se arquivo Delta está vazio ---")
    if df.rdd.isEmpty():
        print(f"STATUS ALERTA - Tabela '{nome_tabela}' foi carregada como Delta VÁLIDA, mas está completamente vazia. Retornando DataFrame vazio com o schema definido.")
        print("\nRetornando um dataframe vazio!")
        return spark.createDataFrame([], schema=schema)
    else:
        print(f"STATUS OK - Tabela Delta '{nome_tabela}' não está vazio.")
        return df


# Caminho para a external location do diretório bronze
bronze_path = f"/Volumes/grainlogistic/grain/bronze/"

# Lendo arquivo Delta do diretório bronze pela ultima partição
df = carregando_ultima_partition(bronze_path, "data_ref")

In [0]:
def filter_not_null_value(df, coluna):

    print(f"Iniciando o filtro de valores vazios na coluna: {coluna}")

    dffiltered = df.filter(col(coluna).isNotNull())

    qtddotal = df.count()
    qtdnotnull = df.filter(col(coluna).isNotNull()).count()
    qtdnull = df.filter(col(coluna).isNull()).count()

    print(f"\nDataframe filtrado, numero de linhas: {qtdnotnull}")

    assert qtddotal == (qtdnull + qtdnotnull), \
    f"\nErro na contagem: O total de linhas ({qtddotal}) não é igual à soma de nulos ({qtdnull}) e não nulos ({qtdnotnull}) para a coluna '{coluna}'."

    print(f"\nFiltro ralizado com sucesso")
    print(f"\ndf origem {qtddotal} linhas = df filtrado {qtdnotnull} linhas + df não filtrado {qtdnull} linhas")

    return dffiltered

dffiltered = filter_not_null_value(df, "id_envio")

In [0]:
def define_numeric_columns(df):

    print(f"Iniciando conversão de colunas com padrões inteiros/decimais")

    # Regex para identificar valores inteiras e monetários
    regex_inteiro = re.compile(r"^\d+$")
    regex_decimal = re.compile(r"^[+-]?(\d+(\.\d*)?|\.\d+)$")

    # Obtendo colunas de tipo string
    colunas_string = [coluna for coluna, dtype in df.dtypes if dtype == "string"]
    colunas_inteiras = []
    colunas_decimais = []

    # Identifica colunas com valores inteiras e monetários
    for coluna in colunas_string:
        df_sem_nulos = df.filter(col(coluna).isNotNull())
        valores_amostra = df_sem_nulos.select(coluna).rdd.map(lambda row: row[0]).collect()

        if any(bool(regex_inteiro.match(str(valor))) for valor in valores_amostra):
            print(f"\nA coluna '{coluna}' contém valores no formato inteiro.")
            colunas_inteiras.append(coluna)
        elif any(bool(regex_decimal.match(str(valor))) for valor in valores_amostra):
            print(f"\nA coluna '{coluna}' contém valores no formato decimal.\n")
            colunas_decimais.append(coluna)

        else:
            print(f"\nA coluna '{coluna}' não contém valores no formato inteiro ou decimal.\n")

    # Aplica a conversão para valores percentuais
    for coluna in colunas_inteiras:
        df = df.withColumn(coluna, col(coluna).cast("long"))

        print(f"STATUS OK - Coluna {coluna} convertida com sucesso para tipo 'inteiro'.")

    # Aplica a conversão para valores monetários
    for coluna in colunas_decimais:
        df = df.withColumn(coluna, col(coluna).cast("double"))

        print(f"STATUS OK - Coluna {coluna} convertida com sucesso para tipo 'decimal'.")

    return df

df_numeric = define_numeric_columns(dffiltered)

In [0]:
def define_data_columns(df):

    print(f"Iniciando conversão de colunas com padrões inteiros/decimais")

    # Regex para identificar valores inteiras e monetários
    regex_data = re.compile(r"^[a-zA-Zçãéê]+-feira, \d{1,2} de [a-zA-Z]+ de \d{4}$|^[a-zA-Zçãéê]+, \d{1,2} de [a-zA-Z]+ de \d{4}$")

    meses = {
    'janeiro': '01', 'fevereiro': '02', 'março': '03', 'abril': '04',
    'maio': '05', 'junho': '06', 'julho': '07', 'agosto': '08',
    'setembro': '09', 'outubro': '10', 'novembro': '11', 'dezembro': '12'
    }

    def converter_data(data_str):
        try:
            # Remove o dia da semana e o hífen
            partes = data_str.split(', ')
            if len(partes) < 2:
                return None
            data_pt = partes[1]  # Ex: '11 de agosto de 2024'
            partes_data = data_pt.split(' de ')
            if len(partes_data) != 3:
                return None
            dia = partes_data[0].zfill(2)
            mes = meses.get(partes_data[1].lower(), '01')
            ano = partes_data[2]
            # Formata para yyyy-MM-dd
            data_formatada = f"{ano}-{mes}-{dia}"
            return data_formatada
        except Exception:
            return None

    # Registrar a UDF
    converter_data_udf = udf(converter_data, StringType())

    # Obtendo colunas de tipo string
    colunas_string = [coluna for coluna, dtype in df.dtypes if dtype == "string"]
    colunas_data = []

    # Identifica colunas com valores inteiras e monetários
    for coluna in colunas_string:
        df_sem_nulos = df.filter(col(coluna).isNotNull())
        valores_amostra = df_sem_nulos.select(coluna).rdd.map(lambda row: row[0]).collect()

        if any(bool(regex_data.match(str(valor))) for valor in valores_amostra):
            print(f"\nA coluna '{coluna}' contém valores no formato data.")
            colunas_data.append(coluna)
        else:
            print(f"\nA coluna '{coluna}' não contém valores no formato data.\n")

    for coluna in colunas_data:

        # Supondo que seu DataFrame se chama df e a coluna de data é 'data_str'
        # 1. Crie a coluna formatada para yyyy-MM-dd
        df = df.withColumn(coluna, converter_data_udf(df[coluna]))

        # 2. Converta para tipo date e depois para dd/MM/yyyy
        df = df.withColumn(coluna, to_date(df[coluna], 'yyyy-MM-dd'))

        print(f"STATUS OK - Coluna {coluna} convertida com sucesso para tipo 'date'.")

    return df

df_data = define_data_columns(df_numeric)

In [0]:
def replace_with_hyphen(df):
    
    print(f"Iniciando correção de valores nulos ou corrompidos em colunas strings")
    
    # Identificar colunas strings (inteiras e decimais)
    string_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, (StringType))]

    print(f"\nColunas strings identificadas: {string_cols}")

    # Contar valores nulos antes da transformação
    null_counts_before = df.select([count(when(col(c).isNull(), c)).alias(c) for c in string_cols]).collect()[0].asDict()
    print(f"\nValores nulos antes da transformação: {null_counts_before}")

    # Substituir valores nulos ou '#REF!' por '-' nas colunas strings
    for col_name in string_cols:
        df = df.withColumn(
            col_name,
            when(col(col_name).isNull() | (col(col_name) == '#REF!'), '-').otherwise(col(col_name))
        )
        print(f"\nValor nulo ou '#REF!' na coluna {col_name} alterado para '-'")

    # Contar valores nulos e '#REF!' depois da transformação
    null_or_ref_counts_after = df.select([
        (count(when(col(c).isNull() | (col(c) == '#REF!'), c))).alias(c) for c in string_cols
    ]).collect()[0].asDict()
    print(f"\nValores nulos ou '#REF!' depois da transformação: {null_or_ref_counts_after}\n")

    # Verificar se todas as colunas tiveram seus valores nulos e '#REF!' substituídos
    for col_name in string_cols:
        if null_or_ref_counts_after[col_name] == 0:
            print(f"STATUS OK - Coluna {col_name} foi corretamente preenchida.")
        else:
            print(f"STATUS ALERTA - Coluna {col_name} ainda contém valores nulos ou '#REF!'!")

    return df

df_no_null = replace_with_hyphen(df_data)

In [0]:
def carregando_tabela_silver(df,delta_table_path):

    print(f"Iniciando o salvamento do DataFrame no formato Delta em: {delta_table_path}")

    try:
        # 1- Obter a contagem de linhas ANTES de salvar ---
        num_rows_to_save = df.count()
        print(f"Número de linhas no DataFrame a ser salvo: {num_rows_to_save}")

        # 2- Salvar o DataFrame no formato Delta ---
        df.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .partitionBy("data_ref") \
                        .save(delta_table_path)

        print(f"DataFrame salvo com sucesso como tabela Delta particionada por 'extract' em: {delta_table_path}")

        # Início das Verificações de Qualidade Pós-Gravação 

        # 3- Garantir que os dados foram salvos no caminho
        print(f"\n--- Verificação: Leitura da Tabela Delta Salva ---")
        df_delta_read = spark.read.format("delta").load(delta_table_path)
        print("Esquema da tabela Delta lida:")
        df_delta_read.printSchema()
        num_rows_saved = df_delta_read.count()

        if df_delta_read.isEmpty():
            print(f"STATUS ALERTA - A tabela Delta salva em '{delta_table_path}' está vazia ou não pôde ser lida.")
        else:
            print(f"STATUS OK - A tabela Delta foi lida com sucesso de '{delta_table_path}' com {num_rows_saved} linhas recarregadas.")


    except Exception as e:
        print(f"Ocorreu um erro geral ao salvar ou verificar a tabela Delta: {e}")


# Caminho para a external location do diretório silver
silver_path = f"/Volumes/grainlogistic/grain/silver"

# Sobreescrevendo dados particionados no diretório silver
carregando_tabela_silver(df_no_null, silver_path)