In [None]:
# Mount Google Drive at /content/drive; the data and model paths used later in
# this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install -q keras-tuner


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.preprocessing import OneHotEncoder
import os
import keras_tuner as kt

# === Load and preprocess data ===
data_dir = "/content/drive/MyDrive/Colab Notebooks"
df = pd.read_csv(os.path.join(data_dir, "trainingData_filtered.csv"))

# Column groups: WAP signal columns, coordinate targets, building/floor metadata
wap_columns = list(filter(lambda name: "WAP" in name, df.columns))
location_columns = ["LONGITUDE", "LATITUDE"]
building_floor_columns = ["BUILDINGID", "FLOOR"]

# Keep only the WAP columns whose standard deviation exceeds 0.01
wap_std = df[wap_columns].std()
useful_waps = wap_std.loc[wap_std.gt(0.01)].index.tolist()
df = df[[*useful_waps, *location_columns, *building_floor_columns]]

# One-hot encode BUILDINGID / FLOOR and append the encoded columns to the frame
encoder = OneHotEncoder(sparse_output=False)
building_floor_onehot = encoder.fit_transform(df[building_floor_columns])
onehot_columns = encoder.get_feature_names_out(building_floor_columns)
df[onehot_columns] = building_floor_onehot

# Feature matrix (kept WAP columns first, then the one-hot columns) and targets;
# num_rssi marks where the WAP columns end inside the feature matrix.
num_rssi = len(useful_waps)
X_all = df[[*useful_waps, *onehot_columns]].to_numpy()
Y_all = df[location_columns].to_numpy()

# Min-max scale each coordinate column to the [0, 1] range
Y_min, Y_max = Y_all.min(axis=0), Y_all.max(axis=0)
Y_all_scaled = (Y_all - Y_min) / (Y_max - Y_min)

# === Sequence generation ===
def generate_sequences(T, n_sequences=5000, max_dist=1.5, seed=None):
    """Build random-walk trajectories of length ``T`` over the dataset rows.

    Every trajectory starts at a uniformly chosen row and then repeatedly
    moves to a uniformly chosen row whose position in ``Y_all`` lies within
    ``max_dist`` of the current row. Reads the module-level arrays ``X_all``,
    ``Y_all`` and ``Y_all_scaled``.

    Args:
        T: Number of steps in every trajectory.
        n_sequences: Number of trajectories to attempt. Defaults to 5000, the
            value that was previously hard-coded.
        max_dist: Largest allowed distance between consecutive rows, in the
            units of ``Y_all``. Defaults to 1.5, the value that was previously
            hard-coded.
        seed: Optional integer seed. When given, sampling uses a private
            ``np.random.RandomState(seed)`` so the output is reproducible.
            When ``None`` the global NumPy random state is used, exactly as
            before.

    Returns:
        Tuple ``(X_seq, Y_seq)`` of NumPy arrays holding, for each trajectory
        that reached length ``T``, the feature rows from ``X_all`` and the
        matching rows from ``Y_all_scaled``.
    """
    # Both the module and a RandomState expose randint()/choice(), so the
    # default path keeps drawing from the global generator.
    rng = np.random if seed is None else np.random.RandomState(seed)
    n_samples = len(X_all)
    dist_matrix = euclidean_distances(Y_all, Y_all)
    X_seq, Y_seq = [], []

    for _ in range(n_sequences):
        start_idx = rng.randint(0, n_samples)
        traj_x, traj_y = [X_all[start_idx]], [Y_all_scaled[start_idx]]
        current_idx = start_idx
        for _ in range(T - 1):
            # A row is at distance 0 from itself, so for any non-negative
            # max_dist it is always among its own neighbours: the walk may
            # stay in place and the empty-neighbourhood guard rarely fires.
            neighbors = np.where(dist_matrix[current_idx] <= max_dist)[0]
            if len(neighbors) == 0:
                break
            next_idx = rng.choice(neighbors)
            traj_x.append(X_all[next_idx])
            traj_y.append(Y_all_scaled[next_idx])
            current_idx = next_idx
        # Keep only trajectories that reached the requested length
        if len(traj_x) == T:
            X_seq.append(traj_x)
            Y_seq.append(traj_y)
    return np.array(X_seq), np.array(Y_seq)

