# Imports

In [0]:
# spark
from pyspark.sql import types as T
from pyspark.sql.functions import udf
from pyspark.sql import functions as F

# similarity
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# data structs
import pandas as pd
import numpy as np

# functionality
from collections import Counter

# Hyperparameters

In [0]:
# --- embedding model ---
# sentence-transformers checkpoint used to embed every name
model_name = "BAAI/bge-m3"
# prefix prepended to each string before it is embedded
instructions = "Embed only the name in: "
# cosine-similarity cut-off used by the scoring UDFs: above it a lookup name "passes",
# and exactly at it the similarity-to-probability mapping returns 0.5
threshold = 0.79

# --- data ---
# table holding the reverse-lookup search results
search_path = "ds_internal_tables.reverse_lookup"

# Util functions

In [0]:
# cosine similarity to probability

def dcision_from_ip(ip, threshold):
    """Turn a cosine similarity into a pseudo-probability of a name match.

    Computes ``exp((1 - ip) / scale)`` with ``scale = (1 - threshold) / ln(0.5)``,
    which equals ``0.5 ** ((1 - ip) / (1 - threshold))``. The result is 1 at
    ``ip == 1`` and exactly 0.5 at ``ip == threshold``. It decays exponentially as
    ``ip`` drops further. Works element-wise on numpy arrays as well as on scalars.

    Args:
        ip: cosine similarity (inner product of normalized embeddings).
        threshold: similarity that should map to 0.5.

    Returns:
        The transformed value(s), as produced by ``np.exp``.
    """
    # ln(0.5) < 0, so the scale is negative and lower similarities give smaller values
    scale = (1 - threshold) / np.log(0.5)
    exponent = (1 - ip) / scale
    return np.exp(exponent)
    
@F.pandas_udf("double")
def pandas_ip_to_decision(series: pd.Series) -> pd.Series:
    """Count-weighted sum of match probabilities per row.

    Each element of ``series`` is a list of entries with ``'ip'`` and ``'count'`` keys
    (the ``values`` column produced by the scoring step). Every entry's ``ip`` is passed
    through ``dcision_from_ip`` with the module-level ``threshold``. The result is
    multiplied by its ``count``, and the products are summed with ``np.sum``.
    """
    def _row_total(entries):
        # one weighted probability per distinct lookup name
        weighted = [
            dcision_from_ip(entry['ip'], threshold=threshold) * entry['count']
            for entry in entries
        ]
        return np.sum(weighted)

    totals = series.apply(_row_total)
    return pd.Series(totals)

# how many lookup-name occurrences have a similarity above the threshold
# (the apply cell later divides this by `counts` to get a ratio)
@F.pandas_udf("int")
def pandas_passed_threhsold(series: pd.Series) -> pd.Series:
    """Per row, sum the ``'count'`` of entries whose ``'ip'`` exceeds ``threshold``.

    Each element of ``series`` is a list of entries with ``'ip'`` and ``'count'`` keys.
    The comparison is strict (``>``) and uses the module-level ``threshold``.
    """
    def _row_passed(entries):
        passing_counts = [entry['count'] for entry in entries if entry['ip'] > threshold]
        return np.sum(passing_counts)

    passed = series.apply(_row_passed)
    return pd.Series(passed)
    
# pandas function applied to each chunk of searches (see applyInPandas below)

