In [None]:
import sqlite3
import pandas as pd
import logging
import os
from contextlib import contextmanager

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S', 
    handlers=[
        logging.FileHandler("score_rh.log"),  # Salva logs em um arquivo
        logging.StreamHandler()  # Exibe logs no console
    ]
)

DB_PATH = os.getenv("DB_PATH", "/app/data/teste___desafio_técnico_-_analista_de_dados_pleno_anexo.db")
SQL_SCRIPT_CANDIDATO_PATH = os.getenv("SQL_SCRIPT_CANDIDATO_PATH", "/app/scripts/canditados_pre_selecionados.sql")
SQL_SCRIPT_TRATAMENTO_PATH = os.getenv("SQL_SCRIPT_TRATAMENTO_PATH", "/app/scripts/tratamento_db.sql")
OUTPUT_CSV = os.getenv("OUTPUT_CSV", "/app/output/")

# Conexao DB

In [None]:
@contextmanager
def conectar_banco():
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        logging.info("Conexão ao banco de dados estabelecida")
        yield conn
    except Exception as e:
        logging.error(f"Erro ao conectar ao banco de dados: {e}")
        raise
    finally:
        if conn:
            conn.close()  

# DATA PREP

In [None]:
def listar_tabelas():
    """Lista todas as tabelas do banco SQLite."""
    tabelas = []
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tabelas = cursor.fetchall()
        return [t[0] for t in tabelas]
    except Exception as e:
        logging.error(f"Erro ao listar tabelas: {e}")
        return []

def visualizar_tabela(nome_tabela, haslimit=True, limite=5):
    try:
        with conectar_banco() as conn:
            if haslimit:
                query = f"SELECT * FROM {nome_tabela} LIMIT {limite};"
            else:
                query = f"SELECT * FROM {nome_tabela};"
            df = pd.read_sql_query(query, conn)
        return df
    except Exception as e:
        logging.error(f"Erro ao visualizar tabela {nome_tabela}: {e}")
        return pd.DataFrame()

def verificar_nulos():
    """Identifica colunas com valores nulos em todas as tabelas."""
    tabelas = listar_tabelas()
    for tabela in tabelas:
        df = visualizar_tabela(tabela, haslimit=False)
        
        null_counts = df.isnull().sum()
        # Filtra apenas as colunas com valores nulos
        colunas_com_nulos = null_counts[null_counts > 0]
        
        if not colunas_com_nulos.empty:
            logging.warning(f"Na tabela {tabela}, as seguintes colunas têm valores nulos:")
            for coluna, quantidade in colunas_com_nulos.items():
                logging.warning(f"  - {coluna}: {quantidade} valores nulos")
        else:
            logging.info(f"Na tabela {tabela}, não há valores nulos.")

def tratamentos_colunas_db(): 
    try:
        # Conectar ao banco de dados
        with conectar_banco() as conn:
            cursor = conn.cursor()

            # Ler o script SQL
            with open(SQL_SCRIPT_TRATAMENTO_PATH , "r", encoding="utf-8") as f:
                sql_script = f.read()

            # Verificar se a coluna existe antes de executar o ALTER TABLE
            cursor.execute("PRAGMA table_info(vagacompetencia);")
            columns = [column[1] for column in cursor.fetchall()]

            if 'tempo_de_experiencia_meses' not in columns:
                cursor.execute("ALTER TABLE vagacompetencia ADD COLUMN tempo_de_experiencia_meses INT;")
                
            cursor.execute("PRAGMA table_info(competencia_experiencia);")
            columns = [column[1] for column in cursor.fetchall()]

            if 'tempo_competencia_meses' not in columns:
                cursor.execute("ALTER TABLE competencia_experiencia ADD COLUMN tempo_competencia_meses INTEGER;")

            # Executar o script de atualização de dados
            cursor.executescript(sql_script)

            conn.commit()
            logging.info(f"Script de tratamento processado com sucesso")

    except Exception as e:
        logging.error(f"Erro ao executar o script SQL: {e}")




In [None]:
verificar_nulos()
tratamentos_colunas_db()

