In [1]:
# Environment setup: install the Python packages and the system Tesseract
# package used by the cells below.
# NOTE(review): versions are unpinned, so a fresh run may pull releases that
# differ from the ones this notebook was developed against — consider pinning.
# NOTE(review): later cells also import cv2, PIL, tqdm and transformers, which
# are not installed here; presumably they are preinstalled in the runtime or
# pulled in transitively — confirm.
!pip install pymupdf
# NOTE(review): `frontend` is not imported anywhere in the visible notebook;
# presumably a leftover workaround — confirm and drop if unused.
!pip install frontend
!pip install pytesseract
!pip install langchain
!pip install sentence-transformers
# NOTE(review): the cells below use the `weaviate.Client(...)` style API; verify
# that the installed weaviate-client release still provides it.
!pip install weaviate-client
# System-level Tesseract OCR package (installed via apt rather than pip).
!sudo apt-get install tesseract-ocr


# Clear the installer output from the cell once the installs have run.
from IPython.display import clear_output
clear_output()

## Extraction

In [9]:
import fitz  # PyMuPDF
import os
from PIL import Image
import pytesseract
import io
import tempfile
import cv2
import time
from tqdm import tqdm

def save_temp_image(image, img_index, pgno, rotated = False):
    """Write ``image`` as a JPEG into ``<cwd>/temp`` and return the file path.

    The ``temp`` directory is created on demand, so the function also works on
    a fresh runtime where it does not exist yet.

    Args:
        image: Object with a PIL-style ``save(path)`` method. If it exposes a
            ``mode`` that JPEG cannot store (e.g. ``RGBA`` or ``P``), it is
            converted to ``RGB`` first via ``image.convert("RGB")``.
        img_index: Index of the image on its page; becomes part of the file name.
        pgno: Page number; becomes part of the file name.
        rotated: When true, the file name uses the ``temp_image_rotated_``
            prefix instead of ``temp_image_``.

    Returns:
        str: Path of the file that was written.
    """
    temp_dir = os.path.join(os.getcwd(), "temp")
    # Without this, image.save() fails with FileNotFoundError on a clean run.
    os.makedirs(temp_dir, exist_ok=True)

    prefix = "temp_image_rotated" if rotated else "temp_image"
    temp_image_path = os.path.join(temp_dir, f"{prefix}_{pgno}_{img_index}.jpg")

    # JPEG only supports a handful of pixel modes; anything else (alpha,
    # palette, ...) would make save() raise, so normalise it to RGB.
    if getattr(image, "mode", "RGB") not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
        image = image.convert("RGB")

    image.save(temp_image_path)
    return temp_image_path

def _rotate_upright(pil_image, img_index, page_number):
    """Best-effort: rotate ``pil_image`` by the orientation detected for it.

    The image is written to a temporary file so ``get_image_orientation`` can
    read it; the file is always removed again. If any step fails, the failure
    is reported and the image is returned unrotated.
    """
    temp_image_path = None
    try:
        temp_image_path = save_temp_image(pil_image, img_index, page_number)
        orientation, lang = get_image_orientation(temp_image_path)
        print(temp_image_path, orientation, lang)
        return pil_image.rotate(orientation, expand=True)
    except Exception as exc:
        # Orientation detection is optional; report why it was skipped and
        # continue with the unrotated image instead of hiding the error.
        print(f"Orientation detection skipped for image {img_index} on page {page_number + 1}: {exc}")
        return pil_image
    finally:
        if temp_image_path is not None and os.path.exists(temp_image_path):
            os.remove(temp_image_path)


def extract_text_from_pdf(pdf_path):
    """Extract the text layer and OCR'd image text from every page of a PDF.

    For each page, the embedded text is read with ``page.get_text("text")``.
    Every image listed by ``page.get_images(full=True)`` is then extracted,
    rotated according to the detected orientation when detection succeeds, and
    run through ``pytesseract.image_to_string`` with ``lang='eng'``.

    Args:
        pdf_path: Path of the document, passed to ``fitz.open``.

    Returns:
        str: One section per page text and per image, each terminated by a
        line of 50 dashes. Page and image numbers in the headers are 1-based.
    """
    separator = '-' * 50
    sections = []

    pdf_document = fitz.open(pdf_path)
    try:
        for page_number in tqdm(range(pdf_document.page_count)):
            page = pdf_document[page_number]

            pdf_text = page.get_text("text")
            sections.append(f"Page {page_number + 1} PDF Text:\n{pdf_text}\n{separator}\n")

            # Images are numbered from 1 in messages, file names and headers.
            for img_index, img_info in enumerate(page.get_images(full=True), start=1):
                base_image = pdf_document.extract_image(img_info[0])

                if not base_image:
                    print(f"Failed to extract image {img_index} on page {page_number + 1}")
                    continue

                # The buffer must stay open while PIL reads from it, so rotation
                # and OCR both happen inside the ``with`` block.
                with io.BytesIO(base_image["image"]) as image_buffer:
                    pil_image = Image.open(image_buffer)
                    pil_image = _rotate_upright(pil_image, img_index, page_number)
                    image_text = pytesseract.image_to_string(pil_image, lang='eng')

                sections.append(f"Page {page_number + 1}, Image {img_index} Text:\n{image_text}\n{separator}\n")
    finally:
        # Release the document even when extraction or OCR raises.
        pdf_document.close()

    return "".join(sections)

