<a href="https://colab.research.google.com/github/fabiocastilhoss/StepAnalyzerPAT2Math/blob/main/v2_step_analyser_pat2math.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Step Analyser PAT2Math - Versão 2

*   Versão 2
*   Redes Neurais
*   ~ 80 mil instâncias

Instalação de pacotes e importações

In [None]:
pip install accelerate -U

In [None]:
import tensorflow as tf #Manipulação de expressões matemáticas sobre tensores numéricos
from tensorflow import keras #Biblioteca para Deep Learning
from keras import layers #Estruturas de dados para Deep Learning
from keras.preprocessing.sequence import pad_sequences #Pacote para preenchimento de sequências
from keras import activations, optimizers, losses #Módulos para ativação, otimizadores e funções de loss
from keras.callbacks import ModelCheckpoint #pacote para salvar o melhor modelo
from keras.preprocessing.text import Tokenizer #Tokenizador
from keras.models import Sequential #Classe para criação de redes neurais
from keras.layers import Embedding, GRU, Dense, Dropout #Pacote para criação de redes neurais
from keras.layers import Input, concatenate #Pacote para criação de redes neurais
from keras.models import Model #Pacote para criação de redes neurais
import matplotlib.pyplot as plt #Biblioteca para visualização de dados
import seaborn as sns #Biblioteca para visualização de dados
import numpy as np #Operações em arrays multidimensionais
import pandas as pd #Biblioteca para análise e manipulação de dados
import random #Geração de números aleatórios
import string #Operações em Strings
import re #Operações em expressões regulares
from sklearn.model_selection import train_test_split #Divisão de conjuntos
from sklearn.metrics import confusion_matrix #Matriz de confusão
from sklearn.metrics import f1_score, precision_score, recall_score #Importa F1, Precision e Recall
from sklearn.metrics import roc_curve, auc #Importa curva ROC
from sklearn.utils import shuffle #Embaralhamento
from sklearn.metrics import accuracy_score #Métricas
from google.colab import drive #Conexão do Google Colab ao Google Drive

Leitura e filtragem do log das interações dos estudantes no PAT2Math.

In [None]:
#Conexão com o Google Drive
drive.mount('/content/drive')

In [None]:
#Leitura do log de interações do PAT2Math, armazenado no Google Drive
log = pd.read_csv("/content/drive/MyDrive/colab/fabio-mestrado/log_with_current_step_pat2math.csv", sep=",", encoding="latin-1")
log.head()

In [None]:
#Filtragem do log para quatro colunas
log = log[["initial_equation", "last_correct_step", "currentStep", "step_is_correct"]]
log.head()

In [None]:
# Verifica a quatidade de dados faltantes em cada coluna
dados_faltantes = log.isna().sum()
print(dados_faltantes)

# Junção de colunas:
Para ampliação do conjunto de dados, quando a coluna last_correct_step é diferente de initial_equation, ela é usada como equação inicial, ou seja, uma nova instância é criada, contendo os dados de last_correct_step na coluna initial_equation, mantendo os dados de currentStep e step_is_correct, nas suas colunas equivalentes.

No final do procedimento, a coluna last_correct_step é excluída.

In [None]:
#Amplia o número de instâncias, inserindo last_correct_step como initial_equation, quando contiverem valores diferentes.

# Criar uma lista para armazenar os novos dados
new_rows = []

# Iterar sobre cada linha do DataFrame
for index, row in log.iterrows():
    if row['initial_equation'] != row['last_correct_step']:
        new_row = row.copy()  # Copiar a linha existente
        new_row['initial_equation'] = row['last_correct_step']  # Atualizar 'initial_equation'
        new_rows.append(new_row)  # Adicionar a nova linha à lista

# Adicionar as novas linhas ao DataFrame
log = log.append(new_rows, ignore_index=True)

In [None]:
#Filtra o log resultante para apenas três colunas
log = log[["initial_equation", "currentStep", "step_is_correct"]]
log

A junção de colunas resulta em 251.777 instâncias.

# Tratamento de Dados:

*   Retirada de espaços em branco
*   Retiradas de duplicatas
*   Retirada de dados faltantes
*   Verificação de caracteres inválidos
*   Padronização da equação
*   Checagem e correção dos rótulos

## Retira espaços em branco

Retira espaços em branco nas colinas initial_equation e currentStep.

In [None]:
#Exclui espaços em branco
log["initial_equation"] = log["initial_equation"].replace(" ", "")
log["currentStep"] = log["currentStep"].replace(" ", "")

## Retira duplicatas

