diff --git a/composio/cli/context.py b/composio/cli/context.py index af9d9fb52..d330627c3 100644 --- a/composio/cli/context.py +++ b/composio/cli/context.py @@ -2,7 +2,6 @@ CLI Context. """ -import os import typing as t from functools import update_wrapper from pathlib import Path @@ -13,11 +12,7 @@ from rich.console import Console from composio.client import Composio -from composio.constants import ( - ENV_COMPOSIO_API_KEY, - LOCAL_CACHE_DIRECTORY_NAME, - USER_DATA_FILE_NAME, -) +from composio.constants import LOCAL_CACHE_DIRECTORY_NAME, USER_DATA_FILE_NAME from composio.storage.user import UserData @@ -32,6 +27,8 @@ class Context: _cache_dir: t.Optional[Path] = None _console: t.Optional[Console] = None + using_api_key_from_env: bool = False + @property def click_ctx(self) -> click.Context: """Click runtime context.""" @@ -59,17 +56,10 @@ def user_data(self) -> UserData: path = self.cache_dir / USER_DATA_FILE_NAME if not path.exists(): self._user_data = UserData(path=path) - self._user_data.api_key = os.environ.get( - ENV_COMPOSIO_API_KEY, - self._user_data.api_key, - ) + self._user_data.store() if self._user_data is None: self._user_data = UserData.load(path=path) - self._user_data.api_key = os.environ.get( - ENV_COMPOSIO_API_KEY, - self._user_data.api_key, - ) return self._user_data @property diff --git a/composio/cli/logout.py b/composio/cli/logout.py index dd36390f8..e08fdf75a 100644 --- a/composio/cli/logout.py +++ b/composio/cli/logout.py @@ -5,6 +5,7 @@ composio logout """ + import click from composio.cli.context import Context, pass_context diff --git a/composio/cli/triggers.py b/composio/cli/triggers.py index 5fd194d66..4630b90a9 100644 --- a/composio/cli/triggers.py +++ b/composio/cli/triggers.py @@ -179,7 +179,7 @@ def _disable_trigger(context: Context, id: str) -> None: try: response = context.client.triggers.disable(id=id) if response["status"] == "success": - context.console.print(f"Enabled trigger with ID: [green]{id}[/green]") + context.console.print(f"Disabled trigger with ID: [green]{id}[/green]") return raise click.ClickException(f"Could not disable trigger with ID: {id}") except ComposioSDKError as e: diff --git a/composio/client/base.py b/composio/client/base.py index b3d24b670..bb605dd92 100644 --- a/composio/client/base.py +++ b/composio/client/base.py @@ -78,3 +78,4 @@ class BaseClient: """Composio client abstraction.""" http: HttpClient + api_key: str diff --git a/composio/client/collections.py b/composio/client/collections.py index b59f8eee4..0dfe02b87 100644 --- a/composio/client/collections.py +++ b/composio/client/collections.py @@ -3,17 +3,22 @@ """ import base64 +import json import os import time import typing as t import warnings +import pysher +import typing_extensions as te from pydantic import BaseModel, ConfigDict +from pysher.channel import Channel from composio.client.base import BaseClient, Collection from composio.client.endpoints import v1 from composio.client.enums import Action, App, Tag, Trigger from composio.client.exceptions import ComposioClientError +from composio.constants import PUSHER_CLUSTER, PUSHER_KEY from .local_handler import LocalToolHandler @@ -402,6 +407,226 @@ class FileModel(BaseModel): content: bytes +class Connection(BaseModel): + id: str + integrationId: str + clientUniqueUserId: str + status: str + + +class Metadata(BaseModel): + id: str + connectionId: str + triggerName: str + triggerData: str + triggerConfig: t.Dict[str, t.Any] + connection: Connection + + +class TriggerEventData(BaseModel): + """Trigger event payload.""" + + appName: str + payload: dict + originalPayload: t.Dict[str, t.Any] + metadata: Metadata + + clientId: t.Optional[int] = None + + +class _ChunkedTriggerEventData(BaseModel): + """Cunked trigger event data model.""" + + id: str + index: int + chunk: str + final: bool + + +class _TriggerEventFilters(te.TypedDict): + """Trigger event filterset.""" + + app_name: te.NotRequired[str] + trigger_id: te.NotRequired[str] + connection_id: te.NotRequired[str] + trigger_name: te.NotRequired[str] + entity_id: te.NotRequired[str] + integration_id: te.NotRequired[str] + + +TriggerCallback = t.Callable[[TriggerEventData], None] + + +class TriggerSubscription: + """Trigger subscription.""" + + _channel: Channel + _alive: bool + + def __init__(self) -> None: + """Initialize subscription object.""" + self._alive = False + self._chunks: t.Dict[str, t.Dict[int, str]] = {} + self._callbacks: t.List[t.Tuple[TriggerCallback, _TriggerEventFilters]] = [] + + def callback( + self, + filters: t.Optional[_TriggerEventFilters] = None, + ) -> t.Callable[[TriggerCallback], TriggerCallback]: + """Register a trigger callaback.""" + + def _wrap(f: TriggerCallback) -> TriggerCallback: + self._callbacks.append((f, filters or {})) + return f + + return _wrap + + def _validate_filter( + self, + check: t.Any, + name: str, + filters: _TriggerEventFilters, + ) -> None: + """Check if filter is provided and raise if the values does not match.""" + value = filters.get(name) + if value is None: + return + if value != check: + raise ValueError( + f"Skipping since `{name}` filter does not match the event", + ) + + def _handle_callback( + self, + callback: TriggerCallback, + data: TriggerEventData, + filters: _TriggerEventFilters, + ) -> None: + """Handle callback.""" + for name, check in ( + ("app_name", data.appName), + ("trigger_id", data.metadata.id), + ("connection_id", data.metadata.connectionId), + ("trigger_name", data.metadata.triggerName), + ("entity_id", data.metadata.connection.clientUniqueUserId), + ("integration_id", data.metadata.connection.integrationId), + ): + self._validate_filter( + check=check, + name=name, + filters=filters, + ) + callback(data) + + def handle_event(self, event: str) -> None: + """Filter events and call the callback function.""" + try: + data = TriggerEventData(**json.loads(event)) + except Exception as e: + print(f"Error decoding payload: {e}") + try: + for callback, filters in self._callbacks: + self._handle_callback( + callback=callback, + data=data, + filters=filters, + ) + except BaseException as e: + print(f"Erorr handling event `{data.metadata.id}`: {e}") + + def handle_chunked_events(self, event: str) -> None: + """Handle chunked events.""" + data = _ChunkedTriggerEventData(**json.loads(event)) + if data.id not in self._chunks: + self._chunks[data.id] = {} + + self._chunks[data.id][data.index] = data.chunk + if data.final: + _chunks = self._chunks.pop(data.id) + self.handle_event( + event="".join([_chunks[idx] for idx in sorted(_chunks)]), + ) + + def is_alive(self) -> bool: + """Check if subscription is live.""" + return self._alive + + def set_alive(self) -> None: + """Set `_alive` to True.""" + self._alive = True + + def listen(self) -> None: + """Wait infinitely.""" + while True: + time.sleep(1) + + +class _PusherClient: + """Pusher client for Composio SDK.""" + + def __init__(self, client_id: str, base_url: str, api_key: str) -> None: + """Initialize pusher client.""" + self.client_id = client_id + self.base_url = base_url + self.api_key = api_key + self.subscription = TriggerSubscription() + + def _get_connection_handler( + self, + client_id: str, + pusher: pysher.Pusher, + subscription: TriggerSubscription, + ) -> t.Callable[[str], None]: + def _connection_handler(_: str) -> None: + channel = t.cast( + Channel, + pusher.subscribe( + channel_name=f"private-{client_id}_triggers", + ), + ) + channel.bind( + event_name="trigger_to_client", + callback=subscription.handle_event, + ) + channel.bind( + event_name="chunked-trigger_to_client", + callback=subscription.handle_chunked_events, + ) + subscription.set_alive() + + return _connection_handler + + def connect(self, timeout: float = 15.0) -> TriggerSubscription: + """Connect to Pusher channel for given client ID.""" + pusher = pysher.Pusher( + key=PUSHER_KEY, + cluster=PUSHER_CLUSTER, + auth_endpoint=f"{self.base_url}/v1/client/auth/pusher_auth?fromPython=true", + auth_endpoint_headers={ + "x-api-key": self.api_key, + }, + ) + pusher.connection.bind( + "pusher:connection_established", + self._get_connection_handler( + client_id=self.client_id, + pusher=pusher, + subscription=self.subscription, + ), + ) + pusher.connect() + + # Wait for connection to get established + deadline = time.time() + timeout + while time.time() < deadline: + if self.subscription.is_alive(): + return self.subscription + time.sleep(0.5) + raise TimeoutError( + "Timed out while waiting for trigger listener to be established" + ) + + class Triggers(Collection[TriggerModel]): """Collection of triggers.""" @@ -462,12 +687,35 @@ def disable(self, id: str) -> t.Dict: :param connected_account_id: ID of the relevant connected account """ response = self._raise_if_required( - self.client.http.post( - url=str(self.endpoint.disable / id), + self.client.http.patch( + url=str(self.endpoint / "instance" / id / "status"), + json={ + "enabled": False, + }, ) ) return response.json() + def subscribe(self, timeout: float = 15.0) -> TriggerSubscription: + """Subscribe to a trigger and receive trigger events.""" + response = self._raise_if_required( + response=self.client.http.get( + url="/v1/client/auth/client_info", + ) + ) + client_id = response.json().get("client", {}).get("id") + if client_id is None: + raise ComposioClientError("Error fetching client ID") + + pusher = _PusherClient( + client_id=client_id, + base_url=self.client.http.base_url, + api_key=self.client.api_key, + ) + return pusher.connect( + timeout=timeout, + ) + class ActiveTriggerModel(BaseModel): """Active trigger data model.""" diff --git a/composio/client/http.py b/composio/client/http.py index 0c1e5e997..520d4542c 100644 --- a/composio/client/http.py +++ b/composio/client/http.py @@ -67,6 +67,6 @@ def request(url: str, **kwargs: t.Any) -> t.Any: return request def __getattribute__(self, name: str) -> t.Any: - if name in ("get", "post", "put", "delete"): + if name in ("get", "post", "put", "delete", "patch"): return self._wrap(super().__getattribute__(name)) return super().__getattribute__(name) diff --git a/composio/constants.py b/composio/constants.py index d9631ed99..61c764bfe 100644 --- a/composio/constants.py +++ b/composio/constants.py @@ -75,3 +75,13 @@ """ Composio API server base url -> web url mappings. """ + +PUSHER_KEY = "ff9f18c208855d77a152" +""" +API Key for Pusher subscriptions. +""" + +PUSHER_CLUSTER = "mt1" +""" +Name of the pusher cluster. +""" diff --git a/composio/tools/__init__.py b/composio/tools/__init__.py index 6f822c860..3abf8449c 100644 --- a/composio/tools/__init__.py +++ b/composio/tools/__init__.py @@ -14,6 +14,7 @@ ActionModel, FileModel, SuccessExecuteActionResponseModel, + TriggerSubscription, ) from composio.client.enums import Action, App, Tag from composio.client.local_handler import LocalToolHandler @@ -185,3 +186,7 @@ def get_action_schemas( items = items + remote_items return items + + def create_trigger_listener(self, timeout: float = 15.0) -> TriggerSubscription: + """Create trigger subscription.""" + return self.client.triggers.subscribe(timeout=timeout) diff --git a/composio/utils/url.py b/composio/utils/url.py index 6922bf8b6..a928f747d 100644 --- a/composio/utils/url.py +++ b/composio/utils/url.py @@ -16,10 +16,7 @@ def get_api_url_base() -> str: """Get URL for composio API Server.""" - return os.environ.get( - ENV_COMPOSIO_BASE_URL, - DEFAULT_BASE_URL, - ) + return os.environ.get(ENV_COMPOSIO_BASE_URL) or DEFAULT_BASE_URL def get_web_url(path: str) -> str: diff --git a/plugins/autogen/setup.py b/plugins/autogen/setup.py index b2d584951..f5a9aa6a9 100644 --- a/plugins/autogen/setup.py +++ b/plugins/autogen/setup.py @@ -9,7 +9,7 @@ setup( name="composio_autogen", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your Autogen agent.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_core===0.3.11", "pyautogen>=0.2.19"], + install_requires=["composio_core===0.3.12", "pyautogen>=0.2.19"], include_package_data=True, ) diff --git a/plugins/claude/setup.py b/plugins/claude/setup.py index 7258d52b1..1f05b32fe 100644 --- a/plugins/claude/setup.py +++ b/plugins/claude/setup.py @@ -9,7 +9,7 @@ setup( name="composio_claude", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your Claude LLMs.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_openai===0.3.11", "anthropic>=0.25.7"], + install_requires=["composio_openai===0.3.12", "anthropic>=0.25.7"], include_package_data=True, ) diff --git a/plugins/crew_ai/setup.py b/plugins/crew_ai/setup.py index aa8447d94..737de7134 100644 --- a/plugins/crew_ai/setup.py +++ b/plugins/crew_ai/setup.py @@ -9,7 +9,7 @@ setup( name="composio_crewai", - version="0.3.11", + version="0.3.12", author="Himanshu", author_email="himanshu@composio.dev", description="Use Composio to get an array of tools with your CrewAI agent.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_langchain===0.3.11"], + install_requires=["composio_langchain===0.3.12"], include_package_data=True, ) diff --git a/plugins/griptape/setup.py b/plugins/griptape/setup.py index b193493e5..90b29faf9 100644 --- a/plugins/griptape/setup.py +++ b/plugins/griptape/setup.py @@ -9,7 +9,7 @@ setup( name="composio_griptape", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your Griptape wokflow.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_core===0.3.11", "griptape>=0.24.2"], + install_requires=["composio_core===0.3.12", "griptape>=0.24.2"], include_package_data=True, ) diff --git a/plugins/julep/setup.py b/plugins/julep/setup.py index fc2a81acb..47e05eb8e 100644 --- a/plugins/julep/setup.py +++ b/plugins/julep/setup.py @@ -9,7 +9,7 @@ setup( name="composio_julep", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your Julep wokflow.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_openai===0.3.11", "julep>=0.3.2"], + install_requires=["composio_openai===0.3.12", "julep>=0.3.2"], include_package_data=True, ) diff --git a/plugins/langchain/setup.py b/plugins/langchain/setup.py index 6f47a26f2..c148ab45b 100644 --- a/plugins/langchain/setup.py +++ b/plugins/langchain/setup.py @@ -9,7 +9,7 @@ setup( name="composio_langchain", - version="0.3.11", + version="0.3.12", author="Karan", author_email="karan@composio.dev", description="Use Composio to get an array of tools with your LangChain agent.", @@ -27,7 +27,7 @@ "langchain-openai>=0.0.2.post1", "pydantic>=2.6.4", "langchainhub>=0.1.15", - "composio_core===0.3.11", + "composio_core===0.3.12", ], include_package_data=True, ) diff --git a/plugins/llamaindex/setup.py b/plugins/llamaindex/setup.py index 9808d3312..9c6dfe788 100644 --- a/plugins/llamaindex/setup.py +++ b/plugins/llamaindex/setup.py @@ -9,7 +9,7 @@ setup( name="composio_llamaindex", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your LlamaIndex agent.", @@ -24,7 +24,7 @@ python_requires=">=3.9,<4", install_requires=[ "llama_index>=0.10.43", - "composio_langchain===0.3.11", + "composio_langchain===0.3.12", ], include_package_data=True, ) diff --git a/plugins/lyzr/setup.py b/plugins/lyzr/setup.py index c022eb4e4..269697032 100644 --- a/plugins/lyzr/setup.py +++ b/plugins/lyzr/setup.py @@ -9,7 +9,7 @@ setup( name="composio_lyzr", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your Lyzr workflow.", @@ -25,7 +25,7 @@ install_requires=[ "lyzr-automata>=0.1.3", "pydantic>=2.6.4", - "composio_core===0.3.11", + "composio_core===0.3.12", "langchain>=0.1.0", ], include_package_data=True, diff --git a/plugins/openai/setup.py b/plugins/openai/setup.py index 23790f3d9..c6eda6cce 100644 --- a/plugins/openai/setup.py +++ b/plugins/openai/setup.py @@ -9,7 +9,7 @@ setup( name="composio_openai", - version="0.3.11", + version="0.3.12", author="Sawradip", author_email="sawradip@composio.dev", description="Use Composio to get an array of tools with your OpenAI Function Call.", @@ -22,6 +22,6 @@ "Operating System :: OS Independent", ], python_requires=">=3.9,<4", - install_requires=["composio_core===0.3.11"], + install_requires=["composio_core===0.3.12"], include_package_data=True, ) diff --git a/setup.py b/setup.py index c31f08779..dc8233a7a 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setup( name="composio_core", - version="0.3.11", + version="0.3.12", author="Utkarsh", author_email="utkarsh@composio.dev", description="Core package to act as a bridge between composio platform and other services.", @@ -47,13 +47,14 @@ "docker>=7.1.0", "gymnasium>=0.29.1", "pyyaml>=6.0.1", - "sentry-sdk>=2.0.0" + "sentry-sdk>=2.0.0", + "pysher==1.0.8", ], include_package_data=True, package_data={ "composio": [ "local_tools/local_workspace/config/*.yaml", - "local_tools/local_workspace/config/commands/*.sh" + "local_tools/local_workspace/config/commands/*.sh", ] - } + }, )