Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): context stage 2 #363

Merged
merged 37 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
77c18b5
add: agent representation object
Archento Apr 19, 2024
ba0b907
add: new context to message queue
Archento Apr 19, 2024
efe7c41
Merge branch 'main' into refactor/context_stage_1
Archento Apr 19, 2024
00f322b
remove wallet from AgentRepresentation
Archento Apr 22, 2024
35aaea8
Merge remote-tracking branch 'origin/main' into refactor/context_stage_1
Archento Apr 22, 2024
a6e5745
add: initial wip
Archento Apr 23, 2024
0771f4a
remove protocol query
Archento Apr 23, 2024
0de2c8a
restructure method call
Archento Apr 23, 2024
7e11afe
more refactoring
Archento Apr 24, 2024
e9f13d7
add: current wip
Archento Apr 26, 2024
c8ea58c
re_add send raw
Archento Apr 26, 2024
597e707
add: wip
Archento Apr 26, 2024
5114b39
update: envelope handling
Archento May 7, 2024
9b60863
cleanup
Archento May 7, 2024
dbd8f03
fix: send_sync_message
Archento May 7, 2024
aae9d2a
add deprecation warning for ctx.address
Archento May 7, 2024
aa1b69e
fix: property decorator
Archento May 7, 2024
6b4bc81
Merge remote-tracking branch 'origin/main' into refactor/context_stage_2
Archento May 7, 2024
eb6a7e4
fix: import
Archento May 7, 2024
354a570
feat: handle sync responses
jrriehl May 7, 2024
ad1c7d4
chore: ruff fixes
jrriehl May 7, 2024
8c95901
fix: parse identifier for destination address
jrriehl May 7, 2024
2381ecd
chore: delete print statement
jrriehl May 8, 2024
c50a564
Update python/src/uagents/dispatch.py
Archento May 10, 2024
99b926c
remove: background tasks set
Archento May 14, 2024
a6ee477
move log method to new utils module
Archento May 14, 2024
2705e36
incorporate review comments
Archento May 14, 2024
7cfbb18
remove print
Archento May 14, 2024
4a0e8d4
chore: clean up code
Archento May 15, 2024
5409add
add: timeout for sending future resolve
Archento May 15, 2024
06713c1
chore: typing fix
Archento May 15, 2024
efa3e92
test: fix failing context tests
jrriehl May 15, 2024
00be153
chore: update macos and windows python versions
jrriehl May 15, 2024
6ce5108
fix: server tests
Archento May 15, 2024
81079b5
add: missing method in context ABC
Archento May 17, 2024
dea615e
Merge remote-tracking branch 'origin/main' into refactor/context_stage_2
Archento May 22, 2024
b27d356
remove: timeout from Dispenser
Archento May 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio

from uagents import Model
from uagents.context import send_sync_message
from uagents.communication import send_sync_message

RECIPIENT_ADDRESS = (
"test-agent://agent1q2kxet3vh0scsf0sm7y2erzz33cve6tv5uk63x64upw5g68kr0chkv7hw50"
Expand Down
3 changes: 1 addition & 2 deletions python/examples/17-stateful-communication/agent3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
ChitChatDialogueMessage,
ConcludeChitChatDialogue,
)
from uagents import Agent
from uagents.context import Context
from uagents import Agent, Context

CHAT_AGENT_ADDRESS = "agent1qwvecfwc255pfqqwtjznh9qqk6skl77xc6fzw8mr3ppfex32sr0kcad62n4"

Expand Down
185 changes: 140 additions & 45 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@
import functools
import logging
import uuid
from typing import Any, Coroutine, Dict, List, Optional, Set, Tuple, Type, Union
from typing import (
Any,
Callable,
Coroutine,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
)

