In [46]:
import os
import os.path

import re
import requests
import json
import warnings
import math
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer

### Loading HuggingFace Token

In [47]:
if os.getenv("COLAB_RELEASE_TAG"):
    COLAB = True
    from google.colab import userdata
    HF_TOKEN = userdata.get('HF_TOKEN')
else:
    COLAB = False
    from dotenv import load_dotenv
    load_dotenv()
    HF_TOKEN = os.getenv('HF_TOKEN')

### Embedding Model IDs

In [48]:
model_ids = [
    "sentence-transformers/all-MiniLM-L6-v2",
    "BAAI/bge-large-en-v1.5",
    "intfloat/multilingual-e5-large"
]

### Utils

In [49]:
# Converts model_id into filenames
def model_id_to_filename( model_id ):
    return model_id.split('/')[1].lower()

### Calculate Embeddings

In [50]:
def calculate_embeddings( texts, model_id ):
    # Warning in case of prompts longer than 256 words
    for t in texts :
        n_words = len( re.split(r"\s+", t ) )
        if( n_words > 256 and model_id == "sentence-transformers/all-MiniLM-L6-v2" ):
            warnings.warn( "Warning: Sentence provided is longer than 256 words. Model all-MiniLM-L6-v2 expects sentences up to 256 words." )
            warnings.warn( "Word count: {}".format( n_words ) )

    if( model_id == 'sentence-transformers/all-MiniLM-L6-v2' ):
        model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
        out = model.encode( texts ).tolist()
    else:
        # api_url = f"https://api-inference.huggingface.co/models/{model_id}"
        api_url = f"https://router.huggingface.co/hf-inference/models/{model_id}/pipeline/feature-extraction"
        headers = {"Authorization": f"Bearer {HF_TOKEN}", "Content-Type": "application/json"}
        print( "Request url: " + api_url )
        response = requests.post(api_url, headers=headers, json={"inputs": texts })
        out = response.json()
        
    if( 'error' in out ):
        return out
    while( len( out ) < 384 ):
        out = out[0]
    return out

### Calculate Centroid Vector

In [51]:
def get_centroid( cluster, dimension = 384, k = 10 ):
    centroid = [0] * dimension
    count = 0
    for prompt in cluster['prompts']:
        i = 0
        while i < len( prompt['embedding'] ):
            centroid[i] += prompt['embedding'][i]
            i += 1
        count += 1
        
    i = 0
    while i < len( centroid ):
        centroid[i] /= count
        i += 1

    return centroid

### Populating JSON Files

In [None]:
if( COLAB ):
    json_folder = 'https://raw.githubusercontent.com/IBM/responsible-prompting-api/refs/heads/main/prompt-sentences-main/'
else:
    json_folder = '../prompt-sentences-main/'

json_in_file = json_folder + 'prompt_sentences.json'

# Trying to open the files first
if( COLAB ):
    prompt_json_in = requests.get( json_in_file ).json()
    print( 'Opening file from GitHub repo: ', json_in_file )
else:
    if( os.path.isfile( json_in_file ) ):
        prompt_json_in = json.load( open( json_in_file ) )
        print( 'Opening existing file locally: ', json_in_file )

for model_id in model_ids:

    json_out_file_suffix = model_id_to_filename( model_id )
    json_out_file = f"{json_folder}prompt_sentences-{json_out_file_suffix}.json"

    # Trying to open the files first
    if( COLAB ):
        prompt_json_out = requests.get( json_out_file ).json()
        print( 'Opening file from GitHub repo: ', json_out_file )
    else:
        if( os.path.isfile( json_out_file ) ):
            prompt_json_out = json.load( open( json_out_file ) )
            print( 'Opening existing file locally: ', json_out_file )
        else:
            # Creating an empty file for new transformer
            print( 'Starting a file from scratch for model: ', model_id )

    # API request test
    api_response_dimensions = len( calculate_embeddings( ['testing API endpoint'], model_id ) )
    print( f"Dimensions from hugging face API response: {api_response_dimensions}" )
    json_file_dimensions = len( prompt_json_out['positive_values'][0]['prompts'][0]['embedding'] )
    print( f"Dimensions from json file: {json_file_dimensions}" )
    if( api_response_dimensions != json_file_dimensions ):
        warnings.warn( f"Dimensions are different: API={api_response_dimensions} while JSON sentences file={json_file_dimensions}" )

prompts_embeddings = {}
new_prompts = 0
old_prompts = 0
errors = 0
successes = 0

for cluster in prompt_json_out['positive_values']:
    for prompt in cluster['prompts']:
        if( prompt['embedding'] != [] ):
            prompts_embeddings[ prompt['text'] ] = prompt['embedding']

for cluster in prompt_json_out['negative_values']:
    for prompt in cluster['prompts']:
        if( prompt['embedding'] != [] ):
            prompts_embeddings[ prompt['text'] ] = prompt['embedding']

# Loading all prompts from prompt_json_in, potentially with new/changed sentences

