### **RAG Application built on Gemini**

# **Importing Libraries**

In [None]:
!pip install langchain langchain_community langchain-google-genai python-dotenv langchain_experimental sentence-transformers langchain_chroma langchainhub  pypdf rapidocr-onnxruntime SpeechRecognition pydub ffmpeg gtts fpdf chromadb langdetect

Collecting langchain_community
  Downloading langchain_community-0.3.19-py3-none-any.whl.metadata (2.4 kB)
Collecting langchain-google-genai
  Downloading langchain_google_genai-2.1.0-py3-none-any.whl.metadata (3.6 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting langchain_experimental
  Downloading langchain_experimental-0.3.4-py3-none-any.whl.metadata (1.7 kB)
Collecting langchain_chroma
  Downloading langchain_chroma-0.2.2-py3-none-any.whl.metadata (1.3 kB)
Collecting langchainhub
  Downloading langchainhub-0.1.21-py3-none-any.whl.metadata (659 bytes)
Collecting pypdf
  Downloading pypdf-5.4.0-py3-none-any.whl.metadata (7.3 kB)
Collecting rapidocr-onnxruntime
  Downloading rapidocr_onnxruntime-1.4.4-py3-none-any.whl.metadata (1.3 kB)
Collecting SpeechRecognition
  Downloading SpeechRecognition-3.14.1-py3-none-any.whl.metadata (31 kB)
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collectin

In [None]:
import os
# Start from a clean working directory: delete everything found in it except
# the predefined entries listed in `exclude_files`.
exclude_files = ['.config', 'sample_data']
uploaded_files = os.listdir()

for file in uploaded_files:
    try:
        if file in exclude_files:
            # Predefined entries are reported but never removed.
            print(f"Skipped predefined file: {file}")
        elif os.path.exists(file):
            os.remove(file)
            print(f"Deleted file: {file}")
    except Exception as e:
        # Best effort: report the failure for this entry and keep going.
        print(f"Error while deleting file {file}: {e}")

Skipped predefined file: .config
Skipped predefined file: sample_data


# **Dealing with a PDF**


In [None]:
from google.colab import files
from langchain_community.document_loaders import PyPDFLoader
import os
import re

def work_pdf():
    """Upload a PDF through the Colab file picker and load it page by page.

    When the uploaded name carries a duplicate suffix such as " (1).pdf", the
    file is renamed to the suffix-free name, replacing any older file that
    already has that name. Only the first uploaded file is processed.

    Returns:
        The list produced by ``PyPDFLoader(...).load()`` for the uploaded
        file, or an empty list when nothing was uploaded or the file cannot
        be found on disk.
    """
    # Allow the user to upload a PDF
    print("Upload the PDF file")
    uploaded_files = files.upload()

    # An empty upload result would otherwise fail with an IndexError below.
    if not uploaded_files:
        print("Error: No PDF file was uploaded.")
        return []

    # Get the exact name of the first uploaded file
    uploaded_pdf_name = next(iter(uploaded_files))
    print(f"File Name: {uploaded_pdf_name}")

    # Matches a trailing duplicate marker such as " (1).pdf"
    suffix_pattern = r" \(\d+\)\.pdf$"

    if re.search(suffix_pattern, uploaded_pdf_name):
        # Derive the base file name by dropping the " (n)" marker
        base_pdf_name = re.sub(suffix_pattern, ".pdf", uploaded_pdf_name)

        # Validate the new upload BEFORE touching the old copy, so a missing
        # upload can never cost us the file that is already on disk.
        if not os.path.exists(uploaded_pdf_name):
            print(f"Error: Uploaded file '{uploaded_pdf_name}' not found.")
            return []

        if os.path.exists(base_pdf_name):
            print(f"File '{base_pdf_name}' already exists. Replacing it...")

        # os.replace overwrites an existing destination in a single step.
        os.replace(uploaded_pdf_name, base_pdf_name)
        print(f"PDF uploaded and saved as: {base_pdf_name}")
    else:
        # If no suffix exists, keep the uploaded file as-is
        base_pdf_name = uploaded_pdf_name
        print(f"No duplicates found. PDF saved as: {base_pdf_name}")

    # Ensure the final file exists before handing it to the loader
    if not os.path.exists(base_pdf_name):
        print(f"Error: File '{base_pdf_name}' not found.")
        return []

    loader = PyPDFLoader(base_pdf_name)
    data = loader.load()
    print(f"Number of pages loaded: {len(data)}")
    return data

In [None]:
import speech_recognition as sr
from pydub import AudioSegment

def work_audio():
    """Upload an audio file, transcribe it, and wrap the text as a document.

    Uploads whose extension is not ``.wav`` (case-insensitive) are converted
    to WAV with pydub first; the WAV file is then transcribed with the
    recognizer's ``recognize_google`` method. Only the first uploaded file is
    processed.

    Returns:
        A one-element list ``[{"text": <transcript>, "source": <uploaded name>}]``.
        The transcript is an empty string when recognition fails. An empty
        list is returned when nothing was uploaded.
    """
    import os  # local import: keeps the function independent of other cells

    # Allow the user to upload an audio file
    print("Upload the audio file")
    uploaded_audio_files = files.upload()

    # An empty upload result would otherwise fail with an IndexError below.
    if not uploaded_audio_files:
        print("Error: No audio file was uploaded.")
        return []

    # Get the exact name of the first uploaded file
    uploaded_audio_name = next(iter(uploaded_audio_files))
    print(f"Audio File Name: {uploaded_audio_name}")

    # Split off only the real trailing extension. A plain str.replace on the
    # extension text would also rewrite matches elsewhere in the name and
    # would do nothing at all for upper-case extensions.
    name_root, name_ext = os.path.splitext(uploaded_audio_name)

    if name_ext.lower() != '.wav':
        # Convert any non-WAV upload to WAV
        audio = AudioSegment.from_file(uploaded_audio_name)
        wav_file_path = name_root + '.wav'
        audio.export(wav_file_path, format="wav")
        print(f"Converted {uploaded_audio_name} to {wav_file_path}")
    else:
        wav_file_path = uploaded_audio_name

    # Recognize and transcribe the audio
    recognizer = sr.Recognizer()

    # Read the whole WAV file into memory for the recognizer
    with sr.AudioFile(wav_file_path) as source:
        audio_data = recognizer.record(source)

    try:
        audio_text = recognizer.recognize_google(audio_data)
        print("Transcribed Text: ", audio_text)
    except sr.UnknownValueError:
        print("Sorry, could not understand the audio.")
        audio_text = ""
    except sr.RequestError as e:
        print(f"Request error from Google Speech Recognition service: {e}")
        audio_text = ""

    # Create a 'document' similar to what the PDF path produces
    data = [{"text": audio_text, "source": uploaded_audio_name}]
    return data  # Return the document-like structure

In [None]:
# Prompt for the input type: "1" selects a PDF upload, "2" an audio upload.
choice = input("Enter 1 to upload a PDF or 2 to upload an Audio file: ").strip()

# `data` starts empty so it is defined even when the choice is rejected.
data = []
if choice not in ('1', '2'):
    print("Invalid input. Please enter either 1 or 2.")
elif choice == '1':
    # PDF route: upload the file and load its pages.
    data = work_pdf()
else:
    # Audio route: upload the file and transcribe it.
    data = work_audio()
print(data)

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

# Split the loaded content into chunks of at most 1000 characters.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000)

# Default to no chunks so `docs` is always defined, even when the choice was
# invalid or the upload step produced nothing.
docs = []
if not data:
    print("No content to split. Upload a PDF or audio file first.")
elif choice == '1':
    # The PDF loader's output is passed to the splitter unchanged.
    print("PDF Doc Processing...")
    docs = text_splitter.split_documents(data)
elif choice == '2':
    # The audio path yields a plain dict; wrap it in a Document for the splitter.
    print("Audio Doc Processing...")
    audio_document = Document(
        page_content=data[0]['text'],
        metadata={'source': data[0]['source']},
    )
    docs = text_splitter.split_documents([audio_document])

print("Total number of documents: ", len(docs))

# **Model Embedding with Gemini Credentials and .JSON file**


In [None]:
import re
# Allow the user to upload a credentials JSON file dynamically.
# The resulting path is left in `base_file_name` for the next cell.
print("Upload the credentials JSON file")
uploaded_json = files.upload()

# Without a credentials file the rest of the notebook cannot run, so stop
# here with a clear message instead of an IndexError on the empty result.
if not uploaded_json:
    raise ValueError("No credentials JSON file was uploaded.")

# Get the exact name of the first uploaded file
uploaded_file_name = next(iter(uploaded_json))
print(f"File Name: {uploaded_file_name}")

# Check if the uploaded file has a duplicate suffix (e.g., " (1).json")
if re.search(r" \(\d+\)\.json$", uploaded_file_name):
    # Derive the base file name by dropping the " (n)" marker
    base_file_name = re.sub(r" \(\d+\)\.json$", ".json", uploaded_file_name)

    # Validate the new upload BEFORE touching the old copy, so a missing
    # upload can never cost us the credentials file already on disk.
    if os.path.exists(uploaded_file_name):
        if os.path.exists(base_file_name):
            print(f"File '{base_file_name}' already exists. Replacing it...")
        # os.replace overwrites an existing destination in a single step.
        os.replace(uploaded_file_name, base_file_name)
        print(f"Credentials JSON uploaded and saved as: {base_file_name}")
    else:
        print(f"Error: Uploaded file '{uploaded_file_name}' not found.")
else:
    # If no suffix exists, keep the uploaded file as-is
    base_file_name = uploaded_file_name
    print(f"No duplicates found. JSON file saved as: {base_file_name}")

In [None]:
# Export the uploaded key file's path through GOOGLE_APPLICATION_CREDENTIALS
# and echo it for confirmation.
key_path = base_file_name
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
print(key_path)

In [None]:
from langchain.vectorstores import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings
# Clear the vector store and retriever handles; later cells assign them.
vectorstore = retriever = None

# Create the embedding client and embed a sample string as a smoke test,
# printing the first five components of the resulting vector.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector = embeddings.embed_query("hello, world!")
print(vector[:5])


In [None]:
# Create a new Chroma instance with the desired parameters
vectorstore = Chroma.from_documents(documents=docs, embedding=GoogleGenerativeAIEmbeddings(model="models/embedding-001"))

In [None]:
# Similarity retriever whose `k` equals the number of indexed chunks.
# NOTE(review): with k == len(docs) every chunk is returned for each query;
# confirm this is intended rather than a smaller top-k.
chunk_count = len(docs)
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": chunk_count},
)

