# AUA, DS 229 – MLOps
### Week 4 – Tests in Machine Learning
***

<center><img src="images/unit_integration_tests.jpeg" width="600" height="600"/></center>

<p><b>Unit tests</b></p> A unit test is the smallest and simplest form of software testing. These tests are employed to assess a separable unit of software, such as a class or function, for correctness independent of the larger software system that contains the unit. Unit tests are also employed as a form of specification to ensure that a function or module exactly performs the behavior required by the system. Unit tests are commonly used to introduce test-driven development concepts.

<p><b>Integration tests</b></p> Software components that pass individual unit tests are assembled into larger components. Engineers then run an integration test on an assembled component to verify that it functions correctly.

<p><b>System tests</b></p> A system test is the largest scale test that engineers run for an undeployed system. All modules belonging to a specific component, such as a server that passed integration tests, are assembled into the system. Then the engineer tests the end-to-end functionality of the system. System tests come in many different flavors:

- Smoke tests, in which engineers test very simple but critical behavior, are among the simplest types of system tests. Smoke tests are also known as sanity tests, and serve to short-circuit additional and more expensive testing.

- Performance tests are a common next step once basic correctness is established via a smoke test: they are another variant of a system test, written to ensure that the performance of the system stays acceptable over the duration of its lifecycle. Because response times for dependencies or resource requirements may change dramatically during the course of development, a system needs to be tested to make sure that it doesn’t become incrementally slower without anyone noticing (before it gets released to users). For example, a given program may evolve to need 32 GB of memory when it formerly only needed 8 GB, or a 10 ms response time might turn into 50 ms, and then into 100 ms. A performance test ensures that over time, a system doesn’t degrade or become too expensive.

- Regression tests are conducted to prevent bugs from sneaking back into the codebase. Regression tests can be analogized to a gallery of rogue bugs that historically caused the system to fail or produce incorrect results. By documenting these bugs as tests at the system or integration level, engineers refactoring the codebase can be sure that they don’t accidentally introduce bugs that they’ve already invested time and effort to eliminate.
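
As a minimal sketch of the regression-test idea, suppose a helper once crashed on empty input. The function `safe_mean` and its historical bug are made up for illustration, and the test is shown at the function level only to keep it short. The point is that the fixed bug stays pinned by a test.

```python
import unittest


def safe_mean(values):
    """Hypothetical helper: mean of a list, defined as 0.0 for empty input."""
    if not values:  # The (made-up) historical bug: this guard was missing.
        return 0.0
    return sum(values) / len(values)


class SafeMeanRegressionTests(unittest.TestCase):

    def test_empty_input_does_not_crash(self):
        # Pins the old ZeroDivisionError so that it cannot silently return.
        self.assertEqual(safe_mean([]), 0.0)

    def test_regular_input(self):
        self.assertAlmostEqual(safe_mean([1, 2, 3]), 2.0)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(SafeMeanRegressionTests)
regression_result = unittest.TextTestRunner(verbosity=2).run(suite)
```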

**Some benefits of writing unit tests:**
- You can catch bugs earlier in the development cycle. This matters because bugs that reach released code may inconvenience end-users and can cause major business losses.
- You can change your code with more confidence: if an upgrade or refactoring is done incorrectly, the tests will fail, and if it is done correctly, they will keep passing.

## Part 1: Introduction to [unittest](https://docs.python.org/3/library/unittest.html)

The basic building blocks of unit testing are **test cases**: single scenarios that must be set up and checked for correctness. In unittest, test cases are represented by **unittest.TestCase** instances. To make your own test cases you must write subclasses of TestCase. The testing code of a TestCase instance should be entirely self-contained, such that it can be run either in isolation or in arbitrary combination with any number of other test cases.

In [None]:
import unittest

In [None]:
class Rectangle:
    
    def __init__(self, width, height):
        self.width = width
        self.height = height
    
    def get_area(self):
        return self.width * self.height
    
    def get_perimeter(self):
        return 2 * (self.width+self.height)
    
    def get_wrong_perimeter(self):
        return self.width + self.height
    
    def __str__(self):
        return f"[Rectangle with width={self.width} and height={self.height}]"
    

In [None]:
rec = Rectangle(4, 5)
print("The area of", rec, "is:", rec.get_area())
print("The perimeter of", rec, "is:", rec.get_perimeter())

Let's start writing some tests for our rectangle!

In [None]:
class RectangleTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.rectangles = [Rectangle(w, h) for (w, h) in [(1, 2), (4, 8), (10, 15)]]

    # def setUp(self):
    #     self.rectangles = [Rectangle(w, h) for (w, h) in [(1, 2), (4, 8), (10, 15)]]

    def test_area(self):
        for rec in self.rectangles:
            area = rec.get_area()
            ground_truth = rec.width * rec.height
            self.assertEqual(area, ground_truth, f"{rec}'s area calculation is wrong, expected {ground_truth} but got {area}.")
    
    def test_perimeter(self):
        for rec in self.rectangles:
            perimeter = rec.get_perimeter()
            ground_truth = 2 * (rec.width+rec.height)
            self.assertEqual(perimeter, ground_truth, f"{rec}'s perimeter calculation is wrong, expected {ground_truth} but got {perimeter}.")
            

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

**setUp**: Method called to prepare the test fixture. This is called immediately before calling the test method.  
**setUpClass**: A class method called before tests in an individual class are run. setUpClass is called with the class as the only argument and must be decorated as a `classmethod()`.

The main difference between **setUpClass** and **setUp** is that the former is called only once and that is before all the tests, while **setUp** is called immediately before each and every test.

Now let's test the **get_wrong_perimeter** method.

In [None]:
class RectangleTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.rectangles = [Rectangle(w, h) for (w, h) in [(1, 2), (4, 8), (10, 15)]]
        
    def test_wrong_perimeter(self):
        for rec in self.rectangles:
            perimeter = rec.get_wrong_perimeter()
            ground_truth = 2 * (rec.width+rec.height)
            self.assertEqual(perimeter, ground_truth, f"{rec}'s perimeter calculation is wrong, expected {ground_truth} but got {perimeter}.")
            

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

When there are very small differences among your tests, for instance some parameters, unittest allows you to distinguish them inside the body of a test method using the **subTest()** context manager. Without using a subtest, *execution would stop after the first failure*, and the error would be less easy to diagnose.

In [None]:
class RectangleTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.rectangles = [Rectangle(w, h) for (w, h) in [(1, 2), (4, 8), (10, 15)]]
        
    def test_wrong_perimeter(self):
        for rec in self.rectangles:
            with self.subTest(rectangle=str(rec)):  # The keyword argument labels each subtest in the failure report.
                perimeter = rec.get_wrong_perimeter()
                ground_truth = 2 * (rec.width+rec.height)
                self.assertEqual(perimeter, ground_truth, f"{rec}'s perimeter calculation is wrong, expected {ground_truth} but got {perimeter}.")


if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

The TestCase class provides several assert methods to check for and report failures. The following table lists the most commonly used methods:

<center><img src="images/asserts.png" width="500" height="500"/></center>
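
For readers without access to the image, the sketch below exercises a handful of assert methods from the unittest documentation. The selection is an assumption about what is commonly needed, not a transcription of the table.

```python
import unittest


class CommonAssertsTests(unittest.TestCase):

    def test_common_asserts(self):
        self.assertEqual(2 + 2, 4)              # a == b
        self.assertNotEqual(2 + 2, 5)           # a != b
        self.assertTrue(isinstance(3, int))     # bool(x) is True
        self.assertFalse([])                    # bool(x) is False
        self.assertIsNone(None)                 # x is None
        self.assertIn(3, [1, 2, 3])             # a in b
        self.assertIsInstance("text", str)      # isinstance(a, b)
        self.assertAlmostEqual(0.1 + 0.2, 0.3)  # Equal up to 7 decimal places.

    def test_raises(self):
        # assertRaises checks that the block raises the given exception.
        with self.assertRaises(ZeroDivisionError):
            1 / 0


suite = unittest.defaultTestLoader.loadTestsFromTestCase(CommonAssertsTests)
asserts_result = unittest.TextTestRunner(verbosity=2).run(suite)
```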

<div class="alert alert-block alert-success">
    With <b>pytest</b> we can run parametrized tests and get rid of the <b>for</b> loop in testing methods. Let's talk about that in the next class.
</div>

#### Example

<div class="alert alert-block alert-danger">
<b>Action</b>:
    Restart the notebook kernel so that the tests from the previous problem are not run again.
</div> 

In [None]:
import unittest

In [None]:
# Returns a list of even numbers in the given range. Note that both bounds are inclusive.
def get_even_numbers(left=1, right=10, nums=[]):
    start = left if left % 2 == 0 else left + 1
    for n in range(start, right+1, 2):
        nums.append(n)
    return nums


# result = get_even_numbers(1, 6)
# result

In [None]:
class EvenNumbersTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.ranges = [(1, 2), (4, 8), (11, 15)]
        cls.ground_truths = [[2], [4, 6, 8], [12, 14]]
        
    def test_even_numbers(self):
        for idx, (left, right) in enumerate(self.ranges):
            with self.subTest():
                even_numbers = get_even_numbers(left, right)
                ground_truth = self.ground_truths[idx]
                self.assertListEqual(even_numbers, ground_truth)
            

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

#### Let's understand the problem.

In [None]:
def get_even_numbers(left=1, right=10, nums=[]):
    start = left if left % 2 == 0 else left + 1
    for n in range(start, right+1, 2):
        nums.append(n)
    return nums

In [None]:
result = get_even_numbers(1, 6)
result

In [None]:
result = get_even_numbers(20, 24)
result

<div class="alert alert-block alert-success">
    <b>Default argument values are evaluated only once, when the <code>def</code> statement is executed (for a module-level function, when the module is loaded), not on every call. This may cause problems if the default is a mutable object such as a list or a dictionary. If the function modifies the object (e.g., by appending an item to a list), the default value itself is modified and the change is visible in later calls.</b>
    
Recall that a mutable object is an object whose state can be modified after it is created.
</div>
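
The shared default can be inspected directly through the function's `__defaults__` attribute. The function `append_item` below is a made-up, deliberately buggy example.

```python
def append_item(item, bucket=[]):
    """Deliberately buggy: the default list is shared between calls."""
    bucket.append(item)
    return bucket


print(append_item.__defaults__)  # ([],)
first = append_item(1)
second = append_item(2)
print(append_item.__defaults__)  # ([1, 2],)
print(first is second)           # True
```

Both calls return the very same list object, which is why results from earlier calls leak into later ones.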

In [None]:
def get_even_numbers(left=1, right=10, nums=[]):
    start = left if left % 2 == 0 else left + 1
    for n in range(start, right+1, 2):
        nums += [n]  # Will this help?
    return nums


result = get_even_numbers(1, 6)
result = get_even_numbers(20, 24)
result

In [None]:
def get_even_numbers(left=1, right=10, nums=[]):
    start = left if left % 2 == 0 else left + 1
    for n in range(start, right+1, 2):
        nums = nums + [n]  # But what is the difference?
    return nums


result = get_even_numbers(1, 6)
result = get_even_numbers(20, 24)
result
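
One way to see the difference between the last two cells: `nums += [n]` extends the existing list in place, so the shared default list is still modified, while `nums = nums + [n]` builds a new list and rebinds the local name, leaving the default untouched. The two function names below are made up for this sketch.

```python
def extend_in_place(nums=[]):
    nums += [1]        # list.__iadd__: mutates the shared default list.
    return nums


def rebind_to_new_list(nums=[]):
    nums = nums + [1]  # list.__add__: creates a new list, the default stays empty.
    return nums


print(extend_in_place(), extend_in_place())        # [1, 1] [1, 1]
print(rebind_to_new_list(), rebind_to_new_list())  # [1] [1]
```

Rebinding happens to avoid the bug, but it copies the list on every iteration and hides the intent, so the `None` default is usually the clearer fix.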

In [None]:
# Recommended solution!
def get_even_numbers(left=1, right=10, nums: list = None):
    if nums is None:
        nums = []
    
    start = left if left % 2 == 0 else left + 1
    for n in range(start, right+1, 2):
        nums.append(n)
    return nums


result = get_even_numbers(1, 6)
result = get_even_numbers(20, 24)
result

In [None]:
class EvenNumbersTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.ranges = [(1, 2), (4, 8), (11, 15)]
        cls.ground_truths = [[2], [4, 6, 8], [12, 14]]
        
    def test_even_numbers(self):
        for idx, (left, right) in enumerate(self.ranges):
            with self.subTest(left=left, right=right):
                even_numbers = get_even_numbers(left, right)
                ground_truth = self.ground_truths[idx]
                self.assertListEqual(even_numbers, ground_truth)
            

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

## Part 2: Running speed tests with [cProfile](https://docs.python.org/3/library/profile.html) and [time](https://docs.python.org/3/library/time.html)

<div class="alert alert-block alert-danger">
<b>Action</b>:
    Restart the notebook kernel so that the tests from the previous problem are not run again.
</div> 

**Problem**: Implement Euclidean distance
$$
  d(a, b) = \sqrt{\sum_{i=1}^{n} (a_i - b_i)^2}
$$

In [None]:
import unittest
import math 


# Version 1:
def simple_euclidean_distance(a, b):
    distance = 0.0
    for e1, e2 in zip(a, b):
        distance += (e1 - e2) ** 2
    return math.sqrt(distance)


In [None]:
import cProfile
import numpy as np


n_trials = 200
length = 200000


with cProfile.Profile() as pr:
    for _ in range(n_trials):
        a, b = np.random.random_sample(size=length), np.random.random_sample(size=length)
        _ = simple_euclidean_distance(a, b)

pr.print_stats()

<ol>
<li><b>ncalls</b>: The number of calls made. When two numbers are shown (for example 3/1), the function recursed: the first is the total number of calls and the second is the number of primitive calls.</li>
<li><b>tottime</b>: Total time spent in the given function itself. Time spent in calls to sub-functions is excluded.</li>
<li><b>percall</b>: tottime divided by ncalls.</li>
<li><b>cumtime</b>: Unlike tottime, this is the cumulative time spent in this function and in all sub-functions it calls, from invocation until exit. This figure is accurate even for recursive functions.</li>
<li>The <b>percall</b> following cumtime is cumtime divided by the number of primitive calls. Primitive calls are the calls that were not induced via recursion.</li>
</ol>
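
When the report is long, it can help to sort it by one of these columns. The sketch below uses the standard `pstats` module to sort by cumulative time and keep the top rows. The workload `slow_sum` is a made-up toy function.

```python
import cProfile
import io
import pstats


def slow_sum(n):
    """Toy workload used only for this illustration."""
    total = 0
    for i in range(n):
        total += i * i
    return total


profiler = cProfile.Profile()
profiler.enable()
slow_sum(100_000)
profiler.disable()

# Collect the report in a string, sorted by cumulative time, top 5 rows only.
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```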

In [None]:
# Version 2:
def numpy_euclidean_distance(a, b):
    return np.sqrt(((a-b)**2).sum())


In [None]:
with cProfile.Profile() as pr:
    
    for _ in range(n_trials):
        a, b = np.random.random_sample(size=length), np.random.random_sample(size=length)
        _ = numpy_euclidean_distance(a, b)

pr.print_stats()

**NumPy uses vectorized implementations, which are much faster and more efficient than Python for-loops.**   
Recall that NumPy’s ND-arrays are homogeneous: an array can only contain data of a single type. For instance, an array can contain 8-bit integers or 32-bit floating point numbers, but not a mix of the two. This is in stark contrast to Python’s lists and tuples, which are entirely unrestricted in the variety of contents they can possess; a given list could simultaneously contain strings, integers, and other objects. This restriction on an array’s contents comes at a great benefit; in “knowing” that an array’s contents are homogeneous in data type, NumPy is able to delegate the task of performing mathematical operations on the array’s contents to optimized, compiled C code. This is a process that is referred to as vectorization. The outcome of this can be a tremendous speedup relative to the analogous computation performed in Python, which must painstakingly check the data type of every one of the items as it iterates over the arrays, since Python typically works with lists with unrestricted contents.

[Source: [“Vectorized” Operations: Optimized Computations on NumPy Arrays](https://www.pythonlikeyoumeanit.com/Module3_IntroducingNumpy/VectorizedOperations.html)]
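
As a complementary check, the sketch below first confirms that a loop version and a vectorized version agree, then times both with the standard `timeit` module. The function names and array size are chosen for illustration, and the measured times depend on the machine.

```python
import math
import timeit

import numpy as np


def loop_distance(a, b):
    # Pure-Python loop over the elements.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def vectorized_distance(a, b):
    # The same computation delegated to NumPy's compiled routines.
    return np.sqrt(((a - b) ** 2).sum())


rng = np.random.default_rng(0)
a, b = rng.random(10_000), rng.random(10_000)

# Both versions should agree up to floating-point rounding.
print(math.isclose(loop_distance(a, b), vectorized_distance(a, b), rel_tol=1e-9))

# Total time for 20 calls each.
loop_time = timeit.timeit(lambda: loop_distance(a, b), number=20)
vectorized_time = timeit.timeit(lambda: vectorized_distance(a, b), number=20)
print(f"loop: {loop_time:.4f}s, vectorized: {vectorized_time:.4f}s")
```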

In [None]:
import time


class FunctionExecutionSpeedTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.threshold = 1.0  # Execution should take no longer than a second.
        # Build the inputs eagerly (a list, not a generator) so that data generation is not timed.
        cls.data = [(np.random.random_sample(size=length), np.random.random_sample(size=length)) for _ in range(10)]
        
    def test_execution_time(self):
        start = time.perf_counter()  # Monotonic, high-resolution clock intended for timing.
        
        for (a, b) in self.data:
            _ = numpy_euclidean_distance(a, b)
            
        end = time.perf_counter()
        duration_in_sec = end - start
        self.assertLessEqual(duration_in_sec, self.threshold)
        

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
    

## Part 3: An example of running a data engineering test

<div class="alert alert-block alert-danger">
<b>Action</b>:
    Restart the notebook kernel so that the tests from the previous problem are not run again.
</div> 

In [None]:
import pandas as pd
import numpy as np
import unittest


data = [
    {"product_id": "p1", "count": 3, "price": 20},
    {"product_id": "p1", "count": 1, "price": 23},
    {"product_id": "p2", "count": 1, "price": 73},
    {"product_id": "p2", "count": 1, "price": 34},
    {"product_id": "p2", "count": 2, "price": 55},
    {"product_id": "p3", "count": 1, "price": 20},
    {"product_id": "p4", "count": 1, "price": 71},
    {"product_id": "p4", "count": 1, "price": 73},
    {"product_id": "p4", "count": 5, "price": 34},
    {"product_id": "p4", "count": 3, "price": 55}
]


dataset = pd.DataFrame(data)
dataset

**Task**  
For each row, repeat the `price` as many times as its `count` value, collect the repeated prices into one list per product (as shown below), and compute the median price for each product.

| product_id | prices |
| --- | --- |
| p1 | [20, 20, 20, 23] | 
| p2 | [34, 55, 55, 73] | 
| p3 | [20] | 
| p4 | [34, 34, 34, 34, 34, 55, 55, 55, 71, 73] | 
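
The expansion step can be sketched for a single product with `np.repeat`, which repeats each element by its own count. Only the two rows of `p1` are used here.

```python
import numpy as np

# Rows of product p1 from the dataset above.
prices = np.array([20, 23])
counts = np.array([3, 1])

expanded = np.repeat(prices, counts)
print(expanded.tolist())    # [20, 20, 20, 23]
print(np.median(expanded))  # 20.0
```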

#### Solution 1

In [None]:
# 1) Loop over unique product ids.
# 2) Select count and price corresponding to a particular product.
# 3) Repeat prices by their corresponding counts.
# 4) Construct a numpy array and calculate median for a specific product, store the result.
# 5) Go to step 2.

def solve_with_loop(df):
    product2median = {}
    for p_id in df["product_id"].unique():
        info = df[df["product_id"] == p_id][["count", "price"]]
        expanded_info = pd.DataFrame(info.values.repeat(info["count"].values, axis=0), columns=["count", "price"])
        expanded_values = expanded_info["price"].values
        product2median[p_id] = np.median(expanded_values)
    
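    # Convert the dict views to lists so the DataFrame constructor receives plain sequences.
    result = pd.DataFrame({"product_id": list(product2median.keys()), "median_price": list(product2median.values())})
    return result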


result = solve_with_loop(dataset)
result

#### Solution 2

In [None]:
# 1) Repeat indices by 'count' number of times.
# 2) Select the rows specified by indices (note that if an index is repeated twice, then that row will also be selected twice).
# 3) Group by product ids, select price and apply median to it (reset indices as otherwise product_id will be index as a result of groupby). 

def solve_without_loop(df):
    repeated_data = df.loc[df.index.repeat(df["count"])].reset_index(drop=True)
    result = repeated_data.groupby(by="product_id")[["price"]].agg(median_price=("price", "median")).reset_index()
    return result

# expanded_data = repeated_data.groupby(by="product_id")[["price"]].agg(list).reset_index()
result = solve_without_loop(dataset)
result

#### Write tests for comparing pandas dataframes.

In [None]:
from pandas.testing import assert_frame_equal


class PriceMedianTests(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        
        # Mock data.
        data = [
            {"product_id": "p1", "timestamp": 1665393218, "count": 3, "price": 20},
            {"product_id": "p1", "timestamp": 11665306219, "count": 1, "price": 23},
            {"product_id": "p2", "timestamp": 1665395718, "count": 1, "price": 73},
            {"product_id": "p2", "timestamp": 1665385218, "count": 1, "price": 34},
            {"product_id": "p2", "timestamp": 1665397658, "count": 2, "price": 55},
            {"product_id": "p3", "timestamp": 1665393218, "count": 1, "price": 20},
            {"product_id": "p4", "timestamp": 1665393218, "count": 1, "price": 71},
            {"product_id": "p4", "timestamp": 1665386548, "count": 1, "price": 73},
            {"product_id": "p4", "timestamp": 1665757648, "count": 5, "price": 34},
            {"product_id": "p4", "timestamp": 11665246539, "count": 3, "price": 55}
        ]
        
        gr = [
            {"product_id": "p1", "median_price": 20.0},
            {"product_id": "p2", "median_price": 55.0},
            {"product_id": "p3", "median_price": 20.0},
            {"product_id": "p4", "median_price": 44.5}
        ]

        cls.dataset = pd.DataFrame(data)
        cls.ground_truth = pd.DataFrame(gr)
        
    def test_median_price(self):
        result = solve_without_loop(self.dataset)
        assert_frame_equal(result, self.ground_truth)
        

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)
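
Note that `assert_frame_equal` compares dtypes as well as values by default, which is presumably why the ground truth above stores the medians as floats (`20.0`, not `20`). A minimal sketch with two made-up one-row frames:

```python
import pandas as pd
from pandas.testing import assert_frame_equal

expected = pd.DataFrame({"product_id": ["p1"], "median_price": [20.0]})  # float64
as_int = pd.DataFrame({"product_id": ["p1"], "median_price": [20]})      # int64

try:
    assert_frame_equal(as_int, expected)
    dtype_mismatch_detected = False
except AssertionError:
    dtype_mismatch_detected = True

print(dtype_mismatch_detected)  # True

# With check_dtype=False only the values are compared, so this passes.
assert_frame_equal(as_int, expected, check_dtype=False)
```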
    

## References
- [The ML Test Score: A Rubric for ML Production Readiness and Technical Debt Reduction](https://research.google/pubs/pub46555/)
- [Testing for Reliability](https://sre.google/sre-book/testing-reliability/)
- [unittest — Unit testing framework](https://docs.python.org/3/library/unittest.html)