# In [1]:
from audioop import mul
import numpy as np
from abc import abstractmethod
from collections import defaultdict
from typing import Dict, Optional
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score, log_loss
from soupsieve import select
"""
Two objectives:
    1. Maximize E(success)
    2. Minimize E(cost)
Adjust the incentive amount using a variety of methods in order to find the
model that best achieves these objectives.
"""

class BaseIncentive:
    """Common interface for incentive-selection strategies.

    Holds the candidate incentive amounts and a seeded random generator.
    The three strategy methods are marked with ``@abstractmethod``; because
    this class does not use ``ABCMeta``, the marker is not enforced and the
    placeholder implementations simply return ``None``.
    """

    def __init__(
        self,
        incentives: np.ndarray,
        random_state: int = None,
        **kwargs
    ) -> None:
        """Store the candidate incentives and build the random generator.

        Args:
            incentives: Candidate incentive amounts (converted with
                ``np.asarray``).
            random_state: Seed passed to ``np.random.default_rng``.
            **kwargs: Accepted and ignored.
        """
        self._random = np.random.default_rng(random_state)
        self._incentives = np.asarray(incentives)

    @abstractmethod
    def expect(self, incentive: float, context: any=None) -> float:
        """Return the strategy's estimate for ``incentive`` under ``context``."""
        return None

    @abstractmethod
    def choose(self, context: any=None) -> float:
        """Return the incentive to offer under ``context``."""
        return None

    @abstractmethod
    def update(self, incentive: float, response: float, context: any=None) -> Optional[Dict[str, any]]:
        """Record the observed ``response`` to ``incentive`` under ``context``."""
        return None


class LogisticRegressionIncentive(BaseIncentive):
    """Choose incentives from a logistic-regression model of past responses.

    Every ``update`` call records ``(context, incentive, response)``. When the
    windowed history holds more than one distinct response value, a logistic
    regression is fitted on the incentive amount plus a one-hot encoding of
    the context. ``choose`` then returns the candidate incentive whose
    predicted success probability is closest to ``expected_likelihood``.
    """

    def __init__(
        self,
        incentives: np.ndarray,
        incentive_default: float,
        expected_likelihood: float,
        window_size: int,
        allow_exploration: bool = False,
        random_state: int = None,
        **kwargs
    ) -> None:
        """Set up the strategy without fitting any model.

        Args:
            incentives: Candidate incentive amounts.
            incentive_default: Incentive returned while nothing is recorded.
            expected_likelihood: Target success probability used by ``choose``.
            window_size: Number of most recent records to use; a value of
                zero or less means the full history is used.
            allow_exploration: When the fitted incentive coefficient is below
                the threshold used in ``choose``, pick a random candidate
                instead of the minimum one.
            random_state: Seed for the random generator.
            **kwargs: Forwarded to ``BaseIncentive``.
        """
        super().__init__(incentives, random_state, **kwargs)

        self._incentive_default = incentive_default
        self._expected_likelihood = expected_likelihood
        self._window_size = window_size
        self._allow_exploration = allow_exploration

        self._lr = None
        self._encoder = None
        self._histories = []

    @staticmethod
    def _make_encoder() -> OneHotEncoder:
        """Build a dense one-hot encoder across scikit-learn versions."""
        try:
            # Newer scikit-learn releases use ``sparse_output``.
            return OneHotEncoder(sparse_output=False, dtype=np.int32, drop=None)
        except TypeError:
            # Older releases only know the ``sparse`` keyword.
            return OneHotEncoder(sparse=False, dtype=np.int32, drop=None)

    @property
    def histories_(self):
        """Recorded ``(context, incentive, response)`` tuples inside the window."""
        return self._histories[-self._window_size:] if self._window_size > 0 else self._histories

    @property
    def coef_incentive_(self):
        """Fitted coefficient of the incentive feature, or NaN before fitting."""
        if self._lr is not None:
            # The incentive is the first column of the design matrix.
            return np.ravel(self._lr.coef_)[0]
        return np.nan

    def expect(self, incentive: float, context: any = None) -> float:
        """Predict the probability of the positive class for one incentive.

        A context that is not among the encoder's fitted categories is
        encoded as an all-zero vector.

        Raises:
            AssertionError: If no model has been fitted yet.
        """
        assert self._lr is not None, 'no model has been fitted yet'
        assert self._encoder is not None, 'no encoder has been fitted yet'

        categories = self._encoder.categories_[0]
        if context not in categories:
            context = np.zeros((1, len(categories)))
        else:
            context = self._encoder.transform(np.asarray([[context]], dtype=object))

        incentive = np.asarray([[incentive]])
        X = np.concatenate((incentive, context), axis=1)
        return np.ravel(self._lr.predict_proba(X=X)[:, 1]).item(0)

    def choose(self, context: any = None) -> float:
        """Return the incentive to offer under ``context``.

        Without a fitted model: the default incentive when nothing is
        recorded, the minimum candidate when every recorded response is 1.0,
        the maximum candidate when every recorded response is 0.0, and the
        default incentive otherwise. With a fitted model: the candidate whose
        predicted probability is closest to the expected likelihood, ties
        broken at random.
        """
        if self._lr is None or self._encoder is None:
            responses = np.asarray([r for _, _, r in self.histories_])

            if len(responses) == 0:
                return self._incentive_default
            if np.all(responses == 1.0):
                return np.min(self._incentives)
            if np.all(responses == 0.0):
                return np.max(self._incentives)
            # No model and responses that are neither all 1.0 nor all 0.0:
            # fall back to the default instead of failing inside expect().
            return self._incentive_default

        # The fitted incentive coefficient is below the threshold: explore at
        # random if allowed, otherwise use the minimum candidate.
        if self.coef_incentive_ < 1e-3:
            if self._allow_exploration:
                return self._random.choice(self._incentives)
            return np.min(self._incentives)

        probs = np.asarray([
            self.expect(incentive=incentive, context=context) for incentive in self._incentives
        ])
        diff = np.abs(probs - self._expected_likelihood)

        # Candidates whose prediction is closest to the target likelihood.
        opt_incentives = self._incentives[np.where(diff == np.min(diff))]

        return self._random.choice(opt_incentives)

    def update(self, incentive: float, response: float, context: any = None) -> Optional[Dict[str, any]]:
        """Record one observation and refit the model on the windowed history.

        Returns:
            ``None`` when the window holds a single distinct response value
            (nothing is fitted and any previously fitted model is kept);
            otherwise a dict with the in-sample ``'accuracy'`` and
            ``'logloss'`` of the refitted model.
        """
        self._histories.append((context, incentive, response))

        histories = self.histories_
        responses = np.asarray([r for _, _, r in histories])

        if len(np.unique(responses)) == 1:
            return None

        contexts = np.asarray([[c] for c, _, _ in histories])
        incentives = np.asarray([[v] for _, v, _ in histories])

        self._encoder = self._make_encoder().fit(contexts)

        X = np.concatenate((incentives, self._encoder.transform(contexts)), axis=1)
        self._lr = LogisticRegression(C=0.1, tol=1e-4, solver='lbfgs', penalty='l2').fit(X, responses)

        y_prob = self._lr.predict_proba(X)[:, 1]  # probability of the second class
        y_pred = self._lr.predict(X)  # predicted class labels

        return {'accuracy': accuracy_score(responses, y_pred), 'logloss': log_loss(responses, y_prob)}


