Loading the model in 

In [1]:
import torch
import pandas as pd
from torch import nn 
import numpy as np

import joblib

# from common_utils import MLPBuilder
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
from sklearn.model_selection import train_test_split

modelname = 'CNN_Shuffling'

In [2]:
print("PyTorch version:", torch.__version__)

# Check if CUDA is available
print("CUDA available:", torch.cuda.is_available())

# Get current CUDA device index (if available)
if torch.cuda.is_available():
    print("Current CUDA device index:", torch.cuda.current_device())
    print("CUDA device name:", torch.cuda.get_device_name(torch.cuda.current_device()))
else:
    print("No CUDA devices found.")
    
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

PyTorch version: 2.5.0+cu118
CUDA available: True
Current CUDA device index: 0
CUDA device name: NVIDIA GeForce RTX 4070 SUPER


In [3]:
import torch.nn.functional as F

class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()

        # Convolutional Layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)   # output size = 8x8x32
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)  # output size = 4x4x64
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)  # output size = 4x4x128
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)  # output size = 4x4x256
        # Batch Normalization Layers
        self.batchnorm1 = nn.BatchNorm2d(32)
        self.batchnorm2 = nn.BatchNorm2d(64)
        self.batchnorm3 = nn.BatchNorm2d(128)
        self.batchnorm4 = nn.BatchNorm2d(256)
        # Fully Connected Layers
        self.fc1 = nn.Linear(256 * 4 * 4, 512)  # 4x4x256 -> 512
        self.fc2 = nn.Linear(512, 256)  # 512-> 256
        self.fc3 = nn.Linear(256, 64)  # 256-> 64

        # Dropout for regularization
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = x.unsqueeze(1)  
        x = F.relu(self.batchnorm1(self.conv1(x)))  
        x = F.relu(self.batchnorm2(self.conv2(x)))  
        x = F.relu(self.batchnorm3(self.conv3(x)))  
        x = F.relu(self.batchnorm4(self.conv4(x))) 

        x = x.view(x.size(0), -1)  # Flatten

        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        x = self.fc3(x)  # Output 64 logits (for 64 locations)

        return x

In [4]:
# we trained the model with 8 features

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN()
model = torch.load('models/CNN/'+modelname+'.pth', map_location=device)
model.to(device)
model.eval()  

  model = torch.load('models/CNN/'+modelname+'.pth', map_location=device)