Exclui instâncias que contém dados duplicados, usando como chave as duas colunas: initial_equation e currentStep. Só exclui quando ambas contiverem valores iguais, reduzindo o número de instâncias para 34.769.

In [None]:
#Exclui instâncias duplicadas
log = log.drop_duplicates(subset=['initial_equation', 'currentStep'])
log.head(), log.shape

## Retira linhas com valores faltantes

Exclui as instâncias que contém valores faltantes, reduzindo o número de instâncias para 30.279.

In [None]:
#Exclui as instâncias que contém valores faltantes
log = log.dropna()
log.head(), log.shape

Verifica o número de rótulos corretos e incorretos

In [None]:
#Conta número de instâncias com label incorreto e correto
counts = log["step_is_correct"].value_counts()
counts

In [None]:
# Crie um gráfico de barras
plt.bar(counts.index, counts.values, color=['red', 'blue'], edgecolor='k')

# Adicione rótulos e título
plt.xlabel("Status")
plt.ylabel("Instâncias")
plt.title("Contagem de Instâncias com Rótulo Incorreto e Correto")

# Personalize os rótulos no eixo x (opcional)
plt.xticks(counts.index, ['Incorreto', 'Correto'])

# Mostre o gráfico
plt.show()

# Pré-Processamento

*    Balanceamento de Dados - Data Augmentation

## Balanceamento de Dados

O balanceamento de dados é realizado através da função data augmentation.

*    A função de data augmentation (augment_equation) recebe uma equação da coluna initial equation e o indicador correct (para gerar uma equação correta ou incorreta). Essa função, quando correct é informado como 1, insere operações aleatórias de adição, subtração, multiplicação e divisão dos dois lados da equação. No final, também aleatoriamente, pode inverter os lados da equação. Se correct é 0, a inserção é realizada em apenas um lado da equação, tornando-a incorreta.

In [None]:
# Função de data augmentation para alterar initial equation, incluindo instâncias corretas ou incorretas
def augment_equation(equation, correct):

    # Separa os lados da equação e retira os espaços em branco
    left, right = equation.split('=')
    left = left.strip()
    right = right.strip()

    # Seleciona aleatoriamente uma ou duas operações
    choice = random.choice(["add", "multiply", "div", "add&multiply", "add&div"])

    # Gera constantes aleatórias entre -100 e 100 (inclusive)
    constant_to_add = random.randint(-100, 100)
    constant_to_multiply = random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10])
    constant_to_div = random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10])

    # Se correct == 1, adiciona as operações a ambos os lados
    if correct:
      # Adição
      if choice in ["add", "add&multiply", "add&div"]:
        if constant_to_add >= 0:
          left = f"({left}) + {constant_to_add}"
          right = f"({right}) + {constant_to_add}"
        else:
          left = f"({left}) + ({constant_to_add})"
          right = f"({right}) + ({constant_to_add})"

      # Multiplicação
      if choice in ["multiply", "add&multiply"]:
        if constant_to_multiply >= 0:
          left = f"({left}) * {constant_to_multiply}"
          right = f"({right}) * {constant_to_multiply}"
        else:
          left = f"({left}) * ({constant_to_multiply})"
          right = f"({right}) * ({constant_to_multiply})"

      # Divisão
      if choice in ["div", "add&div"]:
        if constant_to_div >= 0:
          left = f"({left}) / {constant_to_div}"
          right = f"({right}) / {constant_to_div}"
        else:
          left = f"({left}) / ({constant_to_div})"
          right = f"({right}) / ({constant_to_div})"

    # Se correct == 0, adiciona as operações apenas no lado esquerdo
    else:

      # Adição
      if choice in ["add", "add&multiply", "add&div"]:
        if constant_to_add >= 0:
          left = f"({left}) + {constant_to_add}"
        else:
          left = f"({left}) + ({constant_to_add})"

      # Multiplicação
      if choice in ["multiply", "add&multiply"]:
        if constant_to_multiply >= 0:
          left = f"({left}) * {constant_to_multiply}"
        else:
          left = f"({left}) * ({constant_to_multiply})"

      # Divisão
      if choice in ["div", "add&div"]:
        if constant_to_div >= 0:
          left = f"({left}) / {constant_to_div}"
        else:
          left = f"({left}) / ({constant_to_div})"

    # Troca os lados da equação aleatoriamente
    if random.choice([True, False]):
        left, right = right, left

    # Retira espaços em branco e retorna equação initial equation modificada
    left = left.replace(" ", "")
    right = right.replace(" ", "")
    return f"{left}={right}"

