-
Notifications
You must be signed in to change notification settings - Fork 233
/
infer_spark.py
133 lines (110 loc) · 4.5 KB
/
infer_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import fastnumbers
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import DataFrame as SparkDataFrame, functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, BooleanType, StructType, ArrayType, \
LongType, DateType, ByteType, ShortType, TimestampType, BinaryType, NullType
from optimus.infer import is_bool_value, is_list_value, is_datetime, is_date, is_binary, is_str, is_bool_str, str_to_date, \
is_list_str
# Short Spark dtype name -> PySpark type *class* (not an instance).
SPARK_DTYPES_DICT = {
    "string": StringType,
    "int": IntegerType,
    "float": FloatType,
    "double": DoubleType,
    "boolean": BooleanType,
    "struct": StructType,
    "array": ArrayType,
    "bigint": LongType,
    "date": DateType,
    "byte": ByteType,
    "short": ShortType,
    "datetime": TimestampType,
    "binary": BinaryType,
    "null": NullType,
    "vector": VectorUDT,
}
# Short Spark dtype name -> ready-made PySpark type *instance*.
# "array" is instantiated as an array of strings.
# NOTE(review): there is no "vector" entry here although SPARK_DTYPES_DICT and
# SPARK_SHORT_DTYPES both have one, so parse_spark_class_dtypes("vector") hits
# its KeyError fallback and returns the input unchanged - confirm this is intended.
SPARK_DTYPES_DICT_OBJECTS = {
    "string": StringType(),
    "int": IntegerType(),
    "float": FloatType(),
    "double": DoubleType(),
    "boolean": BooleanType(),
    "struct": StructType(),
    "array": ArrayType(StringType()),
    "bigint": LongType(),
    "date": DateType(),
    "byte": ByteType(),
    "short": ShortType(),
    "datetime": TimestampType(),
    "binary": BinaryType(),
    "null": NullType(),
}
# Inferred dtype name -> Spark dtype name(s) grouped under it.
# NOTE(review): the values are deliberately left as-is but are not uniform:
# "int"/"float" map to lists, "date" maps to a set and the rest map to plain
# strings - check how callers test membership before normalising them.
SPARK_DTYPES_TO_INFERRED = {
    "int": ["smallint", "tinyint", "bigint", "int"],
    "float": ["float", "double"],
    "string": "string",
    "date": {"date", "timestamp"},
    "boolean": "boolean",
    "binary": "binary",
    "array": "array",
    "object": "object",
    "null": "null",
    "missing": "missing",
}
# Groups of short dtype names used to classify columns.
PYSPARK_NUMERIC_TYPES = [
    "byte",
    "short",
    "big",
    "int",
    "double",
    "float",
]
# Every numeric name plus the remaining scalar names; built as a separate list
# so the two constants never share the same list object.
PYSPARK_NOT_ARRAY_TYPES = [*PYSPARK_NUMERIC_TYPES, "string", "date", "bool"]
PYSPARK_STRING_TYPES = ["str"]
PYSPARK_ARRAY_TYPES = ["array"]
# Accepted dtype spelling (alias or canonical) -> canonical short dtype name.
# The canonical names are the keys used by the SPARK_DTYPES_DICT* tables.
SPARK_SHORT_DTYPES = {
    # strings
    "string": "string",
    "str": "string",
    # integers
    "integer": "int",
    "int": "int",
    "bigint": "bigint",
    "big": "bigint",
    "long": "bigint",
    # floating point
    "float": "float",
    "double": "double",
    # booleans
    "bool": "boolean",
    "boolean": "boolean",
    # containers
    "struct": "struct",
    "array": "array",
    # dates and times
    "date": "date",
    "datetime": "datetime",
    # remaining types
    "byte": "byte",
    "short": "short",
    "binary": "binary",
    "null": "null",
    "vector": "vector",
    # Spark's "timestamp" is exposed under the "datetime" name
    "timestamp": "datetime",
}
def parse_spark_class_dtypes(value):
    """
    Translate short dtype names into PySpark data type objects, for example 'string' into StringType()
    :param value: a dtype name, or a list of dtype names
    :return: the matching PySpark type object(s). A single-element input yields the bare element, any
    other length yields a list. If any name cannot be resolved the input is handed back untranslated.
    """
    names = value if isinstance(value, list) else [value]

    try:
        # Two-step lookup: alias -> canonical short name -> type instance.
        resolved = [SPARK_DTYPES_DICT_OBJECTS[SPARK_SHORT_DTYPES[name]] for name in names]
    except (KeyError, TypeError):
        # Unknown (or unhashable) entry: give up on the whole batch and pass the input through.
        resolved = names

    # Unwrap a lone result so scalar callers get a scalar back.
    if len(resolved) == 1:
        return resolved[0]
    return resolved
def to_spark(value):
    """
    Infer a Spark data type from a value
    :param value: value to be inferred
    :return: Spark data type as resolved by parse_spark_class_dtypes. List-like values give an ArrayType
    whose element type is inferred from the first item (null element type for an empty list). None is
    returned when no check matches the value.
    """
    result = None
    # The order of the checks matters: the first predicate that matches wins.
    if value is None:
        result = "null"
    elif is_bool_value(value):
        result = "bool"
    elif fastnumbers.isint(value):
        result = "int"
    elif fastnumbers.isfloat(value):
        result = "float"
    elif is_list_value(value):
        # Only the first item is inspected. An empty list has nothing to inspect, so it is
        # typed as an array of nulls instead of failing with an IndexError on value[0].
        first_item = value[0] if len(value) > 0 else None
        result = ArrayType(to_spark(first_item))
    elif is_datetime(value):
        result = "datetime"
    elif is_date(value):
        result = "date"
    elif is_binary(value):
        result = "binary"
    elif is_str(value):
        # Strings that spell a boolean are typed as bool. Every other string, including
        # ones that look like a date or a list, stays a plain string.
        result = "bool" if is_bool_str(value) else "string"

    return parse_spark_class_dtypes(result)
def is_list_of_spark_dataframes(value):
    """
    Check if an object is a non-empty list made up only of Spark DataFrames
    :param value: object to be checked
    :return: True for a non-empty list whose elements are all Spark DataFrames, False otherwise
    """
    # Check the type before the truth value: truth-testing an arbitrary object can raise
    # (pandas DataFrames and NumPy arrays reject bool()), and such objects are simply not a list.
    if not isinstance(value, list) or not value:
        return False
    return all(isinstance(elem, SparkDataFrame) for elem in value)
def is_column(value):
    """
    Tell whether an object is a Spark column
    :param value: object to be checked
    :return: True if value is an instance of F.Column, False otherwise
    """
    column_cls = F.Column
    return isinstance(value, column_cls)