In [0]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import input_file_name, count, when, col, lit, max,row_number, date_format
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

In [0]:
# Definando o schema da estrutura json
json_schema = StructType([
    StructField("codigo", StringType(), True),
    StructField("origem", StringType(), True),
    StructField("extract", StringType(), True),
    StructField("desconto", StringType(), True),
    StructField("link", StringType(), True),
    StructField("nome", StringType(), True),
    StructField("parcelamento", StringType(), True),
    StructField("preco", StringType(), True)
])

In [0]:
def processar_dados_inbound(path_inbound):

    print(f"Tentando ler arquivos JSON de: {path_inbound}")

    try:
        # Iniciando a leitura dos arquivos json
        # .option("multiLine", "true") = Se o arquivo json possuir uma array com várias linhas, será lido como um todo
        # .option("mode", "PERMISSIVE") = Continua lendo os arquivos JSON mesmo que encontre erros de formatação ou dados que não se encaixam no schema pré definido
        # .option("columnNameOfCorruptRecord", "_corrupt_record") = Adicionando a opção pathGlobFilter para garantir que apenas .json sejam lidos explicitamente
        # .withColumn("file_source", input_file_name()) = Adicionando o  nome do caminho de cada arquivo lido em uma coluna
        df_json_unificado = spark.read \
                                .option("multiLine", "true") \
                                .option("mode", "PERMISSIVE") \
                                .option("columnNameOfCorruptRecord", "_corrupt_record") \
                                .schema(json_schema) \
                                .json(path_inbound) \
                                .withColumn("file_source", input_file_name()) 

        print("\n--- Esquema do DataFrame unificado ---")
        df_json_unificado.printSchema()

        # 1- Etapa de Qualidade. Verificação de Arquivos Lidos vs Arquivos Esperados
        # Usando dbutils para listar arquivos no Volumes
        all_files_in_dir =  dbutils.fs.ls(path_inbound)
        expected_json_files = [f for f in all_files_in_dir if f.name.endswith(".json")]
        # Contar quantos arquivos .json existem no Volumes Inbound
        num_expected_files = len(expected_json_files)


        # Contar quantos arquivos foram realmente processados pela aplicação a partir da coluna criada com o nome de cada arquivo lido
        files_processed_df = df_json_unificado.select("file_source").distinct().count()

        print(f"\n--- Verificação de Arquivos ---")
        print(f"Número de arquivos .json esperados na pasta '{path_inbound.replace('/*.json', '')}': {num_expected_files}")
        print(f"Número de arquivos únicos processados no DataFrame: {files_processed_df}")

        if files_processed_df == num_expected_files:
            print("STATUS: OK - Todos os arquivos JSON esperados foram processados.")
        else:
            print("STATUS: ALERTA - O número de arquivos processados difere do esperado. Investigue!")
            # Para identificar quais arquivos podem ter sido ignorados:
            processed_file_paths = [row.file_source for row in df_json_unificado.select("file_source").distinct().collect()]
            expected_file_paths = [f.path for f in expected_json_files]

            # Encontrar arquivos esperados que não foram processados
            missing_files = [path for path in expected_file_paths if path not in processed_file_paths]
            if missing_files:
                print(f"Arquivos JSON esperados que não foram encontrados no DataFrame: {missing_files}")

        # 2 Etapa de Qualidade. Contagem de Registros
        #Contar quantidade de linhas no dataframe processado
        total_records_loaded = df_json_unificado.count()
        print(f"\n--- Verificação de Registros ---")
        print(f"Total de registros carregados no DataFrame: {total_records_loaded}")

        #verificação
        if total_records_loaded == 0 and num_expected_files > 0:
            print("STATUS: ALERTA - Nenhum registro foi carregado, mas há arquivos JSON esperados. Verifique se os arquivos estão vazios ou malformados.")
        elif total_records_loaded > 0:
            print("STATUS: OK - Registros foram carregados com sucesso.")
        else:
            print("STATUS: N/A - Nenhuma expectativa de registros (pasta vazia ou sem arquivos .json).")

        # 3- Validação Básica de Dados (Verificar nulos em colunas críticas)
        print(f"\n--- Verificação de Nulos em Colunas Críticas ---")
        critical_columns = ["codigo", "nome", "preco"] # Exemplo

        for col_name in critical_columns:
            null_count = df_json_unificado.filter(col(col_name).isNull()).count()
            if null_count > 0:
                print(f"ALERTA: Coluna '{col_name}' possui {null_count} valores nulos.")
            else:
                print(f"OK: Coluna '{col_name}' não possui valores nulos.")

        # 4- Verificação de Registros Corrompidos (se houver a coluna _corrupt_record)
        if "_corrupt_record" in df_json_unificado.columns:
            corrupt_records_count = df_json_unificado.filter(col("_corrupt_record").isNotNull()).count()
            if corrupt_records_count > 0:
                print(f"\nALERTA: Encontrados {corrupt_records_count} registros corrompidos no _corrupt_record. Investigue!")
                df_json_unificado.filter(col("_corrupt_record").isNotNull()).select("file_source", "_corrupt_record").show(truncate=False)
            else:
                print("\nOK: Não foram encontrados registros corrompidos na coluna _corrupt_record.")

        # 5- Dropar a coluna 'file_source'
        # É importante dropar a coluna *depois* de todas as verificações que a utilizam
        df_json_unificado = df_json_unificado.drop("file_source")
        print("\n--- Coluna 'file_source' foi removida do DataFrame final. ---")
        print("\nEsquema do DataFrame final:")
        df_json_unificado.printSchema()

        return df_json_unificado

    except Exception as e:
        print(f"Ocorreu um erro durante a leitura ou verificação de qualidade: {e}")

