Skip to content

Commit

Permalink
Consolidate hook management in AzureBatchOperator (#34437)
Browse files Browse the repository at this point in the history
* Consolidate hook management in AzureBatchOperator

* use AirflowProviderDeprecationWarning
  • Loading branch information
hussein-awala committed Sep 19, 2023
1 parent 7de7149 commit 5b85442
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions airflow/providers/microsoft/azure/operators/batch.py
Expand Up @@ -21,8 +21,9 @@
from typing import TYPE_CHECKING, Any, Sequence

from azure.batch import models as batch_models
from deprecated.classic import deprecated

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook

Expand Down Expand Up @@ -181,7 +182,12 @@ def __init__(
@cached_property
def hook(self) -> AzureBatchHook:
"""Create and return an AzureBatchHook (cached)."""
return self.get_hook()
return AzureBatchHook(self.azure_batch_conn_id)

@deprecated(reason="use `hook` property instead.", category=AirflowProviderDeprecationWarning)
def get_hook(self) -> AzureBatchHook:
"""Create and return an AzureBatchHook."""
return self.hook

def _check_inputs(self) -> Any:
if not self.os_family and not self.vm_publisher:
Expand Down Expand Up @@ -315,10 +321,6 @@ def on_kill(self) -> None:
)
self.log.info("Azure Batch job (%s) terminated: %s", self.batch_job_id, response)

def get_hook(self) -> AzureBatchHook:
"""Create and return an AzureBatchHook."""
return AzureBatchHook(azure_batch_conn_id=self.azure_batch_conn_id)

def clean_up(self, pool_id: str | None = None, job_id: str | None = None) -> None:
"""
Delete the given pool and job in the batch account.
Expand Down

0 comments on commit 5b85442

Please sign in to comment.