
FEAT-modin-project#6398: Improved performance of list-like objects insertion into DataFrames

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
AndreyPavlenko committed Jul 24, 2023
1 parent d6eb589 commit d5aaffa
Showing 8 changed files with 347 additions and 25 deletions.
21 changes: 21 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -2730,6 +2730,7 @@ def getitem_row_array(self, key):
        )

    def setitem(self, axis, key, value):
        value = self._wrap_column_data(value)
        return self._setitem(axis=axis, key=key, value=value, how=None)

    def _setitem(self, axis, key, value, how="inner"):
@@ -2895,6 +2896,7 @@ def _compute_duplicated(df): # pragma: no cover
    # return a new one from here and let the front end handle the inplace
    # update.
    def insert(self, loc, column, value):
        value = self._wrap_column_data(value)
        if isinstance(value, type(self)):
            value.columns = [column]
            return self.insert_item(axis=1, loc=loc, value=value, how=None)
@@ -2927,6 +2929,25 @@ def insert(df, internal_indices=[]): # pragma: no cover
        )
        return self.__constructor__(new_modin_frame)

    def _wrap_column_data(self, data):
        """
        If the data is list-like, create a single column query compiler.

        Parameters
        ----------
        data : any

        Returns
        -------
        data or PandasQueryCompiler
        """
        if is_list_like(data):
            return self.from_pandas(
                pandas.DataFrame(pandas.Series(data, index=self.index)),
                data_cls=type(self._modin_frame),
            )
        return data

    # END Insert

    def explode(self, column):
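
As a minimal usage sketch of the path this hunk changes (the data below is illustrative, not from the commit): both assignments pass a plain Python list down to the query compiler, where the new _wrap_column_data() turns it into a single-column query compiler built from a pandas.Series aligned to the frame's index.

import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
# Both calls reach PandasQueryCompiler with a list-like value,
# which is now wrapped by _wrap_column_data().
df["b"] = [10, 20, 30]        # goes through setitem()
df.insert(0, "c", [7, 8, 9])  # goes through insert()
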
@@ -509,7 +509,7 @@ def _maybe_update_proxies(self, dtypes, new_parent=None):
        super()._maybe_update_proxies(dtypes, new_parent)
        if self._partitions is None:
            return
        table = self._partitions[0][0].get()
        table = self._partitions[0][0].get(lazy=True)
        if isinstance(table, pyarrow.Table):
            super()._maybe_update_proxies(dtypes, new_parent=table)

@@ -1635,6 +1658,171 @@ def insert(self, loc, column, value):
        assert column not in self._table_cols
        assert 0 <= loc <= len(self.columns)

        if is_list_like(value):
            return self._insert_list(loc, column, value)

        exprs = self._index_exprs()
        for i in range(0, loc):
            col = self.columns[i]
@@ -1655,6 +1658,171 @@ force_execution_mode=self._force_execution_mode,
            force_execution_mode=self._force_execution_mode,
        )

    def _insert_list(self, loc, name, value):
        """
        Insert a list-like value.

        Parameters
        ----------
        loc : int
        name : str
        value : list

        Returns
        -------
        HdkOnNativeDataframe
        """
        ncols = len(self.columns)

        if ncols == 0:
            return self._list_to_df(name, value, True)

        if self._partitions and self._partitions[0][0].lazy:
            return self._insert_list_col(loc, name, value)

        if loc == 0 or loc == ncols:
            in_idx = 0 if loc == 0 else 1
            if (
                isinstance(self._op, JoinNode)
                and self._op.by_rowid
                and self._op.input[in_idx]._partitions
                and self._op.input[in_idx]._partitions[0][0].lazy
            ):
                lhs = self._op.input[0]
                rhs = self._op.input[1]
                if loc == 0:
                    lhs = lhs._insert_list(0, name, value)
                else:
                    rhs = rhs._insert_list(-1, name, value)
            else:
                lhs = self if loc == ncols else self._list_to_df(name, value, False)
                rhs = self if loc == 0 else self._list_to_df(name, value, False)
        elif isinstance(self._op, JoinNode) and self._op.by_rowid:
            left_len = len(self._op.input[0].columns)
            if loc < left_len:
                lhs = self._op.input[0]._insert_list(loc, name, value)
                rhs = self._op.input[1]
            else:
                lhs = self._op.input[0]
                rhs = self._op.input[1]._insert_list(loc - left_len, name, value)
        else:
            lexprs = self._index_exprs()
            rexprs = OrderedDict()
            for i, col in enumerate(self.columns):
                (lexprs if i < loc else rexprs)[col] = self.ref(col)
            lhs = self.__constructor__(
                columns=self.columns[0:loc],
                dtypes=self._dtypes_for_exprs(lexprs),
                op=TransformNode(self, lexprs),
                index=self._index_cache,
                index_cols=self._index_cols,
                force_execution_mode=self._force_execution_mode,
            )._insert_list(loc, name, value)
            rhs = self.__constructor__(
                columns=self.columns[loc:],
                dtypes=self._dtypes_for_exprs(rexprs),
                op=TransformNode(self, rexprs),
                force_execution_mode=self._force_execution_mode,
            )

        op = self._join_by_rowid_op(lhs, rhs)
        return self._insert_list_col(loc, name, value, op=op)

    def _insert_list_col(self, idx, name, value, op=None):
        """
        Insert a list-like column.

        Parameters
        ----------
        idx : int
        name : str
        value : list
        op : DFAlgNode, default: None

        Returns
        -------
        HdkOnNativeDataframe
        """
        cols = self.columns.tolist()
        if idx == -1:
            idx = len(cols)
        cols.insert(idx, name)

        part = None
        if not op and self._partitions and self._partitions[0][0].lazy:
            part = np.array([[self._partitions[0][0].insert(idx, name, value)]])

        if self._index_cols:
            idx += len(self._index_cols)
        dtypes = self._dtypes.tolist()
        dtypes.insert(idx, get_dtype(None if len(value) == 0 else type(value[0])))

        df = self.copy(partitions=part, columns=cols, dtypes=dtypes, op=op)
        if not op and part:
            df._op = FrameNode(df)
        return df

    def _list_to_df(self, name, values, add_index):
        """
        Create a single-column frame from the list-like value.

        Parameters
        ----------
        name : str
        values : list
        add_index : bool

        Returns
        -------
        HdkOnNativeDataframe
        """
        index_cols = self._index_cols if add_index else None
        columns = Index([name])
        dtype = get_dtype(None if len(values) == 0 else type(values[0]))
        if index_cols is None or (
            self.has_materialized_index and self._is_trivial_index(self.index)
        ):
            idx = None
            index_cols = None
            dtypes = pd.Series([dtype], index=columns)
        else:
            idx = self._get_index
            dtypes = self._dtypes[0 : len(index_cols)]
            dtypes[name] = dtype
        part_data = OrderedDict({name: values})
        part = self._partition_mgr_cls._partition_class(part_data, idx)
        df = HdkOnNativeDataframe(
            partitions=np.array([[part]]),
            index=self._index_cache,
            index_cols=index_cols,
            columns=columns,
            dtypes=dtypes,
        )
        df._op = FrameNode(df)
        return df

    @staticmethod
    def _join_by_rowid_op(lhs, rhs):
        """
        Create a JoinNode for join by rowid.

        Parameters
        ----------
        lhs : HdkOnNativeDataframe
        rhs : HdkOnNativeDataframe

        Returns
        -------
        JoinNode
        """
        exprs = lhs._index_exprs()
        exprs.update((c, lhs.ref(c)) for c in lhs.columns)
        exprs.update((c, rhs.ref(c)) for c in rhs.columns)
        condition = lhs._build_equi_join_condition(
            rhs, [ROWID_COL_NAME], [ROWID_COL_NAME]
        )
        return JoinNode(lhs, rhs, exprs=exprs, condition=condition)

    def cat_codes(self):
        """
        Extract codes for a category column.
@@ -2122,6 +2290,11 @@ def to_arrow(result, op=frame._op, tables=[], frames=iter(input)):

    def _build_index_cache(self):
        """Materialize index and store it in the cache."""
        if self._partitions and not self._index_cols:
            nrows = self._partitions[0][0]._length_cache
            self.set_index_cache(Index.__new__(RangeIndex, data=range(nrows)))
            return

        obj = self._execute()

        if self._index_cols is None:
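
Taken together, the new HDK methods pick one of three routes for a list-like value: an empty frame is created directly from the list (_list_to_df), a frame whose partition is lazy gets the column appended in place (_insert_list_col), and any other frame is joined with a one-column frame on the implicit rowid column (_join_by_rowid_op). A minimal sketch of exercising this path on the HDK storage format (assuming HDK is installed; the configuration call and the data are illustrative, not from the commit):

import modin.config as cfg
cfg.StorageFormat.put("hdk")  # assumption: select the HDK-on-native execution

import modin.pandas as pd

df = pd.DataFrame({"a": range(1_000_000)})
# A list-like value now takes the _insert_list() route in HdkOnNativeDataframe:
# appended to the lazy Arrow partition, or attached via a join by rowid.
df["b"] = list(range(1_000_000))
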
@@ -836,6 +836,28 @@ def __init__(
        self.exprs = exprs
        self.condition = condition

    @property
    def by_rowid(self):
        """
        Return True if this is a join by the rowid column.

        Returns
        -------
        bool
        """
        return (
            isinstance(self.condition, OpExpr)
            and self.condition.op == "="
            and all(
                isinstance(o, InputRefExpr) and o.column == ColNameCodec.ROWID_COL_NAME
                for o in self.condition.operands
            )
        )

    @_inherit_docstrings(DFAlgNode.require_executed_base)
    def require_executed_base(self) -> bool:
        return self.by_rowid

    def copy(self):
        """
        Make a shallow copy of the node.
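
To make the new JoinNode.by_rowid check concrete, here is a self-contained sketch of the predicate with stand-in classes (OpExprStub and InputRefExprStub are hypothetical simplifications of Modin's OpExpr and InputRefExpr, and the rowid name below is a placeholder for ColNameCodec.ROWID_COL_NAME):

ROWID_COL_NAME = "rowid"  # placeholder for ColNameCodec.ROWID_COL_NAME


class InputRefExprStub:
    """A column reference; only the column name matters for this check."""

    def __init__(self, column):
        self.column = column


class OpExprStub:
    """An operation over operand expressions, e.g. an equality."""

    def __init__(self, op, operands):
        self.op = op
        self.operands = operands


def is_rowid_join(condition):
    # Mirrors JoinNode.by_rowid: the condition must be an equality whose
    # operands are all references to the implicit rowid column.
    return (
        isinstance(condition, OpExprStub)
        and condition.op == "="
        and all(
            isinstance(o, InputRefExprStub) and o.column == ROWID_COL_NAME
            for o in condition.operands
        )
    )


condition = OpExprStub(
    "=", [InputRefExprStub(ROWID_COL_NAME), InputRefExprStub(ROWID_COL_NAME)]
)
assert is_rowid_join(condition)

Such joins also require their inputs to be executed first (require_executed_base returns True), presumably because rowid values are only defined for physically materialized tables.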
