In [22]:
from postprocessing import *
import random, copy
import numpy as np
import time, os, random, shutil, pickle, ray
# from cgp import *
import matplotlib.pyplot as plt
from configuration import config
import warnings
warnings.filterwarnings('ignore')

In [23]:
def rollout(dataset, policy):
    x, y, z = dataset
    count = 0
    for xi,yi,zi in zip(x,y,z):
        # input = [d[0], d[1]]
        # label = d[2]
        input = [xi, yi]
        output = policy.eval(*input)
        if type(output)!=float:
            continue
        # print(zi, output, zi == output)
        # if zi == output:
        #     count += 1
        count += np.abs(zi-output)
    return count / len(x)

def rollout2(dataset, policy):
    x, y = dataset
    z = [xx>=1.1*yy for xx,yy in zip(x,y)]
    count = 0
    for xi, yi, zi in zip(x,y,z):
        input = [xi, yi]
        output = policy.eval(*input)
        if output == zi:
            count += 1
    return count / 1000

def rollout3(dataset, policy):
    train, label = dataset
    count = 0
    for xyz, l in zip(train, label):
        output = policy.eval(*xyz)
        
        if not type(output)==np.ndarray:
            # print(output, l)
            if output == l:
                count += 1
    return count / 100

def rollout4(dataset, policy):
    train, label = dataset
    count = 0
    for data, l in zip(train, label):
        output = policy.eval(*data)
        
        if not type(output)==np.ndarray:
            # print(output, l)
            if output == l:
                count += 1
    return count / 100

In [42]:
def get_data():
    num = 100
    dim = 10
    x, y, z = [], [], []
    for i in range(num):
        xi = np.random.rand(dim,)
        x.append(xi)
        yi = xi * (np.random.uniform([-0.15]*dim, [0.15]*dim) + 1)
        y.append(yi)
        
        zi = np.sum(np.sign(np.maximum((yi-xi)/xi-0.1, 0))*xi)
        z.append(zi)
        
    return [x, y, z]

def get_data2():
    num = 1000
    train, label = [], []
    for i in range(num):
        x = np.random.uniform(-100,100)
        y = x * (1+np.random.uniform(-0.2, 0.2))
        train.append(x)
        label.append(y)
    return [train, label]

def get_data3():
    num = 100
    train, label = [], []
    for i in range(num):
        x = np.random.randint([-20]*10, [20]*10)
        y = np.random.randint(-20, 20)
        z = np.random.randint([0]*10, [100]*10)
        train.append((x,y,z))
        label.append(z[x>=y].sum())
    return [train, label]

def get_data4():
    '''
    将“涨停检测器”和“三元求和器”拼起来
    '''
    num = 100
    train, label = [], []
    for i in range(num):
        quote_price = np.random.randint(10, 100)
        # quote_quantity = np.random.randint
        open_price = np.random.uniform(0.8, 1.2) * quote_price
        bid_price = np.random.randint([1]*5, [150]*5)
        bid_quantity = np.random.randint([1]*5, [20]*5)
        
        if quote_price >= int(1.1 * open_price): # 涨停
            if quote_price <= bid_price.max(): # 小于买单里。， 的最高价，有可能成交
                index = bid_price >= quote_price # ,1.1*open_price
                quantity = bid_quantity[index].sum()
            else: quantity = 0
        else: quantity = 0
        
        train.append((quote_price, open_price, bid_price, bid_quantity))
        label.append(quantity)
    
    
    
    return [train, label]

In [36]:
# 涨跌判断器
# fs = [
#         # Function(op.add, 2), 
#         Function(op.sub, 2), 
#         # Function(op.mul, 2), 
#         # Function(protected_div, 2),
#         Function(op.abs, 1),
#         Function(sign, 1),
#         Function(tenth, 1),
#     ]

# 三元求和器
# fs = [
#     Function(op.ge, 2, '≥'),
#     # Function(Sub, 2, name='—'),
#     # Function(Sign, 1, name='sgn'),
#     Function(Mul, 2, name='x'),
#     Function(Sum, 1, name='∑')
# ]
def Sum3(x,y,z):
    '''
    x, 向量
    y, 标量
    z, 向量
    函数返回, z[x>=y].sum()
    '''
    if type(x) != np.ndarray: return 0
    if type(y) == np.ndarray: return 0
    if type(z) != np.ndarray: return 0
    return z[x>=y].sum()

