Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to return tuples instead of Records in Python agents #391

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.
#

from langstream import SimpleRecord
import openai
import json
from openai.embeddings_utils import get_embedding
Expand All @@ -32,5 +31,5 @@ def process(self, records):
embedding = get_embedding(record.value(), engine="text-embedding-ada-002")
result = {"input": str(record.value()), "embedding": embedding}
new_value = json.dumps(result)
processed_records.append((record, [SimpleRecord(value=new_value)]))
processed_records.append((record, [(new_value,)]))
return processed_records
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .api import (
Agent,
Record,
RecordType,
Sink,
Source,
Processor,
Expand All @@ -30,6 +31,7 @@

__all__ = [
"Record",
"RecordType",
"TopicConsumer",
"TopicProducer",
"AgentContext",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

__all__ = [
"Record",
"RecordType",
"AgentContext",
"Agent",
"Source",
Expand Down Expand Up @@ -61,6 +62,9 @@ def headers(self) -> List[Tuple[str, Any]]:
pass


RecordType = Union[Record, list, tuple]


class TopicConsumer(ABC):
"""The topic consumer interface"""

Expand Down Expand Up @@ -153,8 +157,19 @@ class Source(Agent):
"""

@abstractmethod
def read(self) -> List[Record]:
"""The Source agent generates records and returns them as list of records."""
def read(self) -> List[RecordType]:
"""The Source agent generates records and returns them as list of records.

:returns: the list of records. The records must either respect the Record
API contract (have methods value(), key() and so on) or be tuples/list.
If the records are tuples/list, the framework will automatically construct
Record objects from them with the values in the following order : value, key,
headers, origin, timestamp.
Eg:
* if you return [("foo",)] a record Record(value="foo") will be built.
* if you return [("foo", "bar")] a record Record(value="foo", key="bar") will
be built.
"""
pass

def commit(self, records: List[Record]):
Expand All @@ -179,11 +194,28 @@ class Processor(Agent):
@abstractmethod
def process(
self, records: List[Record]
) -> List[Tuple[Record, Union[List[Record], Exception]]]:
"""The agent processes records and returns a list containing the association of
these records and the result of these record processing.
) -> List[Tuple[Record, Union[List[RecordType], Exception]]]:
"""The agent processes records and returns a list containing the associations of
these records with the result of these record processing.
The result of each record processing is a list of new records or an exception.
The transactionality of the function is guaranteed by the runtime.

:returns: the list of associations between an input record and the output
records processed from it.
Eg: [(input_record, [output_record1, output_record2])]
If an input record cannot be processed, the associated element shall be an
exception.
Eg: [(input_record, RuntimeError("Could not process"))]
When the processing is successful, the output records must either respect the
Record API contract (have methods value(), key() and so on) or be tuples/list.
If the output records are tuples/list, the framework will automatically
construct Record objects from them with the values in the following order :
value, key, headers, origin, timestamp.
Eg:
* if you return [(input_record, [("foo",)])] a record Record(value="foo") will
be built.
* if you return [(input_record, [("foo", "bar")])] a record
Record(value="foo", key="bar") will be built.
"""
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from abc import abstractmethod
from typing import Any, List, Tuple, Union

from .api import Record, Processor
from .api import Record, Processor, RecordType

__all__ = ["SimpleRecord", "SingleRecordProcessor"]

Expand All @@ -29,31 +29,31 @@ def __init__(
self,
value,
key=None,
headers: List[Tuple[str, Any]] = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small breaking change here for consistency.
Only affects if you were using positional args and not named ones.

origin: str = None,
timestamp: int = None,
headers: List[Tuple[str, Any]] = None,
):
self._value = value
self._key = key
self._headers = headers or []
self._origin = origin
self._timestamp = timestamp
self._headers = headers or []

def key(self):
return self._key

def value(self):
return self._value

def headers(self) -> List[Tuple[str, Any]]:
return self._headers

def origin(self) -> str:
return self._origin

def timestamp(self) -> int:
return self._timestamp

def headers(self) -> List[Tuple[str, Any]]:
return self._headers

def __str__(self):
return (
f"Record(value={self._value}, key={self._key}, origin={self._origin}, "
Expand All @@ -68,13 +68,24 @@ class SingleRecordProcessor(Processor):
"""A Processor that processes records one-by-one"""

@abstractmethod
def process_record(self, record: Record) -> List[Record]:
"""Process one record and return a list of records or raise an exception"""
def process_record(self, record: Record) -> List[RecordType]:
"""Process one record and return a list of records or raise an exception.

:returns: the list of processed records. The records must either respect the
Record API contract (have methods value(), key() and so on) or be tuples/list.
If the records are tuples/list, the framework will automatically construct
Record objects from them with the values in the following order : value, key,
headers, origin, timestamp.
Eg:
* if you return [("foo",)] a record Record(value="foo") will be built.
* if you return [("foo", "bar")] a record Record(value="foo", key="bar") will
be built.
"""
pass

def process(
self, records: List[Record]
) -> List[Tuple[Record, Union[List[Record], Exception]]]:
) -> List[Tuple[Record, Union[List[RecordType], Exception]]]:
results = []
for record in records:
try:
Expand Down
Loading
Loading