modularization: Datasets modularization pt.4 #441

Merged
Commits (75) · changes shown from 74
3a5e0de
Initialization of dataset module
nikpodsh Apr 11, 2023
a50a02f
Refactoring of datasets
nikpodsh Apr 11, 2023
be14986
Refactoring of datasets
nikpodsh Apr 11, 2023
06f82ad
Refactoring of datasets
nikpodsh Apr 11, 2023
38145ae
Fixed leftover in loader
nikpodsh Apr 11, 2023
f0e146a
Dataset refactoring
nikpodsh Apr 11, 2023
b039163
Dataset refactoring
nikpodsh Apr 11, 2023
b7922ed
Dataset refactoring
nikpodsh Apr 11, 2023
1771bca
Notebooks don't require tasks
nikpodsh Apr 11, 2023
3d1603f
Renamed tasks to handlers
nikpodsh Apr 11, 2023
fb6b515
Dataset refactoring
nikpodsh Apr 11, 2023
e3596a5
Dataset refactoring
nikpodsh Apr 11, 2023
3af2ecf
Dataset refactoring
nikpodsh Apr 11, 2023
1a063b2
Dataset refactoring
nikpodsh Apr 11, 2023
b733714
Dataset refactoring
nikpodsh Apr 11, 2023
2a4e2e0
Extracted feed registry
nikpodsh Apr 11, 2023
c15d090
Extracted feed and glossary registry and created a model registry
nikpodsh Apr 11, 2023
052a2b1
Dataset refactoring
nikpodsh Apr 12, 2023
d984483
Fixed and unignored test_tables_sync
nikpodsh Apr 12, 2023
dc0c935
Split model registry into feed and glossaries
nikpodsh Apr 12, 2023
727e353
Abstraction for glossaries
nikpodsh Apr 12, 2023
49fbb41
Fixed leftovers
nikpodsh Apr 12, 2023
7d029e7
Datasets refactoring
nikpodsh Apr 13, 2023
be527eb
Added runtime type registration for Union GraphQL type
nikpodsh Apr 13, 2023
3daf2aa
Changed Feed type registration mechanism
nikpodsh Apr 13, 2023
db3bfd3
Added TODO for future refactoring
nikpodsh Apr 13, 2023
13b6e92
Added GlossaryRegistry for Union scheme
nikpodsh Apr 13, 2023
144dfea
Changed import in redshift module
nikpodsh Apr 13, 2023
d43b9b3
No need for Utils yet
nikpodsh Apr 13, 2023
39b244c
Fixed linting
nikpodsh Apr 13, 2023
cb3800a
Datasets refactoring
nikpodsh Apr 14, 2023
dd8e597
Datasets refactoring
nikpodsh Apr 14, 2023
8ca7bea
Datasets refactoring
nikpodsh Apr 14, 2023
e36ab3b
Datasets refactoring
nikpodsh Apr 14, 2023
31720c2
Datasets refactoring
nikpodsh Apr 14, 2023
8a907df
Datasets refactoring
nikpodsh Apr 14, 2023
561da72
Datasets refactoring
nikpodsh Apr 14, 2023
73c8150
Datasets refactoring
nikpodsh Apr 17, 2023
56a3610
Datasets refactoring
nikpodsh Apr 17, 2023
47a38cc
Datasets refactoring
nikpodsh Apr 17, 2023
dbb5517
Datasets refactoring
nikpodsh Apr 17, 2023
a3def13
Merge branch 'datasets-mod-part2' into datasets-mod-part3
nikpodsh Apr 17, 2023
b256678
Datasets refactoring
nikpodsh Apr 17, 2023
66b5ddb
Datasets refactoring
nikpodsh Apr 17, 2023
352d824
Datasets refactoring
nikpodsh Apr 17, 2023
9934a9c
Datasets refactoring
nikpodsh Apr 17, 2023
228c175
Datasets refactoring
nikpodsh Apr 18, 2023
263d10c
Datasets refactoring
nikpodsh Apr 19, 2023
417e6e5
Datasets refactoring
nikpodsh Apr 19, 2023
7aaff5b
Introduced Indexers
nikpodsh Apr 19, 2023
4e31b99
Extracted upsert_dataset_folders into DatasetLocationIndexer and rena…
nikpodsh Apr 19, 2023
b772812
Moved DatasetLocationIndexer into the dataset module
nikpodsh Apr 19, 2023
cd798e2
Moved DatasetStorageLocation methods to the service
nikpodsh Apr 20, 2023
b0e6a62
Renamed the service
nikpodsh Apr 20, 2023
27c6d79
Moved DatasetIndexer to modules
nikpodsh Apr 20, 2023
0e730ac
Created a dataset repository.
nikpodsh Apr 20, 2023
9ac7964
Moved DatasetTableIndexer
nikpodsh Apr 20, 2023
a1825ba
Fixed test mocking
nikpodsh Apr 20, 2023
d295485
Fixed circular import while half of the module is not migrated
nikpodsh Apr 20, 2023
005a5e7
Removed not used alarms
nikpodsh Apr 20, 2023
0fd7c02
Moved dataset table GraphQL api in modules
nikpodsh Apr 21, 2023
7030c82
Moved DatasetTable model to modules
nikpodsh Apr 21, 2023
ba45ca5
Moved delete_doc to BaseIndexer
nikpodsh Apr 21, 2023
dc8ff72
Lazy creation of connection to OpenSearch
nikpodsh Apr 21, 2023
2ac3ae7
Resolved code conflict
nikpodsh Apr 24, 2023
fca218f
Merge remote-tracking branch 'origin/datasets-mod-part2' into dataset…
nikpodsh Apr 25, 2023
ef98aa0
Merge branch 'datasets-mod-part3' into datasets-mod-part4
nikpodsh Apr 25, 2023
2cd14e0
Merge remote-tracking branch 'upstream/modularization-main' into data…
nikpodsh May 2, 2023
a3d9676
Merge branch 'datasets-mod-part3' into datasets-mod-part4
nikpodsh May 2, 2023
f382a68
Review remarks
nikpodsh May 4, 2023
532ff0d
Added TODO
nikpodsh May 4, 2023
afcae66
Merge branch 'datasets-mod-part3' into datasets-mod-part4
nikpodsh May 4, 2023
61d4eb3
After the merge of part3
nikpodsh May 4, 2023
72864d6
Fixed imports
nikpodsh May 4, 2023
07e6975
Update tests/api/conftest.py
nikpodsh May 4, 2023
6 changes: 1 addition & 5 deletions backend/api_handler.py
@@ -15,7 +15,6 @@
from dataall.core.context import set_context, dispose_context, RequestContext
from dataall.db import init_permissions, get_engine, api, permissions
from dataall.modules.loader import load_modules, ImportMode
from dataall.searchproxy import connect

logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
@@ -30,7 +29,6 @@
TYPE_DEFS = gql(SCHEMA.gql(with_directives=False))
ENVNAME = os.getenv('envname', 'local')
ENGINE = get_engine(envname=ENVNAME)
ES = connect(envname=ENVNAME)
Worker.queue = SqsQueue.send

init_permissions(ENGINE)
@@ -99,7 +97,6 @@ def handler(event, context):

log.info('Lambda Event %s', event)
log.debug('Env name %s', ENVNAME)
log.debug('ElasticSearch %s', ES)
log.debug('Engine %s', ENGINE.engine.url)

if event['httpMethod'] == 'OPTIONS':
@@ -137,11 +134,10 @@ def handler(event, context):
print(f'Error managing groups due to: {e}')
groups = []

set_context(RequestContext(ENGINE, username, groups, ES))
set_context(RequestContext(ENGINE, username, groups))

app_context = {
'engine': ENGINE,
'es': ES,
'username': username,
'groups': groups,
'schema': SCHEMA,
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/Dashboard/resolvers.py
@@ -311,7 +311,7 @@ def delete_dashboard(context: Context, source, dashboardUri: str = None):
data=None,
check_perm=True,
)
indexers.delete_doc(es=context.es, doc_id=dashboardUri)
DashboardIndexer.delete_doc(doc_id=dashboardUri)
return True


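The resolver above no longer receives an OpenSearch handle. Combined with the commits "Moved delete_doc to BaseIndexer" and "Lazy creation of connection to OpenSearch", the indexers appear to own their connection and open it on first use. A minimal sketch of that pattern, reusing the `connect` helper the old `api_handler.py` imported; the index name and internal layout are assumptions:

```python
import os

from dataall.searchproxy import connect  # same helper the pre-PR api_handler.py used


class BaseIndexer:
    """Shared OpenSearch plumbing for module-level indexers (sketch)."""

    _es = None  # connection is opened on first use, not at import time

    @classmethod
    def es(cls):
        if cls._es is None:
            cls._es = connect(envname=os.getenv('envname', 'local'))
        return cls._es

    @classmethod
    def delete_doc(cls, doc_id):
        # 'dataall-index' is a placeholder; the real index name comes from configuration
        cls.es().delete(index='dataall-index', id=doc_id, ignore=[400, 404])
        return True


class DashboardIndexer(BaseIndexer):
    pass  # the dashboard-specific upsert() would live here
```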
10 changes: 4 additions & 6 deletions backend/dataall/api/Objects/Dataset/resolvers.py
@@ -328,9 +328,7 @@ def sync_tables(context: Context, source, datasetUri: str = None):
DatasetTableIndexer.upsert_all(
session=session, dataset_uri=dataset.datasetUri
)
indexers.remove_deleted_tables(
session=session, es=context.es, datasetUri=dataset.datasetUri
)
DatasetTableIndexer.remove_all_deleted(session=session, dataset_uri=dataset.datasetUri)
return Dataset.paginated_dataset_tables(
session=session,
username=context.username,
@@ -557,13 +555,13 @@ def delete_dataset(

tables = [t.tableUri for t in Dataset.get_dataset_tables(session, datasetUri)]
for uri in tables:
indexers.delete_doc(es=context.es, doc_id=uri)
DatasetIndexer.delete_doc(doc_id=uri)

folders = [f.locationUri for f in DatasetLocationService.get_dataset_folders(session, datasetUri)]
for uri in folders:
indexers.delete_doc(es=context.es, doc_id=uri)
DatasetIndexer.delete_doc(doc_id=uri)

indexers.delete_doc(es=context.es, doc_id=datasetUri)
DatasetIndexer.delete_doc(doc_id=datasetUri)

Dataset.delete_dataset(
session=session,
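`DatasetTableIndexer.remove_all_deleted` is introduced by this PR, but its body is not part of these hunks. A plausible sketch, building on the `BaseIndexer` idea above and the `LastGlueTableStatus` column that appears later in `db/api/dataset.py`; the exact query is an assumption:

```python
from dataall.modules.datasets.db.models import DatasetTable


class DatasetTableIndexer(BaseIndexer):  # BaseIndexer as sketched earlier
    @classmethod
    def remove_all_deleted(cls, session, dataset_uri):
        # Drop search documents for tables that Glue no longer reports
        tables = (
            session.query(DatasetTable)
            .filter(
                DatasetTable.datasetUri == dataset_uri,
                DatasetTable.LastGlueTableStatus == 'Deleted',
            )
            .all()
        )
        for table in tables:
            cls.delete_doc(doc_id=table.tableUri)
        return tables
```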
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/Feed/registry.py
@@ -38,5 +38,4 @@ def types(cls):

FeedRegistry.register(FeedDefinition("Worksheet", models.Worksheet))
FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline))
FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable))
FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard))
8 changes: 3 additions & 5 deletions backend/dataall/api/Objects/Glossary/registry.py
@@ -1,7 +1,5 @@
from dataclasses import dataclass, field
from typing import Type, Dict, Optional, Protocol, Union, Callable, Any

from opensearchpy import OpenSearch
from dataclasses import dataclass
from typing import Type, Dict, Optional, Protocol, Union

from dataall.api import gql
from dataall.api.gql.graphql_union_type import UnionTypeRegistry
@@ -56,7 +54,7 @@ def types(cls):
return [gql.Ref(definition.object_type) for definition in cls._DEFINITIONS.values()]

@classmethod
def reindex(cls, session, es: OpenSearch, target_type: str, target_uri: str):
def reindex(cls, session, target_type: str, target_uri: str):
definition = cls._DEFINITIONS[target_type]
if definition.reindexer:
definition.reindexer.upsert(session, target_uri)
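With the `es` parameter removed, `reindex` relies entirely on the `reindexer` attached to each registered definition. A hedged usage sketch: only `reindexer.upsert(session, target_uri)` comes from the hunk above; the definition class name, its constructor arguments, and the indexer import path are assumptions:

```python
from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition
from dataall.modules.datasets.db.models import DatasetTable
from dataall.modules.datasets.indexers import DatasetTableIndexer  # assumed path

GlossaryRegistry.register(GlossaryDefinition(
    target_type="DatasetTable",
    object_type="DatasetTableItem",
    model=DatasetTable,
    # GlossaryRegistry.reindex() will call DatasetTableIndexer.upsert(session, target_uri)
    reindexer=DatasetTableIndexer,
))
```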
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/Glossary/resolvers.py
@@ -458,7 +458,7 @@ def reindex(context, linkUri):
if not link:
return

GlossaryRegistry.reindex(session, context.es, link.targetType, link.targetUri)
GlossaryRegistry.reindex(session, link.targetType, link.targetUri)


def _target_model(target_type: str):
4 changes: 2 additions & 2 deletions backend/dataall/api/Objects/ShareObject/resolvers.py
@@ -7,7 +7,7 @@
from ....api.context import Context
from ....aws.handlers.service_handlers import Worker
from ....db import models
from dataall.modules.datasets.db.models import DatasetStorageLocation
from dataall.modules.datasets.db.models import DatasetStorageLocation, DatasetTable

log = logging.getLogger(__name__)

@@ -265,7 +265,7 @@ def resolve_dataset(context: Context, source: models.ShareObject, **kwargs):


def union_resolver(object, *_):
if isinstance(object, models.DatasetTable):
if isinstance(object, DatasetTable):
return 'DatasetTable'
elif isinstance(object, DatasetStorageLocation):
return 'DatasetStorageLocation'
4 changes: 2 additions & 2 deletions backend/dataall/api/Objects/Vote/resolvers.py
@@ -28,11 +28,11 @@ def upvote(context: Context, source, input=None):
data=input,
check_perm=True,
)
reindex(session, context.es, vote)
reindex(session, vote)
return vote


def reindex(session, es, vote):
def reindex(session, vote):
if vote.targetType == 'dataset':
DatasetIndexer.upsert(session=session, dataset_uri=vote.targetUri)
elif vote.targetType == 'dashboard':
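The hunk cuts off after the `dashboard` branch; assembled in full, the vote reindex helper is now a plain dispatch on `vote.targetType`. A sketch with assumed import paths and an assumed keyword argument for the dashboard indexer:

```python
from dataall.modules.datasets.indexers import DatasetIndexer  # assumed path
from dataall.searchproxy.indexers import DashboardIndexer     # assumed path


def reindex(session, vote):
    if vote.targetType == 'dataset':
        DatasetIndexer.upsert(session=session, dataset_uri=vote.targetUri)
    elif vote.targetType == 'dashboard':
        DashboardIndexer.upsert(session=session, dashboard_uri=vote.targetUri)  # kwarg name assumed
```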
1 change: 0 additions & 1 deletion backend/dataall/api/Objects/__init__.py
@@ -17,7 +17,6 @@
DataPipeline,
Environment,
Activity,
DatasetTable,
Dataset,
Group,
Principal,
2 changes: 0 additions & 2 deletions backend/dataall/api/context.py
@@ -2,11 +2,9 @@ class Context:
def __init__(
self,
engine=None,
es=None,
username=None,
groups=None,
):
self.engine = engine
self.es = es
self.username = username
self.groups = groups
3 changes: 2 additions & 1 deletion backend/dataall/aws/handlers/glue.py
@@ -6,6 +6,7 @@
from .sts import SessionHelper
from ... import db
from ...db import models
from dataall.modules.datasets.db.models import DatasetTable

log = logging.getLogger('aws:glue')

@@ -524,7 +525,7 @@ def get_job_runs(engine, task: models.Task):

@staticmethod
def grant_principals_all_table_permissions(
table: models.DatasetTable, principals: [str], client=None
table: DatasetTable, principals: [str], client=None
):
"""
Update the table permissions on Lake Formation
3 changes: 2 additions & 1 deletion backend/dataall/aws/handlers/redshift.py
@@ -11,6 +11,7 @@
from ...db import models
# TODO should be migrated in the redshift module
from dataall.modules.datasets.services.dataset_table import DatasetTableService
from dataall.modules.datasets.db.models import DatasetTable

log = logging.getLogger(__name__)

@@ -448,7 +449,7 @@ def copy_data(engine, task: models.Task):
session, task.payload['datasetUri']
)

table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
table: DatasetTable = DatasetTableService.get_dataset_table_by_uri(
session, task.payload['tableUri']
)

2 changes: 0 additions & 2 deletions backend/dataall/core/context.py
@@ -12,7 +12,6 @@

from dataall.db.connection import Engine
from threading import local
import opensearchpy


_request_storage = local()
@@ -24,7 +23,6 @@ class RequestContext:
db_engine: Engine
username: str
groups: List[str]
es_engine: opensearchpy.OpenSearch


def get_context() -> RequestContext:
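Assembled from the two hunks, the request context after this change is a plain three-field dataclass; nothing search-related is carried per request any more. A sketch, where the decorator and import layout are assumed and the field names are taken from the diff:

```python
from dataclasses import dataclass
from typing import List

from dataall.db.connection import Engine


@dataclass
class RequestContext:
    db_engine: Engine
    username: str
    groups: List[str]
    # es_engine removed: indexers now open their own OpenSearch connection lazily
```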
33 changes: 17 additions & 16 deletions backend/dataall/db/api/dataset.py
@@ -16,9 +16,10 @@
from . import Organization
from .. import models, api, exceptions, permissions, paginate
from ..models.Enums import Language, ConfidentialityClassification
from ...modules.datasets.db.dataset_repository import DatasetRepository
from ...modules.datasets.services.dataset_location import DatasetLocationService
from ...utils.naming_convention import (
from dataall.modules.datasets.db.dataset_repository import DatasetRepository
from dataall.modules.datasets.db.models import DatasetTable
from dataall.modules.datasets.services.dataset_location import DatasetLocationService
from dataall.utils.naming_convention import (
NamingConventionService,
NamingConventionPattern,
)
@@ -266,21 +267,21 @@ def paginated_dataset_tables(
session, username, groups, uri, data=None, check_perm=None
) -> dict:
query = (
session.query(models.DatasetTable)
session.query(DatasetTable)
.filter(
and_(
models.DatasetTable.datasetUri == uri,
models.DatasetTable.LastGlueTableStatus != 'Deleted',
DatasetTable.datasetUri == uri,
DatasetTable.LastGlueTableStatus != 'Deleted',
)
)
.order_by(models.DatasetTable.created.desc())
.order_by(DatasetTable.created.desc())
)
if data and data.get('term'):
query = query.filter(
or_(
*[
models.DatasetTable.name.ilike('%' + data.get('term') + '%'),
models.DatasetTable.GlueTableName.ilike(
DatasetTable.name.ilike('%' + data.get('term') + '%'),
DatasetTable.GlueTableName.ilike(
'%' + data.get('term') + '%'
),
]
@@ -379,7 +380,7 @@ def transfer_stewardship_to_new_stewards(session, dataset, new_stewards):
group=new_stewards,
permissions=permissions.DATASET_TABLE_READ,
resource_uri=tableUri,
resource_type=models.DatasetTable.__name__,
resource_type=DatasetTable.__name__,
)

dataset_shares = (
@@ -455,8 +456,8 @@ def update_glue_database_status(session, dataset_uri):
def get_dataset_tables(session, dataset_uri):
"""return the dataset tables"""
return (
session.query(models.DatasetTable)
.filter(models.DatasetTable.datasetUri == dataset_uri)
session.query(DatasetTable)
.filter(DatasetTable.datasetUri == dataset_uri)
.all()
)

@@ -585,10 +586,10 @@ def _delete_dataset_term_links(session, uri):
@staticmethod
def _delete_dataset_tables(session, dataset_uri) -> bool:
tables = (
session.query(models.DatasetTable)
session.query(DatasetTable)
.filter(
and_(
models.DatasetTable.datasetUri == dataset_uri,
DatasetTable.datasetUri == dataset_uri,
)
)
.all()
@@ -618,7 +619,7 @@ def get_dataset_by_bucket_name(session, bucket) -> [models.Dataset]:
@staticmethod
def count_dataset_tables(session, dataset_uri):
return (
session.query(models.DatasetTable)
.filter(models.DatasetTable.datasetUri == dataset_uri)
session.query(DatasetTable)
.filter(DatasetTable.datasetUri == dataset_uri)
.count()
)
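Besides the model swap, the import block of this file now pulls in `DatasetRepository`, whose body is not shown in these hunks. The sketch below is purely illustrative of the kind of data-access helper the commit "Created a dataset repository." suggests; the method name and query are assumptions:

```python
from dataall.db import models


class DatasetRepository:
    """Data-access helper for Dataset rows, keeping raw queries out of db.api.dataset (sketch)."""

    @staticmethod
    def get_dataset_by_uri(session, dataset_uri) -> models.Dataset:
        # Query.get() looks the row up by primary key and returns None if it does not exist
        return session.query(models.Dataset).get(dataset_uri)
```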
25 changes: 13 additions & 12 deletions backend/dataall/db/api/redshift_cluster.py
@@ -4,11 +4,12 @@

from .. import models, api, exceptions, paginate, permissions
from . import has_resource_perm, ResourcePolicy, Environment, Dataset
from ...utils.naming_convention import (
from dataall.modules.datasets.db.models import DatasetTable
from dataall.utils.naming_convention import (
NamingConventionService,
NamingConventionPattern,
)
from ...utils.slugify import slugify
from dataall.utils.slugify import slugify

log = logging.getLogger(__name__)

@@ -334,13 +335,13 @@ def list_available_cluster_tables(
)
created = (
session.query(
models.DatasetTable.datasetUri.label('datasetUri'),
models.DatasetTable.tableUri.label('tableUri'),
DatasetTable.datasetUri.label('datasetUri'),
DatasetTable.tableUri.label('tableUri'),
models.RedshiftCluster.clusterUri.label('clusterUri'),
)
.join(
models.Dataset,
models.DatasetTable.datasetUri == models.Dataset.datasetUri,
DatasetTable.datasetUri == models.Dataset.datasetUri,
)
.filter(
and_(
@@ -354,19 +355,19 @@
)
)
.group_by(
models.DatasetTable.datasetUri,
models.DatasetTable.tableUri,
DatasetTable.datasetUri,
DatasetTable.tableUri,
models.RedshiftCluster.clusterUri,
)
)
all_group_tables_sub_query = shared.union(created).subquery(
'all_group_tables_sub_query'
)
query = (
session.query(models.DatasetTable)
session.query(DatasetTable)
.join(
all_group_tables_sub_query,
all_group_tables_sub_query.c.tableUri == models.DatasetTable.tableUri,
all_group_tables_sub_query.c.tableUri == DatasetTable.tableUri,
)
.filter(
models.RedshiftCluster.clusterUri == cluster.clusterUri,
@@ -541,18 +542,18 @@ def list_copy_enabled_tables(
session, username, groups, uri, data=None, check_perm=True
) -> [models.RedshiftClusterDatasetTable]:
q = (
session.query(models.DatasetTable)
session.query(DatasetTable)
.join(
models.RedshiftClusterDatasetTable,
models.RedshiftClusterDatasetTable.tableUri
== models.DatasetTable.tableUri,
== DatasetTable.tableUri,
)
.filter(models.RedshiftClusterDatasetTable.clusterUri == uri)
)
if data.get('term'):
term = data.get('term')
q = q.filter(
models.DatasetTable.label.ilike('%' + term + '%'),
DatasetTable.label.ilike('%' + term + '%'),
)
return paginate(
q, page=data.get('page', 1), page_size=data.get('pageSize', 20)