import requests
from cosmpy.aerial.client import LedgerClient
from cosmpy.aerial.wallet import LocalWallet, PrivateKey
from cosmpy.crypto.address import Address
from pydantic import ValidationError
from uagents.asgi import ASGIServer
from uagents.communication import Dispenser, MsgDigest
from uagents.config import (
AVERAGE_BLOCK_INTERVAL,
LEDGER_PREFIX,
Expand All @@ -20,16 +32,16 @@
REGISTRATION_RETRY_INTERVAL_SECONDS,
REGISTRATION_UPDATE_INTERVAL_SECONDS,
TESTNET_PREFIX,
get_logger,
parse_agentverse_config,
parse_endpoint_config,
)
from uagents.context import (
Context,
EventCallback,
ExternalContext,
InternalContext,
IntervalCallback,
MessageCallback,
MsgDigest,
)
from uagents.crypto import Identity, derive_key_from_seed, is_user_address
from uagents.dispatch import JsonStr, Sink, dispatcher
Expand All @@ -44,6 +56,7 @@
from uagents.protocol import Protocol
from uagents.resolver import GlobalResolver, Resolver
from uagents.storage import KeyValueStore, get_or_create_private_keys
from uagents.utils import get_logger


async def _run_interval(func: IntervalCallback, ctx: Context, period: float):
Expand Down Expand Up @@ -92,6 +105,87 @@ async def _send_error_message(ctx: Context, destination: str, msg: ErrorMessage)
await ctx.send(destination, msg)


class AgentRepresentation:
"""
Represents an agent in the context of a message.

Attributes:
_address (str): The address of the agent.
_name (Optional[str]): The name of the agent.
_signing_callback (Callable): The callback for signing messages.

Properties:
name (str): The name of the agent.
address (str): The address of the agent.
identifier (str): The agent's address and network prefix.

Methods:
sign_digest(data: bytes) -> str: Sign the provided data with the agent's identity.
"""

def __init__(
self,
address: str,
name: Optional[str],
signing_callback: Callable,
):
"""
Initialize the AgentRepresentation instance.

Args:
address (str): The address of the context.
name (Optional[str]): The optional name associated with the context.
signing_callback (Callable): The callback for signing messages.
"""
self._address = address
self._name = name
self._signing_callback = signing_callback

@property
def name(self) -> str:
"""
Get the name associated with the context or a truncated address if name is None.

Returns:
str: The name or truncated address.
"""
if self._name is not None:
return self._name
return self._address[:10]

@property
def address(self) -> str:
"""
Get the address of the context.

Returns:
str: The address of the context.
"""
return self._address

@property
def identifier(self) -> str:
"""
Get the address of the agent used for communication including the network prefix.

Returns:
str: The agent's address and network prefix.
"""
return TESTNET_PREFIX + "://" + self._address
Archento marked this conversation as resolved.
Show resolved Hide resolved

def sign_digest(self, data: bytes) -> str:
"""
Sign the provided data with the callback of the agent's identity.

Args:
data (bytes): The data to sign.

Returns:
str: The signature of the data.
"""
return self._signing_callback(data)


