<a href="https://colab.research.google.com/github/Marin-kh/Persian_RAG/blob/main/Persian_RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install hazm
!pip install python-docx
!pip install rake_nltk
!pip install docx
!pip install stanza
!pip install gradio

Collecting rake_nltk
  Using cached rake_nltk-1.0.6-py3-none-any.whl.metadata (6.4 kB)
Using cached rake_nltk-1.0.6-py3-none-any.whl (9.1 kB)
Installing collected packages: rake_nltk
Successfully installed rake_nltk-1.0.6
Collecting docx
  Downloading docx-0.2.4.tar.gz (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.9/54.9 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: docx
  Building wheel for docx (setup.py) ... [?25l[?25hdone
  Created wheel for docx: filename=docx-0.2.4-py3-none-any.whl size=53893 sha256=19fd58aef44869f42a89043111528fb9913956c020c4c47c579c91e22b9bd783
  Stored in directory: /root/.cache/pip/wheels/c1/3e/c3/e81c11effd0be5658a035947c66792dd993bcff317eae0e1ed
Successfully built docx
Installing collected packages: docx
Successfully installed docx-0.2.4


Collecting stanza
  Downloading stanza-1.10.1-py3-none-any.whl.metadata (13 kB)
Collecting emoji (from stanza)
  Downloading emoji-2.14.1-py3-none-any.whl.metadata (5.7 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.3.0->stanza)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.3.0->stanza)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.3.0->stanza)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.3.0->stanza)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.3.0->stanza)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata 

In [3]:
import numpy as np
import pandas as pd
from hazm import stopwords_list, Normalizer, WordTokenizer, SentenceTokenizer, Stemmer, Lemmatizer, sent_tokenize, word_tokenize
import docx
import re
from sklearn.feature_extraction.text import TfidfVectorizer
import requests
from sklearn.metrics.pairwise import cosine_similarity
import openai
import nltk
from rake_nltk import Rake
from google.colab import drive
import stanza
from collections import defaultdict
from openai import OpenAI

In [None]:
stanza.download('fa')
nlp = stanza.Pipeline('fa')

In [None]:
class PersianRAKE(Rake):
    def _tokenize_text_to_sentences(self, text: str):
        return sent_tokenize(text)

    def _tokenize_sentence_to_words(self, sentence: str):
        return word_tokenize(sentence)

In [None]:
def read_from_docx(doc):
    fullText=''
    for pra in doc.paragraphs:
        fullText+=pra.text+' '

    return fullText

def split_into_overlapping_chunks(sentences, max_chunk_size=1000, overlap_size=200):
    chunks = []
    current_chunk = ""
    current_chunk_size = 0

    for sentence in sentences:
        sentence_length = len(sentence)

        if current_chunk_size + sentence_length > max_chunk_size and current_chunk:
            chunks.append(current_chunk.strip())

            overlap_buffer = current_chunk[-overlap_size:].strip() if current_chunk else ""
            current_chunk = overlap_buffer + " "
            current_chunk_size = len(overlap_buffer) + 1

        current_chunk += sentence + " "
        current_chunk_size += sentence_length + 1

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks

def preprocess_text_1(text):
    # text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'( +)', ' ', str(text))
    return text


def preprocess_text_2(text):
    text = re.sub('(\(.*?\))|(\[.*?\])', '', str(text))
    text = re.sub(r'( +)', ' ', str(text))

    word_tokenizer = WordTokenizer()
    words = word_tokenizer.tokenize(text)

    stopwords = stopwords_list()
    filtered_words = [word for word in words if word not in stopwords]

    lemmatizer = Lemmatizer()
    lemmatized_words = [lemmatizer.lemmatize(word) for word in filtered_words]

    return ' '.join(lemmatized_words)


