In [0]:
%pip install --no-dependencies tensorflow-recommenders tensorflow-datasets tensorflow_ranking scann pydot

In [0]:
%load_ext tensorboard

In [0]:
import mlflow
import tensorflow as tf
import tensorflow_recommenders as tfrs
from spark_tensorflow_distributor import MirroredStrategyRunner

print([tf.__version__, tf.config.list_physical_devices('GPU')])

In [0]:
# Training parameters are read from Databricks notebook widgets (defaults: 3 epochs, batch size 20000).
dbutils.widgets.text("num_epochs", "3")
NUM_EPOCHS = int(dbutils.widgets.get("num_epochs"))

dbutils.widgets.text("batch_size", "20000")
BATCH_SIZE = int(dbutils.widgets.get("batch_size"))

print(NUM_EPOCHS, BATCH_SIZE)

path_prefix = "/sais22_sample"
data_path = path_prefix + "/data"


def get_python_path(path):
  """Return `path` prefixed with "/dbfs".

  needed to access dbfs from python code and tf.data
  """
  return f"/dbfs{path}"


# Model locations are built with the same helper instead of repeating the "/dbfs" prefix by hand.
best_model_path = get_python_path(f"{path_prefix}/model/best")
chkpt_model_path = get_python_path(f"{path_prefix}/model/chkpt")

print(path_prefix, get_python_path(path_prefix), sep="\n")

In [0]:
def get_num_workers():
  """
    Count the distinct hosts reported by Spark's executor memory status, minus one.

    The hosts are extracted by parsing the string form of the key set: each
    entry is split on ":" and only the part before it is kept. The final
    "- 1" presumably excludes the driver -- TODO confirm.

    source: https://stackoverflow.com/questions/28768642/getting-number-of-visible-nodes-in-pyspark
  """
  status_keys = sc._jsc.sc().getExecutorMemoryStatus().keys()
  entries = str(status_keys).replace("Set(","").replace(")","").split(", ")

  # A set comprehension de-duplicates entries that share the same host part.
  hosts = {entry.split(":")[0] for entry in entries}
  return len(hosts) - 1

NUM_WORKERS = get_num_workers()
print(f"Number of workers: {NUM_WORKERS}")

# Device counts are measured on this (driver) node and multiplied by the worker count.
# Assume the driver node and worker nodes have the same instance type.
gpus_on_this_node = len(tf.config.list_logical_devices('GPU'))
cpus_on_this_node = len(tf.config.list_logical_devices('CPU'))
NUM_GPUS = gpus_on_this_node * NUM_WORKERS
NUM_CPUS = cpus_on_this_node * NUM_WORKERS
USE_GPU = NUM_GPUS > 0

# One slot per GPU when GPUs are available, otherwise one slot per CPU device.
if USE_GPU:
  NUM_SLOTS = NUM_GPUS
else:
  NUM_SLOTS = NUM_CPUS

print(f"num GPUs: {NUM_GPUS}, num CPUs: {NUM_CPUS}")
print(f"Use GPU: {USE_GPU}, num slots: {NUM_SLOTS}")

In [0]:
# Try to create a MirroredStrategy; if that fails, fall back to whatever
# strategy TensorFlow currently has in scope.
try:
    strategy = tf.distribute.MirroredStrategy()
except Exception as exc:
    # `except Exception` (not a bare `except:`) so KeyboardInterrupt/SystemExit
    # still propagate, and the reason for the fallback is visible in the output.
    print(f"MirroredStrategy could not be created ({exc!r}); using tf.distribute.get_strategy()")
    strategy = tf.distribute.get_strategy()
print("Number of replicas:", strategy.num_replicas_in_sync)

In [0]:
def log_dict_to_mlflow(d, artifact_path=None):
  """Dump `d` as JSON into a temporary file and log that file with mlflow.log_artifact.

  Args:
    d: any object `json.dump` can serialize.
    artifact_path: optional artifact sub-directory, forwarded to mlflow.log_artifact.
  """
  import json
  import tempfile
  import mlflow
  # Text mode ('w') so json can write to the file; the artifact keeps the '.json' suffix.
  with tempfile.NamedTemporaryFile(mode='w', suffix='.json') as f:
    json.dump(d, f)
    # The temporary file is removed when it is closed, so it has to stay open
    # while MLflow reads it by name. Flush explicitly so the buffered JSON is on disk.
    f.flush()
    mlflow.log_artifact(f.name, artifact_path=artifact_path)
    
