In [0]:
#dbutils.fs.rm("/user/hive/warehouse/deltable_files_log", recurse=True)
#dbutils.fs.rm("/FileStore/tables/stage_roco/input/", recurse=True)
#dbutils.fs.rm('/user/hive/warehouse/logs_roco/ingest_blob_to_stage', recurse=True)

### Readme
Tabelas de log:

1. Tabela que registra as movimentações do repositório de origem, para o nosso armazenamento stage, essa tabela serve para armazenar informações como nome do arquivo, data da ingestão e path do arquivo.

* ETAPA INGESTÃO - **Controle interno**  
 caminho_tbl_delta_log_ingestao = "/user/hive/warehouse/logs_roco/ingest_blob_to_stage"

2. Tabela que registra as informações de controle do recebimento desses arquivos ingeridos, conforme modelo de consistencia física. Logs gerados a partir de informações da tabela de log de ingestão.
* ETAPA GERAÇÃO DE LOG DE INGESTÃO - **ControleRecepcaoArquivo**  
OBS : PADRONIZAR O CAMINHO DAS TABELAS DE LOG  
ATUAL: caminho_tbl_delta_log_cra = "/user/hive/warehouse/deltable_files_log"  
PREVISTO: caminho_tbl_delta_log_cra = "/user/hive/warehouse/logs_roco/controle_recepcao_arquivo"

## Imports

In [0]:
from pyspark.sql.types import StringType, IntegerType, TimestampType, StructType, StructField
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *
import os

##Variaveis

In [0]:
# filepath do arquivo original que será replicado
arquivo_original = "/FileStore/tables/blob/EMPF2ROCOK-1.txt"

qtd_copias = 3

# data que será o nome da pasta com os arquivos replicados/inputs
data = "2023-12-22"
data2 = "2023-12-21"
data3 = "2023-12-20"
data4 = "2023-12-23"

#filepath do destino dos arquivos replicados, pode a data manual, ou usar a função obter_data_brasilia
caminho_destino = f"/FileStore/tables/blob/{data}/"

blob_raiz = "/FileStore/tables/blob/"

# Caminho onde será criado o arquivo delta vazio
caminho_tabela_delta = '/user/hive/warehouse/deltable_files_log'

# Esquema do Dataframe
esquema = StructType([
    StructField('nome_pasta', StringType(), True),
    StructField('nome_arquivo', StringType(), True),
    StructField('dt_list_files_blob', StringType(), True),
    StructField('caminho_arquivo', StringType(), True)
])

## Funções

### Obter datas brasilia

In [0]:
def obter_data_brasilia():
    # Obtém a data e hora atuais no tempo UTC
    data_hora_utc = datetime.utcnow()

    # Subtrai 3 horas para obter o horário de Brasília
    data_hora_brasilia = data_hora_utc - timedelta(hours=3)

    # Obtém apenas a data no formato "YYYY-MM-DD"
    data_atual_brasilia = data_hora_brasilia.strftime("%Y-%m-%d")

    return data_atual_brasilia

#data_atual = obter_data_brasilia()
#print(data_atual)

def obter_datetime_brasilia():
    # Obtém a data e hora atuais no tempo UTC
    data_hora_utc = datetime.utcnow()

    # Subtrai 3 horas para obter o horário de Brasília
    data_hora_brasilia = data_hora_utc - timedelta(hours=3)

    # Obtém apenas a data e horário no formato "YYYY-MM-DDTHH:MM:SS"
    data_atual_brasilia = data_hora_brasilia.strftime("%Y-%m-%d %H:%M:%s")

    # Retorna apenas a data e horário sem os milissegundos
    return data_atual_brasilia[:-3]

### Criação de diretório IF NOT EXISTS

In [0]:
# Função para criar diretórios, caso não existam no caminho especificado
def criar_diretorio(caminho_diretorio):
    # Verifica se o diretório existe, e cria se não existir
    diretorio = None

    try:
        # Tentativa de atribuir o caminho a variável
        diretorio = dbutils.fs.ls(caminho_diretorio)
    except Exception as e:
        pass

    if not diretorio:
        dbutils.fs.mkdirs(caminho_diretorio)
        print(f"Diretório {caminho_diretorio} criado com sucesso.")
    else:
        print(f"Erro ao criar o diretório {caminho_diretorio}, pois já existe")

### Criação de tabela Delta IF NOT EXISTS

In [0]:
def criar_tbl_delta(caminho_tabela_delta, esquema_tbl_delta):
    # "Arquivos" definido como None para evitar o erro FileErrorNotFound quando o arquivo delta não existir ainda
    arquivos = None

    try:
        # Tentativa de atribuir o caminho a variável
        arquivos = dbutils.fs.ls(caminho_tabela_delta)
    except Exception as e:
        print(f"Erro ao tentar acessar o caminho {caminho_tabela_delta} pois ele ainda não existe. Criando ...")

    # Se arquivos for None, será criado a pasta e o arquivo Delta no caminho especificado
    if not arquivos:   
        # Crie um DataFrame Delta vazio com a estrutura definida
        df_empty = spark.createDataFrame([], esquema_tbl_delta)
        
        # Salve o DataFrame como um arquivo Delta
        df_empty.write.format("delta").mode("overwrite").save(caminho_tabela_delta)

        # Leia o DataFrame Delta vazio
        df_log_empty = spark.read.format("delta").load(caminho_tabela_delta)

        print("Arquivo Delta criado")
        # Exibir o DataFrame Delta vazio
        df_log_empty.show(truncate=False)
    else:
        print(f"Erro ao criar o diretório {caminho_tabela_delta}, pois já existe")

### Movendo novos arquivos do Blob para Stage

In [0]:
# Essa lista servirá de filtro, para que seja adicionado apenas novos valores a tabela delta
def distintos_tbl_delta(caminho_tbl_delta, coluna_filtro):
    df = spark.read.format("delta").load(caminho_tbl_delta)

    """
    .rdd(): Converte o DataFrame resultante em um RDD.
    .flatMap(lambda x: x): Converte cada linha do RDD em uma lista com um único valor.
    .collect(): Retorna os valores do RDD em uma lista."""

    # Para estrutura atual, recomendado filtrar pela coluna "nome_pasta"
    valores_distintos = df.select(coluna_filtro).distinct().rdd.flatMap(lambda x: x).collect()

    return valores_distintos

def distintos_diretorio_origem(caminho_origem):
    # Cria uma lista com os nomes das pastas
    qtd_arquivos_caminho_origem = dbutils.fs.ls(caminho_origem)
    nomes_pastas = []
    for item in qtd_arquivos_caminho_origem:
        nome_pasta = item.name
        if nome_pasta != "EMPF2ROCO.TXT":
            nomes_pastas.append(nome_pasta) 

    return nomes_pastas

