In [1]:
import os
import pandas as pd
from setfit import SetFitModel
import huggingface_hub
from datasets import Dataset
from transformers import DebertaForSequenceClassification, DebertaTokenizer
import torch
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import cohen_kappa_score
from joblib import load

# Selects which model produces the submission: 'setfit', 'deberta', 'tfidf', or 'knn'.
# Each model cell below only runs when its name matches this value.
evaluation_type = 'setfit'

# Base directories of the attached Kaggle model inputs (no version folder yet).
# setfit_path = '/kaggle/input/automated-essay-scoring-setfit/transformers/default'
setfit_path = '/kaggle/input/automated-essay-scoring-setfit-finetuned/transformers/default'
deberta_path = '/kaggle/input/automated-essay-scoring-deberta/transformers/default'
longformer_path = '/kaggle/input/allenailongformer-base-4096/transformers/default'
allMiniLM_path = '/kaggle/input/all-minilm-l6-v2/transformers/default'
# knn_path = '/kaggle/input/automated-essay-scoring-knn/scikitlearn/default'
# NOTE(review): the 'knn' and 'tfidf' cells both load their model from knn_path.
knn_path = '/kaggle/input/automated-essay-scoring-knn-all-features/scikitlearn/default'

In [2]:
def get_latest_version_path(base_path):
    """Return the path of the highest-numbered version directory in ``base_path``.

    Only sub-directories whose names consist solely of decimal digits are
    treated as versions; other entries (files, hidden folders, named
    directories) are ignored instead of breaking the integer comparison.

    Args:
        base_path: Directory that contains one sub-directory per version.

    Returns:
        ``base_path`` joined with the name of the numerically largest version.

    Raises:
        FileNotFoundError: If ``base_path`` has no numeric sub-directory.
    """
    versions = [
        entry
        for entry in os.listdir(base_path)
        if entry.isdecimal() and os.path.isdir(os.path.join(base_path, entry))
    ]
    if not versions:
        raise FileNotFoundError(
            f"No numeric version directories found in {base_path!r}"
        )

    # Compare numerically so that e.g. '10' ranks above '9'.
    return os.path.join(base_path, max(versions, key=int))

# Point every model path at its newest version folder. The names are rebound
# in place, so this cell expects the un-versioned base paths as its input.
(
    setfit_path,
    deberta_path,
    longformer_path,
    allMiniLM_path,
    knn_path,
) = (
    get_latest_version_path(base)
    for base in (setfit_path, deberta_path, longformer_path, allMiniLM_path, knn_path)
)

# Echo the resolved locations so the chosen versions are visible in the output
for label, resolved in (
    ("SetFit Path:", setfit_path),
    ("DeBERTa Path:", deberta_path),
    ("Longformer Path:", longformer_path),
    ("all-minilm-l6-v2 Path:", allMiniLM_path),
    ("knn Path:", knn_path),
):
    print(label, resolved)

SetFit Path: /kaggle/input/automated-essay-scoring-setfit-finetuned/transformers/default/3
DeBERTa Path: /kaggle/input/automated-essay-scoring-deberta/transformers/default/3
Longformer Path: /kaggle/input/allenailongformer-base-4096/transformers/default/1
all-minilm-l6-v2 Path: /kaggle/input/all-minilm-l6-v2/transformers/default/1
knn Path: /kaggle/input/automated-essay-scoring-knn-all-features/scikitlearn/default/1


In [3]:
# Load the competition's train and test CSV files
train = pd.read_csv("/kaggle/input/learning-agency-lab-automated-essay-scoring-2/train.csv")
test = pd.read_csv("/kaggle/input/learning-agency-lab-automated-essay-scoring-2/test.csv")

# Wrap both frames as Hugging Face `Dataset` objects for the cells below
train_dataset = Dataset.from_pandas(train)
test_dataset = Dataset.from_pandas(test)

# Set the device to GPU if available, otherwise fallback to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Default value; the model cell matching `evaluation_type` overwrites it.
# It stays empty if `evaluation_type` matches none of the model cells.
predictions = []

Using device: cuda


## SetFit

In [4]:
if evaluation_type == 'setfit':
    # Load the SetFit checkpoint and place it on the selected device
    model = SetFitModel.from_pretrained(setfit_path)
    model.to(device)

    # Run the model over every essay, then cast each output to a plain int
    raw_labels = model(test_dataset['full_text'])
    predictions = list(map(int, raw_labels))

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
  self.comm = Comm(**args)


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Input ids are automatically padded to be a multiple of `config.attention_window`: 512


