# Preparação

## Bibliotecas

In [1]:

import pandas as pd

# !pip install pandas pyarrow
import numpy as np
import sys

# !pip install chardet
import chardet
import chardet.universaldetector

# !pip install unidecode
import unidecode

from functools import lru_cache

import os
from os import remove
import gc

# !pip install wget
import wget

# !pip install py7zr
from py7zr import SevenZipFile

import glob

from io import StringIO

## Diretório

In [2]:
# Ajustando diretório de trabalho
pwd = os.getcwd()

# Download e Extração dos arquivos

## Funções

In [5]:
def extract_caged (ano, mes):
  """ Faz o download dos arquivos do NOVO CAGED para o ano e o mês especificados.
  O período do NOVO CAGED tem início em Janeiro de 2020.
  Nesse script não extraímos os microdados antriores a esse período devido a divergências metodológicas estabelecidas pelo MTE.

  São 3 tipos de arquivo: CAGEDMOV, CAGEDFOR e CAGEDEXC.
    1. CAGEDMOV: movimentações declaradas dentro do prazo com competência de declaração igual a AAAAMM.
    2. CAGEDFOR: movimentações declaradas fora do prazo com competência de declaração igual a AAAAMM.
    3. CAGEDEXC: movimentações excluídas com competência de declaração da exclusão igual a AAAAMM.
  
  Argumentos:
      ano: Ano do arquivo.
      mes: Mês do arquivo.
  """
  # Download
  if ano < 2020:
        print(f"Os microdados de {mes}/{ano} não serão baixados.")
        return

  tipos_arquivos = ['CAGEDMOV', 'CAGEDEXC', 'CAGEDFOR']

  for tipo in tipos_arquivos:
    url = f'ftp://ftp.mtps.gov.br/pdet/microdados/NOVO CAGED/{ano}/{ano}{mes}/{tipo}{ano}{mes}.7z'
    nome_arquivo = f'{tipo}{ano}{mes}.7z'

    try:
      print(f'\nBaixando {tipo} de {mes}/{ano}...')
      wget.download(url, nome_arquivo)
      print(f'\n{tipo}{ano}{mes}.7z baixado com sucesso.')
    except:
      print(f'\nOs microdados {tipo} de {mes}/{ano} não estão disponíveis.')
      continue

    # Extração do arquivo 7z
    try:
        with SevenZipFile(nome_arquivo, mode = 'r') as archive:
             archive.extractall()
             print(f'{tipo}{ano}{mes}.txt extraído com sucesso.')
    except:
        print(f'Erro ao extrair {tipo}{ano}{mes}.7z.')
        continue

    # Remoção do arquivo 7z após a extração
    try:
        remove(nome_arquivo)
        print(f'{nome_arquivo} removido.\n')
    except:
        print(f'Erro ao remover {nome_arquivo}.')

In [6]:
# Funções para detectar e remover linhas defeituosas dos dados brutos

# Listando arquivos para corrigir
arquivos_com_erro = [
    "dados_brutos/CAGEDFOR202008.txt",
    "dados_brutos/CAGEDFOR202211.txt",
    "dados_brutos/CAGEDFOR202212.txt",
    "dados_brutos/CAGEDMOV202302.txt"
]

# Mapeamento das colunas antigas para as novas
mapa_colunas = {
    'compet√™nciamov': 'competênciamov',
    'regi√£o': 'região',
    'uf': 'uf',
    'munic√≠pio': 'município',
    'se√ß√£o': 'seção',
    'subclasse': 'subclasse',
    'saldomovimenta√ß√£o': 'saldomovimentação',
    'cbo2002ocupa√ß√£o': 'cbo2002ocupação',
    'categoria': 'categoria',
    'graudeinstru√ß√£o': 'graudeinstrução',
    'idade': 'idade',
    'horascontratuais': 'horascontratuais',
    'ra√ßacor': 'raçacor',
    'sexo': 'sexo',
    'tipoempregador': 'tipoempregador',
    'tipoestabelecimento': 'tipoestabelecimento',
    'tipomovimenta√ß√£o': 'tipomovimentação',
    'tipodedefici√™ncia': 'tipodedeficiência',
    'indtrabintermitente': 'indtrabintermitente',
    'indtrabparcial': 'indtrabparcial',
    'sal√°rio': 'salário',
    'tamestabjan': 'tamestabjan',
    'indicadoraprendiz': 'indicadoraprendiz',
    'origemdainforma√ß√£o': 'origemdainformação',
    'compet√™nciadec': 'competênciadec',
    'indicadordeforadoprazo': 'indicadordeforadoprazo',
    'unidadesal√°rioc√≥digo': 'unidadesaláriocódigo',
    'valorsal√°riofixo': 'valorsaláriofixo'
}

