-
Notifications
You must be signed in to change notification settings - Fork 28k
/
exceptions.py
150 lines (133 loc) · 5.21 KB
/
exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Exceptions/Errors used in pandas-on-Spark.
"""
from typing import Optional
class DataError(Exception):
    """Plain ``Exception`` subclass; it adds no state or behavior of its own."""

    # NOTE(review): the name suggests data-related failures, but the raise
    # sites live outside this module -- confirm against callers.
class SparkPandasIndexingError(Exception):
    """Plain ``Exception`` subclass; it adds no state or behavior of its own."""

    # NOTE(review): presumably raised by indexing code elsewhere in the
    # package -- the raise sites are not visible from this module.
def code_change_hint(pandas_function: Optional[str], spark_target_function: Optional[str]) -> str:
    """
    Build a one-sentence hint telling the user which function to use instead.

    Parameters
    ----------
    pandas_function : str, optional
        Name of the pandas function the user tried to call, if known.
    spark_target_function : str, optional
        Name of the Spark function to suggest as a replacement, if known.

    Returns
    -------
    str
        A hint whose wording depends on which of the two names is available;
        when neither is given, a generic pointer to the Spark user guide.
    """
    knows_source = pandas_function is not None
    knows_target = spark_target_function is not None

    # Nothing specific to say: fall back to the generic pointer.
    if not knows_source and not knows_target:
        return "Checkout the spark user guide to find a relevant function"

    # Only the replacement is known.
    if not knows_source:
        return "Use spark function {}".format(spark_target_function)

    # Only the offending pandas function is known.
    if not knows_target:
        template = (
            "You are trying to use pandas function {}, checkout the spark "
            "user guide to find a relevant function"
        )
        return template.format(pandas_function)

    # Both names are known: name the offender and its replacement.
    template = "You are trying to use pandas function {}, use spark function {}"
    return template.format(pandas_function, spark_target_function)
class SparkPandasNotImplementedError(NotImplementedError):
    """
    ``NotImplementedError`` whose message ends with a hint from ``code_change_hint``.

    Parameters
    ----------
    pandas_function : str, optional
        Name of the pandas function involved; kept on the instance as
        ``pandas_source``.
    spark_target_function : str, optional
        Name of the Spark function to suggest; kept on the instance as
        ``spark_target``.
    description : str, default ""
        Leading text of the message. When non-empty, the hint is appended
        after a single space; when empty, the hint alone is the message.
    """

    def __init__(
        self,
        pandas_function: Optional[str] = None,
        spark_target_function: Optional[str] = None,
        description: str = "",
    ):
        self.pandas_source = pandas_function
        self.spark_target = spark_target_function

        suggestion = code_change_hint(pandas_function, spark_target_function)
        if len(description) == 0:
            message = suggestion
        else:
            message = description + " " + suggestion
        super().__init__(message)
class PandasNotImplementedError(NotImplementedError):
    """
    ``NotImplementedError`` describing an unsupported method, property or scalar.

    Exactly one of ``method_name``, ``property_name`` and ``scalar_name`` must be
    given; it selects which message is built.

    Parameters
    ----------
    class_name : str
        Name of the owning class; used as the prefix of the member in the message.
    method_name : str, optional
        Name of the unsupported method.
    arg_name : str, optional
        Only consulted together with ``method_name``: the message then says that
        the method does not support this parameter (``deprecated`` is not used).
    property_name : str, optional
        Name of the unsupported property.
    scalar_name : str, optional
        Name of the scalar; the message points the user at ``pd.<scalar_name>``.
    deprecated : bool, default False
        For a method (without ``arg_name``) or a property, use the
        "deprecated in pandas" wording instead of "not implemented".
    reason : str, default ""
        Extra text appended to the message. Not used for scalars.

    Raises
    ------
    AssertionError
        If none, or more than one, of ``method_name``, ``property_name`` and
        ``scalar_name`` is given.
    """

    def __init__(
        self,
        class_name: str,
        method_name: Optional[str] = None,
        arg_name: Optional[str] = None,
        property_name: Optional[str] = None,
        scalar_name: Optional[str] = None,
        deprecated: bool = False,
        reason: str = "",
    ):
        given = [
            name for name in (method_name, property_name, scalar_name) if name is not None
        ]
        if len(given) != 1:
            # Raised explicitly instead of using an `assert` statement so the check
            # still runs under `python -O`; the exception type is kept as
            # AssertionError so existing callers see the same failure.
            raise AssertionError(
                "Exactly one of method_name, property_name and scalar_name must be specified."
            )

        self.class_name = class_name
        self.method_name = method_name
        self.arg_name = arg_name
        # Kept alongside method_name so handlers can tell which member kind failed.
        self.property_name = property_name
        self.scalar_name = scalar_name

        if method_name is not None:
            if arg_name is not None:
                msg = "The method `{0}.{1}()` does not support `{2}` parameter. {3}".format(
                    class_name, method_name, arg_name, reason
                )
            else:
                msg = self._member_message("method", class_name, method_name, deprecated, reason)
        elif scalar_name is not None:
            msg = (
                "The scalar `{0}.{1}` is not reimplemented in pyspark.pandas;"
                " use `pd.{1}`.".format(class_name, scalar_name)
            )
        else:
            msg = self._member_message("property", class_name, property_name, deprecated, reason)
        super().__init__(msg)

    @staticmethod
    def _member_message(
        kind: str,
        class_name: str,
        member_name: Optional[str],
        deprecated: bool,
        reason: str,
    ) -> str:
        """Return the "deprecated" / "not implemented" message shared by methods and properties."""
        if deprecated:
            return (
                "The {0} `{1}.{2}()` is deprecated in pandas and will therefore "
                "not be supported in pandas-on-Spark. {3}"
            ).format(kind, class_name, member_name, reason)
        # Without a reason the sentence ends in " yet."; otherwise the reason
        # follows as its own sentence.
        suffix = " yet." if reason == "" else ". " + reason
        return "The {0} `{1}.{2}()` is not implemented{3}".format(
            kind, class_name, member_name, suffix
        )
def _test() -> None:
    """
    Run this module's doctests against a local Spark session.

    Changes the working directory to ``$SPARK_HOME``, starts (or reuses) a
    ``local[4]`` session, runs ``doctest.testmod`` on
    ``pyspark.pandas.exceptions`` and stops the session afterwards. Exits the
    process with status ``-1`` if any doctest failed.
    """
    import os
    import doctest
    import sys
    from pyspark.sql import SparkSession
    import pyspark.pandas.exceptions

    os.chdir(os.environ["SPARK_HOME"])

    # Doctests see the module's own namespace plus `ps` as the package alias.
    doctest_globals = dict(pyspark.pandas.exceptions.__dict__)
    doctest_globals["ps"] = pyspark.pandas

    builder = SparkSession.builder.master("local[4]")
    builder = builder.appName("pyspark.pandas.exceptions tests")
    session = builder.getOrCreate()

    outcome = doctest.testmod(
        pyspark.pandas.exceptions,
        globs=doctest_globals,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    session.stop()

    if outcome.failed:
        sys.exit(-1)
# Run the doctest driver only when this file is executed as a script, not on import.
if __name__ == "__main__":
    _test()