In [3]:
# Instalar pacotes necessários
!pip install fastapi uvicorn nest_asyncio beautifulsoup4 requests



In [9]:
# Importar bibliotecas necessárias
import sqlite3
from fastapi import FastAPI, HTTPException, Query, Depends, Path as FastAPIPath
import pandas as pd
import json
import os
import nest_asyncio
import uvicorn
from pathlib import Path
from typing import List, Dict, Optional, Any
from bs4 import BeautifulSoup
import requests
import re
import time
import random

# Aplicar nest_asyncio para Jupyter
nest_asyncio.apply()

In [10]:
# Configurações
DATA_DIR = Path("data")
RAW_DIR = DATA_DIR / "raw"
PROCESSED_DIR = DATA_DIR / "processed"
DATABASE_PATH = PROCESSED_DIR / "worldbank.db"

# Criar diretórios se não existirem
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)

# URLs do Banco Mundial
BASE_URL = "https://data.worldbank.org"
COUNTRIES_URL = f"{BASE_URL}/country"

# Cabeçalhos HTTP
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}

# Categorias de indicadores
INDICATOR_CATEGORIES = ["social", "economic", "environment", "institutions"]

# Inicializar FastAPI
app = FastAPI(
    title="API do Banco Mundial",
    description="API para acessar dados de países e indicadores do Banco Mundial",
    version="1.0.0",
)

In [11]:
# Funções de Web Scraping
def get_country_list():
    """
    Realiza scraping da lista de países do site do Banco Mundial.
    """
    print("Obtendo lista de países...")
    
    try:
        # Fazer requisição para a página de países
        response = requests.get(COUNTRIES_URL, headers=HEADERS)
        response.raise_for_status()
        
        # Parsear o HTML com BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Encontrar os links dos países
        country_links = []
        
        # Buscar todos os links de país
        for link in soup.find_all('a', href=True):
            href = link.get('href')
            if href and href.startswith('/country/'):
                # Limpar o código do país
                country_code = href.split('/')[-1].split('?')[0]
                country_code = re.sub(r'[^a-zA-Z0-9\-]', '', country_code)
                
                country_links.append({
                    'url': BASE_URL + href,
                    'code': country_code,
                    'name': link.text.strip()
                })
        
        # Remover duplicatas
        unique_links = []
        seen_codes = set()
        for country in country_links:
            if country['code'] not in seen_codes and country['name']:
                unique_links.append(country)
                seen_codes.add(country['code'])
        
        print(f"Encontrados {len(unique_links)} países únicos.")
        
        # Salvar a lista bruta em JSON
        country_list_path = RAW_DIR / "country_list.json"
        with open(country_list_path, 'w', encoding='utf-8') as f:
            json.dump(unique_links, f, indent=2, ensure_ascii=False)
        
        print(f"Lista de países salva em {country_list_path}")
        return unique_links
    
    except Exception as e:
        print(f"Erro ao obter lista de países: {e}")
        return []

def get_country_data(country_url, country_code, country_name):
    """
    Realiza scraping de dados detalhados de um país específico.
    """
    try:
        print(f"Obtendo dados para: {country_name} ({country_code})")
        
        # Fazer requisição para a página do país
        response = requests.get(country_url, headers=HEADERS)
        response.raise_for_status()
        
        # Parsear o HTML com BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Dados básicos do país
        country_data = {
            'name': country_name,
            'code': country_code,
            'url': country_url,
            'indicators': {},
            'region': None,
            'income_level': None,
            'capital': None
        }
        
        # Buscar região e classificação de renda
        region_element = soup.find('div', string=re.compile(r'Region', re.IGNORECASE))
        if region_element and region_element.find_next():
            country_data['region'] = region_element.find_next().text.strip()
        
        income_element = soup.find('div', string=re.compile(r'Income', re.IGNORECASE))
        if income_element and income_element.find_next():
            country_data['income_level'] = income_element.find_next().text.strip()
        
        # Extrair indicadores da página
        for category in INDICATOR_CATEGORIES:
            country_data['indicators'][category] = []
            
            # Pesquisar por seções que podem conter indicadores desta categoria
            for section in soup.find_all(['section', 'div'], class_=re.compile(r'indicator|data')):
                if category.lower() in section.get_text().lower():
                    # Encontrar tabelas ou indicadores nesta seção
                    for element in section.find_all(['tr', 'div'], class_=re.compile(r'indicator|row')):
                        # Extrair nome e valor
                        name_element = element.find(['th', 'td', 'div'], class_=re.compile(r'name|title'))
                        value_element = element.find(['td', 'div', 'span'], class_=re.compile(r'value|data'))
                        
                        if name_element and value_element:
                            indicator_name = name_element.text.strip()
                            value_text = value_element.text.strip()
                            
                            # Extrair ano
                            year_match = re.search(r'\((\d{4})\)', value_text)
                            year = int(year_match.group(1)) if year_match else 2023
                            
                            # Criar código do indicador
                            indicator_code = re.sub(r'[^a-zA-Z0-9]', '_', indicator_name).lower()
                            
                            # Adicionar indicador
                            country_data['indicators'][category].append({
                                'name': indicator_name,
                                'code': indicator_code,
                                'value': value_text.replace(f"({year})", "").strip(),
                                'year': year
                            })
        
        # Salvar os dados brutos do país
        country_data_path = RAW_DIR / f"{country_code}.json"
        with open(country_data_path, 'w', encoding='utf-8') as f:
            json.dump(country_data, f, indent=2, ensure_ascii=False)
        
        print(f"Dados de {country_name} salvos em {country_data_path}")
        
        # Pausa para evitar sobrecarga no servidor
        time.sleep(random.uniform(1, 2))
        
        return country_data
    
    except Exception as e:
        print(f"Erro ao obter dados do país {country_name}: {e}")
        return None