delimitadores_esperados = 27 # Número esperado de delimitadores para 28 colunas

def remover_linhas_defeituosas(arquivo, linhas_para_remover_conteudo=None, encoding_entrada='macroman', encoding_saida='utf-8'):
    linhas_validas = []

    with open(arquivo, 'r', encoding=encoding_entrada) as f:
        for i, linha in enumerate(f):
            # Remover a linha específica pelo índice, caso esteja na lista de linhas para remover
            if linhas_para_remover_conteudo and any (conteudo in linha for conteudo in linhas_para_remover_conteudo):
                continue  # Pula a linha
            # Adiciona a linha se tiver o número correto de delimitadores
            if linha.count(';') == delimitadores_esperados:
                linhas_validas.append(linha)

    # Salvando o arquivo corrigido no formato correto (utf-8)
    novo_arquivo = arquivo.replace(".txt", "_corrigido.txt")
   
    with open(novo_arquivo, 'w', encoding=encoding_saida) as f:
        f.writelines(linhas_validas)

    print(f"Arquivo corrigido salvo como: {novo_arquivo}")
    return novo_arquivo

In [7]:
# Função para criar coluna de faixa etária
def adicionar_faixaeta(idade):
    if idade >= 10 and idade <=14:
        fxet = 1
    elif idade >= 15 and idade <=17:
        fxet = 2
    elif idade >= 18 and idade <=24:
        fxet = 3
    elif idade >= 25 and idade <=29:
        fxet = 4
    elif idade >= 30 and idade <=39:
        fxet = 5
    elif idade >= 40 and idade <=49:
        fxet = 6
    elif idade >= 50 and idade < 64:
        fxet = 7
    elif idade > 65:
        fxet = 8
    else:
        fxet = 99
    return fxet

In [8]:
# função para converter arquivos corrigidos para parquet
def converter_txt_to_parquet(input_dir, output_dir, padrao, sep=';', encoding='latin1', compression='snappy'):
    """
    Converte arquivos .txt em um diretório para o formato Parquet, salvando os resultados em outro diretório.
    
    Argumentos:
        input_dir (str): Diretório onde estão os arquivos .txt.
        output_dir (str): Diretório onde os arquivos .parquet serão salvos.
        padrao (str): Padrão de nomeação dos arquivos a serem convertidos (e.g., 'CAGED').
        sep (str): Separador dos arquivos .txt. Padrão é ';'.
        encoding (str): Codificação dos arquivos .txt. Padrão é 'latin1'.
        compression (str): Tipo de compressão para os arquivos Parquet. Padrão é 'snappy'.
    """
    # Certificar-se de que o diretório de saída existe
    os.makedirs(output_dir, exist_ok=True)

    # Percorrer todos os arquivos no diretório de entrada
    for file_name in os.listdir(input_dir):
        # Verificar se o arquivo segue o padrão especificado e é .txt
        if padrao in file_name and file_name.endswith('.txt'):
            input_file = os.path.join(input_dir, file_name)
            output_file = os.path.join(output_dir, file_name.replace('.txt', '.parquet'))

            try:
                print(f'Convertendo {file_name} para Parquet...')
                # Ler o arquivo .txt
                df = pd.read_csv(input_file, sep=sep, encoding=encoding)
                # Salvar como Parquet
                df.to_parquet(output_file, engine='pyarrow', compression=compression)
                print(f'{file_name} convertido para {output_file} com sucesso.')
            except Exception as e:
                print(f'Erro ao converter {file_name}: {e}')

## Extração

In [None]:
# Extraindo arquivos para todo o período

# Definir meses de janeiro a dezembro
meses = [f'{i:02}' for i in range(1, 13)]

# Loop
for ano in range(2020, 2025):
    for mes in meses:
        extract_caged(ano, mes)

In [None]:
# Extraindo arquivos de um mês específico
extract_caged(2024, 11)

# Leitura

### Dicionários

In [11]:
# Abrindo Dicionários
dicionarios = pd.read_excel(pwd + "\\arquivos auxiliares\\dicionários.xlsx",
                           sheet_name=None)

dic_classes = dicionarios[list(dicionarios.keys())[0]]
dic_faixaeta = dicionarios[list(dicionarios.keys())[1]]
dic_escolaridade = dicionarios[list(dicionarios.keys())[2]]
dic_sexo = dicionarios[list(dicionarios.keys())[3]]
dic_racacor = dicionarios[list(dicionarios.keys())[4]]
dic_local = dicionarios[list(dicionarios.keys())[5]]


