testing keras custom layer stuff

In [233]:
# import packages:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import product
import os 
import tensorflow as tf
from tensorflow.keras import Input, Model, regularizers, constraints, layers, optimizers
from keras.layers import Layer
from keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score

define classes:

In [234]:
# randomizer seed:
np.random.seed(0)

# need to define a constraint for training the parameters:
class OrderedConstraint(constraints.Constraint):
    # constructor:
    def __init__(self):
        pass

    # call function for constraint:
    def __call__(self, W):
        return tf.sort(W, axis = 2)

# first layer -> membership layer:
class MF_Layer(Layer): 
    # constructor:
    def __init__(self, num_inputs, num_mfs, **kwargs):
        super(MF_Layer, self).__init__(**kwargs)
        self.num_inputs = num_inputs
        self.num_mfs = num_mfs

        # need to initialize antecedent parameters, but a <= b <= c. therefore 
        # generate a set of "raw parameters" to be sorted. these are not trained

        raw_params = np.random.uniform(low = 0.0, high = 50.0, size = (self.num_inputs, self.num_mfs, 3))
        sorted_params = tf.sort(raw_params, axis = -1)

        self.mf_params = self.add_weight(
            shape = (self.num_inputs, self.num_mfs, 3),             # num_inputs, num_mfs per input, 3 params per mf (triangular)
            initializer = tf.constant_initializer(sorted_params.numpy()),
            trainable = True,
            constraint = OrderedConstraint(),
            name = 'Antecedent_Params'
        )

    # custom setting of weights:
    def set_weights(self, params):
        # this function is used to set weights based on what a user provides
        # user must provide weights in the form of a np.array of shape (num_mfs, num_params)

        if params.shape != (self.num_inputs, self.num_mfs, 3):
            raise ValueError(f'Parameters provided are not of correct shape, expected ({self.num_inputs}, {self.num_mfs}, 3)')

        self.mf_params = params
        
    # function call:
    def call(self, inputs):
        # need to initialize the membership values:
        membership_values = []

        # for every input:
        for i in range(self.num_inputs):
            # get the memberships for that input:
            input_mf_params = self.mf_params[i]

            # need to now compute the fuzzified value for each membership function:
            fuzzified_values = []

            # for every membership function:
            for j in range(self.num_mfs):
                a = input_mf_params[j, 0]
                b = input_mf_params[j, 1]
                c = input_mf_params[j, 2]

                # check to see if we are on the edges:
                is_left_edge = (j == 0) & (a == b)                  # this would be the left-most ramp
                is_right_edge = (j == self.num_mfs - 1) & (b == c)  # this would be the right-most ramp
        
                # ramp calculations:
                left = tf.where((inputs[:, i] == a) & is_left_edge, 1.0, (inputs[:, i] - a) / (b - a))
                right = tf.where((inputs[:, i] == c) & is_right_edge, 1.0, (c - inputs[:, i]) / (c - b))

                output = tf.maximum(0.0, tf.minimum(left, right))

                fuzzified_values.append(output)
        
            # need to now stack the mf values for that given input:
            membership_values.append(tf.stack(fuzzified_values, axis = -1))

        # stack everything and return:
        return tf.stack(membership_values, axis = 1)

