Skip to content

Commit

Permalink
improve logging (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushforce committed Mar 6, 2023
1 parent d2cd653 commit 7d247d2
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 215 deletions.
4 changes: 0 additions & 4 deletions deeppavlov_agent/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
from .log import init_logger

STATE_API_VERSION = "0.13.0"

init_logger()
145 changes: 137 additions & 8 deletions deeppavlov_agent/core/agent.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,146 @@
import os
import logging
import asyncio
from time import time
from typing import Any
import os
from typing import Any, Dict
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path

import sentry_sdk

from .log import BaseResponseLogger
from .pipeline import Pipeline
from .state_manager import StateManager
from .workflow_manager import WorkflowManager
from .service import Service

logger = logging.getLogger(__name__)

sentry_sdk.init(os.getenv("DP_AGENT_SENTRY_DSN")) # type: ignore


class LocalResponseLogger:
_enabled: bool
_logger: logging.Logger

def __init__(self, enabled: bool, cleanup_timedelta: int = 300) -> None:
agent_path = Path(__file__).resolve().parents[1]

self._services_load: Dict[str, int] = defaultdict(int)
self._services_response_time: Dict[str, Dict[datetime, float]] = defaultdict(
dict
)
self._tasks_buffer: Dict[str, datetime] = dict()
self._enabled = enabled
self._timedelta = timedelta(seconds=cleanup_timedelta)

if self._enabled:
self._logger = logging.getLogger("service_logger")
self._logger.setLevel(logging.DEBUG)
fh = logging.FileHandler(
agent_path
/ f'logs/{datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S_%f")}.log'
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter("%(message)s"))
self._logger.addHandler(fh)

def _log(
self,
time: datetime,
task_id: str,
workflow_record: dict,
service: Service,
status: str,
) -> None:
# service_name = service.name
# dialog_id = workflow_record['dialog'].id
# self._logger.info(
# f"{time.strftime('%Y-%m-%d %H:%M:%S.%f')}\t{dialog_id}\t{task_id}\t{status}\t{service_name}")
pass

def _cleanup(self, time):
time_threshold = time - self._timedelta

for key in list(self._tasks_buffer.keys()):
if self._tasks_buffer[key] < time_threshold:
del self._tasks_buffer[key]
else:
break

for service_response_time in self._services_response_time.values():
for start_time in list(service_response_time.keys()):
if start_time < time_threshold:
del service_response_time[start_time]
else:
break

def log_start(self, task_id: str, workflow_record: dict, service: Service) -> None:
start_time = datetime.utcnow()

if service.is_input():
self._services_load["agent"] += 1
self._tasks_buffer[workflow_record["dialog"].id] = start_time
elif not service.is_responder():
self._tasks_buffer[task_id] = start_time
self._services_load[service.label] += 1

if self._enabled:
self._log(start_time, task_id, workflow_record, service, "start")

def log_end(
self, task_id: str, workflow_record: dict, service: Service, cancelled=False
) -> None:
end_time = datetime.utcnow()

if service.is_responder():
self._services_load["agent"] -= 1
start_time = self._tasks_buffer.pop(workflow_record["dialog"].id, None)
if start_time is not None and not cancelled:
self._services_response_time["agent"][start_time] = (
end_time - start_time
).total_seconds()
elif not service.is_input():
start_time = self._tasks_buffer.pop(task_id, None)
if start_time is not None:
self._services_load[service.label] -= 1
if not cancelled:
self._services_response_time[service.label][start_time] = (
end_time - start_time
).total_seconds()
self._cleanup(end_time)
if self._enabled:
self._log(end_time, task_id, workflow_record, service, "end\t")

def get_current_load(self):
self._cleanup(datetime.now())
response_time = {}
for service_name, time_dict in self._services_response_time.items():
sm = sum(time_dict.values())
ct = len(time_dict)
response_time[service_name] = sm / ct if ct else 0
response = {
"current_load": dict(self._services_load),
"response_time": response_time,
}
return response


# TODO: fix types
class Agent:
_response_logger: BaseResponseLogger
_response_logger: LocalResponseLogger

def __init__(
self,
pipeline: Pipeline,
state_manager: StateManager,
workflow_manager: WorkflowManager,
response_logger: BaseResponseLogger,
enable_response_logger: bool,
) -> None:
self.pipeline = pipeline
self.state_manager = state_manager
self.workflow_manager = workflow_manager
self._response_logger = response_logger
self._response_logger = LocalResponseLogger(enable_response_logger)

def flush_record(self, dialog_id: str):
workflow_record = self.workflow_manager.flush_record(dialog_id)
Expand Down Expand Up @@ -73,9 +185,19 @@ async def process(self, task_id, response: Any = None, **kwargs):
workflow_record, task_data = self.workflow_manager.complete_task(
task_id, response, **kwargs
)
if not workflow_record:

if task_data is None or workflow_record is None:
logger.error("Task or workflow not exist. task_id={task_id}")
return

service = task_data["service"] # type: ignore
logger.info(
(
f"Received response from '{service.name}'. "
f"task_id={task_id}; dialog_id={task_data['dialog']}; data={response}"
)
)

# self._response_logger._logger.info(f"Service {service.label}: {response}")
self._response_logger.log_end(task_id, workflow_record, service)

Expand Down Expand Up @@ -150,10 +272,17 @@ async def create_processing_tasks(self, workflow_record, next_services):
for service in next_services:
tasks = service.apply_dialog_formatter(workflow_record)
for ind, task_data in enumerate(tasks):
dialog_id = workflow_record["dialog"].id

task_id = self.workflow_manager.add_task(
workflow_record["dialog"].id, service, task_data, ind
dialog_id, service, task_data, ind
)
self._response_logger.log_start(task_id, workflow_record, service) # type: ignore

logger.info(
f"Send request to '{service.name}'. task_id={task_id}; dialog_id={dialog_id}; payload={task_data}."
)

self.workflow_manager.set_task_object(
workflow_record["dialog"].id,
task_id,
Expand Down
160 changes: 0 additions & 160 deletions deeppavlov_agent/core/log.py

This file was deleted.

0 comments on commit 7d247d2

Please sign in to comment.