# Caminho para a pasta que contém os arquivos JSON
json_folder_path = "/Volumes/nintendodatabrickspkjgt7_workspace/nintendo/inbound/"

df_json_unificado = processar_dados_inbound(json_folder_path)

In [0]:
def carregando_delta_full(nome_tabela, coluna_particao):

    print(f"Tentando ler arquivo Delta de: {nome_tabela}")
    
    spark = SparkSession.builder.getOrCreate()
    dbutils = DBUtils(spark) 

    # 1- Listando arquivos Delta no Volume Bronze
    print(f"\n--- Verificação do caminho Volume Bronze ---")
    try:
        dbutils.fs.ls(nome_tabela)
        print(f"STATUS OK - Caminho '{nome_tabela}' existe. Tentando carregar como Delta.")
    except Exception as e:
        print(f"STATUS ALERTA - O caminho '{nome_tabela}' não existe ou não é acessível. Detalhe: {e}. Gerando um DataFrame vazio.")
        print("\nRetornando um dataframe vazio!")
        # Em caso de erro, retorne um dataframe vazio com o schema definido anteriormente
        return spark.createDataFrame([], schema=json_schema)

    # 2- Tentando ler o arquivo Delta do Volume Bronze
    print(f"\n--- Verificação da leitura do arquivo Delta ---")
    try:
        df = spark.read.format("delta").load(nome_tabela)
        print(f"STATUS OK - Tabela Delta '{nome_tabela}' carregada com sucesso.")
    except Exception as e:
        # Em caso de erro, retorne um dataframe vazio com o schema definido anteriormente
        print(f"STATUS ALERTA - Erro ao carregar a tabela Delta '{nome_tabela}'. Provavelmente não é uma tabela Delta válida ou não contém dados. Detalhe do erro: {e}")
        print("\nRetornando um dataframe vazio!")
        return spark.createDataFrame([], schema=json_schema)
    
    # 3- Verificando se o dataframe está vazio
    # Em caso positivo, retorne um dataframe vazio com o schema definido anteriormente
    print(f"\n--- Verificação se arquivo Delta está vazio ---")
    if df.rdd.isEmpty():
        print(f"STATUS ALERTA - Tabela '{nome_tabela}' foi carregada como Delta VÁLIDA, mas está completamente vazia. Retornando DataFrame vazio com o schema definido.")
        print("\nRetornando um dataframe vazio!")
        return spark.createDataFrame([], schema=json_schema)
    else:
        print(f"STATUS OK - Tabela Delta '{nome_tabela}' não está vazio.")
        return df