def move_blob_to_stage(caminho_origem, caminho_destino, caminho_tabela_delta_log, esquema_tbl_delta):
    
    criar_tbl_delta(caminho_tabela_delta_log, esquema_tbl_delta)
    criar_diretorio(caminho_destino)

    # Cria um dataframe vazio
    df_registros = spark.createDataFrame([], esquema_tbl_delta)

    pastas_distintas_tbl_delta = distintos_tbl_delta(caminho_tabela_delta_log, "nome_pasta")

    # Cria uma lista com os nomes das pastas
    pastas_distintas_caminho_origem = dbutils.fs.ls(caminho_origem)
    # Obtém o número de linhas no dataframe
    #numero_linhas = df.count()
    # Itera sobre a lista de arquivos e pastas
    for item in pastas_distintas_caminho_origem:
        # Verifica se o item é uma pasta
        if item.isDir():
            # Obtém o nome da pasta
            nome_pasta = item.name
            if nome_pasta != "EMPF2ROCO.TXT" and nome_pasta not in pastas_distintas_tbl_delta:
                # Obtém a lista de arquivos na pasta
                arquivos = dbutils.fs.ls(caminho_origem + "/" + nome_pasta)

                # Adiciona a pasta e os arquivos ao dataframe
                for arquivo in arquivos:
                    # Copia do arquivo do diretorio de origem para o de destino
                    file_path_origem = f"{caminho_origem}{nome_pasta}{arquivo.name}" 
                    file_path_destino = f"{caminho_destino}{nome_pasta}{arquivo.name}" 
                    dbutils.fs.cp(file_path_origem, file_path_destino)
                    # Adiciona linha por linha no df_registro
                    dt_ingest_blob_to_stage = obter_datetime_brasilia()
                    
                    new_row = spark.createDataFrame([(nome_pasta, arquivo.name, dt_ingest_blob_to_stage, file_path_destino)], esquema)
                    
                    
                    #lista.append(file_path)         
                    df_registros = df_registros.union(new_row)
                    #df_registros = df_registros.withColumn("File_Path", F.lit(file_path))
                    #df_join= df_join.withColumn("ingest_date", lit('2023-12-23T15:51:22.367+0000')).


        # Imprime o dataframe
        #df_registros.display()
    df_registros.write.format("delta").mode("append").save(caminho_tabela_delta_log)


    df = spark.read.format("delta").load(caminho_tabela_delta_log)

    #df_registros = df_registros.sort(col("nome_pasta"), col("nome_arquivo"))
    df.display()

    return df_registros
    

### Função de geração de log ControleRecepcaoArquivo

In [0]:
def logControleRecepcaoArquivo(df_log_ingestao):
    esquema = StructType([
        StructField('nArqTrasiCtbil', IntegerType(), True),
        StructField('cSistOrige', StringType(), True),
        StructField('nmArquivoExterno', StringType(), True),
        StructField('dtGeracaoArquivoOrigem', StringType(), True),
        StructField('hrGeracaoArquivoOrigem', StringType(), True),
        StructField('dtGravacaoArquivo', StringType(), True),
        StructField('dsProgramaGerador', StringType(), True),
        StructField('dsVersaoLayout', StringType(), True),
        StructField('qtTotalRegistro', StringType(), True)      
    ])

    caminho_tbl_delta_log_cra = "/user/hive/warehouse/deltable_files_log"
    criar_tbl_delta(caminho_tbl_delta_log_cra, esquema)
    # Calculando o id_maximo antes do loop
    id_maximo = spark.read.format("delta").load(caminho_tbl_delta_log_cra).select(F.max("nuControleRecepcaoArquivo")).collect()[0][0]
    id_maximo = id_maximo if id_maximo is not None else 0
    
    df_ControleRecepcaoArquivo = spark.createDataFrame([], esquema)
    data_collect = df_log_ingestao.collect()
    
    for idx, row in enumerate(data_collect):
        txt_path = row['caminho_arquivo_stage']
        nome_arquivo = row['nome_arquivo']
        dt_ingest_blob_to_stage = row['dt_ingest_blob_to_stage']
        #destino_tabela_delta_log = "/user/hive/warehouse/deltable_files_log"

        df = spark.read.text(txt_path)
        qtd_linhas = df.count()
    
        df_com_linha = df.withColumn("Sequencial", F.monotonically_increasing_id() + 1)

        # Criação do DF com o Header
        df_header = df_com_linha.orderBy("Sequencial", ascending=True).limit(1).drop("Sequencial")
        # Resultado sendo salvo em uma tabela temporária
        df_header.createOrReplaceTempView("Tb_header")

        # Gerando um dataframe conforme definições do txt, com os dados do header usando um Select na tabela temporária
        df_header_transform = spark.sql("""
            SELECT
            SUBSTR(value, 0, 1) AS cdTipoRegistroHeader,
            SUBSTR(value, 2, 8) AS dtGeracaoArquivoHeader,
            SUBSTR(value, 10, 6) AS hrGeracaoArquivoHeader,
            SUBSTR(value, 16, 4) AS cdSistemaHeader,
            SUBSTR(value, 20, 8) AS dsProgramaGeradorHeader,
            SUBSTR(value, 28, 8) AS dsVersaoLayoutHeader,
            SUBSTR(value, 36, 1465) AS FillerHeader
            from Tb_header
        """)

        # Incluindo algumas informações ao dataframe
        df_header_transform = (df_header_transform
                            .withColumn('nome_arquivo', lit(nome_arquivo))
                            .withColumn('ingest_date', lit(dt_ingest_blob_to_stage))
                            .withColumn('qtTotalRegistro', lit(qtd_linhas)))

        # Selecionando apenas as colunas que realmente serão utilizadas
        df = df_header_transform.select('cdSistemaHeader', "nome_arquivo", "dsProgramaGeradorHeader", "dsVersaoLayoutHeader", "qtTotalRegistro", "ingest_date")

        # Renomeando as colunas
        df = (df.withColumnRenamed("cdSistemaHeader", "cdSistema")
                .withColumnRenamed("nome_arquivo", "nmArquivoExterno")
                .withColumnRenamed("dsProgramaGeradorHeader", "dsProgramaGerador")
                .withColumnRenamed("dsVersaoLayoutHeader", "dsVersaoLayout"))

        # Alimentando duas colunas a partir da "intest_date", e dropando a mesma logo após
        df = (df.withColumn("dtGeracaoArquivoOrigem", F.split("ingest_date", " ")[0])
                .withColumn("hrGeracaoArquivoOrigem", F.split("ingest_date", " ")[1])
                .withColumn("hrGeracaoArquivoOrigem", F.substring("hrGeracaoArquivoOrigem", 1, 8))
                .drop("ingest_date"))

        # Criação de nova coluna
        df = df.withColumn('dtGravacaoArquivo', lit(datetime.utcnow()))

        # Atribuindo valores ao nuControleRecepcaoArquivo
        # Calculando o nuControleRecepcaoArquivo usando o id_maximo e o número da iteração
        nu_controle_recepcao_arquivo = id_maximo + idx + 1
        df = df.withColumn("nuControleRecepcaoArquivo", lit(nu_controle_recepcao_arquivo))

        # Ordenando as colunas conforme preconizado
        df = df.select('nuControleRecepcaoArquivo','cdSistema', 'nmArquivoExterno', 'dtGeracaoArquivoOrigem', 'hrGeracaoArquivoOrigem', 'dtGravacaoArquivo', 'dsProgramaGerador', 'dsVersaoLayout', 'qtTotalRegistro')

        # Unindo os DataFrames
        df_ControleRecepcaoArquivo = df_ControleRecepcaoArquivo.union(df)

    # Corrigindo a linha de ordenação
    df_ControleRecepcaoArquivo = df_ControleRecepcaoArquivo.orderBy("nuControleRecepcaoArquivo", ascending=True)

    # Salvando no delta log
    df_ControleRecepcaoArquivo.write.format("delta").mode("append").save(caminho_tbl_delta_log_cra)


