Skip to content

Commit

Permalink
[dagster-census] Census Integration Library (#7249)
Browse files Browse the repository at this point in the history
  • Loading branch information
dehume committed Apr 20, 2022
1 parent 4572857 commit 017bd49
Show file tree
Hide file tree
Showing 16 changed files with 907 additions and 0 deletions.
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-census/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[run]
branch = True
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-census/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include README.md
include LICENSE
15 changes: 15 additions & 0 deletions python_modules/libraries/dagster-census/dagster_census/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from dagster.core.utils import check_dagster_package_version

from .ops import census_trigger_sync_op
from .resources import CensusResource, census_resource
from .types import CensusOutput
from .version import __version__

check_dagster_package_version("dagster-census", __version__)

__all__ = [
"CensusResource",
"CensusOutput",
"census_resource",
"census_trigger_sync_op",
]
98 changes: 98 additions & 0 deletions python_modules/libraries/dagster-census/dagster_census/ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from dagster import Array, Bool, Field, In, Noneable, Nothing, Out, Output, op

from .resources import DEFAULT_POLL_INTERVAL
from .types import CensusOutput
from .utils import generate_materialization


@op(
required_resource_keys={"census"},
ins={"start_after": In(Nothing)},
out=Out(
CensusOutput,
description="Parsed json dictionary representing the details of the Census sync after "
"the sync successfully completes.",
),
config_schema={
"sync_id": Field(
int,
is_required=True,
description="id of the parent sync.",
),
"force_full_sync": Field(
config=Bool,
default_value=False,
description="If this trigger request should be a Full Sync. "
"Note that some sync configurations such as Append do not support full syncs.",
),
"poll_interval": Field(
float,
default_value=DEFAULT_POLL_INTERVAL,
description="The time (in seconds) that will be waited between successive polls.",
),
"poll_timeout": Field(
Noneable(float),
default_value=None,
description="The maximum time that will waited before this operation is timed out. By "
"default, this will never time out.",
),
"yield_materializations": Field(
config=Bool,
default_value=True,
description=(
"If True, materializations corresponding to the results of the Census sync will "
"be yielded when the op executes."
),
),
"asset_key_prefix": Field(
config=Array(str),
default_value=["census"],
description=(
"If provided and yield_materializations is True, these components will be used to "
"prefix the generated asset keys."
),
),
},
tags={"kind": "census"},
)
def census_trigger_sync_op(context):
"""
Executes a Census sync for a given ``sync_id``, and polls until that sync
completes, raising an error if it is unsuccessful. It outputs a CensusOutput which contains
the details of the Census sync after the sync successfully completes, as well as details
about which tables the sync updates.
It requires the use of the :py:class:`~dagster_census.census_resource`, which allows it to
communicate with the census API.
Examples:
.. code-block:: python
from dagster import job
from dagster_census import census_resource, census_sync_op
my_census_resource = census_resource.configured(
{
"api_key": {"env": "CENSUS_API_KEY"},
}
)
sync_foobar = census_sync_op.configured({"sync_id": "foobar"}, name="sync_foobar")
@job(resource_defs={"census": my_census_resource})
def my_simple_census_job():
sync_foobar()
"""
census_output = context.resources.census.trigger_sync_and_poll(
sync_id=context.op_config["sync_id"],
force_full_sync=context.op_config["force_full_sync"],
poll_interval=context.op_config["poll_interval"],
poll_timeout=context.op_config["poll_timeout"],
)
if context.op_config["yield_materializations"]:
yield generate_materialization(
census_output, asset_key_prefix=context.op_config["asset_key_prefix"]
)
yield Output(census_output)

0 comments on commit 017bd49

Please sign in to comment.