# CH08: Unit testing and refactoring

#### 1. 디자인 원칙과 단위 테스트
: 단위 테스트 : 다른 코드의 일부분이 유효한지 검사하는 코드

단위 테스트 특징
- 격리 : 독릭접이고 비즈니스 로직에 집중
- 성능 : 신속하게 실행 , 반복적으로 실행  -> 설계
- 자체 검증: 단위테스트 실행만으로 결과 결정 O

##### 1-2. 단위 테스트와 애자일 소프트웨어 개발

##### 1-3. 단위 테스트와 소프트웨어 디자인
: 단위 테스틑 기본 코드 보완 x -> 실제 코드의 작성 방식에 직접적인 영향 o


In [1]:
import logging as logger
import random

class MetricsClient:
    """3rd party 지표 전송 클라이언트"""

    def send(self, metric_name, metric_value):
        if not isinstance(metric_name, str):
            raise TypeError("expected type str for metric_name")

        if not isinstance(metric_value, str):
            raise TypeError("expected type str for metric_value")

        logger.info("sending %s = %s", metric_name, metric_value)


class Process:
    def __init__(self):
        self.client = MetricsClient()  # 3rd party 지표 전송 클라이언트

    def process_iterations(self, n_iterations):
        for i in range(n_iterations):
            result = self.run_process()
            self.client.send("iteration.{}".format(i), result)

    def run_process(self):
        return random.randint(1, 100)

In [None]:
class WrappedClient:
    """3rd party 라이브러리를 통제 하에 둘 수 있도록 하는 wrapper 객체"""

    def __init__(self):
        self.client = MetricsClient()

    def send(self, metric_name, metric_value):
        return self.client.send(str(metric_name), str(metric_value))


class Process:
    def __init__(self):
        self.client = MetricsClient()  # 3rd party 지표 전송 클라이언트

    def process_iterations(self, n_iterations):
        for i in range(n_iterations):
            result = self.run_process()
            self.client.send("iteration.{}".format(i), result)

    def run_process(self):
        return random.randint(1, 100)

In [2]:
from unittest import TestCase, main
from unittest.mock import Mock


class TestWrappedClient(TestCase):
    def test_send_converts_types(self):
        wrapped_client = WrappedClient()
        wrapped_client.client = Mock()
        wrapped_client.send("value", 1)

        wrapped_client.client.send.assert_called_with("value", "1")

#### 1-4. 테스트 경계 정하기
: 외부 의존성 깊게 고려 x -> 올바른 파라미터 사용해 호출하면 정상 실행 확인만 해도 충분

좋은 단위 테스트 패치를 적용하여 넘어가고 핵심 기능에 초점

#### 2. 테스트를 위한 프레임워크와 도구


##### 2-1. 단위 테스트 프레임워크와 라이브러리

- unittest   표준 라이브러리
- pytest    pip install pytest

In [3]:
from enum import Enum


class MergeRequestStatus(Enum):
    APPROVED = "approved"
    REJECTED = "rejected"
    PENDING = "pending"


class MergeRequest:
    """An entity abstracting a merge request."""

    def __init__(self):
        self._context = {"upvotes": set(), "downvotes": set()}

    @property
    def status(self):
        if self._context["downvotes"]:
            return MergeRequestStatus.REJECTED
        elif len(self._context["upvotes"]) >= 2:
            return MergeRequestStatus.APPROVED
        return MergeRequestStatus.PENDING

    def upvote(self, by_user):
        self._context["downvotes"].discard(by_user)
        self._context["upvotes"].add(by_user)

    def downvote(self, by_user):
        self._context["upvotes"].discard(by_user)
        self._context["downvotes"].add(by_user)

##### # unittest
: 자바 JUnit 기반  객체 지향적

단위테스트를 만들려면 unit.TestCase 를 상속하여 테스트 클래스 생성 후 테스트할 조건 정의  
메서드 test_ 로 시작