def LimUp(quote_q, open_p):
    '''limit-up'''
    if type(open_p)==np.ndarray or type(quote_q)==np.ndarray:
        return 0
    if quote_q >= int(1.1 * open_p):
        return 1
    else: return 0
# 涨停处理
fs = [
    Function(Sum3, 3, name='Sum3'),
    Function(LimUp, 2, name='LimUp'),
    Function(MAX, 1, name='MAX'),
    Function(Sub, 2, name='—'),
    Function(Max1, 1, name='Max1'),
    Function(Mul, 2 , name='x'),
]
def Abs(x): return np.abs(x)
new_functions = [
    # Function(Add, 2, name='Add'),
    Function(Sub, 2, name='Sub'),
    # Function(Mul, 2, name='Mul'),
    # Function(Div, 2, name='Div'),
    # Function(Neg, 1, name='Neg'),
    Function(Abs, 1, name='Abs'),
    # Function(Max1, 1, name='Max2'),
    # Function(Sum, 1, name='Sum'),
    # Function(Min1, 1, name='Min'),
    Function(Sign, 1, name='Sgn'),
    Function(tenth, 1,)
    # Function(const_01, 0, name='0.1')
]

In [37]:
class Node:
    """A node in CGP graph"""
    def __init__(self, arity):
        """Initialize this node randomly"""
        self.arity = arity
        self.i_func = None # 该节点的函数在函数集的index
        self.i_inputs = [None] * arity
        self.weights = [None] * arity
        self.i_output = None
        self.output = None
        self.active = False

