In [None]:
!pip install gradio
!pip install chromadb==0.5.11
!pip install sentence-transformers==3.1.1
!pip install evaluate bert_score

In [None]:
from chromadb.config import Settings
from evaluate import load
from typing import Any
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split

In [None]:
import pandas as pd
import numpy as np
import statistics
import pinecone
import glob
import os

In [None]:
from sentence_transformers import SentenceTransformer

# Лабораторная работа №6

## Declaring constants

In [None]:
# Configuration for vector search, text splitting and question answering.

# --- Models ---
# Name of the embedding model used to vectorize texts.
EMBEDDINGS = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
# Checkpoint name handed to the question-answering pipeline (model and tokenizer).
QA_MODEL_NAME = "deepset/roberta-base-squad2"

# --- Vector index ---
# Name of the index that stores the vector representations.
INDEX_NAME = "VDB"

# --- Chunking ---
# Size of a text fragment when splitting documents.
SIZE = 250
# Overlap between neighbouring fragments, kept to preserve context.
OVERLAP = 50

In [None]:
import torch
from transformers import pipeline

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU so the
# notebook does not crash on machines without CUDA.
QA_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Question-answering pipeline; the model and the tokenizer come from the same checkpoint.
qa_model = pipeline('question-answering', model=QA_MODEL_NAME, tokenizer=QA_MODEL_NAME, device=QA_DEVICE)

## Loader

In [None]:
import itertools
import re

# Token-level pattern template. Its {placeholders} are not filled in anywhere in this cell.
pattern = r"{price_pattern}|{abbr_patterns}|({phone_pattern})|({email_pattern})|(\'?[\w\-]+)|([^A-Za-z0-9 \n])"
# Sentence-boundary template: one whitespace character that follows '.', '?', '!' or ';'.
# {abbr_patterns} is replaced below by negative look-behinds that veto known abbreviations.
sent_pattern = r"((?<=\.|\?|!|\;))({abbr_patterns})\s"

phone_pattern = r"\+?[0-9] ?\(?[0-9]+\)?[0-9 -]+"
# Alternative phone pattern kept for reference:
# [\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}
email_pattern = r"[^@ \t\r\n]+@[^@ \t\r\n]+\.[^@ \t\r\n]+"
price_pattern = r"(\$ ?\d*\.?\d+)|(\d*\.?\d+ ?\$)"

# English titles whose trailing dot must not be treated as the end of a sentence.
english_abbr = ["Mr.", "Mrs.", "Mss.", "Ms.", "Dr."]
# Escape the dot for use inside a regex. A raw string is required here: a plain "\."
# is an invalid escape sequence and triggers a warning on recent Python versions.
english_abbr = [x.replace(".", r"\.") for x in english_abbr]
# Also accept the lower-cased spelling of every abbreviation.
english_abbr.extend([x.lower() for x in english_abbr])

# One negative look-behind per abbreviation, concatenated into the template.
sent_pattern = sent_pattern.format(abbr_patterns="".join(fr"(?<!{x})" for x in english_abbr))
sent_pattern = re.compile(sent_pattern)

def split_to_sentence(text: str) -> list[str]:
    """Split `text` into sentences.

    The text is cut at a whitespace character that follows '.', '?', '!' or ';',
    unless the preceding characters form one of the abbreviations in `english_abbr`.
    The separating whitespace is dropped.

    Args:
        text: The text to split.

    Returns:
        The non-empty pieces, in their original order.
    """
    # The pattern contains capturing groups, so re.split also returns their (empty)
    # matches; keep only the non-empty pieces.
    return [part for part in sent_pattern.split(text) if part]

In [None]:
def load_dataset(split_type="train", n: int | None = None, dataset_path = "../../assets/{split_type}.csv", random_state=42) -> pd.DataFrame:
    assert split_type == "train" or split_type == "test"
    dataset_path = dataset_path.format(split_type=split_type)
    if not os.path.exists(dataset_path):
        splits = {'train': 'yelp_review_full/train-00000-of-00001.parquet',
                  'test': 'yelp_review_full/test-00000-of-00001.parquet'}
        df = pd.read_parquet("hf://datasets/Yelp/yelp_review_full/" + splits[split_type])
        df.to_csv(dataset_path, index=False)
    else:
        df = pd.read_csv(dataset_path)
    if n is None:
        return df
    else:
        return train_test_split(df, train_size=n, stratify=df["label"], random_state=random_state)[0]