2025-02-14 12:45:07 - INFO - Na tabela competencias, não há valores nulos.
2025-02-14 12:45:07 - INFO - Na tabela vagas, não há valores nulos.
2025-02-14 12:45:07 - INFO - Na tabela candidatos, não há valores nulos.
2025-02-14 12:45:07 - INFO - Na tabela vagacompetencia, não há valores nulos.
2025-02-14 12:45:07 - INFO - Na tabela candidato_vaga, não há valores nulos.
2025-02-14 12:45:07 - INFO - Na tabela experiencias, não há valores nulos.
2025-02-14 12:45:08 - INFO - Na tabela competencia_experiencia, não há valores nulos.


# Calcular SCORE

In [None]:
def criar_csv(OUTPUT_CSV, nome_arquivo, df:pd.DataFrame): 
    try:
        path = OUTPUT_CSV + nome_arquivo
        df.to_csv(path, index=False, encoding="utf-8")
        logging.info(f"Arquivo salvo com sucesso em: {path}")
    except Exception as e:
        logging.error(f"Erro ao salvar o arquivo: {e}")

def executar_sqlite_query_candidatos():
    try:
        with conectar_banco() as conn:
            cursor = conn.cursor()
            with open(SQL_SCRIPT_CANDIDATO_PATH, "r", encoding="utf-8") as f:
                sql_query = f.read()

            cursor.execute("PRAGMA foreign_keys = ON;") 
            df = pd.read_sql_query(sql_query, conn) 
            df = df.sort_values("candidato_id")

        criar_csv(OUTPUT_CSV, "candidatos_pre_selecionados.csv", df)
        return df

    except Exception as e:
        logging.error(f"Erro ao executar a consulta: {e}")
        return pd.DataFrame()

def calcular_score_salario(situacao_candidato, vagas_info: pd.DataFrame, candidato_vaga_info: pd.DataFrame, peso):
    vaga_id = situacao_candidato['vaga_id']
    candidato_id = situacao_candidato['candidato_id']

    vaga = vagas_info.loc[vagas_info['id'] == vaga_id]
    candidato = candidato_vaga_info.loc[(candidato_vaga_info['id_candidato'] == candidato_id) & (candidato_vaga_info['id_vaga'] == vaga_id)]

    if vaga.empty or candidato.empty:
        logging.warning(f'Candidato: {candidato_id} ou Vaga: {vaga_id} não encontrado no banco de dados')
        return 0

    salario_min = vaga['salario_minimo'].values[0]
    salario_max = vaga['salario_maximo'].values[0]
    pretensao_salarial = candidato['pretensao_salarial'].values[0]

    if pretensao_salarial < salario_min:
        distancia = salario_min - pretensao_salarial
    elif pretensao_salarial > salario_max:
        distancia = pretensao_salarial - salario_max
    else:
        distancia = min(abs(pretensao_salarial - salario_min), abs(salario_max - pretensao_salarial))
    
    score = peso * (1 - distancia / (salario_max - salario_min))
    
    return round(max(0, min(score, peso)), 2)  

def calcular_score_competencia(situacao_candidato, vagacompetencias_info: pd.DataFrame, peso):

    vaga_id = situacao_candidato['vaga_id']
    soma_competencia_candidato = situacao_candidato['soma_competencias_atendidas']

    competencias_vaga = vagacompetencias_info[vagacompetencias_info['id_vaga'] == vaga_id]

    if competencias_vaga.empty:
        logging.warning(f'Vaga: {vaga_id} não encontrada no banco de dados')
        return 0

    total_competencias_vaga = competencias_vaga['id_competencia'].nunique()

    if total_competencias_vaga == 0:
        return 0

    peso_por_competencia = peso / total_competencias_vaga  
    score = soma_competencia_candidato * peso_por_competencia

    
    return round(max(0, min(score, peso)), 2)

