<a href="https://colab.research.google.com/github/Altaieb-Mohammed/Course_NLP/blob/main/NLP_works.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import random
import numpy as np
import torch
import pandas as pd
import os
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score, classification_report

def set_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch with the same value.

    The CUDA generators are seeded too, but only when a CUDA device is
    available.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)

# Seed every RNG once, before any data loading or model fitting in this cell.
set_seed(42)

def load_data_from_csv(file_path, text_col='english_text', label_col='label'):
    """Read the text and label columns of a CSV file into two Python lists.

    Args:
        file_path: Path to the CSV file.
        text_col: Name of the column holding the texts.
        label_col: Name of the column holding the labels.

    Returns:
        A ``(texts, labels)`` tuple of equally long lists. Texts are always
        ``str``; a missing text cell becomes the empty string.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
        ValueError: If either requested column is absent from the CSV.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
    df = pd.read_csv(file_path)
    if text_col not in df.columns or label_col not in df.columns:
        raise ValueError(f"CSV must contain columns '{text_col}' and '{label_col}'")
    # Fill missing cells first: a bare astype(str) would turn NaN into the
    # literal token "nan", which the models would then treat as a real word.
    texts = df[text_col].fillna("").astype(str).tolist()
    labels = df[label_col].tolist()
    return texts, labels

def tfidf_svm_train_predict(train_texts, train_labels, val_texts, val_labels):
    """Fit a TF-IDF + LinearSVC pipeline and report validation metrics.

    The pipeline is fitted on the training split, then used to predict the
    validation split. Accuracy and a per-class classification report are
    printed, and the validation predictions are returned.
    """
    print("Начинаем обучение TF-IDF + SVM...")
    classifier = make_pipeline(TfidfVectorizer(), LinearSVC())
    predictions = classifier.fit(train_texts, train_labels).predict(val_texts)
    accuracy = accuracy_score(val_labels, predictions)
    print("TF-IDF + SVM Validation Accuracy:", accuracy)
    report = classification_report(val_labels, predictions, zero_division=0)
    print(report)
    print("TF-IDF + SVM завершено.\n")
    return predictions

if __name__ == "__main__":
    # Baseline experiment: TF-IDF + linear SVM on the CSV splits below.
    # Both files are resolved relative to the current working directory;
    # load_data_from_csv raises FileNotFoundError when one is missing.
    train_csv_path = "train.csv"
    val_csv_path = "validation.csv"
    text_column = 'english_text'

    # The label column is left at load_data_from_csv's default ('label').
    train_texts, train_labels = load_data_from_csv(train_csv_path, text_col=text_column)
    val_texts, val_labels = load_data_from_csv(val_csv_path, text_col=text_column)

    # Trains on the training split and prints validation metrics.
    tfidf_svm_train_predict(train_texts, train_labels, val_texts, val_labels)

FileNotFoundError: CSV file not found: train.csv

In [None]:
import random
import numpy as np
import torch
import pandas as pd
import os
import argparse
import logging
from tqdm import tqdm
from typing import List, Tuple, Union

try:
    from gensim.models import KeyedVectors
except ImportError:
    KeyedVectors = None

try:
    import fasttext
except ImportError:
    fasttext = None

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report


def set_seed(seed: int = 42) -> None:
    """Seed the ``random``, NumPy and PyTorch generators, then log the seed.

    Args:
        seed: Value handed to every generator. CUDA generators are seeded
            only when a CUDA device is available.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.manual_seed_all(seed)
    message = f"Random seed set to {seed}"
    logging.info(message)


