# Step 2: The text description generates the terrain distribution

In [1]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, classification_report
import gensim.downloader as api

## 1.0 Hyperparameters and Paths

In [2]:
SUM_DISTRIBUTION = 256
INTER_DIM =31
HIDDEN_DIM_1 = [64, 32, 16]
HIDDEN_DIM_2 = [128, 64, 32, 16, 32]
LABEL_NUM = 5
BATCH_SIZE_2 = 4
EPOCH_2 = 50
LEARNING_RATE_2 = 0.0005
TRAIN_PATH_2 = "../data/text/prompt_train.csv"
VALID_PATH_2 = "../data/text/prompt_valid.csv"
TEST_PATH_2 = "../data/text/prompt_test.csv"
SAVE_MODEL_PATH_1 = "../model/best_classifier.pth"
SAVE_MODEL_PATH_2 = "../model/best_generator.pth"
OUTPUT_PATH = "../test_distribution.csv"
PREDICTION_PATH = "../test_prediction.csv"
TRUE_LABEL_PATH = "../test_true_label.csv"

## 2.1 Load Data Model and Pre-process

### 2.1.1 Word2Vec (Only one of 2.1.1 and 2.1.2 can be chosen)

In [3]:
# Read CSV
train_df = pd.read_csv(TRAIN_PATH_2)
valid_df = pd.read_csv(VALID_PATH_2)
test_df = pd.read_csv(TEST_PATH_2)

# Load pre-trained Word2Vec model
word2vec_model = api.load("word2vec-google-news-300")

# Function to convert a sentence to a Word2Vec vector
def sentence_to_word2vec(sentence, model):
    words = sentence.split()
    word_vectors = [model[word] for word in words if word in model]
    if len(word_vectors) == 0:
        return np.zeros(300)  # Assuming Word2Vec vectors are 300-dimensional
    return np.mean(word_vectors, axis=0)

# Convert text data to Word2Vec vectors
X_train_word2vec = np.array([sentence_to_word2vec(sentence, word2vec_model) for sentence in train_df.iloc[:, 0]])
X_valid_word2vec = np.array([sentence_to_word2vec(sentence, word2vec_model) for sentence in valid_df.iloc[:, 0]])
X_test_word2vec = np.array([sentence_to_word2vec(sentence, word2vec_model) for sentence in test_df.iloc[:, 0]])

y_train = train_df.iloc[:, 1].values
y_valid = valid_df.iloc[:, 1].values
y_test = test_df.iloc[:, 1].values

# Convert Word2Vec vectors to tensors
X_train_word2vec = torch.FloatTensor(X_train_word2vec)
X_valid_word2vec = torch.FloatTensor(X_valid_word2vec)
X_test_word2vec = torch.FloatTensor(X_test_word2vec)
y_train = torch.LongTensor(y_train)
y_valid = torch.LongTensor(y_valid)
y_test = torch.LongTensor(y_test)

# Construct Dataloader
train_loader = DataLoader(TensorDataset(X_train_word2vec, y_train), batch_size=BATCH_SIZE_2, shuffle=True)
valid_loader = DataLoader(TensorDataset(X_valid_word2vec, y_valid), batch_size=BATCH_SIZE_2)
test_loader = DataLoader(TensorDataset(X_test_word2vec, y_test), batch_size=BATCH_SIZE_2)

INPUT_DIM = X_train_word2vec.shape[1]

print("Data loaders created successfully.")

Data loaders created successfully.


### 2.1.2 TF-IDF (Only one of 2.1.1 and 2.1.2 can be chosen)

In [4]:
# # # Read CSV
# train_df = pd.read_csv(TRAIN_PATH_2)
# valid_df = pd.read_csv(VALID_PATH_2)
# test_df = pd.read_csv(TEST_PATH_2)

# # Get TF-IDF vectors
# vectorizer = TfidfVectorizer()
# X_train_tfidf = vectorizer.fit_transform(train_df.iloc[:, 0])
# X_valid_tfidf = vectorizer.transform(valid_df.iloc[:, 0])
# X_test_tfidf = vectorizer.transform(test_df.iloc[:, 0])

# y_train = train_df.iloc[:, 1].values
# y_valid = valid_df.iloc[:, 1].values
# y_test = test_df.iloc[:, 1].values

# # Convert TF-IDF vectors to tensors
# X_train_tfidf = torch.FloatTensor(X_train_tfidf.toarray())
# X_valid_tfidf = torch.FloatTensor(X_valid_tfidf.toarray())
# X_test_tfidf = torch.FloatTensor(X_test_tfidf.toarray())
# y_train = torch.LongTensor(y_train)
# y_valid = torch.LongTensor(y_valid)
# y_test = torch.LongTensor(y_test)

