Skip to content
Permalink
Browse files

(storage-plugin-system-4) Eliminate RunStorage and new strategy for type plugins

Summary:
This is a change to the type storage system plugins. We have
added a function to the TypeStoragePlugin interface called
applies_to_storage that determines whether or not to use a
plugin based on the SystemStorageDefinition that is active
for the current execution.

I have also renamed the plugins passed to types "auto_plugins". These
are plugins that the system attempts to load whenever that type is
included in a pipeline.

Extensions of this would include:

1) Pipeline-specific overrides. Imagine if a user wants to either a)
   override the type storage plugin (e.g., they want to persist dataframes
   in s3 in a specific way but still use the out-of-the-box system storage
   definition), or b) have a type storage plugin for their own
   system storage definition.

2) Config-driven type plugins. Allow users to configure type plugins.
   For example maybe they want to persist spark data frames as csv
   instead of parquet.

Also deleted the unused doc snapshots.

Test Plan: buildkite

Reviewers: natekupp, max, alangenfeld

Reviewed By: natekupp, max

Differential Revision: https://dagster.phacility.com/D341
  • Loading branch information...
schrockn committed Jun 8, 2019
1 parent 05afca0 commit 116f043bd5942de354130b7b5ca63de7ff148212
No changes.

This file was deleted.

@@ -19,11 +19,12 @@
check,
output_selector_schema,
)
from dagster.core.storage.runs import RunStorageMode
from dagster.core.definitions.system_storage import fs_system_storage
from dagster.core.storage.type_storage import TypeStoragePlugin
from dagster.core.types.runtime import Stringish, RuntimeType

from dagster_aws.s3.types import BytesIOS3StoragePlugin
from dagster_aws.s3.system_storage import s3_system_storage

AirlineDemoResources = namedtuple(
'AirlineDemoResources',
@@ -32,6 +33,10 @@


class SparkDataFrameS3StoragePlugin(TypeStoragePlugin): # pylint: disable=no-init
@classmethod
def applies_to_storage(cls, system_storage_def):
return system_storage_def is s3_system_storage

@classmethod
def set_object(cls, intermediate_store, obj, _context, _runtime_type, paths):
target_path = intermediate_store.object_store.key_for_paths(paths)
@@ -46,6 +51,10 @@ def get_object(cls, intermediate_store, context, _runtime_type, paths):


class SparkDataFrameFilesystemStoragePlugin(TypeStoragePlugin): # pylint: disable=no-init
@classmethod
def applies_to_storage(cls, system_storage_def):
return system_storage_def is fs_system_storage

@classmethod
def set_object(cls, intermediate_store, obj, _context, _runtime_type, paths):
target_path = os.path.join(intermediate_store.root, *paths)
@@ -86,10 +95,7 @@ def spark_df_output_schema(_context, file_type, file_options, spark_df):
DataFrame,
name='SparkDataFrameType',
description='A Pyspark data frame.',
storage_plugins={
RunStorageMode.S3: SparkDataFrameS3StoragePlugin,
RunStorageMode.FILESYSTEM: SparkDataFrameFilesystemStoragePlugin,
},
auto_plugins=[SparkDataFrameS3StoragePlugin, SparkDataFrameFilesystemStoragePlugin],
output_schema=spark_df_output_schema,
)

@@ -108,10 +114,10 @@ def __init__(self):

class FileFromPath(RuntimeType):
def __init__(self):
self.storage_plugins = {RunStorageMode.S3: BytesIOS3StoragePlugin}
super(FileFromPath, self).__init__(
'FileFromPath',
'FileFromPath',
auto_plugins=[BytesIOS3StoragePlugin],
description='Bytes representing the contents of a file at a specific path.',
)

@@ -68,7 +68,6 @@
)

from dagster.core.storage.init import InitSystemStorageContext
from dagster.core.storage.runs import RunStorageMode