## DeBERTa

In [5]:
def bin_to_grades(scores):
    """Convert continuous scores into integer grades in the range 1 to 6.

    Each score is rounded to the nearest grade using the half-point edges
    below, with an exact ``x.5`` going to the lower grade. Scores outside
    the edges are clamped, so the result never contains the out-of-range
    values 0 or 7 that ``np.digitize`` alone would produce.

    Args:
        scores: Array-like of continuous score predictions.

    Returns:
        ``numpy.ndarray`` of integer grades, one per input score.
    """
    bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5]  # The edges for binning
    grades = np.digitize(scores, bins, right=True)
    # digitize yields 0 for scores <= 0.5 and 7 for scores > 6.5; clamp both
    return np.clip(grades, 1, 6)

if evaluation_type == 'deberta':

    model = DebertaForSequenceClassification.from_pretrained(deberta_path)
    tokenizer = DebertaTokenizer.from_pretrained(deberta_path)

    # Move model to the appropriate device and switch to inference mode
    model.to(device)
    model.eval()

    # Tokenize into a separate dataset so `test_dataset` itself stays unchanged
    # for the cells that run after this one.
    print("Tokenizing test dataset...")
    tokenized_test = test_dataset.map(
        lambda x: tokenizer(x['full_text'], truncation=True, padding='max_length', max_length=256),
        batched=True,
    )

    # Set the format for PyTorch (only 'input_ids' and 'attention_mask' are required for inference)
    tokenized_test.set_format(type='torch', columns=['input_ids', 'attention_mask'])
    print("Test dataset tokenized and formatted successfully.")

    predicted_scores = []

    # Run predictions in batches
    print("Generating predictions on the test dataset...")
    with torch.no_grad():  # Disable gradient calculation
        for batch in torch.utils.data.DataLoader(tokenized_test, batch_size=20):  # Adjust batch size if needed
            input_ids = batch['input_ids'].to(model.device)
            attention_mask = batch['attention_mask'].to(model.device)

            # Forward pass to get logits
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            # assumes the checkpoint has a single regression output per essay — TODO confirm
            logits = outputs.logits.squeeze(-1)

            # Collect predictions
            predicted_scores.extend(logits.cpu().numpy())

    # Convert the continuous scores to grades
    predictions = bin_to_grades(predicted_scores)


## Baseline

In [6]:
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk import pos_tag
from collections import Counter
import spacy
import shutil
# import language_tool_python
from textblob import TextBlob
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from gensim.corpora import Dictionary
from gensim.models import LdaModel
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.corpus import stopwords, wordnet
import syllapy
from sentence_transformers import SentenceTransformer
import numpy as np
from transformers import LongformerModel, LongformerTokenizer
import torch
from sklearn.metrics.pairwise import cosine_similarity
from nltk.tokenize import word_tokenize
from collections import Counter
from lexical_diversity import lex_div as ld
import pickle
from spellchecker import SpellChecker

# Nothing in this setup is guarded by `evaluation_type`, so it runs for every model choice.

# Load spaCy model
# NOTE: unpickling can execute arbitrary code; only load this file from a trusted source.
with open("/kaggle/input/spacy-en-core-web-sm/spacy_en_core_web_sm.pkl", "rb") as file:
    nlp = pickle.load(file)

# Load NLTK
# Each pair is (source folder in the Kaggle input, target folder under /root/nltk_data).
paths = [("/kaggle/input/spacy-en-core-web-sm/punkt_tab", "/root/nltk_data/tokenizers"), ("/kaggle/input/spacy-en-core-web-sm/averaged_perceptron_tagger_eng", "/root/nltk_data/taggers")]
for path in paths:
    source_folder_path = path[0]
    target_folder_path = path[1]
    
    # Ensure the target directory exists
    os.makedirs(target_folder_path, exist_ok=True)
    
    # Copy the folder to the target location
    # (the contents of the source folder are merged into the target directory)
    shutil.copytree(source_folder_path, target_folder_path, dirs_exist_ok=True)

# Snippet presumably used once to create the pickle loaded above — verify:
# nlp = spacy.load('en_core_web_sm')
# pickle_file_path = "spacy_en_core_web_sm.pkl"
# with open(pickle_file_path, "wb") as file:
#     pickle.dump(nlp, file)

