💥 ♻️ Rename all 'data_set' code entries to 'dataset' (#391)
Galileo-Galilei committed Oct 22, 2023
1 parent d0505fe commit 53522ad
Showing 18 changed files with 74 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -13,8 +13,8 @@
- :boom: :recycle: Rename the following ``DataSets`` to make their use more explicit, and use the ``Dataset`` suffix ([#465, ShubhamZoro](https://github.com/Galileo-Galilei/kedro-mlflow/pull/465)):
- ``MlflowModelLoggerDataSet``->``MlflowModelTrackingDataset``
- ``MlflowModelSaverDataSet``->``MlflowModelLocalFileSystemDataset``

- :boom: :sparkles: Change the default ``copy_mode`` to ``"assign"`` in ``KedroPipelineModel`` because this is the most efficient setup (and usually the desired one) when serving a Kedro ``Pipeline`` as an Mlflow model. This differs from Kedro's default, which is to deepcopy the dataset ([#463, ShubhamZoro](https://github.com/Galileo-Galilei/kedro-mlflow/pull/463)).
- :boom: :recycle: The ``data_set`` argument of the ``MlflowArtifactDataset.__init__`` method is renamed ``dataset`` to match new Kedro conventions ().
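
As a minimal sketch of the renamed argument in interactive use (mirroring the documentation examples further below; the filepath is a placeholder):

```python
import pandas as pd

from kedro_datasets.pandas import CSVDataset
from kedro_mlflow.io.artifacts import MlflowArtifactDataset

# 'dataset' replaces the former 'data_set' keyword
csv_dataset = MlflowArtifactDataset(
    dataset={"type": CSVDataset, "filepath": "/tmp/file.csv"}  # placeholder path
)
csv_dataset.save(data=pd.DataFrame({"a": [1, 2], "b": [3, 4]}))
```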

## [0.11.10] - 2023-10-03

2 changes: 1 addition & 1 deletion docs/source/03_getting_started/02_first_steps.md
@@ -141,7 +141,7 @@ example_iris_data:

example_model:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pickle.PickleDataset
filepath: data/06_models/trained_model.pkl
```
16 changes: 8 additions & 8 deletions docs/source/04_experimentation_tracking/03_version_datasets.md
@@ -29,7 +29,7 @@ You can change it to:
```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/LOCAL/destination/file.csv # must be a local file, wherever you want to log the data in the end
```
@@ -40,19 +40,19 @@ and this dataset will be automatically versioned in each pipeline execution.

### Can I pass extra parameters to the ``MlflowArtifactDataset`` for finer control?

The ``MlflowArtifactDataset`` takes a ``data_set`` argument which is a python dictionary passed to the ``__init__`` method of the dataset declared in ``type``. It means that you can pass any argument accepted by the underlying dataset in this dictionary. If you want to pass ``load_args`` and ``save_args`` in the previous example, add them in the ``data_set`` argument:
The ``MlflowArtifactDataset`` takes a ``dataset`` argument which is a python dictionary passed to the ``__init__`` method of the dataset declared in ``type``. It means that you can pass any argument accepted by the underlying dataset in this dictionary. If you want to pass ``load_args`` and ``save_args`` in the previous example, add them in the ``dataset`` argument:

```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv
load_args:
sep: ;
save_args:
sep: ;
# ... any other valid arguments for data_set
# ... any other valid arguments for dataset
```

### Can I use the ``MlflowArtifactDataset`` in interactive mode?
@@ -64,7 +64,7 @@ from kedro_mlflow.io.artifacts import MlflowArtifactDataset
from kedro_datasets.pandas import CSVDataset

csv_dataset = MlflowArtifactDataset(
data_set={
dataset={
"type": CSVDataset, # either a string "pandas.CSVDataset" or the class
"filepath": r"/path/to/a/local/destination/file.csv",
}
@@ -90,7 +90,7 @@ The ``MlflowArtifactDataset`` has an extra attribute ``run_id`` which specifies
```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv # must be a local filepath, no matter what is your actual mlflow storage (S3 or other)
run_id: 13245678910111213 # a valid mlflow run to log in. If None, default to active run
@@ -105,7 +105,7 @@ You may want to reuse the artifact of a previous run in another one,
```yaml
my_dataset_to_reload:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv # must be a local filepath, no matter what is your actual mlflow storage (S3 or other)
run_id: 13245678910111213 # a valid mlflow run with the existing artifact. It must be named "file.csv"
@@ -120,7 +120,7 @@ In the example below, the artifact will be logged in mlflow within a `reporting` folder
```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv
artifact_path: reporting # relative path where the remote artifact must be stored. if None, saved in root folder.
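
The YAML entries above have a Python counterpart; a minimal interactive sketch combining ``run_id`` and ``artifact_path`` (the paths and the throwaway run are placeholders) could look like:

```python
import mlflow
import pandas as pd

from kedro_datasets.pandas import CSVDataset
from kedro_mlflow.io.artifacts import MlflowArtifactDataset

with mlflow.start_run() as run:
    pass  # placeholder: in practice, reuse the id of an existing run

reporting_dataset = MlflowArtifactDataset(
    dataset={"type": CSVDataset, "filepath": "/tmp/report.csv"},  # placeholder local path
    run_id=run.info.run_id,  # the run the artifact is logged to and reloaded from
    artifact_path="reporting",  # stored under a 'reporting' folder inside the run
)
reporting_dataset.save(data=pd.DataFrame({"a": [1, 2], "b": [3, 4]}))
reloaded = reporting_dataset.load()  # downloads the artifact back from the mlflow run
```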
@@ -59,7 +59,7 @@ If you want to save your model both locally and remotely within the same run, you
```yaml
sklearn_model:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: kedro_mlflow.io.models.MlflowModelLocalFileSystemDataset
flavor: mlflow.sklearn
filepath: data/06_models/sklearn_model
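
For reference, an illustrative Python sketch of the same dual save (the model, paths, and fit data are placeholders, and it assumes ``MlflowModelLocalFileSystemDataset`` accepts the ``flavor`` and ``filepath`` arguments shown above):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

from kedro_mlflow.io.artifacts import MlflowArtifactDataset
from kedro_mlflow.io.models import MlflowModelLocalFileSystemDataset

# placeholder model fitted on dummy data
model = LinearRegression().fit(np.array([[0.0], [1.0]]), np.array([0.0, 1.0]))

sklearn_model = MlflowArtifactDataset(
    dataset={
        "type": MlflowModelLocalFileSystemDataset,
        "flavor": "mlflow.sklearn",
        "filepath": "data/06_models/sklearn_model",  # local copy of the model
    }
)
sklearn_model.save(model)  # also logs the saved model as an artifact of the active mlflow run
```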
8 changes: 4 additions & 4 deletions docs/source/07_python_objects/01_DataSets.md
@@ -7,7 +7,7 @@
```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv
```
@@ -17,14 +17,14 @@ or with additional parameters:
```yaml
my_dataset_to_version:
type: kedro_mlflow.io.artifacts.MlflowArtifactDataset
data_set:
dataset:
type: pandas.CSVDataset # or any valid kedro DataSet
filepath: /path/to/a/local/destination/file.csv
load_args:
sep: ;
save_args:
sep: ;
# ... any other valid arguments for data_set
# ... any other valid arguments for dataset
run_id: 13245678910111213 # a valid mlflow run to log in. If None, default to active run
artifact_path: reporting # relative path where the artifact must be stored. if None, saved in root folder.
```
@@ -36,7 +36,7 @@ from kedro_mlflow.io.artifacts import MlflowArtifactDataset
from kedro_datasets.pandas import CSVDataset

csv_dataset = MlflowArtifactDataset(
data_set={"type": CSVDataset, "filepath": r"/path/to/a/local/destination/file.csv"}
dataset={"type": CSVDataset, "filepath": r"/path/to/a/local/destination/file.csv"}
)
csv_dataset.save(data=pd.DataFrame({"a": [1, 2], "b": [3, 4]}))
```
16 changes: 8 additions & 8 deletions kedro_mlflow/framework/hooks/mlflow_hook.py
@@ -113,7 +113,7 @@ def after_context_created(
experiment_name = context._package_name
if experiment_name is None:
# context._package_name may be None if the session is created interactively
metadata = _get_project_metadata(context._project_path)
metadata = _get_project_metadata(context.project_path)
experiment_name = metadata.package_name
mlflow_config.tracking.experiment.name = experiment_name

@@ -140,40 +140,40 @@ def after_catalog_created(
):
# we use this hook to modify "MlflowMetricsDataset" to ensure consistency
# of the metric name with the catalog name
for name, dataset in catalog._data_sets.items():
for name, dataset in catalog._datasets.items():
if isinstance(dataset, MlflowMetricsDataset) and dataset._prefix is None:
if dataset._run_id is not None:
catalog._data_sets[name] = MlflowMetricsDataset(
catalog._datasets[name] = MlflowMetricsDataset(
run_id=dataset._run_id, prefix=name
)
else:
catalog._data_sets[name] = MlflowMetricsDataset(prefix=name)
catalog._datasets[name] = MlflowMetricsDataset(prefix=name)

if isinstance(dataset, MlflowMetricDataset) and dataset.key is None:
if dataset._run_id is not None:
catalog._data_sets[name] = MlflowMetricDataset(
catalog._datasets[name] = MlflowMetricDataset(
run_id=dataset._run_id,
key=name,
load_args=dataset._load_args,
save_args=dataset._save_args,
)
else:
catalog._data_sets[name] = MlflowMetricDataset(
catalog._datasets[name] = MlflowMetricDataset(
key=name,
load_args=dataset._load_args,
save_args=dataset._save_args,
)

if isinstance(dataset, MlflowMetricHistoryDataset) and dataset.key is None:
if dataset._run_id is not None:
catalog._data_sets[name] = MlflowMetricHistoryDataset(
catalog._datasets[name] = MlflowMetricHistoryDataset(
run_id=dataset._run_id,
key=name,
load_args=dataset._load_args,
save_args=dataset._save_args,
)
else:
catalog._data_sets[name] = MlflowMetricHistoryDataset(
catalog._datasets[name] = MlflowMetricHistoryDataset(
key=name,
load_args=dataset._load_args,
save_args=dataset._save_args,
12 changes: 6 additions & 6 deletions kedro_mlflow/io/artifacts/mlflow_artifact_dataset.py
@@ -15,20 +15,20 @@ class MlflowArtifactDataset(AbstractVersionedDataset):

def __new__(
cls,
data_set: Union[str, Dict],
dataset: Union[str, Dict],
run_id: str = None,
artifact_path: str = None,
credentials: Dict[str, Any] = None,
):
data_set, data_set_args = parse_dataset_definition(config=data_set)
dataset, dataset_args = parse_dataset_definition(config=dataset)

# fake inheritance: this mlflow class should be a mother class which wraps
# all datasets (i.e. it should replace AbstractVersionedDataset instead),
# but since we can't modify the core package,
# we create a subclass which inherits dynamically from the data_set class
class MlflowArtifactDatasetChildren(data_set):
# we create a subclass which inherits dynamically from the dataset class
class MlflowArtifactDatasetChildren(dataset):
def __init__(self, run_id, artifact_path):
super().__init__(**data_set_args)
super().__init__(**dataset_args)
self.run_id = run_id
self.artifact_path = artifact_path
self._logging_activated = True
@@ -134,7 +134,7 @@ def _load(self) -> Any: # pragma: no cover
return super()._load()

# rename the class
parent_name = data_set.__name__
parent_name = dataset.__name__
MlflowArtifactDatasetChildren.__name__ = f"Mlflow{parent_name}"
MlflowArtifactDatasetChildren.__qualname__ = (
f"{parent_name}.Mlflow{parent_name}"
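
The "fake inheritance" trick used above can be illustrated in isolation; this kedro-free sketch (all names are made up) builds a wrapper class at runtime that inherits from whatever class is passed in, adds the extra attributes, and renames itself the way ``MlflowArtifactDatasetChildren`` is renamed:

```python
from typing import Any, Dict, Type


def wrap_with_logging(base_cls: Type, base_kwargs: Dict[str, Any], run_id: str = None):
    # build a subclass of the provided class at runtime ("fake inheritance")
    class LoggingWrapper(base_cls):
        def __init__(self, run_id):
            super().__init__(**base_kwargs)  # initialise the wrapped class normally
            self.run_id = run_id
            self._logging_activated = True

    # rename the generated class after its parent, as done above
    LoggingWrapper.__name__ = f"Mlflow{base_cls.__name__}"
    LoggingWrapper.__qualname__ = f"{base_cls.__name__}.Mlflow{base_cls.__name__}"
    return LoggingWrapper(run_id=run_id)


class DummyDataset:
    def __init__(self, filepath: str):
        self.filepath = filepath


wrapped = wrap_with_logging(DummyDataset, {"filepath": "data/file.csv"}, run_id="abc123")
print(type(wrapped).__name__)  # MlflowDummyDataset
print(wrapped.filepath, wrapped.run_id)  # data/file.csv abc123
```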
6 changes: 3 additions & 3 deletions kedro_mlflow/io/catalog/switch_catalog_logging.py
@@ -1,4 +1,4 @@
def switch_catalog_logging(catalog, logging_flag=True):
for name, data_set in catalog._data_sets.items():
if type(data_set).__name__.startswith("Mlflow"):
catalog._data_sets[name]._logging_activated = logging_flag
for name, dataset in catalog._datasets.items():
if type(dataset).__name__.startswith("Mlflow"):
catalog._datasets[name]._logging_activated = logging_flag
32 changes: 16 additions & 16 deletions kedro_mlflow/mlflow/kedro_pipeline_model.py
@@ -65,7 +65,7 @@ def __init__(
# copy mode has been converted because it is a property
# TODO: we need to use the runner's default dataset in case of multithreading
self.loaded_catalog = DataCatalog(
data_sets={
datasets={
name: MemoryDataset(copy_mode=copy_mode)
for name, copy_mode in self.copy_mode.items()
}
@@ -86,7 +86,7 @@ def copy_mode(self, copy_mode):
# of all catalog entries with this copy_mode
self._copy_mode = {
name: copy_mode
for name in self.pipeline.data_sets()
for name in self.pipeline.datasets()
if name != self.output_name
}
elif isinstance(copy_mode, dict):
@@ -95,7 +95,7 @@ def copy_mode(self, copy_mode):
# the others will be returned as None when accessing with dict.get()
self._copy_mode = {
name: None
for name in self.pipeline.data_sets()
for name in self.pipeline.datasets()
if name != self.output_name
}
self._copy_mode.update(copy_mode)
@@ -106,37 +106,37 @@ def copy_mode(self, copy_mode):

def _extract_pipeline_catalog(self, catalog: DataCatalog) -> DataCatalog:
sub_catalog = DataCatalog()
for data_set_name in self.pipeline.inputs():
if data_set_name == self.input_name:
for dataset_name in self.pipeline.inputs():
if dataset_name == self.input_name:
# there is no obligation that this dataset is persisted
# and even if it is, we keep only an empty memory dataset to avoid
# extra unnecessary dependencies: this dataset will be replaced at
# inference time and we do not need to know the original type, see
# https://github.com/Galileo-Galilei/kedro-mlflow/issues/273
sub_catalog.add(data_set_name=data_set_name, data_set=MemoryDataset())
sub_catalog.add(dataset_name=dataset_name, dataset=MemoryDataset())
else:
try:
data_set = catalog._data_sets[data_set_name]
dataset = catalog._datasets[dataset_name]
if isinstance(
data_set, MemoryDataset
) and not data_set_name.startswith("params:"):
dataset, MemoryDataset
) and not dataset_name.startswith("params:"):
raise KedroPipelineModelError(
"""
The datasets of the training pipeline must be persisted locally
to be used by the inference pipeline. You must enforce them as
non 'MemoryDataset' in the 'catalog.yml'.
Dataset '{data_set_name}' is not persisted currently.
Dataset '{dataset_name}' is not persisted currently.
""".format(
data_set_name=data_set_name
dataset_name=dataset_name
)
)
self._logger.info(
f"The data_set '{data_set_name}' is added to the Pipeline catalog."
f"The dataset '{dataset_name}' is added to the Pipeline catalog."
)
sub_catalog.add(data_set_name=data_set_name, data_set=data_set)
sub_catalog.add(dataset_name=dataset_name, dataset=dataset)
except KeyError:
raise KedroPipelineModelError(
f"The provided catalog must contains '{data_set_name}' data_set "
f"The provided catalog must contains '{dataset_name}' dataset "
"since it is the input of the pipeline."
)

@@ -146,7 +146,7 @@ def extract_pipeline_artifacts(
self, parameters_saving_folder: Optional[Path] = None
):
artifacts = {}
for name, dataset in self.initial_catalog._data_sets.items():
for name, dataset in self.initial_catalog._datasets.items():
if name != self.input_name:
if name.startswith("params:"):
# we need to persist it locally for mlflow access
@@ -195,7 +195,7 @@ def load_context(self, context):

updated_catalog = self.initial_catalog.shallow_copy()
for name, uri in context.artifacts.items():
updated_catalog._data_sets[name]._filepath = Path(uri)
updated_catalog._datasets[name]._filepath = Path(uri)
self.loaded_catalog.save(name=name, data=updated_catalog.load(name))

def predict(self, context, model_input):
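
As an aside, the behaviour of the ``copy_mode`` property shown earlier in this file can be summarised by a small standalone function; this sketch uses made-up dataset names and assumes, as the diff suggests, that a string (or ``None``) is broadcast to every pipeline dataset except the output:

```python
from typing import Dict, Optional, Set, Union


def normalise_copy_mode(
    copy_mode: Union[str, Dict[str, str], None],
    pipeline_datasets: Set[str],
    output_name: str,
) -> Dict[str, Optional[str]]:
    targets = {name for name in pipeline_datasets if name != output_name}
    if copy_mode is None or isinstance(copy_mode, str):
        # a single value (possibly None) is applied to every dataset
        return {name: copy_mode for name in targets}
    if isinstance(copy_mode, dict):
        # datasets not listed explicitly fall back to None
        resolved: Dict[str, Optional[str]] = {name: None for name in targets}
        resolved.update(copy_mode)
        return resolved
    raise TypeError(f"'copy_mode' must be a str or a dict, got '{type(copy_mode)}'")


print(normalise_copy_mode("assign", {"features", "model_input", "predictions"}, "predictions"))
# e.g. {'features': 'assign', 'model_input': 'assign'}
print(normalise_copy_mode({"features": "deepcopy"}, {"features", "predictions"}, "predictions"))
# e.g. {'features': 'deepcopy'}
```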
2 changes: 1 addition & 1 deletion tests/framework/cli/test_cli_modelify.py
@@ -264,7 +264,7 @@ def test_modelify_logs_in_mlflow(monkeypatch, example_repo, artifacts_list):

for artifact in artifacts_list:
assert (
f"The data_set '{artifact}' is added to the Pipeline catalog"
f"The dataset '{artifact}' is added to the Pipeline catalog"
in stripped_output
)
assert "Model successfully logged" in stripped_output
2 changes: 1 addition & 1 deletion tests/framework/hooks/test_hook_deactivate_tracking.py
@@ -79,7 +79,7 @@ def catalog_config(kedro_project_path):
return {
"artifact_data": {
"type": "kedro_mlflow.io.artifacts.MlflowArtifactDataset",
"data_set": {
"dataset": {
"type": "pickle.PickleDataset",
"filepath": fake_data_filepath,
},
8 changes: 4 additions & 4 deletions tests/framework/hooks/test_hook_log_metrics.py
@@ -157,10 +157,10 @@ def test_mlflow_hook_automatically_prefix_metrics_dataset(
)
# Check that metrics datasets are prefixed with their names.
# for metric
assert dummy_catalog._data_sets["my_metrics"]._prefix == "my_metrics"
assert dummy_catalog._data_sets["another_metrics"]._prefix == "foo"
assert dummy_catalog._data_sets["my_metric"].key == "my_metric"
assert dummy_catalog._data_sets["another_metric"].key == "foo"
assert dummy_catalog._datasets["my_metrics"]._prefix == "my_metrics"
assert dummy_catalog._datasets["another_metrics"]._prefix == "foo"
assert dummy_catalog._datasets["my_metric"].key == "my_metric"
assert dummy_catalog._datasets["another_metric"].key == "foo"


def test_mlflow_hook_metrics_dataset_with_run_id(
2 changes: 1 addition & 1 deletion tests/framework/hooks/test_hook_log_parameters.py
@@ -134,7 +134,7 @@ def test_node_hook_logging(
)

node_inputs = {
v: dummy_catalog._data_sets.get(v) for k, v in dummy_node._inputs.items()
v: dummy_catalog._datasets.get(v) for k, v in dummy_node._inputs.items()
}

mlflow_tracking_uri = (kedro_project / "mlruns").as_uri()
4 changes: 2 additions & 2 deletions tests/framework/hooks/test_hook_pipeline_ml.py
@@ -324,7 +324,7 @@ def test_mlflow_hook_save_pipeline_ml_with_copy_mode(

actual_copy_mode = {
name: ds._copy_mode
for name, ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.items()
for name, ds in loaded_model._model_impl.python_model.loaded_catalog._datasets.items()
}

assert actual_copy_mode == expected
@@ -380,7 +380,7 @@ def test_mlflow_hook_save_pipeline_ml_with_default_copy_mode_assign(
assert all(
[
ds._copy_mode == "assign"
for ds in loaded_model._model_impl.python_model.loaded_catalog._data_sets.values()
for ds in loaded_model._model_impl.python_model.loaded_catalog._datasets.values()
]
)

