In [None]:
 try:
  import colab
  !pip install --upgrade pip
except:
  pass

### Restart the runtime

In [None]:
# !pip install transformers
# !pip install -q -U tfx
# NOTE(review): tfx is installed unpinned, but this notebook imports APIs that later TFX
# releases may have moved or removed (ResolverNode, tfx.components.base,
# tfx.utils.dsl_utils.external_input) — TODO pin a tfx version that still provides them.
# NOTE(review): prepare_data() needs `transformers` (BertTokenizer / TFBertModel), which is
# only installed by the commented-out line above — confirm it is present in the runtime.
!pip install -q -U --use-feature=2020-resolver tfx

### Import Libraries

In [None]:
import os
import pprint
import tempfile
import urllib

import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
# from tfx.utils.dsl_utils import csv_input

%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
# Report the library versions this run used (useful when comparing results across runtimes).
for _label, _version in (('TensorFlow', tf.__version__), ('TFX', tfx.__version__)):
    print('{} version: {}'.format(_label, _version))

In [None]:
# Log TFX/absl progress messages at INFO level.
absl.logging.set_verbosity(absl.logging.INFO)

# Root directory of the installed TFX pip package.
_tfx_root = tfx.__path__[0]

# Directory for the YouTube video statistics pipeline example, relative to the TFX package.
# NOTE(review): no later code cell in this notebook reads _videos_root or _serving_model_dir.
_videos_root = os.path.join(_tfx_root, 'examples/Youtube_videos')

# Export location for a pushed serving model: a fresh temporary directory on every run.
_serving_tmp_dir = tempfile.mkdtemp()
_serving_model_dir = os.path.join(_serving_tmp_dir, 'serving_model/videos_simple')

In [None]:
# `import urllib` alone does not load the `urllib.request` submodule; import it explicitly
# instead of relying on some other library having imported it first.
import urllib.request

# Fresh temporary directory that CsvExampleGen later uses as its input_base.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
# US videos CSV from the Kaggle "YouTube Statistics" dataset, mirrored on GitHub.
DATA_PATH = 'https://raw.githubusercontent.com/varun-bhaseen/Dataset/master/Kaggle%20YouTube%20Statistics%20Dataset/USvideos.csv'
_data_filepath = os.path.join(_data_root, "videos.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

In [None]:
# Show the first lines of the downloaded CSV to check its header and delimiter.
!head {_data_filepath}

In [None]:
# InteractiveContext lets the TFX components below be executed one at a time with context.run().
context = InteractiveContext()

### ExampleGen TFX

In [None]:
# Display the temporary directory the CSV was downloaded into.
_data_root

In [None]:
# example_gen = CsvExampleGen(input=external_input(_data_root))
# Ingest the CSV data under _data_root. The commented line above is the older
# input=external_input(...) form of the same call.
example_gen = CsvExampleGen(input_base=_data_root)
context.run(example_gen)

In [None]:
# First artifact of ExampleGen's 'examples' output: print its split names and on-disk URI.
artifact = example_gen.outputs['examples'].get()[0]
print(artifact.split_names, artifact.uri)

We can also take a look at the first three training examples:

In [None]:
# The examples artifact URI is a directory containing one sub-directory per split.
# NOTE(review): newer TFX releases may name split directories 'Split-train' instead of
# 'train' — confirm against the installed version.
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')

# os.listdir returns entries in arbitrary order; sort so "the first three records" is the
# same on every run.
tfrecord_filenames = sorted(os.path.join(train_uri, name) for name in os.listdir(train_uri))

# The split files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Decode and pretty-print the first 3 serialized tf.train.Example records.
for tfrecord in dataset.take(3):
  example = tf.train.Example()
  example.ParseFromString(tfrecord.numpy())
  pp.pprint(example)

### StatisticsGen
The `StatisticsGen` component computes statistics over your dataset for data analysis, as well as for use in downstream components. It uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.

`StatisticsGen` takes as input the dataset we just ingested using `ExampleGen`.

In [None]:
# Compute dataset statistics over the ingested examples (TensorFlow Data Validation).
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

After StatisticsGen finishes running, we can visualize the outputted statistics. Try playing with the different plots!

In [None]:
# Render the interactive statistics visualization inline.
context.show(statistics_gen.outputs['statistics'])

### SchemaGen

The `SchemaGen` component generates a schema based on your data statistics. (A schema defines the expected bounds, types, and properties of the features in your dataset.) It also uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.

Note: The generated schema is best-effort and only tries to infer basic properties of the data. It is expected that you review and modify it as needed.

`SchemaGen` will take as input the statistics that we generated with `StatisticsGen`, looking at the training split by default.

In [None]:
# Infer a schema from the statistics.
# NOTE(review): infer_feature_shape=False presumably leaves feature shapes unspecified in the
# schema (variable-length features) — confirm this is what downstream code expects.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)
context.run(schema_gen)

After SchemaGen finishes running, we can visualize the generated schema as a table.

In [None]:
# Render the inferred schema as a table.
context.show(schema_gen.outputs['schema'])

### ExampleValidator
The `ExampleValidator` component detects anomalies in your data, based on the expectations defined by the schema. It also uses the [TensorFlow Data Validation](https://www.tensorflow.org/tfx/data_validation/get_started) library.

`ExampleValidator` will take as input the statistics from `StatisticsGen`, and the schema from `SchemaGen`.

In [None]:
# Check the statistics against the inferred schema and record any anomalies.
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])
context.run(example_validator)

