# Michelin Restaurant Dataset: Encoding Text Data

In [0]:
import pandas as pd
import requests
import os
from pyspark.sql.functions import *
from transformers import AutoTokenizer, OpenAIGPTTokenizer
from langchain.text_splitter import RecursiveCharacterTextSplitter
import numpy as np
from numpy.linalg import norm

from sentence_transformers import SentenceTransformer
import mlflow
import mlflow.sentence_transformers

### Catalog and Schema Setup

In [0]:
# Catalog and schema come from the environment. Fail fast with a clear message when
# either is unset, instead of an opaque TypeError from concatenating None into the SQL.
catalog_ = os.getenv('CATALOG_NAME')
schema_ = os.getenv('SCHEMA_NAME')
if not catalog_ or not schema_:
  raise ValueError(
    "CATALOG_NAME and SCHEMA_NAME environment variables must both be set "
    f"(got CATALOG_NAME={catalog_!r}, SCHEMA_NAME={schema_!r})"
  )
# Backticks quote the identifiers so names containing e.g. hyphens are accepted.
spark.sql(f"USE CATALOG `{catalog_}`")
spark.sql(f"USE SCHEMA `{schema_}`")

### Read Silver Data

In [0]:
# Load the whole silver table as a Spark DataFrame and preview it
silver_df = spark.table("silver_data")
display(silver_df)

In [0]:
%sql
-- Rows that have a description, and how many distinct restaurants they cover
SELECT
  count(*),
  count(DISTINCT Res_ID)
FROM silver_data
WHERE Description IS NOT NULL;

In [0]:
## Export Silver Data to Parquet
# NOTE(review): the volume path is fixed to michelin_ga.restaurants, while the active
# catalog/schema come from environment variables; confirm the two agree.
vol_ = "/Volumes/michelin_ga/restaurants/init/silver_data.parquet"
silver_df.write.mode("overwrite").parquet(vol_)

## Encode Restaurant Descriptions

##### Tokenization
Create a Spark pandas UDF that splits each description into chunks of at most `max_chunk_size` tokens, then explode the result so that each chunk becomes its own row.

In [0]:
## Base function for splitting and tokenizing
max_chunk_size = 200  # maximum chunk length, in openai-gpt tokens
chunk_overlap = 50    # tokens shared between consecutive chunks

tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt")
text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(tokenizer, chunk_size=max_chunk_size, chunk_overlap=chunk_overlap)

# Splitters keyed by chunk size. The default one is built eagerly above; other sizes are
# built on first use, so that `tokz_(..., max_chunk_size=k)` is honoured rather than ignored.
_splitters_by_size = {max_chunk_size: text_splitter}

def _splitter_for(chunk_size):
  """Return a splitter producing chunks of at most `chunk_size` tokens (cached per size)."""
  if chunk_size not in _splitters_by_size:
    _splitters_by_size[chunk_size] = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
      tokenizer,
      chunk_size=chunk_size,
      # The splitter rejects an overlap larger than the chunk size.
      chunk_overlap=min(chunk_overlap, chunk_size),
    )
  return _splitters_by_size[chunk_size]

def tokz_(text, min_chunk_size = 1, max_chunk_size=max_chunk_size):
  """Split `text` into overlapping chunks and drop the very short ones.

  Args:
    text: description to split; falsy values (None, "") yield no chunks.
    min_chunk_size: a chunk is kept only if `tokenizer.encode` gives it strictly
      MORE than this many tokens.
    max_chunk_size: upper bound on chunk length, in tokens.

  Returns:
    list[str] of the kept chunks.
  """
  if not text:
    return []
  chunks = _splitter_for(max_chunk_size).split_text(text)
  return [c for c in chunks if len(tokenizer.encode(c)) > min_chunk_size]

In [0]:
## Wrap the chunking function in a pandas UDF: one array of chunks per input description
@pandas_udf("array<string>")
def save_tokens(descr: pd.Series) -> pd.Series:
  chunk_lists = [tokz_(d) for d in descr]
  return pd.Series(chunk_lists, index=descr.index, name=descr.name, dtype=object)