In [None]:
class BaseCase:
    """Base test suite."""

    def setUp(self):
        self.merge_request = self.mr_cls()

    def test_simple_rejected(self):
        self.merge_request.downvote("maintainer")
        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.REJECTED.value
        )

    def test_just_created_is_pending(self):
        self.assertEqual(
            self.mr_cls().status.value, MergeRequestStatus.PENDING.value
        )

    def test_pending_awaiting_review(self):
        self.merge_request.upvote("core-dev")
        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.PENDING.value
        )

    def test_approved(self):
        self.merge_request.upvote("dev1")
        self.merge_request.upvote("dev2")

        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.APPROVED.value
        )

    def test_no_double_approve(self):
        self.merge_request.upvote("dev1")
        self.merge_request.upvote("dev1")

        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.PENDING.value
        )

    def test_upvote_changes_to_downvote(self):
        self.merge_request.upvote("dev1")
        self.merge_request.upvote("dev2")
        self.merge_request.downvote("dev1")

        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.REJECTED.value
        )

    def test_downvote_to_upvote(self):
        self.merge_request.upvote("dev1")
        self.merge_request.downvote("dev2")
        self.merge_request.upvote("dev2")

        self.assertEqual(
            self.merge_request.status.value, MergeRequestStatus.APPROVED.value
        )

    def test_invalid_types(self):
        self.assertRaises(
            TypeError, self.merge_request.upvote, {"invalid-object"}
        )


class TestsUTFrameworks1(BaseCase, TestCase):
    mr_cls = MergeRequest

단위 테스트 API 다양한 메서드 제공
ex] 실제 실행 값과 예상 값을 비교하는 assertEquals(<actual>, <expected>[, message]).  


두 개의 새로운 상태(OPEN, CLOSED) 와 한 개의 새로운 메서드 close() 를 추가, 투표 

In [4]:
class MergeRequestExtendedStatus(Enum):
    APPROVED = "approved"
    REJECTED = "rejected"
    PENDING = "pending"
    OPEN = "open"
    CLOSED = "closed"


class MergeRequest:
    def __init__(self):
        self._context = {"upvotes": set(), "downvotes": set()}
        self._status = MergeRequestStatus.OPEN

    def close(self):
        self._status = MergeRequestStatus.CLOSED

    @property
    def status(self):
        if self._status == MergeRequestStatus.CLOSED:
            return self._status

        if self._context["downvotes"]:
            return MergeRequestStatus.REJECTED
        elif len(self._context["upvotes"]) >= 2:
            return MergeRequestStatus.APPROVED
        return MergeRequestStatus.PENDING

    def _cannot_vote_if_closed(self):
        if self._status == MergeRequestStatus.CLOSED:
            raise MergeRequestException("can't vote on a closed merge request")

    def upvote(self, by_user):
        self._cannot_vote_if_closed()

        self._context["downvotes"].discard(by_user)
        self._context["upvotes"].add(by_user)

    def downvote(self, by_user):
        self._cannot_vote_if_closed()

        self._context["upvotes"].discard(by_user)
        self._context["downvotes"].add(by_user)

유효성 검사 작동확인 assertRaises, assertRaisesRegex 메서드 사용

In [None]:
 class ExtendedCases:
    def test_cannot_upvote_on_closed_merge_request(self):
        self.merge_request.close()
        self.assertRaises(
            MergeRequestException, self.merge_request.upvote, "dev1"
        )

    def test_cannot_downvote_on_closed_merge_request(self):
        self.merge_request.close()
        self.assertRaisesRegex(
            MergeRequestException,
            "can't vote on a closed merge request",
            self.merge_request.downvote,
            "dev1",
        )
        

class TestsUTFrameworks2(BaseCase, ExtendedCases, TestCase):
    mr_cls = MergeRequest

##### 테스트 파라미터화
: 임계값을 변경하며 테스트

status 프로퍼티에서 종료여부 확인한후 코드 테스트