#Ingestão de dados do Blob para Stage

### ** No momento o blob está no meu ambiente /FileStore, mas deverá ser feito a conexão com o Blob Storage para ingestão desses dados quando o ambiente estiver pronto.

### * A função move_blob_to_stage move os dados do blob para a stage e adiciona as informações dessa movimentação em uma tabela delta de log.

In [0]:
# Cria o diretório se ainda não existir
caminho_origem = "/FileStore/tables/blob/"
caminho_stage = "/FileStore/tables/stage_roco/input/"

# Criar tabela delta para registro de logs
caminho_tbl_delta_log_ingestao = "/user/hive/warehouse/logs_roco/ingest_blob_to_stage"

# Esquema do Dataframe de log
esquema = StructType([
    StructField('nome_pasta', StringType(), True),
    StructField('nome_arquivo', StringType(), True),
    StructField('dt_ingest_blob_to_stage', StringType(), True),
    StructField('caminho_arquivo_stage', StringType(), True)
])

ingestao = move_blob_to_stage(caminho_origem, caminho_stage, caminho_tbl_delta_log_ingestao, esquema)

Erro ao criar o diretório /user/hive/warehouse/logs_roco/ingest_blob_to_stage, pois já existe
Erro ao criar o diretório /FileStore/tables/stage_roco/input/, pois já existe


nome_pasta,nome_arquivo,dt_ingest_blob_to_stage,caminho_arquivo_stage
2024-01-10/,EMPF2ROCOerrado.TXT,2024-01-11 13:31:1704979,/FileStore/tables/stage_roco/input/2024-01-10/EMPF2ROCOerrado.TXT
2024-01-11/,EMPF2ROCOcerto.txt,2024-01-11 13:19:1704979,/FileStore/tables/stage_roco/input/2024-01-11/EMPF2ROCOcerto.txt
2023-12-22/,EMPF2ROCOK-12.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-22/EMPF2ROCOK-12.txt
2023-12-22/,EMPF2ROCOK-13.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-22/EMPF2ROCOK-13.txt
2023-12-22/,EMPF2ROCOK-11.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-22/EMPF2ROCOK-11.txt
2023-12-23/,EMPF2ROCO10.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-23/EMPF2ROCO10.txt
2023-12-21/,EMPF2ROCOK1.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-21/EMPF2ROCOK1.txt
2023-12-21/,EMPF2ROCOK2.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-21/EMPF2ROCOK2.txt
2023-12-21/,EMPF2ROCOK3.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-21/EMPF2ROCOK3.txt
2023-12-20/,EMPF2ROCO1.txt,2024-01-11 12:04:1704974,/FileStore/tables/stage_roco/input/2023-12-20/EMPF2ROCO1.txt


In [0]:
ingestao.display()

nome_pasta,nome_arquivo,dt_ingest_blob_to_stage,caminho_arquivo_stage
2024-01-10/,EMPF2ROCOerrado.TXT,2024-01-11 13:31:1704979,/FileStore/tables/stage_roco/input/2024-01-10/EMPF2ROCOerrado.TXT


## Gerando dados de log

In [0]:
logControleRecepcaoArquivo(ingestao)

Erro ao criar o diretório /user/hive/warehouse/deltable_files_log, pois já existe


In [0]:
df = spark.read.format("delta").load("/user/hive/warehouse/deltable_files_log")

#df_registros = df_registros.sort(col("nome_pasta"), col("nome_arquivo"))
#df.orderBy("nuControleRecepcaoArquivo", ascending=True).display()
df.display()

nuControleRecepcaoArquivo,cdSistema,nmArquivoExterno,dtGeracaoArquivoOrigem,hrGeracaoArquivoOrigem,dtGravacaoArquivo,dsProgramaGerador,dsVersaoLayout,qtTotalRegistro
1,EMPF,EMPF2ROCO1.txt,2024-01-11,12:04:17,2024-01-11 15:05:01.327207,EMPF1C40,ROCOV001,10037
2,EMPF,EMPF2ROCO2.txt,2024-01-11,12:04:17,2024-01-11 15:05:02.992333,EMPF1C40,ROCOV001,10037
3,EMPF,EMPF2ROCO3.txt,2024-01-11,12:04:17,2024-01-11 15:05:04.329476,EMPF1C40,ROCOV001,10037
4,EMPF,EMPF2ROCOK1.txt,2024-01-11,12:04:17,2024-01-11 15:05:05.614184,EMPF1C40,ROCOV001,10037
5,EMPF,EMPF2ROCOK2.txt,2024-01-11,12:04:17,2024-01-11 15:05:07.173742,EMPF1C40,ROCOV001,10037
6,EMPF,EMPF2ROCOK3.txt,2024-01-11,12:04:17,2024-01-11 15:05:08.449585,EMPF1C40,ROCOV001,10037
7,EMPF,EMPF2ROCOK-11.txt,2024-01-11,12:04:17,2024-01-11 15:05:09.710392,EMPF1C40,ROCOV001,10037
8,EMPF,EMPF2ROCOK-12.txt,2024-01-11,12:04:17,2024-01-11 15:05:10.953407,EMPF1C40,ROCOV001,10037
9,EMPF,EMPF2ROCOK-13.txt,2024-01-11,12:04:17,2024-01-11 15:05:12.214581,EMPF1C40,ROCOV001,10037
10,EMPF,EMPF2ROCO1.txt,2024-01-11,12:04:17,2024-01-11 15:05:13.367541,EMPF1C40,ROCOV001,10037


# Verificando inconformidades no arquivo

### ** Inicialmente só será conferido a quantidade de caracteres nos arquivos, caso não tenha 1500 caracteres em cada linha, o arquivo é movido para pasta "Discarded", se tiver 1500 caracteres, move para pasta "Accepted"

In [0]:
# Define uma função que será aplicada a cada linha
data_collect = ingestao.collect()
#processamento = True
path_output_accepted = "/FileStore/tables/stage_roco/output/accepted"
path_output_discarded = "/FileStore/tables/stage_roco/output/discarded"
criar_diretorio(path_output_discarded)
criar_diretorio(path_output_accepted)

