Skip to content

Commit

Permalink
FEAT-modin-project#6398: Improved performance of list-like object insertion into DataFrames
Browse files Browse the repository at this point in the history

Wrap list-like object into a single-column frame before the insertion.
In case of the HDK backend: if the partition contains either pandas DataFrame or
pyarrow Table, insert the object directly into the frame/table, otherwise create
a single-column frame and join the frames by rowid.

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
  • Loading branch information
AndreyPavlenko committed Aug 22, 2023
1 parent 29d9da0 commit e166a1b
Show file tree
Hide file tree
Showing 13 changed files with 515 additions and 90 deletions.
21 changes: 21 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,7 @@ def getitem_row_array(self, key):
)

def setitem(self, axis, key, value):
    # Wrap list-like values into a single-column query compiler first,
    # so the insertion is performed on a frame rather than a raw list.
    wrapped = self._wrap_column_data(value)
    return self._setitem(axis=axis, key=key, value=wrapped, how=None)

def _setitem(self, axis, key, value, how="inner"):
Expand Down Expand Up @@ -2986,6 +2987,7 @@ def _compute_duplicated(df): # pragma: no cover
# return a new one from here and let the front end handle the inplace
# update.
def insert(self, loc, column, value):
value = self._wrap_column_data(value)
if isinstance(value, type(self)):
value.columns = [column]
return self.insert_item(axis=1, loc=loc, value=value, how=None)
Expand Down Expand Up @@ -3018,6 +3020,25 @@ def insert(df, internal_indices=[]): # pragma: no cover
)
return self.__constructor__(new_modin_frame)

def _wrap_column_data(self, data):
"""
If the data is list-like, create a single column query compiler.
Parameters
----------
data : any
Returns
-------
data or PandasQueryCompiler
"""
if is_list_like(data):
return self.from_pandas(
pandas.DataFrame(pandas.Series(data, index=self.index)),
data_cls=type(self._modin_frame),
)
return data

# END Insert

def explode(self, column):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
check_cols_to_join,
get_data_for_join_by_index,
build_categorical_from_at,
ensure_supported_dtype,
)
from ..db_worker import DbTable
from ..partitioning.partition_manager import HdkOnNativeDataframePartitionManager
Expand Down Expand Up @@ -201,6 +202,9 @@ def __init__(
self.id = str(type(self)._next_id[0])
type(self)._next_id[0] += 1

if op is None and partitions is not None:
op = FrameNode(self)

self._op = op
self._index_cols = index_cols
self._partitions = partitions
Expand Down Expand Up @@ -484,9 +488,7 @@ def _has_arrow_table(self):
-------
bool
"""
return self._partitions is not None and isinstance(
self._partitions[0][0].get(), pyarrow.Table
)
return self._partitions is not None and self._partitions[0][0].raw

def _dtypes_for_exprs(self, exprs):
"""
Expand Down Expand Up @@ -1424,12 +1426,7 @@ def _join_arrow_columns(self, other_modin_frames):
and isinstance(f._execute(), (DbTable, pyarrow.Table))
for f in frames
):
tables = [
t
if isinstance(t := f._partitions[0][0].get(), pyarrow.Table)
else t.to_arrow()
for f in frames
]
tables = [f._partitions[0][0].get(True) for f in frames]
column_names = [c for t in tables for c in t.column_names]
if len(column_names) != len(set(column_names)):
raise NotImplementedError("Duplicate column names")
Expand Down Expand Up @@ -1637,6 +1634,13 @@ def insert(self, loc, column, value):
assert column not in self._table_cols
assert 0 <= loc <= len(self.columns)

if is_list_like(value):
if isinstance(value, pd.Series) and not self.index.equals(value.index):
# Align by index
value = pd.Series(value, index=self.index)
value.reset_index(drop=True, inplace=True)
return self._insert_list(loc, column, value)

exprs = self._index_exprs()
for i in range(0, loc):
col = self.columns[i]
Expand All @@ -1657,6 +1661,159 @@ def insert(self, loc, column, value):
force_execution_mode=self._force_execution_mode,
)

