# Weaviate - Manage data

This notebook contains code for managing data in Weaviate: connecting a client, creating collections, and inserting, querying and deleting objects.

**Source: []()**

In [1]:
import os

In [53]:
import asyncio

In [2]:
os.getcwd()

'E:\\Accessories\\WebDevelopment\\Portfolio\\portfolio_admin_experience_django\\llm\\Weaviate'

In [118]:
import typing
from abc import ABC, abstractmethod
from datetime import datetime, timezone

import strawberry
import weaviate
from weaviate.classes.config import Configure, DataType, Property, VectorDistances
from weaviate.classes.init import AdditionalConfig, Auth, Timeout
from weaviate.connect import ConnectionParams

# Connect to the client

In [106]:
# Connection settings. Every value can be overridden through an environment
# variable; the literals are the local-development defaults used so far.
weaviate_host = os.environ.get("WEAVIATE_HOST", "localhost")
weaviate_port = int(os.environ.get("WEAVIATE_HTTP_PORT", "50050"))
weaviate_grpc_port = int(os.environ.get("WEAVIATE_GRPC_PORT", "50051"))
weaviate_scheme = os.environ.get("WEAVIATE_SCHEME", "http")
weaviate_user = os.environ.get("WEAVIATE_USER", "admin@vip3rtech6069.com")
# NOTE(review): the fallback key is a development credential kept only for
# backward compatibility; set WEAVIATE_API_KEY instead of committing a real key.
weaviate_key = os.environ.get("WEAVIATE_API_KEY", "admin123")

# HTTP and gRPC share the host and both use TLS only when the scheme is "https".
weaviate_connection_params = ConnectionParams.from_params(
    http_host=weaviate_host,
    http_port=weaviate_port,
    http_secure=weaviate_scheme == "https",
    grpc_host=weaviate_host,
    grpc_port=weaviate_grpc_port,
    grpc_secure=weaviate_scheme == "https"
)
weaviate_auth_secret = Auth.api_key(weaviate_key)
weaviate_additional_config = AdditionalConfig(
    timeout=Timeout(init=30, query=60, insert=120),  # Values in seconds
)

In [9]:
# Synchronous client built from the connection parameters, API-key auth and
# timeout configuration defined earlier in this notebook.
# NOTE(review): skip_init_checks=True presumably bypasses the client's startup
# checks on connect — confirm against the weaviate client documentation.
client = weaviate.WeaviateClient(
    connection_params=weaviate_connection_params,
    auth_client_secret=weaviate_auth_secret,
    additional_config=weaviate_additional_config,
    skip_init_checks=True
)
# Open the connection explicitly; the constructor call above does not call connect().
client.connect()

Exception in callback PollerCompletionQueue._handle_events(<_WindowsSele...e debug=False>)()
handle: <Handle PollerCompletionQueue._handle_events(<_WindowsSele...e debug=False>)()>
Traceback (most recent call last):
  File "E:\Accessories\WebDevelopment\Portfolio\portfolio_admin_experience_django\llm\venv\Lib\asyncio\events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "src\\python\\grpcio\\grpc\\_cython\\_cygrpc/aio/completion_queue.pyx.pxi", line 147, in grpc._cython.cygrpc.PollerCompletionQueue._handle_events
BlockingIOError: [WinError 10035] A non-blocking socket operation could not be completed immediately
Exception in callback PollerCompletionQueue._handle_events(<_WindowsSele...e debug=False>)()
handle: <Handle PollerCompletionQueue._handle_events(<_WindowsSele...e debug=False>)()>
Traceback (most recent call last):
  File "E:\Accessories\WebDevelopment\Portfolio\portfolio_admin_experience_django\llm\venv\Lib\asyncio\events.py", line 88, in _run

In [10]:
client.is_ready()

True

In [107]:
# Asynchronous counterpart of the client, built from the same connection
# parameters, auth secret and timeout configuration.
async_client = weaviate.WeaviateAsyncClient(
    connection_params=weaviate_connection_params,
    auth_client_secret=weaviate_auth_secret,
    additional_config=weaviate_additional_config,
    skip_init_checks=True
)

# Top-level await: this only runs as a notebook cell, not as a plain module.
await async_client.connect()

In [108]:
await async_client.is_ready()

True

# Manage collections