def load_data_from_csv(file_path: str, text_col: str = 'english_text', label_col: str = 'label') -> Tuple[List[str], List[int]]:
    """
    Load texts and labels from a CSV file.

    Args:
        file_path: Path to the CSV file.
        text_col: Column name for text data.
        label_col: Column name for labels.

    Returns:
        texts: List of text strings; a missing text cell becomes "".
        labels: List of labels, as parsed by pandas.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
        ValueError: If ``text_col`` or ``label_col`` is not a column of the CSV.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
    df = pd.read_csv(file_path)
    if text_col not in df.columns or label_col not in df.columns:
        raise ValueError(f"CSV must contain columns '{text_col}' and '{label_col}'")
    # Without fillna, astype(str) would convert NaN (an empty cell) into the
    # literal word "nan", which would then be vectorized like real text.
    texts = df[text_col].fillna("").astype(str).tolist()
    labels = df[label_col].tolist()
    logging.info(f"Loaded {len(texts)} samples from {file_path}")
    return texts, labels


def load_fasttext_embeddings(path: str, embedding_dim: int = 300) -> "Union[KeyedVectors, fasttext.FastText._FastText]":
    """
    Load FastText embeddings from a .vec or .bin file.

    The return annotation is a string on purpose: the module sets
    ``fasttext`` / ``KeyedVectors`` to ``None`` when the optional import
    fails, and an eagerly evaluated ``fasttext.FastText._FastText`` would
    then raise AttributeError while this function is being defined.

    Args:
        path: Path to the embeddings file. A path ending in ``.bin`` is
            loaded with the ``fasttext`` library; anything else is loaded
            as word2vec text format through gensim.
        embedding_dim: Dimension of embeddings. Not used by the loader; kept
            for interface compatibility.

    Returns:
        Loaded FastText model (a ``fasttext`` model for ``.bin`` files,
        gensim ``KeyedVectors`` otherwise).

    Raises:
        ImportError: If the library needed for the given file type is not
            installed.
    """
    logging.info(f"Loading FastText embeddings from {path} ...")
    if path.endswith('.bin'):
        if fasttext is None:
            raise ImportError("fasttext library is not installed. Install it with `pip install fasttext` to load .bin files.")
        model = fasttext.load_model(path)
        logging.info("FastText binary model loaded.")
        return model

    if KeyedVectors is None:
        raise ImportError("gensim is not installed. Install it with `pip install gensim` to load .vec files.")
    model = KeyedVectors.load_word2vec_format(path, binary=False)
    logging.info("FastText text vector model loaded.")
    return model


def fasttext_vectorize(text: str, ft_model: "Union[KeyedVectors, fasttext.FastText._FastText]", embedding_dim: int = 300) -> np.ndarray:
    """
    Vectorize a single text as the mean of its word embeddings.

    The ``ft_model`` annotation is a string so that defining this function
    does not fail when the optional ``fasttext`` / gensim import fell back
    to ``None``.

    Args:
        text: Input text string; it is tokenized with ``str.split()``.
        ft_model: Loaded FastText model. Objects exposing
            ``get_word_vector`` are queried for every token; any other
            object is used as a mapping and tokens it does not contain are
            skipped.
        embedding_dim: Length of the zero vector returned when no token
            yields an embedding.

    Returns:
        Mean vector of word embeddings, or a float32 zero vector of length
        ``embedding_dim`` when there is nothing to average.
    """
    words = text.split()
    if hasattr(ft_model, 'get_word_vector'):
        # fasttext binary model: every token is passed to get_word_vector
        vectors = [ft_model.get_word_vector(word) for word in words]
    else:
        # gensim KeyedVectors: only tokens present in the model are used
        vectors = [ft_model[word] for word in words if word in ft_model]

    if not vectors:
        return np.zeros(embedding_dim, dtype=np.float32)
    return np.mean(vectors, axis=0)


def batch_vectorize(texts: List[str], ft_model: "Union[KeyedVectors, fasttext.FastText._FastText]", embedding_dim: int = 300, batch_size: int = 512) -> np.ndarray:
    """
    Vectorize a list of texts in batches.

    The ``ft_model`` annotation is a string so that defining this function
    does not fail when the optional ``fasttext`` / gensim import fell back
    to ``None``.

    Args:
        texts: List of text strings.
        ft_model: Loaded FastText model.
        embedding_dim: Embedding dimension.
        batch_size: Number of texts to process per batch. It only sets the
            granularity of the progress bar.

    Returns:
        float32 array with one row per text. For an empty ``texts`` the
        result has shape ``(0, embedding_dim)``, so it is still 2-D.
    """
    vectors = []
    for i in tqdm(range(0, len(texts), batch_size), desc="Vectorizing texts"):
        batch = texts[i:i + batch_size]
        vectors.extend(fasttext_vectorize(text, ft_model, embedding_dim) for text in batch)
    if not vectors:
        # np.array([]) would have shape (0,), which 2-D consumers reject.
        return np.empty((0, embedding_dim), dtype=np.float32)
    return np.array(vectors, dtype=np.float32)


def fasttext_train_predict(train_texts: List[str], train_labels: List[int], val_texts: List[str], val_labels: List[int],
                           ft_model: "Union[KeyedVectors, fasttext.FastText._FastText]", embedding_dim: int = 300) -> List[int]:
    """
    Train Logistic Regression on FastText embeddings and predict on validation data.

    The ``ft_model`` annotation is a string so that defining this function
    does not fail when the optional ``fasttext`` / gensim import fell back
    to ``None``.

    Args:
        train_texts: Training texts.
        train_labels: Training labels.
        val_texts: Validation texts.
        val_labels: Validation labels; used only for the logged metrics.
        ft_model: Loaded FastText model.
        embedding_dim: Embedding dimension.

    Returns:
        Validation predictions as a plain Python list.
    """
    logging.info("Starting training FastText + Logistic Regression...")
    # Each text is represented by the mean of its word vectors.
    X_train = batch_vectorize(train_texts, ft_model, embedding_dim)
    X_val = batch_vectorize(val_texts, ft_model, embedding_dim)

    clf = LogisticRegression(max_iter=5000, n_jobs=-1)
    clf.fit(X_train, train_labels)

    val_preds = clf.predict(X_val)

    acc = accuracy_score(val_labels, val_preds)
    logging.info(f"Validation Accuracy: {acc:.4f}")
    logging.info("Classification Report:\n" + classification_report(val_labels, val_preds, zero_division=0))
    logging.info("Training completed.\n")

    return val_preds.tolist()


def main():
    """Command-line entry point: FastText embeddings + Logistic Regression.

    Parses the options, seeds the RNGs, loads both CSV splits and the
    embeddings, then trains and evaluates the classifier. Every recoverable
    failure (missing or invalid CSV, missing or unloadable embeddings) is
    logged as an error and ends the run early without raising.
    """
    parser = argparse.ArgumentParser(description="FastText + Logistic Regression Text Classification")
    parser.add_argument("--train_csv", type=str, default="train.csv", help="Path to training CSV file")
    parser.add_argument("--val_csv", type=str, default="validation.csv", help="Path to validation CSV file")
    parser.add_argument("--text_col", type=str, default="english_text", help="Name of the text column")
    parser.add_argument("--label_col", type=str, default="label", help="Name of the label column")
    parser.add_argument("--fasttext_path", type=str, default="cc.ru.300.vec", help="Path to FastText embeddings file (.vec or .bin)")
    parser.add_argument("--embedding_dim", type=int, default=300, help="Dimension of FastText embeddings")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    # parse_known_args instead of parse_args: notebook kernels typically start
    # with extra argv entries (e.g. "-f <connection file>") that parse_args
    # would reject with SystemExit before anything runs.
    args, unknown_args = parser.parse_known_args()

    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')
    if unknown_args:
        logging.warning(f"Ignoring unrecognized command-line arguments: {unknown_args}")

    set_seed(args.seed)

    try:
        train_texts, train_labels = load_data_from_csv(args.train_csv, args.text_col, args.label_col)
        val_texts, val_labels = load_data_from_csv(args.val_csv, args.text_col, args.label_col)
    except (FileNotFoundError, ValueError) as e:
        logging.error(e)
        return

    if not os.path.exists(args.fasttext_path):
        logging.error(f"FastText embeddings file not found at {args.fasttext_path}. Skipping FastText model.")
        return

    # Top-level boundary: any loader failure is reported and ends the run.
    try:
        ft_model = load_fasttext_embeddings(args.fasttext_path, args.embedding_dim)
    except Exception as e:
        logging.error(f"Failed to load FastText embeddings: {e}")
        return

    fasttext_train_predict(train_texts, train_labels, val_texts, val_labels, ft_model, args.embedding_dim)


# Run the FastText + Logistic Regression experiment only when this code is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()


In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import os
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report

def set_seed(seed=42):
    """Make the run repeatable by seeding ``random``, NumPy and PyTorch.

    ``torch.cuda.manual_seed_all`` is called as well when CUDA is available.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    use_cuda = torch.cuda.is_available()
    if use_cuda:
        torch.cuda.manual_seed_all(seed)

