Skip to content

Commit

Permalink
code cleanup and edits
Browse files Browse the repository at this point in the history
  • Loading branch information
christinafan committed Sep 11, 2023
1 parent e8085b8 commit 4e563b7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 84 deletions.
4 changes: 2 additions & 2 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2017,6 +2017,7 @@ def window(

# Applies the reduction function over an entire virtual partition.
def window_function_complete(virtual_partition):
# Work on a copy rather than on the partition that was passed in: per the
# original author's note, the pandas dataframe received on Ray is immutable,
# so `reduce_fn` is handed a copy instead.
virtual_partition_copy = virtual_partition.copy()
window_result = reduce_fn(virtual_partition_copy)
return window_result
Expand Down Expand Up @@ -2092,7 +2093,6 @@ def window_function_partition(virtual_partition):
]
)
parts_to_join.append(masked_new_parts)
break
else:
# window continues into next part, so just add this part to parts_to_join
if axis == Axis.COL_WISE:
Expand Down Expand Up @@ -2138,7 +2138,7 @@ def window_function_partition(virtual_partition):
results = np.array(results)

return self.__constructor__(
results, self.index, self.columns, None, None, result_schema
np.array(results), self.index, self.columns, None, None, result_schema
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down
126 changes: 44 additions & 82 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ def expanding_corr(
)
)

old_window_mean = Fold.register(
_window_mean = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).mean(*args, **kwargs)
)
Expand All @@ -1429,9 +1429,9 @@ def window_mean(self, axis, window_kwargs, *args, **kwargs):
)
)
else:
return self.old_window_mean(axis, window_kwargs, *args, **kwargs)
return self._window_mean(axis, window_kwargs, *args, **kwargs)

old_window_sum = Fold.register(
_window_sum = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sum(*args, **kwargs)
)
Expand All @@ -1450,9 +1450,9 @@ def window_sum(self, axis, window_kwargs, *args, **kwargs):
)
)
else:
return self.old_window_sum(axis, window_kwargs, *args, **kwargs)
return self._window_sum(axis, window_kwargs, *args, **kwargs)

