In [15]:
import numpy as np # linear algebra
import pandas as pd

In [17]:
import json
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score
from transformers import GPT2Tokenizer, GPT2Model

In [19]:
def process_file_fixed(file_path, num_rows=21000):
    """Load up to ``num_rows`` records from a JSON Lines file.

    Args:
        file_path: Path to a text file holding one JSON document per line.
        num_rows: Maximum number of records to return. ``None`` reads the
            whole file.

    Returns:
        list: The decoded JSON objects, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    news = []
    # Decode as UTF-8 explicitly: relying on the platform default (e.g. cp1252
    # on Windows) can fail or mis-decode non-ASCII text.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if num_rows is not None and len(news) >= num_rows:
                break
            line = line.strip()
            if not line:
                # Blank lines (such as a trailing newline) carry no record.
                continue
            news.append(json.loads(line))
    return news

# Load the first 21000 lines of the dataset file and report how many records were read.
# NOTE(review): this is an absolute, machine-specific path; the notebook only runs where it exists.
file_path = r'C:\Users\dell\Desktop\MyDocs\Docs\MK\News_Category_Dataset_v3.json'
news = process_file_fixed(file_path, num_rows=21000)
print(len(news))

21000


In [21]:
news[5]

{'link': 'https://www.huffpost.com/entry/belk-worker-found-dead-columbiana-centre-bathroom_n_632c5f8ce4b0572027b0251d',
 'headline': 'Cleaner Was Dead In Belk Bathroom For 4 Days Before Body Found: Police',
 'category': 'U.S. NEWS',
 'short_description': 'The 63-year-old woman was seen working at the South Carolina store on Thursday. She was found dead Monday after her family reported her missing, authorities said.',
 'authors': '',
 'date': '2022-09-22'}

In [23]:
# Join each item's headline and short description with " || " and pair the
# combined text with the item's category.
text_cat_pairs = [
    (
        item.get("headline") + " || " + item.get("short_description"),
        item.get("category"),
    )
    for item in news
]

text_cat_pairs[5]

('Cleaner Was Dead In Belk Bathroom For 4 Days Before Body Found: Police || The 63-year-old woman was seen working at the South Carolina store on Thursday. She was found dead Monday after her family reported her missing, authorities said.',
 'U.S. NEWS')

In [25]:
# Build the category <-> index lookups. Categories are numbered from 0 in the
# order in which they first appear in text_cat_pairs.
unique_labels = list(dict.fromkeys(label for (_, label) in text_cat_pairs))
label_to_index = {label: index for index, label in enumerate(unique_labels)}  # category -> index
index_to_label = dict(enumerate(unique_labels))  # index -> category

In [27]:
label_to_index

{'U.S. NEWS': 0,
 'COMEDY': 1,
 'PARENTING': 2,
 'WORLD NEWS': 3,
 'CULTURE & ARTS': 4,
 'TECH': 5,
 'SPORTS': 6,
 'ENTERTAINMENT': 7,
 'POLITICS': 8,
 'WEIRD NEWS': 9,
 'ENVIRONMENT': 10,
 'EDUCATION': 11,
 'CRIME': 12,
 'SCIENCE': 13,
 'WELLNESS': 14,
 'BUSINESS': 15,
 'STYLE & BEAUTY': 16,
 'FOOD & DRINK': 17,
 'MEDIA': 18,
 'QUEER VOICES': 19,
 'HOME & LIVING': 20,
 'WOMEN': 21,
 'BLACK VOICES': 22,
 'TRAVEL': 23,
 'MONEY': 24,
 'RELIGION': 25,
 'LATINO VOICES': 26,
 'IMPACT': 27,
 'WEDDINGS': 28,
 'COLLEGE': 29,
 'PARENTS': 30,
 'ARTS & CULTURE': 31,
 'STYLE': 32,
 'GREEN': 33,
 'TASTE': 34,
 'HEALTHY LIVING': 35}

In [29]:
index_to_label

{0: 'U.S. NEWS',
 1: 'COMEDY',
 2: 'PARENTING',
 3: 'WORLD NEWS',
 4: 'CULTURE & ARTS',
 5: 'TECH',
 6: 'SPORTS',
 7: 'ENTERTAINMENT',
 8: 'POLITICS',
 9: 'WEIRD NEWS',
 10: 'ENVIRONMENT',
 11: 'EDUCATION',
 12: 'CRIME',
 13: 'SCIENCE',
 14: 'WELLNESS',
 15: 'BUSINESS',
 16: 'STYLE & BEAUTY',
 17: 'FOOD & DRINK',
 18: 'MEDIA',
 19: 'QUEER VOICES',
 20: 'HOME & LIVING',
 21: 'WOMEN',
 22: 'BLACK VOICES',
 23: 'TRAVEL',
 24: 'MONEY',
 25: 'RELIGION',
 26: 'LATINO VOICES',
 27: 'IMPACT',
 28: 'WEDDINGS',
 29: 'COLLEGE',
 30: 'PARENTS',
 31: 'ARTS & CULTURE',
 32: 'STYLE',
 33: 'GREEN',
 34: 'TASTE',
 35: 'HEALTHY LIVING'}

In [31]:
def convert_labels(label):
    """Return the index stored for ``label`` in ``label_to_index`` as a 0-d tensor."""
    index = label_to_index[label]
    return torch.tensor(index)


# Encode every category, then stack the 0-d tensors into a single 1-D long tensor
labels = [convert_labels(cat) for (_, cat) in text_cat_pairs]
stacked_tensors_y = torch.stack(labels).long()

In [33]:
stacked_tensors_y.shape

torch.Size([21000])

In [43]:
# Load the pretrained 'gpt2' tokenizer and model; the model is placed on the CPU.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
embedding_model = GPT2Model.from_pretrained('gpt2').to('cpu')
# The EOS token is reused for padding so that padding=True can be used when tokenizing.
tokenizer.pad_token = tokenizer.eos_token  # Set padding token to <|endoftext|>

In [44]:
# Take the combined headline/description text of one item as a worked example.
ex = text_cat_pairs[4][0]
ex

'Woman Who Called Cops On Black Bird-Watcher Loses Lawsuit Against Ex-Employer || Amy Cooper accused investment firm Franklin Templeton of unfairly firing her and branding her a racist after video of the Central Park encounter went viral.'

In [45]:
# Encode the example into token ids, then decode each id on its own to show
# how the text was split into tokens.
tokens = tokenizer.encode(ex)
print([tokenizer.decode([t]) for t in tokens])

['Woman', ' Who', ' Called', ' C', 'ops', ' On', ' Black', ' Bird', '-', 'W', 'atcher', ' L', 'oses', ' Law', 'suit', ' Against', ' Ex', '-', 'Employ', 'er', ' ||', ' Amy', ' Cooper', ' accused', ' investment', ' firm', ' Franklin', ' Temple', 'ton', ' of', ' unfairly', ' firing', ' her', ' and', ' branding', ' her', ' a', ' racist', ' after', ' video', ' of', ' the', ' Central', ' Park', ' encounter', ' went', ' viral', '.']


In [49]:
# Function to embed a single sentence using GPT-2
def embed_sentence(text):
    """Embed ``text`` as the mean of GPT-2's last-layer hidden states.

    NOTE: a later cell defines ``embed_sentence`` again; when the cells are
    run in order, that later definition is the one in effect.

    Args:
        text: Input handed to ``tokenizer``.

    Returns:
        torch.Tensor: ``last_hidden_state`` averaged over dim 1.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)  # Tokenize and encode
    # Keep the inputs on the model's own device. A hard-coded 'cuda' fails when
    # the model lives on the CPU, which is where this notebook loads it.
    inputs = inputs.to(next(embedding_model.parameters()).device)
    with torch.no_grad():  # Disable gradient computation
        outputs = embedding_model(**inputs)  # Forward pass through GPT-2
    hidden_states = outputs.last_hidden_state  # Extract hidden states
    sentence_embedding = hidden_states.mean(dim=1)  # Compute sentence embedding via mean pooling
    return sentence_embedding

