In [None]:
pip install lancedb getdaft pyarrow 

In [None]:

import gzip
import hashlib
import json
import mimetypes
import os
from datetime import datetime, timezone
from enum import Enum
from typing import Any, ClassVar, Dict, List, Optional

import daft
import lancedb
import pyarrow as pa
from daft import col, lit
from ulid import ULID

In [None]:
# Arrow fields shared by every record type. Each field documents itself through
# a "description" entry in its Arrow metadata; all of them are required.
base_schema = [
    pa.field(
        "id",
        pa.string(),
        nullable=False,
        # pa.field() accepts no `primary_key` keyword (passing one raises
        # TypeError), so the key role is recorded as field metadata instead.
        metadata={
            "description": "ULID Unique identifier for the record",
            "primary_key": "true",
        },
    ),
    pa.field(
        "type",
        pa.string(),
        nullable=False,
        metadata={"description": "Type of the data object"},
    ),
    pa.field(
        "created_at",
        pa.timestamp("ns", tz="UTC"),
        nullable=False,
        metadata={"description": "Timestamp when the record was created"},
    ),
    pa.field(
        "updated_at",
        pa.timestamp("ns", tz="UTC"),
        nullable=False,
        metadata={"description": "Timestamp when the record was last updated"},
    ),
    pa.field(
        "inserted_at",
        pa.timestamp("ns", tz="UTC"),
        nullable=False,
        metadata={"description": "Timestamp when the data object was inserted into the database"},
    ),
]




# Artifact-specific Arrow fields, generated from (name, type, description)
# triples. Every field is required and carries its description as metadata.
artifact_schema = [
    pa.field(column, dtype, nullable=False, metadata={"description": text})
    for column, dtype, text in (
        ("name", pa.string(), "Name of the artifact"),
        ("artifact_uri", pa.string(), "URI where the artifact is stored"),
        ("payload", pa.binary(), "Gzipped binary of the artifact file's contents"),
        ("extension", pa.string(), "File extension of the artifact"),
        ("mime_type", pa.string(), "MIME type of the artifact"),
        ("version", pa.string(), "Version of the artifact"),
        ("size_bytes", pa.int64(), "Size of the artifact in bytes"),
        ("checksum", pa.string(), "MD5 checksum of the artifact"),
    )
]



In [None]:

class Artifact:
    """
    Collection of file artifacts held in a Daft DataFrame.

    Each local file becomes one row laid out according to ``schema``: identity
    and timestamp columns from ``base_schema`` plus the file-specific columns
    from ``artifact_schema``. The file contents are stored gzip-compressed in
    ``payload``; ``size_bytes`` and ``checksum`` describe the uncompressed
    contents.

    Args:
        files: Paths of local files to load. ``None`` or an empty list yields
            an empty, but fully typed, DataFrame.
        uri_prefix: Optional prefix for each ``artifact_uri``. When omitted the
            URI is just ``<id>__<file name>.gzip``.

    Attributes:
        df: Daft DataFrame with one row per loaded file.
    """

    schema: ClassVar[pa.Schema] = pa.schema(base_schema + artifact_schema)
    obj_type: ClassVar[str] = "Artifact"

    def __init__(self, files: Optional[List[str]] = None, uri_prefix: Optional[str] = None):
        # A zero-row Arrow table keeps the column names *and* types, unlike a
        # dict of empty lists whose columns would all be untyped.
        self.df = daft.from_arrow(self.schema.empty_table())

        if files:
            self._populate_from_local_file(files, uri_prefix)

    def _populate_from_local_file(self, files: List[str], uri_prefix: Optional[str] = None):
        """
        Read every path in ``files`` and replace ``self.df`` with one row per file.

        Raises:
            OSError: If a file cannot be opened or read.
        """
        version = "1.0"
        rows = []
        for file in files:
            with open(file, "rb") as f:
                content = f.read()

            file_id = str(ULID())
            # Timezone-aware so the value matches the timestamp[ns, UTC] columns.
            now = datetime.now(timezone.utc)
            file_name = os.path.basename(file)
            stored_name = f"{file_id}__{file_name}.gzip"
            # Without a prefix, avoid emitting a literal "None/" in the URI.
            artifact_uri = f"{uri_prefix.rstrip('/')}/{stored_name}" if uri_prefix else stored_name

            rows.append(
                {
                    "id": file_id,
                    "type": self.obj_type,
                    "created_at": now,
                    "updated_at": now,
                    "inserted_at": now,
                    "name": file_name,
                    "artifact_uri": artifact_uri,
                    "payload": gzip.compress(content),
                    # splitext copes with dot-less and multi-dot names; the
                    # leading dot is dropped ("report.txt" -> "txt").
                    "extension": os.path.splitext(file_name)[1].lstrip("."),
                    "mime_type": mimetypes.guess_type(file)[0] or "application/octet-stream",
                    "version": version,
                    "size_bytes": len(content),
                    # Hash the raw contents: the gzip header embeds a timestamp,
                    # so hashing the payload would differ between runs.
                    "checksum": hashlib.md5(content).hexdigest(),
                }
            )

        # Going through Arrow enforces the declared column order and types.
        self.df = daft.from_arrow(pa.Table.from_pylist(rows, schema=self.schema))

    def __repr__(self):
        return f"<Artifact with {self.df.count_rows()} files>"

    def __str__(self):
        return self.__repr__()


