From 659143d9753b1431071a94a210c0baa0435da889 Mon Sep 17 00:00:00 2001 From: Daryl Lim <5508348+daryllimyt@users.noreply.github.com> Date: Wed, 8 May 2024 00:47:56 -0700 Subject: [PATCH] refactor(engine): Cleanup binds, fstrings, printf formatting --- tracecat/api/app.py | 24 +++++++++--- tracecat/api/completions.py | 6 +-- tracecat/auth.py | 2 +- tracecat/concurrency.py | 4 +- tracecat/etl/aws_cloudtrail.py | 2 +- tracecat/integrations/_registry.py | 50 ++++++++++++------------ tracecat/llm.py | 4 +- tracecat/messaging/consumer.py | 6 +-- tracecat/messaging/producer.py | 4 +- tracecat/middleware/request.py | 5 ++- tracecat/runner/actions.py | 63 ++++++++++++++++-------------- tracecat/runner/app.py | 30 ++++++++------ tracecat/runner/events.py | 22 +++++++---- tracecat/runner/templates.py | 12 +++--- tracecat/scheduler/app.py | 14 +++---- 15 files changed, 136 insertions(+), 112 deletions(-) diff --git a/tracecat/api/app.py b/tracecat/api/app.py index 3f2cf492..8eab8ffb 100644 --- a/tracecat/api/app.py +++ b/tracecat/api/app.py @@ -26,6 +26,7 @@ authenticate_user_or_service, ) from tracecat.config import TRACECAT__APP_ENV, TRACECAT__RUNNER_URL +from tracecat.contexts import ctx_session_role from tracecat.db import ( Action, ActionRun, @@ -136,12 +137,19 @@ def create_app(**kwargs) -> FastAPI: app.add_middleware(RequestLoggingMiddleware) # TODO: Check TRACECAT__APP_ENV to set methods and headers -logger.bind(env=TRACECAT__APP_ENV, origins=cors_origins_kwargs).warning("App started") +logger.warning("App started", env=TRACECAT__APP_ENV, origins=cors_origins_kwargs) # Catch-all exception handler to prevent stack traces from leaking @app.exception_handler(Exception) async def custom_exception_handler(request: Request, exc: Exception): + logger.error( + "Unexpected error: {!s}", + exc, + role=ctx_session_role.get(), + params=request.query_params, + path=request.url.path, + ) return ORJSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"message": "An unexpected error occurred. 
Please try again later."},
@@ -166,7 +174,7 @@ async def check_runner_health() -> dict[str, str]:
     try:
         response.raise_for_status()
     except Exception as e:
-        logger.error(f"Error checking runner health: {e}", exc_info=True)
+        logger.opt(exception=e).error("Error checking runner health", error=e)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail="Error checking runner health",
@@ -506,7 +514,11 @@ async def trigger_workflow_run(
         entrypoint_key=params.action_key,
         entrypoint_payload=params.payload,
     )
-    logger.debug(f"Triggering workflow: {workflow_id = }, {workflow_params = }")
+    logger.debug(
+        "Triggering workflow",
+        workflow_id=workflow_id,
+        workflow_params=workflow_params,
+    )
     async with AuthenticatedRunnerClient(role=service_role) as client:
         response = await client.post(
             f"/workflows/{workflow_id}",
@@ -515,7 +527,7 @@
     try:
         response.raise_for_status()
     except Exception as e:
-        logger.error(f"Error triggering workflow: {e}", exc_info=True)
+        logger.opt(exception=e).error("Error triggering workflow", error=e)
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail="Error triggering workflow",
@@ -947,7 +959,7 @@ def authenticate_webhook(
     try:
         webhook = result.one()
     except NoResultFound as e:
-        logger.error("Webhook does not exist: %s", e)
+        logger.opt(exception=e).error("Webhook does not exist", error=e)
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found"
         ) from e
@@ -960,7 +972,7 @@
     try:
         action = result.one()
     except Exception as e:
-        logger.error("Action does not exist: %s", e)
+        logger.opt(exception=e).error("Action does not exist", error=e)
         raise HTTPException(
             status_code=status.HTTP_404_NOT_FOUND, detail="Resource not found"
         ) from e
diff --git a/tracecat/api/completions.py b/tracecat/api/completions.py
index e8f73e87..d9d44cb2 100644
--- a/tracecat/api/completions.py
+++ b/tracecat/api/completions.py
@@ -150,7 +150,7 @@ def _to_disciminated_union(cons: list[CategoryConstraint]) -> tuple[str, str]:
     Returns:
         tuple[str, str]: The discriminated union type and the supporting types
     """
-    logger.info(f"Creating discriminated union for {cons =}")
+    logger.info("Creating discriminated union", cons=cons)
     supporting_tags = {}
     for tc in cons:
         tag = tc.tag
@@ -245,8 +245,8 @@ async def stream_case_completions(
         output_cls=CaseMissingFieldsResponse,
         field_cons=field_cons,
     )
-    logger.info("🧠 Starting case completions for %d cases...", len(cases))
-    logger.bind(system_context=system_context).debug("System context")
+    logger.info("🧠 Starting case completions for {} cases...", len(cases))
+    logger.debug("System context", system_context=system_context)
 
     async def task(case: Case) -> str:
         prompt = f"""Case JSON Object: ```\n{case.model_dump_json()}\n```"""
diff --git a/tracecat/auth.py b/tracecat/auth.py
index 400d3907..bad115fd 100644
--- a/tracecat/auth.py
+++ b/tracecat/auth.py
@@ -270,7 +270,7 @@ async def _get_role_from_jwt(token: str | bytes) -> Role:
         if user_id is None:
             raise HTTP_EXC("No sub claim in JWT")
     except ExpiredSignatureError as e:
-        logger.error(f"ExpiredSignatureError: {e}")
+        logger.error("Signature expired", error=e)
         raise HTTPException(
             status_code=status.HTTP_401_UNAUTHORIZED,
             detail="Session expired",
diff --git a/tracecat/concurrency.py b/tracecat/concurrency.py
index 9b4de4b3..d662aeda 100644
--- a/tracecat/concurrency.py
+++ b/tracecat/concurrency.py
@@ -17,7 +17,7 @@ def _run_serialized_fn(serialized_wrapped_fn: bytes, role: Role, 
/, *args, **kwa # NOTE: This is not the raw function - it is still wrapped by the `wrapper` decorator wrapped_fn: Callable[_P, Any] = cloudpickle.loads(serialized_wrapped_fn) ctx_session_role.set(role) - logger.bind(role=role).debug("Running serialized function") + logger.debug("Running serialized function", role=role) kwargs["__role"] = role res = wrapped_fn(*args, **kwargs) return res @@ -28,6 +28,6 @@ class CloudpickleProcessPoolExecutor(ProcessPoolExecutor): def submit(self, fn: Callable[_P, Any], /, *args, **kwargs): # We need to pass the role to the function running in the child process role = ctx_session_role.get() - logger.bind(role=role).debug("Serializing function") + logger.debug("Serializing function", role=role) serialized_fn = cloudpickle.dumps(fn) return super().submit(_run_serialized_fn, serialized_fn, role, *args, **kwargs) diff --git a/tracecat/etl/aws_cloudtrail.py b/tracecat/etl/aws_cloudtrail.py index 621daf88..6f383903 100644 --- a/tracecat/etl/aws_cloudtrail.py +++ b/tracecat/etl/aws_cloudtrail.py @@ -184,7 +184,7 @@ def load_cloudtrail_logs( organization_id: str | None = None, ) -> pl.LazyFrame: logger.info( - "🆗 Download AWS CloudTrail logs from: account_id=%r across regions=%s", + "🆗 Download AWS CloudTrail logs from: account_id={!r} across regions={!s}", account_id, regions, ) diff --git a/tracecat/integrations/_registry.py b/tracecat/integrations/_registry.py index c10b033e..d0b32c30 100644 --- a/tracecat/integrations/_registry.py +++ b/tracecat/integrations/_registry.py @@ -59,7 +59,7 @@ def register( """Decorator factory to register a new integration function with additional parameters.""" def decorator_register(func: FunctionType): - logger.info(f"Registering integration {func.__name__}") + logger.info("Registering integration", name=func.__name__) validate_type_constraints(func) platform = get_integration_platform(func) key = get_integration_key(func) @@ -74,25 +74,27 @@ def wrapper(*args, **kwargs): 2. Inject all secret keys into the execution environment. 3. Clean up the environment after the function has executed. 
""" - _secrets: list[Secret] = [] - try: - role = kwargs.pop("__role", None) - # Get secrets from the secrets API - self._logger = logger.bind(pid=os.getpid()) - self._logger.bind(key=key).info("Executing in subprocess") - - if secrets: - self._logger.bind(secrets=secrets).info("Pull secrets") - _secrets = self._get_secrets(role=role, secret_names=secrets) - self._set_secrets(_secrets) - - return func(*args, **kwargs) - except Exception as e: - self._logger.error(f"Error running integration '{key}': {e}") - raise - finally: - self._logger.info(f"Cleaning up after integration '{key}'.") - self._unset_secrets(_secrets) + secret_objs: list[Secret] = [] + role: Role = kwargs.pop("__role", None) + with logger.contextualize(user_id=role.user_id, pid=os.getpid()): + try: + # Get secrets from the secrets API + logger.info("Executing in subprocess", key=key) + + if secrets: + logger.info("Pull secrets", secrets=secrets) + secret_objs = self._get_secrets( + role=role, secret_names=secrets + ) + self._set_secrets(secret_objs) + + return func(*args, **kwargs) + except Exception as e: + logger.error("Error running integration {!r}: {!s}", key, e) + raise + finally: + logger.info("Cleaning up after integration {!r}", key) + self._unset_secrets(secret_objs) if key in self._integrations: raise ValueError(f"Integration '{key}' is already registered.") @@ -114,21 +116,19 @@ def wrapper(*args, **kwargs): def _get_secrets(self, role: Role, secret_names: list[str]) -> list[Secret]: """Retrieve secrets from the secrets API.""" - self._logger.opt(lazy=True).debug( - "Getting secrets {secret_names}", secret_names=lambda: secret_names - ) + logger.debug("Getting secrets {}", secret_names) return asyncio.run(batch_get_secrets(role, secret_names)) def _set_secrets(self, secrets: list[Secret]): """Set secrets in the environment.""" for secret in secrets: - self._logger.info(f"Setting secret {secret!r}") + logger.info("Setting secret {!r}", secret.name) for kv in secret.keys: os.environ[kv.key] = kv.value def _unset_secrets(self, secrets: list[Secret]): for secret in secrets: - self._logger.info(f"Deleting secret {secret.name!r}") + logger.info("Deleting secret {!r}", secret.name) for kv in secret.keys: del os.environ[kv.key] diff --git a/tracecat/llm.py b/tracecat/llm.py index 0592c789..b40eb947 100644 --- a/tracecat/llm.py +++ b/tracecat/llm.py @@ -66,7 +66,7 @@ def parse_choice(choice: Choice) -> str | dict[str, Any]: {"role": "user", "content": prompt}, ] - logger.info("🧠 Calling OpenAI API with model: %s...", model) + logger.info("🧠 Calling OpenAI API with {} model...", model) response: ChatCompletion = await client.chat.completions.create( # type: ignore[call-overload] model=model, response_format={"type": response_format}, @@ -76,7 +76,7 @@ def parse_choice(choice: Choice) -> str | dict[str, Any]: **kwargs, ) # TODO: Should track these metrics - logger.bind(usage=response.usage).info("🧠 Usage") + logger.info("🧠 Usage", usage=response.usage) if stream: return response diff --git a/tracecat/messaging/consumer.py b/tracecat/messaging/consumer.py index ea2e3d74..099fd130 100644 --- a/tracecat/messaging/consumer.py +++ b/tracecat/messaging/consumer.py @@ -39,10 +39,10 @@ async def prepare_queue(*, channel: Channel, exchange: str, routing_keys: list[s await queue.bind(ex, routing_key=routing_key) yield queue except Exception as e: - logger.error(f"Error in prepare_exchange: {e}", exc_info=True) + logger.opt(exception=e).error("Error in prepare_exchange", exchange=exchange) finally: # Cleanup - logger.info(f"Cleaning 
up exchange {exchange!r}") + logger.info("Cleaning up exchange", exchange=exchange) if queue: for routing_key in routing_keys: await queue.unbind(ex, routing_key=routing_key) @@ -101,6 +101,6 @@ async def _subscribe(): async for message in _subscribe(): yield message except Exception as e: - logger.error(f"Error in event subscription: {e}", exc_info=True) + logger.opt(exception=e).error("Error in event subscription") finally: logger.info("Closing event subscription") diff --git a/tracecat/messaging/producer.py b/tracecat/messaging/producer.py index 0c013463..300025b0 100644 --- a/tracecat/messaging/producer.py +++ b/tracecat/messaging/producer.py @@ -24,9 +24,7 @@ async def event_producer( for routing_key in routing_keys: await ex.publish(message, routing_key=routing_key) - logger.bind(routing_key=routing_key, body=message.body).debug( - "Published message" - ) + logger.debug("Published message", routing_key=routing_key, body=message.body) async def publish( diff --git a/tracecat/middleware/request.py b/tracecat/middleware/request.py index 873f23e9..c995da11 100644 --- a/tracecat/middleware/request.py +++ b/tracecat/middleware/request.py @@ -13,13 +13,14 @@ async def dispatch(self, request: Request, call_next): ) # Log the incoming request with parameters - request.app.logger.bind( + request.app.logger.debug( + "Incoming request", method=request.method, scheme=request.url.scheme, hostname=request.url.hostname, path=request.url.path, params=request_params, body=request_body, - ).debug("Request") + ) return await call_next(request) diff --git a/tracecat/runner/actions.py b/tracecat/runner/actions.py index d60aad83..79a9bda1 100644 --- a/tracecat/runner/actions.py +++ b/tracecat/runner/actions.py @@ -397,7 +397,7 @@ async def start_action_run( await emit_create_action_run_event() action_key = action_run.action_key upstream_deps_ar_ids = action_run.deps_upstream(workflow=workflow) - logger.bind(deps=upstream_deps_ar_ids).debug("Waiting for dependencies") + logger.debug("Waiting for dependencies", deps=upstream_deps_ar_ids) error_msg: str | None = None result: ActionRunResult | None = None @@ -439,19 +439,19 @@ async def start_action_run( action_trail = action_trail | {ar_id: result} action_result_store[ar_id] = action_trail run_status = "success" - logger.bind(trail=action_trail).debug("Action run completed") + logger.debug("Action run completed", trail=action_trail) except TimeoutError as e: error_msg = "Action run timed out waiting for dependencies" - logger.bind(upstream_deps=upstream_deps_ar_ids).error(error_msg, exc_info=e) + logger.opt(exception=e).error(error_msg, upstream_deps=upstream_deps_ar_ids) run_status = "failure" except asyncio.CancelledError as e: error_msg = "Action run was cancelled." - logger.warning(error_msg, exc_info=e) + logger.opt(exception=e).warning(error_msg) run_status = "canceled" except Exception as e: error_msg = f"Action run failed with error: {e}." 
-        logger.error(error_msg, exc_info=e)
+        logger.opt(exception=e).error(error_msg)
         run_status = "failure"
     finally:
         if action_run_status_store[ar_id] != ActionRunStatus.SUCCESS:
@@ -501,11 +501,12 @@ async def run_webhook_action(
 ) -> dict[str, Any]:
     """Run a webhook action."""
     action_run_kwargs = action_run_kwargs or {}
-    logger.bind(
+    logger.debug(
+        "Perform webhook action",
         url=url,
         method=method,
         ar_kwargs=action_run_kwargs,
-    ).debug("Perform webhook action")
+    )
     # TODO: Perform whitelist/filter step here using the url and method
     return {
         "output": action_run_kwargs,
@@ -547,12 +548,13 @@ async def run_http_request_action(
     action_run_kwargs: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     """Run an HTTP request action."""
-    logger.bind(
+    logger.debug(
+        "Perform HTTP request action",
         url=url,
         method=method,
         headers=headers,
         payload=payload,
-    ).debug("Perform HTTP request action")
+    )
 
     try:
         async with httpx.AsyncClient() as client:
@@ -564,7 +566,7 @@
             )
             response.raise_for_status()
     except httpx.HTTPStatusError as e:
-        logger.error(f"HTTP request failed with status {e.response.status_code}.")
+        logger.error("HTTP request failed with status {}.", e.response.status_code)
         raise
     return parse_http_response_data(response)
 
@@ -576,7 +578,7 @@ async def run_conditional_action(
     action_run_kwargs: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     """Run a conditional action."""
-    logger.bind(rules=condition_rules).debug("Perform conditional rules action")
+    logger.debug("Perform conditional rules action", rules=condition_rules)
     rule = ConditionRuleValidator.validate_python(condition_rules)
     rule_match = rule.evaluate()
     return {
@@ -599,10 +601,11 @@ async def run_llm_action(
     action_run_kwargs: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     """Run an LLM action."""
-    logger.bind(
+    logger.debug(
+        "Perform LLM action",
         message=message,
         response_schema=response_schema,
-    ).debug("Perform LLM action")
+    )
 
     llm_kwargs = llm_kwargs or {}
 
@@ -629,7 +632,7 @@
             generate_pydantic_json_response_schema(response_schema),
         )
     )
-    logger.debug(f"{system_context =}")
+    logger.debug("System context:\n{}", system_context)
     json_response: dict[str, Any] = await async_openai_call(
         prompt=message,
         model=model,
@@ -653,12 +656,13 @@ async def run_send_email_action(
     action_run_kwargs: dict[str, Any] | None = None,
 ) -> dict[str, Any]:
     """Run a send email action."""
-    logger.bind(
+    logger.debug(
+        "Perform send email action",
         sender=sender,
         recipients=recipients,
         subject=subject,
         body=body,
-    ).debug("Perform send email action")
+    )
 
     if provider == "resend":
         email_provider = ResendMailProvider(
@@ -669,7 +673,7 @@
         )
     else:
         msg = "Email provider not recognized"
-        logger.warning(f"{msg}: {provider!r}")
+        logger.warning("{}: {!r}", msg, provider)
         email_response = {
             "status": "error",
             "message": msg,
@@ -695,9 +699,9 @@
             "subject": subject,
             "body": body,
         }
-    except (EmailBouncedError, EmailNotFoundError) as exc:
-        msg = exc.args[0]
-        logger.warning(msg=msg, exc_info=exc)
+    except (EmailBouncedError, EmailNotFoundError) as e:
+        msg = e.args[0]
+        logger.opt(exception=e).warning("{}", msg, error=e)
         email_response = {
             "status": "warning",
             "message": msg,
@@ -759,7 +763,7 @@ async def run_open_case_action(
         suppression=suppression,
         tags=tags,
     )
-    logger.opt(lazy=True).debug("Sinking case {case}", case=lambda: case.model_dump())
+    logger.opt(lazy=True).debug("Sinking case", case=lambda: case.model_dump())
 
try: await asyncio.to_thread(tbl.add, [case.flatten()]) except Exception as e: @@ -776,10 +780,7 @@ async def run_integration_action( action_run_kwargs: dict[str, Any] | None = None, ) -> dict[str, Any]: """Run an integration action.""" - logger.bind( - qualname=qualname, - params=params, - ).debug("Perform integration action") + logger.debug("Perform integration action", qualname=qualname, params=params) params = params or {} @@ -820,13 +821,14 @@ async def run_action( - transform: Apply a transformation to the data. """ - logger.bind( + logger.info( + "Running action", key=key, title=title, type=type, action_run_kwargs=action_run_kwargs, action_kwargs=action_kwargs, - ).info("Running action") + ) action_runner = _ACTION_RUNNER_FACTORY[type] @@ -849,8 +851,9 @@ async def run_action( workflow = ctx_workflow_run.get().workflow processed_action_kwargs.update(action_run_id=ar_id, workflow_id=workflow.id) - logger.bind(processed_action_kwargs=processed_action_kwargs).debug( - "Finish processing action kwargs" + logger.debug( + "Finish processing action kwargs", + processed_action_kwargs=processed_action_kwargs, ) try: @@ -862,7 +865,7 @@ async def run_action( **processed_action_kwargs, ) except Exception as e: - logger.bind(key=key).error("Error running action", exc_info=e) + logger.opt(exception=e).error("Error running action", key=key) raise # Leave dunder keys inside as a form of execution context diff --git a/tracecat/runner/app.py b/tracecat/runner/app.py index 82bbf0c9..4c748457 100644 --- a/tracecat/runner/app.py +++ b/tracecat/runner/app.py @@ -130,9 +130,7 @@ def create_app(**kwargs) -> FastAPI: app.add_middleware(RequestLoggingMiddleware) # TODO: Check TRACECAT__APP_ENV to set methods and headers -logger.bind(env=TRACECAT__APP_ENV, origins=cors_origins_kwargs).warning( - "Runner started" -) +logger.warning("Runner started", env=TRACECAT__APP_ENV, origins=cors_origins_kwargs) class RunnerStatus(StrEnum): @@ -181,8 +179,13 @@ async def valid_workflow(workflow_id: str) -> str: # Catch-all exception handler to prevent stack traces from leaking @app.exception_handler(Exception) async def custom_exception_handler(request: Request, exc: Exception): - role = ctx_session_role.get() - logger.opt(exception=exc).bind(role=role).error("An unexpected error occurred.") + logger.error( + "Unexpected error: {!s}", + exc, + role=ctx_session_role.get(), + params=request.query_params, + path=request.url.path, + ) return ORJSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"message": "An unexpected error occurred. Please try again later."}, @@ -210,7 +213,7 @@ async def check_api_health() -> dict[str, str]: try: response.raise_for_status() except Exception as e: - logger.error(f"Error checking API health: {e}", exc_info=True) + logger.opt(exception=e).error("Error checking API health", error=e) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error checking API health", @@ -296,8 +299,8 @@ async def webhook( - Spawn a new process to handle the event. - Store the process in a queue. 
""" - logger.bind(entrypoint=webhook_metadata.action_key).info("Webhook hit") - logger.bind(payload=payload).debug("Webhook payload") + logger.info("Webhook hit", entrypoint=webhook_metadata.action_key) + logger.debug("Webhook payload", payload=payload) user_id = webhook_metadata.owner_id # If we are here this should be set role = Role(type="service", service_id="tracecat-runner", user_id=user_id) @@ -442,11 +445,12 @@ async def run_workflow( or action_run.id in action_result_store ): run_logger.debug( - f"Action {action_run.id!r} already running or completed. Skipping." + "Action {} already running or completed. Skipping.", + action_run.id, ) continue - run_logger.bind(ar_id=action_run.id).info("Creating action run task") + run_logger.info("Creating action run task", ar_id=action_run.id) action_run_status_store[action_run.id] = ActionRunStatus.PENDING # Schedule a new action run running_jobs_store[action_run.id] = asyncio.create_task( @@ -462,11 +466,11 @@ async def run_workflow( run_status = "success" run_logger.info("Workflow completed.") - except asyncio.CancelledError: - logger.warning("Workflow was canceled.", exc_info=True) + except asyncio.CancelledError as e: + logger.opt(exception=e).warning("Workflow was canceled.", error=e) run_status = "canceled" except Exception as e: - logger.error(f"Workflow failed: {e}", exc_info=True) + logger.opt(exception=e).error("Workflow failed", error=e) finally: logger.info("Shutting down running tasks") for running_task in running_jobs_store.values(): diff --git a/tracecat/runner/events.py b/tracecat/runner/events.py index c9acf6d0..56021a02 100644 --- a/tracecat/runner/events.py +++ b/tracecat/runner/events.py @@ -61,8 +61,11 @@ async def emit_create_workflow_run_event() -> None: routing_keys=[role.user_id], payload={"type": "workflow_run", **event.model_dump()}, ) - logger.bind(name="events.create_wfr", role=role, workflow_id=workflow_id).debug( - "Emitted event" + logger.debug( + "Emitted event", + name="events.create_wfr", + role=role, + workflow_id=workflow_id, ) @@ -98,13 +101,14 @@ async def emit_update_workflow_run_event(*, status: RunStatus) -> None: routing_keys=[role.user_id], payload={"type": "workflow_run", **event.model_dump()}, ) - logger.bind( + logger.debug( + "Emitted event", name="events.update_wfr", role=role, workflow_id=workflow_id, workflow_run_id=workflow_run_id, status=status, - ).debug("Emitted event") + ) ## Action Run Events @@ -140,11 +144,12 @@ async def emit_create_action_run_event() -> None: routing_keys=[role.user_id], payload={"type": "action_run", **event.model_dump()}, ) - logger.bind( + logger.debug( + "Emitted event", name="events.create_ar", action_id=action_id, role=role, - ).debug("Emitted event") + ) async def emit_update_action_run_event( @@ -189,10 +194,11 @@ async def emit_update_action_run_event( routing_keys=[role.user_id], payload={"type": "action_run", **event.model_dump()}, ) - logger.bind( + logger.debug( + "Emitted event", name="events.update_ar", role=role, action_id=action_id, action_run_id=action_run.id, status=status, - ).debug("Emitted event") + ) diff --git a/tracecat/runner/templates.py b/tracecat/runner/templates.py index 2cd50be3..70979c37 100644 --- a/tracecat/runner/templates.py +++ b/tracecat/runner/templates.py @@ -39,18 +39,18 @@ def _evaluate_jsonpath_str( """ jsonpath = match.group(regex_group) - logger.bind(jsonpath=jsonpath).debug("Evaluating jsonpath") + logger.debug("Evaluating jsonpath", jsonpath=jsonpath) try: jsonpath_expr = jsonpath_ng.parse(jsonpath) except 
JsonPathParserError as e:
         raise ValueError(f"Invalid jsonpath {jsonpath!r}.") from e
-    logger.debug(f"{jsonpath_expr = }")
+    logger.debug("Jsonpath expr: {}", jsonpath_expr)
     matches = [found.value for found in jsonpath_expr.find(action_trail)]
     if len(matches) == 1:
-        logger.debug(f"Match found for {jsonpath}: {matches[0]}.")
+        logger.debug("Match found for {}.", jsonpath, match=matches[0])
         return str(matches[0])
     elif len(matches) > 1:
-        logger.debug(f"Multiple matches found for {jsonpath}: {matches}.")
+        logger.debug("Multiple matches found for {}.", jsonpath, matches=matches)
         return str(matches)
     else:
         # We know that if this function is called, there was a templated field.
@@ -101,7 +101,7 @@ def evaluate_templated_fields(
     def operator(obj: T) -> str:
         return template_pattern.sub(jsonpath_str_evaluator, obj)
 
-    logger.bind(fields=templated_fields).debug("Evaluating templated fields")
+    logger.debug("Evaluating templated fields", fields=templated_fields)
     processed_kwargs = _evaluate_templated_dict(templated_fields, operator)
     return processed_kwargs
 
@@ -217,6 +217,6 @@ async def evaluate_templated_secrets(
 
     operator = partial(_evaluate_secret_str, template_pattern=template_pattern)
 
-    logger.bind(fields=templated_fields).debug("Evaluating templated secrets")
+    logger.debug("Evaluating templated secrets", fields=templated_fields)
     processed_kwargs = await _async_evaluate_dict(templated_fields, operator)
     return processed_kwargs
diff --git a/tracecat/scheduler/app.py b/tracecat/scheduler/app.py
index f364048c..2be0343d 100644
--- a/tracecat/scheduler/app.py
+++ b/tracecat/scheduler/app.py
@@ -83,7 +83,9 @@ async def run_scheduled_workflows(interval_seconds: int | None = None):
 
         if should_run:
             logger.info(
-                "✅ Run scheduled workflow: id=%s cron=%r", workflow_id, cron
+                "✅ Run scheduled workflow: id={!s} cron={!r}",
+                workflow_id,
+                cron,
             )
             response = await start_workflow(
                 user_id=user_id,
@@ -95,17 +97,17 @@
                 response.raise_for_status()
             except httpx.HTTPStatusError as e:
                 logger.error(
-                    "Failed to schedule workflo: id=%s cron=%r",
+                    "Failed to schedule workflow: id={!s} cron={!r}",
                     workflow_id,
                     cron,
                     exc_info=e,
                 )
                 continue
             responses.append(response)
         else:
             logger.info(
-                "⏩ Skip scheduled workflow: id=%s cron=%r | Next run: %s",
+                "⏩ Skip scheduled workflow: id={!s} cron={!r} | Next run: {!s}",
                 workflow_id,
                 cron,
                 next_run,
             )
@@ -115,8 +117,8 @@
             response.raise_for_status()
         except httpx.HTTPStatusError as e:
             logger.error(
-                "Failed to schedule workflow: id=%s cron=%r.",
+                "Failed to schedule workflow: id={!s} cron={!r}",
                 workflow_id,
                 cron,
                 exc_info=e,
             )
@@ -176,9 +178,7 @@ def create_app(**kwargs) -> FastAPI:
     )
 
 # TODO: Check TRACECAT__APP_ENV to set methods and headers
-logger.bind(env=TRACECAT__APP_ENV, origins=cors_origins_kwargs).warning(
-    "Scheduler started"
-)
+logger.warning("Scheduler started", env=TRACECAT__APP_ENV, origins=cors_origins_kwargs)
 
 
 @app.get("/")