Skip to content

Commit

Permalink
Merge pull request #43 from iKonoTelecomunicaciones/main
Browse files Browse the repository at this point in the history
Merge all changes
  • Loading branch information
egalvis39 committed Nov 14, 2023
2 parents 02dc0c2 + 05f8c0a commit d7b2c32
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 120 deletions.
3 changes: 3 additions & 0 deletions menuflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .db import init as init_db
from .db import upgrade_table
from .email_client import EmailClient
from .events import NatsPublisher
from .flow_utils import FlowUtils
from .menu import MenuClient
from .repository.flow_utils import FlowUtils as FlowUtilsModel
Expand Down Expand Up @@ -49,6 +50,7 @@ def prepare(self) -> None:
super().prepare()
self.prepare_db()
MenuClient.init_cls(self)
NatsPublisher.init_cls(self.config)
management_api = init_api(self.config, self.loop)
self.server = MenuFlowServer(management_api, self.config, self.loop)
self.flow_utils = FlowUtils()
Expand Down Expand Up @@ -97,6 +99,7 @@ async def start(self) -> None:
asyncio.create_task(self.start_email_connections())

async def stop(self) -> None:
await NatsPublisher.close_connection()
self.add_shutdown_actions(*(menu.stop() for menu in MenuClient.cache.values()))
await super().stop()
self.log.debug("Stopping server")
Expand Down
4 changes: 4 additions & 0 deletions menuflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def _new_token() -> str:
def do_update(self, helper: ConfigUpdateHelper) -> None:
base = helper.base
copy = helper.copy
copy_dict = helper.copy_dict
copy("menuflow.ignore.messages_from")
copy("menuflow.ignore.invitations_from")
copy("menuflow.database")
Expand All @@ -20,10 +21,13 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
copy("menuflow.timeouts.http_request")
copy("menuflow.timeouts.middlewares")
copy("menuflow.typing_notification")
copy("menuflow.send_events")
copy("server.hostname")
copy("server.port")
copy("server.public_url")
copy("server.base_path")
copy_dict("nats")
copy_dict("logging")
shared_secret = self["server.unshared_secret"]
if shared_secret is None or shared_secret == "generate":
base["server.unshared_secret"] = self._new_token()
Expand Down
5 changes: 5 additions & 0 deletions menuflow/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .base_event import BaseEvent
from .event_generator import send_node_event
from .event_types import MenuflowEventTypes, MenuflowNodeEvents
from .nats_publisher import NatsPublisher
from .node_events import NodeEntry, NodeInputData, NodeInputTimeout
45 changes: 45 additions & 0 deletions menuflow/events/base_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

import asyncio
import json
import logging

from attr import dataclass, ib
from mautrix.types import SerializableAttrs, UserID
from mautrix.util.logging import TraceLogger
from nats.js.client import JetStreamContext

from .event_types import MenuflowEventTypes, MenuflowNodeEvents
from .nats_publisher import NatsPublisher

log: TraceLogger = logging.getLogger("report.event")


@dataclass
class BaseEvent(SerializableAttrs):
event_type: MenuflowEventTypes = ib(default=None)
event: MenuflowNodeEvents = ib(default=None)
timestamp: float = ib(factory=float)
sender: UserID = ib(factory=UserID)

def send(self):
asyncio.create_task(self.send_to_nats())

async def send_to_nats(self):
jetstream: JetStreamContext = None

file = open("/data/room_events.txt", "a")
file.write(f"{json.dumps(self.serialize())}\n\n")
file.close()
log.error(f"Sending event {self.serialize()}")

_, jetstream = await NatsPublisher.get_connection()
if jetstream:
try:
subject = NatsPublisher.config["nats.subject"]
await jetstream.publish(
subject=f"{subject}.{self.event_type}",
payload=json.dumps(self.serialize()).encode(),
)
except Exception as e:
log.error(f"Error publishing event to NATS: {e}")
57 changes: 57 additions & 0 deletions menuflow/events/event_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

from datetime import datetime
from logging import getLogger
from typing import Optional

from ..config import Config
from .event_types import MenuflowEventTypes, MenuflowNodeEvents
from .node_events import NodeEntry, NodeInputData, NodeInputTimeout

