-
Notifications
You must be signed in to change notification settings - Fork 1
/
hash_abs.py
57 lines (49 loc) · 1.83 KB
/
hash_abs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from typing import List, Optional, Union
from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import hash, abs as abs_
from spark_auto_mapper.data_types.data_type_base import AutoMapperDataTypeBase
from spark_auto_mapper.data_types.text_like_base import AutoMapperTextLikeBase
from spark_auto_mapper.helpers.value_parser import AutoMapperValueParser
from spark_auto_mapper.type_definitions.native_types import AutoMapperNativeTextType
from spark_auto_mapper.type_definitions.wrapper_types import AutoMapperWrapperType
class AutoMapperHashAbsDataType(AutoMapperTextLikeBase):
    """
    Calculates the hash code of given columns, and returns the absolute value of the result as an int column.
    """

    def __init__(
        self,
        *args: Union[
            AutoMapperNativeTextType, AutoMapperWrapperType, AutoMapperTextLikeBase
        ],
    ):
        """
        Stores the operands to hash.  Each positional argument that is not
        already an AutoMapperDataTypeBase is run through
        AutoMapperValueParser.parse_value so that every operand ends up as a
        mapper expression.
        """
        super().__init__()
        operands: List[AutoMapperDataTypeBase] = []
        for arg in args:
            if isinstance(arg, AutoMapperDataTypeBase):
                operands.append(arg)
            else:
                # wrap raw native values (literals, column names) in a mapper expression
                operands.append(AutoMapperValueParser.parse_value(value=arg))
        self.value: List[AutoMapperDataTypeBase] = operands

    def get_column_spec(
        self,
        source_df: Optional[DataFrame],
        current_column: Optional[Column],
        parent_columns: Optional[List[Column]],
    ) -> Column:
        """
        Builds the Spark column expression for this node: resolve each operand
        to its column spec, hash them together, take the absolute value, and
        cast the result to an int column.
        """
        operand_columns: List[Column] = [
            operand.get_column_spec(
                source_df=source_df,
                current_column=current_column,
                parent_columns=parent_columns,
            )
            for operand in self.value
        ]
        hashed: Column = abs_(hash(*operand_columns))
        return hashed.cast("int")

    @property
    def children(
        self,
    ) -> Union[AutoMapperDataTypeBase, List[AutoMapperDataTypeBase]]:
        """The child mapper expressions this node was built from."""
        return self.value