In [1]:
import numpy as np
import pandas as pd
from copy import deepcopy
from anytree import NodeMixin, RenderTree

### 노드 IO Type 정의

- Node의 IO Type을 정의해줘야 입출력 타입에 맞춰서 이어붙인다.

In [2]:
class NodeIOTypes:
    """
    Node의 IO 타입 정의
    """

    SCALAR = "scalar"
    SERIES = "series"

In [3]:
class Node(NodeMixin):
    """
    Node 베이스 클래스
    """
    def __init__(self,
                 max_n_childs: int,
                 input_type: NodeIOTypes,
                 output_type: NodeIOTypes,
                 ):

        self.data = None
        self.parent = None
        self.childs = []

        self.input_type = input_type
        self.output_type = output_type
        self.max_n_childs = max_n_childs

    def __repr__(self):
        return self.name

    @property
    def name(self):
        return type(self).__name__ + '()'

    @property
    def is_full(self):
        return len(self.childs) >= self.max_n_childs

    def add_child(self, child: NodeMixin):
        child.set_parent(self)
        self.childs.append(child)

    def activate(self, *inputs):
        raise NotImplementedError

    def set_parent(self, parent):
        self.parent = parent

    def set_data(self, data):
        self.data = data

    def get_data(self):
        return self.data

    def copy(self):
        return deepcopy(self)

    def propagate(self):
        if not self.is_full:
            raise ValueError("can't propagate through this node if not node.full")
        return self.activate(*[c.propagate() for c in self.childs])

### 연산자 노드 클래스

- 필요한 연산자들을 정의한다.
- 여기서 여러가지 기술적 지표 및 재무 팩터 계산을 위한 연산자 등이 자유롭게 정의되어야 한다.

In [4]:
class SERIES(Node):
    '''
    Series Leaf Node
    '''
    def __init__(self, series:pd.Series):
        super(SERIES, self).__init__(
            input_type = None,
            output_type = NodeIOTypes.SERIES,
            max_n_childs = 0,
        )

        self.series = series
        self.set_data(series)

    @property
    def name(self):
        return f"SERIES[{len(self.series)}]"

    def activate(self):
        return self.get_data()


class MEAN(Node):
    '''
    (a + b) / 2
    '''
    def __init__(self):
        super(MEAN, self).__init__(
            input_type = NodeIOTypes.SCALAR,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 2,
        )

    def activate(self, *inputs):
        return (inputs[0] + inputs[1]) / 2


class AVG(Node):
    '''
    Average
    '''
    def __init__(self):
        super(AVG, self).__init__(
            input_type = NodeIOTypes.SERIES,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 1,
        )

    def activate(self, series: pd.Series):
        return pd.Series([series.mean()])


class MAX(Node):
    '''
    Maximum
    '''
    def __init__(self):
        super(MAX, self).__init__(
            input_type = NodeIOTypes.SERIES,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 1,
        )

    def activate(self, series: pd.Series):
        return pd.Series([series.max()])


class MIN(Node):
    '''
    Minimum
    '''
    def __init__(self):
        super(MIN, self).__init__(
            input_type = NodeIOTypes.SERIES,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 1,
        )

    def activate(self, series: pd.Series):
        return pd.Series([series.min()])


class MEDIAN(Node):
    '''
    Median
    '''
    def __init__(self):
        super(MEDIAN, self).__init__(
            input_type = NodeIOTypes.SERIES,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 1,
        )

    def activate(self, series:pd.Series):
        return pd.Series([series.median()])


class QUANTILE(Node):
    '''
    Quantile
    '''
    def __init__(self, percent):
        super(QUANTILE, self).__init__(
            input_type = NodeIOTypes.SERIES,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 1,
        )

        self.percent = percent

    @property
    def name(self):
        return type(self).__name__ + f'[{self.percent}]'

    def activate(self, series: pd.Series):
        return pd.Series([series.quantile(self.percent)])


class LARGER(Node):
    '''
    a > b -> a
    '''
    def __init__(self):
        super(LARGER, self).__init__(
            input_type = NodeIOTypes.SCALAR,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 2,
        )

    def activate(self, *inputs):

        a = inputs[0].min()
        b = inputs[1].min()

        return pd.Series([max([a, b])])


class SMALLER(Node):
    '''
    a > b -> b
    '''
    def __init__(self):
        super(SMALLER, self).__init__(
            input_type = NodeIOTypes.SCALAR,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 2,
        )

    def activate(self, *inputs):

        a = inputs[0].min()
        b = inputs[1].min()

        return pd.Series([min([a, b])])


class PLUSMINUS(Node):
    '''
    (a - b) + c
    '''
    def __init__(self):
        super(PLUSMINUS, self).__init__(
            input_type = NodeIOTypes.SCALAR,
            output_type = NodeIOTypes.SCALAR,
            max_n_childs = 3,
        )

    def activate(self, *inputs) -> pd.Series:
        return inputs[0] - inputs[1] + inputs[2]

In [5]:
data = pd.Series([1, 2, 3, 4, 5])

seriesNode = SERIES(data)

Q3Node = QUANTILE(0.75)
Q1Node = QUANTILE(0.25)
plusminusNode = PLUSMINUS()
largerNode = LARGER()
smallerNode = SMALLER()
medianNode = MEDIAN()
avgNode = AVG()
maxNode = MAX()
minNode = MIN()
meanNode = MEAN()

nodes = [
    Q3Node,
    Q1Node,
    plusminusNode,
    largerNode,
    smallerNode,
    medianNode,
    avgNode,
    maxNode,
    minNode,
    meanNode,
    seriesNode,
]

Q3Node

QUANTILE[0.75]

## 연산자를 이어붙일 Binary Tree 객체