dicionarios_2 = pd.read_excel(pwd + "\\arquivos auxiliares\\Layout Novo Caged Movimentação.xlsx",
                              sheet_name=None)

dic_regiao = dicionarios_2[list(dicionarios_2.keys())[1]]
dic_muni = dicionarios_2[list(dicionarios_2.keys())[3]]
dic_categoria = dicionarios_2[list(dicionarios_2.keys())[7]]
dic_cbo2002 = dicionarios_2[list(dicionarios_2.keys())[8]]
dic_empregador = dicionarios_2[list(dicionarios_2.keys())[11]]
dic_estab = dicionarios_2[list(dicionarios_2.keys())[12]]
dic_movimentacao = dicionarios_2[list(dicionarios_2.keys())[13]]
dic_trab_parcial = dicionarios_2[list(dicionarios_2.keys())[14]]
dic_trab_intermitente = dicionarios_2[list(dicionarios_2.keys())[15]]
dic_deficiencia = dicionarios_2[list(dicionarios_2.keys())[16]]
dic_aprendiz = dicionarios_2[list(dicionarios_2.keys())[17]]
dic_tamaestab = dicionarios_2[list(dicionarios_2.keys())[18]]

In [12]:
# Criando mapas
mapa_ufs = dict(zip(dic_local['UF'], dic_local['Nome_UF']))
mapa_fxeta = dict(zip(dic_faixaeta['cod'], dic_faixaeta['nom']))
mapa_escolaridade = dict(zip(dic_escolaridade['cod'], dic_escolaridade['nom']))
mapa_sexo = dict(zip(dic_sexo['cod'], dic_sexo['nom']))
mapa_racacor = dict(zip(dic_racacor['cod'], dic_racacor['nom']))
mapa_secao = dict(zip(dic_classes['Seção'], dic_classes['Nome Seção']))
mapa_classe = dict(zip(dic_classes['Classe'], dic_classes['Nome Classe']))
mapa_subclasse = dict(zip(dic_classes['Subclasse'], dic_classes['Nome Subclasse']))


mapa_muni = dict(zip(dic_muni['Código'], dic_muni['Descrição']))
mapa_regiao = dict(zip(dic_regiao['Código'], dic_regiao['Descrição']))
mapa_categoria = dict(zip(dic_categoria['Código'], dic_categoria['Descrição']))
mapa_cbo2002 = dict(zip(dic_cbo2002['Código'], dic_cbo2002['Descrição']))
mapa_empregador = dict(zip(dic_empregador['Código'], dic_empregador['Descrição']))
mapa_estab = dict(zip(dic_estab['Código'], dic_estab['Descrição']))
mapa_movimentacao = dict(zip(dic_movimentacao['Código'], dic_movimentacao['Descrição']))
mapa_trab_parcial = dict(zip(dic_trab_parcial['Código'], dic_trab_parcial['Descrição']))
mapa_trab_intermitente = dict(zip(dic_trab_intermitente['Código'], dic_trab_intermitente['Descrição']))
mapa_deficiencia = dict(zip(dic_deficiencia['Código'], dic_deficiencia['Descrição']))
mapa_aprendiz = dict(zip(dic_aprendiz['Código'], dic_aprendiz['Descrição']))
mapa_tamaestab = dict(zip(dic_tamaestab['Código'], dic_tamaestab['Descrição']))



### Processamento

#### Corrigindo arquivos

In [None]:
# Processando arquivos com defeitos
for arquivo in arquivos_com_erro:
    try:
        print(f"\nProcessando o arquivo: {arquivo}")

        # Lista com índices de linhas a serem removidas
        linhas_defeituosas_conteudo = ["18,03", "G"] #CAGEDFOR202008

        # Remover as linhas defeituosas e salvar o arquivo corrigido
        arquivo_corrigido = remover_linhas_defeituosas(arquivo, linhas_para_remover_conteudo=linhas_defeituosas_conteudo, encoding_entrada='macroman', encoding_saida='utf-8')
        
        # Carregar e processar o arquivo corrigido
        df = pd.read_csv(arquivo_corrigido, sep=";", encoding='utf-8')
        df.rename(columns=mapa_colunas, inplace=True)
        df.to_csv(arquivo_corrigido, sep=";", encoding='utf-8', index=False)

        print(f"Colunas renomeadas e arquivo salvo: {arquivo_corrigido}")
        
    except Exception as e:
        print(f"Erro ao processar o arquivo {arquivo}: {e}")

#### Convertendo para Parquet

