In [4]:
local = True
log = True
log_detail = False

# Imports

## Libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split

import os
import sys
sys.path.append(os.path.abspath('..'))
import importlib
import pickle

In [None]:
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, BertForSequenceClassification
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import AdamW

In [2]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

## Data

In [5]:
if local:
    misconceptions = pd.read_csv('../kaggle/input/eedi-mining-misconceptions-in-mathematics/misconception_mapping.csv', index_col='MisconceptionId')
    train = pd.read_csv('../kaggle/input/eedi-mining-misconceptions-in-mathematics/train.csv')
    test = pd.read_csv('../kaggle/input/eedi-mining-misconceptions-in-mathematics/test.csv')
else:
    misconceptions = pd.read_csv('/kaggle/input/eedi-mining-misconceptions-in-mathematics/misconception_mapping.csv', index_col='MisconceptionId')
    train = pd.read_csv('/kaggle/input/eedi-mining-misconceptions-in-mathematics/train.csv')
    test = pd.read_csv('/kaggle/input/eedi-mining-misconceptions-in-mathematics/test.csv')
if log: print("(1) Imported data")

(1) Imported data


In [9]:
#########################
# Create train_melted 
#########################

# Define the identifier columns
id_cols = [
    'QuestionId', 'ConstructId', 'ConstructName', 
    'SubjectId', 'SubjectName', 'CorrectAnswer', 'QuestionText'
]

# Define the corresponding Answer options
answer_cols = ['AnswerAText', 'AnswerBText', 'AnswerCText', 'AnswerDText']
misconception_cols = ['MisconceptionAId', 'MisconceptionBId', 'MisconceptionCId', 'MisconceptionDId']

# Melt Answer Text
text_melted = train.melt(
    id_vars=id_cols,
    value_vars=answer_cols,
    var_name='Attribute',
    value_name='AnswerText'
)

# Melt Misconception IDs
misconception_melted = train.melt(
    id_vars=id_cols,
    value_vars=misconception_cols,
    var_name='Attribute',
    value_name='MisconceptionId'
)

# Extract the option letter (A, B, C, D) and the attribute type
text_melted['AnswerOption'] = text_melted['Attribute'].str.extract(r'Answer([ABCD])Text')[0]
misconception_melted['AnswerOption'] = misconception_melted['Attribute'].str.extract(r'Misconception([ABCD])Id')[0]

# Drop the original 'Attribute' columns as they are no longer needed
text_melted.drop('Attribute', axis=1, inplace=True)
misconception_melted.drop('Attribute', axis=1, inplace=True)

# Merge the two melted DataFrames on id_vars and AnswerOption
train_melted = pd.merge(
    text_melted,
    misconception_melted,
    on=id_cols + ['AnswerOption'],
    how='left'
)

train_melted = train_melted.merge(misconceptions, left_on='MisconceptionId', right_index=True, how='left')
train_melted = train_melted[train_melted['CorrectAnswer'] != train_melted['AnswerOption']]

In [16]:
print(f"SubjectName nunique: {train_melted['SubjectName'].nunique()}")
print(f"ConstructName nunique: {train_melted['ConstructName'].nunique()}")
print(f"MisconceptionName nunique: {train_melted['MisconceptionName'].nunique()}")


SubjectName nunique: 163
ConstructName nunique: 757
MisconceptionName nunique: 1604


In [17]:
hierarchy_scm = {}
for subject in train_melted['SubjectName'].unique():
    subject_data = train_melted[train_melted['SubjectName'] == subject]
    constructs = subject_data['ConstructName'].dropna().unique()
    construct_list = []
    for construct in constructs:
        misconceptions = subject_data[subject_data['ConstructName'] == construct]['MisconceptionName'].dropna().unique().tolist()
        construct_dict = {construct: misconceptions}
        construct_list.append(construct_dict)
    hierarchy_scm[subject] = construct_list

In [20]:
# Show whether duplicates exist in the hierarchy

duplicates_per_subject = {}

for subject, constructs in hierarchy_scm.items():
    misconception_to_constructs = {}
    for construct_dict in constructs:
        for construct, misconceptions in construct_dict.items():
            for misconception in misconceptions:
                if misconception not in misconception_to_constructs:
                    misconception_to_constructs[misconception] = set()
                misconception_to_constructs[misconception].add(construct)
    duplicates = {m: c for m, c in misconception_to_constructs.items() if len(c) > 1}
    duplicates_per_subject[subject] = duplicates
    if duplicates:
        print(f"Subject: {subject}")
        for misconception, constructs in duplicates.items():
            print(f"  Misconception '{misconception}' appears in constructs: {', '.join(constructs)}")
    else:
        print(f"Subject: {subject} has no misconceptions in multiple constructs.")


