In [1]:
import json  # Import the JSON module for working with JSON data
import numpy as np  # Import NumPy, a library for working with arrays and matrices
import uuid  # Import the UUID module for generating UUIDs (Universally Unique Identifiers)
import pinecone  # Import the Pinecone library for working with vector similarity search
from typing import List, Tuple  # Import List and Tuple types for type hinting
import src.config as config  # Import a module called "config" from a subdirectory called "src"
import logging  # Import the logging module for logging messages
import time  # Import the time module for measuring time
import openai  # Import the OpenAI library for natural language processing
from lingua import Language, LanguageDetectorBuilder  # Import the Lingua library for language detection and identification

import os  # Import the os library for setting environment variables
from langchain.llms import OpenAI  # Import the OpenAI module from the langchain package for interacting with the OpenAI GPT-3 model
from langchain.prompts import PromptTemplate  # Import the PromptTemplate module from the langchain package for generating prompts for the OpenAI GPT-3 model

from tqdm import tqdm

os.environ["OPENAI_API_KEY"] = config.OPENAI_KEY # Set the OPENAI_API_KEY environment variable to the OpenAI API key stored in the config module

  from tqdm.autonotebook import tqdm


In [2]:
class DataLoader:
    def __init__(self):
        """
        A class for loading and processing data from a JSON file.

        Args:
        - data_file (str): The path to the JSON file to load.
        """
        self.quran_data = None  # Initialize Quran data to None
        self.hadith_data = []  # Initialize Hadith data to None
        self.embeddings = []  # Initialize an empty list for embeddings
        self.metadata = []  # Initialize an empty list for metadata

    def load_quran_data(self, data_file):
        """
        Load Quran data from a JSON file and store it in the `quran_data` attribute.
        """
        self.quran_data_file = data_file
        with open(self.quran_data_file, "r") as f:
            self.quran_data = json.load(f)
    
    def load_hadith_data(self, data_files_list):
        """
        Load Quran data from a JSON file and store it in the `quran_data` attribute.
        """
        self.hadith_data_files = data_files_list
        for book in self.hadith_data_files :
            with open(book, "r") as f:
                self.hadith_data.extend(json.load(f))

    def process_data(self):
        """
        Process the Quran data by extracting embeddings and metadata for each verse.
        """
        for verse in tqdm(self.quran_data):
            self.embeddings.append(verse.get("embedding"))  # Add the verse's embedding to the embeddings list
            self.metadata.append(self.encode_metadata_in_id({
                "juz": verse.get("juz"),
                "juz_name_arabic": verse.get("juz_name_arabic"),
                "juz_name_english": verse.get("juz_name_english"),
                "surah_number": verse.get("surah_number"),
                "surah_name_arabic": verse.get("surah_name_arabic"),
                "surah_name_english": verse.get("surah_name_english"),
                "revelation_location": verse.get("revelation_location"),
                "aya_number": verse.get("aya_number"),
                "arabic_diacritics": verse.get("arabic_diacritics"),
                "arabic_clean": verse.get("arabic_clean"),
                "arabic_words_count": verse.get("arabic_words_count"),
                "arabic_letters_count": verse.get("arabic_letters_count"),
            }))  # Add the verse's metadata, encoded in JSON format, to the metadata list
        
        for hadith in tqdm(self.hadith_data):
            if isinstance(hadith["Arabic_Matn"], str) and isinstance(hadith["embeddings"], list) :
                self.embeddings.append(hadith.get("embeddings"))  # Add the verse's embedding to the embeddings list
                self.metadata.append(self.encode_metadata_in_id({
                    "chapter_number": hadith.get("Chapter_Number"),
                    "chapter_english": hadith.get("Chapter_English"),
                    "chapter_arabic": hadith.get("Chapter_Arabic"),
                    "section_number": hadith.get("Section_Number"),
                    "section_english": hadith.get("Section_English"),
                    "section_arabic": hadith.get("Section_Arabic"),
                    "hadith_number": hadith.get("Hadith_number"),
                    "english_hadith": hadith.get("English_Hadith"),
                    "english_isnad": hadith.get("English_Isnad"),
                    "english_matn": hadith.get("English_Matn"),
                    "arabic_isnad": hadith.get("Arabic_Isnad"),
                    "arabic_matn": hadith.get("Arabic_Matn"),
                    "arabic_comment": hadith.get("Arabic_Comment"),
                    "english_grade": hadith.get("English_Grade"),
                    "arabic_grade": hadith.get("Arabic_Grade"),

                }))  # Add the verse's metada

    @staticmethod
    def encode_metadata_in_id(metadata: dict) -> str:
        """
        Encode metadata in JSON format and return it as a string.

        Args:
        - metadata (dict): A dictionary containing metadata for a verse.

        Returns:
        - str: The encoded metadata in JSON format.
        """
        return json.dumps(metadata, ensure_ascii=False, indent=4)

    @staticmethod
    def decode_metadata_from_id(item_id: str) -> dict:
        """
        Decode metadata from a JSON string and return it as a dictionary.

        Args:
        - item_id (str): A string containing metadata in JSON format.

        Returns:
        - dict: The decoded metadata as a dictionary.
        """
        return json.loads(item_id)