: 

In [None]:
from daft import DataType, Schema, DataFrame, TimeUnit, col
import daft

from daft import DataType, Schema, DataFrame, col 


# NOTE(review): placeholder width. DataType.embedding() requires an element
# type and a fixed size; set this to the output size of the embedding model.
EMBEDDING_SIZE = 768

_TIMESTAMP_NS = DataType.timestamp('ns')

# DataType.struct() takes a {field name: dtype} mapping. The field names are
# taken from the positional comments of the original declarations.
_CONTENT_FIELDS = {
    "text": DataType.string(),
    "image": DataType.image(),
    "audio": DataType.string(),
    "video": DataType.string(),
    "timestamp": _TIMESTAMP_NS,
}

_METADATA_FIELDS = {
    "id": DataType.string(),
    "name": DataType.string(),
    "description": DataType.string(),
    "created_at": _TIMESTAMP_NS,
    "updated_at": _TIMESTAMP_NS,
    "inserted_at": _TIMESTAMP_NS,
    "version": DataType.string(),
    "size_bytes": DataType.uint64(),
    "checksum": DataType.string(),
    "mime_type": DataType.string(),
}

# Shorter metadata layout used by nodes and edges (id .. version only).
_RECORD_METADATA_FIELDS = {
    name: _METADATA_FIELDS[name]
    for name in (
        "id",
        "name",
        "description",
        "created_at",
        "updated_at",
        "inserted_at",
        "version",
    )
}

# Struct form of the full metadata layout, for nesting inside other schemas
# (a Schema object is not a DataType and cannot be used as a column type).
metadata_struct = DataType.struct(_METADATA_FIELDS)

# Schema objects cannot be built by calling Schema(...); use the factory.
multimodal_data_schema = Schema.from_pydict({
    "data": DataType.struct(_CONTENT_FIELDS),
})

metadata_schema = Schema.from_pydict(_METADATA_FIELDS)

embeddings_schema = Schema.from_pydict({
    "embeddings": DataType.embedding(DataType.float32(), EMBEDDING_SIZE),
    "embedding_metadata": metadata_struct,
})

# NOTE(review): this rebinds `base_schema`, which an earlier cell defines as a
# list of pyarrow fields; re-running that earlier cell afterwards will pick up
# this object instead. Consider renaming one of the two.
base_schema = Schema.from_pydict({
    # The value was missing in the original; presumably the multimodal content
    # struct -- TODO confirm.
    "data": DataType.struct(_CONTENT_FIELDS),
    "metadata": metadata_struct,
    "embeddings": DataType.embedding(DataType.float32(), EMBEDDING_SIZE),
    "embedding_metadata": metadata_struct,
})

