In [1]:
# %pip install tantivy
# %pip install git+https://github.com/Eventual-Inc/Daft.git
# %pip install lancedb
# %pip install git+https://github.com/auxon/griptape.git
# %pip install cloudpickle
# %pip install ray



In [2]:
import json
import sqlite3
import cloudpickle
import ray

def serialize_sqlite_connection(conn):
    return ray.data.datasource

def deserialize_sqlite_connection(path):
    return sqlite3.connect(path)

# Register the custom serializer with cloudpickle
cloudpickle.register_pickle_by_value(sqlite3)
cloudpickle.CloudPickler.dispatch[sqlite3.Connection] = serialize_sqlite_connection

# Register the custom serializer with Ray
ray.util.register_serializer(
    sqlite3.Connection,
    serializer=serialize_sqlite_connection,
    deserializer=deserialize_sqlite_connection
)


In [3]:
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional

class EmailModel(BaseModel):
    sender: EmailStr = Field(..., description="Sender's email address")
    subject: str = Field(..., description="Subject of the email")
    content: str = Field(..., description="Content of the email")
    namespace: Optional[str] = Field(default=None, description="Namespace for the email")
    meta: Optional[str] = Field(default=None, description="Metadata for the email")
    vector: Optional[List[float]] = Field(default=None, description="Vector of content for the email")

class EmailListModel(BaseModel):
    emails: List[EmailModel] = Field(..., description="List of emails")


In [4]:
import json
import uuid
import lancedb
import pyarrow as pa
from typing import List, Dict, Optional
from attrs import define, field
from pydantic import BaseModel, EmailStr
from griptape.mixins import SerializableMixin, FuturesExecutorMixin
from lancedb.pydantic import pydantic_to_schema

class EmailEntryModel(BaseModel):
    id: str = Field(..., description="Unique identifier for the email")
    sender: str = Field(..., description="Sender's email address")
    subject: str = Field(..., description="Subject of the email")
    content: str = Field(..., description="Content of the email")
    namespace: Optional[str] = Field(default=None, description="Namespace for the email")
    meta: Optional[str] = Field(default=None, description="Metadata for the email")
    vector: Optional[List[float]] = Field(default=None, description="Vectors of content for the email")

@define
class PydanticPyArrowDaftRayLanceDBDriver(SerializableMixin, FuturesExecutorMixin):
    lancedb_path: str = field(kw_only=True, default="lancedb_dir", metadata={"serializable": True})
    table_name: str = field(kw_only=True, default="emails", metadata={"serializable": True})

    def __attrs_post_init__(self):
        # Initialize LanceDB connection
        self.lancedb = lancedb.connect(self.lancedb_path)
        
        # Check if the table exists and delete it if it does
        if self.table_name in self.lancedb.table_names():
            self.lancedb.drop_table(self.table_name)
            print(f"Dropped existing table: {self.table_name}")
        
        # Create LanceDB table using PyArrow schema
        schema = pa.schema([
            pa.field('id', pa.string()),
            pa.field('sender', pa.string()),
            pa.field('subject', pa.string()),
            pa.field('content', pa.string()),
            pa.field('namespace', pa.string()),
            pa.field('meta', pa.string()),
            pa.field('vector', pa.list_(pa.float32()))  # Add vector column
        ])
        table = self.lancedb.create_table(self.table_name, schema=schema)
        print(f"Created table with schema: {table.schema}")


    def upsert_email(self, email: EmailModel, *, email_id: Optional[str] = None, namespace: Optional[str] = None, meta: Optional[Dict] = None) -> str:
        if email_id is None:
            email_id = self._get_default_id(str(email.dict()))

        table = self.lancedb.open_table(self.table_name)
        print(f"Table schema before upsert: {table.schema}")

        # Generate a dummy vector (e.g., a list of floats)
        vector = [0.0] * 128  # Example: 128-dimensional zero vector

        data = EmailEntryModel(
            id=email_id,
            sender=str(email.sender),
            subject=email.subject,
            content=email.content,
            namespace=email.namespace,
            meta=email.meta,
            vector=email.vector 
        )
        print(f"Data to be inserted: {data.model_dump()}")

        # Ensure all fields are included in the dictionary
        data_dict = data.model_dump()
        for field in ['namespace', 'meta', 'vector']:
            if field not in data_dict:
                data_dict[field] = None
        data_dict['vector'] = vector  # Add the vector to the data

        # Define the schema explicitly
        schema = pa.schema([
            pa.field('id', pa.string()),
            pa.field('sender', pa.string()),
            pa.field('subject', pa.string()),
            pa.field('content', pa.string()),
            pa.field('namespace', pa.string()),
            pa.field('meta', pa.string()),
            pa.field('vector', pa.list_(pa.float32()))  # Ensure vector column is included
        ])

        # Convert to PyArrow Table with the defined schema
        pyarrow_table = pa.Table.from_pydict({k: [v] for k, v in data_dict.items()}, schema=schema)
        print(f"PyArrow table schema: {pyarrow_table.schema}")
        table.add(pyarrow_table, mode="overwrite")

        return email_id

    def load_email(self, email_id: str, *, namespace: Optional[str] = None) -> Optional[EmailEntryModel]:
        table = self.lancedb.open_table(self.table_name)
        query = table.search(f"id == '{email_id}'")

        if namespace:
            query = query.filter(f"namespace == '{namespace}'")

        result = query.to_pandas().to_dict(orient="records")
        if result:
            return EmailEntryModel(**result[0])
        return None

    def load_all_emails(self, *, namespace: Optional[str] = None) -> List[EmailEntryModel]:
        table = self.lancedb.open_table(self.table_name)

        if namespace:
            results = table.search(f"namespace == '{namespace}'").to_pandas().to_dict(orient="records")
        else:
            results = table.to_pandas().to_dict(orient="records")

        return [EmailEntryModel(**r) for r in results]

    def delete_email(self, email_id: str) -> None:
        table = self.lancedb.open_table(self.table_name)
        table.delete(f"id == '{email_id}'")

    def query_by_sender(
        self,
        sender: EmailStr,
        *,
        count: Optional[int] = None,
        namespace: Optional[str] = None,
    ) -> List[EmailEntryModel]:
        table = self.lancedb.open_table(self.table_name)
        query = table.search(f"sender == '{sender}'", vector_column_name="vector")

        if namespace:
            query = query.filter(f"namespace == '{namespace}'")

        query = query.limit(count or 10)
        results = query.to_pandas().to_dict(orient="records")

        return [EmailEntryModel(**r) for r in results]

    def _get_default_id(self, value: str) -> str:
        return str(uuid.uuid5(uuid.NAMESPACE_OID, value))


