Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fe9ce59
Initial pydantic version
ErnestoLoma Nov 10, 2022
356a5f5
Initial version of code generation
ErnestoLoma Nov 22, 2022
dd6b4a7
Initial version of EntityClient
ErnestoLoma Nov 23, 2022
58e967b
Add processing for relationship-attributeDefs
ErnestoLoma Nov 28, 2022
2ee83cc
Correct problem with typing of Referenceable
ErnestoLoma Nov 29, 2022
53ee234
Correct broken test
ErnestoLoma Nov 29, 2022
15a4685
Make type of typeName required Literal
ErnestoLoma Nov 29, 2022
6dad5ac
Correct typing problems
ErnestoLoma Nov 29, 2022
605474d
Added AssetMutationResponse
ErnestoLoma Nov 30, 2022
65c90f6
Refactored test data to data directory
ErnestoLoma Nov 30, 2022
0fffef3
Cleanup flake8 violations
ErnestoLoma Nov 30, 2022
dbdaed9
Refactored to separate unit and integration tests
ErnestoLoma Nov 30, 2022
425406b
Add logic for Announcments
ErnestoLoma Dec 5, 2022
4a0a8a1
Add the ability to update entities
ErnestoLoma Dec 6, 2022
a16c21b
Correct problems with failing unit tests
ErnestoLoma Dec 13, 2022
c1c5951
Added additional integration tests for purging and creating glossaries
ErnestoLoma Dec 13, 2022
809ad23
Cache for roles
cmgrote Dec 14, 2022
b852af0
Remove print statement
cmgrote Dec 14, 2022
04e44a6
Do not attempt to JSON-decode when there is no content (avoids an exc…
cmgrote Dec 14, 2022
7e473ef
Adds classification cache and related tests
cmgrote Dec 14, 2022
8a5145c
Adds custom metadata cache and tests, with relevant cleanups to typed…
cmgrote Dec 15, 2022
8ff8bfd
Cache cleanups and error encapsulation
cmgrote Dec 15, 2022
4f055f3
Refactored to support type hints for MutatedEntities
ErnestoLoma Dec 21, 2022
2b4db14
Merge branch 'prototype_sdk' of https://github.com/atlanhq/atlan-pyth…
ErnestoLoma Dec 21, 2022
9fedfa6
Fix mypy violations
ErnestoLoma Dec 22, 2022
651f66f
Add precommit hook to run tests
ErnestoLoma Dec 22, 2022
fdbe3c3
Add installation of requirement-dev.txt to github action
ErnestoLoma Dec 22, 2022
114a06c
Add environment variables for api key and host to github action
ErnestoLoma Dec 23, 2022
d769d35
Revise the name of the secret name in github action
ErnestoLoma Dec 23, 2022
95eefd3
Simplify complexity of atlan.py
ErnestoLoma Dec 26, 2022
996fade
Refactor upsert function to accept a list of assets
ErnestoLoma Dec 27, 2022
7026c45
Fix integration test
ErnestoLoma Dec 27, 2022
68e0a72
Fix integration test
ErnestoLoma Dec 27, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .bandit
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[bandit]
exclude = tests,pyatlan/generator
5 changes: 4 additions & 1 deletion .github/workflows/pytest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 pytest
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
env: # Or as an environment variable
ATLAN_API_KEY: ${{ secrets.MARK_ATLAN_API_KEY }}
ATLAN_HOST: https://mark.atlan.com
run: |
pytest
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ wheels/
*.egg-info/
.installed.cfg
*.egg
*.DS_Store
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,12 @@ repos:
hooks:
- id: black
language_version: python3

- repo: local
hooks:
- id: tests
name: run tests
entry: pytest
language: system
types: [python]
stages: [push]
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[mypy]
Empty file added pyatlan/cache/__init__.py
Empty file.
65 changes: 65 additions & 0 deletions pyatlan/cache/classification_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from typing import Optional

from pyatlan.client.atlan import AtlanClient
from pyatlan.client.typedef import TypeDefClient
from pyatlan.model.enums import AtlanTypeCategory
from pyatlan.model.typedef import ClassificationDef


class ClassificationCache:

cache_by_id: dict[str, ClassificationDef] = dict()
map_id_to_name: dict[str, str] = dict()
map_name_to_id: dict[str, str] = dict()
deleted_ids: set[str] = set()
deleted_names: set[str] = set()

@classmethod
def _refresh_cache(cls) -> None:
response = TypeDefClient(AtlanClient()).get_typedefs(
type=AtlanTypeCategory.CLASSIFICATION
)
if response is not None:
cls.cache_by_id = dict()
cls.map_id_to_name = dict()
cls.map_name_to_id = dict()
for classification in response.classification_defs:
classification_id = classification.name
classification_name = classification.display_name
cls.cache_by_id[classification_id] = classification
cls.map_id_to_name[classification_id] = classification_name
cls.map_name_to_id[classification_name] = classification_id

@classmethod
def get_id_for_name(cls, name: str) -> Optional[str]:
"""
Translate the provided human-readable classification name to its Atlan-internal ID string.
"""
cls_id = cls.map_name_to_id.get(name)
if not cls_id and name not in cls.deleted_names:
# If not found, refresh the cache and look again (could be stale)
cls._refresh_cache()
cls_id = cls.map_name_to_id.get(name)
if not cls_id:
# If still not found after refresh, mark it as deleted (could be
# an entry in an audit log that refers to a classification that
# no longer exists)
cls.deleted_names.add(name)
return cls_id

@classmethod
def get_name_for_id(cls, idstr: str) -> Optional[str]:
"""
Translate the provided Atlan-internal classification ID string to the human-readable classification name.
"""
cls_name = cls.map_id_to_name.get(idstr)
if not cls_name and idstr not in cls.deleted_ids:
# If not found, refresh the cache and look again (could be stale)
cls._refresh_cache()
cls_name = cls.map_id_to_name.get(idstr)
if not cls_name:
# If still not found after refresh, mark it as deleted (could be
# an entry in an audit log that refers to a classification that
# no longer exists)
cls.deleted_ids.add(idstr)
return cls_name
160 changes: 160 additions & 0 deletions pyatlan/cache/custom_metadata_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from typing import Optional

from pyatlan.client.atlan import AtlanClient
from pyatlan.client.typedef import TypeDefClient
from pyatlan.error import LogicError, NotFoundError
from pyatlan.model.enums import AtlanTypeCategory
from pyatlan.model.typedef import AttributeDef, CustomMetadataDef


class CustomMetadataCache:

cache_by_id: dict[str, CustomMetadataDef] = dict()
map_id_to_name: dict[str, str] = dict()
map_name_to_id: dict[str, str] = dict()
map_attr_id_to_name: dict[str, dict[str, str]] = dict()
map_attr_name_to_id: dict[str, dict[str, str]] = dict()
archived_attr_ids: dict[str, str] = dict()

@classmethod
def _refresh_cache(cls) -> None:
response = TypeDefClient(AtlanClient()).get_typedefs(
type=AtlanTypeCategory.CUSTOM_METADATA
)
if response is not None:
cls.cache_by_id = dict()
cls.map_id_to_name = dict()
cls.map_name_to_id = dict()
cls.map_attr_id_to_name = dict()
cls.map_attr_name_to_id = dict()
cls.archived_attr_ids = dict()
for cm in response.custom_metadata_defs:
type_id = cm.name
type_name = cm.display_name
cls.cache_by_id[type_id] = cm
cls.map_id_to_name[type_id] = type_name
cls.map_name_to_id[type_name] = type_id
cls.map_attr_id_to_name[type_id] = dict()
cls.map_attr_name_to_id[type_id] = dict()
if cm.attribute_defs:
for attr in cm.attribute_defs:
attr_id = attr.name
attr_name = attr.display_name
cls.map_attr_id_to_name[type_id][attr_id] = attr_name
if attr.options and attr.options.is_archived:
cls.archived_attr_ids[attr_id] = attr_name
else:
if attr_name in cls.map_attr_name_to_id[type_id]:
raise LogicError(
"Multiple custom attributes with exactly the same name ("
+ attr_name
+ ") found for: "
+ type_name,
code="ATLAN-PYTHON-500-100",
)
cls.map_attr_name_to_id[type_id][attr_name] = attr_id

@classmethod
def get_id_for_name(cls, name: str) -> Optional[str]:
"""
Translate the provided human-readable custom metadata set name to its Atlan-internal ID string.
"""
cm_id = cls.map_name_to_id.get(name)
if cm_id:
return cm_id
else:
# If not found, refresh the cache and look again (could be stale)
cls._refresh_cache()
return cls.map_name_to_id.get(name)

@classmethod
def get_name_for_id(cls, idstr: str) -> Optional[str]:
"""
Translate the provided Atlan-internal custom metadata ID string to the human-readable custom metadata set name.
"""
cm_name = cls.map_id_to_name.get(idstr)
if cm_name:
return cm_name
else:
# If not found, refresh the cache and look again (could be stale)
cls._refresh_cache()
return cls.map_id_to_name.get(idstr)

@classmethod
def get_all_custom_attributes(
cls, include_deleted: bool = False, force_refresh: bool = False
) -> dict[str, list[AttributeDef]]:
"""
Retrieve all the custom metadata attributes. The map will be keyed by custom metadata set
name, and the value will be a listing of all the attributes within that set (with all the details
of each of those attributes).
"""
if len(cls.cache_by_id) == 0 or force_refresh:
cls._refresh_cache()
m = {}
for type_id, cm in cls.cache_by_id.items():
type_name = cls.get_name_for_id(type_id)
if not type_name:
raise NotFoundError(
f"The type_name for {type_id} could not be found.", code="fixme"
)
attribute_defs = cm.attribute_defs
if include_deleted:
to_include = attribute_defs
else:
to_include = []
if attribute_defs:
for attr in attribute_defs:
if not attr.options or not attr.options.is_archived:
to_include.append(attr)
m[type_name] = to_include
return m

@classmethod
def get_attr_id_for_name(cls, set_name: str, attr_name: str) -> Optional[str]:
"""
Translate the provided human-readable custom metadata set and attribute names to the Atlan-internal ID string
for the attribute.
"""
attr_id = None
set_id = cls.get_id_for_name(set_name)
if set_id:
sub_map = cls.map_attr_name_to_id.get(set_id)
if sub_map:
attr_id = sub_map.get(attr_name)
if attr_id:
# If found, return straight away
return attr_id
else:
# Otherwise, refresh the cache and look again (could be stale)
cls._refresh_cache()
sub_map = cls.map_attr_name_to_id.get(set_id)
if sub_map:
return sub_map.get(attr_name)
return None

@classmethod
def _get_attributes_for_search_results(cls, set_id: str) -> Optional[list[str]]:
sub_map = cls.map_attr_name_to_id.get(set_id)
if sub_map:
attr_ids = sub_map.values()
dot_names = []
for idstr in attr_ids:
dot_names.append(set_id + "." + idstr)
return dot_names
return None

@classmethod
def get_attributes_for_search_results(cls, set_name: str) -> Optional[list[str]]:
"""
Retrieve the full set of custom attributes to include on search results.
"""
set_id = cls.get_id_for_name(set_name)
if set_id:
dot_names = cls._get_attributes_for_search_results(set_id)
if dot_names:
return dot_names
else:
cls._refresh_cache()
return cls._get_attributes_for_search_results(set_id)
return None
50 changes: 50 additions & 0 deletions pyatlan/cache/role_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Optional

from pyatlan.client.atlan import AtlanClient
from pyatlan.client.role import RoleClient
from pyatlan.model.role import AtlanRole


class RoleCache:

cache_by_id: dict[str, AtlanRole] = dict()
map_id_to_name: dict[str, str] = dict()
map_name_to_id: dict[str, str] = dict()

@classmethod
def _refresh_cache(cls) -> None:
response = RoleClient(AtlanClient()).get_all_roles()
if response is not None:
cls.cache_by_id = dict()
cls.map_id_to_name = dict()
cls.map_name_to_id = dict()
for role in response.records:
role_id = role.id
role_name = role.name
cls.cache_by_id[role_id] = role
cls.map_id_to_name[role_id] = role_name
cls.map_name_to_id[role_name] = role_id

@classmethod
def get_id_for_name(cls, name: str) -> Optional[str]:
"""
Translate the provided human-readable role name to its GUID.
"""
role_id = cls.map_name_to_id.get(name)
if role_id:
return role_id
else:
cls._refresh_cache()
return cls.map_name_to_id.get(name)

@classmethod
def get_name_for_id(cls, idstr: str) -> Optional[str]:
"""
Translate the provided role GUID to the human-readable role name.
"""
role_name = cls.map_id_to_name.get(idstr)
if role_name:
return role_name
else:
cls._refresh_cache()
return cls.map_id_to_name.get(idstr)
Loading