# === Loss Function ===
def euclidean_distance_loss(y_true, y_pred):
    """Return the mean Euclidean distance between ``y_true`` and ``y_pred``.

    The distance is taken along the last axis and then averaged over all
    remaining axes into a single scalar tensor.
    """
    squared_error = tf.square(y_true - y_pred)
    distances = tf.sqrt(tf.reduce_sum(squared_error, axis=-1))
    return tf.reduce_mean(distances)

# === Model Builder ===
def build_model(hp):
    """Build and compile the bidirectional-LSTM model for one hyperparameter set.

    Registers three choices on ``hp``: ``seq_length`` (input time steps),
    ``lstm_units`` (units per LSTM layer) and ``batch_size``. The batch size
    does not affect the architecture; it is declared here only so that it
    becomes part of the search space.

    Args:
        hp: KerasTuner hyperparameter container.

    Returns:
        A compiled ``tf.keras.Model`` that maps a sequence of feature vectors
        to 2 output values per time step.
    """
    seq_len = hp.Choice("seq_length", [10, 15, 20])
    units = hp.Choice("lstm_units", [64, 128])
    hp.Choice("batch_size", [32, 64, 128])

    layers = tf.keras.layers

    model_input = tf.keras.Input(shape=(seq_len, X_all.shape[1]))
    hidden = layers.BatchNormalization()(model_input)
    # Two identical Bidirectional-LSTM + Dropout(0.3) stages
    for _ in range(2):
        hidden = layers.Bidirectional(layers.LSTM(units, return_sequences=True))(hidden)
        hidden = layers.Dropout(0.3)(hidden)
    output = layers.TimeDistributed(layers.Dense(2))(hidden)

    model = tf.keras.Model(inputs=model_input, outputs=output)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-4),
        loss=euclidean_distance_loss,
        metrics=[euclidean_distance_loss],
    )
    return model

# === Custom Trial Runner ===
def custom_run_trial(tuner, trial):
    """Train and score one model for a single KerasTuner trial.

    The sequence length and batch size are read from the trial's
    hyperparameters, so a fresh set of trajectories is generated for every
    trial before the model is built and trained for 10 epochs.

    Args:
        tuner: The tuner running the search; its ``hypermodel`` builds the model.
        trial: The KerasTuner trial whose hyperparameters are evaluated.

    Returns:
        dict: ``{'val_euclidean_distance_loss': <float>}``. The metrics are
        returned so that KerasTuner records them itself; reporting them via
        ``tuner.oracle.update_trial`` from inside ``run_trial`` is deprecated.
    """
    hp = trial.hyperparameters
    T = hp.get("seq_length")
    batch_size = hp.get("batch_size")

    X_seq, Y_seq = generate_sequences(T)
    # Rescale only the leading WAP columns; the one-hot columns are left as-is.
    X_seq[:, :, :num_rssi] = (X_seq[:, :, :num_rssi] + 105) / 105
    X_train, X_val, Y_train, Y_val = train_test_split(X_seq, Y_seq, test_size=0.2)

    model = tuner.hypermodel.build(hp)

    model.fit(X_train, Y_train,
              validation_data=(X_val, Y_val),
              epochs=10,
              batch_size=batch_size,
              verbose=0)

    # evaluate() returns [loss, metric]; the loss is the tuning objective.
    val_loss = model.evaluate(X_val, Y_val, verbose=0)
    return {'val_euclidean_distance_loss': float(val_loss[0])}

# === Subclass the Tuner ===
class MyTuner(kt.RandomSearch):
    """Random-search tuner whose trials are executed by ``custom_run_trial``."""

    def run_trial(self, trial, *args, **kwargs):
        """Run one trial; extra positional and keyword arguments are ignored."""
        outcome = custom_run_trial(self, trial)
        return outcome

