In [19]:
import torch
import torch.nn as nn
import torch.optim as optim
import re
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score

# Read data: lines alternate between a text line and the label line that follows it
def load_data(filename):
    """Load (text, label) pairs from a UTF-8 file of alternating text/label lines.

    Line 1 is a text, line 2 its label, line 3 the next text, and so on.
    Both lines are stripped of surrounding whitespace.

    Args:
        filename: Path of the file to read.

    Returns:
        A ``(texts, labels)`` tuple of two parallel lists. ``labels`` holds
        ints; a pair whose label line cannot be parsed by ``int()`` is
        reported with ``print`` and left out of both lists.

    Raises:
        OSError: If the file cannot be opened.
    """
    texts = []
    labels = []
    # Read everything first so the file is closed before any parsing happens
    with open(filename, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Stop one short of the end so that lines[i+1] always exists; an odd
    # number of lines used to raise IndexError on the last iteration.
    for i in range(0, len(lines) - 1, 2):
        text = lines[i].strip()
        label = lines[i+1].strip()
        try:
            labels.append(int(label))  # Attempts to convert the label to an integer
            texts.append(text)
        except ValueError:
            print(f"Skipping invalid label: {label}")

    # A dangling final line has no label; report it unless it is just blank
    if len(lines) % 2 == 1:
        leftover = lines[-1].strip()
        if leftover:
            print(f"Skipping text without a label: {leftover}")
    return texts, labels

# Word segmentation of text
def tokenize(text):
    """Lower-case ``text`` and return its runs of word characters, in order."""
    lowered = text.lower()
    return [match.group() for match in re.finditer(r'\w+', lowered)]

# Building a vocabulary
def build_vocab(texts):
    """Map every distinct token in ``texts`` to an integer id.

    Ids are handed out in order of first appearance, starting at 0.

    Args:
        texts: Iterable of strings; each one is split with ``tokenize``.

    Returns:
        A dict from token to id.
    """
    token_to_id = {}
    for document in texts:
        for word in tokenize(document):
            # setdefault only inserts unseen words, so len() is always the
            # next unused id at the moment of insertion
            token_to_id.setdefault(word, len(token_to_id))
    return token_to_id

# Convert text to a feature vector
def text_to_features(texts, vocab):
    """Turn each text into the list of vocabulary ids of its tokens.

    Tokens that are not keys of ``vocab`` are dropped, so a returned list
    can be shorter than its text's token count (or empty).

    Args:
        texts: Iterable of strings; each one is split with ``tokenize``.
        vocab: Mapping from token to id.

    Returns:
        A list with one list of ids per input text, in input order.
    """
    return [
        [vocab[word] for word in tokenize(document) if word in vocab]
        for document in texts
    ]

# Pad (or truncate) every sequence to a common length
def pad_sequences(sequences, maxlen):
    """Right-pad each sequence with 0 to ``maxlen`` and stack them into a tensor.

    A sequence longer than ``maxlen`` is cut to its first ``maxlen`` items;
    previously it produced a ragged list that ``torch.tensor`` rejected.

    Args:
        sequences: Iterable of sliceable sequences of integer ids.
        maxlen: Target length of every row; must be >= 0.

    Returns:
        A ``torch.long`` tensor of shape ``(number of sequences, maxlen)``.

    Raises:
        ValueError: If ``maxlen`` is negative.
    """
    if maxlen < 0:
        raise ValueError(f"maxlen must be non-negative, got {maxlen}")

    padded = []
    for seq in sequences:
        clipped = list(seq[:maxlen])
        padded.append(clipped + [0] * (maxlen - len(clipped)))

    # The explicit dtype keeps all-empty input from becoming a float tensor,
    # and the reshape keeps the result 2-D even when there are no rows.
    return torch.tensor(padded, dtype=torch.long).reshape(len(padded), maxlen)

# Load the raw splits (training file first, then the test file)
train_texts, train_labels = load_data('train.txt')
test_texts, test_labels = load_data('test.txt')

# The vocabulary comes from the training texts only
vocab = build_vocab(train_texts)

# Map each text to its list of vocabulary ids; test tokens that are missing
# from the training vocabulary are dropped by text_to_features
train_features = text_to_features(train_texts, vocab)
test_features = text_to_features(test_texts, vocab)

# Re-number the labels with an encoder fitted on the training labels
le = LabelEncoder()
train_labels = torch.tensor(le.fit_transform(train_labels))
test_labels = torch.tensor(le.transform(test_labels))

# Find the longest sequence in either split and pad both splits to that length.
# NOTE(review): pad_sequences pads with 0, which is also the id build_vocab
# gives to the first training token, so padding and that token share an id.
longest_train = max(map(len, train_features))
longest_test = max(map(len, test_features))
maxlen = max(longest_train, longest_test)
train_features = pad_sequences(train_features, maxlen)
test_features = pad_sequences(test_features, maxlen)

In [20]:
class GGNN(nn.Module):
    """Sequence classifier: embedding -> single-layer GRU -> linear layer.

    The class scores come from the GRU's final hidden state only.

    NOTE(review): despite the name, nothing here builds or uses a graph; the
    recurrent unit is a plain ``nn.GRU`` over the token sequence.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_size: Width of each token embedding (GRU input size).
        hidden_size: Width of the GRU hidden state.
        num_classes: Number of output scores per example.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.gru = nn.GRU(input_size=embed_size, hidden_size=hidden_size, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return unnormalised class scores for a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids; the GRU is batch-first, so a
               batched input is expected as (batch, seq_len).

        Returns:
            Tensor of scores with ``num_classes`` entries per example.
        """
        token_vectors = self.embedding(x)
        # Only the final hidden state is used; per-step outputs are discarded
        _, final_state = self.gru(token_vectors)
        # Drop the leading layer axis (size 1 for this single-layer GRU)
        return self.fc(final_state.squeeze(0))


In [21]:
# Training models
def train_model(model, train_features, train_labels, epochs=20, batch_size=32):
    """Train ``model`` on consecutive mini-batches and print progress.

    The data is walked in its given order (no shuffling). After every batch
    the batch accuracy and macro-F1 are printed; after every epoch the mean
    of the per-batch losses is printed.

    This function reads two names that are NOT parameters and must already
    be defined at module level when it is called: ``optimizer`` and
    ``loss_fn``.

    Args:
        model: Module to train; switched to training mode.
        train_features: Indexable batch-first inputs, sliced along axis 0.
        train_labels: Labels aligned with ``train_features``.
        epochs: Number of passes over the data.
        batch_size: Number of examples per optimisation step.

    Returns:
        None. All results are printed.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        for i in range(0, len(train_features), batch_size):
            batch_features = train_features[i:i+batch_size]
            batch_labels = train_labels[i:i+batch_size]

            optimizer.zero_grad()
            output = model(batch_features)
            loss = loss_fn(output, batch_labels)
            loss.backward()
            optimizer.step()

            # Accumulate one loss value per batch
            total_loss += loss.item()
            num_batches += 1

            # The accuracy and F1 are calculated for each batch
            preds = torch.argmax(output, dim=1).cpu().numpy()
            labels = batch_labels.cpu().numpy()
            batch_acc = accuracy_score(labels, preds)
            batch_f1 = f1_score(labels, preds, average='macro')
            print(f'Epoch {epoch+1}, Batch {i//batch_size+1}: ACC = {batch_acc:.4f}, F1 = {batch_f1:.4f}')

        # total_loss holds one value per batch, so average over batches.
        # Dividing by the number of samples under-reported the loss by roughly
        # a factor of batch_size. An empty training set reports nan.
        mean_loss = total_loss / num_batches if num_batches else float('nan')
        print(f'Epoch {epoch+1}/{epochs}, Loss: {mean_loss:.4f}')


# Testing the model
def test_model(model, test_features, test_labels, batch_size=32):
    """Evaluate ``model`` batch by batch and print accuracy and macro-F1.

    Metrics are printed once per batch and once more over all batches
    together. Gradients are disabled for the whole evaluation.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        test_features: Indexable batch-first inputs, sliced along axis 0.
        test_labels: Labels aligned with ``test_features``.
        batch_size: Number of examples per forward pass.

    Returns:
        None. All results are printed.
    """
    model.eval()
    collected_preds, collected_labels = [], []

    with torch.no_grad():
        batch_starts = range(0, len(test_features), batch_size)
        for batch_no, start in enumerate(batch_starts, start=1):
            stop = start + batch_size
            inputs = test_features[start:stop]
            targets = test_labels[start:stop]

            scores = model(inputs)
            # The highest-scoring class is the prediction
            y_pred = torch.argmax(scores, dim=1).cpu().numpy()
            y_true = targets.cpu().numpy()
            collected_preds.extend(y_pred)
            collected_labels.extend(y_true)

            # Per-batch metrics
            batch_acc = accuracy_score(y_true, y_pred)
            batch_f1 = f1_score(y_true, y_pred, average='macro')
            print(f'Batch {batch_no}: ACC = {batch_acc:.4f}, F1 = {batch_f1:.4f}')

    # Metrics over the whole test set
    test_acc = accuracy_score(collected_labels, collected_preds)
    test_f1 = f1_score(collected_labels, collected_preds, average='macro')
    print(f'Test Accuracy: {test_acc:.4f}, F1-score: {test_f1:.4f}')


In [22]:
# Model parameters
vocab_size = len(vocab)  # one embedding row per vocabulary entry
embed_size = 64
hidden_size = 128
num_classes = len(set(train_labels.tolist()))  # distinct encoded training labels

# Seed PyTorch's RNG before the model is built so that the initial weights
# are the same on every Restart & Run All
torch.manual_seed(42)

# Initialize the model
model = GGNN(vocab_size, embed_size, hidden_size, num_classes)

# Loss function and optimizer; train_model reads both as module-level names
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
train_model(model, train_features, train_labels, epochs=20, batch_size=32)

# Evaluate on the held-out test split
test_model(model, test_features, test_labels, batch_size=32)


