In [None]:
from pipeline.DBHandler import DBHandler
import cohere
import google.generativeai as genai
from sentence_transformers import SentenceTransformer

# semantic chunking imports
from semantic_router.splitters import RollingWindowSplitter
from semantic_router.encoders import CohereEncoder, OpenAIEncoder
from semantic_router.utils.logger import logger

logger.setLevel("WARNING")  # reduce logs from splitter
import warnings
# NOTE: "ignore" with no category/module filter silences every warning raised in this kernel
warnings.filterwarnings("ignore")

from typing import Union

import os
from dotenv import load_dotenv
# Load variables from a .env file (if one is found) into the process environment
load_dotenv()
# Cohere client built from COHERE_API_KEY; os.getenv yields None when the variable is unset.
# NOTE(review): `co` is not referenced by the cells visible here - confirm it is still needed
co = cohere.Client(api_key=os.getenv('COHERE_API_KEY'))


In [None]:
def _add_neighbor_context(split_texts: list, context_chars: int) -> list:
	"""
	Surround every split text with a little text borrowed from its direct neighbours
	Args:
		split_texts (list): the text of each split, in document order
		context_chars (int): maximum number of characters borrowed from each neighbour
	Returns:
		enriched (list): one string per input text, in the same order
	"""
	enriched = []
	last_index = len(split_texts) - 1
	for index, text in enumerate(split_texts):
		parts = [text]
		# guard on context_chars: text[-0:] would return the whole neighbour instead of nothing
		if context_chars > 0:
			if index > 0:
				parts.insert(0, split_texts[index - 1][-context_chars:])
			if index < last_index:
				parts.append(split_texts[index + 1][:context_chars])
		enriched.append(' '.join(parts))
	return enriched


# The annotation is a string so that nothing is evaluated (and no encoder is instantiated)
# when the function is defined.
def semantic_chunking(encoder: "Union[CohereEncoder, OpenAIEncoder]", directory_path: str, score_threshold: float = 0.4, context_chars: int = 200) -> list:
	"""
    Use the semantic chunking to split the documents into semantic chunks
    Every regular file in the directory is read, split with a RollingWindowSplitter,
    and each split (including the last one, and the only one of a single-split file)
    is returned together with some context from the previous and next splits.
    Args:
        encoder: an embedding model to use for the semantic chunking; its `score_threshold` attribute is overwritten
        directory_path (str): path to the directory containing the documents
        score_threshold (float): the score threshold for the encoder below which the split is made, between 0 and 1
        context_chars (int): number of characters of the previous/next split added around each split
    Returns:
        splits (list): list of dicts with the keys 'text' and 'origin_file'
    """
	encoder.score_threshold = score_threshold
	splitter = RollingWindowSplitter(
		# Todo: adjust the parameters according to the dataset
		encoder=encoder,
		dynamic_threshold=False,
		min_split_tokens=150,
		max_split_tokens=400,
		window_size=5,
		plot_splits=True,
		enable_statistics=True
	)

	splits = []
	# sorted() makes the output order independent of the file-system listing order
	for file_name in sorted(os.listdir(directory_path)):
		file_path = os.path.join(directory_path, file_name)
		if not os.path.isfile(file_path):
			# sub-directories cannot be read as documents
			continue
		print(file_name)
		with open(file_path, "r") as file:
			document_text = file.read()

		# `docs` is a list of text pieces; join first so the context is cut in characters, not in list items
		split_texts = [' '.join(split.docs) for split in splitter([document_text])]
		splits.extend(
			{'text': text, 'origin_file': file_name}
			for text in _add_neighbor_context(split_texts, context_chars)
		)
	return splits

In [None]:
# Directory holding the source documents and the minimum count required before chunking
# (why the minimum is 50 is not stated in this notebook - TODO confirm)
DOCS_DIR = 'data/docs'
MIN_DOC_COUNT = 50

