# LoRa Signal Classifier
With experimentation data.

Imports and Constants



In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Flatten, Dense, Dropout
import tensorflow as tf
import numpy as np
import os, re

# Base directory that holds the dataset folders; main() joins the dataset name
# onto it. An empty string makes the path relative to the working directory.
PATH_DATASETS = ''
# Base directory that load_model()/save_model() join the model name onto.
PATH_MODELS = ''
# Base directory where the result .npy files are written.
PATH_RESULTS = ''
# Number of target classes: length of the one-hot labels and width of the
# softmax output layer.
NUM_CLASSES = 18
# Shape a loaded sample must have to be used by data_generator(); also the
# input shape of the model's Flatten layer.
MODEL_INPUT_SHAPE = (512, 1)
# Shape a one-hot label must have to be used by data_generator().
MODEL_OUTPUT_SHAPE = (NUM_CLASSES,)

Data

In [None]:
def one_hot_encode(config, num_classes=NUM_CLASSES):
    """Return the one-hot vector for class index ``config``.

    Example: one_hot_encode(config=1, num_classes=18)
             -> np.array([0, 1, 0, ..., 0, 0, 0])

    Args:
        config: Class index, expected in ``0 .. num_classes - 1``.
        num_classes: Length of the returned vector.

    Returns:
        Row ``config`` of the ``num_classes`` x ``num_classes`` identity matrix.

    Raises:
        ValueError: If ``config`` lies outside the valid range.
    """
    below_range = config < 0
    above_range = config >= num_classes
    if below_range or above_range:
        raise ValueError('config must be between 0 and num_classes-1')
    identity = np.eye(num_classes)
    return identity[config]

def parse_file_name(file_name):
  """Parse a sample file name into its numeric fields and one-hot label.

  Example:
    parse_file_name('IF_Sample_P04_C01_N0001.npy')
    -> {'power': 4, 'config': 1, 'count': 1,
        'label': one_hot_encode(config=1)}

  Args:
    file_name: Bare file name (no directory part) of the form
      'IF_Sample_P<power>_C<config>_N<count>.npy'.

  Returns:
    Dict with integer 'power', 'config' and 'count' entries plus 'label',
    the one-hot encoding of 'config'.

  Raises:
    ValueError: If file_name does not have the expected form, or (raised by
      one_hot_encode) if the config index is out of range.
  """
  # The dot is escaped and the whole name must match, so look-alikes such as
  # 'IF_Sample_P04_C01_N0001xnpy' or '...N0001.npy.bak' are rejected.
  match = re.fullmatch(
      r'IF_Sample_P(?P<power>\d+)_C(?P<config>\d+)_N(?P<count>\d+)\.npy',
      file_name)
  if match is None:
    raise ValueError('file_name does not match the pattern')

  config = int(match.group('config'))
  return {
    'power': int(match.group('power')),
    'config': config,
    'count': int(match.group('count')),
    'label': one_hot_encode(config=config),
  }

def data_generator(path_dataset, file_list, batch_size):
  """Yield (samples, labels) batches endlessly, cycling over file_list.

  Each file is loaded with np.load and labelled from its file name. Samples
  whose shape differs from MODEL_INPUT_SHAPE (or whose label shape differs
  from MODEL_OUTPUT_SHAPE) are skipped. The last batch of every pass may hold
  fewer than batch_size items.

  Args:
    path_dataset: Directory containing the files.
    file_list: File names (relative to path_dataset) to cycle through.
    batch_size: Number of samples per full batch.

  Yields:
    Tuple (np.array of samples, np.array of one-hot labels).

  Raises:
    ValueError: If a complete pass over file_list produced no usable sample
      (empty list or every sample has an unexpected shape). Without this
      check the generator would loop forever without yielding anything.
  """
  while True:
    batch_data, batch_labels = [], []
    usable_samples = 0
    for file_name in file_list:
      sample = np.load(os.path.join(path_dataset, file_name))
      label = parse_file_name(file_name)['label']
      if sample.shape == MODEL_INPUT_SHAPE and label.shape == MODEL_OUTPUT_SHAPE:
        batch_data.append(sample)
        batch_labels.append(label)
        usable_samples += 1
      if len(batch_data) == batch_size:
        yield np.array(batch_data), np.array(batch_labels)
        batch_data, batch_labels = [], []
    # Flush the trailing partial batch of this pass.
    if batch_data:
      yield np.array(batch_data), np.array(batch_labels)
    if usable_samples == 0:
      raise ValueError(
          'file_list contains no sample with the expected shape')