In [16]:
class DatasetProperty:
    """Describe one property (field) of a dataset.

    The class also converts between its own type names (the ``*_TYPE``
    string constants) and Weaviate ``DataType`` members.
    """

    TEXT_TYPE = "text"
    NUMBER_TYPE = "number"
    INTEGER_TYPE = "integer"
    DATE_TYPE = "date"
    BOOLEAN_TYPE = "boolean"
    GEO_COORDINATES_TYPE = "geoCoordinates"
    PHONE_NUMBER_TYPE = "phoneNumber"
    UUID_TYPE = "uuid"
    BLOB_TYPE = "blob"
    OBJECT_TYPE = "object"

    def __init__(self, name, dtype, description, is_vector, is_indexed):
        """Store the property definition.

        Args:
            name: Property name.
            dtype: One of the ``*_TYPE`` constants; stored as ``self.type``.
            description: Human-readable description.
            is_vector: Flag stored as ``self.is_vector``.
            is_indexed: Flag stored as ``self.is_indexed``.
        """
        self.name = name
        self.type = dtype
        self.description = description
        self.is_vector = is_vector
        self.is_indexed = is_indexed

    def __repr__(self):
        return f"{self.name}, {self.type}, {self.is_indexed}, {self.is_vector}, {self.description}"

    @staticmethod
    def to_dict(dataset_field: "DatasetProperty") -> dict:
        """Return the property as a plain dictionary."""
        return {
            "name": dataset_field.name,
            "type": dataset_field.type,
            "description": dataset_field.description,
            "is_vector": dataset_field.is_vector,
            "is_indexed": dataset_field.is_indexed
        }

    @staticmethod
    def from_dict(data: dict) -> typing.Optional["DatasetProperty"]:
        """Build a property from a dictionary produced by ``to_dict``.

        Returns:
            A ``DatasetProperty``, or ``None`` when *data* is empty or not a
            dict. Missing keys default to ``""`` / ``False``.
        """
        if not data or not isinstance(data, dict):
            return None

        return DatasetProperty(
            name=data.get("name", ""),
            # The dictionary key is "type" but the constructor parameter is "dtype".
            dtype=data.get("type", ""),
            description=data.get("description", ""),
            is_vector=data.get("is_vector", False),
            is_indexed=data.get("is_indexed", False)
        )

    @staticmethod
    def _type_pairs():
        """Return the (type name, Weaviate DataType) pairs used by both converters."""
        return (
            (DatasetProperty.TEXT_TYPE, DataType.TEXT),
            (DatasetProperty.NUMBER_TYPE, DataType.NUMBER),
            (DatasetProperty.INTEGER_TYPE, DataType.INT),
            (DatasetProperty.DATE_TYPE, DataType.DATE),
            (DatasetProperty.BOOLEAN_TYPE, DataType.BOOL),
            (DatasetProperty.GEO_COORDINATES_TYPE, DataType.GEO_COORDINATES),
            (DatasetProperty.PHONE_NUMBER_TYPE, DataType.PHONE_NUMBER),
            (DatasetProperty.UUID_TYPE, DataType.UUID),
            (DatasetProperty.BLOB_TYPE, DataType.BLOB),
            (DatasetProperty.OBJECT_TYPE, DataType.OBJECT),
        )

    @staticmethod
    def to_weaviate_property(propertyName: str) -> "DataType":
        """Map a ``*_TYPE`` name to its Weaviate ``DataType``.

        Unknown names fall back to ``DataType.TEXT``.
        """
        # Equality comparison (not hashing) keeps the original matching rules.
        for type_name, weaviate_type in DatasetProperty._type_pairs():
            if propertyName == type_name:
                return weaviate_type
        return DataType.TEXT

    @staticmethod
    def from_weaviate_property(propertyType: "DataType") -> str:
        """Map a Weaviate ``DataType`` to its ``*_TYPE`` name.

        Unknown types fall back to ``TEXT_TYPE``.
        """
        for type_name, weaviate_type in DatasetProperty._type_pairs():
            if propertyType == weaviate_type:
                return type_name
        return DatasetProperty.TEXT_TYPE

# Example: a text property that is both vectorized and indexed.
prop = DatasetProperty(
    "Name",
    DatasetProperty.TEXT_TYPE,
    "Property description",
    is_vector=True,
    is_indexed=True,
)
print(prop)

Name, text, True, True, Property description