# === Initialize and Run Tuner ===
# Random search over the space declared in build_model, minimising the
# validation Euclidean-distance loss reported by each trial.
tuner = MyTuner(
    build_model,
    project_name='indoor_lstm_tuning',
    directory='kt_tuner_logs',
    overwrite=True,
    executions_per_trial=1,
    max_trials=10,
    objective=kt.Objective('val_euclidean_distance_loss', direction='min'),
)

tuner.search_space_summary()
tuner.search()

# === Best Results ===
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]
print("\n✅ Best Hyperparameters:")
print(f"Sequence length: {best_hp['seq_length']}")
print(f"LSTM units: {best_hp['lstm_units']}")
print(f"Batch size: {best_hp['batch_size']}")


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from scipy.spatial.distance import euclidean
import matplotlib.pyplot as plt
import os

# === Setup ===
data_dir = "/content/drive/MyDrive/Colab Notebooks"
os.makedirs(data_dir, exist_ok=True)

# === Load Filtered Data ===
df = pd.read_csv(os.path.join(data_dir, "trainingData_filtered.csv"))

# Column groups: WAP signal columns, coordinate targets, building/floor metadata
wap_columns = list(filter(lambda name: "WAP" in name, df.columns))
location_columns = ["LONGITUDE", "LATITUDE"]
building_floor_columns = ["BUILDINGID", "FLOOR"]

# Keep only the WAP columns whose standard deviation exceeds 0.01
wap_std = df[wap_columns].std()
useful_waps = wap_std.loc[wap_std.gt(0.01)].index.tolist()
df = df[[*useful_waps, *location_columns, *building_floor_columns]]

# One-hot encode BUILDINGID and FLOOR, then append the encoded columns
encoder = OneHotEncoder(sparse_output=False)
building_floor_onehot = encoder.fit_transform(df[building_floor_columns])
onehot_columns = encoder.get_feature_names_out(building_floor_columns)
df[onehot_columns] = building_floor_onehot

# Model inputs are the kept WAP columns followed by the one-hot columns
feature_columns = [*useful_waps, *onehot_columns]
X_all = df[feature_columns].to_numpy()
Y_all = df[location_columns].to_numpy()

# === Generate Enhanced Trajectories ===
from sklearn.metrics.pairwise import euclidean_distances

T = 20  # best sequence length
v_max = 1.5  # m/s
delta_t = 1.0
max_dist = v_max * delta_t

# Seed NumPy's global generator so the sampled trajectories (and everything
# derived from them) are identical on every run. 42 mirrors the random_state
# already passed to train_test_split in this cell.
np.random.seed(42)

# Pairwise distances between the coordinates of all rows
dist_matrix = euclidean_distances(Y_all, Y_all)
X_sequences = []
Y_sequences = []

for _ in range(5000):
    start_idx = np.random.randint(0, len(df))
    traj_x = [X_all[start_idx]]
    traj_y = [Y_all[start_idx]]
    current_idx = start_idx

    for _ in range(T - 1):
        # A row is at distance 0 from itself, so it is always among its own
        # neighbours and the walk may stay in place.
        neighbors = np.where(dist_matrix[current_idx] <= max_dist)[0]
        if len(neighbors) == 0:
            break
        next_idx = np.random.choice(neighbors)
        traj_x.append(X_all[next_idx])
        traj_y.append(Y_all[next_idx])
        current_idx = next_idx

    # Keep only trajectories that reached the full length T
    if len(traj_x) == T:
        X_sequences.append(traj_x)
        Y_sequences.append(traj_y)

X_sequences = np.array(X_sequences)
Y_sequences = np.array(Y_sequences)

# np.save(os.path.join(data_dir, "X_sequences_enhanced.npy"), X_sequences)
# np.save(os.path.join(data_dir, "Y_sequences_enhanced.npy"), Y_sequences)