In [5]:
import daft
import ray
from griptape.tasks import BaseTask
from griptape.artifacts import TextArtifact

class EmailProcessingWorkflow(BaseTask):
    def __init__(self, lancedb_path: str):
        super().__init__()
        self.lancedb_path = lancedb_path
        ray.shutdown()
        ray.init()
        daft.set_execution_config(enable_native_executor=True)  # Enable Ray execution
        

    def run(self, input_data: EmailListModel):
        # Convert input data to Daft DataFrame
        data = {
            "senders": [email.sender for email in input_data.emails],
            "subjects": [email.subject for email in input_data.emails],
            "contents": [email.content for email in input_data.emails],
            "namespace": [email.namespace for email in input_data.emails],
            "meta": [email.meta for email in input_data.emails],
            "vector": [email.vector for email in input_data.emails],
        }
        df = daft.from_pydict(data)  # Create a LogicalPlanBuilder

        # Process data using Daft (this will be executed on Ray)
        df = df.with_column("domain", df["senders"].str.split("@").list.get(-1))
        df = df.with_column("word_count", df["contents"].str.split(" ").list.lengths())

        # Write results to LanceDB
        df.write_lance(self.lancedb_path)

        # Convert Daft DataFrame to Pandas DataFrame for display
        result_df = df.to_pandas()

        return TextArtifact(f"Processed data:\n{result_df}")

    def input(self) -> EmailListModel:
        return EmailListModel


2024-09-08 13:27:01,806	INFO worker.py:1783 -- Started a local Ray instance.


In [6]:
# Example email data
input_data = EmailListModel(emails=[
    EmailModel(sender="alice@example.com", subject="Meeting", content="Let's meet at 10 AM.", namespace="personal", meta="location:office", vector=[0.1, 0.2, 0.3]),
    EmailModel(sender="bob@example.org", subject="Project Update", content="The project is on track.", namespace="work", meta="status:on track", vector=[0.4, 0.5, 0.6]),
    EmailModel(sender="carol@example.net", subject="Invoice", content="Please find the invoice attached.", namespace="work", meta="status:pending", vector=[0.7, 0.8, 0.9])
])

# Initialize the driver
driver = PydanticPyArrowDaftRayLanceDBDriver(lancedb_path="./lancedb_dir")

# Upsert emails into LanceDB
for email in input_data.emails:
    driver.upsert_email(email)

# Create the FTS index
table = driver.lancedb.open_table(driver.table_name)
table.create_fts_index(['sender', 'subject', 'content'])  # Add any other relevant fields

# Now you can query emails by a specific sender
queried_emails = driver.query_by_sender("alice@example.com")

# Print the queried result
print(queried_emails)

# Run the processing workflow using Ray and Daft
workflow = EmailProcessingWorkflow(lancedb_path="./lancedb_dir")
result_artifact = workflow.run(input_data=input_data)

# Output the processed results
print(result_artifact.value)


Dropped existing table: emails
Created table with schema: id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: float>
  child 0, item: float
Table schema before upsert: id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: float>
  child 0, item: float
Data to be inserted: {'id': '7caf2b1a-a72f-58c6-b95d-369ccdb47213', 'sender': 'alice@example.com', 'subject': 'Meeting', 'content': "Let's meet at 10 AM.", 'namespace': 'personal', 'meta': 'location:office', 'vector': [0.1, 0.2, 0.3]}
PyArrow table schema: id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: float>
  child 0, item: float
Table schema before upsert: id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: float>
  child 0, item: float
Data to be inserted: {'id': 'fefab10b-1ff4-562a-a037-f164ee4e8740', 'sender':

2024-09-08 13:27:05,781	INFO worker.py:1783 -- Started a local Ray instance.
thread 'python' panicked at src/daft-physical-plan/src/translate.rs:65:14:
not yet implemented: Sink not yet implemented
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


PanicException: not yet implemented: Sink not yet implemented