def check_spelling(main_text):
    endpoint = "https://api.languagetool.org/v2/check"

    data = {
        "text": main_text,
        "language": "en-US",
    }

    response = requests.post(endpoint, data=data)
    json_response = response.json()

    updated_text = main_text

    for match in json_response.get("matches", []):
        replacement = match["replacements"][0]["value"] if match["replacements"] else ""

        offset = match["offset"]
        length = match["length"]

        updated_text = updated_text.replace(main_text[offset:offset+length], replacement)

    print("Original Query: ", main_text)
    print("Spell-checked Query: ", updated_text)
    return updated_text

def phrase_search(sentence):
    doc = nlp(sentence)

    phrases = []
    for sent in doc.sentences:
        for word in sent.words:
            if word.upos in ['NOUN', 'ADJ']:
                phrase = word.text
                for other_word in sent.words:
                    if other_word.head == word.id and other_word.upos in ['NOUN', 'ADJ']:
                        phrase += " " + other_word.text
                if " " in phrase:
                    phrases.append(phrase)
    return phrases

def english_to_persian_number(number_str):
    english_to_persian = {
        "0": "۰",
        "1": "۱",
        "2": "۲",
        "3": "۳",
        "4": "۴",
        "5": "۵",
        "6": "۶",
        "7": "۷",
        "8": "۸",
        "9": "۹",
    }
    persian_number = "".join([english_to_persian[digit] for digit in number_str])
    return persian_number

def persian_words_to_number(sentence):
    word_to_number = {
        "صفر": 0,
        "یک": 1,
        "دو": 2,
        "سه": 3,
        "چهار": 4,
        "پنج": 5,
        "شش": 6,
        "هفت": 7,
        "هشت": 8,
        "نه": 9,
        "ده": 10,
        "یازده": 11,
        "دوازده": 12,
        "سیزده": 13,
        "چهارده": 14,
        "پانزده": 15,
        "شانزده": 16,
        "هفده": 17,
        "هجده": 18,
        "نوزده": 19,
        "بیست": 20,
        "سی": 30,
        "چهل": 40,
        "پنجاه": 50,
        "شصت": 60,
        "هفتاد": 70,
        "هشتاد": 80,
        "نود": 90,
        "صد": 100,
        "یکصد": 100,
        "دویست": 200,
        "سیصد": 300,
        "چهارصد": 400,
        "پانصد": 500,
        "ششصد": 600,
        "هفتصد": 700,
        "هشتصد": 800,
        "نهصد": 900,
        "هزار": 1000,
    }
    words = sentence.split(' ')

    result = []
    temp_number_words = []
    current_number = 0

    for word in words:
        if word[-1:]=='م' and (word[:-1] in word_to_number):
            word = word[:-1]
        if word in word_to_number:
            temp_number_words.append(word)
            current_number += word_to_number[word]
        else:
            if temp_number_words:
                english_number_str = str(current_number)
                persian_number_str = english_to_persian_number(english_number_str)
                result.append(persian_number_str)
                temp_number_words = []
                current_number = 0
            result.append(word)

    if temp_number_words:
        english_number_str = str(current_number)
        persian_number_str = english_to_persian_number(english_number_str)
        result.append(persian_number_str)

    return ' '.join(result)

def preprocess_phrases(text, phrases):
    for phrase in phrases:
        text = text.replace(phrase, phrase.replace(" ", "_"))
    return text

def extract_persian_numbers(text):
    persian_digits = "۰۱۲۳۴۵۶۷۸۹"
    return re.findall(f"[{persian_digits}]+", text)

def calculate_cosine_similarity(docs, phrase):
    vectorizer = TfidfVectorizer(ngram_range=(1, 3))
    tfidf_matrix = vectorizer.fit_transform(docs)
    phrase_vector = vectorizer.transform(phrase)
    return cosine_similarity(phrase_vector, tfidf_matrix)

def calculate_tf(document_numbers):
    tf = []
    for doc in document_numbers:
        tf_dict = defaultdict(int)
        for num in doc:
            tf_dict[num] += 1
        tf.append(tf_dict)
    return tf

