In [3]:
!pip install sympy==1.5
import sympy
sympy.__version__

Collecting sympy==1.5
[?25l  Downloading https://files.pythonhosted.org/packages/4d/a7/25d5d6b3295537ab90bdbcd21e464633fb4a0684dd9a065da404487625bb/sympy-1.5-py2.py3-none-any.whl (5.6MB)
[K     |████████████████████████████████| 5.6MB 5.9MB/s 
Installing collected packages: sympy
  Found existing installation: sympy 1.1.1
    Uninstalling sympy-1.1.1:
      Successfully uninstalled sympy-1.1.1
Successfully installed sympy-1.5


'1.1.1'

In [1]:
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
import sys
sys.path.append('/content/gdrive/My Drive/Library') #!ls /content/gdrive/My\ Drive

Mounted at /content/gdrive


In [2]:
import tensorflow as tf
import numpy as np
from utils import functions

# Constants for L0 Regularization
BETA = 2 / 3
GAMMA = -0.1
ZETA = 1.1
EPSILON = 1e-6

class SymbolicLayerL0(tf.keras.layers.Layer):
    def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1):
        super().__init__()
        
        if funcs is None:
            funcs = functions.default_func
        self.initial_weight = initial_weight
        self.W = None       # Weight matrix
        self.built = False  # Boolean whether weights have been initialized
        if self.initial_weight is not None:     # use the given initial weight
            with tf.name_scope("symbolic_layer"):
                if not variable:
                    self.W = tf.Variable(self.initial_weight)
                else:
                    self.W = self.initial_weight
            self.built = True
        
        self.init_stddev = init_stddev
        self.n_funcs = len(funcs)           # Number of activation functions (and number of layer outputs)
        self.funcs = [func.tf for func in funcs]        # Convert functions to list of Tensorflow functions
        self.n_double = functions.count_double(funcs)   # Number of activation functions that take 2 inputs
        self.n_single = self.n_funcs - self.n_double    # Number of activation functions that take 1 input
        
        self.out_dim = self.n_funcs + self.n_double
        self.droprate_init = 0.5
        self.bias = None
        self.qz_log_alpha = None
        self.in_dim = None
        self.eps = None
    
    def build(self, in_dim):
        with tf.name_scope("symbolic_layer"):
            self.in_dim = in_dim
            #self.qz_log_alpha = tf.Variable(tf.zeros(shape=(self.in_dim, self.out_dim)))
            self.qz_log_alpha = tf.Variable(tf.random.normal((in_dim, self.out_dim),
                                                             mean=tf.math.log(1-self.droprate_init) - tf.math.log(self.droprate_init),
                                                             stddev=1e-2))
    
    def loss(self):
        """Regularization loss term"""
        return tf.reduce_sum(tf.sigmoid(self.qz_log_alpha - BETA * tf.math.log(-GAMMA / ZETA)))
    
    def call(self, x, sample=True, reuse_u=False):
        """Multiply by weight matrix and apply activation units"""
        with tf.name_scope("symbolic_layer"):
            if self.W is None or self.qz_log_alpha is None:
                self.build(x.shape.as_list()[1])    # First dimension is batch size
            
            if sample:
                """Uniform random numbers for concrete distribution"""
                if self.eps is None or not reuse_u:
                    #self.eps = tf.zeros(shape=(self.in_dim, self.out_dim))
                    self.eps = tf.random.uniform(shape=(self.in_dim, self.out_dim), minval=EPSILON, maxval=1.0 - EPSILON)
                """Quantile, aka inverse CDF, of the 'stretched' concrete distribution"""
                y = tf.sigmoid((tf.math.log(self.eps) - tf.math.log(1.0-self.eps) + self.qz_log_alpha) / BETA)
                z = y * (ZETA - GAMMA) + GAMMA
                mask = tf.clip_by_value(z, clip_value_min=0.0, clip_value_max=1.0)
                w = self.W * mask
                h = tf.matmul(x, w)
            else:
                """Deterministic value of weight based on mean of z"""
                pi = tf.sigmoid(self.qz_log_alpha)
                z_mean = tf.clip_by_value(pi * (ZETA - GAMMA) + GAMMA, clip_value_min=0.0, clip_value_max=1.0)
                w = self.W * z_mean
                h = tf.matmul(x, w)
            
            output = []
            in_i = 0    # input index
            out_i = 0   # output index
            # Apply functions with only a single input
            while out_i < self.n_single:
                output.append(self.funcs[out_i](h[:, in_i]))
                in_i += 1
                out_i += 1
            # Apply functions that take 2 inputs and produce 1 output
            while out_i < self.n_funcs:
                output.append(self.funcs[out_i](h[:, in_i], h[:, in_i+1]))
                in_i += 2
                out_i += 1
            output = tf.stack(output, axis=1)
            return output

class SymbolicNetL0(tf.keras.Model):
    def __init__(self, symbolic_depth, funcs=None, initial_weights=None, initial_bias=None,
                 variable=False, init_stddev=0.1):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此處添加初始化程式碼（包含 call 方法中會用到的層），例如
        self.depth = symbolic_depth     # Number of hidden layers
        self.funcs = funcs
        self.shape = (None, 1)
        if initial_weights is not None:
            self.symbolic_layers = [SymbolicLayerL0(funcs=funcs, initial_weight=initial_weights[i], variable=variable)
                                    for i in range(self.depth)]
            if not variable:
                self.output_weight = tf.Variable(initial_weights[-1])
            else:
                self.output_weight = initial_weights[-1]
    
    def call(self, input, sample=True, reuse_u=False):
        # 此處添加模型呼叫的程式碼（處理輸入並返回輸出），例如
        self.shape = (int(input.shape[1]), 1)     # Dimensionality of the input
        # connect output from previous layer to input of next layer
        h = input
        # Building hidden layers
        for i in range(self.depth):
            h = self.symbolic_layers[i](h, sample=sample, reuse_u=reuse_u)
        # Final output (no activation units) of network
        h = tf.matmul(h, self.output_weight)
        return h
    
    def get_loss(self):
        return tf.reduce_sum([self.symbolic_layers[i].loss() for i in range(self.depth)])
    
    def get_weights(self):
        """Return list of weight matrices, TF.Keras method w/ slightly tweak output Type"""
        # First part is iterating over hidden weights. Then append the output weight.        
        return [self.symbolic_layers[i].W * tf.clip_by_value(
                tf.sigmoid(self.symbolic_layers[i].qz_log_alpha) * (ZETA - GAMMA) + GAMMA,clip_value_min=0.0,clip_value_max=1.0)
                for i in range(self.depth)] + [self.output_weight]

In [5]:
from inspect import signature
func=lambda x, y: x**2 + np.sin(2*np.pi*y)
x_dim = len(signature(func).parameters)     # Number of inputs to the function, or, dimensionality of x
N_TRAIN=256
NOISE_SD = 0
range_min=-1
range_max=1
x = np.array((range_max - range_min) * np.random.random([N_TRAIN, x_dim]) + range_min, dtype=np.float32)
y = np.array(np.random.normal([[func(*x_i)] for x_i in x], NOISE_SD), dtype=np.float32)

activation_funcs = [
            *[functions.Constant()] * 2,
            *[functions.Identity()] * 4,
            *[functions.Square()] * 4,
            *[functions.Sin()] * 2,
            *[functions.Exp()] * 2,
            *[functions.Sigmoid()] * 2,
            *[functions.Product()] * 2
        ]
width = len(activation_funcs)
n_double = functions.count_double(activation_funcs) #count how many 2-in func

summary_step = 1000 # Number of iterations at which to print to screen
n_epochs1=10001
n_epochs2=10001

init_sd_first = 0.5
init_sd_last = 0.5
init_sd_middle = 0.5
initial_weights=[tf.random.truncated_normal([x_dim, width + n_double], stddev=init_sd_first),
                 tf.random.truncated_normal([width, width + n_double], stddev=init_sd_middle),
                 tf.random.truncated_normal([width, 1], stddev=init_sd_last)]

sym = SymbolicNetL0(len(initial_weights)-1,
                    funcs=activation_funcs,
                    initial_weights=initial_weights)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=1e-2)

