# News Classification NLP Project
Feng Jiang, Jiayi Chen, Zihan Wang

In [None]:
# Installing dependencies
!pip install geopy > delete.txt
!pip install datasets > delete.txt
!pip install torch torchvision datasets > delete.txt
!pip install huggingface_hub > delete.txt
!rm delete.txt

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gcsfs 2024.10.0 requires fsspec==2024.10.0, but you have fsspec 2024.9.0 which is incompatible.[0m[31m
[0m

In [None]:
# Authenticate with the Hugging Face Hub; the CLI prompts interactively for an access token.
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: fineGrained).
The token `5190news` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `5190news`

In [None]:
#Imports
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import random
import requests
from bs4 import BeautifulSoup
import time
from concurrent.futures import ThreadPoolExecutor
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import re
from sklearn.naive_bayes import MultinomialNB
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import LinearSVC
import tensorflow as tf
import torch
import torch.nn as nn
from transformers import PreTrainedModel, PretrainedConfig
from transformers.utils import logging
from typing import Dict, List, Optional, Union
from transformers.modeling_outputs import SequenceClassifierOutput
from huggingface_hub import push_to_hub_keras
from huggingface_hub import notebook_login
from huggingface_hub import login

In [None]:
# Mount Google Drive at /content/drive so later cells can read and write project files there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Class 1

In [None]:
class NewsClassifierConfig(PretrainedConfig):
    """Configuration for the GloVe-based news classifier.

    Args:
        num_labels: Number of target classes.
        embedding_dim: Size of each word vector (and of the averaged
            document vector fed to the classifier heads).
        **kwargs: Forwarded unchanged to ``PretrainedConfig``.
    """

    model_type = "news_classifier"

    def __init__(
        self,
        num_labels: int = 2,
        embedding_dim: int = 100,
        **kwargs
    ):
        # Hand num_labels to the base initializer instead of assigning it first:
        # PretrainedConfig.__init__ sets num_labels itself (falling back to 2 when
        # it is not passed), which would overwrite a value assigned beforehand.
        super().__init__(num_labels=num_labels, **kwargs)
        self.embedding_dim = embedding_dim

class NewsClassifier(PreTrainedModel):
    """News classifier built on averaged GloVe word embeddings.

    Two heads consume the same document vectors:

    * ``svm_classifier`` - a scikit-learn ``LinearSVC`` trained by :meth:`fit`
      and queried by :meth:`predict`.
    * ``classifier`` - an ``nn.Linear`` layer used by :meth:`forward`.
      :meth:`fit` does not train this layer.
    """

    config_class = NewsClassifierConfig

    def __init__(self, config: NewsClassifierConfig):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.glove_embeddings = {}  # word -> float32 vector, filled by load_glove()
        self.svm_classifier = LinearSVC()
        self.embedding_dim = config.embedding_dim

        # Initialize a simple linear layer for PyTorch compatibility
        self.classifier = nn.Linear(self.embedding_dim, self.num_labels)

    def load_glove(self, glove_path: str):
        """Load GloVe embeddings from a text file into ``self.glove_embeddings``.

        Each line is split on whitespace; the first field is the word and the
        remaining fields are parsed as its float32 vector.
        """
        print("Loading GloVe embeddings...")
        with open(glove_path, 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vector = np.asarray(values[1:], dtype='float32')
                self.glove_embeddings[word] = vector
        print("GloVe embeddings loaded.")

    def clean_text(self, text: str) -> str:
        """Normalize a headline before embedding lookup.

        Steps, in order: expand a few contractions, reduce dollar amounts to
        ``"$ <integer part>"`` (dropping decimals and any million/billion/
        trillion suffix), strip URLs, turn hyphens into spaces, lowercase, and
        collapse runs of whitespace.
        """
        text = str(text)
        contractions = {
            "n't": " not",
            "'s": " is",
            "'ll": " will",
            "'ve": " have"
        }
        for contraction, expansion in contractions.items():
            text = text.replace(contraction, expansion)
        # In a raw string a regex escape needs a single backslash; the doubled
        # form (r'\\d', r'\\S') matches a literal backslash and never fires on
        # ordinary text.
        text = re.sub(r'\$(\d+)\.?\d*\s*(million|billion|trillion)?', r'$ \1', text, flags=re.IGNORECASE)
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'-', ' ', text)
        text = text.lower()
        text = ' '.join(text.split())
        return text

    def get_document_vector(self, text: str) -> np.ndarray:
        """Return the mean GloVe vector of the known words in ``text``.

        Words missing from ``self.glove_embeddings`` are ignored. If no word is
        known, a zero vector of length ``self.embedding_dim`` is returned.
        """
        words = text.split()
        words = [word for word in words if word in self.glove_embeddings]
        if not words:
            return np.zeros(self.embedding_dim)
        vectors = [self.glove_embeddings[word] for word in words]
        return np.mean(vectors, axis=0)

    def fit(self, texts: List[str], labels: List[int]):
        """Train the SVM classifier on document vectors built from ``texts``."""
        # Preprocess texts and create document vectors
        X = np.stack([self.get_document_vector(self.clean_text(text)) for text in texts])
        self.svm_classifier.fit(X, labels)

    def forward(
        self,
        input_text: Union[str, List[str]] = None,
        labels: Optional[torch.LongTensor] = None,
        return_dict: Optional[bool] = None,
    ) -> SequenceClassifierOutput:
        """Run the linear head on the document vector(s) of ``input_text``.

        Args:
            input_text: A single string or a list of strings.
            labels: Optional class indices; when given, cross-entropy loss is
                computed against the logits.
            return_dict: When falsy (after defaulting to
                ``config.use_return_dict``), a tuple is returned instead of a
                ``SequenceClassifierOutput``.

        Returns:
            ``SequenceClassifierOutput(loss, logits)`` or the tuple
            ``(loss, logits)`` / ``(logits,)``.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if isinstance(input_text, list):
            # Process batch of texts
            embeddings = torch.tensor(
                np.stack([self.get_document_vector(self.clean_text(text)) for text in input_text])
            ).float()
        else:
            # Process single text; unsqueeze adds the batch dimension
            embeddings = torch.tensor(
                self.get_document_vector(self.clean_text(input_text))
            ).float().unsqueeze(0)

        logits = self.classifier(embeddings)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,)
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits
        )

    def predict(self, texts: List[str]) -> np.ndarray:
        """Predict labels for ``texts`` with the SVM head."""
        X = np.stack([self.get_document_vector(self.clean_text(text)) for text in texts])
        return self.svm_classifier.predict(X)

    def save_pretrained(self, save_directory: str, **kwargs):
        """Save the model, its configuration, and the pickled SVM / GloVe table.

        The SVM and the embedding dictionary are written next to the regular
        checkpoint files as ``svm_classifier.pkl`` and ``glove_embeddings.pkl``.
        """
        super().save_pretrained(save_directory, **kwargs)

        import os
        import pickle

        # Save SVM classifier
        with open(os.path.join(save_directory, 'svm_classifier.pkl'), 'wb') as f:
            pickle.dump(self.svm_classifier, f)

        # Save GloVe embeddings
        with open(os.path.join(save_directory, 'glove_embeddings.pkl'), 'wb') as f:
            pickle.dump(self.glove_embeddings, f)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, *model_args, **kwargs):
        """Load a pretrained model and, when present, its pickled sidecar files.

        NOTE(review): the sidecar files are located with ``os.path``, so they
        are only picked up when ``pretrained_model_name_or_path`` is a local
        directory.

        SECURITY: ``pickle.load`` can execute arbitrary code contained in the
        file. Only load checkpoints from a source you trust.
        """
        model = super().from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)

        import os
        import pickle

        # Load SVM classifier if exists
        svm_path = os.path.join(pretrained_model_name_or_path, 'svm_classifier.pkl')
        if os.path.exists(svm_path):
            with open(svm_path, 'rb') as f:
                model.svm_classifier = pickle.load(f)

        # Load GloVe embeddings if exist
        glove_path = os.path.join(pretrained_model_name_or_path, 'glove_embeddings.pkl')
        if os.path.exists(glove_path):
            with open(glove_path, 'rb') as f:
                model.glove_embeddings = pickle.load(f)

        return model

In [None]:
# Download the GloVe 6B archive, extract only the 100-dimensional vectors into the
# Drive project folder, then list that folder to confirm the file is there.
# NOTE(review): unzip reads /content/glove.6B.zip, so this presumably relies on the
# working directory being /content when wget runs — confirm.
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip /content/glove.6B.zip glove.6B.100d.txt -d /content/drive/My\ Drive/5190_project/
!ls "/content/drive/My Drive/5190_project/"

--2024-12-14 04:28:34--  http://nlp.stanford.edu/data/glove.6B.zip
Resolving nlp.stanford.edu (nlp.stanford.edu)... 171.64.67.140
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://nlp.stanford.edu/data/glove.6B.zip [following]
--2024-12-14 04:28:34--  https://nlp.stanford.edu/data/glove.6B.zip
Connecting to nlp.stanford.edu (nlp.stanford.edu)|171.64.67.140|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://downloads.cs.stanford.edu/nlp/data/glove.6B.zip [following]
--2024-12-14 04:28:35--  https://downloads.cs.stanford.edu/nlp/data/glove.6B.zip
Resolving downloads.cs.stanford.edu (downloads.cs.stanford.edu)... 171.64.64.22
Connecting to downloads.cs.stanford.edu (downloads.cs.stanford.edu)|171.64.64.22|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 862182613 (822M) [application/zip]
Saving to: ‘glove.6B.zip’


202

In [None]:
# Push to Hugging Face
REPO_NAME = "CIS5190GoGo/NewsClassifierConfig"
print(f"Pushing model to Hugging Face Hub ({REPO_NAME})...")
model.push_to_hub(REPO_NAME)
print(f"Model successfully pushed to https://huggingface.co/{REPO_NAME}")

Pushing model to Hugging Face Hub (CIS5190GoGo/NewsClassifierConfig)...


model.safetensors:   0%|          | 0.00/992 [00:00<?, ?B/s]

svm_classifier.pkl:   0%|          | 0.00/1.40k [00:00<?, ?B/s]

glove_embeddings.pkl:   0%|          | 0.00/177M [00:00<?, ?B/s]

Upload 3 LFS files:   0%|          | 0/3 [00:00<?, ?it/s]

Model successfully pushed to https://huggingface.co/CIS5190GoGo/NewsClassifierConfig


# Transformer

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, TensorDataset
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer, AdamW, get_scheduler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [None]:
# Read the news dataset from the Drive project folder (the "title" and "labels" columns are used below).
news_data = pd.read_csv("/content/drive/My Drive/5190_project/news_data.csv")

In [None]:
# Use the GPU when one is present, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Features are the headlines, targets are the label column
X, y = news_data["title"].values, news_data["labels"].values

# Hold out 20% of the rows for testing; the fixed seed keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Tokenization
print("Initializing tokenizer...")
tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

def preprocess_data(texts, labels, tokenizer, max_len=128):
    """Tokenize ``texts`` and return ``(input_ids, attention_mask, labels)`` tensors.

    Args:
        texts: Array-like of strings (NumPy arrays and plain sequences both work).
        labels: Array-like of labels, converted with ``torch.tensor``.
        tokenizer: Callable tokenizer returning ``input_ids`` and ``attention_mask``.
        max_len: Every sequence is padded / truncated to this many tokens.
    """
    # Accept anything iterable, not only objects exposing .tolist()
    text_list = texts.tolist() if hasattr(texts, "tolist") else list(texts)
    encodings = tokenizer(
        text_list,
        max_length=max_len,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )
    return encodings["input_ids"], encodings["attention_mask"], torch.tensor(labels)

# Preprocess the datasets
print("Tokenizing datasets...")
train_inputs, train_masks, train_labels = preprocess_data(X_train, y_train, tokenizer)
test_inputs, test_masks, test_labels = preprocess_data(X_test, y_test, tokenizer)

# Create DataLoader objects
train_dataset = TensorDataset(train_inputs, train_masks, train_labels)
test_dataset = TensorDataset(test_inputs, test_masks, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=16, num_workers=4)

# Initialize model
print("Initializing model...")
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)
model.to(device)

# Optimizer and Scheduler
# `epochs` is defined before the scheduler so the linear decay always spans
# exactly the number of steps the training loop will run.
epochs = 3
optimizer = AdamW(model.parameters(), lr=5e-5)
num_training_steps = len(train_loader) * epochs
lr_scheduler = get_scheduler("linear", optimizer=optimizer, num_warmup_steps=0, num_training_steps=num_training_steps)

# Training loop
print("Starting training...")
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for step, batch in enumerate(train_loader):
        optimizer.zero_grad()
        # TensorDataset yields (input_ids, attention_mask, labels) in that order
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        total_loss += loss.item()

        if step % 100 == 0:
            print(f"Epoch {epoch + 1}, Step {step}, Loss: {loss.item():.4f}")

    print(f"Epoch {epoch + 1} completed. Average Loss: {total_loss / len(train_loader):.4f}")

# Evaluation
print("Evaluating model...")
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch[0].to(device)
        attention_mask = batch[1].to(device)
        labels = batch[2].to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        preds = torch.argmax(logits, dim=-1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")

Initializing tokenizer...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

Tokenizing datasets...




Initializing model...


model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...




Epoch 1, Step 0, Loss: 0.7017


KeyboardInterrupt: 

# Class 2

In [None]:
from transformers import PreTrainedModel, PretrainedConfig, DistilBertForSequenceClassification, DistilBertTokenizer
from transformers.utils import logging
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.optim import AdamW
import torch
import pandas as pd
from huggingface_hub import login
import re


from huggingface_hub import push_to_hub_keras
from huggingface_hub import notebook_login


In [None]:
# Restrict transformers logging to error-level messages.
logging.set_verbosity_error()

In [None]:
class NewsClassifierConfig(PretrainedConfig):
    """Configuration for the DistilBERT-backed news classifier.

    Args:
        num_labels: Number of target classes.
        **kwargs: Forwarded unchanged to ``PretrainedConfig``.
    """

    model_type = "news_classifier"

    def __init__(self, num_labels: int = 2, **kwargs):
        # Let the base initializer reconcile num_labels with any id2label mapping
        # present in kwargs (e.g. when the config is rebuilt from a saved
        # config.json). Assigning num_labels afterwards would replace such a
        # mapping whenever its size differs from the default of 2.
        super().__init__(num_labels=num_labels, **kwargs)

class NewsClassifier(PreTrainedModel):
    """Wrapper around ``DistilBertForSequenceClassification`` that also carries
    its tokenizer and a text-cleaning helper."""

    config_class = NewsClassifierConfig

    def __init__(self, config: NewsClassifierConfig):
        super().__init__(config)
        # Start from the public distilbert-base-uncased weights with a fresh
        # classification head of config.num_labels outputs.
        self.model = DistilBertForSequenceClassification.from_pretrained(
            "distilbert-base-uncased", num_labels=config.num_labels
        )
        self.tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

    def clean_text(self, text: str) -> str:
        """Normalize a headline before tokenization.

        Steps, in order: expand a few contractions, replace each dollar amount
        with ``"$ "`` followed by its million/billion/trillion suffix (if any),
        strip URLs, turn hyphens into spaces, lowercase, and collapse runs of
        whitespace.
        """
        text = str(text)
        contractions = {
            "n't": " not",
            "'s": " is",
            "'ll": " will",
            "'ve": " have"
        }
        for contraction, expansion in contractions.items():
            text = text.replace(contraction, expansion)
        # In a raw string a regex escape needs a single backslash; the doubled
        # form (r'\\d', r'\\S') matches a literal backslash and never fires on
        # ordinary text.
        text = re.sub(r'\$\d+\.?\d*\s*(million|billion|trillion)?', r'$ \1', text, flags=re.IGNORECASE)
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'-', ' ', text)
        text = text.lower()
        text = ' '.join(text.split())
        return text

    def tokenize(self, texts, max_len=128):
        """Clean each text, then tokenize the batch to fixed-length PyTorch tensors."""
        cleaned_texts = [self.clean_text(text) for text in texts]
        return self.tokenizer(
            cleaned_texts,
            padding="max_length",
            truncation=True,
            max_length=max_len,
            return_tensors="pt"
        )

    def forward(self, input_ids, attention_mask, labels=None):
        """Delegate to the wrapped DistilBERT classifier and return its output."""
        return self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)


    def save_pretrained(self, save_directory, **kwargs):
        """Save the model checkpoint and the tokenizer files into ``save_directory``."""
        super().save_pretrained(save_directory, **kwargs)
        self.tokenizer.save_pretrained(save_directory)


    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Load the model, then replace its tokenizer with the one stored at the same location."""
        model = super().from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)
        model.tokenizer = DistilBertTokenizer.from_pretrained(pretrained_model_name_or_path)
        return model


class NewsDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: Indexable collection of strings.
        labels: Indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: Callable accepting ``(text, max_length=..., padding=...,
            truncation=..., return_tensors=...)`` and returning a mapping with
            ``input_ids`` and ``attention_mask`` tensors that have a leading
            batch dimension of 1.
        max_len: Every item is padded / truncated to this many tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for item ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        # Drop only the leading batch dimension. A bare squeeze() would also
        # remove the sequence dimension when max_len == 1 and yield a 0-d tensor.
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(label, dtype=torch.long)
        }