class Agent(Sink):
"""
An agent that interacts within a communication environment.
Expand Down Expand Up @@ -121,7 +215,8 @@ class Agent(Sink):
of incoming message.
_queries (Dict[str, asyncio.Future]): Dictionary mapping query senders to their response
Futures.
_dispatcher: The dispatcher for message handling.
_dispatcher: The dispatcher for internal handling/sorting of messages.
_dispenser: The dispatcher for external message handling.
_message_queue: Asynchronous queue for incoming messages.
_on_startup (List[Callable]): List of functions to run on agent startup.
_on_shutdown (List[Callable]): List of functions to run on agent shutdown.
Expand Down Expand Up @@ -187,7 +282,6 @@ def __init__(
"""
self._name = name
self._port = port if port is not None else 8000
self._background_tasks: Set[asyncio.Task] = set()
self._resolver = (
resolve
if resolve is not None
Expand Down Expand Up @@ -241,6 +335,7 @@ def __init__(
self._replies: Dict[str, Dict[str, Type[Model]]] = {}
self._queries: Dict[str, asyncio.Future] = {}
self._dispatcher = dispatcher
self._dispenser = Dispenser()
self._message_queue = asyncio.Queue()
self._on_startup = []
self._on_shutdown = []
Expand All @@ -255,20 +350,18 @@ def __init__(
# keep track of supported protocols
self.protocols: Dict[str, Protocol] = {}

self._ctx = Context(
self._identity.address,
self.identifier,
self._name,
self._storage,
self._resolver,
self._identity,
self._wallet,
self._ledger,
self._queries,
replies=self._replies,
self._ctx = InternalContext(
agent=AgentRepresentation(
address=self.address,
name=self._name,
signing_callback=self._identity.sign_digest,
),
storage=self._storage,
ledger=self._ledger,
resolver=self._resolver,
dispenser=self._dispenser,
interval_messages=self._interval_messages,
wallet_messaging_client=self._wallet_messaging_client,
protocols=self.protocols,
logger=self._logger,
)

Expand Down Expand Up @@ -783,8 +876,6 @@ def include(self, protocol: Protocol, publish_manifest: Optional[bool] = False):

if protocol.digest is not None:
self.protocols[protocol.digest] = protocol
if self._ctx is not None:
self._ctx.update_protocols(protocol)

if publish_manifest:
self.publish_manifest(protocol.manifest())
Expand Down Expand Up @@ -866,34 +957,41 @@ def setup(self):
"""
# register the internal agent protocol
self.include(self._protocol)
self.start_message_dispenser()
self._loop.run_until_complete(self._startup())
self.start_background_tasks()
self.start_message_receivers()
self.start_interval_tasks()

def start_message_dispenser(self):
"""
Start the message dispenser.

def start_background_tasks(self):
"""
Start background tasks for the agent.
self._loop.create_task(self._dispenser.run())

def start_interval_tasks(self):
"""
Start interval tasks for the agent.

"""
# Start the interval tasks
for func, period in self._interval_handlers:
task = self._loop.create_task(_run_interval(func, self._ctx, period))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._loop.create_task(_run_interval(func, self._ctx, period))

def start_message_receivers(self):
"""
Start message receiving tasks for the agent.

"""
# start the background message queue processor
task = self._loop.create_task(self._process_message_queue())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._loop.create_task(self._process_message_queue())

# start the wallet messaging client if enabled
if self._wallet_messaging_client is not None:
for task in [
self._wallet_messaging_client.poll_server(),
self._wallet_messaging_client.process_message_queue(self._ctx),
]:
new_task = self._loop.create_task(task)
self._background_tasks.add(new_task)
new_task.add_done_callback(self._background_tasks.discard)
self._loop.create_task(task)

def run(self):
"""
Expand Down Expand Up @@ -927,24 +1025,21 @@ async def _process_message_queue(self):
)
continue

context = Context(
self._identity.address,
self.identifier,
self._name,
self._storage,
self._resolver,
self._identity,
self._wallet,
self._ledger,
self._queries,
context = ExternalContext(
agent=self._ctx.agent,
storage=self._storage,
ledger=self._ledger,
resolver=self._resolver,
dispenser=self._dispenser,
wallet_messaging_client=self._wallet_messaging_client,
logger=self._logger,
queries=self._queries,
session=session,
replies=self._replies,
interval_messages=self._interval_messages,
message_received=MsgDigest(
message=message, schema_digest=schema_digest
),
protocols=self.protocols,
logger=self._logger,
protocol=self.protocols.get(schema_digest),
)

# parse the received message
Expand Down
3 changes: 2 additions & 1 deletion python/src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
import pydantic
import uvicorn
from requests.structures import CaseInsensitiveDict
from uagents.config import RESPONSE_TIME_HINT_SECONDS, get_logger
from uagents.config import RESPONSE_TIME_HINT_SECONDS
from uagents.crypto import is_user_address
from uagents.dispatch import dispatcher
from uagents.envelope import Envelope
from uagents.models import ErrorMessage
from uagents.query import enclose_response_raw
from uagents.utils import get_logger

HOST = "0.0.0.0"

Expand Down
Loading
Loading