[FEATURE] Spark and Dask Take 1 row without sorting optimization #269

Closed
goodwanghan opened this issue Nov 2, 2021 · 2 comments · Fixed by #530

@goodwanghan (Collaborator)

Is your feature request related to a problem? Please describe.
Look here.

If we are taking just one row, we can use GROUP BY and FIRST instead of sorting, which can be a lot faster. Let's add this special handling.
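As a rough illustration of the idea (hypothetical data; with no ordering imposed, FIRST returns an arbitrary row per group, which is exactly the no-presort case):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(0, 1), (0, 2), (1, 3)], ["key", "val"])

    # take(1) per "key" as a single aggregation, with no sort at all
    one_per_key = df.groupBy("key").agg(F.first("val").alias("val"))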

@goodwanghan goodwanghan added this to the 0.6.5 milestone Nov 2, 2021
@kvnkho kvnkho added the good first issue Good for newcomers label Nov 2, 2021
@goodwanghan goodwanghan removed this from the 0.6.5 milestone Dec 31, 2021
@satya-nutella

@kvnkho @goodwanghan is this issue taken? Would love to jump on this. Any pointers would be awesome.

@kvnkho (Collaborator) commented Jul 29, 2022

Hi @meehawk, good to see you in the Fugue repo this time! Thanks for the interest; I still remember you from the Prefect repo. This issue is not taken, so you can take a stab at it. Here is some initial guidance.

There is a WorkflowDataFrame class over here. This is the DAG that Fugue creates to express a computation workflow; the DAG then pushes the methods down to the underlying execution engine (Pandas, Spark, or Dask). In the same file you will find a take() method, which calls the Take processor. The processor is a generic abstraction over Pandas take, Spark take, and Dask take; notice that its last line calls the take() of the underlying execution engine.
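For orientation, here is a minimal sketch of how take() shows up in a workflow (the data, schema, and presort expression are made up for illustration; the exact partition()/take() parameters follow the engine signatures listed below):

    from fugue import FugueWorkflow

    # Hypothetical example: 1 row per value of "a", presorted by "b" descending
    with FugueWorkflow() as dag:
        df = dag.df([[0, 1], [0, 2], [1, 3]], "a:int,b:int")
        df.partition(by=["a"], presort="b desc").take(1).show()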

You can find the implementations of take attached to each execution engine.

  • Pandas -

    def take(
        self,
        df: DataFrame,
        n: int,
        presort: str,
        na_position: str = "last",
        partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
        metadata: Any = None,
    ) -> DataFrame:
        assert_or_throw(
            isinstance(n, int),
            ValueError("n needs to be an integer"),
        )
        d = df.as_pandas()
        # Use presort over partition_spec.presort if possible
        if presort:
            presort = parse_presort_exp(presort)
        _presort: IndexedOrderedDict = presort or partition_spec.presort

        if len(_presort.keys()) > 0:
            d = d.sort_values(
                list(_presort.keys()),
                ascending=list(_presort.values()),
                na_position=na_position,
            )

        if len(partition_spec.partition_by) == 0:
            d = d.head(n)
        else:
            d = d.groupby(by=partition_spec.partition_by, dropna=False).head(n)

        return PandasDataFrame(
            d.reset_index(drop=True), df.schema, metadata, pandas_df_wrapper=True
        )
  • Spark -

    def take(
        self,
        df: DataFrame,
        n: int,
        presort: str,
        na_position: str = "last",
        partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
        metadata: Any = None,
    ) -> DataFrame:
        assert_or_throw(
            isinstance(n, int),
            ValueError("n needs to be an integer"),
        )
        d = self.to_df(df).native
        nulls_last = bool(na_position == "last")

        if presort:
            presort = parse_presort_exp(presort)
        # Use presort over partition_spec.presort if possible
        _presort: IndexedOrderedDict = presort or partition_spec.presort

        def _presort_to_col(_col: str, _asc: bool) -> Any:
            if nulls_last:
                if _asc:
                    return col(_col).asc_nulls_last()
                else:
                    return col(_col).desc_nulls_last()
            else:
                if _asc:
                    return col(_col).asc_nulls_first()
                else:
                    return col(_col).desc_nulls_first()

        # If no partition
        if len(partition_spec.partition_by) == 0:
            if len(_presort.keys()) > 0:
                d = d.orderBy(
                    [_presort_to_col(_col, _presort[_col]) for _col in _presort.keys()]
                )
            d = d.limit(n)
        # If partition exists
        else:
            w = Window.partitionBy([col(x) for x in partition_spec.partition_by])
            if len(_presort.keys()) > 0:
                w = w.orderBy(
                    [_presort_to_col(_col, _presort[_col]) for _col in _presort.keys()]
                )
            else:
                # row_number() still needs an orderBy
                w = w.orderBy(lit(1))
            d = (
                d.select(col("*"), row_number().over(w).alias("__row_number__"))
                .filter(col("__row_number__") <= n)
                .drop("__row_number__")
            )

        return self.to_df(d, df.schema, metadata)
  • Dask -

    def take(
        self,
        df: DataFrame,
        n: int,
        presort: str,
        na_position: str = "last",
        partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
        metadata: Any = None,
    ) -> DataFrame:
        assert_or_throw(
            isinstance(n, int),
            ValueError("n needs to be an integer"),
        )
        d = self.to_df(df).native
        meta = [(d[x].name, d[x].dtype) for x in d.columns]

        if presort:
            presort = parse_presort_exp(presort)
        # Use presort over partition_spec.presort if possible
        _presort: IndexedOrderedDict = presort or partition_spec.presort

        def _partition_take(partition, n, presort):
            if len(presort.keys()) > 0:
                partition = partition.sort_values(
                    list(presort.keys()),
                    ascending=list(presort.values()),
                    na_position=na_position,
                )
            return partition.head(n)

        if len(partition_spec.partition_by) == 0:
            if len(_presort.keys()) == 0:
                d = d.head(n)
            else:
                # Use the default partition
                d = (
                    d.map_partitions(_partition_take, n, _presort, meta=meta)
                    .reset_index(drop=True)
                    .compute()
                )
                # compute() brings this to Pandas so we can use pandas
                d = d.sort_values(
                    list(_presort.keys()),
                    ascending=list(_presort.values()),
                    na_position=na_position,
                ).head(n)
        else:
            d = (
                d.groupby(partition_spec.partition_by, dropna=False)
                .apply(_partition_take, n=n, presort=_presort, meta=meta)
                .reset_index(drop=True)
            )

        return DaskDataFrame(d, df.schema, metadata)
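All three implementations funnel the presort string through parse_presort_exp before using it. Roughly (the import path below is my assumption of where it lives):

    # Assumed import path, for illustration only
    from fugue.collections.partition import parse_presort_exp

    presort = parse_presort_exp("a, b desc")
    # -> ordered dict mapping column -> ascending flag:
    #    {"a": True, "b": False}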

So this specific issue is an optimization of the Spark code. But to understand the issue, you need to understand the take behavior. Fugue has a unified test suite for all the backends so that users get the same behavior whichever backend they choose. You can find the tests for take here, and they also show you how to run them. The engine there is the execution engine; for example, for Spark it's this class.

The important thing to know about take, as you can see in the tests, is that if no partition is specified, it just returns n rows; if a partition is specified, it returns n rows for EACH partition.
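A tiny pandas illustration of those semantics (hypothetical data, mirroring the Pandas engine code above):

    import pandas as pd

    df = pd.DataFrame({"a": [0, 0, 1], "b": [1, 2, 3]})

    # No partition: take(2) just returns 2 rows
    df.head(2)

    # Partition by "a" with presort "b desc": 1 row for EACH value of "a"
    df.sort_values("b", ascending=False).groupby("a", dropna=False).head(1)
    #    a  b
    # 2  1  3
    # 1  0  2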

This issue proposes special handling that uses the native Spark groupby().first() when n is 1, because that avoids the need to sort each group. An example of this function can be found here; it is relatively new (PySpark 3.1). This would bypass the sorting logic this take method currently uses. I think you also need to check whether the partition has a presort: if it's ascending, use first, and if descending, use last. You can read more about Fugue's partitions here if you are interested, but the gist is that you partition by key and optionally specify a presort.
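To make that concrete, here is a rough sketch of such a fast path, not the final implementation (all names are illustrative; the ascending-to-first / descending-to-last mapping becomes min/max over a struct here, which is one known way to pick the first or last row of each group without sorting, assuming a single presort column):

    from pyspark.sql import functions as F

    def take_one(df, partition_by, presort_col=None, ascending=True):
        others = [c for c in df.columns if c not in partition_by]
        if presort_col is None:
            # No presort: any row per group works, so first() alone is enough
            return df.groupBy(*partition_by).agg(
                *[F.first(c).alias(c) for c in others]
            )
        # Structs compare field by field, so putting the presort column first
        # makes min()/max() pick the row with the smallest/largest presort value
        agg = F.min if ascending else F.max
        packed = F.struct(presort_col, *[c for c in others if c != presort_col])
        return (
            df.groupBy(*partition_by)
            .agg(agg(packed).alias("_p"))
            .select(*partition_by, "_p.*")
        )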

This may be a lot to digest, so feel free to ping us in the Slack channel at slack.fugue.ai as well, and we can discuss this more.

@goodwanghan goodwanghan added this to the 0.9.0 milestone Jan 6, 2024
@goodwanghan goodwanghan linked a pull request Jan 7, 2024 that will close this issue
@goodwanghan goodwanghan changed the title [FEATURE] Spark Take 1 row without sorting optimization [FEATURE] Spark and Dask Take 1 row without sorting optimization Jan 7, 2024