In [None]:
# Build the DistilBERT-backed classifier (the tokenizer lives on the model)
config = NewsClassifierConfig(num_labels=2)
model = NewsClassifier(config)

# Wrap both splits so each item is tokenized on access
train_dataset = NewsDataset(X_train, y_train, model.tokenizer)
test_dataset = NewsDataset(X_test, y_test, model.tokenizer)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=16, num_workers=2)

# Move the model to the available device and set up the optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=5e-5)

# Fine-tune, reporting the mean batch loss after each epoch
epochs = 3
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids, attention_mask, labels = (
            batch["input_ids"].to(device),
            batch["attention_mask"].to(device),
            batch["labels"].to(device),
        )
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}")

# Score the held-out split without tracking gradients
model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for batch in test_loader:
        input_ids, attention_mask, labels = (
            batch["input_ids"].to(device),
            batch["attention_mask"].to(device),
            batch["labels"].to(device),
        )
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        preds = logits.argmax(dim=-1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")

Epoch 1, Loss: 0.5149
Epoch 2, Loss: 0.2561
Epoch 3, Loss: 0.1015
Test Accuracy: 0.8483


## Sample test data

In [None]:
import pandas as pd
import torch
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import classification_report

In [None]:
# Read the sample test subset from the Drive project folder (the 'title' and 'labels' columns are used below).
df_test = pd.read_csv('/content/drive/MyDrive/5190_project/test_data_random_subset.csv')

In [None]:
# Load the model and tokenizer
# Both are read from the same checkpoint directory; naming it once keeps the
# two loads in sync and defines `model_dir`, which was otherwise never assigned.
model_dir = "/content/drive/MyDrive/5190_project/news_transformer_model"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NewsClassifier.from_pretrained(model_dir)
model.to(device)
tokenizer = DistilBertTokenizer.from_pretrained(model_dir)

In [None]:
# Show which device the loaded model is on.
print(model.device)

cuda:0


In [None]:
# Define a dataset for handling tokenization
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item, with optional labels.

    Args:
        texts: Indexable collection of strings.
        labels: Optional indexable collection of integer labels aligned with
            ``texts``; when omitted, items carry no ``labels`` entry.
        tokenizer: Object exposing ``encode_plus`` that returns a mapping with
            ``input_ids`` and ``attention_mask`` tensors that have a leading
            batch dimension of 1.
        max_len: Every item is padded / truncated to this many tokens.
    """

    def __init__(self, texts, labels=None, tokenizer=None, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the encoded tensors for item ``idx`` (plus ``labels`` when available)."""
        text = self.texts[idx]
        inputs = self.tokenizer.encode_plus(
            text,
            None,  # no second sequence
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=False,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        # Drop only the leading batch dimension. A bare squeeze() would also
        # remove the sequence dimension when max_len == 1 and yield a 0-d tensor.
        input_ids = inputs['input_ids'].squeeze(0)
        attention_mask = inputs['attention_mask'].squeeze(0)

        if self.labels is not None:
            label = self.labels[idx]
            return {
                'input_ids': input_ids,
                'attention_mask': attention_mask,
                'labels': torch.tensor(label, dtype=torch.long)
            }
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask
        }


In [None]:
# Wrap the sample test set; order is preserved so predictions line up with the rows
test_dataset = TextDataset(
    texts=df_test['title'].tolist(),
    labels=df_test['labels'].tolist(),
    tokenizer=tokenizer,
    max_len=128,
)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=False)