In [None]:
class AcceptanceThreshold:
    def __init__(self, merge_request_context: dict) -> None:
        self._context = merge_request_context

    def status(self):
        if self._context["downvotes"]:
            return MergeRequestStatus.REJECTED
        elif len(self._context["upvotes"]) >= 2:
            return MergeRequestStatus.APPROVED
        return MergeRequestStatus.PENDING


class MergeRequest:
    ...
    @property
    def status(self):
        if self._status == MergeRequestStatus.CLOSED:
            return self._status

        return AcceptanceThreshold(self._context).status()

In [None]:
class TestsUTFrameworks3(BaseCase, ExtendedCases, TestCase):
    mr_cls = MergeRequest

    def setUp(self):
        super().setUp()
        self.fixture_data = (
            (
                {"downvotes": set(), "upvotes": set()},
                MergeRequestStatus.PENDING,
            ),
            (
                {"downvotes": set(), "upvotes": {"dev1"}},
                MergeRequestStatus.PENDING,
            ),
            (
                {"downvotes": "dev1", "upvotes": set()},
                MergeRequestStatus.REJECTED,
            ),
            (
                {"downvotes": set(), "upvotes": {"dev1", "dev2"}},
                MergeRequestStatus.APPROVED,
            ),
        )

    def test_status_resolution(self):
        for context, expected in self.fixture_data:
            with self.subTest(context=context):
                status = AcceptanceThreshold(context).status()
                self.assertEqual(status.value, expected.value)

#### # pytest

In [8]:
def test_simple_rejected():
    merge_request = MergeRequest()
    merge_request.downvote("maintainer")
    assert merge_request.status == MergeRequestStatus.REJECTED


def test_just_created_is_pending():
    assert MergeRequest().status == MergeRequestStatus.PENDING


def test_pending_awaiting_review():
    merge_request = MergeRequest()
    merge_request.upvote("core-dev")
    assert merge_request.status == MergeRequestStatus.PENDING


def test_approved():
    merge_request = MergeRequest()
    merge_request.upvote("dev1")
    merge_request.upvote("dev2")

    assert merge_request.status == MergeRequestStatus.APPROVED


def test_no_double_approve():
    merge_request = MergeRequest()
    merge_request.upvote("dev1")
    merge_request.upvote("dev1")

    assert merge_request.status == MergeRequestStatus.PENDING


def test_upvote_changes_to_downvote():
    merge_request = MergeRequest()
    merge_request.upvote("dev1")
    merge_request.upvote("dev2")
    merge_request.downvote("dev1")

    assert merge_request.status == MergeRequestStatus.REJECTED


def test_downvote_to_upvote():
    merge_request = MergeRequest()
    merge_request.upvote("dev1")
    merge_request.downvote("dev2")
    merge_request.upvote("dev2")

    assert merge_request.status == MergeRequestStatus.APPROVED

결과가 참인지를 비교하는 것은 assert 구문만 사용하면 되지만, 예외의 발생 유무 검사와 같은 검사는 일부 함수를 사용해야 함

In [None]:
import pytest
def test_invalid_types():
    merge_request = MergeRequest()
    pytest.raises(TypeError, merge_request.upvote, {"invalid-object"})


def test_cannot_vote_on_closed_merge_request():
    merge_request = MergeRequest()
    merge_request.close()
    pytest.raises(MergeRequestException, merge_request.upvote, "dev1")
    with pytest.raises(
        MergeRequestException, match="can't vote on a closed merge request"
    ):
        merge_request.downvote("dev1")

##### 테스트 파라미터화
: unittest보다 좋음  cuz 테스트 조합마다 새로운 테스트 케이스를 생성

 pytest.mark.parameterize 데코레이터를 사용