After ExampleValidator finishes running, we can visualize the anomalies as a table.

In [None]:
# Render the anomalies report as a table.
context.show(example_validator.outputs['anomalies'])

In the anomalies table, we can see that there are no anomalies. This is what we'd expect, since this is the first dataset that we've analyzed and the schema is tailored to it. You should review this schema -- anything unexpected means an anomaly in the data. Once reviewed, the schema can be used to guard future data, and anomalies produced here can be used to debug model performance, understand how your data evolves over time, and identify data errors.

### Transform
The `Transform` component performs feature engineering for both training and serving. It uses the [TensorFlow Transform](https://www.tensorflow.org/tfx/transform/get_started) library.

`Transform` will take as input the data from `ExampleGen`, the schema from `SchemaGen`, as well as a module that contains user-defined Transform code.

Let's see an example of user-defined Transform code below (for an introduction to the TensorFlow Transform APIs, [see the tutorial](https://www.tensorflow.org/tfx/tutorials/transform/simple)). First, we define a few constants for feature engineering:

Note: The `%%writefile` cell magic will save the contents of the cell as a `.py` file on disk. This allows the `Transform` component to load your code as a module.



In [None]:
_videos_constants_module_file = 'videos_constants.py'
# NOTE(review): the %%writefile cell below has an empty body, so it writes an empty
# videos_constants.py, and no later cell imports that module.

In [None]:
%%writefile {_videos_constants_module_file}




Prepare Data

In [None]:
def _mean_pooled_bert_embeddings(texts, tokenizer, model, max_length, batch_size):
    """Return one mean-pooled BERT vector (as a Python list) per input text.

    The attention mask is passed to the model so padding is not attended to. It is also
    used to average token states over real tokens only, so PAD positions no longer dilute
    the embedding.

    :param texts: list of strings to embed
    :param tokenizer: Hugging Face BERT tokenizer
    :param model: Hugging Face TF BERT model
    :param max_length: length every text is padded/truncated to
    :param batch_size: number of texts encoded per model call (bounds memory use)
    :return: list with one embedding (list of floats) per text
    """
    embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        encoding = tokenizer.batch_encode_plus(
            batch, max_length=max_length, padding='max_length', truncation=True,
            return_tensors='tf')
        # [0] is the per-token hidden state: (batch, sequence length, hidden size).
        hidden = model(encoding['input_ids'], attention_mask=encoding['attention_mask'])[0]
        mask = tf.cast(tf.expand_dims(encoding['attention_mask'], -1), hidden.dtype)
        token_sum = tf.reduce_sum(hidden * mask, axis=1)
        token_count = tf.maximum(tf.reduce_sum(mask, axis=1), 1.0)
        embeddings.extend((token_sum / token_count).numpy().tolist())
    return embeddings


def prepare_data(filename, max_length=10, batch_size=256):
    """Load the video CSV and add BERT text embeddings plus min-max scaled count features.

    Adds two list-valued columns, 'video_emb' (from 'title') and 'user_emb' (from 'tags').
    Scales 'views', 'likes', 'dislikes' and 'comment_total' to [0, 1].

    :param filename: path of the CSV to read
    :param max_length: BERT sequence length (small default; raise it if memory allows)
    :param batch_size: number of texts sent through BERT at once
    :return: the prepared DataFrame with a fresh 0..n-1 index
    """
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from transformers import BertTokenizer, TFBertModel

    df = pd.read_csv(filename)

    print("generating bert embedding videos ....")
    tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
    model = TFBertModel.from_pretrained('bert-base-cased')

    # The title stands in for the video content; missing titles embed as empty strings.
    titles = df['title'].fillna('').astype(str).tolist()
    df['video_emb'] = _mean_pooled_bert_embeddings(titles, tokenizer, model, max_length, batch_size)

    print("generating bert embedding for user...")
    # NOTE(review): the dataset has no user data; the video's tags are used as a proxy for
    # the user's interests.
    tags = df['tags'].fillna('').astype(str).tolist()
    df['user_emb'] = _mean_pooled_bert_embeddings(tags, tokenizer, model, max_length, batch_size)

    count_columns = ['views', 'likes', 'dislikes', 'comment_total']
    df[count_columns] = MinMaxScaler().fit_transform(df[count_columns])

    # Renumber rows 0..n-1 (this does not shuffle; train_test_split shuffles later).
    return df.reset_index(drop=True)

MMoE class

In [None]:
class MMoE(tf.keras.layers.Layer):
    """
    Multi-gate Mixture-of-Experts layer.

    A bank of experts is applied to the shared input. Each task has its own gate that
    weights the experts per example. Calling the layer returns a list of `num_tasks`
    tensors, each of shape (..., units).
    """

    def __init__(self,
                 units,
                 num_experts,
                 num_tasks,
                 use_expert_bias=True,
                 use_gate_bias=True,
                 expert_activation='relu',
                 gate_activation='softmax',
                 expert_bias_initializer='zeros',
                 gate_bias_initializer='zeros',
                 expert_bias_regularizer=None,
                 gate_bias_regularizer=None,
                 expert_bias_constraint=None,
                 gate_bias_constraint=None,
                 expert_kernel_initializer='VarianceScaling',
                 gate_kernel_initializer='VarianceScaling',
                 expert_kernel_regularizer=None,
                 gate_kernel_regularizer=None,
                 expert_kernel_constraint=None,
                 gate_kernel_constraint=None,
                 activity_regularizer=None,
                 **kwargs):
        """
        Method for instantiating MMoE layer.
        :param units: Number of hidden units per expert (and width of each task output)
        :param num_experts: Number of experts
        :param num_tasks: Number of tasks (one gate and one output per task)
        :param use_expert_bias: Boolean to indicate the usage of bias in the expert weights
        :param use_gate_bias: Boolean to indicate the usage of bias in the gate weights
        :param expert_activation: Activation function of the expert weights
        :param gate_activation: Activation function of the gate weights
        :param expert_bias_initializer: Initializer for the expert bias
        :param gate_bias_initializer: Initializer for the gate bias
        :param expert_bias_regularizer: Regularizer for the expert bias
        :param gate_bias_regularizer: Regularizer for the gate bias
        :param expert_bias_constraint: Constraint for the expert bias
        :param gate_bias_constraint: Constraint for the gate bias
        :param expert_kernel_initializer: Initializer for the expert weights
        :param gate_kernel_initializer: Initializer for the gate weights
        :param expert_kernel_regularizer: Regularizer for the expert weights
        :param gate_kernel_regularizer: Regularizer for the gate weights
        :param expert_kernel_constraint: Constraint for the expert weights
        :param gate_kernel_constraint: Constraint for the gate weights
        :param activity_regularizer: Regularizer for the activity
        :param kwargs: Additional keyword arguments for the Layer class
        """
        # Initialise the base Layer first. Layer.__init__ assigns its own input_spec,
        # supports_masking and activity regularizer, so setting them before this call had no
        # effect. The activity regularizer is therefore handed to the base class directly.
        super(MMoE, self).__init__(activity_regularizer=activity_regularizer, **kwargs)

        # Hidden nodes parameter
        self.units = units
        self.num_experts = num_experts
        self.num_tasks = num_tasks

        # Weight parameter (created in build())
        self.expert_kernels = None
        self.gate_kernels = None
        self.expert_kernel_initializer = tf.keras.initializers.get(expert_kernel_initializer)
        self.gate_kernel_initializer = tf.keras.initializers.get(gate_kernel_initializer)
        self.expert_kernel_regularizer = tf.keras.regularizers.get(expert_kernel_regularizer)
        self.gate_kernel_regularizer = tf.keras.regularizers.get(gate_kernel_regularizer)
        self.expert_kernel_constraint = tf.keras.constraints.get(expert_kernel_constraint)
        self.gate_kernel_constraint = tf.keras.constraints.get(gate_kernel_constraint)

        # Activation parameter
        self.expert_activation = tf.keras.activations.get(expert_activation)
        self.gate_activation = tf.keras.activations.get(gate_activation)

        # Bias parameter (created in build())
        self.expert_bias = None
        self.gate_bias = None
        self.use_expert_bias = use_expert_bias
        self.use_gate_bias = use_gate_bias
        self.expert_bias_initializer = tf.keras.initializers.get(expert_bias_initializer)
        self.gate_bias_initializer = tf.keras.initializers.get(gate_bias_initializer)
        self.expert_bias_regularizer = tf.keras.regularizers.get(expert_bias_regularizer)
        self.gate_bias_regularizer = tf.keras.regularizers.get(gate_bias_regularizer)
        self.expert_bias_constraint = tf.keras.constraints.get(expert_bias_constraint)
        self.gate_bias_constraint = tf.keras.constraints.get(gate_bias_constraint)

        # Keras parameter
        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2)
        self.supports_masking = True

    def build(self, input_shape):
        """
        Method for creating the layer weights.
        :param input_shape: Shape of the (single) input tensor; rank >= 2 with a known last dimension
        :raises ValueError: if the input rank is < 2 or its last dimension is unknown
        """
        input_shape = tf.TensorShape(input_shape)
        if input_shape.rank is None or input_shape.rank < 2:
            raise ValueError('MMoE expects an input of rank >= 2, got shape {}'.format(input_shape))
        input_dimension = input_shape[-1]
        if input_dimension is None:
            raise ValueError('The last dimension of the MMoE input must be defined.')

        # Expert weights: (input features, units per expert, experts)
        self.expert_kernels = self.add_weight(
            name='expert_kernel',
            shape=(input_dimension, self.units, self.num_experts),
            initializer=self.expert_kernel_initializer,
            regularizer=self.expert_kernel_regularizer,
            constraint=self.expert_kernel_constraint,
        )

        # Expert bias: (units per expert, experts)
        if self.use_expert_bias:
            self.expert_bias = self.add_weight(
                name='expert_bias',
                shape=(self.units, self.num_experts),
                initializer=self.expert_bias_initializer,
                regularizer=self.expert_bias_regularizer,
                constraint=self.expert_bias_constraint,
            )

        # One gate kernel per task: (input features, experts)
        self.gate_kernels = [self.add_weight(
            name='gate_kernel_task_{}'.format(i),
            shape=(input_dimension, self.num_experts),
            initializer=self.gate_kernel_initializer,
            regularizer=self.gate_kernel_regularizer,
            constraint=self.gate_kernel_constraint
        ) for i in range(self.num_tasks)]

        # One gate bias per task: (experts,)
        if self.use_gate_bias:
            self.gate_bias = [self.add_weight(
                name='gate_bias_task_{}'.format(i),
                shape=(self.num_experts,),
                initializer=self.gate_bias_initializer,
                regularizer=self.gate_bias_regularizer,
                constraint=self.gate_bias_constraint
            ) for i in range(self.num_tasks)]

        self.input_spec = tf.keras.layers.InputSpec(min_ndim=2, axes={-1: input_dimension})

        super(MMoE, self).build(input_shape)

    def call(self, inputs, **kwargs):
        """
        Method for the forward function of the layer.
        :param inputs: Input tensor of shape (..., input features)
        :param kwargs: Additional keyword arguments for the base method (ignored)
        :return: List of `num_tasks` tensors of shape (..., units)
        """
        # f_i(x) = activation(x . W_i + b_i) for all experts at once: (..., units, experts).
        expert_outputs = tf.tensordot(inputs, self.expert_kernels, axes=1)
        if self.use_expert_bias:
            # (units, experts) bias broadcasts over the leading axes.
            expert_outputs = expert_outputs + self.expert_bias
        expert_outputs = self.expert_activation(expert_outputs)

        final_outputs = []
        for index, gate_kernel in enumerate(self.gate_kernels):
            # g^k(x) = activation(x . W_gk + b_gk): (..., experts).
            gate_output = tf.tensordot(inputs, gate_kernel, axes=1)
            if self.use_gate_bias:
                gate_output = gate_output + self.gate_bias[index]
            gate_output = self.gate_activation(gate_output)

            # f^k(x) = sum_i g^k(x)_i * f_i(x). Broadcasting the gate over the units axis and
            # reducing the last (experts) axis also works for inputs of rank > 2.
            weighted_expert_output = expert_outputs * tf.expand_dims(gate_output, axis=-2)
            final_outputs.append(tf.reduce_sum(weighted_expert_output, axis=-1))

        return final_outputs

    def compute_output_shape(self, input_shape):
        """
        Method for computing the output shape of the MMoE layer.
        :param input_shape: Shape tuple (tuple of integers)
        :return: List of shape tuples, one per task, with the last dimension replaced by `units`
        :raises ValueError: if the input rank is < 2
        """
        input_shape = tf.TensorShape(input_shape)
        if input_shape.rank is None or input_shape.rank < 2:
            raise ValueError('MMoE expects an input of rank >= 2, got shape {}'.format(input_shape))

        output_shape = input_shape.as_list()
        output_shape[-1] = self.units
        output_shape = tuple(output_shape)

        return [output_shape for _ in range(self.num_tasks)]

    def get_config(self):
        """
        Method for returning the configuration of the MMoE layer.
        :return: Config dictionary (base Layer config merged with the MMoE arguments)
        """
        config = {
            'units': self.units,
            'num_experts': self.num_experts,
            'num_tasks': self.num_tasks,
            'use_expert_bias': self.use_expert_bias,
            'use_gate_bias': self.use_gate_bias,
            'expert_activation': tf.keras.activations.serialize(self.expert_activation),
            'gate_activation': tf.keras.activations.serialize(self.gate_activation),
            'expert_bias_initializer': tf.keras.initializers.serialize(self.expert_bias_initializer),
            'gate_bias_initializer': tf.keras.initializers.serialize(self.gate_bias_initializer),
            'expert_bias_regularizer': tf.keras.regularizers.serialize(self.expert_bias_regularizer),
            'gate_bias_regularizer': tf.keras.regularizers.serialize(self.gate_bias_regularizer),
            'expert_bias_constraint': tf.keras.constraints.serialize(self.expert_bias_constraint),
            'gate_bias_constraint': tf.keras.constraints.serialize(self.gate_bias_constraint),
            'expert_kernel_initializer': tf.keras.initializers.serialize(self.expert_kernel_initializer),
            'gate_kernel_initializer': tf.keras.initializers.serialize(self.gate_kernel_initializer),
            'expert_kernel_regularizer': tf.keras.regularizers.serialize(self.expert_kernel_regularizer),
            'gate_kernel_regularizer': tf.keras.regularizers.serialize(self.gate_kernel_regularizer),
            'expert_kernel_constraint': tf.keras.constraints.serialize(self.expert_kernel_constraint),
            'gate_kernel_constraint': tf.keras.constraints.serialize(self.gate_kernel_constraint),
            'activity_regularizer': tf.keras.regularizers.serialize(self.activity_regularizer)
        }
        base_config = super(MMoE, self).get_config()

        return dict(list(base_config.items()) + list(config.items()))