# Seed the RNGs before the model is built, so weight initialisation and
# DataLoader shuffling start from a fixed state.
set_seed(42)

def load_data_from_csv(file_path, text_col='english_text', label_col='label'):
    """Load the text and label columns of a CSV file as Python lists.

    Args:
        file_path: Path to the CSV file.
        text_col: Name of the text column.
        label_col: Name of the label column.

    Returns:
        ``(texts, labels)``. Every text is a ``str``, with missing cells
        mapped to ``""``. Labels are returned as parsed by pandas.

    Raises:
        FileNotFoundError: If ``file_path`` does not point to a file.
        ValueError: If one of the two columns is missing.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
    df = pd.read_csv(file_path)
    if text_col not in df.columns or label_col not in df.columns:
        raise ValueError(f"CSV must contain columns '{text_col}' and '{label_col}'")
    # An empty cell is NaN; astype(str) alone would turn it into the word
    # "nan", which would then enter the vocabulary as a real token.
    texts = df[text_col].fillna("").astype(str).tolist()
    labels = df[label_col].tolist()
    return texts, labels

def load_glove_embeddings(path, word_to_idx, embedding_dim=100):
    """Build an embedding matrix for ``word_to_idx`` from a GloVe text file.

    Each usable line has the form ``word v1 v2 ... v<embedding_dim>``. Rows
    of vocabulary words that never appear in the file stay all-zero.

    Blank lines and lines that do not carry exactly ``embedding_dim``
    numeric values (for example a count/dimension header, or a token that
    contains whitespace) are skipped instead of aborting the whole load.

    Args:
        path: Path to the UTF-8 encoded embeddings file.
        word_to_idx: Mapping from word to its row index in the matrix.
        embedding_dim: Number of values expected per word.

    Returns:
        A ``torch.float32`` tensor of shape ``(len(word_to_idx), embedding_dim)``.
    """
    print(f"Loading GloVe embeddings from {path} ...")
    embedding_matrix = np.zeros((len(word_to_idx), embedding_dim))
    found = 0
    skipped = 0
    with open(path, encoding="utf8") as f:
        for line in f:
            parts = line.split()
            if not parts:
                # Blank line: nothing to parse (previously an IndexError).
                continue
            if len(parts) != embedding_dim + 1:
                skipped += 1
                continue
            word = parts[0]
            if word not in word_to_idx:
                continue
            try:
                vector = np.array(parts[1:], dtype=np.float32)
            except ValueError:
                # Non-numeric field: treat the line as malformed.
                skipped += 1
                continue
            embedding_matrix[word_to_idx[word]] = vector
            found += 1
    print(f"Found embeddings for {found} words out of {len(word_to_idx)}")
    if skipped:
        print(f"Skipped {skipped} malformed line(s) in {path}")
    return torch.tensor(embedding_matrix, dtype=torch.float32)

def build_vocab(texts):
    """Map every whitespace-separated token in ``texts`` to an integer id.

    Ids 0 and 1 are reserved for ``'<PAD>'`` and ``'<UNK>'``. All other
    tokens receive consecutive ids in order of first appearance.
    """
    token_ids = {'<PAD>': 0, '<UNK>': 1}
    for sentence in texts:
        for token in sentence.split():
            # setdefault leaves existing ids alone; a new token gets the
            # next free id, i.e. the current vocabulary size.
            token_ids.setdefault(token, len(token_ids))
    return token_ids

def text_to_indices(texts, vocab, max_len=100):
    """Encode texts as a LongTensor of vocabulary ids with fixed row length.

    Tokens missing from ``vocab`` map to the ``'<UNK>'`` id. Every row is
    truncated to ``max_len`` ids, or right-padded with the ``'<PAD>'`` id
    up to that length.
    """
    def encode(sentence):
        token_ids = [vocab.get(token, vocab['<UNK>']) for token in sentence.split()]
        kept = token_ids[:max_len]
        filler = [vocab['<PAD>']] * (max_len - len(kept))
        return kept + filler

    rows = [encode(sentence) for sentence in texts]
    return torch.tensor(rows, dtype=torch.long)

class BiLSTM(nn.Module):
    """Bidirectional LSTM classifier on top of a frozen embedding table.

    The embedding weights are taken from ``embedding_matrix`` and excluded
    from gradient updates. The final hidden states of both directions are
    concatenated, passed through dropout and projected to ``output_dim``
    logits.
    """

    def __init__(self, embedding_matrix, hidden_dim, output_dim, dropout=0.3):
        super().__init__()
        vocab_size, emb_dim = embedding_matrix.shape
        # Index 0 is the padding index of the embedding layer.
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        # Replace the freshly initialised weights with the supplied matrix
        # and freeze them.
        self.embedding.weight = nn.Parameter(embedding_matrix, requires_grad=False)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, x):
        """Return logits for a batch ``x`` of token-id sequences."""
        lstm_out = self.lstm(self.embedding(x))
        final_hidden = lstm_out[1][0]
        # The last two entries are the two directions of the single layer.
        features = torch.cat((final_hidden[-2], final_hidden[-1]), dim=1)
        return self.fc(self.dropout(features))

def train_bilstm(train_texts, train_labels, val_texts, val_labels, glove_path, num_classes,
                 epochs=10, batch_size=32, max_len=100, hidden_dim=128, lr=0.001, device=None,
                 embedding_dim=100):
    """Train a BiLSTM classifier on GloVe embeddings and evaluate it.

    The vocabulary is built from the training AND validation texts. After
    every epoch the model is scored on the validation split; the best
    weights are written to ``best_bilstm.pt`` and training stops early
    after 3 consecutive epochs without improvement. The best checkpoint is
    reloaded before the final validation report is printed.

    Args:
        train_texts: Training texts.
        train_labels: Integer class labels for ``train_texts``.
        val_texts: Validation texts.
        val_labels: Integer class labels for ``val_texts``.
        glove_path: Path to the GloVe text file.
        num_classes: Number of output classes.
        epochs: Maximum number of training epochs.
        batch_size: Batch size for both data loaders.
        max_len: Fixed sequence length (texts are truncated or padded).
        hidden_dim: Hidden size of each LSTM direction.
        lr: Adam learning rate.
        device: Torch device; defaults to CUDA when available, else CPU.
        embedding_dim: Dimension of the vectors in ``glove_path``.

    Returns:
        List with the predicted class of every validation sample.
    """
    print("Начинаем обучение BiLSTM...")
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    vocab = build_vocab(train_texts + val_texts)
    embedding_matrix = load_glove_embeddings(glove_path, vocab, embedding_dim=embedding_dim)
    X_train = text_to_indices(train_texts, vocab, max_len)
    X_val = text_to_indices(val_texts, vocab, max_len)
    y_train = torch.tensor(train_labels, dtype=torch.long)
    y_val = torch.tensor(val_labels, dtype=torch.long)
    train_dataset = TensorDataset(X_train, y_train)
    val_dataset = TensorDataset(X_val, y_val)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    model = BiLSTM(embedding_matrix, hidden_dim, num_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    checkpoint_path = "best_bilstm.pt"
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint, even when its validation accuracy is exactly 0.
    best_val_acc = float("-inf")
    checkpoint_saved = False
    patience = 3
    patience_counter = 0
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        total = 0
        for x_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch+1} training"):
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            # loss is a batch mean; weight it by batch size for an epoch mean
            train_loss += loss.item() * x_batch.size(0)
            preds = outputs.argmax(dim=1)
            train_correct += (preds == y_batch).sum().item()
            total += x_batch.size(0)
        train_acc = train_correct / total

        model.eval()
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for x_batch, y_batch in val_loader:
                x_batch, y_batch = x_batch.to(device), y_batch.to(device)
                outputs = model(x_batch)
                preds = outputs.argmax(dim=1)
                val_correct += (preds == y_batch).sum().item()
                val_total += x_batch.size(0)
        val_acc = val_correct / val_total
        print(f"Epoch {epoch+1}: Train Loss={train_loss/total:.4f}, Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                break

    # Reload only a checkpoint written by THIS run (none exists when
    # epochs == 0); map_location keeps the load valid when the file was
    # saved from a different device than the one in use now.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    all_preds = []
    with torch.no_grad():
        for x_batch, _ in val_loader:
            x_batch = x_batch.to(device)
            outputs = model(x_batch)
            preds = outputs.argmax(dim=1).cpu().numpy()
            all_preds.extend(preds)
    print("BiLSTM Validation Accuracy:", accuracy_score(val_labels, all_preds))
    print(classification_report(val_labels, all_preds, zero_division=0))
    print("BiLSTM обучение завершено.\n")
    return all_preds

if __name__ == "__main__":
    # BiLSTM experiment. All paths are relative to the working directory.
    train_csv_path = "train.csv"
    val_csv_path = "validation.csv"
    text_column = 'english_text'
    glove_path = "glove.6B.100d.txt"

    train_texts, train_labels = load_data_from_csv(train_csv_path, text_col=text_column)
    val_texts, val_labels = load_data_from_csv(val_csv_path, text_col=text_column)
    # Number of classes = number of distinct labels seen in the training split.
    num_classes = len(set(train_labels))

    # The model is trained only when the GloVe file is present; otherwise
    # the cell prints a notice and does nothing else.
    if os.path.exists(glove_path):
        train_bilstm(train_texts, train_labels, val_texts, val_labels, glove_path, num_classes)
    else:
        print(f"GloVe embeddings file not found at {glove_path}. Skipping BiLSTM model.")


In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from datasets import Dataset
from tqdm import tqdm
from gensim.models import KeyedVectors
from collections import Counter
import pandas as pd
import os

# Set seeds for reproducibility
def set_seed(seed=42):
    """Apply ``seed`` to the ``random``, NumPy and PyTorch generators.

    CUDA generators are seeded only if ``torch.cuda.is_available()``.
    """
    seeders = [random.seed, np.random.seed, torch.manual_seed]
    if torch.cuda.is_available():
        seeders.append(torch.cuda.manual_seed_all)
    for apply_seed in seeders:
        apply_seed(seed)

# Seed the RNGs up front, before any model is created in this cell.
set_seed(42)

# Load dataset from CSV
def load_data_from_csv(file_path, text_col='english_text', label_col='label'):
    """Return the texts and labels stored in a CSV file.

    Args:
        file_path: Path to the CSV file.
        text_col: Column containing the input texts.
        label_col: Column containing the class labels.

    Returns:
        Tuple ``(texts, labels)`` of lists. ``texts`` holds ``str`` values
        only; missing text cells are returned as empty strings.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
        ValueError: If the CSV lacks ``text_col`` or ``label_col``.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
    df = pd.read_csv(file_path)
    if text_col not in df.columns or label_col not in df.columns:
        raise ValueError(f"CSV must contain columns '{text_col}' and '{label_col}'")
    # fillna before astype(str): otherwise an empty cell (NaN) is fed to the
    # tokenizer as the literal string "nan".
    texts = df[text_col].fillna("").astype(str).tolist()
    labels = df[label_col].tolist()
    return texts, labels

