Manage index spark columns instead of index spark column names. (#1944)
Manages index spark columns instead of index spark column names, the same way data spark columns are managed.
ueshin committed Dec 2, 2020
1 parent 434cf46 commit 138c7b8
Showing 12 changed files with 329 additions and 249 deletions.
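
Throughout the diff, the `index_spark_column_names` argument of `InternalFrame` is replaced by `index_spark_columns`: index columns are now passed as Spark `Column` objects resolved with `scol_for`, mirroring how `data_spark_columns` is already passed. Below is a minimal sketch of the pattern on a toy frame; it assumes the internal constructor accepts just these arguments and leaves everything else at its defaults.

```python
from pyspark.sql import SparkSession

from databricks.koalas.internal import InternalFrame, SPARK_INDEX_NAME_FORMAT
from databricks.koalas.utils import scol_for

spark = SparkSession.builder.getOrCreate()

# A toy Spark frame with one index column named by the koalas convention
# ("__index_level_0__") and one data column.
sdf = spark.createDataFrame([(0, "a"), (1, "b")], [SPARK_INDEX_NAME_FORMAT(0), "x"])

# Old style (removed by this commit): identify index columns by name.
#   InternalFrame(
#       spark_frame=sdf,
#       index_spark_column_names=[SPARK_INDEX_NAME_FORMAT(0)],
#   )

# New style: resolve each index column to a Column bound to sdf, the same way
# data_spark_columns is handled.
internal = InternalFrame(
    spark_frame=sdf,
    index_spark_columns=[scol_for(sdf, SPARK_INDEX_NAME_FORMAT(0))],
)
```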
6 changes: 3 additions & 3 deletions databricks/koalas/accessors.py
@@ -171,8 +171,8 @@ def attach_id_column(self, id_type: str, column: Union[Any, Tuple]) -> "DataFrame":
return DataFrame(
InternalFrame(
spark_frame=sdf,
index_spark_column_names=[
SPARK_INDEX_NAME_FORMAT(i) for i in range(internal.index_level)
index_spark_columns=[
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
],
index_names=internal.index_names,
column_labels=internal.column_labels + [column],
@@ -386,7 +386,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
)

# Otherwise, it loses index.
internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None)
internal = InternalFrame(spark_frame=sdf, index_spark_columns=None)

return DataFrame(internal)

2 changes: 1 addition & 1 deletion databricks/koalas/base.py
@@ -1362,7 +1362,7 @@ def value_counts(

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=[index_name],
index_spark_columns=[scol_for(sdf, index_name)],
column_labels=self._internal.column_labels,
data_spark_columns=[scol_for(sdf, "count")],
column_label_names=self._internal.column_label_names,
164 changes: 86 additions & 78 deletions databricks/koalas/frame.py

Large diffs are not rendered by default.

47 changes: 25 additions & 22 deletions databricks/koalas/groupby.py
@@ -318,11 +318,11 @@ def _spark_groupby(kdf, func, groupkeys=()):
index_spark_column_names = groupkey_names
index_names = [kser._column_label for kser in groupkeys]
else:
index_spark_column_names = None
index_names = None
index_spark_column_names = []
index_names = []
return InternalFrame(
spark_frame=sdf,
index_spark_column_names=index_spark_column_names,
index_spark_columns=[scol_for(sdf, col) for col in index_spark_column_names],
index_names=index_names,
column_labels=column_labels,
data_spark_columns=[scol_for(sdf, col) for col in data_columns],
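
In the no-groupkey branch of `_spark_groupby`, `None` becomes an empty list because the return statement now builds `index_spark_columns` by iterating over `index_spark_column_names`; a comprehension over `[]` simply yields `[]`, whereas one over `None` would raise. A small illustration on a throwaway frame (the column names here are made up, not koalas code):

```python
from pyspark.sql import SparkSession

from databricks.koalas.utils import scol_for

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(3).selectExpr("id", "id * 2 AS count")

# With group keys: each key name is resolved to a Column bound to sdf.
groupkey_names = ["id"]
index_spark_columns = [scol_for(sdf, col) for col in groupkey_names]  # [Column<'id'>]

# Without group keys the branch now uses [] instead of None, so the same
# comprehension still runs and just produces no index columns.
index_spark_columns = [scol_for(sdf, col) for col in []]              # []
```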
@@ -613,7 +613,7 @@ def size(self) -> Series:
sdf = sdf.groupby(*groupkey_names).count()
internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=groupkey_names,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
index_names=[kser._column_label for kser in groupkeys],
column_labels=[None],
data_spark_columns=[scol_for(sdf, "count")],
@@ -1211,7 +1211,7 @@ def wrapped_func(df, *a, **k):
)
else:
# Otherwise, it loses index.
internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None)
internal = InternalFrame(spark_frame=sdf, index_spark_columns=None)

if should_return_series:
kser = first_series(DataFrame(internal))
@@ -1532,7 +1532,7 @@ def idxmax(self, skipna=True) -> Union[DataFrame, Series]:

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=groupkey_names,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
index_names=[kser._column_label for kser in self._groupkeys],
column_labels=[kser._column_label for kser in self._agg_columns],
data_spark_columns=[
@@ -1610,7 +1610,7 @@ def idxmin(self, skipna=True) -> Union[DataFrame, Series]:

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=groupkey_names,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
index_names=[kser._column_label for kser in self._groupkeys],
column_labels=[kser._column_label for kser in self._agg_columns],
data_spark_columns=[
@@ -2093,7 +2093,7 @@ def pandas_transform(pdf):
retain_index=False,
)
# Otherwise, it loses index.
internal = InternalFrame(spark_frame=sdf, index_spark_column_names=None)
internal = InternalFrame(spark_frame=sdf, index_spark_columns=None)

return DataFrame(internal)

@@ -2261,6 +2261,9 @@ def get_group(self, name) -> Union[DataFrame, Series]:

internal = internal.copy(
spark_frame=spark_frame,
index_spark_columns=[
scol_for(spark_frame, col) for col in internal.index_spark_column_names
],
column_labels=[s._column_label for s in self._agg_columns],
data_spark_columns=[
scol_for(spark_frame, s._internal.data_spark_column_names[0])
@@ -2310,7 +2313,7 @@ def _reduce_for_stat_function(self, sfun, only_numeric):

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=groupkey_names,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
index_names=[kser._column_label for kser in self._groupkeys],
column_labels=column_labels,
data_spark_columns=[scol_for(sdf, col) for col in data_columns],
@@ -2613,8 +2616,8 @@ def describe(self) -> DataFrame:
# Reindex the DataFrame to reflect initial grouping and agg columns.
internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=[
kser._internal.data_spark_column_names[0] for kser in self._groupkeys
index_spark_columns=[
scol_for(sdf, kser._internal.data_spark_column_names[0]) for kser in self._groupkeys
],
index_names=[kser._column_label for kser in self._groupkeys],
column_labels=column_labels,
@@ -2767,14 +2770,14 @@ def nsmallest(self, n=5) -> Series:
sdf.withColumn(temp_rank_column, F.row_number().over(window))
.filter(F.col(temp_rank_column) <= n)
.drop(temp_rank_column)
)
).drop(NATURAL_ORDER_COLUMN_NAME)

internal = InternalFrame(
spark_frame=sdf.drop(NATURAL_ORDER_COLUMN_NAME),
index_spark_column_names=(
groupkey_col_names
spark_frame=sdf,
index_spark_columns=(
[scol_for(sdf, col) for col in groupkey_col_names]
+ [
SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys))
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
for i in range(self._kdf._internal.index_level)
]
),
@@ -2840,14 +2843,14 @@ def nlargest(self, n=5) -> Series:
sdf.withColumn(temp_rank_column, F.row_number().over(window))
.filter(F.col(temp_rank_column) <= n)
.drop(temp_rank_column)
)
).drop(NATURAL_ORDER_COLUMN_NAME)

internal = InternalFrame(
spark_frame=sdf.drop(NATURAL_ORDER_COLUMN_NAME),
index_spark_column_names=(
groupkey_col_names
spark_frame=sdf,
index_spark_columns=(
[scol_for(sdf, col) for col in groupkey_col_names]
+ [
SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys))
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i + len(self._groupkeys)))
for i in range(self._kdf._internal.index_level)
]
),
@@ -2916,7 +2919,7 @@ def value_counts(self, sort=None, ascending=None, dropna=True) -> Series:

internal = InternalFrame(
spark_frame=sdf,
index_spark_column_names=groupkey_names,
index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
index_names=[kser._column_label for kser in groupkeys],
column_labels=[self._agg_columns[0]._column_label],
data_spark_columns=[scol_for(sdf, agg_column)],
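In the `nsmallest`/`nlargest` hunks, the `.drop(NATURAL_ORDER_COLUMN_NAME)` call also moves ahead of the `InternalFrame` construction, presumably so that the `Column` objects produced by `scol_for` are bound to the exact frame that is passed as `spark_frame`. A hedged sketch of that ordering on a toy frame:

```python
from pyspark.sql import SparkSession, functions as F

from databricks.koalas.internal import NATURAL_ORDER_COLUMN_NAME
from databricks.koalas.utils import scol_for

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(3).withColumn(NATURAL_ORDER_COLUMN_NAME, F.monotonically_increasing_id())

# Finalize the frame first, then resolve the index columns against that final
# frame, instead of resolving them and dropping the helper column afterwards.
sdf = sdf.drop(NATURAL_ORDER_COLUMN_NAME)
index_spark_columns = [scol_for(sdf, "id")]
```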
