In [None]:
pip install dbfread



In [None]:
import pandas as pd
from google.cloud import storage
from google.colab import auth
from dbfread import DBF
import logging

def processar_arquivo_dbf(local_file_path, gcs_bucket_name, novo_nome, encoding='utf-8'):
    # Inicializar o cliente do Google Cloud Storage
    storage_client = storage.Client()

    try:
        # Baixar o arquivo DBF localmente
        bucket = storage_client.bucket(gcs_bucket_name)
        blob = bucket.blob(f"DBF/{novo_nome}")
        blob.download_to_filename(local_file_path)

        # Lista para armazenar os registros do DBF
        records = []

        # Ler o arquivo DBF e adicionar os registros à lista
        with DBF(local_file_path, encoding=encoding) as table:
            for record in table:
                records.append(record)

        # Converter a lista de registros em um DataFrame pandas
        df = pd.DataFrame(records)

        # Caminho para salvar o arquivo CSV
        csv_file_path = f"/tmp/{novo_nome.replace('.dbf', '.csv')}"

        # Salvar o DataFrame como um arquivo CSV
        df.to_csv(csv_file_path, index=False)

        print(f"Arquivo CSV salvo localmente em: {csv_file_path}")

        # Upload do arquivo CSV para o Google Cloud Storage
        gcs_blob_name = f"tratado/dimensao/{novo_nome.replace('.dbf', '.csv')}"
        blob = bucket.blob(gcs_blob_name)
        blob.upload_from_filename(csv_file_path)

        print(f"Arquivo CSV enviado com sucesso para o Google Cloud Storage: gs://{gcs_bucket_name}/{gcs_blob_name}")

    except FileNotFoundError as e:
        logging.error(f"O arquivo {novo_nome} não foi encontrado: {e}")
    except Exception as e:
        logging.error(f"Erro ao processar o arquivo {novo_nome}: {e}")

# Configurar o registro de logs
logging.basicConfig(filename='processamento_dbf.log', level=logging.ERROR)

# Autenticação do usuário
auth.authenticate_user()

# Lista de Ufs, Anos e Meses
ufs = ['HUF_MEC']

# Caminho do bucket do Google Cloud Storage
gcs_bucket_name = "tabsus"

# Renomear e processar os arquivos DBF
for uf in ufs:
    # Novo nome do arquivo
    novo_nome = f"{uf}.dbf"

    # Caminho do arquivo local
    local_file_path = f"/tmp/{novo_nome}"

    # Processar o arquivo DBF
    processar_arquivo_dbf(local_file_path, gcs_bucket_name, novo_nome, encoding='latin1')  # Tente usar 'latin1' como codificação


Arquivo CSV salvo localmente em: /tmp/HUF_MEC.csv
Arquivo CSV enviado com sucesso para o Google Cloud Storage: gs://tabsus/tratado/dimensao/HUF_MEC.csv


In [None]:
from google.colab import auth
from google.cloud import bigquery
import pandas as pd
from google.api_core.exceptions import GoogleAPIError
from google.cloud.exceptions import NotFound
import logging

# Autenticar usuário
auth.authenticate_user()

# Inicializar cliente BigQuery
client = bigquery.Client()

# Definir informações do projeto e dataset
project_id = 'etlgcp-416120'
dataset_id = 'CNES'

# Lista de arquivos no Cloud Storage
ufs = ['HUF_MEC']

# Lista para armazenar DataFrames de cada arquivo CSV
dfs = []

# Iterar sobre os arquivos CSV e carregar para o BigQuery
for uf in ufs:
    # Caminho do arquivo CSV no Cloud Storage
    csv_file_path = f'gs://tabsus/tratado/dimensao/{uf}.csv'

    try:
        # Carregar o arquivo CSV para um DataFrame e adicionar à lista
        df = pd.read_csv(csv_file_path)

        # Converter colunas int64 para string
        df = df.astype(str)

        dfs.append(df)
    except FileNotFoundError as e:
        logging.error(f'O arquivo {csv_file_path} não foi encontrado: {e}')
    except Exception as e:
        logging.error(f'Erro ao ler o arquivo {csv_file_path}: {e}')

# Concatenar todos os DataFrames em um único DataFrame
combined_df = pd.concat(dfs, ignore_index=True)

# Verificar se o DataFrame combinado não está vazio
if not combined_df.empty:
    # Salvar o DataFrame combinado como uma tabela no BigQuery
    table_name = 'HUF_MEC'
    try:
        # Obter o esquema do DataFrame
        schema = [bigquery.SchemaField(name, 'STRING') for name in combined_df.columns]

        # Carregar a tabela com o DataFrame e o esquema
        table_ref = client.dataset(dataset_id).table(table_name)
        job_config = bigquery.LoadJobConfig(schema=schema)
        client.load_table_from_dataframe(combined_df, table_ref, job_config=job_config)

        print(f'Tabela {table_name} criada com sucesso no BigQuery.')
    except GoogleAPIError as e:
        logging.error(f'Erro ao criar tabela {table_name} no BigQuery: {e}')
    except Exception as e:
        logging.error(f'Ocorreu um erro inesperado ao criar tabela {table_name} no BigQuery: {e}')
else:
    logging.warning('O DataFrame combinado está vazio. Nenhuma tabela será criada no BigQuery.')

Tabela HUF_MEC criada com sucesso no BigQuery.
