Merge pull request #62 from Breaka84/update_match_regex_for_mapper_tests
Fix mapper tests for older pyspark version
rt-phb authored Jun 14, 2024
2 parents c64b104 + ececc17 commit 19310a7
Showing 2 changed files with 26 additions and 10 deletions.
2 changes: 1 addition & 1 deletion spooq/transformer/mapper.py
@@ -294,7 +294,7 @@ def _get_spark_data_type(
             data_type = getattr(T, data_type_)()  # Spark datatype as string
         except AttributeError:
             try:
-                data_type = T._parse_datatype_string(data_type_)  # Spark datatype as short string
+                data_type = T._parse_datatype_string("void" if data_type_ == "null" else data_type_)  # Spark datatype as short string
             except ParseException:
                 pass

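Context for the one-line change above: which short-string aliases PySpark's DDL parser accepts for NullType varies by version; "void" parses on the versions this project tests against, while the literal "null" raises a ParseException on some of them, so the guard translates "null" to "void" before parsing. A minimal sketch of that code path (illustrative only: _parse_datatype_string is a private PySpark helper, and on most releases it needs an active Spark session because the parser is JVM-backed):

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T

    spark = SparkSession.builder.master("local[1]").getOrCreate()  # the DDL parser is JVM-backed

    data_type_ = "null"
    # Same guard as in the commit: translate the "null" alias to "void" before parsing.
    parsed = T._parse_datatype_string("void" if data_type_ == "null" else data_type_)
    print(parsed)  # expected: NullType()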
34 changes: 25 additions & 9 deletions tests/unit/transformer/test_mapper.py
@@ -142,12 +142,16 @@ def new_columns(self, new_mapping):

     def test_appending_a_mapping(self, mapped_df, new_mapping, input_columns, new_columns):
         """Output schema is correct for added mapping at the end of the input schema"""
-        new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(mapped_df)
+        new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(
+            mapped_df
+        )
         assert input_columns + new_columns == new_mapped_df.columns
 
     def test_prepending_a_mapping(self, mapped_df, new_mapping, input_columns, new_columns):
         """Output schema is correct for added mapping at the beginning of the input schema"""
-        new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(mapped_df)
+        new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(
+            mapped_df
+        )
         assert new_columns + input_columns == new_mapped_df.columns

def test_appending_a_mapping_with_duplicated_columns(self, input_columns, mapped_df):
Expand All @@ -159,7 +163,9 @@ def test_appending_a_mapping_with_duplicated_columns(self, input_columns, mapped
]
new_columns = [name for (name, path, data_type) in new_mapping]
new_columns_deduplicated = [x for x in new_columns if x not in input_columns]
new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(mapped_df)
new_mapped_df = Mapper(mapping=new_mapping, mode="append", missing_column_handling="nullify").transform(
mapped_df
)
assert input_columns + new_columns_deduplicated == new_mapped_df.columns
assert mapped_df.schema["birthday"].dataType == T.TimestampType()
assert new_mapped_df.schema["birthday"].dataType == T.DateType()
@@ -173,7 +179,9 @@ def test_prepending_a_mapping_with_duplicated_columns(self, input_columns, mapped_df):
         ]
         new_columns = [name for (name, path, data_type) in new_mapping]
         new_columns_deduplicated = [x for x in new_columns if x not in input_columns]
-        new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(mapped_df)
+        new_mapped_df = Mapper(mapping=new_mapping, mode="prepend", missing_column_handling="nullify").transform(
+            mapped_df
+        )
         assert new_columns_deduplicated + input_columns == new_mapped_df.columns
         assert mapped_df.schema["birthday"].dataType == T.TimestampType()
         assert new_mapped_df.schema["birthday"].dataType == T.DateType()
@@ -218,7 +226,8 @@ def test_missing_columns_are_not_skipped(self, mapped_df, mapping):

     def test_missing_columns_are_nullified(self, mapped_df, mapping):
         attribute_columns = [
-            name for name, source, transformation in mapping
+            name
+            for name, source, transformation in mapping
             if source.startswith("attributes.") and not transformation == spq.has_value
         ]
         filter = " AND ".join([f"{column} is NULL" for column in attribute_columns])
@@ -315,8 +324,7 @@ class TestValidateDataTypes:
     @pytest.fixture(scope="class")
     def input_df(self, spark_session):
         return spark_session.createDataFrame(
-            [Row(col_a=123, col_b="Hello", col_c=123456789)],
-            schema="col_a int, col_b string, col_c long"
+            [Row(col_a=123, col_b="Hello", col_c=123456789)], schema="col_a int, col_b string, col_c long"
         )
 
     @pytest.fixture(scope="class")
@@ -356,15 +364,23 @@ def test_spooq_transformation_raises_exception(self, input_df, matching_mapping):
         mapping_ = deepcopy(matching_mapping)
         mapping_[0] = ("col_a", "col_a", spq.to_int)
         transformer = Mapper(mapping_, mode="rename_and_validate")
-        with pytest.raises(DataTypeValidationFailed, match="Spooq transformations are not allowed in 'rename_and_validate' mode"):
+        with pytest.raises(
+            DataTypeValidationFailed, match="Spooq transformations are not allowed in 'rename_and_validate' mode"
+        ):
             transformer.transform(input_df)
 
     def test_mapper_raises_exception_for_missing_column(self, input_df, matching_mapping):
         missing_column = "col_d"
         mapping_ = deepcopy(matching_mapping)
         mapping_.append((missing_column, missing_column, T.StringType()))
         transformer = Mapper(mapping_, mode="rename_and_validate", missing_column_handling="raise_error")
-        with pytest.raises(AnalysisException, match=f"A column or function parameter with name `{missing_column}` cannot be resolved."):
+        with pytest.raises(
+            AnalysisException,
+            match=f"A column or function parameter with name `{missing_column}` cannot be resolved.|"
+            f"cannot resolve '{missing_column}' given input columns|"
+            f"cannot resolve '`{missing_column}`' given input columns|"
+            f"Column '{missing_column}' does not exist. Did you mean one of the following?",
+        ):
             transformer.transform(input_df)
 
     def test_validation_raises_exception_for_skipped_column(self, input_df, matching_mapping):
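The widened match pattern above works because pytest.raises applies re.search with the given match argument to the string form of the raised exception, so "|" alternation lets a single assertion accept the differently worded AnalysisException messages that different PySpark versions emit. A self-contained sketch of the same technique (the helper functions and error texts below are hypothetical, merely mirroring the alternatives in the diff; no Spark required):

    import pytest

    def raise_old_wording(column: str):
        # message shape emitted by older PySpark releases (illustrative)
        raise ValueError(f"cannot resolve '{column}' given input columns")

    def raise_new_wording(column: str):
        # message shape emitted by newer PySpark releases (illustrative)
        raise ValueError(f"A column or function parameter with name `{column}` cannot be resolved.")

    def test_either_wording_is_accepted():
        column = "col_d"
        pattern = (
            f"A column or function parameter with name `{column}` cannot be resolved.|"
            f"cannot resolve '{column}' given input columns"
        )
        # The same pattern matches whichever wording the running version produces.
        with pytest.raises(ValueError, match=pattern):
            raise_old_wording(column)
        with pytest.raises(ValueError, match=pattern):
            raise_new_wording(column)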
