# Enter ROOT Folder (a folder that contains PDFs, or subfolders that contain PDFs)

In [6]:
# NOTE(review): absolute path specific to one machine; change it to the local folder that holds the PDFs.
ROOT_FOLDER = r"C:\Users\Fazliddin\Desktop\Project_for_SWFMD\paper"

# Imports

In [2]:
!pip install pydantic==1.10.8

Collecting pydantic==1.10.8
  Downloading pydantic-1.10.8-cp39-cp39-win_amd64.whl (2.2 MB)
     ---------------------------------------- 2.2/2.2 MB 12.7 MB/s eta 0:00:00
Installing collected packages: pydantic
  Attempting uninstall: pydantic
    Found existing installation: pydantic 2.1.1
    Uninstalling pydantic-2.1.1:
      Successfully uninstalled pydantic-2.1.1
Successfully installed pydantic-1.10.8


In [1]:
!pip install numpy==1.20.0

Collecting numpy==1.20.0
  Downloading numpy-1.20.0-cp39-cp39-win_amd64.whl (13.7 MB)
     ---------------------------------------- 13.7/13.7 MB 5.7 MB/s eta 0:00:00
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.21.5
    Uninstalling numpy-1.21.5:
      Successfully uninstalled numpy-1.21.5
Successfully installed numpy-1.20.0


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
daal4py 2021.6.0 requires daal==2021.4.0, which is not installed.
torchmetrics 1.0.1 requires numpy>1.20.0, but you have numpy 1.20.0 which is incompatible.


In [1]:
import torch
from transformers import DonutProcessor, VisionEncoderDecoderModel
import re
from PIL import Image
import fitz  # PyMuPDF
import os
import csv
import time
import spacy
from collections import Counter
from docquery import document, pipeline
import logging


document-question-answering is already registered. Overwriting pipeline for task document-question-answering...


# Helper Functions