CNN(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (batchnorm1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (batchnorm4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (fc1): Linear(in_features=4096, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=256, bias=True)
  (fc3): Linear(in_features=256, out_features=64, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

In [5]:
for param in model.conv1.parameters():
    param.requires_grad = False
for param in model.conv2.parameters():
    param.requires_grad = False
for param in model.conv3.parameters():
    param.requires_grad = False
for param in model.conv4.parameters():
    param.requires_grad = False

for param in model.fc1.parameters():
    param.requires_grad = True
for param in model.fc2.parameters():
    param.requires_grad = True
for param in model.fc3.parameters():
    param.requires_grad = True


for name, param in model.named_parameters():
    print(f"Layer: {name} | Trainable: {param.requires_grad}")


Layer: conv1.weight | Trainable: False
Layer: conv1.bias | Trainable: False
Layer: conv2.weight | Trainable: False
Layer: conv2.bias | Trainable: False
Layer: conv3.weight | Trainable: False
Layer: conv3.bias | Trainable: False
Layer: conv4.weight | Trainable: False
Layer: conv4.bias | Trainable: False
Layer: batchnorm1.weight | Trainable: True
Layer: batchnorm1.bias | Trainable: True
Layer: batchnorm2.weight | Trainable: True
Layer: batchnorm2.bias | Trainable: True
Layer: batchnorm3.weight | Trainable: True
Layer: batchnorm3.bias | Trainable: True
Layer: batchnorm4.weight | Trainable: True
Layer: batchnorm4.bias | Trainable: True
Layer: fc1.weight | Trainable: True
Layer: fc1.bias | Trainable: True
Layer: fc2.weight | Trainable: True
Layer: fc2.bias | Trainable: True
Layer: fc3.weight | Trainable: True
Layer: fc3.bias | Trainable: True


## Loading test data in 

In [6]:
# train_file_path = './Data/25-02-10/cleaned_df.csv'
test_file_path = './Data/25-02-04/combined_data.csv'
model_path = './models/CNN/'
cleaned_df = pd.read_csv(test_file_path)

In [7]:
X = cleaned_df[['Tx_0 RSSI', 'Tx_1 RSSI', 'Tx_2 RSSI', 'Tx_3 RSSI', 'Tx_4 RSSI', 'Tx_5 RSSI', 'Tx_6 RSSI', 'Tx_7 RSSI']]
Y = cleaned_df[['X_Coord', 'Y_Coord']]


In [8]:
grid_data = {}

# Collect RSSI values per (X_Coord, Y_Coord)
for index, row in cleaned_df.iterrows():
    x, y = int(row['X_Coord']), int(row['Y_Coord'])
    
    # Initialize empty list for this location if not already present
    if (x, y) not in grid_data:
        grid_data[(x, y)] = []
    
    # Append RSSI values (Tx_0 to Tx_7)
    grid_data[(x, y)].append(row[X.columns].values)

# Define parameters
num_tx = 8      # Number of transmitters
seq_length = 8  # Sequence length (8 samples per training instance)
stride = 1      # Sliding window step (adjustable)
max_sequences = 50  # Maximum sequences per grid cell to limit dataset size

# Create a list to store all sequences
training_samples = []

# Generate sequences for CNN training
for (x, y), rssi_values in grid_data.items():
    rssi_array = np.array(rssi_values)  # Convert to NumPy array
    
    if rssi_array.shape[0] < seq_length:
        continue  # Skip locations with fewer than 8 samples

    # Extract multiple 8-sample sequences using a sliding window
    num_samples = rssi_array.shape[0]

    for start in range(0, num_samples - seq_length + 1, stride):
        # Get an 8-sample window
        window = rssi_array[start:start + seq_length].T  # Shape: (Tx, 8 samples)

        # Store sample with label (x, y)
        training_samples.append((window, (x, y)))

In [9]:
if training_samples:
    X_Sequence = np.array([sample[0] for sample in training_samples])  # Shape: (num_samples, Tx, 8)
    y_Sequence = np.array([sample[1] for sample in training_samples])  # Shape: (num_samples, 2) -> (X_Coord, Y_Coord)
else:
    X_Sequence = np.array([])
    y_Sequence = np.array([])

print("Generated training data shape:", X_Sequence.shape)
print("Generated labels shape:", y_Sequence.shape)

Generated training data shape: (6946, 8, 8)
Generated labels shape: (6946, 2)


In [10]:

# Perform train-test split (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(
    X_Sequence, y_Sequence, test_size=0.2, random_state=42
)



# Convert (X_Coord, Y_Coord) to a single class index
y_train = np.array([x * 8 + y for x, y in y_train])
y_test = np.array([x * 8 + y for x, y in y_test])

# Convert to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)  # Must be long for classification
y_test = torch.tensor(y_test, dtype=torch.long)

# Move to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train, X_test, y_train, y_test = X_train.to(device), X_test.to(device), y_train.to(device), y_test.to(device)


# Implementing transfer learning in test environment

In [11]:
num_epochs = 100  # Adjust based on performance
batch_size = 32

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001, weight_decay = 1e-4)


In [12]:
model.eval()
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

correct = 0
total = 0
total_displacement = 0.0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU/CPU
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)  # Get predicted class
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        
        # Convert predicted and actual labels back to (X, Y) coordinates
        predicted_coords = torch.stack((predicted // 8, predicted % 8), dim=1).float()
        actual_coords = torch.stack((labels // 8, labels % 8), dim=1).float()
        
        # Compute Euclidean distance (displacement)
        displacement = torch.norm(predicted_coords - actual_coords, dim=1).sum().item()
        total_displacement += displacement

accuracy = 100 * correct / total
average_displacement = total_displacement / total

print(f"Test Accuracy: {accuracy:.2f}%")
print(f"Average Displacement Error: {average_displacement:.4f}")


Test Accuracy: 1.65%
Average Displacement Error: 3.7685


In [13]:

patience = 10 
best_loss = float("inf")


train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Training loop
model.train()
no_improve_epochs = 0  # Initialize early stopping counter

for epoch in range(num_epochs):
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU/CPU

        optimizer.zero_grad()
        outputs = model(inputs)  # Forward pass
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights
        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

    # Early stopping
    if avg_loss < best_loss:
        best_loss = avg_loss
        no_improve_epochs = 0
    else:
        no_improve_epochs += 1

    if no_improve_epochs >= patience:
        print(f"Early stopping at epoch {epoch + 1}")
        break


Epoch [1/100], Loss: 3.2106
Epoch [2/100], Loss: 1.8120
Epoch [3/100], Loss: 1.5078
Epoch [4/100], Loss: 1.2470
Epoch [5/100], Loss: 1.1547
Epoch [6/100], Loss: 1.0735
Epoch [7/100], Loss: 0.9604
Epoch [8/100], Loss: 0.9027
Epoch [9/100], Loss: 0.8288
Epoch [10/100], Loss: 0.7833
Epoch [11/100], Loss: 0.7572
Epoch [12/100], Loss: 0.7194
Epoch [13/100], Loss: 0.6642
Epoch [14/100], Loss: 0.6460
Epoch [15/100], Loss: 0.6123
Epoch [16/100], Loss: 0.5664
Epoch [17/100], Loss: 0.5768
Epoch [18/100], Loss: 0.5482
Epoch [19/100], Loss: 0.5564
Epoch [20/100], Loss: 0.4902
Epoch [21/100], Loss: 0.5051
Epoch [22/100], Loss: 0.5133
Epoch [23/100], Loss: 0.4750
Epoch [24/100], Loss: 0.4376
Epoch [25/100], Loss: 0.4286
Epoch [26/100], Loss: 0.4497
Epoch [27/100], Loss: 0.4245
Epoch [28/100], Loss: 0.4110
Epoch [29/100], Loss: 0.4023
Epoch [30/100], Loss: 0.3627
Epoch [31/100], Loss: 0.3553
Epoch [32/100], Loss: 0.3752
Epoch [33/100], Loss: 0.3768
Epoch [34/100], Loss: 0.3420
Epoch [35/100], Loss: 0

In [14]:
model.eval()
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

correct = 0
total = 0
total_displacement = 0.0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU/CPU
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)  # Get predicted class
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        
        # Convert predicted and actual labels back to (X, Y) coordinates
        predicted_coords = torch.stack((predicted // 8, predicted % 8), dim=1).float()
        actual_coords = torch.stack((labels // 8, labels % 8), dim=1).float()
        
        # Compute Euclidean distance (displacement)
        displacement = torch.norm(predicted_coords - actual_coords, dim=1).sum().item()
        total_displacement += displacement

accuracy = 100 * correct / total
average_displacement = total_displacement / total

print(f"Test Accuracy: {accuracy:.2f}%")
print(f"Average Displacement Error: {average_displacement:.4f}")


Test Accuracy: 87.91%
Average Displacement Error: 0.3769