In [3]:
class PineconeManager:
    def __init__(self, api_key: str, index_name: str, dimension: int, create_index: bool = False):
        """
        A class for managing a Pinecone index.

        Args:
        - api_key (str): The Pinecone API key to use.
        - index_name (str): The name of the Pinecone index to manage.
        - dimension (int): The dimensionality of the vectors to be indexed.
        - create_index (bool): Whether to create a new index with the specified name and dimensionality. Defaults to False.
        """
        self.api_key = api_key
        self.index_name = index_name
        self.dimension = dimension
        pinecone.init(api_key=self.api_key)  # Initialize the Pinecone API
        if create_index:
            pinecone.create_index(name=self.index_name, metric="cosine", dimension=self.dimension)  # Create a new index with the specified name and dimensionality
        self.index = pinecone.Index(index_name=self.index_name)  # Initialize the index object
        self.metadata_storage = {}  # Initialize an empty dictionary for storing metadata

    def index_data(self, metadata, embeddings, file_name: str):
            """
            Index data by adding metadata and embeddings to the Pinecone index and save metadata continuously to a JSON file.

            Args:
            - metadata (list): A list of metadata dictionaries, where each dictionary corresponds to a single vector.
            - embeddings (list): A list of embeddings, where each embedding corresponds to a single vector.
            - file_name (str): The name of the file to save the metadata to.
            """
            for meta, embedding in tqdm(zip(metadata, embeddings)):
                if embedding is not None and len(embedding) == self.dimension:
                    short_id = self.generate_short_id()  # Generate a short ID for the metadata
                    self.metadata_storage[short_id] = meta  # Add the metadata to the metadata storage dictionary
                    self.index.upsert([(short_id, embedding)])  # Add the metadata and embedding to the Pinecone index
                    self.save_metadata_storage(file_name, short_id)  # Save the metadata continuously
                else:
                    print(f"[Error] Empty embeddings is found for : {meta}")

    def query_index(self, vector, top_k=20):
        """
        Query the Pinecone index with a vector and retrieve the top K most similar vectors.

        Args:
        - vector (list): A list representing a query vector.
        - top_k (int): The number of results to return. Defaults to 20.

        Returns:
        - list: A list of tuples, where each tuple contains a short ID and a similarity score.
        """
        search_results = self.index.query(vector=vector, top_k=top_k)  # Query the Pinecone index with the vector
        return search_results

    def save_metadata_storage(self, file_name: str, item):
        """
        Save the metadata storage dictionary to a file in JSON format, appending a new record without changing the current values.

        Args:
        - file_name (str): The name of the file to save the metadata to.
        - item: The item containing the metadata to be saved.
        """
        new_data = {item: self.metadata_storage[item]}

        if os.path.exists(file_name):
            with open(file_name, 'r+', encoding='utf-8') as f:
                try:
                    existing_data = json.load(f)
                except json.JSONDecodeError:
                    existing_data = {}

                existing_data.update(new_data)

                f.seek(0)
                f.truncate()
                json.dump(existing_data, f, ensure_ascii=False, indent=4)
        else:
            with open(file_name, 'w', encoding='utf-8') as f:
                json.dump(new_data, f, ensure_ascii=False, indent=4)
                

    def load_metadata_storage(self, file_name: str) -> dict:
        """
        Load the metadata storage dictionary from a file in JSON format.

        Args:
        - file_name (str): The name of the file to load the metadata from.

        Returns:
        - dict: The metadata storage dictionary.
        """
        with open(file_name, 'r', encoding='utf-8') as f:
            metadata_storage = json.load(f)
        self.metadata_storage = metadata_storage
    
    @staticmethod
    def generate_short_id():
        """
        Generate a short, random ID.

        Returns:
        - str: The generated ID.
        """
        return str(uuid.uuid4())[:16]  # Generate a random UUID and return the first 16 characters as the short ID.