class FixedIncentive(BaseIncentive):
    """Strategy that always offers the same, preconfigured incentive."""

    def __init__(
        self,
        incentives: np.ndarray,
        incentive_default: float,
        random_state: int = None,
        **kwargs
    ) -> None:
        """Remember ``incentive_default`` as the value ``choose`` returns."""
        super().__init__(incentives, random_state, **kwargs)
        self._incentive_default = incentive_default

    def expect(self, incentive: float, context: any = None) -> float:
        """Return NaN for every incentive and context."""
        return np.nan

    def choose(self, context: any = None) -> float:
        """Return the configured default incentive, ignoring ``context``."""
        fixed = self._incentive_default
        return fixed

    def update(self, incentive: float, response: float, context: any = None) -> Optional[Dict[str, any]]:
        """Ignore the observation and return ``None``."""
        return None


class RandomIncentive(BaseIncentive):
    """Strategy that draws an incentive at random from the candidates."""

    def __init__(
        self,
        incentives: np.ndarray,
        random_state: int = None,
        **kwargs
    ) -> None:
        """Forward all arguments to ``BaseIncentive``."""
        super().__init__(incentives, random_state, **kwargs)

    def expect(self, incentive: float, context: any = None) -> float:
        """Return one divided by the number of candidate incentives."""
        n_candidates = len(self._incentives)
        return 1.0 / n_candidates

    def choose(self, context: any = None) -> float:
        """Return one candidate drawn with the instance's random generator."""
        picked = self._random.choice(self._incentives)
        return picked

    def update(self, incentive: float, response: float, context: any = None) -> Optional[Dict[str, any]]:
        """Ignore the observation and return ``None``."""
        return None


