# Predict the Price of Books
---

Copyright [2021] [Data Scientist & ML Engineer: [Ahmed](https://machinehack.com/user/profile/ui/61c4874bf292faa49acf07a8)]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

---

## Table of Contents

<table>
<thead>
  <tr>
      <th><a href='#Table-of-Contents'>Table of Contents</a></th>
    <th></th>
  </tr>
</thead>
<tbody>
  <tr>
      <td><a href='#An-Overview'>An Overview</a></td>
    <td></td>
  </tr>
  <tr>
    <td><a href='#Dependencies'>Dependencies</a></td>
    <td></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(A)-Install-Dependencies'>(A) Install Dependencies</a></td>
  </tr>
      <tr>
    <td></td>
          <td><a href='#(B)-Importing-Libraries'>(B) Importing Libraries</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(C)-Hardware-Dependencies'>(C) Hardware Dependencies</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(D)-Data-Dependences'>(D) Data Dependences</a></td>
  </tr>
  <tr>
      <td><a href='#Workflow-pipeline'>Workflow pipeline</a></td>
    <td></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(A)-Data-Ingestion'>(A) Data Ingestion</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(B)-Data-Exploration'>(B) Data Exploration</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(C)-Data-Analysis'>(C) Data Analysis</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(D)-Data-Preparation'>(D) Data Preparation</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(E)-Train-Model-&-Validate-Model'>(E) Train Model & Validate Model</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(F)-Evaluate-Model'>(F) Evaluate Model</a></td>
  </tr>
  <tr>
    <td></td>
      <td><a href='#(G)-Serving-Model'>(G) Serving Model</a></td>
  </tr>
</tbody>
</table>

## An Overview

<p style='font-size: 18px;font-weight:bold'>Transform</p>


- Transform is one of the best methodologies for building a fully managed DAG with [TFX](https://www.tensorflow.org/tfx).


- This [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph)  is built based on Feature Engineering:

    1. Input Layers:

        A. **Ratings**: takes the customers' ratings of a book. The `scrape_Ratings` layer extracts the float value from the byte-string tensors, then the values are scaled with a z-score in a distributed, memory-efficient way using [TF.Transform](https://www.tensorflow.org/tfx/tutorials/transform/census).

        B. **Reviews**: similar to `Ratings`, except that it uses min-max normalization, since the reviews have discrete values, also through TF.Transform. (In my experience, TF.Transform is far better than scikit-learn for preprocessing, because scikit-learn consumes a lot of memory.)

        C. **BookCategory**: a vocabulary list built from a [TensorFlow vocabulary file](https://www.tensorflow.org/api_docs/python/tf/feature_column/categorical_column_with_vocabulary_file).

        D. **Genre**: Similar to BookCategory.

    2. **DenseFeatures**: responsible for creating a `feature_column` layer out of all these layers (i.e. the embedding feature columns that are used after crossing these features together).


- **How did I choose these layers out of all the other features?**

    * I can't forget to mention my favorite API of all time: [**TensorFlow Data Validation (TFDV)**](https://www.tensorflow.org/tfx/data_validation/get_started). It is an open-source API whose visualizations are based on [**Facets**](https://pair-code.github.io/facets/). It helps me discover anomalies, skew, distribution skew, drift, data interpolation and extrapolation, and more. You will definitely want it when you need to visualize and analyze your data.

---

## Dependencies

### (A) Install Dependencies

**Run the cells below, restart the kernel (Kernel > Restart kernel > Restart), then re-run the cells and proceed.**

<p style='font-size: 18px;font-weight:bold'>For Developing ML Models</p>

In [None]:
!pip install -q tensorflow==2.7

<p style='font-size: 18px;font-weight:bold'>For Data Analysis & Visualization</p>

In [8]:
!pip install -q openpyxl



In [None]:
!pip install -q tensorflow-data-validation==1.5

In [2]:
import sys
import warnings
warnings.filterwarnings('ignore')

print('Installing TensorFlow Data Validation')
!pip install -q tensorflow_data_validation[visualization]

import tensorflow_data_validation as tfdv
print('TFDV version: {}'.format(tfdv.version.__version__))
# Confirm that we're using Python 3
assert sys.version_info.major == 3, 'Oops, not running Python 3. Use Runtime > Change runtime type'

Installing TensorFlow Data Validation
TFDV version: 1.5.0


<p style='font-size: 18px;font-weight:bold'>For Preprocessing & Transformation</p>

In [None]:
!pip install -q tensorflow-transform==1.5

<p style='font-size: 18px;font-weight:bold'>For Reusable Embedding & Transfer Learning</p>

In [3]:
!pip install -q tensorflow-hub==0.12



<p style='font-size: 18px;font-weight:bold'>For Executing Shell Arguments & Commands</p>

In [None]:
!pip install -q Fire==0.4

### (B) Importing Libraries

In [4]:
import os
import pandas as pd
import numpy as np
from operator import itemgetter
# import fire  # Uncomment to run training & evaluation as a shell command.

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import feature_column as fc
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
from tensorflow.keras import layers
print(tf.__version__)

2.7.0


In [5]:
from typing import Text, Dict, List, Union, Tuple, Optional, NamedTuple

### (C) Hardware Dependencies

In [6]:
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print(f"List of GPUs Available: {len(physical_devices)}")
try:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
except IndexError:
    print('GPU Not Found!')

List of GPUs Available: 0
GPU Not Found!


2022-01-07 12:30:46.741788: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcusolver.so.11'; dlerror: libcusolver.so.11: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/cuda/lib:/usr/local/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
2022-01-07 12:30:46.759695: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1850] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


### (D) Data Dependences

<span style='color:green;'>You can download the data using this link: [Predict The Price Of Books](https://machinehack.com/hackathon/predict_the_price_of_books/data)</span>

---

## Workflow pipeline

### (A) Data Ingestion

We're going to read the files so we can do some data exploration and data analysis.

In [21]:
train_data = pd.read_excel("../input/predict-book-prices/train.xlsx")
test_data = pd.read_excel("../input/predict-book-prices/test.xlsx")

In [22]:
train_data.head()

Unnamed: 0,Title,Author,Edition,Reviews,Ratings,Synopsis,Genre,BookCategory,Price
0,The Prisoner's Gold (The Hunters 3),Chris Kuzneski,"Paperback,– 10 Mar 2016",4.0 out of 5 stars,8 customer reviews,THE HUNTERS return in their third brilliant no...,Action & Adventure (Books),Action & Adventure,220.0
1,Guru Dutt: A Tragedy in Three Acts,Arun Khopkar,"Paperback,– 7 Nov 2012",3.9 out of 5 stars,14 customer reviews,A layered portrait of a troubled genius for wh...,Cinema & Broadcast (Books),"Biographies, Diaries & True Accounts",202.93
2,Leviathan (Penguin Classics),Thomas Hobbes,"Paperback,– 25 Feb 1982",4.8 out of 5 stars,6 customer reviews,"""During the time men live without a common Pow...",International Relations,Humour,299.0
3,A Pocket Full of Rye (Miss Marple),Agatha Christie,"Paperback,– 5 Oct 2017",4.1 out of 5 stars,13 customer reviews,A handful of grain is found in the pocket of a...,Contemporary Fiction (Books),"Crime, Thriller & Mystery",180.0
4,LIFE 70 Years of Extraordinary Photography,Editors of Life,"Hardcover,– 10 Oct 2006",5.0 out of 5 stars,1 customer review,"For seven decades, ""Life"" has been thrilling t...",Photography Textbooks,"Arts, Film & Photography",965.62


### (B) Data Exploration

Now, let's dive into the dataset to see which features we can use for this regression problem to help us predict the prices of these books as accurately as possible.

In [23]:
train_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6237 entries, 0 to 6236
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Title         6237 non-null   object 
 1   Author        6237 non-null   object 
 2   Edition       6237 non-null   object 
 3   Reviews       6237 non-null   object 
 4   Ratings       6237 non-null   object 
 5   Synopsis      6237 non-null   object 
 6   Genre         6237 non-null   object 
 7   BookCategory  6237 non-null   object 
 8   Price         6237 non-null   float64
dtypes: float64(1), object(8)
memory usage: 438.7+ KB


In [24]:
train_data.describe()

Unnamed: 0,Price
count,6237.0
mean,560.707516
std,690.110657
min,25.0
25%,249.18
50%,373.0
75%,599.0
max,14100.0


### (C) Data Analysis

We're going to analyze the data to explore its patterns, decide which features to use, and compare the Train & Test datasets to check for anomalies, drift, or skew using TensorFlow Data Validation.

In [25]:
# Generating protobuf data for visualizing the statistics
train_stats = tfdv.generate_statistics_from_dataframe(train_data)
test_stats = tfdv.generate_statistics_from_dataframe(test_data)

Let's visualize the datasets and see the differences

In [26]:
# Visualize the input statistics using Facets.
tfdv.visualize_statistics(lhs_statistics=train_stats, lhs_name='Train Dataset',
                          rhs_statistics=test_stats, rhs_name="Test Dataset")

<p style='font-size: 18px;font-weight:bold'>From these statistics we saw that:</p>

- The distribution of the `training dataset` is totally different from that of the `test dataset` for these features:

    + `Edition`
    + `Author`
    + `Synopsis`
    + `Title`

    which means – we can't rely on them to predict the price.

- Also, we notice that the distributions of some other features in the training dataset interpolate with (closely match) those in the test dataset:
    
    + `Reviews`
    + `Ratings`
    + `Genre`
    + `BookCategory`

    which means – we can use them for predicting the price.
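
One rough way to see why near-unique features such as `Title` transfer poorly while `BookCategory` transfers well is to measure how many test values were already seen in training. The snippet below is a plain-Python sketch on made-up toy values; the helper name `seen_in_train_share` is hypothetical, and this is not the statistic TFDV computes.

```python
def seen_in_train_share(train_values, test_values):
    """Fraction of test rows whose value also appears in the training rows."""
    train_vocab = set(train_values)
    hits = sum(1 for value in test_values if value in train_vocab)
    return hits / len(test_values)


# Toy values, invented for illustration (not taken from the dataset).
train_category = ["Romance", "Humour", "Politics", "Romance"]
test_category = ["Romance", "Humour", "Politics", "Humour"]
train_title = ["Book A", "Book B", "Book C", "Book D"]
test_title = ["Book E", "Book F", "Book G", "Book H"]

category_share = seen_in_train_share(train_category, test_category)
title_share = seen_in_train_share(train_title, test_title)
print(f"category values seen in training: {category_share:.0%}")
print(f"title values seen in training: {title_share:.0%}")
```

A feature whose test values rarely appear in training gives the model little to generalize from unless it is embedded in some other way.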

I still think that the 4 features we selected are not enough to help with the price predictions.

Test it! You will notice that it gives average predictions. There are multiple ways to benefit from the features we neglected, but how?

For example, we can use transfer learning, or reusable embeddings. Reusable embeddings? Yes! We can use a reusable embedding to embed one of our features and turn it from a useless feature into something we can rely on with high accuracy.

Welcome to **TensorFlow Hub**!
> The TensorFlow Hub lets you search and discover hundreds of trained, ready-to-deploy machine learning models in one place.

We're going to use [`nnlm-en-dim50`](https://tfhub.dev/google/nnlm-en-dim50/2):
> Token based text embedding trained on English Google News 7B corpus.

<span style='color:green;'> You can read more about the [Neural Probabilistic Language Model](https://www.linkedin.com/posts/drxavier997_neural-probabilistic-language-model-activity-6861266851578679296-LofX)</span> 

In [27]:
train_data[['Genre', 'BookCategory']].groupby('BookCategory').count()

Unnamed: 0_level_0,Genre
BookCategory,Unnamed: 1_level_1
Action & Adventure,818
"Arts, Film & Photography",517
"Biographies, Diaries & True Accounts",596
Comics & Mangas,583
"Computing, Internet & Digital Media",510
"Crime, Thriller & Mystery",723
Humour,540
"Language, Linguistics & Writing",594
Politics,325
Romance,560


In [28]:
train_data.pivot_table(index='Genre', columns='BookCategory').count()

       BookCategory                        
Price  Action & Adventure                       40
       Arts, Film & Photography                 80
       Biographies, Diaries & True Accounts    124
       Comics & Mangas                          69
       Computing, Internet & Digital Media      90
       Crime, Thriller & Mystery                49
       Humour                                  101
       Language, Linguistics & Writing         113
       Politics                                 90
       Romance                                  28
       Sports                                   99
dtype: int64

<p style='font-size: 18px;font-weight:bold'>Train & Test Split</p>

In [34]:
msk = np.random.rand(len(train_data)) < 0.8
train_dataframe = train_data[msk]
dev_dataframe = train_data[~msk]

In [35]:
len(train_dataframe)

5011

In [36]:
print(f"length of Train Dataset: {len(train_dataframe)}")
print(f"length of Dev Dataset: {len(dev_dataframe)}")

length of Train Dataset: 5011
length of Dev Dataset: 1226
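
Because the mask above is drawn at random without a seed, the exact split sizes (5011 and 1226 here) will vary between runs. The following is a minimal plain-Python sketch of the same masking idea with a fixed seed; `split_mask` and the seed value are assumptions made for illustration, not part of the notebook.

```python
import random


def split_mask(n_rows, train_fraction=0.8, seed=42):
    """Return a reproducible list of booleans: True means 'goes to train'."""
    rng = random.Random(seed)  # private generator, so global state is untouched
    return [rng.random() < train_fraction for _ in range(n_rows)]


mask = split_mask(6237)
n_train = sum(mask)
n_dev = len(mask) - n_train
print(f"train rows: {n_train}, dev rows: {n_dev}")
```

With NumPy, the equivalent would be to draw the mask from a seeded generator instead of the global random state.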


Make sure you've lowercased the column names so that they match the feature names of the execution graph (you will understand why in the upcoming phases).

In [38]:
train_dataframe.columns = [feature.lower() for feature in train_dataframe.columns.to_list()]
dev_dataframe.columns = [feature.lower() for feature in dev_dataframe.columns.to_list()]

In [39]:
train_dataframe.columns

Index(['title', 'author', 'edition', 'reviews', 'ratings', 'synopsis', 'genre',
       'bookcategory', 'price'],
      dtype='object')

Now, let's save our 3 dataframes in CSV format so we can prepare them.

In [40]:
os.makedirs(os.path.join('.', 'Dataset'), exist_ok=True)  # don't fail when the cell is re-run
train_dataframe.to_csv('./Dataset/train_data.csv', index=False)
dev_dataframe.to_csv('./Dataset/dev_data.csv', index=False)
test_data.to_csv("./Dataset/test_data.csv", index=False)

And one more file for batch testing later.

In [41]:
dev_dataframe.iloc[:50, :].to_csv('Dataset/batch_data.csv', index=False)

### (D) Data Preparation 

We're preparing our data for our model. **Data Preparation** is one of the hardest, most thought-intensive phases you may struggle with in the ML pipeline.

It requires understanding the data through domain knowledge, and it shows how well you understand the data you hold.

In [42]:
# Make sure no other operations are running when you execute this line,
# or you will face problems due to graph execution.
tf.config.run_functions_eagerly(False)

In [43]:
class Features(object):
    """
    This class contains all the main features I'm using in this project.
    """
    DEFAULTS_COLUMNS = ['title', 'author', 'edition', 'reviews', 'ratings', 'synopsis', 'genre',
                        'bookcategory', 'price']
    INFER_COLUMNS = ['title', 'author', 'edition', 'reviews', 'ratings', 'synopsis', 'genre',
                     'bookcategory']
    UNWANTED_FEATURES = ['synopsis', 'edition']
    FEATURES = ['title', 'author', 'genre', 'bookcategory','reviews', 'ratings']
    LABEL = 'price'
    DEFAULTS = [['null'], ['null'], ['null'], ['null'],['null'], ['null'], ['null'], ['null'], [0.0]]

In [44]:
def create_dataset(pattern: Text,
                   mode: Optional[Text],
                   batch_size: int,
                   num_epochs: int) -> tf.data.Dataset:
    """
    Create dataset using tf.data API from CSV file.
        Args:
            pattern [Text]: Path (or glob pattern) of the CSV file.
            mode [Optional[Text]]: 'train' adds a shuffle and repeats the dataset indefinitely;
                                   any other value passes `num_epochs` to `repeat`.
            batch_size [int]: number of examples per batch.
            num_epochs [int]: number of passes over the data (ignored when mode is 'train').
        Returns:
            tf.data.Dataset: batches of (features, label).
    """
    def features_label(row_data: Dict[Text, tf.Tensor]) -> Tuple[
                                                            Dict[Text,
                                                                 tf.Tensor],
                                                            tf.Tensor]:
        """
        This function is responsible for splitting the dataset into features & label
            Args:
                row_data[Dict[Text, tf.Tensor]]: dictionary of CSV column names and tensor values.
            Returns:
                Tuple[Dict[Text, tf.Tensor], tf.Tensor]: Tuple of Dictionary of features' Tensors & label Tensor
        """
        label = row_data.pop(Features.LABEL)
        features = row_data
        for unwanted in Features.UNWANTED_FEATURES:
            features.pop(unwanted)

        return features, label

    dataset = tf.data.experimental.make_csv_dataset(file_pattern=pattern,
                                                    batch_size=batch_size,
                                                    column_names=Features.DEFAULTS_COLUMNS,
                                                    column_defaults=Features.DEFAULTS)
    dataset = dataset.map(features_label)

    if mode == 'train':
        # Repeat indefinitely and shuffle while training.
        num_epochs = None
        dataset = dataset.shuffle(buffer_size=batch_size * 10)

    dataset = dataset.repeat(num_epochs)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

    return dataset

In [45]:
dataset = create_dataset("Dataset/batch_data.csv", "train", 10, 9)
dataset

<PrefetchDataset shapes: (OrderedDict([(title, (10,)), (author, (10,)), (reviews, (10,)), (ratings, (10,)), (genre, (10,)), (bookcategory, (10,))]), (10,)), types: (OrderedDict([(title, tf.string), (author, tf.string), (reviews, tf.string), (ratings, tf.string), (genre, tf.string), (bookcategory, tf.string)]), tf.float32)>

<p style='font-size: 18px;font-weight:bold'>Let's do some Feature Engineering</p>


First, we need to save all the distinct values of `Genre` & `BookCategory` (and the list of authors) as vocabulary files.

In [46]:
os.makedirs(os.path.join('.', 'Vocabulary'), exist_ok=True)  # don't fail when the cell is re-run
with open('Vocabulary/list_of_genre.txt', 'w') as log:
    for line in train_data['Genre'].unique().tolist():
        log.write(f"{line}\n")
with open('Vocabulary/list_of_bookCat.txt', 'w') as lobc:
    for line in train_data['BookCategory'].unique().tolist():
        lobc.write(f"{line}\n")
with open('Vocabulary/list_of_authors.txt', 'w') as auth:
    for line in train_data['Author'].tolist():
        auth.write(f'{line}\n')
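
The vocabulary files written above are later read by `categorical_column_with_vocabulary_file`, which maps each known string to its line index and sends unknown strings to one of `num_oov_buckets` extra ids. Below is a plain-Python sketch of that lookup idea; `build_lookup` is a hypothetical helper, and `zlib.crc32` is only a stand-in for TensorFlow's own hashing, so the concrete out-of-vocabulary ids will differ from what TensorFlow produces.

```python
import zlib


def build_lookup(vocabulary, num_oov_buckets):
    """Return a function mapping a token to an integer id."""
    index = {token: i for i, token in enumerate(vocabulary)}
    vocab_size = len(vocabulary)

    def lookup(token):
        if token in index:
            return index[token]  # in-vocabulary: the line number in the file
        # Out-of-vocabulary: hash into one of the extra buckets after the vocab.
        return vocab_size + zlib.crc32(token.encode("utf-8")) % num_oov_buckets

    return lookup


vocabulary = ["Action & Adventure", "Humour", "Romance"]
lookup = build_lookup(vocabulary, num_oov_buckets=2)

known_id = lookup("Humour")
unknown_id = lookup("Sports")  # not in the toy vocabulary
print(known_id, unknown_id)
```

The out-of-vocabulary buckets let the model handle categories at serving time that never appeared in the training vocabulary.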

**In the next couple of cells, we're going to build two transform functions that help us pre-process the `Reviews` & `Ratings` features.**

In [None]:
train_data['Reviews'].unique()

First, we need to scrape the average number of stars that customers gave each book. We know that the maximum is 5 stars and the minimum is 1 star. (Realistically, a person may not submit a review, in which case the review value would be zero. At the same time, it wouldn't be logical to assign zero stars to books that fall outside our scope.)
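
Outside the TensorFlow graph, the scraping step amounts to taking the first whitespace-separated token and converting it to a float. A minimal plain-Python sketch, using sample strings in the format shown by `train_data.head()` (`parse_stars` is a hypothetical name):

```python
def parse_stars(review: str) -> float:
    """Return the leading number of a string such as '4.0 out of 5 stars'."""
    first_token = review.split(maxsplit=1)[0]
    return float(first_token)


samples = ["4.0 out of 5 stars", "3.9 out of 5 stars", "5.0 out of 5 stars"]
stars = [parse_stars(sample) for sample in samples]
print(stars)
```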

In [47]:
# Wrap the function so it handles a whole batch of reviews instead of a single tensor
@tf.function
def format_reviews(review:
                   tf.TensorSpec(shape=[None],
                                 dtype=tf.string)) -> tf.Tensor:
    def formating(tensor: str) -> float:
        return float(tf.strings.split(tensor, maxsplit=1).numpy()[0])
    return tf.ensure_shape(tf.map_fn(lambda tensor:
                                        tf.py_function(formating, [tensor], tf.float32),
                                     review,
                                     fn_output_signature=tf.float32,
                                     parallel_iterations=1,
                                     swap_memory=True),
                           (None, ))


@tf.function
def scale_reviews(review:
                  tf.TensorSpec(shape=[None],
                                dtype=tf.float32)) -> tf.Tensor:
    return tft.scale_by_min_max(review,
                                output_min=1.0,
                                output_max=5.0)

Similarly, we need to scrape the number of customers who submitted a rating, and then scale it down.

What is interesting here is that min-max scaling is a less suitable choice. Why?

The number of customers submitting ratings is unbounded. If we passed each ratings tensor to the scaling layer on its own, we would end up with `NaN` values, because standard scaling needs the mean and standard deviation over all the tensors (i.e. it assumes the data is normally distributed within each feature)[$^1$](https://towardsdatascience.com/all-about-feature-scaling-bcc0ad75cb35#:~:text=data%20is%20normally%20distributed%20within%20each%20feature).

It might sound crazy, but I had to come up with a way to scale all the tensors inside the dataset, while still adding this step to the model so that I can apply the **Transform** methodology.
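
The idea can be sketched in plain Python: learn the mean and standard deviation once over the whole dataset, then reuse them for every batch, so even a one-element batch scales without a zero standard deviation. `ZScoreScaler` and `parse_count` are hypothetical helpers written for this illustration, and the sketch assumes the population standard deviation; it is not the Keras layer itself.

```python
import statistics


def parse_count(rating: str) -> float:
    """Return the leading count of a string such as '1,234 customer reviews'."""
    first_token = rating.split(maxsplit=1)[0]
    return float(first_token.replace(",", ""))


class ZScoreScaler:
    """Hypothetical helper: learn mean/std once, then scale any batch."""

    def adapt(self, values):
        self.mean = statistics.mean(values)
        self.std = statistics.pstdev(values)  # population standard deviation
        return self

    def __call__(self, batch):
        return [(value - self.mean) / self.std for value in batch]


raw_ratings = ["8 customer reviews", "14 customer reviews",
               "1 customer review", "1,234 customer reviews"]
counts = [parse_count(rating) for rating in raw_ratings]

scaler = ZScoreScaler().adapt(counts)  # statistics over the whole dataset
full = scaler(counts)
single = scaler([counts[0]])           # a one-element batch still scales fine
print(counts)
print(single)
```

Computing the statistics per batch instead would give a standard deviation of zero for a one-element batch, which is where the `NaN` values come from.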

<span style='background-color:yellow'>
Count this as the first limitation of using Transform outside of the TFX framework.
</span>

In [48]:
train_data['Ratings'].unique()

array(['8 customer reviews', '14 customer reviews', '6 customer reviews',
       '13 customer reviews', '1 customer review', '72 customer reviews',
       '16 customer reviews', '111 customer reviews',
       '132 customer reviews', '17 customer reviews',
       '4 customer reviews', '3 customer reviews', '5 customer reviews',
       '2 customer reviews', '23 customer reviews', '76 customer reviews',
       '10 customer reviews', '9 customer reviews', '15 customer reviews',
       '34 customer reviews', '32 customer reviews',
       '49 customer reviews', '62 customer reviews',
       '61 customer reviews', '7 customer reviews', '18 customer reviews',
       '98 customer reviews', '12 customer reviews',
       '97 customer reviews', '285 customer reviews',
       '29 customer reviews', '27 customer reviews',
       '267 customer reviews', '24 customer reviews',
       '146 customer reviews', '95 customer reviews',
       '234 customer reviews', '35 customer reviews',
       '66 custome

In [49]:
# Wrap the function so it handles a whole batch of ratings instead of a single tensor
@tf.function
def format_ratings(rate:
                   tf.TensorSpec(shape=[None],
                                 dtype=tf.string)) -> tf.Tensor:
    def formating(tensor: tf.Tensor) -> tf.Tensor:
        return tf.strings.to_number(
                    tf.strings.regex_replace(
                        tf.strings.split(tensor, maxsplit=1).numpy()[0], ',', ''),
                    out_type=tf.float32)
    return tf.ensure_shape(
               tf.map_fn(lambda tensor:
                             tf.py_function(formating, [tensor], tf.float32),
                         rate,
                         fn_output_signature=tf.float32,
                         parallel_iterations=1,
                         swap_memory=True),
               (None,))


def format_ratings_ds(dataset: tf.data.Dataset,
                      batch_size: int,
                      steps: int) -> tf.keras.layers.Layer:
    """Adapt a Normalization layer (z-score) on the parsed `ratings` feature and return it"""
    dataset = dataset.map(lambda features, label: features['ratings'])
    dataset = dataset.map(format_ratings)
    scale = tf.keras.layers.experimental.preprocessing.Normalization(axis=None)
    scale.adapt(data=dataset, batch_size=batch_size, steps=steps)
    return scale


# Create our custom z-score
@tf.keras.utils.register_keras_serializable()
class ZScoreCustomPreprocessor(tf.keras.layers.Layer):
    """Custom Class for calculating the z-score"""

    def __init__(self,
                 scale_layer,
                 **kwargs):

        super(ZScoreCustomPreprocessor, self).__init__(**kwargs)
        self.lambda_layer = tf.keras.layers.Lambda
        self._scale = scale_layer

    @tf.function
    def transform(self,
                  value:
                      tf.TensorSpec(shape=[None], dtype=tf.float32)):
        return self._scale(value)

    def call(self, inputs):
        return self.lambda_layer(self.transform)(inputs)

    def get_config(self):  # For Keras custom Serializing
        config = super(ZScoreCustomPreprocessor, self).get_config()
        config.update({'scale_layer': self._scale})
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

If you noticed, in the previous two cells we started by executing the functions eagerly, computing the actual values to test the return values. Then we wrapped both functions with `tf.function` to move from the [eager execution](https://towardsdatascience.com/eager-execution-vs-graph-execution-which-is-better-38162ea4dbf6#:~:text=TVBEATS%20on%20Unsplash-,eager%20execution,-Eager%20execution%20is) environment to the [graph execution](https://towardsdatascience.com/eager-execution-vs-graph-execution-which-is-better-38162ea4dbf6#:~:text=the%20Graph%20Execution.-,graph%20execution,-We%20covered%20how) environment, gaining the speed, scalability, and parallelism of graph execution.

In [50]:
authors_list = open('Vocabulary/list_of_authors.txt').read().splitlines()
len(authors_list)

6237

In [51]:
def transformer(inputs: Dict[Text, tf.Tensor],
                adapt_data: tf.data.Dataset,
                batch_size: int,
                steps: int,
                authors_list: List[Text],
                categorical_columns: List[Text]) -> Tuple[Dict[Text,
                                                               tf.Tensor],
                                                          Dict[Text,
                                                               Union[fc.numeric_column,
                                                                     fc.bucketized_column,
                                                                     fc.categorical_column_with_vocabulary_file,
                                                                     fc.embedding_column]]]:
    """
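    Transformer function responsible for passing our numerical and string column features as inputs to
    the model after applying feature engineering to them.

        Args:
            inputs[Dict[Text, tf.Tensor]]: A Dictionary of feature columns and tensor values.
            adapt_data[tf.data.Dataset]: dataset used to adapt the z-score scaling of `ratings`.
            batch_size[int]: batch size used while adapting the scaler.
            steps[int]: number of batches consumed while adapting the scaler.
            authors_list[List[Text]]: list of authors; its length sizes the author hash buckets.
            categorical_columns[List[Text]]: represents our list of features that 
                                             we're going to apply feature engineering to.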
        Returns:
            Tuple[Dict[Text, tf.Tensor],
                  Dict[Text, Union[fc.numeric_column,
                                  fc.bucketized_column,
                                  fc.categorical_column_with_vocabulary_file,
                                  fc.embedding_column]]]
    """
    # Have a copy from the input features to pass-through columns.
    transformed = inputs.copy()

    # Now, we need to do our preprocessing and add it to the graph
    transformed['reviews'] = layers.Lambda(format_reviews,
                                           name='scrape_reviews')(inputs["reviews"])
    transformed['ratings'] = layers.Lambda(format_ratings,
                                           name='scrape_ratings',)(inputs["ratings"])

    feature_columns = {
        feature: fc.numeric_column(feature)
        for feature in ['reviews', 'ratings']
    }

    # We need to discretize our Ratings & Reviews
    range_of_reviews = np.arange(1.0,5.0,0.1).tolist()
    range_of_ratings = list(range(1, int(1e+4), 5))
    bucketize_reviews = fc.bucketized_column(source_column=
                                                 feature_columns['reviews'],
                                             boundaries=range_of_reviews)
    bucketize_ratings = fc.bucketized_column(source_column=
                                                 feature_columns['ratings'],
                                             boundaries=range_of_ratings)

#     hash_size_ratings_reviews = int(.5 * np.sqrt(len(range_of_reviews) * len(range_of_ratings))) # allow collisions
    hash_size_ratings_reviews = int(2e+3)  # allow collisions

    crossed_ratings_reviews = fc.crossed_column(
                                    keys=[bucketize_reviews, bucketize_ratings],
                                    hash_bucket_size=hash_size_ratings_reviews)

    # One-hot encoding our crossed Ratings & Reviews
    feature_columns['crossed_ratings_reviews'] = fc.indicator_column(crossed_ratings_reviews)

    # Scaling Reviews (min-max, as it has a discrete range)
    # & Ratings (z-score, scaling to the data distribution)
    transformed["reviews"] = layers.Lambda(scale_reviews,
                                           name='scaled_reviews')(transformed["reviews"])

    scale = format_ratings_ds(dataset=adapt_data, batch_size=batch_size, steps=steps)
    transformed["ratings"] = ZScoreCustomPreprocessor(scale,
                                                      name='scale_ratings')(transformed["ratings"])

    # Embed each of `author` and `title`
    feature_columns['title'] = hub.text_embedding_column_v2(
                                    key='title',
                                    module_path='https://tfhub.dev/google/nnlm-en-dim50/2',
                                    trainable=False
                               )
    author_categorical_feature = fc.categorical_column_with_hash_bucket(
                                    key='author',
                                    hash_bucket_size=len(authors_list) + 10)

    feature_columns['author'] = fc.embedding_column(
                                    categorical_column=author_categorical_feature,
                                    dimension=10)
    # We need to embed our `Genre` and `BookCategory`
    genre_vocab = fc.categorical_column_with_vocabulary_file(key="genre",
                                                             vocabulary_file=
                                                                 'Vocabulary/list_of_genre.txt',
                                                             num_oov_buckets=5)
    bookCat_vocab = fc.categorical_column_with_vocabulary_file(key='bookcategory',
                                                             vocabulary_file=
                                                                 'Vocabulary/list_of_bookCat.txt',
                                                             num_oov_buckets=2)
    genre_vocab_size = 345
    bookCat_vocab_size = 11
#     bookcategory_genre_coll = int(.5 * np.sqrt(genre_vocab_size * bookCat_vocab_size))
    feature_columns["genre_column"] = fc.indicator_column(genre_vocab)
    feature_columns["bookcategory_column"] = fc.indicator_column(bookCat_vocab)

    cross_genre_and_bookCat = fc.crossed_column([genre_vocab, bookCat_vocab],
                                                hash_bucket_size=genre_vocab_size * bookCat_vocab_size)
    feature_columns['bookcategory_genre'] = fc.embedding_column(cross_genre_and_bookCat,
                                                                dimension=2)

    return transformed, feature_columns
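
To make the bucketize-then-cross step above more concrete, here is a plain-Python sketch of the same idea. It assumes the bucket boundaries used in the function (about 40 review boundaries from 1.0 to 4.9 and 2000 rating boundaries) and uses `zlib.crc32` purely as a stand-in hash; TensorFlow uses its own fingerprint function, so real crossed ids will differ. The helper names `bucketize` and `cross` are hypothetical.

```python
import bisect
import zlib

review_boundaries = [round(1.0 + 0.1 * i, 1) for i in range(40)]  # 1.0 ... 4.9
rating_boundaries = list(range(1, 10_000, 5))                     # 1, 6, ..., 9996
HASH_BUCKET_SIZE = 2_000


def bucketize(value, boundaries):
    """Bucket index of value; a value equal to a boundary goes to the right."""
    return bisect.bisect_right(boundaries, value)


def cross(review_bucket, rating_bucket, hash_bucket_size=HASH_BUCKET_SIZE):
    """Hash the pair of bucket ids into a fixed number of crossed ids."""
    key = f"{review_bucket}_X_{rating_bucket}".encode("utf-8")
    return zlib.crc32(key) % hash_bucket_size


review_bucket = bucketize(4.0, review_boundaries)  # '4.0 out of 5 stars'
rating_bucket = bucketize(8, rating_boundaries)    # '8 customer reviews'
crossed_id = cross(review_bucket, rating_bucket)

# n boundaries produce n + 1 buckets, so the full cross space is:
cross_space = (len(review_boundaries) + 1) * (len(rating_boundaries) + 1)
print(review_bucket, rating_bucket, crossed_id, cross_space)
```

Since the cross space is far larger than the 2000 hash buckets, distinct (review, rating) pairs will share ids; that is the collision trade-off the `hash_bucket_size` comment refers to.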

### (E) Train Model & Validate Model

In training and validating the model, we build the model function, which is responsible for structuring the model's skeleton and compiling it with the loss function and the learning rate.

- `model_build`:
    + Builds the skeleton of the model.


- `train_and_evaluate`:
    + Trains and evaluates the model in a distributed manner. After training finishes, it saves the model for serving.


Keras keeps a master list of all built-in layer, model, optimizer, and metric classes, which it uses to find the correct class on which to call `from_config`. If the class can't be found, an error is raised (`ValueError: Unknown layer`). That is why the custom loss and metric below are registered with `register_keras_serializable`.

In [52]:
@tf.keras.utils.register_keras_serializable()
def rmse(y_true, y_pred):
    return tf.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))

@tf.keras.utils.register_keras_serializable()
def rlmse(y_true, y_pred):
    return tf.sqrt(
            tf.reduce_mean(
                tf.square(
                    tf.experimental.numpy.log10(y_true + 1)\
                    - tf.experimental.numpy.log10(y_pred + 1))))

In [53]:
def model_build(adapt_data: tf.data.Dataset,
                batch_size: int,
                steps: int,
                authors_list: List[Text],
                linear_dnn_units: List[int],
                categorical_dnn_units: List[int],
                hidden_layer_unit: int) -> tf.keras.models.Model:
    """
    Builds and compiles the skeleton of the model.
    Args:
        adapt_data, batch_size, steps, authors_list: forwarded unchanged to `transformer`.
        linear_dnn_units [List[int]]: number of nodes in each
                                      linear hidden layer.
        categorical_dnn_units [List[int]]: number of nodes in each
                                           categorical hidden layer.
        hidden_layer_unit [int]: number of units in our pre-last layer.
    Returns:
        tf.keras.models.Model
    """
    inputs = {
        feature: layers.Input(name=feature, shape=(), dtype='string')
        for feature in Features.FEATURES
    }

    transformed, feature_columns = transformer(inputs=inputs,
                                               adapt_data=adapt_data,
                                               batch_size=batch_size,
                                               steps=steps,
                                               authors_list=authors_list,
                                               categorical_columns=Features.FEATURES)

    numerical_dense_features = layers.DenseFeatures(
                                  feature_columns=itemgetter(*["reviews",
                                                               "ratings",
                                                               "crossed_ratings_reviews"])(feature_columns),
                                  name='numerical_dense_features')(dict(
                                                                    list(
                                                                        transformed.items())[4:6]))

    embedded_dense_features = layers.DenseFeatures(
                                     feature_columns=itemgetter(*["title",
                                                                  "author",
                                                                  "genre_column",
                                                                  "bookcategory_column",
                                                                  "bookcategory_genre"])(feature_columns),
                                      name='embedded_dense_features')(dict(
                                                                            list(
                                                                                transformed.items())[:4]))
    # Building dnn for Reviews & ratings features
    linear_hidden_layers = numerical_dense_features
    for layerNo, numNodes in enumerate(linear_dnn_units):
        linear_hidden_layers = layers.Dense(units=numNodes,
                                            activation="relu",
                                            kernel_initializer='normal',
                                            name=f'linear_dnn_{layerNo+1}')(linear_hidden_layers)

    # Building our depth layers using Genre & BookCategory for our categorical layers
    categorical_hidden_layers = embedded_dense_features
    for layerNo, numNodes in enumerate(categorical_dnn_units):
        categorical_hidden_layers = layers.Dense(units=numNodes,
                                            activation="relu",
                                            kernel_initializer='normal',
                                            name=f'embedded_dnn_{layerNo+1}')(categorical_hidden_layers)

    concatenation = layers.concatenate([linear_hidden_layers, categorical_hidden_layers],
                                       name='features_concatenation')
    hidden_layer_3 = layers.Dense(units=hidden_layer_unit,
                                  activation="relu", 
                                  kernel_initializer='normal',
                                  name='hidden_layer',
                                  kernel_regularizer=
                                      tf.keras.regularizers.l1(l1=1e-2))(concatenation)

    outputs = layers.Dense(1, activation='linear', name='price')(hidden_layer_3)

    model = tf.keras.models.Model(outputs=outputs, inputs=inputs)
    model.compile(optimizer="adam",
                  loss=rlmse,
                  metrics=[rmse])

    return model

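<span style='color:red'>A natural question: why don't we use **MSE** as our loss function?</span>

-  <span style='color:green'>Answer: **MSE** is a standard regression loss, but it penalizes absolute errors, so a handful of expensive books can dominate training. **RLMSE**, which we defined above and pass to `compile` in our `model_build` function, compares the logarithms of the prices instead, so it roughly penalizes relative errors and is less sensitive to large prices.</span>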
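
To make the difference concrete, here is a minimal sketch in plain Python (no TensorFlow) that compares the two losses on made-up prices. The helper names `mse` and `rlmse_py` and the sample prices are assumptions for illustration; `rlmse_py` mirrors the `log10(y + 1)` form of `rlmse` above.

```python
import math
from typing import List


def mse(y_true: List[float], y_pred: List[float]) -> float:
    """Mean squared error on raw prices."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def rlmse_py(y_true: List[float], y_pred: List[float]) -> float:
    """Root of the mean squared difference between log10(price + 1) values."""
    squared = [(math.log10(t + 1) - math.log10(p + 1)) ** 2
               for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


# Two hypothetical books, both over-predicted by 50%.
cheap = ([100.0], [150.0])
expensive = ([10000.0], [15000.0])

print(mse(*cheap), mse(*expensive))            # 2500.0 25000000.0
print(rlmse_py(*cheap), rlmse_py(*expensive))  # nearly identical values
```

With MSE the expensive book contributes 10,000 times more to the loss, while the log-based loss treats the two equally sized relative errors almost the same.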

Experiment with the Model

In [54]:
model = model_build(adapt_data=dataset,
                    batch_size=10,
                    steps=873,
                    authors_list=authors_list,
                    linear_dnn_units=[64, 32, 16],
                    categorical_dnn_units= [32, 16],
                    hidden_layer_unit=8)

model.summary()
# tf.keras.utils.plot_model(model, 'model_graph.png', show_shapes=False)

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 ratings (InputLayer)           [(None,)]            0           []                               
                                                                                                  
 reviews (InputLayer)           [(None,)]            0           []                               
                                                                                                  
 scrape_ratings (Lambda)        (None,)              0           ['ratings[0][0]']                
                                                                                                  
 scrape_reviews (Lambda)        (None,)              0           ['reviews[0][0]']                
                                                                                              

So, what exactly is:
```python
@tf.keras.utils.register_keras_serializable()
```
>This decorator injects the decorated class or function into the Keras custom object dictionary, so that it can be serialized and deserialized without needing an entry in the user-provided custom object dict. It also injects a function that Keras will call to get the object's serializable string key.


<span style='color:green;'>

Read more about [**Keras Serialization & Deserialization**](https://www.tensorflow.org/api_docs/python/tf/keras/utils/register_keras_serializable)
</span>
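
The registration idea can be sketched with a toy registry in plain Python. This is not Keras's implementation: `_REGISTRY`, `register_serializable`, `lookup`, and the key format used here are hypothetical stand-ins that only illustrate why a saved config can store a string key instead of Python code.

```python
from typing import Callable, Dict

# Toy registry standing in for the Keras custom-object dictionary.
_REGISTRY: Dict[str, Callable] = {}


def register_serializable(package: str = "Custom") -> Callable:
    """Decorator that stores the function under a '<package>>name' string key."""
    def decorator(fn: Callable) -> Callable:
        _REGISTRY[f"{package}>{fn.__name__}"] = fn
        return fn
    return decorator


def lookup(key: str) -> Callable:
    """Find a registered object by key, failing loudly when it is unknown."""
    if key not in _REGISTRY:
        raise ValueError(f"Unknown object: {key}")
    return _REGISTRY[key]


@register_serializable()
def absolute_error(y_true: float, y_pred: float) -> float:
    return abs(y_true - y_pred)


# A saved config only needs to store the string key, not the Python code.
saved_key = "Custom>absolute_error"
restored = lookup(saved_key)
print(restored(3.0, 5.0))  # 2.0
```

Looking up a key that was never registered raises a `ValueError`, which is the toy equivalent of the "Unknown layer" error mentioned earlier.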

Serializes a Layer object into a JSON-compatible representation.

<span style='color:blue;'>

<a href="https://www.tensorflow.org/api_docs/python/tf/keras/layers/serialize" >

**`tf.keras.layers.serialize`**

</a>
</span>

In [55]:
from pprint import pprint
pprint(tf.keras.layers.serialize(model))

{'class_name': 'Functional',
 'config': {'input_layers': {'author': ['author', 0, 0],
                             'bookcategory': ['bookcategory', 0, 0],
                             'genre': ['genre', 0, 0],
                             'ratings': ['ratings', 0, 0],
                             'reviews': ['reviews', 0, 0],
                             'title': ['title', 0, 0]},
            'layers': [{'class_name': 'InputLayer',
                        'config': {'batch_input_shape': (None,),
                                   'dtype': 'string',
                                   'name': 'ratings',
                                   'ragged': False,
                                   'sparse': False},
                        'inbound_nodes': [],
                        'name': 'ratings'},
                       {'class_name': 'InputLayer',
                        'config': {'batch_input_shape': (None,),
                                   'dtype': 'string',
                          

In [56]:
del dataset

In [57]:
del model

[<p style='font-size: 18px;font-weight:bold'>Custom functions & Custom subclasses</p>](https://www.tensorflow.org/guide/keras/save_and_serialize#:~:text=aware%20of%20it.-,custom%20functions,-Custom-defined%20functions)


Since graph execution doesn't save any python code, we can't insert any eager function with the serving model. It leads to incompatibility issues, which means that we will not be able to serve our model without adding our transformed features' functions and the input functions.


<span style='color:CornflowerBlue;'>

>Custom-defined functions (e.g. activation loss or initialization) do not need a get_config method. The function name is sufficient for loading as long as it is registered as a custom object.
  
</span>
<span style='background-color:yellow'>
    Here, you can see the drawback of building an end-to-end pipeline without <b>TFX</b>.
</span>
<span style='color:MediumTurquoise;'>

> **[Important](https://www.tensorflow.org/guide/saved_model#specifying_signatures_during_export)**: *Unless you need to export your model to an environment other than TensorFlow 2.x with Python, you probably don't need to export signatures explicitly. If you're looking for a way of enforcing an input signature for a specific function, see the <u>[input_signature](https://www.tensorflow.org/api_docs/python/tf/function#input_signatures)</u> argument to [tf.function](https://www.tensorflow.org/api_docs/python/tf/function)*.

</span>

<span style='color: green;'>
    Read More about: <a href="https://www.tensorflow.org/tfx/guide/keras#keras_module_file_with_transform">
        Keras Module file with Transform</a>
</span>


In [58]:
def train_and_evaluate(train_path: Text,
                       dev_path: Text,
                       linear_dnn_units: List[int],
                       categorical_dnn_units: List[int],
                       hidden_layer_unit: int,
                       train_examples: int,
                       dev_examples: int,
                       batch_size: int,
                       epochs: int,
                       steps: int,
                       authors_list: List[Text],
                       model_dir: Text,
                       checkpoint_path: Text,
                       tensorboard_logs_path: Text,
                       start_from_latest_checkpoint: Optional[bool]) -> None:
    """
    Train & Evaluate function is responsible for training and evaluating the model
    in a distributed manner. After finishing training, it will save the model for serving.
        Args:
            train_path [Text]: Path where we're going to retrieve our training data.
            dev_path [Text]: Path where we're going to retrieve our development data.
            linear_dnn_units [List[int]]: List contains number of nodes inside
                                          each of linear hidden layer.
            categorical_dnn_units [List[int]]: List contains number of nodes inside
                                               each of categorical hidden layer.
            hidden_layer_unit [int]: number of units in our pre-last layer.
            train_examples [int]: number of examples in our training dataset.
            dev_examples [int]: number of examples in our development dataset.
            batch_size [int]: number of examples per batch (per replica).
            epochs [int]: number of passes over the training data.
            steps, authors_list: forwarded unchanged to `model_build`.
            model_dir [Text]: location for saving our model.
            checkpoint_path [Text]: location for saving our model's checkpoints.
            tensorboard_logs_path [Text]: location where we're going to save our model's logs.
            start_from_latest_checkpoint [Optional[bool]]: whether to resume training from
                                                           the latest saved checkpoint.
    """
    import logging
    import datetime

    # Callback wrapper to trace the nodes' values.
    # It is used to monitor the weights of the network during training.
#     class wPrint(tf.keras.callbacks.Callback):
#         def on_train_begin(self, logs={}):
#             tf.print('\nTrain Begin!')
#             tf.print(logs.keys())
#             tf.print(model.trainable_variables)
#             tf.print('=' * 40)

#         def on_epoch_end(self, epoch, logs={}):
#             tf.print('\nVariables After Epochs End!')
#             tf.print(logs.keys())
#             tf.print(model.trainable_variables)

    # Let's Build our distributed training model
    strategy = tf.distribute.MirroredStrategy()

    GLOBAL_BATCH_SIZE = batch_size * strategy.num_replicas_in_sync
    checkpoint_file_prefix = "model-checkpoints-epochs-{epoch:02d}-rlmse-{val_loss:0.4f}-time-" +\
                             datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    # Load train & Eval dataset and train the model.
    train_dataset = create_dataset(train_path,
                                   mode='train',
                                   batch_size=GLOBAL_BATCH_SIZE,
                                   num_epochs=None)

    dev_dataset = create_dataset(dev_path,
                                 mode=None,
                                 batch_size=GLOBAL_BATCH_SIZE,
                                 num_epochs=epochs).take(dev_examples // 1000)
    with strategy.scope():
        model = model_build(adapt_data=train_dataset,
                            batch_size=GLOBAL_BATCH_SIZE,
                            steps=steps,
                            authors_list=authors_list,
                            linear_dnn_units=linear_dnn_units,
                            categorical_dnn_units=categorical_dnn_units,
                            hidden_layer_unit=hidden_layer_unit)

        if start_from_latest_checkpoint:
            # Look for checkpoints in the same place this function saves them.
            latest = tf.train.latest_checkpoint(checkpoint_path)
            if latest is None:
                logging.warning("No checkpoint found in %s; training from scratch.",
                                checkpoint_path)
            else:
                try:
                    model.load_weights(latest)
                except ValueError:
                    logging.error("You've changed the layers' structure of the model.\n"
                                  "The previous checkpoint can't be loaded into this new model's skeleton.")

        # Initializing the callbacks
        checkpoints = tf.keras.callbacks.ModelCheckpoint(
            filepath=os.path.join(checkpoint_path, checkpoint_file_prefix),
            save_best_only=True,
            save_weights_only=True,
            mode='min',
            verbose=1)
        tensorboard = tf.keras.callbacks.TensorBoard(log_dir=tensorboard_logs_path)
        ES = tf.keras.callbacks.EarlyStopping(patience=2,
                                              mode='min',
                                              verbose=1)
        reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                                         factor=1e-2,
                                                         mode='min',
                                                         patience=0,
                                                         verbose=1)
        model.fit(train_dataset,
                  validation_data=dev_dataset,
                  steps_per_epoch=train_examples // GLOBAL_BATCH_SIZE,
                  epochs=epochs,
#                   validation_steps=1,
                  callbacks=[checkpoints, tensorboard, ES, reduce_lr])

        # Save the model
        model_dir = os.path.join(model_dir, f"model-{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}")

        tf.keras.models.save_model(model,
                                   model_dir)
# if __name__ == '__main__':
#     fire.Fire(train_and_evaluate)
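
The checkpoint file names come from an ordinary Python format string that `ModelCheckpoint` fills in with the epoch number and the logged metrics. A minimal sketch of that formatting, using the first-epoch `val_loss` from the training log further down; the fixed timestamp is copied from that log purely for illustration:

```python
# The notebook appends datetime.now(); a fixed timestamp keeps this sketch reproducible.
template = ("model-checkpoints-epochs-{epoch:02d}-rlmse-{val_loss:0.4f}-time-"
            + "2022-01-07_12-40-24")

# Keras fills the placeholders roughly like this at the end of each epoch.
name = template.format(epoch=1, val_loss=1.37903)
print(name)
# model-checkpoints-epochs-01-rlmse-1.3790-time-2022-01-07_12-40-24
```

Because the timestamp is appended once, when the string is built, every checkpoint written during a single call to `train_and_evaluate` shares it.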

#### Run Model

In [61]:
# Create Model's Directories
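# `exist_ok=True` lets this cell be re-run without raising FileExistsError
os.makedirs('Models', exist_ok=True)
os.makedirs('checkpoints', exist_ok=True)
os.makedirs('Tensorboard', exist_ok=True)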

In [62]:
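train_and_evaluate(
                train_path="Dataset/train_data.csv",
                dev_path="Dataset/dev_data.csv",
                linear_dnn_units=[64, 32, 16],
                categorical_dnn_units=[64, 32],
                hidden_layer_unit=8,
                train_examples=train_data.shape[0],
                dev_examples=dev_dataframe.shape[0],
                batch_size=64,
                epochs=9,
                steps=873,
                authors_list=authors_list,
                model_dir="Models",
                checkpoint_path="checkpoints",
                tensorboard_logs_path="Tensorboard",
                start_from_latest_checkpoint=False)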

2022-01-07 12:42:55.105813: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "LegacyParallelInterleaveDatasetV2/_11"

Epoch 1/9

2022-01-07 12:43:32.548925: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "LegacyParallelInterleaveDatasetV2/_11"


Epoch 00001: val_loss improved from inf to 1.37903, saving model to checkpoints/model-checkpoints-epochs-01-rlmse-1.3790-time-2022-01-07_12-40-24
Epoch 2/9

2022-01-07 12:44:07.105950: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.



Epoch 00002: val_loss improved from 1.37903 to 0.58096, saving model to checkpoints/model-checkpoints-epochs-02-rlmse-0.5810-time-2022-01-07_12-40-24
Epoch 3/9

2022-01-07 12:44:41.032440: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.



Epoch 00003: val_loss improved from 0.58096 to 0.40492, saving model to checkpoints/model-checkpoints-epochs-03-rlmse-0.4049-time-2022-01-07_12-40-24
Epoch 4/9

2022-01-07 12:45:14.578757: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.



Epoch 00004: val_loss improved from 0.40492 to 0.35873, saving model to checkpoints/model-checkpoints-epochs-04-rlmse-0.3587-time-2022-01-07_12-40-24
Epoch 5/9

2022-01-07 12:45:48.613038: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.



Epoch 00005: val_loss did not improve from 0.35873

Epoch 00005: ReduceLROnPlateau reducing learning rate to 1.0000000474974514e-05.
Epoch 6/9

2022-01-07 12:46:22.764939: W tensorflow/core/framework/dataset.cc:744] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.



Epoch 00006: val_loss did not improve from 0.35873

Epoch 00006: ReduceLROnPlateau reducing learning rate to 1.0000000656873453e-07.
Epoch 00006: early stopping


2022-01-07 12:47:21.914108: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
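
The learning-rate values and the early stop in the log above follow from the callback settings. Below is a simplified simulation in plain Python, not the Keras implementation: it ignores `min_delta` and cooldown, and it assumes Adam's default starting learning rate of `1e-3`, since the model was compiled with `optimizer="adam"`. The validation losses for epochs 5 and 6 are made up, because the log only says they did not improve on 0.35873.

```python
import math
from typing import List, Tuple


def simulate(val_losses: List[float],
             lr: float = 1e-3,       # assumed: Adam's default learning rate
             factor: float = 1e-2,   # ReduceLROnPlateau(factor=1e-2, patience=0)
             es_patience: int = 2    # EarlyStopping(patience=2)
             ) -> Tuple[List[float], int]:
    """Return the learning rate after each epoch and the epoch where training stops."""
    best = math.inf
    epochs_without_improvement = 0
    lrs = []
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best:
            best = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            lr *= factor  # patience=0: reduce as soon as val_loss fails to improve
        lrs.append(lr)
        if epochs_without_improvement >= es_patience:
            return lrs, epoch
    return lrs, len(val_losses)


# Epochs 1-4 are taken from the log; the last two values are hypothetical.
losses = [1.37903, 0.58096, 0.40492, 0.35873, 0.36, 0.36]
lrs, stopped_at = simulate(losses)
print(lrs[4], lrs[5], stopped_at)
```

The simulated rate drops to about `1e-05` after epoch 5 and about `1e-07` after epoch 6, where training stops, which matches the log.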


### (F) Evaluate Model

I used **TensorBoard** to monitor the model's performance while training and validating it. This let me trace the model's layer connections, the loss curves, and the TensorFlow graph.

<p style='font-size: 18px;font-weight:bold'>From a Structural Perspective</p>

- We're going to compare the two skeletons that were built for this project:

<img src="https://i.imgur.com/KlZ0vkR.png" style='width:100%'>This image shows the problem of connecting float datapoints without scaling them, in addition to the shortage of layers visible in the image.</img>

<img src="https://i.imgur.com/pd8izTP.png" style='width:100%'>On the other hand, here only the scaled values are connected to the `feature_column` layer, which is `DenseFeatures`. We also managed to split the categorical embedded vocabularies into a separate `DenseFeatures` layer; this helps keep the data flow through the whole graph deep and well organized.</img>


<p style='font-size: 18px;font-weight:bold'>Training Performance</p>

- We're going to compare two different training runs:

    1. **First Failed Model**

        <img src="https://i.imgur.com/DzbU9D0.png" style='width:100%'>This image shows the loss curve across the epochs, using `rlmse` as the loss function, and how the model is barely training.</img>

        <img src="https://i.imgur.com/csMHX7F.png" style='width:100%'>This image shows the model performance in the training phase along with the number of iterations for the validation data. </img>

    2. **Second Model**

        <img src="https://i.imgur.com/9V4YFhF.png" style='width:100%'>This image shows the loss function curve across the number of epochs using `rlmse` as my loss function evaluator</img>

        <img src="https://i.imgur.com/m5r8BYg.png" style='width:100%'>This image shows the model performance in the training phase along with the number of iterations for the validation data.</img>

<span style="color:green;">Examine the monitored training performance at the following link:</span> [Tensorboard.dev](https://tensorboard.dev/experiment/QAIWXZmOQvWtxRnzDA3eGA/#scalars)

### (G) Serving Model

The serving function is responsible for feeding data to our model in the right form, without any separate preprocessing of the raw values (that's why we built our transform preprocessing pipeline inside the model's graph).

<img src='https://cloud.google.com/architecture/images/data-preprocessing-for-ml-with-tf-transform-tf-transform-behavior.svg' style='width:100%'> This diagram shows how `tf.Transform` applies the same preprocessing and transformation behavior in both **training (fitting)** and **prediction (serving, or inference)**.

<span style='color:green;'>To read more about `tf.Transform` preprocessing and transformation, read this article by [**Google Cloud**](https://cloud.google.com/architecture/data-preprocessing-for-ml-with-tf-transform-pt2)</span></img>
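
The "fit once, apply everywhere" behavior can be sketched without TensorFlow. In this plain-Python toy, the scaling statistics are learned once from training data and then reused by the serving path, so the caller only ever sends raw strings. The helpers `fit_min_max`, `parse_reviews`, and `serve`, as well as the raw string format, are hypothetical and only illustrate the idea; they are not the transform functions used in this notebook.

```python
from typing import Callable, List


def parse_reviews(raw: str) -> float:
    # Hypothetical raw format, e.g. "1,008 customer reviews".
    return float(raw.split()[0].replace(",", ""))


def fit_min_max(values: List[float]) -> Callable[[float], float]:
    """Learn min/max once (the 'analyze' phase) and return the scaling function."""
    lo, hi = min(values), max(values)

    def transform(x: float) -> float:
        return (x - lo) / (hi - lo)

    return transform


# Training time: statistics are computed from the training data only.
train_raw = ["8 customer reviews", "1,008 customer reviews", "508 customer reviews"]
scale = fit_min_max([parse_reviews(r) for r in train_raw])


def serve(raw: str) -> float:
    """Serving path: raw string in, model-ready feature out, same statistics."""
    return scale(parse_reviews(raw))


print(serve("258 customer reviews"))  # 0.25
```

Keeping the parsing and scaling bundled with the model is what removes the risk of training and serving applying different preprocessing.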

<span style='color:DodgerBlue;'>

> You can use `tf.function` to make graphs out of your programs. It is a transformation tool that creates Python-independent dataflow graphs out of your Python code. This will help you create performant and portable models, and it is required to use SavedModel.
</span>

<span style='color:green;'>You can read more about how `tf.function` is an efficient alternative when eager execution has a high level of per-operation overhead</span>: [tf.function](https://www.tensorflow.org/guide/function)

We're going to create a function that provides unified predictions (batch & streaming):

**Make sure you have registered the custom functions & subclasses as Keras-serializable before loading the model**

In [63]:
def streaming(model: tf.keras.models.Model,
              row_data: Dict[Text, Text]) -> Dict[Text, float]:

    """
    The streaming function is responsible for predicting streaming values:
    you send the features of one book, and the streaming function predicts its price.

    Args:
        model [tf.keras.models.Model]: model we're using for predictions.
        row_data [Dict[Text, Text]]: a single row of features, given as a Dict
                                     (a parsed JSON object works as well).
     Returns:
         [Dict[Text, float]]: the prediction as a Dict with a single "price" key,
                              which can be dumped to JSON directly.
    """
    import warnings
    warnings.filterwarnings('ignore')

    pred_dict = {
        feature: tf.convert_to_tensor([row_data[feature]])
        for feature in Features.FEATURES
    }
    # `predict` returns an array of shape (1, 1); pull out the single scalar.
    return {"price":
            round(float(model.predict(pred_dict)[0][0]), 4)}


In [100]:
def batching(model: tf.keras.models.Model,
             dataset_dir: Text,
             save_dir: Text,
             batch_size: int) -> None:
    """
    The batching function is responsible for predicting batches of data:
    you send the features of the books, and the batching function predicts the prices.

    Args:
        model [tf.keras.models.Model]: model we're using for predictions.
        dataset_dir [Text]: path of the features' file.
        save_dir [Text]: directory where the predictions are saved.
        batch_size [int]: number of examples per batch.
     Returns:
         None. The predictions are written to `test_batch_predictions.csv`
         inside `save_dir`.
    """
    import itertools
    output = []
    # `num_epochs=1` makes the dataset stop after a single pass over the file,
    # so every row (including a final partial batch) is predicted exactly once.
    dataset = tf.data.experimental.make_csv_dataset(dataset_dir,
                                                    batch_size=batch_size,
                                                    column_names=Features.INFER_COLUMNS,
                                                    select_columns=Features.FEATURES,
                                                    num_epochs=1,
                                                    shuffle=False)

    for batch in dataset:
        output.append(model.predict(batch).tolist())
    # Save the predictions in `save_dir`
    try:
        pd.DataFrame({"price": [value[0] for value in itertools.chain(*output)]}).\
        to_csv(os.path.join(save_dir, 'test_batch_predictions.csv'), index=False)
    except Exception as e:
        tf.print('Something went wrong: ', e)
    else:
        tf.print('Your predictions have been saved successfully!')
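
When checking that a batch run covered the whole file, it helps to count batches explicitly. A small sketch in plain Python with hypothetical sizes (130 rows, batches of 60): floor division counts only the full batches, while the final partial batch needs a ceiling. The helper `split_into_batches` is an assumption for illustration.

```python
import math
from typing import List


def split_into_batches(rows: List[int], batch_size: int) -> List[List[int]]:
    """Cut `rows` into consecutive chunks of at most `batch_size` items."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


rows = list(range(130))  # hypothetical: 130 test rows
batch_size = 60
batches = split_into_batches(rows, batch_size)

full_batches = len(rows) // batch_size           # counts full batches only
all_batches = math.ceil(len(rows) / batch_size)  # includes the partial one
print(full_batches, all_batches, [len(b) for b in batches])
# 2 3 [60, 60, 10]
```

If the number of prediction steps is derived with floor division, the last 10 rows in this example would never be predicted.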

In [101]:
def serving_fn(prediction_type: Text,  # either 'stream' or 'batch'
               model_dir: Union[Text, bool],
               data: Union[Dict[Text, Text], Text],
               save_dir: Text,
               batch_size: int = 0,
               ) -> Optional[Dict[Text, float]]:
    """
    Provides a unified entry point for serving predictions.
    Args:
        prediction_type [Text]: 'stream' for a single row, 'batch' for a CSV file.
        model_dir [Union[Text, bool]]: either the path of the model to load, or `True`
                                       to retrieve the latest saved model
                                       for prediction.
        data [Union[Dict[Text, Text], Text]]: a row of data (Dict) for streaming,
                                              or the path of the CSV file for batching.
        save_dir [Text]: directory where the batch predictions are saved.
        batch_size [int]: number of examples per batch (required for 'batch').
     Returns:
         [Optional[Dict[Text, float]]]: the price dictionary for a streamed row;
                                        `None` for batch predictions, which are
                                        saved to `save_dir` instead.
    """
    output = None
    strategy = tf.distribute.MirroredStrategy()
    GLOBAL_BATCH_SIZE = batch_size * strategy.num_replicas_in_sync
    # try:
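    if model_dir is True: # It means that you want to predict using the latest trained model.
        # Model folders are named by timestamp, so the last name in sorted order is the newest.
        path = os.path.join('Models', sorted(os.listdir('Models'))[-1])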
    else:
        path = model_dir
    # We want to use Synchronous Distributed training for batch predictions since it might
    # take too long if your dataset is large.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    with strategy.scope():
        model = tf.keras.models.load_model(path,
                                           custom_objects={
                                                "format_reviews": format_reviews,
                                                "scale_reviews": scale_reviews,
                                                "format_ratings": format_ratings
                                            }, compile=False)  # No need to compile: we only predict here.
                                                   # To continue training, resume from the
                                                   # checkpoints using `train_and_evaluate`.
        if prediction_type == "stream":
            if isinstance(data, dict):
                output = streaming(model,
                                   row_data=data)
            else:
                tf.print('You can send only a single row of data (a dict) to stream!')
        elif prediction_type == 'batch':
            if batch_size != 0:
                output = batching(model=model,
                                  dataset_dir=data,
                                  save_dir=save_dir,
                                  batch_size=GLOBAL_BATCH_SIZE)
            else:
                tf.print("If you want batch prediction, you MUST insert `batch_size`.")
        else:
            tf.print('Incorrect inference type!')
    return output
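
Picking "the latest model" by folder name works because the folders are named with a zero-padded timestamp, so alphabetical order equals chronological order. `os.listdir` returns names in arbitrary order, so sorting first is the safer habit. A short sketch with hypothetical run times; `model_dir_name` is a helper made up for this example and follows the naming pattern of the save step in `train_and_evaluate`:

```python
from datetime import datetime


def model_dir_name(moment: datetime) -> str:
    # Same naming pattern as the save step in `train_and_evaluate`.
    return f"model-{moment.strftime('%Y-%m-%d_%H-%M-%S')}"


# Hypothetical training runs, deliberately listed out of order.
names = [
    model_dir_name(datetime(2022, 1, 7, 12, 47, 21)),
    model_dir_name(datetime(2021, 12, 30, 9, 5, 0)),
    model_dir_name(datetime(2022, 1, 6, 23, 59, 59)),
]

latest = sorted(names)[-1]
print(latest)  # model-2022-01-07_12-47-21
```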

In [None]:
# Cleared the output of this cell due to the AutoShard problem in the Kaggle environment!
serving_fn('batch', True, 'Dataset/test_data.csv',"Dataset", 60)

**The Batch prediction**

In [105]:
pd.read_csv('Dataset/test_batch_predictions.csv', index_col=False).head()

Unnamed: 0,price
0,144.747696
1,394.749573
2,404.099426
3,389.46283
4,416.120056


Lastly, you may find the results fairly average because of the limited data. Obviously, `reviews`, `ratings`, and the author's name alone are not enough for this project. Several more features might help if they were available, such as readers' opinions, binned `reviews` and `ratings`, and so on.

The idea of this project is to show you the power of **Transform**, and how much more powerful it becomes when applied through the production framework **TFX**.

I hope you found this project interesting! If you have any comments about the project, please contact me!

<center>
________________________________
</center>

<p style='text-align:center;'>Thanks for reaching this level of experimenting with
the idea of <b>Transform</b></p>
<p style='text-align:center;'>Data Scientist & ML Engineer: <a href='https://www.linkedin.com/in/drxavier997/'>Ahmed</a></p>
<p style='text-align:center;'>Created at: 2021-01-06