In [None]:
converter_txt_to_parquet(
    input_dir=pwd + "\\dados_brutos\\",
    output_dir=pwd + "\\dados_processados\\",
    padrao='CAGED',
    sep=';',
    encoding='latin1',
    compression='snappy'
)

#### Criando Cache

In [14]:
# Diretório para salvar os arquivos processados
diretorio_processados = pwd + "\\dados_processados\\"
os.makedirs(diretorio_processados, exist_ok=True)

# Arquivo de cache
cache_file = os.path.join(diretorio_processados, "processamento_cache.csv")

# Criar arquivo de cache, se não existir
if not os.path.exists(cache_file):
    cache = pd.DataFrame(columns=["arquivo", "status"])
    cache.to_csv(cache_file, index=False)
else:
    cache = pd.read_csv(cache_file)

# Verificar se o arquivo já foi processado
def verificar_cache(arquivo):
    if arquivo in cache["arquivo"].values:
        status = cache.loc[cache["arquivo"] == arquivo, "status"].values[0]
        return status
    else:
        return None
    
# Atualizar o cache com o status do arquivo
def atualizar_cache(arquivo, status):
    global cache
    if arquivo in cache["arquivo"].values:
        cache.loc[cache["arquivo"] == arquivo, "status"] = status
    else:
        cache = pd.concat([cache, pd.DataFrame({"arquivo": [arquivo], "status": [status]})], ignore_index=True)
    cache.to_csv(cache_file, index=False)

#### Loop Processamento

In [None]:
# Processando arquivos

tipos_arquivos = ['CAGEDEXC', 'CAGEDFOR', 'CAGEDMOV2020', 'CAGEDMOV2021', 'CAGEDMOV2022', 'CAGEDMOV2023', 'CAGEDMOV2024']
diretorio_processados = pwd + "\\dados_processados\\"

# Colunas necessárias
colunas_necessarias = ['competênciamov', 'região', 'uf', 'município', 
                       'seção', 'subclasse', 'cbo2002ocupação', 'categoria',
                       'graudeinstrução', 'idade', 'raçacor', 'sexo', 'tipodedeficiência',
                       'indtrabintermitente', 'indtrabparcial', 'indicadoraprendiz', 
                       'salário', 'valorsaláriofixo', 'unidadesaláriocódigo',
                       'tipoempregador', 'tipoestabelecimento', 'tamestabjan',
                       'tipomovimentação', 'saldomovimentação'
                       ]


mapeamento_colunas = {
    'competÃªnciamov': 'competênciamov',
    'regiÃ£o': 'região',
    'municÃ­pio': 'município',
    'seÃ§Ã£o': 'seção',
    'saldomovimentaÃ§Ã£o': 'saldomovimentação',
    'cbo2002ocupaÃ§Ã£o': 'cbo2002ocupação',
    'graudeinstruÃ§Ã£o': 'graudeinstrução',
    'raÃ§acor': 'raçacor',
    'tipomovimentaÃ§Ã£o': 'tipomovimentação',
    'tipodedeficiÃªncia': 'tipodedeficiência',
    'salÃ¡rio': 'salário',
    'unidadesalÃ¡riocÃ³digo': 'unidadesaláriocódigo',
    'valorsalÃ¡riofixo': 'valorsaláriofixo',
}

