In [1]:
import copy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm



In [2]:
def get_CSI(CSI_RAW): # Function to extract the CSI information from CSV file rows
    """Parse one bracketed, space-separated CSI string into a list of ints.

    Args:
        CSI_RAW: Text such as ``"[84 -64 4 0 ]"``. Brackets may stand alone
            or be attached to a neighbouring number, and runs of whitespace
            are tolerated.

    Returns:
        list[int]: The integers in the order they appear in ``CSI_RAW``.

    Raises:
        ValueError: If a token is not a valid integer literal once the
            brackets have been removed.
    """
    # Treat brackets as plain separators so a value glued to either bracket
    # ("[84" or "0]") is kept; split() with no argument also discards the
    # empty tokens that repeated spaces would otherwise produce.
    cleaned = CSI_RAW.replace('[', ' ').replace(']', ' ')
    return [int(token) for token in cleaned.split()]

In [3]:
# A separate CSV file for each location
CSV_FILE_1 = '../data/lab_entrance.csv'
CSV_FILE_2 = '../data/lab_infront.csv'
CSV_FILE_3 = '../data/lab_2ndrow.csv'

n_classes = 3
CSI_dim = 128

target_mac = "3A:FB:99:B4:E0:FC" # replace with your target device mac


def _load_location_frame(csv_path, mac):
    """Read one capture file and keep only the rows sent by ``mac``.

    Args:
        csv_path: Path of the CSV file to read.
        mac: MAC address string; rows whose 'mac' column differs are dropped.

    Returns:
        pandas.DataFrame with the columns 'mac', 'rssi' and 'CSI'.
    """
    frame = pd.read_csv(csv_path)
    # NOTE(review): the column labelled 'seq_ctrl' is used as the CSI payload
    # and the one labelled 'CSI_DATA' is discarded - presumably the CSV header
    # is shifted relative to the data; confirm against the capture tool.
    frame = frame.drop(columns='CSI_DATA')
    frame = frame.rename(columns={'seq_ctrl': 'CSI'})
    frame = frame[['mac', 'rssi', 'CSI']] # Relevant columns
    return frame[frame['mac'] == mac] # Take only target mac address into account


df_1 = _load_location_frame(CSV_FILE_1, target_mac)
df_2 = _load_location_frame(CSV_FILE_2, target_mac)
df_3 = _load_location_frame(CSV_FILE_3, target_mac)

data_1 = df_1[['CSI']].to_numpy()
data_2 = df_2[['CSI']].to_numpy()
data_3 = df_3[['CSI']].to_numpy()

n_data_1 = len(data_1)
n_data_2 = len(data_2)
n_data_3 = len(data_3)
n_data = n_data_1 + n_data_2 + n_data_3

X = np.zeros((n_data, CSI_dim)) # Features
Y = np.zeros((n_data, n_classes)) # Labels (one-hot, one column per location)

# Fill X/Y location by location: rows of file k are stored contiguously after
# those of file k-1 and are labelled with class index k.
row = 0
for class_index, location_data in enumerate((data_1, data_2, data_3)):
    for value in location_data:
        csi_values = get_CSI(value[0])
        # Check the length explicitly: numpy would silently broadcast a
        # single-value row across all CSI_dim features.
        if len(csi_values) != CSI_dim:
            raise ValueError(
                f"expected {CSI_dim} CSI values, got {len(csi_values)} "
                f"(location {class_index + 1}, overall row {row})"
            )
        X[row, :] = csi_values
        Y[row, class_index] = 1
        row += 1
    

In [4]:
# shuffle inputs
# A fixed seed makes the train/validation/test membership identical on every
# Restart & Run All. (Re-running only this cell shuffles the already shuffled
# arrays again, so membership then differs.)
SPLIT_SEED = 0
indsh = np.random.default_rng(SPLIT_SEED).permutation(n_data)

# The same permutation is applied to features and labels so rows stay paired.
X = X[indsh, :]
Y = Y[indsh]

# 70 % train / 10 % validation / 20 % test, cut at these two row indices.
train_end = int(0.7 * n_data)
val_end = int(0.8 * n_data)

X_train = X[:train_end, :]
y_train = Y[:train_end, :]

X_val = X[train_end:val_end, :]
y_val = Y[train_end:val_end, :]

X_test = X[val_end:, :]
y_test = Y[val_end:, :]

# Define a custom dataset
class CustomDataset(Dataset):
    """Pairs each row of a feature array with the same row of a label array."""

    def __init__(self, feature, label):
        """Store the two row-aligned, two-dimensional arrays.

        Args:
            feature: Indexable as ``feature[i, :]``; its length is the dataset size.
            label: Indexable as ``label[i, :]``.
        """
        self.feature, self.label = feature, label

    def __len__(self):
        """Return the number of rows in ``feature``."""
        n_rows = len(self.feature)
        return n_rows

    def __getitem__(self, idx):
        """Return the ``(feature_row, label_row)`` pair stored at ``idx``."""
        return self.feature[idx, :], self.label[idx, :]

# Create Dataset objects
# torch.tensor with an explicit dtype copies the float64 numpy arrays into
# float32 tensors, which is what the nn.Linear layers expect by default.
train_data = CustomDataset(torch.tensor(X_train, dtype=torch.float32),
                           torch.tensor(y_train, dtype=torch.float32))
val_data = CustomDataset(torch.tensor(X_val, dtype=torch.float32),
                         torch.tensor(y_val, dtype=torch.float32))
