In [1]:
import pandas as pd

# RAIS

## Estabelecimentos


In [2]:
def read_data_estb(caminho, ano)-> pd.DataFrame:
    try: # txt file
        colunas = ['CNAE 2.0 Classe', 'Município']
        df_estb = pd.read_csv(caminho + 'ESTB' + ano + '.txt', sep=';', encoding='latin1', usecols=colunas)
        df_estb['ano'] = ano
        extensao_arquivo = 'txt'
        
    except: # csv file
        colunas = ['ano', 'id_municipio', 'cnae_2']
        df_estb = pd.read_csv(caminho + 'ESTB' + ano + '.csv', usecols=colunas)
        extensao_arquivo = 'csv'

    return df_estb, extensao_arquivo

def read_data_vinc(caminho, ano, uf)-> pd.DataFrame:
    colunas = ['CNAE 2.0 Classe', 'Município']
    df_vinc = pd.read_csv(f'{caminho}{ano}/{uf}', sep=';', encoding='latin1', usecols=colunas)  # voltar para cá a cada UF
    df_vinc['ano'] = ano

    return df_vinc, 'txt'

In [3]:
def transform_data(df, extensao) -> pd.DataFrame:
    '''Normaliza o tipo de cada coluna e obtem a cnae_secao de cnae_2'''
    
    if extensao == 'txt':
        df.rename(columns={'Município': 'id_municipio', 'CNAE 2.0 Classe': 'cnae_2'}, inplace=True)	
        df = df[['ano', 'id_municipio', 'cnae_2']]
        df.dropna(inplace=True)
    elif extensao == 'csv':
        df.dropna(inplace=True)
        df['id_municipio'] = df['id_municipio'].astype('int')
        df['id_municipio'] = df['id_municipio'].apply(lambda x: int(x/10)) # remove 7'th dígito
    
    # para todos
    df['ano'] = df['ano'].astype('int16')
    
    df['id_municipio'] = df['id_municipio'].astype('int')
    
    df.loc[:, 'cnae_2'] = df['cnae_2'].apply(lambda x: str(x).zfill(5))
    df['cnae_secao'] = df['cnae_2'].apply(lambda x: x[:2])
    df['cnae_secao'] = df['cnae_secao'].astype('int')
    df.drop(columns=['cnae_2'], inplace=True)
    return df

In [4]:
def merge_municipios(df, caminho, loc) -> pd.DataFrame:
    '''Faz o merge do id_municipio com os dados do dicionário'''
    
    if loc == 'mesorregiao':
        colunas_muni = ['id_municipio_6', 'sigla_uf', 'id_mesorregiao', 'nome_mesorregiao']
    elif loc == 'microrregiao':
        colunas_muni = ['id_municipio_6', 'sigla_uf', 'id_microrregiao', 'nome_microrregiao', 'id_mesorregiao']
    elif loc == 'municipio':
        colunas_muni = ['id_municipio_6', 'sigla_uf', 'centroide', 'nome', 'id_microrregiao']
    dicionario_muni = pd.read_csv(caminho, sep=',', usecols=colunas_muni)
    dicionario_muni.rename(columns={'id_municipio_6': 'id_municipio', 'nome': 'municipio'}, inplace=True)
    
    df = pd.merge(df, dicionario_muni, on='id_municipio', how='left')
    return df

In [5]:
def merge_cnae(df, caminho) -> pd.DataFrame:
    
    colunas_tmp = ['divisao', 'descricao_secao', 'descricao_divisao']
    df_cnae = pd.read_csv(caminho, usecols=colunas_tmp)
    df_cnae.rename(columns={'divisao': 'cnae_secao'}, inplace=True)
    df_cnae.drop_duplicates(inplace=True)   
    
    df = pd.merge(df, df_cnae, on='cnae_secao', how='left')
    df.drop(columns=['cnae_secao'], inplace=True)
    return df

In [6]:
def calc_ql(df, indice, loc) -> pd.DataFrame:
    
    if loc == 'mesorregiao':
        numerador_est = df.groupby(['ano', 'sigla_uf', 'id_mesorregiao', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf', 'id_mesorregiao']).size()
        denominador_est = df.groupby(['ano', 'sigla_uf', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf']).size()
        ql_est = numerador_est / denominador_est
        ql_est = ql_est.reset_index()
        ql_est.drop(columns=['sigla_uf'], inplace=True)
        ql_est.columns = ['ano', 'id_mesorregiao', 'descricao_secao', 'descricao_divisao', f'{indice}_est']
        
        numerador_nac = df.groupby(['ano', 'id_mesorregiao', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'id_mesorregiao']).size()
        denominador_nac = df.groupby(['ano', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano']).size()
        ql_nac = numerador_nac / denominador_nac
        ql_nac = ql_nac.reset_index()
        ql_nac.columns = ['ano', 'id_mesorregiao', 'descricao_secao', 'descricao_divisao', f'{indice}_nac']
        
        
    elif loc == 'microrregiao':
        numerador_est = df.groupby(['ano', 'sigla_uf', 'id_microrregiao', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf', 'id_microrregiao']).size()
        denominador_est = df.groupby(['ano', 'sigla_uf', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf']).size()
        ql_est = numerador_est / denominador_est
        ql_est = ql_est.reset_index()
        ql_est.drop(columns=['sigla_uf'], inplace=True)
        ql_est.columns = ['ano', 'id_microrregiao', 'descricao_secao', 'descricao_divisao', f'{indice}_est']
        
        numerador_nac = df.groupby(['ano', 'id_microrregiao', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'id_microrregiao']).size()
        denominador_nac = df.groupby(['ano', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano']).size()
        ql_nac = numerador_nac / denominador_nac
        ql_nac = ql_nac.reset_index()
        ql_nac.columns = ['ano', 'id_microrregiao', 'descricao_secao', 'descricao_divisao', f'{indice}_nac']
        
    elif loc == 'municipio':
        numerador_est = df.groupby(['ano', 'sigla_uf', 'id_municipio', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf', 'id_municipio']).size()
        denominador_est = df.groupby(['ano', 'sigla_uf', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'sigla_uf']).size()
        ql_est = numerador_est / denominador_est
        ql_est = ql_est.reset_index()
        ql_est.drop(columns=['sigla_uf'], inplace=True)
        ql_est.columns = ['ano', 'id_municipio', 'descricao_secao', 'descricao_divisao', f'{indice}_est']
        
        numerador_nac = df.groupby(['ano', 'id_municipio', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano', 'id_municipio']).size()
        denominador_nac = df.groupby(['ano', 'descricao_secao', 'descricao_divisao']).size() / df.groupby(['ano']).size()
        ql_nac = numerador_nac / denominador_nac
        ql_nac = ql_nac.reset_index()
        ql_nac.columns = ['ano', 'id_municipio', 'descricao_secao', 'descricao_divisao', f'{indice}_nac']
        
    
    df_tmp = pd.merge(ql_est, ql_nac, on=['ano', f'id_{loc}', 'descricao_secao', 'descricao_divisao'], how='outer')
    df_tmp[f'{indice}_nac'] = df_tmp[f'{indice}_nac'].map(lambda x: float(f"{x:.2f}"))
    df_tmp[f'{indice}_est'] = df_tmp[f'{indice}_est'].map(lambda x: float(f"{x:.2f}"))
    return df_tmp

In [7]:
import os

def etl_pipeline(caminhos, ano_ini, ano_fim, loc):
   
    dfs_ql = []
    dfs_qe = []
    
    # indice ql
    for ano in range(ano_ini, ano_fim+1):
        df, extensao = read_data_estb(caminhos['rais'][0], str(ano))
        df = transform_data(df, extensao)
        df = merge_municipios(df, caminhos['municipios'], loc)
        df = merge_cnae(df, caminhos['cnae'])
        
        dfs_ql.append(calc_ql(df, indice='ql', loc=loc))  # estb anual

    # indice qe
    for ano in range(ano_ini, ano_fim+1):
        ufs = os.listdir(os.path.join(caminhos['rais'][1], str(ano)))
        for arq_uf in ufs:
            df, extensao = read_data_vinc(caminhos['rais'][1], str(ano), arq_uf)
            df = transform_data(df, extensao)
            df = merge_municipios(df, caminhos['municipios'], loc)
            df = merge_cnae(df, caminhos['cnae'])
            
            dfs_qe.append(calc_ql(df, indice='qe', loc=loc))
    
    dfs_ql = pd.concat(dfs_ql)
    dfs_qe = pd.concat(dfs_qe)
    df = pd.merge(dfs_ql, dfs_qe, on=['ano', f'id_{loc}', 'descricao_secao', 'descricao_divisao'], how='outer')
    return df

---

In [8]:
from joblib import Parallel, delayed
from pathlib import Path

def processar_loc(loc):
    
    df_ql = etl_pipeline(caminhos, ano_ini, ano_fim, loc=loc)
    path_out = Path(f"C:/dev/ndti/rais/indices2/divisao/fact_div_{loc}.csv")
    path_out.parent.mkdir(exist_ok=True)
    df_ql.to_csv(path_out, sep=';', index=False)
    return loc

# Parâmetros globais
caminho_estb = "D:/dados/rais/estabelecimentos/"
caminho_vinc = "D:/dados/rais/vinculos/extraidos/"
caminho_dict_muni = "D:/dados/rais/dicionarios/dicionario_municipios.csv"
caminho_dict_cnae = "D:/dados/rais/dicionarios/dicionario_cnae_2.csv"
ano_ini = 2007
ano_fim = 2024
caminhos = {'rais': [caminho_estb, caminho_vinc], 
            'municipios': caminho_dict_muni, 
            'cnae': caminho_dict_cnae}

resultados = Parallel(n_jobs=3)(delayed(processar_loc)(loc) for loc in ['municipio'])


---