Commit

Merge pull request #29 from Breaka84/set_empty_maps_to_null
store null instead of empty map in case no cleaning was necessary
rt-phb authored Jul 22, 2021
2 parents b4aadb9 + d06b671 commit 6b705c5
Showing 5 changed files with 29 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,10 @@
Changelog
=========

+3.3.4 (2021-07-21)
+-------------------
+* [MOD] Store null value instead of an empty Map in case no cleansing was necessary
+
3.3.3 (2021-06-30)
-------------------
* [MOD] Change logic for storing cleansed values as MapType Column to not break Spark (logical plan got too big)
2 changes: 1 addition & 1 deletion spooq/_version.py
@@ -1 +1 @@
-__version__ = "3.3.3"
+__version__ = "3.3.4"
5 changes: 5 additions & 0 deletions spooq/transformer/base_cleaner.py
@@ -64,5 +64,10 @@ def _only_keep_cleansed_values(col_name, temporary_col_name):
self.column_to_log_cleansed_values,
F.from_json(self.column_to_log_cleansed_values, T.MapType(T.StringType(), T.StringType())),
)
+# set empty map to null
+input_df = input_df.withColumn(
+self.column_to_log_cleansed_values,
+F.when(F.size(F.col(self.column_to_log_cleansed_values))>0, F.col(self.column_to_log_cleansed_values)).otherwise(F.lit(None)),
+)

return input_df.drop(*temporary_column_names)
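
The added guard relies only on built-in PySpark column functions: F.size counts the entries of the map column, and F.when/otherwise keeps the map when it is non-empty and substitutes a null literal otherwise. A minimal, self-contained sketch of the same pattern outside the transformer (the DataFrame and column names below are illustrative, not part of spooq):

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.master("local[1]").getOrCreate()

schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("cleansed_values", T.MapType(T.StringType(), T.StringType()), True),
])
df = spark.createDataFrame([(1, {}), (2, {"b": "negative"})], schema=schema)

# Keep the map only when it actually holds cleansed values; otherwise store null
log_col = "cleansed_values"
df = df.withColumn(
    log_col,
    F.when(F.size(F.col(log_col)) > 0, F.col(log_col)).otherwise(F.lit(None)),
)
df.show(truncate=False)  # the first row now shows null instead of {}
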
28 changes: 14 additions & 14 deletions tests/unit/transformer/test_enum_cleaner.py
@@ -446,15 +446,15 @@ def expected_output_schema_for_tests_with_multiple_cleansing_rules(self):
T.StructField("b", T.StringType(), True),
T.StructField("c", T.StringType(), True),
T.StructField("d", T.StringType(), True),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), False),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), True),
]
)

@pytest.fixture(scope="class")
def expected_output_df_for_tests_with_multiple_cleansing_rules(self, spark_session, expected_output_schema_for_tests_with_multiple_cleansing_rules):
return spark_session.createDataFrame(
[
("stay", "positive", "or", "healthy", {}),
("stay", "positive", "or", "healthy", None),
("stay", None, "and", "healthy", {"b": "negative"}),
("stay", "positive", "or", "healthy", {"c": "xor"}),
],
@@ -465,14 +465,14 @@ def test_single_cleansed_value_is_stored_in_separate_column(self, input_df, spar
expected_output_schema = T.StructType(
[
T.StructField("b", T.StringType(), True),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), False),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), True),
]
)
expected_output_df = spark_session.createDataFrame(
[
("positive", {}),
("positive", None),
(None, {"b": "negative"}),
("positive", {}),
("positive", None),
],
schema=expected_output_schema,
)
@@ -486,14 +486,14 @@ def test_single_cleansed_value_is_stored_in_separate_column_with_default_substit
expected_output_schema = T.StructType(
[
T.StructField("b", T.StringType(), True),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), False),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), True),
]
)
expected_output_df = spark_session.createDataFrame(
[
("positive", {}),
("positive", None),
("cleansed_value", {"b": "negative"}),
("positive", {}),
("positive", None),
],
schema=expected_output_schema,
)
@@ -513,14 +513,14 @@ def test_only_cleansed_values_are_stored_in_separate_column(self, spark_session)
[
T.StructField("a", T.StringType(), True),
T.StructField("b", T.StringType(), True),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), False),
T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), True),
]
)
expected_output_df = spark_session.createDataFrame(
[
("stay", "positive", {}),
("stay", "positive", None),
("stay", None, {"b": "negative"}),
("stay", "positive", {}),
("stay", "positive", None),
],
schema=expected_output_schema,
)
@@ -556,9 +556,9 @@ def test_multiple_cleansing_rules_without_any_cleansing(self,

expected_output_df = spark_session.createDataFrame(
[
("stay", "positive", "or", "healthy", {}),
("stay", "positive", "and", "healthy", {}),
("stay", "positive", "or", "healthy", {}),
("stay", "positive", "or", "healthy", None),
("stay", "positive", "and", "healthy", None),
("stay", "positive", "or", "healthy", None),
],
schema=expected_output_schema_for_tests_with_multiple_cleansing_rules,
)
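
The flipped last argument in the expected StructFields (False became True) marks the log column itself as nullable, which the new expected rows require: createDataFrame verifies the schema and rejects None in a non-nullable field. A short illustration of that constraint, independent of the spooq test fixtures (names are illustrative):

from pyspark.sql import SparkSession, types as T

spark = SparkSession.builder.master("local[1]").getOrCreate()

# The map column must be declared nullable once some rows carry None instead of {}
schema = T.StructType([
    T.StructField("b", T.StringType(), True),
    T.StructField("cleansed_values_enum", T.MapType(T.StringType(), T.StringType(), True), True),
])
expected_df = spark.createDataFrame(
    [("positive", None), (None, {"b": "negative"})],
    schema=schema,
)
expected_df.printSchema()  # cleansed_values_enum: map (nullable = true)
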
10 changes: 5 additions & 5 deletions tests/unit/transformer/test_threshold_cleaner.py
@@ -322,7 +322,7 @@ def test_single_cleansed_value_is_stored_in_separate_column(self, transformer, i
expected_output_df = spark_session.createDataFrame(
[
(0, None, {"integers": "-5"}),
-(1, 5, {}),
+(1, 5, None),
(2, None, {"integers": "15"}),
],
schema=expected_output_schema,
@@ -349,7 +349,7 @@ def test_single_cleansed_value_is_stored_in_separate_column_with_default_substit
expected_output_df = spark_session.createDataFrame(
[
(0, -1, {"integers": "-5"}),
-(1, 5, {}),
+(1, 5, None),
(2, -1, {"integers": "15"}),
],
schema=expected_output_schema,
@@ -370,15 +370,15 @@ def test_multiple_cleansing_rules(self, spark_session, transformer, input_df):
T.StructField("strings", T.StringType(), True),
T.StructField("timestamps", T.StringType(), True),
T.StructField("datetimes", T.StringType(), True),
T.StructField("cleansed_values_threshold", T.MapType(T.StringType(), T.StringType(), True), False),
T.StructField("cleansed_values_threshold", T.MapType(T.StringType(), T.StringType(), True), True),
]
)

expected_result_df = spark_session.createDataFrame(
[
(0, 12.0, 12, "12", None, None, {"timestamps": "1850-01-01T12:00:00.000Z", "datetimes": "1850-01-01"}),
-(1, 65.7, 65, "65", "2020-06-01 12:00:00", "2020-06-01", {}),
-(2, 300.0, 300, "300", "2020-06-01 15:00:00", "2020-06-15", {}),
+(1, 65.7, 65, "65", "2020-06-01 12:00:00", "2020-06-01", None),
+(2, 300.0, 300, "300", "2020-06-01 15:00:00", "2020-06-15", None),
(4, None, None, "5000", "2020-06-01 16:00:00", "2020-07-01", {"floats": "5000.0", "integers": "5000"}),
(
5,
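
One practical consequence for downstream code: a check for "nothing was cleansed" that previously tested for an empty map (for example F.size(col) == 0) should now test for null. A hedged sketch of such a consumer-side filter (the DataFrame and column names are illustrative):

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.master("local[1]").getOrCreate()

schema = T.StructType([
    T.StructField("integers", T.IntegerType(), True),
    T.StructField("cleansed_values_threshold", T.MapType(T.StringType(), T.StringType()), True),
])
df = spark.createDataFrame([(5, None), (None, {"integers": "-5"})], schema=schema)

# Rows where at least one value was cleansed ...
cleansed_rows = df.filter(F.col("cleansed_values_threshold").isNotNull())
# ... and rows that needed no cleansing: the log column now holds null instead of {}
untouched_rows = df.filter(F.col("cleansed_values_threshold").isNull())
print(cleansed_rows.count(), untouched_rows.count())  # prints: 1 1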
