Skip to content

Commit

Permalink
[dagster-airbyte][managed-elements] Explicit handling of reconciling …
Browse files Browse the repository at this point in the history
…secret values (#10195)

* [dagster-airbyte][managed-elements] Explicit handling of reconciling secret values

* lint

* use CLI flag instead

* fix

* is_flag=True
  • Loading branch information
benpankow authored and alangenfeld committed Nov 3, 2022
1 parent 345f0b3 commit 9157ebb
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
ManagedElementDiff,
ManagedElementError,
)
from dagster_managed_elements.types import ManagedElementReconciler
from dagster_managed_elements.types import (
SECRET_MASK_VALUE,
ManagedElementReconciler,
is_key_secret,
)
from dagster_managed_elements.utils import diff_dicts

import dagster._check as check
Expand All @@ -47,15 +51,34 @@ def gen_configured_stream_json(
)


def _ignore_secrets_compare_fn(k: str, _cv: Any, dv: Any) -> Optional[bool]:
    """
    Comparison hook that decides whether a secret config field counts as unchanged.

    Args:
        k (str): The config key being compared.
        _cv (Any): The value from the user config; not inspected.
        dv (Any): The value reported by the destination.

    Returns:
        Optional[bool]: For keys flagged by is_key_secret, whether the destination
        value equals SECRET_MASK_VALUE. None for every other key.
    """
    # Non-secret keys get no verdict from this hook.
    if not is_key_secret(k):
        return None
    return dv == SECRET_MASK_VALUE


def _diff_configs(
    config_dict: Dict[str, Any], dst_dict: Dict[str, Any], ignore_secrets: bool = True
) -> ManagedElementDiff:
    """
    Diff a user-provided config dict against the config dict from the destination.

    Args:
        config_dict (Dict[str, Any]): The dictionary from the user config.
        dst_dict (Dict[str, Any]): The dictionary from the destination.
        ignore_secrets (bool): When truthy, _ignore_secrets_compare_fn is handed to
            diff_dicts as the custom comparison; otherwise no custom comparison is used.
    """
    compare_fn = None
    if ignore_secrets:
        compare_fn = _ignore_secrets_compare_fn
    return diff_dicts(
        config_dict=config_dict,
        dst_dict=dst_dict,
        custom_compare_fn=compare_fn,
    )