In [12]:
# Funções para processamento e banco de dados
def process_data_for_db(countries_data):
    """
    Processa os dados dos países para criar o banco de dados.
    """
    # Preparar dados para as tabelas
    countries = []
    indicators = []
    indicator_values = []
    
    # Processar cada país
    for country in countries_data:
        # Dados do país
        country_code = country['code']
        countries.append({
            'country_code': country_code,
            'name': country['name'],
            'region': country.get('region'),
            'income_level': country.get('income_level'),
            'capital': country.get('capital'),
            'iso2_code': country_code[:2].lower() if len(country_code) >= 2 else None,
            'iso3_code': country_code[:3].upper() if len(country_code) >= 3 else None
        })
        
        # Processar indicadores por categoria
        for category, category_indicators in country.get('indicators', {}).items():
            for indicator in category_indicators:
                indicator_code = indicator['code']
                indicator_name = indicator['name']
                
                # Adicionar à lista de indicadores únicos
                indicator_exists = False
                for existing in indicators:
                    if existing['indicator_code'] == indicator_code:
                        indicator_exists = True
                        break
                
                if not indicator_exists:
                    indicators.append({
                        'indicator_code': indicator_code,
                        'name': indicator_name,
                        'category': category,
                        'description': '',
                        'source': 'Banco Mundial'
                    })
                
                # Adicionar valor do indicador
                if indicator.get('value') is not None and indicator.get('year'):
                    try:
                        # Limpar e converter o valor
                        value_str = indicator['value']
                        if isinstance(value_str, str):
                            # Remover caracteres não numéricos, exceto ponto decimal
                            value_str = re.sub(r'[^\d.-]', '', value_str)
                        
                        value = float(value_str) if value_str else None
                        
                        if value is not None:
                            indicator_values.append({
                                'country_code': country_code,
                                'indicator_code': indicator_code,
                                'year': indicator['year'],
                                'value': value
                            })
                    except (ValueError, TypeError) as e:
                        print(f"Erro ao processar valor do indicador {indicator_name} para {country['name']}: {e}")
    
    return countries, indicators, indicator_values