def calculate_idf(document_numbers, numbers):
    idf = {}
    total_docs = len(document_numbers)
    for num in numbers:
        doc_count = sum(1 for doc in document_numbers if num in doc)
        idf[num] = np.log((total_docs + 1) / (doc_count + 1)) + 1
    return idf

def calculate_tf_idf(document_numbers, numbers):
    tf = calculate_tf(document_numbers)
    idf = calculate_idf(document_numbers, numbers)
    tf_idf = []
    for doc_tf in tf:
        doc_tf_idf = {}
        for num, freq in doc_tf.items():
            if num in idf:
                doc_tf_idf[num] = freq * idf[num]
        tf_idf.append(doc_tf_idf)
    return tf_idf

In [None]:
# Loading The Main Document
drive.mount('/content/drive')
# document = read_from_docx(docx.Document("/content/drive/My Drive/Constitution_of_the_Islamic_Republic.docx"))
document = read_from_docx(docx.Document("/content/drive/My Drive/delta.docx"))

Mounted at /content/drive


In [None]:
df = pd.DataFrame()
df = pd.read_csv('/content/drive/My Drive/api_key.csv')

In [None]:
# Chunking The Document
normalizer = Normalizer()
normalized_text = normalizer.normalize(document)

sentence_tokenizer = SentenceTokenizer()
sentences = sentence_tokenizer.tokenize(normalized_text)

max_chunk_size = 1000
overlap_size = 200
chunks = split_into_overlapping_chunks(sentences, max_chunk_size, overlap_size)
print("<Chunk 1>")
print(f"Original Chunk:\n{chunks[0]}")

# Preprocessing The Chunks
preprocessed1_chunks = [preprocess_text_1(chunk) for chunk in chunks]

preprocessed2_chunks = [preprocess_text_2(chunk) for chunk in preprocessed1_chunks]
print(f"Preprocessed Chunk:\n{preprocessed2_chunks[0]}")

<Chunk 1>
Original Chunk:
املاک دلتا یکی از شرکت‌های برجسته در صنعت املاک و مستغلات است که در زمینه خرید، فروش، اجاره و مدیریت املاک فعالیت می‌کند. این شرکت با بهره‌گیری از تجربه و تخصص خود در این حوزه، به مشتریان خود خدمات متنوعی ارائه می‌دهد. در این مقاله به بررسی تاریخچه، خدمات، مزایا و چالش‌های املاک دلتا خواهیم پرداخت: تاریخچه دلتا املاک دلتا در سال ۱۳۵۶ تأسیس شد و از آن زمان تا به امروز به یکی از نام‌های معتبر در صنعت املاک و مستغلات تبدیل‌شده است. این شرکت با هدف ارائه خدمات با کیفیت و ساختارهای نوآورانه به مشتریان، فعالیت خود را آغاز کرد. با گذشت زمان، املاک دلتا توانسته است با بهره‌گیری از تیمی متخصص و استفاده از فناوری‌های پیشرفته، جایگاه خود را در بازار مستحکم کند. خدمات املاک دلتا به ارائه طیف گسترده‌ای از خدمات در حوزه املاک و مستغلات می‌پردازد که شامل موارد زیر است: خرید و فروش املاک: این شرکت به مشتریان خود کمک می‌کند تا املاک مناسب برای خرید یا فروش را پیدا کنند. خدمات مشاوره‌ای در زمینه ارزیابی قیمت، معرفی املاک مناسب و فرآیندهای قانونی خرید و فروش از جمله خدمات این بخ

In [None]:
def ask_model(content):
    client = OpenAI(
        base_url="https://api.groq.com/openai/v1",
        api_key=df.loc[2, 'api_key'],
    )

    completion = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[{"role": "user", "content": content}],
    )

    return completion.choices[0].message.content

