Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retry option for notebook on failure #57

Open
wants to merge 9 commits into
base: release-v0.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/genie/migrations/0003_notebookobject_retrycount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.4 on 2021-11-26 03:01

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('genie', '0002_runstatus_updatetimestamp'),
]

operations = [
migrations.AddField(
model_name='notebookobject',
name='retryCount',
field=models.IntegerField(default=0),
),
]
18 changes: 18 additions & 0 deletions api/genie/migrations/0004_runstatus_retryremaining.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.4 on 2021-11-26 11:43

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('genie', '0003_notebookobject_retrycount'),
]

operations = [
migrations.AddField(
model_name='runstatus',
name='retryRemaining',
field=models.IntegerField(default=0),
),
]
19 changes: 19 additions & 0 deletions api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 3.2.4 on 2021-11-26 15:52

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('genie', '0004_runstatus_retryremaining'),
]

operations = [
migrations.AlterField(
model_name='notebookobject',
name='notebookTemplate',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='genie.notebooktemplate'),
),
]
4 changes: 3 additions & 1 deletion api/genie/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class RunStatus(models.Model):
workflowRun = models.ForeignKey(WorkflowRun, null=True, blank=True, on_delete=models.SET_NULL)
taskId = models.CharField(max_length=200, default="")
zeppelinServerId = models.CharField(max_length=200, default="")
retryRemaining = models.IntegerField(default=0)


# Connection Models
Expand Down Expand Up @@ -85,8 +86,9 @@ class CustomSchedule(CrontabSchedule):
class NotebookObject(models.Model):
notebookZeppelinId = models.CharField(max_length=10)
connection = models.ForeignKey(Connection, on_delete=models.CASCADE, blank=True, null=True)
notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE)
notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE, blank=True, null=True)
defaultPayload = models.JSONField(default={})
retryCount = models.IntegerField(default=0)


signals.pre_delete.connect(PeriodicTasks.changed, sender=NotebookJob)
Expand Down
27 changes: 26 additions & 1 deletion api/genie/services/notebookJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ def getNotebooks(offset: int = 0, limit: int = None , searchQuery: str = None, s
for notebook in notebooks:
notebook["name"] = notebook["path"]
notebookObj = next((notebookObj for notebookObj in notebookObjects if notebookObj.notebookZeppelinId == notebook["id"]), False)
notebook["retryCount"] = 0
if notebookObj:
notebook["notebookObjId"] = notebookObj.id
notebook["retryCount"] = notebookObj.retryCount
notebookJob = next((notebookJob for notebookJob in notebookJobs if notebookJob.notebookId == notebook["id"]), False)
if notebookJob:
notebook["isScheduled"] = True
Expand Down Expand Up @@ -301,6 +303,18 @@ def addNotebookJob(notebookId: str, scheduleId: int):
res.update(True, "NotebookJob added successfully", None)
return res

@staticmethod
def updateNotebookRetryCount(notebookId: str, retryCount: int):
"""
Service to add a new NotebookJob
:param notebookId: ID of the notebook for which to create job
:param scheduleId: ID of schedule
"""
res = ApiResponse()
notebookObject = NotebookObject.objects.update_or_create(notebookZeppelinId=notebookId, defaults={"retryCount":retryCount})
res.update(True, "Notebook retry count added successfully", None)
return res

@staticmethod
def deleteNotebookJob(notebookId: int):
"""
Expand All @@ -318,11 +332,22 @@ def runNotebookJob(notebookId: str):
Service to run notebook job
"""
res = ApiResponse("Error in running notebook")
runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual")
retryRemaining = NotebookJobServices.__getRetryRemaining(notebookId)
runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual", retryRemaining=retryRemaining)
runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id, runType="Manual")
res.update(True, "Notebook triggered successfully", None)
return res

@staticmethod
def __getRetryRemaining(notebookId: int):
"""
Get retry count set for given notebook in notebook object
"""
notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId)
if notebookObjects.count():
return notebookObjects[0].retryCount
return 0

