Skip to content

Commit

Permalink
Move metadata to own file
Browse files Browse the repository at this point in the history
  • Loading branch information
bnchrch committed Apr 12, 2023
1 parent 2da04fa commit 643b2b5
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
)

from orchestrator.jobs.catalog import generate_catalog_markdown, generate_local_metadata_files, generate_catalog
from orchestrator.sensors.catalog import catalog_updated_sensor, metadata_updated_sensor
from orchestrator.sensors.catalog import catalog_updated_sensor
from orchestrator.sensors.metadata import metadata_updated_sensor

from orchestrator.config import REPORT_FOLDER, CATALOG_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME, METADATA_FOLDER

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


generate_catalog = define_asset_job(
name="generate_catalog", selection=["metadata_directory_report"]
name="generate_catalog", selection=["metadata_directory_report", "metadata_definitions"]
)
generate_catalog_markdown = define_asset_job(
name="generate_catalog_markdown", selection=["connector_catalog_location_html", "connector_catalog_location_markdown"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
from typing import List, Optional
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus

CURSOR_SEPARATOR = ":"

def deserialize_composite_etag_cursor(etag_cursor: Optional[str]) -> List[str]:
if etag_cursor is None:
return []

return etag_cursor.split(CURSOR_SEPARATOR)
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus

def serialize_composite_etag_cursor(etags: List[str]):
return CURSOR_SEPARATOR.join(etags)
from orchestrator.utils.dagster_helpers import serialize_composite_etag_cursor


def catalog_updated_sensor(job, resources_def) -> SensorDefinition:
Expand Down Expand Up @@ -45,51 +36,8 @@ def catalog_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info("No new catalogs in GCS bucket")
return SkipReason("No new catalogs in GCS bucket")

context.update_cursor(new_etag_cursor) # Question: what happens if the run fails? is the cursor still updated?
context.update_cursor(new_etag_cursor)
context.log.info("New catalogs in GCS bucket")
return RunRequest(run_key="updated_catalogs")

return catalog_updated_sensor_definition


def metadata_updated_sensor(job, resources_def) -> SensorDefinition:
"""
TODO
"""

@sensor(
name=f"{job.name}_on_metadata_updated",
job=job,
minimum_interval_seconds=30,
default_status=DefaultSensorStatus.STOPPED,
)
def metadata_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info("Starting gcs_metadata_updated_sensor")

with build_resources(resources_def) as resources:
context.log.info("Got resources for gcs_metadata_updated_sensor")

etag_cursor_raw = context.cursor or None
etag_cursor = deserialize_composite_etag_cursor(etag_cursor_raw)
etag_cursor_set = set(etag_cursor)

context.log.info(f"Old etag cursor: {etag_cursor}")

metadata_folder_blobs = resources.metadata_folder_blobs
## TODO, yaml vs yml
new_etag_cursors = [blob.etag for blob in metadata_folder_blobs if blob.name.endswith("metadata.yaml")]
new_etag_cursor_set = set(new_etag_cursors)
context.log.info(f"New etag cursor: {new_etag_cursor_set}")

# Note: ETAGs are GCS's way of providing a version number for a file
# Another option would be to use the last modified date or MD5 hash
if etag_cursor_set == new_etag_cursor_set:
context.log.info("No new updated_metadata_files in GCS bucket")
return SkipReason("No new updated_metadata_files in GCS bucket")

serialized_new_etag_cursor = serialize_composite_etag_cursor(new_etag_cursors)
context.update_cursor(serialized_new_etag_cursor) # Question: what happens if the run fails? is the cursor still updated?
context.log.info("New updated_metadata_files in GCS bucket")
return RunRequest(run_key="updated_metadata_files")

return metadata_updated_sensor_definition
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from dagster import sensor, RunRequest, SkipReason, SensorDefinition, SensorEvaluationContext, build_resources, DefaultSensorStatus
from orchestrator.utils.dagster_helpers import deserialize_composite_etag_cursor, serialize_composite_etag_cursor


def metadata_updated_sensor(job, resources_def) -> SensorDefinition:
"""
This sensor is responsible for polling the metadata folder in GCS for new or updated metadata files.
If it notices that the etags have changed, it will trigger the given job.
"""

@sensor(
name=f"{job.name}_on_metadata_updated",
job=job,
minimum_interval_seconds=30,
default_status=DefaultSensorStatus.STOPPED,
)
def metadata_updated_sensor_definition(context: SensorEvaluationContext):
context.log.info("Starting gcs_metadata_updated_sensor")

with build_resources(resources_def) as resources:
context.log.info("Got resources for gcs_metadata_updated_sensor")

etag_cursor_raw = context.cursor or None
etag_cursor = deserialize_composite_etag_cursor(etag_cursor_raw)
etag_cursor_set = set(etag_cursor)

context.log.info(f"Old etag cursor: {etag_cursor}")

metadata_folder_blobs = resources.metadata_folder_blobs
## TODO, yaml vs yml
new_etag_cursors = [blob.etag for blob in metadata_folder_blobs if blob.name.endswith("metadata.yaml")]
new_etag_cursor_set = set(new_etag_cursors)
context.log.info(f"New etag cursor: {new_etag_cursor_set}")

# Note: ETAGs are GCS's way of providing a version number for a file
# Another option would be to use the last modified date or MD5 hash
if etag_cursor_set == new_etag_cursor_set:
context.log.info("No new updated_metadata_files in GCS bucket")
return SkipReason("No new updated_metadata_files in GCS bucket")

serialized_new_etag_cursor = serialize_composite_etag_cursor(new_etag_cursors)
context.update_cursor(serialized_new_etag_cursor)
context.log.info("New updated_metadata_files in GCS bucket")
return RunRequest(run_key="updated_metadata_files")

return metadata_updated_sensor_definition
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
from dagster import MetadataValue, Output
import pandas as pd
from typing import NewType
from typing import Optional, List

OutputDataFrame = Output[pd.DataFrame]
CURSOR_SEPARATOR = ":"


def output_dataframe(result_df: pd.DataFrame) -> Output[pd.DataFrame]:
"""
Returns a Dagster Output object with a dataframe as the result and a markdown preview.
"""
return Output(result_df, metadata={"count": len(result_df), "preview": MetadataValue.md(result_df.to_markdown())})


def deserialize_composite_etag_cursor(etag_cursor: Optional[str]) -> List[str]:
if etag_cursor is None:
return []

return etag_cursor.split(CURSOR_SEPARATOR)

def serialize_composite_etag_cursor(etags: List[str]):
return CURSOR_SEPARATOR.join(etags)

0 comments on commit 643b2b5

Please sign in to comment.