In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np
import pandas as pd
import random
import os

os.chdir('C:\\Users\\david\\Desktop\\대학원\\Individual_project\\mbti_project\\MBTI&BigFive_data\\전처리데이터\\240322시험')

## train_test_split (8:2)

In [8]:
df = pd.read_csv("MBTI_reprepro.csv")
features = np.load("MBTICLS.npy")

# 데이터를 8:2로 분할, stratify를 적용하여 'label' 열 기준으로 분할
df_train, df_test, features_train, features_test = train_test_split(
    df, features, test_size=0.2, stratify=df['type'], random_state=42)

# 분할된 데이터셋과 특성 저장
df_train.to_csv("MBTI_train.csv", index=False, encoding = 'utf-8-sig')
df_test.to_csv("MBTI_test.csv", index=False, encoding = 'utf-8-sig')
np.save("MBTI_train.npy", features_train)
np.save("MBTI_test.npy", features_test)

print("Test dataset and features have been created and saved.")

Test dataset and features have been created and saved.


## Train/Validate (8:2)

In [5]:
class CNNClassifier(nn.Module):
    def __init__(self, num_classes, input_size, kernel_size=3, dropout_rate=0.5):
        super(CNNClassifier, self).__init__()
        self.conv = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=kernel_size)
        self.bn = nn.BatchNorm1d(64)  # Batch Normalization
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.pool = nn.MaxPool1d(kernel_size=kernel_size)
        # Dynamic calculation of FC layer input size
        self.fc_input_size = self._calculate_fc_input_size(input_size, kernel_size)
        self.fc = nn.Linear(self.fc_input_size, num_classes)
    
    def _calculate_fc_input_size(self, input_size, kernel_size):
        size = input_size
        size = (size - (kernel_size - 1) - 1) + 1  # Conv1d output size
        size = size // kernel_size  # MaxPool1d output size
        size = size * 64  # Considering the number of output channels from Conv1d
        return size

    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

def load_data(file_path, feature_path):
    df = pd.read_csv(file_path)
    labels = df['type'].apply(lambda x: 1 if 'P' in x else 0).tolist()
    features = np.load(feature_path)
    return labels, features

def build_dataloader(X, y, batch_size):
    tensor_x = torch.tensor(X).float()
    tensor_y = torch.tensor(y).long()
    dataset = TensorDataset(tensor_x, tensor_y)
    dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
    return dataloader

def set_seed(seed_value=42):
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)
    os.environ['PYTHONHASHSEED'] = str(seed_value)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

def train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, epochs, early_stopping_patience=10):
    best_val_loss = float('inf')
    early_stopping_counter = 0

    for epoch in range(epochs):
        model.train()
        train_loss, train_correct, total_train = 0, 0, 0
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, pred = torch.max(output, dim=1)
            train_correct += (pred == target).sum().item()
            total_train += target.size(0)
        
        train_accuracy = train_correct / total_train
        
        val_loss, val_correct, total_val = 0, 0, 0
        model.eval()
        with torch.inference_mode():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                val_loss += loss.item()
                _, pred = torch.max(output, dim=1)
                val_correct += (pred == target).sum().item()
                total_val += target.size(0)
                
        val_accuracy = val_correct / total_val
        
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss/len(train_loader):.4f}, Train Acc: {train_accuracy:.4f}, Val Loss: {val_loss/len(val_loader):.4f}, Val Acc: {val_accuracy:.4f}")
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stopping_counter = 0
            # Save the best model
            torch.save(model.state_dict(), "best_model.pth")
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= early_stopping_patience:
                print("Early stopping triggered.")
                break

def process_data_and_train_model(file_path, feature_path, batch_size, epochs, lr, device, seed=42):
    set_seed(seed)  # Set the seed for reproducibility

    # Load data
    labels, features = load_data(file_path, feature_path)

    # Split the data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(features, labels, test_size=0.2, random_state=seed, stratify=labels)

    # Create dataloaders for training and validation sets
    train_loader = build_dataloader(X_train, y_train, batch_size)
    val_loader = build_dataloader(X_val, y_val, batch_size)

    # Model configuration
    num_classes = 2
    input_size = features.shape[1]  # Dynamic input size based on features
    model = CNNClassifier(num_classes=num_classes, input_size=input_size).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-2)  # Added weight decay for regularization

    # Train and evaluate the model
    train_and_evaluate(model, train_loader, val_loader, criterion, optimizer, device, epochs)

# Assuming the paths to the dataset and features are provided correctly
file_path = 'E&I_train.csv'
feature_path = 'E&I_train.npy'
batch_size = 16
epochs = 30
lr = 2e-5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#action
process_data_and_train_model(file_path, feature_path, batch_size, epochs, lr, device)