# Probe the retriever with a generic question.
retrieved_docs = retriever.invoke("what is it")

In [None]:
# Number of chunks returned for the probe query (displayed as the cell's output).
len(retrieved_docs)

In [None]:
# Show the text of the first retrieved chunk as a quick sanity check.
first_hit = retrieved_docs[0]
print(first_hit.page_content)

# **Setting the model, diversity (temperature), word count (max_tokens), and prompts for the model**

In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI

# Chat model used to generate answers: low temperature and a 500-token cap
# on each reply.
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash-latest",
    temperature=0.1,
    max_tokens=500,
)

In [None]:
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

# System instructions for the model. The fragments are joined without any
# separator; `{context}` is the placeholder that receives the retrieved chunks.
system_prompt = "".join(
    [
        "You are an assistant for kids under the age of 18, helping with question-answering tasks. ",
        "If the question is asked by someone over 18, politely deny the request and say that you can only assist kids. ",
        "Use the following pieces of retrieved context to answer the question. If you don't know the answer, say that you don't know. ",
        "Use three sentences maximum and keep the answer concise. ",
        "\n\n",
        "{context}",
    ]
)

# Two-message chat template: the system instructions followed by the user's
# question, supplied through the `{input}` placeholder.
chat_messages = [
    ("system", system_prompt),
    ("human", "{input}"),
]
prompt = ChatPromptTemplate.from_messages(chat_messages)

