<a href="https://colab.research.google.com/github/Andicleomj/Machine-Learning/blob/main/Week14/Markov%20model%20dan%20Hidden%20Markov%20Model/Bank%20Marketing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Rizki Aprilia Rahman
#### 1103213007
#### Week 14 Markov model dan Hidden Markov Model

In [1]:
# Install PyTorch (skip if already installed)
!pip install torch torchvision torchaudio --quiet

In [2]:
# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

In [3]:
# Load dataset
def load_data():
    url = "/content/sample_data/bank-full.csv"
    df = pd.read_csv(url, sep=';')

    # Preprocess data
    X = df.drop(columns=['y'])
    y = df['y']

    # Convert categorical to numerical
    X = pd.get_dummies(X)
    le = LabelEncoder()
    y = le.fit_transform(y)

    # Standardize features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    return train_test_split(X, y, test_size=0.2, random_state=42)


In [4]:
# Create custom dataset class
class BankDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32).unsqueeze(1)  # Add sequence length dimension
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [5]:
# Define RNN model
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1, pooling='max'):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.pooling = pooling

        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        if self.pooling == 'max':
            out, _ = torch.max(out, 1)
        elif self.pooling == 'avg':
            out = torch.mean(out, 1)
        out = self.fc(out)
        return out

In [6]:
# Define Hidden Markov Model (HMM) Class
class HMMModel(nn.Module):
    def __init__(self, n_states, n_features):
        super(HMMModel, self).__init__()
        self.n_states = n_states
        self.n_features = n_features

        self.transition = nn.Parameter(torch.randn(n_states, n_states))  # Transition probabilities
        self.emission = nn.Parameter(torch.randn(n_states, n_features))  # Emission probabilities
        self.initial = nn.Parameter(torch.randn(n_states))  # Initial state probabilities

    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        log_alpha = self.initial.unsqueeze(0).expand(batch_size, -1)

        for t in range(seq_len):
            obs = x[:, t, :]
            log_alpha = torch.logsumexp(
                log_alpha.unsqueeze(2) + self.transition + obs.unsqueeze(1) @ self.emission.T, dim=1
            )

        return log_alpha