@tf.function
def train_one_step(x,y):
    with tf.GradientTape() as tape:
        y_hat = sym(x)
        error = tf.math.reduce_mean(tf.keras.losses.mean_squared_error(y, y_hat))
        reg_loss = sym.get_loss()
        loss = error + 0.005 * reg_loss
    grads = tape.gradient(loss, sym.variables)    # 使用 model.variables 這一屬性直接獲得模型中的所有變數
    optimizer.apply_gradients(grads_and_vars=zip(grads, sym.variables))
    return loss

for i in range(n_epochs1):
    loss_val=train_one_step(x,y)
    if i % 1000 == 0:   # Number of iterations at which to print to screen
        print("Epoch: %d - loss: %f" % (i,loss_val))

from utils import pretty_print
print(pretty_print.network([tf.convert_to_tensor(w) for w in sym.get_weights()],
                           activation_funcs,
                           ["x","y"]))

Epoch: 0 - loss: 3.841239
Epoch: 1000 - loss: 0.363843
Epoch: 2000 - loss: 0.697265
Epoch: 3000 - loss: 0.062612
Epoch: 4000 - loss: 0.041749
Epoch: 5000 - loss: 0.041766
Epoch: 6000 - loss: 0.033600
Epoch: 7000 - loss: 0.027199
Epoch: 8000 - loss: 0.020205
Epoch: 9000 - loss: 0.028498
Epoch: 10000 - loss: 0.021169
1.04959*x**2 + 0.997291*sin(6.28749213800495*y) + 0.0311579