# Typed, zero-row frame with the metadata columns. A DataFrame cannot be
# constructed from a schema directly, so it is built from an empty Arrow table.
# NOTE(review): the original also passed partition_by/mode/name; those have no
# counterpart when creating a frame and would belong to a write call.
metadata_df = daft.from_arrow(metadata_schema.to_pyarrow_schema().empty_table())

image_schema = Schema.from_pydict({
    # NOTE(review): the column name is empty in the original -- a name such as
    # "url" or "path" is presumably missing; confirm and fill in.
    "": DataType.string(),
    "width": DataType.uint64(),
    "height": DataType.uint64(),
})

# Base Node Schema
base_node_schema = Schema.from_pydict({
    "content": DataType.struct(_CONTENT_FIELDS),
    "type": DataType.string(),
    "url": DataType.string(),
    "timestamp": _TIMESTAMP_NS,
    "metadata": DataType.struct(_RECORD_METADATA_FIELDS),
})

# Edge Schema
edge_schema = Schema.from_pydict({
    "source": DataType.string(),
    "target": DataType.string(),
    "relation": DataType.string(),
    "timestamp": _TIMESTAMP_NS,
    "metadata": DataType.struct(_RECORD_METADATA_FIELDS),
})

class BaseStructure:
    """
    Node store backed by one Daft DataFrame laid out like ``base_node_schema``.

    All mutating methods return ``self`` so calls can be chained.

    NOTE(review): nodes are identified by the ``id`` field of the ``metadata``
    struct column. Rows whose columns are not part of ``base_node_schema``
    (for example a top-level ``id`` or ``duration``) are rejected with a
    ``ValueError``; reconcile the caller payloads with the schema.

    Attributes:
        index: Name given to this structure.
        nodes: Daft DataFrame holding the nodes.
    """

    def __init__(self, index: str):
        self.index = index
        # A DataFrame cannot be constructed from a schema directly; a zero-row
        # Arrow table gives a frame that is empty but fully typed.
        self.nodes = daft.from_arrow(base_node_schema.to_pyarrow_schema().empty_table())

    @staticmethod
    def _node_id():
        """Expression selecting the node id out of the ``metadata`` struct."""
        return col("metadata").struct.get("id")

    def _conform(self, nodes: "DataFrame | dict"):
        """
        Return ``nodes`` as a frame with exactly the columns of ``self.nodes``.

        A dict may hold one row (scalar values) or several (list values).
        Columns missing from the input are filled with nulls; columns unknown
        to the node frame raise ``ValueError``.
        """
        if isinstance(nodes, dict):
            nodes = daft.from_pydict(
                {key: value if isinstance(value, list) else [value] for key, value in nodes.items()}
            )
        wanted = {field.name: field.dtype for field in self.nodes.schema()}
        unknown = [name for name in nodes.column_names if name not in wanted]
        if unknown:
            raise ValueError(f"Unknown node column(s) {unknown}; expected a subset of {list(wanted)}")
        present = set(nodes.column_names)
        # concat() needs identical schemas, hence the fixed order and the casts.
        return nodes.select(*[
            col(name).cast(dtype) if name in present else lit(None).cast(dtype).alias(name)
            for name, dtype in wanted.items()
        ])

    def add_node(self, nodes: "DataFrame | dict"):
        """Append the given node(s) (DataFrame or dict) to the store."""
        self.nodes = self.nodes.concat(self._conform(nodes))
        return self

    def get(self, ):
        """Materialize and return the node DataFrame."""
        return self.nodes.collect()

    def get_nodes(self):
        """Same as :meth:`get`; the name used by Document and Graph."""
        return self.get()

    def filter(self, condition):
        """
        Filter the nodes DataFrame.

        Args:
            condition: Boolean Daft expression; only matching nodes are kept.
        """
        self.nodes = self.nodes.filter(condition)
        return self

    def filter_nodes(self, condition):
        """Same as :meth:`filter`; the name used by Document and Graph."""
        return self.filter(condition)

    def upsert(self, nodes: "DataFrame | dict"):
        """
        Upsert nodes into the nodes DataFrame.

        Stored nodes whose id also occurs in ``nodes`` are replaced, all other
        incoming nodes are appended.

        Args:
            nodes (DataFrame): A DataFrame (or dict) of nodes to upsert.
        """
        incoming = self._conform(nodes)
        # The anti join keeps only stored nodes that have no incoming counterpart.
        untouched = self.nodes.join(
            incoming, left_on=self._node_id(), right_on=self._node_id(), how="anti"
        )
        self.nodes = untouched.concat(incoming)
        return self

    def update_node(self, node_id: str, update_dict: dict):
        """
        Overwrite columns of the node whose metadata id equals ``node_id``.

        Args:
            node_id: Id of the node to change.
            update_dict: Mapping of existing column name to its new value; the
                value must be compatible with the column's type.
        """
        is_target = self._node_id() == node_id
        for key, value in update_dict.items():
            self.nodes = self.nodes.with_column(key, is_target.if_else(lit(value), col(key)))
        return self

    def sql(self, query: str):
        """Run ``query`` with the node frame available as table ``nodes``."""
        # daft.sql() resolves table names from DataFrame variables visible in
        # the calling frame, which is why this local exists.
        nodes = self.nodes
        return daft.sql(query)

