In [6]:
import weaviate
import ray 
import os
import logging
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from weaviate.classes.config import Configure, Property, DataType
from weaviate.auth import AuthApiKey
from weaviate.classes.query import MetadataQuery
from weaviate.classes.query import Filter
from langchain_community.llms import VLLMOpenAI
#from langchain_weaviate.vectorstores import WeaviateVectorStore
#from API.app.models import VectorDBRequest

from sentence_transformers import SentenceTransformer
from langchain_weaviate.vectorstores import WeaviateVectorStore
from langchain.prompts import PromptTemplate
#from langchain.chains import RetrievalQA

from langchain_community.retrievers import (
    WeaviateHybridSearchRetriever,
)
from langchain_core.documents import Document

from langchain.vectorstores import Weaviate
from langchain_weaviate.vectorstores import WeaviateVectorStore
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser

from typing import Any, List
import pypdf
from langchain_community.document_loaders import TextLoader

## Initiate weaviate client

In [7]:
weaviate_client = weaviate.connect_to_local(   # `weaviate_key`: your Weaviate API key
                    port=8900,
                    headers={
                        "X-HuggingFace-Api-Key": "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"
                        }
                )

WeaviateConnectionError: Connection to Weaviate failed. Error: [Errno 111] Connection refused. 
Is Weaviate running and reachable at http://localhost:8900?

## Adding a collection

In [12]:
def add_vdb_class(username, class_name,embedder=None, HF_token=None):
        '''
        Description:
            Creates a new class in the Weaviate database with the specified name and username. It also adds the class to the internal database.

        Parameters:

            username (str): The username associated with the new class.
            class_name (str): The name of the new class to be created.

        Returns:

            dict: A response indicating the outcome ('success' or 'error') and relevant messages.
        '''
        try:            
                weaviate_client = weaviate.connect_to_local(   # `weaviate_key`: your Weaviate API key
                    port= 8900,
                    headers={
                        "X-HuggingFace-Api-Key": str(HF_token)
                        }
                )

                prefix = username
                cls = str(prefix) + "_" + str(class_name)
                if embedder is None:
                    vectorizer = "sentence-transformers/all-MiniLM-L6-v2"
                else:
                    vectorizer = embedder
                weaviate_client.collections.create(
                        cls,
                        vectorizer_config=Configure.Vectorizer.text2vec_huggingface(
                            model=vectorizer,
                        ),
                        vector_index_config=Configure.VectorIndex.flat(),
                        properties=[  # properties configuration is optional
                            Property(name="document_title", data_type=DataType.TEXT),
                            Property(name="page_content", data_type=DataType.TEXT),
                        ],
                    )
                # database_response = self.database.add_collection({"username": username, "collection_name": class_name})
                # if database_response:
                #     self.logger.info("class name added successfully to database")     
                #     self.logger.info(f"success: class {class_name} created for user {username}")
                #     return {"success": f"Class {cls} created "}
                # else:
                #     return {"error": "No class name provided"}
        except Exception as e:
            return {"error": str(e)}

In [13]:
add_vdb_class("Nils", "DB1", "hkunlp/instructor-xl", "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")

In [14]:
weaviate_client.collections.list_all()