log = getLogger()


def send_node_event(
event_type: MenuflowNodeEvents, config: Config, send_event: Optional[bool] = None, **kwargs
):
general_send_event = config["menuflow.send_events"]
send_node_event = send_event if send_event is not None else general_send_event
if not send_node_event:
return

if event_type == MenuflowNodeEvents.NodeEntry:
event = NodeEntry(
event_type=MenuflowEventTypes.NODE,
event=MenuflowNodeEvents.NodeEntry,
timestamp=datetime.utcnow().timestamp(),
room_id=kwargs.get("room_id"),
sender=kwargs.get("sender"),
node_type=kwargs.get("node_type"),
node_id=kwargs.get("node_id"),
o_connection=kwargs.get("o_connection"),
variables=kwargs.get("variables"),
)
elif event_type == MenuflowNodeEvents.NodeInputData:
event = NodeInputData(
event_type=MenuflowEventTypes.NODE,
event=MenuflowNodeEvents.NodeInputData,
timestamp=datetime.utcnow().timestamp(),
room_id=kwargs.get("room_id"),
sender=kwargs.get("sender"),
node_id=kwargs.get("node_id"),
o_connection=kwargs.get("o_connection"),
variables=kwargs.get("variables"),
)
elif event_type == MenuflowNodeEvents.NodeInputTimeout:
event = NodeInputTimeout(
event_type=MenuflowEventTypes.NODE,
event=MenuflowNodeEvents.NodeInputTimeout,
timestamp=datetime.utcnow().timestamp(),
room_id=kwargs.get("room_id"),
sender=kwargs.get("sender"),
node_id=kwargs.get("node_id"),
o_connection=kwargs.get("o_connection"),
variables=kwargs.get("variables"),
)

event.send()
11 changes: 11 additions & 0 deletions menuflow/events/event_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from mautrix.types import SerializableEnum


class MenuflowEventTypes(SerializableEnum):
NODE = "NODE"


class MenuflowNodeEvents(SerializableEnum):
NodeEntry = "NodeEntry"
NodeInputData = "NodeInputData"
NodeInputTimeout = "NodeInputTimeout"
49 changes: 49 additions & 0 deletions menuflow/events/nats_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging

from mautrix.util.logging import TraceLogger
from nats import connect as nats_connect
from nats.aio.client import Client as NATSClient
from nats.js.client import JetStreamContext

from ..config import Config

log: TraceLogger = logging.getLogger("menuflow.nats")


class NatsPublisher:
_nats_conn: NATSClient = None
_jetstream_conn: JetStreamContext = None
config: Config = None

@classmethod
def init_cls(cls, config: Config):
cls.config = config

@classmethod
async def get_connection(cls) -> JetStreamContext:
if not cls.config["nats.enabled"]:
return None, None

if not cls._nats_conn:
try:
cls._nats_conn, cls._jetstream_conn = await cls.nats_jetstream_connection()
except Exception as e:
log.error(f"Error connecting to NATS: {e}")

return cls._nats_conn, cls._jetstream_conn

@classmethod
async def nats_jetstream_connection(cls) -> JetStreamContext:
log.info("Connecting to NATS JetStream")
nc: NATSClient = await nats_connect(cls.config["nats.address"])
js = nc.jetstream()
subject = f"{cls.config['nats.subject']}.*"
await js.add_stream(name="menuflow", subjects=[subject])
return nc, js

@classmethod
async def close_connection(cls):
if cls._nats_conn:
log.info("Closing NATS connection")
await cls._nats_conn.close()
cls._nats_conn = None
32 changes: 32 additions & 0 deletions menuflow/events/node_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

from typing import Dict

from attr import dataclass, ib

from .base_event import BaseEvent


@dataclass
class NodeEntry(BaseEvent):
room_id: str = ib(factory=str)
node_type: str = ib(factory=str)
node_id: str = ib(factory=str)
o_connection: str = ib(default=None)
variables: Dict = ib(factory=dict)


@dataclass
class NodeInputData(BaseEvent):
room_id: str = ib(factory=str)
node_id: str = ib(factory=str)
o_connection: str = ib(factory=str)
variables: Dict = ib(factory=dict)


