New generic tableau operator: TableauOperator #16915

Merged: 44 commits, Aug 9, 2021
Changes from 41 commits
Commits
f00268b
Added generic tableau operator
mariotaddeucci Jul 9, 2021
8dae389
Added refresh workbooks and datasources tests
mariotaddeucci Jul 14, 2021
0a36df8
Added TableauOperator example dag
mariotaddeucci Jul 14, 2021
04a6119
Deprecated TableauRefreshWorkbookOperato
mariotaddeucci Jul 14, 2021
af738fb
Added doc and fixed operator parameters
mariotaddeucci Jul 15, 2021
6360673
Update operators.rst
mariotaddeucci Jul 15, 2021
866d17b
Update operators.rst
mariotaddeucci Jul 15, 2021
bc56ed2
Update operators.rst
mariotaddeucci Jul 15, 2021
2294919
Fixed block refresh on TableauRefreshWorkbookOperator
mariotaddeucci Jul 15, 2021
62cc236
Added generic tableau operator
mariotaddeucci Jul 9, 2021
bdb7c61
Added refresh workbooks and datasources tests
mariotaddeucci Jul 14, 2021
151bb79
Added TableauOperator example dag
mariotaddeucci Jul 14, 2021
433ed43
Deprecated TableauRefreshWorkbookOperato
mariotaddeucci Jul 14, 2021
2672349
Added doc and fixed operator parameters
mariotaddeucci Jul 15, 2021
bb28e95
Update operators.rst
mariotaddeucci Jul 15, 2021
7386f04
Update operators.rst
mariotaddeucci Jul 15, 2021
e6c87db
Update operators.rst
mariotaddeucci Jul 15, 2021
be463b5
Fixed block refresh on TableauRefreshWorkbookOperator
mariotaddeucci Jul 15, 2021
e7df085
Resolve conflicts
mariotaddeucci Jul 15, 2021
31ec441
Merge branch 'main' of https://github.com/mariotaddeucci/airflow into…
mariotaddeucci Jul 15, 2021
5f89273
Removed unused tableau_hook on TableauRefreshWorkbookOperator
mariotaddeucci Jul 15, 2021
65ae8af
Merge branch 'apache:main' into main
mariotaddeucci Jul 15, 2021
5cf78ca
Fixed example dag
mariotaddeucci Jul 19, 2021
d0ce4d4
Merge branch 'main' into main
mariotaddeucci Jul 21, 2021
3b9b5d9
Removed duplicated code
mariotaddeucci Jul 22, 2021
730b2db
Fixed check_interval param
mariotaddeucci Jul 22, 2021
e9cf7c6
Moded tests from tableau_workbook_operator to tableau_operator
mariotaddeucci Aug 3, 2021
6df4480
Merge branch 'apache:main' into main
mariotaddeucci Aug 3, 2021
8804667
Fixed unit tests
mariotaddeucci Aug 3, 2021
e57b70a
Fixed flake8
mariotaddeucci Aug 4, 2021
5264339
Lint fix
mariotaddeucci Aug 7, 2021
9c88122
Isort fixeds
mariotaddeucci Aug 7, 2021
90d7613
Fixed static checks
mariotaddeucci Aug 7, 2021
5fa2bf6
Update operators.rst
mariotaddeucci Aug 7, 2021
399a26c
Fixed static checks
mariotaddeucci Aug 7, 2021
f017710
Merge branch 'apache:main' into main
mariotaddeucci Aug 7, 2021
cdc4a9c
Fix Lint and doc example reference
mariotaddeucci Aug 8, 2021
df0b2ee
Fix doc toctree
mariotaddeucci Aug 8, 2021
8cb1ed2
Spelling fixes
mariotaddeucci Aug 8, 2021
1095b58
Fix spell
mariotaddeucci Aug 8, 2021
7357eb9
Merge branch 'apache:main' into main
mariotaddeucci Aug 8, 2021
7eda4ce
Improve doc methods table
mariotaddeucci Aug 8, 2021
db2d8da
Merge branch 'apache:main' into main
mariotaddeucci Aug 8, 2021
aaf2b90
Merge branch 'apache:main' into main
mariotaddeucci Aug 9, 2021
74 changes: 74 additions & 0 deletions airflow/providers/tableau/example_dags/example_tableau.py
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag that performs two refresh operations on a Tableau Workbook aka Extract. The first one
waits until it succeeds. The second does not wait since this is an asynchronous operation and we don't know
when the operation actually finishes. That's why we have another task that checks only that.
"""
from datetime import timedelta

from airflow import DAG
from airflow.providers.tableau.operators.tableau import TableauOperator
from airflow.providers.tableau.sensors.tableau_job_status import TableauJobStatusSensor
from airflow.utils.dates import days_ago

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    dag_id='example_tableau',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['example'],
) as dag:
    # Refreshes a workbook and waits until it succeeds.
    # [START howto_operator_tableau]
    task_refresh_workbook_blocking = TableauOperator(
        resource='workbooks',
        method='refresh',
        find='MyWorkbook',
        match_with='name',
        site_id='my_site',
        blocking_refresh=True,
        task_id='refresh_tableau_workbook_blocking',
    )
    # [END howto_operator_tableau]
    # Refreshes a workbook and does not wait until it succeeds.
    task_refresh_workbook_non_blocking = TableauOperator(
        resource='workbooks',
        method='refresh',
        find='MyWorkbook',
        match_with='name',
        site_id='my_site',
        blocking_refresh=False,
        task_id='refresh_tableau_workbook_non_blocking',
    )
    # The following task queries the status of the workbook refresh job until it succeeds.
    task_check_job_status = TableauJobStatusSensor(
        site_id='my_site',
        job_id="{{ ti.xcom_pull(task_ids='refresh_tableau_workbook_non_blocking') }}",
        task_id='check_tableau_job_status',
    )
    task_refresh_workbook_non_blocking >> task_check_job_status
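The example DAG's docstring contrasts a blocking refresh with a non-blocking refresh followed by a status sensor. Both patterns amount to polling a job until it reports a finish code. Below is a minimal, library-free sketch of such a polling loop. The names `wait_for_finish`, `get_finish_code`, `fake_job`, `max_checks`, and the three state strings are hypothetical and chosen only for illustration; they are not Airflow's or Tableau's API.

```python
import time
from typing import Callable, Optional

# Hypothetical finish codes for illustration only; not Tableau's real values.
PENDING, SUCCESS, ERROR = "pending", "success", "error"


def wait_for_finish(
    get_finish_code: Callable[[], str],
    check_interval: float = 0.0,
    max_checks: Optional[int] = None,
) -> bool:
    """Poll until the job leaves PENDING; return True only on SUCCESS."""
    checks = 0
    while True:
        code = get_finish_code()
        if code != PENDING:
            return code == SUCCESS
        checks += 1
        if max_checks is not None and checks >= max_checks:
            raise TimeoutError("job still pending after max_checks polls")
        time.sleep(check_interval)


def fake_job(codes):
    """Return a callable that yields the given finish codes, one per call."""
    iterator = iter(codes)
    return lambda: next(iterator)
```

In this sketch a blocking task would call `wait_for_finish` itself, while a non-blocking task would return the job id right away and leave the polling to a separate task.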
144 changes: 144 additions & 0 deletions airflow/providers/tableau/operators/tableau.py
@@ -0,0 +1,144 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.tableau.hooks.tableau import (
TableauHook,
TableauJobFailedException,
TableauJobFinishCode,
)

RESOURCES_METHODS = {
    'datasources': ['delete', 'refresh'],
    'groups': ['delete'],
    'projects': ['delete'],
    'schedule': ['delete'],
    'sites': ['delete'],
    'subscriptions': ['delete'],
    'tasks': ['delete', 'run'],
    'users': ['remove'],
    'workbooks': ['delete', 'refresh'],
}


class TableauOperator(BaseOperator):
    """
    Execute a Tableau API Resource
    https://tableau.github.io/server-client-python/docs/api-ref

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:TableauOperator`

    :param resource: The name of the resource to use.
    :type resource: str
    :param method: The name of the resource's method to execute.
    :type method: str
    :param find: The reference of resource that will receive the action.
    :type find: str
    :param match_with: The resource field name to be matched with find parameter.
    :type match_with: Optional[str]
    :param site_id: The id of the site where the workbook belongs to.
    :type site_id: Optional[str]
    :param blocking_refresh: By default will be blocking means it will wait until it has finished.
    :type blocking_refresh: bool
    :param check_interval: time in seconds that the job should wait in
        between each instance state checks until operation is completed
    :type check_interval: float
    :param tableau_conn_id: The :ref:`Tableau Connection id <howto/connection:tableau>`
        containing the credentials to authenticate to the Tableau Server.
    :type tableau_conn_id: str
    """

    def __init__(
        self,
        *,
        resource: str,
        method: str,
        find: str,
        match_with: str = 'id',
        site_id: Optional[str] = None,
        blocking_refresh: bool = True,
        check_interval: float = 20,
        tableau_conn_id: str = 'tableau_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.resource = resource
        self.method = method
        self.find = find
        self.match_with = match_with
        self.check_interval = check_interval
        self.site_id = site_id
        self.blocking_refresh = blocking_refresh
        self.tableau_conn_id = tableau_conn_id

    def execute(self, context: dict) -> str:
        """
        Executes the Tableau API resource and pushes the job id or downloaded file URI to xcom.
        :param context: The task context during execution.
        :type context: dict
        :return: the id of the job that executes the extract refresh or downloaded file URI.
        :rtype: str
        """
        available_resources = RESOURCES_METHODS.keys()
        if self.resource not in available_resources:
            error_message = f'Resource not found! Available Resources: {available_resources}'
            raise AirflowException(error_message)

        available_methods = RESOURCES_METHODS[self.resource]
        if self.method not in available_methods:
            error_message = f'Method not found! Available methods for {self.resource}: {available_methods}'
            raise AirflowException(error_message)

        with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:

            resource = getattr(tableau_hook.server, self.resource)
            method = getattr(resource, self.method)

            resource_id = self._get_resource_id(tableau_hook)

            response = method(resource_id)

            if self.method == 'refresh':

                job_id = response.id

                if self.blocking_refresh:
                    if not tableau_hook.wait_for_state(
                        job_id=job_id,
                        check_interval=self.check_interval,
                        target_state=TableauJobFinishCode.SUCCESS,
                    ):
                        raise TableauJobFailedException(f'The Tableau Refresh {self.resource} Job failed!')

                return job_id

    def _get_resource_id(self, tableau_hook: TableauHook) -> str:

        if self.match_with == 'id':
            return self.find

        for resource in tableau_hook.get_all(resource_name=self.resource):
            if getattr(resource, self.match_with) == self.find:
                resource_id = resource.id
                self.log.info('Found matching with id %s', resource_id)
                return resource_id

        raise AirflowException(f'{self.resource} with {self.match_with} {self.find} not found!')
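The operator does two things before calling the Tableau API: it checks the resource/method pair against a lookup table, and it resolves `find` to a resource id, either directly or by matching a field. The sketch below restates that logic in isolation so it can be run without Airflow or a Tableau server. It is an illustration under assumptions, not the operator's code: `check_call` and `find_resource_id` are hypothetical helper names, the table is trimmed, plain `ValueError`/`LookupError` stand in for `AirflowException`, and `SimpleNamespace` objects stand in for the items a server listing would return.

```python
from types import SimpleNamespace

# Trimmed copy of the resource/method table, for illustration only.
RESOURCES_METHODS = {
    'datasources': ['delete', 'refresh'],
    'users': ['remove'],
    'workbooks': ['delete', 'refresh'],
}


def check_call(resource: str, method: str) -> None:
    """Raise ValueError if the resource/method pair is not in the table."""
    if resource not in RESOURCES_METHODS:
        raise ValueError(f'Resource not found! Available: {sorted(RESOURCES_METHODS)}')
    if method not in RESOURCES_METHODS[resource]:
        raise ValueError(
            f'Method not found! Available for {resource}: {RESOURCES_METHODS[resource]}'
        )


def find_resource_id(items, find: str, match_with: str = 'id') -> str:
    """Return `find` when matching on id, else the id of the first item whose field equals it."""
    if match_with == 'id':
        return find
    for item in items:
        if getattr(item, match_with) == find:
            return item.id
    raise LookupError(f'{match_with} {find} not found!')


# Stand-ins for the items a server listing would return (hypothetical data).
workbooks = [
    SimpleNamespace(id='wb-1', name='Sales'),
    SimpleNamespace(id='wb-2', name='MyWorkbook'),
]
```

With `match_with='id'` the value is passed through without any listing call, which is why `'id'` is a cheap default; matching on another field such as `name` requires iterating over all items of that resource.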
54 changes: 22 additions & 32 deletions airflow/providers/tableau/operators/tableau_refresh_workbook.py
@@ -14,21 +14,23 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import warnings
 from typing import Optional

-from tableauserverclient import WorkbookItem
-
-from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
-from airflow.providers.tableau.hooks.tableau import (
-    TableauHook,
-    TableauJobFailedException,
-    TableauJobFinishCode,
+from airflow.providers.tableau.operators.tableau import TableauOperator
+
+warnings.warn(
+    """This operator is deprecated. Please use `airflow.providers.tableau.operators.tableau`.""",
+    DeprecationWarning,
+    stacklevel=2,
 )


 class TableauRefreshWorkbookOperator(BaseOperator):
     """
+    This operator is deprecated. Please use `airflow.providers.tableau.operators.tableau`.
+
     Refreshes a Tableau Workbook/Extract

     .. seealso:: https://tableau.github.io/server-client-python/docs/api-ref#workbooks
@@ -75,29 +77,17 @@ def execute(self, context: dict) -> str:
         :return: the id of the job that executes the extract refresh
         :rtype: str
         """
-        with TableauHook(self.site_id, self.tableau_conn_id) as tableau_hook:
-            workbook = self._get_workbook_by_name(tableau_hook)
-
-            job_id = self._refresh_workbook(tableau_hook, workbook.id)
-            if self.blocking:
-                if not tableau_hook.wait_for_state(
-                    job_id=job_id,
-                    check_interval=self.check_interval,
-                    target_state=TableauJobFinishCode.SUCCESS,
-                ):
-                    raise TableauJobFailedException('The Tableau Refresh Workbook Job failed!')
-
-                self.log.info('Workbook %s has been successfully refreshed.', self.workbook_name)
-            return job_id
-
-    def _get_workbook_by_name(self, tableau_hook: TableauHook) -> WorkbookItem:
-        for workbook in tableau_hook.get_all(resource_name='workbooks'):
-            if workbook.name == self.workbook_name:
-                self.log.info('Found matching workbook with id %s', workbook.id)
-                return workbook
-        raise AirflowException(f'Workbook {self.workbook_name} not found!')
+        job_id = TableauOperator(
+            resource='workbooks',
+            method='refresh',
+            find=self.workbook_name,
+            match_with='name',
+            site_id=self.site_id,
+            tableau_conn_id=self.tableau_conn_id,
+            blocking_refresh=self.blocking,
+            check_interval=self.check_interval,
+            task_id='refresh_workbook',
+            dag=None,
+        ).execute(context={})

-    def _refresh_workbook(self, tableau_hook: TableauHook, workbook_id: str) -> str:
-        job = tableau_hook.server.workbooks.refresh(workbook_id)
-        self.log.info('Refreshing Workbook %s...', self.workbook_name)
-        return job.id
+        return job_id
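The deprecated module keeps the old class but emits a `DeprecationWarning` and forwards the work to the new operator. The sketch below shows the same warn-and-delegate pattern with the standard `warnings` module, and how a test might capture the warning. It is only an illustration: `legacy_refresh`, `new_refresh`, `call_and_capture`, and the job id are hypothetical, and the warning here is raised inside a function, whereas the diff above raises it at module level (that is, on import).

```python
import warnings


def new_refresh() -> str:
    """Hypothetical replacement entry point; returns a made-up job id."""
    return "job-123"


def legacy_refresh() -> str:
    """Hypothetical deprecated entry point that forwards to the new one."""
    warnings.warn(
        "legacy_refresh is deprecated; use new_refresh instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this function
    )
    return new_refresh()


def call_and_capture():
    """Call the legacy function and return (result, list of warning categories)."""
    with warnings.catch_warnings(record=True) as caught:
        # DeprecationWarning is often filtered out by default, so force it on.
        warnings.simplefilter("always")
        result = legacy_refresh()
    return result, [w.category for w in caught]
```

Delegating keeps a single implementation of the refresh logic, so fixes to the new code path also reach users who have not migrated yet.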
3 changes: 3 additions & 0 deletions airflow/providers/tableau/provider.yaml
@@ -32,12 +32,15 @@ additional-dependencies:
 integrations:
   - integration-name: Tableau
     external-doc-url: https://www.tableau.com/
+    how-to-guide:
+      - /docs/apache-airflow-providers-tableau/operators.rst
     logo: /integration-logos/tableau/tableau.png
     tags: [service]

 operators:
   - integration-name: Tableau
     python-modules:
+      - airflow.providers.tableau.operators.tableau
       - airflow.providers.tableau.operators.tableau_refresh_workbook

 sensors:
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-tableau/index.rst
@@ -27,6 +27,7 @@ Content
     :caption: References

     Connection Types <connections/tableau>
+    Operators <operators>
     Python API <_api/airflow/providers/tableau/index>

 .. toctree::