<a href="https://colab.research.google.com/github/Rudelius/multi_pdf_search/blob/main/multi_pdf_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Search in multiple PDF files
Change the variables on the right side of the document and run the code blocks.

In [None]:
#@title Enter the name or path to your drive folder.
#@markdown See the image above to choose the correct name.
drive_folder = "myfolder" #@param {type:"string"}

"""Script to search through multiple PDF files with a key phrase."""
__author__ = "Johan Rudelius"

!pip install PyMuPDF
from google.colab import drive
import os
import glob
import functools 
import pandas as pd
import sys, fitz
%load_ext google.colab.data_table

# Mounts your own Google drive to the instance.
drive.mount('/content/drive')

# Creates a list of the PDF file paths.
folder_path = !cd drive && cd MyDrive && cd {drive_folder} && pwd
pdf_path_list = glob.glob(os.path.join(folder_path[0], "*.pdf"))

# Reads pdf files and writes data to .txt files in drive_folder/results
for pdf_path in pdf_path_list:
  fname = os.path.join(folder_path[0], 'results', \
                       pdf_path.split('/')[-1].split('.')[-2])
  with fitz.open(pdf_path) as doc: 
    out = open(fname + ".txt", "wb") 
    for page in doc:
      text = page.getText().encode("utf8")  # Get plain text (is in UTF-8)
      out.write(text)
      out.write(bytes((12,)))  # write page delimiter (form feed 0x0C)
    out.close()

print("The program will read from the folder: ", drive_folder)
print("Found the following files: \n", pdf_path_list)

The google.colab.data_table extension is already loaded. To reload it, use:
  %reload_ext google.colab.data_table
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
The program will read from the folder:  myfolder
Found the following files: 
 ['/content/drive/MyDrive/myfolder/Byggmax 2017.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2018.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2011.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2016.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2015.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2014.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2013.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2012.pdf', '/content/drive/MyDrive/myfolder/Byggmax 2019.pdf']


In [None]:
# @title Enter your search phrase!
#@markdown The search phrase can be any word or sentence.
search_phrase = "online sales"  # @param {type:"string"}
#@markdown Tick the box if you want the results to be case sensitive.
case_sensitive = False  # @param {type:"boolean"}


"""Script to search through multiple PDF files with a key phrase."""
__author__ = "Johan Rudelius"

# todo: change from split() to tokenizer to get words with special characters.
#       Add some pictures and easy documentation.
#       Test what happens without results folder in myFolder.
#       Add some document data above with nice graphs.
#       Add a save to csv / sheets function.
#       Make output prettier.

