# **RAG-Pipeline**

## **Libraries Importing**

In [1]:
# Temp - Only for DEV!
# !pip install langchain-huggingface
# !pip install langchain-qdrant
# # !pip install qdrant-client
# !pip install openai
# !pip install mlflow
# !pip install langchain

In [None]:
import requests
import re
import os
from bs4 import BeautifulSoup
import json
import nltk
if 'nltk_data' in os.listdir('.'):
    print('NLTK data already downloaded!')
else:
    nltk.download('stopwords')
from nltk.corpus import stopwords
from sentence_transformers import SentenceTransformer
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import numpy as np
import pickle
import torch
import openai
import mlflow
from mlflow import MlflowClient

## **Global Variables Initialization**

In [None]:
# Local directory (relative to the notebook's working directory) that every
# pipeline stage reads its inputs from and writes its outputs to.
ARTIFACTS_DIRNAME = "RAG_artifacts"

#### **Data Ingestion Variables**

In [None]:
# Source article that is downloaded by the ingestion stage.
ARTICLE_URL = "https://blog.dzencode.com/ru/illyuziya-kachestva-vash-sayt-idealen-pozdravlyaem-vy-tolko-chto-sozhgli-byudzhet/"
# File names (inside ARTIFACTS_DIRNAME) for the raw page and its metadata.
ARTICLE_TXT_FILENAME = 'article.txt'
ARTICLE_METADATA_FILENAME = 'metadata.json'
# Keys of the metadata dict written to ARTICLE_METADATA_FILENAME.
ARTICLE_METADATA_URL_KEY = 'url'
ARTICLE_METADATA_LANG_KEY = 'lang'
ARTICLE_METADATA_DATE_KEY = 'date'
ARTICLE_METADATA_TOPIC_KEY = 'topic'

#### **Data Cleaning and Preprocessing Variables**

In [None]:
# Output file (inside ARTIFACTS_DIRNAME) of the cleaning stage.
PREPROCESSED_ARTICLE_TEXT_FILENAME = 'cleaned_article.txt'
# Language name passed to nltk's stopwords.words() to pick the stop-word list.
ARTICLE_LANGUAGE = 'russian'

#### **Data Chunking Variables**

In [None]:
# Output file (inside ARTIFACTS_DIRNAME) of the chunking stage, one JSON object per line.
CHUNKS_JSONL_FILENAME = 'rag_article.jsonl'
# Target chunk size. NOTE: despite the name, get_chunk_words compares this value
# against the number of characters of the joined words, not against a token count.
CHUNK_SIZE_IN_TOKENS = 512
# Keys used inside each chunk dict and its metadata.
CHUNK_METADATA_ID_KEY = 'chunk_ID'
CHUNK_DATA_TEXT_KEY = 'chunk_text'
CHUNK_DATA_METADATA_KEY = 'metadata'

#### **Chunks Embedding Generation and Loading into QdrantDB Variables**

In [None]:
# Sentence-transformers model used for both chunk and query embeddings.
SENTENCE_TRANSFORMER_MODEL_NAME = "multi-qa-distilbert-cos-v1"
# Key under which a chunk's embedding vector is stored in the chunk dict.
CHUNK_DATA_EMBEDDING_VECTOR_KEY = 'embedding'
# Name of the Qdrant collection that holds the chunks.
CHUNKS_COLLECTION_NAME = "RAG_Chunks"
# Embedding dimension, read from the model itself (this instantiates the model at config time).
EMBEDDING_VECTOR_SIZE = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_NAME).get_sentence_embedding_dimension()
# Qdrant host is taken from the QDRANT_DB_IP environment variable;
# os.environ.get returns None when the variable is not set.
QDRANT_HOST = os.environ.get('QDRANT_DB_IP')
QDRANT_PORT = "6333"
# Shared Qdrant client, created once when this cell runs.
QDRANT_CLIENT = QdrantClient(host = QDRANT_HOST, port = QDRANT_PORT)
# LangChain embeddings wrapper around the same sentence-transformers model.
EMBEDDINGS_MODEL = HuggingFaceEmbeddings(model_name = f"sentence-transformers/{SENTENCE_TRANSFORMER_MODEL_NAME}")
# Pickle file (inside ARTIFACTS_DIRNAME) where the computed embedding vectors are saved.
EMBEDDING_VECTORS_FILENAME = "embeddings.pkl"

#### **Chunks Searching Variables**

In [None]:
# Question used for retrieval and sent to the LLM as the user message.
USER_QUERY = "Что автор имеет в виду под \"иллюзией качества\"?"

#### **Prompt Generation for LLM Variables**