# print("✅ Enhanced sequences generated and saved.")

# === Normalize Inputs ===
# X and Y are aliases of the sequence arrays (no copy is made), so the in-place
# scaling below also modifies X_sequences.
X = X_sequences
Y = Y_sequences

num_rssi = len(useful_waps)
X[:, :, :num_rssi] = (X[:, :, :num_rssi] + 105) / 105  # Normalize RSSI

# Coordinate bounds come from the full dataset (Y_all) rather than from the
# sampled trajectories. Bounds computed from a random sample can differ from
# run to run, and predictions rescaled with bounds other than the ones used
# for training would be shifted.
Y_min = Y_all.min(axis=0)
Y_max = Y_all.max(axis=0)
Y_scaled = (Y - Y_min) / (Y_max - Y_min)

# np.save(os.path.join(data_dir, "Y_min_1.npy"), Y_min)
# np.save(os.path.join(data_dir, "Y_max_1.npy"), Y_max)

# === Train/Test Split ===
X_train, X_val, Y_train, Y_val = train_test_split(X, Y_scaled, test_size=0.2, random_state=42)

# === Loss Function ===
def euclidean_distance_loss(y_true, y_pred):
    """Return the mean Euclidean distance between ``y_true`` and ``y_pred``.

    The distance is taken along the last axis and then averaged over all
    remaining axes into a single scalar tensor.
    """
    squared_error = tf.square(y_true - y_pred)
    distances = tf.sqrt(tf.reduce_sum(squared_error, axis=-1))
    return tf.reduce_mean(distances)

# === Build Optimized Model ===
def build_model(input_shape, output_dim):
    """Build and compile the stacked bidirectional-LSTM sequence regressor.

    Args:
        input_shape: Shape of one input sequence, passed to ``tf.keras.Input``.
        output_dim: Number of values predicted at every time step.

    Returns:
        A compiled ``tf.keras.Model`` using Adam (learning rate 1e-4) with
        ``euclidean_distance_loss`` as both the loss and the reported metric.
    """
    layers = tf.keras.layers

    inputs = tf.keras.Input(shape=input_shape)
    hidden = layers.BatchNormalization()(inputs)
    # Two identical Bidirectional-LSTM(128) + Dropout(0.3) stages
    for _ in range(2):
        hidden = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(hidden)
        hidden = layers.Dropout(0.3)(hidden)
    outputs = layers.TimeDistributed(layers.Dense(output_dim))(hidden)

    model = tf.keras.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-4),
        loss=euclidean_distance_loss,
        metrics=[euclidean_distance_loss],
    )
    return model

# Build the network for the training tensors: the input shape is everything
# after the batch axis of X_train, and 2 values are predicted per time step.
model = build_model(X_train.shape[1:], output_dim=2)

# === Train Model ===
# NOTE: `early_stop` is created but never passed to fit() (the callbacks
# argument below is commented out), so training always runs all 150 epochs.
early_stop = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

history = model.fit(
    X_train, Y_train,
    validation_data=(X_val, Y_val),
    epochs=150,
    batch_size=64,  # best batch size
    # callbacks=[early_stop]
)

# === Save Model ===
# Written into data_dir in the native .keras format.
model.save(os.path.join(data_dir, "hyper_model_with_building_floor.keras"))
print("✅ Model with building+floor saved.")

# === Evaluate ===
# Predict on the validation set, then undo the min-max scaling on both the
# predictions and the targets so they are back in the original coordinates.
Y_pred_scaled = model.predict(X_val)
Y_pred = Y_min + Y_pred_scaled * (Y_max - Y_min)
Y_true = Y_min + Y_val * (Y_max - Y_min)