# second layer -> firing strength layer:
class FS_Layer(Layer):
    # constructor:
    def __init__(self, num_inputs, num_mfs, **kwargs):
        super(FS_Layer, self).__init__(**kwargs)
        self.num_inputs = num_inputs
        self.num_mfs = num_mfs
        self.num_rules = num_mfs ** num_inputs

    # call function:
    def call(self, membership_values):
        # this layer accepts the membership values, which have shape (batch_size, num_inputs, num_mfs):
        batch_size = tf.shape(membership_values)[0]

        # initialize the firing strengths:
        firing_strengths = tf.ones((batch_size, self.num_rules), dtype = tf.float32)

        # generate all the rule combinations:
        rules = list(product(range(self.num_mfs), repeat = self.num_inputs))    # example [(0, 0, 0), (0, 0, 1), ...]

        # need to check each input, each mf combination, and multiply their values together:
        for rule_index, combination in enumerate(rules):
            # print(f'combination: {combination}')
            rule_strength = tf.ones((batch_size, ), dtype = tf.float32)

            # for every input and membership function:
            for input_index, mf_index in enumerate(combination):
                # print(f'input: {input_index + 1} | mf: {mf_index + 1}')

                # correctly extract the fuzzified values based on the combination index:
                rule_strength *= membership_values[:, input_index, mf_index]
            
            # update the firing strengths:
            rule_strength = tf.expand_dims(rule_strength, axis = -1)  # shape: (batch_size, 1)
            firing_strengths = tf.concat(
                [firing_strengths[:, :rule_index], rule_strength, firing_strengths[:, rule_index + 1:]],
                axis = 1,
            )
            # print(f'firing strength: {firing_strengths}')

        return firing_strengths
    
# third layer -> normalization layer:
class NM_Layer(Layer):
    # constructor:
    def __init__(self, num_inputs, num_mfs, **kwargs):
        super(NM_Layer, self).__init__(**kwargs)
        self.num_inputs = num_inputs
        self.num_mfs = num_mfs

    # call function:
    def call(self, firing_strengths):
        # this function accepts inputs of size (batch_size, num_rules).
        # need to first get the total firing strength:
        total_firing_strength = tf.reduce_sum(firing_strengths, axis = 1, keepdims = True)
        
        # can now normalize the firing strengths:
        normalized_strengths = firing_strengths / (total_firing_strength + 1e-10)   # add a buffer in case the total firing strength is zero

        return normalized_strengths
    
# fourth layer -> consequent layer:
class CN_Layer(Layer):
    # constructor: 
    def __init__(self, num_inputs, num_mfs, **kwargs):
        super(CN_Layer, self).__init__(**kwargs)
        self.num_inputs = num_inputs
        self.num_mfs = num_mfs
        self.num_rules = num_mfs ** num_inputs

        # need to initialize the consequent parameters:
        self.consequent_params = self.add_weight(
            shape = (self.num_rules, self.num_inputs + 1),
            initializer = tf.keras.initializers.RandomUniform(-1.0, 1.0, seed = 1234),
            trainable = True,
            name = 'Consequent_Params'
        )

    # this function is used for manually setting the consequent parameters:
    def set_cons(self, params):
        # this function accepts parameters as an array of size (num_rules, num_inputs + 1):
        if params.shape != (self.num_rules, self.num_inputs + 1):
            raise ValueError(f'Parameters provided are not of correct shape, expected ({self.num_rules}, {self.num_inputs + 1})')
        
        # assign parameters:
        self.consequent_params = params

    # call function:
    def call(self, input_list):
        # unpack inputs from list:
        normalized_strengths, inputs = input_list

        # get the batch size:
        batch_size = tf.shape(normalized_strengths)[0]

        # the output is given by the multiplication of the inputs with the consequent weights,
        # such as: o_k = w_bar_k * (x_1 * p_k + x_2 * q_k + x_3 * r_k + ... + s_k)
        # can therefore extend the inputs to be (batch_size, num_inputs + bias) for ease of multiplication:
        inputs_with_bias = tf.concat([inputs, tf.ones((batch_size, 1), dtype = tf.float32)], axis = -1)

        # need to now reshape the normalized strengths to be of size (batch_size, num_rules, 1)
        # this effectively flips it into a 'column vector' of sorts, where each individual value is now vertically aligned
        normalized_strengths = tf.reshape(normalized_strengths, (batch_size, self.num_rules, 1))

        # get the consequent parameters, which have shape (num_rules, num_inputs + 1):
        consequent_params = self.consequent_params

        # expand inputs with bias to match the rule axis: (batch_size, num_rules, num_inputs + 1)
        inputs_with_bias_expanded = tf.expand_dims(inputs_with_bias, axis = 1)

        # calculate the consequent for each rule
        consequents = tf.reduce_sum(normalized_strengths * inputs_with_bias_expanded * consequent_params, axis = 2)

        return consequents