# # Construct Dataloader
# train_loader = DataLoader(TensorDataset(X_train_tfidf, y_train), batch_size=BATCH_SIZE_2, shuffle=True)
# valid_loader = DataLoader(TensorDataset(X_valid_tfidf, y_valid), batch_size=BATCH_SIZE_2)
# test_loader = DataLoader(TensorDataset(X_test_tfidf, y_test), batch_size=BATCH_SIZE_2)

# INPUT_DIM = X_train_tfidf.shape[1]

# print("Data loaders created successfully.")

In [5]:
# Define the Class
class MLPClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim):
        super().__init__()
        layers = []
        for i in range(len(hidden_dims)):
            if i == 0:
                layers.append(nn.Linear(input_dim, hidden_dims[i]))
            else:
                layers.append(nn.Linear(hidden_dims[i-1], hidden_dims[i]))
            layers.append(nn.ReLU())
        
        # Add the final layer
        layers.append(nn.Linear(hidden_dims[-1], output_dim))
        
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)

## 2.2 Define Distribution Generator

In [6]:
class GeneratorNetwork(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_classes=31, kernel_sizes=[3, 4, 5], num_filters=10, sum_distribution=256):
        super(GeneratorNetwork, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (K, embed_dim)) for K in kernel_sizes
        ])
        
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, num_classes)
        self.sum_distribution = sum_distribution

    def conv_and_pool(self, x, conv):
        x = F.relu(conv(x)).squeeze(3)  # (batch_size, num_filters, ~)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)
        return x

    def forward(self, x):
        x = x.long()  # Ensure input is of type LongTensor
        x = self.embedding(x).unsqueeze(1)  # (batch_size, 1, seq_len, embed_dim)
        x = torch.cat([self.conv_and_pool(x, conv) for conv in self.convs], 1)
        x = self.fc(x)
        
        return x

# Example usage
vocab_size = 50  # Example vocabulary size
embed_dim = 256  # Example embedding dimension
# model = GeneratorNetwork(vocab_size, embed_dim)

## 2.3 Define the Integratede Classifier 

In [7]:
# Generator + Classifier
class IntegratedClassifier(nn.Module):
    def __init__(self, distribution_generator, distribution_classifier):
        super(IntegratedClassifier, self).__init__()
        self.distribution_generator = distribution_generator
        self.distribution_classifier = distribution_classifier
    
    def forward(self, x):
        distribution = self.distribution_generator(x)
        return self.distribution_classifier(distribution)

## 2.4 Define Train and Evaluation Function

In [8]:
def train_and_evaluate(model, criterion, optimizer, train_loader, valid_loader, num_epochs=EPOCH_2):
    best_valid_loss = float('inf')
    best_model = None
    train_losses, valid_losses = [], []
    train_accuracies, valid_accuracies = [], []
    
    for epoch in range(num_epochs):
        # Training
        model.train()
        epoch_train_loss = 0
        correct_train = 0
        total_train = 0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            epoch_train_loss += loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            total_train += y_batch.size(0)
            correct_train += (predicted == y_batch).sum().item()
        
        train_losses.append(epoch_train_loss / len(train_loader))
        train_accuracy = correct_train / total_train
        train_accuracies.append(train_accuracy)
        
        # Validation
        model.eval()
        epoch_valid_loss = 0
        correct_valid = 0
        total_valid = 0
        with torch.no_grad():
            for X_batch, y_batch in valid_loader:
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                epoch_valid_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                total_valid += y_batch.size(0)
                correct_valid += (predicted == y_batch).sum().item()
            
            valid_losses.append(epoch_valid_loss / len(valid_loader))
            valid_accuracy = correct_valid / total_valid
            valid_accuracies.append(valid_accuracies)
            
            # Save Best Model
            if epoch_valid_loss < best_valid_loss:
                best_valid_loss = epoch_valid_loss
                best_model = model.state_dict()
        
        # The loss and accuracy for each epoch are output
        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch + 1}/{num_epochs}:')
            print(f'Train Loss: {train_losses[-1]:.4f}, Train Accuracy: {train_accuracy:.4f}')
            print(f'Valid Loss: {valid_losses[-1]:.4f}, Valid Accuracy: {valid_accuracy:.4f}')
    
    return best_model, train_losses, valid_losses, train_accuracies, valid_accuracies

## 2.5 Train and Save Best Model

In [9]:
# Initialize model
distribution_generator = GeneratorNetwork(vocab_size, embed_dim)
# distribution_generator = GeneratorNetwork(INPUT_DIM, HIDDEN_DIM_2)
distribution_classifier = MLPClassifier(INTER_DIM, HIDDEN_DIM_1, LABEL_NUM)
distribution_classifier.load_state_dict(torch.load(SAVE_MODEL_PATH_1))
integrated_classifier = IntegratedClassifier(distribution_generator, distribution_classifier)
integrated_classifier.distribution_classifier.requires_grad_(False)

# Optimization
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(integrated_classifier.parameters(), lr=LEARNING_RATE_2)