In [None]:
# Run inference over the sample test set, collecting predictions next to the true labels
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for batch in test_loader:
        input_ids, attention_mask, labels = (
            batch['input_ids'].to(device),
            batch['attention_mask'].to(device),
            batch['labels'].to(device),
        )
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        # Predicted class = index of the largest logit per row
        preds = outputs.logits.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())



In [None]:
# Per-class precision / recall / F1 on the sample test set.
# NOTE(review): target_names presumes label 0 is NBC and label 1 is FoxNews — confirm against the dataset.
print(classification_report(all_labels, all_preds, target_names=['NBC', 'FoxNews']))

              precision    recall  f1-score   support

         NBC       0.75      0.60      0.67        10
     FoxNews       0.67      0.80      0.73        10

    accuracy                           0.70        20
   macro avg       0.71      0.70      0.70        20
weighted avg       0.71      0.70      0.70        20



## Hyperparameter Tuning

In [None]:
from transformers import get_linear_schedule_with_warmup


# Settings for this run
learning_rate = 2e-5
batch_size = 32
epochs = 4
max_grad_norm = 1.0

# Rebuild the loaders so they use the new batch size (only training data is shuffled)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Fresh optimizer for the new learning rate
optimizer = AdamW(model.parameters(), lr=learning_rate, eps=1e-8)

