Skip to content

Commit

Permalink
Refactor: reduce some conditions in providers (#34440)
Browse files Browse the repository at this point in the history
  • Loading branch information
eumiro committed Sep 19, 2023
1 parent 26f6a51 commit d20c32f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 26 deletions.
27 changes: 11 additions & 16 deletions airflow/providers/google/cloud/triggers/dataproc.py
Expand Up @@ -98,11 +98,10 @@ async def run(self):
)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
if state in (JobStatus.State.ERROR, JobStatus.State.DONE, JobStatus.State.CANCELLED):
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
break
elif state == JobStatus.State.ERROR:
raise AirflowException(f"Dataproc job execution failed {self.job_id}")
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
break
elif state == JobStatus.State.ERROR:
raise AirflowException(f"Dataproc job execution failed {self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
yield TriggerEvent({"job_id": self.job_id, "job_state": state})

Expand Down Expand Up @@ -316,21 +315,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
operation = await hook.get_operation(region=self.region, operation_name=self.name)
if operation.done:
if operation.error.message:
yield TriggerEvent(
{
"operation_name": operation.name,
"operation_done": operation.done,
"status": "error",
"message": operation.error.message,
}
)
return
status = "error"
message = operation.error.message
else:
status = "success"
message = "Operation is successfully ended."
yield TriggerEvent(
{
"operation_name": operation.name,
"operation_done": operation.done,
"status": "success",
"message": "Operation is successfully ended.",
"status": status,
"message": message,
}
)
return
Expand Down
5 changes: 1 addition & 4 deletions airflow/providers/microsoft/azure/hooks/cosmos.py
Expand Up @@ -281,10 +281,7 @@ def upsert_document(self, document, database_name=None, collection_name=None, do
raise AirflowBadRequest("You cannot insert a None document")

# Add document id if isn't found
if "id" in document:
if document["id"] is None:
document["id"] = document_id
else:
if document.get("id") is None:
document["id"] = document_id

created_document = (
Expand Down
10 changes: 4 additions & 6 deletions airflow/providers/singularity/operators/singularity.py
Expand Up @@ -155,9 +155,8 @@ def execute(self, context: Context) -> None:
self.log.info("Stopping instance %s", self.instance)
self.instance.stop() # type: ignore[attr-defined]

if self.auto_remove is True:
if self.auto_remove and os.path.exists(self.image):
shutil.rmtree(self.image)
if self.auto_remove and os.path.exists(self.image):
shutil.rmtree(self.image)

# If the container failed, raise the exception
if result["return_code"] != 0:
Expand All @@ -179,6 +178,5 @@ def on_kill(self) -> None:
self.instance.stop()

# If an image exists, clean it up
if self.auto_remove is True:
if self.auto_remove and os.path.exists(self.image):
shutil.rmtree(self.image)
if self.auto_remove and os.path.exists(self.image):
shutil.rmtree(self.image)

0 comments on commit d20c32f

Please sign in to comment.