class MlFlowLogging(tf.keras.callbacks.Callback):
  """Keras callback that sends every entry of the per-epoch `logs` dict to MLflow as a metric."""

  def __init__(self, artifact_name="model", *args, **kwargs):
    super().__init__(*args, **kwargs)
    # Only referenced by the commented-out model logging in on_train_end.
    self.artifact_name = artifact_name

  def on_train_end(self, logs=None):
    """Call the base-class hook; logging the final model is currently commented out."""
    super().on_train_end(logs)
    # mlflow.keras.log_model(self.model, f"{self.artifact_name}/final")

  def on_epoch_end(self, epoch, logs=None):
    """Log each metric in `logs` with `epoch` as the step.

    Values whose type name contains "Tensor" are converted with `.numpy()`
    first; everything is cast to float before logging.
    """
    # Imported here rather than in the class body: a class-level import only
    # creates a class attribute and is not visible as a bare name in methods.
    import mlflow

    super().on_epoch_end(epoch, logs)
    # Keras declares `logs=None`, so tolerate a missing dict instead of raising.
    for key, value in list((logs or {}).items()):
      if "Tensor" in str(type(value)):
        value = value.numpy()
      mlflow.log_metric(key, float(value), epoch)

In [0]:
def get_callbacks(is_chief, logdir, model_save_path, model_chkpt_path, monitor_metric="val_loss", early_stop_patience=3, reduce_lr_patience=1, reduce_lr_factor=0.5):
  """Build the list of Keras callbacks for a training run.

  Every node gets ReduceLROnPlateau and EarlyStopping on `monitor_metric`.
  The chief additionally gets the best-model checkpoint, the per-epoch weight
  checkpoint, TensorBoard and MLflow logging. Other nodes get the same kinds
  of checkpoint/TensorBoard callbacks but pointed at relative placeholder paths.

  Args:
    is_chief: truthy when this node should write the real outputs.
    logdir: TensorBoard log directory used by the chief.
    model_save_path: where the chief saves the best model (save_best_only=True).
    model_chkpt_path: directory for the chief's per-epoch "ckpt_{epoch}" weight checkpoints.
    monitor_metric: metric name monitored by all monitoring callbacks.
    early_stop_patience: patience for EarlyStopping.
    reduce_lr_patience: patience for ReduceLROnPlateau.
    reduce_lr_factor: factor for ReduceLROnPlateau.

  Returns:
    A list of callback instances, in the order described above.
  """
  import os
  from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, TensorBoard

  profile_batch = [1,2]

  callbacks = [
    ReduceLROnPlateau(monitor=monitor_metric, factor=reduce_lr_factor, verbose=1,
                      min_delta=0.01, min_lr=1e-6, patience=reduce_lr_patience),
    EarlyStopping(monitor=monitor_metric, patience=early_stop_patience, min_delta=0.01,
                  verbose=1),
  ]

  # Only the callbacks for this node's role are constructed; the other set is never used.
  if is_chief:
    callbacks += [
      ModelCheckpoint(
        filepath=model_save_path,
        monitor=monitor_metric,
        save_best_only=True,
      ),
      ModelCheckpoint(
        filepath=os.path.join(model_chkpt_path, "ckpt_{epoch}"),
        monitor=monitor_metric,
        save_weights_only=True,
      ),
      TensorBoard(logdir, profile_batch=profile_batch),
      MlFlowLogging(),
    ]
  else:
    # NOTE(review): presumably non-chief nodes run matching callback types so that
    # they do not overwrite the chief's outputs -- confirm.
    callbacks += [
      ModelCheckpoint(
        filepath="some_random_path",
        monitor=monitor_metric,
        save_best_only=True,
      ),
      ModelCheckpoint(
        filepath="some_random_path_chkpt",
        monitor=monitor_metric,
        save_weights_only=True,
      ),
      TensorBoard("tmp_logs", profile_batch=profile_batch),
    ]

  return callbacks

### Process data using Spark