def _insert_list(self, loc, name, value):
    """
    Insert a list-like value.

    Parameters
    ----------
    loc : int
        Insertion position. -1 means append at the end.
    name : str
        Name of the new column.
    value : list
        The column values.

    Returns
    -------
    HdkOnNativeDataframe
    """
    ncols = len(self.columns)

    if loc == -1:
        loc = ncols

    if ncols == 0:
        # Empty frame: the new column becomes the entire frame.
        assert loc == 0
        return self._list_to_df(name, value, True)

    if self._partitions and self._partitions[0][0].raw:
        # Raw partition data (pandas DataFrame or pyarrow Table):
        # insert the column directly into the partition.
        return self._insert_list_col(loc, name, value)

    if loc == 0 or loc == ncols:
        in_idx = 0 if loc == 0 else 1
        if (
            isinstance(self._op, JoinNode)
            and self._op.by_rowid
            and self._op.input[in_idx]._partitions
            and self._op.input[in_idx]._partitions[0][0].raw
        ):
            # Already a join-by-rowid with a raw partition on the target
            # side: insert into that side instead of adding another join.
            lhs = self._op.input[0]
            rhs = self._op.input[1]
            if loc == 0:
                lhs = lhs._insert_list(0, name, value)
                dtype = lhs.dtypes[0]
            else:
                rhs = rhs._insert_list(-1, name, value)
                dtype = rhs.dtypes[-1]
        elif loc == 0:
            # Prepend: new single-column frame on the left.
            lhs = self._list_to_df(name, value, False)
            rhs = self
            dtype = lhs.dtypes[0]
        else:
            # Append: new single-column frame on the right.
            lhs = self
            rhs = self._list_to_df(name, value, False)
            dtype = rhs.dtypes[0]
    elif isinstance(self._op, JoinNode) and self._op.by_rowid:
        # Insert into the join input the position falls into.
        left_len = len(self._op.input[0].columns)
        if loc < left_len:
            lhs = self._op.input[0]._insert_list(loc, name, value)
            rhs = self._op.input[1]
            dtype = lhs.dtypes[loc]
        else:
            lhs = self._op.input[0]
            rhs = self._op.input[1]._insert_list(loc - left_len, name, value)
            # BUGFIX: the inserted column sits at position `loc - left_len`
            # in the right-hand frame; indexing with the absolute `loc`
            # read the wrong dtype (or raised IndexError) when left_len > 0.
            dtype = rhs.dtypes[loc - left_len]
    else:
        # Generic case: split this frame at `loc`, append the column to
        # the left part and join the parts back by rowid.
        lexprs = self._index_exprs()
        rexprs = OrderedDict()
        for i, col in enumerate(self.columns):
            (lexprs if i < loc else rexprs)[col] = self.ref(col)
        lhs = self.__constructor__(
            columns=self.columns[0:loc],
            dtypes=self._dtypes_for_exprs(lexprs),
            op=TransformNode(self, lexprs),
            index=self._index_cache,
            index_cols=self._index_cols,
            force_execution_mode=self._force_execution_mode,
        )._insert_list(loc, name, value)
        rhs = self.__constructor__(
            columns=self.columns[loc:],
            dtypes=self._dtypes_for_exprs(rexprs),
            op=TransformNode(self, rexprs),
            force_execution_mode=self._force_execution_mode,
        )
        dtype = lhs.dtypes[loc]

    op = self._join_by_rowid_op(lhs, rhs)
    return self._insert_list_col(loc, name, value, dtype, op)

def _insert_list_col(self, idx, name, value, dtype=None, op=None):
    """
    Insert a list-like column.

    Parameters
    ----------
    idx : int
    name : str
    value : list
    dtype : dtype, default: None
    op : DFAlgNode, default: None

    Returns
    -------
    HdkOnNativeDataframe
    """
    new_columns = self.columns.tolist()
    new_columns.insert(idx, name)
    # Shift by the number of index columns to get the position in the
    # full table (index columns precede the data columns).
    if self._index_cols:
        idx += len(self._index_cols)
    if dtype is None:
        # No dtype given: insert directly into the raw partition data,
        # which also reports the resulting dtype.
        new_part, dtype = self._partitions[0][0].insert(idx, name, value)
        partitions = np.array([[new_part]])
    else:
        partitions = None
    new_dtypes = self._dtypes.tolist()
    new_dtypes.insert(idx, dtype)
    return self.copy(
        partitions=partitions, columns=new_columns, dtypes=new_dtypes, op=op
    )

def _list_to_df(self, name, value, add_index):
    """
    Create a single-column frame from the list-like value.

    Parameters
    ----------
    name : str
    value : list
    add_index : bool

    Returns
    -------
    HdkOnNativeDataframe
    """
    index = self.index if add_index else None
    frame = pd.DataFrame({name: value}, index=index)
    # Fail early if the resulting dtype cannot be handled by HDK.
    ensure_supported_dtype(frame.dtypes[0])
    return self.from_pandas(frame)

@staticmethod
def _join_by_rowid_op(lhs, rhs):
    """
    Create a JoinNode for join by rowid.

    Parameters
    ----------
    lhs : HdkOnNativeDataframe
    rhs : HdkOnNativeDataframe

    Returns
    -------
    JoinNode
    """
    # Take the index expressions from whichever side has index columns.
    if lhs._index_cols:
        exprs = lhs._index_exprs()
    else:
        exprs = rhs._index_exprs()
    for col in lhs.columns:
        exprs[col] = lhs.ref(col)
    for col in rhs.columns:
        exprs[col] = rhs.ref(col)
    condition = lhs._build_equi_join_condition(
        rhs, [ROWID_COL_NAME], [ROWID_COL_NAME]
    )
    return JoinNode(lhs, rhs, exprs=exprs, condition=condition)