In [None]:
# System-prompt prefix; the retrieved context text is appended directly after it.
LLM_TASK_TEMPLATE = """You are an assistant for question-answering tasks.
Use three sentences maximum and keep the answer concise.
Use the following pieces of retrieved context to answer the question:
"""

#### **LLM Response Generation Variables**

In [None]:
# Host of the OpenAI-compatible LLM server, taken from the LLAMACPP_Server_IP
# environment variable; os.environ.get returns None when the variable is not set.
LLAMACPP_SERVER_HOST = os.environ.get('LLAMACPP_Server_IP')
LLAMACPP_SERVER_PORT = "8080"
# Model name sent in the chat-completion request.
LLM_MODEL_NAME = "mistral-v7"

#### **Control and Accounting (MLflow) Variables**

In [None]:
# MLflow tracking server location. The host comes from the MLFLOW_IP environment
# variable; when it is unset os.environ.get returns None and the URI below
# becomes "http://None:5000".
MLFLOW_TRACKING_HOST = os.environ.get("MLFLOW_IP")
MLFLOW_TRACKING_PORT = "5000"
MLFLOW_TRACKING_URI = f"http://{MLFLOW_TRACKING_HOST}:{MLFLOW_TRACKING_PORT}"
# Experiment under which every pipeline execution is recorded.
MLFLOW_EXPERIMENT_NAME = "embedding_model_for_RAG_experiment"
# File (inside ARTIFACTS_DIRNAME) that receives the versions journal.
RAG_VERSIONS_JOURNAL_NAME = "version_log.json"

## **Setup MLFlow Experiment and Start RAG Run**

In [None]:
# Make sure the local artifacts directory exists before any stage writes to it.
# The directory name comes from ARTIFACTS_DIRNAME (the original shelled out to
# `mkdir` with a hard-coded name, which would drift if the constant changed and
# is not portable); os.makedirs creates it directly from Python.
if os.path.isdir(ARTIFACTS_DIRNAME):
    print('Artifacts dir already exists!')
else:
    print('Creating RAG Artifacts dir...')
    os.makedirs(ARTIFACTS_DIRNAME, exist_ok = True)

In [None]:
# Point the MLflow APIs at the tracking server.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow_client = mlflow.client.MlflowClient(MLFLOW_TRACKING_URI)

# Look the experiment up once and reuse the result, instead of querying the
# tracking server a second time just to read its id.
existing_experiment = mlflow.get_experiment_by_name(MLFLOW_EXPERIMENT_NAME)
if existing_experiment is not None:
    print(f'Experiment {MLFLOW_EXPERIMENT_NAME} already exists!')
    experiment_ID = existing_experiment.experiment_id
else:
    experiment_ID = mlflow.create_experiment(MLFLOW_EXPERIMENT_NAME)

# Parent runs already recorded for this experiment (tagged run_type = 'parent').
# The experiment id is passed as a list, which is the documented argument type.
experiment_existed_parent_runs = mlflow.search_runs(
    [experiment_ID],
    filter_string = """tags.run_type = 'parent'""",
    output_format = 'list'
)

def get_current_parent_run_tags(experiment_existed_parent_runs, parent_runs_versions):
    """Build the tag dict for the parent run that is about to be started.

    Args:
        experiment_existed_parent_runs: parent runs already present in the
            experiment; an empty value means this is the first parent run.
        parent_runs_versions: version values of those runs. MLflow tags are
            strings, so each value is converted with int() before comparing.

    Returns:
        dict mapping tag names to values. 'parent_version' is the string form
        of the next version: 1 for a new experiment, otherwise the highest
        existing version plus one.

    Raises:
        ValueError: if a version value cannot be converted to an integer.
    """
    if not experiment_existed_parent_runs:
        print('This is new experiment with 0 parent runs.')
        print('Setting current parent run version to 1.')
        current_parent_run_version = 1
    else:
        # Take the maximum instead of the last list element: the result must not
        # depend on the order in which the runs were returned, and the versions
        # must be numbers before 1 can be added to them.
        highest_existing_version = max(
            (int(version) for version in parent_runs_versions),
            default = 0
        )
        current_parent_run_version = highest_existing_version + 1
    current_parent_run_tags = {
        'run_type': 'parent',
        'parent_version': str(current_parent_run_version),
        'artifacts_dirname': ARTIFACTS_DIRNAME,
        'article_URL': ARTICLE_URL,
        'preprocessed_article_text_filename': PREPROCESSED_ARTICLE_TEXT_FILENAME,
        'article_lang': ARTICLE_LANGUAGE,
        'chunk_size': str(CHUNK_SIZE_IN_TOKENS),
        'embedding_model_name': SENTENCE_TRANSFORMER_MODEL_NAME,
        'chunks_collection_name': CHUNKS_COLLECTION_NAME,
        'embedding_vector_size': str(EMBEDDING_VECTOR_SIZE),
        'user_query': USER_QUERY,
        'llm_task_template': LLM_TASK_TEMPLATE,
        'llm_model_name': LLM_MODEL_NAME
    }
    return current_parent_run_tags

