Skip to content

Commit

Permalink
fix: improve parsing bytewax job status
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
  • Loading branch information
sudohainguyen committed Nov 6, 2023
1 parent 7dadf56 commit 4eb3209
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from typing import Optional

from kubernetes import client

from feast.infra.materialization.batch_materialization_engine import (
MaterializationJob,
MaterializationJobStatus,
)
from kubernetes import client


class BytewaxMaterializationJob(MaterializationJob):
Expand Down Expand Up @@ -35,13 +34,22 @@ def status(self):
if job_status.active is not None:
if job_status.completion_time is None:
return MaterializationJobStatus.RUNNING
elif job_status.failed is not None:
self._error = Exception(f"Job {self.job_id()} failed")
return MaterializationJobStatus.ERROR
elif job_status.active is None:
if job_status.completion_time is not None:
if job_status.conditions[0].type == "Complete":
return MaterializationJobStatus.SUCCEEDED
else:
if (
job_status.completion_time is not None
and job_status.conditions[0].type == "Complete"
):
return MaterializationJobStatus.SUCCEEDED

if (
job_status.conditions is not None
and job_status.conditions[0].type == "Failed"
):
self._error = Exception(
f"Job {self.job_id()} failed with reason: "
f"{job_status.condition[0].message}"
)
return MaterializationJobStatus.ERROR
return MaterializationJobStatus.WAITING

def should_be_retried(self):
Expand Down

0 comments on commit 4eb3209

Please sign in to comment.