In [None]:
import tensorflow as tf
import tensorflow_probability as tfp

from tensorflow.keras import layers
from tensorflow.keras.models import Model

import pandas as pd
import numpy as np

### Create custom layer

Задача: необходимо выполнить кластеризацию данных простыми правилами, имея только ряд из 8 значений для анализа. Традиционно это можно решить написав простой scrip, который реализует определенную логику. Ниже представлен метод реализации через Tensorflow Functional API. Главное преимущество такого подхода - сохранение его как модель tensorflow

In [None]:
tf.config.run_functions_eagerly(True)

In [None]:
class ClusterLayer(layers.Layer):
    def __init__(self, **kwargs):
        super(ClusterLayer, self).__init__(**kwargs)
        self.status = tf.Variable([0], dtype=tf.float32)

    def build(self, input_shape):
        self.output_dim = 1
    
    def call(self, inputs):
        @tf.function(input_signature=[tf.TensorSpec(shape=[8], dtype=tf.float32)])
        # Определяем функцию расчета кластера
        # На вход передается массив из 8 значений (1-lifetime, 7 - динамические показатели)
        # На выходе получаем номер кластера
        def calc_cluster_func(x):
            # Определяем функцию определения кластера для НЕ новичков
            # На входе имеем тот же массив из 8 значений
            # На выходе номер кластера
            def not_newbie():
                not_nan_tensor = x[1:]
                not_negative = not_nan_tensor[not_nan_tensor >= 0]
                tf.cond(tf.size(not_negative) <= 1,
                        lambda: self.status[0].assign(1.0),
                        lambda: tf.cond(tf.size(not_negative) == 2,
                                        lambda: f_shape_m2(not_nan_tensor),
                                        lambda: tf.cond(tf.size(not_negative) == 3,
                                                        lambda: f_shape_m3(not_nan_tensor),
                                                        lambda: check_stationarity(not_nan_tensor))))
                return self.status
            
            # Определяем функцию расчета кластера, для ряда в котором более 3-х значений
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - номер кластера
            def check_stationarity(a):
                uniq, idx, cnt = uniq, idx, cnt = tf.unique_with_counts(a[a >= 0])
                INDEX_TENSOR = tf.where(tf.math.less_equal(0.0, a))
                tf.cond(
                    tf.gather(a, INDEX_TENSOR[-1]) == 0.0,
                    lambda: self.status[0].assign(999.0),
                    lambda: tf.cond(
                        tf.size(uniq) != 1,
                        lambda: stationarity(a),
                        lambda: self.status[0].assign(4.0)
                    )
                )
                return self.status
            
            # Определяем функцию проверки временного ряда на стационарность
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - номер кластера
            def stationarity(a):
                kpss_stat, p_value, nlags, crit_dict = kpss(a)
                INDEX_TENSOR = tf.where(tf.math.less_equal(0.0, a))
                tf.cond(kpss_stat[0] > 0.463,
                        lambda: tf.cond(tf.gather(a, INDEX_TENSOR[-1]) == 0.0,
                                        lambda: self.status[0].assign(999.0),
                                        lambda: self.status[0].assign(4.0)),
                        lambda: kt(a))

                return self.status
            
            # Функция расчета статистики KPSS
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - KPSS статистики, p-value, и критические значения
            def kpss(a):
                 nobs = tf.cast(tf.size(a[a>=0]), tf.float32)
                 resids = tf.math.subtract(a[a>=0], tf.math.reduce_mean(a[a>=0]))
                 crit = tf.constant([0.347, 0.463, 0.574, 0.739])
                 
                 nlags = tf.cast(tf.math.ceil(12.0 * tf.math.pow(tf.math.divide(nobs, 100.0), tf.math.divide(1.0, 4.0))), dtype=tf.int32)
                 nlags = tf.math.minimum(nlags, [nobs-1])
                
                 pvals = [0.10, 0.05, 0.025, 0.01]
                 eta = tf.math.divide(tf.math.reduce_sum(tf.math.cumsum(resids, 0) ** 2), (nobs ** 2))  # eq. 11, p. 165
                 s_hat = _sigma_est_kpss(resids, nobs, nlags)

                 kpss_stat = tf.math.divide(eta, s_hat)
                 p_value = tfp.math.interp_regular_1d_grid(kpss_stat, 
                                                          tf.math.reduce_min(crit),
                                                          tf.math.reduce_max(crit), 
                                                          pvals)

                 crit_dict = {"10%": crit[0], "5%": crit[1], "2.5%": crit[2], "1%": crit[3]}
                 
                 uniq2, idx2, cnt2 = tf.unique_with_counts(tf.reshape(kpss_stat, (-1,)))

                 return uniq2, p_value, nlags, crit_dict

            def _sigma_est_kpss(resids, nobs, lags):
                s_hat = tf.cast(tf.math.reduce_sum(resids ** 2), dtype=tf.float32)

                n = lags.get_shape().num_elements()+1
                for i in range(1, lags.get_shape().num_elements() + 1):
                    resids_prod = tf.tensordot(tf.gather(tf.reshape(resids, (-1,)), tf.reshape(tf.range(i, n), (-1,))), 
                                               tf.gather(tf.reshape(resids, (-1,)), tf.range(0, tf.cast(nobs, dtype=tf.int32) - i)), 0)
                    s_hat += 2 * resids_prod * (1.0 - (i / (tf.cast(lags, dtype=tf.float32) + 1.0)))
                return s_hat / nobs  


            # Определяем функцию расчета кластера на временном ряду из 3-х значений
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - номер кластера
            def f_shape_m3(a):

                INDEX_TENSOR = tf.where(tf.math.less_equal(0.0, a))
                uniq, idx, cnt = uniq, idx, cnt = tf.unique_with_counts(a[a >= 0])

                tf.cond(tf.gather(a, INDEX_TENSOR[-1]) == 0.0,
                        lambda: self.status[0].assign(999.0),
                        lambda: tf.cond(tf.math.logical_and(tf.gather(a, INDEX_TENSOR[0]) == tf.gather(a, INDEX_TENSOR[1]), 
                                                            tf.gather(a, INDEX_TENSOR[0]) == tf.gather(a, INDEX_TENSOR[2])),
                                        lambda: self.status[0].assign(4.0),
                                        lambda: kt(a)))

                return self.status
            
            # Определяем функцию расчета статистики KendallTau (Определяем возрастающий и убывающий тренд)
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - номер кластера
            def kt(a):
                corr = tfp.stats.kendalls_tau(a, tf.constant([1,2,3,4,5,6,7], dtype=tf.float32))
                tf.cond(corr > 0.0,
                        lambda: self.status[0].assign(2.0),
                        lambda: self.status[0].assign(3.0))
                return self.status
            
            # Определяем функцию расчета кластера для рядя из двух значений
            # На входе имеем массив из 7 значений (без lifetime)
            # На выходе - номер кластера
            def f_shape_m2(a):
                INDEX_TENSOR = tf.where(tf.math.less_equal(0.0, a))
                INDEX_DIFF = tf.math.subtract(INDEX_TENSOR[1], INDEX_TENSOR[0])
                
                tf.cond(tf.math.less(INDEX_DIFF, 3),
                        lambda: self.status[0].assign(1.0),
                        lambda: tf.cond(tf.gather(a, INDEX_TENSOR[-1]) == 0.0,
                                        lambda: self.status[0].assign(999.0),
                                        lambda: tf.cond(tf.gather(a, INDEX_TENSOR[1])> tf.gather(a, INDEX_TENSOR[0]),
                                                        lambda: self.status[0].assign(2.0),
                                                        lambda: tf.cond(tf.gather(a, INDEX_TENSOR[1])< tf.gather(a, INDEX_TENSOR[0]),
                                                                        lambda: self.status[0].assign(3.0),
                                                                        lambda: self.status[0].assign(4.0)))))
                return self.status

            # Делаем проверку, является ли профиль новичком
            # Первое значение в массиве является lifetime профиля
            tf.cond(x[0] < 5.0, 
                    lambda: self.status[0].assign(0.0), 
                    not_newbie)

            return self.status
        
        # Применяем функцию расчета кластера к входным данных
        out = tf.map_fn(calc_cluster_func, inputs)
        return tf.reshape(out, (-1, 1))

