Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Expr class #158

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions dask_expr/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from fsspec.utils import stringify_path
from tlz import first

from dask_expr import expr
from dask_expr.expr import no_default
from dask_expr import frameexpr
from dask_expr.frameexpr import no_default
from dask_expr.merge import Merge
from dask_expr.reductions import (
DropDuplicates,
Expand All @@ -47,7 +47,7 @@ def _wrap_expr_api(*args, wrap_api=None, **kwargs):
*[arg.expr if isinstance(arg, FrameBase) else arg for arg in args],
**kwargs,
)
if isinstance(result, expr.Expr):
if isinstance(result, frameexpr.FrameExpr):
return new_collection(result)
return result

Expand Down Expand Up @@ -170,16 +170,16 @@ def index(self):
return new_collection(self.expr.index)

def reset_index(self, drop=False):
return new_collection(expr.ResetIndex(self.expr, drop))
return new_collection(frameexpr.ResetIndex(self.expr, drop))

def head(self, n=5, compute=True):
out = new_collection(expr.Head(self.expr, n=n))
out = new_collection(frameexpr.Head(self.expr, n=n))
if compute:
out = out.compute()
return out

def tail(self, n=5, compute=True):
out = new_collection(expr.Tail(self.expr, n=n))
out = new_collection(frameexpr.Tail(self.expr, n=n))
if compute:
out = out.compute()
return out
Expand All @@ -200,7 +200,7 @@ def _partitions(self, index):
assert set(index).issubset(range(self.npartitions))

# Return selected partitions
return new_collection(expr.Partitions(self.expr, index))
return new_collection(frameexpr.Partitions(self.expr, index))

