In [1]:
import pytest
import logging
import numpy as np
import pandas as pd

from sklearn.linear_model import ElasticNet
from sklearn.datasets import make_regression
from sklearn.metrics import r2_score
from collections import Counter

# 0. Initial configuration

In [2]:
class MLLinearModel:

  def __init__(
      self,
      alpha:float,
      l1_ratio:float,
      random_state=0,
      model_name=""
      ):

    # Store params
    self.alpha = alpha
    self.l1_ratio = l1_ratio
    self.random_state = random_state
    self.model_name = model_name

    # Initializate parameters
    self.model_reg = None

  def train_model(self, X_train:np.ndarray, y_train:np.ndarray, verbose=True):
    self.model_reg = ElasticNet(random_state=self.random_state, alpha=self.alpha, l1_ratio=self.l1_ratio)
    self.model_reg.fit(X_train, y_train)
    if verbose:
      print(self.model_reg.coef_)
      print(self.model_reg.intercept_)

  def predict_model(self, X_test:np.ndarray):
    y_pred = self.model_reg.predict(X_test)
    return y_pred

  def evaluate_model(self, X_test:np.ndarray, y_test:np.ndarray):
    y_pred = self.predict_model(X_test)
    return {'r2_score': r2_score(y_test, y_pred)}

In [3]:
# Generate synthetic data
n_features = 4
X_train, y_train = make_regression(n_features=n_features, random_state=0)
X_test, y_test = make_regression(n_features=n_features, random_state=0)

In [4]:
# Try class
model_reg =  MLLinearModel(alpha=1.0, l1_ratio=0.5,model_name="prueba")
model_reg.train_model(X_train=X_train, y_train=y_train)
model_reg.evaluate_model(X_test=X_test, y_test=y_test)

[17.01756265 25.92033077 42.98727128 57.7739761 ]
-0.7022844637724632


{'r2_score': 0.8905284935984247}

### 0.1. Logging

Source:
* https://realpython.com/python-logging/

In [5]:
# logging_example.py

import logging

# Create a custom logger
logger = logging.getLogger(__name__)

# Create handlers
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler('file.log')
c_handler.setLevel(logging.WARNING)
f_handler.setLevel(logging.ERROR)


# Create formatters and add it to handlers
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
#f_format = logging.Formatter('[%(asctime)s] p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s','%m-%d %H:%M:%S')

c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)

# Add handlers to the logger
logger.addHandler(c_handler)
logger.addHandler(f_handler)

logger.warning('This is a warning')
logger.error('This is an error')
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

__main__ - ERROR - This is an error
ERROR:__main__:This is an error
__main__ - ERROR - This is an error message
ERROR:__main__:This is an error message
__main__ - CRITICAL - This is a critical message
CRITICAL:__main__:This is a critical message


In [6]:
logger.critical('Prueba')

__main__ - CRITICAL - Prueba
CRITICAL:__main__:Prueba


In [None]:
# logging_example.py

import logging

# Create a custom logger
logger = logging.getLogger(__name__)

# Create handlers
f_handler = logging.FileHandler('file_2.log')
f_handler.setLevel(logging.ERROR)
f_handler.setLevel(logging.DEBUG)

# Create formatters and add it to handlers
f_format = logging.Formatter('[%(asctime)s] p%(process)s {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s','%m-%d %H:%M:%S')

f_handler.setFormatter(f_format)

# Add handlers to the logger
logger.addHandler(f_handler)

logger.warning('This is a warning')
logger.error('This is an error')
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')
logger.critical('Prueba')

__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
__main__ - ERROR - This is an error
ERROR:__main__:This is an error
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
__main__ - ERROR - This is an error message
ERROR:__main__:This is an error message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
__main__ - CRITICAL - This is a critical message
CRITICAL:__main__:This is

## 1. Unit Tests

"In computer programming, unit testing is a method by which individual units of source code are tested to determine if they are fit for use. A unit is the smallest testable part of an application. In procedural programming a unit may be an individual function or procedure" [1] Thus, for instance, they test if a function or class method behaves as expected.


References:
[1]: https://en.wikibooks.org/wiki/Introduction_to_Software_Engineering/Testing/Unit_Tests

### 1.1. Initial example