test_data = CustomDataset(torch.tensor(X_test, dtype=torch.float32),
                          torch.tensor(y_test, dtype=torch.float32))

# Create DataLoader for training, validation, and test sets
batch_size = 4
# drop_last=True keeps every batch at exactly batch_size samples; BatchNorm1d
# cannot normalise a single-sample batch while in training mode.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=True, drop_last=True)
# The test set is evaluated in full: dropping the last partial batch would
# leave up to batch_size - 1 samples out of the reported accuracy.
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)

In [6]:
# Fully connected stack: CSI_dim -> 256 -> 64 -> 32 -> 16 -> 4 -> n_classes.
# Every hidden Linear layer is followed by ReLU and then BatchNorm1d; the
# final Linear layer has no activation.
_hidden_widths = (256, 64, 32, 16, 4)

_stack = []
_width_in = CSI_dim
for _width_out in _hidden_widths:
    _stack.append(nn.Linear(_width_in, _width_out, bias=True))
    _stack.append(nn.ReLU())
    _stack.append(nn.BatchNorm1d(_width_out))
    _width_in = _width_out

# Last dim is the amount classes / locations
_stack.append(nn.Linear(_width_in, n_classes, bias=True))

shared_layers = nn.Sequential(*_stack)

class My_NN(nn.Module):
    """Classifier wrapping a private copy of the module-level ``shared_layers``.

    Each instance deep-copies the layer stack and re-initialises it, so
    constructing a new ``My_NN()`` always yields an untrained network and
    never modifies ``shared_layers`` or any other instance.
    """

    def __init__(self):
        super(My_NN, self).__init__()
        # Copy instead of aliasing: assigning shared_layers directly would make
        # every instance train the same parameters, so re-creating the model
        # in a notebook would silently continue from the previous run.
        self.block = copy.deepcopy(shared_layers)
        # Draw fresh weights for this instance (copies of one template would
        # otherwise all start from identical values).
        self._initialize_weights()

    def _initialize_weights(self):
        """Reset all Linear and BatchNorm1d parameters in place.

        Linear layers get Kaiming-uniform weights and zero bias; BatchNorm1d
        layers get unit scale and zero shift.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_uniform_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, feature):  # input feature matrix size: CSI_dim = 128
        """Run ``feature`` through the layer stack and return its output."""
        output1 = self.block(feature)

        return output1

In [None]:
# Initialize the model and other hyperparameters
my_nn = My_NN()

# NOTE(review): MSE against one-hot targets is kept as-is; nn.CrossEntropyLoss
# is the more usual choice for a classification head.
criterion = nn.MSELoss()
optimizer = optim.SGD(my_nn.parameters(), lr=0.001, momentum=0.9)

num_epochs = 100
print_parameters_every = 20  # Print parameters every n epochs
train_losses = []       # one entry per epoch: sample-weighted mean training loss
validation_losses = []  # one entry per epoch: sample-weighted mean validation loss
for epoch in tqdm(range(num_epochs)):
    # ---- training pass ----
    my_nn.train()  # BatchNorm uses batch statistics and updates its running stats
    loss_sum, n_seen = 0.0, 0
    for feature_batch, labels_batch in train_loader:
        output = my_nn(feature_batch)
        loss = criterion(output, labels_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Accumulate over the whole epoch; storing only the final batch's loss
        # gives a very noisy curve with batch_size = 4.
        loss_sum += loss.item() * len(feature_batch)
        n_seen += len(feature_batch)
    # nan (rather than a NameError) if the loader produced no batches
    train_losses.append(loss_sum / n_seen if n_seen else float('nan'))  # Store training loss

    # ---- validation pass ----
    # eval() makes BatchNorm use its running statistics and stops validation
    # batches from updating them.
    my_nn.eval()
    loss_sum, n_seen = 0.0, 0
    with torch.no_grad():
        for feature_batch, labels_batch in val_loader:
            output = my_nn(feature_batch)
            val_loss = criterion(output, labels_batch)
            loss_sum += val_loss.item() * len(feature_batch)
            n_seen += len(feature_batch)
    validation_losses.append(loss_sum / n_seen if n_seen else float('nan'))
    # A learning-rate scheduler driven by the validation loss could be stepped
    # here with validation_losses[-1] to reduce over-fitting.

# Leave the model in training mode, as it was after this cell before.
my_nn.train()

# Plot the training and validation loss
plt.figure(1)
plt.plot(train_losses, label='Training Loss')
plt.plot(validation_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()


 90%|█████████ | 90/100 [00:32<00:03,  2.67it/s]

In [None]:
# test loop
# Accuracy = fraction of test samples whose highest-scoring output index
# matches the index of the 1 in the one-hot label.
my_nn.eval()  # BatchNorm uses running statistics during evaluation
correct = 0
total = 0
with torch.no_grad():
    for feature_batch, labels_batch in test_loader:
        out = my_nn(feature_batch)
        _, pred = torch.max(out, 1)           # predicted class index per sample
        _, real = torch.max(labels_batch, 1)  # true class index from one-hot label
        total += len(real)
        correct += (pred == real).sum().item()

if total == 0:
    # An empty loader would otherwise end in a ZeroDivisionError.
    print("Accuracy: n/a (test_loader yielded no samples)")
else:
    print("Accuracy:", correct/total)

Ratio: 0.8984375