In [None]:
def process_df(df: pd.DataFrame) -> tuple[list[str], list[dict], list[str]]:
    """Split every row's text into chunks and build three parallel lists.

    Relies on the module-level `splitter` object (its `split_document` method).

    Args:
        df: Frame with a "label" and a "text" column.

    Returns:
        A `(data, meta, ids)` tuple of equal-length lists:
        `data` holds the text chunks, `meta` holds one `{"label": ...}` dict per chunk,
        and `ids` holds `"<row index>_<chunk number>"` strings.
    """
    data, meta, ids = [], [], []
    for idx, row in df.iterrows():
        label, text = row["label"], row["text"]
        chunks = splitter.split_document(text)
        data.extend(chunks)
        # Every chunk inherits the label of the row it was cut from.
        meta.extend({"label": label} for _ in chunks)
        # The row index plus the chunk position identifies a chunk within `df`.
        ids.extend(f"{idx}_{i}" for i in range(len(chunks)))
    return data, meta, ids

def dataset_batch_iter(df, batch_size):
    """Yield `process_df` results for consecutive slices of at most `batch_size` rows.

    The previous implementation passed `batch_size` to `np.array_split`, which
    interprets it as the number of sections rather than the number of rows per batch.

    Args:
        df: Frame to iterate over, in row order.
        batch_size: Maximum number of rows per batch; must be positive.

    Yields:
        The value returned by `process_df` for each slice.

    Raises:
        ValueError: If `batch_size` is smaller than 1 (raised on first iteration).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    for start in range(0, len(df), batch_size):
        yield process_df(df.iloc[start:start + batch_size])

In [None]:
# Class meant for loading documents from various sources and file formats.
class Loader:
    """Document-loading interface; both methods are placeholders that return None."""

    def load_single_document(self, file_path: str):
        """Placeholder for loading the document at `file_path`."""
        return None

    def load_documents(self, source_dir: str):
        """Placeholder for loading all documents from the directory `source_dir`."""
        return None

## Splitter

In [None]:
# Class that splits documents into fragments of a given size with a given overlap.
class Splitter:
    """Sentence-aware splitter that cuts each sentence into overlapping slices.

    A document is first split into sentences with `split_to_sentence`; every sentence
    is then sliced into pieces of at most `chunk_size` characters, where consecutive
    pieces start `chunk_size - chunk_overlap` characters apart.
    """

    def __init__(self, chunk_size, chunk_overlap):
        """Store the chunking parameters.

        Args:
            chunk_size: Maximum length of one chunk.
            chunk_overlap: Number of characters shared by neighbouring chunks.

        Raises:
            AssertionError: If `chunk_size` is not greater than `chunk_overlap`.
        """
        assert chunk_size > chunk_overlap
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_document(self, document: str):
        """Split `document` into chunks.

        Args:
            document: The text to split.

        Returns:
            The list of chunks, sentence by sentence, in document order.
        """
        step = self.chunk_size - self.chunk_overlap
        doc_sents = []
        for sent in split_to_sentence(document):
            for start in range(0, len(sent), step):
                end = start + self.chunk_size
                doc_sents.append(sent[start:end])
                # Once a chunk reaches the end of the sentence, any further chunk would
                # lie entirely inside this one, so stop instead of emitting a duplicate tail.
                if end >= len(sent):
                    break
        return doc_sents

In [None]:
# Shared splitter instance configured from the notebook constants.
splitter = Splitter(chunk_size=SIZE, chunk_overlap=OVERLAP)

In [None]:
# splitter.split_document("I love driving and I dont loke kaksd asdkf")

In [None]:
# Create the local cache directory; exist_ok avoids the separate existence check
# and makes the cell safe to re-run.
os.makedirs("./assets", exist_ok=True)

In [None]:
# Smoke test: load a 10-row sample and print only the first processed batch.
df = load_dataset("train", 10, './assets/{split_type}.csv')
for first_batch in dataset_batch_iter(df, 2):
  print(first_batch)
  break

## Vector database

In [None]:
# Base class for working with a document collection: adding, searching and clearing.
class Collector:
    """Interface for a document collection; every method is a placeholder returning None.

    The `Document` return annotations are written as strings because no `Document`
    name is defined or imported in this notebook; evaluating them eagerly would raise
    a NameError when the class is created.
    """

    def add(self, texts: list[str], metadatas: list[dict]):
        """Add texts and their associated metadata to the collection."""
        pass

    def add_from_directory(self, dir_path: str):
        """Add the documents found in the directory `dir_path` to the collection."""
        pass

    def get(self, search_strings: list[str], n_results: int) -> "list[Document]":
        """Search documents by query strings, limited to `n_results` results."""
        pass

    def get_documents(self, search_string: str, n_results: int, score_threshold: float) -> "list[Document]":
        """Search documents using a relevance threshold and a result limit."""
        pass

    def clear(self):
        """Remove the documents from the collection."""
        pass

In [None]:
# Base class for embedders; it only fixes the interface that subclasses implement.
class Embedder:
    """Embedding interface whose methods are placeholders in this base class."""

    def __init__(self, model_name):
        """Initialise the embedder; the base class ignores `model_name`."""

    def get_embedding(self, sent):
        """Placeholder for producing the embedding of `sent`; returns None here."""
        return None

In [None]:
class SentenceEmbedder(Embedder):
    """Embedder backed by a `SentenceTransformer` model.

    Instances are callable, so an instance can be used wherever a function taking
    `input` and returning embeddings is expected.
    """

    def __init__(self, model_name: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"):
        """Load the sentence-transformers model called `model_name`."""
        self.model = SentenceTransformer(model_name)

    def __call__(self, input):
        """Delegate to `get_embedding`."""
        return self.get_embedding(input)

    def get_embedding(self, sent):
        """Encode `sent` with the model and return the result converted via `.tolist()`."""
        encoded = self.model.encode(sent)
        return encoded.tolist()

In [None]:
class ChromaCollector(Collector):
    """Collector backed by a persistent Chroma collection.

    The collection name is `name_prefix + distance_fn`, and `distance_fn` is also
    stored in the collection metadata under the "hnsw:space" key.
    """

    def __init__(self, name_prefix, root_path, embeddnig_fn, distance_fn):
        """Open the persistent client at `root_path` and get or create the collection.

        Args:
            name_prefix: Prefix of the collection name.
            root_path: Directory passed to `chromadb.PersistentClient`.
            embeddnig_fn: Callable used as the collection's embedding function and
                to embed queries.
            distance_fn: Value for the "hnsw:space" metadata key, e.g. "cosine".
        """
        self.client = chromadb.PersistentClient(path=root_path)
        self.distance_fn = distance_fn
        self.embedding_fn = embeddnig_fn
        self._collection_name = name_prefix + self.distance_fn
        self.database = self.get_database()

    def get_database(self):
        """Return the collection for this collector, creating it if it does not exist."""
        return self.client.get_or_create_collection(
            self._collection_name,
            metadata={"hnsw:space": self.distance_fn},
            embedding_function=self.embedding_fn
        )

    def load_dataset(self, df: pd.DataFrame, batch_size=128) -> None:
        """Chunk `df` with `dataset_batch_iter` and add every batch to the collection.

        Args:
            df: Frame forwarded to `dataset_batch_iter`.
            batch_size: Batch size forwarded to `dataset_batch_iter`; also used to
                compute the progress-bar total.
        """
        batches = dataset_batch_iter(df, batch_size=batch_size)
        n_batches = math.ceil(df.shape[0] / batch_size)
        for chunks, metas, ids in tqdm(batches, total=n_batches, desc="loading to the DB"):
            # A batch can produce no chunks at all (empty slice or empty texts);
            # skip it rather than sending empty lists to the collection.
            if not ids:
                continue
            self.database.add(
                documents=chunks,
                metadatas=metas,
                ids=ids
            )

    def query(self, query, n_results: int, query_texts=None, where=None, where_document=None):
        """Query the collection.

        Args:
            query: Text embedded with `self.embedding_fn` and used as the query
                embedding. Ignored when `query_texts` is given.
            n_results: Number of results to request.
            query_texts: Optional raw query texts forwarded to the collection instead
                of the embedding of `query`.
            where: Optional metadata filter forwarded to the collection.
            where_document: Optional document filter forwarded to the collection.

        Returns:
            Whatever the collection's `query` method returns.
        """
        # Forward a single query source: Chroma is expected to reject a call that
        # carries both query_texts and query_embeddings.
        query_embeddings = None if query_texts is not None else self.embedding_fn(query)
        return self.database.query(
            n_results=n_results,
            query_texts=query_texts,
            query_embeddings=query_embeddings,
            where=where,
            where_document=where_document
        )

    def clear(self):
        """Delete the collection from the client.

        `self.database` is not refreshed here; call `get_database()` to obtain a
        new collection afterwards.
        """
        self.client.delete_collection(self._collection_name)

### Vector database implementation

In [None]:
# Colab form fields (see the trailing `#@param` markers).
# Storage directory later passed to ChromaCollector as `root_path`; note that it is an absolute path.
path_to_index = '/VDB' #@param {type:"string"}
# CSV path template later passed to load_dataset, which fills in {split_type}.
path_to_df = './assets/{split_type}.csv' #@param {type:"string"}

In [None]:
import chromadb
import math

In [None]:
# Sentence embedder with its default model, plus a 10,000-row sample of the train split.
emedder = SentenceEmbedder()
df = load_dataset("train", 10_000, path_to_df)

In [None]:
# Persistent collection that uses cosine distance.
database_cos = ChromaCollector(
    name_prefix="my_db",
    root_path=path_to_index,
    embeddnig_fn=emedder,
    distance_fn="cosine",
)

In [None]:
# Chunk the sampled reviews and add them to the collection (default batch size).
database_cos.load_dataset(df)

## Search

In [None]:
query = 'What is your favorite food?' #@param {type:"string"}
n_results = 5 #@param {type:"integer"}
# score_threshold = 0.5 # @param {type:"slider", min:0, max:1, step:0.1}

# Pass the form value instead of a hard-coded 5 so the field above actually controls the search.
result = database_cos.query(query, n_results=n_results)
# Print distance, label and text of every hit for the single query.
for dist, document, meta in zip(result["distances"][0], result["documents"][0], result["metadatas"][0]):
  print(f"{dist:0.2f}  {meta['label']}  {document}")

## Evaluation

In [None]:
# Number of retrieved chunks requested from get_context in the evaluation loop and in the chat handler.
n_top = 10 #@param {type:"integer"}

In [None]:
# Evaluation questions as (text_query, doc_id) pairs.
queries = [
    ("What color is a parking pass?", 121392),
    ("Does the discount compensate the workers incompetent?", 340905),
    ("How long I was a Centurylink customer?",561808),
    ("What was a spicy level for the Panang Curry", 153961),
    ("How many times does the servers came back?", 596493),
    ("Where is a good Ted Wien's store located?", 257236),
    ("Which DJ played on Paul Oakenfield night?", 213914),
    ("How many stars does the Carvel had last year?", 418978),
    ("What item did a man from Arizona buy two at a time?", 33736),
    # The backslash is escaped explicitly: "\d" is an invalid escape sequence in a plain
    # string literal. The resulting text is unchanged.
    ("What dish\\drink was cancelled because of its taste?", 335445)
]
# Reference answers, aligned by position with `queries`.
answers = [
    'white',
    "A discount doesn't compensate",
    '18 months',
    '8 spicy level',
    'twice',
    'Blue Diamond and Arville',
    'David Guetta',
    'a solid 4.',
    "shirts",
    "the latte"
    ]

In [None]:
# Display the rows of `df` whose index matches one of the doc ids listed in `queries`.
expected_doc_ids = [doc_id for _, doc_id in queries]
df[df.index.isin(expected_doc_ids)]

In [None]:
from evaluate import load
# Load the "bertscore" metric via `evaluate.load`; it scores the model answers in the evaluation loop.
bertscore = load("bertscore")

In [None]:
def get_context(query, n_results=10):
  """Query the cosine collection for `query` and return the raw query result.

  Args:
    query: Question text to search for.
    n_results: Number of results to request (default 10).

  Returns:
    Whatever `database_cos.query` returns.
  """
  retrieved = database_cos.query(query=query, n_results=n_results)
  return retrieved

In [None]:
# Answer every evaluation question from retrieved context and score the answer with BERTScore.
bert_scores = []
# Unpack the (question, doc_id) pair directly. The previous loop used an identifier with a
# Cyrillic look-alike letter and a range counter that was overwritten before being read.
for (question, _doc_id), answer in zip(queries, answers):
  context = get_context(question, n_top)
  # All retrieved chunks are joined into one context string for the QA model.
  qa_input = {'question': question,
             'context': ' '.join(context["documents"][0])}
  res = qa_model(qa_input)
  bs = bertscore.compute(predictions=[res['answer']], references=[answer], lang="en")
  bert_scores.append(bs)

  print(f'Question: {question}\nAnswer: {res["answer"]}\nUser answer: {answer}\nScore: {bs["f1"][0]}\n ')

In [None]:
# Manually assigned marks, one per question (presumably 1 = answer judged correct; confirm).
my_score = np.array([0, 1, 0, 0, 0, 1, 1, 1, 0, 0])
# F1 values collected from the BERTScore results of the evaluation loop.
bert_score = np.array([entry["f1"] for entry in bert_scores], dtype="float64")


print("Средняя оценка BERT F1:" ,bert_score.mean())
print("Средняя оценка MY:" ,my_score.mean())

In [None]:
import gradio as gr

def echo(message, history):
  """Chat handler: answer `message` from retrieved context.

  Args:
    message: The user's question.
    history: Conversation history supplied by the chat UI; not used.

  Returns:
    The 'answer' field of the QA model output.
  """
  retrieved = get_context(message, n_top)
  joined_context = ' '.join(retrieved["documents"][0])
  prediction = qa_model({'question': message, 'context': joined_context})
  return prediction['answer']

# Chat UI that routes every user message through `echo`.
demo = gr.ChatInterface(
    fn=echo,
    title="My shiny Bot",
    examples=["hello"],
)
demo.launch()