In [None]:
@pytest.mark.parametrize(
    "context,expected_status",
    (
        ({"downvotes": set(), "upvotes": set()}, MergeRequestStatus.PENDING),
        (
            {"downvotes": set(), "upvotes": {"dev1"}},
            MergeRequestStatus.PENDING,
        ),
        ({"downvotes": "dev1", "upvotes": set()}, MergeRequestStatus.REJECTED),
        (
            {"downvotes": set(), "upvotes": {"dev1", "dev2"}},
            MergeRequestStatus.APPROVED,
        ),
    ),
)
def test_acceptance_threshold_status_resolution(context, expected_status):
    assert AcceptanceThreshold(context).status() == expected_status

| @pytest.mark.parameterize 반복 없애고 응집력있게 유지

##### 픽스쳐(Fixture)

: pytest  재사용 가능한 기능을 쉽게 만들 수 있음

-> 생성한 데이터나 객체 재사용해서 효율적으로 테스트

In [10]:
import pytest
@pytest.fixture
def rejected_mr():
    merge_request = MergeRequest()

    merge_request.downvote("dev1")
    merge_request.upvote("dev2")
    merge_request.upvote("dev3")
    merge_request.downvote("dev4")

    return merge_request


def test_simple_rejected(rejected_mr):
    assert rejected_mr.status == MergeRequestStatus.REJECTED


def test_rejected_with_approvals(rejected_mr):
    rejected_mr.upvote("dev2")
    rejected_mr.upvote("dev3")
    assert rejected_mr.status == MergeRequestStatus.REJECTED


def test_rejected_to_pending(rejected_mr):
    rejected_mr.upvote("dev1")
    assert rejected_mr.status == MergeRequestStatus.PENDING


def test_rejected_to_approved(rejected_mr):
    rejected_mr.upvote("dev1")
    rejected_mr.upvote("dev2")
    assert rejected_mr.status == MergeRequestStatus.APPROVED

#### 2-2.코드 커버리지

##### # 코드 커버리지 도구 설정
: pytest 의 경우 ptest-cov 패키지를 설치

###### # 코드 커버리지 사용 시 주의사항
: 라인이 인터프리트 되었다고 해서 적절히 테스트 되었다는 것을 의미 x

| 코드 사각지대를 찾기 위해 커버리 사용 but 궁극적인 목표 x

#### 2-3. 모의(mock) 객체


##### # 패치와 모의에 대한 주의사항
: 단한 테스트 케이스를 작성하기 위해 다양한 몽키 패치  
 또는 모의 객체를 생성해야 한다면 좋은 코드가 아니라는 신호





*몽키 패치: 런타임 주에 코드를 수정 (a.k.a: 게릴라(guerilla) 패치)

##### # Mock 객체 사용하기

##### Mock 객체의 종류
파이썬 표준 라이브러리 unittest.mock 에서 Mock 과 MagicMock 객체 제공

In [None]:
class GitBranch:
    def __init__(self, commits: List[Dict]):
        self._commits = {c["id"]: c for c in commits}

    def __getitem__(self, commit_id):
        return self._commits[commit_id]

    def __len__(self):
        return len(self._commits)


def author_by_id(commit_id, branch):
    return branch[commit_id]["author"]




 author_by_id 를 테스트

In [None]:
def test_find_commit():
    branch = GitBranch([{"id": "123", "author": "dev1"}])
    assert author_by_id("123", branch) == "dev1"


def test_find_any():
    author = author_by_id("123", Mock()) is not None
    ...


In [None]:
commit_id = '123', branch = <Mock id='4499646224'>

    def author_by_id(commit_id, branch):
>       return branch[commit_id]["author"]
E       TypeError: 'Mock' object is not subscriptable

#### 테스트 더블의 사용 예

In [13]:
"""Clean Code in Python - Chapter 8: Unit Testing

> Mock Objects

"""
from datetime import datetime

import requests

from og_src import constants as STATUS_ENDPOINT
# from constants import STATUS_ENDPOINT


