From 7bad6753c503b35ea9e6ffe1bdaebfaf0ade3f34 Mon Sep 17 00:00:00 2001 From: egalvis Date: Wed, 29 Nov 2023 13:47:42 -0500 Subject: [PATCH 1/3] feat(node): :sparkles: Added new node invite user --- menuflow/db/room.py | 1 + menuflow/flow.py | 5 ++ menuflow/matrix.py | 29 +++++++--- menuflow/nodes/__init__.py | 1 + menuflow/nodes/invite_user.py | 69 ++++++++++++++++++++++++ menuflow/repository/__init__.py | 1 + menuflow/repository/nodes/__init__.py | 1 + menuflow/repository/nodes/invite_user.py | 28 ++++++++++ menuflow/room.py | 2 + 9 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 menuflow/nodes/invite_user.py create mode 100644 menuflow/repository/nodes/invite_user.py diff --git a/menuflow/db/room.py b/menuflow/db/room.py index 12596ab..9f85ec7 100644 --- a/menuflow/db/room.py +++ b/menuflow/db/room.py @@ -15,6 +15,7 @@ class RoomState(Enum): START = "start" END = "end" INPUT = "input" + INVITE = "invite_user" @dataclass diff --git a/menuflow/flow.py b/menuflow/flow.py index ab66ff0..5010fe6 100644 --- a/menuflow/flow.py +++ b/menuflow/flow.py @@ -14,6 +14,7 @@ HTTPRequest, Input, InteractiveInput, + InviteUser, Leave, Location, Media, @@ -139,6 +140,10 @@ def node( node_initialized = Leave( leave_node_data=node_data, room=room, default_variables=self.flow_variables ) + elif node_data.get("type") == "invite_user": + node_initialized = InviteUser( + invite_node_data=node_data, room=room, default_variables=self.flow_variables + ) else: return diff --git a/menuflow/matrix.py b/menuflow/matrix.py index 933a62c..8ad5982 100644 --- a/menuflow/matrix.py +++ b/menuflow/matrix.py @@ -4,7 +4,6 @@ from copy import deepcopy from typing import Dict, Optional -import yaml from mautrix.client import Client as MatrixClient from mautrix.types import ( Membership, @@ -20,7 +19,6 @@ from .db.room import RoomState from .flow import Flow from .nodes import Base, Input, InteractiveInput -from .repository import Flow as FlowModel from .repository import FlowUtils from .room import Room from .user import User @@ -73,15 +71,13 @@ async def handle_member(self, evt: StrippedStateEvent) -> None: if evt.state_key == self.mxid and evt.content.membership == Membership.INVITE: await self.handle_invite(evt) - elif ( - evt.content.membership == Membership.JOIN - and prev_membership != Membership.JOIN - and evt.state_key == self.mxid - ): + elif evt.content.membership == Membership.JOIN and prev_membership != Membership.JOIN: await self.handle_join(evt) elif evt.content.membership == Membership.LEAVE: if prev_membership == Membership.JOIN: await self.handle_leave(evt) + elif prev_membership == Membership.INVITE: + await self.handle_reject_invite(evt) async def handle_invite(self, evt: StrippedStateEvent): if self.util.ignore_user(mxid=evt.sender, origin="invite") or evt.sender == self.mxid: @@ -91,6 +87,11 @@ async def handle_invite(self, evt: StrippedStateEvent): await self.join_room(evt.room_id) + async def handle_reject_invite(self, evt: StrippedStateEvent): + if evt.room_id in Room.pending_invites: + if not Room.pending_invites[evt.room_id].done(): + Room.pending_invites[evt.room_id].set_result(False) + def unlock_room(self, room_id: RoomID): self.log.debug(f"UNLOCKING ROOM... {room_id}") self.LOCKED_ROOMS.discard(room_id) @@ -145,6 +146,14 @@ async def load_room_constants(self, room_id: RoomID): await room.set_variable("customer_mxid", await room.creator) async def handle_join(self, evt: StrippedStateEvent): + if evt.room_id in Room.pending_invites: + if not Room.pending_invites[evt.room_id].done(): + Room.pending_invites[evt.room_id].set_result(True) + + # Ignore all events that are not from the bot + if not evt.state_key == self.mxid: + return + if evt.room_id in self.LOCKED_ROOMS: self.log.debug(f"Ignoring menu request in {evt.room_id} Menu locked") return @@ -194,6 +203,10 @@ async def handle_message(self, message: MessageEvent) -> None: if not room: return + if room.room_id in Room.pending_invites: + self.log.warning(f"Ignoring message in {room.room_id} pending invite") + return + await self.algorithm(room=room, evt=message) async def algorithm(self, room: Room, evt: Optional[MessageEvent] = None) -> None: @@ -223,6 +236,8 @@ async def algorithm(self, room: Room, evt: Optional[MessageEvent] = None) -> Non return else: await node.run() + if room.state == RoomState.INVITE: + return if room.state == RoomState.END: self.log.debug(f"The room {room.room_id} has terminated the flow") diff --git a/menuflow/nodes/__init__.py b/menuflow/nodes/__init__.py index 57daed8..1294903 100644 --- a/menuflow/nodes/__init__.py +++ b/menuflow/nodes/__init__.py @@ -4,6 +4,7 @@ from .http_request import HTTPRequest from .input import Input from .interactive_input import InteractiveInput +from .invite_user import InviteUser from .leave import Leave from .location import Location from .media import Media diff --git a/menuflow/nodes/invite_user.py b/menuflow/nodes/invite_user.py new file mode 100644 index 0000000..f1bb658 --- /dev/null +++ b/menuflow/nodes/invite_user.py @@ -0,0 +1,69 @@ +from asyncio import Future, create_task, get_running_loop, sleep +from datetime import datetime +from typing import Dict + +from menuflow.room import Room + +from ..db.room import RoomState +from ..repository import InviteUser as InviteUserModel +from .switch import Switch + + +class InviteUser(Switch): + def __init__( + self, invite_node_data: InviteUserModel, room: Room, default_variables: Dict + ) -> None: + super().__init__(invite_node_data, room, default_variables) + self.content = invite_node_data + + @property + def invitee(self) -> list[str]: + return self.render_data(self.content.get("invitee")) + + @property + def timeout(self) -> str: + return self.render_data(self.content.get("timeout")) + + async def _update_menu(self, case_id: str): + o_connection = await self.get_case_by_id(case_id) + await self.room.update_menu(o_connection) + await self.room.matrix_client.algorithm(room=self.room) + + async def run(self): + # Invite users to a room. + await self.room.matrix_client.invite_user(self.room.room_id, self.invitee) + await self.room.update_menu(self.id, RoomState.INVITE) + + loop = get_running_loop() + pending_invite = loop.create_future() + # Save the Future object in the pending_invites dict. + self.room.pending_invites[self.room.room_id] = pending_invite + + create_task(self.check_agent_join(pending_invite)) + + async def check_agent_join(self, pending_invite: Future): + # Check if the agent has joined the room. + loop = get_running_loop() + end_time = loop.time() + float(self.timeout) + + while True: + self.log.debug(datetime.now()) + if pending_invite.done(): + # when a join event is received, the Future object is resolved + self.log.debug("FUTURE IS DONE") + case_id = "join" if pending_invite.result() else "reject" + break + elif (loop.time() + 1.0) >= end_time: + self.log.debug("TIMEOUT COMPLETED.") + pending_invite.set_result(False) + # Remove user invitation from the room. + await self.room.matrix_client.kick_user(self.room.room_id, self.invitee) + case_id = "timeout" + break + + await sleep(1) + + if self.room.room_id in self.room.pending_invites: + del self.room.pending_invites[self.room.room_id] + + await self._update_menu(case_id) diff --git a/menuflow/repository/__init__.py b/menuflow/repository/__init__.py index 5bb3b4f..7344538 100644 --- a/menuflow/repository/__init__.py +++ b/menuflow/repository/__init__.py @@ -10,6 +10,7 @@ Input, InteractiveInput, InteractiveMessage, + InviteUser, Leave, Location, Media, diff --git a/menuflow/repository/nodes/__init__.py b/menuflow/repository/nodes/__init__.py index 5af4f7a..2ce66b1 100644 --- a/menuflow/repository/nodes/__init__.py +++ b/menuflow/repository/nodes/__init__.py @@ -3,6 +3,7 @@ from .http_request import HTTPRequest from .input import InactivityOptions, Input from .interactive_input import InteractiveInput, InteractiveMessage +from .invite_user import InviteUser from .leave import Leave from .location import Location from .media import Media diff --git a/menuflow/repository/nodes/invite_user.py b/menuflow/repository/nodes/invite_user.py new file mode 100644 index 0000000..adb4acc --- /dev/null +++ b/menuflow/repository/nodes/invite_user.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from attr import dataclass, ib + +from .switch import Case, Switch + + +@dataclass +class InviteUser(Switch): + """ + ## Invite User + Invite users to a room. + + - id: 'invite_user' + type: 'invite_user' + timeout: 5 + invitees: '{{ main_menu }}' + cases: + - id: 'join' + o_connection: 'next_node' + - id: 'reject' + o_connection: 'error_invite_user' + - id: 'timeout' + o_connection: 'timeout_invite_user' + """ + + invitees: list[str] = ib(default=None) + timeout: int = ib(default=5) diff --git a/menuflow/room.py b/menuflow/room.py index b488430..7c9fd8c 100644 --- a/menuflow/room.py +++ b/menuflow/room.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +from asyncio import Future from logging import getLogger from typing import Any, Dict, cast @@ -16,6 +17,7 @@ class Room(DBRoom): by_room_id: Dict[RoomID, "Room"] = {} + pending_invites: Dict[RoomID, Future] = {} config: Config log: TraceLogger = getLogger("menuflow.room") From 299e863407e8992326b4ca20114b80545f85fbf5 Mon Sep 17 00:00:00 2001 From: Esteban Galvis Date: Fri, 1 Dec 2023 13:33:27 -0500 Subject: [PATCH 2/3] fix: :sparkles: Filtered room events --- menuflow/example-config.yaml | 2 +- menuflow/menu.py | 27 +++++++++++++-------------- menuflow/nodes/invite_user.py | 11 +++++++++-- menuflow/room.py | 7 ++++++- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/menuflow/example-config.yaml b/menuflow/example-config.yaml index e565196..b9b3a2c 100644 --- a/menuflow/example-config.yaml +++ b/menuflow/example-config.yaml @@ -8,7 +8,7 @@ menuflow: # You can use of the fields defined in RoomEventFilter, for more information: # https://github.com/mautrix/python/blob/master/mautrix/types/filter.py#L57 room_event_filter: - limit: 5 + limit: 0 lazy_load_members: true # Ignore messages or invitations from this users, it accepts user_ids and regex diff --git a/menuflow/menu.py b/menuflow/menu.py index 537c7e1..52c0710 100644 --- a/menuflow/menu.py +++ b/menuflow/menu.py @@ -171,23 +171,22 @@ async def _start(self, try_n: int | None = 0) -> None: self.enabled = False await self.update() return - if not self.filter_id: - self.filter_id = await self.matrix_handler.create_filter( - Filter( - room=RoomFilter( - timeline=RoomEventFilter( - **self.menuflow.config["menuflow.sync.room_event_filter"] - ), - state=StateFilter( - lazy_load_members=True, - ), + self.filter_id = await self.matrix_handler.create_filter( + Filter( + room=RoomFilter( + timeline=RoomEventFilter( + **self.menuflow.config["menuflow.sync.room_event_filter"] ), - presence=EventFilter( - not_types=[EventType.PRESENCE], + state=StateFilter( + lazy_load_members=True, ), - ) + ), + presence=EventFilter( + not_types=[EventType.PRESENCE], + ), ) - await self.update() + ) + await self.update() # if self.crypto: # await self._start_crypto() self.start_sync() diff --git a/menuflow/nodes/invite_user.py b/menuflow/nodes/invite_user.py index f1bb658..5b0bc6c 100644 --- a/menuflow/nodes/invite_user.py +++ b/menuflow/nodes/invite_user.py @@ -2,10 +2,11 @@ from datetime import datetime from typing import Dict -from menuflow.room import Room +import mautrix.errors.request from ..db.room import RoomState from ..repository import InviteUser as InviteUserModel +from ..room import Room from .switch import Switch @@ -31,7 +32,13 @@ async def _update_menu(self, case_id: str): async def run(self): # Invite users to a room. - await self.room.matrix_client.invite_user(self.room.room_id, self.invitee) + try: + await self.room.matrix_client.invite_user(self.room.room_id, self.invitee) + except mautrix.errors.request.MForbidden as e: + self.log.error(e) + await self._update_menu("join") + return + await self.room.update_menu(self.id, RoomState.INVITE) loop = get_running_loop() diff --git a/menuflow/room.py b/menuflow/room.py index 7c9fd8c..b9db846 100644 --- a/menuflow/room.py +++ b/menuflow/room.py @@ -1,12 +1,14 @@ from __future__ import annotations import json -from asyncio import Future +from asyncio import Future, Lock +from collections import defaultdict from logging import getLogger from typing import Any, Dict, cast from mautrix.client import Client as MatrixClient from mautrix.types import EventType, RoomID, StateEventContent +from mautrix.util.async_getter_lock import async_getter_lock from mautrix.util.logging import TraceLogger from .config import Config @@ -18,6 +20,7 @@ class Room(DBRoom): by_room_id: Dict[RoomID, "Room"] = {} pending_invites: Dict[RoomID, Future] = {} + _async_get_locks: dict[Any, Lock] = defaultdict(lambda: Lock()) config: Config log: TraceLogger = getLogger("menuflow.room") @@ -43,6 +46,7 @@ def _add_to_cache(self) -> None: self.by_room_id[self.room_id] = self async def clean_up(self): + self.log.info(f"Cleaning up room {self.room_id}") await Util.cancel_task(task_name=self.room_id) del self.by_room_id[self.room_id] self.variables = "{}" @@ -66,6 +70,7 @@ async def creator(self) -> Dict: return created_room_event.get("creator") @classmethod + @async_getter_lock async def get_by_room_id(cls, room_id: RoomID, create: bool = True) -> "Room" | None: """It gets a room from the database, or creates one if it doesn't exist From 576f536c44320e8e52369f70b32345aa33dde497 Mon Sep 17 00:00:00 2001 From: Esteban Galvis Date: Fri, 1 Dec 2023 13:37:04 -0500 Subject: [PATCH 3/3] refactor: :recycle: Exclude no bots leave events --- menuflow/matrix.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/menuflow/matrix.py b/menuflow/matrix.py index 8ad5982..d2f2515 100644 --- a/menuflow/matrix.py +++ b/menuflow/matrix.py @@ -168,8 +168,10 @@ async def handle_join(self, evt: StrippedStateEvent): await self.algorithm(room=room) async def handle_leave(self, evt: StrippedStateEvent): - room = await Room.get_by_room_id(room_id=evt.room_id, create=False) + if evt.state_key == self.mxid: + return + room = await Room.get_by_room_id(room_id=evt.room_id, create=False) if not room: return