# Cosine Similarity Example

In [1]:
import requests
from dataclasses import dataclass
from typing import Any, Dict
from sentence_transformers import SentenceTransformer, util

@dataclass
class CosineSimilarity:
    api_token: str
    API_URL: str = "https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2"

    def __post_init__( self ):
        self.model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

    def headers(self) -> Dict[str, str]:
        return {"Authorization": f"Bearer {self.api_token}"}

    def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        response = requests.post(self.API_URL, headers=self.headers(), json=payload)
        return response.json()

    def get_similarity_score(self, gold_intent: str, pred_intent: str) -> float:
        data = self.query(
            {
                "inputs": {
                    "source_sentence": gold_intent,
                    "sentences": [pred_intent]
                }
            })
        return data[0]
    
    def get_cosine_similarity(self, gold_intent: str, pred_intent: str) -> float:

        #Compute embedding for both lists
        embedding_1 = self.model.encode( gold_intent, convert_to_tensor=True)
        embedding_2 = self.model.encode( pred_intent, convert_to_tensor=True)
        sim = util.pytorch_cos_sim(embedding_1, embedding_2).item()
        return sim


    def compare(self, gold_intent: str, pred_intent: str) -> None:
        cosine_sim = round(self.get_similarity_score(gold_intent, pred_intent) * 100, 2)
        print(f"gold: {gold_intent}\npred: {pred_intent}\nmatch: {cosine_sim}%\n")

    def compare_embed(self, gold_intent: str, pred_intent: str) -> None:
        cosine_sim = round(self.get_cosine_similarity(gold_intent, pred_intent) * 100, 2)
        print(f"gold: {gold_intent}\npred: {pred_intent}\nmatch: {cosine_sim}%\n")

print("---[Using HuggingFace API]---")

#Cosine Similarity Example
similarity_checker = CosineSimilarity( api_token="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" )
similarity_checker.compare( "Book Flight", "Book Plane." )
similarity_checker.compare( "Book Flight", "Book Airplane Reservation." )

print("---[Using Local Embeddings]---")

#Embedding Similarity Example
similarity_checker.compare_embed( "Book Flight", "Book Plane." )
similarity_checker.compare_embed( "Book Flight", "Book Airplane Reservation." )

---[Using HuggingFace API]---
gold: Book Flight
pred: Book Plane.
match: 78.49%

gold: Book Flight
pred: Book Airplane Reservation.
match: 74.98%

---[Using Local Embeddings]---
gold: Book Flight
pred: Book Plane.
match: 78.49%

gold: Book Flight
pred: Book Airplane Reservation.
match: 74.98%



# Accuracy Example

In [2]:
import json
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