In [None]:
# Callback that prints per-task scores: ROC-AUC for classification heads, explained variance
# for regression heads.
class ROCCallback(tf.keras.callbacks.Callback):
    """Print per-task scores on the train/validation/test sets after every epoch.

    Outputs named in CLASSIFICATION_OUTPUTS are scored with ROC-AUC. Outputs named in
    REGRESSION_OUTPUTS are scored with explained variance. Any other output is skipped.
    """

    CLASSIFICATION_OUTPUTS = ('user_click', 'user_like')
    REGRESSION_OUTPUTS = ('user_rating', 'time_spend')

    def __init__(self, training_data, validation_data, test_data):
        """
        :param training_data: (inputs, labels); labels is a list indexed like model.output_names
        :param validation_data: (inputs, labels) in the same layout
        :param test_data: (inputs, labels) in the same layout
        """
        # Callback.__init__ sets up the attributes Keras expects on every callback.
        super(ROCCallback, self).__init__()
        self.train_X = training_data[0]
        self.train_Y = training_data[1]
        self.validation_X = validation_data[0]
        self.validation_Y = validation_data[1]
        self.test_X = test_data[0]
        self.test_Y = test_data[1]

    # The other hooks (train/epoch begin, per-batch) are inherited no-ops. Overriding the
    # batch hooks, even with `return`, makes Keras dispatch them on every batch.

    def on_epoch_end(self, epoch, logs=None):
        import numpy as np
        from sklearn.metrics import explained_variance_score, roc_auc_score

        train_prediction = self.model.predict(self.train_X)
        validation_prediction = self.model.predict(self.validation_X)
        test_prediction = self.model.predict(self.test_X)

        for index, output_name in enumerate(self.model.output_names):
            if output_name in self.CLASSIFICATION_OUTPUTS:
                metric, metric_label = roc_auc_score, 'ROC-AUC'
            elif output_name in self.REGRESSION_OUTPUTS:
                metric, metric_label = explained_variance_score, 'explained-variance-score'
            else:
                continue

            train_score = metric(self.train_Y[index], np.squeeze(train_prediction[index]))
            validation_score = metric(self.validation_Y[index], np.squeeze(validation_prediction[index]))
            test_score = metric(self.test_Y[index], np.squeeze(test_prediction[index]))
            print(
                '{0}-{1}-Train: {2} {0}-{1}-Validation: {3} {0}-{1}-Test: {4}'.format(
                    metric_label, output_name,
                    round(train_score, 4), round(validation_score, 4), round(test_score, 4)
                )
            )