In [6]:
class Tree:
    """
    트리 객체
    """
    def __init__(self, name="tree"):
        self._root = None
        self.nodes = []
        self.node_names = []
        self.name = name

    def __call__(self, *args, **kwargs):
        return self.evaluate()

    def __repr__(self):
        return f'[{self.name}]'

    @property
    def root(self):
        return self._root

    @property
    def current(self):
        return self._currnode

    @property
    def iscompleted(self):
        if (self._currnode.max_n_childs > 0) & (not self._currnode.is_full):
            return False
        temp = self._currnode

        while True:
            if (self._currnode.max_n_childs > 0) & (not temp.is_full):
                return False
            if temp.parent is None:
                return True
            temp = temp.parent


    def insert(self, node: Node):
        node = node.copy()

        # 최초 삽입 시 root 노드로 지정
        if not self._root:
            self._root = node
            self.nodes.append(self._root)
            self.node_names.append(self._root.name)
            self._currnode = self._root
            return

        if not self._currnode.is_full:
            self._currnode.add_child(node)
            self.nodes.append(node)
            self.node_names.append(node.name)

            if node.max_n_childs > 0:
                self._currnode = node

            else:
                while self._currnode.is_full:
                    if self._currnode.parent is None:
                        break
                    self._currnode = self._currnode.parent

        else:
            self._currnode = self._currnode.parent
            self.insert(node)

    def evaluate(self):
        return self.root.propagate()

    def render(self):
        string = "\n".join(
            [f"{pre}{node.name}" for pre, _, node in RenderTree(self.root)]
        )
        print(string)

## 트리 생성기

- 위에 정의된 노드 및 트리 객체를 이어붙여서 트리를 뿜어내는 객체

In [7]:
from typing import List

class TreeGenerator:
    def __init__(self, nodes: List[Node]):

        self.nodes = nodes
        self.max_depth = None

        # 모든 node에 대하여 root node에 올 수 있는지 마스크로 생성
        self.initial_mask = self.__is_nonleaf(self.nodes)


    def generate_initial_mask(self) -> List[int]:
        """
        **Description**
            - root node에 각 노드들이 올 수 있는지 여부를 마스크로 리턴
            - root node에 올 수 있는 조건은 binary, twochildnoe
        """
        return self.__is_nonleaf(self.nodes)


    def reset(self, max_depth:int) -> dict:
        """
        **Description**
            - state와 action mask를 초기화
            - initial state는 np.zero(len(nodes))로 초기화
        """

        # 생성할 Tree의 max_depth
        self.max_depth = max_depth

        # 생성할 Tree 객체 선언
        self.tree = Tree('test')
        # root node 가능 여부 마스크
        mask = np.array(self.initial_mask)
        # 첫 스텝의 이전 액션 마스크는 np.zero로 시작
        prev_action_onehot = np.zeros(len(self.nodes))
        return {"state": prev_action_onehot, "action_mask": mask}


    def step(self, node_index:int) -> dict:
        """
        **Description**
            - Tree의 자식 노드에 node를 한번 추가하는 메서드
        """

        done = False

        prev_action_onehot = np.zeros(len(self.nodes))
        prev_action_onehot[node_index] = 1
        self.tree.insert(self.nodes[node_index])

        # 현재 node의 input type이 output_type과 같은 node를 마스크로 생성
        mask = [1 if n.output_type in self.tree.current.input_type else 0 for n in self.nodes]

        # Tree maximum depth에 도달하면 leaf node (DATA node)만 고려
        if self.tree.current.depth >= (self.max_depth-1):
            mask_a = mask
            mask_b = [1 if n.max_n_childs == 0 else 0 for n in self.nodes]
            mask = [(a and b) for a, b in zip(mask_a, mask_b)]

        # 만약 자식 node에 올 수 있는 node가 없으면 max depth += 1
        if (sum(mask) == 0) & ~self.tree.iscompleted:
            self.max_depth += 1
            mask = mask_a

        # tree 다 채워지면 done
        if self.tree.iscompleted:
            done = True

        return {"state": prev_action_onehot, "action_mask": mask}, done


    def sample(self, max_depth):
        """
        **Description**
            - 하나의 alpha tree를 random하게 생성
        """

        info = self.reset(max_depth)
        mask = info['action_mask']

        while True:
            prob = np.array(mask)/np.array(mask).sum()
            node_index = np.random.choice(np.arange(len(self.nodes)), p=prob)
            next_info, done = self.step(node_index)
            mask = next_info['action_mask']

            if done:
                break

        return self.tree


    def __is_nonleaf(self, nodes: List[Node]) -> List[int]:
        initial_mask = [1 if n.max_n_childs > 0 else 0 for n in nodes ]
        return initial_mask

## 실제 트리 생성

- 노드 스페이스 다 넣어주고 랜덤 생성해보자

In [8]:
generator = TreeGenerator(nodes)

In [9]:
sampled_alpha = generator.sample(2)
sampled_alpha.render()

MEAN()
├── MEAN()
│   ├── AVG()
│   │   └── SERIES[5]
│   └── LARGER()
│       ├── MIN()
│       │   └── SERIES[5]
│       └── QUANTILE[0.25]
│           └── SERIES[5]
└── SMALLER()
    ├── AVG()
    │   └── SERIES[5]
    └── MEAN()
        ├── PLUSMINUS()
        │   ├── MAX()
        │   │   └── SERIES[5]
        │   ├── MAX()
        │   │   └── SERIES[5]
        │   └── MAX()
        │       └── SERIES[5]
        └── MEDIAN()
            └── SERIES[5]


In [10]:
factor_value = sampled_alpha()
factor_value

0    2.75
dtype: float64