In [None]:
# ===============================================================================================================#
# Copyright 2024 Infosys Ltd.                                                                          #
# Use of this source code is governed by Apache License Version 2.0 that can be found in the LICENSE file or at  #
# http://www.apache.org/licenses/                                                                                #
# ===============================================================================================================#

## Tool 05 - Embedding Clusters
Use this notebook to experiment with text embeddings and to visualize how they cluster in a 2-D t-SNE projection.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

In [None]:
class VectorVisualizer():
    """Project document (and optional query) vectors to 2-D with t-SNE and plot them.

    Document points are drawn in black and query points in red; every point
    is annotated with a text label.
    """

    def __init__(self, doc_vector_list, doc_label_list=None):
        """Store the document vectors and their optional labels.

        Args:
            doc_vector_list: 2-D array-like of document vectors, one row per document.
            doc_label_list: Optional labels, one per document vector. When
                omitted, labels "D1", "D2", ... are generated at plot time.
        """
        self.__doc_vector_list = doc_vector_list
        self.__doc_label_list = doc_label_list

    @staticmethod
    def _resolve_labels(labels, count, prefix, max_label_len):
        """Return exactly `count` display labels for one group of points.

        Generates "<prefix>1", "<prefix>2", ... when no labels are supplied or
        when the supplied labels do not match `count`; otherwise shortens the
        supplied labels to `max_label_len` characters (a negative value
        disables shortening).
        """
        # Local import keeps this cell runnable without touching the import cell.
        import warnings

        default_labels = [f"{prefix}{idx + 1}" for idx in range(count)]
        if labels is None or len(labels) == 0:
            return default_labels
        if len(labels) != count:
            # Previously a mismatch silently produced an empty figure.
            warnings.warn(
                f"Expected {count} '{prefix}' labels but got {len(labels)}; "
                "using generated labels instead.",
                stacklevel=3,
            )
            return default_labels
        if max_label_len > -1:
            # Only add the ellipsis when something was actually cut off.
            return [
                x if len(x) <= max_label_len else x[0:max_label_len] + "..."
                for x in labels
            ]
        return list(labels)

    def plot(self, perplexity=2, query_vector_list=None, query_label_list=None, max_label_len=5):
        """Draw a labelled t-SNE scatter plot of the stored vectors.

        Args:
            perplexity: t-SNE perplexity. Recent scikit-learn versions reject a
                value that is not smaller than the total number of vectors.
            query_vector_list: Optional query vectors (NumPy array or nested
                list, one row per query; a single 1-D vector is also accepted).
                They are embedded together with the documents and drawn in red.
            query_label_list: Optional labels, one per query vector. When
                omitted, labels "Q1", "Q2", ... are generated.
            max_label_len: Maximum number of label characters to show before
                "..." is appended; a negative value disables shortening.
        """
        doc_vector_list = self.__doc_vector_list
        doc_vector_list_count = len(doc_vector_list)
        doc_labels = self._resolve_labels(
            self.__doc_label_list, doc_vector_list_count, "D", max_label_len)

        vector_list = doc_vector_list
        # Start from an empty list so that plotting documents alone works
        # (concatenating the document labels with None used to raise TypeError).
        query_labels = []
        if query_vector_list is not None:
            query_vectors = np.asarray(query_vector_list)
            if query_vectors.size > 0:
                # Promote a single 1-D vector to one row.
                query_vectors = np.atleast_2d(query_vectors)
                vector_list = np.concatenate((vector_list, query_vectors), axis=0)
                query_labels = self._resolve_labels(
                    query_label_list, len(query_vectors), "Q", max_label_len)

        labels = doc_labels + query_labels
        # Fixed random_state keeps the layout reproducible between runs.
        tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
        embedded_vectors = tsne.fit_transform(vector_list)

        X_OFFSET = 5  # Adjust as needed for your plot scale
        Y_OFFSET = 0.5  # Adjust as needed for your plot scale

        plt.figure(figsize=(12, 6))

        for i, label in enumerate(labels):
            # Rows past the document count belong to the queries.
            color = 'red' if i >= doc_vector_list_count else 'black'
            plt.scatter(embedded_vectors[i, 0], embedded_vectors[i, 1], color=color)
            plt.text(embedded_vectors[i, 0] + X_OFFSET, embedded_vectors[i, 1] + Y_OFFSET, label, color=color)

        plt.title(f"t-SNE Plot of {len(vector_list)} vectors | perplexity = {perplexity}")
        plt.xlabel("Dimension 1")
        plt.ylabel("Dimension 2")
        plt.show()

In [None]:
import os
import abc
from sentence_transformers import SentenceTransformer
from openai import AzureOpenAI

class IEmbeddingProvider(metaclass=abc.ABCMeta):
    """Abstract base class that every embedding provider implements."""

    @abc.abstractmethod
    def generate_embedding(self, texts: list):
        """Return embeddings for the given texts; must be overridden by subclasses."""
        raise NotImplementedError

    def convert_to_numpy_array(self, text_embeddings):
        """Return the embeddings as a numpy array.

        A Python list is converted to a float32 array reshaped to a single
        row of shape (1, n); any other input is returned unchanged.
        """
        if not isinstance(text_embeddings, list):
            return text_embeddings
        as_array = np.array(text_embeddings, dtype=np.float32)
        return as_array.reshape(1, -1)

