Merge pull request #40 from Breaka84/single_parameter_for_handling_missing_columns

Single parameter for handling missing columns
rt-phb committed Aug 16, 2022
2 parents 8386385 + 0a800f7 commit 0a6fd28
Showing 4 changed files with 55 additions and 48 deletions.
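
To ground the change before the file-by-file diff, here is a minimal usage sketch of the new API. It is not part of the commit: the Spark session, the column names, the import path, and the exact data-type spellings in the mapping tuples are illustrative assumptions; only the parameter name and its allowed values ("raise_error", "nullify", "skip") come from the diff below.

    from pyspark.sql import SparkSession
    from spooq.transformer import Mapper  # import path assumed from the package layout shown below

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    input_df = spark.createDataFrame([(1, "F")], ["id", "gender"])

    # Mapping tuples follow the (output_name, source_path, data_type) shape used in the
    # tests below; the data_type spellings here are assumptions.
    # "attributes.birthday" is deliberately missing from input_df.
    mapping = [
        ("id",       "id",                   "IntegerType"),
        ("gender",   "gender",               "StringType"),
        ("birthday", "attributes.birthday",  "DateType"),
    ]

    # Before this release (now deprecated):
    #   Mapper(mapping=mapping, nullify_missing_columns=True)
    #   Mapper(mapping=mapping, skip_missing_columns=True)
    # From 3.3.9 on, one string parameter selects the strategy:
    mapper = Mapper(mapping=mapping, missing_column_handling="nullify")  # or "skip" / "raise_error"
    output_df = mapper.transform(input_df)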
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,10 @@
Changelog
=========

3.3.9 (2022-08-16)
-------------------
* [MOD] Mapper: Replace missing column parameters (`nullify_missing_columns`, `skip_missing_columns`,
`ignore_missing_columns`) with one single parameter: `missing_column_handling`

3.3.8 (2022-08-11)
-------------------
* [MOD] Mapper: Add additional parameter allowing skipping of transformations in case the source column is missing:
2 changes: 1 addition & 1 deletion spooq/_version.py
@@ -1 +1 @@
__version__ = "3.3.8"
__version__ = "3.3.9"
77 changes: 40 additions & 37 deletions spooq/transformer/mapper.py
@@ -47,20 +47,14 @@ class Mapper(Transformer):
clean, pivot, anonymize, ... the data itself. Please have a look at the
:py:mod:`spooq.transformer.mapper_custom_data_types` module for more information.
ignore_missing_columns : :any:`bool`, Defaults to False
DEPRECATED: please use nullify_missing_columns instead!
nullify_missing_columns : :any:`bool`, Defaults to False
Specifies if the mapping transformation should use NULL if a referenced input
column is missing in the provided DataFrame. Only one of `nullify_missing_columns` and `skip_missing_columns`
can be set to True. If none of the two is set to True then an exception will be raised in case the input
column was not found.
skip_missing_columns : :any:`bool`, Defaults to False
Specifies if the mapping transformation should be skipped if a referenced input
column is missing in the provided DataFrame. Only one of `nullify_missing_columns` and `skip_missing_columns`
can be set to True. If none of the two is set to True then an exception will be raised in case the input
column was not found.
missing_column_handling : :any:`str`, Defaults to 'raise_error'
Specifies how to proceed in case a source column does not exist in the source DataFrame:
* raise_error (default)
Raise an exception
* nullify
Create source column filled with null
* skip
Skip the mapping transformation
ignore_ambiguous_columns : :any:`bool`, Defaults to False
It can happen that the input DataFrame has ambiguous column names (like "Key" vs "key") which will
@@ -83,6 +77,13 @@ class Mapper(Transformer):
exists in the input DataFrame, its position is kept.
=> output schema: new columns + input columns
Keyword Arguments
-----------------
ignore_missing_columns : :any:`bool`, Defaults to False
DEPRECATED: please use missing_column_handling instead!
Note
----
Let's talk about Mappings:
@@ -108,10 +109,9 @@
Note
----
The available input columns can vary from batch to batch if you use schema inference
(f.e. on json data) for the extraction. Ignoring missing columns on the input DataFrame is
highly encouraged in this case. Although, if you have tight control over the structure
of the extracted DataFrame, setting `ignore_missing_columns` to True is advised
as it can uncover typos and bugs.
(f.e. on json data) for the extraction. Via the parameter `missing_column_handling` you can specify a strategy on
how to handle missing columns on the input DataFrame.
It is advised to use the 'raise_error' option as it can uncover typos and bugs.
Note
----
@@ -126,29 +126,32 @@
def __init__(
self,
mapping,
ignore_missing_columns=False,
ignore_ambiguous_columns=False,
nullify_missing_columns=False,
skip_missing_columns=False,
missing_column_handling="raise_error",
mode="replace",
**kwargs
):
super(Mapper, self).__init__()
self.logger.warn("Parameter `ignore_missing_columns` is deprecated, use `nullify_missing_columns` instead!")
warnings.warn(
message="Parameter `ignore_missing_columns` is deprecated, use `nullify_missing_columns` instead!",
category=FutureWarning
)
self.mapping = mapping
self.nullify_missing_columns = nullify_missing_columns or ignore_missing_columns
self.skip_missing_columns = skip_missing_columns
if self.nullify_missing_columns and self.skip_missing_columns:
raise ValueError(
"Only one of the parameters `nullify_missing_columns` (before `ignore_missing_columns`) and "
"`skip_missing_columns` can be set to True!"
)
self.missing_column_handling = missing_column_handling
self.ignore_ambiguous_columns = ignore_ambiguous_columns
self.mode = mode

if "ignore_missing_columns" in kwargs:
message = "Parameter `ignore_missing_columns` is deprecated, use `missing_column_handling` instead!"
if kwargs["ignore_missing_columns"]:
message += "\n`missing_column_handling` was set to `nullify` because you defined " \
"`ignore_missing_columns=True`!"
self.missing_column_handling = "nullify"
self.logger.warn(message)
warnings.warn(message=message, category=FutureWarning)

if self.missing_column_handling not in ["raise_error", "skip", "nullify"]:
raise ValueError("""Only the following values are allowed for `missing_column_handling`:
- raise_error: raise an exception in case the source column is missing
- skip: skip transformation in case the source is missing
- nullify: set source column to null in case it is missing""")

def transform(self, input_df):
self.logger.info("Generating SQL Select-Expression for Mapping...")
self.logger.debug("Input Schema/Mapping:")
@@ -208,14 +211,14 @@ def _get_spark_column(self, source_column, name, input_df):
source_column = F.col(source_column)

except AnalysisException as e:
if isinstance(source_column, str) and self.skip_missing_columns:
if isinstance(source_column, str) and self.missing_column_handling == "skip":
self.logger.warn(
f"Missing column ({str(source_column)}) skipped (via skip_missing_columns=True): {e.desc}"
f"Missing column ({str(source_column)}) skipped (via missing_column_handling='skip'): {e.desc}"
)
return None
elif isinstance(source_column, str) and self.nullify_missing_columns:
elif isinstance(source_column, str) and self.missing_column_handling == "nullify":
self.logger.warn(
f"Missing column ({str(source_column)}) replaced with NULL (via nullify_missing_columns=True): {e.desc}"
f"Missing column ({str(source_column)}) replaced with NULL (via missing_column_handling='nullify'): {e.desc}"
)
source_column = F.lit(None)
elif "ambiguous" in e.desc.lower() and self.ignore_ambiguous_columns:
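
Reading the `__init__` and `_get_spark_column` changes above together, the runtime behaviour per strategy can be sketched as follows. This reuses the illustrative spark session and mapping from the sketch near the top of this page; it is an assumed walk-through, not code from the repository.

    # "raise_error" (default): the missing "attributes.birthday" column makes transform() raise.
    # "nullify": the source column is replaced with F.lit(None), so "birthday" is present but NULL.
    Mapper(mapping=mapping, missing_column_handling="nullify").transform(input_df).printSchema()
    # "skip": the transformation is dropped, so "birthday" does not appear in the output at all.
    Mapper(mapping=mapping, missing_column_handling="skip").transform(input_df).printSchema()

    # Deprecation shim: the old keyword still works via **kwargs, but emits a FutureWarning
    # and, when truthy, silently switches the handling strategy to "nullify".
    Mapper(mapping=mapping, ignore_missing_columns=True)

    # Anything outside {"raise_error", "nullify", "skip"} is rejected in __init__:
    # Mapper(mapping=mapping, missing_column_handling="ignore")  # -> ValueError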
20 changes: 10 additions & 10 deletions tests/unit/transformer/test_mapper.py
@@ -13,7 +13,7 @@

@pytest.fixture(scope="module")
def transformer(mapping):
return Mapper(mapping=mapping, ignore_missing_columns=True)
return Mapper(mapping=mapping, missing_column_handling="nullify")


@pytest.fixture(scope="module")
@@ -135,12 +135,12 @@ def new_columns(self, new_mapping):

def test_appending_a_mapping(self, mapped_df, new_mapping, input_columns, new_columns):
"""Output schema is correct for added mapping at the end of the input schema"""
new_mapped_df = Mapper(mapping=new_mapping, mode="append", ignore_missing_columns=True).transform(mapped_df)
new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(mapped_df)
assert input_columns + new_columns == new_mapped_df.columns

def test_prepending_a_mapping(self, mapped_df, new_mapping, input_columns, new_columns):
"""Output schema is correct for added mapping at the beginning of the input schema"""
new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", ignore_missing_columns=True).transform(mapped_df)
new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(mapped_df)
assert new_columns + input_columns == new_mapped_df.columns

def test_appending_a_mapping_with_duplicated_columns(self, input_columns, mapped_df):
@@ -152,7 +152,7 @@ def test_appending_a_mapping_with_duplicated_columns(self, input_columns, mapped
]
new_columns = [name for (name, path, data_type) in new_mapping]
new_columns_deduplicated = [x for x in new_columns if x not in input_columns]
new_mapped_df = Mapper(mapping=new_mapping, mode="append", ignore_missing_columns=True).transform(mapped_df)
new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(mapped_df)
assert input_columns + new_columns_deduplicated == new_mapped_df.columns
assert mapped_df.schema["birthday"].dataType == T.TimestampType()
assert new_mapped_df.schema["birthday"].dataType == T.DateType()
@@ -166,7 +166,7 @@ def test_prepending_a_mapping_with_duplicated_columns(self, input_columns, mappe
]
new_columns = [name for (name, path, data_type) in new_mapping]
new_columns_deduplicated = [x for x in new_columns if x not in input_columns]
new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", ignore_missing_columns=True).transform(mapped_df)
new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(mapped_df)
assert new_columns_deduplicated + input_columns == new_mapped_df.columns
assert mapped_df.schema["birthday"].dataType == T.TimestampType()
assert new_mapped_df.schema["birthday"].dataType == T.DateType()
@@ -179,7 +179,7 @@ class TestExceptionForMissingInputColumns(object):

@pytest.fixture(scope="class")
def transformer(self, mapping):
return Mapper(mapping=mapping, ignore_missing_columns=False, skip_missing_columns=False, nullify_missing_columns=False)
return Mapper(mapping=mapping, missing_column_handling="raise_error")

def test_missing_column_raises_exception(self, input_df, transformer):
input_df = input_df.drop("attributes")
@@ -199,7 +199,7 @@ class TestNullifyMissingColumns(object):

@pytest.fixture(scope="class")
def transformer(self, mapping):
return Mapper(mapping=mapping, skip_missing_columns=False, nullify_missing_columns=True)
return Mapper(mapping=mapping, missing_column_handling="nullify")

@pytest.fixture(scope="class")
def mapped_df(self, input_df, transformer):
@@ -222,7 +222,7 @@ class TestSkipMissingColumns(object):

@pytest.fixture(scope="class")
def transformer(self, mapping):
return Mapper(mapping=mapping, skip_missing_columns=True, nullify_missing_columns=False)
return Mapper(mapping=mapping, missing_column_handling="skip")

@pytest.fixture(scope="class")
def mapped_df(self, input_df, transformer):
@@ -234,14 +234,14 @@ def test_missing_columns_are_skipped(self, mapped_df, mapping):
assert not any([column in mapped_df.columns for column in attribute_columns])


class TestExceptionWhenBothSkippingAndNullifying(object):
class TestExceptionWhenInvalidHandling(object):
"""
Raise an exception in case an invalid value is provided for `missing_column_handling`
"""

def test_invalid_parameter_setting_raises_exception(self, mapping):
with pytest.raises(ValueError):
Mapper(mapping=mapping, skip_missing_columns=True, nullify_missing_columns=True)
Mapper(mapping=mapping, missing_column_handling="invalid")


class TestDataTypesOfMappedDataFrame(object):
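
The updated tests above cover the three strategies and the invalid-value ValueError. A hedged sketch of how the deprecation shim itself could additionally be exercised (not part of this commit; it assumes the module-level `mapping` fixture from the file above):

    import pytest
    from spooq.transformer import Mapper

    def test_ignore_missing_columns_emits_future_warning(mapping):
        # Passing the removed keyword should trigger the warnings.warn(..., FutureWarning)
        # added in __init__ ...
        with pytest.warns(FutureWarning):
            transformer = Mapper(mapping=mapping, ignore_missing_columns=True)
        # ... and map the deprecated flag onto the new parameter.
        assert transformer.missing_column_handling == "nullify"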