@staticmethod
def stopNotebookJob(notebookId: str):
"""
Expand Down
41 changes: 39 additions & 2 deletions api/genie/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from celery import shared_task
from django.conf import settings

from genie.models import RunStatus, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED
from genie.models import RunStatus, NotebookObject, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED
from system.services import NotificationServices
from utils.zeppelinAPI import ZeppelinAPI
from utils.kubernetesAPI import Kubernetes
Expand Down Expand Up @@ -49,26 +49,49 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
__evaluateScaleDownZeppelin()
except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
if runStatus.retryRemaining:
__retryNotebook(runStatus)

runStatus.status = NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=str(ex))
else:
logger.error(f"Error occured in notebook {notebookId}. Error: Failed to trigger notebook job")
if runStatus.retryRemaining:
__retryNotebook(runStatus)

runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = "Failed running notebook"
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()

except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")

if runStatus.retryRemaining:
__retryNotebook(runStatus)

runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName if notebookName else notebookId, isSuccess=False, message=str(ex))

def __retryNotebook(runStatus):
"""
Sets up job
"""
newRunStatus = RunStatus.objects.create(
notebookId=runStatus.notebookId, status=NOTEBOOK_STATUS_QUEUED, runType=runStatus.runType, workflowRun_id=runStatus.workflowRun_id, retryRemaining=runStatus.retryRemaining-1
)
response = runNotebookJob.delay(notebookId=newRunStatus.notebookId, runStatusId=newRunStatus.id)
newRunStatus.taskId = response.id
newRunStatus.save()
return newRunStatus.id


def __allocateZeppelinServer(runStatus: RunStatus):
"""
Creates or allocates a zeppelin server to run the notebook on
Expand Down Expand Up @@ -116,7 +139,8 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task
Gets or creates a notebook run status object
"""
if not runStatusId:
runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId)
retryRemaining=__getRetryRemaining(notebookId)
runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId, retryRemaining=retryRemaining)
else:
runStatus = RunStatus.objects.get(id=runStatusId)
runStatus.startTimestamp = dt.datetime.now()
Expand All @@ -125,6 +149,15 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task
runStatus.save()
return runStatus

def __getRetryRemaining(notebookId: int):
"""
Get retry count set for given notebook in notebook object
"""
notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId)
if notebookObjects.count():
return notebookObjects[0].retryCount
return 0

def __checkIfNotebookRunning(notebookId: str, zeppelin: ZeppelinAPI):
"""
Checks if notebook is running and returns tuple of isNotebookRunning, notebookName
Expand Down Expand Up @@ -182,11 +215,15 @@ def __setNotebookStatus(response, runStatus: RunStatus):
notebookName = response.get("name", "")
for paragraph in paragraphs:
if paragraph.get("status") != "FINISHED":
if paragraph.get("status") != "ABORT" and runStatus.retryRemaining:
__retryNotebook(runStatus)
runStatus.status=NOTEBOOK_STATUS_ABORT if paragraph.get("status") == "ABORT" else NOTEBOOK_STATUS_ERROR
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=paragraph.get("title", "") + " " + paragraph.get("id","") + " failed")
return
if not response and runStatus.retryRemaining:
__retryNotebook(runStatus)
runStatus.status=NOTEBOOK_STATUS_SUCCESS if response else NOTEBOOK_STATUS_ERROR
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
Expand Down
24 changes: 23 additions & 1 deletion api/genie/tests/test_views_notebookJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_getNotebooks(client, populate_seed_data, mocker):
response = client.get(path, content_type="application/json")
assert response.status_code == 200
assert response.data['data']["count"] == 1
assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': []}]
assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': [], 'retryCount': 0}]


@pytest.mark.django_db
Expand Down Expand Up @@ -104,3 +104,25 @@ def test_notebookJob(client, populate_seed_data, mocker):
assert response.status_code == 200
assert response.data['success'] == True


@pytest.mark.django_db
def test_notebookJob(client, populate_seed_data, mocker):
path = reverse('notebooksJobView')
data = {"notebookId": "BX976MDDE", "retryCount": 4}
response = client.post(path, data=data, content_type="application/json")
assert response.status_code == 200
assert response.data['success'] == True

