Dataset Modularization pt.1 (#413)

### Feature or Bugfix
- Refactoring (Modularization)

### Relates
- Related issues #295 and #412 

### Short Summary
First part of the `Dataset` migration, covering `DatasetTableColumn`. TL;DR :)

### Long Summary 
Datasets are huge: the dataset module is one of the central pieces, spread everywhere across the application. Migrating the entire Dataset functionality at once would be a very difficult task and, more importantly, even more difficult to review. Therefore, I decided to break this work down into "small" steps to make it more convenient to review.
Dataset's API consists of the following items:
* `Dataset`
* `DatasetTable`
* `DatasetTableColumn`
* `DatasetLocation`
* `DatasetProfiling`

This PR only creates the `Dataset` module and migrates `DatasetTableColumn` (plus some pieces related to it). Why? Because the plan was to migrate it first, see what issues came up along the way, and address them here. The refactoring of `DatasetTableColumn` itself will follow in another PR.
The issues: 
1) Glossaries
2) Feed
3) Long tasks for datasets
4) Redshift

Glossaries rely on a GraphQL UNION of different types (including datasets). I created an abstraction for glossary registration, sketched below. There was an idea to change the frontend instead, but that would require a lot of work.
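
For illustration, a minimal sketch of the registration flow (the `register` call mirrors the ones added in this diff; invoking it from a module's initialization code is my assumption about how modules will plug in):

```python
from dataall.api.Objects.Glossary.registry import GlossaryRegistry, GlossaryDefinition
from dataall.db import models

# The glossary UNION is now built from GlossaryRegistry.types(), so each
# module declares its own targets instead of the API hard-coding them.
GlossaryRegistry.register(GlossaryDefinition(
    target_type="DatasetTable",  # value stored as the glossary link's targetType
    object_type="DatasetTable",  # GraphQL object type the UNION resolves to
    model=models.DatasetTable,   # backing SQLAlchemy model
))
```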

Feed: same problem as glossaries, solved in a similar way (see the sketch below). For the feed, changing the frontend API would be more feasible, but I wanted it to stay consistent with glossaries.
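
A short sketch of how the feed registry replaces the old `isinstance` chain and the hard-coded model dictionary (the `register` line is from this diff; `obj` stands for any model instance coming out of a query):

```python
from dataall.api.Objects.Feed.registry import FeedRegistry, FeedDefinition
from dataall.db import models

# A module registers its feed target once at import time...
FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable))

# ...and the generic resolvers look the mapping up in both directions:
model = FeedRegistry.find_model("DatasetTable")  # -> models.DatasetTable
target_type = FeedRegistry.find_target(obj)      # -> "DatasetTable", or None if unregistered
```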

Long-running tasks for datasets: they were migrated into the tasks folder and don't require dedicated loading of their code (at least for now). But there are two concerns (see the import-mode sketch after this list):
1) The deployment uses direct module folder references to run them (e.g. `dataall.modules.datasets....`), so when a module is deactivated, its tasks shouldn't be deployed either. I left a TODO to address this in the future (when we migrate all modules), but we should bear in mind that it might lead to inconsistencies.
2) There is a reference to `redshift` from the long-running tasks; it should be addressed in the `redshift` module.
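
For context, a minimal sketch of the import-mode gating involved here. Only `load_modules`, `ImportMode.TASKS`, and `ImportMode.HANDLERS` appear in this diff; the enum body and the loader internals below are assumptions for illustration:

```python
from enum import Enum, auto


class ImportMode(Enum):
    """Which slice of a module's code an entry point imports."""
    API = auto()       # GraphQL resolvers for the API container (assumed member)
    HANDLERS = auto()  # short async handlers for the AWS worker (renamed from TASKS in this PR)
    TASKS = auto()     # long-running scheduled tasks


def load_modules(modes: list) -> None:
    # Hypothetical internals: import only the code the active modules
    # expose for these modes, so a deactivated module is never imported.
    for module in _active_modules_from_config():  # assumed helper
        module.load(modes)


# The async worker entry point (backend/aws_handler.py) now uses:
# load_modules(modes=[ImportMode.HANDLERS])
```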

Redshift: it has some references to `datasets`, so there will be either dependencies among modules or a little code duplication (if `redshift` doesn't rely heavily on `datasets`). This will be addressed in the `redshift` module.

Other changes:
- Fixed and improved some tests.
- Extracted the glue handler code related to `DatasetTableColumn`.
- Renamed the import mode from tasks to handlers for the async lambda.
- A few hacks that will go away with the next refactoring :)

Next steps:
- [Part 2](nikpodsh#1) in preview :)
- Extract the rest of the dataset functionality (perhaps in a few steps).
- Refactor the extractor modules the same way as notebooks.
- Extract tests to follow the same structure.



By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
nikpodsh committed Apr 24, 2023
1 parent f26fc01 commit 3c4ab2d
Showing 62 changed files with 620 additions and 425 deletions.
2 changes: 1 addition & 1 deletion backend/aws_handler.py
@@ -14,7 +14,7 @@

 engine = get_engine(envname=ENVNAME)

-load_modules(modes=[ImportMode.TASKS])
+load_modules(modes=[ImportMode.HANDLERS])


 def handler(event, context=None):
3 changes: 2 additions & 1 deletion backend/dataall/api/Objects/DatasetProfiling/resolvers.py
@@ -6,6 +6,7 @@
 from ....aws.handlers.sts import SessionHelper
 from ....db import api, permissions, models
 from ....db.api import ResourcePolicy
+from dataall.modules.datasets.services.dataset_table import DatasetTableService

 log = logging.getLogger(__name__)

@@ -97,7 +98,7 @@ def get_last_table_profiling_run(context: Context, source, tableUri=None):

         if run:
             if not run.results:
-                table = api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
+                table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
                 dataset = api.Dataset.get_dataset_by_uri(session, table.datasetUri)
                 environment = api.Environment.get_environment_by_uri(
                     session, dataset.environmentUri
25 changes: 13 additions & 12 deletions backend/dataall/api/Objects/DatasetTable/resolvers.py
@@ -13,13 +13,14 @@
 from ....db.api import ResourcePolicy, Glossary
 from ....searchproxy import indexers
 from ....utils import json_utils
+from dataall.modules.datasets.services.dataset_table import DatasetTableService

 log = logging.getLogger(__name__)


 def create_table(context, source, datasetUri: str = None, input: dict = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.create_dataset_table(
+        table = DatasetTableService.create_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -37,7 +38,7 @@ def list_dataset_tables(context, source, filter: dict = None):
     if not filter:
         filter = {}
     with context.engine.scoped_session() as session:
-        return db.api.DatasetTable.list_dataset_tables(
+        return DatasetTableService.list_dataset_tables(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -49,8 +50,8 @@

 def get_table(context, source: models.Dataset, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
-        return db.api.DatasetTable.get_dataset_table(
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
+        return DatasetTableService.get_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -64,14 +65,14 @@ def get_table(context, source: models.Dataset, tableUri: str = None):

 def update_table(context, source, tableUri: str = None, input: dict = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)

         dataset = db.api.Dataset.get_dataset_by_uri(session, table.datasetUri)

         input['table'] = table
         input['tableUri'] = table.tableUri

-        db.api.DatasetTable.update_dataset_table(
+        DatasetTableService.update_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -85,8 +86,8 @@ def update_table(context, source, tableUri: str = None, input: dict = None):

 def delete_table(context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table = db.api.DatasetTable.get_dataset_table_by_uri(session, tableUri)
-        db.api.DatasetTable.delete_dataset_table(
+        table = DatasetTableService.get_dataset_table_by_uri(session, tableUri)
+        DatasetTableService.delete_dataset_table(
             session=session,
             username=context.username,
             groups=context.groups,
@@ -102,7 +103,7 @@ def delete_table(context, source, tableUri: str = None):

 def preview(context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, tableUri
         )
         dataset = db.api.Dataset.get_dataset_by_uri(session, table.datasetUri)
@@ -157,7 +158,7 @@ def get_glue_table_properties(context: Context, source: models.DatasetTable, **k
     if not source:
         return None
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, source.tableUri
         )
         return json_utils.to_string(table.GlueTableProperties).replace('\\', ' ')
@@ -186,7 +187,7 @@ def resolve_glossary_terms(context: Context, source: models.DatasetTable, **kwar

 def publish_table_update(context: Context, source, tableUri: str = None):
     with context.engine.scoped_session() as session:
-        table: models.DatasetTable = db.api.DatasetTable.get_dataset_table_by_uri(
+        table: models.DatasetTable = DatasetTableService.get_dataset_table_by_uri(
             session, tableUri
         )
         ResourcePolicy.check_user_resource_permission(
@@ -235,7 +236,7 @@ def resolve_redshift_copy_location(

 def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str, filter: dict = None):
     with context.engine.scoped_session() as session:
-        return db.api.DatasetTable.get_dataset_tables_shared_with_env(
+        return DatasetTableService.get_dataset_tables_shared_with_env(
             session,
             envUri,
             datasetUri
2 changes: 1 addition & 1 deletion backend/dataall/api/Objects/DatasetTable/schema.py
@@ -1,4 +1,4 @@
-from ..DatasetTableColumn.resolvers import list_table_columns
+from dataall.modules.datasets.api.table_column.resolvers import list_table_columns
 from ... import gql
 from .resolvers import *
 from ...constants import GraphQLEnumMapper
43 changes: 43 additions & 0 deletions backend/dataall/api/Objects/Feed/registry.py
@@ -0,0 +1,43 @@
+from dataclasses import dataclass
+from typing import Type, Dict
+
+from dataall.api import gql
+from dataall.api.gql.graphql_union_type import UnionTypeRegistry
+from dataall.db import Resource, models
+
+
+@dataclass
+class FeedDefinition:
+    target_type: str
+    model: Type[Resource]
+
+
+class FeedRegistry(UnionTypeRegistry):
+    """Registers models for different target types"""
+    _DEFINITIONS: Dict[str, FeedDefinition] = {}
+
+    @classmethod
+    def register(cls, definition: FeedDefinition):
+        cls._DEFINITIONS[definition.target_type] = definition
+
+    @classmethod
+    def find_model(cls, target_type: str):
+        return cls._DEFINITIONS[target_type].model
+
+    @classmethod
+    def find_target(cls, obj: Resource):
+        for target_type, definition in cls._DEFINITIONS.items():
+            if isinstance(obj, definition.model):
+                return target_type
+        return None
+
+    @classmethod
+    def types(cls):
+        return [gql.Ref(target_type) for target_type in cls._DEFINITIONS.keys()]
+
+
+FeedRegistry.register(FeedDefinition("Worksheet", models.Worksheet))
+FeedRegistry.register(FeedDefinition("DataPipeline", models.DataPipeline))
+FeedRegistry.register(FeedDefinition("DatasetTable", models.DatasetTable))
+FeedRegistry.register(FeedDefinition("DatasetStorageLocation", models.DatasetStorageLocation))
+FeedRegistry.register(FeedDefinition("Dashboard", models.Dashboard))
32 changes: 5 additions & 27 deletions backend/dataall/api/Objects/Feed/resolvers.py
@@ -1,7 +1,8 @@
 from sqlalchemy import or_

-from ....api.context import Context
-from ....db import paginate, models
+from dataall.api.context import Context
+from dataall.db import paginate, models
+from dataall.api.Objects.Feed.registry import FeedRegistry


 class Feed:
@@ -19,37 +20,14 @@ def targetType(self):


 def resolve_feed_target_type(obj, *_):
-    if isinstance(obj, models.DatasetTableColumn):
-        return 'DatasetTableColumn'
-    elif isinstance(obj, models.Worksheet):
-        return 'Worksheet'
-    elif isinstance(obj, models.DataPipeline):
-        return 'DataPipeline'
-    elif isinstance(obj, models.DatasetTable):
-        return 'DatasetTable'
-    elif isinstance(obj, models.Dataset):
-        return 'Dataset'
-    elif isinstance(obj, models.DatasetStorageLocation):
-        return 'DatasetStorageLocation'
-    elif isinstance(obj, models.Dashboard):
-        return 'Dashboard'
-    else:
-        return None
+    return FeedRegistry.find_target(obj)


 def resolve_target(context: Context, source: Feed, **kwargs):
     if not source:
         return None
     with context.engine.scoped_session() as session:
-        model = {
-            'Dataset': models.Dataset,
-            'DatasetTable': models.DatasetTable,
-            'DatasetTableColumn': models.DatasetTableColumn,
-            'DatasetStorageLocation': models.DatasetStorageLocation,
-            'Dashboard': models.Dashboard,
-            'DataPipeline': models.DataPipeline,
-            'Worksheet': models.Worksheet,
-        }[source.targetType]
+        model = FeedRegistry.find_model(source.targetType)
         target = session.query(model).get(source.targetUri)
         return target
11 changes: 2 additions & 9 deletions backend/dataall/api/Objects/Feed/schema.py
@@ -1,18 +1,11 @@
 from ... import gql
 from .resolvers import *
+from dataall.api.Objects.Feed.registry import FeedRegistry


 FeedTarget = gql.Union(
     name='FeedTarget',
-    types=[
-        gql.Ref('Dataset'),
-        gql.Ref('DatasetTable'),
-        gql.Ref('DatasetTableColumn'),
-        gql.Ref('DatasetStorageLocation'),
-        gql.Ref('DataPipeline'),
-        gql.Ref('Worksheet'),
-        gql.Ref('Dashboard'),
-    ],
+    type_registry=FeedRegistry,
     resolver=resolve_feed_target_type,
 )

3 changes: 2 additions & 1 deletion backend/dataall/api/Objects/Glossary/__init__.py
@@ -4,6 +4,7 @@
     mutations,
     resolvers,
     schema,
+    registry,
 )

-__all__ = ['resolvers', 'schema', 'input_types', 'queries', 'mutations']
+__all__ = ['registry', 'resolvers', 'schema', 'input_types', 'queries', 'mutations']
58 changes: 58 additions & 0 deletions backend/dataall/api/Objects/Glossary/registry.py
@@ -0,0 +1,58 @@
+from dataclasses import dataclass
+from typing import Type, Dict, Optional, Protocol, Union
+
+from dataall.api import gql
+from dataall.api.gql.graphql_union_type import UnionTypeRegistry
+from dataall.db import Resource, models
+
+
+class Identifiable(Protocol):
+    def uri(self):
+        ...
+
+
+@dataclass
+class GlossaryDefinition:
+    """Glossary's definition used for registration references of other modules"""
+    target_type: str
+    object_type: str
+    model: Union[Type[Resource], Identifiable]  # should be an intersection, but python typing doesn't have one yet
+
+    def target_uri(self):
+        return self.model.uri()
+
+
+class GlossaryRegistry(UnionTypeRegistry):
+    """Registry of glossary definition and API to retrieve data"""
+    _DEFINITIONS: Dict[str, GlossaryDefinition] = {}
+
+    @classmethod
+    def register(cls, glossary: GlossaryDefinition) -> None:
+        cls._DEFINITIONS[glossary.target_type] = glossary
+
+    @classmethod
+    def find_model(cls, target_type: str) -> Optional[Resource]:
+        definition = cls._DEFINITIONS[target_type]
+        return definition.model if definition is not None else None
+
+    @classmethod
+    def find_object_type(cls, model: Resource) -> Optional[str]:
+        for _, definition in cls._DEFINITIONS.items():
+            if isinstance(model, definition.model):
+                return definition.object_type
+        return None
+
+    @classmethod
+    def definitions(cls):
+        return cls._DEFINITIONS.values()
+
+    @classmethod
+    def types(cls):
+        return [gql.Ref(definition.object_type) for definition in cls._DEFINITIONS.values()]
+
+
+GlossaryRegistry.register(GlossaryDefinition("DatasetTable", "DatasetTable", models.DatasetTable))
+GlossaryRegistry.register(GlossaryDefinition("Folder", "DatasetStorageLocation", models.DatasetStorageLocation))
+GlossaryRegistry.register(GlossaryDefinition("Dashboard", "Dashboard", models.Dashboard))
+GlossaryRegistry.register(GlossaryDefinition("DatasetTable", "DatasetTable", models.DatasetTable))
+GlossaryRegistry.register(GlossaryDefinition("Dataset", "Dataset", models.Dataset))
[Diff truncated: the remaining changed files are not shown.]