**Setup e caminhos (primeira célula do ETL)**

In [6]:
from pyspark.sql import functions as F

# Caminhos no HDFS – devem bater com o que você usou na ingestão
HDFS_BRONZE_PATH = "hdfs:///datalake/bronze/processos_cejusc"
HDFS_SILVER_PATH = "hdfs:///datalake/silver/processos_cejusc"
HDFS_GOLD_PATH   = "hdfs:///datalake/gold/processos_metricas"

print("BRONZE:", HDFS_BRONZE_PATH)
print("SILVER:", HDFS_SILVER_PATH)
print("GOLD:",   HDFS_GOLD_PATH)

BRONZE: hdfs:///datalake/bronze/processos_cejusc
SILVER: hdfs:///datalake/silver/processos_cejusc
GOLD: hdfs:///datalake/gold/processos_metricas


In [2]:
!hdfs dfs -ls /datalake
!hdfs dfs -ls /datalake/bronze

Found 3 items
drwxr-xr-x   - root supergroup          0 2025-12-07 22:44 /datalake/bronze
drwxr-xr-x   - root supergroup          0 2025-12-07 22:43 /datalake/gold
drwxr-xr-x   - root supergroup          0 2025-12-07 22:43 /datalake/silver
Found 1 items
drwxr-xr-x   - root supergroup          0 2025-12-07 22:44 /datalake/bronze/processos_cejusc


**Ler BRONZE do HDFS**

In [3]:
df_bronze = spark.read.parquet(HDFS_BRONZE_PATH)

df_bronze.printSchema()
df_bronze.show(5, truncate=False)
print("TOTAL BRONZE:", df_bronze.count())

root
 |-- numero_processo: string (nullable = true)
 |-- comarca: string (nullable = true)
 |-- serventia: string (nullable = true)
 |-- assunto: string (nullable = true)
 |-- classe: string (nullable = true)
 |-- polo_ativo: string (nullable = true)
 |-- polo_passivo: string (nullable = true)
 |-- cpf_cnpj_polo_ativo: string (nullable = true)
 |-- cpf_cnpj_polo_passivo: string (nullable = true)
 |-- inteiro_teor: string (nullable = true)
 |-- codg_cnj_audi: double (nullable = true)
 |-- codg_cnj_julgamento: double (nullable = true)
 |-- classificacao: string (nullable = true)

+-------------------------+---------------------+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------

TOTAL BRONZE: 16583


**Reexecutar as células do ETL**

In [2]:
# 1) Conferir que o Spark voltou
spark
spark.range(5).show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



**Caminhos e imports**

In [3]:
from pyspark.sql import functions as F

HDFS_BRONZE_PATH = "hdfs:///datalake/bronze/processos_cejusc"

df_bronze = spark.read.parquet(HDFS_BRONZE_PATH)
df_bronze.show(5, truncate=False)

+-------------------------+---------------------+------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------+-------------------------------+-------------------+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
# Limpeza e normalização básica
df_silver = (
    df_bronze
      # remover linhas essenciais vazias
      .dropna(subset=["numero_processo", "comarca", "classificacao", "inteiro_teor"])
      # deduplicar por processo
      .dropDuplicates(["numero_processo"])
      # normalizar textos principais
      .withColumn("comarca", F.upper(F.trim(F.col("comarca"))))
      .withColumn("classificacao", F.upper(F.trim(F.col("classificacao"))))
      # garantir que inteiro_teor é string
      .withColumn("inteiro_teor", F.col("inteiro_teor").cast("string"))
)

# Enriquecimento – tamanho do texto
df_silver = df_silver.withColumn(
    "tamanho_texto",
    F.length(F.col("inteiro_teor"))
)

df_silver.select(
    "numero_processo", "comarca", "classificacao", "tamanho_texto"
).show(5, truncate=False)

print("TOTAL SILVER:", df_silver.count())

+-------------------------+--------------+-------------+-------------+
|numero_processo          |comarca       |classificacao|tamanho_texto|
+-------------------------+--------------+-------------+-------------+
|5443016.29.2024.8.09.0051|GOIÂNIA       |INFRUTIFERO  |499          |
|5447239.44.2024.8.09.0174|SENADOR CANEDO|INFRUTIFERO  |6723         |
|5547168.17.2024.8.09.0088|ITUMBIARA     |FRUTIFERO    |3605         |
|5555765.07.2024.8.09.0045|FORMOSA       |INFRUTIFERO  |2311         |
|5564404.21.2024.8.09.0075|IPAMERI       |INFRUTIFERO  |396          |
+-------------------------+--------------+-------------+-------------+
only showing top 5 rows

TOTAL SILVER: 16583


**Salvar SILVER no HDFS**

In [7]:
(
    df_silver
      .write
      .mode("overwrite")
      .parquet(HDFS_SILVER_PATH)
)

!hdfs dfs -ls /datalake/silver
!hdfs dfs -ls /datalake/silver/processos_cejusc

