In [None]:
import torch
from torch import nn
from torchtext.vocab import GloVe
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

# Assuming data and labels_one_hot are already defined
X_train, X_val, y_train, y_val = train_test_split(data, labels_one_hot, test_size=0.2, random_state=42)

glove = GloVe(name='6B', dim=100)
embedding_matrix = np.zeros((len(word_index) + 1, 100))
for word, i in word_index.items():
    if word in glove.stoi:
        embedding_vector = glove[word]
        embedding_matrix[i] = embedding_vector

class TextCNN(nn.Module):
    def __init__(self, vocab_size, embed_size, kernel_sizes, num_channels):
        super(TextCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.embedding.weight = nn.Parameter(torch.tensor(embedding_matrix, dtype=torch.float32))
        self.embedding.weight.requires_grad = True
        self.convs = nn.ModuleList(
            [nn.Conv1d(in_channels=embed_size, out_channels=c, kernel_size=k) for k, c in zip(kernel_sizes, num_channels)]
        )
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(sum(num_channels), 7)

    def forward(self, x):
        x = self.embedding(x).permute(0, 2, 1)
        convs = [torch.relu(conv(x)) for conv in self.convs]
        pools = [torch.max(conv, dim=2)[0] for conv in convs]
        out = torch.cat(pools, dim=1)
        out = self.dropout(out)
        return self.fc(out)

embed_size = 100
kernel_sizes = [4, 5, 6]
num_channels = [128, 128, 128]
vocab_size = len(word_index) + 1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TextCNN(vocab_size, embed_size, kernel_sizes, num_channels)
model = model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()

def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        for batch in train_loader:
            texts, labels = batch
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        train_losses.append(train_loss / len(train_loader))

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                texts, labels = batch
                texts, labels = texts.to(device), labels.to(device)
                outputs = model(texts)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        val_losses.append(val_loss / len(val_loader))

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}')

    return train_losses, val_losses

def evaluate_accuracy(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for texts, labels in data_loader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

y_train_integers = np.argmax(y_train, axis=1)
y_val_integers = np.argmax(y_val, axis=1)

train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.long), torch.tensor(y_train_integers, dtype=torch.long))
val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.long), torch.tensor(y_val_integers, dtype=torch.long))
train_loader = DataLoader(train_dataset, batch_size=494, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=494, shuffle=False)

class EarlyStopping:
    def __init__(self, patience=5, min_delta=0.01):
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

early_stopping = EarlyStopping(patience=5, min_delta=0.01)

num_epochs = 25
train_losses, val_losses = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs)

plt.figure(figsize=(10, 5))
plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, num_epochs + 1), val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.show()

train_accuracy = evaluate_accuracy(model, train_loader)
val_accuracy = evaluate_accuracy(model, val_loader)
print(f'Training Accuracy: {train_accuracy:.4f}')
print(f'Validation Accuracy: {val_accuracy:.4f}')


def predict_sentiment(model, vocab, sentence):
    model.eval()
    tokens = sentence.lower().split()
    indices = [vocab.get(token, 0) for token in tokens]
    max_kernel_size = max(kernel_sizes)
    if len(indices) < max_kernel_size:
        indices.extend([0] * (max_kernel_size - len(indices)))
    tensor = torch.tensor(indices, dtype=torch.long).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(tensor)
        prediction = torch.argmax(output, axis=1).item()
    return prediction


