Skip to content

Commit

Permalink
Merge branch 'main' into 65-improve-conversation-variables
Browse files Browse the repository at this point in the history
  • Loading branch information
egalvis39 committed Dec 6, 2023
2 parents ae2f9ef + f6284cf commit c46502e
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 21 deletions.
2 changes: 1 addition & 1 deletion menuflow/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ menuflow:
# You can use of the fields defined in RoomEventFilter, for more information:
# https://github.com/mautrix/python/blob/master/mautrix/types/filter.py#L57
room_event_filter:
limit: 5
limit: 0
lazy_load_members: true

# Ignore messages or invitations from this users, it accepts user_ids and regex
Expand Down
5 changes: 5 additions & 0 deletions menuflow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
HTTPRequest,
Input,
InteractiveInput,
InviteUser,
Leave,
Location,
Media,
Expand Down Expand Up @@ -138,6 +139,10 @@ def node(
node_initialized = Leave(
leave_node_data=node_data, room=room, default_variables=self.flow_variables
)
elif node_data.get("type") == "invite_user":
node_initialized = InviteUser(
invite_node_data=node_data, room=room, default_variables=self.flow_variables
)
else:
return

Expand Down
31 changes: 25 additions & 6 deletions menuflow/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,15 @@ async def handle_member(self, evt: StrippedStateEvent) -> None:

if evt.state_key == self.mxid and evt.content.membership == Membership.INVITE:
await self.handle_invite(evt)
elif (
evt.content.membership == Membership.JOIN
and prev_membership != Membership.JOIN
and evt.state_key == self.mxid
):
elif evt.content.membership == Membership.JOIN and prev_membership != Membership.JOIN:
await self.handle_join(evt)
elif evt.content.membership == Membership.LEAVE:
self.log.debug(f"{evt.state_key} LEFT -- EVENT LEAVE ... {evt.room_id}")
if prev_membership == Membership.JOIN:
self.log.debug(f"{evt.state_key} LEFT -- EVENT LEAVE ... {evt.room_id}")
await self.handle_leave(evt)
elif prev_membership == Membership.INVITE:
await self.handle_reject_invite(evt)

async def handle_invite(self, evt: StrippedStateEvent):
if self.util.ignore_user(mxid=evt.sender, origin="invite") or evt.sender == self.mxid:
Expand All @@ -92,6 +90,11 @@ async def handle_invite(self, evt: StrippedStateEvent):

await self.join_room(evt.room_id)

async def handle_reject_invite(self, evt: StrippedStateEvent):
if evt.room_id in Room.pending_invites:
if not Room.pending_invites[evt.room_id].done():
Room.pending_invites[evt.room_id].set_result(False)

def unlock_room(self, room_id: RoomID):
self.log.debug(f"UNLOCKING ROOM... {room_id}")
self.LOCKED_ROOMS.discard(room_id)
Expand Down Expand Up @@ -146,6 +149,14 @@ async def load_room_constants(self, room_id: RoomID):
await room.set_variable("customer_mxid", await room.creator)

async def handle_join(self, evt: StrippedStateEvent):
if evt.room_id in Room.pending_invites:
if not Room.pending_invites[evt.room_id].done():
Room.pending_invites[evt.room_id].set_result(True)

# Ignore all events that are not from the bot
if not evt.state_key == self.mxid:
return

if evt.room_id in self.LOCKED_ROOMS:
self.log.debug(f"Ignoring menu request in {evt.room_id} Menu locked")
return
Expand All @@ -161,8 +172,10 @@ async def handle_join(self, evt: StrippedStateEvent):

async def handle_leave(self, evt: StrippedStateEvent):
self.log.debug(f"{evt.state_key} LEFT -- EVENT LEAVE ... {evt.room_id}")
room = await Room.get_by_room_id(room_id=evt.room_id, bot_id=self.mxid, create=False)
if evt.state_key == self.mxid:
return

room = await Room.get_by_room_id(room_id=evt.room_id, create=False)
if not room:
return

Expand Down Expand Up @@ -196,6 +209,10 @@ async def handle_message(self, message: MessageEvent) -> None:
if not room:
return

if room.room_id in Room.pending_invites:
self.log.warning(f"Ignoring message in {room.room_id} pending invite")
return

await self.algorithm(room=room, evt=message)

async def algorithm(self, room: Room, evt: Optional[MessageEvent] = None) -> None:
Expand Down Expand Up @@ -225,6 +242,8 @@ async def algorithm(self, room: Room, evt: Optional[MessageEvent] = None) -> Non
return
else:
await node.run()
if room.state == RoomState.INVITE:
return

if room.route.state == RouteState.END:
self.log.debug(f"The room {room.room_id} has terminated the flow")
Expand Down
27 changes: 13 additions & 14 deletions menuflow/menu.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,22 @@ async def _start(self, try_n: int | None = 0) -> None:
self.enabled = False
await self.update()
return
if not self.filter_id:
self.filter_id = await self.matrix_handler.create_filter(
Filter(
room=RoomFilter(
timeline=RoomEventFilter(
**self.menuflow.config["menuflow.sync.room_event_filter"]
),
state=StateFilter(
lazy_load_members=True,
),
self.filter_id = await self.matrix_handler.create_filter(
Filter(
room=RoomFilter(
timeline=RoomEventFilter(
**self.menuflow.config["menuflow.sync.room_event_filter"]
),
presence=EventFilter(
not_types=[EventType.PRESENCE],
state=StateFilter(
lazy_load_members=True,
),
)
),
presence=EventFilter(
not_types=[EventType.PRESENCE],
),
)
await self.update()
)
await self.update()
# if self.crypto:
# await self._start_crypto()
self.start_sync()
Expand Down
1 change: 1 addition & 0 deletions menuflow/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .http_request import HTTPRequest
from .input import Input
from .interactive_input import InteractiveInput
from .invite_user import InviteUser
from .leave import Leave
from .location import Location
from .media import Media
Expand Down
76 changes: 76 additions & 0 deletions menuflow/nodes/invite_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from asyncio import Future, create_task, get_running_loop, sleep
from datetime import datetime
from typing import Dict

import mautrix.errors.request

from ..db.room import RoomState
from ..repository import InviteUser as InviteUserModel
from ..room import Room
from .switch import Switch


class InviteUser(Switch):
def __init__(
self, invite_node_data: InviteUserModel, room: Room, default_variables: Dict
) -> None:
super().__init__(invite_node_data, room, default_variables)
self.content = invite_node_data

@property
def invitee(self) -> list[str]:
return self.render_data(self.content.get("invitee"))

@property
def timeout(self) -> str:
return self.render_data(self.content.get("timeout"))

async def _update_menu(self, case_id: str):
o_connection = await self.get_case_by_id(case_id)
await self.room.update_menu(o_connection)
await self.room.matrix_client.algorithm(room=self.room)

async def run(self):
# Invite users to a room.
try:
await self.room.matrix_client.invite_user(self.room.room_id, self.invitee)
except mautrix.errors.request.MForbidden as e:
self.log.error(e)
await self._update_menu("join")
return

await self.room.update_menu(self.id, RoomState.INVITE)

loop = get_running_loop()
pending_invite = loop.create_future()
# Save the Future object in the pending_invites dict.
self.room.pending_invites[self.room.room_id] = pending_invite

create_task(self.check_agent_join(pending_invite))

async def check_agent_join(self, pending_invite: Future):
# Check if the agent has joined the room.
loop = get_running_loop()
end_time = loop.time() + float(self.timeout)

while True:
self.log.debug(datetime.now())
if pending_invite.done():
# when a join event is received, the Future object is resolved
self.log.debug("FUTURE IS DONE")
case_id = "join" if pending_invite.result() else "reject"
break
elif (loop.time() + 1.0) >= end_time:
self.log.debug("TIMEOUT COMPLETED.")
pending_invite.set_result(False)
# Remove user invitation from the room.
await self.room.matrix_client.kick_user(self.room.room_id, self.invitee)
case_id = "timeout"
break

await sleep(1)

if self.room.room_id in self.room.pending_invites:
del self.room.pending_invites[self.room.room_id]

await self._update_menu(case_id)
1 change: 1 addition & 0 deletions menuflow/repository/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Input,
InteractiveInput,
InteractiveMessage,
InviteUser,
Leave,
Location,
Media,
Expand Down
1 change: 1 addition & 0 deletions menuflow/repository/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .http_request import HTTPRequest
from .input import InactivityOptions, Input
from .interactive_input import InteractiveInput, InteractiveMessage
from .invite_user import InviteUser
from .leave import Leave
from .location import Location
from .media import Media
Expand Down
28 changes: 28 additions & 0 deletions menuflow/repository/nodes/invite_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from attr import dataclass, ib

from .switch import Case, Switch


@dataclass
class InviteUser(Switch):
"""
## Invite User
Invite users to a room.
- id: 'invite_user'
type: 'invite_user'
timeout: 5
invitees: '{{ main_menu }}'
cases:
- id: 'join'
o_connection: 'next_node'
- id: 'reject'
o_connection: 'error_invite_user'
- id: 'timeout'
o_connection: 'timeout_invite_user'
"""

invitees: list[str] = ib(default=None)
timeout: int = ib(default=5)
6 changes: 6 additions & 0 deletions menuflow/room.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import json
from asyncio import Future, Lock
from collections import defaultdict
from logging import getLogger
from typing import Any, Dict, Optional, cast

from mautrix.client import Client as MatrixClient
from mautrix.types import EventType, RoomID, StateEventContent, UserID
from mautrix.util.async_getter_lock import async_getter_lock
from mautrix.util.logging import TraceLogger

from .config import Config
Expand All @@ -16,6 +19,8 @@

class Room(DBRoom):
by_room_id: Dict[RoomID, "Room"] = {}
pending_invites: Dict[RoomID, Future] = {}
_async_get_locks: dict[Any, Lock] = defaultdict(lambda: Lock())

config: Config
log: TraceLogger = getLogger("menuflow.room")
Expand Down Expand Up @@ -52,6 +57,7 @@ def all_variables(self) -> Dict:
return self._variables | self.route._variables

@classmethod
@async_getter_lock
async def get_by_room_id(
cls, room_id: RoomID, bot_id: UserID, create: bool = True
) -> "Room" | None:
Expand Down

0 comments on commit c46502e

Please sign in to comment.