def calcular_scores(df_min_aprovados: pd.DataFrame,df_min_inconsistencias: pd.DataFrame, vagas_info, candidato_vaga_info, vagacompetencias_info):
    peso = {
        'localizacao' : 20, 
        'salario': 20, 
        'fitcultural': 35, 
        'competencia': 25
    } 
    scores = []

    for _, row in df_min_inconsistencias.iterrows():
        scores.append({
            'vaga_id': row['vaga_id'], 
            'vaga_titulo': row['vaga_titulo'],
            'candidato_id': row['candidato_id'], 
            'candidato_nome': row['candidato_nome'],
            'score_localizacao': 0, 
            'score_fitcultural': 0, 
            'score_salario': 0, 
            'score_competencia': 0, 
            'score': 1
        })
    

    for _, row in df_min_aprovados.iterrows():
        score_loc = peso.get('localizacao', 0) if row['atende_localizacao'] == 1 else 0
        score_fit = peso.get('fitcultural', 0) if row['atende_fit_cultural'] == 1 else 0
        score_salario = calcular_score_salario(row, vagas_info, candidato_vaga_info, peso.get('salario', 0)) if row['atende_salario'] == 1 else 0
        score_competencia = calcular_score_competencia(row, vagacompetencias_info, peso.get('competencia', 0)) if row['soma_competencias_atendidas'] >= 1 else 0
        
        score_total = score_loc + score_fit + score_salario + score_competencia

        scores.append({
            'vaga_id': row['vaga_id'], 
            'vaga_titulo': row['vaga_titulo'],
            'candidato_id': row['candidato_id'], 
            'candidato_nome': row['candidato_nome'],
            'score_localizacao': score_loc, 
            'score_fitcultural': score_fit, 
            'score_salario': score_salario, 
            'score_competencia': score_competencia, 
            'score': score_total
        })

    df_scores = pd.DataFrame(scores) 
    df_scores = df_scores.sort_values(['vaga_id','score'])

    return df_scores

def obter_candidatos_incoerentes(df_min_candidatos: pd.DataFrame, df_scores: pd.DataFrame, vagas_info):
    grupos_coerentes = {
        "Financeira": {"Financeira", "Contabilidade"},
        "Engenharia": {"Engenharia", "Dados"},
        "RH": {"RH"},
        "Comercial": {"Comercial"},
    }
    
    df_candidatos_com_mais_vagas = df_min_candidatos.groupby('candidato_id').size().reset_index(name='quantidade_vagas')
    df_candidatos_com_mais_vagas = df_candidatos_com_mais_vagas[df_candidatos_com_mais_vagas['quantidade_vagas'] > 1]
    df_candidatos_com_mais_vagas = df_min_candidatos.merge(df_candidatos_com_mais_vagas, on='candidato_id', how='right') 

    candidatos_coerencia = []

    for candidato_id, grupo in df_candidatos_com_mais_vagas.groupby('candidato_id'):
        vagas_candidato = grupo['vaga_id'].unique()
        departamentos = set()

        for vaga_id in vagas_candidato:
            departamento = vagas_info.loc[vagas_info['id'] == vaga_id, 'departamento'].values
            if len(departamento) > 0:
                departamentos.add(departamento[0]) 
    
        grupos_associados = set()
        for area in departamentos:
            for grupo, areas_relacionadas in grupos_coerentes.items():
                if area in areas_relacionadas:
                    grupos_associados.add(grupo)
        
        coerencia = "Incoerente" if len(grupos_associados) > 1 else "Coerente"

        candidatos_coerencia.append({"candidato_id": candidato_id, "coerencia_vagas": coerencia}) 

    df_candidatos_coerencia = pd.DataFrame(candidatos_coerencia)
    df_scores = df_scores.merge(df_candidatos_coerencia, how='inner', on='candidato_id')
    df_scores['coerencia_vagas'] = df_scores['coerencia_vagas'].fillna("Coerente")

    return df_scores

def chamada_de_tabelas(df_min_aprovados: pd.DataFrame):

    if df_min_aprovados.empty:
        logging.warning('Tabela não possui candidatos preselecionados minimamente aprovados')
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
    
    candidatos_ids = list(set(df_min_aprovados['candidato_id'].tolist()))
    vagas_ids = list(set(df_min_aprovados['vaga_id'].tolist())) 

    candidatos_ids_str = ",".join(map(str, candidatos_ids))
    vagas_ids_str = ",".join(map(str, vagas_ids))

    with conectar_banco() as conn:
        query_vagas = f"""
        SELECT * FROM vagas 
        WHERE id IN ({vagas_ids_str})
        """
        vagas_info = pd.read_sql_query(query_vagas, conn)

        query_candidato_vaga = f"""
        SELECT * FROM candidato_vaga 
        WHERE id_candidato IN ({candidatos_ids_str}) AND id_vaga IN ({vagas_ids_str})
        """
        candidato_vaga_info = pd.read_sql_query(query_candidato_vaga, conn)

        query_vaga_competencias = f"""
        SELECT * FROM vagacompetencia 
        WHERE id_vaga IN ({vagas_ids_str})
        """
        vagacompetencias_info = pd.read_sql_query(query_vaga_competencias, conn)    

    return vagas_info, candidato_vaga_info, vagacompetencias_info

