1 change: 0 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -364,7 +364,6 @@ def __hash__(self):
"pyspark.sql.avro.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
"pyspark.sql.pandas.functions",
Member Author
All the tests in pyspark.sql.pandas.functions should be run conditionally - they should be skipped if pandas or PyArrow is not available. However, we have always been skipping them because there was no mechanism to run the doctests conditionally.

Now, the doctests in pyspark.sql.pandas.functions use type hints that are only available in Python 3.5+. So, even if we skip all the tests as before, they fail to compile because of the illegal syntax in Python 2. This is why I had to remove this module from the list, to avoid compiling the doctests at all.
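For context, this is a minimal sketch of the kind of conditional skipping described above. The `require_minimum_*` helpers do exist in `pyspark.sql.pandas.utils`, but the `_test()` wiring below is illustrative only, not the actual Spark test harness:

```python
# Illustrative only: conditionally skip a module's doctests when pandas or
# PyArrow is missing, instead of failing. The _test() entry point here is a
# hypothetical sketch, not Spark's real test runner.
from __future__ import print_function

import sys


def _test():
    import doctest

    from pyspark.sql.pandas.utils import (
        require_minimum_pandas_version, require_minimum_pyarrow_version)

    try:
        # Each helper raises ImportError when the library (or a new-enough
        # version of it) is not installed.
        require_minimum_pandas_version()
        require_minimum_pyarrow_version()
    except ImportError as e:
        print("Skipping pandas_udf doctests: %s" % e, file=sys.stderr)
        return

    # Run the module's doctests only when both dependencies are present.
    failures, _ = doctest.testmod()
    if failures:
        sys.exit(-1)


if __name__ == "__main__":
    _test()
```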

"pyspark.sql.pandas.group_ops",
"pyspark.sql.pandas.types",
"pyspark.sql.pandas.serializers",
233 changes: 158 additions & 75 deletions docs/sql-pyspark-pandas-with-arrow.md

Large diffs are not rendered by default.

258 changes: 135 additions & 123 deletions examples/src/main/python/sql/arrow.py
@@ -23,12 +23,19 @@

from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

require_minimum_pandas_version()
require_minimum_pyarrow_version()

if sys.version_info < (3, 6):
    raise Exception(
        "Running this example file requires Python 3.6+; however, "
        "your Python version was:\n %s" % sys.version)


def dataframe_with_arrow_example(spark):
    # $example on:dataframe_with_arrow$
@@ -50,15 +57,45 @@ def dataframe_with_arrow_example(spark):
    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))


def scalar_pandas_udf_example(spark):
    # $example on:scalar_pandas_udf$
def ser_to_frame_pandas_udf_example(spark):
    # $example on:ser_to_frame_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf

    @pandas_udf("col1 string, col2 long")
    def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
        s3['col2'] = s1 + s2.str.len()
        return s3

    # Create a Spark DataFrame that has three columns including a struct column.
    df = spark.createDataFrame(
        [[1, "a string", ("a nested string",)]],
        "long_col long, string_col string, struct_col struct<col1:string>")

    df.printSchema()
    # root
    # |-- long_col: long (nullable = true)
    # |-- string_col: string (nullable = true)
    # |-- struct_col: struct (nullable = true)
    # |    |-- col1: string (nullable = true)

    df.select(func("long_col", "string_col", "struct_col")).printSchema()
    # |-- func(long_col, string_col, struct_col): struct (nullable = true)
    # |    |-- col1: string (nullable = true)
    # |    |-- col2: long (nullable = true)
    # $example off:ser_to_frame_pandas_udf$


def ser_to_ser_pandas_udf_example(spark):
    # $example on:ser_to_ser_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # Declare the function and create the UDF
    def multiply_func(a, b):
    def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
        return a * b

    multiply = pandas_udf(multiply_func, returnType=LongType())
@@ -83,135 +120,89 @@ def multiply_func(a, b):
    # |                  4|
    # |                  9|
    # +-------------------+
    # $example off:scalar_pandas_udf$
    # $example off:ser_to_ser_pandas_udf$


def scalar_iter_pandas_udf_example(spark):
    # $example on:scalar_iter_pandas_udf$
def iter_ser_to_iter_ser_pandas_udf_example(spark):
    # $example on:iter_ser_to_iter_ser_pandas_udf$
    from typing import Iterator

    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # When the UDF is called with a single column that is not StructType,
    # the input to the underlying function is an iterator of pd.Series.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_one(batch_iter):
        for x in batch_iter:
    # Declare the function and create the UDF
    @pandas_udf("long")
    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for x in iterator:
            yield x + 1

    df.select(plus_one(col("x"))).show()
    df.select(plus_one("x")).show()
    # +-----------+
    # |plus_one(x)|
    # +-----------+
    # |          2|
    # |          3|
    # |          4|
    # +-----------+
    # $example off:iter_ser_to_iter_ser_pandas_udf$


def iter_sers_to_iter_ser_pandas_udf_example(spark):
    # $example on:iter_sers_to_iter_ser_pandas_udf$
    from typing import Iterator, Tuple

    import pandas as pd

    # When the UDF is called with more than one column,
    # the input to the underlying function is an iterator of tuples of pd.Series.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def multiply_two_cols(batch_iter):
        for a, b in batch_iter:
    from pyspark.sql.functions import pandas_udf

    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
    df = spark.createDataFrame(pdf)

    # Declare the function and create the UDF
    @pandas_udf("long")
    def multiply_two_cols(
            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        for a, b in iterator:
            yield a * b

    df.select(multiply_two_cols(col("x"), col("x"))).show()
    df.select(multiply_two_cols("x", "x")).show()
    # +-----------------------+
    # |multiply_two_cols(x, x)|
    # +-----------------------+
    # |                      1|
    # |                      4|
    # |                      9|
    # +-----------------------+
    # $example off:iter_sers_to_iter_ser_pandas_udf$

    # When the UDF is called with a single column that is StructType,
    # the input to the underlying function is an iterator of pd.DataFrame.
    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def multiply_two_nested_cols(pdf_iter):
        for pdf in pdf_iter:
            yield pdf["a"] * pdf["b"]

    df.select(
        multiply_two_nested_cols(
            struct(col("x").alias("a"), col("x").alias("b"))
        ).alias("y")
    ).show()
    # +---+
    # |  y|
    # +---+
    # |  1|
    # |  4|
    # |  9|
    # +---+

    # In the UDF, you can initialize some states before processing batches.
Member Author
I removed this example. It seems like too much to take in, and the example itself doesn't look particularly useful.

    # Wrap your code with try/finally or use context managers to ensure
    # the release of resources at the end.
    y_bc = spark.sparkContext.broadcast(1)

    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_y(batch_iter):
        y = y_bc.value  # initialize states
        try:
            for x in batch_iter:
                yield x + y
        finally:
            pass  # release resources here, if any

    df.select(plus_y(col("x"))).show()
    # +---------+
    # |plus_y(x)|
    # +---------+
    # |        2|
    # |        3|
    # |        4|
    # +---------+
    # $example off:scalar_iter_pandas_udf$


def grouped_map_pandas_udf_example(spark):
    # $example on:grouped_map_pandas_udf$
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    df.groupby("id").apply(subtract_mean).show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
    # $example off:grouped_map_pandas_udf$

def ser_to_scalar_pandas_udf_example(spark):
    # $example on:ser_to_scalar_pandas_udf$
    import pandas as pd

def grouped_agg_pandas_udf_example(spark):
    # $example on:grouped_agg_pandas_udf$
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql import Window

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def mean_udf(v):
    # Declare the function and create the UDF
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    df.select(mean_udf(df['v'])).show()
    # +-----------+
    # |mean_udf(v)|
    # +-----------+
    # |        4.2|
    # +-----------+

    df.groupby("id").agg(mean_udf(df['v'])).show()
    # +---+-----------+
    # | id|mean_udf(v)|
@@ -233,37 +224,54 @@ def mean_udf(v):
    # |  2| 5.0|   6.0|
    # |  2|10.0|   6.0|
    # +---+----+------+
    # $example off:grouped_agg_pandas_udf$
    # $example off:ser_to_scalar_pandas_udf$


def map_iter_pandas_udf_example(spark):
    # $example on:map_iter_pandas_udf$
    import pandas as pd
def grouped_apply_in_pandas_example(spark):
    # $example on:grouped_apply_in_pandas$
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    def subtract_mean(pdf):
        # pdf is a pandas.DataFrame
        v = pdf.v
        return pdf.assign(v=v - v.mean())

    df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
    # +---+----+
    # | id|   v|
    # +---+----+
    # |  1|-0.5|
    # |  1| 0.5|
    # |  2|-3.0|
    # |  2|-1.0|
    # |  2| 4.0|
    # +---+----+
    # $example off:grouped_apply_in_pandas$


def map_in_pandas_example(spark):
    # $example on:map_in_pandas$
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
    def filter_func(batch_iter):
        for pdf in batch_iter:
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df.mapInPandas(filter_func).show()
    df.mapInPandas(filter_func, schema=df.schema).show()
    # +---+---+
    # | id|age|
    # +---+---+
    # |  1| 21|
    # +---+---+
    # $example off:map_iter_pandas_udf$
    # $example off:map_in_pandas$


def cogrouped_map_pandas_udf_example(spark):
    # $example on:cogrouped_map_pandas_udf$
def cogrouped_apply_in_pandas_example(spark):
    # $example on:cogrouped_apply_in_pandas$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))
@@ -272,11 +280,11 @@ def cogrouped_map_pandas_udf_example(spark):
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
    def asof_join(l, r):
        return pd.merge_asof(l, r, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
        asof_join, schema="time int, id int, v1 double, v2 string").show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
@@ -285,7 +293,7 @@ def asof_join(l, r):
    # |20000101|  2|2.0|  y|
    # |20000102|  2|4.0|  y|
    # +--------+---+---+---+
    # $example off:cogrouped_map_pandas_udf$
    # $example off:cogrouped_apply_in_pandas$


if __name__ == "__main__":
@@ -296,17 +304,21 @@ def asof_join(l, r):

    print("Running Pandas to/from conversion example")
    dataframe_with_arrow_example(spark)
    print("Running pandas_udf scalar example")
    scalar_pandas_udf_example(spark)
    print("Running pandas_udf scalar iterator example")
    scalar_iter_pandas_udf_example(spark)
    print("Running pandas_udf grouped map example")
    grouped_map_pandas_udf_example(spark)
    print("Running pandas_udf grouped agg example")
    grouped_agg_pandas_udf_example(spark)
    print("Running pandas_udf map iterator example")
    map_iter_pandas_udf_example(spark)
    print("Running pandas_udf cogrouped map example")
    cogrouped_map_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Frame")
    ser_to_frame_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Series")
    ser_to_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Iterator of Series to Iterator of Series")
    iter_ser_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Iterator of Multiple Series to Iterator of Series")
    iter_sers_to_iter_ser_pandas_udf_example(spark)
    print("Running pandas_udf example: Series to Scalar")
    ser_to_scalar_pandas_udf_example(spark)
    print("Running pandas function example: Grouped Map")
    grouped_apply_in_pandas_example(spark)
    print("Running pandas function example: Map")
    map_in_pandas_example(spark)
    print("Running pandas function example: Co-grouped Map")
    cogrouped_apply_in_pandas_example(spark)

    spark.stop()