<a href="https://colab.research.google.com/github/ArchaeoNader/draft/blob/master/Another_copy_of_Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Packages Installation

In [1]:
%%python -m pip install --upgrade pip
%pip install --upgrade pymupdf
%pip install matplotlib



In [2]:
%pip install nltk
# https://www.nltk.org/

# If the NLTK data is not installed in one of NLTK's standard data directories,
# the NLTK_DATA environment variable has to point at the directory that holds it.
# https://www.nltk.org/data.html
import nltk.data
# Disabled alternative: keep the tokenizer data in a project-local folder and
# expose it through an environment variable instead of /usr/share/nltk_data.
# from pathlib import Path
# import sys
# import os
# from os import path

# ntlkResourcesPath = path.join(Path("./Resources"), "ntlk_tokenizers")
# print( path.join("/Content", ntlkResourcesPath))
# os.environ['nltk_data'] = path.join("./Content", ntlkResourcesPath)

# Download the Punkt sentence-tokenizer data ('punkt' and 'punkt_tab') into
# /usr/share/nltk_data, then load the English Punkt tokenizer.
# `tokenizer` is a notebook-level global used later by the ingestion cell.
nltk.download('punkt', download_dir='/usr/share/nltk_data')
nltk.download('punkt_tab', download_dir='/usr/share/nltk_data')
tokenizer = nltk.data.load('nltk:tokenizers/punkt/PY3/english.pickle')