In [None]:
class FinalLayer(layers.Layer):
    def __init__(self, **kwargs):
        super(FinalLayer, self).__init__(**kwargs)
    def build(self, input_shape):
        self.output_dim = 1
    def call(self, inputs):
        out = tf.map_fn(lambda x: tf.stack([x[0], x[1], x[2]]), inputs)
      
        return out 

In [None]:
# В качестве входных данным имеем массив из 10 значений (pid, sid, lifetime, dynamic_...)
model_input = layers.Input(shape=(10,), name='Inputlayer')

# Слой выделяем из входных данных pid и sid
model_lamda1 = layers.Lambda(lambda x: x[:, :2 ])(model_input)
# Использует данные без pid и sid
model_lamda2 = layers.Lambda(lambda x: x[:, 2:10])(model_input)
# Слой расчета кластера
model_analytic = ClusterLayer()(model_lamda2)
# Слой объединения (на выходе модель будет возвращать pid, sid и номер кластера)
model_concat = layers.concatenate([model_lamda1, model_analytic])

model_out = FinalLayer(name='FinalLayer')(model_concat)
# Собираем все слои в модель
model = Model(inputs=model_input, outputs=model_out)

In [None]:
# сохраняем модель
model.save('./model_0.8')





INFO:tensorflow:Assets written to: ./model_0.8/assets


INFO:tensorflow:Assets written to: ./model_0.8/assets
