# Indoor Positioning

In [23]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras import models
from keras import layers
import json
import random
import numpy as np
from matplotlib import pyplot as plt
from math import sqrt
from statistics import mean

## Load Data

Copy local_records.json file to the same directory as this script, and then run:

In [24]:
# load local_records.json data
def load_data(file_name):
    with open(file_name, 'r') as file:
        data = file.read()
    json_list = json.loads(data)
    return json_list

In [25]:
json_list = load_data('local_records.json')

## Split dataset

The data is split into 80% training data, 20% testing data

In [None]:
# shuffle the list
random.Random(7).shuffle(json_list)
# split into train data and test data
ratio_of_train_data = 0.8
cut_i = int(ratio_of_train_data * len(json_list))
train = json_list[:cut_i]
test = json_list[cut_i:]

In [None]:
print(len(train))
print(len(test))

## Extract features and lables

feature: [RSSI of AP_1_2GHZ, RSSI of AP_1_5GHZ, RSSI of AP_2_2GHZ, RSSI of AP_2_5GHZ, RSSI of AP_3_2GHZ, RSSI of AP_3_5GHZ, RSSI of AP_4_2GHZ, RSSI of AP_4_5GHZ]

label: [x , y]

In [26]:
# extract features and lables from data
index_map = {'SCSLAB_AP_1_2GHZ': 1, 'SCSLAB_AP_1_5GHZ': 2, 'SCSLAB_AP_2_2GHZ': 3,\
            'SCSLAB_AP_2_5GHZ': 4, 'SCSLAB_AP_3_2GHZ': 5, 'SCSLAB_AP_3_5GHZ': 6,\
            'SCSLAB_AP_4_2GHZ': 7, 'SCSLAB_AP_4_5GHZ': 8}

In [27]:
def extract_feature_and_label(l):
    feature_list = []
    lable_list = []
    for json_obj in l:
        # extract lable
        x = json_obj.get('ref_x')
        y = json_obj.get('ref_y')
        lable_list.append([x, y])
        # extract feature
        #feature = [0 for i in range(9)]
        feature = [0 for i in range(8)]
        #angle = json_obj.get('angle')
        #feature[0] = angle
        rssi_observations = json_obj.get('rssi_observations')
        for observation in rssi_observations:
            ssid = observation.get('SSID')
            index = index_map[ssid]
            feature[index - 1] = observation.get('RSSI')
        feature_list.append(feature)
    return np.array(feature_list, dtype='float64'), np.array(lable_list, dtype='float64')

In [None]:
train_data, train_labels = extract_feature_and_label(train)
test_data, test_labels = extract_feature_and_label(test)

## Normalize data

In [None]:
max_rss = train_data.max(axis=0)
min_rss = train_data.min(axis=0)

train_data = (train_data - min_rss)/(max_rss - min_rss)
test_data = (test_data - min_rss)/(max_rss - min_rss)

In [None]:
max_rss

In [None]:
min_rss

In [None]:
train_data

## Build model

Define Loss Function - Euclidean distance

In [28]:
def euclidean_distance(y_true, y_pred):
    return tf.reduce_mean(tf.norm(y_true - y_pred, axis=1, ord='euclidean'))

Define Model

In [29]:
def build_model(shape):
    model = models.Sequential()
    model.add(layers.Dense(256, activation='relu',input_shape=(shape,)))
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(32, activation='relu'))
    model.add(layers.Dense(2))
    model.compile(optimizer='adam', loss=euclidean_distance, metrics=['mse'])
    return model

In [None]:
model = build_model(train_data.shape[1])
history = model.fit(train_data, train_labels, epochs=50, batch_size=20, verbose=0, validation_split=0.2)
loss, mse = model.evaluate(test_data, test_labels)

In [None]:
model = build_model(train_data.shape[1])
history = model.fit(train_data, train_labels, epochs=50, batch_size=20, verbose=0, validation_split=0.2)

In [None]:
data = test_data[1:10]
tmp = model.predict(data)
print(tmp)
print(tmp[0])
print(tmp[0][1])

In [None]:
loss

In [None]:
mse

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')

plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
import time
for i in range(10):
    data = test_data[i]
    start = time.time()
    model.predict(np.array([data]))
    print(time.time() - start)

## Download model

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

In [None]:
with open('north_model.tflite', 'wb') as f:
    f.write(tflite_model)

## 10-fold Cross Validation

In [None]:
# number of samples in one fold
fold_samples_num = len(json_list) // 10
losses_list = []