In [None]:
def build_model(embedding_dim=768, units=4, num_experts=8, tower_units=8):
    """Build the multi-task MMoE ranking model (uncompiled).

    Inputs, in order: video embedding, user embedding, likes, dislikes, comment count, views.
    Outputs: 'user_click' and 'user_like' (sigmoid probabilities), plus 'user_rating' and
    'time_spend' (linear regression).

    :param embedding_dim: width of the two embedding inputs (768 = bert-base hidden size)
    :param units: hidden units per MMoE expert
    :param num_experts: number of MMoE experts
    :param tower_units: width of each task's tower layer
    :return: tf.keras.Model
    """
    layers = tf.keras.layers

    input_video_emb = layers.Input(shape=(embedding_dim,))
    input_likes = layers.Input(shape=(1,))
    input_dislikes = layers.Input(shape=(1,))
    input_comments = layers.Input(shape=(1,))
    input_user_emb = layers.Input(shape=(embedding_dim,))
    input_views = layers.Input(shape=(1,))
    model_inputs = [input_video_emb, input_user_emb, input_likes, input_dislikes, input_comments, input_views]

    # NOTE(review): ReLU on the raw concatenated features zeroes every negative embedding
    # component; a Dense+ReLU shared bottom may have been intended.
    shared_features = layers.ReLU()(layers.Concatenate()(model_inputs))

    # (output name, activation), in output order. The single-unit binary heads need sigmoid:
    # softmax over one unit is identically 1.0, so those heads could never learn.
    output_specs = [('user_click', 'sigmoid'), ('user_rating', 'linear'),
                    ('user_like', 'sigmoid'), ('time_spend', 'linear')]
    print("Output is user_click, user_rating, user_like and time_spend...")

    task_inputs = MMoE(units=units, num_experts=num_experts, num_tasks=len(output_specs))(shared_features)

    # One small tower per task on top of its MMoE mixture.
    output_layers = []
    for (output_name, activation), task_input in zip(output_specs, task_inputs):
        tower_layer = layers.Dense(
            units=tower_units,
            activation='relu',
            kernel_initializer=tf.keras.initializers.VarianceScaling())(task_input)
        output_layers.append(layers.Dense(
            units=1,
            name=output_name,
            activation=activation,
            kernel_initializer=tf.keras.initializers.VarianceScaling())(tower_layer))

    # Use tf.keras.Model explicitly: the bare name `Model` in this notebook is the TFX
    # artifact type imported from tfx.types.standard_artifacts.
    return tf.keras.Model(inputs=model_inputs, outputs=output_layers)