class BuildStatus:
    """The CI status of a pull request."""

    @staticmethod
    def build_date() -> str:
        return datetime.utcnow().isoformat()

    @classmethod
    def notify(cls, merge_request_id, status):
        build_status = {
            "id": merge_request_id,
            "status": status,
            "built_at": cls.build_date(),
        }
        response = requests.post(STATUS_ENDPOINT, json=build_status)
        response.raise_for_status()
        return response


 외부 모듈의 의존성이 높다. 테스트할때 실제로 API 호출할 필요는 없고 호출이 잘되는지만 확인  
 시간값도 예측 힘듬 datetime C로 작성된 모듈 직접 패치 x -> build_date 정적 메서드 사용

In [None]:
#### 단위 테스트
from unittest import mock

from constants import STATUS_ENDPOINT
from mock_2 import BuildStatus


@mock.patch("mock_2.requests")
def test_build_notification_sent(mock_requests):
    build_date = "2018-01-01T00:00:01"
    with mock.patch("mock_2.BuildStatus.build_date", return_value=build_date):
        BuildStatus.notify(123, "OK")

    expected_payload = {"id": 123, "status": "OK", "built_at": build_date}
    mock_requests.post.assert_called_with(
        STATUS_ENDPOINT, json=expected_payload
    )

# 3. 리펠토링
: 보다 나은 코드를 만들려는 경우, 가독성 높일련느 경우 -> 수정작업 이전 이후가 완전히 동일한 기능 유지

##### 3-1. 코드의 진화
메서드를 더 작게 분리, 의존성 주입

In [None]:
from datetime import datetime
from constants import STATUS_ENDPOINT

class BuildStatus:

    endpoint = STATUS_ENDPOINT

    def __init__(self, transport):
        self.transport = transport

    @staticmethod
    def build_date() -> str:
        return datetime.utcnow().isoformat()

    def compose_payload(self, merge_request_id, status) -> dict:
        return {
            "id": merge_request_id,
            "status": status,
            "built_at": self.build_date(),
        }

    def deliver(self, payload):
        response = self.transport.post(self.endpoint, json=payload)
        response.raise_for_status()
        return response

    def notify(self, merge_request_id, status):
        return self.deliver(self.compose_payload(merge_request_id, status))

필요하면 교체된 텍스트 더블을 사용한 객체의 픽스처를 노출하는 것도 가능

In [None]:
@pytest.fixture
def build_status():
    bstatus = BuildStatus(Mock())
    bstatus.build_date = Mock(return_value="2018-01-01T00:00:01")
    return bstatus


def test_build_notification_sent(build_status):

    build_status.notify(1234, "OK")

    expected_payload = {
        "id": 1234,
        "status": "OK",
        "built_at": build_status.build_date(),
    }

    build_status.transport.post.assert_called_with(
        build_status.endpoint, json=expected_payload
    )

#### 3-2. 상용 코드만 진화하는 것이 아니다
: 모든 코드 가독성 높아야함


#### 4.단위 테스트에 대한 추가 논의

##### 4-1.속성 기반(Property-based) 테스트
: 테스트를 실패가 만드는 데이터를 찾는 것

hypothesis 라이브러리 사용

##### 4-2.변형 테스트

 자동화된 단위 테스트를 사용하는 중에 누군가 버그를 추가했다면  
  적어도 하나 이상의 테스트에서 이를 포착하여 테스트에 실패해야 한다

In [None]:
"""Clean Code in Python - Chapter 8: Unit Testing & Refactoring

> Mutation Testing
"""
from mrstatus import MergeRequestStatus as Status


def evaluate_merge_request(upvote_count, downvotes_count):
    if downvotes_count > 0:
        return Status.REJECTED
    if upvote_count >= 2:
        return Status.APPROVED
    return Status.PENDING

In [None]:
import unittest

from mrstatus import MergeRequestStatus as Status
from mutation_testing_1 import evaluate_merge_request


class TestMergeRequestEvaluation(unittest.TestCase):
    def test_approved(self):
        result = evaluate_merge_request(3, 0)
        self.assertEqual(result, Status.APPROVED)


if __name__ == "__main__":
    unittest.main()