{'Nils_DB1': _CollectionConfigSimple(name='Nils_DB1', description=None, generative_config=None, properties=[_Property(name='document_title', description=None, data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=_PropertyVectorizerConfig(skip=False, vectorize_property_name=True), vectorizer='text2vec-huggingface'), _Property(name='page_content', description=None, data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=_PropertyVectorizerConfig(skip=False, vectorize_property_name=True), vectorizer='text2vec-huggingface')], references=[], reranker_config=None, vectorizer_config=_VectorizerConfig(vectorizer=<Vectorizers.TEXT2VEC_HUGGINGFACE: 'text2vec-huggingface'>, model={'model': 'hkunlp/instructor-xl'}, vectorize_collection_name=True), vectorizer=<Vectorizers.TEXT2VEC_

## Ray actors and paralellization

In [10]:
def split_workload(file_paths, num_actors):
    return [file_paths[i::num_actors] for i in range(num_actors)]

def get_pdf_paths(dir):
    pdf_paths = []
    for file in os.listdir(dir):
        if file.endswith('.pdf'):
            pdf_path = os.path.join(dir, file)
            pdf_paths.append(pdf_path)
    return pdf_paths

In [11]:
pdf_paths = get_pdf_paths("API/received_files/0e5ba6dbf1116059")

In [12]:
workload = split_workload(pdf_paths, 2)

In [13]:
import ray
import weaviate
import logging
from pypdf.errors import PdfStreamError

In [32]:
ray.shutdown()


  _warn("subprocess %s is still running" % self.pid,


In [43]:
@ray.remote(num_gpus=0.2)
class WeaviateEmbedder3:
    def __init__(self, class_name=None):
        self.time_taken = 0
        self.text_list = []
        self.class_name= class_name
        
    async def run_embedder_on_text(self, documents):
        serialized_docs = await self.weaviate_split_pdf(documents)
        doc_list = await self.adding_weaviate_document(serialized_docs)
        return doc_list

    async def weaviate_split_pdf(self, docs):

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        text_docs = text_splitter.split_documents(docs)
        serialized_docs = [await self.weaviate_serialize_document(doc) for doc in text_docs]

        return serialized_docs

    async def weaviate_serialize_document(self, doc):

        document_title = doc.metadata.get('source', '').split('/')[-1]
        return {
            "page_content": doc.page_content,
            "document_title": document_title,
        }

    async def parse_pdf(self, file_path_list):
        documents = []
        for pdf_path in file_path_list:
            try:
                loader = PyPDFLoader(pdf_path)
                documents.extend(loader.load())

            except PdfStreamError as e:

                continue
        return documents

    async def convert_file_to_text(self, document_path):
        documents = await self.parse_pdf(document_path)

        return documents
    
    async def adding_weaviate_document(self, text_lst, collection_name=None):
        weaviate_client = weaviate.connect_to_local(   # `weaviate_key`: your Weaviate API key
                    port= 8900,
                    headers={
                        "X-HuggingFace-Api-Key": "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"
                        }
                )
        
        collection = weaviate_client.collections.get("Nils_DB1")
        
        with collection.batch.dynamic() as batch:
            for text in text_lst:
                batch.add_object(
                    properties=text,
                        #uuid=generate_uuid5(text),
        )
                self.text_list.append(text)

        return self.text_list

In [44]:
# Example usage
import asyncio
async def main():



    weaviate_embedders = [WeaviateEmbedder3.remote() for _ in range(2)]
    pdf_paths = get_pdf_paths("API/received_files/0e5ba6dbf1116059")
    workload = split_workload(pdf_paths, len(pdf_paths))
    futures = [weaviate_embedder.convert_file_to_text.remote(i) for weaviate_embedder, i in zip(weaviate_embedders, workload)]
    very_finals = [weaviate_embedder.run_embedder_on_text.remote(i) for weaviate_embedder, i in zip(weaviate_embedders, futures)]
    results = await asyncio.gather(*very_finals)

    print(results)

    #ray.shutdown()

# Run the async main function
await main()

### Running weaviate embedders

In [82]:
cls = "Nils_DB1"
weaviate_embedders = [WeaviateEmbedder.remote(cls) for _ in range(int(2))]

In [92]:
@ray.remote
class WeaviateEmbedder2:
    def __init__(self, class_name=None):
        self.time_taken = 0
        self.text_list = []
        
        # Configure logger
        logging.basicConfig(
            level=logging.DEBUG,  # Set to DEBUG to capture all types of log messages
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler("app.log"),  # Use FileHandler to log to a file
                logging.StreamHandler()  # Use StreamHandler to also log to the console
            ]
        )
        self.logger = logging.getLogger(__name__)
        
        try:
            self.weaviate_client = weaviate.Client(
                url="http://localhost:8900",
                additional_headers={"X-HuggingFace-Api-Key": "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"},
            )
            self.logger.info("Connected to Weaviate successfully.")
        except Exception as e:
            self.logger.error(f"Error in connecting to Weaviate: {e}")
            raise

    async def weaviate_split_pdf(self, docs):
        self.logger.debug("Starting to split PDF documents.")
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        text_docs = text_splitter.split_documents(docs)
        serialized_docs = [await self.weaviate_serialize_document(doc) for doc in text_docs]
        self.logger.info(f"Serialized documents: {serialized_docs[0]}")
        return serialized_docs

    async def weaviate_serialize_document(self, doc):
        self.logger.debug("Serializing document.")
        document_title = doc.metadata.get('source', '').split('/')[-1]
        return {
            "page_content": doc.page_content,
            "document_title": document_title,
        }

    async def parse_pdf(self, file_path_list):
        self.logger.debug(f"Parsing PDF files: {file_path_list}")
        documents = []
        for pdf_path in file_path_list:
            try:
                loader = PyPDFLoader(pdf_path)
                documents.extend(loader.load())
                self.logger.info(f'Document length: {len(documents)}')
            except PdfStreamError as e:
                self.logger.error(f"Skipping file {pdf_path} due to error: {e}")
                continue
        return documents

    async def convert_file_to_text(self, document_path):
        documents = await self.parse_pdf(document_path)
        self.logger.info(f"Parsed documents: {documents[0]}")
        return documents

In [93]:
pdf_paths = get_pdf_paths("API/received_files/9964c824188d779b/")

In [94]:
weaviate_embedder = WeaviateEmbedder2.remote()

# Test the actor with a simple method call
result = ray.get(weaviate_embedder.convert_file_to_text.remote(pdf_paths))

# Print the result to ensure it worked
print(f"Result from actor: {result}")

ActorDiedError: The actor died because of an error raised in its creation task, [36mray::WeaviateEmbedder2.__init__()[39m (pid=118802, ip=10.128.0.7, actor_id=3c156faf13fdfacd168bc50a03000000, repr=<__main__.FunctionActorManager._create_fake_actor_class.<locals>.TemporaryActor object at 0x7f451d2875b0>)
ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
	class_name: TemporaryActor
	actor_id: 3c156faf13fdfacd168bc50a03000000
Failed to create actor. You set the async flag, but the actor does not have any coroutine functions.

In [83]:
weaviate_embedders

[Actor(WeaviateEmbedder, 20850cd209d1b7274a85bea703000000),
 Actor(WeaviateEmbedder, 85f1a74023e1d5ee557d6d2403000000)]

In [84]:
pdf_paths = get_pdf_paths("API/received_files/0e5ba6dbf1116059")
workload = split_workload(pdf_paths, len(pdf_paths))
workload

[['API/received_files/0e5ba6dbf1116059/langchain_guide.pdf'],
 ['API/received_files/0e5ba6dbf1116059/UEFA_Euro_2024.pdf']]

In [86]:
futures = ray.get([weaviate_embedder.convert_file_to_text.remote(i) for weaviate_embedder, i in zip(weaviate_embedders, workload)])

#doc_lst =[weaviate_embedder.run_embedder_on_text.remote(workload) for weaviate_embedder, workload in zip(weaviate_embedders, futures)]

#final_res_embedder = ray.get(doc_lst)

ActorDiedError: The actor died because of an error raised in its creation task, [36mray::WeaviateEmbedder.__init__()[39m (pid=113632, ip=10.128.0.7, actor_id=20850cd209d1b7274a85bea703000000, repr=<__main__.FunctionActorManager._create_fake_actor_class.<locals>.TemporaryActor object at 0x7f5c81eef3a0>)
ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
	class_name: TemporaryActor
	actor_id: 20850cd209d1b7274a85bea703000000
Failed to create actor. You set the async flag, but the actor does not have any coroutine functions.

In [85]:
ray.get(futures)

ActorDiedError: The actor died because of an error raised in its creation task, [36mray::WeaviateEmbedder.__init__()[39m (pid=111691, ip=10.128.0.7, actor_id=e75a75668964043966c9fcd403000000, repr=<__main__.FunctionActorManager._create_fake_actor_class.<locals>.TemporaryActor object at 0x7f298f3fb3a0>)
ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
	class_name: TemporaryActor
	actor_id: e75a75668964043966c9fcd403000000
Failed to create actor. You set the async flag, but the actor does not have any coroutine functions.

In [80]:
@ray.remote
class SimpleActor:
    def __init__(self):
        self.message = "Hello, this is a test message from the actor!"

    def print_message(self):
        print(self.message)
        return self.message

# Create an instance of the actor
actor = SimpleActor.remote()

# Call the method and get the result
result = ray.get(actor.print_message.remote())

# Print the result to ensure it worked
print(f"Result from actor: {result}")

[36m(SimpleActor pid=113284)[0m Hello, this is a test message from the actor!


Result from actor: Hello, this is a test message from the actor!


In [None]:
def process_all_docs(self, dir, username, cls):
        '''
        Description:
            Processes all documents in a specified directory, serializes them, and adds them to Weaviate. Handles both small and large document sets by splitting the workload for efficient processing.

        Parameters:

            dir (str): Directory containing the documents to be processed.
            username (str): The username of the user processing the documents.
            cls (str): The class name for the documents in Weaviate.

        Returns:

            dict: A response indicating the status of the processing ('success' or 'error') and a message detailing the outcome.
        '''

        response = {"status": "initiated", "message": ""}
        try:
            full_class = str(username) + "_" + str(cls)
            document_list = self.parse_pdf(dir)
            serialized_docs = self.weaviate_split_multiple_pdf(document_list)
            if len(serialized_docs) <= 30:
                self.add_weaviate_document(full_class, serialized_docs)
                response["status"] = "success"
                response["message"] = f"Processed {len(serialized_docs)} documents for class {full_class}."
            else:
                self.adding_weaviate_document_no_ray(full_class, serialized_docs)
                #doc_workload = self.divide_workload(self.num_actors, serialized_docs)
                #self.add_weaviate_batch_documents(full_class, doc_workload)
                #self.logger.info(f"check weaviate add data, ")
                response["status"] = "success"
                response["message"] = f"Processed {len(serialized_docs)} documents in batches for class {full_class}."
            return response
        except Exception as e:
            response["status"] = "error"
            response["message"] = str(e)
            return response

In [None]:
def add_weaviate_document(self, cls, docs):
        '''
        Description:
            Adds a list of serialized documents to Weaviate under a specified class. Uses a remote WeaviateEmbedder actor for the operation.

        Parameters:

            cls (str): The class name under which the documents will be added.
            docs (list): A list of serialized documents to be added.
        '''
        actor = WeaviateEmbedder.remote()
        ray.get([actor.adding_weaviate_document.remote(docs, str(cls))])

def add_weaviate_batch_documents(self, cls, doc_workload):
        '''
        Description:
            Adds documents to Weaviate in batches using multiple WeaviateEmbedder actors. This method is used for efficient processing of larger sets of documents.

        Parameters:

            cls (str): The class name under which the documents will be added.
            doc_workload (list): A list of document batches to be added, where each sublist is a separate batch.
        '''
        actors = [WeaviateEmbedder.remote() for _ in range(3)]
        self.logger.info(f"actors creation successful {actors}: %s", )
        results = ray.get([actor.adding_weaviate_document.remote(doc_part, str(cls)) for actor, doc_part in zip(actors, doc_workload)])
        

        self.logger.info(f"check 1st step of ray was successful", )
        self.logger.info(f"check if ray was successful:", )

In [None]:
def parse_pdf(self, directory):    
        '''
        Description:
           Parses all PDF and text files in a given directory, creating a list of documents. It uses different loaders for PDF and text files and handles errors by skipping problematic files.

        Parameters:

            directory (str): The path to the directory containing PDF and text files.

        Returns:

            list: A list of document objects parsed from the files in the specified directory.
        '''
        documents = []
        for file in os.listdir(directory):
            if file.endswith('.pdf'):
                pdf_path = os.path.join(directory, file)
                try:
                    loader = PyPDFLoader(pdf_path, extract_images=False)
                    documents.extend(loader.load())
                except pypdf.errors.PdfStreamError as e:
                    print(f"Skipping file {file} due to error: {e}")
                    continue  # Skip this file and continue with the next one
            elif file.endswith('.txt'):
                text_path = os.path.join(directory, file)
                try:
                    loader = TextLoader(text_path)
                    documents.extend(loader.load())
                except Exception as e:
                    print(f"Error in file {file}: {e}")
                    continue
        #self.logger.info(f"Check the parsed documents: {documents}")
        return documents

In [None]:
def weaviate_serialize_document(self,doc):
        '''
        Description:
            Serializes a document for storage in Weaviate. It extracts the title from the document's metadata and combines it with the page content.

        Parameters:

            doc (Document): The document to be serialized.

        Returns:

            dict: A dictionary containing the serialized content of the document, including its title and page content.
        '''
        document_title = doc.metadata.get('source', '').split('/')[-1]
        return {
            "page_content": doc.page_content,
            "document_title": document_title,
        }
    
def weaviate_split_multiple_pdf(self,docs):   
        '''
        Description:
            Splits multiple PDF documents into chunks for easier processing and storage. This function uses a recursive character text splitter to create smaller, manageable text documents.

        Parameters:

            docs (list): A list of document objects to be split.

        Returns:

            list: A list of serialized document chunks.
        ''' 
        #text_splitter = CharacterTextSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=80)
        text_docs = text_splitter.split_documents(docs)

        serialized_docs = [
                    self.weaviate_serialize_document(doc) 
                    for doc in text_docs
                        ]
        return serialized_docs	

def divide_workload(self, num_actors, documents):
        '''
        Description:
            Divides a list of documents among a specified number of actors (processes or threads) to parallelize processing.

        Parameters:

            num_actors (int): The number of Ray actors (processes/threads) among which the workload will be divided.
            documents (list): A list of documents to be divided.

        Returns:

            list: A list of document lists, where each sublist corresponds to the documents assigned to one actor.
        '''
        docs_per_actor = len(documents) // num_actors

        doc_parts = [documents[i * docs_per_actor: (i + 1) * docs_per_actor] for i in range(num_actors)]

        if len(documents) % num_actors:
            doc_parts[-1].extend(documents[num_actors * docs_per_actor:])

        return doc_parts