class EdgeStructure:
    """
    Edge store backed by one Daft DataFrame laid out like ``edge_schema``.

    An edge is identified by its (``source``, ``target``) pair. All mutating
    methods return ``self`` so calls can be chained.

    Attributes:
        index: Name given to this structure.
        edges: Daft DataFrame holding the edges.
    """

    def __init__(self, index: str):
        self.index = index
        # A DataFrame cannot be constructed from a schema directly; a zero-row
        # Arrow table gives a frame that is empty but fully typed.
        self.edges = daft.from_arrow(edge_schema.to_pyarrow_schema().empty_table())

    def _conform(self, edge_data):
        """
        Return ``edge_data`` as a frame with exactly the columns of ``self.edges``.

        A dict may hold one row (scalar values) or several (list values).
        Missing columns are filled with nulls; unknown columns raise ``ValueError``.
        """
        if isinstance(edge_data, dict):
            edge_data = daft.from_pydict(
                {key: value if isinstance(value, list) else [value] for key, value in edge_data.items()}
            )
        wanted = {field.name: field.dtype for field in self.edges.schema()}
        unknown = [name for name in edge_data.column_names if name not in wanted]
        if unknown:
            raise ValueError(f"Unknown edge column(s) {unknown}; expected a subset of {list(wanted)}")
        present = set(edge_data.column_names)
        # concat() needs identical schemas, hence the fixed order and the casts.
        return edge_data.select(*[
            col(name).cast(dtype) if name in present else lit(None).cast(dtype).alias(name)
            for name, dtype in wanted.items()
        ])

    def add_edge(self, edge_data: dict):
        """Append the edge(s) described by ``edge_data`` to the store."""
        self.edges = self.edges.concat(self._conform(edge_data))
        return self

    def get_edges(self):
        """Return the (lazy) edge DataFrame."""
        return self.edges

    def filter_edges(self, condition):
        """Keep only the edges matching the boolean Daft expression ``condition``."""
        self.edges = self.edges.filter(condition)
        return self

    def upsert_edges(self, source: str, target: str, update_dict: dict):
        """
        Update the edge ``source -> target``, inserting it when it does not exist.

        Args:
            source: Value of the ``source`` column identifying the edge.
            target: Value of the ``target`` column identifying the edge.
            update_dict: Mapping of edge column name to its new value.

        Raises:
            ValueError: If ``update_dict`` names a column the edge frame lacks.
        """
        unknown = [key for key in update_dict if key not in self.edges.column_names]
        if unknown:
            raise ValueError(f"Unknown edge column(s) {unknown}")

        condition = (col("source") == source) & (col("target") == target)
        # Counting executes the query; it is needed to choose insert vs. update.
        if self.edges.where(condition).count_rows() == 0:
            return self.add_edge({**update_dict, "source": source, "target": target})

        for key, value in update_dict.items():
            self.edges = self.edges.with_column(key, condition.if_else(lit(value), col(key)))
        return self