doc_number = len(os.listdir(DOCS_DIR))
if doc_number < MIN_DOC_COUNT:
	# Fail here with a clear message: `splits` would otherwise stay undefined and the
	# embedding cell further down would die with a NameError after opening a DB handler
	raise RuntimeError(f'The number of documents is {doc_number}, expected at least {MIN_DOC_COUNT}')
splits = semantic_chunking(CohereEncoder(), DOCS_DIR, 0.4)

In [8]:
def cohere_embedding(text: str, model) -> list:
	"""
	Embed the text with a model object that exposes an `encode` method
	NOTE: despite the name, no Cohere (or Google) API is called here; create_chunks
	passes a SentenceTransformer instance as the model.
	Args:
		text (str): the text to embed
		model: an object whose `encode(text)` returns an iterable of numbers
	Returns:
		embedding (list): the embedding vector of the text, as a list of Python floats
	Raises:
		Exception: if there is an error in embedding the text
	"""
	try:
		# plain Python floats rather than whatever numeric type `encode` yields
		embedding = [float(num) for num in model.encode(text)]
	except Exception as e:
		# chain explicitly so the original traceback stays attached as __cause__
		raise Exception(f'Error in embedding the text: {e}') from e

	return embedding

def google_embedding(text: str, model: str = 'models/text-embedding-004') -> list:
	"""
	Use the Google Embedding API to embed the text
	The request is sent with task_type='retrieval_document'.
	Args:
		text (str): the text to embed
		model (str): the name of the model to use for the embedding, either 'models/text-embedding-004' or 'models/embedding-001'
	Returns:
		embedding (list): the value stored under the 'embedding' key of the API response
	Raises:
		Exception: if there is an error in embedding the text
	"""
	try:
		response = genai.embed_content(model=model, content=text, task_type='retrieval_document')
	except Exception as e:
		# chain explicitly so the original API error stays attached as __cause__
		raise Exception(f'Error in embedding the text: {e}') from e

	return response['embedding']

def create_chunks(splits, emb_type: str) -> list:
	"""
	Create the chunks of the documents and embed them
	Args:
		splits (list): list of the splits of the documents, each a dict with the keys 'text' and 'origin_file'
		emb_type (str): the type of the embedding to use, either 'emb1', 'emb2', or 'emb3'
			'emb1' -> google_embedding with 'models/text-embedding-004'
			'emb2' -> google_embedding with 'models/embedding-001'
			'emb3' -> cohere_embedding with SentenceTransformer('all-MiniLM-L6-v2')
	Returns:
		chunks (list): list of the chunks (dicts) with their embeddings
	Raises:
		ValueError: if emb_type is not one of the supported values
	"""
	# Pick the embedding function and its model once; the per-split work is identical for all types
	if emb_type == 'emb1':
		embed = google_embedding
		model = 'models/text-embedding-004'
	elif emb_type == 'emb2':
		embed = google_embedding
		model = 'models/embedding-001'
	elif emb_type == 'emb3':
		embed = cohere_embedding
		# the model is loaded only after the type has been validated
		model = SentenceTransformer('all-MiniLM-L6-v2')
	else:
		# previously an unknown type fell through and silently returned None
		raise ValueError(f"Unknown emb_type {emb_type!r}; expected 'emb1', 'emb2' or 'emb3'")

	return [
		{
			'text': split['text'],
			'embedding': embed(split['text'], model),
			'origin_file': split['origin_file']
		}
		for split in splits
	]


In [11]:
# Embed the splits once per embedding type and store each result under its own organisation id
for emb in ('emb1', 'emb2', 'emb3'):
	print(f'Creating embeddings for {emb}')
	org_id = f'maccabi_{emb}'
	handler = DBHandler(org_id=org_id, user_id='evaluator')
	chunks = create_chunks(splits, emb)

	print(f'Updating the database with the embeddings for {emb}')
	handler.update('embeddings', chunks)

Creating embeddings for emb1
Updating the database with the embeddings for emb1
Creating embeddings for emb2
Updating the database with the embeddings for emb2
Creating embeddings for emb3
Updating the database with the embeddings for emb3
