CNN-LSTM

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

def sliding_window_dataset(rssi_data, distances, window_size=10):
    """Build overlapping fixed-length windows and their aligned targets.

    Window ``i`` covers ``rssi_data[i:i + window_size]`` and is paired with
    ``distances[i + window_size - 1]``, i.e. the target at the position of
    the window's last sample.

    Args:
        rssi_data: Sequence or array of samples, windowed along axis 0.
        distances: Sequence or array of targets, indexed along axis 0.
        window_size: Number of consecutive samples per window; must be >= 1.

    Returns:
        A pair ``(X, y)`` of NumPy arrays where ``X`` has shape
        ``(n_windows, window_size, ...)`` and ``y`` has shape
        ``(n_windows, ...)``, with
        ``n_windows = max(len(rssi_data) - window_size + 1, 0)``.
        When there are fewer samples than ``window_size`` the arrays are
        empty but keep that rank, so downstream reshaping still works.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
        IndexError: If ``distances`` is too short to label every window.
    """
    if window_size < 1:
        # A non-positive size would produce empty windows and, through
        # negative indexing, labels taken from the end of ``distances``.
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    samples = np.asarray(rssi_data)
    targets = np.asarray(distances)
    n_windows = max(len(samples) - window_size + 1, 0)

    # Row i of ``window_idx`` holds the positions i, i+1, ..., i+window_size-1.
    starts = np.arange(n_windows)
    window_idx = starts[:, None] + np.arange(window_size)

    X = samples[window_idx]
    # Integer-array indexing (rather than slicing) keeps the IndexError when
    # ``distances`` does not reach the last window.
    y = targets[starts + (window_size - 1)]
    return X, y

class RSSIDataset(Dataset):
    """Dataset that serves (sample, target) pairs as float32 tensors.

    Both inputs are converted to ``torch.float32`` and given an extra
    singleton axis at position 1, so ``X`` of shape ``(N, L)`` is stored as
    ``(N, 1, L)`` and ``y`` of shape ``(N,)`` is stored as ``(N, 1)``.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.float32)
        # Axis 1 becomes a singleton dimension for both tensors.
        self.X = torch.unsqueeze(features, dim=1)
        self.y = torch.unsqueeze(targets, dim=1)

    def __len__(self):
        """Return the number of stored samples (size of axis 0 of ``X``)."""
        return self.X.shape[0]

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair stored at ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

class CNN_LSTM_Model(nn.Module):
    """1D-CNN feature extractor followed by a stacked LSTM and an MLP head.

    The forward pass applies Conv1d -> ReLU, feeds the result to the LSTM
    with time as the sequence axis, takes the LSTM output at the last time
    step, and maps it through Linear -> ReLU -> Linear to one value per
    sample.

    Args:
        in_channels: Number of input channels of the convolution.
        conv_channels: Number of convolution output channels; also the LSTM
            input size.
        kernel_size: Convolution kernel size. Padding is ``kernel_size // 2``,
            which keeps the sequence length unchanged for odd kernel sizes.
        hidden_size: LSTM hidden size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers. Ignored (set to 0) when
            ``num_layers`` is 1, since there is no inter-layer connection.
        fc_size: Width of the hidden fully connected layer.

    The defaults reproduce the original fixed architecture
    (1 -> 32 channels, kernel 3, padding 1, 5-layer LSTM with 64 hidden
    units and dropout 0.2, 64 -> 128 -> 1 head).
    """

    def __init__(self, in_channels=1, conv_channels=32, kernel_size=3,
                 hidden_size=64, num_layers=5, dropout=0.2, fc_size=128):
        super().__init__()
        self.cnn = nn.Conv1d(in_channels=in_channels,
                             out_channels=conv_channels,
                             kernel_size=kernel_size,
                             padding=kernel_size // 2)
        self.relu = nn.ReLU()
        self.lstm = nn.LSTM(input_size=conv_channels,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            batch_first=True,
                            # nn.LSTM warns about non-zero dropout with a
                            # single layer, so disable it in that case.
                            dropout=dropout if num_layers > 1 else 0.0)
        # Fully connected regression head.
        self.fc1 = nn.Linear(hidden_size, fc_size)
        self.fc2 = nn.Linear(fc_size, 1)

    def forward(self, x):
        """Map ``x`` of shape ``(batch, in_channels, seq_len)`` to ``(batch, 1)``."""
        x = self.relu(self.cnn(x))   # (batch, conv_channels, seq_len)
        x = x.permute(0, 2, 1)       # (batch, seq_len, conv_channels) for the batch_first LSTM
        out, _ = self.lstm(x)        # (batch, seq_len, hidden_size)
        x = out[:, -1, :]            # last time step: (batch, hidden_size)
        x = self.relu(self.fc1(x))
        return self.fc2(x)           # (batch, 1)

def _mean_or_nan(values):
    """Return the arithmetic mean of ``values`` as a float, or NaN if empty."""
    return float(np.mean(values)) if values else float("nan")


def train_model(model, train_loader, val_loader, epochs=6000, lr=1e-4, device='cpu', log_every=500):
    """Train ``model`` with Adam and MSE loss, logging progress periodically.

    Args:
        model: Module to optimise; moved to ``device`` in place.
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: Device the model and every batch are moved to.
        log_every: Run validation and print a progress line every this many
            epochs. ``None`` or a non-positive value disables logging.

    Returns:
        A list with one dict per logged epoch, holding ``"epoch"`` (1-based),
        ``"train_loss"`` and ``"val_loss"``. Each loss is the mean of the
        per-batch losses of that epoch, or NaN when the corresponding loader
        yielded no batches.
    """
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    logging_enabled = log_every is not None and log_every > 0
    history = []

    for epoch in range(epochs):
        log_this_epoch = logging_enabled and (epoch + 1) % log_every == 0

        model.train()
        train_losses = []
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            loss = criterion(model(X_batch), y_batch)
            loss.backward()
            optimizer.step()
            if log_this_epoch:
                # .item() forces a device sync, so only pay for it on
                # epochs whose loss is actually reported.
                train_losses.append(loss.item())

        if not log_this_epoch:
            continue

        # Validation pass; the next epoch switches back to train mode.
        model.eval()
        val_losses = []
        with torch.no_grad():
            for X_val, y_val in val_loader:
                X_val, y_val = X_val.to(device), y_val.to(device)
                val_losses.append(criterion(model(X_val), y_val).item())

        train_loss = _mean_or_nan(train_losses)
        val_loss = _mean_or_nan(val_losses)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        history.append({"epoch": epoch + 1, "train_loss": train_loss, "val_loss": val_loss})

    return history