### Trainer
The `Trainer` component trains a model that you define in TensorFlow. The default Trainer supports the Estimator API; to use the Keras API, you need to specify the [Generic Trainer](https://github.com/tensorflow/community/blob/master/rfcs/20200117-tfx-generic-trainer.md) by setting `custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor)` in the Trainer's constructor.

`Trainer` takes as input the schema from `SchemaGen`, the transformed data and graph from `Transform`, training parameters, as well as a module that contains user-defined model code.

Let's see an example of user-defined model code below (for an introduction to the TensorFlow Keras APIs, [see the tutorial](https://www.tensorflow.org/guide/keras)):

In [None]:
#### ==================
## train the model
#### ==================
# Label columns, in the same order as build_model()'s outputs.
_RANKING_LABELS = ['user_click', 'user_rating', 'user_like', 'time_spend']


def _ranking_model_inputs(frame):
    """Return the six input arrays build_model() expects, in its input order.

    Order: video_emb, user_emb, likes, dislikes, comment_total, views.
    Embedding cells hold Python lists. Converting the list of lists straight to float32 gives
    a (rows, dim) matrix even for a one-row frame (np.squeeze collapsed that case to (dim,)).
    It also avoids "Failed to convert a NumPy array to a Tensor (Unsupported object type)".
    """
    import numpy as np

    def embedding_matrix(column):
        return np.asarray(frame[column].tolist(), dtype=np.float32)

    def scalar_column(column):
        return frame[[column]].to_numpy(dtype=np.float32)

    # todo: user demographics, device, time, and location
    return [embedding_matrix('video_emb'), embedding_matrix('user_emb'),
            scalar_column('likes'), scalar_column('dislikes'),
            scalar_column('comment_total'), scalar_column('views')]


def _ranking_model_labels(frame):
    """Return one label array per MMoE output, in build_model()'s output order."""
    return [frame[name].values for name in _RANKING_LABELS]


def train_ranking_model(df, run_logdir=None, epochs=100, random_state=None):
    """Train the multi-task MMoE ranking model.

    NOTE(review): df must carry the label columns user_click, user_rating, user_like and
    time_spend; prepare_data() does not create them — confirm where they come from.

    :param df: prepared frame (see prepare_data) plus the label columns
    :param run_logdir: TensorBoard log directory; a fresh temporary directory when None
    :param epochs: maximum number of epochs (early stopping may end sooner)
    :param random_state: seed for the train/validation/test split (None = unseeded)
    :return: the trained tf.keras model (best weights restored by early stopping)
    """
    from sklearn.model_selection import train_test_split

    # 70% train; the remaining 30% is split evenly into validation and test.
    train, val_test = train_test_split(df, test_size=0.3, random_state=random_state)
    val, test = train_test_split(val_test, test_size=0.5, random_state=random_state)

    train_data, train_label = _ranking_model_inputs(train), _ranking_model_labels(train)
    validation_data, validation_label = _ranking_model_inputs(val), _ranking_model_labels(val)
    test_data, test_label = _ranking_model_inputs(test), _ranking_model_labels(test)

    print('Training data shape = {}'.format(train.shape))
    print('Validation data shape = {}'.format(val.shape))
    print('Test data shape = {}'.format(test.shape))

    if run_logdir is None:
        run_logdir = tempfile.mkdtemp(prefix='mmoe-logs-')

    model = build_model()
    model.compile(
        loss={'user_click': 'binary_crossentropy', 'user_rating': 'MSE',
              'user_like': 'binary_crossentropy', 'time_spend': 'MSE'},
        optimizer=tf.keras.optimizers.Adam(),
        # Accuracy is only meaningful for the two binary heads.
        metrics={'user_click': ['accuracy'], 'user_like': ['accuracy']},
    )

    # Print out model architecture summary
    model.summary()

    # The ranking cell later reloads weights from mmoe.hdf5.
    checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("mmoe.hdf5", save_best_only=True)
    early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
    # Exponential learning-rate decay: 1e-3 * 0.9 ** epoch.
    lr_schedule_cb = tf.keras.callbacks.LearningRateScheduler(lambda epoch: 1e-3 * 0.9 ** epoch)
    tensorboard_cb = tf.keras.callbacks.TensorBoard(run_logdir)

    model.fit(
        x=train_data,
        y=train_label,
        validation_data=(validation_data, validation_label),
        callbacks=[
            ROCCallback(
                training_data=(train_data, train_label),
                validation_data=(validation_data, validation_label),
                test_data=(test_data, test_label),
            ),
            checkpoint_cb, lr_schedule_cb, early_stopping_cb, tensorboard_cb,
        ],
        epochs=epochs,
    )

    return model

In [None]:
def train_position_bias_model(df, run_logdir=None, epochs=20, random_state=None):
    """Train the shallow side tower that learns selection (position) bias.

    Input: one-hot device ('android', 'ios', 'web') plus the item's 'position'.
    Target: 'pos_bias' (binary). Background on the idea:
    https://en.wikipedia.org/wiki/Propensity_score_matching

    NOTE(review): df must carry 'device_info', 'position' and 'pos_bias' columns, which
    prepare_data() does not create — confirm where they come from.

    :param df: frame with the columns above
    :param run_logdir: TensorBoard log directory; a fresh temporary directory when None
    :param epochs: maximum number of epochs (early stopping may end sooner)
    :param random_state: seed for the train/validation split (None = unseeded)
    :return: the trained tf.keras.Sequential model
    :raises AssertionError: if the position features contain missing values
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    device_columns = ['android', 'ios', 'web']
    feature_columns = device_columns + ['position']

    pos_shallow_tower = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu', input_shape=(len(feature_columns),)),
        tf.keras.layers.Dropout(0.1),
        tf.keras.layers.Dense(32, activation='relu'),
        # sigmoid, not softmax: softmax over a single unit always outputs 1.0.
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])

    # summary() prints the table itself and returns None, so call it on its own line.
    print("the network structure for position bias prediction:")
    pos_shallow_tower.summary()
    pos_shallow_tower.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy']
    )

    # One-hot device_info into bare 'android'/'ios'/'web' columns. Add any device absent from
    # df as all-zeros, and cast to float32 because recent pandas emits bool dummy columns.
    dummies = pd.get_dummies(df, columns=['device_info'], prefix='', prefix_sep='')
    for device in device_columns:
        if device not in dummies:
            dummies[device] = 0
    position_df = dummies[feature_columns + ['pos_bias']].astype('float32')
    assert not position_df.isnull().values.any(), "position features contain missing values"

    train_pos, val_pos = train_test_split(position_df, test_size=0.2, random_state=random_state)
    train_pos_data, train_pos_label = train_pos[feature_columns], train_pos[['pos_bias']]
    validation_pos_data, validation_pos_label = val_pos[feature_columns], val_pos[['pos_bias']]

    if run_logdir is None:
        run_logdir = tempfile.mkdtemp(prefix='position-bias-logs-')

    # The ranking cell later reloads this model from biase.hdf5.
    checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("biase.hdf5", save_best_only=True)
    early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
    lr_schedule_cb = tf.keras.callbacks.LearningRateScheduler(lambda epoch: 1e-3 * 0.9 ** epoch)
    tensorboard_cb = tf.keras.callbacks.TensorBoard(run_logdir)

    pos_shallow_tower.fit(
        x=train_pos_data,
        y=train_pos_label,
        validation_data=(validation_pos_data, validation_pos_label),
        epochs=epochs,
        callbacks=[checkpoint_cb, lr_schedule_cb, early_stopping_cb, tensorboard_cb],
    )
    return pos_shallow_tower

In [None]:
def final_score(weights_for_engagement, weights_for_satification, main_model, position_biase_model,test_main,test_position):
    """Blend the task predictions into one ranking score per candidate.

    score = w_engagement * sigmoid(click + time_spend + position_bias)
          + w_satisfaction * sigmoid(rating + like)

    :param weights_for_engagement: weight of the engagement term
    :param weights_for_satification: weight of the satisfaction term
    :param main_model: model whose predict() returns [click, rating, like, time_spend] in that order
    :param position_biase_model: model whose predict() returns the position-bias estimate
    :param test_main: inputs passed to main_model.predict
    :param test_position: inputs passed to position_biase_model.predict
    :return: array of scores with the broadcast shape of the predictions
    """
    # expit (the logistic sigmoid) is not imported anywhere else in this notebook.
    from scipy.special import expit

    print("the manually set weights for user engagement is", str(weights_for_engagement))
    print("the manually set weights for user satisfaction is", str(weights_for_satification))

    preds = main_model.predict(test_main)
    user_click, user_rating, user_like, time_spend = preds[0], preds[1], preds[2], preds[3]

    # NOTE(review): the position-bias estimate is added to the engagement logit at scoring
    # time — confirm this is intended rather than only correcting for bias during training.
    preds_position = position_biase_model.predict(test_position)

    engagement = expit(user_click + time_spend + preds_position)
    satisfaction = expit(user_rating + user_like)
    return weights_for_engagement * engagement + weights_for_satification * satisfaction

### Evaluator
The `Evaluator` component computes model performance metrics over the evaluation set. It uses the [TensorFlow Model Analysis](https://www.tensorflow.org/tfx/model_analysis/get_started) library. The `Evaluator` can also optionally validate that a newly trained model is better than the previous model. This is useful in a production pipeline setting where you may automatically train and validate a model every day. In this notebook, we only train one model, so the `Evaluator` automatically will label the model as "good". 

`Evaluator` will take as input the data from `ExampleGen`, the trained model from `Trainer`, and slicing configuration. The slicing configuration allows you to slice your metrics on feature values (e.g. how does your model perform on videos with many views versus videos with few views?). See an example of this configuration below:

Train model

In [None]:
# Prepare the CSV downloaded into the temporary data directory earlier. The bare relative
# path 'videos.csv' only worked if a copy happened to exist in the working directory.
df = prepare_data(_data_filepath)

In [None]:
# creating models
# Both calls train from scratch and write the mmoe.hdf5 / biase.hdf5 checkpoints that the
# ranking cell further down reloads.
print("train the main model ...")
main_model = train_ranking_model(df)
print("train the shallow tower for position bias...")
position_biase_model = train_position_bias_model(df)

Create Candidate List for the query

In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
def get_index_from_title(title, frame=None):
    """Return the 'index' value of the first row whose title equals `title` exactly.

    :param title: video title to look up
    :param frame: DataFrame with 'title' and 'index' columns; defaults to the notebook's `df`
    :return: the 'index' value of the first matching row
    :raises ValueError: if no row has that title
    """
    if frame is None:
        frame = df
    matches = frame.loc[frame['title'] == title, 'index']
    if matches.empty:
        raise ValueError('No video titled {!r}'.format(title))
    return matches.iloc[0]

In [None]:
# Bag-of-words vector for each video's tags; row i of count_matrix corresponds to row i of df.
cv = CountVectorizer()
count_matrix = cv.fit_transform(df["tags"])

In [None]:
# Cosine similarity between every pair of videos: a dense len(df) x len(df) array, so memory
# grows quadratically with the number of rows.
cosine_sim = cosine_similarity(count_matrix)
# 'index' records each row's position; get_index_from_title() reads this column.
df['index'] = range(0, len(df) )

Our query

In [None]:
# Look up the row of the query video by exact title (the "movie" names refer to videos).
query = "iPhone X + iPhone 8 Hands on!"
movie_index = get_index_from_title(query)
print('Query = ', movie_index, query)

In [None]:
# get similar movies
# (row position, similarity to the query) pairs, most similar first.
similar_movies = list(enumerate(cosine_sim[movie_index]))
sorted_similar_movies = sorted(similar_movies, key=lambda x:x[1], reverse=True)

In [None]:
# Return the 50 most similar videos as the candidate list. The query row is excluded by its
# index instead of by dropping position 0: when another row ties at similarity 1.0 (e.g.
# identical tags), the stable sort does not guarantee the query comes first.
N_CANDIDATES = 50
sorted_similar = [pair for pair in sorted_similar_movies if pair[0] != movie_index][:N_CANDIDATES]
movie_indices = [i[0] for i in sorted_similar]
topMovies = df.iloc[movie_indices]
topMovies.head()

Save Candidate list 

In [None]:
# Optional: uncomment to persist the candidate list to CandidateList.csv.
#topMovies.to_csv('CandidateList.csv') 

### Pusher
The `Pusher` component is usually at the end of a TFX pipeline. It checks whether a model has passed validation, and if so, exports the model to `_serving_model_dir`.

Rank the candidate list using MMoE

In [None]:
print("ranking...")
# Work on a copy so that adding the score column later does not write into a slice of df.
test_data = topMovies.copy()

# Same input layout as build_model(); converting the embedding lists directly keeps a
# (rows, dim) matrix even when only one candidate remains.
test_main = [np.asarray(test_data['video_emb'].tolist(), dtype=np.float32),
             np.asarray(test_data['user_emb'].tolist(), dtype=np.float32),
             test_data[['likes']].values,
             test_data[['dislikes']].values,
             test_data[['comment_total']].values,
             test_data[['views']].values]

# One-hot device columns for the position tower. Add any device missing from the candidates
# as all-zeros, and cast because recent pandas produces bool dummy columns.
position_features = pd.get_dummies(test_data, columns=['device_info'], prefix='', prefix_sep='')
for device in ('android', 'ios', 'web'):
    if device not in position_features:
        position_features[device] = 0
test_position = position_features[['android', 'ios', 'web', 'position']].astype('float32')
print("The final score for test data is...")

# Reload the checkpoints written during training. Use tf.keras, which built and saved them;
# the standalone `keras` package may be a different implementation depending on the version.
position_biase = tf.keras.models.load_model('biase.hdf5')
mmoe = build_model()
mmoe.load_weights('mmoe.hdf5')
# No compile() needed: the models are only used for predict().
final_score_list = final_score(0.2, 0.8, mmoe, position_biase, test_main, test_position)
print("Final score = ", final_score_list)

In [None]:
# Attach the scores to the candidates. assign() returns a new frame, so this never writes into
# a slice of df, and re-running the cell is harmless. ravel() turns the column-shaped score
# array from final_score() into one value per row.
test_data = test_data.assign(final_score=np.ravel(final_score_list))

In [None]:
# Rank the candidates by blended score, highest first, and show the five best.
final_set = test_data.sort_values('final_score', ascending=False)
ranked_view = final_set.loc[:, ['title', 'final_score', 'tags']]
ranked_view.head(5)