Skip to content

Commit

Permalink
Fix get_features_from_registry() issue for registered streaming sourc…
Browse files Browse the repository at this point in the history
…es & features (#1184)

* fix: update init arguments for SourceDef and SourceAttributes class to adopt KafkaSource registry

* fix: add brokers, topics and schemaStr to Atlas Type feathr_source_v1
  • Loading branch information
thurstonchen committed Jun 1, 2023
1 parent 2d1e192 commit 7aed2e9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
2 changes: 1 addition & 1 deletion feathr_project/feathr/registry/_feathr_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def dict_to_source(v: dict) -> Source:
registry_tags=v["attributes"].get("tags", {}))
elif type == "kafka":
# print('v["attributes"]', v["attributes"])
kafka_config = KafkaConfig(brokers=v["attributes"]["brokers"], topics=v["attributes"]["topics"], schema=AvroJsonSchema(schemaStr= v["attributes"]["schema"]))
kafka_config = KafkaConfig(brokers=v["attributes"].get("brokers", []), topics=v["attributes"].get ("topics", []), schema=AvroJsonSchema(schemaStr= v["attributes"].get("schemaStr", "")))
source = KafKaSource(name=v["attributes"]["name"], kafkaConfig=kafka_config, registry_tags=v["attributes"].get("tags", {}))
elif type == "generic":
options = v["attributes"].copy()
Expand Down
2 changes: 1 addition & 1 deletion feathr_project/feathr/registry/registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def source_to_def(source: Source) -> dict:
"type": "kafka",
"brokers":source.config.brokers,
"topics":source.config.topics,
"schema":source.config.schema.schemaStr
"schemaStr":source.config.schema.schemaStr
}
print("ret is", ret)
elif isinstance(source, SnowflakeSource):
Expand Down
19 changes: 17 additions & 2 deletions registry/purview-registry/registry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ def __init__(self,
name: str,
type: str,
path: str = None,
brokers: List[str] = None,
topics: List[str] = None,
schema_str: str = None,
preprocessing: Optional[str] = None,
event_timestamp_column: Optional[str] = None,
timestamp_format: Optional[str] = None,
Expand All @@ -442,6 +445,9 @@ def __init__(self,
self.name = name
self.type = type
self.path = path
self.brokers = brokers
self.topics = topics
self.schema_str = schema_str
self.preprocessing = preprocessing
self.event_timestamp_column = event_timestamp_column
self.timestamp_format = timestamp_format
Expand All @@ -459,6 +465,12 @@ def to_dict(self) -> Dict:
"path": self.path,
"tags": self.tags,
}
if self.brokers is not None:
ret["brokers"] = self.brokers
if self.topics is not None:
ret["topics"] = self.topics
if self.schema_str is not None:
ret["schemaStr"] = self.schema_str
if self.preprocessing is not None:
ret["preprocessing"] = self.preprocessing
if self.event_timestamp_column is not None:
Expand Down Expand Up @@ -703,7 +715,7 @@ def __init__(self,
path: str = None,
brokers: List[str] = None,
topics: List[str] = None,
schema: str = None,
schema_str: str = None,
preprocessing: Optional[str] = None,
event_timestamp_column: Optional[str] = None,
timestamp_format: Optional[str] = None,
Expand All @@ -714,7 +726,7 @@ def __init__(self,
self.type = type
self.brokers = brokers
self.topics = topics
self.schema = schema
self.schema_str = schema_str
self.preprocessing = preprocessing
self.event_timestamp_column = event_timestamp_column
self.timestamp_format = timestamp_format
Expand All @@ -725,6 +737,9 @@ def to_attr(self) -> SourceAttributes:
name=self.name,
type=self.type,
path=self.path,
brokers=self.brokers,
topics=self.topics,
schema_str=self.schema_str,
preprocessing=self.preprocessing,
event_timestamp_column=self.event_timestamp_column,
timestamp_format=self.timestamp_format,
Expand Down
6 changes: 6 additions & 0 deletions registry/purview-registry/registry/purview_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,12 @@ def _register_feathr_feature_types(self):

AtlasAttributeDef(
name="path", typeName="string", cardinality=Cardinality.SINGLE),
AtlasAttributeDef(name="brokers",
typeName="array<string>", cardinality=Cardinality.SINGLE),
AtlasAttributeDef(name="topics",
typeName="array<string>", cardinality=Cardinality.SINGLE),
AtlasAttributeDef(name="schemaStr",
typeName="string", cardinality=Cardinality.SINGLE),
AtlasAttributeDef(name="event_timestamp_column",
typeName="string", cardinality=Cardinality.SINGLE),
AtlasAttributeDef(name="timestamp_format",
Expand Down

0 comments on commit 7aed2e9

Please sign in to comment.