# One scheduler step is taken per batch, so the schedule spans every batch of every epoch
num_training_steps = epochs * len(train_loader)

# Linear decay from the initial learning rate, no warm-up
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

# Training loop: backward pass, clip gradients, step optimizer and scheduler, reset gradients
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for step, batch in enumerate(train_loader):
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()

        # Cap the global gradient norm at max_grad_norm before the update
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

        total_loss += loss.item()
        if not step % 100:
            print(f"Epoch {epoch + 1}, Step {step}, Loss: {loss.item():.4f}")

    print(f"Epoch {epoch + 1} completed. Average Loss: {total_loss / len(train_loader):.4f}")



NameError: name 'train_dataset' is not defined

# Class 3

In [None]:
from transformers import (
    PreTrainedModel, PretrainedConfig, BertForSequenceClassification, BertTokenizer,
    RobertaForSequenceClassification, RobertaTokenizer
)
from transformers.utils import logging
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.optim import AdamW
import torch
import torch.nn as nn
import pandas as pd
import re
from huggingface_hub import login


from sklearn.model_selection import KFold, ParameterGrid
from torch.utils.data import DataLoader, Subset
from transformers import AdamW, get_scheduler
from sklearn.metrics import accuracy_score
import numpy as np

logging.set_verbosity_error()
from transformers import AutoModel, AutoTokenizer, AutoConfig