@dataclass
class NodeInputTimeout(BaseEvent):
room_id: str = ib(factory=str)
node_id: str = ib(factory=str)
o_connection: str = ib(factory=str)
variables: Dict = ib(factory=dict)
17 changes: 17 additions & 0 deletions menuflow/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ menuflow:
start: 1
end: 3

# Do you want that nodes generate events when they are excecuted?
send_events: true

server:
# The IP and port to listen to.
hostname: 0.0.0.0
Expand All @@ -48,6 +51,18 @@ server:
# Set to "generate" to generate and save a new token at startup.
unshared_secret: generate

# Nats configuration
nats:
enabled: false
# Nats server address
address: "nats://nats:4222"
# Nats user
user: "nats"
# Nats password
password: "nats"
# Subject to publish messages
subject: "menuflow.company_name"

# Python logging configuration.
#
# See section 16.7.2 of the Python documentation for more info:
Expand Down Expand Up @@ -77,6 +92,8 @@ logging:
level: DEBUG
aiohttp:
level: INFO
report:
level: DEBUG
root:
level: DEBUG
handlers: [file, console]
10 changes: 10 additions & 0 deletions menuflow/jinja/jinja_template.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from re import match

from fuzzywuzzy import fuzz
from jinja2 import BaseLoader, Environment
from jinja2_ansible_filters import AnsibleCoreFiltersExtension
from jinja2_matrix_filters import MatrixFiltersExtension
Expand Down Expand Up @@ -39,3 +40,12 @@
e.g
{{ match("^(0[1-9]|[12][0-9]|3[01])\s(0[1-9]|1[012])\s(19[0-9][0-9]|20[0-9][0-9])$", "14 09 1999") }}
"""

jinja_env.globals.update(
compare_ratio=lambda text, base_text: fuzz.ratio(text.lower(), base_text.lower())
)
"""
Validates if a text is similar to another text
e.g
{{ compare_ratio("Esteban Galvis", "Esteban Galvis Triana") }}
"""
1 change: 1 addition & 0 deletions menuflow/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from .media import Media
from .message import Message
from .switch import Switch
from .types import Nodes
15 changes: 15 additions & 0 deletions menuflow/nodes/check_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

import pytz

from ..events import MenuflowNodeEvents
from ..events.event_generator import send_node_event
from ..repository import CheckTime as CheckTimeModel
from ..room import Room
from ..utils import Util
from .switch import Switch
from .types import Nodes


class CheckTime(Switch):
Expand Down Expand Up @@ -59,6 +62,18 @@ async def run(self):

await self.room.update_menu(node_id=o_connection, state=None)

send_node_event(
config=self.room.config,
send_event=self.content.get("send_event"),
event_type=MenuflowNodeEvents.NodeEntry,
room_id=self.room.room_id,
sender=self.room.matrix_client.mxid,
node_type=Nodes.check_time,
node_id=self.id,
o_connection=o_connection,
variables={**self.room._variables, **self.default_variables},
)

def check_month(self, month: int) -> bool:
"""If the month are set to "*" (all months), then return True.
Otherwise, check if the current month is within the range of months
Expand Down
15 changes: 15 additions & 0 deletions menuflow/nodes/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

from ..email_client import Email as EmailMessage
from ..email_client import EmailClient
from ..events import MenuflowNodeEvents
from ..events.event_generator import send_node_event
from ..repository import Email as EmailModel
from ..room import Room
from .message import Message
from .types import Nodes


class Email(Message):
Expand Down Expand Up @@ -57,3 +60,15 @@ async def run(self):
asyncio.create_task(self.email_client.send_email(email=email))

await self._update_node()

send_node_event(
config=self.room.config,
send_event=self.content.get("send_event"),
event_type=MenuflowNodeEvents.NodeEntry,
room_id=self.room.room_id,
sender=self.room.matrix_client.mxid,
node_type=Nodes.email,
node_id=self.id,
o_connection=self.o_connection,
variables={**self.room._variables, **self.default_variables},
)
Loading

0 comments on commit d7b2c32

Please sign in to comment.