In [5]:
class MLModel:

  def __init__(
      self,
      param1:float,
      param2:float,
      model_name:str
      ):

    # Store params
    self.param1 = param1

  def predict_model(self):
    pass

  def evaluate_model(self):
    pass


In [None]:
# Define several tests within a function, and evaluate them over different
# configurations
def test_answer(result_test):
    assert result_test.param1 >= 0
    assert result_test.param2 >= 0
    assert type(result_test.param1) == float
    assert type(result_test.param2) == float

def test1():
    return MLModel(param1=1.1, param2=1.2, model_name="prueba")

def test2():
    return MLModel(param1=1.1, param2=-1, model_name="prueba")

# Test 1
test_answer(result_test = test1())

# Test 2
test_answer(result_test = test2())

AssertionError: 

In [None]:
#@pytest.fixture()
def test1():
    return MLModel(param1=1.1, param2=1.2, model_name="prueba")

def test_unit_create_model(test1):
    #assert test1.model_name == "prueba"
    assert test1.param1 >= 0
    assert test1.param2 >= 0
    assert test1.param1 == float
    assert test1.param2 == float

In [None]:
test_unit_create_model(test1)

In [None]:
MLModel(param1=1.1, param2=1.2, model_name="prueba").model_name

'prueba'

### 1.2. Another example
Source:
* https://github.com/miguelgfierro/pybase/blob/main/test/pytest_fixtures.py
* https://docs.pytest.org/en/6.2.x/fixture.html

In [None]:
# Define data structures
@pytest.fixture()
def basic_structures():
    data = {
        "int": 5,
        "yes": True,
        "no": False,
        "float": 0.5,
        "pi": 3.141592653589793238462643383279,
        "string": "Name",
        "none": None,
    }
    return data

@pytest.fixture()
def complex_structures():
    my_list = [1, 2, 3]
    my_dict = {"a": 1, "b": 2}
    return my_list, my_dict

@pytest.fixture()
def numeric_libs(complex_structures):
    l, d = complex_structures
    np_array = np.array(l)
    df = pd.DataFrame(d, index=[0])
    series = pd.Series(l)
    return np_array, df, series

In [None]:
# Define tests
def test_basic_structures(basic_structures):
    assert basic_structures["int"] == 5
    assert basic_structures["yes"] is True
    assert basic_structures["no"] is False
    assert basic_structures["float"] == 0.5
    assert basic_structures["string"] == "Name"
    assert basic_structures["none"] is None


def test_comparing_numbers(basic_structures):
    assert basic_structures["pi"] == pytest.approx(3.1415926, 0.0000001)
    assert basic_structures["pi"] != pytest.approx(3.1415926, 0.00000001)
    assert basic_structures["int"] > 3
    assert basic_structures["int"] >= 5
    assert basic_structures["int"] < 10
    assert basic_structures["int"] <= 5


def test_lists(complex_structures):
    l = complex_structures[0]
    assert l == [1, 2, 3]
    assert Counter(l) == Counter([2, 1, 3])  # list have same elements
    assert 1 in l
    assert 5 not in l
    assert all(x in l for x in [2, 3])  # sublist in list


def test_dictionaries(complex_structures):
    d = complex_structures[1]
    assert d == {"a": 1, "b": 2}
    assert "a" in d
    assert d.items() <= {"a": 1, "b": 2, "c": 3}.items()  # subdict in dict
    with pytest.raises(KeyError):
        value = d["c"]


def test_pandas(numeric_libs):
    _, df, series = numeric_libs
    df_target = pd.DataFrame({"a": 1, "b": 2}, index=[0])
    series_target = pd.Series([1, 2, 3])
    pd.testing.assert_frame_equal(df, df_target)
    pd.testing.assert_series_equal(series, series_target)


def test_numpy(numeric_libs):
    np_array = numeric_libs[0]
    np_target = np.array([1, 2, 3])
    np_target2 = np.array([0.9999, 2, 3])
    assert np.all(np_array == np_target)
    np.testing.assert_array_equal(np_array, np_target)  # same as before
    np.testing.assert_array_almost_equal(np_array, np_target2, decimal=4)

## 2. Smoke Tests

"Preliminary testing or sanity testing to reveal simple failures severe enough to, for example, reject a prospective software release. Smoke tests are a subset of test cases that cover the most important functionality of a component or system, used to aid assessment of whether main functions of the software appear to work correctly" [1]