# Iterate over the two positive and negative lists
for cluster in prompt_json_in['positive_values']:
    for prompt in cluster['prompts']:
        if( prompt['text'] in prompts_embeddings ):
            # Prompt found, no need to request embeddings
            prompt['embedding'] = prompts_embeddings[ prompt['text'] ]
            old_prompts += 1
        else:
            # Requesting embedding for new/changed prompt
            embedding = calculate_embeddings( prompt['text'], model_id )
            if( 'error' in embedding ):
                errors += 1
            else:
                # Add the new/changed prompt to the hashmap
                prompts_embeddings[ prompt['text'] ] = embedding

                # Using the new hash
                prompt['embedding'] = prompts_embeddings[ prompt['text'] ]
                successes += 1
            new_prompts += 1

for cluster in prompt_json_in['negative_values']:
    for prompt in cluster['prompts']:
        if( prompt['text'] in prompts_embeddings ):
            # Prompt found, no need to request embeddings
            prompt['embedding'] = prompts_embeddings[ prompt['text'] ]
            old_prompts += 1
        else:
            # Requesting embedding for new/changed prompt
            embedding = calculate_embeddings( prompt['text'], model_id )
            if( 'error' in embedding ):
                errors += 1
            else:
                # Add the new/changed prompt to the hashmap
                prompts_embeddings[ prompt['text'] ] = embedding

                # Using the new hash
                prompt['embedding'] = prompts_embeddings[ prompt['text'] ]
                successes += 1
            new_prompts += 1

print( 'Old prompts: ', old_prompts )
print( 'New prompts: ', new_prompts )
print( 'Errors: ', errors )
print( 'Successes: ', successes )

# After all the embeddings are populated (with no errors), compute the centroids for each value
if( errors == 0 ):
    print( 'Updating centroids.' )
    for cluster in prompt_json_in['positive_values']:
        cluster['centroid'] = get_centroid( cluster, json_file_dimensions, 10 )
    for cluster in prompt_json_in['negative_values']:
        cluster['centroid'] = get_centroid( cluster, json_file_dimensions, 10 )

# Saving the embeddings for a specific LLM
if( COLAB ):
    json_out_file = f"prompt_sentences-{json_out_file_suffix}.json"

with open( json_out_file, 'w') as outfile:
    print( 'Saving into file: ', json_out_file )
    json.dump( prompt_json_in, outfile)
    print( '\n' )

### Metric 1 - Inter-Cluster Centroid Distance

In [53]:
## Inter-cluster centroid distance
# Maximum distance between two nearest centroids

def calculate_metric_1(model_id):

    json_out_file_suffix = model_id_to_filename(model_id)
    json_out_file = os.path.join("..", "prompt-sentences-main", f"prompt_sentences-{json_out_file_suffix}.json")

    with open(json_out_file, 'r') as infile:
        populated_json_file = json.load(infile)

    ## accumulating centroid vectors from positive and negative clusters
    centroid_vectors = []

    ## positive clusters
    for cluster in populated_json_file['positive_values']:
        centroid_vectors.append(cluster['centroid'])

    ## negative clusters
    for cluster in populated_json_file['negative_values']:
        centroid_vectors.append(cluster['centroid'])

    distances = []

    for i, current_centroid in enumerate(centroid_vectors):
        for j, other_centroid in enumerate(centroid_vectors):
            if i != j:
                distance = np.linalg.norm(np.array(current_centroid) - np.array(other_centroid))
                distances.append(distance)

    return max(distances) if distances else 0

In [54]:
metric_1_scores = {}

for model_id in model_ids:
    score = calculate_metric_1(model_id)
    metric_1_scores[model_id_to_filename(model_id)] = score

df_metric_1 = pd.DataFrame(list(metric_1_scores.items()), columns=['model_id', 'inter_cluster_centroid_distance'])
print(df_metric_1)

                model_id  inter_cluster_centroid_distance
0       all-minilm-l6-v2                         1.284927
1      bge-large-en-v1.5                         1.003046
2  multilingual-e5-large                         0.636389


### Metric 2 - Misclassification Rate

In [55]:
## Misclassification Rate
# If centroid of another cluster is closer to a sentence embedding than its own cluster centroid, it is misclassified.