In [None]:
# First build the chain that fills the prompt with documents and calls the
# model, then put the retriever in front of it to form the full RAG chain.
question_answer_chain = create_stuff_documents_chain(llm=llm, prompt=prompt)
rag_chain = create_retrieval_chain(
    retriever=retriever,
    combine_docs_chain=question_answer_chain,
)

# **Testing with a prompt**

In [None]:
import textwrap
from gtts import gTTS
from fpdf import FPDF
import IPython.display as ipd

# Read a question from the user, e.g. "What is new in YOLOv9?"
user_input = input("Enter your question: ")

# Run the RAG chain; the reply text is stored under the "answer" key.
response = rag_chain.invoke({"input": user_input})
answer_text = response["answer"]

# Wrap the answer at 80 columns for readability before printing it.
formatted_answer = textwrap.fill(answer_text, width=80)
print("\n", formatted_answer)

In [None]:
from gtts import gTTS
import langdetect  # For detecting language
import IPython.display as ipd

# Convert the formatted answer to speech and embed an audio player in the
# cell output. Speech is generated in Urdu when that language is detected and
# in English otherwise.
text = formatted_answer
try:
    # Step 1: choose the speech language. A detection failure is not fatal;
    # it only selects the English fallback.
    try:
        detected_lang = langdetect.detect(text)  # e.g. 'ur' for Urdu, 'en' for English
        print(f"Detected language: {detected_lang}")
        lang = 'ur' if detected_lang == 'ur' else 'en'
    except langdetect.lang_detect_exception.LangDetectException:
        print("Language detection failed. Defaulting to English.")
        lang = 'en'

    # Step 2: synthesize once, for whichever language was chosen, so the
    # fallback path is covered by the same error handling as the normal path.
    tts = gTTS(text, lang=lang)
    tts.save("response.mp3")

    # Show the audio player for the generated file
    ipd.display(ipd.Audio("response.mp3"))
    print("Audio generated and played successfully.")
except ValueError as e:
    print(f"Error generating audio: {e}")
except Exception as ex:
    # Best effort: report any other failure instead of aborting the notebook.
    print(f"An unexpected error occurred: {ex}")


In [None]:
# Snapshot the working directory, then ask whether its contents should be
# deleted. The entries in `exclude_files` are predefined and always kept.
uploaded_files = os.listdir()
exclude_files = ['.config', 'sample_data']

del_file = input("\n\tProgram Complete.\nDo you want to delete all uploaded files? (yes/no): ").strip().lower()

# Only the exact answer "yes" triggers deletion; anything else leaves the
# directory untouched.
deletion_targets = uploaded_files if del_file == "yes" else []
for file in deletion_targets:
    try:
        if file in exclude_files:
            # Predefined entries are reported but never removed.
            print(f"Skipped predefined file: {file}")
        elif os.path.exists(file):
            os.remove(file)
            print(f"Deleted file: {file}")
    except Exception as e:
        # Best effort: report the failure for this entry and keep going.
        print(f"Error while deleting file {file}: {e}")