def create_database_from_data(countries, indicators, indicator_values):
    """
    Cria o banco de dados SQLite a partir dos dados processados.
    """
    try:
        # Converter para DataFrames
        countries_df = pd.DataFrame(countries)
        indicators_df = pd.DataFrame(indicators)
        values_df = pd.DataFrame(indicator_values)
        
        # Conectar ao banco de dados SQLite
        conn = sqlite3.connect(DATABASE_PATH)
        
        # Criar tabelas
        with conn:
            # Tabela de países
            conn.execute('''
            CREATE TABLE IF NOT EXISTS countries (
                country_code TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                region TEXT,
                income_level TEXT,
                capital TEXT,
                longitude REAL,
                latitude REAL,
                iso2_code TEXT,
                iso3_code TEXT
            )
            ''')
            
            # Tabela de indicadores (metadados)
            conn.execute('''
            CREATE TABLE IF NOT EXISTS indicators (
                indicator_code TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                category TEXT,
                description TEXT,
                source TEXT
            )
            ''')
            
            # Tabela de valores de indicadores (dados)
            conn.execute('''
            CREATE TABLE IF NOT EXISTS indicator_values (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                country_code TEXT NOT NULL,
                indicator_code TEXT NOT NULL,
                year INTEGER NOT NULL,
                value REAL,
                FOREIGN KEY (country_code) REFERENCES countries (country_code),
                FOREIGN KEY (indicator_code) REFERENCES indicators (indicator_code),
                UNIQUE(country_code, indicator_code, year)
            )
            ''')
        
        # Inserir dados
        print("Inserindo dados no banco de dados...")
        
        # Países
        countries_df.to_sql("countries", conn, if_exists="replace", index=False)
        print(f"Inseridos {len(countries_df)} países")
        
        # Indicadores (metadados)
        if not indicators_df.empty:
            indicators_df.to_sql("indicators", conn, if_exists="replace", index=False)
            print(f"Inseridos {len(indicators_df)} indicadores")
        
        # Valores de indicadores (dados)
        if not values_df.empty:
            values_df.to_sql("indicator_values", conn, if_exists="replace", index=False)
            print(f"Inseridos {len(values_df)} valores de indicadores")
        
        # Criar índices para melhorar performance
        with conn:
            conn.execute("CREATE INDEX IF NOT EXISTS idx_country_code ON indicator_values (country_code)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_indicator_code ON indicator_values (indicator_code)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_year ON indicator_values (year)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_country_indicator ON indicator_values (country_code, indicator_code)")
        
        conn.close()
        print("Banco de dados criado com sucesso!")
        return True
        
    except Exception as e:
        print(f"Erro ao criar banco de dados: {e}")
        return False

def setup_data(limit=5):
    """
    Coleta dados do Banco Mundial, processa e cria o banco de dados.
    
    Args:
        limit: Número de países para coletar (limite para teste)
        
    Returns:
        bool: True se a operação foi bem-sucedida, False caso contrário
    """
    print(f"Coletando dados do Banco Mundial para {limit} países...")
    
    # 1. Obter lista de países
    countries = get_country_list()
    
    if not countries:
        print("Falha ao obter lista de países. Abortando.")
        return False
    
    # 2. Obter dados para um subconjunto de países
    countries_data = []
    for country in countries[:limit]:  # Limitar para teste
        country_data = get_country_data(country['url'], country['code'], country['name'])
        if country_data:
            countries_data.append(country_data)
    
    if not countries_data:
        print("Falha ao obter dados dos países. Abortando.")
        return False
    
    # 3. Processar dados para o banco de dados
    countries, indicators, indicator_values = process_data_for_db(countries_data)
    
    # 4. Criar banco de dados
    return create_database_from_data(countries, indicators, indicator_values)

In [13]:
# Dependência para conexão com o banco de dados
def get_db():
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row  # Retornar resultados como dicionários
    try:
        yield conn
    finally:
        conn.close()

# Rotas da API
@app.get("/")
def read_root():
    """Retorna informações básicas sobre a API."""
    return {
        "name": "API do Banco Mundial",
        "description": "API para acessar dados de países e indicadores do Banco Mundial",
        "endpoints": {
            "countries": "/countries",
            "country": "/countries/{country_code}",
            "indicators": "/countries/{country_code}/indicators",
            "profile": "/countries/{country_code}/profile",
            "categories": "/indicators/categories"
        }
    }

@app.get("/countries")
def list_countries(
    skip: int = Query(0, description="Número de registros para pular"),
    limit: int = Query(100, description="Número máximo de registros para retornar"),
    db: sqlite3.Connection = Depends(get_db)
):
    """Lista todos os países disponíveis."""
    cursor = db.cursor()
    cursor.execute("SELECT COUNT(*) FROM countries")
    total = cursor.fetchone()[0]
    
    cursor.execute(
        "SELECT country_code, name, region FROM countries ORDER BY name LIMIT ? OFFSET ?", 
        (limit, skip)
    )
    countries = [dict(row) for row in cursor.fetchall()]
    
    return {"total": total, "data": countries}

@app.get("/countries/{country_code}")
def get_country(
    country_code: str = FastAPIPath(..., description="Código do país"),
    db: sqlite3.Connection = Depends(get_db)
):
    """Retorna informações detalhadas sobre um país específico."""
    cursor = db.cursor()
    cursor.execute(
        "SELECT * FROM countries WHERE country_code = ?", 
        (country_code,)
    )
    
    country = cursor.fetchone()
    
    if not country:
        raise HTTPException(status_code=404, detail=f"País com código '{country_code}' não encontrado")
    
    return dict(country)