def get_answer(query):
    query = check_spelling(query)

    processed_query = preprocess_text_2(preprocess_text_1(query))
    query_n = persian_words_to_number(processed_query)
    phrases = phrase_search(query_n)
    numbers = re.findall(r'\d+', query_n)
    numberic_chunks = [persian_words_to_number(chunk) for chunk in preprocessed2_chunks]

    # Similarity Of Query
    query_bonus = calculate_cosine_similarity(numberic_chunks, [query_n]).flatten()

    # Similarity Of Numbers
    document_numbers = [extract_persian_numbers(doc) for doc in numberic_chunks]
    tf_idf_numbers = calculate_tf_idf(document_numbers, numbers)

    number_bonus = np.zeros(len(numberic_chunks))
    for i, doc_tf_idf in enumerate(tf_idf_numbers):
        for num in numbers:
            if num in doc_tf_idf:
                number_bonus[i] += doc_tf_idf[num]
    if np.max(number_bonus) > 0:
        number_bonus = number_bonus / np.max(number_bonus)

    # Similarity Of Phrases
    preprocessed_docs = [preprocess_phrases(doc, phrases) for doc in numberic_chunks]
    preprocessed_phrases = preprocess_phrases(query_n, phrases)

    phrases_similarity = calculate_cosine_similarity(preprocessed_docs, [preprocessed_phrases])
    phrases_bonus = np.max(phrases_similarity, axis=0)
    if np.max(phrases_bonus) > 0:
        phrases_bonus = phrases_bonus / np.max(phrases_bonus)

    query_coef = 0.5
    numbers_coef = 0.3
    phrases_coef = 0.2

    hybrid_scores = (query_bonus * query_coef) + (number_bonus * numbers_coef) + (phrases_bonus * phrases_coef)

    top_k = 3
    indices = np.argsort(-hybrid_scores)[:top_k]

    top_3 = [chunks[idx] for idx in indices]

    total_score = 0
    print("Top 3 Hybrid Results:")
    for idx, chunk in zip(indices, top_3):
        print(f"Chunk {idx + 1} (Score: {hybrid_scores[idx]:.2f}):\n{chunk}")
        total_score += hybrid_scores[idx]


    # making answer
    if total_score <= 0.1:
        prop_related = ask_model(f"ببین این جمله داخل <> درباره ملک هست یا نه.<{query}> و فقد با یک کلمه جواب منو بده بله یا خیر.")
        if "بله" in prop_related:   #prop related
            answer = 'متأسفانه، اطلاعاتی در این باره در دسترس نیست و نمی‌توانم به سوال شما پاسخ دهم. برای اطلاعات بیشتر با شماره پشتیبانی سایت دلتا(8686-021) تماس حاصل فرمایید.'
        else:                       #none prop related
            answer = 'متأسفانه، اطلاعاتی در این باره در دسترس نیست و نمی‌توانم به سوال شما پاسخ دهم.'
    else:
        model_content = f"{top_3[0]}\n{top_3[1]}\n{top_3[2]}\nطبق متن های بالا به طور خلاصه(در حد یک پاراگراف) به این سوال جواب بده و اشاره ای به کلمه پاراگراف یا متن نکن و نگو از متن داری استفاده میکنی و اگه جواب تو متن نبود بگو نمیتوانم جواب شما را بدهم:{query}\n"
        answer = ask_model(model_content)
    return [answer, total_score]

In [None]:
import gradio as gr
import pandas as pd


input = gr.Textbox(label="سوال")

outputs = [
    gr.Textbox(label="پاسخ"),
    gr.Textbox(label="score=")
]

interface = gr.Interface(
    fn=get_answer,
    inputs=input,
    outputs=outputs,
    title="Chat Bot",
    allow_flagging="never",
    theme="dark"
)

interface.launch()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.

Sorry, we can't find the page you are looking for.


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://564f3f822e9a9e160b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


