<a href="https://colab.research.google.com/github/dennistay1981/Resources/blob/main/Code%20and%20data%20in%20publications/Article%3A%20Fingerprints%20of%20EFL%20writing.%20An%20AI%20Deep%20Learning%20Approach/VAE_CLUSTER.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Install and import libraries
!pip install python-docx
!pip install contractions

import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords

import numpy as np
import string
import re
import torch.nn as nn
import torch.nn.functional as F
import os
import matplotlib.pyplot as plt
from docx import Document
import pandas as pd
from collections import Counter
from sklearn.manifold import TSNE
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from transformers import BertTokenizer, BertModel
import torch
import contractions
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from sklearn.decomposition import PCA
import seaborn as sns
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
import pickle

In [None]:
#Import texts. Ensure that all texts are in the directory defined as base_dir
def extract_text_from_docx(file_path):
    """Return the text of a .docx file as a single newline-joined string.

    Parameters
    ----------
    file_path : path of the document, passed straight to ``Document``.

    Returns
    -------
    str
        The ``text`` of every entry in ``doc.paragraphs``, in order, joined
        with ``'\\n'``.
    """
    paragraphs = Document(file_path).paragraphs
    return '\n'.join(paragraph.text for paragraph in paragraphs)

# Base directory (containing all .docx files)
base_dir = r'C:\your_directory'

# Process all documents
essays = []
# os.listdir returns names in arbitrary order; sorting keeps the row order of
# essays_df (and of every embedding derived from it) reproducible across runs.
for file in sorted(os.listdir(base_dir)):
    # Only .docx files; names starting with "~$" are skipped
    # (presumably the temporary lock files Word creates next to open documents).
    if file.endswith('.docx') and not file.startswith('~$'):
        try:
            file_path = os.path.join(base_dir, file)
            text = extract_text_from_docx(file_path)
            essays.append({'filename': file, 'text': text})
        except Exception as e:
            # Best effort: report the unreadable file and carry on with the rest.
            print(f"Error processing {file}: {e}")

# Create DataFrame. Naming the columns explicitly keeps 'filename' and 'text'
# present even when no document could be read, so later cells fail less cryptically.
essays_df = pd.DataFrame(essays, columns=['filename', 'text'])


Data Preprocessing

In [None]:
# Function for comprehensive text preprocessing
def preprocess_text(text):
    """Normalize raw essay text before tokenization.

    Steps, in order:
      1. lowercase;
      2. turn the typographic apostrophe (’) into the ASCII one so both
         spellings are handled identically from here on;
      3. expand contractions with ``contractions.fix``;
      4. strip possessive ``'s``;
      5. drop any apostrophes that remain;
      6. remove URLs (tokens starting with ``http`` or ``www``) and digits;
      7. collapse every run of whitespace (tabs and newlines included) into a
         single space and trim the ends.

    Parameters
    ----------
    text : str
        Raw text of one essay.

    Returns
    -------
    str
        The cleaned text.
    """
    text = text.lower()
    text = text.replace("’", "'")
    # Contractions must be expanded BEFORE possessives are stripped: removing
    # "'s" first would turn "it's" into "it" and leave nothing to expand.
    text = contractions.fix(text)
    text = re.sub(r"'s\b", "", text)
    text = text.replace("'", "")
    # "https..." is already covered by the "http\S+" alternative.
    text = re.sub(r'http\S+|www\S+', '', text)
    text = re.sub(r'\d+', '', text)
    # \s+ already matches tabs, so no separate tab-removal pass is needed.
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [None]:
# Clean every essay and store the result next to the raw text
essays_df['corrected_text'] = essays_df['text'].map(preprocess_text)

In [None]:
# Word Tokenization
# word_tokenize needs the Punkt tokenizer data, which is downloaded separately
# from the nltk package itself. Both resource names are requested because newer
# NLTK releases look for 'punkt_tab' while older ones use 'punkt'.
for _nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(_nltk_resource, quiet=True)

essays_df['word_tokens'] = essays_df['corrected_text'].apply(word_tokenize)
essays_df['word_count'] = essays_df['word_tokens'].apply(len)
# Prints the distinct per-essay token counts (not a vocabulary size).
print('\nUnique word count: ')
print(essays_df['word_count'].unique())