def group_files_to_dict(files, field):
  """Group file names by one parsed field of their name.

  Args:
    files: Iterable of file names accepted by parse_file_name.
    field: Key of the parse_file_name result to group by (e.g. 'power').

  Returns:
    Dict mapping each distinct field value to the list of file names having
    that value, in their original order.
  """
  grouped = {}
  for name in files:
    bucket_key = parse_file_name(name)[field]
    grouped.setdefault(bucket_key, []).append(name)
  return grouped

In [None]:
import random

def split_data_uniform(data, val_split, max_n, shuffle=True,
                       train_excluded_powers=('P00', 'P02')):
    """Split file names into train/test lists, group by group.

    Files are grouped by the 'Pxx_Cxx' part of their name (third and fourth
    '_'-separated tokens). Every group is cut at the same position,
    ``int((max_n + 1) * (1 - val_split))``: entries before the cut go to the
    training list, entries from the cut onwards go to the test list.

    Note that the cut position is derived from ``max_n`` and not from the
    actual group size, so a group with no more entries than the cut position
    contributes nothing to the test list.

    Args:
        data: Iterable of file names like 'IF_Sample_P04_C01_N0001.npy'.
        val_split: Fraction of ``max_n + 1`` reserved for the test list.
        max_n: Value the cut position is computed from; presumably the
            highest sample index per group - TODO confirm with the dataset.
        shuffle: Shuffle each group in place (module ``random``) before
            cutting.
        train_excluded_powers: Key prefixes (power levels) whose groups are
            kept out of the training list; they still contribute to the test
            list. Defaults to the previously hard-coded ('P00', 'P02').

    Returns:
        Tuple ``(train_files, test_files)`` of lists of file names.
    """
    groups = {}
    for filename in data:
        key = "_".join(filename.split("_")[2:4])  # 'Pxx_Cxx' part of the name
        groups.setdefault(key, []).append(filename)

    split_index = int((max_n + 1) * (1 - val_split))
    excluded_prefixes = tuple(train_excluded_powers)

    train_files, test_files = [], []
    for key, group in groups.items():
        if shuffle:
            random.shuffle(group)

        test_files.extend(group[split_index:])
        # Excluded power levels are evaluated on but never trained on.
        if not key.startswith(excluded_prefixes):
            train_files.extend(group[:split_index])

    return train_files, test_files

Model

In [None]:
def create_model():
  """Build and compile the classifier network.

  The network flattens an input of shape MODEL_INPUT_SHAPE, applies two
  16-unit tanh Dense layers, a 0.5 Dropout and a NUM_CLASSES-wide softmax
  output. It is compiled with categorical cross-entropy, a default Adam
  optimizer and the 'accuracy' metric.

  Returns:
    The compiled Sequential model.
  """
  layer_stack = [
      Flatten(input_shape=MODEL_INPUT_SHAPE),
      Dense(16, activation='tanh'),
      Dense(16, activation='tanh'),
      Dropout(0.5),
      Dense(NUM_CLASSES, activation='softmax'),
  ]
  classifier = Sequential(layer_stack)
  classifier.compile(
      loss=tf.keras.losses.categorical_crossentropy,
      optimizer=tf.keras.optimizers.Adam(),
      metrics=['accuracy'],
  )
  return classifier

def load_model(model_name):
  """Load and return the Keras model stored as model_name under PATH_MODELS."""
  return tf.keras.models.load_model(os.path.join(PATH_MODELS, model_name))

def save_model(model, model_name):
  """Save model under PATH_MODELS using model_name as the file/folder name."""
  model.save(os.path.join(PATH_MODELS, model_name))