# Exemplo da equação "2x + 3 = 5" correta e incorreta
equation = "2x + 3 = 5"
augmented_equation1 = augment_equation(equation, 1)
augmented_equation2 = augment_equation(equation, 0)
print("Equação Correta: ", augmented_equation1)
print("Equação Incorreta: ", augmented_equation2)

In [None]:
# Contar o número de ocorrências de step_is_correct igual a 1 e 0
count_correct = log['step_is_correct'].value_counts()[1.0]
count_incorrect = log['step_is_correct'].value_counts()[0.0]

print("Número de linhas com passos corretos:", count_correct)
print("Número de linhas com passos incorretos:", count_incorrect)

In [None]:
# Variável log_temp recebe rótulos corretos (7.851)
log_temp = log[log["step_is_correct"] == 1.0]

# Variável log_temp_subset recebe um subconjunto dos rótulos corretos (1.125)
log_temp_subset = log_temp.head(1125)

O código abaixo aplica 4 vezes a função augment_equation com correct == 1, nos rótulos corretos, aumentando de 7.851 instâncias corretas para 39.255.

Após, ele aplica 2 vezes a função augment_equation com correct == 0, nos rótulos corretos e mais uma no subjconjunto de 1.125, aumentando os rótulos incorretos de 22.428 para 39.255.

In [None]:
# Lista de rótulos para as funções
labels = [1.0, 1.0, 1.0, 1.0, 0.0, 0.0]

# Lista para armazenar os DataFrames gerados
new_dataframes = []

# Aplica função augment_equation 5 vezes para geração de equações corretas e 3 para geração de equações incorretas
# Loop para aplicar as funções e criar os DataFrames
for i, lab in enumerate(labels):
    new_rows = log_temp.apply(lambda row: pd.Series({'initial_equation': augment_equation(row['initial_equation'], int(labels[i])),
                                                     'currentStep': row['currentStep'],
                                                     'step_is_correct': labels[i]}), axis=1)
    # Anexar as novas linhas ao DataFrame original
    log = log.append(new_rows, ignore_index=True)

# Aplica função augment_equation para geração de equações incorretas
new_rows = log_temp_subset.apply(lambda row: pd.Series({'initial_equation': augment_equation(row['initial_equation'], 0),
                                                     'currentStep': row['currentStep'],
                                                     'step_is_correct': 0.0}), axis=1)
# Anexar as novas linhas ao DataFrame original
log = log.append(new_rows, ignore_index=True)

# Imprime o log atualizado
log

In [None]:
# Contar o número de ocorrências de step_is_correct igual a 1 e 0
count_correct = log['step_is_correct'].value_counts()[1.0]
count_incorrect = log['step_is_correct'].value_counts()[0.0]

print("Número de linhas com passos corretos:", count_correct)
print("Número de linhas com passos incorretos:", count_incorrect)

In [None]:
# Variável counts recebe os valores de rótulos corretos e incorretos
counts = log["step_is_correct"].value_counts()

# Crie um gráfico de barras
plt.bar(counts.index, counts.values, color=['red', 'blue'], edgecolor='k')

# Adicione rótulos e título
plt.xlabel("Status")
plt.ylabel("Instâncias")
plt.title("Contagem de Instâncias com Rótulo Incorreto e Correto")

# Personalize os rótulos no eixo x (opcional)
plt.xticks(counts.index, ['Incorreto', 'Correto'])

# Mostre o gráfico
plt.show()

In [None]:
# Exclui os valores duplicados novamente, desde que constem como iguais em ambas as colunas: initial equation e currentStep
log = log.drop_duplicates(subset=['initial_equation', 'currentStep'])
log.head(), log.shape

In [None]:
# Contar o número de ocorrências de step_is_correct igual a 1 e 0
counts = counts = log["step_is_correct"].value_counts()
counts

In [None]:
# Crie um gráfico de barras
plt.bar(counts.index, counts.values, color=['red', 'blue'], edgecolor='k')

# Adicione rótulos e título
plt.xlabel("Status")
plt.ylabel("Instâncias")
plt.title("Contagem de Instâncias com Rótulo Incorreto e Correto")

# Personalize os rótulos no eixo x (opcional)
plt.xticks(counts.index, ['Incorreto', 'Correto'])

# Mostre o gráfico
plt.show()

In [None]:
#Imprime log atualizado
log

In [None]:
#Converte o tipo da coluna step is correct, de float para inteiro.
log['step_is_correct'] = log['step_is_correct'].astype(int)