In [4]:
import openai

class OpenAIManager:
    def __init__(self):
        """
        Initializes the OpenAIManager class with pre-defined languages, language detector,
        OpenAI model, and a prompt template for translation.
        """
        
        self.languages = [Language.ENGLISH, Language.ARABIC]
        self.lang_detector = LanguageDetectorBuilder.from_languages(*self.languages).build()
        self.openai_llm = OpenAI(temperature=0)
        self.prompt = PromptTemplate(
            input_variables=["query"],
            template="Translate the following arabic text into english : {query}"
        )
        
    def gpt3_embedding(self, content: str, engine: str = 'text-embedding-ada-002') -> list:
        """
        Generates an embedding for the input content using OpenAI GPT-3.

        Args:
            content (str): The input content for which the embedding is required.
            engine (str, optional): The engine to be used for generating the embedding. Defaults to 'text-embedding-ada-002'.

        Returns:
            list: The generated embedding as a list of floats.
        """
        try:
            response = openai.Embedding.create(input=content, engine=engine)
            vector = response['data'][0]['embedding']
            return vector
        except Exception as e:
            logging.error(f'Embedding failed. Error message: {e}')

    def extract_embedding(self, text: str) -> list:
        """
        Extracts the embedding for the given text using GPT-3.

        Args:
            text (str): The input text for which the embedding is required.

        Returns:
            list: The extracted embedding as a list of floats.
        """
        try:
            embedding = self.gpt3_embedding(text)
        except:
            while True:
                try:
                    if len(text) > 8191:
                        logging.warning('[OPENAI ERROR] Trying to get shorter input < 8191 for text...')
                        embedding = self.gpt3_embedding(text[:8191])
                    else:
                        embedding = self.gpt3_embedding(text)
                    break
                except Exception as e:
                    logging.error(f'Trying to get the embedding for text. Error message: {e}')
                    time.sleep(5)
        return embedding
    
    def translate(self, text: str) -> str:
        """
        Translates the input Arabic text to English using the OpenAI model.

        Args:
            text (str): The input Arabic text to be translated.

        Returns:
            str: The translated English text.
        """
        
        translated_text = self.openai_llm(self.prompt.format(query=text))
        return translated_text


In [8]:
# Load data from the specified file and process it
data_loader = DataLoader()
data_loader.load_quran_data("data/embedding/quran_GPT_embeddings_v2.json")
data_loader.load_hadith_data(["/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/bukhari_GPT_embeddings_v2.json",
                             "/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/abodawud_GPT_embeddings_v2.json",
                              "/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/alnasi_GPT_embeddings_v2.json",
                              "/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/ibnmaja_GPT_embeddings_v2.json",
                              "/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/muslim_GPT_embeddings_v2.json",
                              "/Users/hazemabdelkawy/Codes/islamic/SunnahGPT/data/embeddings/termzi_GPT_embeddings_v2.json",
                             ])
