In [None]:
# Mount Google Drive so the archive stored under MyDrive is reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

# Unpack the 20news-bydate archive into /content/ and install the third-party
# packages imported by later cells (chromadb, evaluate, bert_score).
!tar -xvf  '/content/drive/MyDrive/text_labs/20news-bydate.tar.gz' -C '/content/'
!pip install chromadb
!pip install evaluate bert_score

In [None]:
from dataclasses import dataclass

@dataclass
class Document:
    """One loaded source file together with the token views derived from it.

    Instances are created by ``Loader.load_single_document``.
    """
    # File content exactly as it was read from disk.
    raw_text: str
    # Header fields extracted from the text plus "topic" and "doc_id" added by the loader.
    metadata: dict
    # Tokens in their original surface form.
    tokenized_text: list
    # Same token sequence with ordinary words lemmatised.
    lemmatized_tokenized_text: list
    # Overlapping windows cut from the lemmatised tokens (each window is a list of tokens).
    chunks: list

@dataclass
class DocumentChunk:
    """A single chunk of a document paired with the metadata needed to trace it back."""
    # Copy of the parent document's metadata extended with "chunk_id".
    metadata: dict
    # Tokens of this chunk; they are joined with spaces before being embedded.
    chunk: list

In [None]:
import re

import nltk
# Fetch the WordNet corpus used by the WordNetLemmatizer instantiated below.
nltk.download('wordnet')
from nltk.stem.snowball import SnowballStemmer
from nltk.stem import WordNetLemmatizer

# Shared NLP helpers. Only `lemmatizer` is referenced by the code visible in this
# notebook; `stemmer` is instantiated but not used in the cells shown here.
stemmer = SnowballStemmer(language='english')
lemmatizer = WordNetLemmatizer()

# Sentence boundary: a whitespace character that follows ".", "!" or "?", unless the
# period belongs to a single capital letter (e.g. the initial in "J. Smith").
# The whitespace is captured, so re.split() also returns the separators themselves.
sentence_pattern_split = r'(?<=\.|!|\?)(?<![A-Z]{1}\.)(\s)'

# Word-level tokens, tried in this order: the literal "#TOKEN#" placeholder, letters
# followed by an apostrophe/backtick and one lowercase letter (e.g. "don't"), a run of
# word characters, then punctuation ("..." not preceded by a capital, or a single
# ". , ! ? : ;").
word_pattern = r'#TOKEN#|[A-Za-z]+[\'\`][a-z]|\w+' + '|' + r'(?<![A-Z])\.\.\.|\.|,|!|\?|:|;'

# "Special" tokens that must survive tokenisation as a single unit.
token_specification = {
    # 1-2 digits, 2 digits, 2-4 digits separated by ".", "-" or "/"; or "NN Month NNNN".
    'DATE': (
        r'\b(\d{1,2})[.\-/](\d{2})[.\-/](\d{2,4})\b|'
        r'\b(\d{2})\s([A-Z][a-z]+)\s(\d{4})\b'
    ),
    # Phone numbers written with a "+7" or an "8" prefix.
    'PHONE': (
        r'(?:\+7[\s-]?\d{3}[\s-]?\d{3}[\s-]?\d{4})|'
        r'(?:8[\s-]?\(?\d{3}\)?[\s-]?\d{3}[\s-]?\d{4})'
    ),
    # Fixed: the group used to start with "(:?" (an optional literal colon) instead of
    # "(?:", so "From:bob@x.org" matched as ":bob@x.org". The domain class also skipped
    # several letters (q, Q, S-Y) and only one dot-separated label after the first was
    # consumed, truncating addresses such as "user@cs.mit.edu".
    'EMAIL': r'(?:[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[!\w\d+-]+)+)',
    # Fixed ordering: the AM/PM form is tried before the bare HH:MM form, otherwise
    # "10:30 PM" always matched as "10:30" and the suffix was split off as a word.
    'TIME': (
        r'[0-2][0-9]:[0-5][0-9]:[0-5][0-9]|'
        r'[0-2]?[0-9]:[0-5][0-9]\s([APap][Mm])|'
        r'[0-2][0-9]:[0-5][0-9]'
    ),
}
# Single alternation used to locate any special token; only whole matches (group 0)
# are meaningful because the sub-patterns contain their own capture groups.
common_pattern = '|'.join(token_specification.values())

def _pop_header(text, name, value_prefix=''):
    """Extract and remove the first ``<name>: value`` header line from *text*.

    Args:
        text: Message text that may contain header lines.
        name: Header name without the trailing colon (matched literally, at the
            start of a line).
        value_prefix: Optional regex fragment for a prefix that is dropped from
            the value (used for reply markers in subject lines).

    Returns:
        ``(value, remaining_text)``. *value* is ``None`` and *text* is returned
        unchanged when no such header line exists.
    """
    pattern = rf'^{re.escape(name)}:[ \t]*{value_prefix}(.*)$'
    match = re.search(pattern, text, re.MULTILINE)
    if match is None:
        return None, text
    start, end = match.span()
    # Remove the line terminator too, so the header does not leave an empty line behind.
    if text.startswith('\n', end):
        end += 1
    return match.group(1).strip(), text[:start] + text[end:]


def get_author(text):
    """Return ``(author, text_without_from_header)``; author is 'Unknown' if absent."""
    author, text = _pop_header(text, 'From')
    return ('Unknown' if author is None else author), text


def get_subject(text):
    """Return ``(subject, text_without_subject_header)``; subject is 'Unknown' if absent.

    Leading "Re:" markers are stripped from the value. Unlike the previous
    implementation, subjects of non-reply posts are recognised as well.
    """
    subject, text = _pop_header(text, 'Subject', r'(?:Re:[ \t]*)*')
    return ('Unknown' if subject is None else subject), text


def get_organization(text):
    """Return ``(organization, text_without_organization_header)``; 'Unknown' if absent."""
    org, text = _pop_header(text, 'Organization')
    return ('Unknown' if org is None else org), text


def delete_useless_meta(text):
    """Return *text* with the first ``Lines:`` header line removed (if any)."""
    _, text = _pop_header(text, 'Lines')
    return text


def get_metadata_from_text(text):
    """Strip the known header lines from *text* and collect their values.

    Returns:
        ``(metadata, body)`` where *metadata* has the keys "author", "subject"
        and "organization" and *body* is the text with the From/Subject/
        Organization/Lines header lines removed.
    """
    author, text = get_author(text)
    subject, text = get_subject(text)
    org, text = get_organization(text)
    text = delete_useless_meta(text)
    return {"author": author, "subject": subject, "organization": org}, text


def tokenize_text(text):
    """Split *text* into tokens and lemmas after stripping its header lines.

    Header lines are removed with ``get_metadata_from_text``. The remaining text is
    split into sentences; inside each sentence, spans matched by ``common_pattern``
    (dates, phones, e-mails, times) are kept verbatim as single tokens, and the text
    between them is tokenised with ``word_pattern`` and lemmatised.

    The previous implementation masked special tokens with ``str.replace``, which
    replaced every occurrence of the matched substring (also inside longer, unrelated
    text) and could desynchronise the placeholder queue; a literal "#TOKEN#" in the
    input raised ``IndexError``. Walking the match spans avoids the placeholder.

    Returns:
        ``(tokens, lemmas, metadata)`` — two parallel flat lists covering the whole
        text, and the metadata dict produced by ``get_metadata_from_text``.
    """
    raw_tokens = []
    lemma_tokens = []
    metadata, text = get_metadata_from_text(text)

    def emit_words(fragment):
        # Ordinary text: every token is stored together with its lemma.
        for word in re.findall(word_pattern, fragment):
            raw_tokens.append(word)
            lemma_tokens.append(lemmatizer.lemmatize(word))

    for sentence in re.split(sentence_pattern_split, text):
        cursor = 0
        for match in re.finditer(common_pattern, sentence):
            special = match.group(0)
            if not special:
                # Ignore empty matches, as the original validity check did.
                continue
            emit_words(sentence[cursor:match.start()])
            # Special tokens are kept as-is in both views, in order of appearance.
            raw_tokens.append(special)
            lemma_tokens.append(special)
            cursor = match.end()
        emit_words(sentence[cursor:])

    return raw_tokens, lemma_tokens, metadata

def get_chunks_with_overlap(token_list, chunk_size=100, overlap=20):
    """Cut *token_list* into windows of *chunk_size* items that overlap by *overlap*.

    Consecutive windows start ``chunk_size - overlap`` items apart. The last window
    may be shorter than *chunk_size*; it is always emitted so that every token ends
    up in at least one chunk (the previous loop condition dropped the tail, and even
    a final full-size window).

    Args:
        token_list: Sequence of tokens (any sliceable sequence).
        chunk_size: Maximum number of tokens per chunk.
        overlap: Number of tokens shared by neighbouring chunks.

    Returns:
        A list of chunks. Empty input yields ``[]``; input that fits in one chunk
        yields ``[token_list]``.

    Raises:
        ValueError: if windowing is required and ``overlap >= chunk_size`` (the step
            would not advance, which previously caused an endless loop).
    """
    num_tokens = len(token_list)
    if num_tokens == 0:
        return []
    if num_tokens <= chunk_size:
        return [token_list]

    step = chunk_size - overlap  # stride that accounts for the overlap
    if step <= 0:
        raise ValueError('overlap must be smaller than chunk_size')

    chunks = []
    for position in range(0, num_tokens, step):
        chunks.append(token_list[position:position + chunk_size])
        if position + chunk_size >= num_tokens:
            # This window already reaches the end; further windows would only
            # repeat the overlap region.
            break
    return chunks

def get_chunks_from_documents(documents: list) -> list[DocumentChunk]:
    """Flatten the chunks of all *documents* into one list of ``DocumentChunk``.

    Each chunk receives its own copy of the parent document's metadata with an
    additional "chunk_id" entry holding the chunk's position inside the document.
    """
    flattened = []
    for document in documents:
        position = 0
        for piece in document.chunks:
            # Copy so that chunks of the same document do not share one dict.
            chunk_metadata = document.metadata.copy()
            chunk_metadata["chunk_id"] = position
            flattened.append(DocumentChunk(metadata=chunk_metadata, chunk=piece))
            position += 1
    return flattened


In [None]:
import os


# Chunking parameters passed to get_chunks_with_overlap(). The constant keeps its
# original (misspelt) name because other cells may reference or override it.
CHUNK_ZIZE = 100
OVERLAP = 20

class Loader:
    """Reads topic folders from disk and turns every file into a ``Document``."""

    @staticmethod
    def load_single_document(file_path: str, file_topic):
        """Read one file, tokenise it and cut its lemmas into overlapping chunks.

        Args:
            file_path: Path of the file to read.
            file_topic: Topic label stored in the document metadata.

        Returns:
            A ``Document`` whose metadata additionally holds "topic" and "doc_id"
            (the file name).
        """
        # Explicit encoding + errors='replace': with the platform default a single
        # undecodable byte raised UnicodeDecodeError and the whole file was skipped
        # by the best-effort handler in load_topics().
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            raw_text = f.read()
        sentence_raw_text, sentence_lemmatized_text, metadata = tokenize_text(raw_text)
        text_chunk = get_chunks_with_overlap(sentence_lemmatized_text, CHUNK_ZIZE, OVERLAP)
        metadata.update({
            "topic": file_topic,
            "doc_id": os.path.basename(file_path),
        })
        return Document(raw_text=raw_text,
                        metadata=metadata,
                        tokenized_text=sentence_raw_text,
                        lemmatized_tokenized_text=sentence_lemmatized_text,
                        chunks=text_chunk)

    def load_documents(self, source_dir: str):
        """Load every topic found in *source_dir*.

        Each sub-directory of *source_dir* is treated as one topic (plain files at
        the top level are ignored). Topics are processed in sorted order.

        Returns:
            The list of documents produced by ``load_topics``.
        """
        topics = sorted(
            entry for entry in os.listdir(source_dir)
            if os.path.isdir(os.path.join(source_dir, entry))
        )
        return self.load_topics(source_dir, topics)

    @staticmethod
    def load_topics(source_dir, topic_list):
        """Load all files of the given topics; files that fail to load are reported and skipped."""
        documents = []
        for topic in topic_list:
            topic_dir = os.path.join(source_dir, topic)
            for f_name in os.listdir(topic_dir):
                file_path = os.path.join(topic_dir, f_name)
                try:
                    documents.append(Loader.load_single_document(file_path, topic))
                except Exception as e:
                    # Deliberately best-effort: one broken file must not abort the whole load.
                    print(f'While loading file {file_path} exception catched: {e}')
        return documents

    @staticmethod
    def load_topic_by_chunks_and_docs(source_dir, topic_list):
        """Load the topics and return ``(chunks, documents)``.

        Documents without any chunk are dropped before the chunks are flattened.
        """
        documents = Loader.load_topics(source_dir, topic_list)
        documents = [doc for doc in documents if len(doc.chunks) > 0]
        chunks = get_chunks_from_documents(documents)
        return chunks, documents


In [None]:
# Show the entries of the extracted test-split directory (displayed as the cell output).
os.listdir('/content/20news-bydate-test')

In [None]:
# Topics to load from the extracted test split; each name is a sub-directory of the source folder.
topic_list = [
    'rec.autos',
    'rec.motorcycles',
    'rec.sport.baseball',
    'rec.sport.hockey',
    'sci.crypt',
    'sci.electronics',
    'sci.med',
    'sci.space',
]
# Flat list of chunks plus the documents they were cut from.
chunks, documents = Loader.load_topic_by_chunks_and_docs(
    '/content/20news-bydate-test',
    topic_list,
)

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch

# Run the model on the GPU when one is available.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

class Embedder:
    """Turns text into fixed-size vectors by mean-pooling transformer token states."""

    # Checkpoint used when no model name is passed to the constructor.
    DEFAULT_MODEL_NAME = 'sentence-transformers/paraphrase-multilingual-mpnet-base-v2'

    def __init__(self, model_name: str = DEFAULT_MODEL_NAME) -> None:
        """Load tokenizer and model for *model_name* and move the model to ``DEVICE``."""
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name).to(DEVICE)

    @staticmethod
    def mean_pooling(token_embeddings, attention_mask):
        """Average token embeddings along dim 1, ignoring positions where the mask is 0.

        The mask is broadcast to the shape of *token_embeddings*; the divisor is
        clamped to 1e-9 so an all-zero mask does not divide by zero.
        """
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
        return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    def encode(self, chunk: "str | list[str]"):
        """Embed one string or a batch of strings.

        Note: despite the parameter name, callers in this notebook pass plain text
        (a query string or a list of chunk texts), not ``DocumentChunk`` objects.

        Returns:
            A tensor with one pooled embedding per input text (still on ``DEVICE``).
        """
        tokenized_chunk = self.tokenizer(chunk, padding=True, truncation=True, return_tensors='pt').to(DEVICE)
        with torch.no_grad():
            model_output = self.model(**tokenized_chunk)

        # First element of the model output holds the per-token embeddings.
        token_embeddings = model_output[0]
        return Embedder.mean_pooling(token_embeddings, tokenized_chunk['attention_mask'])

    def encode_document(self, document: "Document"):
        """Placeholder kept for interface compatibility; not implemented, returns None."""
        return None

    def get_batched_chunks_text_metadata(self, batched_chunks: "list[DocumentChunk]"):
        """Split a batch of chunks into ``(texts, metadatas)``.

        Each chunk's tokens are joined with single spaces to form its text.
        """
        batched_chunk_text = [' '.join(chunk.chunk) for chunk in batched_chunks]
        batched_chunks_metadata = [chunk.metadata for chunk in batched_chunks]
        return batched_chunk_text, batched_chunks_metadata

    def encode_chunks_batched(self, batched_chunks):
        """Embed a batch of chunks.

        Returns:
            ``(embeddings, texts, metadatas)`` where *embeddings* is a NumPy array
            with one row per chunk.
        """
        batched_chunk_text, batched_chunk_metadata = self.get_batched_chunks_text_metadata(batched_chunks)
        emb = self.encode(batched_chunk_text)
        return emb.detach().cpu().numpy(), batched_chunk_text, batched_chunk_metadata

In [None]:
import chromadb
import tqdm
import pandas as pd
import numpy as np

# Number of chunks embedded and uploaded per step in MyIndexedSearch.index_chunks().
BATCH_SIZE = 4

import requests

import os

# Gemini REST endpoint. The API key is read from the GEMINI_API_KEY environment
# variable so that it does not have to be stored in the notebook; the original
# placeholder value remains the fallback when the variable is not set.
url = (
    'https://generativelanguage.googleapis.com/v1beta/models/'
    'gemini-1.5-flash-latest:generateContent?key='
    + os.environ.get('GEMINI_API_KEY', 'aboba')
)

def ask_gemini(promt, timeout=60):
    """Send *promt* to the Gemini endpoint and return the generated text.

    Args:
        promt: Prompt text sent as the single content part of the request.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The text of the first part of the first candidate in the response.

    Raises:
        requests.HTTPError: if the service answers with an error status.
        RuntimeError: if the response body does not contain a candidate text.
    """
    # Request body
    data = {"contents": [{"parts": [{
                        "text": promt
                    }]}]
        }
    # Perform the POST request; without a timeout a stalled connection would hang the cell.
    response = requests.post(url, json=data, headers={'Content-Type': 'application/json'}, timeout=timeout)
    response.raise_for_status()

    payload = response.json()
    try:
        return payload['candidates'][0]['content']['parts'][0]['text']
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError(f'Unexpected Gemini response: {payload!r}') from exc

def rag_search_gemini(additional_info, promt):
    """Ask Gemini to answer *promt* using *additional_info* as the context.

    The context and the question are embedded into one prompt that is passed to
    ``ask_gemini``; its return value is returned unchanged.
    """
    grounded_prompt = f"Context: {additional_info}\n Question: {promt}\n Answer this question based on the provided context."
    return ask_gemini(grounded_prompt)


class ChromaDB:
    """Small wrapper around one persistent Chroma collection named "email_search"."""

    def __init__(self, db_path='/content/drive/MyDrive/text_labs/my_indexed_db') -> None:
        """Open the persistent client at *db_path* and get (or create) the collection."""
        client = chromadb.PersistentClient(path=db_path)
        self.db_client = client
        self.data_collection = client.get_or_create_collection(name="email_search")

    def upload(self,  documents, embeddings, metadatas, ids):
        """Add a batch of records (texts, vectors, metadata dicts, string ids) to the collection."""
        record_batch = dict(
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids,
        )
        self.data_collection.add(**record_batch)

    def search(self, embeddigs, top_k):
        """Query the collection with the given embeddings and return the raw query result.

        The result includes distances, embeddings, documents and metadatas for
        *top_k* results per query embedding.
        """
        returned_fields = ['distances','embeddings', 'documents', 'metadatas']
        return self.data_collection.query(
            query_embeddings=embeddigs,
            n_results=top_k,
            include=returned_fields,
        )

class MyIndexedSearch:
    """Facade that indexes chunks into the vector store and answers queries from it."""

    def __init__(self) -> None:
        """Create the embedder and open the vector store with their default settings."""
        self.embedder = Embedder()
        self.chroma_db = ChromaDB()

    def index_documents(self, documents):
        """Flatten *documents* into chunks and index them.

        Fixed: ``self`` used to be passed twice to ``index_chunks``, which raised
        ``TypeError`` on every call.
        """
        self.index_chunks(get_chunks_from_documents(documents))

    def index_chunks(self, chunks):
        """Embed *chunks* in batches of ``BATCH_SIZE`` and upload them.

        Record ids are the chunk positions in *chunks*, rendered as strings.
        An empty list is a no-op (it used to fail on an unbound loop variable);
        the redundant tail handling after the loop was removed because the last,
        possibly shorter, batch is already covered by the loop itself.
        """
        if not chunks:
            return

        total = len(chunks)
        for start in tqdm.tqdm(range(0, total, BATCH_SIZE)):
            end = min(start + BATCH_SIZE, total)
            embeddings, chunks_text, metadatas = self.embedder.encode_chunks_batched(chunks[start:end])
            self.chroma_db.upload(
                documents=chunks_text,
                embeddings=embeddings,
                metadatas=metadatas,
                ids=[str(position) for position in range(start, end)],
            )

    def search(self, query: str, top_k: int = 5, lemmatize_query: bool = False):
        """Return the *top_k* stored chunks closest to *query*.

        Args:
            query: Free-text query.
            top_k: Number of results requested from the vector store.
            lemmatize_query: When True, the query is passed through
                ``tokenize_text`` and its lemmas are embedded, i.e. the same
                preprocessing that produced the indexed chunk texts. The default
                (False) embeds the raw query, as before.

        Returns:
            A list of ``(metadata, chunk_text)`` tuples in the order returned by
            the vector store.
        """
        text_to_embed = query
        if lemmatize_query:
            _, lemmas, _ = tokenize_text(query)
            text_to_embed = ' '.join(lemmas)

        query_embedding = self.embedder.encode(text_to_embed).detach().cpu().numpy()
        result = self.chroma_db.search(query_embedding, top_k)
        # Index 0: results for the first (and only) query embedding.
        return list(zip(result['metadatas'][0], result['documents'][0]))

    def rag_search(self, query: str, top_k:int=5):
        """Retrieve *top_k* chunks for *query* and let Gemini answer from them.

        The retrieved chunk texts are joined with newlines and passed as context
        to ``rag_search_gemini``.
        """
        answers = self.search(query, top_k)
        retrieved_data = '\n'.join(chunk_text for _, chunk_text in answers)
        return rag_search_gemini(retrieved_data, query)


In [None]:
# Build the search facade: this constructs the Embedder and opens the ChromaDB collection.
indexSearch = MyIndexedSearch()

In [None]:
# indexSearch.index_chunks(chunks)

### Запросы Лаб 5

In [None]:
def get_pandas_from_search(answers):
    """Convert search results into a DataFrame for display.

    Args:
        answers: Iterable of ``(metadata, answer_text)`` pairs.

    Returns:
        A DataFrame with the columns "metadata" and "answer", one row per pair.
        The columns are present even when *answers* is empty.
    """
    records = [{"metadata": metadata, "answer": answer} for metadata, answer in answers]
    return pd.DataFrame(records, columns=["metadata", "answer"])

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('How to do wheelie?', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('What are the top contenders for the Stanley Cup?', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search("What cars competed in the World Manufacturer's Cup in 1965?", 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('Where to buy electronics?', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('Name one of the American spacecraft', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('What is the mission of NASA spacecraft?', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search('Where to find baseball championship schedule?', 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search("Why is the Moon's surface bright?", 5)
get_pandas_from_search(answers)

In [None]:
# Retrieval-only query: request the top 5 chunks from the index and show them as a table.
answers = indexSearch.search("What is the impact of spaceflight to regular society?", 5)
get_pandas_from_search(answers)

### Запросы Лаб 6

#### Test queries

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('Is it possible to do wheelie on a motorcycle with shaft drive?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search("What cars competed in the World Manufacturer's Cup in 1965?", 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('What are the top contenders for the Stanley Cup?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('How to determine if pitcher qualifies for a save?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('Where to buy electronics?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('Name one of the American spaceships', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('What is the mission of NASA spacecraft?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search('Where to find baseball championship schedule?', 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search("Why is the Moon's surface bright?", 5)
rag_answer

In [None]:
# RAG query: retrieve 5 chunks, pass them to Gemini as context and display the generated answer.
rag_answer = indexSearch.rag_search("What is the impact of spaceflight to regular society?", 5)
rag_answer

#### Evaluate model

In [None]:
from evaluate import load
# BERTScore metric object loaded through the `evaluate` package.
bertscore = load("bertscore")

def bert_score(model_answer, desired_answer):
    """Compute BERTScore for English texts.

    Args:
        model_answer: List of predicted texts.
        desired_answer: List of reference texts, aligned with *model_answer*.

    Returns:
        Whatever ``bertscore.compute`` returns for these predictions/references.
    """
    return bertscore.compute(predictions=model_answer, references=desired_answer, lang="en")

In [None]:
# Evaluation set: each entry pairs a question with the reference answer the model
# output is scored against. Misspelt words in the reference answers ("conteders",
# "pittcher", "electornics", "beacause") were corrected, since the references are
# compared token-wise by BERTScore. Questions are unchanged.
questions_answers = [
    {
        "question": 'Is it possible to do wheelie on a motorcycle with shaft drive?',
        "answer": "Yes, it is."
    },
    {
        "question": "What cars competed in the World Manufacturer's Cup in 1965?",
        "answer": "Ford GT40, American cobras"
    },
    {
        "question": 'What are the top contenders for the Stanley Cup?',
        "answer": "There are contenders for the Stanley Cup: New York Islanders, Detroit Red Wings, Quebec Nordiques, Montreal Canadiens, Los Angeles Kings."
    },
    {
        "question": 'How to determine if pitcher qualifies for a save?',
        "answer": "To be qualified for a save pitcher must Enter the game with a lead of no more than three runs and pitch at least one inning"
    },
    {
        "question": 'Where to buy electronics?',
        "answer": "You can buy electronics in radio shops such as `Radio Shack`, `Digi Key Corp.`, etc"
    },
    {
        "question": "Name one of American spacecraft",
        "answer": "Space Shuttle",
    },
    {
        "question": 'What is the mission of NASA spacecraft?',
        "answer": "The National Aeronautics and Space Administration's NASA's automated spacecraft for solar system exploration come in many shape and size",
    },
    {
        "question": "Where to find baseball championship schedule?",
        "answer": "You can use book Directory"
    },
    {
        "question": "Why is the Moon's surface bright?",
        "answer": "The Moon's surface is bright because it reflects sunlight"
    },
    {
        "question": "What is the impact of spaceflight to regular society?",
        "answer": "The impact of spaceflight is in spheres as medical intensive care , agriculture , environmental protection"
    }
]


In [None]:
import numpy as np
import pandas as pd
def find_best_answer(bert_score_result, answers):
    """Return the entry of *answers* at the position of the highest "f1" score.

    Args:
        bert_score_result: Mapping with an "f1" sequence aligned with *answers*.
        answers: Candidate answers.
    """
    f1_scores = bert_score_result['f1']
    return answers[np.argmax(f1_scores)]

In [None]:
def evaluate_model(questions_answers, search_func, top_k=5, score_func=None):
    """Score the answers of *search_func* against the reference answers.

    Args:
        questions_answers: Iterable of dicts with the keys "question" and "answer".
        search_func: Callable ``(question, top_k)`` returning either a list/tuple of
            ``(metadata, text)`` pairs (retrieval) or a single answer string (RAG).
        top_k: Number of results requested from *search_func*.
        score_func: Callable ``(predictions, references)`` returning a mapping with
            "precision", "recall" and "f1" sequences. Defaults to ``bert_score``.

    Returns:
        A DataFrame with one row per question: the question, the reference answer,
        the model answer (first retrieved text or the generated string) and the
        mean precision/recall/f1 as scalars. When nothing was retrieved the model
        answer is None and the metrics are NaN.
    """
    if score_func is None:
        score_func = bert_score

    metric_names = ("precision", "recall", "f1")
    evaluation_results = []
    for row in questions_answers:
        q = row['question']
        a = row['answer']
        db_answer = search_func(q, top_k)

        if isinstance(db_answer, (list, tuple)):
            candidates = [answ[1] for answ in db_answer]
            answer = candidates[0] if candidates else None
        else:
            candidates = [db_answer]
            answer = db_answer

        if candidates:
            # One reference per candidate: the store may return fewer than top_k
            # results, so the reference list must follow the actual count.
            scores = score_func(candidates, [a] * len(candidates))
            # Always reduce to scalars so both result kinds yield numeric columns
            # (the RAG branch used to store one-element lists).
            metrics = {name: float(np.mean(scores[name])) for name in metric_names}
        else:
            metrics = dict.fromkeys(metric_names, float('nan'))

        evaluation_results.append({
            'question': q,
            'desired_answer': a,
            'model_answer': answer,
            'precision': metrics['precision'],
            'recall': metrics['recall'],
            'f1': metrics['f1'],
        })
    return pd.DataFrame(evaluation_results)


In [None]:
# Score plain retrieval: each question's retrieved chunks are compared with the reference answer.
evaluate_indexed_search = evaluate_model(questions_answers, indexSearch.search)
evaluate_indexed_search

In [None]:
# Score the RAG pipeline: the generated answer for each question is compared with the reference answer.
evaulate_rag = evaluate_model(questions_answers, indexSearch.rag_search)
evaulate_rag