# fifth layer -> output layer:
class O_Layer(Layer):
    # constructor:
    def __init__(self, num_inputs, num_mfs, **kwargs):
        super(O_Layer, self).__init__(**kwargs)
        self.num_inputs = num_inputs
        self.num_output = num_mfs

    # call function:
    def call(self, consequents):
        output = tf.reduce_sum(consequents, axis = 1, keepdims = True)
        return output
    
# define a custom class for building models:
class ANFISModel(Model):
    # constructor for class:
    def __init__(self, input_shape, num_inputs, num_mfs):
        super(ANFISModel, self).__init__()
        # add layers to model:
        self.mf_layer = MF_Layer(num_inputs, num_mfs)
        self.fs_layer = FS_Layer(num_inputs, num_mfs)
        self.nm_layer = NM_Layer(num_inputs, num_mfs)
        self.cn_layer = CN_Layer(num_inputs, num_mfs)
        self.out_layer = O_Layer(num_inputs, num_mfs)

        # build the model:
        self.build((None, *input_shape))

    # call function:
    def call(self, inputs):
        # compute a forward pass:
        fuzzified = self.mf_layer(inputs)
        firing = self.fs_layer(fuzzified)
        norm = self.nm_layer(firing)
        consequents = self.cn_layer([norm, inputs])
        output = self.out_layer(consequents)

        return output

values for debugging the model:


In [235]:
# these are the testing parameters that I am using for the membership functions:
params = tf.constant(np.array([
    [  # Parameters for input 1
        [0.0, 0.0, 6.0],
        [5/6, 5.0, 55/6],
        [4.0, 10.0, 10.0]
    ],
    [  # Parameters for input 2
        [0.0, 0.0 , 15.0],
        [25/12, 12.5, 275/12],
        [10.0, 25.0, 25.0]
    ],
    [  # Parameters for input 3
        [0.0, 0.0, 30.0],
        [25/6, 25.0, 275/6],
        [15.0, 50.0, 50.0]
    ]
]), dtype = tf.float32)

# these are testing parameters used for debugging:
cons_params = tf.ones(shape = (27, 4), dtype = tf.float32)

# these are testing inputs used for debugging:
input = tf.constant([[8, 5, 32]], dtype = tf.float32)
# input = tf.constant([[8, 5, 32], [2, 17, 22]], dtype = tf.float32)

individual layer testing

In [236]:
# test first layer:
mf = MF_Layer(3, 3)     # instantiate
mf.set_weights(params)  # set to testing parameters

# pass values through first layer:
mf_output = mf(input)       # pass values
# print(f'layer 1 output: \n{mf_output}\n')

# pass values through second layer:
fs = FS_Layer(3, 3)         # instantiate
fs_output = fs(mf_output)   # pass values
# print(f'layer 2 output: \n{fs_output}\n')

# pass values through third layer:
nm = NM_Layer(3, 3)         # instantiate
nm_output = nm(fs_output)   # pass values
# print(f'layer 3 output: \n{nm_output}\n')

# pass values through the fourth layer:
cn = CN_Layer(3, 3)                 # instantiate
cn.set_cons(cons_params)            # assign testing params:
cn_output = cn([nm_output, input])  # pass values
# print(f'layer 4 output: \n{cn_output}\n')

# pass values through the fifth layer:
out = O_Layer(3,3)          # instantiate
output = out(cn_output)     # pass values
# print(f'layer 5 output: \n{output}\n')

add all layers into a keras model:

In [237]:
tf.keras.backend.clear_session()
model = ANFISModel(input_shape = (3,), num_inputs = 3, num_mfs = 3)