class Document:
    """
    Fluent wrapper that stores the parts of one document as nodes.

    Every ``add_*`` method records one node in the underlying ``BaseStructure``
    (available as ``self.base``) and returns the Document itself so calls can
    be chained.
    """

    def __init__(self, index: str):
        self.base = BaseStructure(index)

    def _add(self, node: dict):
        """Store ``node`` and return this Document (not the underlying store)."""
        self.base.add_node(node)
        return self

    def add_text(self, id: str, content: str):
        """Add a node of type ``"text"``."""
        return self._add({"id": id, "content": content, "type": "text"})

    def add_image(self, id: str, content: str, url: str):
        """Add a node of type ``"image"`` located at ``url``."""
        return self._add({"id": id, "content": content, "type": "image", "url": url})

    def add_audio(self, id: str, content: str, url: str, duration: float):
        """Add a node of type ``"audio"`` located at ``url``."""
        return self._add({"id": id, "content": content, "type": "audio", "url": url, "duration": duration})

    def add_video(self, id: str, content: str, url: str, duration: float):
        """Add a node of type ``"video"`` located at ``url``."""
        return self._add({"id": id, "content": content, "type": "video", "url": url, "duration": duration})

    def add_timestamp(self, id: str, content: str, timestamp):
        """Add a node of type ``"timestamp"`` carrying ``timestamp``."""
        return self._add({"id": id, "content": content, "type": "timestamp", "timestamp": timestamp})

    def get_nodes(self):
        """Return whatever the underlying store's ``get_nodes()`` returns."""
        return self.base.get_nodes()

    def filter_nodes(self, condition):
        """Filter the stored nodes by ``condition``; returns this Document."""
        self.base.filter_nodes(condition)
        return self

    def update_node(self, node_id: str, update_dict: dict):
        """Apply ``update_dict`` to the node ``node_id``; returns this Document."""
        self.base.update_node(node_id, update_dict)
        return self

