Skip to content

Commit

Permalink
[dagster-fivetran] Add Fivetran Resync Op (#6868)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwallace0723 committed Mar 2, 2022
1 parent 3067d32 commit e6fb91b
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from dagster.core.utils import check_dagster_package_version

from .asset_defs import build_fivetran_assets
from .ops import fivetran_sync_op
from .ops import fivetran_resync_op, fivetran_sync_op
from .resources import FivetranResource, fivetran_resource
from .types import FivetranOutput
from .version import __version__

check_dagster_package_version("dagster-fivetran", __version__)

__all__ = ["FivetranResource", "fivetran_resource", "fivetran_sync_op", "FivetranOutput"]
# Names this module exports when a caller uses `import *`; the resync op is
# listed next to the existing sync op.
__all__ = [
"FivetranResource",
"fivetran_resource",
"fivetran_sync_op",
"fivetran_resync_op",
"FivetranOutput",
]
121 changes: 120 additions & 1 deletion python_modules/libraries/dagster-fivetran/dagster_fivetran/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import generate_materializations

from dagster import Array, Bool, Field, In, Noneable, Nothing, Out, Output, op
from dagster import Array, AssetKey, Bool, Field, In, Noneable, Nothing, Out, Output, Permissive, op


@op(
Expand Down Expand Up @@ -98,3 +98,122 @@ def my_composed_fivetran_job():
fivetran_output, asset_key_prefix=context.op_config["asset_key_prefix"]
)
yield Output(fivetran_output)


@op(
    required_resource_keys={"fivetran"},
    ins={"start_after": In(Nothing)},
    out=Out(
        FivetranOutput,
        description="Parsed json dictionary representing the details of the Fivetran connector after "
        "the resync successfully completes. "
        "See the [Fivetran API Docs](https://fivetran.com/docs/rest-api/connectors#retrieveconnectordetails) "
        "to see detailed information on this response.",
    ),
    config_schema={
        "connector_id": Field(
            str,
            is_required=True,
            description="The Fivetran Connector ID that this op will sync. You can retrieve this "
            'value from the "Setup" tab of a given connector in the Fivetran UI.',
        ),
        "resync_parameters": Field(
            Permissive(),
            is_required=True,
            description="The resync parameters to send in the payload to the Fivetran API. You "
            "can find an example resync payload here: https://fivetran.com/docs/rest-api/connectors#request_6",
        ),
        "poll_interval": Field(
            float,
            default_value=DEFAULT_POLL_INTERVAL,
            description="The time (in seconds) that will be waited between successive polls.",
        ),
        "poll_timeout": Field(
            Noneable(float),
            default_value=None,
            description="The maximum time that will waited before this operation is timed out. By "
            "default, this will never time out.",
        ),
        "yield_materializations": Field(
            config=Bool,
            default_value=True,
            description=(
                "If True, materializations corresponding to the results of the Fivetran sync will "
                "be yielded when the op executes."
            ),
        ),
        "asset_key_prefix": Field(
            config=Array(str),
            default_value=["fivetran"],
            description=(
                "If provided and yield_materializations is True, these components will be used to "
                "prefix the generated asset keys."
            ),
        ),
    },
    tags={"kind": "fivetran"},
)
def fivetran_resync_op(context):
    """
    Executes a Fivetran historical resync for a given ``connector_id``, and polls until that
    resync completes, raising an error if it is unsuccessful.

    It outputs a FivetranOutput which contains the details of the Fivetran connector after the
    resync successfully completes, as well as details about which tables the resync updates.
    When ``yield_materializations`` is enabled, materializations are yielded only for the
    schema/table pairs named in ``resync_parameters``; materializations generated for any other
    table of the connector are dropped.

    It requires the use of the :py:class:`~dagster_fivetran.fivetran_resource`, which allows it
    to communicate with the Fivetran API.

    Examples:

    .. code-block:: python

        from dagster import job
        from dagster_fivetran import fivetran_resource, fivetran_resync_op

        my_fivetran_resource = fivetran_resource.configured(
            {
                "api_key": {"env": "FIVETRAN_API_KEY"},
                "api_secret": {"env": "FIVETRAN_API_SECRET"},
            }
        )

        resync_foobar = fivetran_resync_op.configured(
            {
                "connector_id": "foobar",
                "resync_parameters": {
                    "schema_a": ["table_a", "table_b"],
                    "schema_b": ["table_c"]
                }
            },
            name="resync_foobar"
        )

        @job(resource_defs={"fivetran": my_fivetran_resource})
        def my_simple_fivetran_job():
            resync_foobar()

        @job(resource_defs={"fivetran": my_fivetran_resource})
        def my_composed_fivetran_job():
            final_foobar_state = resync_foobar(start_after=some_op())
            other_op(final_foobar_state)
    """
    config = context.op_config
    resync_parameters = config["resync_parameters"]

    fivetran_output = context.resources.fivetran.resync_and_poll(
        connector_id=config["connector_id"],
        resync_parameters=resync_parameters,
        poll_interval=config["poll_interval"],
        poll_timeout=config["poll_timeout"],
    )

    if config["yield_materializations"]:
        asset_key_prefix = config["asset_key_prefix"]
        # Built once as a set so each membership test below is O(1) rather than
        # a linear scan over every requested schema/table pair.
        resynced_asset_keys = {
            AssetKey(asset_key_prefix + [schema, table])
            for schema, tables in resync_parameters.items()
            for table in tables
        }
        for mat in generate_materializations(
            fivetran_output, asset_key_prefix=asset_key_prefix
        ):
            # Only report the tables that were actually asked to be resynced.
            if mat.asset_key in resynced_asset_keys:
                yield mat

    yield Output(fivetran_output)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
import time
from typing import Any, Dict, Tuple
from typing import Any, Dict, List, Tuple
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -211,6 +211,38 @@ def start_sync(self, connector_id: str) -> Dict[str, Any]:
)
return connector_details