from dagster.core.types import (
Any,
@@ -148,7 +147,6 @@
'PipelineConfigEvaluationError',
'PipelineExecutionResult',
'RunConfig',
'RunStorageMode',
'SolidExecutionResult',
'SystemStoragePluginData',
# Errors
@@ -110,7 +110,7 @@ def fs_system_storage(init_context):
intermediates_manager=IntermediateStoreIntermediatesManager(
FileSystemIntermediateStore(
run_id=init_context.run_config.run_id,
types_to_register=init_context.type_storage_plugin_registry,
type_storage_plugin_registry=init_context.type_storage_plugin_registry,
base_dir=base_dir,
)
),
@@ -30,7 +30,6 @@ class RunConfig(
produced during execution.
loggers (list): Additional loggers that log messages will be sent to.
executor_config (ExecutorConfig): Configuration for where and how computation will occur.
storage_mode (RunStorageMode): Where intermediate artifacts will be stored during execution.
rexecution_config (RexecutionConfig): Information about a previous run to allow
for subset rexecution.
step_keys_to_execute (list[str]): They subset of steps from a pipeline to execute this run.
@@ -22,7 +22,7 @@
from dagster.core.log_manager import DagsterLogManager
from dagster.core.storage.init import InitSystemStorageContext
from dagster.core.storage.intermediates_manager import IntermediatesManager
from dagster.core.storage.runs import DagsterRunMeta, RunStorage, RunStorageMode
from dagster.core.storage.runs import DagsterRunMeta, RunStorage
from dagster.core.storage.type_storage import construct_type_storage_plugin_registry
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.types.evaluator import (
@@ -96,12 +96,12 @@ def construct_system_storage_data(storage_init_context):

def system_storage_def_from_config(mode_definition, environment_config):
for system_storage_def in mode_definition.system_storage_defs:
if system_storage_def.name == environment_config.storage.storage_mode:
if system_storage_def.name == environment_config.storage.system_storage_name:
return system_storage_def

check.failed(
'Could not find storage mode {}. Should have be caught by config system'.format(
environment_config.storage.storage_mode
environment_config.storage.system_storage_name
)
)

@@ -165,15 +165,11 @@ def scoped_pipeline_context(
pipeline_def=pipeline_def,
mode_def=mode_def,
system_storage_def=system_storage_def,
system_storage_config=environment_config.storage.storage_config,
system_storage_config=environment_config.storage.system_storage_config,
run_config=run_config,
environment_config=environment_config,
type_storage_plugin_registry=construct_type_storage_plugin_registry(
pipeline_def,
# TODO eliminate once type plugin system migrated to system storage
RunStorageMode.from_environment_config(
environment_config.storage.storage_mode
),
pipeline_def, system_storage_def
),
solid_resources_builder=solid_resources_builder,
)
@@ -95,7 +95,7 @@ def build(self):
step_dict = {step.key: step for step in self.steps}

system_storage_def = self.mode_definition.get_system_storage_def(
self.environment_config.storage.storage_mode
self.environment_config.storage.system_storage_name
)

return ExecutionPlan(self.pipeline_def, step_dict, deps, system_storage_def.is_persistent)
@@ -5,7 +5,7 @@
from dagster.core.definitions.resource import SolidResourcesBuilder
from dagster.core.execution.config import RunConfig
from dagster.core.system_config.objects import EnvironmentConfig
from dagster.core.types.runtime import RuntimeType
from dagster.core.storage.type_storage import TypeStoragePluginRegistry


class InitSystemStorageContext(
@@ -39,11 +39,10 @@ def __new__(
environment_config=check.inst_param(
environment_config, 'environment_config', EnvironmentConfig
),
type_storage_plugin_registry=check.dict_param(
type_storage_plugin_registry=check.inst_param(
type_storage_plugin_registry,
'type_storage_plugin_registry',
key_type=RuntimeType,
value_type=type, # subclass of TypeStoragePlugin
TypeStoragePluginRegistry,
),
solid_resources_builder=check.inst_param(
solid_resources_builder, 'solid_resources_builder', SolidResourcesBuilder
@@ -9,15 +9,16 @@
from dagster.core.types.runtime import RuntimeType, resolve_to_runtime_type

from .object_store import ObjectStore, FileSystemObjectStore
from .runs import RunStorageMode
from .type_storage import TypeStoragePluginRegistry


class IntermediateStore(six.with_metaclass(ABCMeta)):
def __init__(self, object_store, root, types_to_register=None):
def __init__(self, object_store, root, type_storage_plugin_registry):
self.root = check.str_param(root, 'root')
self.object_store = check.inst_param(object_store, 'object_store', ObjectStore)
self.registry = TypeStoragePluginRegistry(types_to_register)
self.type_storage_plugin_registry = check.inst_param(
type_storage_plugin_registry, 'type_storage_plugin_registry', TypeStoragePluginRegistry
)

def uri_for_paths(self, paths, protocol=None):
check.list_param(paths, 'paths', of_type=str)
@@ -67,22 +68,26 @@ def copy_object_from_prev_run(self, context, previous_run_id, paths):
pass

def set_value(self, obj, context, runtime_type, paths):
if self.registry.is_registered(runtime_type):
return self.registry.get(runtime_type.name).set_object(
if self.type_storage_plugin_registry.is_registered(runtime_type):
return self.type_storage_plugin_registry.get(runtime_type.name).set_object(
self, obj, context, runtime_type, paths
)
elif runtime_type.name is None:
self.registry.check_for_unsupported_composite_overrides(runtime_type)
self.type_storage_plugin_registry.check_for_unsupported_composite_overrides(
runtime_type
)

return self.set_object(obj, context, runtime_type, paths)

def get_value(self, context, runtime_type, paths):
if self.registry.is_registered(runtime_type):
return self.registry.get(runtime_type.name).get_object(
if self.type_storage_plugin_registry.is_registered(runtime_type):
return self.type_storage_plugin_registry.get(runtime_type.name).get_object(
self, context, runtime_type, paths
)
elif runtime_type.name is None:
self.registry.check_for_unsupported_composite_overrides(runtime_type)
self.type_storage_plugin_registry.check_for_unsupported_composite_overrides(
runtime_type
)
return self.get_object(context, runtime_type, paths)

@staticmethod
@@ -108,9 +113,15 @@ def rm_intermediate(self, context, step_key, output_name='result'):


class FileSystemIntermediateStore(IntermediateStore):
def __init__(self, run_id, types_to_register=None, base_dir=None):
def __init__(self, run_id, type_storage_plugin_registry=None, base_dir=None):
self.run_id = check.str_param(run_id, 'run_id')
self.storage_mode = RunStorageMode.FILESYSTEM
type_storage_plugin_registry = check.inst_param(
type_storage_plugin_registry
if type_storage_plugin_registry
else TypeStoragePluginRegistry(types_to_register={}),
'type_storage_plugin_registry',
TypeStoragePluginRegistry,
)

self._base_dir = os.path.abspath(
os.path.expanduser(
@@ -130,7 +141,7 @@ def __init__(self, run_id, types_to_register=None, base_dir=None):
root = object_store.key_for_paths([self.base_dir, 'dagster', 'runs', run_id, 'files'])

super(FileSystemIntermediateStore, self).__init__(
object_store, root=root, types_to_register=types_to_register
object_store, root=root, type_storage_plugin_registry=type_storage_plugin_registry
)

@property
@@ -8,7 +8,6 @@
from dagster.core.types.runtime import RuntimeType

from .intermediate_store import IntermediateStore
from .runs import RunStorageMode


class IntermediatesManager(six.with_metaclass(ABCMeta)): # pylint: disable=no-init
@@ -46,7 +45,6 @@ class InMemoryIntermediatesManager(IntermediatesManager):
def __init__(self):

self.values = {}
self.storage_mode = RunStorageMode.IN_MEMORY

# Note:
# For the in-memory manager context and runtime are currently optional
@@ -81,7 +79,6 @@ def __init__(self, intermediate_store):
self._intermediate_store = check.inst_param(
intermediate_store, 'intermediate_store', IntermediateStore
)
self.storage_mode = self._intermediate_store.storage_mode

def _get_paths(self, step_output_handle):
return ['intermediates', step_output_handle.step_key, step_output_handle.output_name]
@@ -1,14 +1,12 @@
from abc import ABCMeta, abstractmethod
from collections import namedtuple, OrderedDict
from enum import Enum
import json
import os
import shutil

import six

from dagster import check, seven
from dagster.core.errors import DagsterInvariantViolationError
from dagster.utils import mkdir_p, list_pull


@@ -140,23 +138,3 @@ def nuke(self):
@property
def is_persistent(self):
return False


# TODO eliminate once type plugin system migrated to system storage
class RunStorageMode(Enum):
IN_MEMORY = 'IN_MEMORY'
FILESYSTEM = 'FILESYSTEM'
S3 = 'S3'

@classmethod
def from_environment_config(cls, mode):
check.opt_str_param(mode, 'mode')

if mode == 'filesystem':
return RunStorageMode.FILESYSTEM
elif mode == 's3':
return RunStorageMode.S3
elif mode == 'in_memory' or mode is None:
return RunStorageMode.IN_MEMORY
else:
raise DagsterInvariantViolationError('Invalid storage specified {}'.format(mode))
@@ -8,9 +8,14 @@
class TypeStoragePlugin(six.with_metaclass(ABCMeta)): # pylint: disable=no-init
'''Base class for storage plugins.
Extend this class for (storage_mode, runtime_type) pairs that need special handling.
Extend this class for (system_storage_name, runtime_type) pairs that need special handling.
'''

@classmethod
@abstractmethod
def applies_to_storage(self, system_storage_def):
raise NotImplementedError()

@classmethod
@abstractmethod
def set_object(cls, intermediate_store, obj, context, runtime_type, paths):
@@ -99,11 +104,17 @@ def check_for_unsupported_composite_overrides(self, runtime_type):
)


def construct_type_storage_plugin_registry(pipeline_def, storage_mode):
def construct_type_storage_plugin_registry(pipeline_def, system_storage_def):
# Needed to avoid circular dep
from dagster.core.definitions import PipelineDefinition, SystemStorageDefinition

check.inst_param(pipeline_def, 'pipeline_def', PipelineDefinition)
check.inst_param(system_storage_def, 'system_storage_def', SystemStorageDefinition)

type_plugins = {}
for type_obj in pipeline_def.all_runtime_types():
storage_plugin = type_obj.storage_plugins.get(storage_mode)
if storage_plugin:
type_plugins[type_obj] = storage_plugin
for auto_plugin in type_obj.auto_plugins:
if auto_plugin.applies_to_storage(system_storage_def):
type_plugins[type_obj] = auto_plugin

return type_plugins
return TypeStoragePluginRegistry(type_plugins)
@@ -105,18 +105,22 @@ def __new__(cls):
return super(ExecutionConfig, cls).__new__(cls)


class StorageConfig(namedtuple('_FilesConfig', 'storage_mode storage_config')):
def __new__(cls, storage_mode, storage_config):
class StorageConfig(namedtuple('_FilesConfig', 'system_storage_name system_storage_config')):
def __new__(cls, system_storage_name, system_storage_config):
return super(StorageConfig, cls).__new__(
cls,
storage_mode=check.opt_str_param(storage_mode, 'storage_mode', 'in_memory'),
storage_config=check.opt_dict_param(storage_config, 'storage_config', key_type=str),
system_storage_name=check.opt_str_param(
system_storage_name, 'system_storage_name', 'in_memory'
),
system_storage_config=check.opt_dict_param(
system_storage_config, 'system_storage_config', key_type=str
),
)

@staticmethod
def from_dict(config=None):
check.opt_dict_param(config, 'config', key_type=str)
if config:
storage_mode, storage_config = single_item(config)
return StorageConfig(storage_mode, storage_config.get('config'))
system_storage_name, system_storage_config = single_item(config)
return StorageConfig(system_storage_name, system_storage_config.get('config'))
return StorageConfig(None, None)

0 comments on commit 116f043

Please sign in to comment.
You can’t perform that action at this time.