Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing purview test issues and improve performance #350

Merged
merged 12 commits into from
Jun 15, 2022
1 change: 1 addition & 0 deletions feathr_project/feathr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
REGISTRY_TYPEDEF_VERSION="v1"

TYPEDEF_SOURCE=f'feathr_source_{REGISTRY_TYPEDEF_VERSION}'
# TODO: change the name from feathr_workspace_ to feathr_project_
TYPEDEF_FEATHR_PROJECT=f'feathr_workspace_{REGISTRY_TYPEDEF_VERSION}'
TYPEDEF_DERIVED_FEATURE=f'feathr_derived_feature_{REGISTRY_TYPEDEF_VERSION}'
TYPEDEF_ANCHOR=f'feathr_anchor_{REGISTRY_TYPEDEF_VERSION}'
Expand Down
152 changes: 103 additions & 49 deletions feathr_project/feathr/registry/_feature_registry_purview.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tracemalloc import stop
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
from time import sleep

from azure.identity import DefaultAzureCredential
from jinja2 import Template
Expand Down Expand Up @@ -75,6 +76,7 @@ def _register_feathr_feature_types(self):
type_feathr_project = EntityTypeDef(
name=TYPEDEF_FEATHR_PROJECT,
attributeDefs=[
# TODO: this should be called "anchors" rather than "anchor_features" to make it less confusing.
AtlasAttributeDef(
name="anchor_features", typeName=TYPEDEF_ARRAY_ANCHOR, cardinality=Cardinality.SET),
AtlasAttributeDef(
Expand Down Expand Up @@ -219,7 +221,7 @@ def _parse_anchors(self, anchor_list: List[FeatureAnchor]) -> List[AtlasEntity]:
# then parse the source of that anchor
source_entity = self._parse_source(anchor.source)
anchor_fully_qualified_name = self.project_name+self.registry_delimiter+anchor.name
original_id = self.get_feature_id(anchor_fully_qualified_name )
original_id = self.get_feature_id(anchor_fully_qualified_name, type=TYPEDEF_ANCHOR )
original_anchor = self.get_feature_by_guid(original_id) if original_id else None
merged_elements = self._merge_anchor(original_anchor,anchor_feature_entities)
anchor_entity = AtlasEntity(
Expand Down Expand Up @@ -733,18 +735,25 @@ def _delete_all_feathr_entities(self):

:param guid: The guid or guids you want to remove.
"""
entities = self.purview_client.discovery.search_entities(
"feathr*", limit=20)
# should not be large than this, otherwise the backend might throw out error
batch_delte_size = 100

# [print(entity) for entity in entities]
guid_list = [entity["id"] for entity in entities]
# use the `query` API so that it can return immediatelly (don't use the search_entity API as it will try to return all the results in a single request)

# should not be large than this, otherwise the backend might throw out error
batch_delte_size = 15
for i in range(0, len(guid_list), batch_delte_size):
self.purview_client.delete_entity(
guid=guid_list[i:i+batch_delte_size])
while True:
result = self.purview_client.discovery.query(
"feathr", limit=batch_delte_size)
logger.info("Total number of entities:",result['@search.count'] )

# if no results, break:
if result['@search.count'] == 0:
break
entities = result['value']
guid_list = [entity["id"] for entity in entities]
self.purview_client.delete_entity(guid=guid_list)
logger.info("{} feathr entities deleted", batch_delte_size)
# sleep here, otherwise backend might throttle
sleep(1)
xiaoyongzhu marked this conversation as resolved.
Show resolved Hide resolved
xiaoyongzhu marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def _get_registry_client(self):
Expand All @@ -753,26 +762,42 @@ def _get_registry_client(self):
"""
return self.purview_client

def list_registered_features(self, project_name: str = None, limit=50, starting_offset=0) -> List[Dict[str,str]]:
def list_registered_features(self, project_name: str, limit=100, starting_offset=0) -> List[Dict[str,str]]:
"""
List all the already registered features. If project_name is not provided or is None, it will return all the
registered features; otherwise it will only return only features under this project
"""
entities = self.purview_client.discovery.search_entities(
f"entityType:{TYPEDEF_ANCHOR_FEATURE} or entityType:{TYPEDEF_DERIVED_FEATURE}", limit=limit, starting_offset=starting_offset)

feature_list = []

if not project_name:
raise RuntimeError("project_name must be specified.")

# get the corresponding features belongs to a certain project.
# note that we need to use "startswith" to filter out the features that don't belong to this project.
# see syntax here: https://docs.microsoft.com/en-us/rest/api/purview/catalogdataplane/discovery/query#discovery_query_andornested
query_filter = {
"and": [
{
"or":
[
{"entityType": TYPEDEF_DERIVED_FEATURE},
{"entityType": TYPEDEF_ANCHOR_FEATURE}
]
},
{
"attributeName": "qualifiedName",
"operator": "startswith",
"attributeValue": project_name + self.registry_delimiter
}
]
}
result = self.purview_client.discovery.query(filter=query_filter)

entities = result['value']

for entity in entities:
if project_name:
# if project_name is a valid string, only append entities if the qualified name start with
# project_name+delimiter
qualified_name: str = entity["qualifiedName"]
# split the name based on delimiter
result = qualified_name.split(self.registry_delimiter)
if result[0].casefold() == project_name:
feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']})
else:
# otherwise append all the entities
feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']})
feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']})

return feature_list

Expand Down Expand Up @@ -810,12 +835,16 @@ def get_feature_lineage(self, guid):
"""
return self.purview_client.get_entity_lineage(guid=guid)

def get_feature_id(self, qualifiedName):
def get_feature_id(self, qualifiedName, type: str):
"""
Get guid of a feature given its qualifiedName
"""
search_term = "qualifiedName:{0}".format(qualifiedName)
entities = self.purview_client.discovery.search_entities(search_term)
# the search term should be full qualified name
# purview_client.get_entity(qualifiedName=qualifiedName) might not work here since it requires an additonal typeName parameter
self.purview_client.get_entity(qualifiedName=qualifiedName, typeName=type)
search_term = "{0}".format(qualifiedName)
result = self.purview_client.discovery.query(search_term)
entities = result['value']
for entity in entities:
if entity.get('qualifiedName') == qualifiedName:
return entity.get('id')
Expand All @@ -829,7 +858,7 @@ def search_features(self, searchTerm):
entities = self.purview_client.discovery.search_entities(searchTerm)
return entities

def _list_registered_entities_with_details(self, project_name: str = None, entity_type: Union[str, List[str]] = None, limit=50, starting_offset=0,) -> List[Dict]:
def _list_registered_entities_with_details(self, project_name: str, entity_type: Union[str, List[str]] = None, limit=1000, starting_offset=0,) -> List[Dict]:
"""
List all the already registered entities. entity_type should be one of: SOURCE, DERIVED_FEATURE, ANCHOR, ANCHOR_FEATURE, FEATHR_PROJECT, or a list of those values
limit: a maximum 1000 will be enforced at the underlying API
Expand All @@ -844,30 +873,55 @@ def _list_registered_entities_with_details(self, project_name: str = None, entit
raise RuntimeError(
f'only SOURCE, DERIVED_FEATURE, ANCHOR, ANCHOR_FEATURE, FEATHR_PROJECT are supported when listing the registered entities, {entity_type} is not one of them.')

# the search grammar is less documented in Atlas/Purview.
# Here's the query grammar: https://atlas.apache.org/2.0.0/Search-Advanced.html
search_string = "".join(
[f" or entityType:{e}" for e in entity_type_list])
# remvoe the first additional " or "
search_string = search_string[4:]
result_entities = self.purview_client.discovery.search_entities(
search_string, limit=limit, starting_offset=starting_offset)
if project_name is None:
raise RuntimeError("You need to specify a project_name")
# the search grammar:
# https://docs.microsoft.com/en-us/azure/purview/how-to-search-catalog#search-query-syntax
# https://docs.microsoft.com/en-us/rest/api/datacatalog/data-catalog-search-syntax-reference

# get the corresponding features belongs to a certain project.
# note that we need to use "startswith" to filter out the features that don't belong to this project.
# see syntax here: https://docs.microsoft.com/en-us/rest/api/purview/catalogdataplane/discovery/query#discovery_query_andornested
# this search does the following:
# search all the entities that start with project_name+delimiter for all the search entities
# However, for TYPEDEF_FEATHR_PROJECT, it doesn't have delimiter in the qualifiedName
# Hence if TYPEDEF_FEATHR_PROJECT is in the `entity_type` input, we need to search for that specifically
# and finally "OR" the result to union them
query_filter = {
xiaoyongzhu marked this conversation as resolved.
Show resolved Hide resolved
"or":
[{
"and": [ {
"or": [{"entityType": e} for e in entity_type_list] # this is a list of the entity types that you want to query
},
{
"attributeName": "qualifiedName",
"operator": "startswith",
"attributeValue": project_name + self.registry_delimiter
}] },
{
"and": [ {
"or": [{"entityType": TYPEDEF_FEATHR_PROJECT}] if TYPEDEF_FEATHR_PROJECT in entity_type_list else None # this is a list of the entity types that you want to query
},
{
"attributeName": "qualifiedName",
"operator": "startswith",
"attributeValue": project_name
} ]} ]
}

# Important properties returned includes:
# id (the guid of the entity), name, qualifiedName, @search.score,
# and @search.highlights
guid_list = []
for entity in result_entities:
if project_name:
# if project_name is a valid string, only append entities if the qualified name start with
# project_name+delimiter
qualified_name: str = entity["qualifiedName"]
# split the name based on delimiter
result = qualified_name.split(self.registry_delimiter)
if result[0].casefold() == project_name:
guid_list.append(entity["id"])
else:
# otherwise append all the entities
guid_list.append(entity["id"])
result = self.purview_client.discovery.query(keywords=None, filter=query_filter, limit = limit)

logger.info(f"Total number of Feathr entities found: {result['@search.count']}" )
result_entities = result['value']

# append the guid list. Since we are using project_name + delimiter to search, all the result will be valid.
# Note that a max of 1000 will be enforced
# TODO: make sure it's scalalbe if there are more than 1000 entities
guid_list = [entity["id"] for entity in result_entities]

entity_res = [] if guid_list is None or len(guid_list)==0 else self.purview_client.get_entity(
guid=guid_list)["entities"]
return entity_res
Expand Down
3 changes: 3 additions & 0 deletions feathr_project/test/test_feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ def test_feathr_register_features_partially():
client.register_features()
time.sleep(30)
full_registration = client.get_features_from_registry(client.project_name)

now = datetime.now()
os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])

client: FeathrClient = registry_test_setup_partially(os.path.join(test_workspace_dir, "feathr_config.yaml"))
new_project_name = client.project_name
Expand Down
15 changes: 4 additions & 11 deletions feathr_project/test/test_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,8 @@ def registry_test_setup(config_path: str):
client.build_features(anchor_list=[agg_anchor, request_anchor], derived_feature_list=derived_feature_list)
return client
def registry_test_setup_partially(config_path: str):


# use a new project name every time to make sure all features are registered correctly
now = datetime.now()
os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])

"""Register a partial of a project. Will call `generate_entities()` and register only the first anchor feature.
"""
client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"})

request_anchor, agg_anchor, derived_feature_list = generate_entities()
Expand All @@ -185,11 +181,8 @@ def registry_test_setup_partially(config_path: str):
return client

def registry_test_setup_append(config_path: str):


# use a new project name every time to make sure all features are registered correctly
now = datetime.now()
os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)])
"""Append features to a project. Will call `generate_entities()` and register from the 2nd anchor feature
"""

client = FeathrClient(config_path=config_path, project_registry_tag={"for_test_purpose":"true"})

Expand Down