Epoch 1/30, Train Loss: 0.7896, Train Acc: 0.4882, Val Loss: 0.6891, Val Acc: 0.5296
Epoch 2/30, Train Loss: 0.7494, Train Acc: 0.5126, Val Loss: 0.6840, Val Acc: 0.5592
Epoch 3/30, Train Loss: 0.7454, Train Acc: 0.5292, Val Loss: 0.6978, Val Acc: 0.4970
Epoch 4/30, Train Loss: 0.7219, Train Acc: 0.5473, Val Loss: 0.6726, Val Acc: 0.6036
Epoch 5/30, Train Loss: 0.7055, Train Acc: 0.5558, Val Loss: 0.6646, Val Acc: 0.5947
Epoch 6/30, Train Loss: 0.7163, Train Acc: 0.5514, Val Loss: 0.6674, Val Acc: 0.6139
Epoch 7/30, Train Loss: 0.6941, Train Acc: 0.5828, Val Loss: 0.6801, Val Acc: 0.5740
Epoch 8/30, Train Loss: 0.6882, Train Acc: 0.5891, Val Loss: 0.6928, Val Acc: 0.5340
Epoch 9/30, Train Loss: 0.6906, Train Acc: 0.5825, Val Loss: 0.6612, Val Acc: 0.6154
Epoch 10/30, Train Loss: 0.6925, Train Acc: 0.5821, Val Loss: 0.6652, Val Acc: 0.6243
Epoch 11/30, Train Loss: 0.6872, Train Acc: 0.5869, Val Loss: 0.6621, Val Acc: 0.6228
Epoch 12/30, Train Loss: 0.6838, Train Acc: 0.5828, Val Loss: 0

## Test

In [6]:
class CNNClassifier(nn.Module):
    def __init__(self, num_classes, input_size, kernel_size=3, dropout_rate=0.5):
        super(CNNClassifier, self).__init__()
        self.conv = nn.Conv1d(in_channels=1, out_channels=64, kernel_size=kernel_size)
        self.bn = nn.BatchNorm1d(64)  # Batch Normalization
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.pool = nn.MaxPool1d(kernel_size=kernel_size)
        # Dynamic calculation of FC layer input size
        self.fc_input_size = self._calculate_fc_input_size(input_size, kernel_size)
        self.fc = nn.Linear(self.fc_input_size, num_classes)
    
    def _calculate_fc_input_size(self, input_size, kernel_size):
        size = input_size
        size = (size - (kernel_size - 1) - 1) + 1  # Conv1d output size
        size = size // kernel_size  # MaxPool1d output size
        size = size * 64  # Considering the number of output channels from Conv1d
        return size

    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dimension
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.pool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

def load_data(file_path, feature_path):
    df = pd.read_csv(file_path)
    labels = df['type'].apply(lambda x: 1 if 'E' in x else 0).tolist()
    features = np.load(feature_path)
    return labels, features

def build_dataloader(X, y, batch_size):
    tensor_x = torch.tensor(X).float()
    tensor_y = torch.tensor(y).long()
    dataset = TensorDataset(tensor_x, tensor_y)
    dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
    return dataloader

def load_best_model(model_path, num_classes, input_size, device):
    model = CNNClassifier(num_classes=num_classes, input_size=input_size).to(device)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    return model

def evaluate(model, data_loader, device):
    model.eval()
    total_correct, total = 0, 0
    all_predictions = []
    all_targets = []

    with torch.inference_mode():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, pred = torch.max(output, dim=1)
            total_correct += (pred == target).sum().item()
            total += target.size(0)
            all_predictions.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    accuracy = total_correct / total
    precision = precision_score(all_targets, all_predictions)
    recall = recall_score(all_targets, all_predictions)
    f1 = f1_score(all_targets, all_predictions)

    return accuracy, precision, recall, f1

def test_model(test_file_path, test_feature_path, batch_size, model_path, device):
    test_labels, test_features = load_data(test_file_path, test_feature_path)
    test_loader = build_dataloader(test_features, test_labels, batch_size)
    num_classes = 2
    input_size = test_features.shape[1]

    # Load the best model from training
    best_model = load_best_model(model_path, num_classes, input_size, device)

    # Evaluate the model on the test set with additional metrics
    accuracy, precision, recall, f1 = evaluate(best_model, test_loader, device)
    print(f"Test Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1-Score: {f1:.4f}")

# Assuming the paths to the test dataset and features, and the path to the best model are provided correctly
test_file_path = 'E&I_test.csv'
test_feature_path = "E&I_test.npy"
model_path = "best_model.pth"
batch_size = 16
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Execute the testing of the model with the best validation weights
test_model(test_file_path, test_feature_path, batch_size, model_path, device)

Test Accuracy: 0.5964, Precision: 0.5890, Recall: 0.4812, F1-Score: 0.5297