def diff_sources(
config_src: Optional[AirbyteSource], curr_src: Optional[AirbyteSource]
config_src: Optional[AirbyteSource],
curr_src: Optional[AirbyteSource],
ignore_secrets: bool = True,
) -> ManagedElementCheckResult:
"""
Utility to diff two AirbyteSource objects.
"""
diff = diff_dicts(
diff = _diff_configs(
config_src.source_configuration if config_src else {},
curr_src.source_configuration if curr_src else {},
ignore_secrets,
)
if not diff.is_empty():
name = config_src.name if config_src else curr_src.name if curr_src else "Unknown"
Expand All @@ -65,14 +88,17 @@ def diff_sources(


def diff_destinations(
config_dst: Optional[AirbyteDestination], curr_dst: Optional[AirbyteDestination]
config_dst: Optional[AirbyteDestination],
curr_dst: Optional[AirbyteDestination],
ignore_secrets: bool = True,
) -> ManagedElementCheckResult:
"""
Utility to diff two AirbyteDestination objects.
"""
diff = diff_dicts(
diff = _diff_configs(
config_dst.destination_configuration if config_dst else {},
curr_dst.destination_configuration if curr_dst else {},
ignore_secrets,
)
if not diff.is_empty():
name = config_dst.name if config_dst else curr_dst.name if curr_dst else "Unknown"
Expand Down Expand Up @@ -113,6 +139,7 @@ def reconcile_sources(
workspace_id: str,
dry_run: bool,
should_delete: bool,
ignore_secrets: bool,
) -> Tuple[Mapping[str, InitializedAirbyteSource], ManagedElementCheckResult]:
"""
Generates a diff of the configured and existing sources and reconciles them to match the
Expand All @@ -132,7 +159,11 @@ def reconcile_sources(
continue

diff = diff.join(
diff_sources(configured_source, existing_source.source if existing_source else None)
diff_sources(
configured_source,
existing_source.source if existing_source else None,
ignore_secrets,
)
)

if existing_source and (
Expand Down Expand Up @@ -187,7 +218,6 @@ def reconcile_sources(
source_id=source_id,
source_definition_id=defn_id,
)

return initialized_sources, diff


Expand All @@ -198,6 +228,7 @@ def reconcile_destinations(
workspace_id: str,
dry_run: bool,
should_delete: bool,
ignore_secrets: bool,
) -> Tuple[Mapping[str, InitializedAirbyteDestination], ManagedElementCheckResult]:
"""
Generates a diff of the configured and existing destinations and reconciles them to match the
Expand All @@ -220,6 +251,7 @@ def reconcile_destinations(
diff_destinations(
configured_destination,
existing_destination.destination if existing_destination else None,
ignore_secrets,
)
)

Expand Down Expand Up @@ -286,6 +318,7 @@ def reconcile_config(
objects: List[AirbyteConnection],
dry_run: bool = False,
should_delete: bool = False,
ignore_secrets: bool = True,
) -> ManagedElementCheckResult:
"""
Main entry point for the reconciliation process. Takes a list of AirbyteConnection objects
Expand Down Expand Up @@ -335,10 +368,16 @@ def reconcile_config(
)

all_sources, sources_diff = reconcile_sources(
res, config_sources, existing_sources, workspace_id, dry_run, should_delete
res,
config_sources,
existing_sources,
workspace_id,
dry_run,
should_delete,
ignore_secrets,
)
all_dests, dests_diff = reconcile_destinations(
res, config_dests, existing_dests, workspace_id, dry_run, should_delete
res, config_dests, existing_dests, workspace_id, dry_run, should_delete, ignore_secrets
)

# Now that we have updated the set of sources and destinations, we can
Expand Down Expand Up @@ -599,20 +638,22 @@ def __init__(

super().__init__()

def check(self) -> ManagedElementCheckResult:
def check(self, **kwargs) -> ManagedElementCheckResult:
return reconcile_config(
self._airbyte_instance,
self._connections,
dry_run=True,
should_delete=self._delete_unmentioned_resources,
ignore_secrets=(not kwargs.get("include_all_secrets", False)),
)

def apply(self) -> ManagedElementCheckResult:
def apply(self, **kwargs) -> ManagedElementCheckResult:
return reconcile_config(
self._airbyte_instance,
self._connections,
dry_run=False,
should_delete=self._delete_unmentioned_resources,
ignore_secrets=(not kwargs.get("include_all_secrets", False)),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
from datetime import datetime

import mock
import pytest
import requests
from dagster_airbyte import airbyte_resource, load_assets_from_connections
Expand Down Expand Up @@ -271,3 +272,41 @@ def test_change_source_and_destination(empty_airbyte_instance, airbyte_source_fi

check_result = check(TEST_ROOT_DIR, "example_airbyte_stack:reconciler_different_dest")
assert check_result == ManagedElementDiff()


def test_mark_secrets_as_changed(docker_compose_airbyte_instance, airbyte_source_files):

# First, apply a stack and check that there's no diff after applying it
apply(TEST_ROOT_DIR, "example_airbyte_stack:reconciler")

check_result = check(TEST_ROOT_DIR, "example_airbyte_stack:reconciler")
assert ManagedElementDiff() == check_result

# Ensure that a different config has a diff
check_result = check(TEST_ROOT_DIR, "example_airbyte_stack:reconciler_different_dest")
other_check_result = check(
TEST_ROOT_DIR,
"example_airbyte_stack:reconciler_different_dest",
include_all_secrets=True,
)
assert other_check_result == check_result
assert ManagedElementDiff() != check_result

# Next, mock to treat all config as secrets - now, we don't expect a diff
# because we ignore all the config fields which have changed
with mock.patch(
"dagster_airbyte.managed.reconciliation._ignore_secrets_compare_fn", return_value=True
):
check_result = check(TEST_ROOT_DIR, "example_airbyte_stack:reconciler")
assert ManagedElementDiff() == check_result

check_result = check(TEST_ROOT_DIR, "example_airbyte_stack:reconciler_different_dest")
assert ManagedElementDiff() == check_result

# Passing include_all_secrets=True turns off the secret-ignoring comparison, so we expect a diff
check_result = check(
TEST_ROOT_DIR,
"example_airbyte_stack:reconciler_different_dest",
include_all_secrets=True,
)
assert ManagedElementDiff() != check_result
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_reconcilable_objects(module: ModuleType) -> List[ManagedElementReconcile
]


def check(module_dir: str, import_str: str) -> ManagedElementDiff:
def check(module_dir: str, import_str: str, **kwargs) -> ManagedElementDiff:
reconcilable_objects = get_reconcilable_objects_from_module(
module_dir=module_dir, import_str=import_str
)
Expand All @@ -70,15 +70,15 @@ def check(module_dir: str, import_str: str) -> ManagedElementDiff:

diff = ManagedElementDiff()
for obj in reconcilable_objects:
result = obj.check()
result = obj.check(**kwargs)
if isinstance(result, ManagedElementDiff):
diff = diff.join(result)
else:
click.echo(result)
return diff


def apply(module_dir: str, import_str: str) -> ManagedElementDiff:
def apply(module_dir: str, import_str: str, **kwargs) -> ManagedElementDiff:
reconcilable_objects = get_reconcilable_objects_from_module(
module_dir=module_dir, import_str=import_str
)
Expand All @@ -87,7 +87,7 @@ def apply(module_dir: str, import_str: str) -> ManagedElementDiff:

diff = ManagedElementDiff()
for obj in reconcilable_objects:
result = obj.apply()
result = obj.apply(**kwargs)
if isinstance(result, ManagedElementDiff):
diff = diff.join(result)
else:
Expand Down Expand Up @@ -117,8 +117,16 @@ def main():
type=click.Path(exists=True),
help="Optional relative or absolute path to load module from, will be appended to system path.",
)
def check_cmd(module, working_directory):
click.echo(check(working_directory, module))
@click.option(
"--include-all-secrets",
is_flag=True,
help=(
"Whether to include all secrets in the diff, acting as if all secrets will be pushed to the remote state."
" Secrets cannot be retrieved and diffed against the remote state, so this option simulates the diff if this flag is included to the apply command."
),
)
def check_cmd(module, working_directory, include_all_secrets):
click.echo(check(working_directory, module, include_all_secrets=include_all_secrets))


@main.command(
Expand All @@ -138,5 +146,14 @@ def check_cmd(module, working_directory):
type=click.Path(exists=True),
help="Optional relative or absolute path to load module from, will be appended to system path.",
)
def apply_cmd(module, working_directory):
click.echo(apply(working_directory, module))
@click.option(
"--include-all-secrets",
is_flag=True,
help=(
"Whether to push all secret values to the remote state, or only those that aren't already present."
" Secrets cannot be retrieved and diffed against the remote state, so this option is required when a secret is changed."
" If False, secrets that are already present in the remote state will not be pushed."
),
)
def apply_cmd(module, working_directory, include_all_secrets):
click.echo(apply(working_directory, module, include_all_secrets=include_all_secrets))
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@ class ManagedElementError(enum.Enum):

# Substrings which mark a config key as holding a secret value.
SANITIZE_KEY_KEYWORDS = ["password", "token", "secret"]

# Placeholder substituted for secret values.
SECRET_MASK_VALUE = "**********"


def is_key_secret(key: str) -> bool:
    """
    Rudimentary check to see if a config key is a secret value.

    A key is considered secret when it contains any of SANITIZE_KEY_KEYWORDS.
    The match ignores case, so keys such as "API_TOKEN" or "clientSecret"
    are recognized as well as "api_token".
    """
    # The keywords are lowercase, so lowercasing the key once makes the
    # substring test case-insensitive.
    lowered_key = key.lower()
    return any(keyword in lowered_key for keyword in SANITIZE_KEY_KEYWORDS)


def _sanitize(key: str, value: str):
    """
    Rudimentary sanitization of values so we can avoid printing passwords
    to the console.

    Returns SECRET_MASK_VALUE in place of the value when is_key_secret flags
    the key; otherwise returns the value unchanged.
    """
    # One check through the shared helper, so masking and secret detection
    # cannot drift apart.
    if is_key_secret(key):
        return SECRET_MASK_VALUE
    return value


Expand Down Expand Up @@ -241,15 +250,19 @@ class ManagedElementReconciler(ABC):
"""

@abstractmethod
def check(self) -> ManagedElementCheckResult:
def check(self, **kwargs) -> ManagedElementCheckResult:
"""
Returns whether the user provided config for the managed element is in sync with the external resource.
kwargs contains any optional user-specified arguments to the check command.
"""
raise NotImplementedError()

@abstractmethod
def apply(self) -> ManagedElementCheckResult:
def apply(self, **kwargs) -> ManagedElementCheckResult:
"""
Reconciles the managed element with the external resource, returning the applied diff.
kwargs contains any optional user-specified arguments to the apply command.
"""
raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, Optional

from .types import ManagedElementDiff


def diff_dicts(
config_dict: Optional[Dict[str, Any]],
dst_dict: Optional[Dict[str, Any]],
custom_compare_fn: Optional[Callable[[str, Any, Any], bool]] = None,
) -> ManagedElementDiff:
"""
Utility function which builds a ManagedElementDiff given two dictionaries.
Args:
config_dict (Optional[Dict[str, Any]]): The dictionary from the user config.
dst_dict (Optional[Dict[str, Any]]): The dictionary from the destination.
custom_compare_fn (Optional[Callable[[str, Any, Any], bool]]): A custom comparison function to
use for comparing values in the dictionaries. Return True if the two values are the same.
Only used for non-None, non-dict values.
"""
diff = ManagedElementDiff()

Expand All @@ -19,28 +27,30 @@ def diff_dicts(
# Both dicts have the key with a dict value - recurse and
# compare the nested dicts
if type(config_dict.get(key)) == dict and type(dst_dict.get(key)) == dict:
nested_diff = diff_dicts(config_dict[key], dst_dict[key])
nested_diff = diff_dicts(config_dict[key], dst_dict[key], custom_compare_fn)
if not nested_diff.is_empty():
diff = diff.with_nested(key, nested_diff)
# If one dict has the key as a dict but not the other,
# recurse and optionally remove the non-dict value in the other
elif type(config_dict.get(key)) == dict:
if key in dst_dict:
diff = diff.delete(key, dst_dict[key])
nested_diff = diff_dicts(config_dict[key], {})
nested_diff = diff_dicts(config_dict[key], {}, custom_compare_fn)
if not nested_diff.is_empty():
diff = diff.with_nested(key, nested_diff)
elif type(dst_dict.get(key)) == dict:
if key in config_dict:
diff = diff.add(key, config_dict[key])
nested_diff = diff_dicts({}, dst_dict[key])
nested_diff = diff_dicts({}, dst_dict[key], custom_compare_fn)
if not nested_diff.is_empty():
diff = diff.with_nested(key, nested_diff)
# Handle non-dict values
elif key not in config_dict:
diff = diff.delete(key, dst_dict[key])
elif key not in dst_dict:
diff = diff.add(key, config_dict[key])
elif config_dict[key] != dst_dict[key]:
elif config_dict[key] != dst_dict[key] and (
not custom_compare_fn or not custom_compare_fn(key, config_dict[key], dst_dict[key])
):
diff = diff.modify(key, dst_dict[key], config_dict[key])
return diff

0 comments on commit 9157ebb

Please sign in to comment.