Skip to content

Commit

Permalink
Run black
Browse files Browse the repository at this point in the history
  • Loading branch information
bnchrch committed Apr 12, 2023
1 parent 643b2b5 commit 8fbeca0
Show file tree
Hide file tree
Showing 9 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@

SENSORS = [
catalog_updated_sensor(job=generate_catalog_markdown, resources_def=RESOURCES),
metadata_updated_sensor(job=generate_catalog, resources_def=RESOURCES)
metadata_updated_sensor(job=generate_catalog, resources_def=RESOURCES),
]

SCHEDULES = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from dagster import MetadataValue, Output, asset

from orchestrator.templates.render import render_connector_catalog_locations_html, render_connector_catalog_locations_markdown
from orchestrator.utils.dagster_helpers import output_dataframe

GROUP_NAME = "catalog_reports"

Expand Down Expand Up @@ -150,6 +149,7 @@ def all_sources_dataframe(
cached_specs=cached_specs,
)


@asset(group_name=GROUP_NAME)
def all_destinations_dataframe(
cloud_destinations_dataframe, oss_destinations_dataframe, github_connector_folders, valid_metadata_report_dataframe, cached_specs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ def oss_catalog_diff_dataframe(oss_catalog_diff: dict) -> OutputDataFrame:
diff_df = pd.DataFrame.from_dict(oss_catalog_diff)
return output_dataframe(diff_df)


@asset(required_resource_keys={"metadata_folder_blobs"}, group_name=GROUP_NAME)
def metadata_directory_report(context):
metadata_folder_blobs = context.resources.metadata_folder_blobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,11 @@ def catalog_derived_metadata_definitions(
all_definitions = sources_metadata_list + destinations_metadata_list
return Output(all_definitions, metadata={"count": len(all_definitions)})


@asset(required_resource_keys={"metadata_folder_blobs"}, group_name=GROUP_NAME)
def metadata_definitions(context):
metadata_folder_blobs = context.resources.metadata_folder_blobs
blobs = [blob for blob in metadata_folder_blobs if blob.name.endswith("metadata.yaml")]
blobs = [blob for blob in metadata_folder_blobs if blob.name.endswith("metadata.yml")]

metadata_definitions = []
for blob in blobs:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from dagster import define_asset_job


generate_catalog = define_asset_job(
name="generate_catalog", selection=["metadata_directory_report", "metadata_definitions"]
)
generate_catalog = define_asset_job(name="generate_catalog", selection=["metadata_directory_report", "metadata_definitions"])
generate_catalog_markdown = define_asset_job(
name="generate_catalog_markdown", selection=["connector_catalog_location_html", "connector_catalog_location_markdown"]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def gcs_file_blob(resource_context: InitResourceContext) -> storage.Blob:

return gcs_file_blob


@resource(
required_resource_keys={"gcs_bucket_manager"},
config_schema={
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus

from orchestrator.utils.dagster_helpers import serialize_composite_etag_cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ def metadata_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info(f"Old etag cursor: {etag_cursor}")

metadata_folder_blobs = resources.metadata_folder_blobs
## TODO, yaml vs yml
new_etag_cursors = [blob.etag for blob in metadata_folder_blobs if blob.name.endswith("metadata.yaml")]
new_etag_cursors = [blob.etag for blob in metadata_folder_blobs if blob.name.endswith("metadata.yml")]
new_etag_cursor_set = set(new_etag_cursors)
context.log.info(f"New etag cursor: {new_etag_cursor_set}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ def deserialize_composite_etag_cursor(etag_cursor: Optional[str]) -> List[str]:

return etag_cursor.split(CURSOR_SEPARATOR)


def serialize_composite_etag_cursor(etags: List[str]):
return CURSOR_SEPARATOR.join(etags)

0 comments on commit 8fbeca0

Please sign in to comment.