Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/avoid expensive ops #338

Merged
merged 6 commits into from
Jul 4, 2023
Merged

Fix/avoid expensive ops #338

merged 6 commits into from
Jul 4, 2023

Conversation

romain-intel
Copy link
Contributor

No description provided.

@pjoshi30 pjoshi30 assigned pjoshi30 and unassigned pjoshi30 Jan 9, 2023
Copy link
Contributor Author

@romain-intel romain-intel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savingoyal : commented on the logic.

@@ -8,8 +8,8 @@

from services.utils import logging

OP_WORKER_CREATE = 'worker_create'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black modification

@@ -20,20 +20,16 @@ class CacheAsyncClient(CacheClient):
_restart_requested = False

async def start_server(self, cmdline, env):
self.logger = logging.getLogger("CacheAsyncClient:{root}".format(root=self._root))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black modification


self.logger.info("Pending stream keys: {}".format(
list(self.pending_requests)))
if self.logger.isEnabledFor(logging.INFO):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make logging cheaper to avoid evaluating argument if we don't need to.

await asyncio.wait_for(
self._proc.stdin.drain(),
timeout=WAIT_FREQUENCY)
await asyncio.wait_for(self._proc.stdin.drain(), timeout=WAIT_FREQUENCY)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

black formatting

except asyncio.TimeoutError:
self.logger.warn("StreamWriter.drain timeout, request restart: {}".format(repr(self._proc.stdin)))
self.logger.warn(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

black formatting

@@ -115,16 +80,27 @@ def select_columns(self):
# NOTE: We must use a function scope in order to be able to access the table_name variable for list comprehension.
# User should be considered NULL when 'user:*' tag is missing
# This is usually the case with AWS Step Functions
return ["{table_name}.{col} AS {col}".format(table_name=self.table_name, col=k) for k in self.keys] \
+ ["""
return (
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

black format

AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)>{heartbeat_cutoff}
THEN {table_name}.last_heartbeat_ts*1000
ELSE NULL
AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={heartbeat_threshold}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previous case had:

  • when you have a HB AND you have a latest failed task AND you have not had a HB recently (within threshold), then finished_at = last_hb
  • when you have a HB AND you have no last failed task that you know of AND you have not had a HB recentlyy (within CUTOFF -- not threshold -- cutoff is larger), then finished_at = last_hb
  • else no finished at

I changed that to remove the need for latest failed task and just check the last hb. If I have not had one in threshold time, I consider that I have a finished_at.

WHEN end_attempt_ok.value IS FALSE
AND end_attempt.ts_epoch > end_attempt_ok.ts_epoch
WHEN end_attempt IS NOT NULL
AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same type of logic as above just for the state instead of the finished_at time.

AND {table_name}.last_heartbeat_ts IS NOT NULL
THEN {table_name}.last_heartbeat_ts*1000-{table_name}.ts_epoch
WHEN end_attempt IS NOT NULL
WHEN end_attempt_ok IS NOT NULL
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bug fix, we checked for end_attempt but then used end_attempt_ok

WHEN end_attempt IS NOT NULL
AND end_attempt.ts_epoch > end_attempt_ok.ts_epoch
AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no semantic change, I must have been twiddling things.

Copy link
Collaborator

@saikonen saikonen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some general thoughts on the proposed changes:

  1. To my understanding on argo workflows / step-functions, run level heartbeat only gets updates if there is at least one running task for the run? This might not always be the case, for example if all tasks are stuck in scheduler. A run would then falsely be marked as Failed before any task launches. It would flip back to 'running' once a task launches and updates the run heartbeat. Still have to verify this behavior.
  2. Judging from the integration test failures, all legacy cases where a run has no heartbeat will be treated as failures instead of best-effort 'running'. I think it is fine to sunset the support for non-heartbeat statuses at this point.
  3. With this feature, the run heartbeat will be treated as a primary source of truth. Some tests were in place to cover the case where a task counts as running, so the run_heartbeat should not override this for the run status. This might not actually be a relevant test case anymore after the introduction on run_hb refreshes as part of task_hb updates

We can go forward with these changes if the tradeoffs are acceptable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants