# Dependências

In [1]:
# Install required Google Cloud packages (commented out as these are typically one-time setup commands)
#!gcloud auth application-default login

# Import necessary Python libraries
import pandas as pd                # Data manipulation and analysis
import numpy as np                 # Numerical computing
import time                        # Time-related functions
import os                          # Operating system interfaces
import pandas_gbq                  # Pandas integration with BigQuery
from google.cloud import bigquery  # BigQuery client library
import glob                        # File path pattern matching
import csv                         # CSV file handling
import re                          # Regular expressions

# Note: The actual imports remain exactly as in the original code

In [2]:
import pandas as pd
import math

# Nome do seu arquivo grande
nome_arquivo = 'output/ano=2024/sigla_uf=SP/data.csv'
numero_de_dataframes = 10

# --- Passo 1: Contar o total de linhas eficientemente ---
# O 'with open...' garante que o arquivo será fechado corretamente
# sum(1 for line in f) é uma forma rápida e com baixo uso de memória para contar linhas
with open(nome_arquivo, 'r', encoding='utf-8') as f:
    total_linhas = sum(1 for line in f) - 1 # Subtrai 1 para o cabeçalho

print(f"O arquivo tem {total_linhas} linhas de dados.")

# --- Passo 2: Calcular o tamanho de cada pedaço ---
# Usamos math.ceil para garantir que todas as linhas sejam incluídas,
# mesmo que a divisão não seja perfeita.
linhas_por_dataframe = math.ceil(total_linhas / numero_de_dataframes)
print(f"Cada um dos {numero_de_dataframes} DataFrames terá aproximadamente {linhas_por_dataframe} linhas.")

# --- Passo 3: Ler o arquivo e criar a lista de DataFrames ---
# Criamos um iterador que vai nos entregar os pedaços (chunks)
chunk_iter = pd.read_csv(
    nome_arquivo,
    chunksize=linhas_por_dataframe,
    # Se seu CSV usa outro separador, especifique aqui, ex: sep=';'
)

# A lista 'lista_de_dataframes' vai conter os 10 DataFrames resultantes
lista_de_dataframes = []
for i in range(numero_de_dataframes):
    try:
        df_parte = next(chunk_iter)
        lista_de_dataframes.append(df_parte)
        print(f"DataFrame {i+1} criado com {len(df_parte)} linhas.")
    except StopIteration:
        # Isso acontece se o último chunk for menor, o que é esperado
        break

# Agora você tem seus 10 DataFrames na lista
# Exemplo: Acessando o primeiro DataFrame
df1 = lista_de_dataframes[0]
print("\nCabeçalho do primeiro DataFrame (df1):")
print(df1.head())

# Exemplo: Acessando o último DataFrame
df10 = lista_de_dataframes[-1]
print("\nInformações do último DataFrame:")
df10.info()

O arquivo tem 22467129 linhas de dados.
Cada um dos 10 DataFrames terá aproximadamente 2246713 linhas.


  df_parte = next(chunk_iter)


DataFrame 1 criado com 2246713 linhas.
DataFrame 2 criado com 2246713 linhas.
DataFrame 3 criado com 2246713 linhas.


  df_parte = next(chunk_iter)


DataFrame 4 criado com 2246713 linhas.
DataFrame 5 criado com 2246713 linhas.
DataFrame 6 criado com 2246713 linhas.
DataFrame 7 criado com 2246713 linhas.
DataFrame 8 criado com 2246713 linhas.
DataFrame 9 criado com 2246713 linhas.


  df_parte = next(chunk_iter)


DataFrame 10 criado com 2246712 linhas.

Cabeçalho do primeiro DataFrame (df1):
   id_municipio  tipo_vinculo  vinculo_ativo_3112  tipo_admissao  \
0       3538709            10                   0              0   
1       3550308            10                   0              0   
2       3534401            10                   0              0   
3       3548054            20                   0              0   
4       3548054            20                   0              0   

   mes_admissao  mes_desligamento  motivo_desligamento  causa_desligamento_1  \
0           NaN                12                   60                  40.0   
1           NaN                 1                   11                   NaN   
2           NaN                 4                   21                  40.0   
3           NaN                 1                   21                   NaN   
4           NaN                 1                   21                   NaN   

   causa_desligamento_2  causa

