# Preparação e Estruturação dos Dados Brutos (Camada RAW)

> Este notebook é responsável por ler os dados coletados na camada Landing (LND)

## Configuração e Importações do pyspark e delta lake

In [1]:
from typing import Optional, Union, List
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import functions as f
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable

In [2]:
builder: SparkSession.Builder = SparkSession.builder \
    .appName("Preparação RAW de Motivos") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g")

In [3]:
spark: SparkSession = configure_spark_with_delta_pip(builder).getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/09 20:27:54 WARN Utils: Your hostname, wilcb, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/09 20:27:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/wilcb/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/wilcb/.ivy2.5.2/cache
The jars for the packages stored in: /home/wilcb/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5d326166-d3b5-445d-9b84-31f288c70e5b;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 259ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [de

## Funções Auxiliares

> Ler arquivo CSV, evitando repetições de código

In [4]:
def ler_csv(
        path: str,
        encoding: str,
        schema: Optional[StructType] = None,
        header: bool = False,
        sep: str = ";",
        quote: str = '"'
) -> DataFrame:
    """
    Lê um arquivo CSV usando PySpark com esquema definido e opções customizáveis.

    Esta função lê um arquivo CSV a partir do caminho especificado, aplicando o esquema
    fornecido para garantir tipos corretos nas colunas. Permite configurar o encoding,
    se o arquivo possui cabeçalho, o separador de campo e o caractere de aspas.

    Args:
        path (str): Caminho do arquivo CSV a ser lido.
        schema (StructType): Esquema (schema) PySpark para aplicar ao DataFrame.
        encoding (str): Codificação do arquivo CSV, ex: "UTF-8", "ISO-8859-1", etc.
        header (bool, optional): Indica se o arquivo possui linha de cabeçalho com nomes das colunas.
            Padrão é False.
        sep (str, optional): Caractere separador dos campos no CSV. Padrão é ponto e vírgula ";".
        quote (str, optional): Caractere usado para delimitar campos com texto. Padrão é aspas duplas '"'.

    Returns:
        DataFrame: DataFrame do Spark contendo os dados lidos do CSV conforme o esquema informado.

    """
    try:
        # Inicializa a leitura do arquivo CSV com o formato adequado
        reader = spark.read.format("csv") \
            .option("header", header) \
            .option("sep", sep) \
            .option("quote", quote) \
            .option("encoding", encoding)

        # Se foi passado schema explícito, aplica o schema fixo para o DataFrame
        if schema:
            reader = reader.schema(schema)
        else:
            # Se não, ativa inferência automática de esquema pelo Spark
            reader = reader.option("inferSchema", True)

        # Carrega o arquivo CSV do caminho informado com as opções configuradas
        df = reader.load(path)

        # Retorna o DataFrame carregado
        return df

    # Captura qualquer erro na leitura, imprime a mensagem e pode ser melhorado para levantar erro
    except Exception as e:
        print(f"[ERRO] Falha na leitura do CSV: {e}")
        raise

> Gravar arquivo .parquet na camada RAW

In [5]:
def gravar_delta(
    df: DataFrame,
    path: str,
    mode: str = "overwrite",
    partitionBy: Optional[Union[str, List[str]]] = None
) -> DataFrame:
    """
    Grava um DataFrame no formato Delta Lake em um caminho especificado.

    Salva o DataFrame no formato Delta, permitindo definir o modo de gravação e colunas
    para particionamento dos dados. Útil para organizar dados grandes em diretórios particionados.

    Args:
        df (DataFrame): DataFrame do Spark que será salvo.
        path (str): Caminho do diretório onde os dados serão gravados.
        mode (str, optional): Modo de gravação, como 'overwrite' (sobrescrever), 'append' (acrescentar),
            entre outros suportados pelo Spark. Default é 'overwrite'.
        partitionBy (str ou List[str], optional): Coluna ou lista de colunas para particionamento dos dados.
            Se None, não realiza particionamento. Default é None.

    Returns:
        DataFrame: Retorna o mesmo DataFrame recebido como argumento para facilitar encadeamento.
    """
    try:
        # Cria um escritor de DataFrame para o formato Delta
        writer = df.write.format("delta").mode(mode)

        # Se foram informadas colunas para particionamento, ajusta o escritor para particionar
        if partitionBy:

            # Se for uma string só, transforma em lista para unpack
            if isinstance(partitionBy, str):
                partitionBy = [partitionBy]

            # Aplica o particionamento passando todas as colunas indicadas
            writer = writer.partitionBy(*partitionBy)

        # Salva o DataFrame no caminho especificado no formato Delta
        writer.save(path)

        # Mensagem de sucesso na gravação
        print("Gravação efetuada!")

        # Retorna o DataFrame original para permitir encadeamento de operações, se desejar
        return df

    except Exception as e:
        # Captura qualquer erro durante a gravação, imprime mensagem de erro
        print(f"[ERRO] Falha ao gravar Delta no caminho '{path}': {e}")
        raise

## RAW - Dados Motivos

In [6]:
# Caminho para o arquivo CSV na camada
path_csv: str = "../../LND/motivos/2025-12/"

Definição do schema

In [7]:
schema: StructType = StructType([
    StructField("codigo_motivo", StringType(), True),
    StructField("descricao_motivo", StringType(), True),
]) 

Leitura do arquivo CSV

In [8]:
try:
    df = ler_csv(path_csv, "ISO-8859-1", schema=schema)
    df.show(truncate=False)
    print(df.count())
except Exception as e:
    print("[ERRO] Falha na leitura")
    raise

+-------------+-------------------------------------------------+
|codigo_motivo|descricao_motivo                                 |
+-------------+-------------------------------------------------+
|00           |SEM MOTIVO                                       |
|01           |EXTINCAO POR ENCERRAMENTO LIQUIDACAO VOLUNTARIA  |
|02           |INCORPORACAO                                     |
|03           |FUSAO                                            |
|04           |CISAO TOTAL                                      |
|05           |ENCERRAMENTO DA FALENCIA                         |
|06           |ENCERRAMENTO DA LIQUIDACAO                       |
|07           |ELEVACAO A MATRIZ                                |
|08           |TRANSPASSE                                       |
|09           |NAO INICIO DE ATIVIDADE                          |
|10           |EXTINCAO PELO ENCERRAMENTO DA LIQUIDACAO JUDICIAL|
|11           |ANULACAO POR MULTICIPLIDADE                      |
|12       

Inclusão da coluna data_ingestao para controle

In [9]:
df = df.withColumn(
    "data_ingestao",
    f.current_timestamp()
)

df.show(truncate=False)

+-------------+-------------------------------------------------+-------------------------+
|codigo_motivo|descricao_motivo                                 |data_ingestao            |
+-------------+-------------------------------------------------+-------------------------+
|00           |SEM MOTIVO                                       |2026-01-09 20:28:04.06679|
|01           |EXTINCAO POR ENCERRAMENTO LIQUIDACAO VOLUNTARIA  |2026-01-09 20:28:04.06679|
|02           |INCORPORACAO                                     |2026-01-09 20:28:04.06679|
|03           |FUSAO                                            |2026-01-09 20:28:04.06679|
|04           |CISAO TOTAL                                      |2026-01-09 20:28:04.06679|
|05           |ENCERRAMENTO DA FALENCIA                         |2026-01-09 20:28:04.06679|
|06           |ENCERRAMENTO DA LIQUIDACAO                       |2026-01-09 20:28:04.06679|
|07           |ELEVACAO A MATRIZ                                |2026-01-09 20:2

Gravar dados com delta

In [10]:
try:
    gravar_delta(df, "../../RAW/motivos/2025-12")
except Exception as e:
    print("[ERRO] Falha ao tentar gravar dados.")
    raise

26/01/09 20:28:07 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Gravação efetuada!


In [11]:
dt = DeltaTable.forPath(spark, "../../RAW/motivos/2025-12")

In [12]:
df = dt.toDF()
df.show()

                                                                                

+-------------+--------------------+--------------------+
|codigo_motivo|    descricao_motivo|       data_ingestao|
+-------------+--------------------+--------------------+
|           00|          SEM MOTIVO|2026-01-09 20:28:...|
|           01|EXTINCAO POR ENCE...|2026-01-09 20:28:...|
|           02|        INCORPORACAO|2026-01-09 20:28:...|
|           03|               FUSAO|2026-01-09 20:28:...|
|           04|         CISAO TOTAL|2026-01-09 20:28:...|
|           05|ENCERRAMENTO DA F...|2026-01-09 20:28:...|
|           06|ENCERRAMENTO DA L...|2026-01-09 20:28:...|
|           07|   ELEVACAO A MATRIZ|2026-01-09 20:28:...|
|           08|          TRANSPASSE|2026-01-09 20:28:...|
|           09|NAO INICIO DE ATI...|2026-01-09 20:28:...|
|           10|EXTINCAO PELO ENC...|2026-01-09 20:28:...|
|           11|ANULACAO POR MULT...|2026-01-09 20:28:...|
|           12|ANULACAO ONLINE D...|2026-01-09 20:28:...|
|           13|     OMISSA CONTUMAZ|2026-01-09 20:28:...|
|           14

## Fechar sessão Spark

In [13]:
spark.stop()