In [None]:
import numpy as np
import utilities as ut
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import LabelEncoder

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# Change dataset name if needed
dataset = 'UJI'
data = np.load(f'data/dataset_{dataset}.npz', allow_pickle=True)
pred_w = 'floor' ## 'block' or 'floor','building'

# Extract contents
X_train_seq = data['X_train_seq']
Y_train_seq = data['Y_train_seq']
X_test_seq  = data['X_test_seq']
Y_test_seq  = data['Y_test_seq']
X_test_row = data['X_test']
Y_test_row = data['Y_test']
block_info  = data['block_info'][0]  # it's stored as an object array

In [None]:
if pred_w == 'block':
    pred_col = -1
elif pred_w == 'floor':
    pred_col = 0
elif pred_w == 'building':
    pred_col = 1

# Extract block ID from last column
y_train_ids = Y_train_seq[:, pred_col].astype(int)
y_test_ids  = Y_test_seq[:, pred_col].astype(int)


# 4. Encode block IDs
encoder = LabelEncoder()
encoder.fit(Y_train_seq[:, pred_col])  # use full label list (not just train)

# Now transform using safe encoder
y_train = torch.tensor(encoder.transform(y_train_ids), dtype=torch.long)
y_test   = torch.tensor(encoder.transform(y_test_ids), dtype=torch.long)

# # Encode labels to class indices
# encoder = LabelEncoder()
# y_train = torch.tensor(encoder.fit_transform(y_train_ids), dtype=torch.long)
# y_test  = torch.tensor(encoder.transform(y_test_ids), dtype=torch.long)

In [None]:
# Convert features to tensor
X_train = torch.tensor(X_train_seq, dtype=torch.float32)
X_test  = torch.tensor(X_test_seq, dtype=torch.float32)

In [None]:
# Create DataLoaders
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=64, shuffle=True)
test_loader  = DataLoader(TensorDataset(X_test, y_test), batch_size=64)

In [None]:
# ====================
# Model Definition
# ====================
class LSTMBlockClassifier(nn.Module):
    def __init__(self, input_size, hidden_size=256, fc_size=256, num_classes=100):
        super(LSTMBlockClassifier, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            batch_first=True)
        self.fc1 = nn.Linear(hidden_size, fc_size)
        self.fc2 = nn.Linear(fc_size, num_classes)

    def forward(self, x):
        mean = x.mean(dim=2, keepdim=True)        # mean over features per time step
        std = x.std(dim=2, keepdim=True) + 1e-8   # std over features per time step
        x = (x - mean) / std
        out, (h_n, _) = self.lstm(x)
        h_last = h_n.squeeze(0)
        x = F.relu(self.fc1(h_last))
        x = self.fc2(x)
        return x

In [None]:
# ====================
# Training & Evaluation
# ====================

input_size = X_train.shape[2]
num_classes = len(encoder.classes_)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LSTMBlockClassifier(input_size=input_size, num_classes=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
# Training loop
epochs = 30
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        preds = model(xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} | Loss: {total_loss:.4f}")

In [None]:
# === Evaluation: Block Accuracy + Mean Localization Error (MLE) ===
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        preds = model(xb).argmax(dim=1)
        all_preds.append(preds.cpu().numpy())
        all_labels.append(yb.cpu().numpy())

# Combine predictions and labels
all_preds = np.concatenate(all_preds)
all_labels = np.concatenate(all_labels)

In [None]:
# === Classification Accuracy
num_correct = np.sum(all_preds == all_labels)
accuracy = 100 * num_correct / len(all_labels)
print(f"\n Block Prediction Accuracy: {accuracy:.2f}%")

In [None]:
# === Convert class indices back to original block IDs
pred_block_ids = encoder.inverse_transform(all_preds)
true_block_ids = Y_test_seq[:, -1].astype(int)  # ground truth from sequence data

# === Compute (x, y) for predicted and true blocks
pred_coords = np.array([ [block_info[b]['x'], block_info[b]['y']] for b in pred_block_ids ])
# true_coords = np.array([ [block_info[b]['x'], block_info[b]['y']] for b in true_block_ids ])
true_coords = Y_test_seq[:, -4:-2]  # assuming these are [x, y] columns

# === Euclidean Distance Error
errors = np.linalg.norm(pred_coords - true_coords, axis=1)
mean_error = np.mean(errors)

print(f" Mean Localization Error (MLE): {mean_error:.2f} meters")