# Train the model
best_model, train_losses, valid_losses, train_accuracies, valid_accuracies = train_and_evaluate(integrated_classifier, criterion, optimizer, train_loader, valid_loader)

# Save the best model
save_path = SAVE_MODEL_PATH_2
torch.save(best_model, save_path)
print(f"The model has been saved to {save_path}")

Epoch 10/50:
Train Loss: 1.6102, Train Accuracy: 0.2011
Valid Loss: 1.6100, Valid Accuracy: 0.2000
Epoch 20/50:
Train Loss: 1.6101, Train Accuracy: 0.2011
Valid Loss: 1.6111, Valid Accuracy: 0.2000
Epoch 30/50:
Train Loss: 1.6098, Train Accuracy: 0.2011
Valid Loss: 1.6110, Valid Accuracy: 0.2000
Epoch 40/50:
Train Loss: 1.6095, Train Accuracy: 0.2011
Valid Loss: 1.6108, Valid Accuracy: 0.2000
Epoch 50/50:
Train Loss: 1.6096, Train Accuracy: 0.2011
Valid Loss: 1.6111, Valid Accuracy: 0.2000
The model has been saved to ../model/best_generator.pth


In [10]:
# # Training Process Visualization
# plt.figure(figsize=(12, 5))

# plt.plot(train_losses, label='Train Loss')
# plt.plot(valid_losses, label='Valid Loss')
# plt.xlabel('Epoch')
# plt.ylabel('Loss')
# plt.title('Loss per Epoch')
# plt.legend()
# plt.show()

# plt.plot(train_accuracies, label='Train Accuracy')
# plt.plot(valid_accuracies, label='Valid Accuracy')
# plt.xlabel('Epoch')
# plt.ylabel('Accuracy')
# plt.title('Accuracy per Epoch')
# plt.legend()
# plt.show()

## 2.6 Test

In [11]:
integrated_classifier.load_state_dict(best_model)

integrated_classifier.eval()
all_preds = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = integrated_classifier(X_batch)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())

accuracy = accuracy_score(y_test, all_preds)
print(f'Accuracy: {accuracy}')
print(classification_report(y_test, all_preds))

Accuracy: 0.2
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        10
           1       0.00      0.00      0.00        10
           2       0.20      1.00      0.33        10
           3       0.00      0.00      0.00        10
           4       0.00      0.00      0.00        10

    accuracy                           0.20        50
   macro avg       0.04      0.20      0.07        50
weighted avg       0.04      0.20      0.07        50



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [12]:
# class_correct = [0] * 5
# class_total = [0] * 5
# for i in range(len(y_test)):
#     label = y_test[i]
#     class_correct[label] += (all_preds[i] == label)
#     class_total[label] += 1

# class_accuracy = [c / t for c, t in zip(class_correct, class_total)]

# plt.bar(range(5), class_accuracy, color='blue', alpha=0.6, label='Correct')
# plt.xlabel('Class')
# plt.ylabel('Accuracy')
# plt.title('Per-Class Accuracy')
# plt.show()

## 2.7 Generate Distribution

In [13]:
# Assuming `test_loader` is already defined and loaded with test data
def get_generator_output(test_loader, generator_model, sum_distribution=256):
    generator_model.eval()
    all_outputs = []
    
    with torch.no_grad():
        for x_batch, _ in test_loader:
            x = generator_model(x_batch)
            x = torch.softmax(x, dim=1) * sum_distribution
            x = torch.round(x)
            
            sum_x = x.sum(dim=1, keepdim=True)
            diff = sum_distribution - sum_x

            for i in range(x.size(0)):
                while diff[i] != 0:
                    if diff[i] > 0:
                        idx = torch.argmin(x[i])
                        x[i][idx] += 1
                        diff[i] -= 1
                    else:
                        idx = torch.argmax(x[i])
                        x[i][idx] -= 1
                        diff[i] += 1
            
            all_outputs.append(x)
    
    return torch.cat(all_outputs, dim=0)

def save_output_to_csv_tensor(output_tensor, filename):
    output_df = pd.DataFrame(output_tensor.cpu().numpy())
    output_df.to_csv(filename, index=False)
    print(f"Output saved to {filename}")
    
def save_output_to_csv_else(output, filename):
    output_df = pd.DataFrame(output)
    output_df.to_csv(filename, index=False)
    print(f"Output saved to {filename}")

In [14]:
# Assuming `tfidf_network` and `distribution_classifier` are already defined and loaded with the trained models
test_output = get_generator_output(test_loader, integrated_classifier.distribution_generator)

# Save the output to CSV
save_output_to_csv_tensor(test_output, OUTPUT_PATH)
save_output_to_csv_else(all_preds, PREDICTION_PATH)
save_output_to_csv_else(y_test, TRUE_LABEL_PATH)

Output saved to ../test_distribution.csv
Output saved to ../test_prediction.csv
Output saved to ../test_true_label.csv
