In [38]:
from prefect import task, flow
import pandas as pd
import requests
import sqlite3

# Definir as tarefas do pipeline
@task
def extract_data():
    api_restcountries = "https://restcountries.com/v3.1/all"
    response = requests.get(api_restcountries)

    if response.status_code == 200:
        print("Importação com sucesso!")
        data = response.json()

        df = pd.json_normalize(data)
        return df
    else:
        print("Falha na importação!")
        return pd.DataFrame() 

@task
def transform_data(df):
    if not df.empty:
        # Selecionar colunas específicas
        df_paises = df[['name.common', 'capital', 'population', 'continents']].copy()

        # Aplicar transformações nas colunas
        df_paises['capital'] = df_paises['capital'].apply(lambda x: x[0] if isinstance(x, list) else x)
        df_paises['capital'] = df_paises['capital'].fillna('Sem Capital')
        df_paises['population'] = df_paises['population'].fillna(0)
        df_paises['continents'] = df_paises['continents'].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)
        df_paises['name.common'] = df_paises['name.common'].apply(lambda x: x if isinstance(x, str) else 'Desconhecido')

        # Renomear as colunas
        df_paises = df_paises.rename(columns={'name.common': 'Nome do Pais', 'capital': 'Capital', 'population': 'Populacao', 'continents': 'Continentes'})

        # Filtrar países com população superior a 1 milhão
        df_paises = df_paises[df_paises['Populacao'] > 1_000_000]
        print("Dados transformados com sucesso!")
        return df_paises
    else:
        print("Erro: DataFrame vazio na transformação.")
        return pd.DataFrame()

@task
def load_data(df):
    if not df.empty:
        con = sqlite3.connect('paises.db')
        cur = con.cursor()

        cur.execute("DROP TABLE IF EXISTS paises")
        con.commit()

        cur.execute('''
        CREATE TABLE IF NOT EXISTS paises (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            Nome_do_Pais TEXT,
            Capital TEXT,
            Populacao INTEGER,
            Continentes TEXT
        )
        ''')
        con.commit()

        # Inserir dados no banco
        for index, row in df.iterrows():
            cur.execute('''
            INSERT INTO paises (Nome_do_Pais, Capital, Populacao, Continentes)
            VALUES (?, ?, ?, ?)''', (row['Nome do Pais'], row['Capital'], row['Populacao'], row['Continentes']))
        
        con.commit()
        con.close()
        print("Dados inseridos com sucesso no banco de dados!")
    else:
        print("Erro: DataFrame vazio na carga de dados.")

# Definir o Flow do Prefect para orquestrar as tarefas
@flow
def etl_pipeline():
    data = extract_data()
    cleaned_data = transform_data(data)
    load_data(cleaned_data)



In [39]:
# Executar o fluxo manualmente para verificar se funciona corretamente
etl_pipeline()

Importação com sucesso!


Dados transformados com sucesso!


Dados inseridos com sucesso no banco de dados!