In [None]:
# Stop-word removal (skip punctuation check)
# The stop-word lists are NLTK data, not part of the package: fetch them first
# so this cell also works on a fresh kernel / fresh Colab runtime.
nltk.download('stopwords', quiet=True)
stop_words = set(stopwords.words('english'))

def categorize_tokens(tokens):
    """Separate content tokens from stop words.

    Parameters
    ----------
    tokens : iterable of str
        Tokens of one essay.

    Returns
    -------
    tuple
        ``(filtered_tokens, count, stopwords_found)`` where ``filtered_tokens``
        lists every token not in ``stop_words`` (duplicates kept),
        ``count`` is its length, and ``stopwords_found`` lists each stop word
        once, in order of first appearance.
    """
    kept = [tok for tok in tokens if tok not in stop_words]
    # dict.fromkeys de-duplicates while preserving first-occurrence order.
    distinct_stopwords = list(dict.fromkeys(tok for tok in tokens if tok in stop_words))
    return kept, len(kept), distinct_stopwords

In [None]:
# Run categorize_tokens on every essay and spread its 3-tuple over three columns
token_categories = essays_df['word_tokens'].apply(categorize_tokens).tolist()
essays_df[['filtered_tokens', 'tokens_count', 'stopwords_found']] = pd.DataFrame(
    token_categories,
    index=essays_df.index,
)

In [None]:
# Lemmatization with POS tagging
# WordNetLemmatizer and nltk.pos_tag rely on NLTK data that is downloaded
# separately from the package. The "_eng" tagger name is the one newer NLTK
# releases look for; requesting both keeps old and new versions working.
for _nltk_resource in ('wordnet', 'omw-1.4',
                       'averaged_perceptron_tagger', 'averaged_perceptron_tagger_eng'):
    nltk.download(_nltk_resource, quiet=True)

lemmatizer = WordNetLemmatizer()

def get_wordnet_pos(word):
    """Return the WordNet part-of-speech constant for a single word.

    The word is tagged on its own with ``nltk.pos_tag`` (no sentence context)
    and the first letter of the resulting tag selects the WordNet class:
    J -> adjective, N -> noun, V -> verb, R -> adverb. Any other tag falls
    back to noun.
    """
    tag = nltk.pos_tag([word])[0][1]
    initial = tag[0].upper()
    if initial == "J":
        return wordnet.ADJ
    if initial == "N":
        return wordnet.NOUN
    if initial == "V":
        return wordnet.VERB
    if initial == "R":
        return wordnet.ADV
    return wordnet.NOUN

def lemmatize_token_list(token_list):
    """Lemmatize every non-empty token of a list.

    Falsy tokens (e.g. empty strings) are dropped. Each remaining token is
    converted to ``str`` and lemmatized using the part of speech reported by
    ``get_wordnet_pos`` for that token.
    """
    lemmas = []
    for token in token_list:
        if not token:
            continue
        word = str(token)
        lemmas.append(lemmatizer.lemmatize(word, get_wordnet_pos(word)))
    return lemmas

# Lemmatize the stop-word-filtered tokens of every essay
essays_df['lemmatized_tokens'] = essays_df['filtered_tokens'].map(lemmatize_token_list)

SBERT

In [None]:
from sentence_transformers import SentenceTransformer

# Load model
sbert_model = SentenceTransformer('all-MiniLM-L6-v2')

# Create embeddings from the raw (un-preprocessed) essay text.
# encode() is documented for a string or a list of strings, so hand it a plain
# list: a pandas Series only works when its index happens to be 0..n-1.
# NOTE(review): the model truncates long inputs to its own maximum sequence
# length, so long essays are presumably only partly represented - confirm.
sbert_vectors = sbert_model.encode(essays_df['text'].tolist())

BERT

In [None]:
from transformers import BertTokenizer, BertModel
import torch

# Tokenizer and encoder are loaded from the same pretrained checkpoint
bert_checkpoint = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(bert_checkpoint)
bert = BertModel.from_pretrained(bert_checkpoint)

