fix(ingest): remove get_platform_instance_id from stateful ingestion #7572

Merged · 2 commits · Mar 21, 2023
Changes from 1 commit
17 changes: 1 addition & 16 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -1,6 +1,5 @@
import json
import logging
from typing import Optional

import click
from click_default_group import DefaultGroup
@@ -29,34 +28,20 @@ def state() -> None:
@state.command()
@click.option("--pipeline-name", required=True, type=str)
@click.option("--platform", required=True, type=str)
@click.option("--platform-instance", required=False, type=str)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def inspect(
pipeline_name: str, platform: str, platform_instance: Optional[str]
) -> None:
def inspect(pipeline_name: str, platform: str) -> None:
"""
Get the latest stateful ingestion state for a given pipeline.
Only works for state entity removal for now.
"""

# Note that the platform-instance argument is not generated consistently,
# and is not always equal to the platform_instance config.

datahub_graph = get_default_graph()
checkpoint_provider = DatahubIngestionCheckpointingProvider(datahub_graph, "cli")

job_name = StaleEntityRemovalHandler.compute_job_id(platform)

raw_checkpoint = checkpoint_provider.get_latest_checkpoint(pipeline_name, job_name)
if raw_checkpoint is None and platform_instance is not None:
logger.info(
"Failed to fetch state, but trying legacy URN format because platform_instance is provided."
)
raw_checkpoint = checkpoint_provider.get_latest_checkpoint(
pipeline_name, job_name, platform_instance_id=platform_instance
)

if not raw_checkpoint:
click.secho("No ingestion state found.", fg="red")
exit(1)
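For context, this is roughly what the simplified command now does programmatically (presumably invoked as `datahub state inspect --pipeline-name my_pipeline --platform snowflake`). A minimal sketch — the import paths are assumed from the repo layout, and the pipeline and platform values are hypothetical:

from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
    DatahubIngestionCheckpointingProvider,
)

# Connect using the default CLI configuration (assumed: ~/.datahubenv).
graph = get_default_graph()
provider = DatahubIngestionCheckpointingProvider(graph, "cli")

# The job id is derived from the platform alone; platform_instance no
# longer participates in the checkpoint lookup.
job_name = StaleEntityRemovalHandler.compute_job_id("snowflake")

checkpoint = provider.get_latest_checkpoint(
    pipeline_name="my_pipeline",  # hypothetical pipeline name
    job_name=job_name,
)
if checkpoint is None:
    print("No ingestion state found.")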
@@ -1,6 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, NewType, Type, TypeVar
from typing import Any, Dict, NewType, Optional, Type, TypeVar

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
@@ -43,6 +43,14 @@ def create(
def commit(self) -> None:
pass

@abstractmethod
def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
) -> Optional[DatahubIngestionCheckpointClass]:
pass

@staticmethod
def get_data_job_urn(
orchestrator: str,
@@ -53,14 +61,3 @@ def get_data_job_urn(
Standardizes datajob urn minting for all ingestion job state providers.
"""
return builder.make_data_job_urn(orchestrator, pipeline_name, job_name)

@staticmethod
def get_data_job_legacy_urn(
orchestrator: str,
pipeline_name: str,
job_name: JobId,
platform_instance_id: str,
) -> str:
return IngestionCheckpointingProviderBase.get_data_job_urn(
orchestrator, f"{pipeline_name}_{platform_instance_id}", job_name
)
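To make the breaking change concrete: the legacy helper removed above minted a different data-job URN for the same logical job, so checkpoints written under the old format can no longer be found once the fallback lookup is removed. A minimal sketch with hypothetical values; the URN shapes assume the standard `make_data_job_urn` output:

import datahub.emitter.mce_builder as builder

# Current format: the data-flow id is just the pipeline name.
builder.make_data_job_urn("cli", "my_pipeline", "my_job")
# -> urn:li:dataJob:(urn:li:dataFlow:(cli,my_pipeline,prod),my_job)

# Legacy format folded platform_instance_id into the pipeline name,
# yielding a different (now-orphaned) URN for the same pipeline/job pair.
builder.make_data_job_urn("cli", "my_pipeline_my_instance", "my_job")
# -> urn:li:dataJob:(urn:li:dataFlow:(cli,my_pipeline_my_instance,prod),my_job)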
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1240,6 +1240,3 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:

def get_report(self):
return self.report

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform
@@ -433,13 +433,6 @@ def get_dataplatform_instance_aspect(
else:
return None

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
return f"{self.platform}"

def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
return BigQueryDatasetKey(
project_id=db_name,
@@ -415,8 +415,3 @@ def _parse_into_dbt_column(self, column: Dict) -> DBTColumn:
def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"https://cloud.getdbt.com/next/accounts/{self.config.account_id}/projects/{self.config.project_id}/develop"

def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

return f"{self.platform}_{self.config.project_id}"
13 changes: 0 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -482,16 +482,3 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
if self.config.git_info and node.dbt_file_path:
return self.config.git_info.get_url_for_file_path(node.dbt_file_path)
return None

def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

project_id = (
self.load_file_as_json(self.config.manifest_path)
.get("metadata", {})
.get("project_id")
)
if project_id is None:
raise ValueError("DBT project identifier is not found in manifest")

return f"{self.platform}_{project_id}"
@@ -318,9 +318,6 @@ def _get_avro_schema_from_data_type(self, column: NestedField) -> Dict[str, Any]
],
}

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

def get_report(self) -> SourceReport:
return self.report

3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -187,9 +187,6 @@ def init_kafka_admin_client(self) -> None:
f"Failed to create Kafka Admin Client due to error {e}.",
)

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance

@classmethod
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
config: KafkaSourceConfig = KafkaSourceConfig.parse_obj(config_dict)
7 changes: 0 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/ldap.py
@@ -288,13 +288,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

cookie = set_cookie(self.lc, pctrls)

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
return self.config.ldap_server

def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
"""
Handle a DN and attributes by adding manager info and constructing a
@@ -1357,8 +1357,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def get_report(self) -> SourceReport:
return self.reporter

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
self.prepare_for_commit()
@@ -1778,8 +1778,5 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901
def get_report(self):
return self.reporter

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
self.prepare_for_commit()
@@ -917,9 +917,6 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext):
run_id=ctx.run_id,
)

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_name

@classmethod
def create(cls, config_dict, ctx):
config = PowerBiDashboardSourceConfig.parse_obj(config_dict)
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -224,9 +224,6 @@ def _get_pulsar_metadata(self, url):
f"An ambiguous exception occurred while handling the request: {e}"
)

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

@classmethod
def create(cls, config_dict, ctx):
config = PulsarSourceConfig.parse_obj(config_dict)
@@ -1403,10 +1403,6 @@ def inspect_session_metadata(self) -> None:
except Exception:
self.report.edition = None

# Stateful Ingestion Overrides.
def get_platform_instance_id(self) -> Optional[str]:
return self.config.get_account()

# Ideally we do not want null values in sample data for a column.
# However that would require separate query per column and
# that would be expensive, hence not done.
10 changes: 0 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -392,16 +392,6 @@ def get_db_name(self, inspector: Inspector) -> str:
def get_schema_names(self, inspector):
return inspector.get_schema_names()

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
config_dict = self.config.dict()
host_port = config_dict.get("host_port", "no_host_port")
database = config_dict.get("database", "no_database")
return f"{self.platform}_{host_port}_{database}"

def get_allowed_schemas(self, inspector: Inspector, db_name: str) -> Iterable[str]:
# this function returns the schema names which are filtered by schema_pattern.
for schema in self.get_schema_names(inspector):
@@ -166,7 +166,9 @@ class StatefulIngestionSourceBase(Source):
"""

def __init__(
self, config: StatefulIngestionConfigBase, ctx: PipelineContext
self,
config: StatefulIngestionConfigBase[StatefulIngestionConfig],
ctx: PipelineContext,
) -> None:
super().__init__(ctx)
self.stateful_ingestion_config = config.stateful_ingestion
@@ -278,12 +280,6 @@ def is_checkpointing_enabled(self, job_id: JobId) -> bool:
raise ValueError(f"No use-case handler for job_id{job_id}")
return self._usecase_handlers[job_id].is_checkpointing_enabled()

def get_platform_instance_id(self) -> Optional[str]:
# This method is retained for backwards compatibility, but it is not
# required that new sources implement it. We mainly need it for the
# fallback logic in _get_last_checkpoint.
raise NotImplementedError("no platform_instance_id configured")

def _get_last_checkpoint(
self, job_id: JobId, checkpoint_state_class: Type[StateType]
) -> Optional[Checkpoint]:
@@ -292,28 +288,15 @@ def _get_last_checkpoint(
"""
last_checkpoint: Optional[Checkpoint] = None
if self.is_stateful_ingestion_configured():
# TRICKY: We currently don't include the platform_instance_id in the
# checkpoint urn, but we previously did. As such, we need to fallback
# and try the old urn format if the new format doesn't return anything.

# Obtain the latest checkpoint from GMS for this job.
assert self.ctx.pipeline_name
last_checkpoint_aspect = self.ingestion_checkpointing_state_provider.get_latest_checkpoint( # type: ignore
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
assert self.ingestion_checkpointing_state_provider
last_checkpoint_aspect = (
self.ingestion_checkpointing_state_provider.get_latest_checkpoint(
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
)
)
if last_checkpoint_aspect is None:
[Review comment — Contributor]
Isn't this a breaking change that should be documented?

[Reply — @hsheth2 (Collaborator, Author), Mar 17, 2023]
The only case in which this would break is if someone upgraded directly from a 3-month-old version to latest, in which case we won't be able to fetch previous state from stateful ingestion.

# Try again with the platform_instance_id, if implemented.
try:
platform_instance_id = self.get_platform_instance_id()
except NotImplementedError:
pass
else:
last_checkpoint_aspect = self.ingestion_checkpointing_state_provider.get_latest_checkpoint( # type: ignore
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
platform_instance_id=platform_instance_id,
)

# Convert it to a first-class Checkpoint object.
last_checkpoint = Checkpoint[StateType].create_from_checkpoint_aspect(
@@ -355,6 +338,8 @@ def _prepare_checkpoint_states_for_commit(self) -> None:
# Perform validations
if not self.is_stateful_ingestion_configured():
return None
assert self.stateful_ingestion_config

if (
self.stateful_ingestion_config
and self.stateful_ingestion_config.ignore_new_state
@@ -378,7 +363,7 @@ def _prepare_checkpoint_states_for_commit(self) -> None:
job_checkpoint.prepare_for_commit()
try:
checkpoint_aspect = job_checkpoint.to_checkpoint_aspect(
self.stateful_ingestion_config.max_checkpoint_state_size # type: ignore
self.stateful_ingestion_config.max_checkpoint_state_size
)
except Exception as e:
logger.error(
@@ -64,21 +64,15 @@ def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
platform_instance_id: Optional[str] = None,
) -> Optional[DatahubIngestionCheckpointClass]:
logger.debug(
f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}'"
f" job_name:'{job_name}'"
)

if platform_instance_id is None:
data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)
else:
data_job_urn = self.get_data_job_legacy_urn(
self.orchestrator_name, pipeline_name, job_name, platform_instance_id
)
data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)

latest_checkpoint: Optional[
DatahubIngestionCheckpointClass
@@ -92,14 +86,14 @@
if latest_checkpoint:
logger.debug(
f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}' found with start_time:"
f" job_name:'{job_name}' found with start_time:"
f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
)
return latest_checkpoint
else:
logger.debug(
f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}' found"
f" job_name:'{job_name}' found"
)

return None
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -2264,6 +2264,3 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform
@@ -160,9 +160,6 @@ def create(cls, config_dict, ctx):
config = UnityCatalogSourceConfig.parse_obj(config_dict)
return cls(ctx=ctx, config=config)

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,