# Test implementation on 2D to 1D problems
### Problem Setup
To start testing the algorithm, we are going to try to optimize finding the absolute minima of a function with
one input and one output. Lets start defining this function:

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import warnings
from sklearn.gaussian_process import GaussianProcessRegressor
from scipy.stats import norm

plt.style.use('dark_background')

In [21]:
# Implementation of the same algorithm using out own tool
from search_space import InputValueSpace, SearchSpace, choice, integer_random, transform_exp_base_10, static, float_random
from evaluation_metrics import *
from sklearn.linear_model import LogisticRegression
from typing import Union
import micromlgen

# objective function is to MAXIMIZE the value of the function
class ObjectiveFunction:
    def __init__(self, model: LogisticRegression, base_toi: int, metric: callable, metric_args: list = None,
                 constraints: dict = None):
        self.name = self.__class__.__name__
        self.model = model
        self.metric = metric
        self.metric_args = {"model_size": 3000, "model_latency": 200} if metric_args is None else metric_args
        self.model_latency = base_toi
        self.constaints = constraints

        if model.__module__.split(".")[0] == "sklearn":
            self.model_framework = "sklearn"
            self.model_family = model.__module__.split(".")[1]
            self.model_type = model.__module__.split(".")[2]
        elif model.__module__.split(".")[0] == "keras":
            self.model_framework = "keras"
            self.model_family = model.__module__.split(".")[1]
            self.model_type = None
        else:
            raise NotImplementedError("The given model type has not been implemented yet")

        self.trained = False
        self.history_params = []
        self.history_outputs = []
        self.history_metrics = []

    def train(self, X: np.ndarray, Y: np.ndarray):
        self.model.fit(X, Y)
        self.trained = True

    def predict(self, x: np.ndarray, append: bool = True) -> Union[list, float]:
        assert self.trained, ValueError("Model not trained yet")  # TODO: Add custom error
        if not isinstance(x, np.ndarray):
            x = np.ndarray(x)
        if x.ndim == self.model.n_features_in_:
            pred = self.model.predict(np.expand_dims(x, axis=0))
        else:
            raise ValueError("Dimensions not matching")  # TODO: Add custom error
        if append:
            self.history_outputs.append(pred)
        return pred

    def set_parameters(self, params: dict, append: bool = True):
        if self.model_framework == "sklearn":
            self.model.set_params(**params)
        elif self.model_framework == "keras":
            # TODO
            pass

        if append:
            self.history_params.append(params)
        self.model_serialized = self.get_model_serialized()
        self.model_size = self.get_model_size()
        self.model_latency += self.get_computing_toi()
        if self.constaints["model_size"] <= self.model_size or self.constaints["model_latency"] <= self.model_latency:
            return False
        else:
            return True

    def get_parameters(self) -> dict:
        # TODO: Test this method for Keras models
        return self.model.get_params()

    def predict_evaluate(self, x: np.ndarray, y: np.ndarray, append: bool = True) -> float:
        if not isinstance(x, np.ndarray):
            x = np.ndarray(x)
        goal = self.evaluate(self.model.score(x, y))
        if append:
            self.history_metrics.append(goal)
        return goal

    def get_model_serialized(self):
        # TODO
        return 0

    def get_model_size(self):
        # TODO
        return 0

    def get_computing_toi(self):
        # TODO
        return 0

    def evaluate(self, y: np.ndarray):
        return self.metric([y, self.model_size, self.model_latency], self.metric_args)

In [256]:
def random_search_optimization(X_train: np.ndarray,
                               X_test: np.ndarray,
                               y_train: np.ndarray,
                               y_test: np.ndarray,
                               search_space: SearchSpace,
                               f: ObjectiveFunction,
                               n_iters: int = 100,
                               ):
    for _ in range(n_iters):
        search_space.sample()
        constraints_flag = obj.set_parameters(se.current)
        if constraints_flag:
            obj.train(X_train, y_train)
            obj.predict_evaluate(X_test, y_test)
    return f

In [270]:
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
data = load_iris()

X = data["data"]
y = data["target"]

n_iters = 10

X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True, train_size=0.7)

In [271]:
se_logistic_regression_dict = {
    "penalty": InputValueSpace(
        se_type=choice, params_dict={"options": ["none", "elasticnet"]}
    ),
    "C": InputValueSpace(
        se_type=integer_random, params_dict={
            "lower_bound": -3, "upper_bound": 2, "lambda_function": transform_exp_base_10}
    ),
    "solver": InputValueSpace(
        se_type=static, params_dict={"value": "saga"}
    ),
    "max_iter": InputValueSpace(
        se_type=static, params_dict={"value": 500}
    ),
    "l1_ratio": InputValueSpace(
        se_type=float_random, params_dict={"lower_bound": 0, "upper_bound": 1}
    )
}
se = SearchSpace(input_dict=se_logistic_regression_dict, conditions_dict={"l1_ratio": ["penalty", "eq", "elasticnet"]})

obj = ObjectiveFunction(model=LogisticRegression(), metric=weighted_score, base_toi=0, metric_args=[0.8, 0.1, 0.1])

explored_space = random_search_optimization(X_train=X_train,
                                            X_test=X_test,
                                            y_train=y_train,
                                            y_test=y_test,
                                            search_space=se,
                                            f=obj,
                                            n_iters=n_iters
                                            )



In [303]:
clf.__module__

'sklearn.linear_model._logistic'

In [297]:
import tensorflow as tf

clf2 = tf.keras.Sequential()

In [298]:
clf2.__module__

'keras.engine.sequential'