# Caminho para a external location do diretório bronze
bronze_path = f"/Volumes/nintendodatabrickspkjgt7_workspace/nintendo/bronze"

# Lendo arquivo Delta do diretório bronze pela ultima partição
df_old = carregando_delta_full(bronze_path, "data_ref")

In [0]:
def comparando_dados_incremental(df1, df2):

    # 1- Combinando dataframe processado X dataframe bronze
    print(f"\n--- Unindo dataframes ---")
    if df2.count() > 0:
        if "data_ref" not in df1.columns:
            df1 = df1.withColumn("data_ref", date_format("extract", "yyyy-MM-dd"))
    df_old_tagged = df2.withColumn("source", lit("old"))
    df_new_tagged = df1.withColumn("source", lit("new"))

    df_combined = df_new_tagged.unionByName(df_old_tagged)

    if df_combined.count() == (df_old_tagged.count() + df_new_tagged.count()):
        print(f"STATUS OK - Dataframe Processado vs Dataframe Bronze unidos com sucesso")
    else:
        print(f"STATUS ALERTA - Dataframes não foram unidos corretamente.")
        print("\nRetornando um dataframe vazio!")
        return spark.createDataFrame([], schema=json_schema)

    # 2- Retirando linhas duplicadas do Dataframe
    print(f"\n--- Obtendo novos dados ---")
    # Contando ocorrências de cada linha
    df_counts = df_combined.groupBy("codigo","desconto", "parcelamento", "preco").count()
    df_counts = df_counts.select('codigo','count')
    df_joined = df_combined.join(df_counts.select("codigo", "count"), on="codigo", how="left")

    # Filtrando apenas as linhas que aparecem uma única vez
    df_sem_duplicatas = df_joined.filter(col("count") == 1).drop("count")

    window_spec = Window.partitionBy("codigo", "desconto", "link", "nome", "parcelamento", "preco").orderBy(col("extract").desc())

    # Adicionar um número de linha dentro de cada partição
    df_ranked = df_sem_duplicatas.withColumn("row_num", row_number().over(window_spec))

    # Filtrar para manter apenas a primeira linha (a mais recente) para cada código
    df_result = df_ranked.filter(col("row_num") == 1).drop("row_num", "source")

    # Supondo que sua coluna de data se chame 'extract'
    df_result = df_result.withColumn("data_ref", date_format("extract", "yyyy-MM-dd"))

    # 1. Encontrar a maior data no DataFrame
    maior_data = df_result.select(max("extract")).collect()[0][0]

    # 2. Filtrar o DataFrame para mostrar apenas as linhas com a maior data
    df_maior_data = df_result.filter(col("extract") == maior_data)
    
    if df_maior_data.count() > 0:
        print(f"STATUS INCREMENTAL - Dataframe possui novos {df_maior_data.count()} dados incremental")
    else:
        print(f"STATUS INCREMENTAL - Dataframes não possuí novos dados.")

    return df_maior_data

df_incremental = comparando_dados_incremental(df_json_unificado,df_old)

