In [None]:
%pip install lxml
%pip install psycopg2-binary
%pip install -qU langchain-google-genai
%pip install python-dotenv
%pip install langchain-core
%pip install -qU langchain-openai
%pip install pandas
%pip install numpy
%pip install sqlalchemy
%pip install scipy
%pip install matplotlib
%pip install seaborn

In [None]:
import time
from lxml import etree
import psycopg2
import xml.etree.ElementTree as ET
import random
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import scipy.stats as stats
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
GEMINI_KEY = ""
OPEN_AI_KEY = ""

DB_NAME = "dumpstack"
DB_USER = "postgres"
DB_PASSWORD = "postgres"
DB_HOST = "localhost"
DB_PORT = "5432"

LLM_PROVIDER = "openai" # "google" or "openai"

XML_FILE = "dump\Posts.xml"

In [None]:
conn = psycopg2.connect(
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

cursor = conn.cursor()

engine = create_engine(f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

In [None]:
context = etree.iterparse(XML_FILE, events=('end',), tag='row')

In [None]:
if LLM_PROVIDER == "google":
    llm = ChatGoogleGenerativeAI(
        model="gemini-1.5-pro",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        google_api_key=GEMINI_KEY
    )
elif LLM_PROVIDER == "openai":
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
        api_key=OPEN_AI_KEY,
    )
else:
    raise ValueError("LLM_PROVIDER deve ser 'google' ou 'openai'.")

In [None]:
def process_post_question_element(elem):
    post_id = int(elem.get('Id'))
    creation_date = elem.get('CreationDate')
    parent_id = int(elem.get('ParentId')) if elem.get('ParentId') else None
    post_type_id = int(elem.get('PostTypeId'))
    accepted_answer_id = int(elem.get('AcceptedAnswerId')) if elem.get('AcceptedAnswerId') else None
    score = int(elem.get('Score'))
    body = elem.get('Body')
    title = elem.get('Title')
    tags = elem.get('Tags').strip('|').split('|') if elem.get('Tags') else []
    answer_count = int(elem.get('AnswerCount')) if elem.get('AnswerCount') else None
    
    if post_type_id != 1:
        return None
    if accepted_answer_id is None:
        return None
    if answer_count is None or answer_count <= 5:
        return None
    if 'java' not in tags:
        return None
    
    return (post_id, creation_date, parent_id, post_type_id, accepted_answer_id, score, body, title, tags, answer_count)

In [None]:
def insert_to_valid_posts_db(data, cursor, conn):
    if data:
        cursor.execute("SELECT 1 FROM posts WHERE post_id = %s", (data[0],))
        exists = cursor.fetchone()

        if exists:
            print(f"post_id {data[0]} já existe no banco de dados. Ignorando inserção.")
        else:
            cursor.execute(
                """
                INSERT INTO posts (post_id, creation_date, parent_id, post_type_id, accepted_answer_id, score, body, title, tags, answer_count)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                data
            )
            conn.commit()

In [None]:
for event, elem in context:
    result = process_post_question_element(elem)
    insert_to_valid_posts_db(result, cursor, conn)
    
    elem.clear()
    while elem.getprevious() is not None:
        del elem.getparent()[0]


### Obtendo amostras analisáveis (5% de cada quartil)

In [None]:
# Passo 1: Carregando a amostra em um DataFrame
query = "SELECT post_id, title, is_technical_question, body FROM posts"
df = pd.read_sql(query, engine)

total_elements = len(df)
print(f"Total de elementos no DataFrame original: {total_elements}")

# Passo 2: Dividindo a amostra em quatro partes
parts = np.array_split(df, 4)

for i, part in enumerate(parts, 1):
    print(f"Parte {i} - Número de elementos: {len(part)}")
    
# Passo 3: Selecionando aleatoriamente 5% de cada parte
sampled_parts = []
for i, part in enumerate(parts, 1):
    sampled_part = part.sample(frac=0.05)
    sampled_parts.append(sampled_part)
    print(f"Parte {i} - 5% amostrados: {len(sampled_part)} elementos")
    
# Passo 4: Combinando os resultados
final_sample = pd.concat(sampled_parts)

final_sample_elements = len(final_sample)
print(f"Total de elementos na amostra final: {final_sample_elements}")

In [None]:
check_technical_question_prompt = PromptTemplate.from_template(
"""
You will be given a question title and its detailed description.
Your task is to determine if the question is a technical question related to software development.

Question Title: {question_title}
Details: {question_body}

Please respond with 'True' if the question is related to software development or 'False' if it is not:
"""
)

In [None]:
for index, row in final_sample.iterrows():
    question_id = row['post_id']
    question_title = row['title']
    question_body = row['body']

    ai_msg = llm.invoke(check_technical_question_prompt.format(question_title=question_title, question_body=question_body))

    result = ai_msg.content.strip().lower()

    if result == 'true':
        final_sample.at[index, 'is_opinionated'] = True
    elif result == 'false':
        final_sample.at[index, 'is_opinionated'] = False
    else:
        print(f"Error checking question at index {index}: Unexpected response '{result}'")
        print(final_sample.illoc[index])
        final_sample.at[index, 'is_opinionated'] = None  # Opcional: Pode deixar como None ou outro valor indicando erro

    print(f"Checked at question id {question_id}: response '{result}'")
    time.sleep(5)

In [None]:
for i in range(len(final_sample)):
    post_id = final_sample.iloc[i]['post_id']
    is_opinionated = str(final_sample.iloc[i]['is_opinionated']).lower()

    print(f"Atualizado ({i}) id: {post_id} | is_technical_question: {is_opinionated} ")

    #SQL para atualizar a tabela
    update_sql = f"""
    UPDATE posts
    SET is_technical_question = {is_opinionated}
    WHERE post_id = {post_id};
    """

    cursor.execute(update_sql)
    conn.commit()

In [None]:
queryTechnicalSelectedQuestions = "SELECT post_id FROM posts WHERE is_technical_question = true"

# Execute the SQL query
cursor.execute(queryTechnicalSelectedQuestions)

# Fetch all rows from the executed query
rows = cursor.fetchall()

# Save the post_ids in an array
technicalSelectedQuestions = [row[0] for row in rows]

# Print the array of technicalSelectedQuestions
print(technicalSelectedQuestions)

In [None]:
# Função para processar um elemento
def process_post_answer_element(elem):
    try:
        post_type_id = int(elem.get('PostTypeId')) if elem.get('PostTypeId') else None
        if post_type_id != 2:
            return None

        post_id = int(elem.get('Id')) if elem.get('Id') else None
        parent_id = int(elem.get('ParentId')) if elem.get('ParentId') else None
        accepted_answer_id = int(elem.get('AcceptedAnswerId')) if elem.get('AcceptedAnswerId') else None
        score = int(elem.get('Score')) if elem.get('Score') else None
        body = elem.get('Body')
        title = elem.get('Title')
        tags = elem.get('Tags').strip('|').split('|') if elem.get('Tags') else []
        answer_count = int(elem.get('AnswerCount')) if elem.get('AnswerCount') else None

        if parent_id not in technicalSelectedQuestions:
            return None
        
        if parent_id is None:
            return None

        return (post_id, parent_id, post_type_id, accepted_answer_id, score, body, title, tags, answer_count)
    
    except Exception as e:
        print(f"Erro ao processar o elemento: {e}")
        return None

In [None]:
for event, elem in context:
    result = process_post_answer_element(elem)
    insert_to_valid_posts_db(result, cursor, conn)
    
    # Limpar o elemento para liberar memória
    elem.clear()
    while elem.getprevious() is not None:
        del elem.getparent()[0]

In [None]:
# Consulta SQL para verificar as condições
sql_check_conditions = """
WITH response_counts AS (
    SELECT 
        p.post_id, 
        p.answer_count, 
        COUNT(r.post_id) AS response_count,
        CASE 
            WHEN p.accepted_answer_id IS NULL THEN true
            WHEN EXISTS (SELECT 1 FROM posts WHERE post_id = p.accepted_answer_id AND post_type_id = 2) THEN true
            ELSE false
        END AS accepted_answer_exists
    FROM 
        posts p
    LEFT JOIN 
        posts r ON r.parent_id = p.post_id AND r.post_type_id = 2
    WHERE 
        p.post_type_id = 1 
        AND p.is_technical_question = true
    GROUP BY 
        p.post_id, p.answer_count, p.accepted_answer_id
)
SELECT 
    post_id, 
    answer_count, 
    response_count, 
    accepted_answer_exists
FROM 
    response_counts;
"""

# Executar a consulta SQL
cursor.execute(sql_check_conditions)

# Recuperar os resultados da consulta
posts_data = cursor.fetchall()

# Arrays para armazenar os IDs que satisfazem as condições e os que não satisfazem
valid_post_ids = []
invalid_post_results = []

# Processar os resultados da consulta
for post in posts_data:
    post_id, answer_count, response_count, accepted_answer_exists = post
    if response_count == answer_count and accepted_answer_exists:
        valid_post_ids.append(post_id)
    else:
        reason = []
        if response_count != answer_count:
            reason.append(f"response_count ({response_count}) != answer_count ({answer_count})")
        if not accepted_answer_exists:
            reason.append("accepted_answer_id não encontrado ou inválido")
        invalid_post_results.append((post_id, reason))

# Exibir os IDs das postagens que são válidas
print("IDs válidos:", valid_post_ids)
print("\nTotal de IDs válidos:", len(valid_post_ids))

# Exibir os elementos que não satisfizeram as condições
print("\nElementos inválidos:")
for invalid_post in invalid_post_results:
    print(f"post_id: {invalid_post[0]} - Motivo: {', '.join(invalid_post[1])}")

In [None]:
def fetch_question_and_answers(post_id, conn):
    """
    Recupera o título da pergunta, o corpo da pergunta, a resposta aceita, 
    e quatro outras respostas com maior score para o post_id fornecido.
    Organiza as respostas de forma aleatória.
    """

    # Query para obter a pergunta e a resposta aceita
    sql_fetch_question = """
    SELECT 
        p.title AS question_title, 
        p.body AS question_body, 
        a.body AS accepted_answer, 
        a.post_id AS accepted_answer_id
    FROM 
        posts p
    JOIN 
        posts a ON p.accepted_answer_id = a.post_id
    WHERE 
        p.post_id = %s;
    """

    cursor.execute(sql_fetch_question, (post_id,))
    question_data = cursor.fetchone()

    if not question_data:
        return None
    
    question_title, question_body, accepted_answer, accepted_answer_id = question_data

    # Query para obter as quatro respostas com maior score
    sql_fetch_top_answers = """
    SELECT 
        post_id, 
        body 
    FROM 
        posts 
    WHERE 
        parent_id = %s AND post_type_id = 2 AND post_id != %s
    ORDER BY 
        score DESC 
    LIMIT 4;
    """

    cursor.execute(sql_fetch_top_answers, (post_id, accepted_answer_id))
    top_answers = cursor.fetchall()

    # Adicionar a resposta aceita na lista e embaralhar
    all_answers = [(accepted_answer_id, accepted_answer)] + top_answers
    random.shuffle(all_answers)

    return {
        'question_title': question_title,
        'question_body': question_body,
        'answers': all_answers,
    }

In [None]:
def insert_api_call_data(conn, post_id, answers, chatgpt_response):
    """
    Insere os dados da chamada à API na tabela api_calls.
    """

    # Comando SQL para inserção
    insert_query = """
        INSERT INTO api_calls (post_id, answer_id_1, answer_id_2, answer_id_3, answer_id_4, answer_id_5, chatgpt_response)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """

    # Extrai os IDs das respostas
    answer_ids = [answer_id for answer_id, _ in answers]

    # Dados para inserção
    values = (
        post_id,
        answer_ids[0],
        answer_ids[1],
        answer_ids[2],
        answer_ids[3],
        answer_ids[4],
        chatgpt_response
    )

    # Inserção dos dados
    cursor.execute(insert_query, values)
    conn.commit()

    print(f"inserido questão postid: => {post_id}")

In [None]:
question_answer_prompt_template = PromptTemplate.from_template(
"""
You will be given a question title, a question, and five potential answers.
Your task is to select the most correct answer from the options provided.

Question Title: {question_title}
Question: {question_body}

Potential Answers:
{answers_text}

Please provide only the ID of the most correct answer, nothing more:
"""
)

In [None]:
def generate_prompt(question_data):
    """
    Gera o prompt preenchido com os dados da pergunta e respostas.
    """
    answers_text = ""
    for i, (answer_id, answer_body) in enumerate(question_data['answers'], start=1):
        answers_text += f"{i}. {answer_body} (ID: {answer_id})\n"
    
    prompt = question_answer_prompt_template.format(
        question_title=question_data['question_title'],
        question_body=question_data['question_body'],
        answers_text=answers_text
    )

    return prompt


In [None]:
def post_id_exists(conn, post_id):
    """
    Verifica se o post_id já existe na tabela api_calls.
    Retorna True se existir, False caso contrário.
    """
    cursor = conn.cursor()
    check_query = "SELECT COUNT(*) FROM api_calls WHERE post_id = %s"
    cursor.execute(check_query, (post_id,))
    result = cursor.fetchone()

    return result[0] > 0

In [None]:
for post_id in valid_post_ids:
    question_data = fetch_question_and_answers(post_id, conn)

    if question_data:
        if post_id_exists(conn, post_id):
            print(f"O post_id {post_id} já está inserido. Pulando para o próximo.")
            continue

        prompt = generate_prompt(question_data)
        ai_msg = llm.invoke(prompt)

        result = ai_msg.content.strip().lower()
        time.sleep(1)
        # Inserção dos dados na tabela após a obtenção do resultado do ChatGPT
        insert_api_call_data(
            conn=conn,
            post_id=post_id,
            answers=question_data['answers'],
            chatgpt_response=result
        )

In [None]:
analysis_dataset =  """
    SELECT 
        api_calls.post_id,
        posts.creation_date,
        posts.score,
        posts.accepted_answer_id,
        api_calls.answer_id_1,
        p1.score AS score_answer_1,
        api_calls.answer_id_2,
        p2.score AS score_answer_2,
        api_calls.answer_id_3,
        p3.score AS score_answer_3,
        api_calls.answer_id_4,
        p4.score AS score_answer_4,
        api_calls.answer_id_5,
        p5.score AS score_answer_5,
        api_calls.chatgpt_response,
        api_calls.timestamp
    FROM 
        api_calls
    LEFT JOIN posts ON api_calls.post_id = posts.post_id
    LEFT JOIN posts p1 ON api_calls.answer_id_1 = p1.post_id
    LEFT JOIN posts p2 ON api_calls.answer_id_2 = p2.post_id
    LEFT JOIN posts p3 ON api_calls.answer_id_3 = p3.post_id
    LEFT JOIN posts p4 ON api_calls.answer_id_4 = p4.post_id
    LEFT JOIN posts p5 ON api_calls.answer_id_5 = p5.post_id;
"""

In [None]:
cursor.execute(analysis_dataset)

rows = cursor.fetchall()

columns = [
    "post_id", "creation_date", "score", "accepted_answer_id", "answer_id_1", "score_answer_1", 
    "answer_id_2", "score_answer_2", "answer_id_3", "score_answer_3", 
    "answer_id_4", "score_answer_4", "answer_id_5", "score_answer_5", 
    "chatgpt_response", "timestamp"
]

df = pd.read_sql(analysis_dataset, engine)

In [None]:
df['chatgpt_response_integers_only'] = df['chatgpt_response'].str.findall(r'\d+').apply(lambda x: ' '.join(x))

In [None]:
import re

# Função para verificar se algum dos IDs está presente nas colunas de respostas
def check_if_id_in_answers(row):
    # Extrair todos os números da coluna 'chatgpt_response'
    chatgpt_response_numbers = re.findall(r'\d+', row['chatgpt_response'])
    
    # Conjunto com os IDs das respostas
    answer_ids = {str(row['answer_id_1']), str(row['answer_id_2']), str(row['answer_id_3']),
                  str(row['answer_id_4']), str(row['answer_id_5'])}
    
    # Verificar se algum dos números extraídos está nos IDs das respostas
    return any(num in answer_ids for num in chatgpt_response_numbers)

# Criar nova coluna 'id_in_answers' para armazenar o resultado da verificação
df['check_llm_answer_exists_in_answers'] = df.apply(check_if_id_in_answers, axis=1)

In [None]:
# Criar a nova coluna 'chatgpt_response_numeric' extraindo apenas números da coluna 'chatgpt_response'
df['chatgpt_response_numeric'] = df['chatgpt_response'].str.extract(r'(\d+)')

# Converter para inteiro
df['chatgpt_response_numeric'] = df['chatgpt_response_numeric'].astype(int)


In [None]:
# Criar a nova coluna 'is_llm_answer_correct' comparando 'chatgpt_response_numeric' com 'accepted_answer_id'
df['rq1_is_llm_answer_correct'] = np.where(df['chatgpt_response_numeric'] == df['accepted_answer_id'], 1, 0)

In [None]:
# Criar uma nova coluna 'max_score_answer' que indica qual coluna tem o maior score
def find_max_score_answer(row):
    # Listar os pares (answer_id, score)
    scores = {
        'answer_id_1': row['score_answer_1'],
        'answer_id_2': row['score_answer_2'],
        'answer_id_3': row['score_answer_3'],
        'answer_id_4': row['score_answer_4'],
        'answer_id_5': row['score_answer_5']
    }
    
    # Encontrar o answer_id com o maior score
    max_score_answer_id = max(scores, key=scores.get)
    return max_score_answer_id

# Aplicar a função para cada linha do DataFrame
df['check_max_score_answer'] = df.apply(find_max_score_answer, axis=1)

# Se desejar adicionar a coluna com o answer_id correspondente
def find_max_score_answer_id(row):
    answer_ids = {
        'answer_id_1': row['answer_id_1'],
        'answer_id_2': row['answer_id_2'],
        'answer_id_3': row['answer_id_3'],
        'answer_id_4': row['answer_id_4'],
        'answer_id_5': row['answer_id_5']
    }
    
    max_score_answer = find_max_score_answer(row)
    return answer_ids[max_score_answer]

# Adicionar a coluna 'max_score_answer_id' com o answer_id correspondente
df['max_score_answer_id'] = df.apply(find_max_score_answer_id, axis=1)

In [None]:
# Verifica se max_score_answer_id não é igual a accepted_answer_id
df['is_max_score_answer_equal_accepted_answer'] = np.where(df['max_score_answer_id'] == df['accepted_answer_id'], 0, 1)

In [None]:
df['chatgpt_response_numeric_with_integer_responses'] = df.apply(
    lambda row: row[f'answer_id_{int(row["chatgpt_response_numeric"])}']
    if 1 <= row["chatgpt_response_numeric"] <= 5 else row['chatgpt_response_numeric'],
    axis=1
)

# Criar a nova coluna 'is_llm_answer_correct' comparando 'chatgpt_response_numeric' com 'accepted_answer_id'
df['is_llm_answer_correct_with_integer_responses'] = np.where(df['chatgpt_response_numeric_with_integer_responses'] == df['accepted_answer_id'], 1, 0)

In [None]:
# Verifica se a opção escolhida é a que possui maior score e não é a resposta aceita pelo autor
df['selected_max_score_and_not_selected_accepted_answer'] = np.where(
    (df['chatgpt_response_numeric_with_integer_responses'] == df['max_score_answer_id']) & 
    (df['chatgpt_response_numeric_with_integer_responses'] != df['accepted_answer_id']),
    1,
    0
)

df['selected_max_score'] = np.where((df['chatgpt_response_numeric_with_integer_responses'] == df['max_score_answer_id']), 1, 0)

In [None]:
print(df.to_string())

In [None]:
# Verificar se o LLM selecionou a resposta correta
concise_accuracy = (df['chatgpt_response_numeric'] == df['accepted_answer_id']).mean()
generic_accuracy = (df['chatgpt_response_numeric_with_integer_responses'] == df['accepted_answer_id']).mean()

count_selected_max_score_and_not_selected_accepted_answer = df['selected_max_score_and_not_selected_accepted_answer'].sum()
count_selected_max_score = df['selected_max_score'].sum()

# Calcular a acurácia
# accuracy = df['is_correct'].mean()

print(f"Acurácia utilizando somente respostas concisas: {concise_accuracy:.2%}")
print(f"Acurácia utilizando também respostas com numerais: {generic_accuracy:.2%}")
print(f"Quantidade de vezes que a resposta com maior score foi selecionada e a resposta aceita não foi: {count_selected_max_score_and_not_selected_accepted_answer}")
print(f"Quantidade de vezes que a resposta com maior score foi selecionada: {count_selected_max_score}")

In [None]:
# Exportar o DataFrame para um arquivo CSV separado por ponto e vírgula
df.to_csv('exports/analysis.csv', sep=';', index=False)

In [None]:
cursor.close()
conn.close()