class Individual:
    """An individual (chromosome, genotype, etc.) in evolution"""
    
    # 以下是class变量，init内是实例化object的变量
    # max_arity = 3
    # fitness = None

    def __init__(self, input_dim, out_dim, function_set, n_cols, level_back, out_random_active=False):
        
        self.n_cols = n_cols # number of cols (nodes) in a single-row CGP
        self.level_back = level_back # 后面的节点可以最远连接的前面节点的相对位置
    
        # 【创新点：给不同位置的node设置不同的function set,人的先验知识可以起作用，可能需要给node class设置一个fun_set】
        self.function_set = function_set
        self.max_arity = max(f.arity for f in fs)
        self.n_inputs = input_dim
        self.n_outputs = out_dim # 输出维度
        self.weight_range = [-1, 1]
        
        # 创建nodes
        self.nodes = []
        for pos in range(self.n_cols):
            self.nodes.append(self._create_random_node(pos))
        
        # 将最后n_outputs个node设为输出节点
        for i in range(1, self.n_outputs + 1):
            # 输出节点有一定概率被关闭
            if out_random_active:
                if random.random() < 0.8:
                    self.nodes[-i].active = True
                else:
                    self.nodes[-i].active = False
            else:
                self.nodes[-i].active = True
        
        self.fitness = None
        self._active_determined = False

    def _create_random_node(self, pos):
        '''
        pos:该节点的index
        设: n_inputs=3, level_back=4

        in  in  in  0   1   2   3   4   5   6
        *   *   *   *   *   *   *   *   *   *

        pos  pos-level_back  -n_inputs  max(p-l,-n)  pos-1  i_inputs取值
        0        -4              -3         -3         -1    -3,-2,-1
        1        -3              -3         -3          0    -3,-2,-1,0
        2        -2              -3         -2          1    -2,-1,0,1
        3        -1              -3         -1          2    -1,0,1,2
        4         0              -3          0          3     0,1,2,3
        5         1              -3          1          4     1,2,3,4
        6         2              -3          2          5     2,3,4,5
        
        输入维度=3,则-3,-2,-1三个点是程序的输入节点
        '''
        node = Node(self.max_arity)
        node.i_func = random.randint(0, len(self.function_set) - 1)
        for i in range(self.function_set[node.i_func].arity):
            # 随机确定node的每个输入端口连接的是前面节点(column)的输出
            # node.i_inputs[i]记录前端父节点的idx
            # print('aaa', i, self.function_set[node.i_func].name)
            node.i_inputs[i] = random.randint(max(pos - self.level_back, -self.n_inputs), pos - 1)
            node.weights[i] = 1.0 # random.uniform(self.weight_range[0], self.weight_range[1])
        node.i_output = pos
        return node

    def _determine_active_nodes(self):
        """
        Determine which nodes in the CGP graph are active
        """
        # check each node in reverse order
        n_active = 0

        # 逆序遍历所有节点
        for node in reversed(self.nodes):
            if node.active:
                n_active += 1
                # 依次检查该节点所有输入端口
                for i in range(self.function_set[node.i_func].arity):
                    
                    # i_input是该node第i的输入端口所连的父节点的index
                    i_input = node.i_inputs[i]
                    
                    if i_input >= 0:  # >=0表示node的父节点是一个hidden节点，而非input节点
                        # 该节点的父节点也设置为“激活”
                        self.nodes[i_input].active = True
        self.n_active = n_active
        if config.Verbose:
            print("# active genes: ", n_active)

    def eval(self, *args):
        """
        Given inputs, evaluate the output of this CGP individual.
        :return the final output value
        """
        if not self._active_determined:
            self._determine_active_nodes()
            self._active_determined = True
        
        # forward pass: evaluate
        for node in self.nodes:
            if node.active:
                inputs = []
                for i in range(self.function_set[node.i_func].arity): # 依次获得该node各维输入
                    i_input = node.i_inputs[i] # node的父节点idx
                    w = node.weights[i] # node与父节点连边的权重
                    
                    # 父节点是程序的输入节点
                    if i_input < 0:
                        inputs.append(args[-i_input - 1] * w)
                    # 父节点是普通节点
                    else:
                        inputs.append(self.nodes[i_input].output * w)
                node.output = self.function_set[node.i_func](*inputs) # 执行计算
        if self.n_outputs == 1:
            return self.nodes[-1].output
        
        out = []
        for i in reversed(range(1, self.n_outputs + 1)):
            out.append(self.nodes[-i].output)
        return out

    def mutate(self, mut_rate=0.01):
        """
        Mutate this individual. Each gene is varied with probability *mut_rate*.
        :param mut_rate: mutation probability
        :return a child after mutation
        """
        child = copy.deepcopy(self)
        for pos, node in enumerate(child.nodes):
            # mutate the function gene
            if random.random() < mut_rate:
                node.i_func = random.choice(range(len(self.function_set)))
            
            # 函数突变之后需要重新选择其父节点
            # mutate the input genes (connection genes)
            arity = self.function_set[node.i_func].arity
            for i in range(arity):
                if node.i_inputs[i] is None or random.random() < mut_rate:  # if the mutated function requires more arguments, then the last ones are None 
                    node.i_inputs[i] = random.randint(max(pos - self.level_back, -self.n_inputs), pos - 1)
                if node.weights[i] is None or random.random() < mut_rate:
                    # 节点之间不再设置随机权重，统一固定1.0
                    node.weights[i] = 1.0 # random.uniform(self.weight_range[0], self.weight_range[1])
            # initially an individual is not active except the last output node
            node.active = False
        for i in range(1, self.n_outputs + 1):
            child.nodes[-i].active = True
        child.fitness = None
        child._active_determined = False
        return child

# Individual.function_set = fs
# Individual.max_arity = max(f.arity for f in fs)

def evolve(pop, mut_rate, mu, lambda_):
    """
    Evolve the population *pop* using the mu + lambda evolutionary strategy

    :param pop: a list of individuals, whose size is mu + lambda. The first mu ones are previous parents.
    :param mut_rate: mutation rate
    :return: a new generation of individuals of the same size
    """
    pop = sorted(pop, key=lambda ind: ind.fitness)  # stable sorting
    parents = pop[-mu:]
    # generate lambda new children via mutation
    offspring = []
    for _ in range(lambda_):
        parent = random.choice(parents)
        offspring.append(parent.mutate(mut_rate))
    return parents + offspring