def get_bert_embedding(text, max_length=512, max_chunks=5):
    """Embed one text with BERT and return the pooled output as a NumPy array.

    The text is tokenized, truncated and padded to ``max_length`` tokens, then
    run through ``bert`` with gradient tracking disabled.

    Parameters
    ----------
    text : str
        Text to embed.
    max_length : int
        Token length the input is truncated / padded to.
    max_chunks : int
        Accepted but not used by this function.

    Returns
    -------
    numpy.ndarray
        Row 0 of ``pooler_output``. Note this is the model's pooled output,
        not the ``last_hidden_state[:, 0, :]`` slice.
    """
    encoded = tokenizer(
        text,
        truncation=True,
        padding='max_length',
        max_length=max_length,
        return_tensors='pt',
    )

    with torch.no_grad():
        pooled = bert(**encoded).pooler_output

    return pooled[0].numpy()

# Embed every essay with BERT, one essay per forward pass
embeddings = []

for i, essay in enumerate(essays_df['text']):
    # Truncate / pad each essay to exactly 512 tokens
    encoding = tokenizer(
        essay,
        truncation=True,
        padding='max_length',
        max_length=512,
        return_tensors='pt',
    )

    # Inference only: no gradients needed
    with torch.no_grad():
        outputs = bert(
            input_ids=encoding['input_ids'],
            attention_mask=encoding['attention_mask'],
        )
        # Position 0 of the last hidden layer is the [CLS] token
        cls_embedding = outputs.last_hidden_state[:, 0, :].numpy()

    # The batch holds a single essay, so keep row 0 only
    embeddings.append(cls_embedding[0])

    # Progress report every 50 essays
    if (i + 1) % 50 == 0:
        print(f"Processed {i + 1}/{len(essays_df)} essays")

bert_embedding = embeddings

TRAIN VAE