In [51]:
len(tokens)

48

In [55]:
def embed_sentence(text):
    """Embed ``text`` as the mean of GPT-2's last-layer hidden states.

    The mean is taken over real tokens only: positions the tokenizer marks as
    padding (attention mask 0) are excluded, so a batch of texts of different
    lengths is pooled correctly. For a single text there is no padding and
    this is the plain mean over its tokens.

    Args:
        text: Input handed to ``tokenizer``.

    Returns:
        torch.Tensor: One pooled row per input text; the displayed result for
        a single example string is ``torch.Size([1, 768])``.
    """
    # Tokenize and encode the text, then place the tensors on the model's device
    # rather than a hard-coded one, so moving the model does not break this call.
    device = next(embedding_model.parameters()).device
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)

    with torch.no_grad():  # Disable gradient computation
        hidden = embedding_model(**inputs).last_hidden_state  # Forward pass through GPT-2
        # Zero out padded positions and divide by the number of real tokens.
        mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
        embeddings = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)
    return embeddings

# Sanity check: embed the example text and inspect the shape of the result.
embedded_example = embed_sentence(ex)
embedded_example.shape

torch.Size([1, 768])

In [None]:
# Embed every text with embed_sentence, one call (and so one GPT-2 forward pass) per text.
embedded_sentences = [embed_sentence(text) for (text, cat) in text_cat_pairs]
# Show the shape of the last embedding as a quick check.
print(embedded_sentences[-1].shape)

