In [5]:
import numpy as np
import pandas as pd
import tensorflow as tf
import os
from sklearn.model_selection import train_test_split, KFold
import shutil
import keras_tuner as kt

# Load datasets
actual_data = pd.read_csv('original_CMKL1.csv')
synthetic_data_ml = pd.read_csv('synthetic_data_RandomSeaerch_Ensemble.csv')
synthetic_data_gan = pd.read_csv('GANs_synthetic_data.csv')

# Combine synthetic datasets for the ML+GAN case
synthetic_data_combined = pd.concat([synthetic_data_ml, synthetic_data_gan])

# Create directory for saving models
if not os.path.exists('TFLite_NEW_senior_saved_model'):
    os.makedirs('TFLite_NEW_senior_saved_model')

# Define functions to build models in TensorFlow that approximate scikit-learn models

def build_logistic_regression(hp):
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

def build_decision_tree_classifier(hp):
    units = hp.Int('units', 32, 256, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

def build_decision_tree_regressor(hp):
    units = hp.Int('units', 32, 256, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units, activation='relu'),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='mean_squared_error')
    return model

def build_random_forest_classifier(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

def build_random_forest_regressor(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='mean_squared_error')
    return model

def build_gradient_boosting_classifier(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

def build_gradient_boosting_regressor(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='mean_squared_error')
    return model

def build_knn_classifier(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

def build_knn_regressor(hp):
    units1 = hp.Int('units1', 64, 256, step=32)
    units2 = hp.Int('units2', 32, 128, step=32)
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(18,)),
        tf.keras.layers.Dense(units1, activation='relu'),
        tf.keras.layers.Dense(units2, activation='relu'),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                  loss='mean_squared_error')
    return model

# Hyperparameter tuning function with cross-validation
def cross_val_hyperparameter_tuning(X, y, model_builder, n_splits=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    fold_metrics = []
    
    # Clear the tuner directory before each run
    tuner_dir = 'tuner_dir'
    if os.path.exists(tuner_dir):
        shutil.rmtree(tuner_dir)

    for train_index, val_index in kf.split(X):
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]
        
        tuner = kt.RandomSearch(
            model_builder,
            objective='val_loss',
            max_trials=10,
            executions_per_trial=1,
            directory='tuner_dir',
            project_name='tuning'
        )

        tuner.search(X_train, y_train, epochs=50, validation_data=(X_val, y_val))
        best_models = tuner.get_best_models(num_models=1)
        
        if best_models:
            best_model = best_models[0]
            val_loss = best_model.evaluate(X_val, y_val, verbose=0)
            fold_metrics.append(val_loss)
        else:
            print("No valid model found for this fold.")
    
    if fold_metrics:
        avg_metric = np.mean(fold_metrics)
        return best_model, avg_metric
    else:
        raise RuntimeError("All folds failed to produce a valid model.")

# Function to create combined dataset with given ratio for each floor
def create_combined_dataset(actual_data, synthetic_data, ratio):
    combined_data = pd.DataFrame()
    floors = actual_data['z'].unique()
    for floor in floors:
        actual_floor_data = actual_data[actual_data['z'] == floor]
        synthetic_floor_data = synthetic_data[synthetic_data['z'] == floor]
        
        n_actual = int(len(actual_floor_data) * ratio)
        n_synthetic = len(actual_floor_data) - n_actual
        
        actual_sample = actual_floor_data.sample(n_actual, random_state=42)
        if len(synthetic_floor_data) < n_synthetic:
            synthetic_sample = synthetic_floor_data.sample(n_synthetic, replace=True, random_state=42)
        else:
            synthetic_sample = synthetic_floor_data.sample(n_synthetic, random_state=42)
        
        combined_floor_data = pd.concat([actual_sample, synthetic_sample])
        combined_data = pd.concat([combined_data, combined_floor_data], ignore_index=True)
        
    return combined_data

# Function to train and convert model to TensorFlow Lite
def train_and_convert_to_tflite(X, y, model_builder, model_name, dataset_name, ratio):
    best_model, avg_metric = cross_val_hyperparameter_tuning(X, y, model_builder)
    
    # Convert to TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(best_model)
    tflite_model = converter.convert()

    # Save the model
    model_path = f'TFLite_NEW_senior_saved_model/{dataset_name}_{ratio}_{model_name}.tflite'
    with open(model_path, 'wb') as f:
        f.write(tflite_model)
    
    return model_path, avg_metric

# Function to calculate Mean Distance Error
def mean_distance_error(y_true, y_pred):
    return np.mean(np.sqrt(np.sum((y_true - y_pred) ** 2, axis=1)))

# Train and evaluate models with cross-validation and hyperparameter tuning
ratios = [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]
datasets = {
    'actual_ml': synthetic_data_ml,
    'actual_gan': synthetic_data_gan,
    'actual_ml_gan': synthetic_data_combined
}

model_builders = {
    'LogisticRegression': build_logistic_regression,
    'DecisionTreeClassifier': build_decision_tree_classifier,
    'DecisionTreeRegressor': build_decision_tree_regressor,
    'RandomForestClassifier': build_random_forest_classifier,
    'RandomForestRegressor': build_random_forest_regressor,
    'GradientBoostingClassifier': build_gradient_boosting_classifier,
    'GradientBoostingRegressor': build_gradient_boosting_regressor,
    'KNeighborsClassifier': build_knn_classifier,
    'KNeighborsRegressor': build_knn_regressor
}

for dataset_name, synthetic_data in datasets.items():
    for ratio in ratios:
        combined_data = create_combined_dataset(actual_data, synthetic_data, ratio)
        
        # Convert DataFrame to numpy array
        X = combined_data[[f'RSSI{i+1}' for i in range(18)]].values
        y_floor = combined_data['z'].values
        y_coord = combined_data[['x', 'y']].values
        
        # Floor classification training and evaluation
        for model_name in ['LogisticRegression', 'DecisionTreeClassifier', 'RandomForestClassifier', 'GradientBoostingClassifier', 'KNeighborsClassifier']:
            model_path, avg_metric = train_and_convert_to_tflite(X, y_floor, model_builders[model_name], model_name, f'{dataset_name}_{ratio}_floor', ratio)
            print(f'Model saved to {model_path}, Average Validation Loss: {avg_metric}')
        
        # Coordinate regression training and evaluation
        for model_name in ['DecisionTreeRegressor', 'RandomForestRegressor', 'GradientBoostingRegressor', 'KNeighborsRegressor']:
            model_path, avg_metric = train_and_convert_to_tflite(X, y_coord, model_builders[model_name], model_name, f'{dataset_name}_{ratio}_coord', ratio)
            print(f'Model saved to {model_path}, Average Validation Loss: {avg_metric}')


Trial 10 Complete [00h 00m 19s]
val_loss: 85.00841522216797

Best val_loss So Far: 84.27782440185547
Total elapsed time: 00h 03m 09s


  saveable.load_own_variables(weights_store.get(inner_path))


Reloading Tuner from tuner_dir\tuning\tuner0.json
Reloading Tuner from tuner_dir\tuning\tuner0.json
Reloading Tuner from tuner_dir\tuning\tuner0.json
Reloading Tuner from tuner_dir\tuning\tuner0.json
INFO:tensorflow:Assets written to: C:\Users\prabw\AppData\Local\Temp\tmpa5e7btwn\assets


INFO:tensorflow:Assets written to: C:\Users\prabw\AppData\Local\Temp\tmpa5e7btwn\assets


Saved artifact at 'C:\Users\prabw\AppData\Local\Temp\tmpa5e7btwn'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 18), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  2867363840080: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2867406595344: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2867406589776: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2867406598416: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2867406598608: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2867406595728: TensorSpec(shape=(), dtype=tf.resource, name=None)
Model saved to TFLite_NEW_senior_saved_model/actual_ml_gan_0.1_coord_0.1_KNeighborsRegressor.tflite, Average Validation Loss: 81.79132843017578