In [None]:
def train_model(model, path_dataset, train_data, validation_data, epochs,
                batch_size):
  """Fit model on batches streamed from disk and return it.

  Args:
    model: Compiled model exposing a Keras-style fit().
    path_dataset: Directory containing the sample files.
    train_data: File names used for training.
    validation_data: File names used for validation; when empty, the fit
      runs without validation.
    epochs: Number of epochs to train.
    batch_size: Samples per batch for both generators.

  Returns:
    The same model instance after fitting.

  Raises:
    ValueError: If train_data is empty.
  """
  num_samples = len(train_data)
  if num_samples == 0:
    raise ValueError('train_data must contain at least one file')

  # The generators cycle forever, so fit() needs explicit step counts. The
  # original one-batch safety margin ("- 1") is kept, but the result is
  # clamped to at least 1: for small datasets it would otherwise be zero or
  # negative, which is not a usable step count.
  steps_per_epoch = max(1, num_samples // batch_size - 1)
  train_data_gen = data_generator(path_dataset, train_data, batch_size)

  num_val_samples = len(validation_data)
  if num_val_samples > 0:
    validation_steps = max(1, num_val_samples // batch_size - 1)
    val_data_gen = data_generator(path_dataset, validation_data, batch_size)
  else:
    # No validation files: skip validation instead of asking the generator
    # for batches it can never produce.
    validation_steps = None
    val_data_gen = None

  model.fit(
      train_data_gen,
      epochs=epochs,
      steps_per_epoch=steps_per_epoch,
      validation_data=val_data_gen,
      validation_steps=validation_steps,
      verbose=1)

  return model

In [None]:
def evaluate_model(model, path_dataset, test_files, validation_steps):
    """Collect (prediction, label) pairs for every power level.

    The test files are grouped by their 'power' field; for each group,
    ``validation_steps`` batches of up to 16 samples are drawn from
    data_generator and passed to ``model.predict``.

    NOTE(review): ``validation_steps`` is applied to each power level
    separately while data_generator cycles endlessly, so a group smaller than
    ``validation_steps * 16`` samples is visited more than once - confirm
    this is intended.

    Args:
        model: Model exposing ``predict``.
        path_dataset: Directory containing the sample files.
        test_files: File names to evaluate on.
        validation_steps: Number of batches drawn per power level.

    Returns:
        Dict mapping power level to an array in which each entry stacks one
        prediction with its label.
    """
    files_by_power = group_files_to_dict(test_files, field='power')
    result = {}
    for power_level, power_files in files_by_power.items():
        batches = data_generator(path_dataset, power_files, batch_size=16)
        pairs = []
        for _ in range(validation_steps):
            samples, labels = next(batches)
            predictions = model.predict(samples)
            pairs.extend(np.array(pair) for pair in zip(predictions, labels))
        result[power_level] = np.array(pairs)
    return result

Experimentation

In [None]:
def Monte_Carlo_Cross_Validation(path_dataset, num_iterations=15,
                                 power_levels=tuple(range(0, 21, 4)),
                                 val_split=0.2, epochs=10,
                                 batch_size=16):
  """Repeatedly split, train and evaluate; return the per-iteration results.

  Every iteration draws a fresh train/validation split, trains a new model
  and evaluates it on the validation files. After each iteration all results
  gathered so far are saved to 'result_<i>.npy' under PATH_RESULTS.

  Args:
    path_dataset: Directory containing the 'IF_Sample*.npy' files.
    num_iterations: Number of split/train/evaluate rounds.
    power_levels: Currently unused - filtering by power level is not
      implemented yet. The default is an immutable tuple so it cannot be
      shared and mutated across calls.
    val_split: Validation fraction passed to split_data_uniform.
    epochs: Training epochs per round.
    batch_size: Batch size used for training.

  Returns:
    np.array built from the list of per-iteration result dicts.
  """
  # The context manager closes the directory iterator deterministically.
  with os.scandir(path_dataset) as entries:
    data = [entry.name
            for entry in entries
            if entry.is_file() and entry.name.endswith('.npy')
            and entry.name.startswith('IF_Sample')]
  # TODO: filter data based on power_levels.
  results = []
  for i in range(num_iterations):
    print(f'Monte Carlo CV iteration {i+1}/{num_iterations}')
    # NOTE(review): 50 is passed as max_n; presumably the highest sample
    # index per 'Pxx_Cxx' group - confirm against the dataset.
    train_files, validation_files = split_data_uniform(data, val_split, 50)
    model = create_model()
    model = train_model(model, path_dataset, train_files, validation_files,
                        epochs, batch_size)
    num_val_samples = len(validation_files)
    validation_steps = (num_val_samples // batch_size)-1

    result = evaluate_model(model, path_dataset, validation_files, validation_steps)
    results.append(result)
    # Checkpoint: each file holds every result gathered up to this round.
    np.save(os.path.join(PATH_RESULTS, f'result_{i+1}.npy'), np.array(results))
  return np.array(results)


In [None]:
def main():
  """Run 30 Monte Carlo rounds on 'SDR_Dataset2' and save the results.

  Training uses 20 epochs and a batch size of 8; the aggregated results are
  written to 'exp_results_5.npy' under PATH_RESULTS.
  """
  results = Monte_Carlo_Cross_Validation(
      os.path.join(PATH_DATASETS, 'SDR_Dataset2'),
      30,
      epochs=20,
      batch_size=8)
  output_file = os.path.join(PATH_RESULTS, 'exp_results_5.npy')
  np.save(output_file, results)

if __name__ == "__main__":
  main()