In [23]:
class Dataset:
    """A named dataset with a description and a list of properties."""

    def __init__(self, name, description, properties, dataset_id=""):
        """Store the dataset definition.

        Args:
            name: Dataset name.
            description: Human-readable description.
            properties: List of ``DatasetProperty`` objects (may be empty).
            dataset_id: Optional identifier, stored as ``self.id``. Defaults
                to ``""`` so existing three-argument calls keep working.
        """
        self.id = dataset_id
        self.name = name
        self.description = description
        self.properties = properties

    def __repr__(self):
        header = f"{self.name}, {self.description}"
        # One line per property, after the header line.
        return header + "\n" + "\n".join(f"{prop}" for prop in (self.properties or []))

    @staticmethod
    def to_dict(dataset: "Dataset") -> dict:
        """Return the dataset, including its properties, as a plain dictionary."""
        return {
            "id": dataset.id,
            "name": dataset.name,
            "description": dataset.description,
            "properties": [DatasetProperty.to_dict(prop) for prop in dataset.properties] if dataset.properties else []
        }

    @staticmethod
    def from_dict(data: dict) -> typing.Optional["Dataset"]:
        """Build a dataset from a dictionary produced by ``to_dict``.

        Returns:
            A ``Dataset``, or ``None`` when *data* is empty or not a dict.
            Missing keys default to ``""`` / an empty property list.
        """
        if not data or not isinstance(data, dict):
            return None

        return Dataset(
            dataset_id=data.get("id", ""),
            name=data.get("name", ""),
            description=data.get("description", ""),
            # "or []" also covers an explicit None stored under "properties".
            properties=[DatasetProperty.from_dict(prop) for prop in (data.get("properties") or [])]
        )

# Example dataset: each tuple below is
# (name, description, type, is_vector, is_indexed) for one property.
dataset = Dataset(
    name="Test",
    description="Test dataset",
    properties=[
        DatasetProperty(
            name=prop_name,
            description=prop_description,
            dtype=prop_type,
            is_vector=vectorized,
            is_indexed=indexed,
        )
        for prop_name, prop_description, prop_type, vectorized, indexed in (
            ("Name", "Property name as string", DatasetProperty.TEXT_TYPE, True, True),
            ("description", "Property description", DatasetProperty.TEXT_TYPE, False, False),
            ("uid", "Property GUID", DatasetProperty.UUID_TYPE, False, True),
            ("Count", "total count", DatasetProperty.INTEGER_TYPE, False, False),
        )
    ],
)
print(dataset)

Test, Test dataset
Name, text, True, True, Property name as string
description, text, False, False, Property description
uid, uuid, True, False, Property GUID
Count, integer, False, False, total count


In [49]:
# Read all collections
client.collections.list_all()