In [0]:
def carregando_tabela_incremental(df,delta_table_path):

    print(f"Iniciando o salvamento do DataFrame no formato Delta em: {delta_table_path}")

    try:
        # 1- Obter a contagem de linhas ANTES de salvar ---
        num_rows_to_save = df.count()
        df_old = spark.read.format("delta").load(delta_table_path)
        num_rows_already_saved = df_old.count()

        print(f"Número de linhas no DataFrame a ser salvo: {num_rows_to_save}")

        # 2- Salvar o DataFrame no formato Delta ---
        df.write \
                        .format("delta") \
                        .mode("append") \
                        .partitionBy("data_ref") \
                        .save(delta_table_path)

        print(f"DataFrame salvo com sucesso como tabela Delta particionada por 'extract' em: {delta_table_path}")

        # Início das Verificações de Qualidade Pós-Gravação 

        # 3- Garantir que os dados foram salvos no caminho
        print(f"\n--- Verificação: Leitura da Tabela Delta Salva ---")
        df_delta_read = spark.read.format("delta").load(delta_table_path)
        print("Esquema da tabela Delta lida:")
        df_delta_read.printSchema()
        print("Primeiras 5 linhas da tabela Delta lida:")
        df_delta_read.show(5, truncate=False)

        if df_delta_read.isEmpty():
            print(f"STATUS ALERTA - A tabela Delta salva em '{delta_table_path}' está vazia ou não pôde ser lida.")
        else:
            print(f"STATUS OK - A tabela Delta foi lida com sucesso de '{delta_table_path}'.")


        # 4- Verificar se a quantidade de linhas salvas condiz com o que está salvo
        row_verify = num_rows_already_saved + num_rows_to_save
        df_new = spark.read.format("delta").load(delta_table_path)
        qtd_df = df_new.count()
        if num_rows_to_save == 0:
            print(f"Sem necessidade de verificação de linhas carregadas, quantidade de novas linhas a serem carregadas igual a: {num_rows_to_save}")
        else:

            print(f"\n--- Verificação: Contagem de Linhas Salvas ---")
            print(f"Número de linhas salvas na tabela Delta: {num_rows_to_save}")

            if row_verify == qtd_df:
                print(f"STATUS OK - A quantidade de linhas salvas ({num_rows_to_save}) corresponde à quantidade de linhas no DataFrame original ({row_verify}).")
            else:
                print(f"STATUS ALERTA - A quantidade de linhas salvas ({num_rows_to_save}) NÃO CORRESPONDE à quantidade de linhas no DataFrame original ({row_verify}). Investigue!")


        # 5- Verificar se realmente foi particionado ---
        print(f"\n--- Verificação: Particionamento por 'data' ---")

        print("Conteúdo do diretório Delta (buscando por pastas de partição usando dbutils):")
        try:
            # Lista os subdiretórios no caminho Delta. Esperamos ver pastas como data=YYYY-MM-DD
            delta_contents = dbutils.fs.ls(delta_table_path)
            partition_folders_found = [f.name for f in delta_contents if f.isDir and "=" in f.name]
            if partition_folders_found:
                print(f"STATUS OK - Pastas de partição detectadas (ex: {', '.join(partition_folders_found[:3])}...):")
            else:
                print("Nenhuma pasta de partição padrão (ex: 'data=...') detectada diretamente no caminho raiz.")

        except Exception as ls_e:
            print(f"STATUS ALERTA - Erro ao listar conteúdo do diretório Delta com dbutils.fs.ls(): {ls_e}")


        # Verificando por qual coluna a Tabela Delta foi particionada
        try:
            spark.sql(f"DESCRIBE DETAIL delta.`{delta_table_path}`").show(truncate=False)
            table_details_df = spark.sql(f"DESCRIBE DETAIL delta.`{delta_table_path}`")
            partition_columns = table_details_df.select("partitionColumns").collect()[0][0]

            if "data_ref" in partition_columns:
                print(f"STATUS OK - A tabela Delta está particionada pela coluna 'data_ref'.")
            else:
                print(f"STATUS ALERTA - A tabela Delta NÃO parece estar particionada pela coluna 'data_ref'. Partições encontradas: {partition_columns}")

        except Exception as sql_e:
            print(f"STATUS ALERTA - Não foi possível obter detalhes da tabela Delta (verifique o log): {sql_e}")
            # A linha abaixo não pode ser executada por estar dentro de um try-except, por isso a retirei
            # print("Tente verificar manualmente a estrutura de pastas com '%fs ls -l dbfs:/nintendo/bronze/'.")


    except Exception as e:
        print(f"Ocorreu um erro geral ao salvar ou verificar a tabela Delta: {e}")

carregando_tabela_incremental(df_incremental,bronze_path)