In [None]:
class NewsClassifierConfig(PretrainedConfig):
    """Configuration for the BERT / RoBERTa news classifier.

    Args:
        num_labels: Number of target classes.
        model_type: Backbone to use, 'bert' or 'roberta'. When a config is
            rebuilt from a saved config.json this argument may instead arrive
            as the class-level registry key ("news_classifier"); in that case
            the backbone is taken from ``backbone_type``.
        backbone_type: Backbone recorded by a previously saved config. Callers
            normally leave this unset.
        **kwargs: Forwarded unchanged to ``PretrainedConfig``.
    """

    model_type = "news_classifier"

    def __init__(self, num_labels: int = 2, model_type: str = "bert", backbone_type: str = None, **kwargs):
        # Let the base initializer reconcile num_labels with any id2label mapping
        # in kwargs, rather than overriding that mapping afterwards.
        super().__init__(num_labels=num_labels, **kwargs)

        # Serialization records the class-level model_type, not the instance
        # attribute set below, so the backbone choice is also kept under its own
        # key and restored from it when the registry key comes back in.
        if model_type == type(self).model_type:
            model_type = backbone_type or "bert"
        self.backbone_type = model_type
        self.model_type = model_type

class NewsClassifier(PreTrainedModel):
    """Wrapper around a BERT or RoBERTa sequence classifier that also carries
    the matching tokenizer and a text-cleaning helper."""

    config_class = NewsClassifierConfig

    def __init__(self, config: NewsClassifierConfig):
        super().__init__(config)

        # Choose between BERT and RoBERTa based on config
        if config.model_type == "bert":
            self.model = BertForSequenceClassification.from_pretrained(
                "bert-base-uncased", num_labels=config.num_labels
            )
            self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
        elif config.model_type == "roberta":
            self.model = RobertaForSequenceClassification.from_pretrained(
                "roberta-base", num_labels=config.num_labels
            )
            self.tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
        else:
            raise ValueError("Invalid model_type. Choose 'bert' or 'roberta'.")

    def clean_text(self, text: str) -> str:
        """Normalize a headline before tokenization.

        Steps, in order: expand a few contractions, replace each dollar amount
        with ``"$ "`` followed by its million/billion/trillion suffix (if any),
        strip URLs, turn hyphens into spaces, lowercase, and collapse runs of
        whitespace.
        """
        text = str(text)
        contractions = {
            "n't": " not",
            "'s": " is",
            "'ll": " will",
            "'ve": " have"
        }
        for contraction, expansion in contractions.items():
            text = text.replace(contraction, expansion)
        # In a raw string a regex escape needs a single backslash; the doubled
        # form (r'\\d', r'\\S') matches a literal backslash and never fires on
        # ordinary text.
        text = re.sub(r'\$\d+\.?\d*\s*(million|billion|trillion)?', r'$ \1', text, flags=re.IGNORECASE)
        text = re.sub(r'http\S+', '', text)
        text = re.sub(r'-', ' ', text)
        text = text.lower()
        text = ' '.join(text.split())
        return text

    def forward(self, input_ids, attention_mask, labels=None):
        """Delegate to the wrapped backbone classifier and return its output."""
        return self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)

    def save_pretrained(self, save_directory, **kwargs):
        """Save the model checkpoint and the tokenizer files into ``save_directory``."""
        super().save_pretrained(save_directory, **kwargs)
        self.tokenizer.save_pretrained(save_directory)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Load the config, the model weights, and the tokenizer from one location."""
        config = NewsClassifierConfig.from_pretrained(pretrained_model_name_or_path, **kwargs)
        model = super(NewsClassifier, cls).from_pretrained(
            pretrained_model_name_or_path, *model_args, config=config, **kwargs
        )

        # Load tokenizer separately
        tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path)
        model.tokenizer = tokenizer
        return model


class NewsDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: Indexable collection of strings.
        labels: Indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: Callable accepting ``(text, max_length=..., padding=...,
            truncation=..., return_tensors=...)`` and returning a mapping with
            ``input_ids`` and ``attention_mask`` tensors that have a leading
            batch dimension of 1.
        max_len: Every item is padded / truncated to this many tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for item ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        # Drop only the leading batch dimension. A bare squeeze() would also
        # remove the sequence dimension when max_len == 1 and yield a 0-d tensor.
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(label, dtype=torch.long)
        }

In [None]:
from transformers import AutoConfig, AutoModel
# Register the custom classes:
#   - the string key "news_classifier" is associated with NewsClassifierConfig
#   - NewsClassifierConfig is associated with the NewsClassifier model class
AutoConfig.register("news_classifier", NewsClassifierConfig)
AutoModel.register(NewsClassifierConfig, NewsClassifier)

In [None]:
# Initialize Hugging Face login
# The token is read from the HF_TOKEN environment variable, falling back to a
# hidden interactive prompt, so the credential is never stored in the notebook.
import os
from getpass import getpass

HF_TOKEN = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")
login(HF_TOKEN)

In [None]:
# Load data
# The 'title' column supplies the input texts and the 'labels' column the targets.
# NOTE(review): this path uses "My Drive" while later cells use "MyDrive" —
# confirm both resolve to the same mounted folder.
news_data = pd.read_csv("/content/drive/My Drive/5190_project/news_data.csv")
X = news_data['title'].values
y = news_data['labels'].values

# Split data
# 20% of the rows are held out for testing; random_state fixes the split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

## Model 1: BERT

In [None]:
# model_type="bert"
config = NewsClassifierConfig(num_labels=2, model_type="bert")
model = NewsClassifier(config)

# Wrap both splits in tokenizing datasets that share the model's tokenizer
train_dataset = NewsDataset(X_train, y_train, model.tokenizer)
test_dataset = NewsDataset(X_test, y_test, model.tokenizer)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=16, num_workers=2)

# Device selection and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=3e-5)

# Fine-tuning: one optimizer step per batch, mean batch loss reported per epoch
epochs = 3
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        loss = model(
            input_ids=batch["input_ids"].to(device),
            attention_mask=batch["attention_mask"].to(device),
            labels=batch["labels"].to(device),
        ).loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}")

# Held-out evaluation: argmax over the logits, compared against the true labels
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        logits = model(
            input_ids=batch["input_ids"].to(device),
            attention_mask=batch["attention_mask"].to(device),
        ).logits
        all_preds.extend(logits.argmax(dim=-1).cpu().numpy())
        all_labels.extend(batch["labels"].to(device).cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Epoch 1, Loss: 0.5309
Epoch 2, Loss: 0.2537
Epoch 3, Loss: 0.1000
Test Accuracy: 0.8647


## Model 2: RoBERTa

In [None]:
# model_type="roberta"
config = NewsClassifierConfig(num_labels=2, model_type="roberta")
model = NewsClassifier(config)

# Wrap both splits in tokenizing datasets that share the model's tokenizer
train_dataset = NewsDataset(X_train, y_train, model.tokenizer)
test_dataset = NewsDataset(X_test, y_test, model.tokenizer)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=16, num_workers=2)

# Device selection and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=3e-5)

# Fine-tuning: one optimizer step per batch, mean batch loss reported per epoch
epochs = 3
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        optimizer.zero_grad()
        loss = model(
            input_ids=batch["input_ids"].to(device),
            attention_mask=batch["attention_mask"].to(device),
            labels=batch["labels"].to(device),
        ).loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}")

# Held-out evaluation: argmax over the logits, compared against the true labels
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        logits = model(
            input_ids=batch["input_ids"].to(device),
            attention_mask=batch["attention_mask"].to(device),
        ).logits
        all_preds.extend(logits.argmax(dim=-1).cpu().numpy())
        all_labels.extend(batch["labels"].to(device).cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")



Epoch 1, Loss: 0.5741
Epoch 2, Loss: 0.3464
Epoch 3, Loss: 0.1999
Test Accuracy: 0.8739


In [None]:
# Load a second labelled test file and wrap it for batched evaluation.
# The tokenizer is taken from whichever `model` is currently bound, and the
# loader is not shuffled.
new_test = pd.read_csv("/content/drive/MyDrive/5190_project/test_data_random_subset.csv")
X_test_new = new_test['title'].values
y_test_new = new_test['labels'].values
test_dataset_new = NewsDataset(X_test_new, y_test_new, model.tokenizer)
test_loader_new = DataLoader(test_dataset_new, batch_size=16, num_workers=2)

In [None]:
# Evaluation on the additional test set: argmax over the logits vs. true labels
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for batch in test_loader_new:
        logits = model(
            input_ids=batch["input_ids"].to(device),
            attention_mask=batch["attention_mask"].to(device),
        ).logits
        all_preds.extend(logits.argmax(dim=-1).cpu().numpy())
        all_labels.extend(batch["labels"].to(device).cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")

Test Accuracy: 0.8500


## 5-Fold Cross Validation

In [None]:
# Search space for the cross-validation sweep; every combination of these
# values is tried.
param_grid = dict(
    learning_rate=[2e-5, 3e-5, 5e-5],
    batch_size=[8, 16],
    num_epochs=[3, 4],
)

def train_and_evaluate(train_idx, val_idx, dataset, model, device, lr, batch_size, num_epochs):
    """Fine-tune ``model`` on one split of ``dataset`` and score it on the other.

    Args:
        train_idx: Indices of ``dataset`` used for training.
        val_idx: Indices of ``dataset`` used for validation.
        dataset: Dataset whose items provide ``"input_ids"``,
            ``"attention_mask"`` and ``"labels"``.
        model: Model to train; it is moved to ``device`` and updated in place.
        device: Device on which batches and the model are placed.
        lr: Learning rate given to ``AdamW``.
        batch_size: Batch size for both loaders.
        num_epochs: Number of passes over the training subset.

    Returns:
        Validation accuracy as computed by ``accuracy_score``.
    """
    # Fold-specific loaders; only the training subset is shuffled.
    train_loader = DataLoader(Subset(dataset, train_idx), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(Subset(dataset, val_idx), batch_size=batch_size)

    # Linear schedule without warm-up, spanning every optimizer step of the run.
    optimizer = AdamW(model.parameters(), lr=lr)
    scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=len(train_loader) * num_epochs,
    )

    # ---- training ----
    model.to(device)
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in train_loader:
            optimizer.zero_grad()
            loss = model(
                input_ids=batch["input_ids"].to(device),
                attention_mask=batch["attention_mask"].to(device),
                labels=batch["labels"].to(device),
            ).loss
            loss.backward()
            optimizer.step()
            scheduler.step()  # the schedule advances once per batch
            total_loss += loss.item()

        print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}")

    # ---- validation ----
    model.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch in val_loader:
            logits = model(
                input_ids=batch["input_ids"].to(device),
                attention_mask=batch["attention_mask"].to(device),
            ).logits
            predicted.extend(logits.argmax(dim=-1).cpu().numpy())
            expected.extend(batch["labels"].to(device).cpu().numpy())

    return accuracy_score(expected, predicted)

In [None]:
# Hyperparameter Tuning and Cross-Validation
# Every combination in `param_grid` is scored with 5-fold cross-validation and
# the combination with the highest mean fold accuracy is kept.
# NOTE(review): the folds are drawn from the full X / y, which include the rows
# held out as X_test earlier — confirm this is intended.
kf = KFold(n_splits=5, shuffle=True, random_state=42)

best_accuracy = 0
best_params = None
best_model = None

# Create the dataset (the tokenizer is borrowed from the already-trained `model`)
dataset = NewsDataset(X, y, model.tokenizer)

for params in ParameterGrid(param_grid):
    lr = params["learning_rate"]
    batch_size = params["batch_size"]
    num_epochs = params["num_epochs"]

    print(f"\nTesting Hyperparameters: LR={lr}, Batch Size={batch_size}, Epochs={num_epochs}")
    fold_accuracies = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
        print(f"  Fold {fold + 1}...")
        # Reinitialize the model for each fold.
        # Fold-local names are used on purpose: rebinding the notebook-level
        # `model` / `config` here would make the later "Save Model" cell save
        # and push the last fold's model instead of the model trained and
        # evaluated above.
        fold_config = NewsClassifierConfig(num_labels=2, model_type="roberta")
        fold_model = NewsClassifier(fold_config)

        # Train and evaluate
        accuracy = train_and_evaluate(train_idx, val_idx, dataset, fold_model, device, lr, batch_size, num_epochs)
        fold_accuracies.append(accuracy)
        print(f"    Fold {fold + 1} Accuracy: {accuracy:.4f}")

    mean_accuracy = np.mean(fold_accuracies)
    print(f"Mean Accuracy for LR={lr}, Batch Size={batch_size}, Epochs={num_epochs}: {mean_accuracy:.4f}")

    # Update best model and hyperparameters
    if mean_accuracy > best_accuracy:
        best_accuracy = mean_accuracy
        best_params = params
        # This is the model from the final fold of the winning setting; it was
        # trained on that fold's training split only.
        best_model = fold_model

print("\nBest Hyperparameters:", best_params)
print("Best Cross-Validation Accuracy:", best_accuracy)


## Save Model

In [None]:
# Save the model
# Writes whichever object is currently bound to `model` into the Drive folder.
save_directory = "/content/drive/MyDrive/5190_project/news_classifier_roberta"
model.save_pretrained(save_directory)
# NOTE(review): NewsClassifier.save_pretrained already saves the tokenizer to
# the same directory, so this call repeats that step (assuming `model` is a
# NewsClassifier).
model.tokenizer.save_pretrained(save_directory)
print(f"Model and tokenizer saved locally at: {save_directory}")

Model and tokenizer saved locally at: /content/drive/MyDrive/5190_project/news_classifier_roberta


In [None]:
# Push to Hugging Face Hub
# REPO_NAME is the target repository id; it is also used to build the URL
# printed after the upload call returns.
REPO_NAME = "CIS5190GoGo/NewsClassifierConfig"
print(f"Pushing model to Hugging Face Hub ({REPO_NAME})...")
model.push_to_hub(REPO_NAME)
print(f"Model successfully pushed to https://huggingface.co/{REPO_NAME}")


Pushing model to Hugging Face Hub (CIS5190GoGo/NewsClassifierConfig)...


README.md:   0%|          | 0.00/2.63k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Model successfully pushed to https://huggingface.co/CIS5190GoGo/NewsClassifierConfig