def start_resync(
    self, connector_id: str, resync_parameters: Dict[str, List[str]]
) -> Dict[str, Any]:
    """
    Initiates a historical sync of all data for multiple schema tables within a Fivetran connector.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        resync_parameters (Dict[str, List[str]]): The resync parameters to send to the Fivetran API.
            An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_6

    Returns:
        Dict[str, Any]: Parsed json data representing the connector details API response after
            the resync is started.
    """
    if self._disable_schedule_on_trigger:
        # Switch the connector to a manual schedule before triggering the resync.
        self._log.info("Disabling Fivetran sync schedule.")
        self.update_schedule_type(connector_id, "manual")
    self._assert_syncable_connector(connector_id)
    self.make_request(
        method="POST",
        endpoint=f"{connector_id}/schemas/tables/resync",
        data=json.dumps(resync_parameters),
    )
    # Re-read the connector after the trigger so the returned details (and the
    # UI link logged below) reflect its post-request state.
    connector_details = self.get_connector_details(connector_id)
    self._log.info(
        f"Resync initialized for connector_id={connector_id}. View this resync in the Fivetran UI: "
        + get_fivetran_connector_url(connector_details)
    )
    return connector_details

def poll_sync(
self,
connector_id: str,
Expand Down Expand Up @@ -301,6 +333,41 @@ def sync_and_poll(
)
return FivetranOutput(connector_details=final_details, schema_config=schema_config)

def resync_and_poll(
    self,
    connector_id: str,
    resync_parameters: Dict[str, List[str]],
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: float = None,
) -> FivetranOutput:
    """
    Triggers a historical resync for the given connector and blocks until polling finishes.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API.
            This should be a dictionary with schema names as the keys and a list of tables
            to resync as the values.
        poll_interval (float): The time (in seconds) that will be waited between successive polls.
        poll_timeout (float): The maximum time that will be waited before this operation is
            timed out. By default, this will never time out.

    Returns:
        :py:class:`~FivetranOutput`:
            Object containing details about the connector and the tables it updates
    """
    # Both reads happen before the resync is triggered: the schema config is
    # returned as-is, and the pre-trigger sync timestamp is handed to poll_sync.
    table_schemas = self.get_connector_schema_config(connector_id)
    sync_status_before = self.get_connector_sync_status(connector_id)
    last_sync_before_resync, _, _ = sync_status_before

    self.start_resync(connector_id, resync_parameters)

    polling_options = {"poll_interval": poll_interval, "poll_timeout": poll_timeout}
    connector_state = self.poll_sync(
        connector_id,
        last_sync_before_resync,
        **polling_options,
    )
    return FivetranOutput(connector_details=connector_state, schema_config=table_schemas)


@resource(
config_schema={
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import responses
from dagster_fivetran import FivetranOutput, fivetran_resource, fivetran_sync_op
from dagster_fivetran import FivetranOutput, fivetran_resource, fivetran_resync_op, fivetran_sync_op
from dagster_fivetran.resources import FIVETRAN_API_BASE, FIVETRAN_CONNECTOR_PATH

from dagster import AssetKey, job, op
Expand All @@ -8,6 +8,7 @@
DEFAULT_CONNECTOR_ID,
get_complex_sample_connector_schema_config,
get_sample_connector_response,
get_sample_resync_response,
get_sample_sync_response,
get_sample_update_response,
)
Expand Down Expand Up @@ -76,3 +77,70 @@ def fivetran_sync_job():
AssetKey(["fivetran", "abc", "xyz"]),
]
)