Thus, they are used to ensure that critical components/parts of the system work, so that they can be used in production systems to quickly check that there are no obvious failures [2]


References:
* [1]: https://en.wikipedia.org/wiki/Smoke_testing_(software)
* [2]: https://miguelgfierro.com/blog/2018/a-beginners-guide-to-python-testing/

In [89]:
import unittest
from functools import wraps

In [98]:
def test_logger(logger):
    """
    Decorator for unitest that output assertion Error In a logger file
    Source: https://stackoverflow.com/questions/61346627/how-to-make-assert-output-be-logged-to-file

    :param logger: a logger
    :return:
    """
    def deco(f):
        @wraps(f)
        def wrapper(*args):
            try:
                f(*args)
            except AssertionError as assErr:
                # f_name = (f.__name__) f_name is the function name of the function being decorated could be usefull
                # assErr is the message of the assertion Error
                # you can do more formatting here
                logger.error(assErr)  # send to the log

                raise  # if you don't raise the Error the test will be a success


        return wrapper
    return deco

In [52]:
class SmokeTests(unittest.TestCase):

    def _has_method(self, o, name):
      return callable(getattr(o, name, None))

    def check_methods_exist(self, ModelClass):
      logger.info("Ensuring model has all methods")
      self.assertTrue(hasattr(ModelClass, "predict_model") and callable(getattr(MLLinearModel, "predict_model")))
      self.assertTrue(hasattr(ModelClass, "train_model") and callable(getattr(MLLinearModel, "train_model")))
      self.assertTrue(hasattr(ModelClass, "evaluate_model") and callable(getattr(MLLinearModel, "evaluate_model")))

    @test_logger(logger)
    def check_methods_exist_v1(self, ModelClass):
      logger.info("Ensuring model has all methods")
      list_methods = ['predict_model', 'train_model', 'evaluate_model']
      for method_iter in list_methods:
        test_iter = self._has_method(ModelClass, method_iter)
        #self.assertEqual(test_iter, True, f"Method {method_iter} does not exist")
        self.assertTrue(test_iter, f"Method {method_iter} does not exist")

In [44]:
# Test OK
SmokeTests().check_methods_exist(MLLinearModel)

In [49]:
# Test OK
SmokeTests().check_methods_exist_v1(MLLinearModel)

In [53]:
# Test Not OK
SmokeTests().check_methods_exist(MLModel)

AssertionError: False is not true

In [54]:
# Test Not OK
SmokeTests().check_methods_exist_v1(MLModel)

__main__ - ERROR - False is not true : Method train_model does not exist
ERROR:__main__:False is not true : Method train_model does not exist


AssertionError: False is not true : Method train_model does not exist

## 3. Integration/Functional Tests

"Integration and functional tests are used to check the correct behavior of a system. E.g., In the case of a neural network algorithm, we would like to make sure that for a known dataset we always get a certain value of accuracy" [1]

References:
[1]: https://miguelgfierro.com/blog/2018/a-beginners-guide-to-python-testing/

In [None]:
import unittest

In [54]:
# Generate synthetic data
n_features = 4
X_train, y_train = make_regression(n_features=n_features, random_state=0)
X_test, y_test = make_regression(n_features=n_features, random_state=0)

In [None]:
# Try class
model_reg =  MLLinearModel(alpha=1.0, l1_ratio=0.5,model_name="prueba")
model_reg.train_model(X_train=X_train, y_train=y_train)
model_reg.evaluate_model(X_test=X_test, y_test=y_test)

In [76]:
@pytest.mark.parametrize('alpha, l1_ratio, n_features, random_state, r2_score', [
    (1.0, 0.5, 4, 0, 0.89),
    #(1.0, 0.5, 4, 0, 0.89),
])
def test_integration_r2_score(alpha, l1_ratio, n_features, random_state, r2_score):
  # Generate synthetic data
  X_train, y_train = make_regression(n_features=n_features, random_state=random_state)
  X_test, y_test = make_regression(n_features=n_features, random_state=random_state)

  # Train model
  model_reg =  MLLinearModel(alpha=alpha, l1_ratio=l1_ratio,model_name="prueba")
  model_reg.train_model(X_train=X_train, y_train=y_train, verbose=False)

  # Evaluation
  r2_score_test = np.round(model_reg.evaluate_model(X_test=X_test, y_test=y_test)['r2_score'], 2)
  assert r2_score_test >= r2_score

