From 3c18bb428c29121d6b4f3b133d52ef155b46724f Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 6 Jul 2020 19:20:03 -0700
Subject: [PATCH] Fix Series._with_new_scol to use alias. (#1634)

We should always use an alias in `IndexOpsMixin._with_new_scol` to make sure
the resulting column name is valid.

```py
>>> kser = ks.Series([1, 2, 3, 4, 5, 6, 7], name="x")
>>> (kser.rename("y") + 1).head()
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: syntax error in attribute name: `(x AS `y` + 1)`;
```

(A minimal standalone PySpark sketch of the underlying naming behaviour is appended after the diff.)

Resolves #1633.
---
 databricks/koalas/accessors.py         |  2 +-
 databricks/koalas/base.py              | 15 ++++---
 databricks/koalas/frame.py             |  4 +-
 databricks/koalas/generic.py           |  4 +-
 databricks/koalas/indexes.py           |  2 +-
 databricks/koalas/series.py            | 55 +++++++++++++-------------
 databricks/koalas/spark/accessors.py   |  2 +-
 databricks/koalas/strings.py           |  7 ++--
 databricks/koalas/tests/test_series.py |  8 ++++
 databricks/koalas/utils.py             |  3 +-
 databricks/koalas/window.py            |  7 +---
 11 files changed, 57 insertions(+), 52 deletions(-)

diff --git a/databricks/koalas/accessors.py b/databricks/koalas/accessors.py
index 966fd09770..00400f58ab 100644
--- a/databricks/koalas/accessors.py
+++ b/databricks/koalas/accessors.py
@@ -817,4 +817,4 @@ def _transform_batch(self, func, return_schema):
             spark_return_type = return_schema
 
         pudf = pandas_udf(func, returnType=spark_return_type, functionType=PandasUDFType.SCALAR)
-        return self._kser._with_new_scol(scol=pudf(self._kser.spark.column)).rename(self._kser.name)
+        return self._kser._with_new_scol(scol=pudf(self._kser.spark.column))
diff --git a/databricks/koalas/base.py b/databricks/koalas/base.py
index 57636e54fc..072eb86fc3 100644
--- a/databricks/koalas/base.py
+++ b/databricks/koalas/base.py
@@ -27,6 +27,7 @@
 import pandas as pd
 from pandas.api.types import is_list_like
 from pandas.core.accessor import CachedAccessor
+from pyspark import sql as spark
 from pyspark.sql import functions as F, Window, Column
 from pyspark.sql.types import DateType, DoubleType, FloatType, LongType, StringType, TimestampType
@@ -141,6 +142,10 @@ def _internal(self) -> InternalFrame:
     def _kdf(self) -> DataFrame:
         return self._anchor
 
+    @abstractmethod
+    def _with_new_scol(self, scol: spark.Column):
+        pass
+
     spark = CachedAccessor("spark", SparkIndexOpsMethods)
 
     @property
@@ -733,7 +738,7 @@ def isin(self, values):
                 " to isin(), you passed a [{values_type}]".format(values_type=type(values).__name__)
             )
 
-        return self._with_new_scol(self.spark.column.isin(list(values))).rename(self.name)
+        return self._with_new_scol(self.spark.column.isin(list(values)))
 
     def isnull(self):
         """
@@ -767,11 +772,9 @@ def isnull(self):
         if isinstance(self, MultiIndex):
             raise NotImplementedError("isna is not defined for MultiIndex")
         if isinstance(self.spark.data_type, (FloatType, DoubleType)):
-            return self._with_new_scol(
-                self.spark.column.isNull() | F.isnan(self.spark.column)
-            ).rename(self.name)
+            return self._with_new_scol(self.spark.column.isNull() | F.isnan(self.spark.column))
         else:
-            return self._with_new_scol(self.spark.column.isNull()).rename(self.name)
+            return self._with_new_scol(self.spark.column.isNull())
 
     isna = isnull
 
@@ -1005,7 +1008,7 @@ def _shift(self, periods, fill_value, part_cols=()):
         )
         lag_col = F.lag(col, periods).over(window)
         col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)
-        return self._with_new_scol(col).rename(self.name)
+        return self._with_new_scol(col)
 
     # TODO: Update Documentation for Bins Parameter when its supported
     def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True):
diff --git a/databricks/koalas/frame.py b/databricks/koalas/frame.py
index 36e378f7bd..cbe125759c 100644
--- a/databricks/koalas/frame.py
+++ b/databricks/koalas/frame.py
@@ -2550,9 +2550,7 @@ def transform(self, func, axis=0, *args, **kwargs):
                     functionType=PandasUDFType.SCALAR,
                 )
                 kser = self._kser_for(input_label)
-                applied.append(
-                    kser._with_new_scol(scol=pudf(kser.spark.column)).rename(input_label)
-                )
+                applied.append(kser._with_new_scol(scol=pudf(kser.spark.column)))
 
         internal = self._internal.with_new_columns(applied)
         return DataFrame(internal)
diff --git a/databricks/koalas/generic.py b/databricks/koalas/generic.py
index 0d912a3b99..e68f92e6d5 100644
--- a/databricks/koalas/generic.py
+++ b/databricks/koalas/generic.py
@@ -1419,9 +1419,7 @@ def abs(self):
         3  7  40  50
         """
         # TODO: The first example above should not have "Name: 0".
-        return self._apply_series_op(
-            lambda kser: kser._with_new_scol(F.abs(kser.spark.column)).rename(kser.name)
-        )
+        return self._apply_series_op(lambda kser: kser._with_new_scol(F.abs(kser.spark.column)))
 
     # TODO: by argument only support the grouping name and as_index only for now. Documentation
     # should be updated when it's supported.
diff --git a/databricks/koalas/indexes.py b/databricks/koalas/indexes.py
index 0a30c8a722..aa7935256d 100644
--- a/databricks/koalas/indexes.py
+++ b/databricks/koalas/indexes.py
@@ -135,7 +135,7 @@ def _with_new_scol(self, scol: spark.Column) -> "Index":
         :param scol: the new Spark Column
         :return: the copied Index
         """
-        sdf = self._internal.spark_frame.select(scol)  # type: ignore
+        sdf = self._internal.spark_frame.select(scol.alias(SPARK_DEFAULT_INDEX_NAME))
         internal = InternalFrame(
             spark_frame=sdf,
             index_map=OrderedDict(zip(sdf.columns, self._internal.index_names)),  # type: ignore
diff --git a/databricks/koalas/series.py b/databricks/koalas/series.py
index e9ae07d373..ec9854bf24 100644
--- a/databricks/koalas/series.py
+++ b/databricks/koalas/series.py
@@ -35,7 +35,6 @@
 import pyspark
 from pyspark import sql as spark
 from pyspark.sql import functions as F, Column
-from pyspark.sql.functions import pandas_udf, PandasUDFType
 from pyspark.sql.types import (
     BooleanType,
     DoubleType,
@@ -386,7 +385,8 @@ def _with_new_scol(self, scol: spark.Column) -> "Series":
         :return: the copied Series
         """
         internal = self._kdf._internal.copy(
-            column_labels=[self._column_label], data_spark_columns=[scol]
+            column_labels=[self._column_label],
+            data_spark_columns=[scol.alias(name_like_string(self._column_label))],
         )
         return first_series(DataFrame(internal))
@@ -427,7 +427,7 @@ def spark_type(self):
 
     # Arithmetic Operators
     def add(self, other):
-        return (self + other).rename(self.name)
+        return self + other
 
     add.__doc__ = _flex_doc_SERIES.format(
         desc="Addition",
@@ -449,7 +449,7 @@ def radd(self, other):
     )
 
     def div(self, other):
-        return (self / other).rename(self.name)
+        return self / other
 
     div.__doc__ = _flex_doc_SERIES.format(
         desc="Floating division",
@@ -473,7 +473,7 @@ def rdiv(self, other):
     )
 
     def truediv(self, other):
-        return (self / other).rename(self.name)
+        return self / other
 
     truediv.__doc__ = _flex_doc_SERIES.format(
         desc="Floating division",
@@ -495,7 +495,7 @@ def rtruediv(self, other):
     )
 
     def mul(self, other):
-        return (self * other).rename(self.name)
+        return self * other
 
     mul.__doc__ = _flex_doc_SERIES.format(
         desc="Multiplication",
@@ -519,7 +519,7 @@ def rmul(self, other):
     )
 
     def sub(self, other):
-        return (self - other).rename(self.name)
+        return self - other
 
     sub.__doc__ = _flex_doc_SERIES.format(
         desc="Subtraction",
@@ -543,7 +543,7 @@ def rsub(self, other):
     )
 
     def mod(self, other):
-        return (self % other).rename(self.name)
+        return self % other
 
     mod.__doc__ = _flex_doc_SERIES.format(
         desc="Modulo",
@@ -565,7 +565,7 @@ def rmod(self, other):
     )
 
     def pow(self, other):
-        return (self ** other).rename(self.name)
+        return self ** other
 
     pow.__doc__ = _flex_doc_SERIES.format(
         desc="Exponential power of series",
@@ -587,7 +587,7 @@ def rpow(self, other):
     )
 
     def floordiv(self, other):
-        return (self // other).rename(self.name)
+        return self // other
 
     floordiv.__doc__ = _flex_doc_SERIES.format(
         desc="Integer division",
@@ -634,7 +634,7 @@ def eq(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self == other).rename(self.name)
+        return self == other
 
     equals = eq
 
@@ -660,7 +660,7 @@ def gt(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self > other).rename(self.name)
+        return self > other
 
     def ge(self, other):
         """
@@ -684,7 +684,7 @@ def ge(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self >= other).rename(self.name)
+        return self >= other
 
     def lt(self, other):
         """
@@ -708,7 +708,7 @@ def lt(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self < other).rename(self.name)
+        return self < other
 
     def le(self, other):
         """
@@ -732,7 +732,7 @@ def le(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self <= other).rename(self.name)
+        return self <= other
 
     def ne(self, other):
         """
@@ -756,7 +756,7 @@ def ne(self, other):
         d    True
         Name: b, dtype: bool
         """
-        return (self != other).rename(self.name)
+        return self != other
 
     def divmod(self, other):
         """
@@ -960,7 +960,7 @@ def map(self, arg):
                current = current.otherwise(F.lit(tmp_val))
             else:
                current = current.otherwise(F.lit(None).cast(self.spark.data_type))
-            return self._with_new_scol(current).rename(self.name)
+            return self._with_new_scol(current)
         else:
             return self.apply(arg)
 
@@ -1659,10 +1659,12 @@ def _fillna(self, value=None, method=None, axis=None, inplace=False, limit=None,
 
         if inplace:
             self._kdf._update_internal_frame(
-                self._kdf._internal.with_new_spark_column(self._column_label, scol)
+                self._kdf._internal.with_new_spark_column(
+                    self._column_label, scol.alias(name_like_string(self.name))
+                )
             )
         else:
-            return self._with_new_scol(scol).rename(self.name)
+            return self._with_new_scol(scol)
 
     def dropna(self, axis=0, inplace=False, **kwargs):
         """
@@ -1761,7 +1763,7 @@ def clip(self, lower: Union[float, int] = None, upper: Union[float, int] = None)
                scol = F.when(scol < lower, lower).otherwise(scol)
             if upper is not None:
                scol = F.when(scol > upper, upper).otherwise(scol)
-            return self._with_new_scol(scol.alias(self._internal.data_spark_column_names[0]))
+            return self._with_new_scol(scol)
         else:
             return self
 
@@ -2946,9 +2948,8 @@ def round(self, decimals=0):
         """
         if not isinstance(decimals, int):
             raise ValueError("decimals must be an integer")
-        column_name = self.name
         scol = F.round(self.spark.column, decimals)
-        return self._with_new_scol(scol).rename(column_name)
+        return self._with_new_scol(scol)
 
     # TODO: add 'interpolation' parameter.
     def quantile(self, q=0.5, accuracy=10000):
@@ -3188,7 +3189,7 @@ def _rank(self, method="average", ascending=True, part_cols=()):
             Window.unboundedPreceding, Window.unboundedFollowing
         )
         scol = stat_func(F.row_number().over(window1)).over(window2)
-        kser = self._with_new_scol(scol).rename(self.name)
+        kser = self._with_new_scol(scol)
         return kser.astype(np.float64)
 
     def filter(self, items=None, like=None, regex=None, axis=None):
@@ -3281,7 +3282,7 @@ def _diff(self, periods, part_cols=()):
             .rowsBetween(-periods, -periods)
         )
         scol = self.spark.column - F.lag(self.spark.column, periods).over(window)
-        return self._with_new_scol(scol).rename(self.name)
+        return self._with_new_scol(scol)
 
     def idxmax(self, skipna=True):
         """
@@ -4440,7 +4441,7 @@ def combine_first(self, other):
         cond = F.when(this.isNull(), that).otherwise(this)
         # If `self` and `other` come from same frame, the anchor should be kept
         if same_anchor(self, other):
-            return self._with_new_scol(cond).rename(self.name)
+            return self._with_new_scol(cond)
         index_scols = combined._internal.index_spark_columns
         sdf = combined._internal.spark_frame.select(
             *index_scols, cond.alias(self._internal.data_spark_column_names[0])
         )
@@ -4941,7 +4942,7 @@ def _cum(self, func, skipna, part_cols=()):
                 F.lit(None),
             ).otherwise(func(self.spark.column).over(window))
 
-        return self._with_new_scol(scol).rename(self.name)
+        return self._with_new_scol(scol)
 
     def _cumprod(self, skipna, part_cols=()):
         from pyspark.sql.functions import pandas_udf
@@ -4957,7 +4958,7 @@ def negative_check(s):
             return F.sum(F.log(negative_check(scol)))
 
         kser = self._cum(cumprod, skipna, part_cols)
-        return kser._with_new_scol(F.exp(kser.spark.column)).rename(self.name)
+        return kser._with_new_scol(F.exp(kser.spark.column))
 
     # ----------------------------------------------------------------------
     # Accessor Methods
diff --git a/databricks/koalas/spark/accessors.py b/databricks/koalas/spark/accessors.py
index cb6b02ec63..0a6f8571fc 100644
--- a/databricks/koalas/spark/accessors.py
+++ b/databricks/koalas/spark/accessors.py
@@ -116,7 +116,7 @@ def transform(self, func):
                 "The output of the function [%s] should be of a "
                 "pyspark.sql.Column; however, got [%s]." % (func, type(output))
             )
-        new_ser = self._data._with_new_scol(scol=output).rename(self._data.name)
+        new_ser = self._data._with_new_scol(scol=output)
         # Trigger the resolution so it throws an exception if anything does wrong
         # within the function, for example,
         # `df1.a.spark.transform(lambda _: F.col("non-existent"))`.
diff --git a/databricks/koalas/strings.py b/databricks/koalas/strings.py
index 7d18a38e9e..b53f8ac3e9 100644
--- a/databricks/koalas/strings.py
+++ b/databricks/koalas/strings.py
@@ -38,7 +38,6 @@ def __init__(self, series: "ks.Series"):
         if not isinstance(series.spark.data_type, (StringType, BinaryType, ArrayType)):
             raise ValueError("Cannot call StringMethods on type {}".format(series.spark.data_type))
         self._data = series
-        self.name = self._data.name
 
     # Methods
     def capitalize(self) -> "ks.Series":
@@ -1148,7 +1147,7 @@ def findall(self, pat, flags=0) -> "ks.Series":
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        return self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        return self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
     def index(self, sub, start=0, end=None) -> "ks.Series":
         """
@@ -2001,7 +2000,7 @@ def split(self, pat=None, n=-1, expand=False) -> Union["ks.Series", "ks.DataFram
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
         if expand:
             kdf = kser.to_frame()
@@ -2135,7 +2134,7 @@ def rsplit(self, pat=None, n=-1, expand=False) -> Union["ks.Series", "ks.DataFra
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
         if expand:
             kdf = kser.to_frame()
diff --git a/databricks/koalas/tests/test_series.py b/databricks/koalas/tests/test_series.py
index 6a0290dcc4..0495aa1224 100644
--- a/databricks/koalas/tests/test_series.py
+++ b/databricks/koalas/tests/test_series.py
@@ -141,6 +141,8 @@ def test_rename_method(self):
         self.assertEqual(kser.name, "x")  # no mutation
         self.assert_eq(kser.rename(), pser.rename())
 
+        self.assert_eq((kser.rename("y") + 1).head(), (pser.rename("y") + 1).head())
+
         kser.rename("z", inplace=True)
         pser.rename("z", inplace=True)
         self.assertEqual(kser.name, "z")
@@ -244,6 +246,12 @@ def test_fillna(self):
         pser.fillna(0, inplace=True)
         self.assert_eq(kser, pser)
 
+        kser = kdf.x.rename("y")
+        pser = pdf.x.rename("y")
+        kser.fillna(0, inplace=True)
+        pser.fillna(0, inplace=True)
+        self.assert_eq(kser.head(), pser.head())
+
         pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
         kser = ks.from_pandas(pser)
diff --git a/databricks/koalas/utils.py b/databricks/koalas/utils.py
index 25217f32b6..f03888a565 100644
--- a/databricks/koalas/utils.py
+++ b/databricks/koalas/utils.py
@@ -342,7 +342,8 @@ def align_diff_series(func, this_series, *args, how="full"):
     )
 
     internal = combined._internal.copy(
-        column_labels=this_series._internal.column_labels, data_spark_columns=[scol]
+        column_labels=this_series._internal.column_labels,
+        data_spark_columns=[scol.alias(name_like_string(this_series.name))],
     )
     return first_series(DataFrame(internal))
diff --git a/databricks/koalas/window.py b/databricks/koalas/window.py
index e60c6d9f1b..fd4bcf8081 100644
--- a/databricks/koalas/window.py
+++ b/databricks/koalas/window.py
@@ -150,8 +150,7 @@ def __getattr__(self, item: str) -> Any:
 
     def _apply_as_series_or_frame(self, func):
         return self._kdf_or_kser._apply_series_op(
-            lambda kser: kser._with_new_scol(func(kser.spark.column)).rename(kser.name),
-            should_resolve=True,
+            lambda kser: kser._with_new_scol(func(kser.spark.column)), should_resolve=True
         )
 
     def count(self):
@@ -687,9 +686,7 @@ def _apply_as_series_or_frame(self, func):
         applied = []
         for agg_column in agg_columns:
-            applied.append(
-                agg_column._with_new_scol(func(agg_column.spark.column)).rename(agg_column.name)
-            )
+            applied.append(agg_column._with_new_scol(func(agg_column.spark.column)))
 
         # Seems like pandas filters out when grouped key is NA.
         cond = groupby._groupkeys[0].spark.column.isNotNull()
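
For reference, below is a minimal standalone PySpark sketch (not part of the patch) of the naming behaviour that makes the alias necessary. The session, DataFrame, and column names are invented for illustration, and the exact auto-generated column name depends on the Spark version:

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1,), (2,), (3,)], ["x"])

# Deriving a new column from an aliased column gives it an auto-generated name
# such as "(x AS `y` + 1)" (exact form varies by Spark version), which is not a
# valid attribute name to refer back to later.
derived = sdf["x"].alias("y") + 1
print(sdf.select(derived).columns)

# Re-aliasing the derived column, as _with_new_scol now always does, keeps the
# column name plain and resolvable.
print(sdf.select(derived.alias("y")).columns)  # ['y']
```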