<a href="https://colab.research.google.com/github/AllisonOge/predictive_modeling/blob/main/predictive_modeling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
# import tflite_runtime.interpreter as tflite
import tensorflow.lite as tflite
import numpy as np
import os

# import tensorflow as tf


In [3]:
# utility function
def start_and_idle_time(bit_sequence):
    """Locate every run of zeros (idle period) in a sequence of channel bits.

       Parameters
       ----------
            bit_sequence: (1-D Array-like) channel states, cast to int32;
                          0 is counted as idle and 1 ends an idle run

        Return
        ------
            np.ndarray holding one (start_index, run_length) pair per idle
            run, in order of appearance; an empty array when no run is found
    """
    bits = np.asarray(bit_sequence, dtype=np.int32)
    final_pos = len(bits) - 1
    runs = []
    run_start = 0
    run_length = 0

    for pos, current in enumerate(bits):
        if pos == 0:
            # a sequence that opens idle begins its first run at index 0
            if current == 0:
                run_start = pos
                run_length += 1
        else:
            previous = bits[pos - 1]
            if current == 0:
                if previous == 1:
                    # busy -> idle: a new run begins here
                    run_start = pos
                elif previous != 0:
                    # neither a 1,0 nor a 0,0 pair: nothing to record
                    continue
                run_length += 1
            elif current == 1 and previous == 0:
                # idle -> busy: the open run is complete
                runs.append((run_start, run_length))
                run_length = 0
            else:
                # busy -> busy (or any other pair): nothing to record
                continue

        # a run still open on the last element is closed here
        if pos == final_pos and run_length > 0:
            runs.append((run_start, run_length))

    return np.array(runs)

In [4]:
class ML:
    """ML Parent Class to buffer channel states and run a TFLite model on them
       Attributes
       ----------
            dataset: (list) buffer of channel states; trimmed to the latest
                     window_size entries whenever a prediction is made
            name: (string) name of the model
            window_size:(int) length of channel state to store in memory (default=10)
            model_interpreter: tflite interpreter holding the loaded model
            selected_channel: channel chosen by the selection algorithm
                              (None until a subclass picks one)

        Methods
        -------
            update_dataset(channel_state)
            get_prediction(channel_state)
            select_channel(channel_state)

    """

    def __init__(self, name: str, window_size: int = 10, model_path: str = os.path.abspath("./models/model.tflite")):
        self.dataset: list = []
        self.name = name
        self.window_size: int = window_size
        self.model_interpreter = tflite.Interpreter(model_path=model_path)
        # allocate memory
        self.model_interpreter.allocate_tensors()
        self.selected_channel = None

    def update_dataset(self, channel_state: list) -> None:
        """Update the dataset buffer

           Parameters
           ----------
                channel_state: (1-D Array-like) state of the channel

            Return
            ------
                None
        """
        self.dataset.append(channel_state)

    def get_prediction(self, channel_state) -> "np.ndarray | None":
        """Prepare data and invoke model inference

           The new state is appended to the buffer; once at least
           window_size states are buffered, the most recent window_size of
           them are fed to the model as one float32 batch of shape
           (1, window_size, len(channel_state)).

           Parameters
           ----------
                channel_state: (1-D Array-like) state of the channel

            Return
            ------
                None while fewer than window_size states are buffered,
                otherwise the model's output tensor
        """
        self.update_dataset(channel_state)

        if len(self.dataset) < self.window_size:
            return None

        # Slide the window: drop the oldest states and keep the newest
        # window_size entries so each prediction sees the latest history.
        oldest_kept = len(self.dataset) - self.window_size
        self.dataset = self.dataset[oldest_kept:]

        X = np.asarray(self.dataset, dtype=np.float32).reshape(
            (-1, self.window_size, len(channel_state)))

        interpreter = self.model_interpreter
        input_index = interpreter.get_input_details()[0]["index"]
        interpreter.set_tensor(input_index, X)
        interpreter.invoke()

        output_index = interpreter.get_output_details()[0]["index"]
        return interpreter.get_tensor(output_index)

    def select_channel(self, channel_state):
        """Hook for subclasses: choose a channel for the given state.

           The base implementation does nothing and returns None.
        """
        pass


# Model 1: predicts the next state

## Algorithm

- for every time slot, get the model's predictions and, among the channels predicted to be free, select the one with the lowest occupancy in the past data
- repeat for every time slot


