Notify commit owner via slack message (#37803)
## What
Updates our Slack lifecycle notifications to mention the author of the metadata change.

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/PTsI7qAmiIMkhFQg04QF/b20cd2d2-dc18-4a15-ae0e-0f8a218cf871.png)
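
For reference, a minimal sketch of the message format this change produces (the author, Slack ID, path, and text below are made-up values; the real string is assembled by `PublishConnectorLifecycle.create_log_message` in the diff further down):

```python
# Hypothetical values; the real message is built by
# PublishConnectorLifecycle.create_log_message (see diff below).
emoji, lifecycle_stage, stage_status = "✅", "REGISTRY_ENTRY_GENERATION", "SUCCESS"
message = "Successfully generated oss registry entry for connectors/source-faker/metadata.yaml"
user_identifier = "Jane Doe (<@U12345678>)"  # assumed Slack user
commit_sha = "4779b1e"

final_message = f"*{emoji} _{lifecycle_stage}_ {stage_status}*:\n{message}"
final_message += f"\nauthor: {user_identifier}"
final_message += f"\ncommit: <https://github.com/airbytehq/airbyte/commit/{commit_sha}|{commit_sha}>"
print(final_message)
```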


Spun out of #32715 as a stack.
bnchrch committed May 9, 2024
1 parent c0492b0 commit 4779b1e
Showing 8 changed files with 188 additions and 21 deletions.
@@ -4,7 +4,7 @@
from dagster import Definitions, EnvVar, ScheduleDefinition, load_assets_from_modules
from dagster_slack import SlackResource
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER
from orchestrator.assets import connector_test_report, github, metadata, registry, registry_entry, registry_report, specs_secrets_mask
from orchestrator.assets import connector_test_report, github, metadata, registry, registry_entry, registry_report, specs_secrets_mask, slack
from orchestrator.config import (
CI_MASTER_TEST_OUTPUT_REGEX,
CI_TEST_REPORT_PREFIX,
@@ -41,6 +41,7 @@

ASSETS = load_assets_from_modules(
[
slack,
github,
specs_secrets_mask,
metadata,
@@ -10,21 +10,22 @@
import orchestrator.hacks as HACKS
import pandas as pd
import sentry_sdk
import yaml
from dagster import AutoMaterializePolicy, DynamicPartitionsDefinition, MetadataValue, OpExecutionContext, Output, asset
from dagster_gcp.gcs.file_manager import GCSFileHandle, GCSFileManager
from google.cloud import storage
from metadata_service.constants import ICON_FILE_NAME, METADATA_FILE_NAME
from metadata_service.models.generated.ConnectorRegistryDestinationDefinition import ConnectorRegistryDestinationDefinition
from metadata_service.models.generated.ConnectorRegistrySourceDefinition import ConnectorRegistrySourceDefinition
from metadata_service.models.transform import to_json_sanitized_dict
from metadata_service.spec_cache import SpecCache
from orchestrator.config import MAX_METADATA_PARTITION_RUN_REQUEST, VALID_REGISTRIES, get_public_url_for_gcs_file
from orchestrator.logging import sentry
from orchestrator.logging.publish_connector_lifecycle import PublishConnectorLifecycle, PublishConnectorLifecycleStage, StageStatus
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
from orchestrator.utils.blob_helpers import yaml_blob_to_dict
from orchestrator.utils.dagster_helpers import OutputDataFrame
from orchestrator.utils.object_helpers import deep_copy_params
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
from pydash.objects import get

PolymorphicRegistryEntry = Union[ConnectorRegistrySourceDefinition, ConnectorRegistryDestinationDefinition]
@@ -332,25 +333,73 @@ def delete_registry_entry(registry_name, metadata_entry: LatestMetadataEntry, me


@sentry_sdk.trace
def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[MetadataDefinition]:
def safe_parse_metadata_definition(file_name: str, metadata_dict: dict) -> Optional[MetadataDefinition]:
"""
Safely parse the metadata definition from the given metadata dict.
Handles the case where the metadata definition is invalid in old versions of the metadata.
"""
yaml_string = metadata_blob.download_as_string().decode("utf-8")
metadata_dict = yaml.safe_load(yaml_string)

try:
return MetadataDefinition.parse_obj(metadata_dict)

except ValidationError as e:
# only raise the error if "latest" is in the path
if "latest" in metadata_blob.name:
if "latest" in file_name:
raise e
else:
print(f"WARNING: Could not parse metadata definition for {metadata_blob.name}. Error: {e}")
print(f"WARNING: Could not parse metadata definition for {file_name}. Error: {e}")
return None
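
Since the parser now takes a file name and a plain dict rather than a `storage.Blob`, it can be exercised without GCS at all. A minimal sketch, assuming the module's context and that the stub below fails `MetadataDefinition` validation (it is intentionally incomplete):

```python
# Hypothetical stub: not a valid MetadataDefinition, and the path does not
# contain "latest", so parsing fails softly and returns None.
stub = {"data": {"name": "source-faker"}}
assert safe_parse_metadata_definition("metadata/0.1.0/metadata.yaml", stub) is None
```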


def safe_get_slack_user_identifier(airbyte_slack_users: pd.DataFrame, metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
"""
Safely get the slack user identifier from the given git info in the metadata file.
"""
if isinstance(metadata_dict, BaseModel):
metadata_dict = to_json_sanitized_dict(metadata_dict)

# if the slack users dataframe is empty or None, return None
if airbyte_slack_users is None or airbyte_slack_users.empty:
return None

commit_author = get(metadata_dict, "data.generated.git.commit_author")
commit_author_email = get(metadata_dict, "data.generated.git.commit_author_email")

# if the commit author email is not present, return the author name or None
if not commit_author_email:
return commit_author

# if the commit author email is present, try to find the user in the slack users dataframe;
# if the user is not found, return the author name or None
slack_user = airbyte_slack_users[airbyte_slack_users["email"] == commit_author_email]
if slack_user.empty:
slack_user = airbyte_slack_users[airbyte_slack_users["real_name"] == commit_author]

if slack_user.empty:
return commit_author

# if the user is found, return the slack real_name and id, e.g. "John Doe (<@U12345678>)"
slack_id = slack_user["id"].iloc[0]
slack_real_name = slack_user["real_name"].iloc[0]
return f"{slack_real_name} (<@{slack_id}>)"


def safe_get_commit_sha(metadata_dict: Union[dict, BaseModel]) -> Optional[str]:
"""
Safely get the git commit sha from the given git info in the metadata file.
"""
if isinstance(metadata_dict, BaseModel):
metadata_dict = to_json_sanitized_dict(metadata_dict)

# if the git commit sha is not present, return None
commit_sha = get(metadata_dict, "data.generated.git.commit_sha")
if not commit_sha:
return None

# if the git commit sha is present, return it
return commit_sha
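
A quick sketch of how these two lookups behave together, assuming the module's context, a hypothetical one-row Slack users dataframe (the real one comes from the `airbyte_slack_users` asset below), and a hand-written metadata dict:

```python
import pandas as pd

# Hypothetical data; field names mirror what the helpers above expect.
slack_users = pd.DataFrame(
    [{"id": "U12345678", "real_name": "Jane Doe", "email": "jane@example.com"}]
)
metadata = {
    "data": {
        "generated": {
            "git": {
                "commit_author": "Jane Doe",
                "commit_author_email": "jane@example.com",
                "commit_sha": "4779b1e",
            }
        }
    }
}

assert safe_get_slack_user_identifier(slack_users, metadata) == "Jane Doe (<@U12345678>)"
assert safe_get_commit_sha(metadata) == "4779b1e"
```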


# ASSETS


@@ -362,7 +411,7 @@ def safe_parse_metadata_definition(metadata_blob: storage.Blob) -> Optional[Meta
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadataEntry]]:
def metadata_entry(context: OpExecutionContext, airbyte_slack_users: pd.DataFrame) -> Output[Optional[LatestMetadataEntry]]:
"""Parse and compute the LatestMetadataEntry for the given metadata file."""
etag = context.partition_key
context.log.info(f"Processing metadata file with etag {etag}")
@@ -373,16 +422,22 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
if not matching_blob:
raise Exception(f"Could not find blob with etag {etag}")

metadata_dict = yaml_blob_to_dict(matching_blob)
user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_dict)
commit_sha = safe_get_commit_sha(metadata_dict)

metadata_file_path = matching_blob.name
PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.IN_PROGRESS,
f"Found metadata file with path {metadata_file_path} for etag {etag}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

# parse the metadata dict into a metadata definition
metadata_def = safe_parse_metadata_definition(matching_blob)
metadata_def = safe_parse_metadata_definition(matching_blob.name, metadata_dict)

dagster_metadata = {
"bucket_name": matching_blob.bucket.name,
@@ -398,6 +453,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.FAILED,
f"Could not parse metadata definition for {metadata_file_path}, dont panic, this can be expected for old metadata files",
user_identifier=user_identifier,
commit_sha=commit_sha,
)
return Output(value=None, metadata=dagster_metadata)

@@ -422,6 +479,8 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
PublishConnectorLifecycleStage.METADATA_VALIDATION,
StageStatus.SUCCESS,
f"Successfully parsed metadata definition for {metadata_file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

return Output(value=metadata_entry, metadata=dagster_metadata)
@@ -434,19 +493,26 @@ def metadata_entry(context: OpExecutionContext) -> Output[Optional[LatestMetadat
auto_materialize_policy=AutoMaterializePolicy.eager(max_materializations_per_minute=MAX_METADATA_PARTITION_RUN_REQUEST),
)
@sentry.instrument_asset_op
def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry]) -> Output[Optional[dict]]:
def registry_entry(
context: OpExecutionContext, metadata_entry: Optional[LatestMetadataEntry], airbyte_slack_users: pd.DataFrame
) -> Output[Optional[dict]]:
"""
Generate the registry entry files from the given metadata file, and persist them to GCS.
"""
if not metadata_entry:
# if the metadata entry is invalid, return early with no value
return Output(metadata={"empty_metadata": True}, value=None)

user_identifier = safe_get_slack_user_identifier(airbyte_slack_users, metadata_entry.metadata_definition)
commit_sha = safe_get_commit_sha(metadata_entry.metadata_definition)

PublishConnectorLifecycle.log(
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.IN_PROGRESS,
f"Generating registry entry for {metadata_entry.file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

spec_cache = SpecCache()
@@ -488,7 +554,9 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
context,
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}",
f"Successfully generated {registry_name} registry entry for {metadata_entry.file_path} at {registry_url}.\n\n*This new Connector will be available for use in the platform on the next release (1-3 min)*",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

# Log the registry entries that were deleted
@@ -498,6 +566,8 @@ def registry_entry(context: OpExecutionContext, metadata_entry: Optional[LatestM
PublishConnectorLifecycleStage.REGISTRY_ENTRY_GENERATION,
StageStatus.SUCCESS,
f"Successfully deleted {registry_name} registry entry for {metadata_entry.file_path}",
user_identifier=user_identifier,
commit_sha=commit_sha,
)

return Output(metadata=dagster_metadata, value=persisted_registry_entries)
@@ -0,0 +1,51 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import os

import pandas as pd
from dagster import AutoMaterializePolicy, FreshnessPolicy, OpExecutionContext, Output, asset
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe

GROUP_NAME = "slack"

USER_REQUEST_CHUNK_SIZE = 2000
MAX_REQUESTS = 5


@asset(
group_name=GROUP_NAME,
required_resource_keys={"slack"},
auto_materialize_policy=AutoMaterializePolicy.eager(),
freshness_policy=FreshnessPolicy(maximum_lag_minutes=60 * 12),
)
def airbyte_slack_users(context: OpExecutionContext) -> OutputDataFrame:
"""
Return a dataframe of all users in the Airbyte Slack workspace.
"""
if not os.getenv("SLACK_TOKEN"):
context.log.info("Skipping Slack Users asset as SLACK_TOKEN is not set")
return None

client = context.resources.slack.get_client()
users_response = client.users_list(limit=USER_REQUEST_CHUNK_SIZE)
metadata = users_response.data["response_metadata"]
users = users_response.data["members"]
requests_count = 1

while metadata["next_cursor"] and requests_count < MAX_REQUESTS:
users_response = client.users_list(limit=USER_REQUEST_CHUNK_SIZE, cursor=metadata["next_cursor"])
metadata = users_response.data["response_metadata"]
users.extend(users_response.data["members"])
requests_count += 1

# Convert to a dataframe of id, real_name, and email
# Remove any deleted or bot profiles
users_df = pd.DataFrame(users)
users_df = users_df[users_df["deleted"] == False]
users_df = users_df[users_df["is_bot"] == False]
users_df["email"] = users_df["profile"].apply(lambda x: x.get("email", None))
users_df = users_df[["id", "real_name", "email"]]

return output_dataframe(users_df)
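
Note that `metadata_entry` and `registry_entry` above receive this dataframe simply by declaring a parameter named `airbyte_slack_users`; Dagster resolves same-named parameters as upstream asset dependencies. A self-contained toy sketch of that pattern (not the real assets):

```python
import pandas as pd
from dagster import asset, materialize


@asset
def airbyte_slack_users() -> pd.DataFrame:
    # Toy stand-in for the real asset above.
    return pd.DataFrame([{"id": "U1", "real_name": "Jane", "email": "jane@example.com"}])


@asset
def metadata_entry(airbyte_slack_users: pd.DataFrame) -> int:
    # Dagster wires in the upstream asset because the parameter name matches.
    return len(airbyte_slack_users)


if __name__ == "__main__":
    materialize([airbyte_slack_users, metadata_entry])
```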
@@ -46,3 +46,6 @@ def get_public_metadata_service_url(file_path: str) -> str:
metadata_bucket = os.getenv("METADATA_BUCKET")
metadata_cdn_url = os.getenv("METADATA_CDN_BASE_URL")
return get_public_url_for_gcs_file(metadata_bucket, file_path, metadata_cdn_url)


REPO_URL = "https://github.com/airbytehq/airbyte/"
@@ -6,6 +6,7 @@
from enum import Enum

from dagster import OpExecutionContext
from orchestrator.config import REPO_URL
from orchestrator.ops.slack import send_slack_message


@@ -56,19 +57,45 @@ def stage_to_log_level(stage_status: StageStatus) -> str:
else:
return "info"

def _commit_link(commit_sha: str) -> str:
"""Create a markdown link to a commit."""
commit_url = f"{REPO_URL}/commit/{commit_sha}"
return f"\ncommit: <{commit_url}|{commit_sha}>"

def _user_mention(user_identifier: str) -> str:
"""Create a markdown link to a user."""
return f"\nauthor: {user_identifier}"

@staticmethod
def create_log_message(
lifecycle_stage: PublishConnectorLifecycleStage,
stage_status: StageStatus,
message: str,
commit_sha: str = None,
user_identifier: str = None,
) -> str:
emoji = stage_status.to_emoji()
return f"*{emoji} _{lifecycle_stage}_ {stage_status}*: {message}"
final_message = f"*{emoji} _{lifecycle_stage}_ {stage_status}*:\n{message}"

if user_identifier:
final_message += PublishConnectorLifecycle._user_mention(user_identifier)

if commit_sha:
final_message += PublishConnectorLifecycle._commit_link(commit_sha)

return final_message

@staticmethod
def log(context: OpExecutionContext, lifecycle_stage: PublishConnectorLifecycleStage, stage_status: StageStatus, message: str):
def log(
context: OpExecutionContext,
lifecycle_stage: PublishConnectorLifecycleStage,
stage_status: StageStatus,
message: str,
commit_sha: str = None,
user_identifier: str = None,
):
"""Publish a connector notification log to logger and slack (if enabled)."""
message = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message)
message = PublishConnectorLifecycle.create_log_message(lifecycle_stage, stage_status, message, commit_sha, user_identifier)

level = PublishConnectorLifecycle.stage_to_log_level(stage_status)
log_method = getattr(context.log, level)
@@ -0,0 +1,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import yaml
from google.cloud import storage


def yaml_blob_to_dict(yaml_blob: storage.Blob) -> dict:
"""
Convert the given yaml blob to a dictionary.
"""
yaml_string = yaml_blob.download_as_string().decode("utf-8")
return yaml.safe_load(yaml_string)
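
A sketch of exercising `yaml_blob_to_dict` without touching GCS, using a mock blob — the same approach the updated test below takes:

```python
from unittest import mock

# Hypothetical mock blob standing in for a google.cloud.storage.Blob.
mock_blob = mock.MagicMock()
mock_blob.download_as_string.return_value = b"data:\n  name: source-faker\n"

assert yaml_blob_to_dict(mock_blob) == {"data": {"name": "source-faker"}}
```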

Some generated files are not rendered by default.

@@ -27,6 +27,7 @@
oss_sources_dataframe,
)
from orchestrator.models.metadata import LatestMetadataEntry, MetadataDefinition
from orchestrator.utils.blob_helpers import yaml_blob_to_dict
from pydantic import ValidationError

VALID_METADATA_DICT = {
@@ -64,11 +65,13 @@ def test_safe_parse_metadata_definition(blob_name, blob_content, expected_result
mock_blob.name = blob_name
mock_blob.download_as_string.return_value = blob_content.encode("utf-8")

metadata_dict = yaml_blob_to_dict(mock_blob)

if expected_exception:
with pytest.raises(expected_exception):
safe_parse_metadata_definition(mock_blob)
safe_parse_metadata_definition(mock_blob.name, metadata_dict)
else:
result = safe_parse_metadata_definition(mock_blob)
result = safe_parse_metadata_definition(mock_blob.name, metadata_dict)
# assert the parsed result matches the expected definition
assert result == expected_result

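
The diff does not include tests for the new Slack helpers; a minimal pytest sketch of the fallback path one might add (the import path and data are assumptions):

```python
import pandas as pd

from orchestrator.assets.registry_entry import safe_get_slack_user_identifier  # assumed module path


def test_falls_back_to_author_name_when_no_slack_match():
    slack_users = pd.DataFrame(
        [{"id": "U1", "real_name": "Jane Doe", "email": "jane@example.com"}]
    )
    metadata = {
        "data": {
            "generated": {
                "git": {
                    "commit_author": "John Smith",
                    "commit_author_email": "john@elsewhere.com",
                }
            }
        }
    }
    # Neither the email nor the real_name matches, so the raw author name is returned.
    assert safe_get_slack_user_identifier(slack_users, metadata) == "John Smith"
```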