Subject: Simplifying Algebraic Fractions
  Misconception 'Thinks that when you cancel identical terms from the numerator and denominator, they just disappear' appears in constructs: Simplify an algebraic fraction by factorising both the numerator and denominator, Simplify an algebraic fraction by factorising the numerator
  Misconception 'Does not recognise difference of two squares' appears in constructs: Simplify an algebraic fraction by factorising both the numerator and denominator, Simplify an algebraic fraction by factorising the numerator
  Misconception 'Only applies a division to one of multiple terms in a numerator when simplifying an algebraic fraction' appears in constructs: Simplify an algebraic fraction by dividing by a single letter, Simplify an algebraic fraction by factorising the numerator
  Misconception 'Simplifies a fraction by adding or subtracting the same amount from the numerator  and denominator' appears in constructs: Simplify an algebraic fraction by factori

# Classes

In [19]:
class Hierarchy:
    def __init__(self):
        self.parent_to_children = {
                **hierarchy_scm,
        }
        self.child_to_parent = {child: parent for parent, children in self.parent_to_children.items() for child in children}

    def get_parent(self, child):
        return self.child_to_parent.get(child, None)

    def get_children(self, parent):
        return self.parent_to_children.get(parent, [])

In [21]:
class DataPreprocessor:
    def __init__(self, train_path, test_path, hierarchy):
        self.train_path = train_path
        self.test_path = test_path
        self.hierarchy = hierarchy
        self.label_encoders = {}
    
    def load_data(self):
        self.train_df = pd.read_csv(self.train_path)
        self.test_df = pd.read_csv(self.test_path)
    
    def map_hierarchy(self):
        self.train_df['ParentCategory'] = self.train_df['MisconceptionName'].apply(self.hierarchy.get_parent)
        self.test_df['ParentCategory'] = None  # To be predicted later
    
    def encode_labels(self):
        # Encode parent categories
        le_parent = LabelEncoder()
        self.train_df['ParentCategoryEncoded'] = le_parent.fit_transform(self.train_df['ParentCategory'])
        self.label_encoders['parent'] = le_parent
        
        # Encode specific misconceptions
        le_child = LabelEncoder()
        self.train_df['MisconceptionEncoded'] = le_child.fit_transform(self.train_df['MisconceptionName'])
        self.label_encoders['child'] = le_child
    
    def preprocess(self):
        self.load_data()
        self.map_hierarchy()
        self.encode_labels()
        # Additional preprocessing steps like text cleaning can be added here
        return self.train_df, self.test_df

In [22]:
class HierarchicalClassifier:
    def __init__(self, parent_num_labels, child_num_labels, pretrained_model='bert-base-uncased'):
        self.tokenizer = BertTokenizer.from_pretrained(pretrained_model)
        self.parent_model = BertForSequenceClassification.from_pretrained(pretrained_model, num_labels=parent_num_labels)
        self.child_models = nn.ModuleDict({
            # Initialize a separate model for each parent category
            # Example:
            # "Conceptual Errors": BertForSequenceClassification.from_pretrained(pretrained_model, num_labels=3),
            # "Procedural Errors": BertForSequenceClassification.from_pretrained(pretrained_model, num_labels=3),
        })
    
    def add_child_model(self, parent_category, num_labels):
        self.child_models[parent_category] = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels)
    
    def forward_parent(self, input_ids, attention_mask):
        return self.parent_model(input_ids=input_ids, attention_mask=attention_mask)
    
    def forward_child(self, parent_category, input_ids, attention_mask):
        child_model = self.child_models.get(parent_category, None)
        if child_model:
            return child_model(input_ids=input_ids, attention_mask=attention_mask)
        else:
            return None

