In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, TensorDataset


In [2]:
# Read the semicolon-separated white-wine quality table into a DataFrame
data = pd.read_csv('winequality-white.csv', sep=';')


In [3]:
# Separate the target column ('quality') from the remaining feature columns,
# keeping both as NumPy arrays
y = data['quality'].values
X = data.drop(columns='quality').values


In [4]:
# Map the raw quality scores onto consecutive integer class indices;
# the fitted encoder is kept so the original scores can be recovered later
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)

In [5]:
# Normalize features
# Fit the scaler on the training rows only, so that test-set statistics do not
# leak into preprocessing. The training rows are selected with the same
# test_size / random_state as the "Split Data" cell below: the shuffled
# partition depends only on the number of samples and the seed, so the rows
# picked here are the rows that end up in X_train. Keep the two in sync.
train_rows = train_test_split(np.arange(len(X)), test_size=0.2, random_state=42)[0]
scaler = StandardScaler().fit(X[train_rows])
# Apply the train-derived mean/std to every row (train and test alike).
X = scaler.transform(X)

In [6]:
# Hold out 20% of the rows for evaluation; the fixed seed keeps the
# partition identical from run to run
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)


In [7]:
# Turn the NumPy splits into tensors: float32 features, int64 class labels
X_train, X_test = (
    torch.tensor(features, dtype=torch.float32) for features in (X_train, X_test)
)
y_train, y_test = (
    torch.tensor(labels, dtype=torch.long) for labels in (y_train, y_test)
)

# Pair features with labels so they are batched together
train_dataset, test_dataset = (
    TensorDataset(X_train, y_train),
    TensorDataset(X_test, y_test),
)

# Mini-batches of 32; only the training loader reshuffles between epochs
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

In [8]:
# Define Bidirectional RNN Model
class BiRNNModel(nn.Module):
    """Bidirectional RNN encoder with sequence pooling and a linear classifier.

    The RNN output (forward and backward hidden states concatenated, hence
    ``hidden_size * 2`` features) is pooled across dimension 1 and fed to a
    single linear layer that produces ``output_size`` logits.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units per direction of the RNN.
        output_size: Number of output logits (classes).
        pooling_type: ``'max'`` or ``'avg'``; how the RNN outputs are reduced
            across dimension 1.

    Raises:
        ValueError: If ``pooling_type`` is not ``'max'`` or ``'avg'``. Raised
            at construction so a misconfigured model fails before training.
    """

    _POOLING_TYPES = ('max', 'avg')

    def __init__(self, input_size, hidden_size, output_size, pooling_type):
        super().__init__()
        # Fail fast instead of waiting for the first forward pass.
        if pooling_type not in self._POOLING_TYPES:
            raise ValueError("Pooling type must be 'max' or 'avg'")
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True, bidirectional=True)
        self.pooling_type = pooling_type
        # Both directions are concatenated, so the head sees 2 * hidden_size.
        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x):
        """Return logits for ``x``.

        Args:
            x: Input sequence batch; the RNN is built with ``batch_first=True``,
                so a batched input is laid out as (batch, seq, input_size).

        Returns:
            Tensor of logits with ``output_size`` entries per sample.

        Raises:
            ValueError: If ``self.pooling_type`` was changed to an unsupported
                value after construction.
        """
        rnn_out, _ = self.rnn(x)
        if self.pooling_type == 'max':
            # torch.max along a dim returns (values, indices); keep the values.
            pooled, _ = torch.max(rnn_out, dim=1)
        elif self.pooling_type == 'avg':
            pooled = torch.mean(rnn_out, dim=1)
        else:
            raise ValueError("Pooling type must be 'max' or 'avg'")
        out = self.fc(pooled)
        return out

In [9]:
# Training and Evaluation Function
def train_and_evaluate(model, optimizer, criterion, train_loader, test_loader, num_epochs, scheduler=None, early_stopping_patience=10):
    """Train ``model`` and return the best accuracy observed on ``test_loader``.

    Each epoch runs one pass over ``train_loader``, steps the optional
    scheduler once, then measures accuracy over ``test_loader``. Every batch
    is given a length-1 sequence axis (``unsqueeze(1)``) before being passed
    to the model. Training stops early once accuracy has failed to improve
    for ``early_stopping_patience`` consecutive epochs.

    Note that the returned value is the maximum over epochs on the same
    loader that drives early stopping, so it is an optimistic estimate if
    that loader is the final hold-out set.

    Args:
        model: Module mapping a (batch, 1, features) tensor to class logits.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss called as ``criterion(logits, targets)``.
        train_loader: Iterable of ``(features, targets)`` training batches.
        test_loader: Iterable of ``(features, targets)`` evaluation batches.
        num_epochs: Maximum number of epochs to run.
        scheduler: Optional LR scheduler; ``step()`` is called once per epoch.
        early_stopping_patience: Number of consecutive non-improving epochs
            tolerated before stopping.

    Returns:
        Best evaluation accuracy (fraction in [0, 1]) across the epochs run;
        ``0.0`` if no epoch ran.
    """
    # Follow the model's device so the loop also works for a model on GPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    best_acc = 0.0
    patience_counter = 0
    for epoch in range(num_epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch.unsqueeze(1))
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
        # Explicit None check: do not rely on the scheduler object's truthiness.
        if scheduler is not None:
            scheduler.step()

        # Evaluation: count correct predictions directly on tensors.
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch.unsqueeze(1))
                _, preds = torch.max(outputs, dim=1)
                correct += (preds == y_batch).sum().item()
                total += y_batch.numel()
        # Guard against an empty evaluation loader.
        acc = correct / total if total else 0.0

        # Log before the early-stopping check so the final epoch is reported too.
        print(f"Epoch {epoch + 1}/{num_epochs}, Accuracy: {acc:.4f}")

        if acc > best_acc:
            best_acc = acc
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print(f"Early stopping at epoch {epoch + 1}")
            break

    return best_acc


