Skip to content

Commit

Permalink
Merge pull request #53 from hyp3rd/fix/API
Browse files Browse the repository at this point in the history
branch sync
  • Loading branch information
hyp3rd committed Dec 17, 2023
2 parents 148e448 + 553d5d6 commit d091428
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 67 deletions.
2 changes: 1 addition & 1 deletion api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
)
from .base_response_schema import BaseResponse
from .bridge_schema import BridgeResponse, BridgeResponseSchema
from .health_schema import Health, HealthHistory, HealtHistoryManager, HealthSchema
from .health_schema import Health, HealthHistory, HealthHistoryManager, HealthSchema
except ImportError as ex:
raise ex
8 changes: 5 additions & 3 deletions api/models/health_schema.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Health Schema.""" ""

from typing import Dict
from multiprocessing.managers import BaseManager
from typing import Dict

from pydantic import BaseModel
from bridge.enums import ProcessStateEnum

from bridge.config import Config
from bridge.enums import ProcessStateEnum
from bridge.logger import Logger

logger = Logger.get_logger(Config.get_instance().application.name)
Expand Down Expand Up @@ -71,5 +73,5 @@ def get_health_history(self):
return self.health_history


class HealtHistoryManager(BaseManager):
class HealthHistoryManager(BaseManager):
"""Health History Manager."""
48 changes: 15 additions & 33 deletions api/routers/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
BridgeResponseSchema,
Health,
HealthHistory,
HealtHistoryManager,
HealthHistoryManager,
HealthSchema,
)
from api.routers.health import HealthcheckSubscriber, WSConnectionManager
Expand Down Expand Up @@ -37,9 +37,9 @@ def __init__(self):
self.telegram_handler: TelegramHandler

self.dispatcher: EventDispatcher
HealtHistoryManager.register("HealthHistory", HealthHistory)
HealthHistoryManager.register("HealthHistory", HealthHistory)

self.health_history_manager_instance = HealtHistoryManager()
self.health_history_manager_instance = HealthHistoryManager()
self.health_history_manager_instance.start() # pylint: disable=consider-using-with # the server must stay alive as long as we want the shared object to be accessible
self.health_history: HealthHistory = self.health_history_manager_instance.HealthHistory() # type: ignore # pylint: disable=no-member

Expand Down Expand Up @@ -84,34 +84,6 @@ async def start(self):
process_state, pid = self.forwarder.determine_process_state()

try:
# if the pid file is empty and the process is None,
# # then start the bridge
# if pid == 0 and self.bridge_process is not ProcessStateEnum.RUNNING:
# # create a shared list of subscribers
# manager = Manager()
# # create a list of subscribers to pass to the event dispatcher and the healthcheck subscriber
# healthcheck_subscribers: ListProxy[HealthcheckSubscriber] = manager.list()

# self.ws_connection_manager = WSConnectionManager(self.health_history)

# # create the event dispatcher
# self.dispatcher = EventDispatcher(subscribers=healthcheck_subscribers)
# self.healthcheck_subscriber = HealthcheckSubscriber('healthcheck_subscriber',
# self.dispatcher,
# self.health_history,
# self.ws_connection_manager,
# self.websocket_queue)
# self.dispatcher.add_subscriber("healthcheck", self.healthcheck_subscriber)

# self.on_update = self.healthcheck_subscriber.create_on_update_decorator()

# self.bridge_process = Process(
# target=controller, args=(self.dispatcher, True, False, False,))

# # start the bridge process
# self.bridge_process.start()
# # self.bridge_process.join()

if pid == 0 or process_state not in [
ProcessStateEnum.RUNNING,
ProcessStateEnum.STARTING,
Expand Down Expand Up @@ -216,9 +188,19 @@ async def stop(self):
if process_state == ProcessStateEnum.RUNNING and pid > 0:
try:
# await run_controller(dispatcher=self.dispatcher, boot=False, background=False, stop=True)
await self.forwarder.api_controller(start_forwarding=False)
# await self.forwarder.api_controller(start_forwarding=False)
self.health_history_manager_instance.shutdown()
self.dispatcher.stop()
# self.healthcheck_subscriber.unsubscribe(
# "healthcheck", self.healthcheck_subscriber
# )
# self.healthcheck_subscriber.subscribers = []
# self.healthcheck_subscriber = None
self.dispatcher.remove_subscriber(
"healthcheck", self.healthcheck_subscriber
)
# self.dispatcher = None
await self.forwarder.api_controller(start_forwarding=False)

except asyncio.exceptions.CancelledError:
logger.info("Bridge process stopped.")
except Exception as ex: # pylint: disable=broad-except
Expand Down
80 changes: 50 additions & 30 deletions forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@

config = Config.get_instance()

# A list of tasks that should be cancelled on shutdown fron API
forwarder_tasks = [
"forwarder_task",
"shutdown_task",
"on_shutdown_task",
"bridge_start_task",
"telegram_wait_task",
"discord_wait_task",
"api_healthcheck_task",
"on_restored_connectivity_task",
]


class Forwarder(metaclass=SingletonMeta):
"""The forwarder class."""
Expand Down Expand Up @@ -86,7 +98,7 @@ def get_instance(self) -> "Forwarder":

async def api_controller(self, start_forwarding: bool = True) -> OperationStatus:
"""Run the forwarder from the API controller."""
self.logger.debug("Starting the API controller.")
self.logger.debug("API controller invoked.")

if not config.api.enabled:
self.logger.error(ERR_API_DISABLED)
Expand Down Expand Up @@ -190,9 +202,6 @@ async def __forwarder_task(self):
and not discord_client.is_ready()
):
clients = ()
else:
await self.on_shutdown()
clients = ()

def __start(self):
"""Start the bridge."""
Expand All @@ -217,7 +226,9 @@ def __start(self):
_ = self.create_pid_file()

# Create a task for the __forwarder coroutine.
__forwarder_task = self.event_loop.create_task(self.__forwarder_task())
__forwarder_task = self.event_loop.create_task(
self.__forwarder_task(), name="forwarder_task"
)

try:
if self.event_loop.is_running():
Expand Down Expand Up @@ -263,7 +274,9 @@ def __stop(self):
return

try:
self.logger.info("Stopping the %s...", config.application.name)
os.kill(pid, signal.SIGINT)

self.logger.warning(
"Sent SIGINT to the %s process with PID %s.",
config.application.name,
Expand Down Expand Up @@ -398,33 +411,42 @@ async def init_clients(self) -> Tuple[TelegramClient, discord.Client]:
if os.name != "nt" and not config.api.enabled:
for sig in (signal.SIGINT, signal.SIGTERM):
event_loop.add_signal_handler(
sig, lambda sig=sig: asyncio.create_task(self.shutdown(sig))
sig,
lambda sig=sig: asyncio.create_task(
self.shutdown(sig), name="shutdown_task"
),
) # type: ignore
if config.api.enabled:
for sig in (signal.SIGINT, signal.SIGTERM):
event_loop.add_signal_handler(
sig, lambda: asyncio.create_task(self.on_shutdown())
sig,
lambda: asyncio.create_task(
self.api_shutdown(), name="on_shutdown_task"
),
)

try:
lock = asyncio.Lock()
await lock.acquire()
bridge = Bridge(self.telegram_client, self.discord_client)
# Create tasks for starting the main logic and waiting for clients to disconnect
start_task = event_loop.create_task(bridge.start())
start_task = event_loop.create_task(
bridge.start(), name="bridge_start_task"
)
telegram_wait_task = event_loop.create_task(
self.telegram_client.run_until_disconnected() # type: ignore
self.telegram_client.run_until_disconnected(), name="telegram_wait_task" # type: ignore
)
discord_wait_task = event_loop.create_task(
self.discord_client.wait_until_ready()
self.discord_client.wait_until_ready(), name="discord_wait_task"
)
api_healthcheck_task = event_loop.create_task(
HealthHandler(
self.dispatcher, self.telegram_client, self.discord_client
).check(config.application.healthcheck_interval)
).check(config.application.healthcheck_interval),
name="api_healthcheck_task",
)
on_restored_connectivity_task = event_loop.create_task(
bridge.on_restored_connectivity()
bridge.on_restored_connectivity(), name="on_restored_connectivity_task"
)
lock.release()

Expand All @@ -445,16 +467,14 @@ async def init_clients(self) -> Tuple[TelegramClient, discord.Client]:
ex,
exc_info=config.application.debug,
)
finally:
await self.on_shutdown()

return self.telegram_client, self.discord_client

async def on_shutdown(self):
async def api_shutdown(self):
"""Shutdown the bridge."""
self.logger.info("Starting shutdown process...")
task = asyncio.current_task()
all_tasks = asyncio.all_tasks()
all_tasks = asyncio.all_tasks(self.event_loop)

try:
self.logger.info("Disconnecting Telegram client...")
Expand Down Expand Up @@ -484,26 +504,26 @@ async def on_shutdown(self):
and not running_task.done()
and not running_task.cancelled()
):
if task is not None:
self.logger.debug("Cancelling task %s...", {running_task})
if (
running_task is not None
and running_task.get_name() in forwarder_tasks
):
self.logger.debug(
"Cancelling task %s...", {running_task.get_name()}
)
try:
task.cancel()
running_task.cancel()
except Exception as ex: # pylint: disable=broad-except
self.logger.error(
"Error cancelling task %s: %s", {running_task}, {ex}
)

if not config.api.enabled:
self.logger.debug("Stopping event loop...")
asyncio.get_running_loop().stop()
else:
self.remove_pid_file()

self.remove_pid_file()
self.logger.info("Shutdown process completed.")

async def shutdown(self, sig):
"""Shutdown the application gracefully."""
self.logger.warning("shutdown received signal %s, shutting down...", {sig})
self.logger.warning("Shutdown received signal %s, shutting down...", {sig})

# Cancel all tasks
tasks = [
Expand All @@ -525,10 +545,10 @@ async def shutdown(self, sig):
if isinstance(result, Exception):
self.logger.error("Error during shutdown: %s", result)

if not config.api.enabled:
# Stop the loop
if self.event_loop is not None:
self.event_loop.stop()
# if not config.api.enabled:
# Stop the loop
if self.event_loop is not None:
self.event_loop.stop()

self.remove_pid_file()

Expand Down
Empty file modified tools/kill-server.bash
100644 → 100755
Empty file.

0 comments on commit d091428

Please sign in to comment.