In [None]:
import pymysql
from pymysql import Error
from typing import List, Dict, Any, Optional

class DatabaseManager:
    """Gerenciador de conexão e operações com MySQL"""
    
    def __init__(self, host='localhost', database='sql_tutorial', user='dbuser', password='dbpass', port=3306):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None
    
    def connect(self) -> bool:
        """Estabelece conexão com o banco de dados"""
        try:
            self.connection = pymysql.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
            print(f"✓ Conectado ao MySQL - Database: {self.database}")
            return True
        except Error as e:
            print(f"✗ Erro ao conectar: {e}")
            return False
    
    def disconnect(self):
        """Fecha a conexão com o banco"""
        if self.connection:
            self.connection.close()
            print("✓ Conexão fechada")
    
    def execute_query(self, query: str, params: tuple = None) -> bool:
        """Executa queries de INSERT, UPDATE, DELETE"""
        cursor = self.connection.cursor()
        try:
            cursor.execute(query, params or ())
            self.connection.commit()
            print(f"✓ Query executada com sucesso. Linhas afetadas: {cursor.rowcount}")
            return True
        except Error as e:
            print(f"✗ Erro ao executar query: {e}")
            return False
        finally:
            cursor.close()
    
    def fetch_all(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Executa SELECT e retorna todos os resultados como lista de dicionários"""
        cursor = self.connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(query, params or ())
            results = cursor.fetchall()
            return results
        except Error as e:
            print(f"✗ Erro ao buscar dados: {e}")
            return []
        finally:
            cursor.close()
    
    def fetch_one(self, query: str, params: tuple = None) -> Optional[Dict[str, Any]]:
        """Executa SELECT e retorna um único resultado"""
        cursor = self.connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(query, params or ())
            result = cursor.fetchone()
            return result
        except Error as e:
            print(f"✗ Erro ao buscar dados: {e}")
            return None
        finally:
            cursor.close()

In [None]:
# Inicializa o gerenciador
db = DatabaseManager(
    host='localhost',
    database='sql_tutorial',
    user='root',
    password='root',
    port=3306
)

# Conecta ao banco
if not db.connect():
    print('Erro ao conectar ao BD')

In [None]:
try:
    # 3. Buscar todos os empregados
    print("\n--- Listando todos os empregados ---")
    empregados = db.fetch_all("SELECT * FROM employees")
    print(empregados)
    for user in empregados:
        #print(f"ID: {user['id']} | Nome: {user['nome']} | Email: {user['email']} | Idade: {user['idade']}")
        print(user)
        
finally:
    # Fecha a conexão
    print('erro na query')
    #db.disconnect()

In [None]:
query = 'SELECT * FROM employees'
result = db.fetch_all(query)
#result

## Carregar o result do SQL em um Pandas DataFrame

In [None]:
import pandas as pd

# Converter para DataFrame
df = pd.DataFrame(result)
df

## Executar queries SQL em um Pandas DataFrame

In [None]:
import duckdb
import pandas as pd

# seu df
# df = ...

# Executando SQL diretamente sobre o DataFrame
resultado = duckdb.query("""
    SELECT 
        first_name,
        last_name,
        salary
    FROM df2
    WHERE salary > 20000
""").to_df()

print(resultado)


## Outros exemplos de uso

In [None]:
def main():
    # Inicializa o gerenciador
    db = DatabaseManager(
        host='localhost',
        database='testdb',
        user='root',
        password='root',
        port=3306
    )
    
    # Conecta ao banco
    if not db.connect():
        return
    
    try:
        # 1. Criar tabela de usuários
        print("\n--- Criando tabela 'usuarios' ---")
        db.create_table('usuarios', {
            'id': 'INT AUTO_INCREMENT PRIMARY KEY',
            'nome': 'VARCHAR(100) NOT NULL',
            'email': 'VARCHAR(100) UNIQUE NOT NULL',
            'idade': 'INT',
            'criado_em': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
        })
        
        # 2. Inserir dados
        print("\n--- Inserindo usuários ---")
        db.insert('usuarios', {
            'nome': 'João Silva',
            'email': 'joao@email.com',
            'idade': 28
        })
        
        db.insert('usuarios', {
            'nome': 'Maria Santos',
            'email': 'maria@email.com',
            'idade': 32
        })
        
        db.insert('usuarios', {
            'nome': 'Pedro Costa',
            'email': 'pedro@email.com',
            'idade': 25
        })
        
        # 3. Buscar todos os usuários
        print("\n--- Listando todos os usuários ---")
        usuarios = db.fetch_all("SELECT * FROM usuarios")
        for user in usuarios:
            print(f"ID: {user['id']} | Nome: {user['nome']} | Email: {user['email']} | Idade: {user['idade']}")
        
        # 4. Buscar um usuário específico
        print("\n--- Buscando usuário por email ---")
        usuario = db.fetch_one("SELECT * FROM usuarios WHERE email = %s", ('joao@email.com',))
        if usuario:
            print(f"Encontrado: {usuario['nome']}")
        
        # 5. Atualizar dados
        print("\n--- Atualizando idade do João ---")
        db.update('usuarios', 
                 {'idade': 29}, 
                 'email = %s', 
                 ('joao@email.com',))
        
        # 6. Buscar usuários com idade maior que 27
        print("\n--- Usuários com idade > 27 ---")
        usuarios_filtrados = db.fetch_all("SELECT nome, idade FROM usuarios WHERE idade > %s", (27,))
        for user in usuarios_filtrados:
            print(f"{user['nome']} - {user['idade']} anos")
        
        # 7. Deletar usuário
        print("\n--- Deletando usuário ---")
        db.delete('usuarios', 'email = %s', ('pedro@email.com',))
        
        # 8. Contar usuários restantes
        print("\n--- Total de usuários ---")
        resultado = db.fetch_one("SELECT COUNT(*) as total FROM usuarios")
        print(f"Total de usuários: {resultado['total']}")
        
    finally:
        # Fecha a conexão
        db.disconnect()