class ThompsonSamplingIncentive(BaseIncentive):
    """Thompson-sampling incentive selection with per-context Beta arms.

    For every context, ``self._arms[context]`` is an array with one
    ``(a, b)`` row per candidate incentive. ``expect`` draws from
    ``Beta(a + 1, b + 1)``. This class does not override ``update``; the
    counts are only modified by code outside this class (for example
    subclasses that implement ``update``).
    """

    def __init__(
        self,
        incentives: np.ndarray,
        random_state: int=None,
        is_optimistic: bool=False,
        multi_objective: str=None,
        **kwargs
    ) -> None:
        """Validate the options and initialise empty arms.

        Args:
            incentives: Candidate incentive amounts.
            random_state: Seed for the random generator.
            is_optimistic: If true, a sampled value is never allowed to fall
                below the arm's observed ratio ``a / (a + b)``.
            multi_objective: ``None`` for single-objective selection, or one
                of ``'random'``, ``'success'``, ``'cost'`` to pick among the
                Pareto-optimal arms.
            **kwargs: Forwarded to ``BaseIncentive``.

        Raises:
            ValueError: If ``multi_objective`` is not ``None`` and not one of
                the accepted strings.
        """
        super().__init__(incentives, random_state, **kwargs)

        if multi_objective is not None and multi_objective not in ('random', 'success', 'cost'):
            raise ValueError('the argument, "multi_objective", should be one of "random", "success", or "cost".')

        self._is_optimistic = is_optimistic
        self._multi_objective = multi_objective

        self._init_arms()

    def expect(self, incentive: float, context: any = None) -> float:
        """Sample a success probability for ``incentive`` under ``context``.

        Raises:
            IndexError: If ``incentive`` is not one of the candidates.
        """
        i = np.flatnonzero(self._incentives == incentive).item(0)
        a, b = self._arms[context][i]
        theta = self._random.beta(a + 1, b + 1)
        # With no observations a / (a + b) is 0 / 0 (NaN), which would poison
        # every comparison downstream; only apply the floor once data exists.
        if self._is_optimistic and a + b > 0:
            theta = max(theta, a / (a + b))
        return theta

    def choose(self, context: any = None) -> float:
        """Return an incentive for ``context``.

        Uses the Pareto-based selection when ``multi_objective`` is set,
        otherwise the arm with the largest sampled value.
        """
        return self._choose_multi(context=context) if self._multi_objective else self._choose_single(context)

    def _init_arms(self):
        """Reset all arms; unseen contexts start with zero counts."""
        self._arms = defaultdict(lambda: np.zeros((len(self._incentives), 2)))

    def _choose_single(self, context: any = None) -> float:
        """Return the incentive with the largest sample, ties broken at random."""
        expects = np.asarray([
            self.expect(incentive=incentive, context=context) for incentive in self._incentives
        ])
        opts = self._incentives[expects == np.max(expects)]
        return self._random.choice(opts)

    def _choose_multi(self, context: any = None) -> float:
        """Return an incentive from the Pareto frontier of (success, cost).

        Success is the sampled probability (maximised); cost is that
        probability times the incentive amount (minimised). Among the
        frontier, ``'random'`` picks uniformly, ``'success'`` picks the
        highest success, and ``'cost'`` picks the lowest cost.
        """
        obj_success_ratio = np.asarray([
            self.expect(incentive=incentive, context=context) for incentive in self._incentives
        ])
        obj_cost = obj_success_ratio * self._incentives
        orders = np.asarray([1, -1])
        objectives = np.column_stack((obj_success_ratio, obj_cost))
        I_opt = self._find_fareto_frontiers(objectives, orders)

        if len(I_opt) == 0:
            return self._random.choice(self._incentives)

        if self._multi_objective == 'random':
            i_opt = self._random.choice(I_opt)
        elif self._multi_objective == 'success':
            # argmax is a position inside I_opt; map it back to an arm index.
            i_opt = I_opt[np.argmax(obj_success_ratio[I_opt])]
        else:
            # 'cost': same mapping back from frontier position to arm index.
            i_opt = I_opt[np.argmin(obj_cost[I_opt])]
        return self._incentives[i_opt]

    @staticmethod
    def _signed_gain(x: int, y: int, objectives: np.ndarray, orders: np.ndarray) -> np.ndarray:
        """Per-objective advantage of row ``x`` over row ``y``.

        Positive entries mean ``x`` is better on that objective, taking each
        objective's own direction (``orders > 0`` maximise, else minimise).
        """
        sign = np.where(np.asarray(orders) > 0, 1.0, -1.0)
        return (objectives[x] - objectives[y]) * sign

    def _is_dominated(self, x: int, y: int, objectives: np.ndarray, orders: np.ndarray) -> bool:
        """Return True when row ``x`` dominates row ``y``.

        ``x`` is strictly better on at least one objective and at least as
        good on every other one.
        """
        gain = self._signed_gain(x, y, objectives, orders)
        return bool(np.all(gain >= 0) and np.any(gain > 0))

    def _is_incomparable(self, x: int, y: int, objectives: np.ndarray, orders: np.ndarray) -> bool:
        """Return True when ``x`` is better on one objective and worse on another."""
        gain = self._signed_gain(x, y, objectives, orders)
        return bool(np.any(gain > 0) and np.any(gain < 0))

    def _find_fareto_frontiers(self, objectives: np.ndarray, orders: np.ndarray) -> np.ndarray:
        """Return the row indices of the Pareto frontier.

        A row is on the frontier when no other row dominates it. Rows with
        identical objective values do not dominate each other, so duplicates
        of a frontier point are all kept.
        """
        n = len(objectives)
        frontiers = [
            i for i in range(n)
            if not any(
                self._is_dominated(j, i, objectives, orders)
                for j in range(n) if j != i
            )
        ]
        return np.asarray(frontiers, dtype=int)


