In [1]:
import json
import sqlite3
import cloudpickle
import pickle
import collections
import ray
import threading
import hmac

def serialize_sqlite_connection(conn):
    return ray.data.datasource

def deserialize_sqlite_connection(path):
    return sqlite3.connect(path)

def serialize_thread_lock(lock):
    print("serialized_thread_lock")
    return None

def deserialize_thread_lock(_):
    return None

class SerializableHMAC:
    def __init__(self, hmac_obj):
        self.msg = hmac_obj.digest()
        self.digestmod = hmac_obj.digest_size

    @classmethod
    def from_hmac(cls, hmac_obj):
        return cls(hmac_obj)

    def to_hmac(self):
        return hmac.new(b'', self.msg, digestmod=self.digestmod)

def serialize_hmac(hmac_obj):
    return SerializableHMAC.from_hmac(hmac_obj)

def deserialize_hmac(serialized_hmac):
    return serialized_hmac.to_hmac()

class HandleWrapper:
    def __init__(self, obj):
        self.class_name = type(obj).__name__
        self.attributes = {}
        for key, value in obj.__dict__.items():
            if not key.startswith('_'):
                try:
                    cloudpickle.dumps(value)
                    self.attributes[key] = value
                except:
                    self.attributes[key] = f"Unpicklable_{type(value).__name__}"

    @classmethod
    def from_object(cls, obj):
        return cls(obj)

    def to_object(self):
        # This is a placeholder. You might need to implement proper reconstruction logic.
        return type(self.class_name, (), self.attributes)()

def serialize_handle_object(obj):
    return HandleWrapper.from_object(obj)

def deserialize_handle_object(wrapped):
    return wrapped.to_object()


# # Register the custom serializers with Ray
# ray.util.register_serializer(
#     object,  # This will catch all objects
#     serializer=lambda obj: serialize_handle_object(obj) if hasattr(obj, 'handle') or not cloudpickle.is_picklable(obj) else obj,
#     deserializer=lambda obj: deserialize_handle_object(obj) if isinstance(obj, HandleWrapper) else obj
# )

ray.util.register_serializer(sqlite3.Connection, serializer=serialize_sqlite_connection, deserializer=deserialize_sqlite_connection)
ray.util.register_serializer(type(threading.Lock), serializer=serialize_thread_lock, deserializer=deserialize_thread_lock)
ray.util.register_serializer(hmac.HMAC, serializer=serialize_hmac, deserializer=deserialize_hmac)

# Initialize Ray
if not ray.is_initialized():
    ray.init()

2024-09-04 18:30:21,563	INFO worker.py:1783 -- Started a local Ray instance.


# Integrating Agent Framework with Email Processing Example

In [2]:
import os
import dotenv
import ray
import json
import uuid
import lancedb
import pyarrow as pa
import daft
from huggingface_hub import login
from outlines import models, generate
from outlines.models import VLLM
from outlines.processors.structured import JSONLogitsProcessor
from outlines.generate.api import GenerationParameters, SamplingParameters
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
from attrs import define, field
from griptape.mixins import SerializableMixin, FuturesExecutorMixin
from griptape.tasks import BaseTask
from griptape.artifacts import TextArtifact

# Load environment variables
dotenv.load_dotenv('.env')
hf_token = os.getenv("HF_TOKEN")
login(token=hf_token)

# Initialize Ray
if not ray.is_initialized():
    ray.init()

The token has not been saved to the git credentials helper. Pass `add_to_git_credential=True` in this function directly or `--add-to-git-credential` if using via `huggingface-cli` if you want to set the git credential as well.
Token is valid (permission: read).
Your token has been saved to /teamspace/studios/this_studio/.cache/huggingface/token
Login successful


## Define Data Models

In [3]:
class EmailModel(BaseModel):
    sender: str = Field(..., description="Sender's email address")
    subject: str = Field(..., description="Subject of the email")
    content: str = Field(..., description="Content of the email")
    namespace: Optional[str] = Field(default=None, description="Namespace for the email")
    meta: Optional[str] = Field(default=None, description="Metadata for the email")
    vector: Optional[List[float]] = Field(default=None, description="Vector of content for the email")