In [None]:
len(embedded_sentences)

In [None]:
# Zero-pad every embedding along dim 0 up to the longest one and stack them into
# a single (num_texts, max_len, embedding_dim) tensor. pad_sequence replaces the
# hand-written pad/cat/stack loop and creates the padding on the inputs' device.
# NOTE: embed_sentence pools over the token dimension, so for a single text it
# returns one row (the example above has shape (1, 768)); every item therefore
# has length 1 along dim 0 and no padding is actually added here.
stacked_tensors_x = nn.utils.rnn.pad_sequence(embedded_sentences, batch_first=True)
stacked_tensors_x.shape

In [None]:
# Hold out 20% of the samples (test_size=0.2, random_state=42); the rest is the training set.
# The held-out part is bound to val_texts / val_labels at this point.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    stacked_tensors_x, stacked_tensors_y, test_size=0.2, random_state=42)

In [None]:
# Split the held-out part in half (test_size=0.5) into validation and test sets.
# NOTE(review): val_texts / val_labels are rebound from their own previous values, so
# re-running this cell on its own halves the validation set again; re-run the
# previous split first.
val_texts, test_texts, val_labels, test_labels = train_test_split(
    val_texts, val_labels, test_size=0.5, random_state=42)

In [None]:
class LSTM(nn.Module):
    """Bidirectional LSTM followed by one linear classification layer.

    The logits are the sum of the linear layer applied to the LSTM output at
    the last time step and at the first time step.

    Args:
        input_size: Size of each input feature vector.
        hidden_size: Hidden size of the LSTM, per direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # The LSTM output concatenates both directions, hence 2 * hidden_size.
        self.fc = nn.Linear(2 * hidden_size, num_classes)

    def forward(self, x):
        """Return logits for ``x``, which is indexed here as (batch, time, features)."""
        # Zero initial hidden and cell states, one per layer and direction,
        # created on the same device as the input.
        state_shape = (2 * self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device)
        c0 = torch.zeros(state_shape, device=x.device)

        seq_out, _ = self.lstm(x, (h0, c0))

        # Classify from both ends of the sequence and add the two sets of logits.
        logits_at_end = self.fc(seq_out[:, -1, :])
        logits_at_start = self.fc(seq_out[:, 0, :])
        return logits_at_end + logits_at_start

In [None]:
# Derive the model dimensions from the data: the last dimension of the inputs
# and the number of distinct category labels.
embedding_dimensionality = stacked_tensors_x.shape[-1]
num_classes = len(label_to_index)
embedding_dimensionality, num_classes

In [None]:
# Build the baseline bidirectional LSTM classifier.
# The first two assignments repeat the values computed in the previous cell.
embedding_dimensionality = stacked_tensors_x.shape[-1]
num_classes = len(label_to_index)
input_size = embedding_dimensionality
hidden_size = embedding_dimensionality  # hidden size is set equal to the input size
num_layers = 2
# The model is placed on the CPU.
model = LSTM(input_size, hidden_size, num_layers, num_classes).to('cpu')

In [None]:
# Shape sanity check: run the whole dataset through the untrained model once.
# no_grad avoids building an autograd graph for a forward pass that is never
# back-propagated, and the input is moved to whichever device the model is on.
with torch.no_grad():
    output = model(stacked_tensors_x.to(next(model.parameters()).device))
output.shape

In [None]:
# Cross-entropy loss on the model's logits; AdamW over all parameters of the
# baseline model with a learning rate of 0.005.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.005)

In [None]:
def train_model(model, train_texts, train_labels, val_texts, val_labels, epochs, optimizer, criterion):
    """Train ``model`` with one full-batch optimisation step per epoch.

    Each epoch runs the whole training set through the model, takes one
    optimizer step, then evaluates on the validation set without gradients.
    A one-line summary is printed per epoch.

    Args:
        model: Module mapping the input tensor to class logits.
        train_texts: Training inputs (tensor).
        train_labels: Training targets (tensor of class indices).
        val_texts: Validation inputs (tensor).
        val_labels: Validation targets (tensor of class indices).
        epochs: Number of epochs to run.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Loss called as ``criterion(logits, labels)``.

    Returns:
        tuple: ``(train_loss_history, val_loss_history, train_acc_history,
        val_acc_history)``, four lists with one float per epoch.
    """
    # Run on the device the model already lives on instead of assuming CUDA,
    # so the same function works for CPU and GPU models.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else train_texts.device

    # The data does not change between epochs: move it once, not every epoch.
    train_texts, train_labels = train_texts.to(device), train_labels.to(device)
    val_texts, val_labels = val_texts.to(device), val_labels.to(device)

    train_loss_history, val_loss_history = [], []
    train_acc_history, val_acc_history = [], []

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        outputs = model(train_texts)
        loss = criterion(outputs, train_labels)
        loss.backward()
        optimizer.step()

        train_loss = loss.item()
        train_loss_history.append(train_loss)

        # Accuracy of the predictions made before this epoch's update.
        _, preds = torch.max(outputs, 1)
        train_acc = (preds == train_labels).float().mean().item()
        train_acc_history.append(train_acc)

        model.eval()
        with torch.no_grad():
            val_outputs = model(val_texts)
            val_loss = criterion(val_outputs, val_labels).item()
            val_loss_history.append(val_loss)

            _, val_preds = torch.max(val_outputs, 1)
            val_acc = (val_preds == val_labels).float().mean().item()
            val_acc_history.append(val_acc)

        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return train_loss_history, val_loss_history, train_acc_history, val_acc_history

In [None]:
# Train the baseline model for 200 epochs and keep the per-epoch loss and accuracy histories.
epochs = 200
train_loss, val_loss, train_acc, val_acc = train_model(
    model, train_texts, train_labels, val_texts, val_labels, epochs, optimizer, criterion)

In [None]:
def plot_metrics(train_history, val_history, metric_name):
    """Plot the training and validation curves of one metric and show the figure.

    Args:
        train_history: Per-epoch training values.
        val_history: Per-epoch validation values.
        metric_name: Name used in the legend, the y-axis label and the title.
    """
    _, ax = plt.subplots(figsize=(8, 5))
    for prefix, history in (('Train', train_history), ('Validation', val_history)):
        ax.plot(history, label=f'{prefix} {metric_name}')
    ax.set_xlabel('Epochs')
    ax.set_ylabel(metric_name)
    ax.set_title(f'{metric_name} over Epochs')
    ax.legend()
    plt.show()

In [None]:
# Learning curves of the baseline model: accuracy first, then loss.
plot_metrics(train_acc, val_acc, "Accuracy")
plot_metrics(train_loss, val_loss, "Loss")

In [None]:
def evaluate_model(model, test_texts, test_labels):
    """Print the weighted precision and recall of ``model`` on a test set.

    Args:
        model: Module mapping the input tensor to class logits.
        test_texts: Test inputs (tensor).
        test_labels: Test targets (tensor of class indices).
    """
    model.eval()
    # Evaluate on the device the model is on rather than assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else test_texts.device
    with torch.no_grad():
        test_outputs = model(test_texts.to(device))
        _, test_preds = torch.max(test_outputs, 1)

    # scikit-learn works on host arrays.
    test_labels = test_labels.cpu().numpy()
    test_preds = test_preds.cpu().numpy()

    precision = precision_score(test_labels, test_preds, average='weighted')
    recall = recall_score(test_labels, test_preds, average='weighted')
    print(f"Precision: {precision:.4f}, Recall: {recall:.4f}")

In [None]:
# Report weighted precision/recall of the baseline model on the test split.
evaluate_model(model, test_texts, test_labels)

In [None]:
class LSTMAttention(nn.Module):
    """Bidirectional LSTM whose outputs are pooled by a learned attention layer.

    A linear layer scores every time step, the scores are softmax-normalised
    over the time dimension, and the weighted sum of the LSTM outputs is
    passed to a linear classification layer.

    Args:
        input_size: Size of each input feature vector.
        hidden_size: Hidden size of the LSTM, per direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        feature_size = 2 * hidden_size  # both directions are concatenated
        self.attention = nn.Linear(feature_size, 1)  # one score per time step
        self.fc = nn.Linear(feature_size, num_classes)

    def forward(self, x):
        """Return logits for ``x``, which is treated as (batch, time, features)."""
        # Zero initial hidden and cell states on the input's device.
        state_shape = (2 * self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device)
        c0 = torch.zeros(state_shape, device=x.device)

        states, _ = self.lstm(x, (h0, c0))

        # Normalise the per-step scores over dim 1, then take the weighted
        # sum of the LSTM outputs along that same dimension.
        scores = self.attention(states)
        weights = torch.softmax(scores, dim=1)
        context = (weights * states).sum(dim=1)

        return self.fc(context)

