# Keras Model Building

In the last flow, we need to deploy the model to SageMaker. However, SageMaker is easier to use with one of its pre-defined model types — in this case, TensorFlow.

In fact, our deployment strategy for the KNN-based model we trained is to first "export" it to a TensorFlow Recommenders (TF-Recs) model with Keras (the function `keras_model`), and then deploy it to SageMaker with its `TensorFlowModel` abstraction.

First we import the packages we need and define some config variables:

In [None]:
import os

# Climb at most three directory levels, stopping as soon as the current
# directory contains setup.py (used here as the project-root marker).
levels_left = 3
while levels_left > 0 and not os.path.exists(f'{os.getcwd()}/setup.py'):
    os.chdir('..')
    levels_left -= 1
print('Current working directory:', os.getcwd())

In [None]:
# Set before TensorFlow is imported in the next cell.
# NOTE(review): presumably '1' hides TensorFlow's INFO-level native logs — confirm against the TF docs.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '1'

In [None]:
import random
import tarfile
import time
from pathlib import Path
from typing import Tuple

import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs

from src.utils.logging import bprint
from src.utils.meta import get_latest_successful_run
from src.utils.styling import apply_styling

In [None]:
# apply_styling() returns a mapping; keep its 'palette' entry under a short name.
colors = apply_styling()
palette = colors['palette']

Let's retrieve the artifacts from the latest successful run.
The `get_latest_successful_run` function uses the `metaflow.Flow` object to get the results of runs by the (class) name of the flow.

In [None]:
# Class name of the flow whose latest successful run we want to read.
FLOW_NAME = 'ModelingFlow'
latest_run = get_latest_successful_run(FLOW_NAME)
# Artifacts stored by that run: the trained vectors and the dataset.
final_vectors = latest_run.data.final_vectors
final_dataset = latest_run.data.final_dataset

In [None]:
# Display the type of the object returned by get_latest_successful_run.
type(latest_run)

### Retrieval Model

First, we gather the ids and the embeddings of the songs

In [None]:
# IDs in the order the vectors object stores them, then one embedding per ID
# looked up by key, so songs_ids[i] and songs_embeddings[i] describe the same song.
songs_ids = np.array(final_vectors.index_to_key)
songs_embeddings = np.array([final_vectors[idx] for idx in songs_ids])

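We need to include an "unknown" item in the embedding matrix: the lookup layer maps any ID outside the vocabulary to index 0, so we prepend a zero vector at that position.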

In [None]:
# Step-by-step construction of the retrieval model from the song IDs and
# their embeddings.
# NOTE(review): num_embeddings is not read again in this cell; the log line
# below recomputes the length.
num_embeddings = len(songs_embeddings)
embedding_size = songs_embeddings[0].shape[0]
bprint(f'Num of embeddings: {len(songs_embeddings):,}', level=3)
bprint(f'Embeddings dimensions: {embedding_size}', level=3)

bprint('Adding a vector for unknown items', level=3)
# Prepend an all-zero row, so the matrix has one more row than there are songs
# and row i + 1 holds the embedding of songs_ids[i].
unknown_vector = np.zeros((1, embedding_size))
embedding_matrix = np.vstack([unknown_vector, songs_embeddings])
bprint('First item:', embedding_matrix[0][0:5], level=4)
bprint('Shape of the matrix:', embedding_matrix.shape, level=4)
# Sanity check that the unknown vector really sits in row 0.
assert embedding_matrix[0][0] == 0.0

bprint('Initializing layers and network', level=3)
# NOTE(review): presumably StringLookup with mask_token=None reserves index 0
# for out-of-vocabulary IDs, which is why the zero vector is row 0 — confirm.
lookup_layer = tf.keras.layers.StringLookup(vocabulary=songs_ids, mask_token=None)
# The embedding table is initialised from the matrix above and is not trained.
embedding_layer = tf.keras.layers.Embedding(
    input_dim=embedding_matrix.shape[0],
    output_dim=embedding_matrix.shape[1],
    weights=[embedding_matrix],
    trainable=False,
)
embedding_layer.build((None,))

# String ID -> integer index -> embedding vector.
model = tf.keras.Sequential([lookup_layer, embedding_layer])

bprint('Creating retrieval model', level=3)
# The index uses `model` to embed queries and the raw embeddings/IDs as candidates.
brute_force = tfrs.layers.factorized_top_k.BruteForce(model)
song_index = brute_force.index(candidates=songs_embeddings, identifiers=songs_ids)

Finally, we test the model with one of the songs.

In [None]:
def get_recommendations(song_id, k=10):
    """
    Get recommendations for a given song.

    Uses the notebook-level ``model`` (ID -> embedding) and ``song_index``
    (top-k retrieval index) built in the previous cells.

    Parameters
    ----------
    song_id : str
        The ID of the song for which recommendations are to be generated.
        IDs are strings; an ID outside the vocabulary is still accepted.
    k : int, optional
        The number of recommendations to be returned. Default is 10.

    Returns
    -------
    tuple
        A tuple containing the song vector, recommendation scores, and recommendation IDs.
        - song_vector : TensorFlow tensor
            The vector representation of the input song (batch of one).
        - rec_scores : TensorFlow tensor
            The scores of the recommended songs (batch of one).
        - rec_ids : TensorFlow tensor
            The IDs of the recommended songs, as byte strings (batch of one).
    """
    # Build the single-element query batch once and feed the same tensor to
    # both the embedding model and the retrieval index.
    query = tf.constant([song_id])
    song_vector = model(query)
    rec_scores, rec_ids = song_index(query, k=k)
    return song_vector, rec_scores, rec_ids

def pprint_recommendations(song_id, k=10):
    """
    Print recommendations for a given song.

    The queried song itself is left out of the printed list.

    Parameters
    ----------
    song_id : str
        The ID of the song for which recommendations are to be generated.
    k : int, optional
        The number of recommendations to be retrieved. Default is 10.
    """
    song_vector, rec_scores, rec_ids = get_recommendations(song_id, k=k)
    bprint('Song ID:', song_id, level=4, prefix='*')
    bprint('Song vector:', song_vector.numpy()[0][:5], level=4)
    bprint('Recommendations after track:', level=4)
    # Use distinct loop names: reusing `song_id` here would overwrite the
    # parameter and make the "skip the query song" check compare a value
    # with itself.
    for rec_score, rec_id in zip(rec_scores[0].numpy(), rec_ids[0].numpy()):
        rec_id = str(rec_id, 'utf-8')  # identifiers come back as bytes
        if rec_id == song_id:
            continue
        bprint(f'{rec_score:.2f} - {rec_id}', level=5)

In [None]:
# Display the type of the returned scores (element 1 of the tuple).
type(get_recommendations('Alabimbombao~', k=5)[1])

In [None]:
bprint('Testing retrieval model', level=3)
# First query: an ID that is not expected to be in the vocabulary.
test_id = 'Alabimbombao~'  # Unknown!
pprint_recommendations(test_id, k=5)

# Second query: a known ID taken directly from the vocabulary.
test_index = 3
test_id = songs_ids[test_index]
pprint_recommendations(test_id, k=5)

Let's put it all together.

In [None]:
def build_retrieval_model(songs_ids, songs_embeddings):
    """
    Build the ID -> embedding model and the brute-force retrieval index.

    Parameters
    ----------
    songs_ids : array-like of str
        Song identifiers, used as the lookup vocabulary and as the
        identifiers of the indexed candidates.
    songs_embeddings : array-like
        One embedding per entry of ``songs_ids``, in the same order.

    Returns
    -------
    tuple
        ``(model, song_index)``: the Keras Sequential model (lookup followed by
        a frozen embedding) and the index built on top of it.
    """
    dim = songs_embeddings[0].shape[0]
    bprint(f'Num of embeddings: {len(songs_embeddings):,}', level=3)
    bprint(f'Embeddings dimensions: {dim}', level=3)

    bprint('Adding a vector for unknown items', level=3)
    # Row 0 becomes an all-zero vector; the song embeddings follow from row 1.
    table = np.vstack([np.zeros((1, dim)), songs_embeddings])
    bprint('First item:', table[0][0:5], level=4)
    bprint('Shape of the matrix:', table.shape, level=4)
    assert table[0][0] == 0.0

    bprint('Initializing layers and network', level=3)
    n_rows, n_cols = table.shape
    id_lookup = tf.keras.layers.StringLookup(vocabulary=songs_ids, mask_token=None)
    frozen_embedding = tf.keras.layers.Embedding(
        input_dim=n_rows,
        output_dim=n_cols,
        weights=[table],
        trainable=False,
    )
    frozen_embedding.build((None,))

    # String ID -> integer index -> embedding vector.
    encoder = tf.keras.Sequential([id_lookup, frozen_embedding])

    bprint('Creating retrieval model', level=3)
    index = tfrs.layers.factorized_top_k.BruteForce(encoder).index(
        candidates=songs_embeddings, identifiers=songs_ids
    )
    return encoder, index

In [None]:
class RetrievalModel:
    """
    Build a retrieval model using TF recommender abstraction by packaging
    the vector space in a Keras object.

    We can ship the artifact "as is" to a SageMaker endpoint, and
    benefit from the PaaS abstraction and hardware acceleration.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, songs_ids: np.ndarray, songs_embeddings: np.ndarray):
        """
        Initialize the retrieval model.

        Parameters
        ----------
        songs_ids : numpy.ndarray
            Song identifiers (strings) used as the lookup vocabulary.
        songs_embeddings : numpy.ndarray
            One embedding per entry of ``songs_ids``, in the same order.
        """
        bprint('Building retrieval model', level=2)
        model, song_index = build_retrieval_model(songs_ids, songs_embeddings)
        # Keep the vocabulary on the instance so test() does not depend on a
        # notebook-level variable that happens to share the name.
        self.songs_ids = songs_ids
        # Keras model mapping a song ID to its embedding.
        self.model = model
        # Top-k retrieval index built on top of `model`.
        self.song_index = song_index

    def test(self):
        """Smoke-test the model with one known ID and one unknown ID."""
        bprint('Testing retrieval model', level=2)
        # Sample among the first 11 songs, clamped so a smaller vocabulary
        # does not raise IndexError (randint's upper bound is inclusive).
        last_candidate = min(10, len(self.songs_ids) - 1)
        test_index = random.randint(0, last_candidate)
        test_id = self.songs_ids[test_index]
        self.pprint_recommendations(test_id, k=5)
        test_id = r'Alabimbombao ヽ(≧◡≦)八(o^ ^o)ノ'  # Unknown!
        self.pprint_recommendations(test_id, k=5)

    def get_recommendations(self, song_id: str, k: int = 10) -> Tuple:
        """
        Get recommendations for a given song.

        Parameters
        ----------
        song_id : str
            The ID of the song for which recommendations are to be generated.
        k : int, optional
            The number of recommendations to be returned. Default is 10.

        Returns
        -------
        tuple
            A tuple containing the song vector, recommendation scores, and recommendation IDs.
            - song_vector : Tensorflow Tensor
                The vector representation of the input song.
            - rec_scores : Tensorflow Tensor
                The scores of the recommended songs.
            - rec_ids : Tensorflow Tensor
                The IDs of the recommended songs.
        """
        # One single-element query batch shared by the model and the index.
        query = tf.constant([song_id])
        song_vector = self.model(query)
        rec_scores, rec_ids = self.song_index(query, k=k)
        return song_vector, rec_scores, rec_ids

    def pprint_recommendations(self, song_id, k=10):
        """
        Print recommendations for a given song.

        The queried song itself is left out of the printed list.

        Parameters
        ----------
        song_id : str
            The ID of the song for which recommendations are to be generated.
        k : int, optional
            The number of recommendations to be retrieved. Default is 10.
        """
        song_vector, rec_scores, rec_ids = self.get_recommendations(song_id, k=k)
        bprint('Song ID:', song_id, level=3, prefix='* ')
        bprint('Song vector:', song_vector.numpy()[0][:5], level=3)
        bprint('Recommendations after track:', level=3)
        for rec_score, rec_id in zip(rec_scores[0].numpy(), rec_ids[0].numpy()):
            rec_id = str(rec_id, 'utf-8')  # identifiers come back as bytes
            if rec_id != song_id:
                bprint(f'{rec_score:.2f} - {rec_id}', level=4)

    def save(self, *args, **kwargs):
        """Save the retrieval index; all arguments are forwarded to ``song_index.save``."""
        self.song_index.save(*args, **kwargs)

In [None]:
# Rebuild the inputs from the run artifacts (same expressions as earlier in the notebook).
songs_ids = np.array(final_vectors.index_to_key)
songs_embeddings = np.array([final_vectors[idx] for idx in songs_ids])

# NOTE: this rebinds the notebook-level name `model`, which previously held the
# Keras Sequential model read by the module-level get_recommendations().
model = RetrievalModel(songs_ids, songs_embeddings)
model.test()

In [None]:
bprint('Saving model locally', level=2)
# Millisecond Unix timestamp, used to give each export a distinct name.
model_timestamp = round(time.time() * 1000)
models_dir = Path('data/04_models')
# NOTE(review): the trailing "/1" is presumably a model-version directory — confirm.
model_name = models_dir.joinpath(f'playlist-recs-model-{model_timestamp}/1')
local_tar_name = models_dir.joinpath(f'model-{model_timestamp}.tar.gz')
bprint(f'Model path: {model_name}', level=3)
bprint(f'Tarfile path: {local_tar_name}', level=3)

# Write the tfrs index model to disk, then pack the saved directory into a gzip tarball.
model.save(filepath=str(model_name))
with tarfile.open(local_tar_name, mode='w:gz') as archive:
    archive.add(model_name, recursive=True)