class DocumentSearchHelper:
  """Helper class that holds data representation and enables search.

  The document is stored as a list of pages, each page being a list of
  whitespace-separated words. A position in the document is a
  (page, page_index) tuple. Pages without any words are kept (so page
  numbers stay aligned with the source) but are skipped when stepping
  from word to word.
  """

  def __init__(self, search_pages, case_sensitive, search_phrase):
    """Constructor.
    Args:
        search_pages: an array of document page data in string format.
        case_sensitive: boolean, if True the match must agree in case,
            if False the case is ignored.
        search_phrase: string repr. of search phrase, whitespace optional.
    """
    self.pages = [self._fix_line_breaks(page).split() for page in search_pages]
    self.case_sensitive = case_sensitive
    if not case_sensitive:
      search_phrase = search_phrase.lower()
    self.search_phrase = search_phrase.split()

  @staticmethod
  def _fix_line_breaks(text):
    """Concatenates words broken up with a '-' in a linebreak.

    Lines are joined with a single space, except when the text so far ends
    with '-', in which case the hyphen is dropped and the next line is
    glued on directly.
    """
    lines = text.split('\n')
    merged = lines[0]
    for line in lines[1:]:
      # endswith() is safe on an empty string, unlike indexing with [-1].
      if merged.endswith('-'):
        merged = merged[:-1] + line
      else:
        merged = merged + ' ' + line
    return merged

  def _following_page(self, page):
    """Gets the index of the first non-empty page after `page`, or None."""
    for candidate in range(page + 1, len(self.pages)):
      if self.pages[candidate]:
        return candidate
    return None

  def _preceding_page(self, page):
    """Gets the index of the last non-empty page before `page`, or None."""
    for candidate in range(page - 1, -1, -1):
      if self.pages[candidate]:
        return candidate
    return None

  def _get_word(self, page, page_index):
    """Gets a word at the position."""
    return self.pages[page][page_index]

  def _has_next_word(self, page, page_index):
    """Checks if there is a word succeeding the position."""
    last_index = len(self.pages[page]) - 1
    if 0 <= page_index < last_index:  # Next word in same page.
      return True
    # Last word in page: a next word exists only on a later non-empty page.
    return page_index == last_index and self._following_page(page) is not None

  def _has_prev_word(self, page, page_index):
    """Checks if there is a word preceding the position."""
    if 0 < page_index < len(self.pages[page]):  # Prev word in same page.
      return True
    # First word in page: a prev word exists only on an earlier non-empty page.
    return page_index == 0 and self._preceding_page(page) is not None

  def _next_word(self, page, page_index):
    """Gets position of the next word from the position.
    Raises:
        AssertionError: if there is no next word.
    """
    assert (self._has_next_word(page, page_index)), \
      "Out of bounds error, there's no next word."
    if page_index + 1 == len(self.pages[page]):
      return self._following_page(page), 0
    return page, page_index + 1

  def _prev_word(self, page, page_index):
    """Gets position of the previous word from the position.
    Raises:
        AssertionError: if there is no previous word.
    """
    assert (self._has_prev_word(page, page_index)), \
      "Out of bounds error, there's no prev word."
    if page_index == 0:
      prev_page = self._preceding_page(page)
      return prev_page, len(self.pages[prev_page]) - 1
    return page, page_index - 1

  def _get_sentence(self, page, page_index):
    """Gets the surrounding sentence corresponding to the position.

    The sentence is grown word by word in both directions until a word
    ending in '.', '?' or '!' (or the edge of the document) is reached.
    """
    sentence_breakers = ['.', '?', '!']
    sentence = self._get_word(page, page_index)
    # Append succeeding words, including the one that closes the sentence.
    index = (page, page_index)
    while self._has_next_word(index[0], index[1]):
      index = self._next_word(index[0], index[1])
      next_word = self._get_word(index[0], index[1])
      sentence = sentence + ' ' + next_word
      if next_word[-1] in sentence_breakers:
        break  # Naive sentence break check.
    # Prepend preceding words, stopping before the previous sentence's end.
    index = (page, page_index)
    while self._has_prev_word(index[0], index[1]):
      index = self._prev_word(index[0], index[1])
      prev_word = self._get_word(index[0], index[1])
      if prev_word[-1] in sentence_breakers:
        break  # Naive sentence break check.
      sentence = prev_word + ' ' + sentence
    return sentence

  def _matches_at(self, page, page_index):
    """Checks if the search phrase starts at the position."""
    index = (page, page_index)
    for offset, search_word in enumerate(self.search_phrase):
      if offset > 0:
        # Only step forward when another phrase word remains, so a match
        # ending on the very last word of the document is still found.
        if not self._has_next_word(index[0], index[1]):
          return False
        index = self._next_word(index[0], index[1])
      try_word = self._get_word(index[0], index[1])
      if not self.case_sensitive:
        try_word = try_word.lower()
      if try_word != search_word:
        return False
    return True

  def _get_indexes(self):
    """Gets list of all search match positions in the document.
    Returns:
        A list of (page, page_index) tuples / locations of search results.
        Empty when the search phrase contains no words.
    """
    if not self.search_phrase:
      return []  # An empty phrase would otherwise match every word.
    return [(page, page_index)
            for page, words in enumerate(self.pages)
            for page_index in range(len(words))
            if self._matches_at(page, page_index)]

  def get_search_results(self):
    """Gets list of all search matches and positions.
    Returns:
        A 2D list in format [[page, page_index, full_sentence]]
    """
    return [[page, page_index, self._get_sentence(page, page_index)]
            for page, page_index in self._get_indexes()]

# Iterate over all text files (sorted so the result order is reproducible).
txt_path_list = sorted(glob.glob(os.path.join(folder_path[0], "results",
                                              "*.txt")))
result_rows = []
for txt_path in txt_path_list:
  # The files were written as UTF-8, so read them back the same way.
  with open(txt_path, "r", encoding="utf8") as f:
    # Split text on page separator, except for last separator.
    search_pages = f.read().split('\f')[0:-1]

  # Strip only the final extension so names containing dots stay intact.
  document = os.path.splitext(os.path.basename(txt_path))[0]

  # Use DSP class to extract search results, prefix each with its document.
  dsp = DocumentSearchHelper(search_pages, case_sensitive, search_phrase)
  result_rows.extend([document] + result
                     for result in dsp.get_search_results())

# Build the frame once: DataFrame.append was removed in pandas 2.0 and
# growing a frame row by row is quadratic anyway.
df1 = pd.DataFrame(result_rows,
                   columns=["Document", "Page", "PageIndex", "Text"])

df1

Unnamed: 0,Document,Page,PageIndex,Text
0,Byggmax 2015,3,19,"BYGGMAX, 2015 ANNUAL REPORT 6 BYGGMAX, 2015 AN..."
1,Byggmax 2016,5,377,Strategic business decisions Byggmax Group con...
2,Byggmax 2016,5,479,Our online sales continue to display favorable...
3,Byggmax 2016,6,303,We plan to continue growing through both onlin...
4,Byggmax 2016,7,487,Online sales also continue to display favorabl...
5,Byggmax 2016,37,426,Expectations regarding future development Bygg...
6,Byggmax 2016,37,486,Skånska Byggvaror will continue to expand in t...
7,Byggmax 2017,6,415,strategic business decisions The Byggmax Group...


In [None]:
# To use when implementing save function (see the todo list in the search
# cell). Calls drive.flush_and_unmount() and then tells the user that the
# changes from this session should now be visible in Drive.
drive.flush_and_unmount()
print('All changes made in this colab session should now be visible in Drive.')