old_window_var = Fold.register(
_window_var = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1471,9 +1471,9 @@ def window_var(self, axis, window_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_window_var(axis, window_kwargs, ddof, *args, **kwargs)
return self._window_var(axis, window_kwargs, ddof, *args, **kwargs)

old_window_std = Fold.register(
_window_std = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1492,9 +1492,9 @@ def window_std(self, axis, window_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_window_std(axis, window_kwargs, ddof, *args, **kwargs)
return self._window_std(axis, window_kwargs, ddof, *args, **kwargs)

old_rolling_count = Fold.register(
_rolling_count = Fold.register(
lambda df, rolling_kwargs: pandas.DataFrame(df.rolling(**rolling_kwargs).count())
)

Expand All @@ -1509,9 +1509,9 @@ def rolling_count(self, axis, rolling_kwargs):
)
)
else:
return self.old_rolling_count(axis, rolling_kwargs)
return self._rolling_count(axis, rolling_kwargs)

old_rolling_sum = Fold.register(
_rolling_sum = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sum(*args, **kwargs)
)
Expand All @@ -1530,9 +1530,9 @@ def rolling_sum(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_sum(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_sum(axis, rolling_kwargs, *args, **kwargs)

old_rolling_sem = Fold.register(
_rolling_sem = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).sem(*args, **kwargs)
)
Expand All @@ -1551,9 +1551,9 @@ def rolling_sem(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_sem(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_sem(axis, rolling_kwargs, *args, **kwargs)

old_rolling_mean = Fold.register(
_rolling_mean = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).mean(*args, **kwargs)
)
Expand All @@ -1572,9 +1572,9 @@ def rolling_mean(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_mean(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_mean(axis, rolling_kwargs, *args, **kwargs)

old_rolling_median = Fold.register(
_rolling_median = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).median(**kwargs)
)
Expand All @@ -1591,9 +1591,9 @@ def rolling_median(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_median(axis, rolling_kwargs, **kwargs)
return self._rolling_median(axis, rolling_kwargs, **kwargs)

old_rolling_var = Fold.register(
_rolling_var = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1614,9 +1614,9 @@ def rolling_var(self, axis, rolling_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_rolling_var(axis, rolling_kwargs, ddof, *args, **kwargs)
return self._rolling_var(axis, rolling_kwargs, ddof, *args, **kwargs)

old_rolling_std = Fold.register(
_rolling_std = Fold.register(
lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs)
)
Expand All @@ -1637,9 +1637,9 @@ def rolling_std(self, axis, rolling_kwargs, ddof, *args, **kwargs):
)
)
else:
return self.old_rolling_std(axis, rolling_kwargs, ddof, *args, **kwargs)
return self._rolling_std(axis, rolling_kwargs, ddof, *args, **kwargs)

old_rolling_min = Fold.register(
_rolling_min = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).min(*args, **kwargs)
)
Expand All @@ -1658,9 +1658,9 @@ def rolling_min(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_min(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_min(axis, rolling_kwargs, *args, **kwargs)

old_rolling_max = Fold.register(
_rolling_max = Fold.register(
lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).max(*args, **kwargs)
)
Expand All @@ -1679,9 +1679,9 @@ def rolling_max(self, axis, rolling_kwargs, *args, **kwargs):
)
)
else:
return self.old_rolling_max(axis, rolling_kwargs, *args, **kwargs)
return self._rolling_max(axis, rolling_kwargs, *args, **kwargs)

old_rolling_skew = Fold.register(
_rolling_skew = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).skew(**kwargs)
)
Expand All @@ -1698,9 +1698,9 @@ def rolling_skew(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_skew(axis, rolling_kwargs, **kwargs)
return self._rolling_skew(axis, rolling_kwargs, **kwargs)

old_rolling_kurt = Fold.register(
_rolling_kurt = Fold.register(
lambda df, rolling_kwargs, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).kurt(**kwargs)
)
Expand All @@ -1717,9 +1717,9 @@ def rolling_kurt(self, axis, rolling_kwargs, **kwargs):
)
)
else:
return self.old_rolling_kurt(axis, rolling_kwargs, **kwargs)
return self._rolling_kurt(axis, rolling_kwargs, **kwargs)

old_rolling_apply = Fold.register(
_rolling_apply = Fold.register(
lambda df, rolling_kwargs, func, raw, engine, engine_kwargs, args, kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).apply(
func=func,
Expand Down Expand Up @@ -1754,7 +1754,7 @@ def rolling_apply(
)
)
else:
return self.old_rolling_apply(
return self._rolling_apply(
axis,
rolling_kwargs,
func,
Expand All @@ -1765,7 +1765,7 @@ def rolling_apply(
kwargs,
)

old_rolling_rank = Fold.register(
_rolling_rank = Fold.register(
lambda df, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).rank(
method=method,
Expand Down Expand Up @@ -1798,11 +1798,11 @@ def rolling_rank(
)
)
else:
return self.old_rolling_rank(
return self._rolling_rank(
axis, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs
)

old_rolling_quantile = Fold.register(
_rolling_quantile = Fold.register(
lambda df, rolling_kwargs, quantile, interpolation, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).quantile(
quantile=quantile, interpolation=interpolation, **kwargs
Expand All @@ -1825,11 +1825,11 @@ def rolling_quantile(self, axis, rolling_kwargs, quantile, interpolation, **kwar
)
)
else:
return self.old_rolling_quantile(
return self._rolling_quantile(
axis, rolling_kwargs, quantile, interpolation, **kwargs
)

old_rolling_corr = Fold.register(
_rolling_corr = Fold.register(
lambda df, rolling_kwargs, other, pairwise, *args, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).corr(
other=other, pairwise=pairwise, *args, **kwargs
Expand Down Expand Up @@ -1859,11 +1859,11 @@ def rolling_corr(self, axis, rolling_kwargs, other, pairwise, *args, **kwargs):
)
)
else:
return self.old_rolling_corr(
return self._rolling_corr(
axis, rolling_kwargs, other, pairwise, *args, **kwargs
)

old_rolling_cov = Fold.register(
_rolling_cov = Fold.register(
lambda df, rolling_kwargs, other, pairwise, ddof, **kwargs: pandas.DataFrame(
df.rolling(**rolling_kwargs).cov(
other=other, pairwise=pairwise, ddof=ddof, **kwargs
Expand Down Expand Up @@ -1897,7 +1897,7 @@ def rolling_cov(self, axis, rolling_kwargs, other, pairwise, ddof, **kwargs):
)
)
else:
return self.old_rolling_cov(
return self._rolling_cov(
axis, rolling_kwargs, other, pairwise, ddof, **kwargs
)

Expand Down Expand Up @@ -4494,48 +4494,10 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
return self.__constructor__(new_modin_frame)

def sort_rows_by_column_values(self, columns, ascending=True, **kwargs):
# Our algebra sort is only implemented for Engines that support virtual partitioning.
if _current_engine_supports_virtual_partitions():
new_modin_frame = self._modin_frame.sort_by(
0, columns, ascending=ascending, **kwargs
)
return self.__constructor__(new_modin_frame)
ignore_index = kwargs.get("ignore_index", False)
kwargs["ignore_index"] = False
if not is_list_like(columns):
columns = [columns]
# Currently, sort_values will just reindex based on the sorted values.
# TODO create a more efficient way to sort
ErrorMessage.default_to_pandas("sort_values")
broadcast_value_dict = {
col: self.getitem_column_array([col]).to_pandas().squeeze(axis=1)
for col in columns
}
# Clear index level names because they also appear in broadcast_value_dict
orig_index_level_names = self.index.names
tmp_index = self.index.copy()
tmp_index.names = [None] * tmp_index.nlevels
# Index may contain duplicates
broadcast_values1 = pandas.DataFrame(broadcast_value_dict, index=tmp_index)
# Index without duplicates
broadcast_values2 = pandas.DataFrame(broadcast_value_dict)
broadcast_values2 = broadcast_values2.reset_index(drop=True)
# Index may contain duplicates
new_index1 = broadcast_values1.sort_values(
by=columns, axis=0, ascending=ascending, **kwargs
).index
# Index without duplicates
new_index2 = broadcast_values2.sort_values(
by=columns, axis=0, ascending=ascending, **kwargs
).index

result = self.reset_index(drop=True).reindex(axis=0, labels=new_index2)
if ignore_index:
result = result.reset_index(drop=True)
else:
result.index = new_index1
result.index.names = orig_index_level_names
return result
new_modin_frame = self._modin_frame.sort_by(
0, columns, ascending=ascending, **kwargs
)
return self.__constructor__(new_modin_frame)

def sort_columns_by_row_values(self, rows, ascending=True, **kwargs):
if not is_list_like(rows):
Expand Down

0 comments on commit 4e563b7

Please sign in to comment.