def average_localization_error(y_true, y_pred):
    """Return the mean Euclidean distance between true and predicted positions.

    Args:
        y_true: Array-like of shape ``(batch, time_steps, n_coords)``.
        y_pred: Array-like with the same shape as ``y_true``.

    Returns:
        float: The distance between corresponding coordinate vectors,
        averaged over every (sequence, time step) pair.

    Raises:
        ValueError: If the two inputs do not have the same shape.
        ZeroDivisionError: If the inputs contain no positions.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"shape mismatch: y_true {y_true.shape} vs y_pred {y_pred.shape}"
        )
    batch, time_steps, _ = y_true.shape
    # One vectorised norm over the coordinate axis replaces the per-element
    # Python loop.
    distances = np.linalg.norm(y_true - y_pred, axis=-1)
    return float(distances.sum()) / (batch * time_steps)

# Mean distance between true and predicted coordinates on the validation set
error_meters = average_localization_error(Y_true, Y_pred)
print(f"📍 Final Average Localization Error: {error_meters:.2f} meters")

# === Plot Loss ===
# One curve per recorded series of the most recent fit() call
for history_key, curve_label in (("loss", "Train Loss"), ("val_loss", "Validation Loss")):
    plt.plot(history.history[history_key], label=curve_label)
plt.title("Training Loss (Euclidean Distance)")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Train the existing model for another 150 epochs; early stopping stays
# disabled and `history` is replaced by this run's record.
history = model.fit(
    x=X_train,
    y=Y_train,
    batch_size=64,  # best batch size
    epochs=150,
    validation_data=(X_val, Y_val),
)

In [None]:
# Overwrite the saved model in data_dir with the current weights.
model.save(os.path.join(data_dir, "hyper_model_with_building_floor.keras"))

In [None]:
# Predict on the validation set, then undo the min-max scaling on both the
# predictions and the targets.
Y_pred_scaled = model.predict(X_val)
Y_pred = Y_min + Y_pred_scaled * (Y_max - Y_min)
Y_true = Y_min + Y_val * (Y_max - Y_min)

def average_localization_error(y_true, y_pred):
    """Return the mean Euclidean distance between true and predicted positions.

    Args:
        y_true: Array-like of shape ``(batch, time_steps, n_coords)``.
        y_pred: Array-like with the same shape as ``y_true``.

    Returns:
        float: The distance between corresponding coordinate vectors,
        averaged over every (sequence, time step) pair.

    Raises:
        ValueError: If the two inputs do not have the same shape.
        ZeroDivisionError: If the inputs contain no positions.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"shape mismatch: y_true {y_true.shape} vs y_pred {y_pred.shape}"
        )
    batch, time_steps, _ = y_true.shape
    # One vectorised norm over the coordinate axis replaces the per-element
    # Python loop.
    distances = np.linalg.norm(y_true - y_pred, axis=-1)
    return float(distances.sum()) / (batch * time_steps)

# Mean distance between true and predicted coordinates on the validation set,
# printed with two decimals.
error_meters = average_localization_error(Y_true, Y_pred)
print(f"📍 Final Average Localization Error: {error_meters:.2f} meters")

In [None]:
# Train the existing model for another 50 epochs; early stopping stays
# disabled and `history` is replaced by this run's record.
history = model.fit(
    x=X_train,
    y=Y_train,
    batch_size=64,  # best batch size
    epochs=50,
    validation_data=(X_val, Y_val),
)

In [None]:
# Overwrite the saved model in data_dir with the current weights.
model.save(os.path.join(data_dir, "hyper_model_with_building_floor.keras"))

In [None]:
# Predict on the validation set, then undo the min-max scaling on both the
# predictions and the targets.
Y_pred_scaled = model.predict(X_val)
Y_pred = Y_min + Y_pred_scaled * (Y_max - Y_min)
Y_true = Y_min + Y_val * (Y_max - Y_min)