In [0]:
def create_tfrecords_using_spark(train_limit=80000, test_limit=20000, seed=123):
  """Sample the MovieLens ratings with Spark and write train/test/users/movies TFRecords.

  Ratings are joined with movies, de-duplicated, split 80/20, and each split is
  truncated to a row limit. The splits, the distinct users in them, and the
  movies appearing in them are written under `data_path`.

  Args:
    train_limit: maximum number of rows kept in the training split.
    test_limit: maximum number of rows kept in the test split.
    seed: seed passed to randomSplit.
  """
  dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'

  ratings_filename = dbfs_dir + '/ratings.csv'
  movies_filename = dbfs_dir + '/movies.csv'

  ratings = spark.read.option("header", "true").csv(ratings_filename).selectExpr("cast(movieId as string) movie_id", "cast(userId as string) user_id")
  movies = spark.read.option("header", "true").csv(movies_filename).selectExpr("cast(movieId as string) movie_id", "title")

  train, test = ratings.join(movies, ["movie_id"]).select("movie_id", "user_id", "title").distinct().randomSplit([0.8, 0.2], seed=seed)

  # limiting counts for demo.
  # limit() without an ordering does not guarantee which rows are kept, and each
  # action below would otherwise recompute it. Caching keeps the counts and the
  # train/test/users/movies outputs based on the same sampled rows.
  train = train.limit(train_limit).cache()
  test = test.limit(test_limit).cache()

  try:
    sampled_df = train.unionByName(test)

    users = sampled_df.select("user_id").distinct()
    print(users.count())

    print(train.count())
    print(test.count())

    train.repartition(10).write.format("tfrecords").mode("overwrite").save(f"{data_path}/train")
    test.repartition(5).write.format("tfrecords").mode("overwrite").save(f"{data_path}/test")
    users.repartition(3).write.format("tfrecords").mode("overwrite").save(f"{data_path}/users")
    #limit to only movies in the sample
    movies.join(sampled_df.select("movie_id").distinct(), ["movie_id"]).repartition(3).write.format("tfrecords").mode("overwrite").save(f"{data_path}/movies")
  finally:
    # Release the cached splits once everything has been written.
    train.unpersist()
    test.unpersist()

create_tfrecords_using_spark()

### Reading data as tf dataset

Helpers for tf.data

Now we are ready to load the datasets.

In [0]:
def make_datasets(train_shuffle_buffer=100000, test_shuffle_buffer=20000, distributed=False):
  """Read the movies/train/test TFRecords and build the StringLookup layers.

  Args:
    train_shuffle_buffer: currently unused; this function applies no shuffling.
    test_shuffle_buffer: currently unused; this function applies no shuffling.
    distributed: when True, train/test get AutoShardPolicy.DATA before
      batching; otherwise they are only batched.

  Returns:
    (train_ds, test_ds, movies_ds, lookups). train_ds and test_ds are batched
    with BATCH_SIZE and drop_remainder=True, movies_ds is unbatched, and
    lookups maps "user_id" and "title" to adapted StringLookup layers.

  Raises:
    FileNotFoundError: if a dataset directory does not exist.
  """
  import os

  import tensorflow as tf
  import tensorflow.keras.layers as L

  def generate_filepaths(typ: str) -> list:
    """
    this method creates a list of file paths, to be read by tf.data

    Only the top level of the directory is listed, and files whose name
    starts with "_" are skipped.
    """
    root = os.path.join("/dbfs" + data_path, typ)
    for base, _, files in os.walk(root):
      # Return after the first entry on purpose: only the top-level directory is read.
      return [os.path.join(base, file) for file in files if not file.startswith("_")]
    # os.walk yields nothing for a missing directory; fail with a clear message
    # instead of handing None to TFRecordDataset.
    raise FileNotFoundError(f"No TFRecord directory found at {root}")

  def read_movies_tfrecord(example):
    """Parse one serialized movie record into a dict with movie_id and title."""
    tfrecord_format = {
      "movie_id": tf.io.FixedLenFeature([], tf.string),
      "title": tf.io.FixedLenFeature([], tf.string),
    }
    return tf.io.parse_single_example(example, tfrecord_format)

  def read_interaction_tfrecord(example):
    """Parse one serialized interaction record into a dict with movie_id, user_id and title."""
    tfrecord_format = {
      "movie_id": tf.io.FixedLenFeature([], tf.string),
      "user_id": tf.io.FixedLenFeature([], tf.string),
      "title": tf.io.FixedLenFeature([], tf.string),
    }
    return tf.io.parse_single_example(example, tfrecord_format)

  def create_lookup_for(col_name, dataset):
    """Create a StringLookup for `col_name` and adapt it on that column of `dataset`."""
    lookup = L.StringLookup(num_oov_indices=1, name=f"{col_name}_lookup")
    lookup.adapt(dataset.map(lambda r: r[col_name]).batch(500))
    print(f"Vocabulary size for {col_name} lookup: {lookup.vocabulary_size()}")
    return lookup

  def make_distributed_dataset(dataset, shuffle=False):
    """Attach AutoShardPolicy.DATA to `dataset` and batch it. `shuffle` is currently unused."""
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA

    return dataset.with_options(options).batch(BATCH_SIZE, drop_remainder=True)

  movies_ds = tf.data.TFRecordDataset(filenames=generate_filepaths("movies")).map(read_movies_tfrecord)
  train_ds = tf.data.TFRecordDataset(filenames=generate_filepaths("train")).map(read_interaction_tfrecord)
  test_ds = tf.data.TFRecordDataset(filenames=generate_filepaths("test")).map(read_interaction_tfrecord)

  # verifying if the data was read properly
  # always good to test a batch so we can capture issues
  # if you see NaN issues here, it would be mostly some form of data type issue
  for dataset in (movies_ds, train_ds, test_ds):
    for row in dataset.batch(10).take(1):
      print(row)

  user_cols = ["user_id"]
  movie_cols = ["title"]

  # User lookups are adapted on the training interactions, movie lookups on the movie catalogue.
  lookups = {col: create_lookup_for(col, train_ds) for col in user_cols}
  lookups.update({col: create_lookup_for(col, movies_ds) for col in movie_cols})

  if distributed:
    return make_distributed_dataset(train_ds), make_distributed_dataset(test_ds), movies_ds, lookups
  return train_ds.batch(BATCH_SIZE, drop_remainder=True), test_ds.batch(BATCH_SIZE, drop_remainder=True), movies_ds, lookups