class EmailListModel(BaseModel):
    emails: List[EmailModel] = Field(..., description="List of emails")

class EmailEntryModel(BaseModel):
    id: str = Field(..., description="Unique identifier for the email")
    sender: str = Field(..., description="Sender's email address")
    subject: str = Field(..., description="Subject of the email")
    content: str = Field(..., description="Content of the email")
    namespace: Optional[str] = Field(default=None, description="Namespace for the email")
    meta: Optional[str] = Field(default=None, description="Metadata for the email")
    vector: Optional[List[float]] = Field(default=None, description="Vectors of content for the email")

## Define Agent Classes

In [4]:
from griptape.structures import Agent
from lancedb.db import DBConnection
import pandas as pd

@define
class EmailProcessingAgent(Agent):
    lancedb_path: str = field(kw_only=True, default="lancedb_dir", metadata={"serializable": True})
    table_name: str = field(kw_only=True, default="emails", metadata={"serializable": True})
    model_name: str = field(kw_only=True, default="NousResearch/Meta-Llama-3.1-8B-Instruct", metadata={"serializable": True})
    max_model_len: int = field(kw_only=True, default=20000, metadata={"serializable": True})
    vllm: VLLM = field(init=False)
    lancedb: DBConnection = field(init=False)
    
    def __attrs_post_init__(self):
        super().__attrs_post_init__()
        self.lancedb = lancedb.connect(self.lancedb_path)
        self.vllm = models.vllm(model_name=self.model_name, max_model_len=self.max_model_len)
        
        if self.table_name in self.lancedb.table_names():
            self.lancedb.drop_table(self.table_name)
        
        schema = pa.schema([
            pa.field('id', pa.string()),
            pa.field('sender', pa.string()),
            pa.field('subject', pa.string()),
            pa.field('content', pa.string()),
            pa.field('namespace', pa.string()),
            pa.field('meta', pa.string()),
            pa.field('vector', pa.list_(pa.float32()))
        ])
        table =self.lancedb.create_table(self.table_name, schema=schema)
        
        # Create FTS index
        table.create_fts_index(['sender', 'subject', 'content'])


    def process_email(self, email: EmailModel) -> EmailEntryModel:
        prompt = f"Summarize this Email: {email.dict()} using the following JSON schema: {EmailModel.schema_json()} Only respond with the JSON object.\n\n"
        
        generation_parameters = GenerationParameters(max_tokens=4096, seed=42, stop_at="<|endoftext|>")
        logits_processor = JSONLogitsProcessor(
            schema=EmailModel.schema(),
            tokenizer=self.vllm.tokenizer,
            whitespace_pattern="\s+")
        sampling_parameters = SamplingParameters(temperature=1.0, sampler="nucleus")
        
        response = self.vllm.generate(prompts=prompt, generation_parameters=generation_parameters, logits_processor=logits_processor, sampling_parameters=sampling_parameters)
        
        print(response)

        try:
            result = json.loads(response.lstrip().rstrip())
            email.namespace = result.get("namespace")
            email.meta = json.dumps(result.get("metadata", {}))
        except json.JSONDecodeError:
            email.namespace = "unknown"
            email.meta = "{}"
        
        email_id = str(uuid.uuid5(uuid.NAMESPACE_OID, str(email.dict())))
        return EmailEntryModel(id=email_id, **email.dict())

    def upsert_email(self, email: EmailEntryModel):
        table = self.lancedb.open_table(self.table_name)
        data_dict = email.model_dump()
        pyarrow_table = pa.Table.from_pydict({k: [v] for k, v in data_dict.items()})
        print(pyarrow_table)
        table.add(pyarrow_table, mode="overwrite")

    def query_by_sender(self, sender: str, count: Optional[int] = None, namespace: Optional[str] = None) -> List[EmailEntryModel]:
        table = self.lancedb.open_table(self.table_name)
        query = table.search(f"sender == '{sender}'", vector_column_name="vector")
        
        if namespace:
            query = query.where(f"namespace == '{namespace}'")
        
        query = query.limit(count or 10)
        results = query.to_pandas().to_dict(orient="records")
        print(results)
        return [EmailEntryModel(**r) for r in results]