def average_localization_error(y_true, y_pred):
    """Return the mean Euclidean distance between true and predicted positions.

    Args:
        y_true: Array-like of shape ``(batch, time_steps, n_coords)``.
        y_pred: Array-like with the same shape as ``y_true``.

    Returns:
        float: The distance between corresponding coordinate vectors,
        averaged over every (sequence, time step) pair.

    Raises:
        ValueError: If the two inputs do not have the same shape.
        ZeroDivisionError: If the inputs contain no positions.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"shape mismatch: y_true {y_true.shape} vs y_pred {y_pred.shape}"
        )
    batch, time_steps, _ = y_true.shape
    # One vectorised norm over the coordinate axis replaces the per-element
    # Python loop.
    distances = np.linalg.norm(y_true - y_pred, axis=-1)
    return float(distances.sum()) / (batch * time_steps)

# Mean distance between true and predicted coordinates on the validation set,
# printed with two decimals.
error_meters = average_localization_error(Y_true, Y_pred)
print(f"📍 Final Average Localization Error: {error_meters:.2f} meters")

In [None]:
# Train the existing model for another 50 epochs; early stopping stays
# disabled and `history` is replaced by this run's record.
history = model.fit(
    x=X_train,
    y=Y_train,
    batch_size=64,  # best batch size
    epochs=50,
    validation_data=(X_val, Y_val),
)

In [None]:
# Overwrite the saved model in data_dir with the current weights.
model.save(os.path.join(data_dir, "hyper_model_with_building_floor.keras"))

In [None]:
# Predict on the validation set, then undo the min-max scaling on both the
# predictions and the targets.
Y_pred_scaled = model.predict(X_val)
Y_pred = Y_min + Y_pred_scaled * (Y_max - Y_min)
Y_true = Y_min + Y_val * (Y_max - Y_min)

def average_localization_error(y_true, y_pred):
    """Return the mean Euclidean distance between true and predicted positions.

    Args:
        y_true: Array-like of shape ``(batch, time_steps, n_coords)``.
        y_pred: Array-like with the same shape as ``y_true``.

    Returns:
        float: The distance between corresponding coordinate vectors,
        averaged over every (sequence, time step) pair.

    Raises:
        ValueError: If the two inputs do not have the same shape.
        ZeroDivisionError: If the inputs contain no positions.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"shape mismatch: y_true {y_true.shape} vs y_pred {y_pred.shape}"
        )
    batch, time_steps, _ = y_true.shape
    # One vectorised norm over the coordinate axis replaces the per-element
    # Python loop.
    distances = np.linalg.norm(y_true - y_pred, axis=-1)
    return float(distances.sum()) / (batch * time_steps)

# Mean distance between true and predicted coordinates on the validation set,
# printed with two decimals.
error_meters = average_localization_error(Y_true, Y_pred)
print(f"📍 Final Average Localization Error: {error_meters:.2f} meters")

In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K

# Define the custom loss function
def euclidean_distance_loss(y_true, y_pred):
    """Return the mean Euclidean distance between ``y_true`` and ``y_pred``.

    The distance is taken along the last axis and then averaged into a single
    scalar. This is the same computation as the ``euclidean_distance_loss``
    the model was compiled with in the training cells, so the function
    registered under that name at load time matches the one used in training.
    """
    return tf.reduce_mean(tf.sqrt(tf.reduce_sum(tf.square(y_true - y_pred), axis=-1)))

# Now load the model with custom_objects
from tensorflow.keras.models import load_model

# Load the saved model; custom_objects maps the loss name to the function
# defined earlier in this cell.
model_path = '/content/drive/MyDrive/Colab Notebooks/hyper_model_with_building_floor.keras'
model = load_model(
    model_path,
    custom_objects=dict(euclidean_distance_loss=euclidean_distance_loss),
)


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from scipy.spatial.distance import euclidean
import matplotlib.pyplot as plt
import os

# === Setup ===
data_dir = "/content/drive/MyDrive/Colab Notebooks"
os.makedirs(data_dir, exist_ok=True)

# === Load Filtered Data ===
df = pd.read_csv(os.path.join(data_dir, "trainingData_filtered.csv"))

# Column groups: WAP signal columns, coordinate targets, building/floor metadata
wap_columns = list(filter(lambda name: "WAP" in name, df.columns))
location_columns = ["LONGITUDE", "LATITUDE"]
building_floor_columns = ["BUILDINGID", "FLOOR"]