class Graph:
    """
    Nodes plus directed edges between them.

    Nodes live in a ``BaseStructure`` (``self.base``); edges live in a Daft
    DataFrame laid out like ``edge_schema`` (``self.edges``). All mutating
    methods return ``self`` so calls can be chained.
    """

    def __init__(self, index: str):
        self.base = BaseStructure(index)
        # A DataFrame cannot be constructed from a schema directly; a zero-row
        # Arrow table gives a frame that is empty but fully typed.
        self.edges = daft.from_arrow(edge_schema.to_pyarrow_schema().empty_table())

    def _conform_edges(self, edge_data):
        """
        Return ``edge_data`` as a frame with exactly the columns of ``self.edges``.

        A dict may hold one row (scalar values) or several (list values).
        Missing columns are filled with nulls; unknown columns raise ``ValueError``.
        """
        if isinstance(edge_data, dict):
            edge_data = daft.from_pydict(
                {key: value if isinstance(value, list) else [value] for key, value in edge_data.items()}
            )
        wanted = {field.name: field.dtype for field in self.edges.schema()}
        unknown = [name for name in edge_data.column_names if name not in wanted]
        if unknown:
            raise ValueError(f"Unknown edge column(s) {unknown}; expected a subset of {list(wanted)}")
        present = set(edge_data.column_names)
        # concat() needs identical schemas, hence the fixed order and the casts.
        return edge_data.select(*[
            col(name).cast(dtype) if name in present else lit(None).cast(dtype).alias(name)
            for name, dtype in wanted.items()
        ])

    def add_node(self, node_data: dict):
        """Add a node through the underlying node store."""
        self.base.add_node(node_data)
        return self

    def add_edge(self, edge_data: dict):
        """Append the edge(s) described by ``edge_data``."""
        self.edges = self.edges.concat(self._conform_edges(edge_data))
        return self

    def get_nodes(self):
        """Return whatever the underlying store's ``get_nodes()`` returns."""
        return self.base.get_nodes()

    def get_edges(self):
        """Return the (lazy) edge DataFrame."""
        return self.edges

    def filter_nodes(self, condition):
        """Filter the stored nodes by ``condition``."""
        self.base.filter_nodes(condition)
        return self

    def filter_edges(self, condition):
        """Keep only the edges matching the boolean Daft expression ``condition``."""
        self.edges = self.edges.filter(condition)
        return self

    def update_node(self, node_id: str, update_dict: dict):
        """Apply ``update_dict`` to the node ``node_id``."""
        self.base.update_node(node_id, update_dict)
        return self

    def update_edge(self, source: str, target: str, update_dict: dict):
        """
        Overwrite columns of the edge ``source -> target``.

        Args:
            source: Value of the ``source`` column identifying the edge.
            target: Value of the ``target`` column identifying the edge.
            update_dict: Mapping of existing edge column name to its new value.
        """
        condition = (col("source") == source) & (col("target") == target)
        for key, value in update_dict.items():
            # Row-wise choice: new value where the edge matches, old value elsewhere.
            self.edges = self.edges.with_column(key, condition.if_else(lit(value), col(key)))
        return self

In [None]:
# Create and populate a document.
# Every call is chained onto the return value of the previous one, so each
# method used here has to hand back the Document for the chain to work.
# The final filter keeps only the nodes whose "type" column is not "audio".
doc = (Document("doc1")
    .add_text("1", "This is a document")
    .add_image("2", "An image in the document", "http://example.com/image.jpg")
    .add_audio("3", "An audio clip", "http://example.com/audio.mp3", 120.5)
    .filter_nodes(col("type") != "audio"))

# Create and populate a graph: two nodes, one "describes" edge from node "1"
# to node "2", then an update of node "1" and a filter on the edge relation.
# As above, the chain relies on every method returning the Graph.
graph = (Graph("graph1")
    .add_node({"id": "1", "content": "Hello, world!", "type": "text"})
    .add_node({"id": "2", "content": "A beautiful landscape", "type": "image", "url": "http://example.com/image.jpg"})
    .add_edge({"source": "1", "target": "2", "relation": "describes"})
    .update_node("1", {"content": "Updated content"})
    .filter_edges(col("relation") == "describes"))

# Get nodes and edges
# NOTE(review): the node dicts above use top-level "id"/"content" keys; confirm
# they match the column layout the node store expects.
doc_nodes = doc.get_nodes()
graph_nodes = graph.get_nodes()
graph_edges = graph.get_edges()

In [None]:
# Hierarchical Task Network
class HTN():
    """
    Hierarchical task network stored in a ``Graph`` (``self.graph``).

    The methods forward to the wrapped graph and return this HTN so calls can
    be chained.
    """

    def __init__(self, index):
        self.graph = Graph(index)

    def add_node(self, node_data: dict):
        """Add a node to the wrapped graph."""
        # The instance owns no node/edge frames of its own; everything is kept
        # in self.graph, the only attribute __init__ sets.
        self.graph.add_node(node_data)
        return self

    def add_edge(self, edge_data: dict):
        """Add an edge to the wrapped graph."""
        self.graph.add_edge(edge_data)
        return self

    def get_nodes(self):
        """Return the nodes of the wrapped graph."""
        return self.graph.get_nodes()