def cat_codes(self):
"""
Extract codes for a category column.
Expand Down Expand Up @@ -2124,6 +2281,11 @@ def to_arrow(result, op=frame._op, tables=[], frames=iter(input)):

def _build_index_cache(self):
"""Materialize index and store it in the cache."""
if self._partitions and not self._index_cols:
nrows = self._partitions[0][0]._length_cache
self.set_index_cache(Index.__new__(RangeIndex, data=range(nrows)))
return

obj = self._execute()

if self._index_cols is None:
Expand Down Expand Up @@ -2593,8 +2755,8 @@ def to_pandas(self):
assert len(df.columns) == len(self.columns)
else:
assert self._index_cols is None
assert df.index.name is None or isinstance(
self._partitions[0][0].get(), pd.DataFrame
assert (
df.index.name is None or self._has_unsupported_data
), f"index name '{df.index.name}' is not None"
if self.has_materialized_index:
df.index = self._index_cache.get().copy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,52 @@ def get_common_arrow_type(t1: pa.lib.DataType, t2: pa.lib.DataType) -> pa.lib.Da
return pa.from_numpy_dtype(np.promote_types(t1, t2))


def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool:
    """
    Return True if the specified arrow type is supported by HDK.

    Parameters
    ----------
    dtype : pa.lib.DataType

    Returns
    -------
    bool
    """
    types = pa.types
    # These arrow types are accepted as-is.
    if types.is_string(dtype) or types.is_time(dtype):
        return True
    if types.is_dictionary(dtype) or types.is_null(dtype):
        return True
    # Extension and duration types are known to be unsupported.
    if isinstance(dtype, pa.ExtensionType) or types.is_duration(dtype):
        return False
    # Otherwise, supported iff the type maps to a concrete (non-object)
    # numpy dtype.
    try:
        return dtype.to_pandas_dtype() != np.dtype("O")
    except NotImplementedError:
        return False


def ensure_supported_dtype(dtype: pa.lib.DataType):
    """
    Check if the specified `dtype` is supported by HDK.

    If `dtype` is not supported, `NotImplementedError` is raised.

    Parameters
    ----------
    dtype : dtype
    """
    supported = False
    try:
        supported = is_supported_arrow_type(pa.from_numpy_dtype(dtype))
    except pa.ArrowNotImplementedError:
        # Not convertible to an arrow type at all — treat as unsupported.
        pass
    if not supported:
        raise NotImplementedError(f"Type {dtype}")


def arrow_to_pandas(at: pa.Table) -> pandas.DataFrame:
"""
Convert the specified arrow table to pandas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,9 @@ def execute_arrow(self, ignore=None) -> Union[DbTable, pa.Table, pandas.DataFram
"""
frame = self.modin_frame
if frame._partitions is not None:
return frame._partitions[0][0].get()
part = frame._partitions[0][0]
to_arrow = part.raw and not frame._has_unsupported_data
return part.get(to_arrow)
if frame._has_unsupported_data:
return pandas.DataFrame(
index=frame._index_cache, columns=frame._columns_cache
Expand Down Expand Up @@ -836,6 +838,48 @@ def __init__(
self.exprs = exprs
self.condition = condition

@property
def by_rowid(self):
    """
    Return True if this is a join by the rowid column.

    Returns
    -------
    bool
    """
    condition = self.condition
    if not isinstance(condition, OpExpr) or condition.op != "=":
        return False
    # Both operands must be direct references to the rowid column.
    for operand in condition.operands:
        if not isinstance(operand, InputRefExpr):
            return False
        if operand.column != ColNameCodec.ROWID_COL_NAME:
            return False
    return True

@_inherit_docstrings(DFAlgNode.require_executed_base)
def require_executed_base(self) -> bool:
    if not self.by_rowid:
        return False
    # Executed base is required if any input is not a plain FrameNode.
    return any(not isinstance(frame._op, FrameNode) for frame in self.input)

@_inherit_docstrings(DFAlgNode.can_execute_arrow)
def can_execute_arrow(self) -> bool:
    if not self.by_rowid:
        return False
    # Arrow execution handles plain column references only.
    return all(isinstance(expr, InputRefExpr) for expr in self.exprs.values())

@_inherit_docstrings(DFAlgNode.execute_arrow)
def execute_arrow(self, tables: List[pa.Table]) -> pa.Table:
    left = tables[0]
    right = tables[1]
    left_names = left.column_names
    # Pick each referenced column from whichever table contains it.
    columns = []
    for expr in self.exprs.values():
        encoded = ColNameCodec.encode(expr.column)
        source = left if encoded in left_names else right
        columns.append(source.column(encoded))
    names = [ColNameCodec.encode(name) for name in self.exprs]
    return pa.table(columns, names)

def copy(self):
"""
Make a shallow copy of the node.
Expand Down

0 comments on commit e166a1b

Please sign in to comment.