def _first_osd_value(osd_data, marker):
    """Return the text after ``': '`` on the first OSD line containing ``marker``."""
    for line in osd_data.splitlines():
        if marker in line:
            return line.split(': ')[1]
    raise ValueError(f"OSD output has no '{marker}' field")


def get_image_orientation(image_path):
    """Run Tesseract orientation/script detection (OSD) on an image file.

    Args:
        image_path: Path of the image, read with ``cv2.imread``.

    Returns:
        tuple[int, str]: The integer value of the first OSD line containing
        ``'degrees'`` and the string value of the first OSD line containing
        ``'Script'``.

    Raises:
        ValueError: If the image cannot be read, or the OSD output lacks one of
            the two fields or carries a non-integer orientation.
    """
    img = cv2.imread(image_path)
    # cv2.imread signals an unreadable/missing file by returning None rather
    # than raising; fail here with a clear message instead of inside Tesseract.
    if img is None:
        raise ValueError(f"Could not read image for orientation detection: {image_path}")

    # OSD is run on the image exactly as loaded (no preprocessing is applied).
    osd_data = pytesseract.image_to_osd(img)

    orientation = int(_first_osd_value(osd_data, 'degrees'))
    lang = str(_first_osd_value(osd_data, 'Script'))

    return orientation, lang


In [10]:
def extract_text(pdf_path):
  """Extract all text from one PDF and report how long the extraction took.

  Args:
    pdf_path: Path of the PDF, forwarded to ``extract_text_from_pdf``.

  Returns:
    The combined text returned by ``extract_text_from_pdf``.
  """
  # perf_counter is monotonic, so the measured duration cannot be skewed by
  # system clock adjustments.
  start_time = time.perf_counter()
  combined_text = extract_text_from_pdf(pdf_path)
  elapsed = time.perf_counter() - start_time
  print("Time to extract ", elapsed)

  return combined_text

# Print or use the combined extracted text as needed
# with open(output_file, 'w', encoding='utf-8') as file:
#         file.write(combined_text)

## Chunking

In [11]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [12]:
def split_text(combined_text, chunk_size=500, chunk_overlap=50):
  """Split the extracted text into overlapping chunks.

  Args:
    combined_text: The full text to split.
    chunk_size: Maximum chunk length, measured with ``len`` (characters).
      Defaults to 500.
    chunk_overlap: Overlap between consecutive chunks, in the same unit.
      Defaults to 50.

  Returns:
    The documents produced by ``RecursiveCharacterTextSplitter.create_documents``
    for ``combined_text``. ``add_start_index=True`` is passed to the splitter.
  """
  text_splitter = RecursiveCharacterTextSplitter(
      chunk_size=chunk_size,
      chunk_overlap=chunk_overlap,
      add_start_index=True,
      length_function=len,
  )
  return text_splitter.create_documents([combined_text])

# print(len(texts))

## Embedding


In [13]:
def metadata_extract(text,meta_dict):
  """Append the metadata entries to ``text`` behind a `` [META]`` marker.

  Each entry of ``meta_dict`` is rendered as `` key:value`` (the value passed
  through ``str``), in the dictionary's iteration order.

  Returns:
    The input text followed by `` [META]`` and the rendered entries.
  """
  tagged = text + " [META]"
  rendered_fields = [" " + key + ":" + str(value) for key, value in meta_dict.items()]
  return tagged + "".join(rendered_fields)

In [14]:
def get_metadata(texts):
  """Render each document as its text with the metadata appended.

  The result is a new list; ``texts`` itself is left untouched, so the cell
  can be re-run on the same documents without failing on already-converted
  strings.

  Args:
    texts: Sequence of objects exposing ``page_content`` and ``metadata``.

  Returns:
    list: One string per document, produced by ``metadata_extract``.
  """
  return [
      metadata_extract(doc.page_content, doc.metadata)
      for doc in tqdm(texts)
  ]

In [15]:
from transformers import DPRContextEncoder, DPRContextEncoderTokenizer

from functools import lru_cache

_DPR_CONTEXT_ENCODER_NAME = "facebook/dpr-ctx_encoder-single-nq-base"


@lru_cache(maxsize=1)
def _load_dpr_context_encoder():
  """Load the DPR tokenizer and context encoder once; later calls reuse them."""
  tokenizer = DPRContextEncoderTokenizer.from_pretrained(_DPR_CONTEXT_ENCODER_NAME)
  model = DPRContextEncoder.from_pretrained(_DPR_CONTEXT_ENCODER_NAME)
  return tokenizer, model