@property
def partitions(self):
Expand Down Expand Up @@ -313,7 +313,7 @@ def map_partitions(
# will need to call `Repartition` on operands that are not
# aligned with `self.expr`.
raise NotImplementedError()
new_expr = expr.MapPartitions(
new_expr = frameexpr.MapPartitions(
self.expr,
func,
meta,
Expand Down Expand Up @@ -417,7 +417,7 @@ def assign(self, **pairs):
raise TypeError(f"Column assignment doesn't support type {type(v)}")
if not isinstance(k, str):
raise TypeError(f"Column name cannot be type {type(k)}")
result = new_collection(expr.Assign(result.expr, k, v.expr))
result = new_collection(frameexpr.Assign(result.expr, k, v.expr))
return result

def merge(
Expand Down Expand Up @@ -544,15 +544,15 @@ def __getattr__(self, key):
def __dir__(self):
o = set(dir(type(self)))
o.update(self.__dict__)
o.update(set(dir(expr.Expr)))
o.update(set(dir(frameexpr.FrameExpr)))
o.update(c for c in self.columns if (isinstance(c, str) and c.isidentifier()))
return list(o)

def map(self, func, na_action=None):
return new_collection(expr.Map(self.expr, arg=func, na_action=na_action))
return new_collection(frameexpr.Map(self.expr, arg=func, na_action=na_action))

def __repr__(self):
return f"<dask_expr.expr.DataFrame: expr={self.expr}>"
return f"<dask_expr.frameexpr.DataFrame: expr={self.expr}>"

def nlargest(self, n=5, columns=None):
return new_collection(NLargest(self.expr, n=n, _columns=columns))
Expand Down Expand Up @@ -582,7 +582,7 @@ def dropna(self, how=no_default, subset=None, thresh=no_default):
"You cannot set both the how and thresh arguments at the same time."
)
return new_collection(
expr.DropnaFrame(self.expr, how=how, subset=subset, thresh=thresh)
frameexpr.DropnaFrame(self.expr, how=how, subset=subset, thresh=thresh)
)

def sample(self, n=None, frac=None, replace=False, random_state=None):
Expand All @@ -605,14 +605,16 @@ def sample(self, n=None, frac=None, replace=False, random_state=None):

state_data = random_state_data(self.npartitions, random_state)
return new_collection(
expr.Sample(self.expr, state_data=state_data, frac=frac, replace=replace)
frameexpr.Sample(
self.expr, state_data=state_data, frac=frac, replace=replace
)
)

def rename(self, columns):
return new_collection(expr.RenameFrame(self.expr, columns=columns))
return new_collection(frameexpr.RenameFrame(self.expr, columns=columns))

def explode(self, column):
return new_collection(expr.ExplodeFrame(self.expr, column=column))
return new_collection(frameexpr.ExplodeFrame(self.expr, column=column))

def to_parquet(self, path, **kwargs):
from dask_expr.io.parquet import to_parquet
Expand All @@ -626,7 +628,7 @@ class Series(FrameBase):
def __dir__(self):
o = set(dir(type(self)))
o.update(self.__dict__)
o.update(set(dir(expr.Expr)))
o.update(set(dir(frameexpr.FrameExpr)))
return list(o)

@property
Expand All @@ -638,13 +640,13 @@ def nbytes(self):
return new_collection(self.expr.nbytes)

def map(self, arg, na_action=None):
return new_collection(expr.Map(self.expr, arg=arg, na_action=na_action))
return new_collection(frameexpr.Map(self.expr, arg=arg, na_action=na_action))

def __repr__(self):
return f"<dask_expr.expr.Series: expr={self.expr}>"
return f"<dask_expr.frameexpr.Series: expr={self.expr}>"

def to_frame(self, name=no_default):
return new_collection(expr.ToFrame(self.expr, name=name))
return new_collection(frameexpr.ToFrame(self.expr, name=name))

def value_counts(self, sort=None, ascending=False, dropna=True, normalize=False):
return new_collection(
Expand All @@ -667,43 +669,43 @@ def drop_duplicates(self, ignore_index=False):
return new_collection(DropDuplicates(self.expr, ignore_index=ignore_index))

def dropna(self):
return new_collection(expr.DropnaSeries(self.expr))
return new_collection(frameexpr.DropnaSeries(self.expr))

def between(self, left, right, inclusive="both"):
return new_collection(
expr.Between(self.expr, left=left, right=right, inclusive=inclusive)
frameexpr.Between(self.expr, left=left, right=right, inclusive=inclusive)
)

def explode(self):
return new_collection(expr.ExplodeSeries(self.expr))
return new_collection(frameexpr.ExplodeSeries(self.expr))


class Index(Series):
"""Index-like Expr Collection"""

def __repr__(self):
return f"<dask_expr.expr.Index: expr={self.expr}>"
return f"<dask_expr.frameexpr.Index: expr={self.expr}>"

def to_frame(self, index=True, name=no_default):
if not index:
raise NotImplementedError
return new_collection(expr.ToFrameIndex(self.expr, index=index, name=name))
return new_collection(frameexpr.ToFrameIndex(self.expr, index=index, name=name))

def memory_usage(self, deep=False):
return new_collection(MemoryUsageIndex(self.expr, deep=deep))

def __dir__(self):
o = set(dir(type(self)))
o.update(self.__dict__)
o.update(set(dir(expr.Expr)))
o.update(set(dir(frameexpr.FrameExpr)))
return list(o)


class Scalar(FrameBase):
"""Scalar Expr Collection"""

def __repr__(self):
return f"<dask_expr.expr.Scalar: expr={self.expr}>"
return f"<dask_expr.frameexpr.Scalar: expr={self.expr}>"

def __dask_postcompute__(self):
return first, ()
Expand All @@ -724,7 +726,7 @@ def new_collection(expr):


def optimize(collection, fuse=True):
return new_collection(expr.optimize(collection.expr, fuse=fuse))
return new_collection(frameexpr.optimize(collection.expr, fuse=fuse))


def from_pandas(*args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion dask_expr/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dask.utils import random_state_data

from dask_expr.collection import new_collection
from dask_expr.expr import Projection
from dask_expr.frameexpr import Projection
from dask_expr.io import BlockwiseIO, PartitionsFiltered

__all__ = ["timeseries"]
Expand Down
Loading
Loading