@dataclass
class EvaluationMetricsDemo:
  pred_file: str
  gold_file: str
  embed_handler: CosineSimilarity 

  def is_match(self, gold_intent: str, pred_intent: list) -> bool:
        return gold_intent == pred_intent
    
  def first_match(self, gold_intent: str, pred_intent: list) -> bool:
     return gold_intent.split()[0] == pred_intent.split()[0]

  def exist(self, gold_intent: str, pred_intent: list) -> bool:
    return len( pred_intent ) > 0 and len( gold_intent ) > 0

  def calculate_accuracy(self) -> None:
    with open(self.pred_file, "r") as pred_f, open(self.gold_file, "r") as gold_f:
      pred_lines = pred_f.readlines()
      gold_lines = gold_f.readlines()
      
      assert len(pred_lines) == len(gold_lines)

    total: float = 0.0
    first_word_correct: float = 0.0
    exact_match: float = 0.0

    for pred_line, gold_line in zip(pred_lines, gold_lines):
        if self.gold_file.endswith("json"):
            gold_intent = json.loads(gold_line)["translation"]["tgt"]
        else:
            gold_intent = gold_line.strip()

        pred_intent = pred_line.strip()
    
        total += 1.0
        if self.first_match(gold_intent, pred_intent):
            first_word_correct += 1.0
        if self.exist( gold_intent, pred_intent ) and self.is_match( gold_intent, pred_intent ):
            exact_match += 1.0

    first_word_correct = round( first_word_correct / total * 100, 2 )
    exact_match = round( exact_match / total * 100, 2 )

    return first_word_correct, exact_match

  def calculate_bleu_score(self) -> None:
    smoothie = SmoothingFunction().method1 
    with open(self.pred_file, "r") as pred_f, open(self.gold_file, "r") as gold_f:
        pred_lines = pred_f.readlines()
        gold_lines = gold_f.readlines()

        assert len(pred_lines) == len(gold_lines)

    total: float = 0.0
    blue_scores: list = []

    for pred_line, gold_line in zip(pred_lines, gold_lines):
        if self.gold_file.endswith("json"):
            gold_intent = json.loads(gold_line)["translation"]["tgt"]
        else:
            gold_intent = gold_line.strip()
        pred_intent = pred_line.strip()

        total += 1.0
        reference = [gold_intent.split()]
        hypothesis = pred_intent.split()
        blue_scores.append(sentence_bleu(reference, hypothesis, smoothing_function=smoothie))

    blue_score = sum(blue_scores) / total * 100
    
    return blue_score
  
  def jaccard_similarity(self, label1: str, label2: str) -> float:
    # Tokenize the intent labels
    tokens1 = set(label1.split())
    tokens2 = set(label2.split())

    # Calculate Jaccard similarity
    intersection = len(tokens1.intersection(tokens2))
    union = len(tokens1.union(tokens2))
    similarity = intersection / union if union != 0 else 0.0

    return similarity

  def calculate_jaccard_similarity(self) -> None:
    with open(self.pred_file, "r") as pred_f, open(self.gold_file, "r") as gold_f:
        pred_lines = pred_f.readlines()
        gold_lines = gold_f.readlines()

        assert len(pred_lines) == len(gold_lines)

    total: float = 0.0
    jaccard_scores: list = []

    for pred_line, gold_line in zip(pred_lines, gold_lines):
        if self.gold_file.endswith("json"):
            gold_intent = json.loads(gold_line)["translation"]["tgt"]
        else:
            gold_intent = gold_line.strip()
        pred_intent = pred_line.strip()

        total += 1.0
        jaccard_scores.append( self.jaccard_similarity( gold_intent, pred_intent ) )

    jaccard_score = sum(jaccard_scores) / total * 100
    
    return jaccard_score
  
  def cosine_similarity(self, label1: str, label2: str) -> float:
    return self.embed_handler.get_cosine_similarity( label1, label2 )

  def calculate_cosine_similarity(self) -> None:
    with open(self.pred_file, "r") as pred_f, open(self.gold_file, "r") as gold_f:
        pred_lines = pred_f.readlines()
        gold_lines = gold_f.readlines()

        assert len(pred_lines) == len(gold_lines)

    cosine_scores: int = 0
    total: float = 0.0

    for pred_line, gold_line in zip(pred_lines, gold_lines):
        if self.gold_file.endswith("json"):
            gold_intent = json.loads(gold_line)["translation"]["tgt"]
        else:
            gold_intent = gold_line.strip()
        pred_intent = pred_line.strip()
        total += 1.0

        score = self.cosine_similarity( gold_intent, pred_intent )
        if score >= 0.70:
            cosine_scores += 1

    return cosine_scores / total
    

  def evaluate(self) -> dict:
    accuracy = self.calculate_accuracy()
    bleu_score = self.calculate_bleu_score()
    jaccard_score = self.calculate_jaccard_similarity()
    cosine_scores = self.calculate_cosine_similarity()

    metrics = {
        'accuracy':  {
            'first_word':  accuracy[0],
            'exact_match': accuracy[1]
        },
        'bleu_score': bleu_score,
        'jaccard_score': jaccard_score,
        'cosine_similarity': cosine_scores
    }

    return metrics

In [3]:
metrics = EvaluationMetricsDemo( 
    embed_handler=similarity_checker,
    gold_file="SNIPS/Model_3 results/Labels_gold_silver_model_3_1_full_resource.txt",
    pred_file="SNIPS/Model_3 results/Preds_gold_silver_model_3_1_full_resource.txt"
    )