# Test if it was updated
path = reverse('notebooks', kwargs={"offset": 0})
mocker.patch("utils.zeppelinAPI.ZeppelinAPI.getAllNotebooks", return_value = [{"path": "notebook", "id": "BX976MDDE"}])
response = client.get(path, content_type="application/json")
assert response.status_code == 200
assert response.data['data']["count"] == 1
assert response.data['data']["notebooks"][0]["retryCount"] == 4


# write test case for update retryCount view

# write test case

9 changes: 7 additions & 2 deletions api/genie/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,13 @@ def get(self, request, notebookId=None):

def post(self, request):
notebookId = request.data["notebookId"]
scheduleId = request.data["scheduleId"]
res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId)
scheduleId = request.data.get("scheduleId")
retryCount = request.data.get("retryCount", None)

if retryCount == None:
res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId)
else:
res = NotebookJobServices.updateNotebookRetryCount(notebookId=notebookId, retryCount=retryCount)
return Response(res.json())

def delete(self, request, notebookId=None):
Expand Down
32 changes: 22 additions & 10 deletions api/workflows/taskUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from utils.zeppelinAPI import Zeppelin

from genie.tasks import runNotebookJob as runNotebookJobTask
from genie.models import NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS
from genie.models import NotebookObject, NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS

# Get an instance of a logger
logger = logging.getLogger(__name__)
Expand All @@ -37,7 +37,7 @@ def runWorkflow(workflowId: int, taskId: str, workflowRunId: int = None):
workflowRun = TaskUtils.__getOrCreateWorkflowRun(workflowId, taskId, workflowRunId)
notebookRunStatusIds = TaskUtils.__runNotebookJobsFromList(notebookIds, workflowRun.id)
workflowStatus = polling.poll(
lambda: TaskUtils.__checkGivenRunStatuses(notebookRunStatusIds),
lambda: TaskUtils.__checkGivenRunStatuses(workflowRun.id),
check_success= lambda x: x != "RUNNING",
step=3,
timeout=3600*6,
Expand All @@ -59,15 +59,27 @@ def __runNotebookJobsFromList(notebookIds: List[int], workflowRunId: int):
"""
notebookRunStatusIds = []
for notebookId in notebookIds:
retryRemaining = TaskUtils.__getRetryRemaining(notebookId)
runStatus = RunStatus.objects.create(
notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId
notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId, retryRemaining=retryRemaining
)
response = runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id)
runStatus.taskId = response.id
runStatus.save()
notebookRunStatusIds.append(runStatus.id)
time.sleep(0.2) # Sleep for 200ms to make sure zeppelin server has been allocated to previous notebook
return notebookRunStatusIds


def __getRetryRemaining(notebookId: int):
"""
Get retry count set for given notebook in notebook object
"""
notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId)
if notebookObjects.count():
return notebookObjects[0].retryCount
return 0


@staticmethod
def __getNotebookIdsInWorkflow(workflowId: int):
Expand Down Expand Up @@ -98,14 +110,14 @@ def __getOrCreateWorkflowRun(workflowId: int, taskId: str, workflowRunId: int =
return workflowRun

@staticmethod
def __checkGivenRunStatuses(notebookRunStatusIds: List[int]):
def __checkGivenRunStatuses(workflowRunId: int):
"""
Check if given runStatuses are status is SUCCESS
"""
runningAndQueuedNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds).exclude(status=NOTEBOOK_STATUS_RUNNING).exclude(status=NOTEBOOK_STATUS_QUEUED).count()
if (len(notebookRunStatusIds) == runningAndQueuedNotebookCount):
successfulNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds, status=NOTEBOOK_STATUS_SUCCESS).count()
logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}. Notebooks in batch: {str(len(notebookRunStatusIds))}")
logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}")
return (len(notebookRunStatusIds) == successfulNotebookCount)
runningAndQueuedNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId).filter(status__in=[NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_QUEUED]).count()
if not runningAndQueuedNotebookCount:
successfulNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId, status=NOTEBOOK_STATUS_SUCCESS).count()
logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}.")
# logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}")
return RunStatus.objects.filter(workflowRun_id=workflowRunId).exclude(status=NOTEBOOK_STATUS_SUCCESS).filter(retryRemaining=0).count() == 0
return "RUNNING"
Loading