Add integration with Tableau

Add integration with Tableau to refresh extracts after each dbt model
run. This is achieved by filling in a new section in the model's config:

```yaml
tableau_refresh_tasks:
  - resource_name: embedded_data_source
    project_name: project_name
    resource_type: workbook

  - resource_name: published_data_source_with_extract
    project_name: project_name
    resource_type: datasource
```

If this config is specified and the `tableau` extra is installed, `dbt-af` will
add a new operator to the model's task group.
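
For context, a minimal sketch of enabling the integration on the Airflow side,
using the new `TableauIntegrationConfig` added in this commit. The field names
come from `dbt_af/conf/config.py` below; the server URL, token, and site values
are placeholders:

```python
from dbt_af.conf import TableauIntegrationConfig

# PAT-based auth sketch; username/password is the alternative and, per the
# config docstring, takes precedence if both are set. All values below are
# placeholders, not real credentials.
tableau_conf = TableauIntegrationConfig(
    server_address='https://tableau.example.com',
    token_name='dbt-af-refresh',
    pat='<personal-access-token>',
    site_id='analytics',
)
# This object is then passed to dbt-af through the new `tableau` field:
#   Config(..., tableau=tableau_conf)
```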

![2024-05-28_10-57-05](https://github.com/Toloka/dbt-af/assets/49479190/e7494906-4398-4709-aca0-d3884f3c3b1d)
NikitaYurasov committed May 29, 2024
1 parent cfae4ed commit 7d035af
Showing 21 changed files with 1,044 additions and 421 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -21,7 +21,7 @@ jobs:
uses: snok/install-poetry@v1
- name: Install project
run: |
poetry install --with=dev
poetry install --with=dev --all-extras
- name: Test with pytest
run: |
poetry run pytest -q -s -vv --log-cli-level=INFO --cov=dbt_af --cov-report=term tests
32 changes: 25 additions & 7 deletions README.md
@@ -12,22 +12,26 @@

## Overview

_dbt-af_ is a tool that allows you to run dbt models in a distributed manner using Airflow.
**_dbt-af_** is a tool that allows you to run dbt models in a distributed manner using Airflow.
It acts as a wrapper around the Airflow DAG,
allowing you to run the models independently while preserving their dependencies.

![dbt-af](docs/static/airflow_dag_layout.png)

### Why?

1. _dbt-af_ is [domain-driven](https://www.datamesh-architecture.com/#what-is-data-mesh).
1. **_dbt-af_** is [domain-driven](https://www.datamesh-architecture.com/#what-is-data-mesh).
It is designed to separate models from different domains into different DAGs.
This allows you to run models from different domains in parallel.
2. _dbt-af_ brings scheduling to dbt. You can schedule your dbt models to run at a specific time.
3. _dbt-af_ is an ETL-driven tool.
2. **_dbt-af_** is a **dbt-first** solution.
It is designed to make analysts' lives easier.
End users don't even need to know that Airflow is used to schedule their models.
The dbt model's config is the entry point for all your settings and customizations.
3. **_dbt-af_** brings scheduling to dbt. From `@monthly` to `@hourly` and even [more](examples/manual_scheduling.md).
4. **_dbt-af_** is an ETL-driven tool.
You can separate your models into tiers or ETL stages
and build graphs showing the dependencies between models within each tier or stage.
4. _dbt-af_ brings additional features to use different dbt targets simultaneously, different tests scenarios, and
5. **_dbt-af_** brings additional features to use different dbt targets simultaneously, different test scenarios, and
maintenance tasks.

## Installation
@@ -68,7 +72,7 @@ for dag_name, dag in dags.items():
globals()[dag_name] = dag
```

In _dbt_project.yml_ you need to set up default targets for all nodes in your project
(see [example](examples/dags/dbt_project.yml)):

```yaml
@@ -80,8 +84,22 @@ bf_cluster: "dev"

This will create Airflow DAGs for your dbt project.

## Features

1. **_dbt-af_** is designed to work with large projects (1000+ models).
When dealing with a significant number of dbt objects across different domains,
it becomes crucial to have all DAGs auto-generated.
**_dbt-af_** takes care of this by generating all the necessary DAGs for your dbt project and structuring them by
domains.
2. Each dbt run is separated into a different Airflow task. All tasks receive a date interval from the Airflow DAG
context. By using the passed date interval in your dbt models, you ensure the *idempotency* of your dbt runs.
3. _**dbt-af**_ lowers the entry barrier for non-infrastructure team members.
This means that analytics professionals, data scientists,
and data engineers can focus on their dbt models and important business logic
rather than spending time on Airflow DAGs.

## Project Information

- [Docs](examples/README.md)
- [PyPI](https://pypi.org/project/dbt-af/)
- Contributing
- [Contributing](CONTRIBUTING.md)
2 changes: 1 addition & 1 deletion dbt_af/__init__.py
@@ -3,6 +3,6 @@
'conf',
]

__version__ = '0.4.3'
__version__ = '0.5.0'

from . import conf, dags # noqa
16 changes: 15 additions & 1 deletion dbt_af/builder/dag_components.py
@@ -11,6 +11,7 @@
from dbt_af.operators.kubernetes_pod import DbtKubernetesPodOperator
from dbt_af.operators.run import DbtRun, DbtSeed, DbtSnapshot, DbtTest
from dbt_af.operators.sensors import AfExecutionDateFn, DbtExternalSensor, DbtSourceFreshnessSensor
from dbt_af.operators.supplemental import TableauExtractsRefreshOperator
from dbt_af.parser.dbt_node_model import DbtNode, DbtNodeConfig
from dbt_af.parser.dbt_profiles import KubernetesTarget
from dbt_af.parser.dbt_source_model import DbtSource
@@ -183,14 +184,15 @@ def _get_source_deps_with_freshness_check(self) -> list[DbtSource]:
def _create_task_group(self) -> Optional[TaskGroup]:
"""
Create a task group for the model if it has external dependencies or small tests. If waits for all external
dependencies are built per domain, task group is not needed
dependencies are built per domain, a task group is not needed
"""
if (
not self._small_tests
and (not self._get_ext_deps() or self.domain_dag.config.model_dependencies.wait_policy.per_domain)
and not self._get_source_deps_with_freshness_check()
and not self.node_config.enable_from_dttm
and not self.node_config.disable_from_dttm
and not self.node_config.tableau_refresh_tasks
):
return None

@@ -304,6 +306,17 @@ def _init_source_dependencies_af(self, delayed_deps: DagDelayedDependencyRegistry):

delayed_deps(source_wait) >> delayed_deps(self.model_task)

def _init_supplemental_dependencies_af(self, delayed_deps: DagDelayedDependencyRegistry):
if self.dbt_node.config.tableau_refresh_tasks:
tableau_refresh_task = TableauExtractsRefreshOperator(
task_id=f'tableau_refresh__{self.safe_name}',
task_group=self.task_group,
dag=self.domain_dag.af_dag,
tableau_refresh_tasks=self.dbt_node.config.tableau_refresh_tasks,
dbt_af_config=self.domain_dag.config,
)
delayed_deps(self.model_task) >> delayed_deps(tableau_refresh_task)

def init_af(self):
"""
Initialize all Airflow components for the dbt-model and its dependencies
@@ -321,6 +334,7 @@ def init_af(self):

self._init_dependencies_af(delayed_deps)
self._init_source_dependencies_af(delayed_deps)
self._init_supplemental_dependencies_af(delayed_deps)


class DagSnapshot(DagModel):
10 changes: 9 additions & 1 deletion dbt_af/conf/__init__.py
@@ -1,9 +1,17 @@
from .config import Config, DbtDefaultTargetsConfig, DbtProjectConfig, K8sConfig, MCDIntegrationConfig # noqa
from dbt_af.conf.config import (
Config,
DbtDefaultTargetsConfig,
DbtProjectConfig,
K8sConfig,
MCDIntegrationConfig,
TableauIntegrationConfig,
)

__all__ = [
'Config',
'DbtDefaultTargetsConfig',
'DbtProjectConfig',
'K8sConfig',
'MCDIntegrationConfig',
'TableauIntegrationConfig',
]
62 changes: 60 additions & 2 deletions dbt_af/conf/config.py
@@ -54,6 +54,61 @@ class MCDIntegrationConfig:
metastore_name: str = attrs.field(default='')


@attrs.define(frozen=True)
class TableauIntegrationConfig:
"""
Config for Tableau integration.
There are two ways to authenticate:
- with username and password
- with token name and personal access token (PAT)
You can use only one of them. If both are specified, then username and password will be used.
:param server_address: The address of the Tableau Server or Tableau Online instance.
:param username: The name of the user whose credentials will be used to sign in
:param password: The password of the user.
:param user_id_to_impersonate: Specifies the id (not the name) of the user to sign in as.
:param token_name: The personal access token name.
:param pat: The personal access token.
:param site_id: This corresponds to the contentUrl attribute in the Tableau REST API.
The site_id is the portion of the URL that follows the /site/ in the URL. For example,
“MarketingTeam” is the site_id in the following URL MyServer/#/site/MarketingTeam/projects.
To specify the default site on Tableau Server, you can use an empty string '' (single quotes, no space).
For Tableau Cloud, you must provide a value for the site_id.
"""

server_address: str = attrs.field(validator=attrs.validators.instance_of(str))

username: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)
password: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)
user_id_to_impersonate: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)

token_name: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)
pat: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)

site_id: Optional[str] = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(str)),
)


@attrs.define(frozen=True)
class DbtProjectConfig:
"""
@@ -158,15 +213,18 @@ class Config:
model_dependencies: ModelDependenciesSection = attrs.field(factory=ModelDependenciesSection)
include_single_model_manual_dag: bool = attrs.field(default=True)

# airflow specific params
# airflow-specific params
max_active_dag_runs: int = attrs.field(default=50)
af_dag_description: str = attrs.field(default='https://www.buymeacoffee.com/tolokadataplatform')
dag_start_date: pendulum.datetime = attrs.field(default=pendulum.datetime(2023, 10, 1, 0, 0, 0, tz='UTC'))
is_dev: bool = attrs.field(default=False)
use_dbt_target_specific_pools: bool = attrs.field(default=True)

# mcd
# Monte Carlo Data integration
mcd: Optional[MCDIntegrationConfig] = attrs.field(default=None)

# Tableau integration
tableau: Optional[TableauIntegrationConfig] = attrs.field(default=None)

# k8s
k8s: K8sConfig = attrs.field(factory=K8sConfig)
13 changes: 13 additions & 0 deletions dbt_af/integrations/tableau/__init__.py
@@ -0,0 +1,13 @@
from importlib.util import find_spec

from dbt_af.integrations.tableau.extracts import tableau_extracts_refresh


def is_tableau_installed():
return find_spec('tableauserverclient') is not None


__all__ = [
'is_tableau_installed',
'tableau_extracts_refresh',
]
35 changes: 35 additions & 0 deletions dbt_af/integrations/tableau/auth.py
@@ -0,0 +1,35 @@
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from tableauserverclient.models.tableau_auth import Credentials

from dbt_af.conf import Config


def get_tableau_auth_object(config: 'Config') -> 'Credentials':
import tableauserverclient as tsc

if not config.tableau:
raise ValueError('No tableau configuration found in dbt_af config. Please specify tableau credentials.')

if config.tableau.username and config.tableau.password:
logging.info('Using tableau username/password authentication')
return tsc.TableauAuth(
username=config.tableau.username,
password=config.tableau.password,
site_id=config.tableau.site_id,
user_id_to_impersonate=config.tableau.user_id_to_impersonate,
)

if config.tableau.token_name and config.tableau.pat:
logging.info('Using tableau personal access token authentication')
return tsc.PersonalAccessTokenAuth(
token_name=config.tableau.token_name,
personal_access_token=config.tableau.pat,
site_id=config.tableau.site_id,
)

raise ValueError(
'No valid tableau authentication method found in dbt_af config. Please specify tableau credentials.'
)
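
For context (not part of the diff), the `Credentials` object returned here is
what `tableauserverclient` consumes at sign-in. A sketch of that standard flow,
with placeholder values:

```python
import tableauserverclient as tsc

# Standard tableauserverclient sign-in flow, as performed later by extracts.py
# with the object returned from get_tableau_auth_object. All concrete values
# are placeholders.
auth = tsc.PersonalAccessTokenAuth(
    token_name='dbt-af-refresh',
    personal_access_token='<personal-access-token>',
    site_id='analytics',
)
server = tsc.Server(server_address='https://tableau.example.com', use_server_version=True)
with server.auth.sign_in(auth):  # signs out automatically on exiting the block
    print(server.server_info.get().rest_api_version)
```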
6 changes: 6 additions & 0 deletions dbt_af/integrations/tableau/exceptions.py
@@ -0,0 +1,6 @@
class UnknownTableauResourceTypeException(Exception):
pass


class FailedTableauRefreshTasksException(Exception):
pass
86 changes: 86 additions & 0 deletions dbt_af/integrations/tableau/extracts.py
@@ -0,0 +1,86 @@
from functools import partial
from typing import TYPE_CHECKING

from cachetools import TTLCache, cachedmethod
from cachetools.keys import hashkey

from dbt_af.integrations.tableau.auth import get_tableau_auth_object
from dbt_af.integrations.tableau.exceptions import (
FailedTableauRefreshTasksException,
UnknownTableauResourceTypeException,
)
from dbt_af.parser.dbt_node_model import TableauRefreshResourceType, TableauRefreshTaskConfig

if TYPE_CHECKING:
import tableauserverclient

from dbt_af.conf import Config


class _TableauExtractsRegistry:
def __init__(self, server: 'tableauserverclient.Server'):
self.server = server

self._cache: TTLCache = TTLCache(maxsize=1024, ttl=60 * 15)

@cachedmethod(lambda self: self._cache, key=partial(hashkey, '_get_workbooks_id_mapping'))
def _get_workbooks_id_mapping(self) -> dict[tuple[str, str], str]:
import tableauserverclient as tsc

workbooks_id_mapping: dict[tuple[str, str], str] = {}
for wb in tsc.Pager(self.server.workbooks):
workbooks_id_mapping[(wb.project_name, wb.name)] = wb.id

return workbooks_id_mapping

@cachedmethod(lambda self: self._cache, key=partial(hashkey, '_get_datasources_id_mapping'))
def _get_datasources_id_mapping(self) -> dict[tuple[str, str], str]:
import tableauserverclient as tsc

datasources_id_mapping: dict[tuple[str, str], str] = {}
for ds in tsc.Pager(self.server.datasources):
datasources_id_mapping[(ds.project_name, ds.name)] = ds.id

return datasources_id_mapping

def get_resource_id(self, resource_type: TableauRefreshResourceType, project_name: str, resource_name: str) -> str:
match resource_type:
case TableauRefreshResourceType.workbook:
return self._get_workbooks_id_mapping()[(project_name, resource_name)]
case TableauRefreshResourceType.datasource:
return self._get_datasources_id_mapping()[(project_name, resource_name)]
case _:
raise UnknownTableauResourceTypeException(f'Unknown resource type: {resource_type}')


def tableau_extracts_refresh(tableau_refresh_tasks: 'list[TableauRefreshTaskConfig]', dbt_af_config: 'Config') -> None:
import tableauserverclient as tsc

tableau_auth = get_tableau_auth_object(dbt_af_config)
tableau_server = tsc.Server(server_address=dbt_af_config.tableau.server_address, use_server_version=True)
tableau_server.auth.sign_in(tableau_auth)

extracts_registry = _TableauExtractsRegistry(tableau_server)

failed_tasks: list[TableauRefreshTaskConfig] = []
for refresh_task in tableau_refresh_tasks:
try:
resource_id = extracts_registry.get_resource_id(
refresh_task.resource_type,
refresh_task.project_name,
refresh_task.resource_name,
)
match refresh_task.resource_type:
case TableauRefreshResourceType.workbook:
tableau_server.workbooks.refresh(resource_id)
case TableauRefreshResourceType.datasource:
tableau_server.datasources.refresh(resource_id)
case _:
raise UnknownTableauResourceTypeException(f'Unknown resource type: {refresh_task.resource_type}')
except UnknownTableauResourceTypeException:
failed_tasks.append(refresh_task)

if not failed_tasks:
return

raise FailedTableauRefreshTasksException(f'Failed to refresh the following Tableau resources: {failed_tasks}')
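
A hedged sketch of invoking this helper directly, outside Airflow. It assumes
`TableauRefreshTaskConfig` accepts the same keys as the YAML config shown in
the commit message, and that `config` is an existing, fully populated
`dbt_af.conf.Config` with its `tableau` field set:

```python
from dbt_af.integrations.tableau import tableau_extracts_refresh
from dbt_af.parser.dbt_node_model import TableauRefreshResourceType, TableauRefreshTaskConfig

# Assumption: TableauRefreshTaskConfig mirrors the YAML keys from the model config.
tasks = [
    TableauRefreshTaskConfig(
        resource_name='embedded_data_source',
        project_name='project_name',
        resource_type=TableauRefreshResourceType.workbook,
    ),
    TableauRefreshTaskConfig(
        resource_name='published_data_source_with_extract',
        project_name='project_name',
        resource_type=TableauRefreshResourceType.datasource,
    ),
]

# `config` is assumed to be a dbt_af.conf.Config with `tableau` filled in; this
# raises FailedTableauRefreshTasksException listing any resources it could not refresh.
tableau_extracts_refresh(tableau_refresh_tasks=tasks, dbt_af_config=config)
```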