def create_population(n, input_dim, out_dim, fs, n_cols, level_back, out_random_active=False):
    """Create a random population composed of n individuals."""
    return [Individual(input_dim, 
                       out_dim, 
                       fs, 
                       n_cols,
                       level_back,
                       out_random_active=out_random_active) for _ in range(n)]


In [43]:
np.random.seed(123)
random.seed(123)
pop = create_population(config.MU+config.LAMBDA, 
                            input_dim=4, 
                            out_dim=1,
                            fs=fs,
                            # fs=new_functions,
                            n_cols=10,
                            level_back=10,
                            out_random_active=False)
# data = get_data()
# data = get_data2()
# data = get_data3()
data = get_data4()

In [44]:
for g in range(10000):
    tick = time.time()
    fit_list = [rollout4(data, p) for p in pop]
    # fitness = ray.get(fit_list)
    # 优化方向是最大化
    for f, p in zip(fit_list, pop):
        p.fitness = f
    pop = evolve(pop, 0.09, config.MU, config.LAMBDA)
    print(g, 'time:', round(time.time()-tick, 3),'best fitness:', pop[0].fitness)
    if pop[0].fitness == 1.0:
        break

0 time: 0.105 best fitness: 0.61
1 time: 0.096 best fitness: 0.61
2 time: 0.079 best fitness: 0.61
3 time: 0.079 best fitness: 0.61
4 time: 0.092 best fitness: 0.61
5 time: 0.062 best fitness: 0.61
6 time: 0.046 best fitness: 0.61
7 time: 0.04 best fitness: 0.61
8 time: 0.041 best fitness: 0.61
9 time: 0.045 best fitness: 0.61
10 time: 0.055 best fitness: 0.61
11 time: 0.069 best fitness: 0.61
12 time: 0.079 best fitness: 0.61
13 time: 0.045 best fitness: 0.61
14 time: 0.053 best fitness: 0.61
15 time: 0.055 best fitness: 0.61
16 time: 0.068 best fitness: 0.61
17 time: 0.067 best fitness: 0.61
18 time: 0.079 best fitness: 0.61
19 time: 0.085 best fitness: 0.61
20 time: 0.078 best fitness: 0.61
21 time: 0.062 best fitness: 0.61
22 time: 0.067 best fitness: 0.61
23 time: 0.073 best fitness: 0.61
24 time: 0.071 best fitness: 0.61
25 time: 0.102 best fitness: 0.61
26 time: 0.096 best fitness: 0.61
27 time: 0.093 best fitness: 0.61
28 time: 0.086 best fitness: 0.61
29 time: 0.067 best fitne

In [45]:
g = extract_computational_subgraph(pop[0])
visualize(g, to_file='test.png',input_names=['quote_p', 'open_p', 'bid_p', 'bid_q'])

Operator notation of '—'' is not available. The node id is shown instead.
Operator notation of 'x'' is not available. The node id is shown instead.
Operator notation of 'Max1'' is not available. The node id is shown instead.
Operator notation of 'Sum3'' is not available. The node id is shown instead.
Operator notation of 'Sum3'' is not available. The node id is shown instead.