class StEmbeddingProvider(IEmbeddingProvider):
    """Embedding provider backed by a sentence-transformers model."""

    # Original hard-coded model location, kept as the default for backward compatibility.
    DEFAULT_MODELS_DIR = r"C:\MyProgramFiles\AI\models"

    def __init__(self, model_name:str, models_dir:str=None):
        """Load the sentence-transformers model.

        Args:
            model_name: Name of the model, e.g. "all-MiniLM-L6-v2".
            models_dir: Directory that contains a sub-folder named after the
                model. Defaults to DEFAULT_MODELS_DIR.

        If "<models_dir>/<model_name>" is not an existing directory, the bare
        model name is handed to SentenceTransformer instead, which then
        resolves it itself (sentence-transformers documents that a
        non-path name is looked up as a pre-trained model).
        """
        if models_dir is None:
            models_dir = self.DEFAULT_MODELS_DIR
        model_path = os.path.join(models_dir, model_name)
        if not os.path.isdir(model_path):
            # The local copy is missing (e.g. on another machine): let the library resolve the name.
            model_path = model_name
        self.st = SentenceTransformer(model_path)

    def generate_embedding(self, texts:list):
        """Encode the texts with the loaded model, print the result's shape and return it."""
        embeddings = self.st.encode(texts)
        print(embeddings.shape)
        return embeddings

class OpenAIEmbeddingProvider(IEmbeddingProvider):
    """Embedding provider that calls an Azure OpenAI embeddings deployment."""

    def __init__(self, model_name:str, api_version:str="2022-12-01"):
        """Create the Azure OpenAI client.

        The API key and endpoint are read from the INFY_OPENAI_SECRET_KEY and
        INFY_OPENAI_SERVER_URL environment variables; a missing variable
        raises KeyError.

        Args:
            model_name: Model name passed to every embeddings request.
            api_version: Azure OpenAI API version used by the client.
        """
        api_key = os.environ["INFY_OPENAI_SECRET_KEY"]
        azure_endpoint = os.environ["INFY_OPENAI_SERVER_URL"]
        self.__model_name = model_name
        self.__client = AzureOpenAI(
                api_key=api_key,
                api_version=api_version,
                azure_endpoint=azure_endpoint
            )

    def generate_embedding(self, texts:list):
        """Embed each text with one request per text and stack the results.

        Args:
            texts: List of strings. A single string is treated as a
                one-element list rather than being iterated per character.

        Returns:
            A float32 numpy array with one row per text. For empty input an
            array of shape (0, 0) is returned. The shape is printed.
        """
        if isinstance(texts, str):
            # Iterating a bare string would send one request per character.
            texts = [texts]

        embeddings_list = []
        for text in texts:
            response = self.__client.embeddings.create(
                input=text, model=self.__model_name)
            # Each embedding arrives as a list and becomes one (1, n) row.
            row = self.convert_to_numpy_array(response.data[0].embedding)
            embeddings_list.append(row)

        if embeddings_list:
            embeddings = np.concatenate(embeddings_list, axis=0)
        else:
            # np.concatenate rejects an empty sequence, so return an empty array instead.
            embeddings = np.empty((0, 0), dtype=np.float32)
        print(embeddings.shape)
        return embeddings

In [None]:
# Select the embedding backend used by the cells below.
# To use Azure OpenAI instead, comment out the first line and uncomment the second.
embedding_provider:IEmbeddingProvider = StEmbeddingProvider("all-MiniLM-L6-v2")
# embedding_provider:IEmbeddingProvider = OpenAIEmbeddingProvider("text-embedding-ada-002")

In [None]:
# Three themed groups of five sample sentences each.
vintage_cars_sentences = [
    "Vintage cars symbolize timeless elegance and craftsmanship.",
    "Each classic car tells a unique historical tale.",
    "Restoring vintage cars is a labor of love.",
    "Vintage car rallies showcase automotive history.",
    "Collecting vintage cars is preserving history.",
]
postal_stamps_sentences = [
    "Stamps are gateways to the world's stories.",
    "Collecting stamps: a journey through history.",
    "Each stamp reflects its era's art and culture.",
    "Special edition stamps celebrate global events.",
    "Philately connects generations of history enthusiasts.",
]
fishing_sentences = [
    "Fishing offers serene escapes into nature's tranquility.",
    "The thrill of the catch rewards patience.",
    "Fishing adventures lead to remote, beautiful corners.",
    "Techniques vary from fly fishing to deep-sea.",
    "Sustainable fishing practices preserve future abundance.",
]

# All documents in group order: cars, stamps, fishing.
docs = [
    *vintage_cars_sentences,
    *postal_stamps_sentences,
    *fishing_sentences,
]
# Label format: "<1-based position> (<sentence>)".
docs_labels = [f"{position} ({sentence})" for position, sentence in enumerate(docs, start=1)]

In [None]:
# Embed every document sentence with the selected provider.
docs_vectors = embedding_provider.generate_embedding(docs)

In [None]:
# Query sentences to embed with the same provider as the documents.
queries = [
    "Fishing adventures lead to remote, beautiful corners.",
    "My car is not working.",
    "Tools help in reducing human effort.",
]
queries_vector = embedding_provider.generate_embedding(queries)
# Label format: "<1-based position> (<query text>)".
queries_label = [f"{position} ({text})" for position, text in enumerate(queries, start=1)]

In [None]:
# Plot the document and query vectors once per perplexity value (5 through 9).
# Each plot() call draws and shows its own figure.
vector_viz = VectorVisualizer(docs_vectors, docs_labels)
for p in range(5,10): # Change as per requirement
    vector_viz.plot(perplexity=p, query_vector_list = queries_vector, 
                    query_label_list = queries_label, max_label_len = 10)