In [99]:
class TestIntegration(unittest.TestCase):

  def test_integration_r2_score(self, alpha, l1_ratio, n_features, random_state, r2_score):
    # Generate synthetic data
    X_train, y_train = make_regression(n_features=n_features, random_state=random_state)
    X_test, y_test = make_regression(n_features=n_features, random_state=random_state)

    # Train model
    model_reg =  MLLinearModel(alpha=alpha, l1_ratio=l1_ratio,model_name="prueba")
    model_reg.train_model(X_train=X_train, y_train=y_train, verbose=False)

    # Evaluation
    r2_score_test = np.round(model_reg.evaluate_model(X_test=X_test, y_test=y_test)['r2_score'], 2)
    self.assertAlmostEqual(
        r2_score_test,
        r2_score,
        places=3,
        msg=f"Value obtained is {r2_score_test} different from {r2_score}"
        )

In [77]:
# Test OK
test_integration_r2_score(alpha=1.0, l1_ratio=0.5, n_features=4, random_state=0, r2_score=0.89)

In [100]:
# Test OK
TestIntegration().test_integration_r2_score(alpha=1.0, l1_ratio=0.5, n_features=4, random_state=0, r2_score=0.89)

In [93]:
# Test NOT OK
test_integration_r2_score(alpha=1.0, l1_ratio=0.5, n_features=4, random_state=0, r2_score=0.99)

AssertionError: 

In [101]:
# Test NOT OK
TestIntegration().test_integration_r2_score(alpha=1.0, l1_ratio=0.5, n_features=4, random_state=0, r2_score=0.99)

AssertionError: 0.89 != 0.99 within 3 places (0.09999999999999998 difference) : Value obtained is 0.89 different from 0.99

## 4. Utility Test

Examples of tests that show how to implement the code.

In [20]:
class MLLinearModel:
  """
    Class for training, prediciton and evaluation an ML regressor model (ElasticNet)

    >> # Generate synthetic data
    >> n_features = 4
    >> X_train, y_train = make_regression(n_features=n_features, random_state=0)
    >> X_test, y_test = make_regression(n_features=n_features, random_state=0)

    >> # Try class
    >> model_reg =  MLLinearModel(alpha=1.0, l1_ratio=0.5,model_name="prueba")
    >> model_reg.train_model(X_train=X_train, y_train=y_train)
    >> model_reg.evaluate_model(X_test=X_test, y_test=y_test)

  """

  def __init__(
      self,
      alpha:float,
      l1_ratio:float,
      random_state=0,
      model_name=""
      ):

    # Store params
    self.alpha = alpha
    self.l1_ratio = l1_ratio
    self.random_state = random_state
    self.model_name = model_name

    # Initializate parameters
    self.model_reg = None

  def train_model(self, X_train:np.ndarray, y_train:np.ndarray, verbose=True):
    self.model_reg = ElasticNet(random_state=self.random_state, alpha=self.alpha, l1_ratio=self.l1_ratio)
    self.model_reg.fit(X_train, y_train)
    if verbose:
      print(self.model_reg.coef_)
      print(self.model_reg.intercept_)

  def predict_model(self, X_test:np.ndarray):
    y_pred = self.model_reg.predict(X_test)
    return y_pred

  def evaluate_model(self, X_test:np.ndarray, y_test:np.ndarray):
    y_pred = self.predict_model(X_test)
    return {'r2_score': r2_score(y_test, y_pred)}

In [21]:
print(MLLinearModel.__doc__)


    Class for training, prediciton and evaluation an ML regressor model (ElasticNet)

    >> # Generate synthetic data
    >> n_features = 4
    >> X_train, y_train = make_regression(n_features=n_features, random_state=0)
    >> X_test, y_test = make_regression(n_features=n_features, random_state=0)

    >> # Try class
    >> model_reg =  MLLinearModel(alpha=1.0, l1_ratio=0.5,model_name="prueba")
    >> model_reg.train_model(X_train=X_train, y_train=y_train)
    >> model_reg.evaluate_model(X_test=X_test, y_test=y_test)

  