# NOTE(review): only punkt_tab and the tagger are copied above; the stopwords, brown and
# wordnet corpora used below must already be available in the environment — confirm.
stop_words = set(stopwords.words('english'))
# Every train essay followed by every test essay; the TF-IDF and LDA models are fitted on this
corpus = [text for text in train_dataset['full_text']] + [text for text in test_dataset['full_text']]

# Vocabulary Maturity using frequency as a proxy
from nltk.corpus import brown  # Brown corpus for word frequencies
# Keys keep the casing found in the corpus (the words are not lower-cased here)
word_frequencies = Counter(brown.words())  # Get frequencies from a standard corpus

# Load Longformer model and tokenizer, supporting sequences up to 4096 tokens
longformer_model = LongformerModel.from_pretrained(longformer_path)
# Rebinds the global name `tokenizer`, which the DeBERTa cell also assigns
tokenizer = LongformerTokenizer.from_pretrained(longformer_path)

# Move the model to the selected device
longformer_model.to(device)

# Sentence-embedding model used by extract_cohesion_features
embedding_model = SentenceTransformer(allMiniLM_path)

# tool = language_tool_python.LanguageTool('en-US', remote_server='http://0.0.0.0:8081')
# Spell checker instance (bound to the name `spell`)
spell = SpellChecker()