def get_simialrities_score_pd_udf(df: pd.DataFrame) -> pd.DataFrame:
    """Score how well each searched name matches the names found by the reverse lookup.

    Designed for ``applyInPandas``. It receives one chunk of search rows as a pandas
    DataFrame and returns the same rows (same columns, same order) followed by four
    new columns:

    * ``values``: list of ``{'name', 'ip', 'count'}`` dicts, one per distinct lookup
      name of the phone number. ``ip`` is the dot product of the normalized embeddings
      of the searched name and the lookup name, i.e. their cosine similarity.
      ``count`` is how many lookup docs carried that name.
    * ``counts``: total number of lookup names (sum of ``count``).
    * ``ip_sum``: sum of ``ip * count``.
    * ``ip_avg``: ``ip_sum / counts``.

    Rows sharing a ``normalized_number`` are pooled, and every such row receives the
    same aggregate. Some inputs are skipped instead of failing the whole chunk:

    * lookup docs with a null name;
    * rows with a null searched name;
    * rows with no lookup docs.

    A phone number left with nothing to compare gets ``values=[]``, ``counts=0``,
    ``ip_sum=0.0`` and ``ip_avg=NaN``. Rows whose ``normalized_number`` is null get
    nulls in the new columns.

    Uses the module-level ``model`` (loaded in a later cell) and ``instructions``.
    NOTE(review): ``model`` is presumably serialized together with this function when
    Spark ships it to the workers. Confirm the cost of that for a ~2.3 GB checkpoint.

    Args:
        df: chunk with at least ``normalized_number`` and ``search_item`` columns.
            ``search_item['full_name']`` is the searched name, and every entry of
            ``search_item['lookup_docs']`` exposes ``['_source']['name']['full']``.

    Returns:
        ``df`` with ``values``, ``counts``, ``ip_sum`` and ``ip_avg`` appended.
    """
    # one row per search: the searched name plus a tally of the returned names
    per_search = df[['normalized_number']].copy()
    per_search['full_name'] = df['search_item'].apply(lambda item: item['full_name'])
    name_tally = df['search_item'].apply(
        lambda item: Counter(doc['_source']['name']['full'] for doc in item['lookup_docs'])
    )
    per_search['names'] = name_tally.apply(lambda tally: list(tally.keys()))
    per_search['counts'] = name_tally.apply(lambda tally: list(tally.values()))

    # One row per (search, distinct lookup name). An empty lookup list explodes to a
    # single NaN row which, like a null name, has nothing to compare against.
    pairs = per_search.explode(['names', 'counts'])
    pairs = pairs[pairs['names'].notna() & pairs['full_name'].notna()]

    ips = []
    if not pairs.empty:
        # embed each distinct string only once (dict.fromkeys keeps a stable order)
        unique_texts = list(dict.fromkeys(pairs['full_name'].tolist() + pairs['names'].tolist()))
        vectors = model.encode(
            [instructions + text for text in unique_texts],
            batch_size=512,
            normalize_embeddings=True,
        ).tolist()
        vector_of = dict(zip(unique_texts, vectors))
        origin = np.array([vector_of[name] for name in pairs['full_name']])
        lookup = np.array([vector_of[name] for name in pairs['names']])
        # embeddings are L2-normalized, so the row-wise dot product is the cosine similarity
        ips = np.sum(np.multiply(origin, lookup), axis=1).tolist()

    # gather the per-name results by phone number
    entries_by_number = {}
    for number, name, ip, count in zip(pairs['normalized_number'], pairs['names'], ips, pairs['counts']):
        entries_by_number.setdefault(number, []).append({'name': name, 'ip': ip, 'count': count})

    # every non-null number of the chunk gets an aggregate, possibly an empty one
    numbers = pd.Index(per_search['normalized_number'].dropna().unique(), name='normalized_number')
    entries = [entries_by_number.get(number, []) for number in numbers]
    output = pd.DataFrame(
        {
            'values': pd.Series(entries, index=numbers, dtype=object),
            'counts': pd.Series(
                [sum(entry['count'] for entry in group) for group in entries],
                index=numbers,
                dtype='int64',
            ),
            'ip_sum': pd.Series(
                [float(sum(entry['ip'] * entry['count'] for entry in group)) for group in entries],
                index=numbers,
                dtype='float64',
            ),
        }
    )
    # count-weighted mean similarity; NaN when a number has no lookup names (0 / 0)
    output['ip_avg'] = output['ip_sum'] / output['counts']

    return df.join(output, on='normalized_number')