{'Test': _CollectionConfigSimple(name='Test', description='Test dataset', generative_config=None, properties=[_Property(name='name', description='Property name as string', data_type=<DataType.TEXT: 'text'>, index_filterable=True, index_range_filters=False, index_searchable=True, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=None, vectorizer='none'), _Property(name='description', description='Property description', data_type=<DataType.TEXT: 'text'>, index_filterable=False, index_range_filters=False, index_searchable=False, nested_properties=None, tokenization=<Tokenization.WORD: 'word'>, vectorizer_config=None, vectorizer='none'), _Property(name='uid', description='Property GUID', data_type=<DataType.UUID: 'uuid'>, index_filterable=True, index_range_filters=False, index_searchable=False, nested_properties=None, tokenization=None, vectorizer_config=None, vectorizer='none'), _Property(name='count', description='total count', data_type=<DataType.INT: '

In [47]:
# Create a collection
def add_collection(dataset):
    """Create a Weaviate collection from *dataset* using the module-level ``client``.

    Each dataset property becomes a Weaviate ``Property``. It is filterable
    when ``is_indexed`` is set and searchable only when it is also a text
    property. No vectorizer is configured; vectors are supplied externally.

    Args:
        dataset: Object exposing ``name``, ``description`` and ``properties``.

    Returns:
        *dataset* on success; ``None`` when *dataset* is falsy or when
        Weaviate raises ``SchemaValidationException``.
    """
    if not dataset:
        return None

    try:
        properties = [
            Property(
                name=prop.name,
                description=prop.description,
                data_type=DatasetProperty.to_weaviate_property(prop.type),
                index_filterable=prop.is_indexed,
                index_searchable=prop.is_indexed and prop.type == DatasetProperty.TEXT_TYPE,
                skip_vectorization=True,  # Do not vectorize any property. The vectorization will occur externally
            )
            for prop in (dataset.properties or [])
        ]

        client.collections.create(
            name=dataset.name,
            description=dataset.description,
            vectorizer_config=Configure.Vectorizer.none(),  # Do not vectorize any property. The vectorization will occur externally
            vector_index_config=Configure.VectorIndex.hnsw(
                distance_metric=VectorDistances.COSINE,
            ),
            properties=properties
        )
        print(f"Collection '{dataset.name}' created successfully.")
        return dataset
    except weaviate.exceptions.SchemaValidationException as e:
        print(f"Error creating collection: {e}")

    return None


In [48]:
add_collection(dataset)

Collection 'Test' created successfully.


Test, Test dataset
Name, text, True, True, Property name as string
description, text, False, False, Property description
uid, uuid, True, False, Property GUID
Count, integer, False, False, total count

In [96]:
# Get a specific collection
result = async_client.collections.get("Test")
await result.exists()
await result.config.get()

# Manage Posts

In [141]:
class WeaviateCollection(ABC):
    """Interface for objects stored in a Weaviate collection.

    Subclasses provide dictionary (de)serialization and an embedding vector.
    """

    @staticmethod
    @abstractmethod
    def from_dict(data:dict)->typing.Type[typing.Any]:
        """Build an object from its dictionary form."""

    @staticmethod
    @abstractmethod
    def to_dict(data:typing.Any)->dict:
        """Return the dictionary form of *data*."""

    @abstractmethod
    def get_embedding(self)->typing.List[float]:
        """Return the embedding vector for this object."""
    

@strawberry.type
class Post(WeaviateCollection):
    # Fields of a post as stored in the "Post" collection.
    uid:typing.Optional[str]
    postId:str
    postTitle:str
    postExcerpt:str
    postContent:str
    postDate:datetime
    postAuthor:str
    postCategories:typing.Optional[str]
    postTags:typing.Optional[str]
    postUrl:typing.Optional[str]

    @staticmethod
    def to_dict(post):
        """Return every field of *post* as a plain dictionary keyed by field name."""
        field_names = (
            "uid",
            "postId",
            "postTitle",
            "postExcerpt",
            "postContent",
            "postDate",
            "postAuthor",
            "postCategories",
            "postTags",
            "postUrl",
        )
        return {field_name: getattr(post, field_name) for field_name in field_names}

    @staticmethod
    def from_dict(data:dict):
        """Build a ``Post`` from a dictionary.

        Returns ``None`` when *data* is empty or not exactly a ``dict``.
        Any missing key defaults to an empty string.
        """
        if type(data) is not dict or not data:
            return None

        field_names = (
            "uid",
            "postId",
            "postTitle",
            "postExcerpt",
            "postContent",
            "postDate",
            "postAuthor",
            "postCategories",
            "postTags",
            "postUrl",
        )
        values = {field_name: data.get(field_name, "") for field_name in field_names}
        return Post(**values)

    def get_embedding(self):
        """Return the embedding for this post; currently always an empty list."""
        return []


In [136]:
posts = async_client.collections.get("Post")
await posts.exists()

True

In [143]:
# Get all posts
await posts.query.fetch_objects()

QueryReturn(objects=[Object(uuid=_WeaviateUUIDInt('3561a4de-b744-4313-8432-f7b9ce1d1996'), metadata=MetadataReturn(creation_time=None, last_update_time=None, distance=None, certainty=None, score=None, explain_score=None, is_consistent=None, rerank_score=None), properties={'postExcerpt': 'This is a sample excerpt', 'uid': '', 'postDate': datetime.datetime(2024, 11, 20, 1, 19, 36, 995837, tzinfo=datetime.timezone.utc), 'postContent': 'This is a sample content', 'postUrl': 'https://vip3rtech6069.com', 'postCategories': 'blog,test', 'postTitle': 'Hello World!', 'postTags': 'test,post', 'postAuthor': 'Zohair Mehtab', 'postId': '2648'}, references=None, vector={}, collection='Post')])

In [142]:
# Add new post
post = Post(    
    uid="",
    postId="2648",
    postTitle="Hello World!",
    postAuthor="Zohair Mehtab",
    postDate=datetime.now(),
    postUrl="https://vip3rtech6069.com",
    postCategories="blog,test",
    postTags="test,post",
    postExcerpt="This is a sample excerpt",
    postContent="This is a sample content"
)

uuid = await posts.data.insert(
    properties=Post.to_dict(post),
    vector=[0.12345] * 1536
)
print(uuid)

3561a4de-b744-4313-8432-f7b9ce1d1996


            To use a different timezone, specify it in the datetime object. For example:
            datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone(-datetime.timedelta(hours=2))).isoformat() = 2021-01-01T00:00:00-02:00
            


In [None]:
# delete post
result = await posts.data.delete_by_id("3561a4de-b744-4313-8432-f7b9ce1d1996")

# Close Connection

In [102]:
client.close()

In [105]:
await async_client.close()
async_client.is_connected()

False