def obter_especialistas(df_min_aprovados: pd.DataFrame):

    df_min_aprovados = df_min_aprovados[df_min_aprovados['soma_competencias_atendidas'] > 0]
    candidatos_ids = list(set(df_min_aprovados['candidato_id'].tolist()))
    vagas_ids = list(set(df_min_aprovados['vaga_id'].tolist()))
    areas_tecnicas = ['Engenharia', 'Dados']

    ids_candidatos_str = ', '.join(map(str, candidatos_ids))
    ids_vagas_str = ', '.join(map(str, vagas_ids))
    areas_tecnicas_str = ', '.join(f"'{area}'" for area in areas_tecnicas)
    

    query = f"""
    SELECT
        c.id AS candidato_id,
        c.nome AS candidato_nome,
        comp.id AS competencia_id,
        comp.nome AS competencia_nome,
        comp.area AS competencia_area,
        ce.tempo_competencia_meses,
        CASE
            WHEN ce.tempo_competencia_meses >= 60 THEN 'Especialista'
            ELSE 'Não Especialista'
        END AS nivel_competencia,
        v.id AS vaga_id,
        v.nome AS vaga_titulo
    FROM
        candidatos c
    JOIN competencia_experiencia ce ON c.id = ce.id_experiencia
    JOIN competencias comp ON ce.id_competencia = comp.id
    JOIN vagacompetencia vc ON comp.id = vc.id_competencia
    JOIN vagas v ON vc.id_vaga = v.id
    WHERE
        comp.tipo = 'Conhecimento'
        AND comp.area IN ({areas_tecnicas_str})
        AND ce.tempo_competencia_meses >= 60
        AND c.id IN ({ids_candidatos_str})
        AND v.id IN ({ids_vagas_str});
    """
    with conectar_banco() as conn: 
        df_resultado = pd.read_sql_query(query, conn)
    
    df_resultado = df_resultado.sort_values(['vaga_id', 'competencia_id', 'tempo_competencia_meses'])
    
    return df_resultado

In [88]:
def main(): 
    logging.info('Iniciando processo obter candidatos minimamente avaliados')
    df_min_candidatos = executar_sqlite_query_candidatos()

    df_min_inconsistencias = df_min_candidatos[df_min_candidatos['status'] == 'Inconsistência']
    df_min_aprovados = df_min_candidatos[df_min_candidatos['status'] == 'Aprovado'] 

    vagas_info, candidato_vaga_info, vagacompetencias_info = chamada_de_tabelas(df_min_aprovados)
    
    logging.info('Iniciando processo de calculo de scores')
    df_scores = calcular_scores(df_min_aprovados, df_min_inconsistencias, vagas_info, candidato_vaga_info, vagacompetencias_info)
    
    df_scores = obter_candidatos_incoerentes(df_min_candidatos, df_scores, vagas_info)
    df = obter_especialistas(df_min_aprovados)
    criar_csv(OUTPUT_CSV, "especialistas_areas_tecnicas.csv", df)
    criar_csv(OUTPUT_CSV,"candidatos_scores.csv", df_scores)
    logging.info('Finalizado processos de calculo de scores')
    

In [89]:
main()

2025-02-14 16:10:46 - INFO - Iniciando processo obter candidatos minimamente avaliados
2025-02-14 16:10:46 - INFO - Conexão ao banco de dados estabelecida
2025-02-14 16:10:50 - INFO - Arquivo salvo com sucesso em: /app/output/candidatos_pre_selecionados.csv
2025-02-14 16:10:50 - INFO - Conexão ao banco de dados estabelecida
2025-02-14 16:10:50 - INFO - Iniciando processo de calculo de scores
2025-02-14 16:10:51 - INFO - Conexão ao banco de dados estabelecida
2025-02-14 16:10:51 - INFO - Arquivo salvo com sucesso em: /app/output/especialistas_areas_tecnicas.csv
2025-02-14 16:10:51 - INFO - Arquivo salvo com sucesso em: /app/output/candidatos_scores.csv
2025-02-14 16:10:51 - INFO - Finalizado processos de calculo de scores