@define
class EmailWriteDataframeTask(BaseTask):
    def __init__(self, lancedb_path: str):
        super().__init__()
        self.lancedb_path = lancedb_path
        self.table_name = "emails"
        daft.set_execution_config(enable_native_executor=True)  # Enable Ray execution
        
    def run(self, input_data: EmailListModel):
        # Connect to LanceDB
        db = lancedb.connect(self.lancedb_path)

        # Prepare data
        data = [
            {
                "id": str(uuid.uuid4()),
                "sender": email.sender,
                "subject": email.subject,
                "content": email.content,
                "namespace": email.namespace,
                "meta": email.meta,
                "vector": email.vector,
                "domain": email.sender.split("@")[-1],
                "word_count": len(email.content.split())
            }
            for email in input_data.emails
        ]

        # Create or open the table
        if self.table_name not in db.table_names():
            table = db.create_table(self.table_name, data)
        else:
            table = db.open_table(self.table_name)
            table.add(data)

        # Convert to pandas DataFrame for display
        result_df = pd.DataFrame(data)
        print(result_df)
        return TextArtifact(f"Processed data:\n{result_df}")


    def input(self) -> EmailListModel:
        return EmailListModel

## Run the Integrated Workflow

In [5]:
# Example email data
input_data = EmailListModel(emails=[
    EmailModel(sender="alice@example.com", subject="Meeting", content="Let's meet at 10 AM.", namespace="personal", meta="location:office", vector=[0.1, 0.2, 0.3]),
    EmailModel(sender="bob@example.org", subject="Project Update", content="The project is on track.", namespace="work", meta="status:on track", vector=[0.4, 0.5, 0.6]),
    EmailModel(sender="carol@example.net", subject="Invoice", content="Please find the invoice attached.", namespace="work", meta="status:pending", vector=[0.7, 0.8, 0.9])
])

# Initialize and run the workflow
workflow = EmailWriteDataframeTask(lancedb_path="./lancedb_dir")
result_artifact = workflow.run(input_data=input_data)

# Output the processed results
print(result_artifact.value)

processing_agent = EmailProcessingAgent(lancedb_path="./lancedb_dir", table_name="emails", model_name="NousResearch/Meta-Llama-3.1-8B-Instruct", max_model_len=20000)

processed_emails = []
for email in input_data.emails:
    processed_email = processing_agent.process_email(email)
    processing_agent.upsert_email(processed_email)
    processed_emails.append(processed_email)




                                     id             sender         subject  \
0  3043f8b0-b0c1-404c-8c1f-3ac35c6ee1bb  alice@example.com         Meeting   
1  8e2841b5-38be-4e8d-88f0-39120e57b11e    bob@example.org  Project Update   
2  5d865537-62a3-4d78-8229-0aa29b93f483  carol@example.net         Invoice   

                             content namespace             meta  \
0               Let's meet at 10 AM.  personal  location:office   
1           The project is on track.      work  status:on track   
2  Please find the invoice attached.      work   status:pending   

            vector       domain  word_count  
0  [0.1, 0.2, 0.3]  example.com           5  
1  [0.4, 0.5, 0.6]  example.org           5  
2  [0.7, 0.8, 0.9]  example.net           5  
Processed data:
                                     id             sender         subject  \
0  3043f8b0-b0c1-404c-8c1f-3ac35c6ee1bb  alice@example.com         Meeting   
1  8e2841b5-38be-4e8d-88f0-39120e57b11e    bob@example.org  Pr

Loading safetensors checkpoint shards:   0% Completed | 0/4 [00:00<?, ?it/s]


