Fix Series._with_new_scol to use alias. (#1634)
We should always use an alias in `IndexOpsMixin._with_new_scol` to make sure the resulting column name is valid; otherwise, an expression built on top of a renamed column produces a name Spark cannot parse:

```py
>>> kser = ks.Series([1, 2, 3, 4, 5, 6, 7], name="x")
>>> (kser.rename("y") + 1).head()
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: syntax error in attribute name: `(x AS `y` + 1)`;
```

Resolves #1633.
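For context, the invalid name comes from how Spark auto-generates column names: `rename` merely wraps the Spark column in an alias expression, and any arithmetic on top of it yields a backticked name that cannot be parsed as an attribute. A minimal plain-PySpark sketch of the failure mode (the one-column DataFrame below is hypothetical; the generated name matches the error above):

```py
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> sdf = spark.createDataFrame([(1,), (2,)], ["x"])
>>> scol = sdf["x"].alias("y") + 1
>>> sdf.select(scol).columns  # auto-generated name; unparsable as an attribute
['(x AS `y` + 1)']
>>> sdf.select(scol.alias("y")).columns  # aliasing the final expression keeps it valid
['y']
```

Aliasing inside `_with_new_scol` gives every derived Series or Index a plain, resolvable column name, which is also why the explicit `.rename(...)` calls scattered through the codebase become unnecessary in this diff.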
ueshin committed Jul 7, 2020
1 parent 46d396d commit 3c18bb4
Showing 11 changed files with 57 additions and 52 deletions.
2 changes: 1 addition & 1 deletion databricks/koalas/accessors.py
```diff
@@ -817,4 +817,4 @@ def _transform_batch(self, func, return_schema):
             spark_return_type = return_schema
 
         pudf = pandas_udf(func, returnType=spark_return_type, functionType=PandasUDFType.SCALAR)
-        return self._kser._with_new_scol(scol=pudf(self._kser.spark.column)).rename(self._kser.name)
+        return self._kser._with_new_scol(scol=pudf(self._kser.spark.column))
```
15 changes: 9 additions & 6 deletions databricks/koalas/base.py
```diff
@@ -27,6 +27,7 @@
 import pandas as pd
 from pandas.api.types import is_list_like
 from pandas.core.accessor import CachedAccessor
+from pyspark import sql as spark
 from pyspark.sql import functions as F, Window, Column
 from pyspark.sql.types import DateType, DoubleType, FloatType, LongType, StringType, TimestampType
 
@@ -141,6 +142,10 @@ def _internal(self) -> InternalFrame:
     def _kdf(self) -> DataFrame:
         return self._anchor
 
+    @abstractmethod
+    def _with_new_scol(self, scol: spark.Column):
+        pass
+
     spark = CachedAccessor("spark", SparkIndexOpsMethods)
 
     @property
@@ -733,7 +738,7 @@ def isin(self, values):
                 " to isin(), you passed a [{values_type}]".format(values_type=type(values).__name__)
             )
 
-        return self._with_new_scol(self.spark.column.isin(list(values))).rename(self.name)
+        return self._with_new_scol(self.spark.column.isin(list(values)))
 
     def isnull(self):
         """
@@ -767,11 +772,9 @@ def isnull(self):
         if isinstance(self, MultiIndex):
             raise NotImplementedError("isna is not defined for MultiIndex")
         if isinstance(self.spark.data_type, (FloatType, DoubleType)):
-            return self._with_new_scol(
-                self.spark.column.isNull() | F.isnan(self.spark.column)
-            ).rename(self.name)
+            return self._with_new_scol(self.spark.column.isNull() | F.isnan(self.spark.column))
         else:
-            return self._with_new_scol(self.spark.column.isNull()).rename(self.name)
+            return self._with_new_scol(self.spark.column.isNull())
 
     isna = isnull
 
@@ -1005,7 +1008,7 @@ def _shift(self, periods, fill_value, part_cols=()):
         )
         lag_col = F.lag(col, periods).over(window)
         col = F.when(lag_col.isNull() | F.isnan(lag_col), fill_value).otherwise(lag_col)
-        return self._with_new_scol(col).rename(self.name)
+        return self._with_new_scol(col)
 
     # TODO: Update Documentation for Bins Parameter when its supported
     def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True):
```
4 changes: 1 addition & 3 deletions databricks/koalas/frame.py
```diff
@@ -2550,9 +2550,7 @@ def transform(self, func, axis=0, *args, **kwargs):
                     functionType=PandasUDFType.SCALAR,
                 )
                 kser = self._kser_for(input_label)
-                applied.append(
-                    kser._with_new_scol(scol=pudf(kser.spark.column)).rename(input_label)
-                )
+                applied.append(kser._with_new_scol(scol=pudf(kser.spark.column)))
 
             internal = self._internal.with_new_columns(applied)
             return DataFrame(internal)
```
4 changes: 1 addition & 3 deletions databricks/koalas/generic.py
```diff
@@ -1419,9 +1419,7 @@ def abs(self):
         3  7  40  50
         """
         # TODO: The first example above should not have "Name: 0".
-        return self._apply_series_op(
-            lambda kser: kser._with_new_scol(F.abs(kser.spark.column)).rename(kser.name)
-        )
+        return self._apply_series_op(lambda kser: kser._with_new_scol(F.abs(kser.spark.column)))
 
     # TODO: by argument only support the grouping name and as_index only for now. Documentation
     # should be updated when it's supported.
```
2 changes: 1 addition & 1 deletion databricks/koalas/indexes.py
```diff
@@ -135,7 +135,7 @@ def _with_new_scol(self, scol: spark.Column) -> "Index":
         :param scol: the new Spark Column
         :return: the copied Index
         """
-        sdf = self._internal.spark_frame.select(scol)  # type: ignore
+        sdf = self._internal.spark_frame.select(scol.alias(SPARK_DEFAULT_INDEX_NAME))
         internal = InternalFrame(
             spark_frame=sdf,
             index_map=OrderedDict(zip(sdf.columns, self._internal.index_names)),  # type: ignore
```
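Side note: `SPARK_DEFAULT_INDEX_NAME` is the reserved Spark column name koalas uses for the default index (assumed here to be `__index_level_0__`); aliasing back to it keeps the rebuilt `InternalFrame`'s index column resolvable by name. A small hypothetical sketch, reusing the session from the example above:

```py
>>> sdf = spark.createDataFrame([(1,), (2,)], ["__index_level_0__"])
>>> scol = sdf["__index_level_0__"] + 1
>>> sdf.select(scol).columns  # without the alias, the name drifts from the reserved one
['(__index_level_0__ + 1)']
>>> sdf.select(scol.alias("__index_level_0__")).columns
['__index_level_0__']
```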
55 changes: 28 additions & 27 deletions databricks/koalas/series.py
```diff
@@ -35,7 +35,6 @@
 import pyspark
 from pyspark import sql as spark
 from pyspark.sql import functions as F, Column
-from pyspark.sql.functions import pandas_udf, PandasUDFType
 from pyspark.sql.types import (
     BooleanType,
     DoubleType,
@@ -386,7 +385,8 @@ def _with_new_scol(self, scol: spark.Column) -> "Series":
         :return: the copied Series
         """
         internal = self._kdf._internal.copy(
-            column_labels=[self._column_label], data_spark_columns=[scol]
+            column_labels=[self._column_label],
+            data_spark_columns=[scol.alias(name_like_string(self._column_label))],
         )
         return first_series(DataFrame(internal))
```
```diff
@@ -427,7 +427,7 @@ def spark_type(self):
 
     # Arithmetic Operators
     def add(self, other):
-        return (self + other).rename(self.name)
+        return self + other
 
     add.__doc__ = _flex_doc_SERIES.format(
         desc="Addition",
@@ -449,7 +449,7 @@ def radd(self, other):
     )
 
     def div(self, other):
-        return (self / other).rename(self.name)
+        return self / other
 
     div.__doc__ = _flex_doc_SERIES.format(
         desc="Floating division",
@@ -473,7 +473,7 @@ def rdiv(self, other):
     )
 
     def truediv(self, other):
-        return (self / other).rename(self.name)
+        return self / other
 
     truediv.__doc__ = _flex_doc_SERIES.format(
         desc="Floating division",
@@ -495,7 +495,7 @@ def rtruediv(self, other):
     )
 
     def mul(self, other):
-        return (self * other).rename(self.name)
+        return self * other
 
     mul.__doc__ = _flex_doc_SERIES.format(
         desc="Multiplication",
@@ -519,7 +519,7 @@ def rmul(self, other):
     )
 
     def sub(self, other):
-        return (self - other).rename(self.name)
+        return self - other
 
     sub.__doc__ = _flex_doc_SERIES.format(
         desc="Subtraction",
@@ -543,7 +543,7 @@ def rsub(self, other):
     )
 
     def mod(self, other):
-        return (self % other).rename(self.name)
+        return self % other
 
     mod.__doc__ = _flex_doc_SERIES.format(
         desc="Modulo",
@@ -565,7 +565,7 @@ def rmod(self, other):
     )
 
     def pow(self, other):
-        return (self ** other).rename(self.name)
+        return self ** other
 
     pow.__doc__ = _flex_doc_SERIES.format(
         desc="Exponential power of series",
@@ -587,7 +587,7 @@ def rpow(self, other):
     )
 
     def floordiv(self, other):
-        return (self // other).rename(self.name)
+        return self // other
 
     floordiv.__doc__ = _flex_doc_SERIES.format(
         desc="Integer division",
@@ -634,7 +634,7 @@ def eq(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self == other).rename(self.name)
+        return self == other
 
     equals = eq
 
@@ -660,7 +660,7 @@ def gt(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self > other).rename(self.name)
+        return self > other
 
     def ge(self, other):
         """
@@ -684,7 +684,7 @@ def ge(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self >= other).rename(self.name)
+        return self >= other
 
     def lt(self, other):
         """
@@ -708,7 +708,7 @@ def lt(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self < other).rename(self.name)
+        return self < other
 
     def le(self, other):
         """
@@ -732,7 +732,7 @@ def le(self, other):
         d    False
         Name: b, dtype: bool
         """
-        return (self <= other).rename(self.name)
+        return self <= other
 
     def ne(self, other):
         """
@@ -756,7 +756,7 @@ def ne(self, other):
         d    True
         Name: b, dtype: bool
         """
-        return (self != other).rename(self.name)
+        return self != other
 
     def divmod(self, other):
         """
@@ -960,7 +960,7 @@ def map(self, arg):
                 current = current.otherwise(F.lit(tmp_val))
             else:
                 current = current.otherwise(F.lit(None).cast(self.spark.data_type))
-            return self._with_new_scol(current).rename(self.name)
+            return self._with_new_scol(current)
         else:
             return self.apply(arg)
 
@@ -1659,10 +1659,12 @@ def _fillna(self, value=None, method=None, axis=None, inplace=False, limit=None,
 
         if inplace:
             self._kdf._update_internal_frame(
-                self._kdf._internal.with_new_spark_column(self._column_label, scol)
+                self._kdf._internal.with_new_spark_column(
+                    self._column_label, scol.alias(name_like_string(self.name))
+                )
             )
         else:
-            return self._with_new_scol(scol).rename(self.name)
+            return self._with_new_scol(scol)
 
     def dropna(self, axis=0, inplace=False, **kwargs):
         """
@@ -1761,7 +1763,7 @@ def clip(self, lower: Union[float, int] = None, upper: Union[float, int] = None)
                 scol = F.when(scol < lower, lower).otherwise(scol)
             if upper is not None:
                 scol = F.when(scol > upper, upper).otherwise(scol)
-            return self._with_new_scol(scol.alias(self._internal.data_spark_column_names[0]))
+            return self._with_new_scol(scol)
         else:
             return self
 
@@ -2946,9 +2948,8 @@ def round(self, decimals=0):
         """
         if not isinstance(decimals, int):
             raise ValueError("decimals must be an integer")
-        column_name = self.name
         scol = F.round(self.spark.column, decimals)
-        return self._with_new_scol(scol).rename(column_name)
+        return self._with_new_scol(scol)
 
     # TODO: add 'interpolation' parameter.
     def quantile(self, q=0.5, accuracy=10000):
@@ -3188,7 +3189,7 @@ def _rank(self, method="average", ascending=True, part_cols=()):
             Window.unboundedPreceding, Window.unboundedFollowing
         )
         scol = stat_func(F.row_number().over(window1)).over(window2)
-        kser = self._with_new_scol(scol).rename(self.name)
+        kser = self._with_new_scol(scol)
         return kser.astype(np.float64)
 
     def filter(self, items=None, like=None, regex=None, axis=None):
@@ -3281,7 +3282,7 @@ def _diff(self, periods, part_cols=()):
             .rowsBetween(-periods, -periods)
         )
         scol = self.spark.column - F.lag(self.spark.column, periods).over(window)
-        return self._with_new_scol(scol).rename(self.name)
+        return self._with_new_scol(scol)
 
     def idxmax(self, skipna=True):
         """
@@ -4440,7 +4441,7 @@ def combine_first(self, other):
         cond = F.when(this.isNull(), that).otherwise(this)
         # If `self` and `other` come from same frame, the anchor should be kept
         if same_anchor(self, other):
-            return self._with_new_scol(cond).rename(self.name)
+            return self._with_new_scol(cond)
         index_scols = combined._internal.index_spark_columns
         sdf = combined._internal.spark_frame.select(
             *index_scols, cond.alias(self._internal.data_spark_column_names[0])
@@ -4941,7 +4942,7 @@ def _cum(self, func, skipna, part_cols=()):
                 F.lit(None),
             ).otherwise(func(self.spark.column).over(window))
 
-        return self._with_new_scol(scol).rename(self.name)
+        return self._with_new_scol(scol)
 
     def _cumprod(self, skipna, part_cols=()):
         from pyspark.sql.functions import pandas_udf
@@ -4957,7 +4958,7 @@ def negative_check(s):
             return F.sum(F.log(negative_check(scol)))
 
         kser = self._cum(cumprod, skipna, part_cols)
-        return kser._with_new_scol(F.exp(kser.spark.column)).rename(self.name)
+        return kser._with_new_scol(F.exp(kser.spark.column))
 
     # ----------------------------------------------------------------------
     # Accessor Methods
```
2 changes: 1 addition & 1 deletion databricks/koalas/spark/accessors.py
```diff
@@ -116,7 +116,7 @@ def transform(self, func):
                 "The output of the function [%s] should be of a "
                 "pyspark.sql.Column; however, got [%s]." % (func, type(output))
             )
-        new_ser = self._data._with_new_scol(scol=output).rename(self._data.name)
+        new_ser = self._data._with_new_scol(scol=output)
         # Trigger the resolution so it throws an exception if anything does wrong
         # within the function, for example,
         # `df1.a.spark.transform(lambda _: F.col("non-existent"))`.
```
7 changes: 3 additions & 4 deletions databricks/koalas/strings.py
```diff
@@ -38,7 +38,6 @@ def __init__(self, series: "ks.Series"):
         if not isinstance(series.spark.data_type, (StringType, BinaryType, ArrayType)):
             raise ValueError("Cannot call StringMethods on type {}".format(series.spark.data_type))
         self._data = series
-        self.name = self._data.name
 
     # Methods
     def capitalize(self) -> "ks.Series":
@@ -1148,7 +1147,7 @@ def findall(self, pat, flags=0) -> "ks.Series":
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        return self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        return self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
     def index(self, sub, start=0, end=None) -> "ks.Series":
         """
@@ -2001,7 +2000,7 @@ def split(self, pat=None, n=-1, expand=False) -> Union["ks.Series", "ks.DataFrame"]:
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
         if expand:
             kdf = kser.to_frame()
@@ -2135,7 +2134,7 @@ def rsplit(self, pat=None, n=-1, expand=False) -> Union["ks.Series", "ks.DataFrame"]:
             returnType=ArrayType(StringType(), containsNull=True),
             functionType=PandasUDFType.SCALAR,
         )
-        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column)).rename(self.name)
+        kser = self._data._with_new_scol(scol=pudf(self._data.spark.column))
 
         if expand:
             kdf = kser.to_frame()
```
8 changes: 8 additions & 0 deletions databricks/koalas/tests/test_series.py
```diff
@@ -141,6 +141,8 @@ def test_rename_method(self):
         self.assertEqual(kser.name, "x")  # no mutation
         self.assert_eq(kser.rename(), pser.rename())
 
+        self.assert_eq((kser.rename("y") + 1).head(), (pser.rename("y") + 1).head())
+
         kser.rename("z", inplace=True)
         pser.rename("z", inplace=True)
         self.assertEqual(kser.name, "z")
@@ -244,6 +246,12 @@ def test_fillna(self):
         pser.fillna(0, inplace=True)
         self.assert_eq(kser, pser)
 
+        kser = kdf.x.rename("y")
+        pser = pdf.x.rename("y")
+        kser.fillna(0, inplace=True)
+        pser.fillna(0, inplace=True)
+        self.assert_eq(kser.head(), pser.head())
+
         pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
         kser = ks.from_pandas(pser)
```
3 changes: 2 additions & 1 deletion databricks/koalas/utils.py
```diff
@@ -342,7 +342,8 @@ def align_diff_series(func, this_series, *args, how="full"):
     )
 
     internal = combined._internal.copy(
-        column_labels=this_series._internal.column_labels, data_spark_columns=[scol]
+        column_labels=this_series._internal.column_labels,
+        data_spark_columns=[scol.alias(name_like_string(this_series.name))],
     )
     return first_series(DataFrame(internal))
```