Skip to content

Commit

Permalink
[MAINTENANCE] persist batch configs (#9130)
Browse files Browse the repository at this point in the history
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
tyler-hoffman and pre-commit-ci[bot] committed Dec 19, 2023
1 parent cb05dbb commit 7aec10d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
34 changes: 34 additions & 0 deletions great_expectations/data_context/store/datasource_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from typing_extensions import TypedDict

from great_expectations.core.batch_config import BatchConfig
from great_expectations.core.serializer import AbstractConfigSerializer
from great_expectations.data_context.types.resource_identifiers import (
GXCloudIdentifier,
Expand Down Expand Up @@ -320,3 +321,36 @@ def _determine_datasource_key(self, datasource_name: str) -> DataContextVariable
resource_name=datasource_name,
)
return datasource_key

def add_batch_config(self, batch_config: BatchConfig) -> BatchConfig:
    """Append a batch config to its data asset and persist the owning datasource.

    The datasource is fetched from this store using the name of
    ``batch_config.data_asset.datasource``. The batch config is appended to
    the stored copy of the asset, and the whole datasource is persisted again.

    Args:
        batch_config: The batch config to add; its ``data_asset`` must be set.

    Returns:
        The matching batch config taken from the persisted datasource's asset,
        with its ``_data_asset`` pointing at that persisted asset.

    Raises:
        ValueError: If the stored asset already has a batch config with the
            same name.
    """
    data_asset = batch_config.data_asset
    key = DataContextVariableKey(resource_name=data_asset.datasource.name)

    loaded_datasource = self.get(key)
    assert isinstance(loaded_datasource, FluentDatasource)

    loaded_asset = loaded_datasource.get_asset(data_asset.name)
    batch_config_names = {bc.name for bc in loaded_asset.batch_configs}

    if batch_config.name in batch_config_names:
        # Sort before joining: set iteration order for strings is not stable
        # across interpreter runs, which would make the message nondeterministic.
        existing_names = ", ".join(sorted(batch_config_names))
        raise ValueError(
            f'"{batch_config.name}" already exists (all existing batch_config names are {existing_names})'
        )

    # This must be added for it to be picked up during serialization
    loaded_asset.__fields_set__.add("batch_configs")

    loaded_asset.batch_configs.append(batch_config)
    updated_datasource = self._persist_datasource(key=key, config=loaded_datasource)
    assert isinstance(updated_datasource, FluentDatasource)

    updated_asset = updated_datasource.get_asset(data_asset.name)

    # Locate the freshly persisted config by name; exactly one is expected
    # because duplicates were rejected above.
    updated_batch_config_as_list = [
        bc for bc in updated_asset.batch_configs if bc.name == batch_config.name
    ]
    assert len(updated_batch_config_as_list) == 1
    updated_batch_config = updated_batch_config_as_list[0]

    # Re-link the returned config to the asset instance that was persisted.
    updated_batch_config._data_asset = updated_asset
    return updated_batch_config
52 changes: 52 additions & 0 deletions tests/data_context/store/test_datasource_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest

import great_expectations.exceptions as gx_exceptions
from great_expectations.core.batch_config import BatchConfig
from great_expectations.core.data_context_key import DataContextVariableKey
from great_expectations.core.serializer import (
AbstractConfigSerializer,
Expand Down Expand Up @@ -35,6 +36,8 @@
NamedDatasourceSerializer,
YAMLReadyDictDatasourceConfigSerializer,
)
from great_expectations.datasource.fluent.interfaces import Datasource
from great_expectations.datasource.fluent.pandas_datasource import PandasDatasource
from tests.data_context.conftest import MockResponse

yaml = YAMLHandler()
Expand Down Expand Up @@ -198,6 +201,55 @@ def test_datasource_store_retrieval(
)


@pytest.mark.unit
def test_datasource_store__add_batch_config__success(
    empty_datasource_store: DatasourceStore,
) -> None:
    """Adding a batch config returns it and stores it on the datasource's asset."""
    # Arrange: a pandas datasource with a single CSV asset, saved in the store.
    pandas_ds = PandasDatasource(name="foo")
    csv_asset = pandas_ds.add_csv_asset("cool new asset", "taxi.csv")
    store_key = DataContextVariableKey(resource_name=pandas_ds.name)
    empty_datasource_store.set(key=store_key, value=pandas_ds)

    # Act: attach a new batch config to the asset via the store.
    new_config = BatchConfig(name="my cool batch config")
    new_config._data_asset = csv_asset
    persisted_config = empty_datasource_store.add_batch_config(new_config)

    # Assert: the returned config matches, and a fresh read of the datasource
    # shows the config on the asset.
    reloaded_ds = empty_datasource_store.get(key=store_key)
    assert persisted_config.name == new_config.name
    assert isinstance(reloaded_ds, Datasource)
    reloaded_asset = reloaded_ds.get_asset(csv_asset.name)
    persisted_names = [bc.name for bc in reloaded_asset.batch_configs]
    assert new_config.name in persisted_names


@pytest.mark.unit
def test_datasource_store__add_batch_config__duplicate_name(
    empty_datasource_store: DatasourceStore,
) -> None:
    """Adding a second batch config with an existing name raises ValueError."""
    # Arrange: store a datasource whose asset already has a config called `shared_name`.
    shared_name = "whatever"
    pandas_ds = PandasDatasource(name="foo")
    csv_asset = pandas_ds.add_csv_asset("cool new asset", "taxi.csv")
    store_key = DataContextVariableKey(resource_name=pandas_ds.name)
    empty_datasource_store.set(key=store_key, value=pandas_ds)

    first_config = BatchConfig(name=shared_name)
    first_config._data_asset = csv_asset
    empty_datasource_store.add_batch_config(first_config)

    # Act + Assert: a second config with the same name must be rejected.
    duplicate_config = BatchConfig(name=shared_name)
    duplicate_config._data_asset = csv_asset

    with pytest.raises(ValueError) as exc_info:
        empty_datasource_store.add_batch_config(duplicate_config)
    # Checked after the `with` block so the assertion actually executes.
    assert "already exists" in str(exc_info.value)


@pytest.mark.cloud
def test_datasource_store_set_cloud_mode(
block_config_datasource_config: DatasourceConfig,
Expand Down

0 comments on commit 7aec10d

Please sign in to comment.