INFO 09-04 18:30:39 model_runner.py:890] Loading model weights took 14.9888 GB
INFO 09-04 18:30:45 gpu_executor.py:121] # GPU blocks: 1253, # CPU blocks: 2048
INFO 09-04 18:30:48 model_runner.py:1181] Capturing the model for CUDA graphs. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI.
INFO 09-04 18:30:48 model_runner.py:1185] CUDA graphs can take additional 1~3 GiB memory per GPU. If you are running out of memory, consider decreasing `gpu_memory_utilization` or enforcing eager mode. You can also reduce the `max_num_seqs` as needed to decrease memory usage.
INFO 09-04 18:31:15 model_runner.py:1300] Graph capturing finished in 26 secs.


Compiling FSM index for all state transitions: 100%|██████████| 148/148 [00:06<00:00, 22.69it/s]
Processed prompts: 100%|██████████| 1/1 [00:04<00:00,  4.54s/it, est. speed input: 68.27 toks/s, output: 15.86 toks/s]


{
    "sender" : "alice@example.com"
    , "subject" : "Meeting"
    , "content" : "Let"
    , "namespace" : null
    , "meta" : "location:office"
    , "vector" : [ 0.1, 0.2, 0.3 ]
}
pyarrow.Table
id: string
sender: string
subject: string
content: string
namespace: null
meta: string
vector: list<item: double>
  child 0, item: double
----
id: [["3868b104-4797-54d6-8f51-85c8d9d43394"]]
sender: [["alice@example.com"]]
subject: [["Meeting"]]
content: [["Let's meet at 10 AM."]]
namespace: [1 nulls]
meta: [["{}"]]
vector: [[[0.1,0.2,0.3]]]


Processed prompts: 100%|██████████| 1/1 [00:05<00:00,  5.00s/it, est. speed input: 61.79 toks/s, output: 15.80 toks/s]


{
    "sender" : "bob@example.org"
    , "subject" : "Project Update"
    , "content" : "The project is on track"
    , "namespace" : "work"
    , "meta" : "status:on track"
    , "vector" : [ 0.4, 0.5,  0.6 ]
}
pyarrow.Table
id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: double>
  child 0, item: double
----
id: [["d3757b5e-2c95-59ce-8ac5-1c4ea92f8abf"]]
sender: [["bob@example.org"]]
subject: [["Project Update"]]
content: [["The project is on track."]]
namespace: [["work"]]
meta: [["{}"]]
vector: [[[0.4,0.5,0.6]]]


Processed prompts: 100%|██████████| 1/1 [00:04<00:00,  4.36s/it, est. speed input: 70.81 toks/s, output: 15.81 toks/s]

{
    "sender" : "carol@example.net" ,
    "subject" : "Invoice" ,
    "content" : "Please find the invoice attached." ,
    "namespace" : "work" ,
    "meta" : "status:pending" ,
    "vector" : [ 0.7 ]  
}
pyarrow.Table
id: string
sender: string
subject: string
content: string
namespace: string
meta: string
vector: list<item: double>
  child 0, item: double
----
id: [["6e9f3d77-decd-51fd-a3c7-6cb86ecaaffa"]]
sender: [["carol@example.net"]]
subject: [["Invoice"]]
content: [["Please find the invoice attached."]]
namespace: [["work"]]
meta: [["{}"]]
vector: [[[0.7,0.8,0.9]]]





In [7]:
# Create the FTS index
table = lancedb.connect("./lancedb_dir").open_table("emails")
table.create_fts_index(['sender', 'subject', 'content'], replace=True)  # Add any other relevant fields

# Query emails by sender
queried_emails = processing_agent.query_by_sender(sender="alice@example.com", count=10, namespace="personal")
print(queried_emails)
print("\nQueried emails:")
for email in queried_emails:
    print(f"Sender: {email.sender}, Subject: {email.subject}")

FileNotFoundError: Table email does not exist.Please first call db.create_table(email, data)