In [57]:
class CS1(ML):
    """Channel Selection Algorithm 1

       Make inference on the next state of the channel and selects a channel
       with the least occupancy

       Attributes
       ----------
        occ_sums: (list) per-channel running sum of the observed states
        occupancies: (list) per-channel occ_sums divided by counter
        counter: (int) number of states observed so far
    """

    def __init__(self, name: str, nchannels: int, window_size: int = 10, model_path=...):
        if model_path is ...:
            # Ellipsis is this signature's "not given" marker: defer to the
            # parent's default model path instead of handing Ellipsis to
            # the interpreter.
            super().__init__(name, window_size)
        else:
            super().__init__(name, window_size, model_path)
        self.occ_sums = []
        self.occupancies = [0] * nchannels
        self.counter = 0

    def update_dataset(self, channel_state):
        """Buffer the new state and refresh the per-channel occupancies

           Parameters
           ----------
                channel_state: (1-D Array-like) state of the channel

            Return
            ------
                None
        """
        self.dataset.append(channel_state)
        self.occupancies = self.update_occupancies(channel_state)

    def update_occupancies(self, channel_state):
        """Compute the latest occupancies of the channel

           Steps:
                update the sums of states and increment counter
                new occupancy = sum of states / counter

            Parameters
            ----------
                channel_state: (1-D Array-like) state of the channel

            Return
            ------
                list with one occupancy value per channel
        """
        if len(self.occ_sums) > 0:
            self.occ_sums = [total + channel_state[ch]
                             for ch, total in enumerate(self.occ_sums)]
        else:
            # copy so the running sums never alias the caller's object
            self.occ_sums = list(channel_state)

        self.counter += 1
        return [total / self.counter for total in self.occ_sums]

    def select_channel(self, channel_state):
        """Make predictions and select the channel with the least occupancy

           A channel counts as free when its prediction is not above 0.5.
           The current selection is kept while it stays free; otherwise the
           free channel with the lowest occupancy is chosen (the lowest
           index wins a tie).

           Parameters
           ----------
                channel_state: (1-D array-like) state of the channel

            Returns
            -------
                None (window not filled yet, or no channel predicted free)
                or selected_channel
        """
        channel_state = list(channel_state)
        # get predictions
        preds = self.get_prediction(channel_state)
        if preds is None:
            return None

        predicted_busy = np.asarray(preds).flatten() > 0.5
        free_channels = [ch for ch, busy in enumerate(predicted_busy)
                         if not busy]
        # no channel is free
        if not free_channels:
            return None
        # stay on the current channel while it is still free
        if self.selected_channel in free_channels:
            return self.selected_channel
        # Compare occupancy VALUES of the free channels, not their indices.
        # NOTE(review): assumes the model emits one prediction per channel
        # so every index is valid for self.occupancies - confirm.
        self.selected_channel = min(
            free_channels, key=lambda ch: self.occupancies[ch])
        return self.selected_channel


# Model 2: predicts the idle time

## Algorithm

- for every time slot, get the model's predictions and select the channel with the highest predicted idle time
- repeat for every time slot


In [6]:
class CS2(ML):
    """Channel Selection Algorithm 2

       Make inference on the idle time of the channel
       and select channel with highest idle time

       Attributes
       ----------
            dataset: (list) raw channel states; select_channel trims it to
                     the latest window_size entries
            window_size:(int) length of channel state to store in memory (default=10)
            model_interpreter: tflite interpreter to load model
            selected_channel: (int) channel selected by algorithm

        Methods
        -------
            update_dataset(channel_state)
            prepare_dataset()
            select_channel(channel_state)

    """

    def _idle_time_rows(self, states):
        """Return the idle-time representation of `states` without
           modifying it

           For every time slot and channel the value is the length of the
           run of zeros that begins at that slot, or 0 when the slot is not
           the beginning of such a run.
        """
        rows = []
        for first in range(len(states)):
            # runs of zeros per channel, measured from slot `first` onwards
            runs_per_channel = map(
                start_and_idle_time, np.transpose(states[first:]))
            # only a run that begins at offset 0 means this slot is idle
            rows.append([runs[0][1] if len(runs) > 0 and runs[0][0] <= 0 else 0
                         for runs in runs_per_channel])
        return rows

    def prepare_dataset(self):
        """ Compute the idletimes of the channel states sequence
            stored in dataset and updates it

            This method transforms the bits of ones and zeros to
            the idle time at each time slot e.g., a sequence of
            [1, 0, 0, 0, 1, 0, 1] has the idle time representation
            of [0, 3, 2, 1, 0, 1, 0]

            The raw states in dataset are replaced by the result, so
            select_channel does not call this method: it needs the raw
            states on the next time slot.

            Parameters
            ----------
              None

            Returns
            -------
              None
        """
        self.dataset = self._idle_time_rows(self.dataset)

    def select_channel(self, channel_state) -> "int | None":
        """Select a channel with highest idle time prediction

           The state is buffered once, the buffer is trimmed to the latest
           window_size raw states, and the idle-time representation of that
           window is fed to the model. The raw states stay in dataset so
           the next call transforms real channel bits again.

           Parameters
           ----------
                channel_state: (1-D array-like) state of the channel

            Returns
            -------
                None (window not filled yet, or no positive idle-time
                prediction) or selected_channel

        """
        channel_state = list(channel_state)
        self.update_dataset(channel_state)
        if len(self.dataset) < self.window_size:
            return None

        # keep only the most recent window of RAW states
        oldest_kept = len(self.dataset) - self.window_size
        self.dataset = self.dataset[oldest_kept:]

        # The model is invoked directly: ML.get_prediction would append
        # channel_state to the buffer a second time.
        X = np.asarray(self._idle_time_rows(self.dataset), dtype=np.float32).reshape(
            (-1, self.window_size, len(channel_state)))
        interpreter = self.model_interpreter
        input_index = interpreter.get_input_details()[0]["index"]
        interpreter.set_tensor(input_index, X)
        interpreter.invoke()
        output_index = interpreter.get_output_details()[0]["index"]
        preds = np.asarray(interpreter.get_tensor(output_index)).flatten()

        # if no channel is free
        if preds.size == 0 or preds.max() <= 0:
            return None
        # select channel of highest idle time (first one on a tie);
        # converted to a plain int so callers get a builtin type
        self.selected_channel = int(np.argmax(preds))
        return self.selected_channel

