In [None]:
# Install (or upgrade to) the latest Weaviate Python client
%pip install -U weaviate-client

In [None]:
import weaviate
import os
from weaviate.classes.init import Auth, AdditionalConfig, Timeout
from weaviate.classes.query import Filter, MetadataQuery
from weaviate import WeaviateAsyncClient
from typing import Any 
from dotenv import load_dotenv


class Singleton(type):
    """Metaclass that creates at most one instance per class.

    The first call to a class using this metaclass builds the instance with
    the given arguments and stores it; every later call returns that stored
    instance and ignores its arguments.
    """

    # Maps each class to the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the stored instance of ``cls``, building it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]

        created = super().__call__(*args, **kwargs)
        registry[cls] = created
        return created


class WeaviateClient(metaclass=Singleton):
    """Shared async helper for querying and modifying a Weaviate Cloud cluster.

    Because the class uses the ``Singleton`` metaclass, every
    ``WeaviateClient()`` call returns the same instance, and all callers share
    one lazily created connection.

    Settings are read from the environment after ``load_dotenv()`` has run:

    * ``CLUSTER_URL`` -- URL of the Weaviate Cloud cluster (required).
    * ``API_KEY`` -- API key used to authenticate (required).
    * ``OPENAI_API_KEY`` -- sent as the ``X-OpenAI-Api-Key`` header when set.
    """

    # Cached connected client; None until the first request (or after close).
    __client = None

    def __init__(self):
        """Load the connection settings from the environment / ``.env`` file."""
        load_dotenv()
        self.__wcd_url = os.getenv("CLUSTER_URL")
        self.__wcd_api_key = os.getenv("API_KEY")
        self.__vectorization_api_key = os.getenv("OPENAI_API_KEY")

    async def __get_client(self) -> WeaviateAsyncClient:
        """Return the shared connected client, creating it on first use.

        Raises:
            ValueError: If ``CLUSTER_URL`` or ``API_KEY`` is not set.
        """
        if self.__client is None:
            required = (("CLUSTER_URL", self.__wcd_url), ("API_KEY", self.__wcd_api_key))
            missing = [name for name, value in required if not value]
            if missing:
                raise ValueError(
                    f"Missing required environment variable(s): {', '.join(missing)}"
                )

            openai_key = self.__vectorization_api_key
            client = weaviate.use_async_with_weaviate_cloud(
                cluster_url=self.__wcd_url,
                auth_credentials=Auth.api_key(self.__wcd_api_key),
                # Only send the vectorizer header when a key is actually configured.
                headers={"X-OpenAI-Api-Key": openai_key} if openai_key else None,
                additional_config=AdditionalConfig(timeout=Timeout(init=30, query=30, insert=30)),
                skip_init_checks=True,
            )
            await client.connect()
            # Cache only after connect() succeeded, so a failed attempt is
            # retried on the next call instead of reusing a dead client.
            self.__client = client
        return self.__client

    async def fetch_objects(
        self, collection_name: str, property: str = None, search_key: Any = None, limit: int = 100
    ) -> list:
        """Fetch up to ``limit`` objects from a collection.

        Args:
            collection_name: Name of the collection to read from.
            property: Optional property name to filter on.
            search_key: Value ``property`` must equal. The filter is applied
                only when both ``property`` and ``search_key`` are given.
            limit: Maximum number of objects to request.

        Returns:
            The ``objects`` of the query response.
        """
        client = await self.__get_client()
        collection = client.collections.get(collection_name)

        # Compare against None so falsy but valid values (0, False, "")
        # still produce a filter instead of an unfiltered fetch.
        property_filter = None
        if property and search_key is not None:
            property_filter = Filter.by_property(property).equal(search_key)

        response = await collection.query.fetch_objects(filters=property_filter, limit=limit)
        return response.objects

    async def near_text(self, collection_name: str, similarity_text: str, limit: int = 1) -> list:
        """Run a ``near_text`` query and return the matching objects.

        Args:
            collection_name: Name of the collection to search.
            similarity_text: Text passed as the query.
            limit: Maximum number of objects to request.

        Returns:
            The ``objects`` of the query response; certainty metadata is
            requested for each of them.
        """
        client = await self.__get_client()
        collection = client.collections.get(collection_name)

        response = await collection.query.near_text(
            query=similarity_text, limit=limit, return_metadata=MetadataQuery(certainty=True)
        )
        return response.objects

    async def hybrid_search(self, collection_name: str, tenant: str, query_properties: list, similarity_text: str, limit: int = 1) -> list:
        """Run a hybrid query against one tenant of a collection.

        Args:
            collection_name: Name of the collection to search.
            tenant: Tenant the query is scoped to.
            query_properties: Property names passed to the query as
                ``query_properties`` (e.g. ``["title"]``).
            similarity_text: Text passed as the query.
            limit: Maximum number of objects to request.

        Returns:
            The ``objects`` of the query response.
        """
        client = await self.__get_client()
        collection = client.collections.get(collection_name).with_tenant(tenant)

        response = await collection.query.hybrid(
            query=similarity_text, limit=limit, query_properties=query_properties
        )
        return response.objects

    async def delete_all_collection_objects(self, collection_name: str):
        """Delete the objects of a collection whose id is not ``"*"``.

        NOTE(review): this relies on no object id being equal to ``"*"``, and
        the server may cap how many objects one ``delete_many`` call removes --
        confirm for large collections.
        """
        client = await self.__get_client()
        collection = client.collections.get(collection_name)
        await collection.data.delete_many(where=Filter.by_id().not_equal("*"))

    async def add_objects_to_collection(self, collection_name: str, data_objects: list):
        """Insert each item of ``data_objects`` into the collection, one at a time.

        Args:
            collection_name: Name of the target collection.
            data_objects: Objects passed one by one to ``collection.data.insert``.
        """
        client = await self.__get_client()
        collection = client.collections.get(collection_name)

        for data_object in data_objects:
            await collection.data.insert(data_object)

    async def close_connection(self):
        """Close the shared client if one is open; otherwise do nothing.

        The cached client is cleared, so the next request opens a fresh
        connection instead of reusing the closed one.
        """
        client = self.__client
        if client is None:
            # Never connected (or already closed): do not connect just to close.
            return
        self.__client = None
        await client.close()

In [None]:
import asyncio

weaviate_client = WeaviateClient()  # Initialize the client

# Example usage of a method in an asynchronous context
async def test_fetch_objects():
    response = await weaviate_client.fetch_objects("<COLLECTION_NAME>")
    print(response)

# Run the test function
await test_fetch_objects()

In [None]:
weaviate_client = WeaviateClient()  # Initialize the client

# Example usage of a method in an asynchronous context
async def hybrid_search():
    response = await weaviate_client.hybrid_search("<COLLECTION_NAME>", "<TENANT_NAME>",similarity_text="<QUERY_STRING>", limit=4, query_properties=["<SPECIFIC_PROPERTY>"])
    print(response)

# Run the function
await hybrid_search()