def test_fivetran_resync_op():
    """Run the resync op in a job against mocked Fivetran endpoints.

    Checks the op's output and that materializations are emitted only for the
    two tables named in ``resync_parameters``.
    """
    fivetran = fivetran_resource.configured({"api_key": "foo", "api_secret": "bar"})
    completed_state = {"succeeded_at": "2021-01-01T02:00:00.0Z"}
    connector_url = f"{FIVETRAN_API_BASE}/{FIVETRAN_CONNECTOR_PATH}{DEFAULT_CONNECTOR_ID}"

    op_config = {
        "connector_id": DEFAULT_CONNECTOR_ID,
        "resync_parameters": {"xyz1": ["abc1", "abc2"]},
        "poll_interval": 0.1,
        "poll_timeout": 10,
    }

    @op
    def foo_op():
        pass

    @job(
        resource_defs={"fivetran": fivetran},
        config={"ops": {"fivetran_resync_op": {"config": op_config}}},
    )
    def fivetran_resync_job():
        fivetran_resync_op(start_after=foo_op())

    with responses.RequestsMock() as rsps:
        # schedule update and the resync trigger itself
        rsps.add(rsps.PATCH, connector_url, json=get_sample_update_response())
        rsps.add(
            rsps.POST,
            f"{connector_url}/schemas/tables/resync",
            json=get_sample_resync_response(),
        )
        # connector schema
        rsps.add(
            rsps.GET,
            f"{connector_url}/schemas",
            json=get_complex_sample_connector_schema_config(),
        )
        # initial state
        rsps.add(rsps.GET, connector_url, json=get_sample_connector_response())
        # two polls that still report the initial state
        for _ in range(2):
            rsps.add(rsps.GET, connector_url, json=get_sample_connector_response())
        # final state will be updated
        rsps.add(
            rsps.GET, connector_url, json=get_sample_connector_response(data=completed_state)
        )

        result = fivetran_resync_job.execute_in_process()

        expected_output = FivetranOutput(
            connector_details=get_sample_connector_response(data=completed_state)["data"],
            schema_config=get_complex_sample_connector_schema_config()["data"],
        )
        assert result.output_for_node("fivetran_resync_op") == expected_output

        materialization_events = [
            event
            for event in result.events_for_node("fivetran_resync_op")
            if event.event_type_value == "ASSET_MATERIALIZATION"
        ]
        assert len(materialization_events) == 2

        materialized_keys = {
            event.event_specific_data.materialization.asset_key
            for event in materialization_events
        }
        assert materialized_keys == {
            AssetKey(["fivetran", "xyz1", "abc1"]),
            AssetKey(["fivetran", "xyz1", "abc2"]),
        }
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DEFAULT_CONNECTOR_ID,
get_complex_sample_connector_schema_config,
get_sample_connector_response,
get_sample_resync_response,
get_sample_sync_response,
get_sample_update_response,
)
Expand Down Expand Up @@ -260,3 +261,60 @@ def test_sync_and_poll_invalid(data, match):
json=get_sample_sync_response(),
)
ft_resource.sync_and_poll(DEFAULT_CONNECTOR_ID, poll_interval=0.1)


@pytest.mark.parametrize(
    "n_polls, succeed_at_end",
    [(0, True), (0, False), (4, True), (4, False), (30, True)],
)
def test_resync_and_poll(n_polls, succeed_at_end):
    """Exercise ``resync_and_poll`` for varying poll counts and both final outcomes.

    A successful final state must yield the expected ``FivetranOutput``; a
    failed final state must raise ``Failure``.
    """
    resource_config = {
        "api_key": "some_key",
        "api_secret": "some_secret",
    }
    ft_resource = fivetran_resource(build_init_resource_context(config=resource_config))
    api_prefix = f"{ft_resource.api_base_url}{DEFAULT_CONNECTOR_ID}"

    if succeed_at_end:
        final_data = {"succeeded_at": "2021-01-01T02:00:00.0Z"}
    else:
        final_data = {"failed_at": "2021-01-01T02:00:00.0Z"}

    def _run_resync():
        with responses.RequestsMock() as rsps:
            # connector schema
            rsps.add(
                rsps.GET,
                f"{api_prefix}/schemas",
                json=get_complex_sample_connector_schema_config(),
            )
            # schedule update and the resync trigger itself
            rsps.add(rsps.PATCH, api_prefix, json=get_sample_update_response())
            rsps.add(
                rsps.POST,
                f"{api_prefix}/schemas/tables/resync",
                json=get_sample_resync_response(),
            )
            # initial state
            rsps.add(rsps.GET, api_prefix, json=get_sample_connector_response())
            # n polls before updating
            for _ in range(n_polls):
                rsps.add(rsps.GET, api_prefix, json=get_sample_connector_response())
            # final state will be updated
            rsps.add(rsps.GET, api_prefix, json=get_sample_connector_response(data=final_data))
            return ft_resource.resync_and_poll(
                DEFAULT_CONNECTOR_ID,
                resync_parameters={"xyz1": ["abc1", "abc2"]},
                poll_interval=0.1,
            )

    if not succeed_at_end:
        with pytest.raises(Failure, match="failed!"):
            _run_resync()
        return

    expected_output = FivetranOutput(
        connector_details=get_sample_connector_response(data=final_data)["data"],
        schema_config=get_complex_sample_connector_schema_config()["data"],
    )
    assert _run_resync() == expected_output

0 comments on commit e6fb91b

Please sign in to comment.