for row in data_collect:
    txt_path = row['caminho_arquivo_stage']
    df = spark.read.text(txt_path)
    #display(df)
    # Nova coluna com a quantidade de caracteres em cada linha do DF
    df_qtd_caracteres = df.withColumn("Qtd_caracteres", F.length(df["value"]))
    media_caracteres = df_qtd_caracteres.agg(F.sum("Qtd_caracteres")).collect()[0][0] / df_qtd_caracteres.count()

    # Atribuição de um id sequencial para as linhas do dataframe
    df_com_linha = df.withColumn("Sequencial", F.monotonically_increasing_id() + 1)

    # Criação do DF com o Trailer
    df_trailer = df_com_linha.orderBy("Sequencial",ascending=False).limit(1).drop("Sequencial")
    # Resultado sendo salvo em uma tabela temporária
    df_trailer.createOrReplaceTempView("Tb_trailer")
    # display(df_trailer)
    
    # Gerando um dataframe conforme definições do txt, com os dados do Trailer usando um Select na tabela temporária
    df_trailer_transform = spark.sql("""
    SELECT
    SUBSTR(value, 0, 1) AS cdTipoRegistroTrailer,
    SUBSTR(value, 2, 12) AS qtTotalRegistrosTrailer,
    SUBSTR(value, 20, 1487) AS FillerTrailer

    from Tb_trailer
    """)
    
    
    qtTotalRegistrosTrailer = df_trailer_transform.select("qtTotalRegistrosTrailer").collect()[0][0]
    qtTotalRegistrosTrailer_int = int(qtTotalRegistrosTrailer)

    


    if qtTotalRegistrosTrailer_int == df_qtd_caracteres.count() and media_caracteres == 1500:
        dbutils.fs.cp(txt_path, path_output_accepted)
        print(f'Arquivo OK.\n'
            f'Média de caracteres: {media_caracteres}\n'
            f'Trailer verificado no arquivo : {qtTotalRegistrosTrailer_int}\n'
            f'Quantidade de linhas no arquivo: {df_qtd_caracteres.count()}')
    else:
        dbutils.fs.cp(txt_path, path_output_discarded)
        print('Arquivo inconsistente\n'
            f'Média de caracteres: {media_caracteres}\n'
            f'Trailer verificado no arquivo : {qtTotalRegistrosTrailer_int}\n'
            f'Quantidade de linhas no arquivo: {df_qtd_caracteres.count()}')

Erro ao criar o diretório /FileStore/tables/stage_roco/output/discarded, pois já existe
Erro ao criar o diretório /FileStore/tables/stage_roco/output/accepted, pois já existe
Arquivo inconsistente
Média de caracteres: 1499.9995018431803
Trailer verificado no arquivo : 10037
Quantidade de linhas no arquivo: 10037