def calculate_metric_2(model_id):

    json_out_file_suffix = model_id_to_filename(model_id)
    json_out_file = os.path.join("..", "prompt-sentences-main", f"prompt_sentences-{json_out_file_suffix}.json")

    with open(json_out_file, 'r') as infile:
        populated_json_file = json.load(infile)

    ## accumulating centroid vectors from positive and negative clusters
    centroid_vectors = []

    ## positive clusters
    for cluster in populated_json_file['positive_values']:
        centroid_vectors.append(cluster['centroid'])

    ## negative clusters
    for cluster in populated_json_file['negative_values']:
        centroid_vectors.append(cluster['centroid'])

    positive_misclassified_count = 0
    negative_misclassified_count = 0

    ## positive clusters
    for cluster in populated_json_file['positive_values']:

        assigned_centroid = cluster['centroid']
        for prompt in cluster['prompts']:

            prompt_embedding = prompt['embedding']

            own_distance = np.linalg.norm(np.array(prompt_embedding) - np.array(assigned_centroid))

            for other_centroid in centroid_vectors:
                if other_centroid != assigned_centroid:
                    other_distance = np.linalg.norm(np.array(prompt_embedding) - np.array(other_centroid))
                    if other_distance < own_distance:
                        positive_misclassified_count += 1
                        break

    ## negative clusters
    for cluster in populated_json_file['negative_values']:

        assigned_centroid = cluster['centroid']
        for prompt in cluster['prompts']:

            prompt_embedding = prompt['embedding']

            own_distance = np.linalg.norm(np.array(prompt_embedding) - np.array(assigned_centroid))

            for other_centroid in centroid_vectors:
                if other_centroid != assigned_centroid:
                    other_distance = np.linalg.norm(np.array(prompt_embedding) - np.array(other_centroid))
                    if other_distance < own_distance:
                        negative_misclassified_count += 1
                        break

    total_misclassified_count = positive_misclassified_count + negative_misclassified_count
    return total_misclassified_count

In [56]:
metric_2_scores = {}

for model_id in model_ids:
    metric_2_scores[model_id_to_filename(model_id)] = calculate_metric_2(model_id)

df_metric_2 = pd.DataFrame(list(metric_2_scores.items()), columns=['model_id', 'misclassification_rate'])
print(df_metric_2)

                model_id  misclassification_rate
0       all-minilm-l6-v2                     729
1      bge-large-en-v1.5                     694
2  multilingual-e5-large                     744


### Metric 3 - Intra-Cluster K-Means Distance

In [57]:
## Intra-cluster K-Means Distance
# Measures the average distance between each point and its assigned cluster centroid.

def calculate_metric_3(model_id):

    json_out_file_suffix = model_id_to_filename(model_id)
    json_out_file = os.path.join("..", "prompt-sentences-main", f"prompt_sentences-{json_out_file_suffix}.json")

    with open(json_out_file, 'r') as infile:
        populated_json_file = json.load(infile)
    
    ## accumulating centroid vectors from positive and negative clusters
    centroid_vectors = []

    ## positive clusters
    for cluster in populated_json_file['positive_values']:
        centroid_vectors.append(cluster['centroid'])

    ## negative clusters
    for cluster in populated_json_file['negative_values']:
        centroid_vectors.append(cluster['centroid'])

    k_means_distances = []

    ## positive_clusters
    for cluster in populated_json_file['positive_values']:
        
        centroid_vector = cluster['centroid']
        distances = []

        for prompt in cluster['prompts']:
            prompt_embedding = prompt['embedding']
            distance = np.linalg.norm(np.array(prompt_embedding) - np.array(centroid_vector))
            distances.append(distance)

        k_means_distances.append(np.mean(distances))

    ## negative_clusters
    for cluster in populated_json_file['negative_values']:

        centroid_vector = cluster['centroid']
        distances = []

        for prompt in cluster['prompts']:
            prompt_embedding = prompt['embedding']
            distance = np.linalg.norm(np.array(prompt_embedding) - np.array(centroid_vector))
            distances.append(distance)

        k_means_distances.append(np.mean(distances))

    ## negative_clusters
    for cluster in populated_json_file['negative_values']:

        centroid_vector = cluster['centroid']
        distances = []

        for prompt in cluster['prompts']:
            prompt_embedding = prompt['embedding']
            distance = np.linalg.norm(np.array(prompt_embedding) - np.array(centroid_vector))
            distances.append(distance)

        k_means_distances.append(np.mean(distances))

    return np.max(k_means_distances)

In [58]:
metric_3 = {}

for model_id in model_ids:
    metric_3[model_id_to_filename(model_id)] = calculate_metric_3(model_id)

df_metric_3 = pd.DataFrame(list(metric_3.items()), columns=['model_id', 'intra_cluster_k_means_distance'])
print(df_metric_3)

                model_id  intra_cluster_k_means_distance
0       all-minilm-l6-v2                        0.868673
1      bge-large-en-v1.5                        0.678656
2  multilingual-e5-large                        0.423670


### Embedding Model Evaluation

In [61]:
## Combining all metrics

def evaluate_embedding_model(model_id):

    metric_1 = calculate_metric_1(model_id)

    metric_2 = calculate_metric_2(model_id)

    metric_3 = calculate_metric_3(model_id)

    return {
        "model_id": model_id_to_filename(model_id),
        "metric_1": metric_1,
        "metric_2": metric_2,
        "metric_3": metric_3
    }

In [62]:
embedding_model_scores = []

for model_id in model_ids:
    scores = evaluate_embedding_model(model_id)
    embedding_model_scores.append(scores)

df_embedding_model_scores = pd.DataFrame(embedding_model_scores)
print(df_embedding_model_scores)

                model_id  metric_1  metric_2  metric_3
0       all-minilm-l6-v2  1.284927       729  0.868673
1      bge-large-en-v1.5  1.003046       694  0.678656
2  multilingual-e5-large  0.636389       744  0.423670
