# 🏠 Imobiliária 3 Irmãos - Análise e Integração com API

Este notebook demonstra como um desenvolvedor sênior trabalharia com o projeto da Imobiliária 3 Irmãos, integrando análise de dados, conexão com MySQL e uso da API FastAPI.

## 📋 Índice
1. [Configuração do Ambiente](#configuração)
2. [Conexão com MySQL](#mysql)
3. [Análise dos Dados](#análise)
4. [Integração com API](#api)
5. [Relatórios e Visualizações](#relatórios)
6. [Testes e Validações](#testes)


## 1. Configuração do Ambiente {#configuração}

Primeiro, vamos importar todas as bibliotecas necessárias e configurar o ambiente.


In [None]:
# Importações essenciais
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests
import json
from datetime import datetime, date, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configurações de visualização
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)

print("✅ Bibliotecas importadas com sucesso!")
print(f"📅 Data atual: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")


## 2. Conexão com MySQL {#mysql}

Configuração da conexão com o banco de dados MySQL usando SQLAlchemy.


In [None]:
# Configuração da conexão MySQL
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
import mysql.connector
from mysql.connector import Error

# Configurações do banco (ajuste conforme necessário)
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'sua_senha_aqui',  # ⚠️ Altere para sua senha
    'database': 'imobiliaria_3_irmaos',
    'charset': 'utf8mb4'
}

# URL de conexão SQLAlchemy
DATABASE_URL = f"mysql+mysqlconnector://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

def test_mysql_connection():
    """Testa a conexão com MySQL"""
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        if connection.is_connected():
            db_info = connection.get_server_info()
            print(f"✅ Conectado ao MySQL Server versão {db_info}")
            cursor = connection.cursor()
            cursor.execute("SELECT DATABASE();")
            database_name = cursor.fetchone()
            print(f"📊 Banco de dados atual: {database_name[0]}")
            return True
    except Error as e:
        print(f"❌ Erro ao conectar com MySQL: {e}")
        return False
    finally:
        if 'connection' in locals() and connection.is_connected():
            cursor.close()
            connection.close()
            print("🔒 Conexão MySQL fechada")

# Teste da conexão
test_mysql_connection()


In [None]:
# Configuração do SQLAlchemy Engine
try:
    engine = create_engine(DATABASE_URL, echo=False)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    # Teste da conexão SQLAlchemy
    with engine.connect() as connection:
        result = connection.execute(text("SELECT 1 as test"))
        print("✅ SQLAlchemy Engine configurado com sucesso!")
        
except Exception as e:
    print(f"❌ Erro ao configurar SQLAlchemy: {e}")
    print("💡 Verifique se o MySQL está rodando e as credenciais estão corretas")


## 3. Análise dos Dados {#análise}

Vamos analisar a estrutura do banco e os dados existentes.


In [None]:
def get_table_info():
    """Obtém informações sobre as tabelas do banco"""
    try:
        with engine.connect() as connection:
            # Lista todas as tabelas
            tables_query = text("""
                SELECT TABLE_NAME, TABLE_ROWS, DATA_LENGTH, INDEX_LENGTH
                FROM information_schema.TABLES 
                WHERE TABLE_SCHEMA = :database_name
                ORDER BY TABLE_ROWS DESC
            """)
            
            result = connection.execute(tables_query, {"database_name": DB_CONFIG['database']})
            tables_df = pd.DataFrame(result.fetchall(), 
                                   columns=['Tabela', 'Registros', 'Tamanho_Dados', 'Tamanho_Indices'])
            
            print("📊 Informações das Tabelas:")
            print(tables_df.to_string(index=False))
            
            return tables_df
            
    except Exception as e:
        print(f"❌ Erro ao obter informações das tabelas: {e}")
        return None

tables_info = get_table_info()


In [None]:
def load_data_from_mysql():
    """Carrega dados de todas as tabelas principais"""
    data = {}
    
    tables = [
        'usuario', 'perfil', 'corretor', 'status_imovel', 
        'imovel', 'visita', 'contrato_aluguel', 'contrato_venda'
    ]
    
    try:
        for table in tables:
            query = f"SELECT * FROM {table}"
            df = pd.read_sql(query, engine)
            data[table] = df
            print(f"✅ {table}: {len(df)} registros carregados")
            
    except Exception as e:
        print(f"❌ Erro ao carregar dados: {e}")
        
    return data

# Carregar dados
data = load_data_from_mysql()


In [None]:
# Análise exploratória dos dados
if data:
    print("🔍 ANÁLISE EXPLORATÓRIA DOS DADOS")
    print("=" * 50)
    
    for table_name, df in data.items():
        if not df.empty:
            print(f"\n📋 {table_name.upper()}:")
            print(f"   • Registros: {len(df)}")
            print(f"   • Colunas: {list(df.columns)}")
            print(f"   • Tipos: {df.dtypes.to_dict()}")
            
            # Mostrar primeiras linhas
            if len(df) > 0:
                print(f"   • Primeiras linhas:")
                print(df.head(3).to_string(index=False))
else:
    print("⚠️ Nenhum dado carregado. Verifique a conexão com o banco.")


## 4. Integração com API {#api}

Configuração e uso da API FastAPI para operações CRUD.


In [None]:
# Configuração da API
API_BASE_URL = "http://localhost:8000"  # URL base da API FastAPI

class ImobiliariaAPI:
    """Cliente para interagir com a API da Imobiliária"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
    
    def test_connection(self):
        """Testa a conexão com a API"""
        try:
            response = self.session.get(f"{self.base_url}/")
            if response.status_code == 200:
                print(f"✅ API conectada: {response.json()}")
                return True
            else:
                print(f"❌ Erro na API: {response.status_code}")
                return False
        except Exception as e:
            print(f"❌ Erro ao conectar com API: {e}")
            return False
    
    def get_endpoints(self):
        """Lista todos os endpoints disponíveis"""
        try:
            response = self.session.get(f"{self.base_url}/docs")
            if response.status_code == 200:
                print("📚 Documentação da API disponível em:")
                print(f"   {self.base_url}/docs")
                return True
        except Exception as e:
            print(f"❌ Erro ao acessar documentação: {e}")
        return False
    
    def get_usuarios(self, limit: int = 10, offset: int = 0):
        """Busca usuários"""
        try:
            response = self.session.get(
                f"{self.base_url}/usuarios/",
                params={'limit': limit, 'offset': offset}
            )
            if response.status_code == 200:
                return response.json()
            else:
                print(f"❌ Erro ao buscar usuários: {response.status_code}")
                return None
        except Exception as e:
            print(f"❌ Erro na requisição: {e}")
            return None
    
    def get_imoveis(self, limit: int = 10, offset: int = 0):
        """Busca imóveis"""
        try:
            response = self.session.get(
                f"{self.base_url}/imoveis/",
                params={'limit': limit, 'offset': offset}
            )
            if response.status_code == 200:
                return response.json()
            else:
                print(f"❌ Erro ao buscar imóveis: {response.status_code}")
                return None
        except Exception as e:
            print(f"❌ Erro na requisição: {e}")
            return None
    
    def create_usuario(self, usuario_data: dict):
        """Cria um novo usuário"""
        try:
            response = self.session.post(
                f"{self.base_url}/usuarios/",
                json=usuario_data
            )
            if response.status_code == 201:
                print(f"✅ Usuário criado: {response.json()}")
                return response.json()
            else:
                print(f"❌ Erro ao criar usuário: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"❌ Erro na requisição: {e}")
            return None

# Instanciar cliente da API
api_client = ImobiliariaAPI(API_BASE_URL)

# Testar conexão
api_client.test_connection()
api_client.get_endpoints()


In [None]:
# Exemplo de uso da API - Buscar dados
print("🔍 BUSCANDO DADOS VIA API")
print("=" * 40)

# Buscar usuários
usuarios_api = api_client.get_usuarios(limit=5)
if usuarios_api:
    print(f"\n👥 Usuários encontrados: {len(usuarios_api.get('items', []))}")
    for usuario in usuarios_api.get('items', [])[:3]:
        print(f"   • {usuario.get('nome', 'N/A')} - {usuario.get('email', 'N/A')}")

# Buscar imóveis
imoveis_api = api_client.get_imoveis(limit=5)
if imoveis_api:
    print(f"\n🏠 Imóveis encontrados: {len(imoveis_api.get('items', []))}")
    for imovel in imoveis_api.get('items', [])[:3]:
        print(f"   • {imovel.get('tipo', 'N/A')} - R$ {imovel.get('valor', 0):,.2f}")


## 5. Relatórios e Visualizações {#relatórios}

Criação de relatórios e visualizações para análise de negócio.


In [None]:
def create_dashboard_data():
    """Cria dados para dashboard"""
    dashboard_data = {}
    
    try:
        with engine.connect() as connection:
            # Total de usuários por perfil
            usuarios_perfil = pd.read_sql("""
                SELECT p.tipo_perf, COUNT(u.id) as total
                FROM usuario u
                JOIN perfil p ON u.fk_perfil_id = p.id
                GROUP BY p.tipo_perf
            """, connection)
            dashboard_data['usuarios_perfil'] = usuarios_perfil
            
            # Imóveis por status
            imoveis_status = pd.read_sql("""
                SELECT s.descricao, COUNT(i.id) as total
                FROM imovel i
                JOIN status_imovel s ON i.fk_status_id = s.id_status
                GROUP BY s.descricao
            """, connection)
            dashboard_data['imoveis_status'] = imoveis_status
            
            # Valor médio por tipo de imóvel
            valor_medio = pd.read_sql("""
                SELECT tipo, AVG(valor) as valor_medio, COUNT(*) as quantidade
                FROM imovel
                GROUP BY tipo
            """, connection)
            dashboard_data['valor_medio'] = valor_medio
            
            print("✅ Dados do dashboard carregados com sucesso!")
            
    except Exception as e:
        print(f"❌ Erro ao carregar dados do dashboard: {e}")
    
    return dashboard_data

dashboard_data = create_dashboard_data()


In [None]:
# Visualizações do Dashboard
if dashboard_data:
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Usuários por Perfil', 'Imóveis por Status', 'Valor Médio por Tipo', 'Distribuição de Valores'),
        specs=[[{"type": "pie"}, {"type": "bar"}],
               [{"type": "bar"}, {"type": "histogram"}]]
    )
    
    # Gráfico 1: Usuários por perfil
    if 'usuarios_perfil' in dashboard_data and not dashboard_data['usuarios_perfil'].empty:
        fig.add_trace(
            go.Pie(
                labels=dashboard_data['usuarios_perfil']['tipo_perf'],
                values=dashboard_data['usuarios_perfil']['total'],
                name="Usuários por Perfil"
            ),
            row=1, col=1
        )
    
    # Gráfico 2: Imóveis por status
    if 'imoveis_status' in dashboard_data and not dashboard_data['imoveis_status'].empty:
        fig.add_trace(
            go.Bar(
                x=dashboard_data['imoveis_status']['descricao'],
                y=dashboard_data['imoveis_status']['total'],
                name="Imóveis por Status"
            ),
            row=1, col=2
        )
    
    # Gráfico 3: Valor médio por tipo
    if 'valor_medio' in dashboard_data and not dashboard_data['valor_medio'].empty:
        fig.add_trace(
            go.Bar(
                x=dashboard_data['valor_medio']['tipo'],
                y=dashboard_data['valor_medio']['valor_medio'],
                name="Valor Médio"
            ),
            row=2, col=1
        )
    
    # Gráfico 4: Distribuição de valores (se houver dados de imóveis)
    if 'imovel' in data and not data['imovel'].empty:
        fig.add_trace(
            go.Histogram(
                x=data['imovel']['valor'],
                name="Distribuição de Valores",
                nbinsx=20
            ),
            row=2, col=2
        )
    
    fig.update_layout(
        height=800,
        title_text="🏠 Dashboard Imobiliária 3 Irmãos",
        showlegend=False
    )
    
    fig.show()
else:
    print("⚠️ Dados insuficientes para criar visualizações")


In [None]:
# Relatório de Performance
def generate_performance_report():
    """Gera relatório de performance da imobiliária"""
    report = {
        'timestamp': datetime.now().strftime('%d/%m/%Y %H:%M:%S'),
        'summary': {}
    }
    
    try:
        with engine.connect() as connection:
            # Total de usuários
            total_usuarios = pd.read_sql("SELECT COUNT(*) as total FROM usuario", connection)
            report['summary']['total_usuarios'] = total_usuarios['total'].iloc[0]
            
            # Total de imóveis
            total_imoveis = pd.read_sql("SELECT COUNT(*) as total FROM imovel", connection)
            report['summary']['total_imoveis'] = total_imoveis['total'].iloc[0]
            
            # Valor total do portfólio
            valor_total = pd.read_sql("SELECT SUM(valor) as total FROM imovel", connection)
            report['summary']['valor_total_portfolio'] = float(valor_total['total'].iloc[0] or 0)
            
            # Total de corretores
            total_corretores = pd.read_sql("SELECT COUNT(*) as total FROM corretor", connection)
            report['summary']['total_corretores'] = total_corretores['total'].iloc[0]
            
            # Total de visitas
            total_visitas = pd.read_sql("SELECT COUNT(*) as total FROM visita", connection)
            report['summary']['total_visitas'] = total_visitas['total'].iloc[0]
            
    except Exception as e:
        print(f"❌ Erro ao gerar relatório: {e}")
    
    return report

# Gerar e exibir relatório
performance_report = generate_performance_report()

print("📊 RELATÓRIO DE PERFORMANCE")
print("=" * 50)
print(f"📅 Gerado em: {performance_report['timestamp']}")
print(f"👥 Total de Usuários: {performance_report['summary'].get('total_usuarios', 0):,}")
print(f"🏠 Total de Imóveis: {performance_report['summary'].get('total_imoveis', 0):,}")
print(f"💰 Valor Total do Portfólio: R$ {performance_report['summary'].get('valor_total_portfolio', 0):,.2f}")
print(f"👨‍💼 Total de Corretores: {performance_report['summary'].get('total_corretores', 0):,}")
print(f"👀 Total de Visitas: {performance_report['summary'].get('total_visitas', 0):,}")


## 6. Testes e Validações

Implementação de testes para validar a integridade dos dados e funcionamento da API.


In [None]:
def validate_data_integrity():
    """Valida a integridade dos dados no banco"""
    validation_results = []
    
    try:
        with engine.connect() as connection:
            # Verificar usuários órfãos (sem perfil)
            usuarios_orfos = pd.read_sql("""
                SELECT COUNT(*) as total
                FROM usuario u
                LEFT JOIN perfil p ON u.fk_perfil_id = p.id
                WHERE p.id IS NULL
            """, connection)
            
            validation_results.append({
                'test': 'Usuários sem perfil',
                'result': 'PASS' if usuarios_orfos['total'].iloc[0] == 0 else 'FAIL',
                'count': usuarios_orfos['total'].iloc[0]
            })
            
            # Verificar imóveis órfãos (sem status)
            imoveis_orfos = pd.read_sql("""
                SELECT COUNT(*) as total
                FROM imovel i
                LEFT JOIN status_imovel s ON i.fk_status_id = s.id_status
                WHERE s.id_status IS NULL
            """, connection)
            
            validation_results.append({
                'test': 'Imóveis sem status',
                'result': 'PASS' if imoveis_orfos['total'].iloc[0] == 0 else 'FAIL',
                'count': imoveis_orfos['total'].iloc[0]
            })
            
            # Verificar valores negativos
            valores_negativos = pd.read_sql("""
                SELECT COUNT(*) as total
                FROM imovel
                WHERE valor < 0
            """, connection)
            
            validation_results.append({
                'test': 'Valores negativos',
                'result': 'PASS' if valores_negativos['total'].iloc[0] == 0 else 'FAIL',
                'count': valores_negativos['total'].iloc[0]
            })
            
    except Exception as e:
        print(f"❌ Erro na validação: {e}")
    
    return validation_results

# Executar validações
validation_results = validate_data_integrity()

print("🔍 VALIDAÇÃO DE INTEGRIDADE DOS DADOS")
print("=" * 50)

for result in validation_results:
    status_icon = "✅" if result['result'] == 'PASS' else "❌"
    print(f"{status_icon} {result['test']}: {result['result']} ({result['count']} registros)")


In [None]:
def test_api_endpoints():
    """Testa os principais endpoints da API"""
    endpoints_to_test = [
        ('GET', '/usuarios/', 'Listar usuários'),
        ('GET', '/imoveis/', 'Listar imóveis'),
        ('GET', '/corretores/', 'Listar corretores'),
        ('GET', '/perfis/', 'Listar perfis'),
        ('GET', '/status-imoveis/', 'Listar status de imóveis')
    ]
    
    test_results = []
    
    for method, endpoint, description in endpoints_to_test:
        try:
            response = api_client.session.get(f"{API_BASE_URL}{endpoint}")
            status = 'PASS' if response.status_code in [200, 201] else 'FAIL'
            test_results.append({
                'endpoint': endpoint,
                'description': description,
                'status': status,
                'status_code': response.status_code
            })
        except Exception as e:
            test_results.append({
                'endpoint': endpoint,
                'description': description,
                'status': 'ERROR',
                'error': str(e)
            })
    
    return test_results

# Executar testes da API
api_test_results = test_api_endpoints()

print("\n🧪 TESTES DA API")
print("=" * 30)

for result in api_test_results:
    if result['status'] == 'PASS':
        print(f"✅ {result['description']}: {result['status_code']}")
    elif result['status'] == 'FAIL':
        print(f"❌ {result['description']}: {result['status_code']}")
    else:
        print(f"⚠️ {result['description']}: {result.get('error', 'Erro desconhecido')}")


## 🚀 Exemplo de Uso Avançado

Demonstração de como um desenvolvedor sênior utilizaria este notebook para análise e desenvolvimento.


In [None]:
# Exemplo: Análise de tendências de preços
def analyze_price_trends():
    """Analisa tendências de preços por tipo de imóvel"""
    try:
        with engine.connect() as connection:
            # Query para análise de preços
            price_analysis = pd.read_sql("""
                SELECT 
                    tipo,
                    COUNT(*) as quantidade,
                    AVG(valor) as preco_medio,
                    MIN(valor) as preco_minimo,
                    MAX(valor) as preco_maximo,
                    STDDEV(valor) as desvio_padrao
                FROM imovel
                GROUP BY tipo
                ORDER BY preco_medio DESC
            """, connection)
            
            if not price_analysis.empty:
                print("📈 ANÁLISE DE TENDÊNCIAS DE PREÇOS")
                print("=" * 50)
                
                for _, row in price_analysis.iterrows():
                    print(f"\n🏠 {row['tipo'].upper()}:")
                    print(f"   • Quantidade: {row['quantidade']:,}")
                    print(f"   • Preço Médio: R$ {row['preco_medio']:,.2f}")
                    print(f"   • Faixa: R$ {row['preco_minimo']:,.2f} - R$ {row['preco_maximo']:,.2f}")
                    print(f"   • Desvio Padrão: R$ {row['desvio_padrao']:,.2f}")
                
                return price_analysis
            else:
                print("⚠️ Nenhum dado de preço encontrado")
                return None
                
    except Exception as e:
        print(f"❌ Erro na análise de preços: {e}")
        return None

# Executar análise
price_trends = analyze_price_trends()


In [None]:
# Exemplo: Criação de dados de teste via API
def create_test_data():
    """Cria dados de teste para demonstração"""
    
    # Dados de exemplo para um novo usuário
    novo_usuario = {
        "nome": "João Silva Teste",
        "cpf": "12345678901",
        "telefone": "(11) 99999-9999",
        "email": "joao.teste@email.com",
        "data_nascimento": "1990-01-01",
        "sexo": "M",
        "login_usu": "joao.teste",
        "senha_usu": "senha123",
        "fk_perfil_id": 1
    }
    
    print("🧪 CRIANDO DADOS DE TESTE")
    print("=" * 30)
    
    # Tentar criar usuário via API
    resultado = api_client.create_usuario(novo_usuario)
    
    if resultado:
        print(f"✅ Usuário de teste criado com ID: {resultado.get('id')}")
    else:
        print("⚠️ Não foi possível criar usuário de teste (pode já existir)")

# Executar criação de dados de teste
create_test_data()


## 📝 Conclusões e Próximos Passos

Este notebook demonstra como um desenvolvedor sênior trabalharia com o projeto da Imobiliária 3 Irmãos:

### ✅ O que foi implementado:
1. **Conexão robusta com MySQL** usando SQLAlchemy
2. **Cliente API completo** para interação com FastAPI
3. **Análise exploratória de dados** com pandas
4. **Visualizações interativas** com Plotly
5. **Validação de integridade** dos dados
6. **Testes automatizados** da API
7. **Relatórios de performance** em tempo real

### 🚀 Próximos passos recomendados:
1. **Implementar cache Redis** para melhorar performance
2. **Adicionar autenticação JWT** na API
3. **Criar testes unitários** com pytest
4. **Implementar logging** estruturado
5. **Adicionar monitoramento** com Prometheus/Grafana
6. **Criar pipeline CI/CD** com GitHub Actions

### 💡 Dicas para desenvolvimento:
- Sempre valide dados antes de inserir no banco
- Use transações para operações críticas
- Implemente rate limiting na API
- Documente todas as funções e endpoints
- Use type hints em Python para melhor manutenibilidade


In [None]:
# Finalização do notebook
print("🎉 NOTEBOOK EXECUTADO COM SUCESSO!")
print("=" * 50)
print(f"📅 Concluído em: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
print("\n💼 Este notebook está pronto para uso em produção!")
print("🔧 Ajuste as configurações de banco e API conforme necessário.")
print("📚 Consulte a documentação da API em: http://localhost:8000/docs")