In [0]:
# Build the non-distributed datasets and lookup layers; the bare `lookups`
# expression on the last line shows the dict as the cell output.
train_ds, test_ds, movies_ds, lookups = make_datasets()
lookups

In [0]:
from typing import Dict, List, Text
import os

class MovielensModel(tfrs.Model):
  """Retrieval model made of a user tower and a movie tower.

  Saving, weight saving/loading and summary are delegated to the two towers,
  stored under the "user" and "movie" sub-paths of the given location.
  """

  def __init__(self, user_model, movie_model, movies):
    super().__init__()
    self.movie_model: tf.keras.Model = movie_model
    self.user_model: tf.keras.Model = user_model

    # Candidate embeddings for the top-K metric: every movie title passed through the movie tower.
    candidate_embeddings = movies.map(lambda r: r["title"]).batch(1000).map(movie_model)
    top_k_metrics = tfrs.metrics.FactorizedTopK(candidates=candidate_embeddings)
    self.task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(metrics=top_k_metrics)

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    """Embed the user ids and movie titles in `features` and return the retrieval task's result."""
    query_embeddings = self.user_model(features["user_id"])
    candidate_embeddings = self.movie_model(features["title"])

    # Metrics are only computed when not training.
    return self.task(query_embeddings, candidate_embeddings, compute_metrics=not training)

  def summary(self, *args, **kwargs):
    """Print the summary of the user tower, then of the movie tower."""
    for tower_model in (self.user_model, self.movie_model):
      tower_model.summary(*args, **kwargs)

  def save(self, location, **kwargs):
    """Save each tower as its own model under `location`/user and `location`/movie."""
    print(f"save to location {location}")
    user_dir = os.path.join(location, 'user')
    movie_dir = os.path.join(location, 'movie')
    self.user_model.save(user_dir, **kwargs)
    self.movie_model.save(movie_dir, **kwargs)

  def save_weights(self, location, **kwargs):
    """Save each tower's weights under `location`/user and `location`/movie."""
    print(f"save_weights to location {location}")
    user_dir = os.path.join(location, 'user')
    movie_dir = os.path.join(location, 'movie')
    self.user_model.save_weights(user_dir, **kwargs)
    self.movie_model.save_weights(movie_dir, **kwargs)

  def load_weights(self, location, **kwargs):
    """Load each tower's weights from `location`/user and `location`/movie."""
    user_dir = os.path.join(location, 'user')
    movie_dir = os.path.join(location, 'movie')
    self.user_model.load_weights(user_dir, **kwargs)
    self.movie_model.load_weights(movie_dir, **kwargs)