In [None]:
class VAE_LayerNorm(nn.Module):
    """Fully connected variational autoencoder with LayerNorm hidden layers.

    Encoder: input_dim -> 2*hidden_dim -> hidden_dim, followed by two linear
    heads (``fc_mu``, ``fc_logvar``) of size latent_dim.
    Decoder: latent_dim -> hidden_dim -> 2*hidden_dim -> input_dim.
    Every hidden layer is Linear -> LayerNorm -> LeakyReLU(0.2) -> Dropout.
    """

    def __init__(self, input_dim, hidden_dim=128, latent_dim=64, dropout_rate=0.2):
        super().__init__()
        wide_dim = hidden_dim * 2

        def hidden_block(n_in, n_out):
            # One hidden layer: Linear -> LayerNorm -> LeakyReLU -> Dropout
            return [
                nn.Linear(n_in, n_out),
                nn.LayerNorm(n_out),
                nn.LeakyReLU(0.2),
                nn.Dropout(dropout_rate),
            ]

        # Encoder trunk shared by both latent heads
        self.encoder = nn.Sequential(
            *hidden_block(input_dim, wide_dim),
            *hidden_block(wide_dim, hidden_dim),
        )

        # Heads producing the latent mean and log-variance
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

        # Decoder mirrors the encoder and ends with a plain linear output layer
        self.decoder = nn.Sequential(
            *hidden_block(latent_dim, hidden_dim),
            *hidden_block(hidden_dim, wide_dim),
            nn.Linear(wide_dim, input_dim),
        )

        self.latent_dim = latent_dim

    def encode(self, x):
        """Return ``(mu, logvar)`` of the latent distribution for ``x``."""
        features = self.encoder(x)
        return self.fc_mu(features), self.fc_logvar(features)

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * std`` with ``eps`` ~ N(0, 1) and ``std = exp(0.5 * logvar)``."""
        std = torch.exp(0.5 * logvar)
        noise = torch.randn_like(std)
        return mu + noise * std

    def decode(self, z):
        """Map latent vectors ``z`` back to input space."""
        return self.decoder(z)

    def forward(self, x):
        """Encode, sample, decode; return ``(reconstruction, mu, logvar)``."""
        mu, logvar = self.encode(x)
        sample = self.reparameterize(mu, logvar)
        return self.decode(sample), mu, logvar

    def get_latent(self, x):
        """Return the latent mean for ``x`` (no sampling)."""
        return self.encode(x)[0]



In [None]:
def vae_loss_function(recon_x, x, mu, logvar, beta=1.0):
    """Compute the beta-weighted VAE loss.

    Parameters
    ----------
    recon_x, x : tensors of equal shape (reconstruction and target).
    mu, logvar : latent mean and log-variance tensors.
    beta : weight applied to the KL term.

    Returns
    -------
    tuple of tensors
        ``(total, reconstruction, kl)`` where ``reconstruction`` is the summed
        squared error, ``kl`` is ``-0.5 * sum(1 + logvar - mu^2 - exp(logvar))``
        and ``total = reconstruction + beta * kl``.
    """
    reconstruction = F.mse_loss(recon_x, x, reduction='sum')

    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_divergence = -0.5 * torch.sum(kl_terms)

    total = reconstruction + beta * kl_divergence
    return total, reconstruction, kl_divergence



In [None]:
def _plot_vae_training_curves(train_losses, recon_losses, kl_losses, model_name):
    """Plot per-epoch total loss and its two components, save the PNG and show it."""
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Total Loss')
    plt.title(f'VAE Training Loss - {model_name}')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(recon_losses, label='Reconstruction')
    plt.plot(kl_losses, label='KL Divergence')
    plt.title(f'Loss Components - {model_name}')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.tight_layout()
    plt.savefig(f'vae_training_{model_name}.png')
    plt.show()


def train_vae_layernorm(vectors, model_name, batch_size=64, epochs=100, learning_rate=1e-4,
                         hidden_dim=128, latent_dim=64, beta=1.0, min_batch_size=4, seed=None):
    """Standardize ``vectors`` and train a ``VAE_LayerNorm`` on them.

    Parameters
    ----------
    vectors : array-like or sparse matrix, one row per sample
        Objects exposing ``toarray()`` are densified; anything else goes
        through ``np.asarray`` (so plain lists of vectors are accepted too).
    model_name : str
        Used in log messages, plot titles and the saved plot file name
        (``vae_training_{model_name}.png``).
    batch_size : int
        Requested batch size; it is capped at
        ``max(min_batch_size, n_samples // 10)`` so small datasets still yield
        several batches per epoch.
    epochs, learning_rate : training schedule for the Adam optimizer.
    hidden_dim, latent_dim : forwarded to ``VAE_LayerNorm``.
    beta : weight of the KL term in ``vae_loss_function``.
    min_batch_size : lower bound used when shrinking the batch size.
    seed : int or None
        When given, ``torch.manual_seed(seed)`` is called first so weight
        initialization, batch shuffling, dropout and latent sampling are
        repeatable. ``None`` (default) leaves the global RNG state untouched.

    Returns
    -------
    (vae, scaler)
        The trained model (left on the device used for training) and the
        fitted ``StandardScaler``. If training raises, the error is printed and
        the model as trained up to that point is returned.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Densify sparse input; coerce lists and other array-likes to ndarray
    if hasattr(vectors, "toarray"):
        print("Converting sparse matrix to dense array...")
        vectors = vectors.toarray()
    else:
        vectors = np.asarray(vectors)

    # Standardize the data
    print(f"Standardizing {vectors.shape[0]} vectors of dimension {vectors.shape[1]}...")
    scaler = StandardScaler()
    vectors_scaled = scaler.fit_transform(vectors)

    # Convert to torch tensors
    tensor_x = torch.tensor(vectors_scaled, dtype=torch.float32)

    # Create DataLoader
    dataset = TensorDataset(tensor_x)

    # Shrink the batch size for small datasets (roughly ten batches per epoch),
    # but never below min_batch_size.
    effective_batch_size = min(batch_size, max(min_batch_size, len(dataset) // 10))
    if effective_batch_size < batch_size:
        print(f"Warning: Adjusted batch size from {batch_size} to {effective_batch_size} to ensure enough batches")

    dataloader = DataLoader(dataset, batch_size=effective_batch_size, shuffle=True)

    # Create VAE model
    input_dim = vectors.shape[1]
    print(f"Creating VAE model with input_dim={input_dim}, hidden_dim={hidden_dim}, latent_dim={latent_dim}")
    vae = VAE_LayerNorm(input_dim=input_dim, hidden_dim=hidden_dim, latent_dim=latent_dim)

    # Move to device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    vae.to(device)

    # Optimizer
    optimizer = torch.optim.Adam(vae.parameters(), lr=learning_rate)

    # Per-epoch averages (per sample) of the total loss and its two components
    train_losses = []
    recon_losses = []
    kl_losses = []

    print(f"Starting training for {model_name} ({epochs} epochs)...")
    try:
        for epoch in range(epochs):
            epoch_loss = 0
            epoch_recon = 0
            epoch_kl = 0

            vae.train()
            for batch_idx, (data,) in enumerate(dataloader):
                data = data.to(device)

                optimizer.zero_grad()
                recon_batch, mu, logvar = vae(data)
                loss, recon, kl = vae_loss_function(recon_batch, data, mu, logvar, beta=beta)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                epoch_recon += recon.item()
                epoch_kl += kl.item()

            # Losses are summed over samples, so divide by the dataset size
            n_samples = len(dataloader.dataset)
            avg_loss = epoch_loss / n_samples
            avg_recon = epoch_recon / n_samples
            avg_kl = epoch_kl / n_samples

            train_losses.append(avg_loss)
            recon_losses.append(avg_recon)
            kl_losses.append(avg_kl)

            if (epoch + 1) % 5 == 0 or epoch == 0:
                print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Recon: {avg_recon:.4f}, KL: {avg_kl:.4f}")
    except Exception as e:
        # Deliberate best effort: keep whatever was learned so far. Nothing is
        # written to disk here, so the message says what actually happens.
        print(f"Error during training: {e}")
        print("Returning the model as trained up to this point...")

    # Plot training curves; a plotting failure must not discard the trained model
    try:
        _plot_vae_training_curves(train_losses, recon_losses, kl_losses, model_name)
    except Exception as e:
        print(f"Error creating plots: {e}")

    return vae, scaler

In [None]:
def get_latent_representations_layernorm(vae, vectors, scaler, batch_size=128):
    """Encode ``vectors`` with a trained VAE and return the latent means.

    Parameters
    ----------
    vae : model exposing ``encode(x) -> (mu, logvar)``
        It is moved to the available device and switched to eval mode (both
        side effects persist after the call).
    vectors : array-like or sparse matrix, one row per sample
        Objects exposing ``toarray()`` are densified; anything else goes
        through ``np.asarray`` (so plain lists of vectors are accepted too).
    scaler : fitted scaler exposing ``transform``; the one returned by training.
    batch_size : int
        Number of rows encoded per forward pass.

    Returns
    -------
    numpy.ndarray
        Latent means stacked in input order, one row per input row.
    """
    # Densify sparse input; coerce lists and other array-likes to ndarray
    if hasattr(vectors, "toarray"):
        vectors = vectors.toarray()
    else:
        vectors = np.asarray(vectors)

    # Standardize using the same scaler
    vectors_scaled = scaler.transform(vectors)

    # Convert to torch tensor; shuffle=False keeps rows aligned with the input
    tensor_x = torch.tensor(vectors_scaled, dtype=torch.float32)
    dataset = TensorDataset(tensor_x)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    vae.to(device)
    vae.eval()

    latent_vectors = []
    with torch.no_grad():
        for (data,) in dataloader:
            data = data.to(device)
            # Use the mean of the latent distribution; no sampling
            mu, _ = vae.encode(data)
            latent_vectors.append(mu.cpu().numpy())

    return np.vstack(latent_vectors)

In [None]:
def process_embedding_layernorm(name, vectors, hidden_dim=128, latent_dim=64, beta=1.0, epochs=50, batch_size=64,
                                seed=None):
    """Train a VAE on one set of embeddings, encode them, and save everything.

    Parameters
    ----------
    name : str
        Label of the embedding approach; also names the output directory
        ``vae_{name}``.
    vectors : embeddings to train on and then encode, one row per sample.
    hidden_dim, latent_dim, beta, epochs, batch_size :
        Forwarded to ``train_vae_layernorm``.
    seed : int or None
        When given, ``torch.manual_seed(seed)`` is called before training so
        the run is repeatable. ``None`` (default) leaves the RNG state as is.

    Returns
    -------
    numpy.ndarray
        Latent vectors for ``vectors``. As a side effect the directory
        ``vae_{name}`` receives ``vae_model.pt`` (state dict), ``scaler.pkl``
        (pickled scaler) and ``latent_vectors.npy``.
    """
    if seed is not None:
        torch.manual_seed(seed)

    print(f"\nTraining VAE for {name} embeddings...")
    vae, scaler = train_vae_layernorm(
        vectors=vectors,
        model_name=name,
        hidden_dim=hidden_dim,
        latent_dim=latent_dim,
        beta=beta,
        epochs=epochs,
        batch_size=batch_size
    )

    # Get latent representations
    latent_vectors = get_latent_representations_layernorm(vae, vectors, scaler)

    print(f"Completed VAE training for {name}. Latent vectors shape: {latent_vectors.shape}")

    # Save the model and latent vectors
    save_dir = f"vae_{name}"
    os.makedirs(save_dir, exist_ok=True)

    # Save VAE model
    torch.save(vae.state_dict(), os.path.join(save_dir, "vae_model.pt"))

    # Save scaler (needed to standardize new data the same way before encoding)
    with open(os.path.join(save_dir, "scaler.pkl"), 'wb') as f:
        pickle.dump(scaler, f)

    # Save latent vectors
    np.save(os.path.join(save_dir, "latent_vectors.npy"), latent_vectors)

    print(f"Saved model and latent vectors to {save_dir}/")

    return latent_vectors

In [None]:
# SBERT: train the VAE on the sentence-transformer embeddings (100 epochs)
sbert_latent = process_embedding_layernorm(
    name="SBERT",
    vectors=sbert_vectors,
    epochs=100,
)

In [None]:
# BERT: stack the per-essay [CLS] vectors into one array, then train the VAE (100 epochs)
bert_latent = process_embedding_layernorm(
    name="Bert",
    vectors=np.array(bert_embedding),
    epochs=100,
)

In [None]:
# List of embedding approaches (replace with whatever you want to compare)
approaches = ["SBERT", "Bert"]

# Candidate numbers of clusters tried for every approach and both algorithms
cluster_range = range(2, 15)

# Results dictionaries, keyed by approach name
kmeans_results = {}
gmm_results = {}


def _best_by_silhouette(vectors, cluster_with_k, k_values, label):
    """Cluster ``vectors`` for each k and keep the labelling with the best silhouette.

    Parameters
    ----------
    vectors : array of samples handed to ``silhouette_score``.
    cluster_with_k : callable taking k and returning one label per sample.
    k_values : iterable of cluster counts to try.
    label : str prefix used in the per-k progress line.

    Returns
    -------
    dict with keys ``best_k``, ``best_score``, ``all_scores`` (one entry per k,
    NaN where the score is undefined) and ``labels`` (labels of the best k).
    """
    all_scores = []
    best_score, best_k, best_labels = -1, 0, None

    for k in k_values:
        labels = cluster_with_k(k)
        # silhouette_score raises when every sample lands in the same cluster;
        # record NaN for that k instead of aborting the whole comparison.
        if len(np.unique(labels)) < 2:
            score = float("nan")
        else:
            score = silhouette_score(vectors, labels)
        all_scores.append(score)

        print(f"{label} with {k} clusters: silhouette = {score:.4f}")

        # NaN never compares greater, so undefined scores cannot win.
        if score > best_score:
            best_score, best_k, best_labels = score, k, labels

    return {
        "best_k": best_k,
        "best_score": best_score,
        "all_scores": all_scores,
        "labels": best_labels,
    }


# Keep only the approaches whose latent vectors were actually saved
available_approaches = []
for approach in approaches:
    vectors_path = f"vae_{approach}/latent_vectors.npy"
    if os.path.exists(vectors_path):
        available_approaches.append(approach)
    else:
        print(f"No saved vectors found for {approach}")

# Calculate grid dimensions for plots
n_approaches = len(available_approaches)
n_cols = 2
n_rows = (n_approaches + 1) // 2  # Ceiling division to ensure enough space

if n_approaches == 0:
    # Nothing to plot; a figure with zero rows would be invalid.
    print("No latent vectors available; run the VAE training cells first.")
else:
    # Set up the visualization figure
    plt.figure(figsize=(18, 4 * n_rows))

    # Number the subplots over the approaches that exist, so a missing approach
    # neither leaves a hole nor pushes the index past the grid.
    for plot_idx, approach in enumerate(available_approaches, start=1):
        vectors_path = f"vae_{approach}/latent_vectors.npy"

        # Load latent vectors
        latent_vectors = np.load(vectors_path)
        print(f"Loaded {approach} latent vectors with shape {latent_vectors.shape}")

        # Silhouette needs at most n_samples - 1 clusters, so drop larger k.
        k_values = [k for k in cluster_range if k < len(latent_vectors)]
        if not k_values:
            print(f"Skipping {approach}: too few samples ({len(latent_vectors)}) to cluster")
            continue

        # K-means Clustering
        kmeans_results[approach] = _best_by_silhouette(
            latent_vectors,
            lambda k: KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(latent_vectors),
            k_values,
            f"{approach} K-means",
        )
        print(f"Best K-means clustering for {approach}: {kmeans_results[approach]['best_k']} clusters "
              f"with score {kmeans_results[approach]['best_score']:.4f}")

        # GMM Clustering
        gmm_results[approach] = _best_by_silhouette(
            latent_vectors,
            lambda k: GaussianMixture(n_components=k, random_state=42, n_init=10)
            .fit(latent_vectors)
            .predict(latent_vectors),
            k_values,
            f"{approach} GMM",
        )
        print(f"Best GMM clustering for {approach}: {gmm_results[approach]['best_k']} clusters "
              f"with score {gmm_results[approach]['best_score']:.4f}")
        print()

        # ----------------- Visualization -----------------
        # K-means vs GMM silhouette scores across the tried cluster counts
        plt.subplot(n_rows, n_cols, plot_idx)
        plt.plot(k_values, kmeans_results[approach]["all_scores"], 'o-', label='K-means')
        plt.plot(k_values, gmm_results[approach]["all_scores"], 's-', label='GMM')
        plt.xlabel('Number of clusters')
        plt.ylabel('Silhouette Score')
        plt.title(f'{approach} Clustering Performance')
        plt.legend()
        plt.grid(True, linestyle='--', alpha=0.7)

    plt.tight_layout()
    plt.show()

In [None]:
# Scatter plots (2-D PCA projection) of the best clustering found for each approach
def _draw_cluster_panel(position, points, labels, title):
    """Draw one PCA scatter panel, coloured by cluster label, in a 1x2 grid."""
    plt.subplot(1, 2, position)
    plt.scatter(points[:, 0], points[:, 1], c=labels, cmap='tab10', alpha=0.7)
    plt.title(title)
    plt.xlabel('PCA Component 1')
    plt.ylabel('PCA Component 2')
    plt.colorbar(label='Cluster')


for approach in approaches:
    # Only approaches that were clustered in the previous cell
    if approach in kmeans_results:
        # Reload the saved latent vectors for this approach
        vectors_path = f"vae_{approach}/latent_vectors.npy"
        latent_vectors = np.load(vectors_path)

        # Project to two dimensions purely for plotting
        pca = PCA(n_components=2)
        reduced_vectors = pca.fit_transform(latent_vectors)

        # One figure per approach: K-means on the left, GMM on the right
        plt.figure(figsize=(16, 7))

        kmeans_labels = kmeans_results[approach]["labels"]
        _draw_cluster_panel(
            1,
            reduced_vectors,
            kmeans_labels,
            f'{approach} - K-means (k={kmeans_results[approach]["best_k"]}, score={kmeans_results[approach]["best_score"]:.4f})',
        )

        gmm_labels = gmm_results[approach]["labels"]
        _draw_cluster_panel(
            2,
            reduced_vectors,
            gmm_labels,
            f'{approach} - GMM (k={gmm_results[approach]["best_k"]}, score={gmm_results[approach]["best_score"]:.4f})',
        )

        plt.tight_layout()
        plt.show()