In [7]:
# Train function
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25):
    train_loader, val_loader = dataloaders
    model = model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)

        scheduler.step()

        val_loss = 0.0
        val_preds = []
        val_labels = []

        model.eval()
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)

                preds = torch.argmax(outputs, dim=1)
                val_preds.extend(preds.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

        train_loss = running_loss / len(train_loader.dataset)
        val_loss = val_loss / len(val_loader.dataset)
        val_acc = accuracy_score(val_labels, val_preds)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    return model


In [8]:
# Main Experiment
if __name__ == "__main__":
    X_train, X_test, y_train, y_test = load_data()
    train_dataset = BankDataset(X_train, y_train)
    val_dataset = BankDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

    dataloaders = (train_loader, val_loader)

    input_size = X_train.shape[1]
    output_size = len(np.unique(y_train))

    hidden_sizes = [16, 32, 64]
    poolings = ['max', 'avg']
    optimizers = {'SGD': optim.SGD, 'RMSProp': optim.RMSprop, 'Adam': optim.Adam}
    epochs_list = [5, 50, 100, 250, 350]

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    results = []

    for hidden_size in hidden_sizes:
        for pooling in poolings:
            for optimizer_name, optimizer_fn in optimizers.items():
                for num_epochs in epochs_list:
                    print(f"\nHidden Size: {hidden_size}, Pooling: {pooling}, Optimizer: {optimizer_name}, Epochs: {num_epochs}")

                    model = RNNModel(input_size, hidden_size, output_size, pooling=pooling)
                    criterion = nn.CrossEntropyLoss()
                    optimizer = optimizer_fn(model.parameters(), lr=0.01)
                    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

                    trained_model = train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs)

                    # Evaluate on test set
                    test_preds = []
                    test_labels = []

                    trained_model.eval()
                    with torch.no_grad():
                        for inputs, labels in val_loader:
                            inputs, labels = inputs.to(device), labels.to(device)
                            outputs = trained_model(inputs)
                            preds = torch.argmax(outputs, dim=1)
                            test_preds.extend(preds.cpu().numpy())
                            test_labels.extend(labels.cpu().numpy())

                    test_acc = accuracy_score(test_labels, test_preds)
                    print(f"Test Accuracy: {test_acc:.4f}")

                    results.append((hidden_size, pooling, optimizer_name, num_epochs, test_acc))


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch 225/250, Train Loss: 0.1406, Val Loss: 0.2708, Val Acc: 0.8944
Epoch 226/250, Train Loss: 0.1407, Val Loss: 0.2713, Val Acc: 0.8952
Epoch 227/250, Train Loss: 0.1406, Val Loss: 0.2708, Val Acc: 0.8945
Epoch 228/250, Train Loss: 0.1406, Val Loss: 0.2709, Val Acc: 0.8952
Epoch 229/250, Train Loss: 0.1407, Val Loss: 0.2705, Val Acc: 0.8957
Epoch 230/250, Train Loss: 0.1406, Val Loss: 0.2709, Val Acc: 0.8941
Epoch 231/250, Train Loss: 0.1406, Val Loss: 0.2713, Val Acc: 0.8940
Epoch 232/250, Train Loss: 0.1406, Val Loss: 0.2709, Val Acc: 0.8945
Epoch 233/250, Train Loss: 0.1406, Val Loss: 0.2709, Val Acc: 0.8958
Epoch 234/250, Train Loss: 0.1405, Val Loss: 0.2714, Val Acc: 0.8938
Epoch 235/250, Train Loss: 0.1406, Val Loss: 0.2708, Val Acc: 0.8946
Epoch 236/250, Train Loss: 0.1406, Val Loss: 0.2712, Val Acc: 0.8947
Epoch 237/250, Train Loss: 0.1406, Val Loss: 0.2712, Val Acc: 0.8937
Epoch 238/250, Train Loss: 0.1406, Val

In [10]:
# HMM Experiment
print("\nRunning Hidden Markov Model (HMM) Experiment")
hmm_model = HMMModel(n_states=4, n_features=input_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(hmm_model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

trained_hmm_model = train_model(hmm_model, dataloaders, criterion, optimizer, scheduler, num_epochs=100)



Running Hidden Markov Model (HMM) Experiment
Epoch 1/100, Train Loss: 1.1313, Val Loss: 0.2644, Val Acc: 0.8985
Epoch 2/100, Train Loss: 0.2523, Val Loss: 0.2597, Val Acc: 0.8959
Epoch 3/100, Train Loss: 0.2491, Val Loss: 0.2547, Val Acc: 0.8994
Epoch 4/100, Train Loss: 0.2494, Val Loss: 0.2511, Val Acc: 0.9008
Epoch 5/100, Train Loss: 0.2485, Val Loss: 0.2542, Val Acc: 0.8969
Epoch 6/100, Train Loss: 0.2484, Val Loss: 0.2564, Val Acc: 0.8952
Epoch 7/100, Train Loss: 0.2470, Val Loss: 0.2653, Val Acc: 0.8959
Epoch 8/100, Train Loss: 0.2487, Val Loss: 0.2547, Val Acc: 0.8974
Epoch 9/100, Train Loss: 0.2497, Val Loss: 0.2522, Val Acc: 0.8977
Epoch 10/100, Train Loss: 0.2481, Val Loss: 0.2529, Val Acc: 0.8994
Epoch 11/100, Train Loss: 0.2479, Val Loss: 0.2518, Val Acc: 0.8994
Epoch 12/100, Train Loss: 0.2484, Val Loss: 0.2587, Val Acc: 0.8972
Epoch 13/100, Train Loss: 0.2480, Val Loss: 0.2552, Val Acc: 0.8965
Epoch 14/100, Train Loss: 0.2482, Val Loss: 0.2532, Val Acc: 0.8966
Epoch 15/10