# Tratamento

In [54]:
df = pd.read_csv('output/ano=2024/sigla_uf=TO/data.csv')

  df = pd.read_csv('output/ano=2024/sigla_uf=TO/data.csv')


In [55]:
df_uf = pd.read_csv('sigla_uf.csv')

In [56]:
df1 = df.merge(df_uf[['id_municipio', 'sigla_uf']], on='id_municipio', how='left')

# Mostrar o resultado
df.head()

Unnamed: 0,id_municipio,tipo_vinculo,vinculo_ativo_3112,tipo_admissao,mes_admissao,mes_desligamento,motivo_desligamento,causa_desligamento_1,causa_desligamento_2,causa_desligamento_3,...,ano_chegada_brasil,tamanho_estabelecimento,tipo_estabelecimento,natureza_juridica,indicador_simples,bairros_sp,distritos_sp,bairros_fortaleza,bairros_rj,regioes_administrativas_df
0,1714203,25,0,0,,1,11,,99,99,...,1198,2,5,,0,,,,,
1,1721000,10,0,0,,1,11,,99,99,...,1198,2,1,2062.0,1,,,,,
2,1718501,60,0,2,8.0,8,11,,99,99,...,1198,6,6,,0,,,,,
3,1718501,60,0,2,8.0,8,11,,99,99,...,1198,6,6,,0,,,,,
4,1718204,70,0,0,,1,12,,99,99,...,1198,7,1,2054.0,0,,,,,


In [57]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 368274 entries, 0 to 368273
Data columns (total 65 columns):
 #   Column                           Non-Null Count   Dtype  
---  ------                           --------------   -----  
 0   id_municipio                     368274 non-null  int64  
 1   tipo_vinculo                     368274 non-null  int64  
 2   vinculo_ativo_3112               368274 non-null  int64  
 3   tipo_admissao                    368274 non-null  int64  
 4   mes_admissao                     150207 non-null  float64
 5   mes_desligamento                 368274 non-null  object 
 6   motivo_desligamento              368274 non-null  int64  
 7   causa_desligamento_1             50163 non-null   float64
 8   causa_desligamento_2             368274 non-null  int64  
 9   causa_desligamento_3             368274 non-null  int64  
 10  faixa_tempo_emprego              368274 non-null  int64  
 11  tempo_emprego                    368274 non-null  float64
 12  fa

In [58]:
df1['ano'] = 2024  # Adiciona a coluna 'ano' com valor 2024 em todas as linhas

In [59]:
print(df1.dtypes)

id_municipio                    int64
tipo_vinculo                    int64
vinculo_ativo_3112              int64
tipo_admissao                   int64
mes_admissao                  float64
                               ...   
bairros_fortaleza             float64
bairros_rj                    float64
regioes_administrativas_df    float64
sigla_uf                       object
ano                             int64
Length: 66, dtype: object


In [60]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 368274 entries, 0 to 368273
Data columns (total 66 columns):
 #   Column                           Non-Null Count   Dtype  
---  ------                           --------------   -----  
 0   id_municipio                     368274 non-null  int64  
 1   tipo_vinculo                     368274 non-null  int64  
 2   vinculo_ativo_3112               368274 non-null  int64  
 3   tipo_admissao                    368274 non-null  int64  
 4   mes_admissao                     150207 non-null  float64
 5   mes_desligamento                 368274 non-null  object 
 6   motivo_desligamento              368274 non-null  int64  
 7   causa_desligamento_1             50163 non-null   float64
 8   causa_desligamento_2             368274 non-null  int64  
 9   causa_desligamento_3             368274 non-null  int64  
 10  faixa_tempo_emprego              368274 non-null  int64  
 11  tempo_emprego                    368274 non-null  float64
 12  fa

In [61]:
df1['valor_remuneracao_fevereiro'].unique()

array([     nan,  2410.2 ,  1718.76, ..., 15202.78,  9643.91,  2787.65],
      shape=(104669,))

In [62]:
import pandas as pd
import numpy as np

