PERF-modin-project#6876: Skip the masking stage on 'iloc' where beneficial

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Jan 24, 2024
1 parent 43134ef commit d85920a
Showing 1 changed file with 96 additions and 48 deletions.
144 changes: 96 additions & 48 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
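For context, the optimization targets positional lookups whose indexer is already sorted. An illustrative sketch (the frame shape and positions are made up; only `modin.pandas` and `numpy` are assumed):

import numpy as np
import modin.pandas as pd

df = pd.DataFrame(np.random.rand(1_000_000, 4))
# Monotonically increasing positions: with this change, Modin can skip
# the sort / re-sort round trip of the masking stage for such a take.
taken = df.iloc[np.arange(0, 1_000_000, 2)]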
@@ -1167,6 +1167,13 @@ def _take_2d_positional(
if col_positions is None and row_positions is None:
return self.copy()

# Sorting positions is only needed when they are not already
# monotonically (non-strictly) increasing.
must_sort_row_pos = row_positions is not None and not np.all(
    row_positions[1:] >= row_positions[:-1]
)
must_sort_col_pos = col_positions is not None and not np.all(
    col_positions[1:] >= col_positions[:-1]
)
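As a standalone sketch of that monotonicity test (the helper name is ours, not Modin's):

import numpy as np

def is_monotonic_increasing(positions: np.ndarray) -> bool:
    # Compare each element with its neighbor; vacuously True for empty
    # and single-element arrays.
    return bool(np.all(positions[1:] >= positions[:-1]))

assert is_monotonic_increasing(np.array([0, 2, 2, 5]))
assert not is_monotonic_increasing(np.array([3, 1, 2]))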

if col_positions is None and row_positions is not None:
# Check if the optimization that first takes part of the data using the mask
# operation so that later less data is concatenated into a whole column is useful.
@@ -1175,18 +1182,41 @@
all_rows = None
if self.has_materialized_index:
all_rows = len(self.index)
elif self._row_lengths_cache:
elif self._row_lengths_cache or must_sort_row_pos:
    # Use the 'row_lengths' property (which computes missing lengths)
    # rather than the raw cache: when sorting would be required anyway,
    # paying to materialize partition lengths is worthwhile, and it keeps
    # this branch from summing a 'None' cache.
    all_rows = sum(self.row_lengths)
if all_rows:
if len(row_positions) > 0.9 * all_rows:
return self._reorder_labels(
row_positions=row_positions, col_positions=col_positions
)

sorted_row_positions = sorted_col_positions = None
# 'base_num_cols' specifies the number of columns the dataframe should have
# for the '_reorder_labels' path to be taken once
# len(row_positions) / len(self.index) >= 'base_ratio'; both constants may be
# subject to change to tune performance more accurately.
base_num_cols = 10
base_ratio = 0.2
# Example (the required ratio scales linearly with the column count and is
# capped at 1.0):
# len(self.columns): 10 == base_num_cols -> min ratio to jump to '_reorder_labels': 0.2 == base_ratio
# len(self.columns): 15 -> min ratio: 0.3
# len(self.columns): 20 -> min ratio: 0.4
# ...
# len(self.columns): 49 -> min ratio: 0.98
# len(self.columns): 50 -> min ratio: 1.0
# len(self.columns): 55 -> min ratio: 1.0
# ...
if (all_rows and len(row_positions) > 0.9 * all_rows) or (
    must_sort_row_pos
    and len(row_positions) * base_num_cols
    >= min(
        all_rows * len(self.columns) * base_ratio,
        # cap the required ratio at 1.0, per the table above
        all_rows * base_num_cols,
    )
):
return self._reorder_labels(
row_positions=row_positions, col_positions=col_positions
)
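Rewritten as a standalone predicate (our paraphrase of the heuristic, not a Modin API), the condition above reads:

def should_reorder(n_taken, n_rows, n_cols, base_num_cols=10, base_ratio=0.2):
    # The required fraction of selected rows grows linearly with the
    # column count and is capped at 1.0, matching the comment table.
    required_ratio = min(base_ratio * n_cols / base_num_cols, 1.0)
    return n_taken >= required_ratio * n_rows

assert should_reorder(n_taken=200, n_rows=1000, n_cols=10)      # ratio 0.2
assert not should_reorder(n_taken=200, n_rows=1000, n_cols=20)  # needs 0.4
assert should_reorder(n_taken=1000, n_rows=1000, n_cols=55)     # capped at 1.0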

sorted_row_positions = sorted_col_positions = None
if row_positions is not None:
sorted_row_positions = self._get_sorted_positions(row_positions)
if must_sort_row_pos:
sorted_row_positions = self._get_sorted_positions(row_positions)
else:
sorted_row_positions = row_positions
# Get dict of row_parts as {row_index: row_internal_indices}
row_partitions_dict = self._get_dict_of_block_index(
0, sorted_row_positions, are_indices_sorted=True
@@ -1201,7 +1231,10 @@
new_index = self.copy_index_cache(copy_lengths=True)

if col_positions is not None:
sorted_col_positions = self._get_sorted_positions(col_positions)
if must_sort_col_pos:
sorted_col_positions = self._get_sorted_positions(col_positions)
else:
sorted_col_positions = col_positions
# Get dict of col_parts as {col_index: col_internal_indices}
col_partitions_dict = self._get_dict_of_block_index(
1, sorted_col_positions, are_indices_sorted=True
@@ -1692,6 +1725,20 @@ def _get_dict_of_block_index(self, axis, indices, are_indices_sorted=False):
A mapping from partition index to list of internal indices which correspond to `indices` in each
partition.
"""
if axis == 0 and not isinstance(indices, slice):
    # Experimental fast path: bucket the positional indices into row
    # partitions by their cumulative lengths, then group them per
    # partition. Slices fall through to the generic handling below.
    splitting = np.digitize(indices, np.cumsum(self.row_lengths))
    parts = pandas.Index(indices).groupby(splitting)
    return parts
    # Possible follow-up: a group that covers a whole partition could be
    # replaced with `slice(None)` so that partition is taken as-is:
    # res = {}
    # for key, value in parts.items():
    #     if self.row_lengths[key] == len(value) and np.all(np.diff(value) == 1):
    #         res[key] = slice(None)
    #     else:
    #         res[key] = value
    # return res
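A self-contained sketch of that bucketing trick, with made-up partition lengths:

import numpy as np
import pandas

row_lengths = [4, 4, 3]            # partitions hold rows 0-3, 4-7, 8-10
indices = np.array([1, 5, 6, 10])  # sorted global row positions

bins = np.cumsum(row_lengths)      # [4, 8, 11]: right edge of each partition
partition_ids = np.digitize(indices, bins)

# Groups come back keyed by partition id, with values as pandas Index
# objects: {0: [1], 1: [5, 6], 2: [10]}
print(pandas.Index(indices).groupby(partition_ids))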

# TODO: Support handling of slices with specified 'step'. For now, converting them into a range
if isinstance(indices, slice) and (
indices.step is not None and indices.step != 1
@@ -1785,47 +1832,48 @@ def _get_dict_of_block_index(self, axis, indices, are_indices_sorted=False):
# INT_MAX to make sure we don't try to compute on partitions that don't exist.
cumulative = np.append(bins[:-1].cumsum(), np.iinfo(bins.dtype).max)

def internal(block_idx: int, global_index):
"""Transform global index to internal one for given block (identified by its index)."""
return (
global_index
if not block_idx
else np.subtract(
global_index, cumulative[min(block_idx, len(cumulative) - 1) - 1]
)
)
# def internal(block_idx: int, global_index):
# """Transform global index to internal one for given block (identified by its index)."""
# return (
# global_index
# if not block_idx
# else np.subtract(
# global_index, cumulative[min(block_idx, len(cumulative) - 1) - 1]
# )
# )
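What the (now commented-out) helper computed, as a standalone sketch with made-up lengths:

import numpy as np

row_lengths = np.array([4, 4, 3])
# Cumulative right edges of all but the last partition, plus a sentinel
# so we never index past the end.
cumulative = np.append(row_lengths[:-1].cumsum(), np.iinfo(row_lengths.dtype).max)

def internal(block_idx, global_index):
    # Block 0 keeps its global positions; later blocks subtract the
    # total length of all preceding blocks.
    if block_idx == 0:
        return global_index
    return np.subtract(
        global_index, cumulative[min(block_idx, len(cumulative) - 1) - 1]
    )

assert list(internal(1, np.array([5, 6]))) == [1, 2]  # rows 5, 6 -> 1, 2 in block 1
assert list(internal(2, np.array([10]))) == [2]       # row 10 -> 2 in block 2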

partition_ids = np.digitize(indices, cumulative)
count_for_each_partition = np.array(
[(partition_ids == i).sum() for i in range(len(cumulative))]
).cumsum()
# Compute the internal indices and pair those with the partition index.
# If the first partition has any values we need to return, compute those
# first to make the list comprehension easier. Otherwise, just append the
# rest of the values to an empty list.
if count_for_each_partition[0] > 0:
first_partition_indices = [
(0, internal(0, indices[slice(count_for_each_partition[0])]))
]
else:
first_partition_indices = []
partition_ids_with_indices = first_partition_indices + [
(
i,
internal(
i,
indices[
slice(
count_for_each_partition[i - 1],
count_for_each_partition[i],
)
],
),
)
for i in range(1, len(count_for_each_partition))
if count_for_each_partition[i] > count_for_each_partition[i - 1]
]
return dict(partition_ids_with_indices)
return pandas.Index(indices).groupby(partition_ids)
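# NB: these groups hold *global* positions, whereas the dict built by the
# replaced code below mapped each partition to block-internal indices
# (cf. the commented-out 'internal' helper above).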
# count_for_each_partition = np.array(
# [(partition_ids == i).sum() for i in range(len(cumulative))]
# ).cumsum()
# # Compute the internal indices and pair those with the partition index.
# # If the first partition has any values we need to return, compute those
# # first to make the list comprehension easier. Otherwise, just append the
# # rest of the values to an empty list.
# if count_for_each_partition[0] > 0:
# first_partition_indices = [
# (0, internal(0, indices[slice(count_for_each_partition[0])]))
# ]
# else:
# first_partition_indices = []
# partition_ids_with_indices = first_partition_indices + [
# (
# i,
# internal(
# i,
# indices[
# slice(
# count_for_each_partition[i - 1],
# count_for_each_partition[i],
# )
# ],
# ),
# )
# for i in range(1, len(count_for_each_partition))
# if count_for_each_partition[i] > count_for_each_partition[i - 1]
# ]
# return dict(partition_ids_with_indices)

@staticmethod
def _join_index_objects(axis, indexes, how, sort):
