Add metadata gcs sensor to Dagster (airbytehq#25055)
* Add solution skeleton

* Add metadata blob resource

* Wire up sensor

* Fix report

* Materialize metadata definitions from folder

* Move metadata to own file

* Run black

* Add PR comments

* Remove dockerhub from action
bnchrch authored and btkcodedev committed Apr 26, 2023
1 parent 7fc4bd8 commit e20d844
Showing 14 changed files with 188 additions and 44 deletions.
5 changes: 0 additions & 5 deletions .github/actions/run-dagger-pipeline/action.yml
@@ -17,11 +17,6 @@ runs:
id: get-start-timestamp
run: echo "::set-output name=start-timestamp::$(date +%s)"
shell: bash
- name: Login to DockerHub
run: "docker login -u ${DOCKER_HUB_USERNAME} -p ${DOCKER_HUB_PASSWORD}"
env:
DOCKER_HUB_USERNAME: ${{ secrets.DOCKER_HUB_USERNAME }}
DOCKER_HUB_PASSWORD: ${{ secrets.DOCKER_HUB_PASSWORD }}
- name: Checkout Airbyte
uses: actions/checkout@v3
with:
@@ -0,0 +1,2 @@
METADATA_FILE_NAME = "metadata.yaml"
METADATA_FOLDER = "metadata"
@@ -8,6 +8,19 @@
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER


def get_metadata_file_path(dockerRepository: str, version: str) -> str:
"""Get the path to the metadata file for a specific version of a connector.
Args:
dockerRepository (str): Name of the connector docker image.
version (str): Version of the connector.
Returns:
str: Path to the metadata file.
"""
return f"{METADATA_FOLDER}/{dockerRepository}/{version}/{METADATA_FILE_NAME}"


def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_account_file_path: Path) -> Tuple[bool, str]:
@@ -31,8 +44,11 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_a
storage_client = storage.Client(credentials=credentials)
bucket = storage_client.bucket(bucket_name)

version_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/metadata.yaml")
latest_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/latest/metadata.yaml")
version_path = get_metadata_file_path(metadata.data.dockerRepository, metadata.data.dockerImageTag)
latest_path = get_metadata_file_path(metadata.data.dockerRepository, "latest")

version_blob = bucket.blob(version_path)
latest_blob = bucket.blob(latest_path)
if not version_blob.exists():
version_blob.upload_from_filename(str(metadata_file_path))
uploaded = True
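
For reference, a quick sketch of the object keys the new get_metadata_file_path helper produces. The layout follows the METADATA_FOLDER and METADATA_FILE_NAME constants added above; the connector coordinates are made up for illustration, and the import path is an assumption since this diff does not name the module's file:

# Hypothetical usage of the path helper introduced in this commit.
# Import path assumed; the diff does not show the module name.
from metadata_service.gcs_upload import get_metadata_file_path

docker_repository = "airbyte/source-faker"  # illustrative connector
print(get_metadata_file_path(docker_repository, "0.1.0"))
# -> metadata/airbyte/source-faker/0.1.0/metadata.yaml
print(get_metadata_file_path(docker_repository, "latest"))
# -> metadata/airbyte/source-faker/latest/metadata.yaml
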
@@ -1,6 +1,6 @@
from dagster import Definitions

from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob
from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob, gcs_directory_blobs
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory
from orchestrator.resources.local import simple_local_file_manager

@@ -25,6 +25,7 @@
from orchestrator.assets.metadata import (
catalog_derived_metadata_definitions,
valid_metadata_report_dataframe,
metadata_definitions,
)

from orchestrator.assets.dev import (
@@ -34,12 +35,15 @@
cloud_catalog_diff,
cloud_catalog_diff_dataframe,
oss_catalog_diff_dataframe,
metadata_directory_report,
)

from orchestrator.jobs.catalog import generate_catalog_markdown, generate_local_metadata_files
from orchestrator.jobs.catalog import generate_catalog_markdown, generate_local_metadata_files, generate_catalog
from orchestrator.sensors.catalog import catalog_updated_sensor
from orchestrator.sensors.metadata import metadata_updated_sensor

from orchestrator.config import REPORT_FOLDER, CATALOG_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER

ASSETS = [
oss_destinations_dataframe,
@@ -64,6 +68,8 @@
cloud_catalog_from_metadata,
cloud_catalog_diff_dataframe,
oss_catalog_diff_dataframe,
metadata_directory_report,
metadata_definitions,
]

RESOURCES = {
@@ -77,14 +83,16 @@
}
),
"gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
"catalog_report_directory_manager": gcs_file_manager.configured(
{"gcs_bucket": {"env": "METADATA_BUCKET"}, "gcs_prefix": REPORT_FOLDER}
),
"latest_oss_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
"latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
"catalog_report_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}),
"metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}),
"latest_oss_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
"latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
}

SENSORS = [catalog_updated_sensor(job=generate_catalog_markdown, resources_def=RESOURCES)]
SENSORS = [
catalog_updated_sensor(job=generate_catalog_markdown, resources_def=RESOURCES),
metadata_updated_sensor(job=generate_catalog, resources_def=RESOURCES),
]

SCHEDULES = []

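
The tail of this registry module is not shown in the diff, but the lists above presumably feed a single Dagster Definitions object so that the new metadata assets, the metadata_file_blobs resource, and the metadata sensor are all registered together. A hedged sketch of that assembly follows; the variable name and exact arguments are assumptions:

# Assumed continuation of this module; not part of the committed diff.
defn = Definitions(
    assets=ASSETS,
    resources=RESOURCES,
    jobs=[generate_catalog, generate_catalog_markdown, generate_local_metadata_files],
    sensors=SENSORS,
    schedules=SCHEDULES,
)
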
@@ -132,36 +132,37 @@ def connector_catalog_location_markdown(context, all_destinations_dataframe, all


@asset(group_name=GROUP_NAME)
def all_destinations_dataframe(
cloud_destinations_dataframe, oss_destinations_dataframe, github_connector_folders, valid_metadata_report_dataframe, cached_specs
def all_sources_dataframe(
cloud_sources_dataframe, oss_sources_dataframe, github_connector_folders, valid_metadata_report_dataframe, cached_specs
) -> pd.DataFrame:
"""
Merge the cloud and oss destinations catalogs into a single dataframe.
Merge the cloud and oss sources catalogs into a single dataframe.
"""

return augment_and_normalize_connector_dataframes(
cloud_df=cloud_destinations_dataframe,
oss_df=oss_destinations_dataframe,
primaryKey="destinationDefinitionId",
connector_type="destination",
cloud_df=cloud_sources_dataframe,
oss_df=oss_sources_dataframe,
primaryKey="sourceDefinitionId",
connector_type="source",
valid_metadata_report_dataframe=valid_metadata_report_dataframe,
github_connector_folders=github_connector_folders,
cached_specs=cached_specs,
)


@asset(group_name=GROUP_NAME)
def all_sources_dataframe(
cloud_sources_dataframe, oss_sources_dataframe, github_connector_folders, valid_metadata_report_dataframe, cached_specs
def all_destinations_dataframe(
cloud_destinations_dataframe, oss_destinations_dataframe, github_connector_folders, valid_metadata_report_dataframe, cached_specs
) -> pd.DataFrame:
"""
Merge the cloud and oss source catalogs into a single dataframe.
Merge the cloud and oss destinations catalogs into a single dataframe.
"""

return augment_and_normalize_connector_dataframes(
cloud_df=cloud_sources_dataframe,
oss_df=oss_sources_dataframe,
primaryKey="sourceDefinitionId",
connector_type="source",
cloud_df=cloud_destinations_dataframe,
oss_df=oss_destinations_dataframe,
primaryKey="destinationDefinitionId",
connector_type="destination",
valid_metadata_report_dataframe=valid_metadata_report_dataframe,
github_connector_folders=github_connector_folders,
cached_specs=cached_specs,
@@ -253,3 +253,12 @@ def cloud_catalog_diff_dataframe(cloud_catalog_diff: dict) -> OutputDataFrame:
def oss_catalog_diff_dataframe(oss_catalog_diff: dict) -> OutputDataFrame:
diff_df = pd.DataFrame.from_dict(oss_catalog_diff)
return output_dataframe(diff_df)


@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME)
def metadata_directory_report(context):
metadata_file_blobs = context.resources.metadata_file_blobs
blobs = [blob.name for blob in metadata_file_blobs if blob.name.endswith("metadata.yaml")]
blobs_df = pd.DataFrame(blobs)

return output_dataframe(blobs_df)
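
A rough sketch of what metadata_directory_report yields when the metadata_file_blobs resource returns a handful of blobs; the stub class and blob names below are invented for illustration:

import pandas as pd
from dataclasses import dataclass

@dataclass
class FakeBlob:
    # Stand-in for google.cloud.storage.Blob, exposing only the attribute the asset reads.
    name: str

metadata_file_blobs = [
    FakeBlob("metadata/airbyte/source-faker/0.1.0/metadata.yaml"),
    FakeBlob("metadata/airbyte/source-faker/latest/metadata.yaml"),
    FakeBlob("metadata/airbyte/source-faker/0.1.0/icon.svg"),  # dropped by the suffix filter
]

# Same filtering and shaping the asset performs before handing off to output_dataframe.
blobs = [blob.name for blob in metadata_file_blobs if blob.name.endswith("metadata.yaml")]
blobs_df = pd.DataFrame(blobs)
print(blobs_df)
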
@@ -2,8 +2,10 @@
import numpy as np
from typing import List
from dagster import Output, asset
import yaml

from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1

from orchestrator.utils.object_helpers import are_values_equal, merge_values
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.models.metadata import PartialMetadataDefinition
@@ -206,3 +208,19 @@ def catalog_derived_metadata_definitions(
)
all_definitions = sources_metadata_list + destinations_metadata_list
return Output(all_definitions, metadata={"count": len(all_definitions)})


@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME)
def metadata_definitions(context):
metadata_file_blobs = context.resources.metadata_file_blobs

metadata_definitions = []
for blob in metadata_file_blobs:
yaml_string = blob.download_as_string().decode("utf-8")
metadata_dict = yaml.safe_load(yaml_string)
metadata_def = ConnectorMetadataDefinitionV1.parse_obj(metadata_dict)
metadata_definitions.append(metadata_def)

metadata_definitions_df = pd.DataFrame(metadata_definitions)

return output_dataframe(metadata_definitions_df)
@@ -1,4 +1,5 @@
CATALOG_FOLDER = "catalogs"
REPORT_FOLDER = "generated_reports"

CONNECTOR_REPO_NAME = "airbytehq/airbyte"
CONNECTORS_PATH = "airbyte-integrations/connectors"
@@ -1,6 +1,7 @@
from dagster import define_asset_job


generate_catalog = define_asset_job(name="generate_catalog", selection=["metadata_directory_report", "metadata_definitions"])
generate_catalog_markdown = define_asset_job(
name="generate_catalog_markdown", selection=["connector_catalog_location_html", "connector_catalog_location_markdown"]
)
@@ -39,7 +39,7 @@ def gcs_bucket_manager(resource_context: InitResourceContext) -> storage.Bucket:
required_resource_keys={"gcp_gcs_client"},
config_schema={
"gcs_bucket": StringSource,
"gcs_prefix": StringSource,
"prefix": StringSource,
},
)
def gcs_file_manager(resource_context) -> GCSFileManager:
@@ -53,14 +53,14 @@ def gcs_file_manager(resource_context) -> GCSFileManager:
return GCSFileManager(
client=storage_client,
gcs_bucket=resource_context.resource_config["gcs_bucket"],
gcs_base_key=resource_context.resource_config["gcs_prefix"],
gcs_base_key=resource_context.resource_config["prefix"],
)


@resource(
required_resource_keys={"gcs_bucket_manager"},
config_schema={
"gcs_prefix": StringSource,
"prefix": StringSource,
"gcs_filename": StringSource,
},
)
@@ -73,9 +73,9 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob:
"""
bucket = resource_context.resources.gcs_bucket_manager

gcs_prefix = resource_context.resource_config["gcs_prefix"]
prefix = resource_context.resource_config["prefix"]
gcs_filename = resource_context.resource_config["gcs_filename"]
gcs_file_path = f"{gcs_prefix}/{gcs_filename}"
gcs_file_path = f"{prefix}/{gcs_filename}"

resource_context.log.info(f"retrieving gcs file blob for {gcs_file_path}")

@@ -84,3 +84,27 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob:
raise Exception(f"File does not exist at path: {gcs_file_path}")

return gcs_file_blob


@resource(
required_resource_keys={"gcs_bucket_manager"},
config_schema={
"prefix": StringSource,
"suffix": StringSource,
},
)
def gcs_directory_blobs(resource_context: InitResourceContext) -> storage.Blob:
"""
List all blobs in a bucket that match the prefix.
"""
bucket = resource_context.resources.gcs_bucket_manager
prefix = resource_context.resource_config["prefix"]
suffix = resource_context.resource_config["suffix"]

resource_context.log.info(f"retrieving gcs file blobs for prefix: {prefix}, suffix: {suffix}")

gcs_file_blobs = bucket.list_blobs(prefix=prefix)
if suffix:
gcs_file_blobs = [blob for blob in gcs_file_blobs if blob.name.endswith(suffix)]

return gcs_file_blobs
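
Outside of Dagster, the listing this resource performs amounts to a server-side prefix scan plus a client-side suffix filter on the raw GCS client. A minimal sketch, assuming application-default credentials and a made-up bucket name (in the orchestrator the bucket comes from the METADATA_BUCKET env var via gcs_bucket_manager):

from google.cloud import storage

client = storage.Client()  # assumes application-default credentials are configured
bucket = client.bucket("my-metadata-bucket")  # hypothetical bucket name

prefix = "metadata"        # METADATA_FOLDER
suffix = "metadata.yaml"   # METADATA_FILE_NAME

# list_blobs narrows by prefix on the server; the suffix filter happens client-side.
blobs = bucket.list_blobs(prefix=prefix)
metadata_file_blobs = [blob for blob in blobs if blob.name.endswith(suffix)]
for blob in metadata_file_blobs:
    print(blob.name, blob.etag)
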
@@ -1,9 +1,6 @@
from typing import List
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus


def generate_composite_etag_cursor(etags: List[str]):
return ":".join(etags)
from orchestrator.utils.dagster_helpers import serialize_composite_etags_cursor


def catalog_updated_sensor(job, resources_def) -> SensorDefinition:
@@ -27,7 +24,7 @@ def catalog_updated_sensor_definition(context: SensorEvaluationContext):
etag_cursor = context.cursor or None
context.log.info(f"Old etag cursor: {etag_cursor}")

new_etag_cursor = generate_composite_etag_cursor(
new_etag_cursor = serialize_composite_etags_cursor(
[resources.latest_oss_catalog_gcs_file.etag, resources.latest_cloud_catalog_gcs_file.etag]
)
context.log.info(f"New etag cursor: {new_etag_cursor}")
@@ -38,7 +35,7 @@ def catalog_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info("No new catalogs in GCS bucket")
return SkipReason("No new catalogs in GCS bucket")

context.update_cursor(new_etag_cursor) # Question: what happens if the run fails? is the cursor still updated?
context.update_cursor(new_etag_cursor)
context.log.info("New catalogs in GCS bucket")
return RunRequest(run_key="updated_catalogs")

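
The serialize_composite_etags_cursor and deserialize_composite_etags_cursor helpers now live in orchestrator.utils.dagster_helpers and are not shown in this diff. Given that the removed generate_composite_etag_cursor simply joined etags with ":", a plausible shape is sketched below; this is an assumption, not the committed implementation:

from typing import List, Optional

def serialize_composite_etags_cursor(etags: List[str]) -> str:
    # Assumed to mirror the removed generate_composite_etag_cursor.
    return ":".join(etags)

def deserialize_composite_etags_cursor(raw_cursor: Optional[str]) -> List[str]:
    # Assumed inverse: a missing or empty cursor yields no etags.
    if not raw_cursor:
        return []
    return raw_cursor.split(":")
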
@@ -0,0 +1,44 @@
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus
from orchestrator.utils.dagster_helpers import deserialize_composite_etags_cursor, serialize_composite_etags_cursor


def metadata_updated_sensor(job, resources_def) -> SensorDefinition:
"""
This sensor is responsible for polling the metadata folder in GCS for new or updated metadata files.
If it notices that the etags have changed, it will trigger the given job.
"""

@sensor(
name=f"{job.name}_on_metadata_updated",
job=job,
minimum_interval_seconds=30,
default_status=DefaultSensorStatus.STOPPED,
)
def metadata_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info("Starting gcs_metadata_updated_sensor")

with build_resources(resources_def) as resources:
context.log.info("Got resources for gcs_metadata_updated_sensor")

etags_cursor_raw = context.cursor or None
etags_cursor = deserialize_composite_etags_cursor(etags_cursor_raw)
etags_cursor_set = set(etags_cursor)

context.log.info(f"Old etag cursor: {etags_cursor}")

metadata_file_blobs = resources.metadata_file_blobs
new_etags_cursor_set = {blob.etag for blob in metadata_file_blobs}
context.log.info(f"New etag cursor: {new_etags_cursor_set}")

# Note: ETAGs are GCS's way of providing a version number for a file
# Another option would be to use the last modified date or MD5 hash
if etags_cursor_set == new_etags_cursor_set:
context.log.info("No new updated_metadata_files in GCS bucket")
return SkipReason("No new updated_metadata_files in GCS bucket")

serialized_new_etags_cursor = serialize_composite_etags_cursor(list(new_etags_cursor_set))
context.update_cursor(serialized_new_etags_cursor)
context.log.info("New updated_metadata_files in GCS bucket")
return RunRequest(run_key="updated_metadata_files")

return metadata_updated_sensor_definition
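
On each tick, the sensor's decision reduces to a set comparison between the etags recorded in the cursor and the etags currently reported by the metadata_file_blobs resource. A minimal sketch of that rule with hypothetical etag values:

# Hypothetical etags; the real values come from GCS blob metadata on each tick.
old_etags = {"etag-source-faker-0.1.0", "etag-destination-dev-null-0.2.0"}
new_etags = {"etag-source-faker-0.1.0", "etag-source-faker-0.2.0"}  # one file changed

if old_etags == new_etags:
    print("SkipReason: no new or updated metadata files")
else:
    print("RunRequest: trigger the generate_catalog job and advance the cursor")
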