# AP location prediction 
**Following the methods explored in the `ap_coordinates_prediction_review.ipynb`**

In [None]:
import time 
import pickle

import pandas as pd
import numpy as np
import seaborn as sns

from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
from collections import defaultdict

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import RobustScaler

from sklearn.model_selection import GridSearchCV
from sklearn.multioutput import MultiOutputRegressor

import optuna
from xgboost import XGBClassifier, XGBRegressor
from optuna.integration import XGBoostPruningCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error


from mpl_toolkits.mplot3d import Axes3D
from matplotlib.colors import Normalize


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from tqdm import tqdm

from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split

np.random.seed(0)
torch.manual_seed(0)

## Load data 

In [None]:
import os

# Set the path to the main root of the project
# This is the folder that contains the 'ap_data' directory
project_root = '/home/sionna/Documents/GitTest2/AP-Sionna-Testing' 

os.chdir(project_root)

# Print the current working directory to confirm it's correct
print(f"Current working directory: {os.getcwd()}")

In [None]:
# Use a path relative to your main project folder
path = 'ap_data/ap_data_all_floors.csv'

data = pd.read_csv(path)
print(data.shape)

In [None]:
# Set max valid RSSI to 200
data = data.dropna(subset=[data.columns[-1]])
data.replace({np.nan: 200}, inplace=True)
data.iloc[:, 4:-4] = data.iloc[:, 4:-4].clip(upper=200)

