Skip to content

Commit

Permalink
Organize Tableau classes (#23353)
Browse files Browse the repository at this point in the history
* Organize Tableau classes
  • Loading branch information
eladkal committed Apr 29, 2022
1 parent 3f98450 commit 9132baf
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 58 deletions.
2 changes: 1 addition & 1 deletion airflow/providers/salesforce/sensors/tableau_job_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import warnings

from airflow.providers.tableau.sensors.tableau_job_status import ( # noqa
from airflow.providers.tableau.sensors.tableau import ( # noqa
TableauJobFailedException,
TableauJobStatusSensor,
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/tableau/example_dags/example_tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import DAG
from airflow.providers.tableau.operators.tableau import TableauOperator
from airflow.providers.tableau.sensors.tableau_job_status import TableauJobStatusSensor
from airflow.providers.tableau.sensors.tableau import TableauJobStatusSensor

with DAG(
dag_id='example_tableau',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import DAG
from airflow.providers.tableau.operators.tableau import TableauOperator
from airflow.providers.tableau.sensors.tableau_job_status import TableauJobStatusSensor
from airflow.providers.tableau.sensors.tableau import TableauJobStatusSensor

with DAG(
dag_id='example_tableau_refresh_workbook',
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/tableau/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ sensors:
- integration-name: Tableau
python-modules:
- airflow.providers.tableau.sensors.tableau_job_status
- airflow.providers.tableau.sensors.tableau

hooks:
- integration-name: Tableau
Expand Down
72 changes: 72 additions & 0 deletions airflow/providers/tableau/sensors/tableau.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.providers.tableau.hooks.tableau import (
TableauHook,
TableauJobFailedException,
TableauJobFinishCode,
)
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
from airflow.utils.context import Context


class TableauJobStatusSensor(BaseSensorOperator):
"""
Watches the status of a Tableau Server Job.
.. seealso:: https://tableau.github.io/server-client-python/docs/api-ref#jobs
:param job_id: Id of the job to watch.
:param site_id: The id of the site where the workbook belongs to.
:param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
containing the credentials to authenticate to the Tableau Server.
"""

template_fields: Sequence[str] = ('job_id',)

def __init__(
self,
*,
job_id: str,
site_id: Optional[str] = None,
tableau_conn_id: str = 'tableau_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.tableau_conn_id = tableau_conn_id
self.job_id = job_id
self.site_id = site_id

def poke(self, context: 'Context') -> bool:
"""
Pokes until the job has successfully finished.
:param context: The task context during execution.
:return: True if it succeeded and False if not.
:rtype: bool
"""
with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
finish_code = tableau_hook.get_job_status(job_id=self.job_id)
self.log.info('Current finishCode is %s (%s)', finish_code.name, finish_code.value)

if finish_code in (TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED):
raise TableauJobFailedException('The Tableau Refresh Workbook Job failed!')

return finish_code == TableauJobFinishCode.SUCCESS
60 changes: 8 additions & 52 deletions airflow/providers/tableau/sensors/tableau_job_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.providers.tableau.hooks.tableau import (
TableauHook,
TableauJobFailedException,
TableauJobFinishCode,
)
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
from airflow.utils.context import Context


class TableauJobStatusSensor(BaseSensorOperator):
"""
Watches the status of a Tableau Server Job.
.. seealso:: https://tableau.github.io/server-client-python/docs/api-ref#jobs
"""This module is deprecated. Please use :mod:`airflow.providers.tableau.sensors.tableau`."""

:param job_id: Id of the job to watch.
:param site_id: The id of the site where the workbook belongs to.
:param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
containing the credentials to authenticate to the Tableau Server.
"""
import warnings

template_fields: Sequence[str] = ('job_id',)
from airflow.providers.tableau.sensors.tableau import TableauJobStatusSensor # noqa

def __init__(
self,
*,
job_id: str,
site_id: Optional[str] = None,
tableau_conn_id: str = 'tableau_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.tableau_conn_id = tableau_conn_id
self.job_id = job_id
self.site_id = site_id

def poke(self, context: 'Context') -> bool:
"""
Pokes until the job has successfully finished.
:param context: The task context during execution.
:return: True if it succeeded and False if not.
:rtype: bool
"""
with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
finish_code = tableau_hook.get_job_status(job_id=self.job_id)
self.log.info('Current finishCode is %s (%s)', finish_code.name, finish_code.value)

if finish_code in (TableauJobFinishCode.ERROR, TableauJobFinishCode.CANCELED):
raise TableauJobFailedException('The Tableau Refresh Workbook Job failed!')

return finish_code == TableauJobFinishCode.SUCCESS
warnings.warn(
"This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau`.",
DeprecationWarning,
stacklevel=2,
)
1 change: 1 addition & 0 deletions dev/provider_packages/prepare_provider_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2233,6 +2233,7 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin
'`airflow.providers.amazon.aws.operators.redshift_cluster` as appropriate.',
'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.redshift_cluster`.',
"This module is deprecated. Please use airflow.providers.amazon.aws.transfers.sql_to_s3`.",
"This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau`.",
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pytest
from parameterized import parameterized

from airflow.providers.tableau.sensors.tableau_job_status import (
from airflow.providers.tableau.sensors.tableau import (
TableauJobFailedException,
TableauJobFinishCode,
TableauJobStatusSensor,
Expand All @@ -36,7 +36,7 @@ class TestTableauJobStatusSensor(unittest.TestCase):
def setUp(self):
self.kwargs = {'job_id': 'job_2', 'site_id': 'test_site', 'task_id': 'task', 'dag': None}

@patch('airflow.providers.tableau.sensors.tableau_job_status.TableauHook')
@patch('airflow.providers.tableau.sensors.tableau.TableauHook')
def test_poke(self, mock_tableau_hook):
"""
Test poke
Expand All @@ -51,7 +51,7 @@ def test_poke(self, mock_tableau_hook):
mock_tableau_hook.get_job_status.assert_called_once_with(job_id=sensor.job_id)

@parameterized.expand([(TableauJobFinishCode.ERROR,), (TableauJobFinishCode.CANCELED,)])
@patch('airflow.providers.tableau.sensors.tableau_job_status.TableauHook')
@patch('airflow.providers.tableau.sensors.tableau.TableauHook')
def test_poke_failed(self, finish_code, mock_tableau_hook):
"""
Test poke failed
Expand Down

0 comments on commit 9132baf

Please sign in to comment.