def generate_embeddings(text):
  """Embed ``text`` with the DPR context encoder.

  The tokenizer and model are loaded on first use and cached, instead of
  being re-instantiated for every chunk. Input longer than 512 tokens is
  truncated, and the forward pass runs without gradient tracking.

  Args:
    text: The text to embed.

  Returns:
    The model's ``pooler_output`` for the tokenized text (a tensor;
    presumably of shape (1, hidden_size) for a single string — confirm).
  """
  import torch  # local import: the notebook does not import torch at the top

  tokenizer, model = _load_dpr_context_encoder()
  input_ids = tokenizer(
      text, return_tensors="pt", truncation=True, max_length=512
  )["input_ids"]
  # Inference only: skip building the autograd graph.
  with torch.no_grad():
    embeddings = model(input_ids).pooler_output
  return embeddings

In [16]:
import weaviate
import json

import os

# Connection settings are read from the environment so endpoints and secrets
# stay out of the notebook. Without WEAVIATE_URL the original cluster URL is
# used; without WEAVIATE_API_KEY the client connects unauthenticated, exactly
# as before.
_weaviate_url = os.environ.get(
    "WEAVIATE_URL", "https://genai-case-study-7d99r5vj.weaviate.network"
)
_weaviate_api_key = os.environ.get("WEAVIATE_API_KEY")
auth_config = weaviate.AuthApiKey(api_key=_weaviate_api_key) if _weaviate_api_key else None

client = weaviate.Client(
    url=_weaviate_url,
    auth_client_secret=auth_config,
)

In [None]:
# Connectivity check: print what the client's is_ready() call reports before
# any schema or data calls are made.
print('is_ready:', client.is_ready())

In [None]:
# Fetch the metadata returned by client.get_meta() and pretty-print it as JSON.
meta_info = client.get_meta()
print(json.dumps(meta_info, indent=2))

In [None]:
# Class that stores the chunks. "vectorizer": "none" because the vectors are
# supplied by this notebook when objects are uploaded.
class_obj = {"class": "DocumentSearch", "vectorizer": "none"}

# Re-running the cell must not fail: only create the class when the schema
# does not contain it yet.
_existing_classes = {c["class"] for c in client.schema.get().get("classes", [])}
if class_obj["class"] not in _existing_classes:
  client.schema.create_class(class_obj)

In [19]:
def upload_texts(texts, batch_size=None):
  """Embed each text and add it to the ``DocumentSearch`` class in one batch run.

  Args:
    texts: List of strings to store; each becomes the ``source_text`` property
      of one object, with its vector from ``generate_embeddings``.
    batch_size: Batch size passed to ``client.batch.configure``. Defaults to
      ``len(texts)``, which is the value the notebook used originally.

  Returns:
    None. Nothing is configured or uploaded when ``texts`` is empty.
  """
  # An empty input would otherwise configure a batch size of 0.
  if not texts:
    return

  client.batch.configure(batch_size=batch_size or len(texts))
  with client.batch as batch:
    for doc in texts:
      properties = {"source_text": doc}
      vector = generate_embeddings(doc)
      batch.add_data_object(properties, "DocumentSearch", vector=vector)

In [None]:
folder_path = "/content/drive/MyDrive/GenAI/PDF Documents/Set2/"

# Only feed PDFs to the extractor: the folder listing can also contain
# sub-folders or other files. Sorting makes the ingestion order reproducible.
pdf_files = sorted(
    name for name in os.listdir(folder_path) if name.lower().endswith(".pdf")
)

# Per document: extract text -> split into chunks -> append metadata -> upload.
for file in tqdm(pdf_files):
  file_path = os.path.join(folder_path,file)
  combined_text = extract_text(file_path)
  texts = split_text(combined_text)
  texts = get_metadata(texts)
  upload_texts(texts)

In [None]:
# Example question, embedded with the same function used for the stored chunks.
# NOTE(review): generate_embeddings uses the DPR *context* encoder; DPR also
# ships a separate question encoder — confirm which one is intended for queries.
query = "How to install the air filter?"
query_vector = generate_embeddings(query)

In [7]:
# Vector search: request the `source_text` property of at most 2
# `DocumentSearch` objects near `query_vector`, passing 0.7 as the
# `certainty` value of the near-vector filter.
result = client.query.get("DocumentSearch", ["source_text"]).with_limit(2).with_near_vector({
    "vector": query_vector,
    "certainty": 0.7
}).do()

In [None]:
# Show the matches stored under data -> Get -> DocumentSearch in the response.
# NOTE(review): presumably this raises KeyError when the response carries an
# error payload instead of 'data' — confirm.
result['data']['Get']["DocumentSearch"]