In [1]:
pip install -q langchain

In [None]:
pip install -U langchain-community

In [None]:
pip install --upgrade langchain-google-genai

In [None]:
pip install -q langchain_google_genai

In [None]:
pip install -q google-generativeai chromadb pypdf  bs4 streamlit docx2txt

In [None]:
pip install python-dotenv

In [None]:
pip install langchain --upgrade

In [None]:
pip freeze > requirements.txt

In [None]:
pip install -r requirements.txt

In [None]:
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai.embeddings import GoogleGenerativeAIEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
import os
from dotenv import load_dotenv ,find_dotenv
# Pull variables from the nearest .env file; values there win over the shell's.
load_dotenv(find_dotenv(), override=True)

# Show which API-key variables are configured (names only, never the values).
for api_key in os.environ:
    if "API_KEY" in api_key:
        print(api_key)

# os.environ is practically never empty, so testing its truthiness would make
# the prompt unreachable. Check for the specific key instead and ask for it
# interactively when it is missing or blank.
if not os.environ.get("GOOGLE_API_KEY"):
    import getpass
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("GOOGLE_API_KEY")

In [None]:
# from pprint import pprint
from typing import List
def load_docs_locally(files:List[str]=[]):

    import os
    os.chdir(os.path.join(BASE_DIR,"files/"))
    print(f"current directory: {os.getcwd()}")
    files = [file for file in os.listdir()] if not files else files
    # pprint(files)

    data = []

    for file in files:
        _, extension = os.path.splitext(file)
        if not file.startswith("."):
          match extension:
              case ".pdf":
                  from langchain.document_loaders import PyPDFLoader
                
                  loader = PyPDFLoader(file)
                  print(f"loading pdf {file} ....")
              case ".txt":
                  from langchain.document_loaders import TextLoader
                  loader = TextLoader(file, encoding="utf-8")
                  print(f"loading text {file} ....")
              case ".docx":
                  from langchain.document_loaders import Docx2textLoader
                  loader = Docx2textLoader(file)
                  print(f"loading docx {file} ....")
              case _:
                  print(f"no such available format such as {extension}")


        data += loader.load()
    os.chdir("../")
    # pprint(data)
    return data

In [None]:
def download_file(url:str,filename:str):
    """Download ``url`` and store it as ``files/<filename><extension>``.

    The extension is taken from the path component of the URL, so query
    strings and fragments never end up in the file name.

    Args:
        url: Address of the resource to download.
        filename: Target file name without extension.

    Returns:
        The relative path of the written file (``files/<filename><extension>``).

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond in time.
    """
    import requests,os
    from urllib.parse import urlparse

    # Fail loudly instead of saving an HTTP error page as if it were the file.
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    _,extension = os.path.splitext(urlparse(url).path)

    # The target directory may not exist yet on a fresh checkout.
    os.makedirs("files", exist_ok=True)
    destination = f"files/{filename}{extension}"
    with open(destination, 'wb') as f:
        f.write(response.content)

    print(f"done downloading {filename}{extension}")
    return destination

In [None]:
def load_docs(docs_urls=["https://fsciences.univ-setif.dz/main_page/home"]):
    """Fetch the given URLs with LangChain's ``AsyncHtmlLoader``.

    Args:
        docs_urls: URLs handed to the loader; defaults to the faculty home page.

    Returns:
        Whatever ``AsyncHtmlLoader(docs_urls).load()`` returns.
    """
    from langchain.document_loaders.async_html import AsyncHtmlLoader

    print("loading started....")
    return AsyncHtmlLoader(docs_urls).load()

In [None]:
def clean_html(html_page:str, title:str):
    """Write the text nodes of an HTML page to ``files/<title>.txt``.

    Each text node is stripped of surrounding whitespace and written on its
    own line; nodes that contain only whitespace are dropped.

    Args:
        html_page: Raw HTML markup.
        title: Name (without extension) of the text file to create.
    """
    import os
    from bs4 import BeautifulSoup

    parser = BeautifulSoup(html_page, "html.parser")

    # The target directory may not exist yet on a fresh checkout.
    os.makedirs("files", exist_ok=True)
    with open(f"files/{title}.txt", "w",encoding="utf-8") as f:
        for string in parser.strings:
            text = string.strip()
            # Comparing against "\n" alone let nodes such as "\n   " through,
            # which were then written as empty lines.
            if text:
                f.write(text)
                f.write("\n")

