FEAT-modin-project#6398: Improved performance of list-like objects insertion into DataFrames

If the partition contains either a pandas DataFrame or a pyarrow Table,
insert the object directly into the frame/table; otherwise, create
a single-column frame and join the frames by rowid.

Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
AndreyPavlenko committed Sep 25, 2023
1 parent ea8088a commit 48f2a81
Showing 8 changed files with 441 additions and 88 deletions.
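
For context, the user-visible operation this commit optimizes is plain column insertion. A minimal usage sketch, assuming Modin is configured for the HDK backend (e.g. MODIN_ENGINE=native and MODIN_STORAGE_FORMAT=hdk) and using hypothetical toy data:

    import modin.pandas as pd

    df = pd.DataFrame({"a": [1, 2, 3]})
    df["b"] = [4, 5, 6]                       # list-like value: handled by the new _insert_list path
    df.insert(0, "c", pd.Series([7, 8, 9]))   # a Series is first realigned to df's index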
@@ -77,6 +77,7 @@
build_categorical_from_at,
check_cols_to_join,
check_join_supported,
ensure_supported_dtype,
get_data_for_join_by_index,
maybe_range,
)
@@ -198,6 +199,9 @@ def __init__(
self.id = str(type(self)._next_id[0])
type(self)._next_id[0] += 1

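# A frame created directly from partitions with no explicit op is already materialized; represent it with a FrameNode.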
if op is None and partitions is not None:
op = FrameNode(self)

self._op = op
self._index_cols = index_cols
self._partitions = partitions
@@ -481,9 +485,7 @@ def _has_arrow_table(self):
-------
bool
"""
- return self._partitions is not None and isinstance(
-     self._partitions[0][0].get(), pyarrow.Table
- )
+ return self._partitions is not None and self._partitions[0][0].raw

def _dtypes_for_exprs(self, exprs):
"""
@@ -1423,12 +1425,7 @@ def _join_arrow_columns(self, other_modin_frames):
and isinstance(f._execute(), (DbTable, pyarrow.Table))
for f in frames
):
- tables = [
-     t
-     if isinstance(t := f._partitions[0][0].get(), pyarrow.Table)
-     else t.to_arrow()
-     for f in frames
- ]
+ tables = [f._partitions[0][0].get(True) for f in frames]
column_names = [c for t in tables for c in t.column_names]
if len(column_names) != len(set(column_names)):
raise NotImplementedError("Duplicate column names")
@@ -1636,6 +1633,13 @@ def insert(self, loc, column, value):
assert column not in self._table_cols
assert 0 <= loc <= len(self.columns)

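# Fast path for list-like values: realign Series input to this frame's index, then delegate to _insert_list.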
if is_list_like(value):
if isinstance(value, pd.Series) and not self.index.equals(value.index):
# Align by index
value = pd.Series(value, index=self.index)
value.reset_index(drop=True, inplace=True)
return self._insert_list(loc, column, value)

exprs = self._index_exprs()
for i in range(0, loc):
col = self.columns[i]
@@ -1656,6 +1660,159 @@
force_execution_mode=self._force_execution_mode,
)

def _insert_list(self, loc, name, value):
"""
Insert a list-like value.

Parameters
----------
loc : int
name : str
value : list

Returns
-------
HdkOnNativeDataframe
"""
ncols = len(self.columns)

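# loc == -1 means append as the last column.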
if loc == -1:
loc = ncols

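# An empty frame: the list becomes the frame's only column.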
if ncols == 0:
assert loc == 0
return self._list_to_df(name, value, True)

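# The partition holds a raw pandas DataFrame or pyarrow Table: insert the value directly into it.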
if self._partitions and self._partitions[0][0].raw:
return self._insert_list_col(loc, name, value)

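# Prepending or appending: if this frame is already a join by rowid with a raw input partition,
# push the insertion down into that input; otherwise wrap the list in a single-column frame
# and join it with this frame by rowid.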
if loc == 0 or loc == ncols:
in_idx = 0 if loc == 0 else 1
if (
isinstance(self._op, JoinNode)
and self._op.by_rowid
and self._op.input[in_idx]._partitions
and self._op.input[in_idx]._partitions[0][0].raw
):
lhs = self._op.input[0]
rhs = self._op.input[1]
if loc == 0:
lhs = lhs._insert_list(0, name, value)
dtype = lhs.dtypes[0]
else:
rhs = rhs._insert_list(-1, name, value)
dtype = rhs.dtypes[-1]
elif loc == 0:
lhs = self._list_to_df(name, value, False)
rhs = self
dtype = lhs.dtypes[0]
else:
lhs = self
rhs = self._list_to_df(name, value, False)
dtype = rhs.dtypes[0]
elif isinstance(self._op, JoinNode) and self._op.by_rowid:
left_len = len(self._op.input[0].columns)
if loc < left_len:
lhs = self._op.input[0]._insert_list(loc, name, value)
rhs = self._op.input[1]
dtype = lhs.dtypes[loc]
else:
lhs = self._op.input[0]
rhs = self._op.input[1]._insert_list(loc - left_len, name, value)
dtype = rhs.dtypes[loc - left_len]
else:
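# General case: split this frame's columns at loc, insert into the left part, and rejoin the parts by rowid.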
lexprs = self._index_exprs()
rexprs = OrderedDict()
for i, col in enumerate(self.columns):
(lexprs if i < loc else rexprs)[col] = self.ref(col)
lhs = self.__constructor__(
columns=self.columns[0:loc],
dtypes=self._dtypes_for_exprs(lexprs),
op=TransformNode(self, lexprs),
index=self._index_cache,
index_cols=self._index_cols,
force_execution_mode=self._force_execution_mode,
)._insert_list(loc, name, value)
rhs = self.__constructor__(
columns=self.columns[loc:],
dtypes=self._dtypes_for_exprs(rexprs),
op=TransformNode(self, rexprs),
force_execution_mode=self._force_execution_mode,
)
dtype = lhs.dtypes[loc]

op = self._join_by_rowid_op(lhs, rhs)
return self._insert_list_col(loc, name, value, dtype, op)

def _insert_list_col(self, idx, name, value, dtype=None, op=None):
"""
Insert a list-like column.

Parameters
----------
idx : int
name : str
value : list
dtype : dtype, default: None
op : DFAlgNode, default: None

Returns
-------
HdkOnNativeDataframe
"""
cols = self.columns.tolist()
cols.insert(idx, name)
if self._index_cols:
idx += len(self._index_cols)
if dtype is None:
part, dtype = self._partitions[0][0].insert(idx, name, value)
part = np.array([[part]])
else:
part = None
dtypes = self._dtypes.tolist()
dtypes.insert(idx, dtype)
return self.copy(partitions=part, columns=cols, dtypes=dtypes, op=op)

def _list_to_df(self, name, value, add_index):
"""
Create a single-column frame from the list-like value.

Parameters
----------
name : str
value : list
add_index : bool

Returns
-------
HdkOnNativeDataframe
"""
df = pd.DataFrame({name: value}, index=self.index if add_index else None)
ensure_supported_dtype(df.dtypes[0])
return self.from_pandas(df)

@staticmethod
def _join_by_rowid_op(lhs, rhs):
"""
Create a JoinNode for a join by rowid.

Parameters
----------
lhs : HdkOnNativeDataframe
rhs : HdkOnNativeDataframe

Returns
-------
JoinNode
"""
exprs = lhs._index_exprs() if lhs._index_cols else rhs._index_exprs()
exprs.update((c, lhs.ref(c)) for c in lhs.columns)
exprs.update((c, rhs.ref(c)) for c in rhs.columns)
condition = lhs._build_equi_join_condition(
rhs, [ROWID_COL_NAME], [ROWID_COL_NAME]
)
return JoinNode(lhs, rhs, exprs=exprs, condition=condition)

def cat_codes(self):
"""
Extract codes for a category column.
@@ -2177,8 +2334,12 @@ def _compute_axis_labels_and_lengths(self, axis: int, partitions=None):

def _build_index_cache(self):
"""Materialize index and store it in the cache."""
- index, _ = self._compute_axis_labels_and_lengths(axis=0)
- self.set_index_cache(index)
+ if self._partitions and not self._index_cols:
+     nrows = self._partitions[0][0]._length_cache
+     self.set_index_cache(Index.__new__(RangeIndex, data=range(nrows)))
+ else:
+     index, _ = self._compute_axis_labels_and_lengths(axis=0)
+     self.set_index_cache(index)

def _get_index(self):
"""
@@ -2624,8 +2785,8 @@ def to_pandas(self):
assert len(df.columns) == len(self.columns)
else:
assert self._index_cols is None
- assert df.index.name is None or isinstance(
-     self._partitions[0][0].get(), pd.DataFrame
+ assert (
+     df.index.name is None or self._has_unsupported_data
), f"index name '{df.index.name}' is not None"
if self.has_materialized_index:
df.index = self._index_cache.get().copy()
@@ -534,6 +534,52 @@ def get_common_arrow_type(t1: pa.lib.DataType, t2: pa.lib.DataType) -> pa.lib.DataType:
return pa.from_numpy_dtype(np.promote_types(t1, t2))


def is_supported_arrow_type(dtype: pa.lib.DataType) -> bool:
"""
Return True if the specified arrow type is supported by HDK.

Parameters
----------
dtype : pa.lib.DataType

Returns
-------
bool
"""
if (
pa.types.is_string(dtype)
or pa.types.is_time(dtype)
or pa.types.is_dictionary(dtype)
or pa.types.is_null(dtype)
):
return True
if isinstance(dtype, pa.ExtensionType) or pa.types.is_duration(dtype):
return False
try:
pandas_dtype = dtype.to_pandas_dtype()
return pandas_dtype != np.dtype("O")
except NotImplementedError:
return False


def ensure_supported_dtype(dtype: pa.lib.DataType):
"""
Check if the specified `dtype` is supported by HDK.

If `dtype` is not supported, `NotImplementedError` is raised.

Parameters
----------
dtype : dtype
"""
try:
if is_supported_arrow_type(pa.from_numpy_dtype(dtype)):
return
except pa.ArrowNotImplementedError:
...
raise NotImplementedError(f"Type {dtype}")


def arrow_to_pandas(at: pa.Table) -> pandas.DataFrame:
"""
Convert the specified arrow table to pandas.
@@ -432,7 +432,9 @@ def execute_arrow(self, ignore=None) -> Union[DbTable, pa.Table, pandas.DataFrame]:
"""
frame = self.modin_frame
if frame._partitions is not None:
- return frame._partitions[0][0].get()
+ part = frame._partitions[0][0]
+ to_arrow = part.raw and not frame._has_unsupported_data
+ return part.get(to_arrow)
if frame._has_unsupported_data:
return pandas.DataFrame(
index=frame._index_cache, columns=frame._columns_cache
@@ -834,6 +836,48 @@ def __init__(
self.exprs = exprs
self.condition = condition

@property
def by_rowid(self):
"""
Return True if this is a join by the rowid column.

Returns
-------
bool
"""
return (
isinstance(self.condition, OpExpr)
and self.condition.op == "="
and all(
isinstance(o, InputRefExpr) and o.column == ColNameCodec.ROWID_COL_NAME
for o in self.condition.operands
)
)

@_inherit_docstrings(DFAlgNode.require_executed_base)
def require_executed_base(self) -> bool:
return self.by_rowid and any(
not isinstance(i._op, FrameNode) for i in self.input
)

@_inherit_docstrings(DFAlgNode.can_execute_arrow)
def can_execute_arrow(self) -> bool:
return self.by_rowid and all(
isinstance(e, InputRefExpr) for e in self.exprs.values()
)

@_inherit_docstrings(DFAlgNode.execute_arrow)
def execute_arrow(self, tables: List[pa.Table]) -> pa.Table:
t1 = tables[0]
t2 = tables[1]
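# Take each output column from whichever input table defines it; a rowid join preserves
# row order, so the columns can simply be placed side by side.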
cols1 = t1.column_names
cols = [
(t1 if (col := ColNameCodec.encode(e.column)) in cols1 else t2).column(col)
for e in self.exprs.values()
]
names = [ColNameCodec.encode(c) for c in self.exprs]
return pa.table(cols, names)

def copy(self):
"""
Make a shallow copy of the node.
