UDF using DataFrame.apply #65

Closed
AbdealiLoKo opened this issue Apr 9, 2019 · 4 comments · Fixed by #1259
Labels
enhancement (New feature or request)

Comments

@AbdealiLoKo (Contributor)

These are the patterns that I have seen with the apply() function:

df = pd.DataFrame({'a': [1, 2, 3, 4, 5, 6, 7], 'b': [7, 6, 5, 4, 3, 2, 1]})

# Option 1: Use a series and apply on it to run a UDF (Technically a Series.apply)
df['c'] = df['a'].apply(lambda x: x*2)

# Option 2: Use the entire row from a dataframe
df['d'] = df.apply(lambda x: x.a*x.b, axis=1)

# Option 3: Use some columns in a row
df['e'] = df[['a', 'b']].apply(lambda x: x.a*x.b, axis=1)

# Option 4: Apply aggregates on every column to create a new dataframe
new_df = df.apply(lambda x: x.sum())

# Option 5: Apply aggregates but return multiple values - here it pads the shorter columns with NaNs
new_df = df.apply(lambda x: x[:x[0]])
@HyukjinKwon (Member)

There are some problems with using it directly.

  1. The current Scalar UDF doesn't support aggregation across the whole data. If we do the same thing, it will aggregate over each batch unit that's used internally (the Arrow batch); see the sketch below.

  2. If we restrict it to 1-to-1, then we can consider transform instead. But we shouldn't forget to check that the input and output lengths are the same.

  3. When the axis is different, it will need a different execution style (row by row), which will probably look like the d397c1a approach.
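
A minimal sketch of point 1, assuming a Spark 2.4-style `SCALAR` pandas UDF (`batchwise_sum` is a hypothetical name, not an existing API): the UDF only ever sees one Arrow batch at a time, so any aggregation inside it is computed per batch, not across the whole column.

```python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

@pandas_udf("long", PandasUDFType.SCALAR)
def batchwise_sum(s):
    # s is only the current Arrow batch, not the whole column,
    # so this "sum" is a per-batch sum
    return pd.Series([s.sum()] * len(s))

df = spark.range(100000)
# with the default Arrow batch size, this typically shows several distinct sums
df.select(batchwise_sum(col("id"))).distinct().show()
```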

@HyukjinKwon (Member)

Actually, I think we could hack this via df.groupby(F.spark_partition_id()).apply(pandas_udf). Let me take a look soon.

@chunyang

Would groupby-apply introduce an exchange/shuffle on every apply? If so, it seems like that would be an expensive operation.

@HyukjinKwon (Member)

Yeah, we should avoid that. With Spark 3.0, we can avoid the shuffle with the mapInPandas API.
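
For reference, a minimal sketch of what this could look like with the Spark 3.0+ mapInPandas API (`double_v` is a hypothetical function, not what the PR ended up using): the function receives an iterator of pandas DataFrames, one per Arrow batch, and no shuffle is required.

```python
import pandas as pd

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def double_v(batches):
    # batches is an iterator of pandas DataFrames, one per Arrow batch
    for pdf in batches:
        pdf["v"] = pdf["v"] * 2
        yield pdf

df.mapInPandas(double_v, schema="id long, v double").show()
```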

@ueshin added the enhancement (New feature or request) label on Jan 27, 2020
HyukjinKwon added a commit that referenced this issue on Feb 11, 2020
This PR proposes to implement `DataFrame.apply` with both `axis=0` and `axis=1`. Note that `DataFrame.apply(..., axis=1)` with global aggregations is impossible.

It can be tested with the examples below:

```python
import numpy as np
import databricks.koalas as ks

df = ks.DataFrame([[4, 9]] * 10, columns=['A', 'B'])

# axis=0: column-wise apply with a plain NumPy function
df.apply(np.sqrt, axis=0)

# axis=0: column-wise apply with a return-type-annotated function
def sqrt(x) -> ks.Series[float]:
    return np.sqrt(x)
df.apply(sqrt, axis=0)


# axis=1: row-wise apply with a plain NumPy function
df.apply(np.sum, axis=1)

# axis=1: row-wise apply with a return-type-annotated function
def summation(x) -> int:
    return np.sum(x)

df.apply(summation, axis=1)
```
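
For comparison, the same operations on a plain pandas DataFrame (this is ordinary pandas, shown only to illustrate the expected results):

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame([[4, 9]] * 10, columns=['A', 'B'])

print(pdf.apply(np.sqrt, axis=0))  # every row is [2.0, 3.0]
print(pdf.apply(np.sum, axis=1))   # every value is 13
```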

Basically, the approach uses a grouped map Pandas UDF, grouping by partitions.

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def func(pdf):
    return pdf.apply(...)

# grouping by spark_partition_id() treats each physical partition as one group
df.groupby(F.spark_partition_id()).apply(func).show()
```

Resolves #1228
Resolves #65