# preprocessing
def extract_linguistic_lexical_features(example):
    """Add sentence-length and part-of-speech count features to ``example``.

    The essay is word-tokenised once and the tokens are reused for both the
    average sentence length and the POS tagging.

    Args:
        example: Mapping with the essay under the ``'full_text'`` key.

    Returns:
        The same ``example`` with these keys added: ``sentence_count``,
        ``average_sentence_length`` (tokens per sentence, 0 when there are no
        sentences), ``pos_noun_count``, ``pos_verb_count``, ``pos_adj_count``
        and ``pos_adv_count``.
    """
    text = example['full_text']

    sentence_count = len(sent_tokenize(text))
    words = word_tokenize(text)

    # Average number of tokens per sentence
    avg_sentence_length = len(words) / sentence_count if sentence_count > 0 else 0

    # Tally Penn Treebank tags; a Counter returns 0 for tags that never occur
    pos_counts = Counter(tag for _, tag in pos_tag(words))

    pos_noun_count = sum(pos_counts[tag] for tag in ['NN', 'NNS', 'NNP', 'NNPS'])
    pos_verb_count = sum(pos_counts[tag] for tag in ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'])
    pos_adj_count = sum(pos_counts[tag] for tag in ['JJ', 'JJR', 'JJS'])
    pos_adv_count = sum(pos_counts[tag] for tag in ['RB', 'RBR', 'RBS'])

    example['sentence_count'] = sentence_count
    example['average_sentence_length'] = avg_sentence_length
    example['pos_noun_count'] = pos_noun_count
    example['pos_verb_count'] = pos_verb_count
    example['pos_adj_count'] = pos_adj_count
    example['pos_adv_count'] = pos_adv_count

    return example

def remove_punctuation_except_apostrophe(text):
    """Return ``text`` with all ASCII punctuation removed except apostrophes.

    Args:
        text: The string to clean.

    Returns:
        A copy of ``text`` without any ``string.punctuation`` character other
        than ``'``, so contractions such as "don't" stay intact.
    """
    # Function-scope import: `string` is not imported at file level.
    import string

    punctuation_to_remove = string.punctuation.replace("'", "")
    return text.translate(str.maketrans('', '', punctuation_to_remove))


def extract_error_based_features(example):
    """Add error-based features to ``example``.

    Args:
        example: Mapping with the essay under the ``'full_text'`` key.

    Returns:
        The same ``example`` with these keys added:

        * ``grammar_error_count`` - number of tokens the spell checker does
          not recognise (a proxy; the LanguageTool check is disabled).
        * ``syntactic_complexity`` - mean number of non-punctuation tokens
          per spaCy sentence (0 when there are no sentences).
        * ``spelling_mistake_count`` - number of TextBlob words whose
          suggested correction differs from the word itself.
        * ``error_density`` - sum of both counts divided by the TextBlob
          word count (0 for an empty essay).
    """
    text = example['full_text']

    # Spell-checker based error count. Words are lower-cased before the lookup
    # so the membership test below matches however `unknown` normalises case.
    words = remove_punctuation_except_apostrophe(text).lower().split()
    misspelled = spell.unknown(words)
    # Count every occurrence, so a repeated misspelling counts each time
    grammar_error_count = sum(1 for word in words if word in misspelled)

    # Disabled LanguageTool alternative:
    # grammar_matches = tool.check(example['full_text'])
    # grammar_error_count = len(grammar_matches)

    # Syntactic complexity with spaCy: non-punctuation tokens per sentence
    doc = nlp(text)
    sentence_sizes = [
        len([token for token in sentence if token.dep_ != 'punct'])
        for sentence in doc.sents
    ]
    syntactic_complexity = sum(sentence_sizes) / len(sentence_sizes) if sentence_sizes else 0

    # Spelling mistake count using TextBlob
    blob = TextBlob(text)
    spelling_mistake_count = sum(1 for word in blob.words if word.correct() != word)

    # Error density
    word_count = len(blob.words)
    error_density = (grammar_error_count + spelling_mistake_count) / word_count if word_count > 0 else 0

    example['grammar_error_count'] = grammar_error_count
    example['syntactic_complexity'] = syntactic_complexity
    example['spelling_mistake_count'] = spelling_mistake_count
    example['error_density'] = error_density

    return example

# TF-IDF Vectorizer, limited to the TOP_N_KEYWORDS highest-ranked terms
# NOTE(review): the vectorizer, SVD and LDA below are re-fitted on train + test every run;
# the features only match a saved model if it was trained with the same fit — confirm.
TOP_N_KEYWORDS = 100
vectorizer = TfidfVectorizer(stop_words=list(stop_words), max_features=TOP_N_KEYWORDS)
tfidf_matrix = vectorizer.fit_transform(corpus)

# Reduce dimensionality of the TF-IDF matrix to 50 dimensions
svd = TruncatedSVD(n_components=50, random_state=42)
tfidf_reduced = svd.fit_transform(tfidf_matrix)
tfidf_keywords_vectors = tfidf_reduced.tolist()

# Tokenize and prepare corpus for LDA (lower-cased alphabetic tokens, stop words removed)
tokenized_corpus = [[word for word in word_tokenize(doc.lower()) if word.isalpha() and word not in stop_words] for doc in corpus]
dictionary = Dictionary(tokenized_corpus)
bow_corpus = [dictionary.doc2bow(text) for text in tokenized_corpus]

NUM_TOPICS = 15
# Seeded like the SVD above so the learned topics are reproducible across runs
lda_model = LdaModel(bow_corpus, num_topics=NUM_TOPICS, id2word=dictionary, passes=10, random_state=42)

# Function to generate topic coherence vector for each essay
def lda_topic_vector(text):
    """Return the LDA topic probabilities for one essay.

    The essay is lower-cased, reduced to its alphabetic tokens and converted
    to a bag of words with the shared ``dictionary``. The probabilities are
    returned in the order the model reports the topics.
    """
    alphabetic_tokens = []
    for token in word_tokenize(text.lower()):
        if token.isalpha():
            alphabetic_tokens.append(token)

    bag_of_words = dictionary.doc2bow(alphabetic_tokens)
    # minimum_probability=0.0 asks the model not to drop low-probability topics
    topics = lda_model.get_document_topics(bag_of_words, minimum_probability=0.0)
    return [probability for _topic_id, probability in topics]

# Vocabulary kept by the fitted TF-IDF vectorizer, as a set for fast lookup
top_keywords = set(vectorizer.get_feature_names_out())

def keyword_coverage_ratio(text):
    """Return the fraction of ``top_keywords`` found among the essay's lower-cased tokens."""
    essay_tokens = set(word_tokenize(text.lower()))
    matched_keywords = top_keywords.intersection(essay_tokens)
    return len(matched_keywords) / len(top_keywords)

def extract_semantic_features(example):
    """Add TF-IDF, LDA-topic and keyword-coverage features to ``example``.

    Keys added: ``tfidf_keywords_vector`` (the essay's TF-IDF row passed
    through the fitted SVD), ``lda_topic_vector`` and ``keyword_coverage``.
    """
    essay = example['full_text']

    # Project the single-essay TF-IDF row with the already fitted SVD
    reduced = svd.transform(vectorizer.transform([essay]))
    topic_probabilities = lda_topic_vector(essay)
    coverage = keyword_coverage_ratio(essay)

    example['tfidf_keywords_vector'] = reduced.tolist()[0]
    example['lda_topic_vector'] = topic_probabilities
    example['keyword_coverage'] = coverage
    return example

def is_long_word(word):
    """Return True when syllapy counts three or more syllables in ``word``."""
    syllable_total = syllapy.count(word)
    return syllable_total >= 3

def is_imagery_word(word):
    """Return True if any WordNet synset of ``word`` has an imagery-related lexname.

    A synset matches when its lexname contains one of the listed noun
    categories. Words without synsets return False.
    """
    imagery_tags = ('noun.artifact', 'noun.object', 'noun.plant', 'noun.animal', 'noun.body')
    for synset in wordnet.synsets(word):
        lexname = str(synset.lexname())
        if any(tag in lexname for tag in imagery_tags):
            return True
    return False

def extract_stylistic_features(example):
    """Add stylistic ratios and sentiment scores to ``example``.

    All ratios use the essay's lower-cased alphabetic tokens with stop words
    removed as the denominator, and are 0 when no such token exists.
    Keys added: ``pronoun_usage``, ``unique_word_proportion``,
    ``long_word_proportion``, ``imagery_word_proportion``,
    ``positive_sentiment_score`` and ``negative_sentiment_score``.
    """
    text = example['full_text']
    words = [
        token
        for token in word_tokenize(text.lower())
        if token.isalpha() and token not in stop_words
    ]

    # NOTE(review): tagging happens after stop-word removal; most English pronouns are
    # presumably in the stop-word list, so this ratio is likely near zero — confirm.
    pronoun_tags = ['PRP', 'PRP$', 'WP', 'WP$']
    pronoun_count = sum(1 for _, tag in pos_tag(words) if tag in pronoun_tags)
    example['pronoun_usage'] = pronoun_count / len(words) if words else 0

    long_words = [token for token in words if is_long_word(token)]
    imagery_words = [token for token in words if is_imagery_word(token)]

    example['unique_word_proportion'] = len(set(words)) / len(words) if words else 0
    example['long_word_proportion'] = len(long_words) / len(words) if words else 0
    example['imagery_word_proportion'] = len(imagery_words) / len(words) if words else 0

    # Split TextBlob polarity into two non-negative scores
    polarity = TextBlob(text).sentiment.polarity
    example['positive_sentiment_score'] = polarity if polarity > 0 else 0
    example['negative_sentiment_score'] = -polarity if polarity < 0 else 0

    return example


def is_visual_word(word):
    """Return True if any WordNet synset of ``word`` has a visual-category lexname.

    The categories are the listed concrete-noun lexnames plus ``adj.all``.
    Words without synsets return False.
    """
    visual_tags = ('noun.artifact', 'noun.object', 'noun.plant', 'noun.animal', 'noun.body', 'adj.all')
    for synset in wordnet.synsets(word):
        lexname = str(synset.lexname())
        if any(tag in lexname for tag in visual_tags):
            return True
    return False

def extract_visual_descriptive_features(example):
    """Add visual-vocabulary features to ``example``.

    Keys added: ``visual_word_proportion`` and
    ``unique_visual_word_proportion`` (both relative to the lower-cased
    alphabetic tokens with stop words removed), and ``average_imagery_score``
    (visual tokens divided by distinct visual tokens). Each is 0 when its
    denominator is empty.
    """
    content_words = [
        token
        for token in word_tokenize(example['full_text'].lower())
        if token.isalpha() and token not in stop_words
    ]
    visual_words = list(filter(is_visual_word, content_words))
    distinct_visual = set(visual_words)

    if content_words:
        example['visual_word_proportion'] = len(visual_words) / len(content_words)
        example['unique_visual_word_proportion'] = len(distinct_visual) / len(content_words)
    else:
        example['visual_word_proportion'] = 0
        example['unique_visual_word_proportion'] = 0

    if distinct_visual:
        example['average_imagery_score'] = len(visual_words) / len(distinct_visual)
    else:
        example['average_imagery_score'] = 0

    return example

def extract_cohesion_features(example):
    """Add discourse-marker and sentence-similarity features to ``example``.

    Keys added:

    * ``discourse_marker_count`` - tokens whose dependency label is ``mark``,
      ``cc`` or ``advmod``, divided by the number of alphabetic tokens
      (0 when there are none).
    * ``neural_coherence_score`` - mean cosine similarity between the
      embeddings of consecutive sentences (0 for fewer than two sentences).
    """
    text = example['full_text']

    # Dependency-label based discourse markers
    doc = nlp(text)
    marker_labels = {"mark", "cc", "advmod"}
    marker_total = sum(1 for token in doc if token.dep_ in marker_labels)
    alpha_tokens = [token.text for token in doc if token.is_alpha]
    example['discourse_marker_count'] = marker_total / len(alpha_tokens) if alpha_tokens else 0

    # Guard: similarity needs at least one pair of consecutive sentences
    sentences = sent_tokenize(text)
    if len(sentences) <= 1:
        example['neural_coherence_score'] = 0
        return example

    embeddings = embedding_model.encode(sentences)
    similarities = []
    for current, following in zip(embeddings[:-1], embeddings[1:]):
        cosine = np.dot(current, following) / (np.linalg.norm(current) * np.linalg.norm(following))
        similarities.append(cosine)
    example['neural_coherence_score'] = np.mean(similarities) if similarities else 0

    return example

def get_longformer_embedding(text):
    """Return a mean-pooled Longformer embedding of ``text`` as a NumPy array.

    The text is truncated to at most 4096 tokens, encoded on ``device``
    without gradient tracking, averaged over dimension 1 of the last hidden
    state, squeezed, and moved to the CPU.
    """
    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=4096)
    encoded = encoded.to(device)  # inputs must be on the same device as the model

    with torch.no_grad():
        hidden_states = longformer_model(**encoded).last_hidden_state

    pooled = hidden_states.mean(dim=1)
    return pooled.squeeze().cpu().numpy()