# Supondo que sua coluna se chame 'codigo' (substitua pelo nome real)
def clean_column(df, column_name):
    """
    Limpa uma coluna com valores mistos:
    1. Remove o valor inválido '{ñ'
    2. Converte strings numéricas para inteiros
    3. Mantém valores numéricos originais
    4. Retorna a coluna padronizada
    """
    # Passo 1: Converter tudo para string e remover o valor inválido
    cleaned = df[column_name].astype(str).replace('{ñ', np.nan)

    # Passo 2: Converter strings numéricas para inteiros
    cleaned = pd.to_numeric(cleaned, errors='coerce').astype('Int64')

    # Passo 3: Atualizar a coluna no DataFrame original
    df[column_name] = cleaned

    return df

# Aplicando a função
df1 = clean_column(df1, 'mes_desligamento')  # Substitua pelo nome real da coluna

In [63]:
df1['mes_desligamento'].unique()
#NA é nulo

<IntegerArray>
[1, 8, 5, 12, 4, 3, 10, 6, 2, 7, 11, 9, <NA>]
Length: 13, dtype: Int64

In [64]:
df1['nacionalidade'].unique()

array([10, 26, 28, 27, 49, 24, 70, 54, 51, 35, 22, 45, 41, 42, 39, 80, 21,
       55, 31, 25, 40, 30, 23, 60, 36, 52, 37, 48])

In [65]:
df1 = clean_column(df1, 'nacionalidade')  # Substitua pelo nome real da coluna

In [66]:
df1['nacionalidade'].unique()

<IntegerArray>
[10, 26, 28, 27, 49, 24, 70, 54, 51, 35, 22, 45, 41, 42, 39, 80, 21, 55, 31,
 25, 40, 30, 23, 60, 36, 52, 37, 48]
Length: 28, dtype: Int64

In [67]:
df1['nacionalidade'] = pd.to_numeric(df1['nacionalidade'], errors='coerce').astype('Int64')

In [68]:
df1['nacionalidade'] = pd.to_numeric(df1['nacionalidade'], errors='coerce').astype('Int64')

In [69]:
df1['motivo_desligamento'] = pd.to_numeric(df1['motivo_desligamento'], errors='coerce').astype('Int64')

In [70]:
df1['motivo_desligamento'].unique()
#NA é nulo

<IntegerArray>
[11, 12, 21, 31, 10, 89, 60, 20, 0]
Length: 9, dtype: Int64

In [71]:
def fix_columns(df):
    # Colunas que devem ser strings (códigos categóricos)
    string_cols = ['mes_desligamento', 'motivo_desligamento', 'tipo_vinculo', 'sigla_uf', 'faixa_remuneracao_media_sm']

    for col in string_cols:
        if col in df.columns:
            # Converte para string, tratando nulos e valores inválidos
            df[col] = df[col].astype(str).replace(['nan', 'None', '<NA>'], np.nan)

    # Colunas numéricas que devem manter decimais
    float_cols = ['valor_remuneracao_media', 'tempo_emprego']
    for col in float_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    return df

# 3. Aplicar as correções
df1 = fix_columns(df1)

In [72]:
df1['faixa_remuneracao_media_sm'].unique()