data_loader.process_data()

100%|██████████| 6236/6236 [00:00<00:00, 86905.76it/s]
100%|██████████| 34088/34088 [00:00<00:00, 56293.46it/s]


In [9]:
print(f"Length of metadata: {len(data_loader.metadata)}")
print(f"Length of embeddings: {len(data_loader.embeddings)}")
print(f"Is metadata and embeddings the same length? {len(data_loader.metadata) == len(data_loader.embeddings)}")
print(f"Is meta has none? {None in data_loader.metadata}")
print(f"Is embedding has none? {None in data_loader.embeddings}")
print(f"Is embeding has empty list? {[] in data_loader.embeddings}")
print(f"Meta[0]: {data_loader.metadata[0]}, Embedding[0]: {data_loader.embeddings[0]}")
print(f"Meta[6235]: {data_loader.metadata[6235]}, Embedding[6235]: {data_loader.embeddings[6235]}")
print(f"Meta[10000]: {data_loader.metadata[10000]}, Embedding[6236]: {data_loader.embeddings[10000]}")
print(f"Meta[30000]: {data_loader.metadata[30000]}, Embedding[6236]: {data_loader.embeddings[30000]}")

Length of metadata: 39317
Length of embeddings: 39317
Is metadata and embeddings the same length? True
Is meta has none? False
Is embedding has none? False
Is embeding has empty list? False
Meta[0]: {
    "juz": 1,
    "juz_name_arabic": "الم",
    "juz_name_english": "Alif Lam Meem",
    "surah_number": 1,
    "surah_name_arabic": "الفاتحة",
    "surah_name_english": "Al-Fatiha",
    "revelation_location": "Makki",
    "aya_number": 1,
    "arabic_diacritics": "بِسْمِ اللَّهِ الرَّحْمَٰنِ الرَّحِيمِ",
    "arabic_clean": "بسم الله الرحمن الرحيم",
    "arabic_words_count": 4,
    "arabic_letters_count": 19
}, Embedding[0]: [-0.02030331641435623, -0.015722380951046944, -0.02074745111167431, -0.022295579314231873, -0.013565152883529663, 0.024770045652985573, -0.006598577834665775, -0.005998995620757341, -0.003489632625132799, -0.020189110189676285, 0.03377964347600937, 0.009929590858519077, 0.0013680945849046111, -0.015519347973167896, -0.012822813354432583, -0.0012023369781672955, 0.033

In [10]:
# Initialize Pinecone manager with API key, index name, and dimension
pinecone_api_key = config.PINECONE_KEY
index_name = "gpt"
dimension = 1536

In [11]:
# Create an instance of PineconeManager and set create_index to True if you want to create a new index
pinecone_manager = PineconeManager(pinecone_api_key, index_name, dimension, create_index=True)

In [12]:
# Index the data and save metadata to storage (uncomment the following lines if needed)
pinecone_manager.index_data(data_loader.metadata, data_loader.embeddings, file_name="data/metadata_storage.json")

864it [03:00,  4.71it/s]

In [None]:
# Load metadata storage from the specified file
pinecone_manager.load_metadata_storage("metadata_storage.json")

In [None]:
# Initialize OpenAI manager
openai_manager = OpenAIManager()

In [None]:
# Get user input and detect its language; if it's Arabic, translate it to English
input_text  = input("Enter your query: ")
if openai_manager.lang_detector.detect_language_of(input_text) == Language.ARABIC:
    input_text = openai_manager.translate(input_text)

# Extract the embedding for the input text
input_query_embedding = openai_manager.extract_embedding(input_text)

# Query the Pinecone index using the extracted embedding and retrieve the top 5 results
search_results = pinecone_manager.query_index(input_query_embedding, top_k=5)
for item in search_results['matches']:
    item_id = item['id']
    item_metadata = pinecone_manager.metadata_storage[item_id]
    similarity = item['score']
    print(f"ID: {item_id}, Similarity: {similarity}, Metadata: {item_metadata}")