
feat(ingest): json-schema - add json schema support for files and kafka schema registry #7361

Merged: 4 commits merged into datahub-project:master on Feb 19, 2023

Conversation

shirshanka (Contributor) commented Feb 17, 2023

Adds support for JSON schemas stored in:

  • File
  • Directory
  • Kafka Schema Registry

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • [x] Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the `ingestion` label (PR or Issue related to the ingestion of metadata) on Feb 17, 2023
field_path = field_path.expand_type(discriminated_type, schema)

for field_name, field_schema in schema.get("properties", {}).items():
required_field: bool = field_name in schema.get("required", [])
Collaborator:

nice default accessors
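The "default accessors" pattern being praised can be exercised standalone; a minimal sketch with a hypothetical schema dict:

```python
# Minimal sketch of the pattern: dict.get with a default tolerates schemas
# that omit "properties" or "required" entirely.
schema = {"type": "object", "properties": {"id": {"type": "string"}}}

for field_name, field_schema in schema.get("properties", {}).items():
    required_field = field_name in schema.get("required", [])

# A schema with no "properties" key simply yields no fields, no KeyError.
empty_fields = list({}.get("properties", {}).items())
```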

}
(union_category, union_category_schema) = [
(k, v) for k, v in union_category_map.items() if v
][0]
Collaborator:

this [0] index access is confusing me; mind explaining?
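For what it's worth, the `[0]` access picks the first category whose schema is truthy; a sketch (with hypothetical `union_category_map` contents) of an equivalent using `next()` that avoids building the intermediate list:

```python
# Hypothetical example data: only one category is expected to be populated.
union_category_map = {"oneOf": None, "anyOf": [{"type": "string"}], "allOf": None}

# Equivalent to `[(k, v) for k, v in ... if v][0]`, without the throwaway
# list; raises StopIteration (rather than IndexError) if nothing matches.
union_category, union_category_schema = next(
    (k, v) for k, v in union_category_map.items() if v
)
```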

raw_schema_string: Optional[str] = None,
) -> SchemaMetadata:
json_schema_as_string = raw_schema_string or json.dumps(json_schema)
md5_hash: str = md5(json_schema_as_string.encode()).hexdigest()
Collaborator:

nice!
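The fingerprinting step above can be run standalone (schema contents are hypothetical); note the hash is computed over the exact serialized string, so `raw_schema_string`, when present, wins over re-serialization:

```python
import json
from hashlib import md5
from typing import Optional

json_schema = {"type": "object", "properties": {"id": {"type": "string"}}}
raw_schema_string: Optional[str] = None

# Prefer the registry's raw string if available; otherwise re-serialize.
json_schema_as_string = raw_schema_string or json.dumps(json_schema)
md5_hash: str = md5(json_schema_as_string.encode()).hexdigest()
```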


schema_ref: SchemaReference
for schema_ref in schema.references:
ref_subject: str = schema_ref["subject"]
Collaborator:

assuming both "subject" and "version" are required fields?

all_schemas.extend(
self.get_schemas_from_confluent_ref_json(
reference_schema.schema,
name=schema_ref["name"],
Collaborator:

assuming "name" is a required field? (or maybe this method can take Nones?)
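If those keys turned out not to be guaranteed, a defensive variant (hypothetical data and helper; the real Confluent client may well guarantee them) could fail with a readable message instead of a bare KeyError:

```python
# Hypothetical reference dict, shaped like a Confluent schema reference.
schema_ref = {"name": "com.acme.Address", "subject": "address-value", "version": 3}

def require(ref: dict, key: str):
    # Surface a readable error instead of a bare KeyError.
    if key not in ref:
        raise ValueError(f"schema reference missing {key!r}: {ref!r}")
    return ref[key]

ref_subject = require(schema_ref, "subject")
ref_name = require(schema_ref, "name")
```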

@@ -228,6 +301,21 @@ def _get_schema_fields(
imported_schemas,
is_key_schema=is_key_schema,
)
elif schema.schema_type == "JSON":
Collaborator:

nitpick: Constants for the raw strings
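The nitpick sketched out, using the schema-type strings that appear in the diff (the constant names themselves are hypothetical):

```python
# Hypothetical module-level constants replacing raw string comparisons.
SCHEMA_TYPE_AVRO = "AVRO"
SCHEMA_TYPE_PROTOBUF = "PROTOBUF"
SCHEMA_TYPE_JSON = "JSON"

def is_json_schema(schema_type: str) -> bool:
    # One place to change if the registry ever renames the type string.
    return schema_type == SCHEMA_TYPE_JSON
```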

)
@capability(
SourceCapability.SCHEMA_METADATA,
"Schemas associated with each topic are extracted from the schema registry. Avro and Protobuf (certified), JSON (incubating). Schema references are supported.",
Collaborator:

amazing!


class JsonSchemaSourceConfig(StatefulIngestionConfigBase):
path: Union[FilePath, DirectoryPath, AnyHttpUrl] = Field(
description="Set this to a single file-path or a directory-path (for recursive traversal) or a remote url. e.g. https://json.schemastore.org/petstore-v1.0.json"
Collaborator:

Really nice that you can point this to a JSON Web URL

if not JsonSchemaTranslator._get_id_from_any_schema(schema_dict):
schema_dict["$id"] = str(v)
with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp_file:
tmp_file.write(json.dumps(schema_dict))
Collaborator:

Nit: any try/except wrapping here when dealing with the file read/write?

Collaborator:

try-except-nice-error-message

Contributor (author):

👍
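A sketch of the wrapped version the thread converges on (schema contents are hypothetical; the goal here is a readable error message, not recovery):

```python
import json
import tempfile

schema_dict = {"$id": "https://example.com/petstore.json", "type": "object"}

try:
    # delete=False so the path outlives the context manager, as in the PR.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp_file:
        tmp_file.write(json.dumps(schema_dict))
        tmp_path = tmp_file.name
except OSError as e:
    raise RuntimeError(f"failed to write schema to a temp file: {e}") from e
```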

Comment on lines +165 to +166
if reference.startswith("#/"):
parts = reference[2:].split("/")
Collaborator:

nitpick: would like to see this tedious logic pulled into smaller well named functions

Contributor (author):

this is already a function
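For readers following along, the local-reference walk being discussed boils down to something like this (standalone sketch; it ignores JSON-pointer escaping of `~0`/`~1`, which a full implementation would handle):

```python
def resolve_local_ref(schema: dict, reference: str) -> dict:
    """Resolve a local fragment like '#/definitions/Address' against a schema."""
    if not reference.startswith("#/"):
        raise ValueError(f"not a local reference: {reference!r}")
    node = schema
    for part in reference[2:].split("/"):
        node = node[part]
    return node

schema = {"definitions": {"Address": {"type": "object"}}}
```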

base_path = dirname(str(path))
base_uri = "file://{}/".format(base_path)

with open(path) as schema_file:
Collaborator:

Thoughts on wrapping this open in try/except?

shirshanka (author) commented Feb 18, 2023:

an outer try/except per file should be enough here

if os.path.isdir(self.config.path):
for root, dirs, files in os.walk(self.config.path, topdown=False):
for file_name in [f for f in files if f.endswith(".json")]:
yield from self._load_one_file(
Collaborator:

Can we try-except-warn in case a single file fails to be loaded? Instead of tossing the connector run completely

Contributor (author):

👍
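The agreed-on shape, as a standalone sketch (`load_one_file` and `report_warning` are hypothetical stand-ins for the source's own methods):

```python
import os
from typing import Callable, Iterable, Iterator

def walk_schema_files(
    root_dir: str,
    load_one_file: Callable[[str], Iterable[dict]],
    report_warning: Callable[[str], None],
) -> Iterator[dict]:
    for root, _dirs, files in os.walk(root_dir, topdown=False):
        for file_name in (f for f in files if f.endswith(".json")):
            full_path = os.path.join(root, file_name)
            try:
                # One bad file becomes a warning, not a failed connector run.
                yield from load_one_file(full_path)
            except Exception as e:
                report_warning(f"failed to load {full_path}: {e}")
```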

)
else:
ref_loader = jsonref.jsonloader
browse_prefix = f"/{self.config.env.lower()}/{self.config.platform}"
Collaborator:

Nitpick: we are hoping to eradicate "env" - thoughts on keeping it out of this in favor of platform instance?

}
fields: List[SchemaField] = list(
JsonSchemaTranslator.get_fields_from_schema(malformed_schema)
)
Collaborator:

Awesome tests!

@@ -376,7 +376,7 @@ def test_kafka_ignore_warnings_on_schema_type(
schema_id="schema_id_2",
schema=Schema(
schema_str="{}",
-schema_type="JSON",
+schema_type="UNKNOWN_TYPE",
Collaborator:

This is to prevent schema parsing? Anything we'd want to do here to try out the schema parsing?

Contributor (author):

Previously, we were asserting that if the Confluent schema registry sent us back JSON schemas, we would fail; we have a config flag to control how the connector behaves when it encounters a schema type we don't handle. Now that JSON schemas actually work, this test fails, so I had to change it to pass in a schema type that we don't handle.

jjoyce0510 (Collaborator) left a comment:

Left some comments. I know you want to move on this but please take a look!

@@ -194,7 +194,7 @@ def compute_job_id(cls, platform: Optional[str]) -> JobId:
return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)

def _init_job_id(self) -> JobId:
-platform: Optional[str] = getattr(self.source, "platform")
+platform: Optional[str] = getattr(self.source, "platform", "default")
Contributor (author):

@treff7es: wanted to bring this diff to your attention since it is related to stateful ingestion. Previously this handler expected the source class to have a platform member variable and failed hard if it did not.
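The behavioral difference in that one-line diff, as a standalone check (class names are hypothetical):

```python
class SourceWithPlatform:
    platform = "kafka"

class SourceWithoutPlatform:
    pass

# Two-argument getattr raises AttributeError when the attribute is absent;
# the three-argument form returns the fallback instead.
with_platform = getattr(SourceWithPlatform(), "platform", "default")
without_platform = getattr(SourceWithoutPlatform(), "platform", "default")
```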

@shirshanka shirshanka merged commit 07e4d06 into datahub-project:master Feb 19, 2023