# Test the models 🧪

In [7]:
import pandas as pd


# Load the sensor log indexed by "id", keep one row per "created_at" value,
# and expose the four channel columns as a 2-D array (rows x channels).
data = (
    pd.read_csv(
        "https://raw.githubusercontent.com/AllisonOge/predictive_modeling/main/sensor_ml.csv",
        index_col="id",
    )
    .drop_duplicates(subset="created_at")
)
sensor = data.loc[:, ["chan_1", "chan_2", "chan_3", "chan_4"]].to_numpy()

In [8]:
sensor

array([[1, 1, 0, 1],
       [1, 1, 0, 1],
       [1, 1, 0, 1],
       ...,
       [1, 1, 0, 1],
       [0, 0, 0, 0],
       [0, 0, 0, 0]])

In [9]:
# Generator that yields one row of `sensor` (the states of the four channels)
# per next() call. It is single-pass: every next() consumes a row for good.
channel_state = (s for s in sensor)
channel_state

<generator object <genexpr> at 0x7fab9570e750>

In [58]:
import unittest

class TestCS1(unittest.TestCase):
    """Smoke tests for CS1 driven by the rows of `sensor`."""

    def setUp(self):
        nchannels = sensor.shape[1]
        self.cs1 = CS1("Next State", nchannels, model_path="model.tflite")
        # A fresh iterator per test keeps the tests independent of each
        # other and of the order in which they run.
        self.states = iter(sensor)
        # number of calls that cannot produce a prediction yet
        self.warmup = self.cs1.window_size - 1

    def test_nochannel(self):
        """No channel may be selected before the window is filled."""
        selected_channel = [self.cs1.select_channel(next(self.states))
                            for _ in range(self.warmup)]
        # all(selected_channel) would also be False for a selected channel 0
        # or for a single None, so every entry is checked explicitly
        self.assertTrue(all(ch is None for ch in selected_channel),
                        "expected the selected channels to be all none")

    def test_predictions(self):
        """The first call with a full window yields an int or None."""
        for _ in range(self.warmup):
            self.cs1.select_channel(next(self.states))
        selected_channel = self.cs1.select_channel(next(self.states))
        self.assertTrue(isinstance(selected_channel, int) or selected_channel is None,
                        "expected selected channel to be an integer or None")


# load straight from the TestCase class; loadTestsFromModule expects a module
tests_loaded = unittest.TestLoader().loadTestsFromTestCase(TestCS1)
unittest.TextTestRunner().run(tests_loaded)


..
----------------------------------------------------------------------
Ran 2 tests in 0.016s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>

In [59]:
class TestCS2(unittest.TestCase):
    """Smoke tests for CS2 driven by the rows of `sensor`."""

    def setUp(self):
        self.cs2 = CS2("Idletime", model_path="model2.tflite")
        # A fresh iterator per test keeps the tests independent of each
        # other and of the order in which they run.
        self.states = iter(sensor)
        # Derived from the window so the prediction test really reaches the
        # first full window (a fixed 8 + 1 calls never fills a window of 10).
        self.warmup = self.cs2.window_size - 1

    def test_nochannel(self):
        """No channel may be selected before the window is filled."""
        selected_channel = [self.cs2.select_channel(next(self.states))
                            for _ in range(self.warmup)]
        # all(selected_channel) would also be False for a selected channel 0
        # or for a single None, so every entry is checked explicitly
        self.assertTrue(all(ch is None for ch in selected_channel),
                        "expected the selected channels to be all none")

    def test_predictions(self):
        """The first call with a full window yields an int or None."""
        for _ in range(self.warmup):
            self.cs2.select_channel(next(self.states))
        selected_channel = self.cs2.select_channel(next(self.states))
        self.assertTrue(isinstance(selected_channel, int) or selected_channel is None,
                        "expected selected channel to be an integer or None")


# load straight from the TestCase class; loadTestsFromModule expects a module
tests_loaded = unittest.TestLoader().loadTestsFromTestCase(TestCS2)
unittest.TextTestRunner().run(tests_loaded)


..
----------------------------------------------------------------------
Ran 2 tests in 0.021s

OK


<unittest.runner.TextTestResult run=2 errors=0 failures=0>