In [None]:
# Initialize and train the LSTMAttention model
# Use the GPU when one is available and fall back to the CPU otherwise, instead
# of unconditionally requiring CUDA.
attention_device = 'cuda' if torch.cuda.is_available() else 'cpu'
attention_model = LSTMAttention(input_size, hidden_size, num_layers, num_classes).to(attention_device)

# A separate optimizer bound to the attention model's parameters (same settings
# as the baseline: AdamW, lr=0.005); the loss and epoch count are reused.
attention_optimizer = optim.AdamW(attention_model.parameters(), lr=0.005)
attention_train_loss, attention_val_loss, attention_train_acc, attention_val_acc = train_model(
    attention_model, train_texts, train_labels, val_texts, val_labels, epochs, attention_optimizer, criterion)

plot_metrics(attention_train_acc, attention_val_acc, "Accuracy (Attention)")
plot_metrics(attention_train_loss, attention_val_loss, "Loss (Attention)")

In [None]:
def evaluate_per_class(model, test_texts, test_labels, index_to_label):
    """Print precision and recall of ``model`` for every known class.

    Args:
        model: Module mapping the input tensor to class logits.
        test_texts: Test inputs (tensor).
        test_labels: Test targets (tensor of class indices).
        index_to_label: Mapping from class index to class name.
    """
    model.eval()
    # Evaluate on the device the model is on rather than assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else test_texts.device
    with torch.no_grad():
        test_outputs = model(test_texts.to(device))
        _, test_preds = torch.max(test_outputs, 1)

    test_labels = test_labels.cpu().numpy()
    test_preds = test_preds.cpu().numpy()

    # Request the scores for every known class explicitly. Without `labels=`,
    # only classes that occur in the targets or predictions are returned, so
    # indexing the result by class id can hit the wrong class or run past the
    # end. Classes with no samples or no predictions are reported as 0.
    class_ids = sorted(index_to_label)
    precision = precision_score(test_labels, test_preds, labels=class_ids,
                                average=None, zero_division=0)
    recall = recall_score(test_labels, test_preds, labels=class_ids,
                          average=None, zero_division=0)

    for position, class_id in enumerate(class_ids):
        label = index_to_label[class_id]
        print(f"Class {label}: Precision: {precision[position]:.4f}, Recall: {recall[position]:.4f}")

