Commit: Add PR comments

bnchrch committed Apr 15, 2023
1 parent a9a5d1e commit f86dce8
Showing 11 changed files with 84 additions and 48 deletions.
@@ -0,0 +1,2 @@
+METADATA_FILE_NAME = "metadata.yaml"
+METADATA_FOLDER = "metadata"
@@ -8,6 +8,19 @@
from google.cloud import storage
from google.oauth2 import service_account
from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1
+from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER


+def get_metadata_file_path(dockerRepository: str, version: str) -> str:
+    """Get the path to the metadata file for a specific version of a connector.
+
+    Args:
+        dockerRepository (str): Name of the connector docker image.
+        version (str): Version of the connector.
+    Returns:
+        str: Path to the metadata file.
+    """
+    return f"{METADATA_FOLDER}/{dockerRepository}/{version}/{METADATA_FILE_NAME}"
+
+
def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_account_file_path: Path) -> Tuple[bool, str]:
@@ -31,8 +44,11 @@ def upload_metadata_to_gcs(bucket_name: str, metadata_file_path: Path, service_account_file_path: Path) -> Tuple[bool, str]:
    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(bucket_name)

-    version_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/{metadata.data.dockerImageTag}/metadata.yaml")
-    latest_blob = bucket.blob(f"metadata/{metadata.data.dockerRepository}/latest/metadata.yaml")
+    version_path = get_metadata_file_path(metadata.data.dockerRepository, metadata.data.dockerImageTag)
+    latest_path = get_metadata_file_path(metadata.data.dockerRepository, "latest")
+
+    version_blob = bucket.blob(version_path)
+    latest_blob = bucket.blob(latest_path)
    if not version_blob.exists():
        version_blob.upload_from_filename(str(metadata_file_path))
        uploaded = True
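For illustration, the new helper resolves version-pinned and "latest" paths like so. This is a minimal sketch: the connector name is invented, and the helper's module location is an assumption based on the diff's metadata_service package.

    # Hypothetical usage of the new helper; the import path is an assumption.
    from metadata_service.gcs_upload import get_metadata_file_path

    print(get_metadata_file_path("airbyte/source-example", "0.1.0"))
    # metadata/airbyte/source-example/0.1.0/metadata.yaml
    print(get_metadata_file_path("airbyte/source-example", "latest"))
    # metadata/airbyte/source-example/latest/metadata.yaml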
@@ -42,7 +42,8 @@
from orchestrator.sensors.catalog import catalog_updated_sensor
from orchestrator.sensors.metadata import metadata_updated_sensor

-from orchestrator.config import REPORT_FOLDER, CATALOG_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME, METADATA_FOLDER
+from orchestrator.config import REPORT_FOLDER, CATALOG_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME
+from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER

ASSETS = [
    oss_destinations_dataframe,
@@ -82,12 +83,10 @@
        }
    ),
    "gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
-    "catalog_report_directory_manager": gcs_file_manager.configured(
-        {"gcs_bucket": {"env": "METADATA_BUCKET"}, "gcs_prefix": REPORT_FOLDER}
-    ),
-    "metadata_folder_blobs": gcs_directory_blobs.configured({"gcs_prefix": METADATA_FOLDER}),
-    "latest_oss_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
-    "latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
+    "catalog_report_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}),
+    "metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}),
+    "latest_oss_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
+    "latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
}

SENSORS = [
@@ -255,10 +255,10 @@ def oss_catalog_diff_dataframe(oss_catalog_diff: dict) -> OutputDataFrame:
    return output_dataframe(diff_df)


-@asset(required_resource_keys={"metadata_folder_blobs"}, group_name=GROUP_NAME)
+@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME)
def metadata_directory_report(context):
-    metadata_folder_blobs = context.resources.metadata_folder_blobs
-    blobs = [blob.name for blob in metadata_folder_blobs if blob.name.endswith("metadata.yaml")]
+    metadata_file_blobs = context.resources.metadata_file_blobs
+    blobs = [blob.name for blob in metadata_file_blobs if blob.name.endswith("metadata.yaml")]
    blobs_df = pd.DataFrame(blobs)

    return output_dataframe(blobs_df)
@@ -5,6 +5,7 @@
import yaml

from metadata_service.models.generated.ConnectorMetadataDefinitionV1 import ConnectorMetadataDefinitionV1
+
from orchestrator.utils.object_helpers import are_values_equal, merge_values
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.models.metadata import PartialMetadataDefinition
@@ -209,13 +210,12 @@ def catalog_derived_metadata_definitions(
    return Output(all_definitions, metadata={"count": len(all_definitions)})


-@asset(required_resource_keys={"metadata_folder_blobs"}, group_name=GROUP_NAME)
+@asset(required_resource_keys={"metadata_file_blobs"}, group_name=GROUP_NAME)
def metadata_definitions(context):
-    metadata_folder_blobs = context.resources.metadata_folder_blobs
-    blobs = [blob for blob in metadata_folder_blobs if blob.name.endswith("metadata.yml")]
+    metadata_file_blobs = context.resources.metadata_file_blobs

    metadata_definitions = []
-    for blob in blobs:
+    for blob in metadata_file_blobs:
        yaml_string = blob.download_as_string().decode("utf-8")
        metadata_dict = yaml.safe_load(yaml_string)
        metadata_def = ConnectorMetadataDefinitionV1.parse_obj(metadata_dict)
@@ -1,5 +1,5 @@
CATALOG_FOLDER = "catalogs"
REPORT_FOLDER = "generated_reports"
-METADATA_FOLDER = "metadata"

CONNECTOR_REPO_NAME = "airbytehq/airbyte"
CONNECTORS_PATH = "airbyte-integrations/connectors"
@@ -39,7 +39,7 @@ def gcs_bucket_manager(resource_context: InitResourceContext) -> storage.Bucket:
    required_resource_keys={"gcp_gcs_client"},
    config_schema={
        "gcs_bucket": StringSource,
-        "gcs_prefix": StringSource,
+        "prefix": StringSource,
    },
)
def gcs_file_manager(resource_context) -> GCSFileManager:
@@ -53,14 +53,14 @@ def gcs_file_manager(resource_context) -> GCSFileManager:
    return GCSFileManager(
        client=storage_client,
        gcs_bucket=resource_context.resource_config["gcs_bucket"],
-        gcs_base_key=resource_context.resource_config["gcs_prefix"],
+        gcs_base_key=resource_context.resource_config["prefix"],
    )


@resource(
    required_resource_keys={"gcs_bucket_manager"},
    config_schema={
-        "gcs_prefix": StringSource,
+        "prefix": StringSource,
        "gcs_filename": StringSource,
    },
)
@@ -73,9 +73,9 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob:
"""
bucket = resource_context.resources.gcs_bucket_manager

gcs_prefix = resource_context.resource_config["gcs_prefix"]
prefix = resource_context.resource_config["prefix"]
gcs_filename = resource_context.resource_config["gcs_filename"]
gcs_file_path = f"{gcs_prefix}/{gcs_filename}"
gcs_file_path = f"{prefix}/{gcs_filename}"

resource_context.log.info(f"retrieving gcs file blob for {gcs_file_path}")

@@ -89,18 +89,22 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob:
@resource(
    required_resource_keys={"gcs_bucket_manager"},
    config_schema={
-        "gcs_prefix": StringSource,
+        "prefix": StringSource,
+        "suffix": StringSource,
    },
)
def gcs_directory_blobs(resource_context: InitResourceContext) -> storage.Blob:
    """
    List all blobs in a bucket that match the prefix.
    """
    bucket = resource_context.resources.gcs_bucket_manager
-    gcs_prefix = resource_context.resource_config["gcs_prefix"]
+    prefix = resource_context.resource_config["prefix"]
+    suffix = resource_context.resource_config["suffix"]

-    resource_context.log.info(f"retrieving gcs file blobs for {gcs_prefix}")
+    resource_context.log.info(f"retrieving gcs file blobs for prefix: {prefix}, suffix: {suffix}")

-    gcs_file_blobs = bucket.list_blobs(prefix=gcs_prefix)
+    gcs_file_blobs = bucket.list_blobs(prefix=prefix)
+    if suffix:
+        gcs_file_blobs = [blob for blob in gcs_file_blobs if blob.name.endswith(suffix)]

    return gcs_file_blobs
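The new suffix option lets the metadata_file_blobs resource filter to metadata files at configuration time rather than in each asset. A minimal standalone sketch of the same listing logic, assuming ambient GCP credentials and an invented bucket name:

    from google.cloud import storage

    bucket = storage.Client().bucket("my-metadata-bucket")  # hypothetical bucket name

    # Equivalent of gcs_directory_blobs configured with prefix=METADATA_FOLDER, suffix=METADATA_FILE_NAME
    blobs = bucket.list_blobs(prefix="metadata")
    metadata_blobs = [blob for blob in blobs if blob.name.endswith("metadata.yaml")]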
@@ -1,6 +1,6 @@
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus

-from orchestrator.utils.dagster_helpers import serialize_composite_etag_cursor
+from orchestrator.utils.dagster_helpers import serialize_composite_etags_cursor


def catalog_updated_sensor(job, resources_def) -> SensorDefinition:
@@ -24,7 +24,7 @@ def catalog_updated_sensor_definition(context: SensorEvaluationContext):
        etag_cursor = context.cursor or None
        context.log.info(f"Old etag cursor: {etag_cursor}")

-        new_etag_cursor = serialize_composite_etag_cursor(
+        new_etag_cursor = serialize_composite_etags_cursor(
            [resources.latest_oss_catalog_gcs_file.etag, resources.latest_cloud_catalog_gcs_file.etag]
        )
        context.log.info(f"New etag cursor: {new_etag_cursor}")
@@ -1,5 +1,5 @@
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus
-from orchestrator.utils.dagster_helpers import deserialize_composite_etag_cursor, serialize_composite_etag_cursor
+from orchestrator.utils.dagster_helpers import deserialize_composite_etags_cursor, serialize_composite_etags_cursor


def metadata_updated_sensor(job, resources_def) -> SensorDefinition:
@@ -20,25 +20,24 @@ def metadata_updated_sensor_definition(context: SensorEvaluationContext):
    with build_resources(resources_def) as resources:
        context.log.info("Got resources for gcs_metadata_updated_sensor")

-        etag_cursor_raw = context.cursor or None
-        etag_cursor = deserialize_composite_etag_cursor(etag_cursor_raw)
-        etag_cursor_set = set(etag_cursor)
+        etags_cursor_raw = context.cursor or None
+        etags_cursor = deserialize_composite_etags_cursor(etags_cursor_raw)
+        etags_cursor_set = set(etags_cursor)

-        context.log.info(f"Old etag cursor: {etag_cursor}")
+        context.log.info(f"Old etag cursor: {etags_cursor}")

-        metadata_folder_blobs = resources.metadata_folder_blobs
-        new_etag_cursors = [blob.etag for blob in metadata_folder_blobs if blob.name.endswith("metadata.yml")]
-        new_etag_cursor_set = set(new_etag_cursors)
-        context.log.info(f"New etag cursor: {new_etag_cursor_set}")
+        metadata_file_blobs = resources.metadata_file_blobs
+        new_etags_cursor_set = {blob.etag for blob in metadata_file_blobs}
+        context.log.info(f"New etag cursor: {new_etags_cursor_set}")

        # Note: ETAGs are GCS's way of providing a version number for a file
        # Another option would be to use the last modified date or MD5 hash
-        if etag_cursor_set == new_etag_cursor_set:
+        if etags_cursor_set == new_etags_cursor_set:
            context.log.info("No new updated_metadata_files in GCS bucket")
            return SkipReason("No new updated_metadata_files in GCS bucket")

-        serialized_new_etag_cursor = serialize_composite_etag_cursor(new_etag_cursors)
-        context.update_cursor(serialized_new_etag_cursor)
+        serialized_new_etags_cursor = serialize_composite_etags_cursor(list(new_etags_cursor_set))
+        context.update_cursor(serialized_new_etags_cursor)
        context.log.info("New updated_metadata_files in GCS bucket")
        return RunRequest(run_key="updated_metadata_files")
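As the inline comment above notes, GCS etags serve as per-object version identifiers, which is why a set of etags works as a change-detection cursor. A minimal sketch (the bucket and object names here are invented):

    from google.cloud import storage

    bucket = storage.Client().bucket("my-metadata-bucket")  # hypothetical bucket name
    blob = bucket.get_blob("metadata/airbyte/source-example/latest/metadata.yaml")
    print(blob.etag)  # changes whenever the object is overwritten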
@@ -13,12 +13,28 @@ def output_dataframe(result_df: pd.DataFrame) -> Output[pd.DataFrame]:
    return Output(result_df, metadata={"count": len(result_df), "preview": MetadataValue.md(result_df.to_markdown())})


-def deserialize_composite_etag_cursor(etag_cursor: Optional[str]) -> List[str]:
-    if etag_cursor is None:
-        return []
-
-    return etag_cursor.split(CURSOR_SEPARATOR)
+def deserialize_composite_etags_cursor(etag_cursors: Optional[str]) -> List[str]:
+    """Deserialize a cursor string into a list of etags.
+
+    Args:
+        etag_cursors (Optional[str]): A cursor string
+
+    Returns:
+        List[str]: A list of etags
+    """
+    return etag_cursors.split(CURSOR_SEPARATOR) if etag_cursors else []


-def serialize_composite_etag_cursor(etags: List[str]):
+def serialize_composite_etags_cursor(etags: List[str]) -> str:
+    """Serialize a list of etags into a cursor string.
+
+    Dagster cursors are strings, so we need to serialize the list of etags into a string.
+    https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors#idempotence-and-cursors
+
+    Args:
+        etags (List[str]): unique etag ids from GCS
+
+    Returns:
+        str: A cursor string
+    """
    return CURSOR_SEPARATOR.join(etags)
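A quick round trip through the renamed helpers (the etag values are invented; CURSOR_SEPARATOR is defined elsewhere in this module):

    cursor = serialize_composite_etags_cursor(["etag-a", "etag-b"])
    assert deserialize_composite_etags_cursor(cursor) == ["etag-a", "etag-b"]
    assert deserialize_composite_etags_cursor(None) == []

Since the metadata sensor compares the deserialized etags as sets, the order in which GCS lists the blobs cannot trigger spurious runs.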
@@ -39,10 +39,10 @@ def debug_catalog_projection():
        ),
        "gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
        "catalog_report_directory_manager": gcs_file_manager.configured(
-            {"gcs_bucket": {"env": "METADATA_BUCKET"}, "gcs_prefix": REPORT_FOLDER}
+            {"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}
        ),
-        "latest_oss_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
-        "latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"gcs_prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
+        "latest_oss_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
+        "latest_cloud_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "cloud_catalog.json"}),
    }

    context = build_op_context(resources=resources)