In [6]:
import numpy as np
import pandas as pd
import tensorflow as tf
import os

# Function to calculate Mean Distance Error
def mean_distance_error(y_true, y_pred):
    return np.mean(np.sqrt(np.sum((y_true - y_pred) ** 2, axis=1)))

# Function to load a TensorFlow Lite model and make predictions
def load_and_predict_with_tflite_model(model_path, X_test):
    # Load the TensorFlow Lite model
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()

    # Get input and output details
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Prepare the input data in the format expected by the model
    X_test = np.array(X_test, dtype=np.float32)

    # Predict using the TensorFlow Lite model
    predictions = []
    for i in range(len(X_test)):
        interpreter.set_tensor(input_details[0]['index'], [X_test[i]])
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details[0]['index'])
        predictions.append(output_data[0])

    return np.array(predictions)

# Load the actual dataset for testing
actual_test_data = pd.read_csv('original_CMKL1.csv')

# Prepare the data for testing
X_test = actual_test_data[[f'RSSI{i+1}' for i in range(18)]]
y_test_coord = actual_test_data[['x', 'y']]

# Define the directory where models are saved
model_directory = 'TFLite_NEW_senior_saved_model'

# Iterate over the models and calculate Mean Distance Error
for ratio in [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]:
    for dataset_name in ['actual_ml', 'actual_gan', 'actual_ml_gan']:
        for model_name in ['DecisionTreeRegressor', 'RandomForestRegressor', 'GradientBoostingRegressor', 'KNeighborsRegressor']:
            model_path = f'{model_directory}/{dataset_name}_{ratio}_coord_{ratio}_{model_name}.tflite'
            print(f'Testing model: {model_path}')
            
            # Predict using the TensorFlow Lite model
            predictions = load_and_predict_with_tflite_model(model_path, X_test)
            
            # Calculate Mean Distance Error
            mde = mean_distance_error(y_test_coord.values, predictions)
            print(f'Mean Distance Error for {dataset_name} at ratio {ratio} with model {model_name}: {mde} meters')


Testing model: TFLite_NEW_senior_saved_model/actual_ml_0.9_coord_0.9_DecisionTreeRegressor.tflite
Mean Distance Error for actual_ml at ratio 0.9 with model DecisionTreeRegressor: 11.800111200029365 meters
Testing model: TFLite_NEW_senior_saved_model/actual_ml_0.9_coord_0.9_RandomForestRegressor.tflite
Mean Distance Error for actual_ml at ratio 0.9 with model RandomForestRegressor: 11.565227162020669 meters
Testing model: TFLite_NEW_senior_saved_model/actual_ml_0.9_coord_0.9_GradientBoostingRegressor.tflite
Mean Distance Error for actual_ml at ratio 0.9 with model GradientBoostingRegressor: 11.56880737640622 meters
Testing model: TFLite_NEW_senior_saved_model/actual_ml_0.9_coord_0.9_KNeighborsRegressor.tflite
Mean Distance Error for actual_ml at ratio 0.9 with model KNeighborsRegressor: 11.577313107221768 meters
Testing model: TFLite_NEW_senior_saved_model/actual_gan_0.9_coord_0.9_DecisionTreeRegressor.tflite
Mean Distance Error for actual_gan at ratio 0.9 with model DecisionTreeRegress