for tipo in tipos_arquivos:

    # Listando arquivos disponíveis
    arquivos = glob.glob(f"dados_processados\\{tipo}*.parquet")
    print(f"\nArquivos encontrados: {arquivos}")

    # Lista para armezanar arquivos processados
    lista_arquivos = []

    # Função para liberar memória
    def liberar_memoria():
        gc.collect()
        print("Memória liberada")

    # Loop pelos arquivos encontrados
    for arquivo in arquivos:
    
        # Verificar status no cache
        status = verificar_cache(arquivo)

        # Se o arquivo já foi processado, pular
        if status == "processado":
            print(f"Arquivo {arquivo} já processado. Pulando...")
            continue

        try:
            print(f"\nProcessando: {arquivo}")
            data = pd.read_parquet(arquivo)
            data.rename(columns=mapeamento_colunas, inplace=True)

            # Mapeamento de colunas
            data['região'] = data['região'].map(mapa_regiao).fillna('Não Identificado')
            data['uf'] = data['uf'].map(mapa_ufs).fillna('Não Identificado')
            data['município'] = data['município'].astype(int)
            data['município'] = data['município'].map(mapa_muni).fillna('Não Identificado')
            data['município'] = data['município'].str.slice(start=3)
            data['faixa_etaria'] = data['idade'].apply(adicionar_faixaeta)
            data['faixa_etaria'] = data['faixa_etaria'].map(mapa_fxeta).fillna("Não Identificado")
            data['graudeinstrução'] = data['graudeinstrução'].map(mapa_escolaridade).fillna("Não Identificado")
            data['sexo'] = data['sexo'].map(mapa_sexo).fillna("Não Identificado")
            data['raçacor'] = data['raçacor'].map(mapa_racacor).fillna("Não Identificado")
            data['seção'] = data['seção'].map(mapa_secao).fillna("Não Identificado")   
            data['subclasse'] = data['subclasse'].map(mapa_subclasse).fillna("Não Identificado") 
            data['categoria'] = data['categoria'].map(mapa_categoria).fillna("Não Identificado") 
            data['cbo2002ocupação'] = data['cbo2002ocupação'].map(mapa_cbo2002).fillna("Não Identificado")
            data['tipoempregador'] = data['tipoempregador'].map(mapa_empregador).fillna("Não Identificado")
            data['tipoestabelecimento'] = data['tipoestabelecimento'].map(mapa_estab).fillna("Não Identificado")
            data['tipomovimentação'] = data['tipomovimentação'].map(mapa_movimentacao).fillna("Não Identificado")
            data['indtrabparcial'] = data['indtrabparcial'].map(mapa_trab_parcial).fillna("Não Identificado")
            data['indtrabintermitente'] = data['indtrabintermitente'].map(mapa_trab_intermitente).fillna("Não Identificado")
            data['tipodedeficiência'] = data['tipodedeficiência'].map(mapa_deficiencia).fillna("Não Identificado")
            data['indicadoraprendiz'] = data['indicadoraprendiz'].map(mapa_aprendiz).fillna("Não Identificado")
            data['tamestabjan'] = data['tamestabjan'].map(mapa_tamaestab).fillna("Não Identificado")

            # Adicionando coluna de admissões e desligamentos
            if tipo == 'CAGEDEXC':
                data['admissoes'] = data['saldomovimentação'].apply(lambda x: -1 if x == 1 else 0)
                data['desligamentos'] = data['saldomovimentação'].apply(lambda x: -1 if x == -1 else 0)
                data['saldomovimentação'] = data['saldomovimentação'].apply(lambda x: -1 if x == 1 else 1)
            else:
                data['admissoes'] = data['saldomovimentação'].apply(lambda x: 1 if x == 1 else 0)
                data['desligamentos'] = data['saldomovimentação'].apply(lambda x: 1 if x == -1 else 0)

            # Transformação da coluna 'salário'
            data['salário'] = data['salário'].apply(lambda x: str(x) if isinstance(x, (int, float)) else x)  # Converter valores numéricos para string
            data['salário'] = pd.to_numeric(data['salário'].str.replace(',', '.').fillna('0'), errors='coerce')

            # Agrupando e somando
            data = data.groupby(data.columns.tolist(), as_index=False).agg({
                'admissoes': 'sum',
                'desligamentos': 'sum',
                'saldomovimentação': 'sum'
            })
        
            # Transformação para 'category' em colunas apropriadas para reduzir o uso de memória
            colunas_category = ['região', 'uf', 'município', 'seção', 'subclasse', 'categoria', 
                                'graudeinstrução', 'sexo', 'raçacor', 'faixa_etaria', 'cbo2002ocupação', 
                                'tipodedeficiência', 'indtrabintermitente', 'indtrabparcial', 'indicadoraprendiz']
        
            for coluna in colunas_category:
                data[coluna] = data[coluna].astype('category')
        
            # Adicionar df processado à lista
            lista_arquivos.append(data)

            # Atualizar cache após o processamento do arquivo
            atualizar_cache(arquivo, "processado")

            # Liberar memória após processamento de cada arquivo
            liberar_memoria()

        except Exception as e:
            print(f"Erro ao processar o arquivo {arquivo}: {e}")

    # Concatenando todos os df
    if lista_arquivos:
        lista_arquivos = pd.concat(lista_arquivos, ignore_index=True)
    else:
        print("Nenhum arquivo foi processado.")
        continue

    # Liberar memória após concatenar
    liberar_memoria()

    print(f"{tipo} processada com sucesso!")

    # Salvando arquivo
    nome_arquivo = f"{tipo}.csv"
    caminho_arquivo = os.path.join(diretorio_processados, nome_arquivo)
    lista_arquivos.to_csv(caminho_arquivo, index=False, encoding='latin1', sep=';')
    print(f"{nome_arquivo} salvo com sucesso!")

# Liberar memória após concatenar
liberar_memoria()