def calculate_coherence_score(text):
    """Return the mean cosine similarity between consecutive sentence embeddings.

    Sentences are obtained by splitting ``text`` on '.', and blank fragments
    are skipped. The result is 0 when fewer than two fragments remain.
    """
    fragments = (piece.strip() for piece in text.split('.'))
    sentence_embeddings = [get_longformer_embedding(fragment) for fragment in fragments if fragment]

    # Compare each embedding with the one that follows it
    coherence_scores = []
    for left, right in zip(sentence_embeddings, sentence_embeddings[1:]):
        coherence_scores.append(cosine_similarity([left], [right])[0][0])

    return np.mean(coherence_scores) if coherence_scores else 0

def extract_neural_features(example):
    """Add Longformer-based features to ``example``.

    Keys added: ``longformer_sentence_embedding`` (embedding of the whole
    essay) and ``longformer_coherence_score``.
    """
    full_text = example['full_text']
    example['longformer_sentence_embedding'] = get_longformer_embedding(full_text)
    example['longformer_coherence_score'] = calculate_coherence_score(full_text)
    return example

def calculate_frequency_score(words):
    """Return the mean reference-corpus frequency of the given words.

    Each word is lower-cased before it is looked up in ``word_frequencies``.
    Words missing from the table are ignored, and 0 is returned when none
    is found. Lower averages indicate rarer vocabulary.
    """
    frequency_total = 0
    matched = 0
    for word in words:
        key = word.lower()
        if key in word_frequencies:
            frequency_total += word_frequencies[key]
            matched += 1
    return frequency_total / matched if matched else 0