for i in range(10):
    print('Processing fold {}'.format(i + 1))
    
    val_start = i * fold_samples_num
    val_end = (i + 1) * fold_samples_num
    # prepare validation data
    validation_dataset = json_list[val_start : val_end]
    validation_data, validation_label = extract_feature_and_label(validation_dataset)
    
    # prepare training data
    partial_training_dataset = np.concatenate([json_list[:val_start], json_list[val_end:]], axis=0)
    training_data, training_label = extract_feature_and_label(partial_training_dataset)
    
    # build and train model
    model = build_model(training_data.shape[1])
    model.fit(training_data, training_label, epochs=50, batch_size=20, verbose=0)
    loss, mse = model.evaluate(validation_data, validation_label)
    losses_list.append(loss)

In [None]:
print(losses_list)
print(np.mean(losses_list))

# Accuracy Test 1

In [10]:
def dispatch_direction(json_obj, north_list, east_list, south_list, west_list):
    angle = json_obj.get('angle')
    if angle > 315 or angle <= 45:
        north_list.append(json_obj)
    elif angle > 45 and angle <= 135:
        east_list.append(json_obj)
    elif angle > 135 and angle < 225:
        south_list.append(json_obj)
    elif angle <= 315 and angle > 225:
        west_list.append(json_obj)

# extract all the data at point (x,y)
def extract_data_at_one_point(x, y):
    other = []
    point_north = []
    point_east = []
    point_south = []
    point_west = []
    for json_obj in json_list:
        ref_x = json_obj.get('ref_x')
        ref_y = json_obj.get('ref_y')
        if ref_x == x and ref_y == y:
            dispatch_direction(json_obj, point_north, point_east, point_south, point_west)
        else:
            other.append(json_obj)
    return other, point_north, point_east, point_south, point_west

In [None]:
other, north_list, east_list, south_list, west_list = extract_data_at_one_point(1,1)

In [None]:
print(len(north_list))
print(len(east_list))
print(len(south_list))
print(len(west_list))
print(len(other))

In [11]:
 test_points = [(1,1), (1,5), (2,10), (5,4), (9,4), (9,1), (11,2), (6,1), (7,6), (4,9), (4,3), (6,3), (6,5), (4,5)]

In [12]:
def get_raw_train_and_test_data(data_list, direction_index):
    # data_list[0] is the list 'other' 
    train_raw_data = data_list[0]
    test_raw_data = []
    for k in range(1, 5):
        if k != direction_index:
            train_raw_data += data_list[k]
    # the list gonna to pick 50 samples randomly for test, and the rest should be added to train data
    direction_list_for_test = data_list[direction_index]
    random.shuffle(direction_list_for_test)
    test_raw_data += direction_list_for_test[:50]
    train_raw_data += direction_list_for_test[50:]
    return train_raw_data, test_raw_data

def normalize_data(train_data, test_data):
    max_rss = train_data.max(axis=0)
    min_rss = train_data.min(axis=0)
    train_data = (train_data - min_rss)/(max_rss - min_rss)
    test_data = (test_data - min_rss)/(max_rss - min_rss)
    
def calculate_error(model, test_data, test_labels):
    prediction_list = model.predict(test_data)
    errors = []
    for i in range(len(prediction_list)):
        pred = prediction_list[i]
        real = test_labels[i]
        e = sqrt(sum((pred - real)**2))
        errors.append(e)
    return mean(errors)

In [None]:
point_error_map = dict()
# for all the test points
for point in test_points:
    x = point[0]
    y = point[1]
    data_list = extract_data_at_one_point(x,y)
    error_list = []
    # repeat 1000 times
    for i in range(1000):
        direction_error_list = []
        # repeat for N, E, S, W
        for j in range(1, 5):
            train_raw_data, test_raw_data = get_raw_train_and_test_data(data_list, j)
            
            # extract features from raw json data
            train_data, train_labels = extract_feature_and_label(train_raw_data)
            test_data, test_labels = extract_feature_and_label(test_raw_data)
            
            # normalize train data and test data
            normalize_data(train_data, test_data)
            
            # build and train the model
            model = build_model(train_data.shape[1])
            model.fit(train_data, train_labels, epochs=50, batch_size=20, verbose=0, validation_split=0.2)
            
            # calculate the mean error
            e = calculate_error(model, test_data, test_labels)
            direction_error_list.append(e)
        error_list.append(mean(direction_error_list))
    point_error_map[point] = mean(error_list)



# Accuracy Test 2

