Skip to content

Commit

Permalink
fix(python): fixed issue in python runner where tasks repeatedly retry
Browse files Browse the repository at this point in the history
closes #221
  • Loading branch information
christopherpickering committed Aug 16, 2022
1 parent 0b1931e commit 0499fe3
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 41 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"url": "github:atlas-bi/atlas-automation-hub"
},
"scripts": {
"commit": "git add . && pre-commit run && git add . && cz --no-verify",
"commit": "git add . && pre-commit run ; git add . && cz --no-verify",
"format": "prettier --config .prettierrc \"web/**/*.{ts,css,less,scss,js,json,md,yaml,html}\" --write",
"install": "gulp build",
"test": "npm run test:eslint; npm run test:prettier",
Expand Down
25 changes: 19 additions & 6 deletions runner/scripts/em_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Optional

from runner.model import Task
from runner.scripts.em_messages import RunnerException, RunnerLog
from runner.scripts.em_messages import RunnerLog


class Cmd:
Expand Down Expand Up @@ -42,7 +42,7 @@ def shell(self) -> str:
out = out_bytes.decode("utf-8")

if "Error" in out:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
17,
Expand All @@ -54,8 +54,11 @@ def shell(self) -> str:
out,
flags=re.IGNORECASE | re.MULTILINE,
),
1,
)

raise ValueError("Error")

RunnerLog(
self.task,
self.run_id,
Expand All @@ -67,7 +70,7 @@ def shell(self) -> str:

except subprocess.CalledProcessError as e:
out = e.output.decode("utf-8")
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
17,
Expand All @@ -80,10 +83,13 @@ def shell(self) -> str:
str(e),
flags=re.IGNORECASE | re.MULTILINE,
),
1,
)

raise

except BaseException as e:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
17,
Expand All @@ -96,30 +102,37 @@ def shell(self) -> str:
str(e),
flags=re.IGNORECASE | re.MULTILINE,
),
1,
)
raise

def run(self) -> str:
"""Run input command as a subprocess command."""
try:
out = os.popen(self.cmd + " 2>&1").read()

if "Error" in out:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
17,
self.error_msg + ("\n" if out != "" else "") + out,
1,
)

raise ValueError("Error")

RunnerLog(self.task, self.run_id, 17, self.success_msg)

return out

# pylint: disable=broad-except
except BaseException as e:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
17,
self.error_msg + ("\n" if out != "" else "") + "\n" + str(e),
1,
)
raise
4 changes: 3 additions & 1 deletion runner/scripts/em_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ def __init__(
if (redis_client.zincrby(f"runner_{task.id}_attempt", 0, "inc") or 1) <= (
task.max_retries or 0
):
run_number = (
run_number = int(
redis_client.zincrby(f"runner_{task.id}_attempt", 0, "inc") or 1
)

# schedule a rerun in 5 minutes.
RunnerLog(
task,
Expand All @@ -135,6 +136,7 @@ def __init__(
requests.get(
"%s/run/%s/delay/5" % (app.config["SCHEDULER_HOST"], task.id)
)

else:
redis_client.delete(f"runner_{task.id}_attempt")

Expand Down
17 changes: 12 additions & 5 deletions runner/scripts/em_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from runner.model import Task
from runner.scripts.em_cmd import Cmd
from runner.scripts.em_messages import RunnerException
from runner.scripts.em_messages import RunnerLog
from runner.scripts.em_params import ParamLoader


Expand Down Expand Up @@ -80,13 +80,16 @@ def __build_env(self) -> None:

# pylint: disable=broad-except
except BaseException as e:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
14,
f"Failed to build environment.\n{self.base_path}\n{e}",
1,
)

raise

def __pip_install(self) -> None:
r"""Get includes from script.
Expand All @@ -96,7 +99,7 @@ def __pip_install(self) -> None:
get import (...) as ...
^\s*?import\K\s+[^\.][^\s]+?(?=\s)
get from (...) imoprt (...)
get from (...) import (...)
^\s*?from\K\s+[^\.].+?(?=import)
"""
try:
Expand Down Expand Up @@ -251,12 +254,14 @@ def __pip_install(self) -> None:
).shell()

except BaseException as e:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
14,
f"Failed to install packages.\n{self.base_path}\n{e}",
1,
)
raise

def __run_script(self) -> None:
try:
Expand Down Expand Up @@ -285,9 +290,11 @@ def __run_script(self) -> None:
).shell()

except BaseException as e:
raise RunnerException(
RunnerLog(
self.task,
self.run_id,
14,
f"Failed to build run script.\n{self.base_path}\n{e}",
1,
)
raise
28 changes: 17 additions & 11 deletions runner/scripts/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def __init__(self, task_id: int) -> None:
RunnerLog(self.task, self.run_id, 8, "Completed task!")

# remove any retry tracking
print(f"clearing redis for runner_{task_id}_attempt")
redis_client.delete(f"runner_{task_id}_attempt")
task.status_id = 4
task.est_duration = (datetime.datetime.now() - task.last_run).total_seconds()
Expand Down Expand Up @@ -559,17 +560,22 @@ def __process(self) -> None:
self.task, self.run_id, 8, f"Processing script failure:\n{e}"
)

# run processing script
output = PyProcesser(
task=self.task,
run_id=self.run_id,
directory=self.temp_path,
source_files=self.source_files,
script=self.task.processing_command or processing_script_name.name
if self.task.processing_type_id != 6 # source code
else processing_script_name.name,
params=self.param_loader,
).run()
try:
# run processing script
output = PyProcesser(
task=self.task,
run_id=self.run_id,
directory=self.temp_path,
source_files=self.source_files,
script=self.task.processing_command or processing_script_name.name
if self.task.processing_type_id != 6 # source code
else processing_script_name.name,
params=self.param_loader,
).run()
except BaseException as e:
raise RunnerException(
self.task, self.run_id, 8, f"Processing script failure:\n{e}"
)

# # allow processer to rename file
if output:
Expand Down
2 changes: 1 addition & 1 deletion scheduler/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def schedule() -> Response:
active_schedule.append(
{
"case": hour,
"count": (sum(1 for x in groups.get(hour)) if groups.get(hour) else 0), # type: ignore[union-attr]
"count": (sum(1 for x in groups.get(hour)) if groups.get(hour) else 0), # type: ignore[union-attr,misc]
}
)

Expand Down
30 changes: 14 additions & 16 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,20 @@ description = check code style
deps =
reformat
flake8
flake8-bugbear
flake8-docstrings
flake8-rst-docstrings
flake8-rst
flake8-typing-imports
flake8-builtins
pep8-naming
flake8-comprehensions
flake8-bandit
flake8-eradicate
flake8-pytest-style
flake8-print
flake8-simplify
flake8-variables-names
flake8-markdown
pygments
; flake8-bugbear
; flake8-docstrings
; flake8-typing-imports
; flake8-builtins
; pep8-naming
; flake8-comprehensions
; flake8-bandit
; flake8-eradicate
; flake8-pytest-style
; flake8-print
; flake8-simplify
; flake8-variables-names
; flake8-markdown
; pygments
black
pylint
mypy
Expand Down
3 changes: 3 additions & 0 deletions web/web/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ def sub_disable_task(task_id: int) -> None:
"""Shared function for disabling a task."""
requests.get(app.config["SCHEDULER_HOST"] + "/delete/" + str(task_id))

# also clear retry counter
redis_client.delete(f"runner_{task_id}_attempt")

task = Task.query.filter_by(id=task_id).first()
task.enabled = 0
task.next_run = None
Expand Down

0 comments on commit 0499fe3

Please sign in to comment.