[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /usr/share/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [3]:
%pip install transformers==4.45.2
%pip install sentence-transformers==2.2.2
%pip install huggingface-hub==0.25.2
%pip install torch
%pip install InstructorEmbedding
%pip install chromadb



In [4]:
from chromadb.api.types import Embeddings, Documents, EmbeddingFunction, Space
from typing import List, Dict, Any, Optional
import numpy as np
from InstructorEmbedding import INSTRUCTOR


class InstructorEmbeddingFunction(EmbeddingFunction[Documents]):
    """
    Chroma embedding function that embeds text with an Instructor model.

    The model can either be supplied ready-made (``instructor``) or be loaded
    on demand from ``model_name`` / ``device``.
    """

    # If you have a GPU with at least 6GB try model_name = "hkunlp/instructor-xl" and device = "cuda"
    # for a full list of options: https://github.com/HKUNLP/instructor-embedding#model-list
    def __init__(
        self,
        instructor: Optional[INSTRUCTOR] = None,
        model_name: str = "hkunlp/instructor-base",
        device: str = "cpu",
        instruction: Optional[str] = None,
    ):
        """
        Initialize the InstructorEmbeddingFunction.

        Args:
            instructor (INSTRUCTOR, optional): An already loaded Instructor model.
                When given it is used as-is and ``model_name`` / ``device`` are
                only recorded on the instance. When omitted, a model is loaded
                from ``model_name`` on ``device``.
            model_name (str, optional): The name of the model to use for text embeddings.
                Defaults to "hkunlp/instructor-base".
            device (str, optional): The device to use for computation.
                Defaults to "cpu".
            instruction (str, optional): The instruction to pair with every text.
                Defaults to None (texts are encoded without an instruction).
        """
        self.model_name = model_name
        self.device = device
        self.instruction = instruction

        # Fall back to loading the model ourselves so that model_name/device
        # are meaningful when the caller does not pass a pre-built instance.
        if instructor is None:
            instructor = INSTRUCTOR(model_name_or_path=model_name, device=device)
        self._model = instructor

    def __call__(self, input: Documents) -> Embeddings:
        """
        Generate embeddings for the given documents.

        Args:
            input: Text documents to generate embeddings for.

        Returns:
            One float32 numpy array per input document.

        Raises:
            ValueError: If any item of ``input`` is not a string.
        """
        # Instructor only works with text documents
        if not all(isinstance(item, str) for item in input):
            raise ValueError("Instructor only supports text documents, not images")

        if self.instruction is None:
            embeddings = self._model.encode(input, convert_to_numpy=True)
        else:
            # Instructor's encode() takes [instruction, text] pairs.
            texts_with_instructions = [[self.instruction, text] for text in input]
            embeddings = self._model.encode(
                texts_with_instructions, convert_to_numpy=True
            )

        # One float32 vector per document, in input order.
        return [np.array(embedding, dtype=np.float32) for embedding in embeddings]

    @staticmethod
    def name() -> str:
        """Return the identifier of this embedding function."""
        return "instructor"

    def default_space(self) -> Space:
        """Return the distance space used when none is configured."""
        return "cosine"

    def supported_spaces(self) -> List[Space]:
        """Return the distance spaces this embedding function can be used with."""
        return ["cosine", "l2", "ip"]

  from tqdm.autonotebook import trange


In [None]:
import chromadb
from InstructorEmbedding import INSTRUCTOR
import os
from pathlib import Path
import shutil

# Huggingface text embedding model
# https://huggingface.co/hkunlp/instructor-xl

# Vector database engine
# https://docs.trychroma.com/docs/overview/introduction

# On-disk locations for the persistent Chroma database and for the downloaded
# embedding-model files.
db_dir = Path("./content/resources/chromadb")
embeddingModel_dir = Path("./content/resources/hkunlp_instructor_xl")
# Uncomment to wipe the database / the cached model and start from scratch.
#shutil.rmtree(db_dir)
#shutil.rmtree(embeddingModel_dir)

# Load the Instructor model, caching its files under embeddingModel_dir.
instructor = INSTRUCTOR("hkunlp/instructor-xl", cache_folder=embeddingModel_dir)
# Wrap the loaded model for Chroma. Because a model instance is passed in,
# model_name and device are only recorded on the wrapper; "Archaeology" is the
# instruction paired with every text before it is encoded.
ef = InstructorEmbeddingFunction(model_name="hkunlp/instructor-xl", device="cuda", instructor=instructor, instruction="Archaeology")
# Open (or create) the persistent database and the "archaeology" collection.
# `collection` is a notebook-level global used by the ingestion cell.
chroma_client = chromadb.PersistentClient(path=str(db_dir))
collection = chroma_client.get_or_create_collection(name="archaeology", embedding_function=ef )

load INSTRUCTOR_Transformer


In [None]:
%pip install GitPython

import os
from os import listdir
from os.path import isfile
from pathlib import Path
from google.colab import userdata
from git import Repo
import shutil

books_dir = Path("./content/resources/books")  # where the downloaded files will be stored
print(books_dir)

# An existing but empty directory is removed so that the clone below runs
# again instead of being skipped.
if books_dir.is_dir() and not any(books_dir.iterdir()):
    shutil.rmtree(books_dir)

if not books_dir.exists():
    # Credentials come from the Colab secrets store.
    # NOTE(review): the token is embedded in the clone URL, so it may show up
    # in git error messages - confirm this is acceptable for this notebook.
    #git_token = userdata.get('nader_token')
    git_token = userdata.get('git_token')
    git_user = userdata.get('git_user')
    repo = Repo.clone_from(
        "https://{}:{}@github.com/Archaeonader/Books.git".format(git_user, git_token),
        books_dir,
    )
    #repo = Repo.clone_from(str.format("https://{}:{}@github.com/ibrahimkais/archpredec.git", git_user, git_token), books_dir)

# Sorted so that the processing order is the same on every run.
files = sorted(listdir(books_dir))

# Keep regular files only, and only PDFs: the ingestion cell hands every entry
# of `onlyfiles` to PdfReader, which cannot parse other files that may sit in
# the checkout.
onlyfiles = [
    os.path.join(books_dir, f)
    for f in files
    if f.lower().endswith(".pdf") and isfile(os.path.join(books_dir, f))
]

print(len(onlyfiles))
print(onlyfiles)

In [None]:
import re
import torch
import torch.nn.functional as F

# Cleaning and preprocessing function
def clean_and_tokenize(text):
    """
    Normalise raw text into lowercase words separated by single spaces.

    Despite the name, no tokenisation happens here: the result is one string.

    Steps:
      1. lowercase the text;
      2. drop every character that is not an ASCII letter, a digit or
         whitespace (this removes punctuation, including sentence-ending
         punctuation, and non-ASCII letters);
      3. collapse every run of whitespace (spaces, tabs, newlines) into one
         space.

    Whitespace is collapsed *after* characters are removed; doing it the other
    way round left double spaces wherever a removed character had whitespace
    on both sides, and left single tabs untouched.

    Args:
        text (str): The raw text.

    Returns:
        str: The cleaned text. Leading/trailing whitespace is reduced to a
        single space, not stripped.
    """
    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)  # Remove special characters
    text = re.sub(r'\s+', ' ', text)  # Collapse all whitespace runs
    return text


# Chunk text into fixed-size chunks
def chunk_text(text, chunk_size=200):
    """
    Split text into consecutive chunks of at most ``chunk_size`` words.

    Words are the whitespace-separated pieces of ``text``; each chunk is
    re-joined with single spaces. The last chunk may be shorter.

    Args:
        text (str): The text to split.
        chunk_size (int, optional): Maximum number of words per chunk.
            Defaults to 200.

    Returns:
        list[str]: The chunks in order; empty when ``text`` has no words.
    """
    tokens = text.split()
    pieces = []
    for start in range(0, len(tokens), chunk_size):
        window = tokens[start:start + chunk_size]
        pieces.append(' '.join(window))
    return pieces

# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(model_output, attention_mask):
    """
    Average token embeddings along dimension 1, ignoring masked-out tokens.

    Args:
        model_output: Indexable model result whose first element holds the
            token embeddings (presumably (batch, seq_len, hidden) - confirm
            against the model in use).
        attention_mask: Mask with one entry per token; it is broadcast over
            the last dimension of the token embeddings.

    Returns:
        The mask-weighted mean of the token embeddings. The divisor is
        clamped to at least 1e-9 so a fully masked row does not divide by zero.
    """
    hidden_states = model_output[0]
    # Give the mask a trailing axis and stretch it to the embeddings' shape.
    weights = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    weighted_sum = (hidden_states * weights).sum(dim=1)
    token_counts = weights.sum(dim=1).clamp(min=1e-9)
    return weighted_sum / token_counts

In [None]:
from collections import Counter
from sklearn.cluster import DBSCAN
import numpy as np


class PDFTextBlockCategorizer:
    """
    Split PDF text blocks into a majority group and "everything else".

    Each block is described by its rectangle and the length of its text, the
    descriptions are clustered with DBSCAN (library defaults), and the blocks
    are then relabelled: 0 for blocks that fall under the most frequent DBSCAN
    label, 1 for all other blocks.

    Attributes set by ``run()``:
        labels: list of 0/1, one entry per block, in block order.
        n_clusters: number of distinct DBSCAN labels (this count includes
            DBSCAN's noise label when noise points are present).
    """

    def __init__(self, blocks):
        """
        Args:
            blocks: Sequence of ``(x0, y0, x1, y1, text, page_index)`` tuples.
        """
        self.blocks = blocks
        # Defined up front so the attributes exist even before run() is called.
        self.labels = []
        self.n_clusters = 0

    def run(self):
        """Cluster the blocks and populate ``labels`` and ``n_clusters``."""
        # DBSCAN cannot be fitted on an empty feature matrix.
        if len(self.blocks) == 0:
            self.labels = []
            self.n_clusters = 0
            return

        # Feature vector per block: rectangle coordinates plus text length.
        X = np.array(
            [(x0, y0, x1, y1, len(text)) for x0, y0, x1, y1, text, page_index in self.blocks]
        )

        dbscan = DBSCAN()
        dbscan.fit(X)
        raw_labels = dbscan.labels_
        self.n_clusters = len(np.unique(raw_labels))

        # Collapse to two groups: the most frequent label versus the rest.
        most_common_label = Counter(raw_labels).most_common(1)[0][0]
        self.labels = [0 if label == most_common_label else 1 for label in raw_labels]

In [None]:
%pip install --upgrade pymupdf

import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import fitz
#from pathlib import Path
from itertools import islice

# https://github.com/pymupdf/PyMuPDF/discussions/2259
class PDFExtractor:
    """
    Extract the main text blocks of a PDF opened with PyMuPDF.

    Blocks are read page by page, categorised by PDFTextBlockCategorizer, and
    only the blocks of the majority category are returned. The instance can be
    used as a context manager so the document is closed when done.
    """

    pdf_root = ""

    # Label the categorizer assigns to blocks outside the majority group;
    # those blocks are drawn in red and left out of the result.
    _OUTLIER_LABEL = 1

    def __init__(self, fileName):
        """
        Open the PDF.

        Args:
            fileName: Path of the PDF file to open.
        """
        self.pdf_fullpath = fileName
        self.pdf_doc = fitz.open(self.pdf_fullpath)

    def close(self):
        """Close the underlying PyMuPDF document."""
        self.pdf_doc.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def calc_rect_center(self, rect, reverse_y=False):
        """
        Return the centre point of a rectangle.

        Args:
            rect: ``(x0, y0, x1, y1)``.
            reverse_y: When True both y values are negated first, which is
                how the blocks are drawn in the overview plot.

        Returns:
            tuple: ``(x_center, y_center)``.
        """
        if reverse_y:
            x0, y0, x1, y1 = rect[0], -rect[1], rect[2], -rect[3]
        else:
            x0, y0, x1, y1 = rect

        x_center = (x0 + x1) / 2
        y_center = (y0 + y1) / 2
        return (x_center, y_center)

    def extract_all_text_blocks(self, *, plot=True) -> list[tuple[int, str]]:
        """
        Return the majority-category text blocks of the whole document.

        Args:
            plot: When True (the default) an overview of all block rectangles,
                coloured by category, is drawn with matplotlib.

        Returns:
            list of ``(page_index, text)`` tuples in reading order of the
            extraction; ``page_index`` is zero-based. Empty when the document
            contains no blocks.
        """
        # * https://pymupdf.readthedocs.io/en/latest/textpage.html#TextPage.extractBLOCKS
        rects = []
        rect_centers = []
        visual_label_texts = []
        categorize_vectors = []

        for page_idx, page in enumerate(self.pdf_doc):
            page_cnt = page_idx + 1
            for block in page.get_text("blocks"):
                block_rect = block[:4]  # (x0,y0,x1,y1)
                block_text = block[4]
                block_cnt = block[5] + 1  # block number, shown 1-based

                rects.append(block_rect)
                rect_centers.append(self.calc_rect_center(block_rect, reverse_y=True))
                visual_label_texts.append(f"({page_cnt}.{block_cnt})")
                categorize_vectors.append((*block_rect, block_text, page_idx))

        # Nothing to categorise (empty document or pages without blocks).
        if not categorize_vectors:
            return []

        categorizer = PDFTextBlockCategorizer(categorize_vectors)
        categorizer.run()
        labels = categorizer.labels

        text_blocks = [
            (vector[5], vector[4])
            for vector, label in zip(categorize_vectors, labels)
            if label != self._OUTLIER_LABEL
        ]

        if plot:
            self._plot_blocks(rects, rect_centers, visual_label_texts, labels)

        return text_blocks

    def _plot_blocks(self, rects, rect_centers, visual_label_texts, labels):
        """Draw every block rectangle, its centre and its "(page.block)" tag."""
        colors = ["b", "r", "g", "c", "m", "y", "k"]
        fig, ax = plt.subplots()

        for rect, rect_center, caption, label in zip(
            rects, rect_centers, visual_label_texts, labels
        ):
            color = colors[label]
            x0, y0, x1, y1 = rect
            # y values are negated to match the centres computed with reverse_y=True.
            ax.add_patch(
                Rectangle((x0, -y0), x1 - x0, -y1 + y0, fill=False, edgecolor=color)
            )
            x, y = rect_center
            plt.scatter(x, y, color=color)
            plt.annotate(caption, rect_center)
        plt.show()

In [None]:
%pip install pyPDF2
%pip install gensim

from PyPDF2 import PdfReader
from gensim.utils import simple_preprocess

from pathlib import Path

# Ingest every book: read its PDF info, extract the main text blocks, clean
# and chunk them, and store the chunks with per-chunk metadata in `collection`.
for book_index, file_name in enumerate(onlyfiles):
    # PdfReader opens the path itself, so no separate file handle is needed.
    reader = PdfReader(file_name)
    pagesCount = len(reader.pages)

    # The document-info dictionary can be missing; treat that as "no metadata".
    info = reader.metadata or {}
    metadata = {key.replace('/', ''): info[key] for key in info}

    # The title prefixes every id; fall back to the file name when the PDF
    # carries no (or an empty) Title instead of failing with a KeyError.
    title = metadata.get("Title") or Path(file_name).stem
    page_digits = len(str(pagesCount))

    extractor = PDFExtractor(file_name)
    try:
        text_blocks = extractor.extract_all_text_blocks()
    finally:
        # The extracted blocks are plain strings, so the document can be released.
        extractor.pdf_doc.close()

    for block_index, (page_index, text) in enumerate(text_blocks):
        metadata["PageIndex"] = page_index + 1
        text = clean_and_tokenize(text)
        if not text:
            continue

        sentences = tokenizer.sentences_from_text(text)
        page_label = str(page_index).zfill(page_digits)

        for sent_index, sentence in enumerate(sentences):
            sentence = ' '.join(simple_preprocess(sentence))
            chunked_documents = chunk_text(sentence)
            print(str(page_index) + " ['" + "', '".join(chunked_documents) + "']")

            if not chunked_documents:
                continue

            # The id now contains the block index and separators. Previously
            # two blocks on the same page produced identical ids (both start
            # at sentence 0 / chunk 0), and the concatenated digits were
            # ambiguous (sentence 1 + chunk 12 vs. sentence 11 + chunk 2).
            ids = [
                f"{title} {page_label}-{block_index}-{sent_index}-{chunk_index}"
                for chunk_index in range(len(chunked_documents))
            ]
            # Each chunk gets its own copy so later PageIndex updates cannot
            # leak into metadata that was already handed over.
            metadatas = [dict(metadata) for _ in chunked_documents]

            collection.add(
                documents=chunked_documents,
                metadatas=metadatas,
                ids=ids,
            )