# Keep only the WAP columns whose standard deviation exceeds 0.01
wap_std = df[wap_columns].std()
useful_waps = wap_std.loc[wap_std.gt(0.01)].index.tolist()
df = df[[*useful_waps, *location_columns, *building_floor_columns]]

# One-hot encode BUILDINGID and FLOOR, then append the encoded columns
encoder = OneHotEncoder(sparse_output=False)
building_floor_onehot = encoder.fit_transform(df[building_floor_columns])
onehot_columns = encoder.get_feature_names_out(building_floor_columns)
df[onehot_columns] = building_floor_onehot

# Model inputs are the kept WAP columns followed by the one-hot columns
feature_columns = [*useful_waps, *onehot_columns]
X_all = df[feature_columns].to_numpy()
Y_all = df[location_columns].to_numpy()

# === Generate Enhanced Trajectories ===
from sklearn.metrics.pairwise import euclidean_distances

T = 20  # best sequence length
v_max = 1.5  # m/s
delta_t = 1.0
max_dist = v_max * delta_t

# Seed NumPy's global generator so the sampled trajectories (and everything
# derived from them) are identical on every run. 42 mirrors the random_state
# already passed to train_test_split in this cell.
np.random.seed(42)

# Pairwise distances between the coordinates of all rows
dist_matrix = euclidean_distances(Y_all, Y_all)
X_sequences = []
Y_sequences = []

for _ in range(5000):
    start_idx = np.random.randint(0, len(df))
    traj_x = [X_all[start_idx]]
    traj_y = [Y_all[start_idx]]
    current_idx = start_idx

    for _ in range(T - 1):
        # A row is at distance 0 from itself, so it is always among its own
        # neighbours and the walk may stay in place.
        neighbors = np.where(dist_matrix[current_idx] <= max_dist)[0]
        if len(neighbors) == 0:
            break
        next_idx = np.random.choice(neighbors)
        traj_x.append(X_all[next_idx])
        traj_y.append(Y_all[next_idx])
        current_idx = next_idx

    # Keep only trajectories that reached the full length T
    if len(traj_x) == T:
        X_sequences.append(traj_x)
        Y_sequences.append(traj_y)

X_sequences = np.array(X_sequences)
Y_sequences = np.array(Y_sequences)

# === Normalize Inputs ===
# X and Y are aliases of the sequence arrays (no copy is made), so the in-place
# scaling below also modifies X_sequences.
X = X_sequences
Y = Y_sequences

num_rssi = len(useful_waps)
X[:, :, :num_rssi] = (X[:, :, :num_rssi] + 105) / 105  # Normalize RSSI

# Coordinate bounds come from the full dataset (Y_all) rather than from the
# sampled trajectories. Bounds computed from a random sample can differ from
# run to run, and predictions rescaled with bounds other than the ones used
# for training would be shifted.
Y_min = Y_all.min(axis=0)
Y_max = Y_all.max(axis=0)
Y_scaled = (Y - Y_min) / (Y_max - Y_min)

# === Train/Test Split ===
X_train, X_val, Y_train, Y_val = train_test_split(X, Y_scaled, test_size=0.2, random_state=42)

In [None]:
import random

# Pick one validation sequence at random, keeping the batch axis: (1, T, features)
idx = random.randint(0, len(X_val) - 1)
sample_input = X_val[idx:idx + 1]

# Model output is in the scaled coordinate space
pred_scaled = model.predict(sample_input)

# Undo the min-max scaling for both the prediction and the ground truth
pred_coords = Y_min + pred_scaled * (Y_max - Y_min)
true_coords = Y_min + Y_val[idx:idx + 1] * (Y_max - Y_min)

# Show true and predicted coordinates side by side for every time step
for t in range(sample_input.shape[1]):
    print(f"Step {t+1}: True = {true_coords[0][t]}, Predicted = {pred_coords[0][t]}")