@app.get("/countries/{country_code}/indicators")
def get_country_indicators(
    country_code: str = FastAPIPath(..., description="Código do país"),
    category: Optional[str] = Query(None, description="Filtrar por categoria"),
    db: sqlite3.Connection = Depends(get_db)
):
    """Retorna indicadores para um país específico."""
    cursor = db.cursor()
    
    # Verificar se o país existe
    cursor.execute("SELECT * FROM countries WHERE country_code = ?", (country_code,))
    country = cursor.fetchone()
    
    if not country:
        raise HTTPException(status_code=404, detail=f"País com código '{country_code}' não encontrado")
    
    # Construir consulta para indicadores
    query = """
    SELECT 
        i.indicator_code, 
        i.name, 
        i.category,
        iv.year, 
        iv.value
    FROM 
        indicator_values iv
    JOIN 
        indicators i ON iv.indicator_code = i.indicator_code
    WHERE 
        iv.country_code = ?
    """
    
    params = [country_code]
    
    if category:
        query += " AND i.category = ?"
        params.append(category)
    
    query += " ORDER BY i.name, iv.year DESC"
    
    cursor.execute(query, params)
    results = cursor.fetchall()
    
    # Agrupar por indicador
    indicators = {}
    for row in results:
        indicator_code = row["indicator_code"]
        
        if indicator_code not in indicators:
            indicators[indicator_code] = {
                "indicator_code": indicator_code,
                "name": row["name"],
                "category": row["category"],
                "values": []
            }
        
        indicators[indicator_code]["values"].append({
            "year": row["year"],
            "value": row["value"]
        })
    
    return {
        "country": dict(country),
        "indicators": list(indicators.values())
    }

@app.get("/countries/{country_code}/profile")
def get_country_profile(
    country_code: str = FastAPIPath(..., description="Código do país"),
    db: sqlite3.Connection = Depends(get_db)
):
    """Retorna um perfil completo do país com os indicadores mais recentes."""
    cursor = db.cursor()
    
    # Verificar se o país existe
    cursor.execute("SELECT * FROM countries WHERE country_code = ?", (country_code,))
    country = cursor.fetchone()
    
    if not country:
        raise HTTPException(status_code=404, detail=f"País com código '{country_code}' não encontrado")
    
    # Buscar indicadores mais recentes por categoria
    query = """
    WITH RankedIndicators AS (
        SELECT 
            i.indicator_code, 
            i.name,
            i.category,
            iv.year, 
            iv.value,
            ROW_NUMBER() OVER (PARTITION BY i.indicator_code ORDER BY iv.year DESC) as rank
        FROM 
            indicator_values iv
        JOIN 
            indicators i ON iv.indicator_code = i.indicator_code
        WHERE 
            iv.country_code = ?
    )
    SELECT 
        indicator_code, 
        name,
        category,
        year, 
        value
    FROM 
        RankedIndicators
    WHERE 
        rank = 1
    ORDER BY 
        category, name
    """
    
    cursor.execute(query, (country_code,))
    indicators = cursor.fetchall()
    
    # Organizar por categoria
    categories = {}
    for indicator in indicators:
        category = indicator["category"] or "other"
        
        if category not in categories:
            categories[category] = []
        
        categories[category].append({
            "indicator_code": indicator["indicator_code"],
            "name": indicator["name"],
            "value": indicator["value"],
            "year": indicator["year"]
        })
    
    return {
        "country": dict(country),
        "profile": categories
    }

@app.get("/indicators/categories")
def get_indicator_categories(
    db: sqlite3.Connection = Depends(get_db)
):
    """Retorna as categorias de indicadores disponíveis."""
    cursor = db.cursor()
    cursor.execute(
        "SELECT category, COUNT(*) as count FROM indicators GROUP BY category ORDER BY count DESC"
    )
    
    categories = [dict(row) for row in cursor.fetchall()]
    
    return {"categories": categories}

# Funções para uso no Jupyter Notebook
def show_countries():
    """
    Exibe os países atualmente no banco de dados como um DataFrame.
    Útil para verificação no Jupyter.
    """
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        df = pd.read_sql("SELECT country_code, name, region, income_level FROM countries", conn)
        conn.close()
        return df
    except Exception as e:
        print(f"Erro ao mostrar países: {e}")
        return None

def show_indicators():
    """
    Exibe os indicadores atualmente no banco de dados como um DataFrame.
    """
    try:
        conn = sqlite3.connect(DATABASE_PATH)
        df = pd.read_sql("SELECT indicator_code, name, category FROM indicators", conn)
        conn.close()
        return df
    except Exception as e:
        print(f"Erro ao mostrar indicadores: {e}")
        return None