In [0]:
%sql
-- Create an empty table with tokens
-- The table is dropped first, so each run of this cell starts from an empty table.
-- NOTE(review): the next cell writes with mode('overwrite') from a DataFrame that has no Id
-- column; confirm whether the Id identity column declared here survives that write.
DROP TABLE IF EXISTS rest_descr_tokenized;
CREATE TABLE IF NOT EXISTS rest_descr_tokenized (
  Id BIGINT GENERATED BY DEFAULT AS IDENTITY, -- generated unless an insert supplies a value
  Res_ID STRING,
  Descr_Tokenized STRING -- one text chunk of the restaurant description
);

In [0]:
## Generating Tokens for all descriptions: one output row per (Res_ID, chunk)
# NOTE(review): mode('overwrite') writes only Res_ID and Descr_Tokenized; confirm whether the
# Id identity column from the CREATE TABLE cell is preserved.
(spark.table("silver_data")
      .where('Description IS NOT NULL')  # This removes one record
      .select('Res_ID', explode(save_tokens('Description')).alias('Descr_Tokenized'))
      .write
      .mode('overwrite')
      .saveAsTable('rest_descr_tokenized'))

display(spark.table('rest_descr_tokenized'))


In [0]:
%sql
-- Sanity check on the tokenized table: rows, distinct restaurants, Res_ID range
SELECT
  count(*)               AS tot_recs,
  count(DISTINCT Res_ID) AS unique_res_Id,
  min(Res_ID)            AS Res_ID_min,
  max(Res_ID)            AS Res_ID_max
FROM rest_descr_tokenized;