In [None]:
log.head()

Embaralhar o conjunto de dados.

In [None]:
# Cria variável para embaralhar o conjunto de dados
passos_embaralhados = shuffle(log)
passos_embaralhados.head()

In [None]:
# Embaralha os dados de entrada e os rótulos
X_series = passos_embaralhados[['initial_equation', 'currentStep']]
y_series = passos_embaralhados['step_is_correct']
X = X_series.values.tolist()
y = y_series.tolist()

## Divisão de Dados

Aqui os dados são divididos em três conjuntos: conjunto de treinamento, com 70% das instâncias, conjunto de validação, com 20% das instâncias e conjunto de testes, com 10% das instâncias.

In [None]:
# Usar o sklearn para dividir os dados
X_train, X_test, y_train, y_test = train_test_split(X_series, y_series, test_size=0.1, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

## Tokenização, Encoding e Padding

In [None]:
# Tokenização
max_words = 5000  # Número máximo de palavras a serem mantidas no vocabulário
tokenizer_initial_equation = Tokenizer(num_words=max_words, oov_token='<OOV>')
tokenizer_initial_equation.fit_on_texts(X_train['initial_equation'])

tokenizer_current_step = Tokenizer(num_words=max_words, oov_token='<OOV>')
tokenizer_current_step.fit_on_texts(X_train['currentStep'])

X_train_seq_initial_equation = tokenizer_initial_equation.texts_to_sequences(X_train['initial_equation'])
X_val_seq_initial_equation = tokenizer_initial_equation.texts_to_sequences(X_val['initial_equation'])
X_test_seq_initial_equation = tokenizer_initial_equation.texts_to_sequences(X_test['initial_equation'])

X_train_seq_current_step = tokenizer_current_step.texts_to_sequences(X_train['currentStep'])
X_val_seq_current_step = tokenizer_current_step.texts_to_sequences(X_val['currentStep'])
X_test_seq_current_step = tokenizer_current_step.texts_to_sequences(X_test['currentStep'])

In [None]:
# Padding
max_sequence_length = 20

X_train_padded_initial_equation = pad_sequences(X_train_seq_initial_equation, maxlen=max_sequence_length, padding='post', truncating='post')
X_val_padded_initial_equation = pad_sequences(X_val_seq_initial_equation, maxlen=max_sequence_length, padding='post', truncating='post')
X_test_padded_initial_equation = pad_sequences(X_test_seq_initial_equation, maxlen=max_sequence_length, padding='post', truncating='post')

X_train_padded_current_step = pad_sequences(X_train_seq_current_step, maxlen=max_sequence_length, padding='post', truncating='post')
X_val_padded_current_step = pad_sequences(X_val_seq_current_step, maxlen=max_sequence_length, padding='post', truncating='post')
X_test_padded_current_step = pad_sequences(X_test_seq_current_step, maxlen=max_sequence_length, padding='post', truncating='post')

# Criação da Rede



In [None]:
# Construir o modelo GRU
embedding_dim = 128

# Initial Equation
input_initial_equation = Input(shape=(max_sequence_length,))
embedding_layer = Embedding(input_dim=max_words, output_dim=embedding_dim)(input_initial_equation)
gru_layer = GRU(128)(embedding_layer)

#Curreny Step
input_current_step = Input(shape=(max_sequence_length,))
embedding_layer2 = Embedding(input_dim=max_words, output_dim=embedding_dim)(input_current_step)
gru_layer2 = GRU(128)(embedding_layer2)

# Concatena as camadas
concatenated = concatenate([gru_layer, gru_layer2])

#Output Layer
output_layer = Dense(1, activation='sigmoid')(concatenated)
model = Model(inputs=[input_initial_equation, input_current_step], outputs=output_layer)

# Treinamento e Validação

O modelo é treinado e validado com 25 épocas. O melhor modelo é armazenado na variável 'melhor_modelo'. O otimizador utilizado é Adam e a função de loss é Binary Crossentropy.

In [None]:
# Compilar o modelo
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Treinar o modelo
checkpoint = ModelCheckpoint('melhor_modelo', monitor='val_loss', save_best_only=True, save_format='tf')
history = model.fit([X_train_padded_initial_equation, X_train_padded_current_step], y_train,
          validation_data=([X_val_padded_initial_equation, X_val_padded_current_step], y_val),
          epochs=25, batch_size=32)

# Imprime as métricas de treinamento e validação
print("Training loss:", history.history['loss'])
print("Training accuracy:", history.history['accuracy'])
print("Validation loss:", history.history['val_loss'])
print("Validation accuracy:", history.history['val_accuracy'])

# Teste
Usando o conjunto de dados de teste para avaliar o modelo

In [None]:
# Avaliar o modelo no conjunto de teste com return_dict=True
benchmarks = model.evaluate([X_test_padded_initial_equation, X_test_padded_current_step], y_test, return_dict=True)
print(benchmarks)

In [None]:
N_EPOCHS = 25
# Plotar a curva de aprendizado da perda
plt.figure(figsize=(10, 5))
plt.plot(range(1, N_EPOCHS+1), history.history['loss'], 'r', label='Training Loss')
plt.plot(range(1, N_EPOCHS+1), history.history['val_loss'], 'b', label='Validation Loss')
plt.title('Curva de Aprendizado - Perda')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

# Plotar a curva de aprendizado da acurácia
plt.figure(figsize=(10, 5))
plt.plot(range(1, N_EPOCHS+1), history.history['accuracy'], 'r', label='Training Accuracy')
plt.plot(range(1, N_EPOCHS+1), history.history['val_accuracy'], 'b', label='Validation Accuracy')
plt.title('Curva de Aprendizado - Acurácia')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()
plt.show()

In [None]:
# Avaliar o modelo no conjunto de testes
y_pred = model.predict([X_test_padded_initial_equation, X_test_padded_current_step])
y_pred_binary = (y_pred > 0.5).astype(int)

# Calcular a acurácia
accuracy = accuracy_score(y_test, y_pred_binary)
print("Acurácia:", accuracy)

# Crie uma lista de resultados com base na comparação entre Previsão e Rótulo
resultados = ["Certo" if p == r else "Errado" for p, r in zip(y_pred_binary.squeeze(), y_test)]

# Crie um DataFrame com as previsões, probabilidades e entradas originais
results_df = pd.DataFrame({
    "Equação Inicial": X_test['initial_equation'],  # Use os textos decodificados
    "Passo Atual": X_test['currentStep'],  # Use os textos decodificados
    "Probabilidade (%)": y_pred.squeeze(),
    "Previsão": y_pred_binary.squeeze(),
    "Rótulo": y_test ,  # Adicione a lista de rótulos
    "Resultado": resultados  # Adicione a lista de resultados
})

In [None]:
results_df

In [None]:
results_df["Resultado"].value_counts()

In [None]:
# Contagem dos valores "Certo" e "Errado" na coluna "Resultado"
contagem_resultados = results_df["Resultado"].value_counts()

# Calcula o percentual de acerto
percentual_acerto = (contagem_resultados["Certo"] / len(results_df)) * 100

# Imprime o percentual de acerto
print(f"Percentual de acerto: {percentual_acerto:.2f}%")

In [None]:
# Calcular a matriz de confusão
cm = confusion_matrix(y_test, y_pred_binary.squeeze())

# Definir rótulos das classes (substitua pelos seus rótulos)
class_names = ['Classe 0', 'Classe 1']

# Criar um heatmap da matriz de confusão
plt.figure(figsize=(8, 6))
sns.set(font_scale=1.2)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False,
            xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Rótulos Previstos')
plt.ylabel('Rótulos Reais')
plt.title('Matriz de Confusão')
plt.show()

## Precision, Recall e F1 Score

*    Precision = Verdadeiros Positivos(TP) / (Verdadeiros Positivos(TP) + Falsos Positivos(FP))
*    Recall = Verdadeiros Positivos(TP) / (Verdadeiros Positivos(TP) + Falsos Negativos(FN))
*    F1 Score = 2 * (Precision * Recall) / (Precision + Recall)

In [None]:
# Exemplo de valores verdadeiros (ground truth) e previsões
verdadeiros = y_test
previsoes = y_pred_binary.squeeze()

# Calcular a precisão e a revocação
precisao = precision_score(verdadeiros, previsoes)
revocacao = recall_score(verdadeiros, previsoes)
# Calcular o F1 Score
f1 = f1_score(verdadeiros, previsoes)

print("Precisão (Precision):", precisao)
print("Revocação (Recall):", revocacao)
print("F1 Score:", f1)

## Curva ROC

In [None]:
fpr, tpr, thresholds = roc_curve(y_test, y_pred.squeeze())
roc_auc = auc(fpr, tpr)

# Plote a curva ROC
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='Curva ROC (AUC = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Taxa de Falsos Positivos')
plt.ylabel('Taxa de Verdadeiros Positivos')
plt.title('Curva ROC')
plt.legend(loc='lower right')
plt.show()