In [23]:
class DistractorDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        encoding = self.tokenizer.encode_plus(
            self.texts[idx],
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

def train_stage1(model, train_df, tokenizer, device):
    dataset = DistractorDataset(
        texts=train_df['DistractorText'].tolist(),
        labels=train_df['ParentCategoryEncoded'].tolist(),
        tokenizer=tokenizer
    )
    dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
    optimizer = AdamW(model.parent_model.parameters(), lr=2e-5)
    model.parent_model.to(device)
    model.parent_model.train()
    
    for epoch in range(3):
        for batch in dataloader:
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model.parent_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            # Optional: Add logging here

def train_stage2(model, train_df, tokenizer, hierarchy, device):
    for parent in hierarchy.parent_to_children.keys():
        child_subset = train_df[train_df['ParentCategory'] == parent]
        if child_subset.empty:
            continue
        num_labels = len(hierarchy.get_children(parent))
        model.add_child_model(parent, num_labels)
        dataset = DistractorDataset(
            texts=child_subset['DistractorText'].tolist(),
            labels=child_subset['MisconceptionEncoded'].tolist(),
            tokenizer=tokenizer
        )
        dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
        optimizer = AdamW(model.child_models[parent].parameters(), lr=2e-5)
        child_model = model.child_models[parent].to(device)
        child_model.train()
        
        for epoch in range(3):
            for batch in dataloader:
                optimizer.zero_grad()
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                outputs = child_model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                # Optional: Add logging here

  from .autonotebook import tqdm as notebook_tqdm


In [24]:
def predict(model, texts, hierarchy, tokenizer, label_encoders, device):
    dataset = DistractorDataset(
        texts=texts,
        labels=[0]*len(texts),  # Dummy labels
        tokenizer=tokenizer
    )
    dataloader = DataLoader(dataset, batch_size=16, shuffle=False)
    model.parent_model.to(device)
    model.parent_model.eval()
    
    parent_preds = []
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            outputs = model.parent_model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            parent_preds.extend(preds)
    
    parent_labels = label_encoders['parent'].inverse_transform(parent_preds)
    specific_preds = []
    
    for i, parent in enumerate(parent_labels):
        child_model = model.child_models.get(parent, None)
        if child_model is None:
            specific_preds.append(None)
            continue
        child_model.to(device)
        child_model.eval()
        with torch.no_grad():
            encoding = tokenizer.encode_plus(
                texts[i],
                add_special_tokens=True,
                max_length=128,
                padding='max_length',
                truncation=True,
                return_attention_mask=True,
                return_tensors='pt'
            )
            input_ids = encoding['input_ids'].to(device)
            attention_mask = encoding['attention_mask'].to(device)
            outputs = child_model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            pred = torch.argmax(logits, dim=1).cpu().numpy()[0]
            specific_misconception = label_encoders['child'].inverse_transform([pred])[0]
            specific_preds.append(specific_misconception)
    
    return specific_preds

# Training

In [25]:
hierarchy = Hierarchy()

TypeError: unhashable type: 'dict'

In [None]:
# Define paths to your data files
train_path = '../kaggle/input/eedi-mining-misconceptions-in-mathematics/train.csv'
test_path = '../kaggle/input/eedi-mining-misconceptions-in-mathematics/test.csv'

# Initialize DataPreprocessor
preprocessor = DataPreprocessor(train_path, test_path, hierarchy)
train_df, test_df = preprocessor.preprocess()

In [None]:
# Determine the number of parent labels
parent_num_labels = train_df['ParentCategoryEncoded'].nunique()

# Initialize HierarchicalClassifier
model = HierarchicalClassifier(parent_num_labels=parent_num_labels, child_num_labels=0)  # child_num_labels handled per parent

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Using device: {device}')

In [None]:
# Train Stage 1: Parent Category Classification
train_stage1(model, train_df, model.tokenizer, device)

In [None]:
# Train Stage 2: Specific Misconception Classification
train_stage2(model, train_df, model.tokenizer, hierarchy, device)

In [None]:
import os

# Create directory to save models
os.makedirs('models', exist_ok=True)

# Save parent model
model.parent_model.save_pretrained('models/parent_model')

# Save child models
for parent, child_model in model.child_models.items():
    model.child_models[parent].save_pretrained(f'models/child_model_{parent}')

In [None]:
from sklearn.metrics import classification_report

def evaluate_stage1(model, train_df, tokenizer, device):
    dataset = DistractorDataset(
        texts=train_df['DistractorText'].tolist(),
        labels=train_df['ParentCategoryEncoded'].tolist(),
        tokenizer=tokenizer
    )
    dataloader = DataLoader(dataset, batch_size=16, shuffle=False)
    model.parent_model.to(device)
    model.parent_model.eval()
    
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model.parent_model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())
    
    print(classification_report(all_labels, all_preds, target_names=preprocessor.label_encoders['parent'].classes_))

# Evaluate Stage 1
evaluate_stage1(model, train_df, model.tokenizer, device)

In [None]:
def evaluate_stage2(model, train_df, tokenizer, hierarchy, device):
    for parent in hierarchy.parent_to_children.keys():
        child_subset = train_df[train_df['ParentCategory'] == parent]
        if child_subset.empty:
            continue
        
        child_model = model.child_models.get(parent, None)
        if child_model is None:
            continue
        
        child_model.to(device)
        child_model.eval()
        
        dataset = DistractorDataset(
            texts=child_subset['DistractorText'].tolist(),
            labels=child_subset['MisconceptionEncoded'].tolist(),
            tokenizer=tokenizer
        )
        dataloader = DataLoader(dataset, batch_size=16, shuffle=False)
        
        all_preds = []
        all_labels = []
        
        with torch.no_grad():
            for batch in dataloader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)
                outputs = child_model(input_ids=input_ids, attention_mask=attention_mask)
                logits = outputs.logits
                preds = torch.argmax(logits, dim=1).cpu().numpy()
                all_preds.extend(preds)
                all_labels.extend(labels.cpu().numpy())
        
        print(f"Classification Report for Parent Category: {parent}")
        print(classification_report(all_labels, all_preds, target_names=preprocessor.label_encoders['child'].classes_))
        print("\n")

# Evaluate Stage 2
evaluate_stage2(model, train_df, model.tokenizer, hierarchy, device)