In [None]:
# Per-class precision/recall on the test split, first for the baseline LSTM,
# then for the attention variant.
print("LSTM Performance:")
evaluate_per_class(model, test_texts, test_labels, index_to_label)

print("\nLSTM with Attention Performance:")
evaluate_per_class(attention_model, test_texts, test_labels, index_to_label)

In [None]:
def predict_category(text, model, tokenizer, index_to_label):
    """Predict the category name of a single text.

    Args:
        text: Text to classify.
        model: Trained classifier that maps an embedded text to class logits.
        tokenizer: Not used in the body; ``embed_sentence`` does the
            tokenisation with the module-level tokenizer. Kept so existing
            calls continue to work.
        index_to_label: Mapping from class index to class name.

    Returns:
        The entry of ``index_to_label`` for the highest-scoring class.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    with torch.no_grad():
        embedded_text = embed_sentence(text)
        if first_param is not None:
            # Follow the model's device instead of assuming CUDA.
            embedded_text = embedded_text.to(first_param.device)
        output = model(embedded_text.unsqueeze(0))  # Add batch dimension
        _, prediction = torch.max(output, 1)
    return index_to_label[prediction.item()]

# Classify one hand-written example with the attention model and print the predicted category.
user_input = "Breaking news: AI is transforming industries worldwide!"
predicted_category = predict_category(user_input, attention_model, tokenizer, index_to_label)
print(f"Predicted Category: {predicted_category}")

In [None]:
def analyze_errors(model, test_texts, test_labels, text_cat_pairs, index_to_label, test_indices=None):
    """Print every misclassified test sample with its true and predicted label.

    Args:
        model: Module mapping the input tensor to class logits.
        test_texts: Test inputs (tensor).
        test_labels: Test targets (tensor of class indices).
        text_cat_pairs: Sequence of ``(text, category)`` pairs for the full
            dataset; only the text is read.
        index_to_label: Mapping from class index to class name.
        test_indices: Optional sequence giving, for each test sample, its
            position in ``text_cat_pairs``. When omitted, the position inside
            the test set is used as the position in ``text_cat_pairs``, which
            is only correct if the test set is an in-order prefix of the
            dataset; a warning is emitted in that case.

    Raises:
        ValueError: If ``test_indices`` is given but its length differs from
            the number of test labels.
    """
    import warnings

    if test_indices is None:
        warnings.warn(
            "analyze_errors: no test_indices given; texts are looked up by "
            "test-set position and will not match the samples if the split "
            "reordered the data.",
            stacklevel=2,
        )
    elif len(test_indices) != len(test_labels):
        raise ValueError("test_indices must have one entry per test sample")

    model.eval()
    # Evaluate on the device the model is on rather than assuming CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else test_texts.device
    with torch.no_grad():
        test_outputs = model(test_texts.to(device))
        _, test_preds = torch.max(test_outputs, 1)

    test_labels = test_labels.cpu().numpy()
    test_preds = test_preds.cpu().numpy()

    errors = (test_preds != test_labels)
    for idx, error in enumerate(errors):
        if error:
            # Map the test-set position back to the row of the original dataset.
            source_row = idx if test_indices is None else int(test_indices[idx])
            print(f"Text: {text_cat_pairs[source_row][0]}")
            print(f"True Label: {index_to_label[int(test_labels[idx])]}")
            print(f"Predicted Label: {index_to_label[int(test_preds[idx])]}")
            print("-" * 50)

In [None]:
# List the misclassified test samples for the baseline LSTM, then for the attention variant.
# NOTE(review): test_texts comes from train_test_split, while the full text_cat_pairs list is
# passed for the text lookup; confirm the printed texts really belong to the test samples.
print("LSTM Errors:")
analyze_errors(model, test_texts, test_labels, text_cat_pairs, index_to_label)

print("\nLSTM with Attention Errors:")
analyze_errors(attention_model, test_texts, test_labels, text_cat_pairs, index_to_label)