In [10]:
# Experiment Parameters
# Grid of settings explored by the experiment loop.
hidden_sizes = [16, 32, 64]
pooling_types = ['max', 'avg']
num_epochs_list = [5, 50, 100, 250, 350]
optimizers = [optim.SGD, optim.RMSprop, optim.Adam]

# One input feature per column of the training matrix.
input_size = X_train.shape[1]
# Size the output layer from every class the label encoder saw, not only the
# classes that happened to land in the training split: a rare class missing
# from y_train would otherwise leave the layer too small for the label range.
output_size = len(label_encoder.classes_)

# One dict per finished run is appended here.
results = []

In [11]:
# Run Experiments for Bidirectional RNN
# Full grid: every hidden size x pooling type x epoch budget x optimizer.
for hidden_size in hidden_sizes:
    for pooling_type in pooling_types:
        for num_epochs in num_epochs_list:
            for optimizer_fn in optimizers:
                print(f"Running: hidden_size={hidden_size}, pooling={pooling_type}, epochs={num_epochs}, optimizer={optimizer_fn.__name__}")

                # Re-seed before every run so weight initialisation and the
                # training loader's shuffling (which draws from the global
                # torch RNG when no generator is passed) are reproducible and
                # identical across configurations being compared.
                torch.manual_seed(42)

                model = BiRNNModel(input_size, hidden_size, output_size, pooling_type)

                optimizer = optimizer_fn(model.parameters(), lr=0.01)
                criterion = nn.CrossEntropyLoss()

                # Cut the learning rate by 10x every 50 epochs.
                scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

                best_acc = train_and_evaluate(
                    model, optimizer, criterion, train_loader, test_loader, num_epochs, scheduler
                )

                # 'epochs' is the configured budget; early stopping inside
                # train_and_evaluate may have ended the run sooner.
                results.append({
                    'model': 'BiRNN',
                    'hidden_size': hidden_size,
                    'pooling': pooling_type,
                    'epochs': num_epochs,
                    'optimizer': optimizer_fn.__name__,
                    'accuracy': best_acc
                })

# Convert Results to DataFrame
results_df = pd.DataFrame(results)
print(results_df)

# Save Results to CSV
results_df.to_csv('experiment_results_birnn.csv', index=False)

Running: hidden_size=16, pooling=max, epochs=5, optimizer=SGD
Epoch 1/5, Accuracy: 0.4500
Epoch 2/5, Accuracy: 0.4582
Epoch 3/5, Accuracy: 0.4704
Epoch 4/5, Accuracy: 0.4755
Epoch 5/5, Accuracy: 0.4878
Running: hidden_size=16, pooling=max, epochs=5, optimizer=RMSprop
Epoch 1/5, Accuracy: 0.4949
Epoch 2/5, Accuracy: 0.5306
Epoch 3/5, Accuracy: 0.5133
Epoch 4/5, Accuracy: 0.5418
Epoch 5/5, Accuracy: 0.5459
Running: hidden_size=16, pooling=max, epochs=5, optimizer=Adam
Epoch 1/5, Accuracy: 0.5102
Epoch 2/5, Accuracy: 0.5306
Epoch 3/5, Accuracy: 0.5163
Epoch 4/5, Accuracy: 0.5286
Epoch 5/5, Accuracy: 0.5480
Running: hidden_size=16, pooling=max, epochs=50, optimizer=SGD
Epoch 1/50, Accuracy: 0.4398
Epoch 2/50, Accuracy: 0.4622
Epoch 3/50, Accuracy: 0.4745
Epoch 4/50, Accuracy: 0.4837
Epoch 5/50, Accuracy: 0.4959
Epoch 6/50, Accuracy: 0.5020
Epoch 7/50, Accuracy: 0.5071
Epoch 8/50, Accuracy: 0.5082
Epoch 9/50, Accuracy: 0.5102
Epoch 10/50, Accuracy: 0.5163
Epoch 11/50, Accuracy: 0.5184
Epoch