-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #28 from iKonoTelecomunicaciones/bug_fix_mixed_flows
Bug fix mixed flows
- Loading branch information
Showing
8 changed files
with
155 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,99 +1,110 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from typing import TYPE_CHECKING, Dict | ||
from typing import TYPE_CHECKING, Dict, List | ||
|
||
from mautrix.types import SerializableAttrs | ||
from mautrix.util.logging import TraceLogger | ||
|
||
from .middlewares import HTTPMiddleware | ||
from .nodes import CheckTime, Email, HTTPRequest, Input, Location, Media, Message, Switch | ||
from .repository import Flow as FlowModel | ||
from .room import Room | ||
|
||
if TYPE_CHECKING: | ||
from .middlewares import HTTPMiddleware | ||
|
||
|
||
class Flow: | ||
log: TraceLogger = logging.getLogger("menuflow.flow") | ||
|
||
nodes: Dict[str, (Message, Input, Switch, HTTPRequest, CheckTime)] | ||
middlewares: Dict[str, HTTPMiddleware] | ||
nodes: List[Dict] | ||
middlewares: List[Dict] | ||
|
||
def __init__(self, flow_data: FlowModel) -> None: | ||
self.data: FlowModel = ( | ||
flow_data.serialize() if isinstance(flow_data, SerializableAttrs) else flow_data | ||
) | ||
self.nodes = {} | ||
self.middlewares = {} | ||
self.nodes = self.data.get("nodes", []) | ||
self.middlewares = self.data.get("middlewares", []) | ||
self.nodes_by_id: Dict[str, Dict] = {} | ||
self.middlewares_by_id: Dict[str, Dict] = {} | ||
|
||
def _add_node_to_cache(self, node_data: Dict): | ||
self.nodes_by_id[node_data.get("id")] = node_data | ||
|
||
def _add_middleware_to_cache(self, middleware_data: Dict): | ||
self.middlewares_by_id[middleware_data.get("id")] = middleware_data | ||
|
||
@property | ||
def flow_variables(self) -> Dict: | ||
return self.data.get("flow_variables", {}) | ||
|
||
def load(self): | ||
self.load_middlewares() | ||
self.load_nodes() | ||
|
||
def load_nodes(self): | ||
"""It takes the nodes from the flow data and creates a new node object for each one""" | ||
for node in self.data.get("nodes", []): | ||
if node.get("type") == "message": | ||
node = Message(message_node_data=node) | ||
elif node.get("type") == "media": | ||
node = Media(media_node_data=node) | ||
elif node.get("type") == "email": | ||
node = Email(email_node_data=node) | ||
elif node.get("type") == "location": | ||
node = Location(location_node_data=node) | ||
elif node.get("type") == "switch": | ||
node = Switch(switch_node_data=node) | ||
elif node.get("type") == "input": | ||
node = Input(input_node_data=node) | ||
elif node.get("type") == "check_time": | ||
node = CheckTime(check_time_node_data=node) | ||
elif node.get("type") == "http_request": | ||
node = HTTPRequest(http_request_node_data=node) | ||
|
||
if node.content.get("middleware"): | ||
node.middleware = self.get_middleware_by_id(node.content.get("middleware")) | ||
else: | ||
continue | ||
|
||
node.variables = self.flow_variables or {} | ||
self.nodes[node.id] = node | ||
|
||
def load_middlewares(self): | ||
"""It loads the middlewares from the data file into the `middlewares` dictionary""" | ||
for middleware in self.data.get("middlewares", []): | ||
middleware = HTTPMiddleware(http_middleware_data=middleware) | ||
self.middlewares[middleware.id] = middleware | ||
|
||
def get_node_by_id(self, node_id: str) -> HTTPRequest | Input | Message | Switch | CheckTime: | ||
return self.nodes.get(node_id) | ||
|
||
def get_middleware_by_id(self, middleware_id: str) -> HTTPMiddleware: | ||
return self.middlewares.get(middleware_id) | ||
|
||
def node(self, room: Room) -> HTTPRequest | Input | Message | Switch | CheckTime: | ||
"""It returns the node that should be executed next | ||
Parameters | ||
---------- | ||
room : Room | ||
The room object that the user is currently in. | ||
Returns | ||
------- | ||
The node object. | ||
""" | ||
node = self.get_node_by_id(node_id=room.node_id or "start") | ||
|
||
if not node: | ||
def get_node_by_id(self, node_id: str) -> Dict | None: | ||
node = self.nodes_by_id.get(node_id) | ||
if node: | ||
return node | ||
|
||
for node in self.nodes: | ||
if node_id == node.get("id", ""): | ||
self._add_node_to_cache(node) | ||
return node | ||
|
||
return None | ||
|
||
def get_middleware_by_id(self, middleware_id: str) -> Dict | None: | ||
middleware = self.middlewares_by_id.get(middleware_id) | ||
if middleware: | ||
return middleware | ||
|
||
for middleware in self.middlewares: | ||
if middleware_id == middleware.get("id", ""): | ||
self._add_middleware_to_cache(middleware) | ||
return middleware | ||
|
||
return None | ||
|
||
def middleware(self, middleware_id: str, room: Room) -> HTTPMiddleware: | ||
middleware_data = self.get_middleware_by_id(middleware_id=middleware_id) | ||
|
||
if not middleware_data: | ||
return | ||
|
||
middleware_initialized = HTTPMiddleware(http_middleware_data=middleware_data) | ||
middleware_initialized.room = room | ||
|
||
return middleware_initialized | ||
|
||
def node( | ||
self, room: Room | ||
) -> Message | Input | HTTPRequest | Switch | CheckTime | Media | Email | Location | None: | ||
node_data = self.get_node_by_id(node_id=room.node_id) | ||
|
||
if not node_data: | ||
return | ||
|
||
if node_data.get("type") == "message": | ||
node_initialized = Message(message_node_data=node_data) | ||
elif node_data.get("type") == "media": | ||
node_initialized = Media(media_node_data=node_data) | ||
elif node_data.get("type") == "email": | ||
node_initialized = Email(email_node_data=node_data) | ||
elif node_data.get("type") == "location": | ||
node_initialized = Location(location_node_data=node_data) | ||
elif node_data.get("type") == "switch": | ||
node_initialized = Switch(switch_node_data=node_data) | ||
elif node_data.get("type") == "input": | ||
node_initialized = Input(input_node_data=node_data) | ||
elif node_data.get("type") == "check_time": | ||
node_initialized = CheckTime(check_time_node_data=node_data) | ||
elif node_data.get("type") == "http_request": | ||
node_initialized = HTTPRequest(http_request_node_data=node_data) | ||
|
||
if node_data.get("middleware"): | ||
middleware = self.middleware(node_data.get("middleware"), room) | ||
node_initialized.middleware = middleware | ||
else: | ||
return | ||
|
||
node.room = room | ||
node.variables.update(room._variables) | ||
node_initialized.room = room | ||
node_initialized.variables = self.flow_variables or {} | ||
node_initialized.variables.update(room._variables) | ||
|
||
return node | ||
return node_initialized |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.