metrics.evaluate()

{'accuracy': {'first_word': 98.57, 'exact_match': 96.71},
 'bleu_score': 40.77993336786933,
 'jaccard_score': 97.08571428571437,
 'cosine_similarity': 0.9671428571428572}

In [4]:
first_word_correct, exact_match = metrics.calculate_accuracy()
print( f"First Word Accuracy: {first_word_correct}%" )
print( f"Exact Match Accuracy: {exact_match}%" )

First Word Accuracy: 98.57%
Exact Match Accuracy: 96.71%


In [5]:
blue_score = metrics.calculate_bleu_score()
print( f"BLEU Score: {blue_score}%" )

BLEU Score: 40.77993336786933%


In [6]:
avg_jaccard = metrics.calculate_jaccard_similarity()
print( f"Average Jaccard Similarity: {avg_jaccard}%" )

Average Jaccard Similarity: 97.08571428571437%


# Run Across All Files
---
Check accuracy across all n-shot settings.

In [19]:
import os

def evaluate_pred( dir_path: str, label: str, pred: str ) -> dict:
    metrics = EvaluationMetricsDemo(
            embed_handler=similarity_checker,
            gold_file=f"{dir_path}/{label}",
            pred_file=f"{dir_path}/{pred}"
    )
    return metrics.evaluate()

def get_results( path: str ) -> dict:

    all_results = {}
    all_files = os.listdir( path )
    labels = [ file for file in all_files if "true_labels" in file ]

    for label in labels:
        with open( f"{path}/{label}", "r" ) as f:
            if len( f.readlines() ) < 5:
                labels.remove( label )

    preds  = [ file.replace( "true", "predicted" ) for file in labels ]
    for label, pred in zip( labels, preds ):
        print( f"Label: {label}\nPred: {pred}")
        results = {
            "labels_file": label,
            "preds_file": pred,
            "results": evaluate_pred( path, label, pred )
        } 
        all_results[ f"{label.split('_')[0]}_{label.split('_')[1]}" ] = results
    return all_results

def write_results( path: str, results: dict ) -> None:
    json_d = { result: metrics 
        for result, metrics in results.items() }
    
    with open( f"{path}/results.json", "w" ) as f:
        json.dump( json_d, f, indent=4 )

        
ATIS  = "../../analysis/ATIS"
SNIPS = "../../analysis/SNIPS/model_2"
WEATH = "../../analysis/TOPS_Weather"
REMIN = "../../analysis/TOPS_Reminder"

results = get_results( f"{SNIPS}" )
write_results( f"{SNIPS}", results )


#get all folders under ATIS
# folders = os.listdir( SNIPS )
# for folder in folders:
#     print(f"Processing {folder}")
#     results = get_results( f"{SNIPS}/{folder}" )
#     write_results( f"{SNIPS}/{folder}", results )

Label: eight_shot_true_labels.txt
Pred: eight_shot_predicted_labels.txt
Label: four_shot_true_labels.txt
Pred: four_shot_predicted_labels.txt
Label: full_resource_true_labels.txt
Pred: full_resource_predicted_labels.txt


AssertionError: 

In [15]:
import kaleido
import os
from sklearn.metrics import confusion_matrix
import pandas as pd
import numpy as np

import plotly.figure_factory as ff

def load_intents_list(intents_file):
    with open(intents_file, 'r') as intent_preds:
        return [intent.strip() for intent in intent_preds.readlines()]

def get_cleaned_intents(gold: list, pred: list) -> tuple:
    intent_set = set(gold)
    pred_intents = pred.copy()
    for idx, pred in enumerate(pred):
        if pred not in intent_set:
            pred_intents[idx] = "ε"
    return pred_intents, intent_set

def create_confusion_matrix(pred: list, gold: list, intents: list) -> pd.DataFrame:
    data = confusion_matrix(gold, pred, labels=intents)
    df_cm = pd.DataFrame(
        data,
        columns=intents,
        index=intents
    )
    df_cm.index.name = "True Intent"
    df_cm.columns.name = "Predicted Intent"
    return df_cm