def extract_vocabulary_features(example):
    """Add vocabulary-richness features to ``example``.

    Keys added: ``type_token_ratio``, ``lexical_diversity`` (MTLD) and
    ``vocabulary_maturity``, all computed on the lower-cased alphabetic
    tokens with stop words removed.
    """
    words = [
        token
        for token in word_tokenize(example['full_text'].lower())
        if token.isalpha() and token not in stop_words
    ]

    # Type-token ratio: distinct spaCy token texts over the filtered word count
    doc = nlp(" ".join(words))
    distinct_tokens = {token.text for token in doc}
    example['type_token_ratio'] = len(distinct_tokens) / len(words) if words else 0

    # Use a spaCy `mtld` extension when one is registered, else lexical_diversity
    if hasattr(doc._, 'mtld'):
        example['lexical_diversity'] = doc._.mtld
    else:
        example['lexical_diversity'] = ld.mtld(words)

    # Mean reference-corpus frequency as a proxy for word rarity
    example['vocabulary_maturity'] = calculate_frequency_score(words)

    return example


def apply_all_features(dataset):
    """Run every feature extractor on each example and return the results as a list.

    The extractors are applied in the fixed order below, and each one
    receives the example returned by the previous one.
    """
    extractors = (
        extract_linguistic_lexical_features,
        extract_error_based_features,
        extract_semantic_features,
        extract_stylistic_features,
        extract_visual_descriptive_features,
        extract_cohesion_features,
        extract_neural_features,
        extract_vocabulary_features,
    )

    features = []
    for example in dataset:
        for extract in extractors:
            example = extract(example)
        features.append(example)

    return features

  import pkg_resources
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(parent)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespac

## KNN

In [7]:
if (evaluation_type == 'knn'):
    # train_data_features = apply_all_features(train_dataset)
    # Build one feature dict per test essay with every extractor defined above
    test_data_features = apply_all_features(test_dataset)

    
    # load model and do predictions
    # NOTE(review): `test_data_features` is a list of dicts that still contain the raw text
    # and nested vectors; confirm the saved model accepts this input as-is.
    model = load(f"{knn_path}/model.joblib")
    predictions = model.predict(test_data_features)
    print(predictions)

## TF-IDF

In [8]:
if (evaluation_type == 'tfidf'):

    # Define the corpus from the dataset
    train_corpus = [text for text in train_dataset['full_text']]
    test_corpus = [text for text in test_dataset['full_text']]
    
    # Define the number of top keywords for TF-IDF and the number of components for SVD
    # Each entry is (TOP_N_KEYWORDS, N_COMPONENTS); N_COMPONENTS=None skips the SVD step.
    CONFIGURATIONS = [(500, 250)]
    
    # Prepare a dictionary to hold datasets for each configuration of TOP_N_KEYWORDS and N_COMPONENTS
    datasets = {}
    
    # NOTE: this loop rebinds the globals `TOP_N_KEYWORDS`, `vectorizer` and `svd`
    # that the baseline feature cell also defines.
    for TOP_N_KEYWORDS, N_COMPONENTS in CONFIGURATIONS:
        # TF-IDF Vectorizer
        vectorizer = TfidfVectorizer(stop_words='english', max_features=TOP_N_KEYWORDS)
        
        # Fit and transform on train data, transform on test
        tfidf_train = vectorizer.fit_transform(train_corpus)
        tfidf_test = vectorizer.transform(test_corpus)
    
        if N_COMPONENTS is None:
            # No dimensionality reduction, use the original TF-IDF vectors (convert to dense)
            datasets[f'X_train_{TOP_N_KEYWORDS}'] = tfidf_train.toarray()
            datasets[f'X_test_{TOP_N_KEYWORDS}'] = tfidf_test.toarray()
        else:
            # Reduce dimensionality of the TF-IDF matrix
            svd = TruncatedSVD(n_components=N_COMPONENTS, random_state=42)
            tfidf_train_reduced = svd.fit_transform(tfidf_train)
            tfidf_test_reduced = svd.transform(tfidf_test)
    
            # Assign the reduced TF-IDF vectors to specific variables
            datasets[f'X_train_{TOP_N_KEYWORDS}_{N_COMPONENTS}'] = tfidf_train_reduced
            datasets[f'X_test_{TOP_N_KEYWORDS}_{N_COMPONENTS}'] = tfidf_test_reduced
    
    
    # These keys are hard-coded to the single (500, 250) entry in CONFIGURATIONS
    X_train_500_250 = datasets['X_train_500_250']
    X_test_500_250 = datasets['X_test_500_250']

    # load model and do predictions
    # NOTE(review): the model is loaded from `knn_path`, the same path the 'knn' cell uses;
    # confirm that artifact was trained on these 250-dimensional TF-IDF/SVD features.
    model = load(f"{knn_path}/model.joblib")
    predictions = model.predict(X_test_500_250)
    print(predictions)
    

In [9]:
# Attach the predictions to the test Dataset as a new "predictions" column via `add_column`
test_dataset_with_predictions = test_dataset.add_column("predictions", predictions)

In [10]:
# Display the dataset summary to check that the `predictions` column is present
test_dataset_with_predictions

Dataset({
    features: ['essay_id', 'full_text', 'predictions'],
    num_rows: 3
})

In [11]:
# Keep only the id and prediction columns, exposing the prediction as `score`
submission = (
    test_dataset_with_predictions.to_pandas()[['essay_id', 'predictions']]
    .rename(columns={'predictions': 'score'})
)

# Write the submission file without the pandas index column
submission.to_csv("submission.csv", index=False)