def get_parent_runs_versions(parent_runs: list, experiment_name):
    """Return the 'parent_version' tag of every given parent run.

    Args:
        parent_runs: run objects exposing ``data.tags`` (a mapping).
        experiment_name: not used; kept so existing calls keep working.

    Returns:
        list with one tag value per run, in the same order as ``parent_runs``.
        Values are returned exactly as stored in the tags.

    Raises:
        KeyError: if a run has no 'parent_version' tag.
    """
    # The experiment lookup that used to happen here was removed: its result was
    # never used, it cost a tracking-server round trip and it raised when the
    # experiment did not exist.
    return [parent_run.data.tags['parent_version'] for parent_run in parent_runs]

# Version tags of the parent runs already recorded for this experiment.
parent_runs_versions = get_parent_runs_versions(experiment_existed_parent_runs, MLFLOW_EXPERIMENT_NAME)

# Tags (including the next version number) and the name for the run about to start.
current_parent_run_tags = get_current_parent_run_tags(experiment_existed_parent_runs, parent_runs_versions)
parent_run_version = current_parent_run_tags['parent_version']
current_parent_run_name = f"RAG_pipeline_execution_version_#{parent_run_version}"

# Start the parent run. It is not wrapped in a `with` block: it stays active
# while the stage cells open nested runs, until mlflow.end_run() is called.
mlflow.start_run(
    experiment_id = experiment_ID,
    run_name = current_parent_run_name,
    tags = current_parent_run_tags
)

## **Data Ingestion**

In [None]:
def main(URL: str, timeout: float = 30):
    """Download the article page and store its HTML and metadata as artifacts.

    Args:
        URL: address of the article page.
        timeout: seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the notebook indefinitely.

    On a 200 response the page HTML and a metadata dict (url, language, date,
    topic) are written into the artifacts directory. On any other status code
    the status and response body are printed and nothing is saved.

    Raises:
        requests.exceptions.RequestException: on connection errors or timeout.
    """
    response = requests.get(URL, timeout = timeout)
    if response.status_code != 200:
        print(f"Error fetching data from API. Status code: {response.status_code}")
        print("Response:", response.text)
        return
    article_html = get_article_html(response)
    html_parser = BeautifulSoup(article_html, 'html.parser')
    article_metadata = {
        ARTICLE_METADATA_URL_KEY: URL,
        ARTICLE_METADATA_LANG_KEY: get_article_lang(response),
        ARTICLE_METADATA_DATE_KEY: get_request_date(response),
        ARTICLE_METADATA_TOPIC_KEY: get_article_topic(html_parser)
    }
    save_metadata(article_metadata)
    save_article_HTML_to_txt(article_html)

def get_article_lang(response):
    """Extract the language code from the USER_LANG cookie of the response.

    Args:
        response: object whose ``headers`` mapping may contain 'Set-Cookie'.

    Returns:
        The letters following ``USER_LANG=`` (up to the next ';'), or an empty
        string when the header or the cookie is absent. The empty string matches
        the placeholder used for unknown metadata values.
    """
    # .get avoids a KeyError when the server sets no cookies at all.
    cookie_str = response.headers.get('Set-Cookie', '')
    lang_match_obj = re.search(r"(USER_LANG=)([a-zA-Z]*);", cookie_str)
    if lang_match_obj is None:
        # No USER_LANG cookie: report "unknown" instead of crashing on None.group().
        return ''
    return lang_match_obj.group(2)

def get_request_date(response):
    """Return the value of the 'Date' header of the given response."""
    response_headers = response.headers
    return response_headers['Date']

def get_article_topic(html_parser):
    """Return the text of the page's <title> element.

    Args:
        html_parser: parsed document exposing ``find(tag_name)``.

    Returns:
        The title text, or an empty string when the document has no <title>
        (``find`` returns None in that case, so ``.text`` cannot be read).
    """
    title_tag = html_parser.find('title')
    if title_tag is None:
        return ''
    return title_tag.text

def get_article_html(response):
    """Return the body of the given response as text."""
    page_html = response.text
    return page_html

