Skip to content

Commit

Permalink
Asset config gql resolver (#8163)
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey authored and OwenKephart committed Jun 2, 2022
1 parent 45e5a3e commit 3b69d5c
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 91 deletions.
14 changes: 11 additions & 3 deletions examples/bollinger/bollinger/assets/bollinger_analysis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# pylint: disable=redefined-outer-name
from dagster import asset
from dagster import Field, asset

from ..lib import (
AnomalousEventsDgType,
Expand All @@ -22,11 +22,19 @@ def sp500_prices():

@asset(
dagster_type=BollingerBandsDgType,
config_schema={
"rate": Field(int, default_value=30, description="Size of sliding window in days"),
"sigma": Field(
float, default_value=2.0, description="Width of envelope in standard deviations"
),
},
metadata={"owner": "alice@example.com"},
)
def sp500_bollinger_bands(sp500_prices):
def sp500_bollinger_bands(context, sp500_prices):
"""Bollinger bands for the S&P 500 stock prices."""
return compute_bollinger_bands_multi(sp500_prices)
return compute_bollinger_bands_multi(
sp500_prices, rate=context.op_config["rate"], sigma=context.op_config["sigma"]
)


@asset(
Expand Down
8 changes: 6 additions & 2 deletions examples/bollinger/bollinger/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,14 @@ def compute_bollinger_bands(
return odf


def compute_bollinger_bands_multi(df: pd.DataFrame, dropna: bool = True):
def compute_bollinger_bands_multi(
df: pd.DataFrame, dropna: bool = True, rate: int = 30, sigma: float = 2.0
):
"""Compute Bollinger bands for a set of stocks over time. The input dataframe can contain
multiple timeseries grouped by the `name` column."""
odf = df.groupby("name").apply(lambda idf: compute_bollinger_bands(idf, dropna=False))
odf = df.groupby("name").apply(
lambda idf: compute_bollinger_bands(idf, dropna=False, rate=rate, sigma=sigma)
)
return odf.dropna().reset_index() if dropna else odf


Expand Down
19 changes: 10 additions & 9 deletions js_modules/dagit/packages/core/src/graphql/schema.graphql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import graphene
from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.config_types import GrapheneConfigTypeField
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
from dagster_graphql.schema.solids import (
GrapheneCompositeSolidDefinition,
Expand Down Expand Up @@ -98,6 +99,7 @@ class GrapheneAssetNode(graphene.ObjectType):
limit=graphene.Int(),
)
computeKind = graphene.String()
configField = graphene.Field(GrapheneConfigTypeField)
dependedBy = non_null_list(GrapheneAssetDependency)
dependedByKeys = non_null_list(GrapheneAssetKey)
dependencies = non_null_list(GrapheneAssetDependency)
Expand Down Expand Up @@ -170,6 +172,16 @@ def external_repository(self) -> ExternalRepository:
def external_asset_node(self) -> ExternalAssetNode:
return self._external_asset_node

def get_op_definition(
self,
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if len(self._external_asset_node.job_names) >= 1:
pipeline_name = self._external_asset_node.job_names[0]
pipeline = self._external_repository.get_full_external_pipeline(pipeline_name)
return build_solid_definition(pipeline, self._external_asset_node.op_name)
else:
return None

def get_partition_keys(self) -> Sequence[str]:
# TODO: Add functionality for dynamic partitions definition
partitions_def_data = self._external_asset_node.partitions_def_data
Expand Down Expand Up @@ -226,6 +238,11 @@ def resolve_assetMaterializations(
)
]

# TODO: Prob want a more efficient way of resolving this
def resolve_configField(self, _graphene_info) -> Optional[GrapheneConfigTypeField]:
op = self.get_op_definition()
return op.resolve_config_field(_graphene_info) if op else None

def resolve_computeKind(self, _graphene_info) -> Optional[str]:
return self._external_asset_node.compute_kind

Expand Down Expand Up @@ -378,12 +395,7 @@ def resolve_metadata_entries(self, _graphene_info) -> Sequence[GrapheneMetadataE
def resolve_op(
self, _graphene_info
) -> Optional[Union[GrapheneSolidDefinition, GrapheneCompositeSolidDefinition]]:
if len(self._external_asset_node.job_names) >= 1:
pipeline_name = self._external_asset_node.job_names[0]
pipeline = self._external_repository.get_full_external_pipeline(pipeline_name)
return build_solid_definition(pipeline, self._external_asset_node.op_name)
else:
return None
return self.get_op_definition()

def resolve_opNames(self, _graphene_info) -> Sequence[str]:
return self._external_asset_node.op_names or []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# pylint: disable=missing-graphene-docstring
from typing import List, Optional, Union

import graphene

import dagster._check as check
Expand All @@ -8,8 +10,20 @@

from .util import non_null_list

GrapheneConfigTypeUnion = Union[
"GrapheneEnumConfigType",
"GrapheneCompositeConfigType",
"GrapheneArrayConfigType",
"GrapheneMapConfigType",
"GrapheneNullableConfigType",
"GrapheneRegularConfigType",
"GrapheneScalarUnionConfigType",
]


def to_config_type(config_schema_snapshot, config_type_key):
def to_config_type(
config_schema_snapshot: ConfigSchemaSnapshot, config_type_key: str
) -> GrapheneConfigTypeUnion:
check.inst_param(config_schema_snapshot, "config_schema_snapshot", ConfigSchemaSnapshot)
check.str_param(config_type_key, "config_type_key")

Expand Down Expand Up @@ -81,7 +95,9 @@ class Meta:


class ConfigTypeMixin:
def __init__(self, config_schema_snapshot, config_type_snap):
def __init__(
self, config_schema_snapshot: ConfigSchemaSnapshot, config_type_snap: ConfigTypeSnap
):
self._config_type_snap = check.inst_param(
config_type_snap, "config_type_snap", ConfigTypeSnap
)
Expand All @@ -90,7 +106,7 @@ def __init__(self, config_schema_snapshot, config_type_snap):
)
super().__init__(**_ctor_kwargs_for_snap(config_type_snap))

def resolve_recursive_config_types(self, _graphene_info):
def resolve_recursive_config_types(self, _graphene_info) -> List[GrapheneConfigTypeUnion]:
return list(
map(
lambda key: to_config_type(self._config_schema_snapshot, key),
Expand Down Expand Up @@ -121,19 +137,19 @@ class Meta:
interfaces = (GrapheneConfigType,)
name = "MapConfigType"

def resolve_key_type(self, _graphene_info):
def resolve_key_type(self, _graphene_info) -> GrapheneConfigTypeUnion:
return to_config_type(
self._config_schema_snapshot,
self._config_type_snap.key_type_key,
)

def resolve_value_type(self, _graphene_info):
def resolve_value_type(self, _graphene_info) -> GrapheneConfigTypeUnion:
return to_config_type(
self._config_schema_snapshot,
self._config_type_snap.inner_type_key,
)

def resolve_key_label_name(self, _graphene_info):
def resolve_key_label_name(self, _graphene_info) -> Optional[str]:
return self._config_type_snap.given_name


Expand All @@ -149,7 +165,7 @@ class Meta:
interfaces = (GrapheneConfigType, GrapheneWrappingConfigType)
name = "ArrayConfigType"

def resolve_of_type(self, _graphene_info):
def resolve_of_type(self, _graphene_info) -> GrapheneConfigTypeUnion:
return to_config_type(
self._config_schema_snapshot,
self._config_type_snap.inner_type_key,
Expand All @@ -166,22 +182,22 @@ class Meta:
interfaces = (GrapheneConfigType,)
name = "ScalarUnionConfigType"

def get_scalar_type_key(self):
def get_scalar_type_key(self) -> str:
return self._config_type_snap.scalar_type_key

def get_non_scalar_type_key(self):
def get_non_scalar_type_key(self) -> str:
return self._config_type_snap.non_scalar_type_key

def resolve_scalar_type_key(self, _):
def resolve_scalar_type_key(self, _) -> str:
return self.get_scalar_type_key()

def resolve_non_scalar_type_key(self, _):
def resolve_non_scalar_type_key(self, _) -> str:
return self.get_non_scalar_type_key()

def resolve_scalar_type(self, _):
def resolve_scalar_type(self, _) -> GrapheneConfigTypeUnion:
return to_config_type(self._config_schema_snapshot, self.get_scalar_type_key())

def resolve_non_scalar_type(self, _):
def resolve_non_scalar_type(self, _) -> GrapheneConfigTypeUnion:
return to_config_type(self._config_schema_snapshot, self.get_non_scalar_type_key())


Expand All @@ -190,7 +206,7 @@ class Meta:
interfaces = (GrapheneConfigType, GrapheneWrappingConfigType)
name = "NullableConfigType"

def resolve_of_type(self, _graphene_info):
def resolve_of_type(self, _graphene_info) -> GrapheneConfigTypeUnion:
return to_config_type(self._config_schema_snapshot, self._config_type_snap.inner_type_key)


Expand All @@ -210,10 +226,10 @@ class Meta:
values = non_null_list(GrapheneEnumConfigValue)
given_name = graphene.NonNull(graphene.String)

def resolve_values(self, _graphene_info):
def resolve_values(self, _graphene_info) -> List[GrapheneEnumConfigValue]:
return [
GrapheneEnumConfigValue(value=ev.value, description=ev.description)
for ev in self._config_type_snap.enum_values
for ev in check.not_none(self._config_type_snap.enum_values)
]

def resolve_given_name(self, _):
Expand All @@ -231,24 +247,26 @@ class GrapheneConfigTypeField(graphene.ObjectType):
class Meta:
name = "ConfigTypeField"

def resolve_config_type_key(self, _):
def resolve_config_type_key(self, _) -> str:
return self._field_snap.type_key

def __init__(self, config_schema_snapshot, field_snap):
def __init__(self, config_schema_snapshot: ConfigSchemaSnapshot, field_snap: ConfigFieldSnap):
self._config_schema_snapshot = check.inst_param(
config_schema_snapshot, "config_schema_snapshot", ConfigSchemaSnapshot
)
self._field_snap = check.inst_param(field_snap, "field_snap", ConfigFieldSnap)
self._field_snap: ConfigFieldSnap = check.inst_param(
field_snap, "field_snap", ConfigFieldSnap
)
super().__init__(
name=field_snap.name,
description=field_snap.description,
is_required=field_snap.is_required,
)

def resolve_config_type(self, _graphene_info):
def resolve_config_type(self, _graphene_info) -> GrapheneConfigTypeUnion:
return to_config_type(self._config_schema_snapshot, self._field_snap.type_key)

def resolve_default_value_as_json(self, _graphene_info):
def resolve_default_value_as_json(self, _graphene_info) -> Optional[str]:
return self._field_snap.default_value_as_json_str


Expand All @@ -259,14 +277,14 @@ class Meta:
interfaces = (GrapheneConfigType,)
name = "CompositeConfigType"

def resolve_fields(self, _graphene_info):
def resolve_fields(self, _graphene_info) -> List[GrapheneConfigTypeField]:
return sorted(
[
GrapheneConfigTypeField(
config_schema_snapshot=self._config_schema_snapshot,
field_snap=field_snap,
)
for field_snap in self._config_type_snap.fields
for field_snap in (self._config_type_snap.fields or [])
],
key=lambda field: field.name,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# pylint: disable=missing-graphene-docstring
from typing import Optional, Union

import graphene
from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.metadata import GrapheneMetadataEntry

import dagster._check as check
from dagster.core.snap import PipelineSnapshot
from dagster.core.snap.dagster_types import DagsterTypeSnap
from dagster.core.types.dagster_type import DagsterTypeKind

from .config_types import GrapheneConfigType, to_config_type
from .config_types import GrapheneConfigType, GrapheneConfigTypeUnion, to_config_type
from .errors import (
GrapheneDagsterTypeNotFoundError,
GraphenePipelineNotFoundError,
Expand All @@ -16,18 +19,22 @@
from .util import non_null_list


def config_type_for_schema(pipeline_snapshot, schema_key):
def config_type_for_schema(
pipeline_snapshot: PipelineSnapshot, schema_key: Optional[str]
) -> Optional[GrapheneConfigTypeUnion]:
return (
to_config_type(pipeline_snapshot.config_schema_snapshot, schema_key) if schema_key else None
)


def to_dagster_type(pipeline_snapshot, dagster_type_key):
def to_dagster_type(
pipeline_snapshot: PipelineSnapshot, dagster_type_key: str
) -> Union["GrapheneListDagsterType", "GrapheneNullableDagsterType", "GrapheneRegularDagsterType"]:
check.inst_param(pipeline_snapshot, "pipeline_snapshot", PipelineSnapshot)
check.str_param(dagster_type_key, "dagster_type_key")
check.inst_param(pipeline_snapshot, pipeline_snapshot, PipelineSnapshot)

dagster_type_meta = pipeline_snapshot.dagster_type_namespace_snapshot.get_dagster_type_snap(
dagster_type_key
dagster_type_meta: DagsterTypeSnap = (
pipeline_snapshot.dagster_type_namespace_snapshot.get_dagster_type_snap(dagster_type_key)
)

base_args = dict(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,10 @@ class Meta:
interfaces = (GrapheneISolidDefinition,)
name = "SolidDefinition"

def __init__(self, represented_pipeline, solid_def_name):
def __init__(self, represented_pipeline: RepresentedPipeline, solid_def_name: str):
check.inst_param(represented_pipeline, "represented_pipeline", RepresentedPipeline)
self._solid_def_snap = represented_pipeline.get_node_def_snap(solid_def_name)
super().__init__(name=solid_def_name, description=self._solid_def_snap.description)
super().__init__(name=solid_def_name, description=self._solid_def_snap.description) # type: ignore
ISolidDefinitionMixin.__init__(self, represented_pipeline, solid_def_name)

def resolve_config_field(self, _graphene_info):
Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/config/snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class ConfigFieldSnap(
("type_key", str),
("is_required", bool),
("default_provided", bool),
("default_value_as_json_str", Optional[object]),
("default_value_as_json_str", Optional[str]),
("description", Optional[str]),
],
)
Expand Down

0 comments on commit 3b69d5c

Please sign in to comment.