# Spark fields for the columns the scoring function appends to each search row:
#   values - array of {count, ip, name} structs, one per distinct lookup name
#   counts - total number of lookup names
#   ip_sum - sum of ip * count
#   ip_avg - ip_sum / counts
pandas_apply_added_schema = [
    T.StructField(
        'values',
        T.ArrayType(
            T.StructType([
                T.StructField('count', T.LongType(), nullable=True),
                T.StructField('ip', T.DoubleType(), nullable=True),
                T.StructField('name', T.StringType(), nullable=True),
            ]),
            containsNull=True,
        ),
        nullable=True,
    ),
    T.StructField('counts', T.LongType(), nullable=True),
    T.StructField('ip_sum', T.DoubleType(), nullable=True),
    T.StructField('ip_avg', T.DoubleType(), nullable=True),
]

# Load searches

In [0]:
# read the reverse-lookup searches, capped at 1000 rows for this example run
search_table = (
    spark.read
    .table(search_path)
    .limit(1000)
)
# number of rows actually loaded (fewer than 1000 if the table is smaller)
N_samples = search_table.count()

# Load model

In [0]:
# load the embedding model named in the configuration cell
# (the output below shows its checkpoint files being fetched)
model = SentenceTransformer(model_name_or_path=model_name)

Downloading .gitattributes:   0%|          | 0.00/1.57k [00:00<?, ?B/s]

Downloading 1_Pooling/config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

Downloading README.md:   0%|          | 0.00/15.0k [00:00<?, ?B/s]

Downloading colbert_linear.pt:   0%|          | 0.00/2.10M [00:00<?, ?B/s]

Downloading config.json:   0%|          | 0.00/687 [00:00<?, ?B/s]

Downloading (…)ce_transformers.json:   0%|          | 0.00/123 [00:00<?, ?B/s]

Downloading imgs/.DS_Store:   0%|          | 0.00/6.15k [00:00<?, ?B/s]

Downloading imgs/bm25.jpg:   0%|          | 0.00/69.0k [00:00<?, ?B/s]

Downloading imgs/long.jpg:   0%|          | 0.00/485k [00:00<?, ?B/s]

Downloading imgs/miracl.jpg:   0%|          | 0.00/448k [00:00<?, ?B/s]

Downloading imgs/mkqa.jpg:   0%|          | 0.00/608k [00:00<?, ?B/s]

Downloading imgs/nqa.jpg:   0%|          | 0.00/158k [00:00<?, ?B/s]

Downloading imgs/others.webp:   0%|          | 0.00/21.0k [00:00<?, ?B/s]

Downloading long.jpg:   0%|          | 0.00/127k [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/2.27G [00:00<?, ?B/s]

Downloading (…)nce_bert_config.json:   0%|          | 0.00/54.0 [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading sparse_linear.pt:   0%|          | 0.00/3.52k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

Downloading tokenizer_config.json:   0%|          | 0.00/444 [00:00<?, ?B/s]

Downloading modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

# Apply scoring function 

In [0]:
# roughly one chunk per 200k rows, but never fewer than 10 chunks
number_of_chunks = max(int(N_samples / 2e5), 10)
# input columns followed by the columns the scoring function appends
output_schema = T.StructType(search_table.schema.fields + pandas_apply_added_schema)

# embed and compare names chunk by chunk (one pandas call per Spark partition)
result_df = (
    search_table
    .limit(N_samples)
    .repartition(number_of_chunks)
    .groupby(F.spark_partition_id())
    .applyInPandas(get_simialrities_score_pd_udf, schema=output_schema)
)

# derive the final probability: the mean of the count-weighted exp-transformed
# similarity and the share of lookup names above the threshold
result_df = (
    result_df
    .withColumn('ip_sum_exp', pandas_ip_to_decision(F.col('values')))
    .withColumn('ip_avg_exp', F.col('ip_sum_exp') / F.col('counts'))
    .withColumn('passed_threhsold', pandas_passed_threhsold(F.col('values')))
    .withColumn('passed_threhsold', F.col('passed_threhsold') / F.col('counts'))
    .withColumn('probability', (F.col('passed_threhsold') + F.col('ip_avg_exp')) / 2)
)

# Save data

In [0]:
# persist the scored searches as a Delta table, overwriting the previous contents
(
    result_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("ds_internal_tables.example_results")
)