FIX-modin-project#6904: Align levels of partially known dtypes with MultiIndex labels

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Feb 2, 2024
1 parent 3e42b8b commit 912bdc4
Showing 2 changed files with 137 additions and 2 deletions.
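
For context, here is a minimal sketch of the reproducer behind modin-project#6904; it mirrors the regression test added below and assumes a working Modin installation:

```python
import modin.pandas as pd

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 3, 4, 4]})

# agg() with a list of functions produces MultiIndex columns; reset_index()
# then inserts the flat label "a", which pandas pads to ("a", "").
result = df.groupby("a").agg({"b": ["min", "std"]}).reset_index()

# The lazy dtypes cache has to carry the same padded labels as the actual
# columns; otherwise .dtypes can disagree with them, which is what this
# commit fixes.
print(result.dtypes)
```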
106 changes: 105 additions & 1 deletion modin/core/dataframe/pandas/metadata/dtypes.py
@@ -159,7 +159,10 @@ def columns_order(self) -> Optional[dict[int, IndexLabel]]:
if self._parent_df is None or not self._parent_df.has_materialized_columns:
return None

-        self._columns_order = {i: col for i, col in enumerate(self._parent_df.columns)}
+        actual_columns = self._parent_df.columns
+        self._normalize_self_levels(actual_columns)
+
+        self._columns_order = {i: col for i, col in enumerate(actual_columns)}
# we got information about new columns and thus can potentially
# extend our knowledge about missing dtypes
if len(self._columns_order) > (
@@ -360,6 +363,7 @@ def _materialize_all_names(self):
return

all_cols = self._parent_df.columns
self._normalize_self_levels(all_cols)
for col in all_cols:
if (
col not in self._known_dtypes
@@ -403,6 +407,7 @@ def materialize(self):

if self._remaining_dtype is not None:
cols = self._parent_df.columns
self._normalize_self_levels(cols)
self._known_dtypes.update(
{
col: self._remaining_dtype
@@ -630,6 +635,105 @@ def concat(
know_all_names=know_all_names,
)

@staticmethod
def _normalize_levels(columns, reference=None):
"""
        Normalize levels of MultiIndex column names.

        The function fills missing levels with empty strings, as pandas does:
'''
>>> columns = ["a", ("l1", "l2"), ("l1a", "l2a", "l3a")]
>>> _normalize_levels(columns)
[("a", "", ""), ("l1", "l2", ""), ("l1a", "l2a", "l3a")]
>>> # with a reference
>>> idx = pandas.MultiIndex(...)
>>> idx.nlevels
4
>>> _normalize_levels(columns, reference=idx)
[("a", "", "", ""), ("l1", "l2", "", ""), ("l1a", "l2a", "l3a", "")]
'''

        Parameters
        ----------
        columns : sequence
            Labels to normalize. If a dictionary is passed, its keys are used as the labels.
        reference : pandas.Index, optional
            An index to match the number of levels with. If `reference` is a MultiIndex, its number
            of levels must not be smaller than the maximum number of levels in `columns`. If not
            specified, the `columns` themselves become the reference.

        Returns
        -------
        sequence
            Column values with normalized levels.
        dict[hashable, hashable]
            Mapping from old column names to new names; only contains column names that
            were changed.

        Raises
        ------
        ValueError
            When the maximum number of levels in `columns` is greater than the number of levels
            in `reference`.
        """
if reference is None:
reference = columns

if isinstance(reference, pandas.Index):
max_nlevels = reference.nlevels
else:
max_nlevels = 1
for col in reference:
if isinstance(col, tuple):
max_nlevels = max(max_nlevels, len(col))

        # If the reference is a regular flat index, then no actions are required (the result will be
        # a flat index containing tuples of different lengths; this behavior fully matches pandas).
        # Yes, this shortcut deliberately skips the 'max_columns_nlevels > max_nlevels' check below.
if max_nlevels == 1:
return columns, {}

max_columns_nlevels = 1
for col in columns:
if isinstance(col, tuple):
max_columns_nlevels = max(max_columns_nlevels, len(col))

if max_columns_nlevels > max_nlevels:
raise ValueError(
f"The reference number of levels is greater than the maximum number of levels in columns: {max_columns_nlevels} > {max_nlevels}"
)

new_columns = []
old_to_new_mapping = {}
for col in columns:
old_col = col
if not isinstance(col, tuple):
col = (col,)
col = col + ("",) * (max_nlevels - len(col))
new_columns.append(col)
if old_col != col:
old_to_new_mapping[old_col] = col

return new_columns, old_to_new_mapping

def _normalize_self_levels(self, reference=None):
"""
        Call ``self._normalize_levels()`` for the known and unknown dtypes of this object.

        Parameters
        ----------
        reference : pandas.Index, optional
            An index to align the number of levels with (typically the actual columns of the parent frame).
        """
_, old_to_new_mapping = self._normalize_levels(
self._known_dtypes.keys(), reference
)
for old_col, new_col in old_to_new_mapping.items():
value = self._known_dtypes.pop(old_col)
self._known_dtypes[new_col] = value
self._cols_with_unknown_dtypes, _ = self._normalize_levels(
self._cols_with_unknown_dtypes, reference
)


class ModinDtypes:
"""
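
The heart of the new `_normalize_levels` helper is the empty-string padding it performs. Below is a standalone sketch of that idea in plain Python, independent of Modin's internals (the `pad_labels` name is illustrative only); the real method additionally returns a mapping of renamed labels so that `_normalize_self_levels` can rewrite the cached dtype keys in place:

```python
import pandas

def pad_labels(columns, reference=None):
    # Target number of levels: the reference index's nlevels if given,
    # otherwise the longest tuple among the labels themselves.
    if isinstance(reference, pandas.Index):
        nlevels = reference.nlevels
    else:
        nlevels = max(
            len(col) if isinstance(col, tuple) else 1 for col in columns
        )
    padded = []
    for col in columns:
        tup = col if isinstance(col, tuple) else (col,)
        # Fill the missing trailing levels with empty strings, as pandas does.
        padded.append(tup + ("",) * (nlevels - len(tup)))
    return padded

print(pad_labels(["a", ("l1", "l2"), ("l1a", "l2a", "l3a")]))
# [('a', '', ''), ('l1', 'l2', ''), ('l1a', 'l2a', 'l3a')]
```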
33 changes: 32 additions & 1 deletion modin/test/storage_formats/pandas/test_internals.py
@@ -30,7 +30,12 @@
)
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.distributed.dataframe.pandas import from_partitions
-from modin.pandas.test.utils import create_test_dfs, df_equals, test_data_values
+from modin.pandas.test.utils import (
+    create_test_dfs,
+    df_equals,
+    eval_general,
+    test_data_values,
+)
from modin.utils import try_cast_to_pandas

NPartitions.put(4)
@@ -2219,6 +2224,32 @@ def test_set_index_with_dupl_labels(self):
pandas.Series([np.dtype(int), np.dtype("float64")], index=["a", "a"])
)

def test_reset_index_mi_columns(self):
# reproducer from: https://github.com/modin-project/modin/issues/6904
md_df, pd_df = create_test_dfs({"a": [1, 1, 2, 2], "b": [3, 3, 4, 4]})
eval_general(
md_df,
pd_df,
lambda df: df.groupby("a").agg({"b": ["min", "std"]}).reset_index().dtypes,
)

def test_concat_mi(self):
"""
        Verify that concatenating dfs with non-MultiIndex and MultiIndex columns results in valid indices for lazy dtypes.
"""
md_df1, pd_df1 = create_test_dfs({"a": [1, 1, 2, 2], "b": [3, 3, 4, 4]})
md_df2, pd_df2 = create_test_dfs(
{("l1", "v1"): [1, 1, 2, 2], ("l1", "v2"): [3, 3, 4, 4]}
)

# Drop actual dtypes in order to use partially-known dtypes
md_df1._query_compiler._modin_frame.set_dtypes_cache(None)
md_df2._query_compiler._modin_frame.set_dtypes_cache(None)

md_res = pd.concat([md_df1, md_df2], axis=1)
pd_res = pandas.concat([pd_df1, pd_df2], axis=1)
df_equals(md_res.dtypes, pd_res.dtypes)


class TestZeroComputationDtypes:
"""
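
To run just the two tests added in this commit, a sketch (the path and `-k` expression assume the test names shown above):

```python
import pytest

# Select the new tests by name; equivalent to running pytest from the repo root.
pytest.main(
    [
        "modin/test/storage_formats/pandas/test_internals.py",
        "-k", "test_reset_index_mi_columns or test_concat_mi",
    ]
)
```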
