Skip to content

Commit

Permalink
Merge pull request #38 from Breaka84/allow_skipping_of_missing_columns
Browse files Browse the repository at this point in the history
Handle missing input columns
  • Loading branch information
rt-phb authored Aug 11, 2022
2 parents d44eb5a + 7f3bc4c commit 8386385
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 11 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
Changelog
=========

3.3.8 (2022-08-11)
-------------------
* [MOD] Mapper: Add additional parameters allowing nullifying or skipping of transformations in case the source column is missing:

- `nullify_missing_columns`: set source column to null in case it does not exist
- `skip_missing_columns`: skip transformation in case the source column does not exist
- `ignore_missing_columns`: DEPRECATED -> use `nullify_missing_columns` instead

3.3.7 (2022-03-15)
-------------------
* [FIX] Fix long overflow in extended_string_to_timestamp
Expand Down
2 changes: 1 addition & 1 deletion spooq/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.3.7"
__version__ = "3.3.8"
56 changes: 47 additions & 9 deletions spooq/transformer/mapper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import absolute_import
import warnings
from builtins import str
from pyspark.sql.utils import AnalysisException
from pyspark.sql import functions as F
Expand Down Expand Up @@ -47,8 +48,19 @@ class Mapper(Transformer):
:py:mod:`spooq.transformer.mapper_custom_data_types` module for more information.
ignore_missing_columns : :any:`bool`, Defaults to False
DEPRECATED: please use nullify_missing_columns instead!
nullify_missing_columns : :any:`bool`, Defaults to False
Specifies if the mapping transformation should use NULL if a referenced input
column is missing in the provided DataFrame. If set to False, it will raise an exception.
column is missing in the provided DataFrame. Only one of `nullify_missing_columns` and `skip_missing_columns`
can be set to True. If none of the two is set to True then an exception will be raised in case the input
column was not found.
skip_missing_columns : :any:`bool`, Defaults to False
Specifies if the mapping transformation should be skipped if a referenced input
column is missing in the provided DataFrame. Only one of `nullify_missing_columns` and `skip_missing_columns`
can be set to True. If none of the two is set to True then an exception will be raised in case the input
column was not found.
ignore_ambiguous_columns : :any:`bool`, Defaults to False
It can happen that the input DataFrame has ambiguous column names (like "Key" vs "key") which will
Expand Down Expand Up @@ -111,10 +123,29 @@ class Mapper(Transformer):
Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!
"""

def __init__(self, mapping, ignore_missing_columns=False, ignore_ambiguous_columns=False, mode="replace"):
def __init__(
    self,
    mapping,
    ignore_missing_columns=False,
    ignore_ambiguous_columns=False,
    nullify_missing_columns=False,
    skip_missing_columns=False,
    mode="replace",
):
    """
    Parameters
    ----------
    mapping : sequence of tuples
        Transformation instructions (see class docstring).
    ignore_missing_columns : :any:`bool`, Defaults to False
        DEPRECATED alias for `nullify_missing_columns`; kept for backward compatibility.
    ignore_ambiguous_columns : :any:`bool`, Defaults to False
        Handle ambiguous input column names (see class docstring).
    nullify_missing_columns : :any:`bool`, Defaults to False
        Replace a missing source column with NULL instead of raising.
    skip_missing_columns : :any:`bool`, Defaults to False
        Skip a transformation whose source column is missing instead of raising.
    mode : :any:`str`, Defaults to "replace"
        How the mapped columns are combined with the input DataFrame (see class docstring).

    Raises
    ------
    ValueError
        If both `nullify_missing_columns` (or its deprecated alias
        `ignore_missing_columns`) and `skip_missing_columns` are set to True.
    """
    super(Mapper, self).__init__()
    # Fix: only emit the deprecation warning when the deprecated parameter is
    # actually used. Previously both warnings fired unconditionally on every
    # Mapper instantiation, even for callers that never passed the parameter.
    if ignore_missing_columns:
        self.logger.warn("Parameter `ignore_missing_columns` is deprecated, use `nullify_missing_columns` instead!")
        warnings.warn(
            message="Parameter `ignore_missing_columns` is deprecated, use `nullify_missing_columns` instead!",
            category=FutureWarning,
        )
    self.mapping = mapping
    # Kept for backward compatibility with code reading this attribute.
    self.ignore_missing_columns = ignore_missing_columns
    # The deprecated flag folds into the new one so legacy callers keep working.
    self.nullify_missing_columns = nullify_missing_columns or ignore_missing_columns
    self.skip_missing_columns = skip_missing_columns
    if self.nullify_missing_columns and self.skip_missing_columns:
        raise ValueError(
            "Only one of the parameters `nullify_missing_columns` (before `ignore_missing_columns`) and "
            "`skip_missing_columns` can be set to True!"
        )
    self.ignore_ambiguous_columns = ignore_ambiguous_columns
    self.mode = mode

Expand Down Expand Up @@ -177,9 +208,14 @@ def _get_spark_column(self, source_column, name, input_df):
source_column = F.col(source_column)

except AnalysisException as e:
if isinstance(source_column, str) and self.ignore_missing_columns:
if isinstance(source_column, str) and self.skip_missing_columns:
self.logger.warn(
f"Missing column ({str(source_column)}) replaced with NULL (via ignore_missing_columns=True): {e.desc}"
f"Missing column ({str(source_column)}) skipped (via skip_missing_columns=True): {e.desc}"
)
return None
elif isinstance(source_column, str) and self.nullify_missing_columns:
self.logger.warn(
f"Missing column ({str(source_column)}) replaced with NULL (via nullify_missing_columns=True): {e.desc}"
)
source_column = F.lit(None)
elif "ambiguous" in e.desc.lower() and self.ignore_ambiguous_columns:
Expand All @@ -188,10 +224,12 @@ def _get_spark_column(self, source_column, name, input_df):
)
return None
else:
self.logger.exception(
'Column: "{}" cannot be resolved '.format(str(source_column))
+ 'but is referenced in the mapping by column: "{}".\n'.format(name)
)
self.logger.exception(f"""
Column: '{source_column}' cannot be resolved but is referenced in the mapping by column: '{name}'.
You can make use of the following parameters to handle missing input columns:
- `nullify_missing_columns`: set missing source column to null
- `skip_missing_columns`: skip the transformation
""")
raise e
return source_column

Expand Down
54 changes: 53 additions & 1 deletion tests/unit/transformer/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TestExceptionForMissingInputColumns(object):

    @pytest.fixture(scope="class")
    def transformer(self, mapping):
        # All three missing-column flags disabled -> a missing input column must raise.
        # NOTE(review): `ignore_missing_columns=False` is the deprecated spelling of
        # `nullify_missing_columns=False`; passed here explicitly to pin the legacy default.
        return Mapper(mapping=mapping, ignore_missing_columns=False, skip_missing_columns=False, nullify_missing_columns=False)

def test_missing_column_raises_exception(self, input_df, transformer):
input_df = input_df.drop("attributes")
Expand All @@ -192,6 +192,58 @@ def test_empty_input_dataframe_raises_exception(self, spark_session, transformer
transformer.transform(input_df)


class TestNullifyMissingColumns(object):
    """
    A missing input column is replaced by a NULL literal instead of raising.
    """

    @pytest.fixture(scope="class")
    def transformer(self, mapping):
        # Skipping explicitly disabled so only the nullify behavior is exercised.
        return Mapper(mapping=mapping, skip_missing_columns=False, nullify_missing_columns=True)

    @pytest.fixture(scope="class")
    def mapped_df(self, input_df, transformer):
        df_without_attributes = input_df.drop("attributes")
        return transformer.transform(df_without_attributes)

    def test_missing_columns_are_not_skipped(self, mapped_df, mapping):
        # Every mapping entry must still produce an output column.
        assert len(mapped_df.columns) == len(mapping)

    def test_missing_columns_are_nullified(self, mapped_df, mapping):
        nullified_columns = [
            name for name, source, _ in mapping if source.startswith("attributes.")
        ]
        # All rows must hold NULL in every column derived from the dropped struct.
        all_null_condition = " AND ".join(
            f"{column_name} is NULL" for column_name in nullified_columns
        )
        assert mapped_df.filter(all_null_condition).count() == mapped_df.count()


class TestSkipMissingColumns(object):
    """
    A transformation referencing a missing input column is dropped entirely.
    """

    @pytest.fixture(scope="class")
    def transformer(self, mapping):
        # Nullifying explicitly disabled so only the skip behavior is exercised.
        return Mapper(mapping=mapping, skip_missing_columns=True, nullify_missing_columns=False)

    @pytest.fixture(scope="class")
    def mapped_df(self, input_df, transformer):
        df_without_attributes = input_df.drop("attributes")
        return transformer.transform(df_without_attributes)

    def test_missing_columns_are_skipped(self, mapped_df, mapping):
        skipped_columns = [
            name for name, source, _ in mapping if source.startswith("attributes.")
        ]
        # None of the columns derived from the dropped struct may appear in the output.
        assert all(column not in mapped_df.columns for column in skipped_columns)


class TestExceptionWhenBothSkippingAndNullifying(object):
    """
    Raise an exception in case both parameters skip_missing_columns and nullify_missing_columns are True
    """

    def test_invalid_parameter_setting_raises_exception(self, mapping):
        # Fix: request the `mapping` fixture via the signature. The original
        # referenced the module-level `mapping` fixture *function* in the body
        # (passing the function object, not the mapping data) and requested
        # `input_df`/`transformer` fixtures it never used — no `transformer`
        # fixture exists in this class, so collection would fail.
        with pytest.raises(ValueError):
            Mapper(mapping=mapping, skip_missing_columns=True, nullify_missing_columns=True)


class TestDataTypesOfMappedDataFrame(object):
@pytest.mark.parametrize(
("column", "expected_data_type"),
Expand Down

0 comments on commit 8386385

Please sign in to comment.