class SlidingWindowTSIncentive(ThompsonSamplingIncentive):
    """Thompson sampling whose arm counts come only from a recent window.

    Every ``update`` rebuilds all arms from the windowed history, so
    observations that fall out of the window no longer contribute.
    """

    def __init__(
        self,
        incentives: np.ndarray,
        window_size: int,
        random_state: int = None,
        is_optimistic: bool = False,
        multi_objective: str = None,
        **kwargs
    ) -> None:
        """Create the strategy with an empty history.

        Args:
            incentives: Candidate incentive amounts.
            window_size: Number of most recent records to count; a value of
                zero or less means the full history is used.
            random_state: Seed for the random generator.
            is_optimistic: Forwarded to ``ThompsonSamplingIncentive``.
            multi_objective: Forwarded to ``ThompsonSamplingIncentive``.
            **kwargs: Forwarded to ``ThompsonSamplingIncentive``.
        """
        super().__init__(incentives, random_state, is_optimistic, multi_objective, **kwargs)
        self._window_size = window_size
        self._histories = []

    @property
    def histories_(self):
        """Recorded ``(context, incentive, response)`` tuples inside the window."""
        return self._histories[-self._window_size:] if self._window_size > 0 else self._histories

    def update(self, incentive: float, response: float, context: any = None) -> Optional[Dict[str, any]]:
        """Record one observation and recount every arm from the window.

        A response equal to 1.0 counts as a success and one equal to 0.0 as a
        failure; any other value is recorded but not counted. Contexts are
        used as dictionary keys directly rather than going through
        ``np.unique``, which has to sort them and coerces mixed types.
        """
        self._histories.append((context, incentive, response))
        self._init_arms()

        for ctx, inc, res in self.histories_:
            counts = self._arms[ctx]  # creates the zero-count arms on first use
            matching = self._incentives == inc
            if res == 1.0:
                counts[matching, 0] += 1
            elif res == 0.0:
                counts[matching, 1] += 1
        return None


class DecayTSIncentive(ThompsonSamplingIncentive):
    """Thompson sampling whose arm counts are scaled by a factor on each update."""

    def __init__(
        self,
        incentives: np.ndarray,
        decay_factor: float,
        random_state: int = None,
        is_optimistic: bool = False,
        multi_objective: str = None,
        **kwargs
    ) -> None:
        """Store ``decay_factor`` and forward the rest to the parent class."""
        super().__init__(incentives, random_state, is_optimistic, multi_objective, **kwargs)
        self._decay_factor = decay_factor

    def update(self, incentive: float, response: float, context: any = None) -> Optional[Dict[str, any]]:
        """Scale all existing counts, then add the new observation.

        Every context's counts are multiplied by the decay factor first. The
        arm for ``incentive`` under ``context`` then gains ``response`` on its
        first count and ``1.0 - response`` on its second.

        Raises:
            IndexError: If ``incentive`` is not one of the candidates.
        """
        factor = self._decay_factor
        # Snapshot the items so the mapping is not iterated while reassigned.
        for key, counts in list(self._arms.items()):
            self._arms[key] = counts * factor

        arm = np.flatnonzero(self._incentives == incentive).item(0)
        successes, failures = self._arms[context][arm]
        self._arms[context][arm] = (successes + response, failures + (1.0 - response))