In [1]:
from tqdm import tqdm
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, date_format, to_date, create_map, lit, when, lower, regexp_replace, substring, concat_ws
from itertools import chain
import os
from dotenv import load_dotenv
load_dotenv()

from datetime import datetime
def log(msg):
    tqdm.write(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] {msg}")

In [2]:
# 📌 Bibliotecas Padrão do Python
import logging
import warnings
import itertools
from datetime import datetime
from typing import Literal

# 📌 Manipulação de Dados
import pandas as pd
import numpy as np

# 📌 Visualização
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go

from datetime import datetime



# 📌 Barras de Progresso
from tqdm import tqdm

# 📌 Configurações Globais para Melhor Exibição dos Dados
warnings.simplefilter(action="ignore", category=UserWarning)  # Ignorar avisos gerais do usuário
warnings.simplefilter(action="ignore", category=FutureWarning)  # Ignorar avisos de futuras mudanças

# Exibição de ponto flutuante sem notação científica
pd.options.display.float_format = "{:.2f}".format
# Configuração do número máximo de colunas e linhas exibidas
pd.set_option("display.max_columns", 500)
pd.set_option("display.max_rows", 65)

# Configuração do backend de gráficos
pd.options.plotting.backend = "plotly"
pd.options.display.colheader_justify = "center"

In [3]:
from pyspark.sql import SparkSession

raw_warehouse_path = "/home/pedromurta/projects/observatorio/caged/data/observatorio_caged/raw/"
bronze_warehouse_path = "/home/pedromurta/projects/observatorio/caged/data/observatorio_caged/bronze"
silver_warehouse_path = "/home/pedromurta/projects/observatorio/caged/data/observatorio_caged/silver"