In [None]:
from typing import List
def mass_download(urls:List[str]):
  """Fetch every URL and save its text under a file name derived from the URL.

  Args:
      urls: Pages to download; the i-th fetched page is named after ``urls[i]``.

  Returns:
      The derived file titles (without extension), one per fetched page.
  """
  # (old, new) substitutions, applied to the URL in exactly this order.
  substitutions = (
      ("/", "_"),
      (".", "_"),
      ("-", "_"),
      ("https:", ""),
      ("dz", ""),
      ("net", ""),
      ("com", ""),
      ("org", ""),
      ("edu", ""),
  )

  fetched_pages = load_docs(urls)
  file_titles = []
  for position, page in enumerate(fetched_pages):
      title = urls[position]
      for old, new in substitutions:
          title = title.replace(old, new)
      title = title.strip("_")

      clean_html(page.page_content, title)
      file_titles.append(title)
  return file_titles

In [None]:
# Pages to scrape: mass_download fetches each URL and writes its text to files/<title>.txt.
urls = [
      "https://fsciences.univ-setif.dz/main_page/english",     
  ]
# Last expression of the cell, so the list of generated file titles is displayed.
mass_download(urls)

In [None]:
from pprint import pprint
# load_docs_locally reads this global to locate BASE_DIR/files, so it must be set before the call.
BASE_DIR=os.getcwd()
# No file list is passed, so every entry of the files directory is considered.
docs = load_docs_locally()
# pprint(docs)

In [None]:
def chunk_data(docs, chunk_size=250, chunk_overlap=0):
  """Join the documents' text and split it into chunks.

  Args:
      docs: Iterable of objects exposing a ``page_content`` string.
      chunk_size: Value forwarded to ``RecursiveCharacterTextSplitter`` as
          ``chunk_size`` (default 250, the previously hard-coded value).
      chunk_overlap: Value forwarded as ``chunk_overlap`` (default 0, the
          previously hard-coded value).

  Returns:
      The list of text chunks produced by the splitter.
  """
  text_splitter = RecursiveCharacterTextSplitter(
      chunk_size=chunk_size,
      chunk_overlap=chunk_overlap,
  )
  # All documents are merged into one newline-separated string before splitting.
  text = "\n".join(doc.page_content for doc in docs)
  return text_splitter.split_text(text)

In [None]:
# Split the loaded documents into text chunks and report how many were produced.
chunks = chunk_data(docs)
print(f"{len(chunks)} chunk")
# pprint(chunks)

In [None]:
def embed_data(chunks):
  """Index the text chunks in Chroma and return a similarity retriever.

  Args:
      chunks: Texts to embed with the ``models/embedding-001`` embedding model.

  Returns:
      The retriever obtained from the Chroma store, configured for
      similarity search with ``k=5``.
  """
  from langchain.vectorstores.chroma import Chroma

  embedding_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
  vector_store = Chroma.from_texts(chunks, embedding_model)

  retriever_options = {"k": 5}
  return vector_store.as_retriever(
      search_type="similarity",
      search_kwargs=retriever_options,
  )

In [None]:
def ask_question(query, vector_index, model="gemini-pro", temperature=1):
  """Answer ``query`` from the retrieved context using a Gemini chat model.

  The prompt tells the model to answer only from the retrieved context, to
  translate the answer to Arabic, to admit when it does not know, and to be
  concise.

  Args:
      query: The user's question.
      vector_index: Retriever that supplies the context passages.
      model: Chat model name passed to ``ChatGoogleGenerativeAI``
          (default ``"gemini-pro"``, the previously hard-coded value).
      temperature: Sampling temperature passed to the chat model
          (default 1, the previously hard-coded value).

  Returns:
      The ``"result"`` entry of the chain's response.
  """
  from langchain.prompts import PromptTemplate
  from langchain_google_genai import ChatGoogleGenerativeAI
  from langchain.chains import RetrievalQA

  # The instruction previously read "if you don't the answer"; the missing
  # word is restored so the model gets a well-formed instruction.
  template = """
  use the following pieces of context to answer the question at the end, translate the answer to arabic. if you don't know the answer just say that you don't know the answer, don't try to make up an answer, keep the answer as concise as possible
  {context}
  Question:{question}
  """
  QA_CHAIN_TEMPLATE = PromptTemplate.from_template(template)
  chroma_chain = RetrievalQA.from_chain_type(
      llm=ChatGoogleGenerativeAI(model=model, temperature=temperature),
      retriever=vector_index,
      return_source_documents=True,
      chain_type_kwargs={
          "prompt":QA_CHAIN_TEMPLATE
      },
      verbose=True
  )

  # Calling the chain object directly is deprecated in LangChain; invoke() is
  # the supported entry point and returns the same response mapping.
  response = chroma_chain.invoke({"query": query})
  return response["result"]

In [None]:
# Embed the chunks and keep the resulting retriever for the question-answering cell.
vector_index = embed_data(chunks)
pprint(vector_index)

In [None]:
# Prompt for a question; the answer returned by ask_question is shown as the cell output.
ask_question(input("give your query"), vector_index)