# Projeto de ETL
### Conexão de dados de API para bancos PostgreSQL, MongoDB e Big Query

In [3]:
# Python built-in
import os
import json
from pathlib import Path
from typing import Dict, Optional

# Third-party packages
import requests
import pandas as pd
from dotenv import load_dotenv

# Database connections
from sqlalchemy import create_engine
import pymongo
from pymongo import MongoClient
from google.cloud import bigquery

In [4]:
def load_config(env_path: Optional[str] = None) -> Dict[str, str]:
    """
    Load environment variables from a .env file and return them as a dictionary.

    Args:
        env_path (Optional[str]): Path to the .env file. If not provided, searches in the current or parent directory.

    Returns:
        Dict[str, str]: Dictionary containing database and API credentials.
    """
    if env_path:
        load_dotenv(env_path)
    else:
        # procura .env no diretório atual / parent
        load_dotenv()

    return {
        "DB_USER": os.getenv("DB_USER"),
        "DB_PASSWORD": os.getenv("DB_PASSWORD"),
        "DB_HOST": os.getenv("DB_HOST"),
        "DB_PORT": os.getenv("DB_PORT"),
        "DB_NAME": os.getenv("DB_NAME"),
        "MONGO_USER": os.getenv("MONGO_USER"),
        "MONGO_PASSWORD": os.getenv("MONGO_PASSWORD"),
        "MONGO_HOST": os.getenv("MONGO_HOST"),
        "MONGO_NAME" : os.getenv("MONGO_NAME"),
        "MONGO_PORT": os.getenv("MONGO_PORT"),
        "MONGO_AUTH_SOURCE": os.getenv("MONGO_AUTH_SOURCE"),
        "BIG_QUERY_DATASET" : os.getenv("BIG_QUERY_DATASET"),
        "BIG_QUERY_TABLE_ID" : os.getenv("BIG_QUERY_TABLE_ID"),
        "BIG_QUERY_PROJECT_ID" : os.getenv("BIG_QUERY_PROJECT_ID"),
    }

In [5]:
# Carregar as variáveis de ambiente
root_env = r'C:\Users\link_\Desktop\V4\Projeto_xandao\project\scripts\.venv\.env'
config = load_config(str(root_env))

In [6]:
# Puxando dados da API e analisando com pandas
data = requests.get('https://randomuser.me/api/').json()
data_df = pd.json_normalize(data['results'])
print(data_df.head(5))
print(data_df.columns.tolist())

   gender                   email           phone            cell nat  \
0  female  meral.onur@example.com  (709)-120-4580  (909)-297-3203  TR   

  name.title name.first name.last  location.street.number  \
0        Mrs      Meral      Önür                    3299   

  location.street.name  ...  \
0          Filistin Cd  ...   

                                        login.sha256  \
0  604729d568948d94a30ab7bddaf8ec1bb89eda4a22c2e0...   

                   dob.date dob.age           registered.date registered.age  \
0  1951-07-26T08:47:12.835Z      74  2008-06-20T00:49:49.272Z             17   

  id.name id.value                                     picture.large  \
0             None  https://randomuser.me/api/portraits/women/10.jpg   

                                      picture.medium  \
0  https://randomuser.me/api/portraits/med/women/...   

                                   picture.thumbnail  
0  https://randomuser.me/api/portraits/thumb/wome...  

[1 rows x 34 columns]
['

In [7]:
#Tratando o nome das colunas
data_df.columns = (
    data_df.columns
    .str.replace('.', '_', regex=False)
    .str.lower()
    .str.strip()
)
print(data_df.columns.tolist())

['gender', 'email', 'phone', 'cell', 'nat', 'name_title', 'name_first', 'name_last', 'location_street_number', 'location_street_name', 'location_city', 'location_state', 'location_country', 'location_postcode', 'location_coordinates_latitude', 'location_coordinates_longitude', 'location_timezone_offset', 'location_timezone_description', 'login_uuid', 'login_username', 'login_password', 'login_salt', 'login_md5', 'login_sha1', 'login_sha256', 'dob_date', 'dob_age', 'registered_date', 'registered_age', 'id_name', 'id_value', 'picture_large', 'picture_medium', 'picture_thumbnail']


In [8]:
# Convertendo os valores numéricos para string
numeric_columns = data_df.select_dtypes(include=['int64', 'float64']).columns
data_df[numeric_columns] = data_df[numeric_columns].astype(str)

In [7]:
# Salvando o DataFrame como CSV
data_df.to_csv(r'C:\Users\link_\Desktop\V4\Projeto_xandao\project\data\random_users.csv', index=False, sep=',')

In [15]:
# --- Configurações do Banco de Dados PostgreSQL ---
DB_HOST = config["DB_HOST"]
DB_USER = config["DB_USER"]
DB_PASSWORD = config["DB_PASSWORD"]
DB_NAME = config["DB_NAME"]
DB_PORT = config["DB_PORT"]

# Conexão com o banco de dados PostgreSQL usando SQLAlchemy
DATABASE_URL = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

In [12]:
# Criando a engine de conexão
engine = create_engine(DATABASE_URL)

# Selecionando colunas relacionais
colunas_relacionais = ['email','name_title', 'name_first', 'name_last', 'location_street_number', 'location_street_name','location_city', 'location_state', 'location_country', 'location_postcode']
data_relacionais = data_df[colunas_relacionais].copy()
data_relacionais.head(5)

# Salvando o DataFrame no banco de dados
data_relacionais.to_sql(
    name='users', 
    con=engine, 
    if_exists='append', 
    index=False,
    chunksize=1000
)

print("Dados salvos no banco de dados com sucesso!")

Dados salvos no banco de dados com sucesso!


In [17]:
# --- Configurações do Banco de Dados MongoDB ---
USERNAME = config["MONGO_USER"]
PASSWORD = config["MONGO_PASSWORD"]
HOST = config["MONGO_HOST"]
PORT = config["MONGO_PORT"]
DATABASE_NAME = config["MONGO_NAME"]
AUTH_SOURCE = config["MONGO_AUTH_SOURCE"]

# Conexão com o MongoDB
connection_string = f"mongodb://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE_NAME}?authSource={AUTH_SOURCE}"
client = MongoClient(connection_string)
client.server_info() 
db = client[DATABASE_NAME]

In [11]:
# Inserindo dados no MongoDB
collection = db['users']

# Convertendo DataFrame para formato dicionário (registros)
records = data_df.to_dict('records')

result = collection.insert_many(records)
print("Dados salvos no banco de dados com sucesso!")

Dados salvos no banco de dados com sucesso!


In [12]:
#Configuração/conexão BigQuery 
client = bigquery.Client()

print(f"Conectado ao projeto: {client.project}")

Conectado ao projeto: projetotest-joao


In [17]:
DATASET_ID = config["BIG_QUERY_DATASET"]
TABLE_ID = config["BIG_QUERY_TABLE_ID"]
PROJECT_ID = client.project

table_id_completo = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
print(table_id_completo)

# Configuração do carregamento dos dados
# WRITE_APPEND: Adiciona novas linhas à tabela existente (padrão).
# WRITE_TRUNCATE: Substitui a tabela inteira pelos novos dados.
job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_APPEND", 
)

# Executar o carregamento
job = client.load_table_from_dataframe(
    data_df, 
    table_id_completo, 
    job_config=job_config
) 

# Aguardar o término do Job
job.result()  

# Verificar o Resultado
table = client.get_table(table_id_completo)
print(f"Dados carregados. A tabela agora tem {table.num_rows} linhas.")

projetotest-joao.datase_joao.users




Dados carregados. A tabela agora tem 2 linhas.
