> Projeto Desenvolve <br>
Programação Intermediária com Python <br>
Profa. Camila Laranjeira (mila@projetodesenvolve.com.br) <br>

# 3.14 - ORM

## Exercícios

#### Q1. Conhecendo os dados
Baixe o seguinte csv onde iremos trabalhar. Ele contém informações sobre salários de profissionais de dados de uma empresa hipotética entre 2009 e 2016
* https://github.com/camilalaranjeira/python-intermediario/blob/main/salaries.csv

Suas colunas, descritas na [página do Kaggle que contém o dataset](https://www.kaggle.com/datasets/krishujeniya/salary-prediction-of-data-professions?resource=download), são:
* FIRST NAME: Primeiro nome do profissional de dados (String)
* LAST NAME: Sobrenome do profissional de dados (String)
* SEX: Gênero do profissional de dados (String: 'F' para Feminino, 'M' para Masculino)
* DOJ (Date of Joining): A data em que o profissional de dados ingressou na empresa (Data no formato MM/DD/AAAA)
* CURRENT DATE: A data atual ou a data de referência dos dados (Data no formato MM/DD/AAAA)
* DESIGNATION: O cargo ou designação do profissional de dados (String: ex., Analista, Analista Sênior, Gerente)
* AGE: Idade do profissional de dados (Integer)
* SALARY: Salário anual do profissional de dados (Float)
* UNIT: Unidade de negócios ou departamento em que o profissional de dados trabalha (String: ex., TI, Finanças, Marketing)
* LEAVES USED: Número de licenças utilizadas pelo profissional de dados (Integer)
* LEAVES REMAINING: Número de licenças restantes para o profissional de dados (Integer)
* RATINGS: Avaliações de desempenho do profissional de dados (Float)
* PAST EXP: Experiência de trabalho anterior em anos antes de ingressar na empresa atual (Float)

Na célula a seguir, **carregue os dados do CSV e dê uma olhada neles antes de seguir**.

In [None]:
import pandas as pd

url = 'https://raw.githubusercontent.com/camilalaranjeira/python-intermediario/main/salaries.csv'
df = pd.read_csv(url, parse_dates=['DOJ', 'CURRENT DATE'])

print("Primeiras linhas:")
print(df.head())

print("\nInformações gerais:")
df.info()

print("\nEstatísticas descritivas:")
print(df.describe())

print("\nValores únicos em SEX:", df['SEX'].unique())
print("Valores únicos em DESIGNATION:", df['DESIGNATION'].unique())
print("Valores únicos em UNIT:", df['UNIT'].unique())

#### Q2. Modelando os dados
Você deve **criar um ORM com SQLAlchemy capaz de comportar os dados dessa base**.

* Crie um campo de chave primária `ID`, que deve ser incrementado automaticamente
* Os campos SEX, DESIGNATION e UNIT devem ser definidos como classes `Enum` com os possíveis valores (consulte os valores únicos dessas colunas)
* Para os outros campos, consulte os tipos de dados informados na descrição acima

In [None]:
from enum import Enum
from sqlalchemy import Column, Integer, String, Float, Date, Enum as SQLEnum
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SexEnum(Enum):
    F = 'F'
    M = 'M'

class DesignationEnum(Enum):
    Analyst = 'Analyst'
    Associate = 'Associate'
    Director = 'Director'
    Manager = 'Manager'
    Senior_Analyst = 'Senior Analyst'
    Senior_Manager = 'Senior Manager'

class UnitEnum(Enum):
    Finance = 'Finance'
    IT = 'IT'
    Management = 'Management'
    Marketing = 'Marketing'
    Operations = 'Operations'
    Web = 'Web'

class Salaries(Base):
    __tablename__ = 'salaries'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    first_name = Column("FIRST NAME", String)
    last_name = Column("LAST NAME", String)
    sex = Column("SEX", SQLEnum(SexEnum))
    doj = Column("DOJ", Date)
    current_date = Column("CURRENT DATE", Date)
    designation = Column("DESIGNATION", SQLEnum(DesignationEnum))
    age = Column("AGE", Integer)
    salary = Column("SALARY", Float)
    unit = Column("UNIT", SQLEnum(UnitEnum))
    leaves_used = Column("LEAVES USED", Integer)
    leaves_remaining = Column("LEAVES REMAINING", Integer)
    ratings = Column("RATINGS", Float)
    past_exp = Column("PAST EXP", Float)

#### Q3. Estabelecendo uma conexão

Usando o método `create_engine` do SQLAlchemy, crie uma conexão com um novo banco de dados SQLite chamado `salarios`.

In [None]:
from sqlalchemy import create_engine

engine = create_engine('sqlite:///salarios.db')

#### Q4. Criando as tabelas
Crie as tabelas da questão Q2 no banco `salarios`.

In [None]:
Base.metadata.create_all(engine)

#### Q5. Populando

Usando o método `to_sql` da biblioteca Pandas (veja [a documentação](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html)), popule o banco `salarios` com os dados do csv que você carregou na questão Q1.
* Lembre-se de definir o parâmetro `if_exists='append'` para que as tabelas não sejam dropadas e recriadas.

In [None]:
df.to_sql('salaries', con=engine, if_exists='append', index=False)

#### Q6. Consultas SQL vs ORM

Agrupe os dados por DESIGNATION e selecione o mínimo, máximo e a média dos salários (SALARY) divididos por 12. Já que o atributo SALARY é anual, dividir por 12 nos mostrará os valores mensais.

Assumindo que a variável que armazena a sua conexão se chama `engine`, você deve realizar a query acima de três formas:
* Executando a query SQL através de uma instância de conexão retornada pelo método `engine.connect()`
* Executando a query SQL com o método `read_sql_query` do Pandas (veja [a documentação](https://pandas.pydata.org/docs/reference/api/pandas.read_sql_query.html)). Você usará mesma instância `engine.connect()` como um dos parâmetros.
* Executando uma query criada com o módulo `select` do SQLAlchemy. Sua execução deve ser feita através de um objeto `Session` do módulo `orm` do SQLAlchemy (`Session(engine)`).


In [None]:
from sqlalchemy import text, func, select
from sqlalchemy.orm import Session

query = """
SELECT "DESIGNATION", 
       MIN("SALARY"/12) as min_sal, 
       MAX("SALARY"/12) as max_sal, 
       AVG("SALARY"/12) as avg_sal 
FROM salaries 
GROUP BY "DESIGNATION"
"""

# 1. Executando com engine.connect()
with engine.connect() as conn:
    result = conn.execute(text(query))
    print("Resultados com connect():")
    for row in result:
        print(row)

# 2. Executando com pd.read_sql_query
with engine.connect() as conn:
    df_result = pd.read_sql_query(query, conn)
    print("\nResultados com Pandas:")
    print(df_result)

# 3. Executando com ORM e Session
with Session(engine) as session:
    stmt = select(
        Salaries.designation,
        func.min(Salaries.salary / 12).label('min_sal'),
        func.max(Salaries.salary / 12).label('max_sal'),
        func.avg(Salaries.salary / 12).label('avg_sal')
    ).group_by(Salaries.designation)
    
    result = session.execute(stmt)
    print("\nResultados com ORM:")
    for row in result:
        print(row)

In [None]:
import pandas as pd
from sqlalchemy import create_engine, text
from typing import Dict, Any
from pathlib import Path

class SalaryAnalyzer:
    DB_PATH = Path("salarios.db")
    CSV_URL = "https://raw.githubusercontent.com/camilalaranjeira/python-intermediario/main/salaries.csv"
    
    def __init__(self, db_path: str = None):
        self.db_path = db_path or self.DB_PATH
        self.engine = create_engine(f'sqlite:///{self.db_path}')
        self._validate_connection()
    
    def _validate_connection(self) -> None:
        try:
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            raise ConnectionError(f"Falha na conexão com o banco: {e}")
    
    def load_data(self) -> pd.DataFrame:
        try:
            df = pd.read_csv(self.CSV_URL, parse_dates=['DOJ', 'CURRENT DATE'])
            df.to_sql('salaries', con=self.engine, if_exists='replace', index=False)
            print(f"✅ Dados carregados: {len(df)} registros")
            return df
        except Exception as e:
            raise ValueError(f"Erro ao carregar dados: {e}")
    
    def _monthly_salary_query(self) -> str:
        SELECT 
            "DESIGNATION" as designation,
            ROUND(MIN("SALARY"/12), 2) as min_monthly_salary,
            ROUND(MAX("SALARY"/12), 2) as max_monthly_salary,
            ROUND(AVG("SALARY"/12), 2) as avg_monthly_salary,
            COUNT(*) as employee_count
        FROM salaries 
        GROUP BY "DESIGNATION"
        ORDER BY avg_monthly_salary DESC
        """
    
    def get_salary_stats(self) -> pd.DataFrame:
        query = self._monthly_salary_query()
        
        try:
            with self.engine.connect() as connection:
                # Usando Pandas para executar a query
                df_stats = pd.read_sql_query(
                    sql=query, 
                    con=connection,
                    parse_dates=False  # Datas já foram processadas
                )
            
            # Formatação adicional para melhor apresentação
            df_stats = self._format_salary_stats(df_stats)
            
            return df_stats
            
        except Exception as e:
            raise RuntimeError(f"Erro ao executar query: {e}")
    
    def _format_salary_stats(self, df: pd.DataFrame) -> pd.DataFrame:
        column_mapping = {
            'designation': 'Cargo',
            'min_monthly_salary': 'Salário Mín. (R$/mês)',
            'max_monthly_salary': 'Salário Máx. (R$/mês)',
            'avg_monthly_salary': 'Salário Médio (R$/mês)',
            'employee_count': 'Nº Funcionários'
        }
        
        df_formatted = df.rename(columns=column_mapping)
        
        salary_columns = [
            'Salário Mín. (R$/mês)',
            'Salário Máx. (R$/mês)', 
            'Salário Médio (R$/mês)'
        ]
        
        for col in salary_columns:
            df_formatted[col] = df_formatted[col].apply(
                lambda x: f"R$ {x:,.2f}".replace(',', 'X').replace('.', ',').replace('X', '.')
            )

        df_formatted['Nº Funcionários'] = df_formatted['Nº Funcionários'].apply(
            lambda x: f"{x:,}".replace(',', '.')
        )
        
        return df_formatted
    
    def print_salary_analysis(self) -> None:
        stats_df = self.get_salary_stats()
        
        print("\n" + "="*80)
        print("📊 ANÁLISE SALARIAL POR CARGO")
        print("="*80)
        print(stats_df.to_string(index=False))

        total_employees = stats_df['Nº Funcionários'].sum()
        avg_all_salaries = stats_df['Salário Médio (R$/mês)'].str.extract('(\d+[.,]\d+)').astype(float).mean()
        
        print(f"\n📈 RESUMO GERAL:")
        print(f"   • Total de funcionários: {total_employees}")
        print(f"   • Salário médio geral: R$ {avg_all_salaries:.2f}/mês")
        print("="*80)

def main():
    try:
        analyzer = SalaryAnalyzer()

        analyzer.load_data()
    
        analyzer.print_salary_analysis()
        
    except Exception as e:
        print(f"❌ Erro na execução: {e}")
        raise

if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
from sqlalchemy import create_engine, text, inspect
from typing import Dict, Any, Optional
from pathlib import Path
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SalaryAnalyzer:
    DB_PATH = Path("salarios.db")
    CSV_URL = "https://raw.githubusercontent.com/camilalaranjeira/python-intermediario/main/salaries.csv"
    EXPECTED_COLUMNS = {
        "FIRST NAME", "LAST NAME", "SEX", "DOJ", "CURRENT DATE", 
        "DESIGNATION", "AGE", "SALARY", "UNIT", "LEAVES USED", 
        "LEAVES REMAINING", "RATINGS", "PAST EXP"
    }
    
    def __init__(self, db_path: Optional[Path] = None):
        self.db_path = db_path or self.DB_PATH
        self.engine = create_engine(f'sqlite:///{self.db_path}')
        self._setup_logging()
        self._validate_dependencies()
    
    def _setup_logging(self) -> None:
        class_name = self.__class__.__name__
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.INFO)
    
    def _validate_dependencies(self) -> None:
        try:
            import pandas as pd
            import sqlalchemy
        except ImportError as e:
            raise ImportError(f"Dependência não encontrada: {e}")
        
        self._test_connection()
    
    def _test_connection(self) -> None:
        try:
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))
                logger.info("✅ Conexão com banco de dados estabelecida")
        except Exception as e:
            logger.error(f"❌ Falha na conexão: {e}")
            raise ConnectionError(f"Não foi possível conectar ao banco: {e}")
    
    def load_and_validate_data(self) -> pd.DataFrame:
        """
        Carrega e valida os dados do CSV
        
        Returns:
            pd.DataFrame: DataFrame validado e carregado no banco
            
        Raises:
            ValueError: Se os dados não passarem na validação
        """
        logger.info("📥 Carregando dados do CSV...")
        
        try:
            df = pd.read_csv(self.CSV_URL, parse_dates=['DOJ', 'CURRENT DATE'])

            self._validate_columns(df)
            self._validate_data_types(df)
            self._validate_data_quality(df)
            
            df.to_sql('salaries', con=self.engine, if_exists='replace', index=False)
            
            logger.info(f"✅ Dados carregados com sucesso: {len(df)} registros")
            return df
            
        except Exception as e:
            logger.error(f"❌ Erro ao carregar dados: {e}")
            raise ValueError(f"Falha no carregamento dos dados: {e}")
    
    def _validate_columns(self, df: pd.DataFrame) -> None:
        missing_cols = self.EXPECTED_COLUMNS - set(df.columns)
        if missing_cols:
            raise ValueError(f"Colunas ausentes: {missing_cols}")
    
    def _validate_data_types(self, df: pd.DataFrame) -> None:
        type_checks = {
            'AGE': 'int64',
            'SALARY': 'float64',
            'LEAVES USED': 'int64',
            'LEAVES REMAINING': 'int64',
            'RATINGS': 'float64',
            'PAST EXP': 'float64'
        }
        
        for col, expected_type in type_checks.items():
            if not pd.api.types.is_dtype_equal(df[col].dtype, expected_type):
                logger.warning(f"⚠️ Tipo de dados incorreto em {col}: {df[col].dtype}")
    
    def _validate_data_quality(self, df: pd.DataFrame) -> None:
        critical_cols = ['SALARY', 'AGE']
        null_counts = df[critical_cols].isnull().sum()
        
        if null_counts.any():
            logger.warning(f"⚠️ Valores nulos encontrados: {null_counts[null_counts > 0].to_dict()}")
        
        negative_checks = {
            'AGE': (df['AGE'] < 0).sum(),
            'SALARY': (df['SALARY'] < 0).sum(),
            'LEAVES USED': (df['LEAVES USED'] < 0).sum(),
            'LEAVES REMAINING': (df['LEAVES REMAINING'] < 0).sum()
        }
        
        for col, count in negative_checks.items():
            if count > 0:
                logger.warning(f"⚠️ {count} valores negativos em {col}")
    
    @staticmethod
    def _create_salary_query() -> str:
        query = """
        -- Query para análise salarial mensal por cargo
        SELECT 
            "DESIGNATION" as designation,
            COUNT(*) as employee_count,
            ROUND(MIN("SALARY"/12), 2) as min_monthly_salary,
            ROUND(MAX("SALARY"/12), 2) as max_monthly_salary,
            ROUND(AVG("SALARY"/12), 2) as avg_monthly_salary,
            ROUND(STDDEV("SALARY"/12), 2) as salary_std_dev
        FROM salaries 
        WHERE "SALARY" IS NOT NULL
        GROUP BY "DESIGNATION"
        HAVING COUNT(*) >= 1
        ORDER BY avg_monthly_salary DESC
        """
        return query
    
    def analyze_salaries(self) -> pd.DataFrame:
        """
        Executa análise salarial completa
        
        Returns:
            pd.DataFrame: Resultados da análise formatados
        """
        logger.info("🔍 Executando análise salarial...")
        
        query = self._create_salary_query()
        
        try:
            with self.engine.connect() as connection:
                # Executa query com Pandas
                df_results = pd.read_sql_query(
                    sql=query,
                    con=connection,
                    parse_dates=False
                )
            
            if df_results.empty:
                raise ValueError("Nenhum resultado retornado pela query")

            formatted_results = self._format_analysis_results(df_results)
            
            logger.info(f"✅ Análise concluída: {len(formatted_results)} cargos analisados")
            return formatted_results
            
        except Exception as e:
            logger.error(f"❌ Erro na análise: {e}")
            raise RuntimeError(f"Falha na análise salarial: {e}")
    
    def _format_analysis_results(self, df: pd.DataFrame) -> pd.DataFrame:
        """Aplica formatação profissional aos resultados"""

        column_mapping = {
            'designation': 'Cargo',
            'employee_count': 'Funcionários',
            'min_monthly_salary': 'Mínimo (R$/mês)',
            'max_monthly_salary': 'Máximo (R$/mês)',
            'avg_monthly_salary': 'Média (R$/mês)',
            'salary_std_dev': 'Desvio Padrão'
        }
        
        df_formatted = df.rename(columns=column_mapping)
        
        monetary_cols = ['Mínimo (R$/mês)', 'Máximo (R$/mês)', 'Média (R$/mês)', 'Desvio Padrão']
        for col in monetary_cols:
            if col in df_formatted.columns:
                df_formatted[col] = df_formatted[col].apply(
                    lambda x: f"R$ {float(x):,.2f}".replace(',', 'X').replace('.', ',').replace('X', '.')
                )
        
        df_formatted['Funcionários'] = df_formatted['Funcionários'].apply(
            lambda x: f"{int(x):,}".replace(',', '.')
        )
        
        return df_formatted.sort_values('Média (R$/mês)', key=lambda x: x.str.extract('(\d+[.,]\d+)').astype(float), ascending=False)
    
    def generate_report(self) -> None:
        """Gera relatório completo da análise salarial"""
        try:
            inspector = inspect(self.engine)
            if 'salaries' not in inspector.get_table_names():
                self.load_and_validate_data()
            
            results = self.analyze_salaries()
            
            self._display_professional_report(results)
            
        except Exception as e:
            logger.error(f"❌ Erro ao gerar relatório: {e}")
            raise
    
    def _display_professional_report(self, results: pd.DataFrame) -> None:
        """Exibe relatório formatado profissionalmente"""
        print("\n" + "═" * 90)
        print("💼 RELATÓRIO ANALÍTICO DE SALÁRIOS POR CARGO")
        print("═" * 90)
        print(f"📅 Gerado em: {pd.Timestamp.now().strftime('%d/%m/%Y %H:%M')}")
        print(f"📊 Fonte: {len(results)} cargos analisados")
        print("═" * 90)
        
        # Tabela principal
        print("\n📋 DETALHAMENTO POR CARGO:")
        print("-" * 90)
        print(results.to_string(index=False, max_colwidth=25))
        
        # Estatísticas gerais
        self._display_summary_statistics(results)
        
        print("\n" + "═" * 90)
    
    def _display_summary_statistics(self, results: pd.DataFrame) -> None:
        """Exibe estatísticas resumidas"""
        total_employees = int(results['Funcionários'].str.replace('.', '').sum())
        
        # Extrai valores numéricos para cálculos
        salary_cols = ['Mínimo (R$/mês)', 'Máximo (R$/mês)', 'Média (R$/mês)']
        numeric_values = {}
        
        for col in salary_cols:
            values = results[col].str.extract('([0-9,]+[.,]?[0-9]*)')
            numeric_values[col] = values[0].str.replace('.', '').str.replace(',', '.').astype(float)
        
        print(f"\n📈 ESTATÍSTICAS GERAIS:")
        print("-" * 40)
        print(f"👥 Total de funcionários: {total_employees:,}".replace(',', '.'))
        print(f"💰 Salário médio geral: R$ {numeric_values['Média (R$/mês)'].mean():,.2f}/mês")
        print(f"📈 Maior salário médio: {results.iloc[0]['Cargo']}")
        print(f"📉 Menor salário médio: {results.iloc[-1]['Cargo']}")
        print(f"🎯 Faixa salarial: R$ {numeric_values['Mínimo (R$/mês)'].min():,.2f} - R$ {numeric_values['Máximo (R$/mês)'].max():,.2f}/mês")

def run_analysis():
    try:
        logger.info("🚀 Iniciando análise salarial...")
        analyzer = SalaryAnalyzer()
        analyzer.generate_report()
        
        logger.info("✅ Análise concluída com sucesso!")
        
    except Exception as e:
        logger.error(f"💥 Falha na execução: {e}")
        raise

if __name__ == "__main__":
    run_analysis()