# 4. Fine-tuning mBERT
def compute_metrics(eval_pred):
    """Compute accuracy from a ``(logits, labels)`` evaluation pair.

    The predicted class of each row is the argmax of its logits. The result
    is a dict with the single key ``"accuracy"``.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=1)
    return {"accuracy": accuracy_score(labels, predicted_classes)}

def mbert_train(train_texts, train_labels, val_texts, val_labels, num_classes,
                model_name="bert-base-multilingual-cased", batch_size=8, epochs=3, lr=2e-5, max_len=128):
    """Fine-tune a pretrained sequence classifier with the HF ``Trainer``.

    Both splits are tokenized to fixed-length sequences of ``max_len``
    tokens. The model is evaluated and checkpointed after every epoch,
    training stops early after 2 evaluations without improvement, and the
    checkpoint with the best validation accuracy is restored at the end.

    Returns:
        The ``(model, tokenizer)`` pair.
    """
    print("Начинаем обучение mBERT...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    def encode_batch(batch):
        return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=max_len)

    tensor_columns = ("input_ids", "attention_mask", "label")
    prepared = []
    for texts, labels in ((train_texts, train_labels), (val_texts, val_labels)):
        split = Dataset.from_dict({"text": texts, "label": labels})
        split = split.map(encode_batch, batched=True)
        # Expose only the model inputs and the label, as torch tensors.
        split.set_format(type="torch", columns=list(tensor_columns))
        prepared.append(split)
    train_dataset, val_dataset = prepared

    model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_classes)

    training_args = TrainingArguments(
        output_dir="./mbert_output",
        # Evaluate and checkpoint once per epoch; the two strategies match
        # because the best checkpoint is reloaded at the end.
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        save_total_limit=2,
        # Optimisation settings.
        learning_rate=lr,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        num_train_epochs=epochs,
        weight_decay=0.01,
        seed=42,
        # Logging settings.
        logging_dir="./logs",
        logging_steps=50,
        report_to="none"
    )
    early_stopping = EarlyStoppingCallback(early_stopping_patience=2)
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
        callbacks=[early_stopping]
    )
    trainer.train()
    print("mBERT обучение завершено.\n")
    return model, tokenizer

def mbert_predict(model, tokenizer, texts, labels, batch_size=8, max_len=128, device=None):
    """Predict classes for ``texts`` in batches and print evaluation metrics.

    The model is moved to ``device`` (CUDA when available, else CPU, if not
    given) and put in eval mode. Each batch is tokenized with truncation
    and dynamic padding. Accuracy and a classification report against
    ``labels`` are printed, and the list of predicted classes is returned.
    """
    print("Начинаем предсказание mBERT...")
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    all_preds = []
    batch_starts = range(0, len(texts), batch_size)
    for start in tqdm(batch_starts, desc="Predicting"):
        chunk = texts[start:start + batch_size]
        inputs = tokenizer(chunk, truncation=True, padding=True, max_length=max_len, return_tensors="pt").to(device)
        with torch.no_grad():
            logits = model(**inputs).logits
            batch_preds = torch.argmax(logits, dim=1).cpu().numpy()
        all_preds.extend(batch_preds)

    print("Fine-tuned mBERT Validation Accuracy:", accuracy_score(labels, all_preds))
    print(classification_report(labels, all_preds, zero_division=0))
    print("Предсказание mBERT завершено.\n")
    return all_preds

# Main execution
if __name__ == "__main__":
    # Runs all four experiments in sequence. tfidf_svm_train_predict,
    # load_fasttext_embeddings, fasttext_train_predict and train_bilstm are
    # not defined in this cell, so they must already be defined when it runs.
    train_csv_path = "train.csv"       # Your training CSV path
    val_csv_path = "validation.csv"    # Your validation CSV path
    text_column = 'english_text'       # or 'uzbek_text'

    train_texts, train_labels = load_data_from_csv(train_csv_path, text_col=text_column)
    val_texts, val_labels = load_data_from_csv(val_csv_path, text_col=text_column)

    # Number of classes = number of distinct labels in the training split.
    num_classes = len(set(train_labels))

    # 1. TF-IDF + linear SVM baseline.
    tfidf_svm_train_predict(train_texts, train_labels, val_texts, val_labels)

    # 2. FastText embeddings + Logistic Regression (skipped without the file).
    fasttext_path = "cc.ru.300.vec"
    if os.path.exists(fasttext_path):
        # load_fasttext_embeddings returns only the model (not a
        # (model, dim) pair), so the dimension is read from the model itself.
        ft_model = load_fasttext_embeddings(fasttext_path)
        if hasattr(ft_model, "get_dimension"):
            ft_dim = ft_model.get_dimension()   # fasttext binary model
        else:
            ft_dim = ft_model.vector_size       # gensim KeyedVectors
        fasttext_train_predict(train_texts, train_labels, val_texts, val_labels, ft_model, ft_dim)
    else:
        print(f"FastText embeddings file not found at {fasttext_path}. Skipping FastText model.")

    # 3. BiLSTM on GloVe embeddings (skipped without the file).
    glove_path = "glove.6B.100d.txt"
    if os.path.exists(glove_path):
        train_bilstm(train_texts, train_labels, val_texts, val_labels, glove_path, num_classes)
    else:
        print(f"GloVe embeddings file not found at {glove_path}. Skipping BiLSTM model.")

    # 4. Fine-tuned mBERT, evaluated on the validation split.
    model, tokenizer = mbert_train(train_texts, train_labels, val_texts, val_labels, num_classes)
    mbert_predict(model, tokenizer, val_texts, val_labels)
