diff --git a/feathr_project/feathr/registry/_feathr_registry_client.py b/feathr_project/feathr/registry/_feathr_registry_client.py index 1b9191fbd..a70cd5e20 100644 --- a/feathr_project/feathr/registry/_feathr_registry_client.py +++ b/feathr_project/feathr/registry/_feathr_registry_client.py @@ -224,7 +224,7 @@ def dict_to_source(v: dict) -> Source: registry_tags=v["attributes"].get("tags", {})) elif type == "kafka": # print('v["attributes"]', v["attributes"]) - kafka_config = KafkaConfig(brokers=v["attributes"]["brokers"], topics=v["attributes"]["topics"], schema=AvroJsonSchema(schemaStr= v["attributes"]["schema"])) + kafka_config = KafkaConfig(brokers=v["attributes"].get("brokers", []), topics=v["attributes"].get ("topics", []), schema=AvroJsonSchema(schemaStr= v["attributes"].get("schemaStr", ""))) source = KafKaSource(name=v["attributes"]["name"], kafkaConfig=kafka_config, registry_tags=v["attributes"].get("tags", {})) elif type == "generic": options = v["attributes"].copy() diff --git a/feathr_project/feathr/registry/registry_utils.py b/feathr_project/feathr/registry/registry_utils.py index 081a32682..7f347d278 100644 --- a/feathr_project/feathr/registry/registry_utils.py +++ b/feathr_project/feathr/registry/registry_utils.py @@ -47,7 +47,7 @@ def source_to_def(source: Source) -> dict: "type": "kafka", "brokers":source.config.brokers, "topics":source.config.topics, - "schema":source.config.schema.schemaStr + "schemaStr":source.config.schema.schemaStr } print("ret is", ret) elif isinstance(source, SnowflakeSource): diff --git a/registry/purview-registry/registry/models.py b/registry/purview-registry/registry/models.py index cb4686719..871ff7619 100644 --- a/registry/purview-registry/registry/models.py +++ b/registry/purview-registry/registry/models.py @@ -434,6 +434,9 @@ def __init__(self, name: str, type: str, path: str = None, + brokers: List[str] = None, + topics: List[str] = None, + schema_str: str = None, preprocessing: Optional[str] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = None, @@ -442,6 +445,9 @@ def __init__(self, self.name = name self.type = type self.path = path + self.brokers = brokers + self.topics = topics + self.schema_str = schema_str self.preprocessing = preprocessing self.event_timestamp_column = event_timestamp_column self.timestamp_format = timestamp_format @@ -459,6 +465,12 @@ def to_dict(self) -> Dict: "path": self.path, "tags": self.tags, } + if self.brokers is not None: + ret["brokers"] = self.brokers + if self.topics is not None: + ret["topics"] = self.topics + if self.schema_str is not None: + ret["schemaStr"] = self.schema_str if self.preprocessing is not None: ret["preprocessing"] = self.preprocessing if self.event_timestamp_column is not None: @@ -703,7 +715,7 @@ def __init__(self, path: str = None, brokers: List[str] = None, topics: List[str] = None, - schema: str = None, + schema_str: str = None, preprocessing: Optional[str] = None, event_timestamp_column: Optional[str] = None, timestamp_format: Optional[str] = None, @@ -714,7 +726,7 @@ def __init__(self, self.type = type self.brokers = brokers self.topics = topics - self.schema = schema + self.schema_str = schema_str self.preprocessing = preprocessing self.event_timestamp_column = event_timestamp_column self.timestamp_format = timestamp_format @@ -725,6 +737,9 @@ def to_attr(self) -> SourceAttributes: name=self.name, type=self.type, path=self.path, + brokers=self.brokers, + topics=self.topics, + schema_str=self.schema_str, preprocessing=self.preprocessing, event_timestamp_column=self.event_timestamp_column, timestamp_format=self.timestamp_format, diff --git a/registry/purview-registry/registry/purview_registry.py b/registry/purview-registry/registry/purview_registry.py index 957c3b017..d14306bfd 100644 --- a/registry/purview-registry/registry/purview_registry.py +++ b/registry/purview-registry/registry/purview_registry.py @@ -524,6 +524,12 @@ def _register_feathr_feature_types(self): AtlasAttributeDef( name="path", typeName="string", cardinality=Cardinality.SINGLE), + AtlasAttributeDef(name="brokers", + typeName="array", cardinality=Cardinality.SINGLE), + AtlasAttributeDef(name="topics", + typeName="array", cardinality=Cardinality.SINGLE), + AtlasAttributeDef(name="schemaStr", + typeName="string", cardinality=Cardinality.SINGLE), AtlasAttributeDef(name="event_timestamp_column", typeName="string", cardinality=Cardinality.SINGLE), AtlasAttributeDef(name="timestamp_format",