In [5]:
class Individual:
    def __init__(self, input_dim, out_dim, function_set, n_cols, level_back, out_random_active=False):
        self.n_cols = n_cols # number of cols (nodes) in a single-row CGP
        self.level_back = level_back # 后面的节点可以最远连接的前面节点的相对位置
        self.function_set = function_set
        self.max_arity = max(f.arity for f in fs)
        self.n_inputs = input_dim
        self.n_outputs = out_dim # 输出维度
        self.weight_range = [-1, 1]
        
        # 创建nodes
        self.nodes = []
        for pos in range(self.n_cols):
            self.nodes.append(self._create_random_node(pos))
        
        # 将最后n_outputs个node设为输出节点
        for i in range(1, self.n_outputs + 1):
            if out_random_active: # 输出节点有一定概率被关闭
                if random.random() < 0.8: self.nodes[-i].active = True
                else: self.nodes[-i].active = False
            else: self.nodes[-i].active = True
        
        self.fitness = None
        self._active_determined = False

    def _create_random_node(self, pos):
        node = Node(self.max_arity)
        node.i_func = random.randint(0, len(self.function_set) - 1)
        for i in range(self.function_set[node.i_func].arity):
            # 随机确定node的每个输入端口连接的是前面节点(column)的输出
            # node.i_inputs[i]记录前端父节点的idx
            node.i_inputs[i] = random.randint(max(pos - self.level_back, -self.n_inputs), pos - 1)
            node.weights[i] = 1.0 # random.uniform(self.weight_range[0], self.weight_range[1])
        node.i_output = pos
        return node

    def _determine_active_nodes(self):
        """
        Determine which nodes in the CGP graph are active
        """
        n_active = 0
        for node in reversed(self.nodes):
            if node.active:
                n_active += 1
                for i in range(self.function_set[node.i_func].arity):
                    i_input = node.i_inputs[i]
                    if i_input >= 0: self.nodes[i_input].active = True
        self.n_active = n_active
        if config.Verbose: print("# active genes: ", n_active)

    def eval(self, *args):
        """
        Given inputs, evaluate the output of this CGP individual.
        :return the final output value
        """
        if not self._active_determined:
            self._determine_active_nodes()
            self._active_determined = True
        print('aaa',args)
        # forward pass: evaluate
        for node in self.nodes:
            if node.active:
                inputs = []
                for i in range(self.function_set[node.i_func].arity): # 依次获得该node各维输入
                    i_input = node.i_inputs[i] # node的父节点idx
                    w = node.weights[i]
                    if i_input < 0:
                        inputs.append(args[-i_input - 1] * w)
                    else:
                        inputs.append(self.nodes[i_input].output * w)
                print('bbb', '\n',
                        self.function_set[node.i_func].name, '\n',
                        node.i_inputs,  '\n',
                        inputs)
                node.output = self.function_set[node.i_func](*inputs) # 执行计算
        if self.n_outputs == 1:
            return self.nodes[-1].output
        out = []
        for i in reversed(range(1, self.n_outputs + 1)):
            out.append(self.nodes[-i].output)
        return out

    def mutate(self, mut_rate=0.01):
        child = copy.deepcopy(self)
        for pos, node in enumerate(child.nodes):
            if random.random() < mut_rate:
                node.i_func = random.choice(range(len(self.function_set)))
            arity = self.function_set[node.i_func].arity
            for i in range(arity):
                if node.i_inputs[i] is None or random.random() < mut_rate:  # if the mutated function requires more arguments, then the last ones are None 
                    node.i_inputs[i] = random.randint(max(pos - self.level_back, -self.n_inputs), pos - 1)
                if node.weights[i] is None or random.random() < mut_rate:
                    node.weights[i] = 1.0
            node.active = False
        for i in range(1, self.n_outputs + 1):
            child.nodes[-i].active = True
        child.fitness = None
        child._active_determined = False
        return child

In [8]:
ind = Individual(2,1,fs,3,3,)
ind.nodes[0].i_func = 0
ind.nodes[0].i_inputs = [-1, -2]
ind.nodes[1].i_func = 1
ind.nodes[1].i_inputs = [-1, 0]
ind.nodes[2].i_func = 2
ind.nodes[2].i_inputs = [1, None]
for node in ind.nodes:
    print(fs[node.i_func].name, node.i_inputs)
data = get_data3()
train, label = data
count = 0
for xy, l in zip(train, label):
    output = ind.eval(*xy)
    print('ccc', output, l)
    if not type(output)==np.ndarray:
        # print(output, l)
        if output == l:
            count += 1
print(count / 100)

≥ [-1, -2]
x [-1, 0]
∑ [1, None]
aaa (array([-12,   5,   4,  -2,  16,  17, -14,  -5, -15,  19]), -3)
bbb 
 ≥ 
 [-1, -2] 
 [array([-12.,   5.,   4.,  -2.,  16.,  17., -14.,  -5., -15.,  19.]), -3.0]


TypeError: unsupported operand type(s) for *: 'bool' and 'NoneType'