def start_server(port=8000):
    """
    Inicia o servidor FastAPI em uma porta específica.
    """
    print(f"Iniciando servidor na porta {port}...")
    print(f"Acesse a API em: http://127.0.0.1:{port}")
    print(f"Documentação em: http://127.0.0.1:{port}/docs")
    uvicorn.run(app, host="127.0.0.1", port=port)

In [14]:
# Verificar se o banco de dados existe e iniciar a coleta de dados se necessário
if DATABASE_PATH.exists():
    print("Banco de dados encontrado. API pronta para uso.")
else:
    print("Banco de dados não encontrado. Executando coleta de dados...")
    setup_data(limit=5)  # Coleta dados para 5 países como exemplo

print("\nAgora você pode usar:")
print("- show_countries() - Mostra os países coletados")
print("- show_indicators() - Mostra os indicadores coletados")
print("- start_server(port=8000) - Inicia o servidor da API")

Banco de dados encontrado. API pronta para uso.

Agora você pode usar:
- show_countries() - Mostra os países coletados
- show_indicators() - Mostra os indicadores coletados
- start_server(port=8000) - Inicia o servidor da API


In [15]:
# Visualizar países coletados
print("Países coletados:")
countries_df = show_countries()
display(countries_df)

# Visualizar indicadores coletados
print("\nIndicadores coletados:")
indicators_df = show_indicators()
display(indicators_df)

Países coletados:


Unnamed: 0,country_code,name,region,income_level
0,af,Afghanistan,,
1,al,Albania,,
2,dz,Algeria,,
3,as,American Samoa,,
4,ad,Andorra,,



Indicadores coletados:


Unnamed: 0,indicator_code,name,category
0,NY.GDP.MKTP.CD,PIB (US$ atual),economic
1,NY.GDP.PCAP.CD,PIB per capita (US$ atual),economic
2,NY.GDP.MKTP.KD.ZG,Crescimento do PIB (% anual),economic
3,SP.POP.TOTL,"População, total",social
4,SP.POP.GROW,Crescimento populacional (% anual),social
5,SP.DYN.LE00.IN,"Expectativa de vida ao nascer, total (anos)",social
6,SI.POV.DDAY,Poverty headcount ratio at $2.15 a day (2017 P...,social
7,SM.POP.NETM,Net migration,social
8,HD.HCI.OVRL,Human Capital Index (HCI) (scale 0-1),social
9,SL.UEM.TOTL.ZS,"Unemployment, total (% of total labor force)",economic


In [None]:
# Tente com uma porta diferente
start_server(port=9999)

Iniciando servidor na porta 9999...
Acesse a API em: http://127.0.0.1:9999
Documentação em: http://127.0.0.1:9999/docs


INFO:     Started server process [2620]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:9999 (Press CTRL+C to quit)


INFO:     127.0.0.1:56516 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:56516 - "GET /favicon.ico HTTP/1.1" 404 Not Found


In [17]:
# Testar a função get_country diretamente
def test_api_function():
    # Obter a lista de países do banco de dados
    countries = show_countries()
    if countries is None or len(countries) == 0:
        print("Nenhum país encontrado no banco de dados")
        return
    
    # Obter código do primeiro país
    country_code = countries.iloc[0]['country_code']
    
    # Conectar ao banco de dados
    conn = sqlite3.connect(DATABASE_PATH)
    conn.row_factory = sqlite3.Row
    
    # Executar a consulta diretamente
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM countries WHERE country_code = ?", (country_code,))
    country = cursor.fetchone()
    
    if country:
        print(f"Detalhes do país '{country['name']}':")
        for key, value in dict(country).items():
            print(f"  - {key}: {value}")
        
        # Buscar indicadores deste país
        cursor.execute("""
        SELECT i.name, i.category, iv.value, iv.year
        FROM indicator_values iv
        JOIN indicators i ON iv.indicator_code = i.indicator_code
        WHERE iv.country_code = ?
        ORDER BY i.category, i.name, iv.year DESC
        LIMIT 10
        """, (country_code,))
        
        indicators = cursor.fetchall()
        
        print("\nAlguns indicadores:")
        for indicator in indicators:
            print(f"  - {indicator['name']} ({indicator['category']}): {indicator['value']} ({indicator['year']})")
    else:
        print(f"País com código '{country_code}' não encontrado")
    
    conn.close()

# Executar o teste
test_api_function()

Detalhes do país 'Afghanistan':
  - country_code: af
  - name: Afghanistan
  - region: None
  - income_level: None
  - capital: None
  - longitude: None
  - latitude: None
  - iso2_code: AF
  - iso3_code: AFG

Alguns indicadores:
