### Get PDF Content

In [None]:
%pip install pdfplumber
import os
# Switch into the RAG project directory so the relative paths used in this
# notebook ('dataset/...', 'tokenizer/...') resolve.
os.chdir('RAG')
print("Current Working Directory:", os.getcwd()) # Make sure your current working directory is /path/to/RAG
import pdfplumber # type: ignore

# Read the whole PDF into a single string, `content`.
path = 'dataset/general_notes.pdf'
with pdfplumber.open(path) as pdf:
    page_texts = []
    for page in pdf.pages:
        # extract_text() may give None (or an empty string, depending on the
        # pdfplumber version) for pages without extractable text; treat both
        # as "no text" instead of crashing on None.split().
        raw_text = page.extract_text() or ''
        # The last extracted line of every page is discarded
        # (presumably a page number / footer -- TODO confirm).
        page_texts.append('\n'.join(raw_text.split('\n')[:-1]))
    # Join pages with a newline so the last word of one page is not fused
    # with the first word of the next page.
    content = '\n'.join(page_texts)

### Filtering invalid characters

In [None]:
def filter_invalid_characters(text, valid_chars):
    """Split *text* into the characters that are allowed and those that are not.

    Args:
        text: String to filter.
        valid_chars: Container of allowed single characters. A ``set`` gives
            constant-time membership tests.

    Returns:
        A ``(valid_characters, filtered_characters)`` tuple of strings. The
        first holds every character of *text* found in *valid_chars*, the
        second every character that is not; both keep the original order.
    """
    kept = []
    dropped = []
    # Single pass; membership is tested against `valid_chars` itself rather
    # than against the already-built output string (which was a linear scan
    # per character).
    for char in text:
        (kept if char in valid_chars else dropped).append(char)
    return ''.join(kept), ''.join(dropped)

# Whitelist of characters kept in the text: ASCII letters, digits, the listed
# punctuation marks, space and newline.
valid_chars = {char for char in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.,!?;:()[]{}<>/-_ \n"}

# clean_content: characters that passed the whitelist;
# filtered_characters: everything that was removed.
clean_content, filtered_characters = filter_invalid_characters(text=content, valid_chars=valid_chars)

### Tokenizer

In [None]:
from tokenizer.tokenization_chatglm import ChatGLM4Tokenizer # type: ignore

# Build the tokenizer from the local 'tokenizer/tokenizer.model' file.
tokenizer_glm4 = ChatGLM4Tokenizer('tokenizer/tokenizer.model')

# Round-trip demo: text -> tokens -> ids -> tokens -> text.
tokens = tokenizer_glm4._tokenize('This is a Sentence. 这是一个句子。')
token_ids = list(map(tokenizer_glm4._convert_token_to_id, tokens))

recover_tokens = list(map(tokenizer_glm4._convert_id_to_token, token_ids))
recover_text = tokenizer_glm4.convert_tokens_to_string(recover_tokens)

### Split to Chunks

In [None]:
def Split_to_Chunks(content, chunk_size, overlap_size):
    """Split *content* into fixed-size, overlapping character chunks.

    Consecutive chunks start ``chunk_size - overlap_size`` characters apart,
    so each chunk shares its first ``overlap_size`` characters with the end
    of the previous one. A final, shorter chunk is added only when it
    contains characters that no earlier chunk covers.

    Args:
        content: Text to split.
        chunk_size: Length of every full chunk; must be positive.
        overlap_size: Number of characters shared by neighbouring chunks;
            must satisfy ``0 <= overlap_size < chunk_size``.

    Returns:
        List of chunk strings in document order (empty for empty *content*).

    Raises:
        ValueError: If the sizes would make the window stand still or move
            backwards (which previously caused an endless loop).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap_size < chunk_size:
        raise ValueError("overlap_size must satisfy 0 <= overlap_size < chunk_size")

    step = chunk_size - overlap_size
    total = len(content)
    chunks = []
    start = 0
    while start + chunk_size <= total:
        chunks.append(content[start:start + chunk_size])
        start += step

    # The previous full chunk (if any) already covers content[:start + overlap_size].
    # Only emit the tail when it adds new characters; this avoids an empty or
    # purely duplicated last chunk.
    covered_upto = start + overlap_size if chunks else 0
    if covered_upto < total:
        chunks.append(content[start:])
    return chunks

# 1000-character chunks, neighbouring chunks sharing 200 characters.
# NOTE(review): this splits the raw `content`; the filtered `clean_content`
# computed earlier is not used here -- confirm which one is intended.
chunks = Split_to_Chunks(content, chunk_size=1000, overlap_size=200)
print(len(chunks))

### Vector Database

In [None]:
import torch

class VectorDB:
    """In-memory retrieval index that ranks text chunks against a query.

    Every chunk is tokenized and represented by the rows of an embedding
    table selected by its token ids. A query is embedded the same way, and a
    chunk's score is the mean cosine similarity over all (chunk token,
    query token) vector pairs.
    """

    def __init__(self, tokenizer:any, embedding_weight_path:str, chunks):
        """Load the embedding table and embed every chunk.

        Args:
            tokenizer: Object providing ``_tokenize(text)`` and
                ``_convert_token_to_id(token)``.
            embedding_weight_path: File passed to ``torch.load``; the loaded
                object is indexed with lists of token ids.
            chunks: Sequence of text chunks to index.
        """
        self.chunks = chunks
        self.tokenizer = tokenizer
        # NOTE(review): torch.load unpickles the file, so only load embedding
        # files from a trusted source.
        self.vectors = torch.load(embedding_weight_path)
        self.vector_db = self.create_vector_db()
        self.chunk_nums = len(self.vector_db)

    def _text_to_vectors(self, text):
        """Tokenize *text* and return the embedding rows of its token ids."""
        tokens = self.tokenizer._tokenize(text)
        token_ids = [self.tokenizer._convert_token_to_id(token) for token in tokens]
        return self.vectors[token_ids]

    def create_vector_db(self):
        """Return one embedding matrix per chunk, in the order of ``self.chunks``."""
        return [self._text_to_vectors(chunk) for chunk in self.chunks]

    def _query_to_vectors(self, query, top_k):
        """Rank all chunks against *query* and return the best *top_k*.

        Args:
            query: Query text.
            top_k: Number of chunks requested; capped at the number of
                indexed chunks so an oversized request does not raise.

        Returns:
            ``(results, chunk_indices, similarity_score)``: the embedding
            matrices of the selected chunks, their indices (tensor) and their
            scores (tensor), as produced by ``torch.topk``.
        """
        query_vectors = self._text_to_vectors(query)
        cosine_similarity_score = torch.ones(self.chunk_nums)
        for i, chunk_vectors in enumerate(self.vector_db):
            cosine_similarity_score[i] = self.average_cosine_similarity(chunk_vectors, query_vectors)
        # torch.topk raises when k exceeds the number of scores.
        top_k = min(top_k, self.chunk_nums)
        similarity_score, chunk_indices = torch.topk(cosine_similarity_score, top_k)
        results = [self.vector_db[index] for index in chunk_indices.tolist()]
        return results, chunk_indices, similarity_score

    def _query_to_text(self, query, top_k):
        """Return the text of the (at most) *top_k* best-scoring chunks, best first."""
        _, chunk_indices, _ = self._query_to_vectors(query, top_k)
        return [self.chunks[index] for index in chunk_indices.tolist()]

    @staticmethod
    def _normalize_rows(vectors):
        """Scale every row to unit length; all-zero rows are left as zeros."""
        norms = vectors.norm(dim=1, keepdim=True)
        # Dividing a zero row by its zero norm would yield NaN, and a NaN
        # score corrupts the top-k ranking. Divide such rows by 1 instead.
        safe_norms = torch.where(norms > 0, norms, torch.ones_like(norms))
        return vectors / safe_norms

    def average_cosine_similarity(self, chunk_vectors, query_vertors):
        """Return the mean cosine similarity over all row pairs of the inputs.

        Args:
            chunk_vectors: 2-D tensor, one row per chunk token.
            query_vertors: 2-D tensor, one row per query token, with the same
                row width as *chunk_vectors*.

        Returns:
            0-dim tensor with the mean similarity; ``0`` when either input
            has no rows (instead of NaN).
        """
        chunk_vectors_norm = self._normalize_rows(chunk_vectors)
        query_vertors_norm = self._normalize_rows(query_vertors)
        cosine_sim = torch.mm(chunk_vectors_norm, query_vertors_norm.t())
        if cosine_sim.numel() == 0:
            # mean() of an empty tensor is NaN; report "no similarity".
            return cosine_sim.new_zeros(())
        return cosine_sim.mean()
    
# Build the index over `chunks`. Note that this rebinds the name `VectorDB`
# from the class to the created instance.
VectorDB = VectorDB(tokenizer=tokenizer_glm4, embedding_weight_path='tokenizer/embedding_weight.pt', chunks=chunks)

In [None]:
# query_text = VectorDB._query_to_text('注意力', 3)
# for text in query_text:
#     print(text)
#     print('-'*50)

### Augmentation and Generation: Taking GLM4 as an Example

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# Load the GLM4 tokenizer and model from the local checkpoint directory.
# NOTE(review): trust_remote_code=True lets the checkpoint's own Python code
# run during loading -- only use checkpoints from a trusted source.
model_dir = 'GLM4CKPT'
glm4_tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_dir, trust_remote_code=True)
# Prefer the first CUDA device, but fall back to the CPU instead of crashing
# on machines without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()
class RAG_GLM4:
    """Retrieval-augmented question answering on top of a chat model.

    The query is used to fetch the most similar text chunks from the vector
    index; those chunks are embedded into a prompt that is sent to the model.
    """

    def __init__(self, model, tokenizer, VectorDB):
        """Store the collaborators.

        Args:
            model: Object exposing ``chat(tokenizer, prompt, history=...)``
                that returns a ``(response, history)`` pair.
            tokenizer: Tokenizer handed to ``model.chat``.
            VectorDB: Index exposing ``_query_to_text(query, top_k)``.
        """
        self.model = model
        self.VectorDB = VectorDB
        self.tokenizer = tokenizer

    def generate(self, query, top_k=1):
        """Answer *query* using the *top_k* retrieved chunks as context."""
        # Use the model held by this instance rather than a notebook-global
        # `model`, so the class works with whatever model it was given.
        response, _ = self.model.chat(self.tokenizer, self.augmented(query, top_k), history=[])
        return response

    def augmented(self, query, top_k=1):
        """Return the prompt: instructions, the question and the retrieved chunks."""
        # The retrieved chunks are interpolated as the string form of the list.
        retrieved = self.VectorDB._query_to_text(query, top_k)
        prompt = f"You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question: {query}. If you don't know the answer, say that you don't know. \n\n Retrieved context: {retrieved}."
        # A Chinese version of the same prompt:
        # prompt = f"你是一个问答任务的助手。使用以下检索到的文本来回答问题：{query}。如果你不知道答案，请说你不知道。\n\n检索到的参考文本：{retrieved}。"
        return prompt
    
# Wire model, tokenizer and index together. Note that this rebinds the name
# `RAG_GLM4` from the class to the created instance.
RAG_GLM4 = RAG_GLM4(model=model, tokenizer=glm4_tokenizer, VectorDB=VectorDB)

In [None]:
query = 'Explaining DDPM'

# Answer generated with the three best-matching chunks as context.
response_rag = RAG_GLM4.generate(query, top_k=3)
print(response_rag)
print(100 * '-')

# Same question sent to the model directly, without retrieved context.
response, _ = model.chat(glm4_tokenizer, query, history=[])
print('\n', response)

In [None]:
# Show the three chunks retrieved for the query, each followed by a rule.
query_text = VectorDB._query_to_text(query, 3)
for text in query_text:
    print(text, 100 * '-', sep='\n')