In [3]:
def create_log_file(log_file_name, log_level=logging.INFO):
    """
    Create (or reuse) a file-backed logger and return it.

    The logger is looked up by this module's ``__name__``, so every call
    returns the same ``logging.Logger`` object. A ``FileHandler`` for
    ``log_file_name`` is attached only when the logger does not already have
    one for that file, so calling this function again (for example by
    re-running the cell) does not make each message appear several times.

    Args:
        log_file_name: The name of the log file to write to.
        log_level: The minimum level of messages to log (default: INFO).

    Returns:
        A logging.Logger object.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(log_level)

    # FileHandler keeps the absolute path of its file in ``baseFilename``.
    target_path = os.path.abspath(log_file_name)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target_path
        for existing in logger.handlers
    )

    if not already_attached:
        handler = logging.FileHandler(log_file_name)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)

    return logger



def pdf_to_text(pdf_path):
    """
    Extract the text of every page of a PDF and return it as one string.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.

    Returns:
        The concatenation, in page order, of ``page.get_text()`` for each
        page. An empty string when the document has no pages.
    """
    pdf_document = fitz.open(pdf_path)
    try:
        # Collect the per-page text and join once instead of growing a string in a loop.
        page_texts = [
            pdf_document[page_number].get_text()
            for page_number in range(pdf_document.page_count)
        ]
    finally:
        # Close the document even when extracting a page raises.
        pdf_document.close()

    return "".join(page_texts)

# spaCy pipelines that have already been loaded, keyed by model name.
_SPACY_MODEL_CACHE = {}


def _get_spacy_model(model_name="en_core_web_sm"):
    """Return the spaCy pipeline for ``model_name``, loading it only on first use."""
    if model_name not in _SPACY_MODEL_CACHE:
        _SPACY_MODEL_CACHE[model_name] = spacy.load(model_name)
    return _SPACY_MODEL_CACHE[model_name]


def extract_keywords(text, num_keywords=10):
    """
    Return the most frequent nouns and adjectives of ``text``.

    The text is processed with the spaCy ``en_core_web_sm`` pipeline. The
    pipeline is loaded once and reused on later calls, because this function
    is called once per document and loading the model each time is wasted work.

    Args:
        text: The text to analyse.
        num_keywords: Maximum number of keywords to return (default: 10).

    Returns:
        A single string with the selected token texts joined by ``','``,
        most frequent first. Empty when no noun or adjective is found.
    """
    nlp = _get_spacy_model()
    doc = nlp(text)

    # Count the surface form of every token tagged as a noun or an adjective.
    keyword_counter = Counter(
        token.text for token in doc if token.pos_ in ("NOUN", "ADJ")
    )

    return ','.join(word for word, _count in keyword_counter.most_common(num_keywords))

def pdf_to_image(pdf_path, page_number=0):
    """
    Render one page of a PDF to a PIL image and report the PDF's page count.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.
        page_number: Zero-based index of the page to render (default: 0).

    Returns:
        A tuple ``(image, num_pages)``: the rendered page as a PIL ``Image``
        and the number of pages in the document.

    Raises:
        Whatever indexing the document or rendering the page raises (for
        example when ``page_number`` does not exist). The document is closed
        before the exception propagates.
    """
    pdf_document = fitz.open(pdf_path)
    try:
        num_pages = pdf_document.page_count

        # Render the requested page to a pixmap.
        pixmap = pdf_document[page_number].get_pixmap()

        # Build the PIL image while the document is still open.
        # NOTE(review): assumes get_pixmap() yields 3-channel RGB samples without alpha; confirm.
        image = Image.frombytes("RGB", (pixmap.width, pixmap.height), pixmap.samples)
    finally:
        # Close the document on both the success and the failure path.
        pdf_document.close()

    return image, num_pages


# Module-level logger used by the cells below; its messages are written to app.log.
logger = create_log_file('app.log')

In [4]:
class PDFDataCsvGenerator:
    """
    Appends per-document result rows to a timestamped CSV file.

    The file is named ``pdfs_data_<unix time>.csv`` and created in the current
    working directory. It is always written as UTF-8, so text containing
    characters outside the platform's default codec can still be stored.
    """

    def __init__(self, fieldnames):
        """
        Choose the CSV file name and write the header row if the file is new.

        Args:
            fieldnames: Column names, in the order they appear in the CSV.
        """
        self.csv_file_path = f"pdfs_data_{int(time.time())}.csv"
        logger.info(f"[INFO] CSV FILE NAME: {self.csv_file_path}")
        self.fieldnames = fieldnames

        # Check if the CSV file already exists; if not, create it with headers
        if not os.path.exists(self.csv_file_path):
            with open(self.csv_file_path, mode="w", newline='', encoding="utf-8") as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=self.fieldnames)
                writer.writeheader()

    def add_entry(self, entry_dict):
        """
        Append one row to the CSV file.

        Args:
            entry_dict: Mapping of column name to value for the new row.
        """
        with open(self.csv_file_path, mode="a", newline='', encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=self.fieldnames)
            writer.writerow(entry_dict)



class DocQA:
    """
    Wrapper around a Donut ``VisionEncoderDecoderModel`` checkpoint.

    ``model_type`` selects the checkpoint and the task prompt:
      * ``"classification"``     -> naver-clova-ix/donut-base-finetuned-rvlcdip
      * ``"document-answering"`` -> naver-clova-ix/donut-base-finetuned-docvqa
    """

    def __init__(self, model_type="classification"):
        """
        Load the processor and model for ``model_type`` and move the model to
        CUDA when it is available, otherwise to the CPU.

        Raises:
            ValueError: If ``model_type`` is not one of the supported values.
        """
        self.model_type = model_type
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.__make_model()

    def __make_model(self):
        """Set ``task_prompt``, ``processor`` and ``model`` for ``self.model_type``."""
        if self.model_type == "classification":
            self.task_prompt = "<s_rvlcdip>"
            self.processor = DonutProcessor.from_pretrained("naver-clova-ix/donut-base-finetuned-rvlcdip")
            self.model = VisionEncoderDecoderModel.from_pretrained("naver-clova-ix/donut-base-finetuned-rvlcdip")
        elif self.model_type == "document-answering":
            self.task_prompt = "<s_docvqa><s_question>{user_input}</s_question><s_answer>"
            self.processor = DonutProcessor.from_pretrained("naver-clova-ix/donut-base-finetuned-docvqa")
            self.model = VisionEncoderDecoderModel.from_pretrained("naver-clova-ix/donut-base-finetuned-docvqa")
        else:
            # Fail with a clear message instead of an AttributeError on the missing self.model below.
            raise ValueError(
                f"Unsupported model_type: {self.model_type!r}. "
                "Expected 'classification' or 'document-answering'."
            )

        self.model.to(self.device)

    def answer(self, image, question=None):
        """
        Run the model on ``image`` and return the parsed output.

        Args:
            image: The page image handed to the Donut processor.
            question: The question to ask; required when ``model_type`` is
                ``"document-answering"`` and ignored otherwise.

        Returns:
            The result of ``processor.token2json`` on the decoded sequence.

        Raises:
            ValueError: If a question is required but missing, or if
                ``image`` is None.
        """
        if self.model_type == "document-answering":
            if not question:
                raise ValueError("Please enter a question.")

        # Identity check: truth-testing array-like images (e.g. numpy arrays) is ambiguous and raises.
        if image is None:
            raise ValueError("Please enter image")

        if self.model_type == "document-answering":
            self.task_prompt = "<s_docvqa><s_question>{user_input}</s_question><s_answer>".replace("{user_input}", question)

        pixel_values = self.processor(image, return_tensors="pt").pixel_values
        decoder_input_ids = self.processor.tokenizer(self.task_prompt, add_special_tokens=False, return_tensors="pt").input_ids

        # generate answer
        outputs = self.model.generate(
            pixel_values.to(self.device),
            decoder_input_ids=decoder_input_ids.to(self.device),
            max_length=self.model.decoder.config.max_position_embeddings,
            early_stopping=True,
            pad_token_id=self.processor.tokenizer.pad_token_id,
            eos_token_id=self.processor.tokenizer.eos_token_id,
            use_cache=True,
            num_beams=1,
            bad_words_ids=[[self.processor.tokenizer.unk_token_id]],
            return_dict_in_generate=True,
        )

        # postprocess: strip EOS/PAD tokens, then drop the leading task start token
        sequence = self.processor.batch_decode(outputs.sequences)[0]
        sequence = sequence.replace(self.processor.tokenizer.eos_token, "").replace(self.processor.tokenizer.pad_token, "")
        sequence = re.sub(r"<.*?>", "", sequence, count=1).strip()  # remove first task start token

        return self.processor.token2json(sequence)



# Main

In [5]:
# Donut model instance used to classify documents.
classificaton_obj = DocQA(model_type="classification")

# docquery pipeline used to answer the questions listed below.
docquery_pipeline = pipeline('document-question-answering')

# Each entry pairs a question asked of a document with the CSV column that stores its answer.
questions_columns = {
    0: {"question": "What is the document title?", "column_name": "title"},
    1: {"question": "Who are the authors of the document?", "column_name": "author"},
    2: {"question": "What is the document date?", "column_name": "date"},
}

# CSV header: the fixed columns first, then one column per question.
fieldnames = ["class", "page_count", "keywords"] + [
    spec["column_name"] for spec in questions_columns.values()
]


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Downloading config.json:   0%|          | 0.00/789 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to see activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


Downloading tokenizer_config.json:   0%|          | 0.00/315 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/511M [00:00<?, ?B/s]

In [10]:
# Creating CSV file.
pdf_data_generator = PDFDataCsvGenerator(fieldnames=fieldnames)

# Walk through the directory and its subdirectories
for root, dirs, files in os.walk(ROOT_FOLDER):
    for file in files:
        # Skip everything that does not have a PDF extension.
        if not file.lower().endswith('.pdf'):
            continue

        try:
            output_dict = {}
            pdf_path = os.path.join(root, file)
            image, page_count = pdf_to_image(pdf_path)

            # Document class predicted from the rendered page image.
            output_dict["class"] = classificaton_obj.answer(image)["class"]
            output_dict["page_count"] = page_count

            # Keywords extracted from the full text of the PDF.
            doc_text = pdf_to_text(pdf_path)
            output_dict["keywords"] = extract_keywords(doc_text)

            # One answer per configured question, stored under that question's column.
            doc = document.load_document(pdf_path)
            for spec in questions_columns.values():
                output_dict[spec["column_name"]] = docquery_pipeline(
                    question=spec["question"], **doc.context
                )[0]["answer"]

            pdf_data_generator.add_entry(output_dict)
            logger.info(f"[INFO] FILE: {file} IS DONE; {output_dict}")

        except Exception as e:
            # Best effort: a failing file must not stop the run, but keep the
            # traceback in the log so the failure can be diagnosed afterwards.
            logger.exception(f"FILE {file} FAILED; {e}")

KeyboardInterrupt: 