Found 1 items
drwxr-xr-x   - root supergroup          0 2025-12-07 22:55 /datalake/silver/processos_cejusc
Found 201 items
-rw-r--r--   2 root supergroup          0 2025-12-07 22:55 /datalake/silver/processos_cejusc/_SUCCESS
-rw-r--r--   2 root supergroup     280824 2025-12-07 22:55 /datalake/silver/processos_cejusc/part-00000-64242425-d6b3-4663-a264-b1870c4a6651-c000.snappy.parquet
-rw-r--r--   2 root supergroup     323079 2025-12-07 22:55 /datalake/silver/processos_cejusc/part-00001-64242425-d6b3-4663-a264-b1870c4a6651-c000.snappy.parquet
-rw-r--r--   2 root supergroup     383861 2025-12-07 22:55 /datalake/silver/processos_cejusc/part-00002-64242425-d6b3-4663-a264-b1870c4a6651-c000.snappy.parquet
-rw-r--r--   2 root supergroup     405396 2025-12-07 22:55 /datalake/silver/processos_cejusc/part-00003-64242425-d6b3-4663-a264-b1870c4a6651-c000.snappy.parquet
-rw-r--r--   2 root supergroup     298421 2025-12-07 22:55 /datalake/silver/processos_cejusc/part-00004-64242425-d6b3-4663-a264-b18

**Criar camada GOLD (métricas agregadas)**

In [8]:
df_gold = (
    df_silver
      .groupBy("comarca", "classificacao")
      .agg(
          F.count("*").alias("qtd_processos"),
          F.avg("tamanho_texto").alias("tamanho_medio_texto"),
          F.min("tamanho_texto").alias("tamanho_min_texto"),
          F.max("tamanho_texto").alias("tamanho_max_texto")
      )
      .orderBy("comarca", "classificacao")
)

df_gold.show(20, truncate=False)
print("TOTAL LINHAS GOLD:", df_gold.count())

+---------------------+-------------+-------------+-------------------+-----------------+-----------------+
|comarca              |classificacao|qtd_processos|tamanho_medio_texto|tamanho_min_texto|tamanho_max_texto|
+---------------------+-------------+-------------+-------------------+-----------------+-----------------+
|ABADIÂNIA            |FRUTIFERO    |24           |3710.1666666666665 |0                |10842            |
|ABADIÂNIA            |INFRUTIFERO  |30           |4259.8             |0                |19079            |
|ACREÚNA              |FRUTIFERO    |55           |4244.490909090909  |34               |24048            |
|ACREÚNA              |INFRUTIFERO  |10           |3746.1             |160              |10552            |
|ALEXÂNIA             |FRUTIFERO    |61           |6049.950819672131  |23               |29613            |
|ALEXÂNIA             |INFRUTIFERO  |39           |5895.897435897436  |26               |14605            |
|ALTO PARAÍSO DE GOIÁS|FRUTI

**Salvar GOLD no HDFS**

In [9]:
(
    df_gold
      .coalesce(1)  # aqui está sendo opcional: juntamos em poucos arquivos
      .write
      .mode("overwrite")
      .parquet(HDFS_GOLD_PATH)
)

!hdfs dfs -ls /datalake/gold
!hdfs dfs -ls /datalake/gold/processos_metricas

Found 1 items
drwxr-xr-x   - root supergroup          0 2025-12-07 22:57 /datalake/gold/processos_metricas
Found 2 items
-rw-r--r--   2 root supergroup          0 2025-12-07 22:57 /datalake/gold/processos_metricas/_SUCCESS
-rw-r--r--   2 root supergroup       7094 2025-12-07 22:57 /datalake/gold/processos_metricas/part-00000-21a2447b-8796-438c-aeed-32f4297b2420-c000.snappy.parquet


**Exportar GOLD para o MongoDB**

In [12]:
# Instalar pymongo 
!pip install -q pymongo

In [14]:
from pymongo import MongoClient

# Config do Mongo do docker-bigdata
MONGO_USER = "root"
MONGO_PASS = "root"
MONGO_HOST = "mongo"      # nome do container, como você já estava usando
MONGO_PORT = 27017
MONGO_AUTH_DB = "admin"   # banco de autenticação padrão do stack

uri = (
    f"mongodb://{MONGO_USER}:{MONGO_PASS}"
    f"@{MONGO_HOST}:{MONGO_PORT}/?authSource={MONGO_AUTH_DB}"
)

client = MongoClient(uri)

db = client["pmd"]                     # será criado automaticamente se não existir
coll = db["processos_metricas"]

# Limpar coleção antes de inserir de novo
coll.delete_many({})

# Converter DF GOLD para documentos
pdf_gold = df_gold.toPandas()
docs = pdf_gold.to_dict(orient="records")

print("Inserindo documentos no Mongo:", len(docs))
coll.insert_many(docs)

print("Total na coleção:", coll.count_documents({}))

Inserindo documentos no Mongo: 233
Total na coleção: 233