In [30]:
# key: point 
# value: raw json data at the point
point_data_map = dict()
for json_obj in json_list:
    ref_x = json_obj.get('ref_x')
    ref_y = json_obj.get('ref_y')
    point = (ref_x, ref_y)
    if point_data_map.get(point) is None:
        point_data_map[point] = [json_obj]
    else:
        point_data_map[point].append(json_obj)

In [31]:
print(len(json_list) // 10 // 54)

75


In [34]:
def get_train_and_test_data():
    train_data = []
    test_data = []
    for point in point_data_map:
        data = point_data_map[point]
        random.shuffle(data)
        test_data += data[:75]
        train_data += data[75:]
    return train_data, test_data

def normalize_data(train_data, test_data):
    max_rss = train_data.max(axis=0)
    min_rss = train_data.min(axis=0)
    train_data = (train_data - min_rss)/(max_rss - min_rss)
    test_data = (test_data - min_rss)/(max_rss - min_rss)

In [39]:
point_error_map_list = []

for _ in range(10):
    train_raw_data, test_raw_data = get_train_and_test_data()
    
    # extract features from raw json data
    train_data, train_labels = extract_feature_and_label(train_raw_data)
    test_data, test_labels = extract_feature_and_label(test_raw_data)
    
    # normalize train data and test data
    normalize_data(train_data, test_data)
    
    # build and train the model
    model = build_model(train_data.shape[1])
    model.fit(train_data, train_labels, epochs=50, batch_size=20, verbose=0, validation_split=0.2)
    
    # calculate error
    prediction_list = model.predict(test_data)
    point_error_list_map = dict()
    for i in range(len(prediction_list)):
        pred = prediction_list[i]
        real = test_labels[i]
        e = sqrt(sum((pred - real)**2))
        label = (real[0], real[1])
        if point_error_list_map.get(label) is None:
            point_error_list_map[label] = [e]
        else:
            point_error_list_map[label].append(e)
    
    # calculate mean error on every points
    for point in point_error_list_map:
        error_list = point_error_list_map[point]
        point_error_list_map[point] = mean(error_list)
    
    point_error_map_list.append(point_error_list_map)



In [40]:
point_mean_error = dict()
for point_error_map in point_error_map_list:
    for point in point_error_map:
        e = point_error_map[point]
        if point_mean_error.get(point) is None:
            point_mean_error[point] = [e]
        else:
            point_mean_error[point].append(e)
for point in point_mean_error:
    point_mean_error[point] = mean(point_mean_error[point])

In [41]:
point_mean_error

{(4.0, 6.0): 0.5992055081591194,
 (3.0, 6.0): 0.6398665260675199,
 (2.0, 6.0): 0.6686195892300383,
 (1.0, 6.0): 0.5870194576153228,
 (4.0, 5.0): 0.45214113302781683,
 (3.0, 5.0): 0.5344676827260528,
 (1.0, 5.0): 0.8038753085604944,
 (1.0, 4.0): 0.5994764553762603,
 (1.0, 3.0): 1.0532681991922337,
 (4.0, 4.0): 0.4403697163023755,
 (4.0, 3.0): 0.5039032405400933,
 (5.0, 3.0): 0.3705735190473584,
 (6.0, 5.0): 0.7439063145745783,
 (5.0, 5.0): 0.352239484227578,
 (5.0, 4.0): 0.43353482723869713,
 (6.0, 3.0): 0.8728468710726418,
 (6.0, 4.0): 0.834757951463432,
 (9.0, 5.0): 0.922069487246144,
 (6.0, 6.0): 0.31201846366311004,
 (8.0, 5.0): 0.6109045487946602,
 (7.0, 5.0): 0.6760216872612765,
 (8.0, 4.0): 0.6587782274874333,
 (7.0, 6.0): 0.4368412688161915,
 (9.0, 4.0): 0.7551004310226614,
 (1.0, 1.0): 0.9111117611174286,
 (2.0, 1.0): 0.5046187864726434,
 (3.0, 1.0): 0.7306741185937016,
 (1.0, 2.0): 0.7735588838484987,
 (10.0, 4.0): 0.6763472681378719,
 (10.0, 3.0): 0.5111982858491493,
 (11.0, 

In [43]:
all_p_me = []
for e in point_mean_error.values():
    all_p_me.append(e)
print("Mean error for all points: " + str(mean(all_p_me)))

Mean error for all points: 1.4120769080438957


In [45]:
p_me = []
for e in point_mean_error.values():
    if e < 2:
        p_me.append(e)
print("Mean error after remove coner points: " + str(mean(p_me)))

Mean error after remove coner points: 0.6389431162646771