In [None]:
unique_floors = data['ap_name'].str.extract('(\d+F)')[0].unique()
num_floors = len(unique_floors)
fig, axes = plt.subplots(1, num_floors, figsize=(20 * num_floors // 2, 8))

for i, floor in enumerate(sorted(unique_floors)):
    floor_data = data[data['ap_name'].str.contains(floor)]
    axes[i].scatter(floor_data.iloc[:, -3], floor_data.iloc[:, -2])
    axes[i].set_xlabel('X Position')
    axes[i].set_ylabel('Y Position')
    axes[i].set_title(f'Scatter Plot of Positions ({floor})')

plt.tight_layout()
plt.show()

There isn't enough data to cover all the access points (APs), so some floors appear empty due to the lack of data for all APs on those floors

In [None]:
data['rounded_position'] = data.apply(lambda row: f"{int(round(row.iloc[-3]))}_{int(round(row.iloc[-2]))}_{int(round(row.iloc[-1]))}", axis=1)

unique_floors = data['ap_name'].str.extract('(\d+F)')[0].unique()
num_floors = len(unique_floors)
fig, axes = plt.subplots(1, num_floors, figsize=(20 * num_floors // 2, 8))

for i, floor in enumerate(sorted(unique_floors)):
    floor_data = data[data['ap_name'].str.contains(floor)]
    position_counts = floor_data['rounded_position'].value_counts().sort_index()
    
    position_counts.plot(kind='bar', ax=axes[i])
    axes[i].set_title(f'Frequency of Rounded X-Y Positions ({floor})')
    axes[i].set_xlabel('Rounded X-Y Position')
    axes[i].set_ylabel('Frequency')
    axes[i].tick_params(axis='x', rotation=90)
    
    for j, v in enumerate(position_counts):
        axes[i].text(j, v, str(v), ha='center', va='bottom')

plt.tight_layout()
plt.show()

## Data Distribution

In [None]:
# Select the columns we want to plot
selected_data = data.iloc[:, 4:-4]

# Create a figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(20, 24))

# Box plot
sns.boxplot(data=selected_data, ax=ax1, whis=1.5)
ax1.set_title('Box Plot of Column Distributions', fontsize=16)
ax1.set_xlabel('Columns', fontsize=12)
ax1.set_ylabel('Values', fontsize=12)
ax1.tick_params(axis='x', rotation=90)

# Add strip plot to show individual points
sns.stripplot(data=selected_data, ax=ax1, size=2, color=".3", linewidth=0)

# Violin plot
sns.violinplot(data=selected_data, ax=ax2, cut=0)
ax2.set_title('Violin Plot of Column Distributions', fontsize=16)
ax2.set_xlabel('Columns', fontsize=12)
ax2.set_ylabel('Values', fontsize=12)
ax2.tick_params(axis='x', rotation=90)

plt.tight_layout()

plt.show()

print(selected_data.describe())

## Label Encoding

In [None]:
def encode_and_save_labels(data, column_name, encoder_file_name='label_encoder.pkl'):
    # Initialize the LabelEncoder
    le = LabelEncoder()
    
    # Fit and transform the data
    encoded_values = le.fit_transform(data[column_name])
    
    # Replace the original column with encoded values
    data[column_name] = encoded_values
    
    # Save the encoder to a file
    with open(encoder_file_name, 'wb') as file:
        pickle.dump(le, file)
    
    print(f"Encoded {column_name} and saved encoder to {encoder_file_name}")
    
    return data

def decode_labels(data, column_name, encoder_file_name='label_encoder.pkl'):
    # Load the encoder from the file
    with open(encoder_file_name, 'rb') as file:
        le = pickle.load(file)
    
    # Transform the encoded values back to original labels
    decoded_values = le.inverse_transform(data[column_name])
    
    # Replace the encoded column with decoded values
    data[column_name] = decoded_values
    
    print(f"Decoded {column_name} using encoder from {encoder_file_name}")
    
    return data

def decode_predictions(y_pred, encoder_file_name='label_encoder.pkl'):
    # Load the encoder from the file
    with open(encoder_file_name, 'rb') as file:
        le = pickle.load(file)
    
    # Transform the encoded predictions back to original labels
    decoded_predictions = le.inverse_transform(y_pred)
    
    print(f"Decoded predictions using encoder from {encoder_file_name}")
    
    return decoded_predictions

## To see the mapping
#with open('label_encoder.pkl', 'rb') as file:
#    le = pickle.load(file)
#    print("Label Mapping:")
#    for i, label in enumerate(le.classes_):
#        print(f"{label} -> {i}")

In [None]:
df = data.iloc[:, 4:-4]
df = encode_and_save_labels(df, 'ap_name')
df = df.rename(columns={'ap_name': 'label'})

In [None]:
df

## Scaling using Robust Scaler

In [None]:
X = df.iloc[:, :-1] 
y = df.iloc[:, -1]  

In [None]:
robust_scaled_data = RobustScaler().fit_transform(X)

In [None]:
plt.figure(figsize=(10, 6))
plt.hist(robust_scaled_data, edgecolor='k', alpha=0.7)
plt.title('Distribution of All Values (Excluding Last Column)')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.show()

#### Are there overlapping points in the data?

In [None]:
binned_data = df.copy()
binned_data.iloc[:, :-1] = np.round(binned_data.iloc[:, :-1]) 
overlapping_points = binned_data.groupby(list(binned_data.columns[:-1]))['label'].nunique()
overlapping_points = overlapping_points[overlapping_points > 1]
overlapping_data = binned_data.set_index(list(binned_data.columns[:-1])).loc[overlapping_points.index]

overlapping_data

In [None]:
# delete overlapping data 
non_overlapping_mask = ~binned_data.set_index(list(binned_data.columns[:-1])).index.isin(overlapping_points.index)
df = df[non_overlapping_mask]
df.shape

## Model XGBClassifier With Standard Scaler

In [None]:
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        if isinstance(features, np.ndarray): self.features = torch.tensor(features, dtype=torch.float32)
        else: self.features = torch.tensor(features.values, dtype=torch.float32)
        self.labels = torch.tensor(labels.values, dtype=torch.long)
    
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

dataset = CustomDataset(robust_scaled_data, y)

train_size = int(0.7 * len(dataset))
val_size = int(0.2 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(robust_scaled_data, y, test_size=0.2, random_state=1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.12, random_state=1)

In [None]:
y_train.size, y_val.size, y_test.size

In [None]:
# Initialize the XGBoost Classifier
xgb_clf = XGBClassifier(use_label_encoder=False)

# Train the model
xgb_clf.fit(X_train, y_train)

In [None]:
# Make predictions on the test set
y_pred = xgb_clf.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

print(f'Accuracy: {accuracy:.2f}')
print('Classification Report:')
print(report)

In [None]:
decoded_y_pred = decode_predictions(y_pred)
decoded_y_test = decode_predictions(y_test)

In [None]:
def plot_ap_positions(data, y_test, y_pred):
    unique_floors = data['ap_name'].str.extract('(\d+F)')[0].unique()
    num_floors = len(unique_floors)
    fig, axes = plt.subplots(1, num_floors, figsize=(12 * num_floors // 2, 8))

    ap_positions = dict(zip(data['ap_name'], zip(data['x'], data['y'])))

    for i, floor in enumerate(sorted(unique_floors)):
        floor_mask = [floor in ap for ap in y_test]
        floor_y_test = y_test[floor_mask]
        floor_y_pred = y_pred[floor_mask]

        test_positions = [ap_positions[ap] for ap in floor_y_test]
        pred_positions = [ap_positions[ap] for ap in floor_y_pred]

        position_accuracy = defaultdict(lambda: {'correct': 0, 'total': 0})
        for true_pos, true_ap, pred_ap in zip(test_positions, floor_y_test, floor_y_pred):
            position_accuracy[true_pos]['total'] += 1
            if true_ap == pred_ap:
                position_accuracy[true_pos]['correct'] += 1

        position_percentage = {pos: (data['correct'] / data['total']) * 100 
                               for pos, data in position_accuracy.items()}

        x_test, y_test_coords = zip(*test_positions)
        x_pred, y_pred_coords = zip(*pred_positions)

        axes[i].scatter(x_test, y_test_coords, c='green', marker='o', s=200, alpha=0.5, label='Ground Truth')
        scatter = axes[i].scatter(x_pred, y_pred_coords, 
                                  c=[position_percentage.get(pos, 0) for pos in test_positions],
                                  cmap='RdYlGn', vmin=0, vmax=100, s=50, alpha=0.7)

        correct_predictions = sum(true == pred for true, pred in zip(floor_y_test, floor_y_pred))
        total_predictions = len(floor_y_test)
        overall_accuracy = (correct_predictions / total_predictions) * 100

        axes[i].set_title(f'AP Positions on {floor}\nAccuracy: {overall_accuracy:.2f}%')
        axes[i].set_xlabel('X Position')
        axes[i].set_ylabel('Y Position')
        axes[i].grid(True)

    plt.tight_layout()
    cbar = fig.colorbar(scatter, ax=axes.ravel().tolist())
    cbar.set_label('Percentage of Correct Predictions')
    plt.show()

In [None]:
plot_ap_positions(data, decoded_y_test, decoded_y_pred)

## Regression Model

In [None]:
d_loc = data.iloc[:, 4:-1]
d_loc = d_loc.drop(columns=['ap_name'])

In [None]:
X = d_loc.iloc[:, :-3]
y = d_loc.iloc[:, -3:]

In [None]:
regression_data_scaled = RobustScaler().fit_transform(X)
dataset = CustomDataset(regression_data_scaled, y)
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(robust_scaled_data, y, test_size=0.2, random_state=1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.12, random_state=1)

In [None]:
xgb_model = XGBRegressor(objective='reg:squarederror', random_state=42)
# Use MultiOutputRegressor to handle multiple targets
multi_xgb = MultiOutputRegressor(xgb_model)

In [None]:
# Define hyperparameters for tuning
param_grid = {
    'estimator__n_estimators': [100, 200, 300],
    'estimator__max_depth': [3, 4, 5],
    'estimator__learning_rate': [0.01, 0.1, 0.3]
}

In [None]:
# Perform grid search with cross-validation
grid_search = GridSearchCV(multi_xgb, param_grid, cv=3, scoring='neg_mean_squared_error', n_jobs=-1)
grid_search.fit(X_train, y_train)

In [None]:
# Print the best parameters
print("Best parameters found by grid search:")
for param, value in grid_search.best_params_.items():
    print(f"{param}: {value}")

# Get the best model
best_model = grid_search.best_estimator_

# Print all hyperparameters of the best model
print("\nAll hyperparameters of the best model:")
for i, estimator in enumerate(best_model.estimators_):
    print(f"\nEstimator for dimension {i+1} ({'x' if i==0 else 'y' if i==1 else 'z'}):")
    for param, value in estimator.get_params().items():
        print(f"  {param}: {value}")

In [None]:
# Make predictions
y_pred = best_model.predict(X_test)

In [None]:
# Evaluate the model
mse = mean_squared_error(y_test, y_pred)
print(f"\nOverall Mean Squared Error: {mse}")

# Calculate and print MSE for each coordinate
for i, coord in enumerate(['x', 'y', 'z']):
    mse = mean_squared_error(y_test.iloc[:, i], y_pred[:, i])
    print(f"MSE for {coord}: {mse}")

In [None]:
# Feature importance
feature_importance = np.mean([estimator.feature_importances_ for estimator in best_model.estimators_], axis=0)
feature_importance_df = pd.DataFrame({'feature': X.columns, 'importance': feature_importance})
feature_importance_df = feature_importance_df.sort_values('importance', ascending=False)
print("\nTop 10 most important features:")
print(feature_importance_df.head(10))

In [None]:
# Save the model
for i, estimator in enumerate(best_model.estimators_):
    estimator.save_model(f'xgboost_ap_position_model_all_{i}.json')

In [None]:
def plot_3d_ap_positions(y_true, y_pred):
    # Calculate MSE for each point
    mse = np.mean((y_true - y_pred)**2, axis=1)
    
    # Create the 3D plot
    fig = plt.figure(figsize=(12, 10))
    ax = fig.add_subplot(111, projection='3d')
    
    # Plot ground truth positions in blue
    ax.scatter(y_true[:, 0], y_true[:, 1], y_true[:, 2], 
               c='green', marker='o', s=100, alpha=1, label='Ground Truth')
    
    norm = Normalize(vmin=0, vmax=10)
    # Plot predicted positions with color based on MSE
    scatter = ax.scatter(y_pred[:, 0], y_pred[:, 1], y_pred[:, 2],
                         c=mse, norm=norm, cmap='Reds_r', s=30, alpha=0.7, label='Predicted')
    
    # Add colorbar
    cbar = fig.colorbar(scatter)
    cbar.set_label('Mean Squared Error')
    
    # Calculate overall MSE
    overall_mse = mean_squared_error(y_true, y_pred)
    
    ax.set_xlabel('X Position')
    ax.set_ylabel('Y Position')
    ax.set_zlabel('Z Position')
    ax.legend()
    plt.title(f'3D AP Positions - Overall MSE: {overall_mse:.4f}')
    
    plt.tight_layout()
    plt.show()


In [None]:
plot_3d_ap_positions(y_test.to_numpy(), y_pred)