spark = SparkSession.builder \
    .appName("Salvar CAGED Iceberg") \
    .config("spark.sql.catalog.raw", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.raw.type", "hadoop") \
    .config("spark.sql.catalog.raw.warehouse", raw_warehouse_path) \
    .config("spark.sql.catalog.bronze", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.bronze.type", "hadoop") \
    .config("spark.sql.catalog.bronze.warehouse", bronze_warehouse_path) \
    .config("spark.sql.catalog.silver", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.silver.type", "hadoop") \
    .config("spark.sql.catalog.silver.warehouse", silver_warehouse_path) \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memoryOverhead", "2g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .getOrCreate()


your 131072x1 screen size is bogus. expect trouble
25/05/12 15:36:41 WARN Utils: Your hostname, GEGOVE-DT-09 resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/12 15:36:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/05/12 15:36:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/12 15:36:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/12 15:36:42 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
# Lê mapeamentos do Excel
mapeamentos = pd.read_excel("/home/pedromurta/projects/observatorio/caged/data/observatorio_caged/support/mapeamentos/mapeamentos_caged.xlsx", sheet_name=None)

In [5]:
df_old = spark.read \
    .option("basePath", "/home/pedromurta/projects/observatorio/caged/data/caged_old") \
    .parquet("/home/pedromurta/projects/observatorio/caged/data/caged_old/*")

                                                                                

In [6]:
catalog_bronze = "bronze"
schema_name_bronze = "default"
table_name_bronze = "caged"

df_bronze = spark.read.table(f"{catalog_bronze}.{schema_name_bronze}.{table_name_bronze}")

In [7]:
df_bronze.count()

222731869

In [8]:
df_old.printSchema()

root
 |-- sigla_uf: string (nullable = true)
 |-- saldo_movimentacao: integer (nullable = true)
 |-- codigo_cbo: string (nullable = true)
 |-- cbo: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- idade: byte (nullable = true)
 |-- sexo: string (nullable = true)
 |-- admissao: integer (nullable = true)
 |-- demissao: integer (nullable = true)
 |-- etnia: string (nullable = true)
 |-- sub_classe: string (nullable = true)
 |-- segmento: string (nullable = true)
 |-- salario: float (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cnae_divisao: string (nullable = true)
 |-- secao_cnae: string (nullable = true)
 |-- competencia: string (nullable = true)



In [9]:
df_old.count()

17993875

In [10]:
df_old.show(5, truncate=False)

+--------+------------------+----------+--------------------------------+-----------------+-----+------+--------+--------+------+----------+--------------------+-------+------------+------------+----------+-----------+
|sigla_uf|saldo_movimentacao|codigo_cbo|cbo                             |escolaridade     |idade|sexo  |admissao|demissao|etnia |sub_classe|segmento            |salario|regiao      |cnae_divisao|secao_cnae|competencia|
+--------+------------------+----------+--------------------------------+-----------------+-----+------+--------+--------+------+----------+--------------------+-------+------------+------------+----------+-----------+
|MG      |-1                |142105    |Gerente Administrativo          |Superior Completo|30   |Homem |0       |1       |Branca|4911600   |FERROVIÁRIO         |13000.0|SUDESTE     |49          |H         |2013-10    |
|GO      |1                 |411005    |Auxiliar de Escritorio, em Geral|Médio Completo   |19   |Mulher|1       |0       |Br

In [11]:
df_old = df_old.withColumnRenamed("sigla_uf", "uf_sigla") \
               .withColumnRenamed("secao_cnae", "secao")

In [12]:
df_bronze.printSchema()

root
 |-- competencia: integer (nullable = true)
 |-- uf: integer (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- secao: string (nullable = true)
 |-- sub_classe: integer (nullable = true)
 |-- saldo_movimentacao: integer (nullable = true)
 |-- codigo_cbo: integer (nullable = true)
 |-- escolaridade: integer (nullable = true)
 |-- idade: integer (nullable = true)
 |-- etnia: integer (nullable = true)
 |-- sexo: integer (nullable = true)
 |-- salario: double (nullable = true)
 |-- salario_fixo: double (nullable = true)



In [13]:
df_bronze.show(5, truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+-----------+---+---------+-----+----------+------------------+----------+------------+-----+-----+----+-------+------------+
|competencia|uf |municipio|secao|sub_classe|saldo_movimentacao|codigo_cbo|escolaridade|idade|etnia|sexo|salario|salario_fixo|
+-----------+---+---------+-----+----------+------------------+----------+------------+-----+-----+----+-------+------------+
|202502     |41 |410840   |M    |7112000   |-1                |715315    |7           |33   |1    |1   |2752.2 |2752.2      |
|202502     |29 |290320   |G    |4781400   |1                 |521110    |7           |22   |3    |3   |1518.0 |1518.0      |
|202502     |26 |261160   |G    |4781400   |1                 |763015    |7           |55   |1    |3   |2500.0 |2500.0      |
|202502     |33 |330455   |I    |5620102   |1                 |513405    |7           |46   |3    |1   |0.0    |0.0         |
|202502     |35 |350920   |M    |7119701   |1                 |312320    |7           |29   |3    |1   |1800.0 |1800.0

                                                                                

In [14]:
df_bronze = df_bronze.withColumn(
    "competencia",
    concat_ws(
        "-",
        substring(col("competencia"), 1, 4),  # ano
        substring(col("competencia"), 5, 2)   # mês
    )
)

In [15]:
# Converte os DataFrames do Pandas em dicionários
uf_dict           = {row['uf']: row['uf_desc'] for _, row in mapeamentos["uf"].iterrows()}
regiao_dict       = {row['regiao']: row['regiao_desc'] for _, row in mapeamentos["regiao"].iterrows()}
municipio_dict    = {row['municipio']: row['municipio_desc'] for _, row in mapeamentos["municipio"].iterrows()}
etnia_dict        = {row['etnia']: row['etnia_desc'] for _, row in mapeamentos["etnia"].iterrows()}
escolaridade_dict = {row['escolaridade']: row['escolaridade_desc'] for _, row in mapeamentos["escolaridade"].iterrows()}
cbo_dict          = {row['cbo']: row['cbo_desc'] for _, row in mapeamentos["cbo"].iterrows()}
subclasse_dict    = {row['sub_classe']: row['sub_classe_desc'] for _, row in mapeamentos["subclasse"].iterrows()}

In [16]:
# Nome → Sigla
uf_nome_para_sigla = {
    "Acre": "AC", "Alagoas": "AL", "Amazonas": "AM", "Amapá": "AP", "Bahia": "BA", "Ceará": "CE",
    "Distrito Federal": "DF", "Espírito Santo": "ES", "Goiás": "GO", "Maranhão": "MA", "Mato Grosso": "MT",
    "Mato Grosso do Sul": "MS", "Minas gerais": "MG", "Pará": "PA", "Paraíba": "PB", "Paraná": "PR",
    "Pernambuco": "PE", "Piauí": "PI", "Rio de Janeiro": "RJ", "Rio Grande do Norte": "RN",
    "Rio Grande do Sul": "RS", "Rondônia": "RO", "Roraima": "RR", "Santa Catarina": "SC",
    "Sergipe": "SE", "São Paulo": "SP", "Tocantins": "TO", "Não identificado": "NI"
}

# Sigla → Região
uf_sigla_para_regiao = {
    "RO": "NORTE", "AC": "NORTE", "AM": "NORTE", "RR": "NORTE", "PA": "NORTE", "AP": "NORTE", "TO": "NORTE",
    "MA": "NORDESTE", "PI": "NORDESTE", "CE": "NORDESTE", "RN": "NORDESTE", "PB": "NORDESTE", "PE": "NORDESTE",
    "AL": "NORDESTE", "SE": "NORDESTE", "BA": "NORDESTE",
    "MG": "SUDESTE", "ES": "SUDESTE", "RJ": "SUDESTE", "SP": "SUDESTE",
    "PR": "SUL", "SC": "SUL", "RS": "SUL",
    "MS": "CENTRO-OESTE", "MT": "CENTRO-OESTE", "GO": "CENTRO-OESTE", "DF": "CENTRO-OESTE",
    "NI": "Não identificado"
}

regiao_map = {
    11: "NORTE", 12: "NORTE", 13: "NORTE", 14: "NORTE", 15: "NORTE", 16: "NORTE", 17: "NORTE",
    21: "NORDESTE", 22: "NORDESTE", 23: "NORDESTE", 24: "NORDESTE", 25: "NORDESTE", 26: "NORDESTE",
    27: "NORDESTE", 28: "NORDESTE", 29: "NORDESTE",
    31: "SUDESTE", 32: "SUDESTE", 33: "SUDESTE", 35: "SUDESTE",
    41: "SUL", 42: "SUL", 43: "SUL",
    50: "CENTRO-OESTE", 51: "CENTRO-OESTE", 52: "CENTRO-OESTE", 53: "CENTRO-OESTE",
    9: "Não identificado", 99: "Não identificado"
}

sub_classe_map = {
    4930202: "TRANSPORTE DE CARGAS", 4930203: "TRANSPORTE DE CARGAS", 4930204: "TRANSPORTE DE CARGAS", 3600602: "TRANSPORTE DE CARGAS",
    5212500: "TRANSPORTE DE CARGAS", 5229002: "TRANSPORTE DE CARGAS", 5320201: "TRANSPORTE DE CARGAS", 5320202: "TRANSPORTE DE CARGAS",
    4921301: "TRANSPORTE DE PASSAGEIROS", 4921302: "TRANSPORTE DE PASSAGEIROS", 4922101: "TRANSPORTE DE PASSAGEIROS",
    4922102: "TRANSPORTE DE PASSAGEIROS", 4922103: "TRANSPORTE DE PASSAGEIROS", 4923001: "TRANSPORTE DE PASSAGEIROS",
    4924800: "TRANSPORTE DE PASSAGEIROS", 4929901: "TRANSPORTE DE PASSAGEIROS", 4929902: "TRANSPORTE DE PASSAGEIROS",
    4929903: "TRANSPORTE DE PASSAGEIROS", 4929904: "TRANSPORTE DE PASSAGEIROS", 4929999: "TRANSPORTE DE PASSAGEIROS",
    5229099: "TRANSPORTE DE PASSAGEIROS", 8622400: "TRANSPORTE DE PASSAGEIROS", 4681801: "DISTRIBUIÇÃO DE PETRÓLEO",
    4681802: "DISTRIBUIÇÃO DE PETRÓLEO", 4681803: "DISTRIBUIÇÃO DE PETRÓLEO", 4681804: "DISTRIBUIÇÃO DE PETRÓLEO",
    4681805: "DISTRIBUIÇÃO DE PETRÓLEO", 4682600: "DISTRIBUIÇÃO DE PETRÓLEO", 4923002: "LOCAÇÃO DE VEÍCULO",
    7711000: "LOCAÇÃO DE VEÍCULO", 8012900: "TRANSPORTE DE VALORES", 5011401: "AQUAVIÁRIO", 5021102: "AQUAVIÁRIO",
    5011402: "AQUAVIÁRIO", 5012201: "AQUAVIÁRIO", 5012202: "AQUAVIÁRIO", 5021101: "AQUAVIÁRIO", 5022001: "AQUAVIÁRIO",
    5022002: "AQUAVIÁRIO", 5091201: "AQUAVIÁRIO", 5091202: "AQUAVIÁRIO", 5099801: "AQUAVIÁRIO", 5099899: "AQUAVIÁRIO",
    5030101: "AQUAVIÁRIO", 5030102: "PORTUÁRIO", 5030103: "PORTUÁRIO", 5239799: "PORTUÁRIO", 5232000: "AQUAVIÁRIO",
    4911600: "FERROVIÁRIO", 4912401: "FERROVIÁRIO", 4912402: "FERROVIÁRIO", 4912403: "METROVIÁRIO", 5231102: "PORTUÁRIO",
    4291000: "PORTUÁRIO", 5231101: "PORTUÁRIO", 5111100: "AÉREO", 5112901: "AÉREO", 5112999: "AÉREO", 5240199: "AÉREO",
    3316301: "AÉREO", 3316302: "AÉREO", 5240101: "AÉREO", 5120000: "AÉREO"
}

sexo_dict = {
    1: "Homem",
    3: "Mulher",
    9: "Não Identificado"
    
}

In [17]:
# Expressões de mapeamento
regiao_expr = create_map([lit(x) for x in chain(*regiao_map.items())])
segmento_expr = create_map([lit(x) for x in chain(*sub_classe_map.items())])
sexo_expr = create_map([lit(x) for x in chain(*sexo_dict.items())])
cbo_expr         = create_map([lit(x) for x in chain(*cbo_dict.items())])

# Mapas Spark
uf_codigo_to_nome_expr = create_map([lit(k) for k in chain(*uf_dict.items())])
uf_nome_to_sigla_expr = create_map([lit(k) for k in chain(*uf_nome_para_sigla.items())])
uf_sigla_to_regiao_expr = create_map([lit(k) for k in chain(*uf_sigla_para_regiao.items())])

municipio_expr    = create_map([lit(x) for x in chain(*municipio_dict.items())])
etnia_expr        = create_map([lit(x) for x in chain(*etnia_dict.items())])
escolaridade_expr = create_map([lit(x) for x in chain(*escolaridade_dict.items())])

In [18]:
# Garantir que os campos sejam inteiros para funcionar nos maps
df_bronze = df_bronze \
    .withColumn("sub_classe", col("sub_classe").cast("int")) \
    .withColumn("sexo", col("sexo").cast("int")) \
    .withColumn("municipio", col("municipio").cast("int")) \
    .withColumn("etnia", col("etnia").cast("int")) \
    .withColumn("escolaridade", col("escolaridade").cast("int"))

In [19]:
# Aplicar mapeamento com getItem (em vez de UDF)
df_bronze = df_bronze \
    .withColumn("segmento", segmento_expr.getItem(col("sub_classe"))) \
    .withColumn("sexo", sexo_expr.getItem(col("sexo"))) \
    .withColumn("salario", round(col("salario"), 2)) \
    .withColumn("salario_fixo", round(col("salario_fixo"), 2)) \
    .withColumn("municipio", municipio_expr.getItem(col("municipio"))) \
    .withColumn("etnia", etnia_expr.getItem(col("etnia"))) \
    .withColumn("escolaridade", escolaridade_expr.getItem(col("escolaridade"))) \
    .withColumn("uf_nome", uf_codigo_to_nome_expr.getItem(col("uf").cast("int"))) \
    .withColumn("uf_sigla", uf_nome_to_sigla_expr.getItem(col("uf_nome"))) \
    .withColumn("regiao", uf_sigla_to_regiao_expr.getItem(col("uf_sigla"))) \
    .withColumn("cbo", cbo_expr.getItem(col("codigo_cbo"))) \
    .withColumn("admissao", when(col("saldo_movimentacao") == 1, 1).otherwise(0).cast("int")) \
    .withColumn("demissao", when(col("saldo_movimentacao") == -1, 1).otherwise(0).cast("int"))

In [20]:
df_bronze = df_bronze.withColumn(
    "segmento",
    when(col("segmento").isNull(), "OUTROS").otherwise(col("segmento"))
)

In [21]:
df_bronze.show(5, truncate=False)

+-----------+---+--------------------+-----+----------+------------------+----------+--------------+-----+------+------+-------+------------+--------+--------------+--------+--------+---------------------------------------+--------+--------+
|competencia|uf |municipio           |secao|sub_classe|saldo_movimentacao|codigo_cbo|escolaridade  |idade|etnia |sexo  |salario|salario_fixo|segmento|uf_nome       |uf_sigla|regiao  |cbo                                    |admissao|demissao|
+-----------+---+--------------------+-----+----------+------------------+----------+--------------+-----+------+------+-------+------------+--------+--------------+--------+--------+---------------------------------------+--------+--------+
|2025-02    |41 |Pr-Francisco Beltrao|M    |7112000   |-1                |715315    |Médio Completo|33   |Branca|Homem |2752.2 |2752.2      |OUTROS  |Paraná        |PR      |SUL     |Armador de Estrutura de Concreto Armado|0       |1       |
|2025-02    |29 |Ba-Barreiras   

In [22]:
df_bronze.count()

222731869

In [23]:
df_old.printSchema()

root
 |-- uf_sigla: string (nullable = true)
 |-- saldo_movimentacao: integer (nullable = true)
 |-- codigo_cbo: string (nullable = true)
 |-- cbo: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- idade: byte (nullable = true)
 |-- sexo: string (nullable = true)
 |-- admissao: integer (nullable = true)
 |-- demissao: integer (nullable = true)
 |-- etnia: string (nullable = true)
 |-- sub_classe: string (nullable = true)
 |-- segmento: string (nullable = true)
 |-- salario: float (nullable = true)
 |-- regiao: string (nullable = true)
 |-- cnae_divisao: string (nullable = true)
 |-- secao: string (nullable = true)
 |-- competencia: string (nullable = true)



In [24]:
df_bronze = df_bronze.select('competencia', 'saldo_movimentacao', 'secao', 'sub_classe', 'segmento', 'codigo_cbo', 'cbo', 'idade', 'etnia', 'sexo', 'salario','escolaridade', 'uf_sigla', 'regiao', 'admissao', 'demissao')

In [25]:
df_bronze.count()

222731869

In [26]:
df_old = df_old.select('competencia', 'saldo_movimentacao', 'secao', 'sub_classe', 'segmento', 'codigo_cbo', 'cbo', 'idade', 'etnia', 'sexo', 'salario','escolaridade', 'uf_sigla', 'regiao', 'admissao', 'demissao')

In [27]:
df_old.count()

17993875

In [28]:
df = df_bronze.unionByName(df_old)

In [29]:
df.count()

                                                                                

240725744

In [None]:
spark.stop()