In [0]:
def make_model(lookups, movies_ds, embedding_dimension=8):
  """Build and compile a MovielensModel from the adapted lookup layers.

  Args:
    lookups: dict holding the lookup layers under "user_id" and "title".
    movies_ds: movie dataset handed to MovielensModel for its candidates.
    embedding_dimension: output size of both embedding layers.

  Returns:
    The MovielensModel, compiled with Adagrad (learning_rate=0.01).
  """
  import tensorflow as tf

  def build_tower(lookup):
    # A tower is the lookup layer followed by an embedding sized to its vocabulary.
    return tf.keras.Sequential([
      lookup,
      tf.keras.layers.Embedding(lookup.vocabulary_size(), embedding_dimension)
    ])

  user_lookup = lookups["user_id"]
  movie_title_lookup = lookups["title"]

  user_model = build_tower(user_lookup)
  movie_model = build_tower(movie_title_lookup)

  model = MovielensModel(user_model, movie_model, movies_ds)
  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.01))

  return model

In [0]:
# Model for the single-node run, built with make_model's default embedding_dimension.
model = make_model(lookups, movies_ds)

In [0]:
import datetime
import os

experiment_log_dir = "/dbfs/tf_logs"
# Join with a path separator so each run logs into its own sub-directory of
# experiment_log_dir, which is the directory the TensorBoard cell watches.
# Plain concatenation produced a sibling such as "/dbfs/tf_logs20220101-...".
log_dir = os.path.join(experiment_log_dir, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [0]:
%tensorboard --logdir $experiment_log_dir

In [0]:
# Single-node training run. validation_data is required because every callback
# from get_callbacks monitors "val_loss" by default; without it the metric never
# exists and the save_best_only checkpoint has nothing to compare.
# The epoch count comes from the notebook widget instead of a hard-coded 3.
model.fit(
  train_ds,
  epochs=NUM_EPOCHS,
  validation_data=test_ds,
  callbacks=get_callbacks(is_chief=True, logdir=log_dir, model_save_path=best_model_path, model_chkpt_path=chkpt_model_path),
)

Distributed Runner

In [0]:
def is_node_chief():
  """Return whether this process is the chief node.

  The decision is read from the TF_CONFIG environment variable: the node is the
  chief when its task index is 0. When TF_CONFIG is not set the process is
  treated as the chief.

  Returns:
    bool: True for the chief, False otherwise.
  """
  import os
  import json

  # No TF_CONFIG: nothing to parse, this process is the chief.
  # (The return used to sit inside the `if`, so this case returned None.)
  if 'TF_CONFIG' not in os.environ:
    return True

  tf_config = json.loads(os.environ['TF_CONFIG'])
  print(tf_config)
  print(tf_config['task'])
  node_index = tf_config['task']['index']
  is_chief = node_index == 0
  print(f"Node Index: {node_index}, Is Chief: {is_chief}")
  return is_chief

In [0]:
def train():
  """Training entry point handed to MirroredStrategyRunner.

  Builds the distributed datasets and the model, fits for NUM_EPOCHS with the
  test set as validation data, and on the chief node logs parameters, lookup
  vocabularies and the per-epoch history to MLflow.
  """
  import os
  import mlflow
  import datetime

  import tensorflow as tf

  print([tf.__version__, tf.config.list_physical_devices('GPU')])
  logdir = os.path.join(experiment_log_dir, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

  is_chief = is_node_chief()

  train_ds, test_ds, movies_ds, lookups = make_datasets(distributed=True)
  print(lookups)

  multi_worker_model = make_model(lookups, movies_ds)

  if is_chief:
    mlflow.log_param("num_workers", NUM_SLOTS)
    mlflow.log_param("batch_size", BATCH_SIZE)

    # Store the vocabulary of every string lookup as a JSON artifact.
    for k,v in lookups.items():
      if v.dtype == "string":
        log_dict_to_mlflow(v.get_vocabulary(), f"inputs/lookups/{k}")

  history = multi_worker_model.fit(train_ds.prefetch(10),
                                   epochs=int(NUM_EPOCHS),
                                   validation_data=test_ds.prefetch(10),
                                   callbacks=get_callbacks(is_chief,
                                                           logdir=logdir,
                                                           model_save_path=best_model_path,
                                                           model_chkpt_path=chkpt_model_path))

  if is_chief:
    # Plain loops rather than a list comprehension used only for its side effects.
    for key, values in history.history.items():
      for epoch_index, value in enumerate(values):
        mlflow.log_metric(f"{key}_history", float(value), epoch_index)
    # Number of epochs actually run = length of the first metric series (0 if there is none).
    first_series = next(iter(history.history.values()), [])
    mlflow.log_param("final_epochs", str(len(first_series)))

In [0]:
# Run `train` through MirroredStrategyRunner with NUM_SLOTS slots, using GPUs when USE_GPU is set.
# Use local_mode=True to test on driver node, False will run it on all workers
runner = MirroredStrategyRunner(num_slots=NUM_SLOTS, use_gpu=USE_GPU)
runner.run(train)

Store model to MLFlow

In [0]:
%sh apt-get install -y graphviz

In [0]:
%scala
// Used only for local runs, workaround to get notebook experiment name
dbutils.widgets.text("notebook", dbutils.notebook.getContext().notebookPath.get)

In [0]:
# Look up the experiment named after this notebook (the name is stored in the
# "notebook" widget) and take the first run that MLflow lists for it.
notebook_experiment = mlflow.get_experiment_by_name(dbutils.widgets.get("notebook"))
exp_id = notebook_experiment.experiment_id
first_run_info = mlflow.list_run_infos(exp_id)[0]
run_id = first_run_info.run_id
run_id

create a lookup model

In [0]:
import os

k = 200

# Reload the two towers saved under best_model_path.
movie_model = tf.keras.models.load_model(os.path.join(best_model_path, "movie"))
user_model = tf.keras.models.load_model(os.path.join(best_model_path, "user"))

# The ScaNN layer embeds queries with the user tower and returns k candidates.
index = tfrs.layers.factorized_top_k.ScaNN(query_model=user_model, num_reordering_candidates=1000, k=k, name="scann_lookup")

# Index (movie_id, movie embedding) pairs, both batched by 100 so they zip together.
movie_id_batches = movies_ds.map(lambda r: r["movie_id"]).batch(100)
movie_embedding_batches = movies_ds.map(lambda r: r["title"]).batch(100).map(movie_model)
index.index_from_dataset(
  tf.data.Dataset.zip((movie_id_batches, movie_embedding_batches))
)

# Query the index with ten user ids from the test set as a quick check.
for row in test_ds.unbatch().batch(10).take(1):
  print(index(row["user_id"]))

lookup_save_path = f"{best_model_path}/lookup"
index.save(
      lookup_save_path,
      overwrite=True,
      options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
    )

In [0]:
### Verify if we can load the saved lookup model and get recommendations ###
# Load from the location the index was just saved to. `base_download_path` is
# only defined further down the notebook, so using it here raises NameError
# on a fresh top-to-bottom run.
lookup_model = tf.saved_model.load(f"{best_model_path}/lookup")
for row in test_ds.unbatch().batch(10).take(1):
  print(lookup_model(row["user_id"]))

In [0]:
import os
import json
from mlflow.models.signature import infer_signature

def log_model_mlflow(run_id, tower):
  """Log one saved tower to an existing MLflow run.

  Loads the model from `best_model_path`/`tower`, re-saves it into a local
  directory named after the tower and logs that directory under
  "models/saved_model". Every tower except "lookup" is also logged with
  mlflow.keras (without a signature). Finally an architecture plot is
  rendered and logged under "plots/model_architecture/".

  Args:
    run_id: id of the MLflow run to attach the artifacts to.
    tower: sub-directory name of the saved model ("user", "movie" or "lookup").
  """
  with mlflow.start_run(run_id=run_id, nested=True):
    tower_model = tf.keras.models.load_model(os.path.join(best_model_path, tower))
    tower_model.summary()

    local_export_dir = f"{tower}"
    tower_model.save(local_export_dir)
    mlflow.log_artifact(local_export_dir, "models/saved_model")

    is_lookup = tower == "lookup"
    if not is_lookup:
      mlflow.keras.log_model(tower_model, f"models/mlflow/{tower}")

    # Save plots
    plot_file = f'{tower}_model.png'
    tf.keras.utils.plot_model(tower_model, to_file=plot_file, show_shapes=True, expand_nested=True, show_dtype=True, dpi=172)
    mlflow.log_artifact(plot_file, "plots/model_architecture/")

for tower_name in ("user", "movie", "lookup"):
  log_model_mlflow(run_id, tower_name)

In [0]:
model_name = "sais22_sample_movie_lookup_model"

client = mlflow.tracking.MlflowClient()

# NOTE(review): log_model_mlflow only writes "models/mlflow/<tower>" for towers
# other than "lookup", so confirm that this artifact path exists in the run.
lookup_model_uri = f"runs:/{run_id}/models/mlflow/lookup"
result = mlflow.register_model(lookup_model_uri, model_name)

print(f"Name: {result.name}")
print(f"Version: {result.version}")

# Move the freshly registered version to the Staging stage.
client.transition_model_version_stage(
    name=model_name,
    version=result.version,
    stage="Staging"
)

Promote model to Production

In [0]:
# Metric compared between Production and Staging. This is the metric the
# comparison actually read before; `eval_metric` previously named a different
# one (top_10, without the "val_" prefix) and was only used in the printout.
eval_metric = "val_factorized_top_k/top_5_categorical_accuracy"
ACCEPTABLE_THRESHOLD = 0.1

def get_run_id_version_from_stage(model_name, stage):
  """Return (run_id, version) of the first version of `model_name` in `stage`, or (None, None)."""
  for mv in client.search_model_versions(f"name='{model_name}'"):
    if mv.current_stage == stage:
      return mv.run_id, mv.version
  return None, None

staging_run_id, staging_model_version = get_run_id_version_from_stage(model_name, "Staging")
prod_run_id, prod_model_version = get_run_id_version_from_stage(model_name, "Production")

def get_metrics(run_id):
  """Return the value of `eval_metric` recorded for the given run."""
  return mlflow.get_run(run_id).data.metrics[eval_metric]

# Without a Staging version there is nothing to promote.
checks_passed = staging_run_id is not None
if not checks_passed:
  print(f"No version of {model_name} is in Staging; nothing to promote")

if checks_passed and prod_run_id:
  # Promote only if Staging is at most ACCEPTABLE_THRESHOLD worse than Production.
  diff = get_metrics(prod_run_id) - get_metrics(staging_run_id)
  print(eval_metric, diff)
  checks_passed = bool(diff <= ACCEPTABLE_THRESHOLD)

  if checks_passed:
    client.transition_model_version_stage(
          name=model_name,
          version=prod_model_version,
          stage="Archived"
    )

if checks_passed:
    client.transition_model_version_stage(
        name=model_name,
        version=staging_model_version,
        stage="Production"
      )
    # The success message is printed only when the promotion really happened.
    print(f"Successfully promoted model version {staging_model_version} to Production")
elif staging_run_id is not None:
    print(f"Model version {staging_model_version} was NOT promoted: {eval_metric} check failed")

In [0]:
import os

# workaround to load tf saved model instead of mlflow model for custom tf
base_download_path = "/dbfs/tmp/mlflow_download/"
# download_artifacts needs an existing destination directory.
os.makedirs(base_download_path, exist_ok=True)

# Re-resolve the Production version: the value captured before the promotion
# step is the previous Production run, or None if there was none.
prod_run_id, prod_model_version = get_run_id_version_from_stage(model_name, "Production")
client.download_artifacts(prod_run_id, "models/saved_model/", base_download_path)

In [0]:
def batch_lookup_pandas_udf(batch_df):
  """
  Pandas UDF that loads the saved lookup model and scores one batch of users

  The model is loaded from `base_download_path`/models/saved_model/lookup and
  called with the "user_id" column of the batch.

  Parameters
  ----------
  batch_df: A pandas DataFrame with a "user_id" column

  Returns a DataFrame with the columns user_id, recommendations and scores
  """
  # These imports are kept as in the original cell.
  # NOTE(review): the ScaNN import may be needed so the saved model's custom ops load -- confirm.
  import numpy as np
  import tensorflow as tf
  import tensorflow_recommenders as tfrs
  from tensorflow_recommenders.layers.factorized_top_k import ScaNN

  lookup_model = tf.saved_model.load(f"{base_download_path}/models/saved_model/lookup")

  # The model returns a pair: scores first, recommended ids second.
  score_batch, recommendation_batch = lookup_model(batch_df["user_id"])

  result_df = batch_df.loc[:, ["user_id"]]
  result_df['recommendations'] = recommendation_batch.numpy().tolist()
  result_df['scores'] = score_batch.numpy().tolist()
  return result_df

# Import the schema types here: the star import of pyspark.sql.types only
# happens in a later cell, so this cell fails with NameError on a fresh run.
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Schema of the DataFrame returned by batch_lookup_pandas_udf.
output_schema = StructType([
  StructField("user_id", StringType()),
  StructField("recommendations", ArrayType(StringType())),
  StructField("scores", ArrayType(DoubleType()))
])

Batch Inference with stored model

In [0]:
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.types import *

batch_size = 1000

# for demo purposes, we will load the test data as inference dataframe
inference_df = (
  spark.read.format("tfrecords")
    .load(f"{data_path}/test")
    .repartition(10)
    .select("user_id")
    .distinct()
)

# Number the users in user_id order and derive a batch number from the row
# number, so each group handed to the pandas UDF holds about batch_size users.
user_order = Window.orderBy("user_id")
batch_number = (f.row_number().over(user_order)/f.lit(batch_size)).cast(IntegerType())
batched_df = inference_df.withColumn("batch", batch_number).repartition(f.col("batch"))

recs_df = batched_df.groupBy("batch").applyInPandas(batch_lookup_pandas_udf, schema=output_schema).cache()
display(recs_df)

user_id,batch
65784,12
65785,12
65831,12
65832,12
65839,12
65842,12
65845,12
65852,12
6587,12
65891,12


Get the top 50 recs for each member

In [0]:
movie_df = spark.read.format("tfrecords").load(f"{data_path}/movies")

# posexplode yields a 0-based position, so the top 50 recommendations are
# positions 0..49; "pos <= 50" kept 51 rows per user.
user_recs_df = (recs_df
                  .selectExpr("user_id", "posexplode(recommendations)")
                  .selectExpr("user_id", "pos", "col movie_id")
                  .join(movie_df, ["movie_id"])
                  .select("user_id", "title", "pos")
                  .filter("pos < 50")
                  .sort("user_id", "pos"))
display(user_recs_df)

user_id,title,pos
10000,Oh Boy (A Coffee in Berlin) (2012),0
10000,When the Cat's Away (Chacun cherche son chat) (1996),1
10000,My Giant (1998),2
10000,Fire on the Mountain (1996),3
10000,"Scarlet Letter, The (1995)",4
10000,Diva (1981),5
10000,"Glass Menagerie, The (1950)",6
10000,"Hundred-Foot Journey, The (2014)",7
10000,"Armstrong Lie, The (2013)",8
10000,To the Arctic (2012),9


Using model with streaming data

In [0]:
# Placeholders: replace with the host and port used to build the URL in tfserving_score_model.
SERVING_IP = "ENTER IP HERE"
SERVING_PORT = "ENTER PORT VALUE HERE"

def create_tf_serving_json(data):
  """Wrap `data` in the {'inputs': ...} payload used for the serving request.

  A dict is copied with every non-list value wrapped in a one-element list;
  anything else is converted with its `.tolist()` method.
  """
  if not isinstance(data, dict):
    return {'inputs': data.tolist()}

  inputs = {}
  for name, value in data.items():
    inputs[name] = value if isinstance(value, list) else [value]
  return {'inputs': inputs}


def tfserving_score_model(dataset):
  """POST `dataset` to the "lookup" model's predict endpoint and return the decoded JSON.

  Args:
    dataset: a dict or an object with `.tolist()`, as accepted by create_tf_serving_json.

  Returns:
    The parsed JSON body of the response.

  Raises:
    Exception: if the response status code is not 200.
  """
  # `requests` is used below but was never imported anywhere in the notebook.
  import requests

  url = f'http://{SERVING_IP}:{SERVING_PORT}/v1/models/lookup:predict'
  data_json = create_tf_serving_json(dataset)
  response = requests.request(method='POST', url=url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()