backend: Plugin architecture (#359)
This is a draft PR for demonstration purposes. There are still some minor
issues that need to be addressed.

### Feature or Bugfix
- Refactoring

### Detail
This PR contains the following changes:
1. Modularization + Refactoring of notebooks

   There are new modules that will play a major role in the future refactoring:

   * Core = the code the application needs to operate correctly
   * Common = code shared by all modules
   * Modules = plugins/features that can be inserted into the system (at the moment, only notebooks; see the loader sketch after this list)
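The entry points in this diff opt modules in per execution context via `load_modules(modes=[...])` (visible below in api_handler.py, aws_handler.py, and cdkproxymain.py). A minimal, self-contained sketch of the pattern — the registry, the `ImportMode` values, and the `NotebooksModule` shape are illustrative assumptions, not the final loader API:

```python
from enum import Enum, auto


class ImportMode(Enum):
    """Execution contexts a module can opt into (mirrors the usage in this diff)."""
    API = auto()
    CDK = auto()
    TASKS = auto()


_REGISTRY = []  # hypothetical registry; the real dataall.modules.loader may differ


def register(module_cls):
    """Collect module classes so load_modules can discover them."""
    _REGISTRY.append(module_cls)
    return module_cls


@register
class NotebooksModule:
    """Illustrative module: only active for the GraphQL API entry point."""

    @staticmethod
    def is_supported(modes):
        return ImportMode.API in modes


def load_modules(modes):
    """Instantiate every registered module that supports the requested modes."""
    return [cls() for cls in _REGISTRY if cls.is_supported(modes)]


# Each entry point loads only what it needs, as in the diff below:
assert load_modules(modes=[ImportMode.API])      # [<NotebooksModule>]
assert not load_modules(modes=[ImportMode.CDK])  # []
```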

The other part related to modularization is the introduction of environment parameters.
Environment parameters will replace all hardcoded parameters of the environment configuration.
There is a new file, config.json, that allows you to configure the application.
All existing parameters will be migrated via a DB migration in AWS.
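A minimal sketch of what a config.json entry and its lookup could look like; the key names (`modules`, `notebooks`, `active`) are assumptions for illustration, not the final schema:

```python
import json

# Illustrative config.json content; the real schema may differ.
EXAMPLE_CONFIG = """
{
    "modules": {
        "notebooks": {
            "active": true
        }
    }
}
"""


def module_is_active(config: dict, module: str) -> bool:
    """Return True when a module is switched on in the application config."""
    return bool(config.get("modules", {}).get(module, {}).get("active", False))


# In the application this would be json.load(open("config.json")).
config = json.loads(EXAMPLE_CONFIG)
assert module_is_active(config, "notebooks")
assert not module_is_active(config, "mlstudio")
```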

2. Extracting permissions and request context (Optional for the modularization)

The engine, user, and user groups used to be passed as part of the request context. This forced a lot of parameters to be passed on to methods that did not even need them. This information should instead be scoped to the request session.
There is a new way to retrieve this information using `RequestContext`.
There is also a new way to perform permission checks that requires fewer parameters and makes the code cleaner; the old way has been marked as deprecated. A sketch of the new pattern follows this paragraph.
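A self-contained sketch of how such a request scope can work — the thread-local implementation and the attribute names on `RequestContext` are assumptions; only `set_context`, `dispose_context`, and `RequestContext` themselves appear in the diff (api_handler.py):

```python
import threading
from dataclasses import dataclass
from typing import Any, List

# Hypothetical request-local storage; dataall.core.context may differ.
_LOCAL = threading.local()


@dataclass
class RequestContext:
    db_engine: Any            # attribute names are assumptions
    username: str
    groups: List[str]
    es_engine: Any = None


def set_context(context: RequestContext) -> None:
    _LOCAL.context = context


def get_context() -> RequestContext:
    return _LOCAL.context


def dispose_context() -> None:
    _LOCAL.context = None


# The handler binds the scope once (cf. api_handler.py below) ...
set_context(RequestContext(db_engine='ENGINE', username='alice', groups=['team-a']))
try:
    # ... and any service code can read it without extra parameters:
    print(get_context().username)  # alice
finally:
    dispose_context()  # clear the request-local state after the GraphQL call
```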

3. Restructuring of the code (Optional for the modularization)

Since the modularization will touch all the places in the API code, it is a good opportunity to set a new structure for the code. There is a small reorganization in the notebooks module to address:

   * Allocating resources before validating parameters
   * Unclear responsibilities of the classes
   * Mixed layers

   The new structure (see the sketch after this list):
   - resolvers = validate input and pass it to the service layer
   - service layer = business logic
   - repositories = database logic (all queries should be placed here)
   - aws = a wrapper client around boto3
   - cdk = all logic related to creating stacks, or code for ECS
   - tasks = code that will be executed in AWS Lambda (short-lived tasks)

   All names can be changed.
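A self-contained sketch of that layering under stated assumptions — all class and function names are illustrative stand-ins, and an in-memory dict replaces the SQLAlchemy session:

```python
class NotebookRepository:
    """repositories = database logic (all queries should be placed here)."""

    def __init__(self, session):
        self._session = session  # a SQLAlchemy session in the real code

    def find_by_uri(self, uri):
        # Stands in for session.query(Model).get(uri).
        return self._session.get(uri)


class NotebookService:
    """service layer = business logic; no query or GraphQL details here."""

    def __init__(self, session_factory):
        self._session_factory = session_factory

    def get_notebook(self, uri):
        session = self._session_factory()
        notebook = NotebookRepository(session).find_by_uri(uri)
        if notebook is None:
            raise KeyError(f'Notebook {uri} not found')
        return notebook


def resolve_get_notebook(service, notebookUri=None):
    """resolvers = validate parameters first, then delegate to the service."""
    if not notebookUri:
        raise ValueError('notebookUri is required')
    return service.get_notebook(notebookUri)


# Usage with an in-memory stand-in for the database:
service = NotebookService(lambda: {'nb-1': {'label': 'demo notebook'}})
print(resolve_get_notebook(service, 'nb-1'))  # {'label': 'demo notebook'}
```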

### Relates
[#295](#295)



By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dlpzx <71252798+dlpzx@users.noreply.github.com>
3 people committed Mar 31, 2023
1 parent 2bf7f7a commit 098081d
Showing 138 changed files with 2,176 additions and 1,367 deletions.
10 changes: 8 additions & 2 deletions backend/api_handler.py
@@ -12,7 +12,9 @@
from dataall.api.Objects import bootstrap as bootstrap_schema, get_executable_schema
from dataall.aws.handlers.service_handlers import Worker
from dataall.aws.handlers.sqs import SqsQueue
from dataall.core.context import set_context, dispose_context, RequestContext
from dataall.db import init_permissions, get_engine, api, permissions
from dataall.modules.loader import load_modules, ImportMode
from dataall.searchproxy import connect

logger = logging.getLogger()
@@ -23,6 +25,7 @@
for name in ['boto3', 's3transfer', 'botocore', 'boto']:
    logging.getLogger(name).setLevel(logging.ERROR)

load_modules(modes=[ImportMode.API])
SCHEMA = bootstrap_schema()
TYPE_DEFS = gql(SCHEMA.gql(with_directives=False))
ENVNAME = os.getenv('envname', 'local')
@@ -42,7 +45,6 @@ def adapted(obj, info, **kwargs):
            username=info.context['username'],
            groups=info.context['groups'],
            schema=info.context['schema'],
            cdkproxyurl=info.context['cdkproxyurl'],
        ),
        source=obj or None,
        **kwargs,
@@ -135,21 +137,25 @@ def handler(event, context):
            print(f'Error managing groups due to: {e}')
            groups = []

        set_context(RequestContext(ENGINE, username, groups, ES))

        app_context = {
            'engine': ENGINE,
            'es': ES,
            'username': username,
            'groups': groups,
            'schema': SCHEMA,
            'cdkproxyurl': None,
        }

    else:
        raise Exception(f'Could not initialize user context from event {event}')

    query = json.loads(event.get('body'))
    success, response = graphql_sync(
        schema=executable_schema, data=query, context_value=app_context
    )

    dispose_context()
    response = json.dumps(response)

    log.info('Lambda Response %s', response)
3 changes: 3 additions & 0 deletions backend/aws_handler.py
@@ -4,6 +4,7 @@

from dataall.aws.handlers.service_handlers import Worker
from dataall.db import get_engine
from dataall.modules.loader import load_modules, ImportMode

logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL'))
@@ -13,6 +14,8 @@

engine = get_engine(envname=ENVNAME)

load_modules(modes=[ImportMode.TASKS])


def handler(event, context=None):
"""Processes messages received from sqs"""
3 changes: 2 additions & 1 deletion backend/cdkproxymain.py
@@ -10,6 +10,7 @@
import dataall.cdkproxy.cdk_cli_wrapper as wrapper
from dataall.cdkproxy.stacks import StackManager
from dataall import db
from dataall.modules.loader import load_modules, ImportMode

print('\n'.join(sys.path))

@@ -20,7 +21,7 @@
f"Application started for envname= `{ENVNAME}` DH_DOCKER_VERSION:{os.environ.get('DH_DOCKER_VERSION')}"
)


load_modules(modes=[ImportMode.CDK])
StackManager.registered_stacks()


6 changes: 0 additions & 6 deletions backend/dataall/api/Objects/Dashboard/resolvers.py
@@ -211,12 +211,6 @@ def get_dashboard_organization(context: Context, source: models.Dashboard, **kwa
        return org


def get_dashboard_environment(context: Context, source: models.Dashboard, **kwargs):
    with context.engine.scoped_session() as session:
        env = session.query(models.Environment).get(source.environmentUri)
        return env


def request_dashboard_share(
    context: Context,
    source: models.Dashboard,
4 changes: 3 additions & 1 deletion backend/dataall/api/Objects/Dashboard/schema.py
@@ -2,6 +2,8 @@
from .resolvers import *
from ...constants import DashboardRole

from dataall.api.Objects.Environment.resolvers import resolve_environment

Dashboard = gql.ObjectType(
    name='Dashboard',
    fields=[
@@ -23,7 +25,7 @@
        gql.Field(
            'environment',
            type=gql.Ref('Environment'),
            resolver=get_dashboard_environment,
            resolver=resolve_environment,
        ),
        gql.Field(
            'userRoleForDashboard',
27 changes: 2 additions & 25 deletions backend/dataall/api/Objects/DataPipeline/resolvers.py
@@ -52,7 +52,7 @@ def create_pipeline(context: Context, source, input=None):
        payload={'account': pipeline.AwsAccountId, 'region': pipeline.region},
    )

    stack_helper.deploy_stack(context, pipeline.DataPipelineUri)
    stack_helper.deploy_stack(pipeline.DataPipelineUri)

    return pipeline

@@ -80,7 +80,7 @@ def update_pipeline(context: Context, source, DataPipelineUri: str, input: dict
        check_perm=True,
    )
    if (pipeline.template == ""):
        stack_helper.deploy_stack(context, pipeline.DataPipelineUri)
        stack_helper.deploy_stack(pipeline.DataPipelineUri)

    return pipeline

@@ -111,14 +111,6 @@ def get_pipeline(context: Context, source, DataPipelineUri: str = None):
    )


def get_pipeline_env(context: Context, source: models.DataPipeline, **kwargs):
    if not source:
        return None
    with context.engine.scoped_session() as session:
        env = session.query(models.Environment).get(source.environmentUri)
        return env


def resolve_user_role(context: Context, source: models.DataPipeline):
    if not source:
        return None
@@ -155,15 +147,6 @@ def list_pipeline_environments(context: Context, source: models.DataPipeline, fi
    )


def get_pipeline_org(context: Context, source: models.DataPipeline, **kwargs):
    if not source:
        return None
    with context.engine.scoped_session() as session:
        env = session.query(models.Environment).get(source.environmentUri)
        org = session.query(models.Organization).get(env.organizationUri)
        return org


def get_clone_url_http(context: Context, source: models.DataPipeline, **kwargs):
    if not source:
        return None
@@ -249,7 +232,6 @@ def get_stack(context, source: models.DataPipeline, **kwargs):
    if not source:
        return None
    return stack_helper.get_stack_with_cfn_resources(
        context=context,
        targetUri=source.DataPipelineUri,
        environmentUri=source.environmentUri,
    )
@@ -399,7 +381,6 @@ def delete_pipeline(

    if deleteFromAWS:
        stack_helper.delete_repository(
            context=context,
            target_uri=DataPipelineUri,
            accountid=env.AwsAccountId,
            cdk_role_arn=env.CDKRoleArn,
@@ -408,21 +389,17 @@
        )
        if pipeline.devStrategy == "cdk-trunk":
            stack_helper.delete_stack(
                context=context,
                target_uri=DataPipelineUri,
                accountid=env.AwsAccountId,
                cdk_role_arn=env.CDKRoleArn,
                region=env.region,
                target_type='cdkpipeline',
            )
        else:
            stack_helper.delete_stack(
                context=context,
                target_uri=DataPipelineUri,
                accountid=env.AwsAccountId,
                cdk_role_arn=env.CDKRoleArn,
                region=env.region,
                target_type='pipeline',
            )

    return True
6 changes: 4 additions & 2 deletions backend/dataall/api/Objects/DataPipeline/schema.py
@@ -1,6 +1,8 @@
from ... import gql
from .resolvers import *
from ...constants import DataPipelineRole
from dataall.api.Objects.Environment.resolvers import resolve_environment
from dataall.api.Objects.Organization.resolvers import resolve_organization_by_env

DataPipeline = gql.ObjectType(
    name='DataPipeline',
@@ -16,10 +18,10 @@
        gql.Field('repo', type=gql.String),
        gql.Field('SamlGroupName', type=gql.String),
        gql.Field(
            'organization', type=gql.Ref('Organization'), resolver=get_pipeline_org
            'organization', type=gql.Ref('Organization'), resolver=resolve_organization_by_env
        ),
        gql.Field(
            'environment', type=gql.Ref('Environment'), resolver=get_pipeline_env
            'environment', type=gql.Ref('Environment'), resolver=resolve_environment
        ),
        gql.Field(
            'developmentEnvironments',
11 changes: 4 additions & 7 deletions backend/dataall/api/Objects/Dataset/resolvers.py
@@ -38,7 +38,7 @@ def create_dataset(context: Context, source, input=None):
            session=session, es=context.es, datasetUri=dataset.datasetUri
        )

    stack_helper.deploy_dataset_stack(context, dataset)
    stack_helper.deploy_dataset_stack(dataset)

    dataset.userRoleForDataset = DatasetRole.Creator.value

@@ -76,7 +76,7 @@ def import_dataset(context: Context, source, input=None):
            session=session, es=context.es, datasetUri=dataset.datasetUri
        )

    stack_helper.deploy_dataset_stack(context, dataset)
    stack_helper.deploy_dataset_stack(dataset)

    dataset.userRoleForDataset = DatasetRole.Creator.value

@@ -222,7 +222,7 @@ def update_dataset(context, source, datasetUri: str = None, input: dict = None):
        )
        indexers.upsert_dataset(session, context.es, datasetUri)

    stack_helper.deploy_dataset_stack(context, updated_dataset)
    stack_helper.deploy_dataset_stack(updated_dataset)

    return updated_dataset

@@ -493,7 +493,6 @@ def get_dataset_stack(context: Context, source: models.Dataset, **kwargs):
    if not source:
        return None
    return stack_helper.get_stack_with_cfn_resources(
        context=context,
        targetUri=source.datasetUri,
        environmentUri=source.environmentUri,
    )
@@ -575,14 +574,12 @@ def delete_dataset(

    if deleteFromAWS:
        stack_helper.delete_stack(
            context=context,
            target_uri=datasetUri,
            accountid=env.AwsAccountId,
            cdk_role_arn=env.CDKRoleArn,
            region=env.region,
            target_type='dataset',
        )
        stack_helper.deploy_stack(context, dataset.environmentUri)
        stack_helper.deploy_stack(dataset.environmentUri)
    return True


13 changes: 11 additions & 2 deletions backend/dataall/api/Objects/Environment/input_types.py
@@ -10,6 +10,14 @@
    ],
)

ModifyEnvironmentParameterInput = gql.InputType(
    name='ModifyEnvironmentParameterInput',
    arguments=[
        gql.Argument('key', gql.String),
        gql.Argument('value', gql.String)
    ]
)

NewEnvironmentInput = gql.InputType(
    name='NewEnvironmentInput',
    arguments=[
@@ -21,7 +29,6 @@
        gql.Argument('AwsAccountId', gql.NonNullableType(gql.String)),
        gql.Argument('region', gql.NonNullableType(gql.String)),
        gql.Argument('dashboardsEnabled', type=gql.Boolean),
        gql.Argument('notebooksEnabled', type=gql.Boolean),
        gql.Argument('mlStudiosEnabled', type=gql.Boolean),
        gql.Argument('pipelinesEnabled', type=gql.Boolean),
        gql.Argument('warehousesEnabled', type=gql.Boolean),
@@ -30,6 +37,8 @@
        gql.Argument('publicSubnetIds', gql.ArrayType(gql.String)),
        gql.Argument('EnvironmentDefaultIAMRoleName', gql.String),
        gql.Argument('resourcePrefix', gql.String),
        gql.Argument('parameters', gql.ArrayType(ModifyEnvironmentParameterInput))

    ],
)

@@ -44,11 +53,11 @@
        gql.Argument('privateSubnetIds', gql.ArrayType(gql.String)),
        gql.Argument('publicSubnetIds', gql.ArrayType(gql.String)),
        gql.Argument('dashboardsEnabled', type=gql.Boolean),
        gql.Argument('notebooksEnabled', type=gql.Boolean),
        gql.Argument('mlStudiosEnabled', type=gql.Boolean),
        gql.Argument('pipelinesEnabled', type=gql.Boolean),
        gql.Argument('warehousesEnabled', type=gql.Boolean),
        gql.Argument('resourcePrefix', gql.String),
        gql.Argument('parameters', gql.ArrayType(ModifyEnvironmentParameterInput))
    ],
)