gold_path = "../../analysis/ATIS/model_1/one_shot_true_labels.txt"
pred_path = "../../analysis/ATIS/model_1/one_shot_predicted_labels.txt"

gold_intents = load_intents_list(gold_path)
pred_intents = load_intents_list(pred_path)

pred_intents, intent_set = get_cleaned_intents(gold_intents, pred_intents)

confmat_name = "ATIS"
out_path = "confmat_figures"
if not os.path.exists(out_path):
    os.makedirs(out_path)

confusion_matrix = create_confusion_matrix(pred_intents, gold_intents, list(intent_set))

fig = ff.create_annotated_heatmap(
    confusion_matrix.to_numpy(),
    colorscale='Blues',
    x=list(confusion_matrix.columns),
    y=list(confusion_matrix.index)
)

# Move graph down so that it doesn't overlap with title
fig.update_layout(
    height=600,
    width=800,
    title_text="<b>Confusion Matrix</b>",
    margin=dict(t=500),
)
fig.write_image(os.path.join(out_path, f"conf_mat_{confmat_name}_1shot.png"))

ValueError: Shape of passed values is (17, 17), indices imply (16, 16)

In [None]:
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
from collections import defaultdict
import pandas as pd 
import plotly.figure_factory as ff
import numpy as np

def load_intents_list(intents_file):
    intents = []
    with open(intents_file, 'r') as intent_preds:
        intents.append(intent.strip() for intent in intent_preds)
    return intents

def create_confusion_matrix( path: str, pred: str, gold: str ) -> pd.DataFrame:
    pred_intents = load_intents_list( f"{path}/{pred}" )
    gold_intents = load_intents_list( f"{path}/{gold}" )

    pred_intents = get_cleaned_intents( gold_intents, pred_intents )
    data = confusion_matrix(gold_intents, pred_intents)
    df_cm = pd.DataFrame(data, columns=np.unique(gold_intents), index = np.unique(gold_intents))
    df_cm.index.name = "True Intent"
    df_cm.columns.name = "Predicted Intent"
    return df_cm


def get_cleaned_intents( gold: list, pred: list ) -> list:
    # replace all bad generated intents with BAD GENERATION
    intent_set = set( gold )
    pred_intents = pred.copy()
    for idx, pred in enumerate( pred ):
        if pred not in intent_set:
            pred_intents[idx] = "ε"
    return pred_intents


# non-normalized figure
fig = ff.create_annotated_heatmap(df_cm.to_numpy(), colorscale='Blues', x=list(df_cm.columns), y=list(df_cm.index))
fig.update_layout(height=600, width=800, title_text="<b>Confusion Matrix</b>")
fig.show()

# normalized figure
df_cmn = df_cm.astype('float') / np.array(df_cm.sum(axis=1))[:, np.newaxis]

df_cmn.index.name = "True Intent"
df_cmn.columns.name = "Predicted Intent"
fig = ff.create_annotated_heatmap(df_cmn.to_numpy(), colorscale='Blues', x=list(df_cmn.columns), y=list(df_cmn.index))
fig.update_layout(height=600, width=800, title_text="<b>Normalized Confusion Matrix</b>")
fig.show()

{'Rate Book.', 'Search Creative Work.', 'Search Screening Event.', 'Get Weather.', 'Play Music.', 'Add To Playlist.', 'Book Restaurant.'} {'Rate Book.', 'Search Creative Work.', 'Search Screening Event.', 'Get Weather.', 'ε', 'Play Music.', 'Add To Playlist.', 'Book Restaurant.'}
Precisions:  [1.         1.         0.984375   0.66666667 1.         0.37219731
 0.72881356 0.00625   ]
Recalls:  [0.89 0.62 0.63 0.12 0.26 0.83 0.43 1.  ]
F1 scores:  [0.94179894 0.7654321  0.76829268 0.20338983 0.41269841 0.51393189
 0.5408805  0.01242236]
<function confusion_matrix at 0x000002497EF22320>