def save_metadata(metadata: dict):
    """Write the metadata dict as JSON into the artifacts directory."""
    metadata_path = f"{ARTIFACTS_DIRNAME}/{ARTICLE_METADATA_FILENAME}"
    with open(metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

def save_article_HTML_to_txt(article_html):
    """Write the raw article HTML into the article text file in the artifacts directory."""
    article_path = f"{ARTIFACTS_DIRNAME}/{ARTICLE_TXT_FILENAME}"
    with open(article_path, 'w') as article_file:
        article_file.write(article_html)

# Execute the ingestion stage inside a nested MLflow run (nested = True),
# so it is recorded as a child of the currently active parent run.
with mlflow.start_run(
    run_name = "Data_Ingestion",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    main(ARTICLE_URL)

## **Data Cleaning and Preprocessing**

In [None]:
# Stop-word list for the article language; read as a global by the cleaning function below.
loaded_stopwords = stopwords.words(ARTICLE_LANGUAGE)

def main(article_filename: str):
    """Run the cleaning stage: load the saved HTML, strip tags, clean the text, save it."""
    raw_html = get_article_html(article_filename)
    plain_text = remove_html_tags(raw_html)
    cleaned_text = get_cleaned_preprocessed_article_text(plain_text)
    save_preprocessed_article_text(cleaned_text)

def remove_html_tags(article_html: str):
    """Return the text content of an HTML document with all tags removed.

    Args:
        article_html: HTML source of the article page.

    Returns:
        The text of the <html> element. When the input has no <html> element
        (a fragment or plain text), the text of the whole parsed input is
        returned instead of failing on a missing element.
    """
    html_parser = BeautifulSoup(article_html, 'html.parser')
    html_root = html_parser.find('html')
    if html_root is None:
        # find() returns None for input without an <html> tag.
        return html_parser.text
    return html_root.text

def get_cleaned_preprocessed_article_text(article_text: str, stop_words = None):
    """Lower-case the text, split it into words and drop stop words.

    The text is split on every non-word character and every digit, so
    punctuation, whitespace and numbers act as separators and are discarded.

    Args:
        article_text: plain article text.
        stop_words: iterable of words to remove. Defaults to the module-level
            ``loaded_stopwords`` list, which is what existing callers rely on.

    Returns:
        The remaining words joined by single spaces ('' when nothing remains).
    """
    if stop_words is None:
        stop_words = loaded_stopwords
    # A set gives constant-time membership tests; the list lookup was
    # repeated for every word of the article.
    excluded_words = set(stop_words)
    not_word_chars_digit_regex = r'[\W0-9]'
    article_text_elements = re.split(not_word_chars_digit_regex, article_text.lower())
    return ' '.join(
        word for word in article_text_elements
        if word != '' and word not in excluded_words
    )

def get_article_html(article_filename: str):
    """Read and return the contents of the given file from the artifacts directory."""
    article_path = f"{ARTIFACTS_DIRNAME}/{article_filename}"
    with open(article_path, 'r') as article_file:
        return article_file.read()

def save_preprocessed_article_text(preprocessed_text: str):
    """Write the cleaned article text into its file in the artifacts directory."""
    output_path = f"{ARTIFACTS_DIRNAME}/{PREPROCESSED_ARTICLE_TEXT_FILENAME}"
    with open(output_path, 'w') as output_file:
        output_file.write(preprocessed_text)

# Execute the cleaning stage inside a nested MLflow run; its input is the
# article file written by the ingestion stage.
with mlflow.start_run(
    run_name = "Data_Cleaning_and_Preprocessing",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    main(ARTICLE_TXT_FILENAME)

## **Data Chunking for RAG**

In [None]:
def main(preprocessed_article_file: str, chunk_size_in_tokens: int, metadata_file: str, chunks_JSONL_file: str):
    """Run the chunking stage: split the cleaned text into chunks and save them as JSONL.

    Args:
        preprocessed_article_file: name of the cleaned-text file in the artifacts directory.
        chunk_size_in_tokens: target chunk size handed to get_chunks_texts.
        metadata_file: name of the article metadata JSON file.
        chunks_JSONL_file: name of the JSONL file to write.
    """
    words = get_article_words(get_preprocessed_article_text(preprocessed_article_file))
    texts = get_chunks_texts(words, chunk_size_in_tokens)
    article_metadata = get_metadata(metadata_file)
    save_chunks_into_JSONL(get_chunked_data(texts, article_metadata), chunks_JSONL_file)

def get_preprocessed_article_text(filename: str):
    """Read and return the contents of the given file from the artifacts directory."""
    text_path = f"{ARTIFACTS_DIRNAME}/{filename}"
    with open(text_path, 'r') as text_file:
        return text_file.read()

def get_article_words(article_text: str) -> list:
    """Split the text on single space characters and return the pieces."""
    words = article_text.split(' ')
    return words

def get_chunked_data(chunks_texts: list[str], metadata: dict):
    """Pair every chunk text with a copy of the article metadata plus a chunk ID.

    Chunk IDs are consecutive integers starting at 1. The input ``metadata``
    dict is not modified.

    Returns:
        list of dicts, each holding the chunk text and its metadata dict.
    """
    first_chunk_ID = 1
    return [
        {
            CHUNK_DATA_TEXT_KEY: chunk_text,
            CHUNK_DATA_METADATA_KEY: {**metadata, CHUNK_METADATA_ID_KEY: chunk_ID},
        }
        for chunk_ID, chunk_text in enumerate(chunks_texts, first_chunk_ID)
    ]

def get_chunks_texts(article_words: list, words_symbols_amount_for_chunks: int):
    """Split the word list into chunk texts.

    Repeatedly asks get_chunk_words for the next chunk and continues with the
    words it reports as remaining, until none are left.

    Returns:
        list of chunk texts, each being the chunk's words joined by spaces.
    """
    texts = []
    pending_words = article_words
    while pending_words:
        split_result = get_chunk_words(pending_words, words_symbols_amount_for_chunks)
        texts.append(' '.join(split_result['chunk_words']))
        pending_words = split_result['remained_article_words']
    return texts
        
def get_chunk_words(article_words: list, words_symbols_amount_for_chunks: int):
    """Cut the next chunk off the front of the word list.

    Words are taken from the front until their combined character count
    (spaces not counted) reaches ``words_symbols_amount_for_chunks``. If, at any
    point before the chunk is full, the words not yet taken hold fewer than two
    chunk sizes of characters, everything that is left is returned as the final
    chunk, including the words already taken for it.

    The previous implementation replaced the partially built chunk with the
    untaken remainder in that case, silently dropping the words it had already
    removed from the list. It also popped from the caller's list; this version
    leaves ``article_words`` untouched.

    Args:
        article_words: words still to be chunked.
        words_symbols_amount_for_chunks: target chunk size in characters; must be positive.

    Returns:
        dict with 'chunk_words' (words of this chunk) and
        'remained_article_words' (words left for later chunks, possibly empty).

    Raises:
        ValueError: if the chunk size is not positive (no chunk could ever be
            filled and the caller's loop would never finish).
    """
    chunk_size = words_symbols_amount_for_chunks
    if chunk_size <= 0:
        raise ValueError("words_symbols_amount_for_chunks must be a positive number")
    # Running character counts replace the repeated ''.join(...) over both lists.
    untaken_chars = sum(len(word) for word in article_words)
    chunk_chars = 0
    split_index = 0
    while chunk_chars < chunk_size:
        if untaken_chars - chunk_size < chunk_size:
            # The tail is too short to leave a full chunk behind: emit all of it,
            # together with the words already collected for this chunk.
            return {'chunk_words': list(article_words), 'remained_article_words': []}
        word = article_words[split_index]
        chunk_chars += len(word)
        untaken_chars -= len(word)
        split_index += 1
    return {
        'chunk_words': article_words[:split_index],
        'remained_article_words': article_words[split_index:]
    }

def get_metadata(metadata_file: str):
    """Load and return the JSON content of the given file from the artifacts directory."""
    metadata_path = f"{ARTIFACTS_DIRNAME}/{metadata_file}"
    with open(metadata_path, 'r') as metadata_handle:
        loaded_metadata = json.load(metadata_handle)
    return loaded_metadata

def save_chunks_into_JSONL(chunks: list, JSONL_file: str):
    """Write the chunks to a JSONL file in the artifacts directory.

    Each chunk is serialized as one JSON object per line; lines are separated
    by '\\n' with no trailing newline. Serialization happens before the file is
    opened.
    """
    jsonl_lines = [json.dumps(chunk) for chunk in chunks]
    jsonl_payload = '\n'.join(jsonl_lines)
    jsonl_path = f"{ARTIFACTS_DIRNAME}/{JSONL_file}"
    with open(jsonl_path, 'w') as jsonl_handle:
        jsonl_handle.write(jsonl_payload)

# Execute the chunking stage inside a nested MLflow run; it reads the cleaned
# text and the article metadata and writes the chunks JSONL file.
with mlflow.start_run(
    run_name = "Data_Chunking_for_RAG",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    main(PREPROCESSED_ARTICLE_TEXT_FILENAME,
    	CHUNK_SIZE_IN_TOKENS,
    	ARTICLE_METADATA_FILENAME,
    	CHUNKS_JSONL_FILENAME
    )

## **Embeddings Generation and Loading into Qdrant**

In [None]:
def main(qdrant_client, chunks_collection_name,
         chunks_file: str, embed_model, embed_vector_size, embed_vectors_filename):
    """Run the embedding stage: embed all chunks and load them into a fresh Qdrant collection.

    An existing collection with the same name is deleted first, so every
    execution starts from an empty collection. The embedding vectors are also
    saved to a pickle file in the artifacts directory.
    """
    chunks = get_chunked_data(chunks_file)

    # Always start from an empty collection.
    collection_already_exists = qdrant_client.collection_exists(chunks_collection_name)
    if collection_already_exists:
        print("Qdrant Collection for Chunks Embeddings already exists!")
        print(f"Deleting {chunks_collection_name} collection...", end = '')
        qdrant_client.delete_collection(chunks_collection_name)
        print("DONE")
    print("Collection doesn't exist. Creating...", end = '')
    create_chunks_Qdrant_collection(qdrant_client, chunks_collection_name, embed_vector_size)
    print('DONE')

    # Embed, persist the vectors, attach them to the chunks and upload.
    embedding_vectors = get_chunks_texts_embed_vectors(embed_model, chunks)
    save_chunks_embedding_vectors(embedding_vectors, embed_vectors_filename)
    add_embed_vectors_to_chunks(chunks, embedding_vectors)
    add_chunks_to_Qdrant_collection(embed_model, chunks, qdrant_client, chunks_collection_name)

def get_chunked_data(chunks_file: str) -> list[dict]:
    """Load the chunks from a JSONL file in the artifacts directory.

    Every line of the file is parsed as one JSON document.
    """
    chunks_path = f"{ARTIFACTS_DIRNAME}/{chunks_file}"
    with open(chunks_path, 'r') as chunks_handle:
        return [json.loads(jsonl_line) for jsonl_line in chunks_handle]

def create_chunks_Qdrant_collection(client, chunks_collection_name: str, embed_vector_size: int):
    """Create a Qdrant collection for vectors of the given size, compared by dot product."""
    vector_settings = VectorParams(size = embed_vector_size, distance = Distance.DOT)
    client.create_collection(
        collection_name = chunks_collection_name,
        vectors_config = vector_settings
    )

def add_chunks_to_Qdrant_collection(embed_model, chunks, client, collection_name: str):
    """Upload all chunks to the Qdrant collection as points.

    Point IDs continue after the number of points currently in the collection
    (count + 1, count + 2, ...). All points are sent in one upsert request
    instead of one blocking request per chunk.

    Args:
        embed_model: forwarded to get_transformed_chunk_as_PointStruct.
        chunks: chunk dicts that already carry their embedding vector.
        client: Qdrant client.
        collection_name: target collection.
    """
    if not chunks:
        # Nothing to upload; skip the server round trips.
        return
    first_point_ID = client.count(collection_name).count + 1
    points = [
        get_transformed_chunk_as_PointStruct(embed_model, chunk, point_ID)
        for point_ID, chunk in enumerate(chunks, first_point_ID)
    ]
    client.upsert(
        collection_name = collection_name,
        points = points,
        wait = True
    )

def get_transformed_chunk_as_PointStruct(embed_model, chunk: dict, point_id: int):
    """Convert a chunk dict into a Qdrant PointStruct.

    The chunk's embedding becomes the point vector; the payload carries the
    chunk ID, the chunk text and the chunk metadata. ``embed_model`` is not used.
    """
    chunk_metadata = chunk[CHUNK_DATA_METADATA_KEY]
    point_payload = {
        CHUNK_METADATA_ID_KEY: chunk_metadata[CHUNK_METADATA_ID_KEY],
        CHUNK_DATA_TEXT_KEY: chunk[CHUNK_DATA_TEXT_KEY],
        CHUNK_DATA_METADATA_KEY: chunk_metadata
    }
    return PointStruct(
        id = point_id,
        vector = chunk[CHUNK_DATA_EMBEDDING_VECTOR_KEY],
        payload = point_payload
    )

def get_chunks_texts_embed_vectors(embed_model, chunks):
    """Embed the text of every chunk with ``embed_model.embed_documents`` and return the result."""
    texts_to_embed = []
    for chunk in chunks:
        texts_to_embed.append(chunk[CHUNK_DATA_TEXT_KEY])
    return embed_model.embed_documents(texts_to_embed)

def add_embed_vectors_to_chunks(chunks, embed_vectors: list[list]):
    """Store each embedding vector in its chunk dict, pairing chunks and vectors by position.

    The chunk dicts are modified in place; pairing stops at the shorter of the
    two sequences.
    """
    for chunk, chunk_embed_vector in zip(chunks, embed_vectors):
        chunk.update({CHUNK_DATA_EMBEDDING_VECTOR_KEY: chunk_embed_vector})
        
def save_chunks_embedding_vectors(embed_vectors: list[np.array], filename: str):
    """Pickle the embedding vectors into the given file in the artifacts directory."""
    vectors_path = f"{ARTIFACTS_DIRNAME}/{filename}"
    with open(vectors_path, 'wb') as vectors_file:
        pickle.dump(embed_vectors, vectors_file)
    print(f'Embedding Vectors Saved into {filename}!')

# Execute the embedding stage inside a nested MLflow run; it recreates the
# Qdrant collection and uploads the embedded chunks.
with mlflow.start_run(
    run_name = "Embeddings_Generation_and_Loading_into_Qdrant",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    main(QDRANT_CLIENT, 
         CHUNKS_COLLECTION_NAME,
         CHUNKS_JSONL_FILENAME,
         EMBEDDINGS_MODEL, 
         EMBEDDING_VECTOR_SIZE, 
         EMBEDDING_VECTORS_FILENAME
    )

## **Chunks Searching for User Query (Retrieve stage)**

In [None]:
def main(qdrant_client, chunks_collection_name, embed_model, user_query: str, top_k: int = 2):
    """Retrieve the chunks most similar to the user query.

    Args:
        qdrant_client: forwarded to get_qdrant_vector_store.
        chunks_collection_name: Qdrant collection to search.
        embed_model: embeddings model used to embed the query.
        user_query: question text.
        top_k: number of chunks to return. Defaults to 2, the value that was
            previously hard-coded.

    Returns:
        The documents returned by the vector store's similarity search.
    """
    vector_store = get_qdrant_vector_store(qdrant_client, chunks_collection_name, embed_model)
    embedding_user_query = embed_model.embed_query(user_query)
    user_answer_chunks = vector_store.similarity_search_by_vector(embedding_user_query, k = top_k)
    return user_answer_chunks

def get_qdrant_vector_store(client, collection_name, embeddings_model):
    """Open the existing Qdrant collection as a LangChain vector store.

    The connection is made with the module-level QDRANT_HOST / QDRANT_PORT;
    the ``client`` argument is not used.
    """
    store_options = {
        'host': QDRANT_HOST,
        'port': QDRANT_PORT,
        'collection_name': collection_name,
        'embedding': embeddings_model,
        'distance': Distance.DOT,
        'content_payload_key': CHUNK_DATA_TEXT_KEY,
        'metadata_payload_key': CHUNK_DATA_METADATA_KEY,
    }
    return QdrantVectorStore.from_existing_collection(**store_options)

# Execute the retrieve stage inside a nested MLflow run. The retrieved chunks
# are kept in the global chunks_for_LLM for the prompt-generation cell.
with mlflow.start_run(
    run_name = "Chunks_Searching_for_User_Query(Retrieve stage)",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    chunks_for_LLM = main(
        QDRANT_CLIENT,
        CHUNKS_COLLECTION_NAME,
        EMBEDDINGS_MODEL,
        USER_QUERY
    )

#### **Prompt Generation for LLM**

In [None]:
def main(retrieved_chunks: list, user_query: str, llm_task_template: str):
    """Build the chat prompt: a system message with task and context, then the user question."""
    system_role_message = get_system_role_message_for_prompt(
        llm_task_template,
        get_context_text(retrieved_chunks)
    )
    user_role_message = get_user_role_message_for_prompt(user_query)
    return get_prompt(system_role_message, user_role_message)

def get_prompt(system_role_message, user_role_message) -> list[dict]:
    """Return the two messages as a list: system message first, user message second."""
    return [system_role_message, user_role_message]

def get_system_role_message_for_prompt(llm_task_template, context):
    """Return the system message: the task template immediately followed by the context."""
    system_content = f"{llm_task_template}{context}"
    return dict(role = "system", content = system_content)

def get_user_role_message_for_prompt(question):
    """Return the user message carrying the question unchanged."""
    return dict(role = "user", content = question)

def get_context_text(chunks: list):
    """Join the ``page_content`` of every chunk with newlines, in the given order."""
    contents = []
    for chunk in chunks:
        contents.append(chunk.page_content)
    return '\n'.join(contents)

# Execute prompt generation inside a nested MLflow run. The prompt is kept in
# the global `prompt` for the LLM cell.
with mlflow.start_run(
    run_name = "Prompt Generation for LLM",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    prompt = main(chunks_for_LLM, USER_QUERY, LLM_TASK_TEMPLATE)

# Upload the whole local artifacts directory to the active (parent) run.
# NOTE(review): artifact_path carries a URI-like prefix ("mlflow-artifacts:/<version>");
# the versions-journal code builds artifact URIs with the same prefix, so the
# two must be changed together - confirm this layout is intended.
mlflow.log_artifacts(ARTIFACTS_DIRNAME, artifact_path = f"mlflow-artifacts:/{parent_run_version}")

#### **LLM Reply Generation**

In [None]:
# TODO: add trace for input-output tracking
def main(llm_host, llm_port, model_name, prompt):
    """Send the prompt to the LLM server at host:port and return the model's answer."""
    llm_client = get_local_OpenAI_client(llm_host, llm_port)
    return get_llm_answer(llm_client, model_name, prompt)

def get_local_OpenAI_client(host, port):
    """Create an OpenAI client that targets ``http://<host>:<port>/v1`` with a placeholder API key."""
    server_base_url = f"http://{host}:{port}/v1"
    return openai.OpenAI(base_url = server_base_url, api_key = "sk-no-key-required")
    
def get_llm_answer(client, model_name, prompt: list[dict], max_completion_tokens: int = 30):
    """Request a chat completion and return the message of the first choice.

    Args:
        client: object exposing ``chat.completions.create(...)``.
        model_name: model identifier sent with the request.
        prompt: chat messages (dicts with 'role' and 'content').
        max_completion_tokens: upper bound on generated tokens. Defaults to 30,
            the value that was previously hard-coded; raise it if answers come
            back cut off.

    Returns:
        ``completion.choices[0].message`` as returned by the client (the message
        object, not just its text).
    """
    completion = client.chat.completions.create(
        model = model_name,
        messages = prompt,
        max_completion_tokens = max_completion_tokens
    )
    return completion.choices[0].message

# Execute answer generation inside a nested MLflow run and print the returned message.
with mlflow.start_run(
    run_name = "LLM Reply Generation",
    experiment_id = experiment_ID,
    nested = True
) as child_run:
    answer = main(
        LLAMACPP_SERVER_HOST, 
        LLAMACPP_SERVER_PORT, 
        LLM_MODEL_NAME, 
        prompt
    )
    print(answer)
    
# Close the parent run that was started with mlflow.start_run() outside a `with` block.
mlflow.end_run()

#### **RAG Files Versions Journal Creation from MLFlow Server**

In [None]:
# Re-read the parent runs so that the run finished above is included.
# The experiment id is passed as a list, which is the documented argument type.
experiment_existed_parent_runs = mlflow.search_runs(
    [experiment_ID],
    filter_string = """tags.run_type = 'parent'""",
    output_format = 'list'
)

def save_RAG_versions_journal(journal):
    """Write the versions journal as JSON into the artifacts directory."""
    journal_path = f"{ARTIFACTS_DIRNAME}/{RAG_VERSIONS_JOURNAL_NAME}"
    with open(journal_path, 'w') as journal_file:
        json.dump(journal, journal_file)

def get_RAG_versions_journal(parent_runs: list, experiment_name) -> list[dict]:
    """Build a journal listing the artifacts stored for every parent run.

    Args:
        parent_runs: run objects exposing ``data.tags`` and ``info.artifact_uri``.
        experiment_name: not used; kept so existing calls keep working.

    Returns:
        One dict per run, in input order:
        ``{'rag_files_version': <parent_version tag>,
           'artifacts': [{'artifact_filename': ..., 'artifact_URI': ...}, ...]}``.

    Raises:
        KeyError: if a run has no 'parent_version' tag.
    """
    # The experiment lookup and the article-texts list that used to be created
    # here were never used; the lookup cost a tracking-server call and raised
    # when the experiment did not exist.
    rag_files_versions_journal = []
    for parent_run in parent_runs:
        parent_run_version = parent_run.data.tags['parent_version']
        # Same "mlflow-artifacts:/<version>" layout that log_artifacts is given
        # when the run's artifacts are uploaded.
        parent_run_artifacts_files_URI = f"{parent_run.info.artifact_uri}/mlflow-artifacts:/{parent_run_version}/"
        artifacts_info = [
            {
                'artifact_filename': run_artifact_file.path,
                'artifact_URI': parent_run_artifacts_files_URI + run_artifact_file.path
            }
            for run_artifact_file in mlflow.artifacts.list_artifacts(parent_run_artifacts_files_URI)
        ]
        rag_files_versions_journal.append(
            {'rag_files_version': parent_run_version, 'artifacts': artifacts_info}
        )
    return rag_files_versions_journal

# Build the journal from the parent runs found above and write it into the artifacts directory.
RAG_versions_journal = get_RAG_versions_journal(experiment_existed_parent_runs, MLFLOW_EXPERIMENT_NAME)
save_RAG_versions_journal(RAG_versions_journal)