##### Embedding
Starting from the description chunks produced above, embed each one with a BGE model.
- The BAAI/bge-m3 model was tested, but it was judged to be geared towards returning similarity scores, and therefore not ideal for pure embedding.
- The "bge-base-en-v1.5" embedding model is better suited; it is available on the Hugging Face Hub [here](https://huggingface.co/BAAI/bge-base-en-v1.5).

For Hugging Face (sentence-transformers) models:
- the `encode` method is documented [here](https://sbert.net/docs/package_reference/sentence_transformer/SentenceTransformer.html).

In [0]:
# Create and register model

### The model is registered under a three-level name (catalog.schema.model), which is
### a Unity Catalog name: point MLflow at the UC registry before logging/registering.
mlflow.set_registry_uri("databricks-uc")

### Import the model
# BAAI/bge-m3 was tried first but judged not suited to pure embedding here.
model = SentenceTransformer("BAAI/bge-base-en-v1.5")

### Generate signature from a few real chunks
data_ex = spark.sql("Select Descr_Tokenized from rest_descr_tokenized limit 3").collect()
input_ex = [row['Descr_Tokenized'] for row in data_ex]
output_ex = model.encode(input_ex)
signature = mlflow.models.infer_signature(
    model_input=input_ex,
    model_output=output_ex,
)

# Log & Register the model
model_name_ = f"{catalog_}.{schema_}.ga_bge_base_en"
with mlflow.start_run(run_name="Embedding_Model"):
    logged_model = mlflow.sentence_transformers.log_model(
        model = model,
        registered_model_name = model_name_,
        artifact_path = "ga_bge_base_en",
        signature = signature,
        input_example = input_ex,
    )

# Reload from the run to check that the logged artifact loads
loaded_model = mlflow.sentence_transformers.load_model(logged_model.model_uri)

In [0]:
## Local Model
# Load a registered version from the Unity Catalog registry. UC models are addressed by the
# same three-level name (catalog.schema.model) that was used when registering.
mlflow.set_registry_uri("databricks-uc")
model_version_ = 2  # registered version to load; update after re-registering
model_name__ = f"models:/{catalog_}.{schema_}.ga_bge_base_en/{model_version_}"
loaded_model = mlflow.sentence_transformers.load_model(model_name__)

# Define function
def embed_locally(text_):
  """Embed `text_` with the loaded sentence-transformers model via `encode`.

  A single string gives one vector; a list of strings gives one row per string.
  """
  return loaded_model.encode(text_)

# Testing
emb = embed_locally("This is an example")
print(emb)
print(emb.shape) # Array of 768

In [0]:
# Embed from Endpoint [This is configured with the BAAI/bge-m3 model]
# NOTE: vectors from this endpoint are not interchangeable with the local
# bge-base-en-v1.5 vectors (768) stored in the gold table.
endpoint_url = 'https://e2-demo-field-eng.cloud.databricks.com/serving-endpoints/ga_embedding/invocations'
scope_name_ = 'michelin_scope'
secret_name_ = 'pat_ga'
PAT_ = dbutils.secrets.get(scope=scope_name_, key=secret_name_)

def embed(text_, timeout=30):
    """Embed one text through the model-serving endpoint.

    Args:
      text_: text to embed; sent as a one-element `inputs` batch.
      timeout: seconds to wait for the endpoint before giving up.

    Returns:
      The first element of the response's `predictions`, or None when the call fails
      (request error/timeout, non-200 status, or a response without predictions).
    """
    headers = {'Authorization': f'Bearer {PAT_}'}
    data_dict = {"inputs": [text_]}
    try:
        # Without a timeout a stalled endpoint would block the caller indefinitely.
        response = requests.post(url=endpoint_url, headers=headers, json=data_dict, timeout=timeout)
    except requests.RequestException as exc:
        print("Failed! Request error: %s" % exc)
        return None
    if response.status_code != 200:
        print("Failed! Status code: %s" % (response.status_code))
        return None
    try:
        return response.json()['predictions'][0]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        print("Failed! Unexpected response: %s" % exc)
        return None

# Testing
emb = embed("This is an example")
print(emb)

In [0]:
### Wrap the function into a Spark UDF
@pandas_udf("array<float>")
def save_embeddings(text_: pd.Series) -> pd.Series:
  """Embed a batch of chunks with the local model, returning one vector per input row.

  The whole Arrow batch goes to `encode` in a single call, instead of one call per row,
  so the model can batch the work internally.
  """
  if text_.empty:
    return pd.Series([], index=text_.index, dtype=object)
  vectors = embed_locally(text_.tolist())
  # list() turns the 2-D result into one 1-D array per row, aligned with the input index.
  return pd.Series(list(vectors), index=text_.index)

In [0]:
%sql
-- Create an empty table with embeddings [This table contains embeddings based on the BAAI/bge-m3 model]
-- Kept for reference only: every statement below is commented out, so this cell does nothing.
-- DROP TABLE IF EXISTS rest_descr_embedded;
/*
CREATE TABLE IF NOT EXISTS rest_descr_embedded (
  Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  Res_ID STRING,
  Descr_Tokenized STRING,
  Descr_Embedded ARRAY<FLOAT>
);*/

In [0]:
%sql
-- Gold table for the BAAI/bge-base-en-v1.5 embeddings, recreated empty by this cell.
-- Re-running this cell discards every batch appended so far.
DROP TABLE IF EXISTS gold_rest_descr_embedded;

CREATE TABLE IF NOT EXISTS gold_rest_descr_embedded (
  Id              BIGINT GENERATED BY DEFAULT AS IDENTITY, -- generated unless an insert supplies it
  Res_ID          STRING,
  Descr_Tokenized STRING,
  Descr_Embedded  ARRAY<FLOAT>
);

In [0]:
## Generating Embeddings -- Appending new data proceeding in bulks
# Each run embeds the chunks whose Res_ID ends in a 5-digit number within
# [bulk_start_, bulk_end_] (inclusive) and appends them; advance the range between runs.
bulk_start_ = 12001
bulk_end_ = 16000

# Restaurants already present in the gold table are skipped, so re-running a range
# does not append duplicate rows.
spark.sql(f"""
          SELECT t.*
          FROM rest_descr_tokenized t
          LEFT ANTI JOIN (SELECT DISTINCT Res_ID FROM gold_rest_descr_embedded) g
            ON t.Res_ID = g.Res_ID
          WHERE int(right(t.Res_ID, 5)) BETWEEN {bulk_start_} AND {bulk_end_}
          """).createOrReplaceTempView("bulk_v") # 11 mins for 2k records

# Applying the function
(spark.table("bulk_v")
        .select(['Res_ID', 'Descr_Tokenized'])
        .withColumn('Descr_Embedded', save_embeddings('Descr_Tokenized'))
        .write
        .mode('append')
        .saveAsTable('gold_rest_descr_embedded'))

##### Data Verification

In [0]:
%sql
-- Verify total counts in the gold (embedded) table
SELECT
  count(*)               AS tot_recs,
  count(DISTINCT Res_ID) AS Res_ID_uniques,
  min(Res_ID),
  max(Res_ID)
FROM gold_rest_descr_embedded;

In [0]:
%sql
-- Same counts on the tokenized table, to compare against the gold table
SELECT
  count(*)               AS tot_recs,
  count(DISTINCT Res_ID) AS Res_ID_uniques,
  min(Res_ID),
  max(Res_ID)
FROM rest_descr_tokenized;

In [0]:
%sql
-- Verify double counts: (Res_ID, chunk) pairs stored more than once, e.g. after an appended
-- bulk was re-run. A restaurant legitimately has several rows when its description was split
-- into several chunks, so duplicates are detected on the pair, not on Res_ID alone.
SELECT Res_ID, Descr_Tokenized, count(*) AS n_copies
FROM gold_rest_descr_embedded
GROUP BY Res_ID, Descr_Tokenized
HAVING count(*) > 1
ORDER BY 1, 2;

In [0]:
%sql
-- Verify double counts: (Res_ID, chunk) pairs stored more than once. Several rows per Res_ID
-- are expected when a description was split into several chunks, so duplicates are detected
-- on the (Res_ID, Descr_Tokenized) pair rather than on Res_ID alone.
SELECT Res_ID, Descr_Tokenized, count(*) AS n_copies
FROM rest_descr_tokenized
GROUP BY Res_ID, Descr_Tokenized
HAVING count(*) > 1
ORDER BY 1, 2;

##### Export Data

In [0]:
# Export the gold table to a Parquet file in the volume (CSV cannot hold array columns)
vol_ = "/Volumes/michelin_ga/restaurants/init/gold_embedded_data.parquet"

emd_data = spark.table("gold_rest_descr_embedded")
emd_data.write.mode("overwrite").parquet(vol_)

## Testing Similarity Search

In [0]:
## Load the embedded data into memory with pandas (single-node setup, as a web app would have)
path_ = "/Volumes/michelin_ga/restaurants/init/gold_embedded_data.parquet"

rest_descr_df = pd.read_parquet(path=path_)
display(rest_descr_df.head(n=5))

In [0]:
## Prepare a numpy matrix of all restaurants embeddings (one row per description chunk)
emb_len = 768 # bge-base-en-v1.5 vector size (1024 for the bge-m3 endpoint model)
res_ids = rest_descr_df['Res_ID'].tolist()
# Stack the per-row arrays in one vectorized call instead of filling the matrix via iterrows.
if len(rest_descr_df) > 0:
  res_emb = np.vstack(rest_descr_df['Descr_Embedded'].tolist()).astype(float)
else:
  res_emb = np.zeros((0, emb_len), dtype=float)
# Every embedding must have exactly emb_len values, one row per Res_ID entry.
assert res_emb.shape == (len(res_ids), emb_len), f"Unexpected embedding matrix shape: {res_emb.shape}"

print(len(res_ids))
print(res_emb.shape)
print(res_emb)

In [0]:
## Test similarity retrieval with a sample sentence
N = 10 # Return N top restaurants
#sample_ = "I am looking for a modern vegan restaurant with a futuristic atmosphere."
#sample_ = "I'd like to eat at any former culinary TV show contestant's restaurant."
sample_ = "I'd like to find a restaurant from any MasterChef winner."

## Encode sample sentence
sample_enb = np.array(embed_locally(sample_))

## Cosine similarity between the query and every description chunk.
# A zero-norm row would divide by zero: silence the warning and let it become NaN,
# which sort_values places last.
with np.errstate(divide='ignore', invalid='ignore'):
  Cosine_sim = (res_emb @ sample_enb) / (norm(res_emb, axis=1) * norm(sample_enb))

# A restaurant can have several chunks: score it by its best-matching chunk.
Cosine_sim_df = (pd.DataFrame({'Res_IDs': res_ids, 'Cosine_Similarity': Cosine_sim})
                 .groupby('Res_IDs')
                 .agg(Cosine_Similarity_Max=('Cosine_Similarity', 'max'))
                 .reset_index())

## Join scores with original dataframe and rank them (sorted copy, no in-place mutation)
silver_pandas_df = silver_df.toPandas()
Results_df = (pd.merge(left=silver_pandas_df, right=Cosine_sim_df,
                       left_on='Res_ID', right_on='Res_IDs', how='inner')
              .sort_values(by='Cosine_Similarity_Max', ascending=False))
display(Results_df.head(N))