array(['98', '1', '0', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'],
      dtype=object)

In [73]:
df1['tipo_admissao'].unique()

array([0, 2, 4, 6])

In [74]:
df1['bairros_sp'].unique()

array([nan])

In [75]:
df1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 368274 entries, 0 to 368273
Data columns (total 66 columns):
 #   Column                           Non-Null Count   Dtype  
---  ------                           --------------   -----  
 0   id_municipio                     368274 non-null  int64  
 1   tipo_vinculo                     368274 non-null  object 
 2   vinculo_ativo_3112               368274 non-null  int64  
 3   tipo_admissao                    368274 non-null  int64  
 4   mes_admissao                     150207 non-null  float64
 5   mes_desligamento                 139890 non-null  object 
 6   motivo_desligamento              368274 non-null  object 
 7   causa_desligamento_1             50163 non-null   float64
 8   causa_desligamento_2             368274 non-null  int64  
 9   causa_desligamento_3             368274 non-null  int64  
 10  faixa_tempo_emprego              368274 non-null  int64  
 11  tempo_emprego                    368274 non-null  float64
 12  fa

In [76]:
df1.head(10)

Unnamed: 0,id_municipio,tipo_vinculo,vinculo_ativo_3112,tipo_admissao,mes_admissao,mes_desligamento,motivo_desligamento,causa_desligamento_1,causa_desligamento_2,causa_desligamento_3,...,tipo_estabelecimento,natureza_juridica,indicador_simples,bairros_sp,distritos_sp,bairros_fortaleza,bairros_rj,regioes_administrativas_df,sigla_uf,ano
0,1714203,25,0,0,,1,11,,99,99,...,5,,0,,,,,,TO,2024
1,1721000,10,0,0,,1,11,,99,99,...,1,2062.0,1,,,,,,TO,2024
2,1718501,60,0,2,8.0,8,11,,99,99,...,6,,0,,,,,,TO,2024
3,1718501,60,0,2,8.0,8,11,,99,99,...,6,,0,,,,,,TO,2024
4,1718204,70,0,0,,1,12,,99,99,...,1,2054.0,0,,,,,,TO,2024
5,1721000,10,0,0,,1,11,,99,99,...,6,,0,,,,,,TO,2024
6,1705508,25,0,0,,5,21,40.0,99,99,...,5,4120.0,0,,,,,,TO,2024
7,1721000,10,0,0,,1,11,,99,99,...,1,2062.0,0,,,,,,TO,2024
8,1711100,15,0,0,,1,21,,99,99,...,5,,0,,,,,,TO,2024
9,1702109,60,0,0,,1,21,,99,99,...,1,2062.0,1,,,,,,TO,2024


# Upload

In [77]:
schema = [
    # Identificadores e metadados
    bigquery.SchemaField('ano', 'INTEGER'),
    bigquery.SchemaField('id_municipio', 'INTEGER'),  # Alterado para INTEGER
    bigquery.SchemaField('sigla_uf', 'STRING'),

    # Informações do vínculo
    bigquery.SchemaField('tipo_vinculo', 'STRING', description='Tipo de vínculo empregatício'),
    bigquery.SchemaField('vinculo_ativo_3112', 'INTEGER', description='Indicador se o vínculo estava ativo em 31/12'),
    bigquery.SchemaField('tipo_admissao', 'INTEGER', description='Tipo de admissão do trabalhador'),
    bigquery.SchemaField('mes_admissao', 'FLOAT', description='Mês de admissão do trabalhador'),
    bigquery.SchemaField('mes_desligamento', 'STRING', description='Mês de desligamento do trabalhador (se aplicável)'),

    # Motivos e causas
    bigquery.SchemaField('motivo_desligamento', 'STRING', description='Motivo do desligamento'),
    bigquery.SchemaField('causa_desligamento_1', 'FLOAT', description='Causa principal do desligamento'),
    bigquery.SchemaField('causa_desligamento_2', 'INTEGER', description='Causa secundária do desligamento'),
    bigquery.SchemaField('causa_desligamento_3', 'INTEGER', description='Causa terciária do desligamento'),

    # Duração do emprego
    bigquery.SchemaField('faixa_tempo_emprego', 'INTEGER', description='Faixa de tempo de emprego'),
    bigquery.SchemaField('tempo_emprego', 'FLOAT', description='Tempo de emprego em anos (com decimais)'),

    # Carga horária
    bigquery.SchemaField('faixa_horas_contratadas', 'INTEGER', description='Faixa de horas contratadas'),
    bigquery.SchemaField('quantidade_horas_contratadas', 'INTEGER', description='Quantidade de horas contratadas semanais'),

    # Localização
    bigquery.SchemaField('id_municipio_trabalho', 'FLOAT', description='Identificador do município onde o trabalho é exercido'),

    # Afastamentos
    bigquery.SchemaField('quantidade_dias_afastamento', 'INTEGER', description='Quantidade de dias de afastamento no ano'),

    # Indicadores
    bigquery.SchemaField('indicador_cei_vinculado', 'INTEGER', description='Indicador de CEI vinculado'),
    bigquery.SchemaField('indicador_trabalho_parcial', 'INTEGER', description='Indicador de trabalho parcial'),
    bigquery.SchemaField('indicador_trabalho_intermitente', 'INTEGER', description='Indicador de trabalho intermitente'),

    # Remunerações
    bigquery.SchemaField('faixa_remuneracao_media_sm', 'STRING', description='Faixa da remuneração média em salários mínimos'),
    bigquery.SchemaField('valor_remuneracao_media_sm', 'FLOAT', description='Valor da remuneração média em salários mínimos'),
    bigquery.SchemaField('valor_remuneracao_media', 'FLOAT', description='Valor absoluto da remuneração média'),

    # Remunerações mensais (12 campos)
    bigquery.SchemaField('valor_remuneracao_janeiro', 'FLOAT', description='Remuneração em janeiro'),
    bigquery.SchemaField('valor_remuneracao_fevereiro', 'FLOAT', description='Remuneração em fevereiro'),
    bigquery.SchemaField('valor_remuneracao_marco', 'FLOAT', description='Remuneração em março'),
    bigquery.SchemaField('valor_remuneracao_abril', 'FLOAT', description='Remuneração em abril'),
    bigquery.SchemaField('valor_remuneracao_maio', 'FLOAT', description='Remuneração em maio'),
    bigquery.SchemaField('valor_remuneracao_junho', 'FLOAT', description='Remuneração em junho'),
    bigquery.SchemaField('valor_remuneracao_julho', 'FLOAT', description='Remuneração em julho'),
    bigquery.SchemaField('valor_remuneracao_agosto', 'FLOAT', description='Remuneração em agosto'),
    bigquery.SchemaField('valor_remuneracao_setembro', 'FLOAT', description='Remuneração em setembro'),
    bigquery.SchemaField('valor_remuneracao_outubro', 'FLOAT', description='Remuneração em outubro'),
    bigquery.SchemaField('valor_remuneracao_novembro', 'FLOAT', description='Remuneração em novembro'),
    bigquery.SchemaField('valor_remuneracao_dezembro', 'FLOAT', description='Remuneração em dezembro'),

    # Classificações profissionais
    bigquery.SchemaField('cbo_1994', 'FLOAT', description='Classificação Brasileira de Ocupações 1994'),
    bigquery.SchemaField('cbo_2002', 'INTEGER', description='Classificação Brasileira de Ocupações 2002'),
    bigquery.SchemaField('cnae_1', 'INTEGER', description='Classificação Nacional de Atividades Econômicas - Versão 1'),
    bigquery.SchemaField('cnae_2', 'INTEGER', description='Classificação Nacional de Atividades Econômicas - Versão 2'),
    bigquery.SchemaField('cnae_2_subclasse', 'INTEGER', description='Subclasse da CNAE 2.0'),

    # Características do trabalhador
    bigquery.SchemaField('idade', 'INTEGER', description='Idade do trabalhador'),
    bigquery.SchemaField('sexo', 'INTEGER', description='Sexo do trabalhador (1-Masc, 2-Fem)'),
    bigquery.SchemaField('raca_cor', 'INTEGER', description='Raça/cor do trabalhador'),
    bigquery.SchemaField('grau_instrucao_apos_2005', 'INTEGER', description='Grau de instrução após 2005'),
    bigquery.SchemaField('nacionalidade', 'INTEGER', description='Nacionalidade do trabalhador'),

    # Informações do estabelecimento
    bigquery.SchemaField('tamanho_estabelecimento', 'INTEGER', description='Tamanho do estabelecimento'),
    bigquery.SchemaField('tipo_estabelecimento', 'INTEGER', description='Tipo do estabelecimento'),
    bigquery.SchemaField('natureza_juridica', 'FLOAT', description='Natureza jurídica do estabelecimento'),

    # Campos adicionais
    bigquery.SchemaField('bairros_sp', 'FLOAT', description='Bairros de São Paulo (quando aplicável)'),
    bigquery.SchemaField('distritos_sp', 'FLOAT', description='Distritos de São Paulo (quando aplicável)'),
    bigquery.SchemaField('regioes_administrativas_df', 'FLOAT', description='Regiões administrativas do DF')
]

In [78]:
## Subindo para datalake
client = bigquery.Client(project='repositoriodedadosgpsp')
dataset_ref = client.dataset('Datalake')



In [79]:
table_ref = dataset_ref.table('RAIS_TO_2024') # nome da tabela no padrão FONTE_algo_intuitivo_dado
job_config = bigquery.LoadJobConfig(schema=schema)
job = client.load_table_from_dataframe(df1, table_ref, job_config=job_config)
job.result()

LoadJob<project=repositoriodedadosgpsp, location=US, id=9a806a87-4fbf-4b65-80ed-f3adfc76e561>