In [0]:
# Dataframe com logs de ingestão do blob para o stage
"""df = spark.read.format("delta").load(caminho_tabela_delta_log)

df.display()"""

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-171504429459112>:2[0m
[1;32m      1[0m [38;5;66;03m# Dataframe com logs de ingestão do blob para o stage[39;00m
[0;32m----> 2[0m df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mload(caminho_tabela_delta_log)
[1;32m      4[0m [38;5;66;03m#df_registros = df_registros.sort(col("nome_pasta"), col("nome_arquivo"))[39;00m
[1;32m      5[0m df[38;5;241m.[39mdisplay()

[0;31mNameError[0m: name 'caminho_tabela_delta_log' is not defined

# END CMD

#Multiplicando os arquivos em uma pasta

##Cria o diretório, SE não existir

In [0]:
# Verifica se o diretório existe, e cria se não existir
"""try:
    # Verifica se o diretório existe, e cria se não existir
    dbutils.fs.mkdirs(caminho_destino)
    print("Diretório criado com sucesso.")
except Exception as e:
    print(f"Erro ao criar o diretório: {e}")"""

diretorio = None

try:
    # Tentativa de atribuir o caminho a variável
    diretorio = dbutils.fs.ls(caminho_destino)
except Exception as e:
    pass

if not diretorio:
    dbutils.fs.mkdirs(caminho_destino)
    print("Diretório criado com sucesso.")
else:
    print(f"Erro ao criar o diretório, pois já existe")

Diretório criado com sucesso.


## Lê o arquivo original e realiza as cópias

In [0]:
# Nome do arquivo sem a extensão .txt
nome_arquivo = os.path.basename(arquivo_original)
nome_arquivo_sem_extensao = os.path.splitext(nome_arquivo)[0]

# Itera sobre a quantidade de cópias definida (qtd_copias)
for i in range(qtd_copias):
    # Cria um novo nome para o arquivo replicado
    novo_nome_arquivo = f"{caminho_destino}/{nome_arquivo_sem_extensao}{str(i + 1)}.txt"
    
    # Copia o arquivo original para o diretório definido (caminho destino)
    dbutils.fs.cp(arquivo_original, novo_nome_arquivo)

qtd_arquivos = dbutils.fs.ls(caminho_destino)
print(f"Foram copiados {len(qtd_arquivos)} arquivos para a pasta de destino definida.")

Foram copiados 3 arquivos para a pasta de destino definida.


# 1 - Criação da tabela que receberá os dados de log

##1.1 - Criação da tabela delta, se caso ainda não existir

In [0]:
# "Arquivos" definido como None para evitar o erro FileErrorNotFound quando o arquivo delta não existir ainda
arquivos = None

try:
    # Tentativa de atribuir o caminho a variável
    arquivos = dbutils.fs.ls(caminho_tabela_delta)
except Exception as e:
    print(f"Erro ao tentar acessar o caminho {caminho_tabela_delta} pois ele ainda não existe")

# Se arquivos for None, será criado a pasta e o arquivo Delta no caminho especificado
if not arquivos:
    #print(f"O caminho {caminho_tabela_delta} não existe.")    

    # Crie um DataFrame Delta vazio com a estrutura definida
    df_empty = spark.createDataFrame([], esquema)
    
    # Salve o DataFrame como um arquivo Delta
    df_empty.write.format("delta").mode("overwrite").save(caminho_tabela_delta)

    # Leia o DataFrame Delta vazio
    df_log_empty = spark.read.format("delta").load(caminho_tabela_delta)

    print("Arquivo Delta criado")
    # Exibir o DataFrame Delta vazio
    df_log_empty.show(truncate=False)
else:
    print(f"O caminho {caminho_tabela_delta} já existe.")


O caminho /user/hive/warehouse/deltable_files_log já existe.


## 1.2 - Adicionando os valores distintos da coluna de nome_pasta a uma lista

In [0]:
# Essa lista servirá de filtro, para que seja adicionado apenas novos valores a tabela delta

df = spark.read.format("delta").load(caminho_tabela_delta)

"""
.rdd(): Converte o DataFrame resultante em um RDD.
.flatMap(lambda x: x): Converte cada linha do RDD em uma lista com um único valor.
.collect(): Retorna os valores do RDD em uma lista."""

valores_distintos = df.select("nome_pasta").distinct().rdd.flatMap(lambda x: x).collect()


print(valores_distintos)

['2023-12-20/', '2023-12-22/', '2023-12-21/']


# 2 - Criando um DF com os dados dos arquivos de uma pasta

## 2.1 - Verificando as pastas existentes na pasta raiz

In [0]:
# Cria uma lista com os nomes das pastas

qtd_arquivos_blob_raiz = dbutils.fs.ls(blob_raiz)
nomes_pastas = []
for item in qtd_arquivos_blob_raiz:
    nome_pasta = item.name
    if nome_pasta != "EMPF2ROCO.TXT":
        nomes_pastas.append(nome_pasta) 

# Imprime a lista
print(nomes_pastas)

['2023-12-20/', '2023-12-21/', '2023-12-22/', '2023-12-23/']


## 2.2 - Criação do DF sem registros

In [0]:
# Cria um dataframe vazio
df_registros = spark.createDataFrame([], esquema)

df_registros.show()

+----------+------------+------------------+---------------+
|nome_pasta|nome_arquivo|dt_list_files_blob|caminho_arquivo|
+----------+------------+------------------+---------------+
+----------+------------+------------------+---------------+



## 2.3 - Adicionando somente novos registros no Dataframe

In [0]:
lista = [] #new

# Cria uma lista com os nomes das pastas

qtd_arquivos_blob_raiz = dbutils.fs.ls(blob_raiz)
# Obtém o número de linhas no dataframe
#numero_linhas = df.count()
# Itera sobre a lista de arquivos e pastas
for item in qtd_arquivos_blob_raiz:
    # Verifica se o item é uma pasta
    if item.isDir():
        # Obtém o nome da pasta
        nome_pasta = item.name
        if nome_pasta != "EMPF2ROCO.TXT" and nome_pasta not in valores_distintos:
            # Obtém a lista de arquivos na pasta
            arquivos = dbutils.fs.ls(blob_raiz + "/" + nome_pasta)

            # Adiciona a pasta e os arquivos ao dataframe
            for arquivo in arquivos:
                # Adiciona linha por linha no df_registro
                dt_listagem = obter_datetime_brasilia()
                file_path = f"{blob_raiz}{nome_pasta}{arquivo.name}" 
                new_row = spark.createDataFrame([(nome_pasta, arquivo.name, dt_listagem, file_path)], esquema)
                
                 
                #lista.append(file_path)         
                df_registros = df_registros.union(new_row)
                #df_registros = df_registros.withColumn("File_Path", F.lit(file_path))
                #df_join= df_join.withColumn("ingest_date", lit('2023-12-23T15:51:22.367+0000')).


# Imprime o dataframe
df_registros.display()


# Imprime a lista
#print(nomes_pastas)

nome_pasta,nome_arquivo,dt_list_files_blob,caminho_arquivo
2023-12-23/,EMPF2ROCO1.txt,2023-12-28 11:53:1703764,/FileStore/tables/blob/2023-12-23/EMPF2ROCO1.txt
2023-12-23/,EMPF2ROCO2.txt,2023-12-28 11:53:1703764,/FileStore/tables/blob/2023-12-23/EMPF2ROCO2.txt
2023-12-23/,EMPF2ROCO3.txt,2023-12-28 11:53:1703764,/FileStore/tables/blob/2023-12-23/EMPF2ROCO3.txt


##Teste

In [0]:
# Define uma função que será aplicada a cada linha
data_collect = df_registros.collect()

# looping thorough each row of the dataframe
for row in data_collect:
    # while looping through each
    print(row['caminho_arquivo'])

/FileStore/tables/blob/2023-12-23/EMPF2ROCO1.txt
/FileStore/tables/blob/2023-12-23/EMPF2ROCO2.txt
/FileStore/tables/blob/2023-12-23/EMPF2ROCO3.txt


In [0]:
# Define uma função que será aplicada a cada linha
data_collect = df_registros.collect()

for row in data_collect:
    txt_path = row['caminho_arquivo']
    df = spark.read.text(txt_path)
    #display(df)
    # Atribuição de um id sequencial para as linhas do dataframe
    df_com_linha = df.withColumn("Sequencial", F.monotonically_increasing_id() + 1).withColumn("Qtd_caracteres", len(value))
    
    # Criação do DF com o Header
    df_header = df_com_linha.orderBy("Sequencial",ascending=True).limit(1).drop("Sequencial")
    # Resultado sendo salvo em uma tabela temporária
    df_header.createOrReplaceTempView("Tb_header")
    # display(df_header)
    
    # Gerando um dataframe conforme definições do txt, com os dados do header usando um Select na tabela temporária
    df_header_transform = spark.sql("""
    SELECT
    SUBSTR(value, 0, 1) AS cdTipoRegistroHeader,
    SUBSTR(value, 2, 8) AS dtGeracaoArquivoHeader,
    SUBSTR(value, 10, 6) AS hrGeracaoArquivoHeader,
    SUBSTR(value, 16, 4) AS cdSistemaHeader,
    SUBSTR(value, 20, 8) AS dsProgramaGeradorHeader,
    SUBSTR(value, 28, 8) AS dsVersaoLayoutHeader,
    SUBSTR(value, 36, 1465) AS FillerHeader

    from Tb_header
    """)
    # display(df_header_transform)

    # Criação do DF com o Trailer
    df_trailer = df_com_linha.orderBy("Sequencial",ascending=False).limit(1).drop("Sequencial")
    # Resultado sendo salvo em uma tabela temporária
    df_trailer.createOrReplaceTempView("Tb_trailer")
    # display(df_trailer)
    
    # Gerando um dataframe conforme definições do txt, com os dados do Trailer usando um Select na tabela temporária
    df_trailer_transform = spark.sql("""
    SELECT
    SUBSTR(value, 0, 1) AS cdTipoRegistroTrailer,
    SUBSTR(value, 2, 12) AS qtTotalRegistrosTrailer,
    SUBSTR(value, 20, 1487) AS FillerTrailer

    from Tb_trailer
    """)
    # display(df_trailer_transform)
    
    # Criação do dataframe de Detalhe, na qual consiste nas linhas do txt sem o Header e sem o Trailer
    df_data = df.subtract(df_header).subtract(df_trailer)
    # Resultado sendo salvo em uma tabela temporária
    df_data.createOrReplaceTempView("Tb_data")
    # display(df_data)

    # Gerando um dataframe conforme definições do txt, com os dados do Detalhe usando um Select na tabela temporária
    '''
    try:
        
    except Exception as e:
        df.write delta log
        mover
        processamento = False
        break
    
    df_data_transform = spark.sql("""
                    
    SELECT
    SUBSTR(value, 0, 1) AS cdTipoRegistro,
    SUBSTR(value, 2, 10) AS cdEmpresaOrigem,
    SUBSTR(value, 12, 8) AS cdUnidadeContabilOrigem,
    SUBSTR(value, 20, 10) AS cdEmpresaDestino,
    SUBSTR(value, 30, 8) AS cdUnidadeContabilDestino,
    SUBSTR(value, 38, 30) AS cdEventoNegocio,
    SUBSTR(value, 68, 2) AS cdNaturezaValorOperacao,
    SUBSTR(value, 70, 2) AS cdFormatoEventoNegocio,
    SUBSTR(value, 72, 2) AS cdIndicadorTipoMovimento,
    SUBSTR(value, 74, 8) AS cdProdutoOperacional,
    SUBSTR(value, 82, 8) AS cdProdutoLegado,
    SUBSTR(value, 90, 3) AS cdSubProdutoLegado,
    SUBSTR(value, 93, 4) AS cdSistema,
    SUBSTR(value, 97, 8) AS dtContabilMovimento,
    SUBSTR(value, 105, 9) AS cdCPFCNPJBaseCliente,
    SUBSTR(value, 114, 5) AS cdCPFCNPJFilialCliente,
    SUBSTR(value, 119, 2) AS cdCPFCNPJControleCliente,
    SUBSTR(value, 121, 2) AS cdTipoPessoa,
    SUBSTR(value, 123, 2) AS cdTipoLigada,
    SUBSTR(value, 125, 2) AS cdTipoResidente,
    SUBSTR(value, 127, 9) AS cdRamoAtividade,
    SUBSTR(value, 136, 2) AS cdPorteCliente,
    SUBSTR(value, 138, 30) AS cdNumeroContrato,
    SUBSTR(value, 168, 30) AS cdNumeroSubContrato,
    SUBSTR(value, 198, 2) AS cdGarantia,
    SUBSTR(value, 200, 2) AS cdTipoGarantia,
    SUBSTR(value, 202, 2) AS cdEstagioRisco,
    SUBSTR(value, 204, 2) AS cdCategoria,
    SUBSTR(value, 206, 2) AS cdSituacaoContrato,
    SUBSTR(value, 208, 20) AS cdReferenciaMovimento,
    SUBSTR(value, 228, 120) AS dsHistorico,
    SUBSTR(value, 348, 120) AS dsHistoricoComplementar,
    SUBSTR(value, 468, 8) AS dtTransacao,
    SUBSTR(value, 476, 5) AS cdMoeda,
    SUBSTR(value, 481, 20) AS cdEvento,
    SUBSTR(value, 501, 10) AS cdCentroResultadoOrigem,
    SUBSTR(value, 511, 10) AS cdCentroResultadoDestino,
    SUBSTR(value, 521, 10) AS cdOperacao,
    SUBSTR(value, 531, 10) AS cdParceiro,
    SUBSTR(value, 541, 10) AS cdFilialParceiro,
    SUBSTR(value, 551, 3) AS cdCanalDistribuicao,
    SUBSTR(value, 554, 2) AS cdIndicadorCotacao,
    SUBSTR(value, 556, 20) AS vrTaxaConversao,
    SUBSTR(value, 576, 20) AS vrEventoOrigem,
    SUBSTR(value, 596, 20) AS vrEventoConvertido,
    SUBSTR(value, 616, 2) AS cdSetorAtividadeEconomico,
    SUBSTR(value, 618, 9) AS cdGrupoEconomico,
    SUBSTR(value, 627, 1) AS cdPessoaPoliticamenteExporta,
    SUBSTR(value, 628, 2) AS cdTipoLimiarCliente,
    SUBSTR(value, 630, 2) AS cdTipoConta,
    SUBSTR(value, 632, 2) AS cdSegmentoConta,
    SUBSTR(value, 634, 5) AS cdAgenciaContaCorrente,
    SUBSTR(value, 639, 13) AS nuContaCorrente,
    SUBSTR(value, 652, 2) AS cdDigitoContaCorrente,
    SUBSTR(value, 654, 6) AS cdRegionalContaCorrente,
    SUBSTR(value, 660, 2) AS cdTipoLiquidacao,
    SUBSTR(value, 662, 2) AS cdModalidade,
    SUBSTR(value, 664, 20) AS pcTaxa,
    SUBSTR(value, 684, 2) AS cdPeriodicidadeTaxa,
    SUBSTR(value, 686, 20) AS pcIndexadorPosFixada,
    SUBSTR(value, 706, 5) AS cdIndexadorPosFixa,
    SUBSTR(value, 711, 20) AS pcTaxaComplementar,
    SUBSTR(value, 731, 20) AS pcTaxaEfetiva,
    SUBSTR(value, 751, 5) AS nuParcela,
    SUBSTR(value, 756, 5) AS pzDiasContrato,
    SUBSTR(value, 761, 5) AS qtParcela,
    SUBSTR(value, 766, 4) AS cdLancamentoContabil,
    SUBSTR(value, 770, 8) AS cdServicoSAP,
    SUBSTR(value, 778, 9) AS cdCPFCNPJBaseFontePagadoraSAP,
    SUBSTR(value, 787, 5) AS cdCPFCNPJFilialFontePagadoraSAP,
    SUBSTR(value, 792, 2) AS cdCPFCNPJControleFontePagadoraSAP,
    SUBSTR(value, 794, 1) AS cdRiscoOperacional,
    SUBSTR(value, 795, 2) AS cdTipoRiscoOperacional,
    SUBSTR(value, 797, 4) AS cdCarteira,
    SUBSTR(value, 801, 20) AS Reserva01,
    SUBSTR(value, 821, 20) AS Reserva02,
    SUBSTR(value, 841, 20) AS Reserva03,
    SUBSTR(value, 861, 20) AS Reserva04,
    SUBSTR(value, 881, 20) AS Reserva05,
    SUBSTR(value, 901, 20) AS Reserva06,
    SUBSTR(value, 921, 20) AS Reserva07,
    SUBSTR(value, 941, 20) AS Reserva08,
    SUBSTR(value, 961, 20) AS Reserva09,
    SUBSTR(value, 981, 20) AS Reserva10,
    SUBSTR(value, 1001, 20) AS Reserva11,
    SUBSTR(value, 1021, 20) AS Reserva12,
    SUBSTR(value, 1041, 20) AS Reserva13,
    SUBSTR(value, 1061, 20) AS Reserva14,
    SUBSTR(value, 1081, 20) AS Reserva15,
    SUBSTR(value, 1101, 20) AS Reserva16,
    SUBSTR(value, 1121, 20) AS Reserva17,
    SUBSTR(value, 1141, 20) AS Reserva18,
    SUBSTR(value, 1161, 20) AS Reserva19,
    SUBSTR(value, 1181, 20) AS Reserva20,
    SUBSTR(value, 1201, 312) AS Filler


    FROM Tb_data
    ORDER BY cdEmpresaOrigem

    """)'''

df_detalhe_transform = spark.sql("""
    SELECT
    SUBSTR(value, 1, 1) AS cdTipoRegistro,
    SUBSTR(value, 2, 10) AS cdEmpresaOrigemMESU,
    SUBSTR(value, 12, 10) AS cdEmpresaOrigemUORG,
    SUBSTR(value, 22, 8) AS cdUnidadeContabilDebitoMESU,
    SUBSTR(value, 30, 8) AS cdUnidadeContabilDebitoUORG,
    SUBSTR(value, 38, 10) AS cdEmpresaDestinoMESU,
    SUBSTR(value, 48, 10) AS cdEmpresaDestinoUORG,
    SUBSTR(value, 58, 8) AS cdUnidadeContabilCreditoMESU,
    SUBSTR(value, 66, 8) AS cdUnidadeContabilCreditoUORG,
    SUBSTR(value, 74, 30) AS cdEventoNegocio,
    SUBSTR(value, 104, 2) AS cdAtributo,
    SUBSTR(value, 106, 2) AS cdFormatoEventoNegocio,
    SUBSTR(value, 108, 2) AS cdIndicadorTipoMovimento,
    SUBSTR(value, 110, 8) AS cdProdutoOperacional,
    SUBSTR(value, 118, 8) AS cdProduto,
    SUBSTR(value, 126, 3) AS cdSubProduto,
    SUBSTR(value, 129, 4) AS cdRotina,
    SUBSTR(value, 133, 8) AS dtContabilMovimento,
    SUBSTR(value, 141, 9) AS cdCPFCNPJBaseCliente,
    SUBSTR(value, 150, 5) AS cdCPFCNPJFilialCliente,
    SUBSTR(value, 155, 2) AS cdCPFCNPJControleCliente,
    SUBSTR(value, 157, 2) AS cdTipoPessoa,
    SUBSTR(value, 159, 2) AS cdTipoLigada,
    SUBSTR(value, 161, 2) AS cdTipoResidente,
    SUBSTR(value, 163, 9) AS cdRamoAtividade,
    SUBSTR(value, 172, 2) AS cdPorteCliente,
    SUBSTR(value, 174, 30) AS cdNumeroContrato,
    SUBSTR(value, 204, 30) AS cdNumeroSubContrato,
    SUBSTR(value, 234, 2) AS cdGarantia,
    SUBSTR(value, 236, 2) AS cdTipoGarantia,
    SUBSTR(value, 238, 2) AS cdEstagioRisco,
    SUBSTR(value, 240, 2) AS cdCategoria,
    SUBSTR(value, 242, 2) AS cdStatusContrato,
    SUBSTR(value, 244, 20) AS cdReferenciaMovimento,
    SUBSTR(value, 264, 120) AS dsHistorico,
    SUBSTR(value, 384, 120) AS dsHistoricoComplementar,
    SUBSTR(value, 504, 8) AS dtTransacao,
    SUBSTR(value, 512, 5) AS cdMoeda,
    SUBSTR(value, 517, 20) AS cdEvento,
    SUBSTR(value, 537, 10) AS cdCentroResultadoOrigem,
    SUBSTR(value, 547, 10) AS cdCentroResultadoDestino,
    SUBSTR(value, 557, 10) AS cdOperacao,
    SUBSTR(value, 567, 10) AS cdParceiro,
    SUBSTR(value, 577, 10) AS cdFilialParceiro,
    SUBSTR(value, 587, 3) AS cdCanalDistribuicao,
    SUBSTR(value, 590, 2) AS cdIndicadorCotacao,
    SUBSTR(value, 592, 20) AS vrTaxaConversao,
    SUBSTR(value, 612, 20) AS vrEventoOrigem,
    SUBSTR(value, 632, 20) AS vrEventoConvertido,
    SUBSTR(value, 652, 2) AS cdSetorAtividadeEconomico,
    SUBSTR(value, 654, 9) AS cdGrupoEconomico,
    SUBSTR(value, 663, 1) AS cdPessoaPoliticamenteExporta,
    SUBSTR(value, 664, 2) AS cdTipoLimiarCliente,
    SUBSTR(value, 666, 2) AS cdTipoConta,
    SUBSTR(value, 668, 2) AS cdSegmentoConta,
    SUBSTR(value, 670, 5) AS cdAgenciaContaCorrente,
    SUBSTR(value, 675, 13) AS nuContaCorrente,
    SUBSTR(value, 688, 2) AS cdDigitoContaCorrente,
    SUBSTR(value, 690, 6) AS cdRegionalContaCorrente,
    SUBSTR(value, 696, 2) AS cdTipoLiquidacao,
    SUBSTR(value, 698, 2) AS cdModalidade,
    SUBSTR(value, 700, 20) AS pcTaxa,
    SUBSTR(value, 720, 2) AS cdPeriodicidadeTaxa,
    SUBSTR(value, 722, 20) AS pcIndexadorPosFixada,
    SUBSTR(value, 742, 5) AS cdIndexadorPosFixa,
    SUBSTR(value, 747, 20) AS pcTaxa2,
    SUBSTR(value, 767, 20) AS pcTaxaEfetiva,
    SUBSTR(value, 787, 5) AS nuParcela,
    SUBSTR(value, 792, 5) AS pzDiasContrato,
    SUBSTR(value, 797, 5) AS qtParcela,
    SUBSTR(value, 802, 4) AS cdLancamentoContabilDebito,
    SUBSTR(value, 806, 4) AS cdLancamentoContabilCredito,
    SUBSTR(value, 810, 7) AS dsNumeroDocumento,
    SUBSTR(value, 817, 8) AS cdServicoSAP,
    SUBSTR(value, 825, 9) AS cdCPFCNPJBaseFontePagadoraSAP,
    SUBSTR(value, 834, 5) AS cdCPFCNPJFilialFontePagadoraSAP,
    SUBSTR(value, 839, 2) AS cdCPFCNPJControleFontePagadoraSAP,
    SUBSTR(value, 841, 1) AS cdRiscoOperacional,
    SUBSTR(value, 842, 2) AS cdTipoRiscoOperacional,
    SUBSTR(value, 844, 3) AS cdCarteira,
    SUBSTR(value, 847, 4) AS cdStatusTransacao,
    SUBSTR(value, 851, 20) AS Reserva01,
    SUBSTR(value, 871, 20) AS Reserva02,
    SUBSTR(value, 891, 20) AS Reserva03,
    SUBSTR(value, 911, 20) AS Reserva04,
    SUBSTR(value, 931, 20) AS Reserva05,
    SUBSTR(value, 951, 20) AS Reserva06,
    SUBSTR(value, 971, 20) AS Reserva07,
    SUBSTR(value, 991, 20) AS Reserva08,
    SUBSTR(value, 1011, 20) AS Reserva09,
    SUBSTR(value, 1031, 20) AS Reserva10,
    SUBSTR(value, 1051, 20) AS Reserva11,
    SUBSTR(value, 1071, 20) AS Reserva12,
    SUBSTR(value, 1091, 20) AS Reserva13,
    SUBSTR(value, 1111, 20) AS Reserva14,
    SUBSTR(value, 1131, 20) AS Reserva15,
    SUBSTR(value, 1151, 20) AS Reserva16,
    SUBSTR(value, 1171, 20) AS Reserva17,
    SUBSTR(value, 1191, 20) AS Reserva18,
    SUBSTR(value, 1211, 20) AS Reserva19,
    SUBSTR(value, 1251, 250) AS Filler

    
    
    FROM Tb_detalhe
    
    """)

    # display(df_detalhe_transform)

    # display(df_data_transform)
    df_join = df_data_transform.join(df_header_transform).join(df_trailer_transform)
    # df_join= df_join.withColumn("ingest_date", lit(datetime.now(timezone.utc) - timedelta(hours=3)))
    df_join= df_join.withColumn("ingest_date", lit('2023-12-23T15:51:22.367+0000')).withColumn("nome_arquivo", lit(row['nome_arquivo']))
    #df_join= df_join.withColumn("nome_arquivo", row['nome_arquivo'])
    df_join.display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2822265887260690>:171[0m
[1;32m     54[0m     [38;5;66;03m# display(df_data)[39;00m
[1;32m     55[0m 
[1;32m     56[0m     [38;5;66;03m# Gerando um dataframe conforme definições do txt, com os dados do Detalhe usando um Select na tabela temporária[39;00m
[1;32m     57[0m     [38;5;124;03m'''[39;00m
[1;32m     58[0m [38;5;124;03m    try:[39;00m
[1;32m     59[0m [38;5;124;03m        [39;00m
[0;32m   (...)[0m
[1;32m    168[0m 
[1;32m    169[0m [38;5;124;03m    """)'''[39;00m
[0;32m--> 171[0m df_detalhe_transform [38;5;241m=[39m spark[38;5;241m.[39msql([38;5;124m"""[39m
[1;32m    172[0m [38;5;124m    SELECT[39m
[1;32m    173[0m [38;5;124m    SUBSTR(value, 1, 1) AS cdTipoRegistro,[39m
[1;32m    174[0m [38;5;124m    SUBSTR(value, 2, 10) AS cdEmpresaOr

In [0]:
#df_ControleRecepcaoArquivo = df_join.select('cdSistemaHeader') # Existe o cdSistema, mas está null
df = df_join.select('cdSistemaHeader', "nome_arquivo", "dsProgramaGeradorHeader", "dsVersaoLayoutHeader", "qtTotalRegistrosTrailer", "ingest_date") # Usando o ingest_date para alimentar a dtGeracaoArquivoOrigem e hrGeracaoArquivoOrigem

# Uso do regex para retirar os 0 a esquerda da coluna qtTotalRegistrosTrailer
df = df.withColumn("qtTotalRegistro", F.regexp_replace("qtTotalRegistrosTrailer", "^0+", "")).drop("qtTotalRegistrosTrailer")

df = df.withColumnRenamed("cdSistemaHeader", "cdSistema").withColumnRenamed("nome_arquivo", "nmArquivoExterno").withColumnRenamed("dsProgramaGeradorHeader", "dsProgramaGerador").withColumnRenamed("dsVersaoLayoutHeader", "dsVersaoLayout")

# Suponha que você tenha um DataFrame chamado df com uma coluna "ingest_data"
#df = df.withColumn("dtGeracaoArquivoOrigem", F.to_date("ingest_date")).withColumn("hrGeracaoArquivoOrigem", F.to_timestamp("ingest_date")).drop("ingest_date")

# Suponha que você tenha um DataFrame chamado df com uma coluna "ingest_data"
df = df.withColumn("dtGeracaoArquivoOrigem", F.split("ingest_date", "T")[0]).withColumn("hrGeracaoArquivoOrigem", F.split("ingest_date", "T")[1]).drop("ingest_date")

df = df.withColumn('dtGravacaoArquivo', lit(datetime.utcnow()))

df_ControleRecepcaoArquivo = df.select('cdSistema', 'nmArquivoExterno', 'dtGeracaoArquivoOrigem', 'hrGeracaoArquivoOrigem', 'dtGravacaoArquivo', 'dsProgramaGerador', 'dsVersaoLayout', 'qtTotalRegistro')

df_ControleRecepcaoArquivo.display()

cdSistema,nmArquivoExterno,dtGeracaoArquivoOrigem,hrGeracaoArquivoOrigem,dtGravacaoArquivo,dsProgramaGerador,dsVersaoLayout,qtTotalRegistro
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037
EMPF,EMPF2ROCO9.txt,2023-12-23,15:51:22.367+0000,2023-12-26T19:34:33.097+0000,EMPF1C40,ROCOV001,10037


In [0]:
#%run "/Users/fellipemarlus@hotmail.com/DEV - Cap 00/001-stage_to_bronze"

# 3 - Salvando/adicionando os dados em uma tabela

## 3.1 - Os dados serão adicionados em uma tabela. No momento será usado uma Delta Table

In [0]:
df_registros.write.format("delta").mode("append").save(caminho_tabela_delta)


df = spark.read.format("delta").load(caminho_tabela_delta)

df_registros = df_registros.sort(col("nome_pasta"), col("nome_arquivo"))
df.display()

nome_pasta,nome_arquivo,dt_list_files_blob
2023-12-20/,EMPF2ROCO010.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO011.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO012.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO013.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO014.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO015.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO016.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO017.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO018.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO019.txt,2023-12-23 14:28:1703341


#END CMD 2

In [0]:
files = dbutils.fs.ls(blob_raiz)
print(files)

[FileInfo(path='dbfs:/FileStore/tables/blob/2023-12-20/', name='2023-12-20/', size=0, modificationTime=0), FileInfo(path='dbfs:/FileStore/tables/blob/2023-12-21/', name='2023-12-21/', size=0, modificationTime=0), FileInfo(path='dbfs:/FileStore/tables/blob/2023-12-22/', name='2023-12-22/', size=0, modificationTime=0), FileInfo(path='dbfs:/FileStore/tables/blob/EMPF2ROCO.TXT', name='EMPF2ROCO.TXT', size=15075569, modificationTime=1703276916000)]


In [0]:
df = spark.read \
  .format("delta") \
  .load(caminho_tabela_delta) \
  .select("_metadata.file_name", "_metadata.file_size", "_metadata.file_modification_time")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''

file_name,file_size,file_modification_time
part-00023-d5e52792-1080-4cf5-aa97-036d24838ec1-c000.snappy.parquet,1414,2023-12-23T17:28:58.000+0000
part-00031-8fdcbf6b-a79e-4fb7-9803-57d4940b36a3-c000.snappy.parquet,1414,2023-12-23T17:28:59.000+0000
part-00039-1d632e46-4bee-47e7-9154-8c8b0988380a-c000.snappy.parquet,1414,2023-12-23T17:28:59.000+0000
part-00047-3152b6d5-1bf2-4346-90b5-801529679e1a-c000.snappy.parquet,1414,2023-12-23T17:28:59.000+0000
part-00055-475c3eb2-8bb3-477c-9f18-bcd1c7a492ce-c000.snappy.parquet,1414,2023-12-23T17:29:00.000+0000
part-00063-3f838aaf-d4b3-4330-88a1-b7b83f8e97e5-c000.snappy.parquet,1414,2023-12-23T17:29:00.000+0000
part-00071-76e8f9e8-9526-4012-b58d-f33175c7a8ea-c000.snappy.parquet,1414,2023-12-23T17:29:01.000+0000
part-00079-c0d859ad-79a0-443b-be67-d84b28d1a3b3-c000.snappy.parquet,1414,2023-12-23T17:29:01.000+0000
part-00087-39c6c2f8-6280-461a-8ecf-b4fcbb1b8ee1-c000.snappy.parquet,1414,2023-12-23T17:29:02.000+0000
part-00095-7613330d-b817-4d06-bcf9-62e5ffa39f38-c000.snappy.parquet,1414,2023-12-23T17:29:02.000+0000




In [0]:
df = spark.read \
  .format("delta") \
  .load(caminho_tabela_delta)

df.display()

nome_pasta,nome_arquivo,dt_list_files_blob
2023-12-20/,EMPF2ROCO010.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO011.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO012.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO013.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO014.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO015.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO016.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO017.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO018.txt,2023-12-23 14:28:1703341
2023-12-20/,EMPF2ROCO019.txt,2023-12-23 14:28:1703341


In [0]:
ingestao.display()

nome_pasta,nome_arquivo,dt_ingest_blob_to_stage,caminho_arquivo_stage
2024-01-10/,EMPF2ROCOerrado.TXT,2024-01-11 13:31:1704979,/FileStore/tables/stage_roco/input/2024-01-10/EMPF2ROCOerrado.TXT


In [0]:
df = 