[SPARK-36847][PYTHON] Explicitly specify error codes when ignoring type hint errors

### What changes were proposed in this pull request?

Explicitly specifies error codes when ignoring type hint errors.

### Why are the changes needed?

We use a lot of `type: ignore` annotations to ignore type hint errors in pandas-on-Spark.

We should explicitly specify the error codes to make it clear what kind of error is being ignored, so that the type checker can still check more cases.
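
As a hedged illustration (not code from this PR; `add_one` and the `assignment` error code are only examples), the difference between a blanket and a scoped ignore looks like this:

```python
def add_one(x: int) -> int:
    return x + 1


# A bare ignore hides every mypy error on this line, whatever its category.
blanket: str = add_one(1)  # type: ignore

# A scoped ignore silences only the named category ("assignment" here), so
# mypy can still report any other kind of error that shows up on the line.
scoped: str = add_one(1)  # type: ignore[assignment]
```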

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #34102 from ueshin/issues/SPARK-36847/type_ignore.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
ueshin authored and HyukjinKwon committed Sep 26, 2021
1 parent 77cd133 commit fc404d6
Showing 29 changed files with 235 additions and 173 deletions.
1 change: 1 addition & 0 deletions python/mypy.ini
@@ -19,6 +19,7 @@
strict_optional = True
no_implicit_optional = True
disallow_untyped_defs = True
show_error_codes = True

; Allow untyped def in internal modules and tests

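For context on the `show_error_codes = True` option added above: with it enabled, mypy appends each diagnostic's error code in brackets, which is how the specific codes used in the scoped ignores below (for example `call-overload`, `attr-defined`, `assignment`) can be identified. A minimal, hypothetical sketch (the module and the reported message are illustrative, not taken from this PR):

```python
from typing import Optional


def first_char(s: Optional[str]) -> str:
    # With show_error_codes enabled, mypy reports something like:
    #   example.py:7: error: Item "None" of "Optional[str]" has no attribute "upper"  [union-attr]
    # The bracketed code is exactly what goes inside the scoped ignore below.
    return s.upper()[0]  # type: ignore[union-attr]
```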
12 changes: 8 additions & 4 deletions python/pyspark/pandas/accessors.py
@@ -568,7 +568,7 @@ def pandas_series_func(
) -> "UserDefinedFunctionLike":
ff = f

@pandas_udf(returnType=return_type) # type: ignore
@pandas_udf(returnType=return_type) # type: ignore[call-overload]
def udf(pdf: pd.DataFrame) -> pd.Series:
return first_series(ff(pdf))

@@ -632,7 +632,9 @@ def udf(pdf: pd.DataFrame) -> pd.Series:
)
columns = self_applied._internal.spark_columns

pudf = pandas_udf(output_func, returnType=return_schema) # type: ignore
pudf = pandas_udf( # type: ignore[call-overload]
output_func, returnType=return_schema
)
temp_struct_column = verify_temp_column_name(
self_applied._internal.spark_frame, "__temp_struct__"
)
@@ -697,7 +699,9 @@ def udf(pdf: pd.DataFrame) -> pd.Series:
)
columns = self_applied._internal.spark_columns

pudf = pandas_udf(output_func, returnType=return_schema) # type: ignore
pudf = pandas_udf( # type: ignore[call-overload]
output_func, returnType=return_schema
)
temp_struct_column = verify_temp_column_name(
self_applied._internal.spark_frame, "__temp_struct__"
)
@@ -907,7 +911,7 @@ def apply_func(pdf: pd.DataFrame) -> pd.DataFrame:
psdf, apply_func, return_schema, retain_index=False
)

@pandas_udf(returnType=field.spark_type) # type: ignore
@pandas_udf(returnType=field.spark_type) # type: ignore[call-overload]
def pudf(*series: pd.Series) -> pd.Series:
return first_series(output_func(pandas_concat(*series)))

2 changes: 1 addition & 1 deletion python/pyspark/pandas/base.py
@@ -955,7 +955,7 @@ def notnull(self: IndexOpsLike) -> IndexOpsLike:

if isinstance(self, MultiIndex):
raise NotImplementedError("notna is not defined for MultiIndex")
return (~self.isnull()).rename(self.name) # type: ignore
return (~self.isnull()).rename(self.name) # type: ignore[attr-defined]

notna = notnull

2 changes: 1 addition & 1 deletion python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -160,7 +160,7 @@ class DatetimeNTZOps(DatetimeOps):
"""

def _cast_spark_column_timestamp_to_long(self, scol: Column) -> Column:
jvm = SparkContext._active_spark_context._jvm # type: ignore
jvm = SparkContext._active_spark_context._jvm # type: ignore[attr-defined]
return Column(jvm.PythonSQLUtils.castTimestampNTZToLong(scol._jc))

def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
2 changes: 1 addition & 1 deletion python/pyspark/pandas/extensions.py
@@ -59,7 +59,7 @@ def __get__(
) -> Union[T, Type[T]]:
if obj is None:
return self._accessor
accessor_obj = self._accessor(obj) # type: ignore
accessor_obj = self._accessor(obj) # type: ignore[call-arg]
object.__setattr__(obj, self._name, accessor_obj)
return accessor_obj

63 changes: 35 additions & 28 deletions python/pyspark/pandas/frame.py
@@ -344,12 +344,12 @@


if (3, 5) <= sys.version_info < (3, 7) and __name__ != "__main__":
from typing import GenericMeta # type: ignore
from typing import GenericMeta # type: ignore[attr-defined]

# This is a workaround to support variadic generic in DataFrame in Python 3.5+.
# See https://github.com/python/typing/issues/193
# We wrap the input params by a tuple to mimic variadic generic.
old_getitem = GenericMeta.__getitem__ # type: ignore
old_getitem = GenericMeta.__getitem__

@no_type_check
def new_getitem(self, params):
@@ -358,7 +358,7 @@ def new_getitem(self, params):
else:
return old_getitem(self, params)

GenericMeta.__getitem__ = new_getitem # type: ignore
GenericMeta.__getitem__ = new_getitem


class DataFrame(Frame, Generic[T]):
@@ -482,24 +482,26 @@ def _pssers(self) -> Dict[Label, "Series"]:
{label: Series(data=self, index=label) for label in self._internal.column_labels},
)
else:
psseries = self._psseries # type: ignore
psseries = cast(Dict[Label, Series], self._psseries) # type: ignore[has-type]
assert len(self._internal.column_labels) == len(psseries), (
len(self._internal.column_labels),
len(psseries),
)
if any(self is not psser._psdf for psser in psseries.values()):
# Refresh the dict to contain only Series anchoring `self`.
self._psseries = {
label: psseries[label]
if self is psseries[label]._psdf
else Series(data=self, index=label)
label: (
psseries[label]
if self is psseries[label]._psdf
else Series(data=self, index=label)
)
for label in self._internal.column_labels
}
return self._psseries

@property
def _internal(self) -> InternalFrame:
return self._internal_frame # type: ignore
return cast(InternalFrame, self._internal_frame) # type: ignore[has-type]

def _update_internal_frame(
self, internal: InternalFrame, requires_same_anchor: bool = True
@@ -662,7 +664,7 @@ def _reduce_for_stat_function(
if len(pdf) <= limit:
return Series(pser)

@pandas_udf(returnType=as_spark_type(pser.dtype.type)) # type: ignore
@pandas_udf(returnType=as_spark_type(pser.dtype.type)) # type: ignore[call-overload]
def calculate_columns_axis(*cols: pd.Series) -> pd.Series:
return getattr(pd.concat(cols, axis=1), name)(
axis=axis, numeric_only=numeric_only, **kwargs
@@ -1513,7 +1515,7 @@ def extract_kv_from_spark_row(row: Row) -> Tuple[Name, Any]:
can_return_named_tuples = sys.version_info >= (3, 7) or len(self.columns) + index < 255

if name is not None and can_return_named_tuples:
itertuple = namedtuple(name, fields, rename=True) # type: ignore
itertuple = namedtuple(name, fields, rename=True) # type: ignore[misc]
for k, v in map(
extract_kv_from_spark_row,
self._internal.resolved_copy.spark_frame.toLocalIterator(),
@@ -3030,7 +3032,8 @@ def between_time(
psdf.index.name = verify_temp_column_name(psdf, "__index_name__")
return_types = [psdf.index.dtype] + list(psdf.dtypes)

def pandas_between_time(pdf) -> ps.DataFrame[return_types]: # type: ignore
@no_type_check
def pandas_between_time(pdf) -> ps.DataFrame[return_types]:
return pdf.between_time(start_time, end_time, include_start, include_end).reset_index()

# apply_batch will remove the index of the pandas-on-Spark DataFrame and attach a
@@ -3109,12 +3112,14 @@ def at_time(

if LooseVersion(pd.__version__) < LooseVersion("0.24"):

def pandas_at_time(pdf) -> ps.DataFrame[return_types]: # type: ignore
@no_type_check
def pandas_at_time(pdf) -> ps.DataFrame[return_types]:
return pdf.at_time(time, asof).reset_index()

else:

def pandas_at_time(pdf) -> ps.DataFrame[return_types]: # type: ignore
@no_type_check
def pandas_at_time(pdf) -> ps.DataFrame[return_types]:
return pdf.at_time(time, asof, axis).reset_index()

# apply_batch will remove the index of the pandas-on-Spark DataFrame and attach
@@ -4413,7 +4418,7 @@ def duplicated(
],
index_names=self._internal.index_names,
index_fields=self._internal.index_fields,
column_labels=[None], # type: ignore
column_labels=[None],
data_spark_columns=[scol_for(sdf, SPARK_DEFAULT_SERIES_NAME)],
)
)
@@ -4590,7 +4595,7 @@ def to_delta(
... mode='overwrite', replaceWhere='date >= "2012-01-01"') # doctest: +SKIP
"""
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
options = options.get("options") # type: ignore
options = options.get("options") # type: ignore[assignment]

mode = validate_mode(mode)
self.spark.to_spark_io(
@@ -4667,7 +4672,7 @@ def to_parquet(
... partition_cols=['date', 'country'])
"""
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
options = options.get("options") # type: ignore
options = options.get("options")

mode = validate_mode(mode)
builder = self.to_spark(index_col=index_col).write.mode(mode)
@@ -4739,7 +4744,7 @@ def to_orc(
... partition_cols=['date', 'country'])
"""
if "options" in options and isinstance(options.get("options"), dict) and len(options) == 1:
options = options.get("options") # type: ignore
options = options.get("options") # type: ignore[assignment]

mode = validate_mode(mode)
self.spark.to_spark_io(
@@ -6398,7 +6403,7 @@ def select_dtypes(
4 1 True 1.0
5 2 False 2.0
"""
from pyspark.sql.types import _parse_datatype_string # type: ignore
from pyspark.sql.types import _parse_datatype_string # type: ignore[attr-defined]

if not is_list_like(include):
include_list = [include] if include is not None else []
@@ -6978,12 +6983,12 @@ def sort_index(
"Specifying the sorting algorithm is not supported at the moment."
)

if level is None or (is_list_like(level) and len(level) == 0): # type: ignore
if level is None or (is_list_like(level) and len(level) == 0): # type: ignore[arg-type]
by = self._internal.index_spark_columns
elif is_list_like(level):
by = [self._internal.index_spark_columns[l] for l in level] # type: ignore
by = [self._internal.index_spark_columns[l] for l in level] # type: ignore[union-attr]
else:
by = [self._internal.index_spark_columns[level]] # type: ignore
by = [self._internal.index_spark_columns[level]] # type: ignore[index]

psdf = self._sort(by=by, ascending=ascending, na_position=na_position)
if inplace:
@@ -7591,7 +7596,7 @@ def to_list(os: Optional[Union[Name, List[Name]]]) -> List[Label]:
if os is None:
return []
elif is_name_like_tuple(os):
return [os] # type: ignore
return [cast(Label, os)]
elif is_name_like_value(os):
return [(os,)]
else:
@@ -7907,7 +7912,7 @@ def join(
need_set_index = False
if on:
if not is_list_like(on):
on = [on] # type: ignore
on = [on]
if len(on) != right._internal.index_level:
raise ValueError(
'len(left_on) must equal the number of levels in the index of "right"'
@@ -10248,7 +10253,7 @@ def mapper_fn(x: Any) -> Any:
if level < 0 or level >= num_indices:
raise ValueError("level should be an integer between [0, num_indices)")

@pandas_udf(returnType=index_mapper_ret_stype) # type: ignore
@pandas_udf(returnType=index_mapper_ret_stype) # type: ignore[call-overload]
def index_mapper_udf(s: pd.Series) -> pd.Series:
return s.map(index_mapper_fn)

@@ -10817,7 +10822,7 @@ def info(
# hack to use pandas' info as is.
object.__setattr__(self, "_data", self)
count_func = self.count
self.count = lambda: count_func().to_pandas() # type: ignore
self.count = ( # type: ignore[assignment]
lambda: count_func().to_pandas() # type: ignore[assignment, misc, union-attr]
)
return pd.DataFrame.info(
self,
verbose=verbose,
@@ -10828,7 +10835,7 @@ def info(
)
finally:
del self._data
self.count = count_func # type: ignore
self.count = count_func # type: ignore[assignment]

# TODO: fix parameter 'axis' and 'numeric_only' to work same as pandas'
def quantile(
@@ -11444,7 +11451,7 @@ def get_spark_column(psdf: DataFrame, label: Label) -> Column:

else:

@pandas_udf(returnType=DoubleType()) # type: ignore
@pandas_udf(returnType=DoubleType()) # type: ignore[call-overload]
def calculate_columns_axis(*cols: pd.Series) -> pd.Series:
return pd.concat(cols, axis=1).mad(axis=1)

@@ -11990,7 +11997,7 @@ def __getattr__(self, key: str) -> Any:
if hasattr(_MissingPandasLikeDataFrame, key):
property_or_func = getattr(_MissingPandasLikeDataFrame, key)
if isinstance(property_or_func, property):
return property_or_func.fget(self) # type: ignore
return property_or_func.fget(self)
else:
return partial(property_or_func, self)
