From 51ce6e85953bd39e901fec24dfca45b86f55f939 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Tue, 2 Jan 2018 13:45:34 -0500 Subject: [PATCH 1/4] wip --- python/pyspark/sql/tests.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 67bdb3d72d93b..dfafcb3677034 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -4013,6 +4013,23 @@ def normalize(pdf): expected = expected.assign(norm=expected.norm.astype('float64')) self.assertFramesEqual(expected, result) + def test_groupkey(self): + import pandas as pd + from pyspark.sql.functions import pandas_udf, PandasUDFType + df = self.data + pdf = df.toPandas() + + @pandas_udf('v double, v_norm double', PandasUDFType.GROUP_MAP) + def normalize(pdf): + v = pdf['v'] + return pd.DataFrame({'v': v, 'v_norm': v - v.mean()}) + + result = df.groupby('id').apply(normalize).toPandas() + expected = pdf.groupby('id').apply(normalize.func) + + print(result) + print(expected) + def test_empty_groupby(self): from pyspark.sql.functions import pandas_udf, col, PandasUDFType df = self.data From 07f921139e250bd62e79da8475d8d615045d636a Mon Sep 17 00:00:00 2001 From: Li Jin Date: Tue, 9 Jan 2018 15:08:15 -0500 Subject: [PATCH 2/4] Test working; Need to add docs --- python/pyspark/sql/tests.py | 60 +++++++------ .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../logical/pythonLogicalOperators.scala | 1 + .../spark/sql/RelationalGroupedDataset.scala | 20 ++++- .../spark/sql/execution/SparkStrategies.scala | 5 +- .../python/FlatMapGroupsInPandasExec.scala | 84 +++++++++++++++---- 6 files changed, 125 insertions(+), 48 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index dfafcb3677034..6942213d2bf56 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3995,40 +3995,46 @@ def test_coerce(self): self.assertFramesEqual(expected, result) def test_complex_groupby(self): + import pandas as pd from pyspark.sql.functions import pandas_udf, col, PandasUDFType df = self.data + pdf = df.toPandas() @pandas_udf( - 'id long, v int, norm double', + 'v int, v2 double', PandasUDFType.GROUP_MAP ) - def normalize(pdf): + def foo(pdf): v = pdf.v - return pdf.assign(norm=(v - v.mean()) / v.std()) - - result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas() - pdf = df.toPandas() - expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func) - expected = expected.sort_values(['id', 'v']).reset_index(drop=True) - expected = expected.assign(norm=expected.norm.astype('float64')) - self.assertFramesEqual(expected, result) - - def test_groupkey(self): - import pandas as pd - from pyspark.sql.functions import pandas_udf, PandasUDFType - df = self.data - pdf = df.toPandas() - - @pandas_udf('v double, v_norm double', PandasUDFType.GROUP_MAP) - def normalize(pdf): - v = pdf['v'] - return pd.DataFrame({'v': v, 'v_norm': v - v.mean()}) - - result = df.groupby('id').apply(normalize).toPandas() - expected = pdf.groupby('id').apply(normalize.func) - - print(result) - print(expected) + return pd.DataFrame({'v': v + 1, 'v2': v - v.mean()})[:] + + # Use expression in groupby. The grouping expression should be prepended to the result. 
+ result1 = df.groupby(col('id') % 2 == 0).apply(foo).sort('((id % 2) = 0)', 'v').toPandas() + expected1 = pdf.groupby(pdf['id'] % 2 == 0).apply(foo.func) + expected1.index.names = ['((id % 2) = 0)', None] + expected1 = expected1.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v']).reset_index(drop=True) + + # Grouping column is not returned by the udf. The grouping column should be prepended. + result2 = df.groupby('id').apply(foo).sort('id', 'v').toPandas() + expected2 = pdf.groupby('id').apply(foo.func).reset_index(level=0) \ + .sort_values(['id', 'v']) + + # Only one of the grouping column is returned by the udf. In this case, the grouping column + # that is not returned by the udf should be prepended. + result3 = df.groupby('id', 'v').apply(foo).sort('id', 'v').toPandas() + expected3 = pdf.groupby(['id', 'v']).apply(foo.func).reset_index(level=0) \ + .reset_index(drop=True).sort_values(['id', 'v']) + + # Mix expression and column + result4 = df.groupby(col('id') % 2 == 0, 'v').apply(foo).sort('((id % 2) = 0)', 'v').toPandas() + expected4 = pdf.groupby([pdf['id'] % 2 == 0, 'v']).apply(foo.func) + expected4.index.names = ['((id % 2) = 0)', 'v', None] + expected4 = expected4.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v']).reset_index(drop=True) + + self.assertFramesEqual(expected1, result1) + self.assertFramesEqual(expected2, result2) + self.assertFramesEqual(expected3, result3) + self.assertFramesEqual(expected4, result4) def test_empty_groupby(self): from pyspark.sql.functions import pandas_udf, col, PandasUDFType diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0d4b02c6e7d8a..afed2565fe4e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -452,7 +452,8 @@ object ColumnPruning extends Rule[LogicalPlan] { // Prunes the unused columns from child of Aggregate/Expand/Generate case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty => a.copy(child = prunedChild(child, a.references)) - case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- f.references).nonEmpty => + case f @ FlatMapGroupsInPandas(_, _, _, _, child) + if (child.outputSet -- f.references).nonEmpty => f.copy(child = prunedChild(child, f.references)) case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty => e.copy(child = prunedChild(child, e.references)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index 254687ec00880..2469b3be8d4f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expre */ case class FlatMapGroupsInPandas( groupingAttributes: Seq[Attribute], + additionalGroupingAttributes: Seq[Attribute], functionExpr: Expression, output: Seq[Attribute], child: LogicalPlan) extends UnaryNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index a009c00b0abc5..effc052f8a996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.language.implicitConversions import org.apache.spark.annotation.InterfaceStability @@ -457,13 +458,26 @@ class RelationalGroupedDataset protected[sql]( val groupingNamedExpressions = groupingExprs.map { case ne: NamedExpression => ne - case other => Alias(other, other.toString)() + case other => Alias(other, toPrettySQL(other))() } val groupingAttributes = groupingNamedExpressions.map(_.toAttribute) val child = df.logicalPlan val project = Project(groupingNamedExpressions ++ child.output, child) - val output = expr.dataType.asInstanceOf[StructType].toAttributes - val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project) + val udfOutput: Seq[Attribute] = expr.dataType.asInstanceOf[StructType].toAttributes + val additionalGroupingAttributes = mutable.ArrayBuffer[Attribute]() + + for (attribute <- groupingAttributes) { + if (!udfOutput.map(_.name).contains(attribute.name)) { + additionalGroupingAttributes += attribute + } + } + + val output = additionalGroupingAttributes ++ udfOutput + + val plan = FlatMapGroupsInPandas( + groupingAttributes, + additionalGroupingAttributes, + expr, output, project) Dataset.ofRows(df.sparkSession, plan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 910294853c318..71c0de509a538 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -452,8 +452,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) => execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping, data, objAttr, planLater(child)) :: Nil - case logical.FlatMapGroupsInPandas(grouping, func, output, child) => - execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil + case logical.FlatMapGroupsInPandas(grouping, additionalGrouping, func, output, child) => + execution.python.FlatMapGroupsInPandasExec( + grouping, additionalGrouping, func, output, planLater(child)) :: Nil case logical.MapElements(f, _, _, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, _, _, in, out, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index 59db66bd7adf1..a83d1a56ee63f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.python +import java.io.File + import scala.collection.JavaConverters._ -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, 
PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -27,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
@@ -47,6 +50,7 @@ import org.apache.spark.sql.types.StructType
  */
 case class FlatMapGroupsInPandasExec(
     groupingAttributes: Seq[Attribute],
+    additionalGroupingAttributes: Seq[Attribute],
     func: Expression,
     output: Seq[Attribute],
     child: SparkPlan)
@@ -80,27 +84,77 @@ case class FlatMapGroupsInPandasExec(
     val sessionLocalTimeZone = conf.sessionLocalTimeZone
     val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
 
-    inputRDD.mapPartitionsInternal { iter =>
-      val grouped = if (groupingAttributes.isEmpty) {
-        Iterator(iter)
-      } else {
+    if (additionalGroupingAttributes.isEmpty) {
+      // Fast path when there are no additional grouping attributes
+
+      inputRDD.mapPartitionsInternal { iter =>
+        val grouped = if (groupingAttributes.isEmpty) {
+          Iterator(iter)
+        } else {
+          val groupedIter = GroupedIterator(iter, groupingAttributes, child.output)
+          val dropGrouping =
+            UnsafeProjection.create(child.output.drop(groupingAttributes.length), child.output)
+          groupedIter.map {
+            case (_, groupedRowIter) => groupedRowIter.map(dropGrouping)
+          }
+        }
+
+        val context = TaskContext.get()
+
+        val columnarBatchIter = new ArrowPythonRunner(
+          chainedFunc, bufferSize, reuseWorker,
+          PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+          sessionLocalTimeZone, pandasRespectSessionTimeZone)
+          .compute(grouped, context.partitionId(), context)
+
+        columnarBatchIter
+          .flatMap(_.rowIterator.asScala)
+          .map(UnsafeProjection.create(output, output))
+      }
+    } else {
+      // If additionalGroupingAttributes is non-empty, join the additional grouping attributes
+      // with the udf output to get the final result
+
+      inputRDD.mapPartitionsInternal { iter =>
+        assert(groupingAttributes.nonEmpty)
+        val groupedIter = GroupedIterator(iter, groupingAttributes, child.output)
+
+        val context = TaskContext.get()
+
+        val queue = HybridRowQueue(context.taskMemoryManager(),
+          new File(Utils.getLocalDir(SparkEnv.get.conf)), additionalGroupingAttributes.length)
+        context.addTaskCompletionListener { _ =>
+          queue.close()
+        }
+        val additionalGroupingProj = UnsafeProjection.create(
+          additionalGroupingAttributes, groupingAttributes)
         val dropGrouping =
           UnsafeProjection.create(child.output.drop(groupingAttributes.length), child.output)
-        groupedIter.map {
-          case (_, groupedRowIter) => groupedRowIter.map(dropGrouping)
+        val grouped = groupedIter.map {
+          case (k, groupedRowIter) =>
+            val additionalGrouping = additionalGroupingProj(k)
+            queue.add(additionalGrouping)
+            (additionalGrouping, groupedRowIter.map(dropGrouping))
         }
-      }
 
-      val context = TaskContext.get()
+        val columnarBatchIter = new ArrowPythonRunner(
+          chainedFunc, bufferSize, reuseWorker,
+          PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+          sessionLocalTimeZone, pandasRespectSessionTimeZone)
+          .compute(grouped.map(_._2), context.partitionId(), context)
 
-      val columnarBatchIter = new ArrowPythonRunner(
-        chainedFunc, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
-        sessionLocalTimeZone, pandasRespectSessionTimeZone)
-        .compute(grouped, context.partitionId(), context)
+        val joinedRow = new JoinedRow
+        val outputProj = UnsafeProjection.create(output, output)
 
-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+        columnarBatchIter
+          .flatMap { batchIter =>
+            val additionalGrouping = queue.remove()
+            batchIter.rowIterator().asScala.map { row =>
+              outputProj(joinedRow(additionalGrouping, row))
+            }
+          }
+      }
     }
   }
 }

From f2822b529293e37f63a4a190b25dbdd018e36ba6 Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Tue, 9 Jan 2018 15:55:03 -0500
Subject: [PATCH 3/4] Add simple doc

---
 python/pyspark/sql/group.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 09fae46adf014..bfa696436b4ea 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -233,6 +233,27 @@ def apply(self, udf):
         | 2| 1.1094003924504583|
         +---+-------------------+
 
+        Notes on grouping columns:
+
+        Depending on whether the UDF returns the grouping columns as part of its output, this
+        function may or may not prepend the grouping columns to the result. This works as
+        follows:
+
+        1. UDF returns all grouping columns:
+
+           This function will not prepend any grouping columns to the result.
+
+        2. UDF returns some grouping columns:
+
+           This function will prepend the grouping columns that are not returned by the UDF.
+
+        3. UDF returns no grouping columns:
+
+           This function will prepend all grouping columns.
+
+        In all cases, if a grouping column and a UDF output column conflict, the value in the
+        UDF output overrides the original value of the grouping column.
+
         .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
 
         """

From 46dc9e18f36dc14915e87ba206dd0614d0618dad Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Tue, 9 Jan 2018 16:10:26 -0500
Subject: [PATCH 4/4] fix python style

---
 python/pyspark/sql/tests.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6942213d2bf56..c67b927f9c705 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4011,8 +4011,9 @@ def foo(pdf):
         # Use expression in groupby. The grouping expression should be prepended to the result.
         result1 = df.groupby(col('id') % 2 == 0).apply(foo).sort('((id % 2) = 0)', 'v').toPandas()
         expected1 = pdf.groupby(pdf['id'] % 2 == 0).apply(foo.func)
-        expected1.index.names = ['((id % 2) = 0)', None]
-        expected1 = expected1.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v']).reset_index(drop=True)
+        expected1.index.names = ['((id % 2) = 0)', None]
+        expected1 = expected1.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v'])\
+            .reset_index(drop=True)
 
         # Grouping column is not returned by the udf. The grouping column should be prepended.
result2 = df.groupby('id').apply(foo).sort('id', 'v').toPandas() @@ -4026,10 +4027,12 @@ def foo(pdf): .reset_index(drop=True).sort_values(['id', 'v']) # Mix expression and column - result4 = df.groupby(col('id') % 2 == 0, 'v').apply(foo).sort('((id % 2) = 0)', 'v').toPandas() + result4 = df.groupby(col('id') % 2 == 0, 'v').apply(foo).sort('((id % 2) = 0)', 'v')\ + .toPandas() expected4 = pdf.groupby([pdf['id'] % 2 == 0, 'v']).apply(foo.func) expected4.index.names = ['((id % 2) = 0)', 'v', None] - expected4 = expected4.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v']).reset_index(drop=True) + expected4 = expected4.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v'])\ + .reset_index(drop=True) self.assertFramesEqual(expected1, result1) self.assertFramesEqual(expected2, result2)
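
Example usage (illustrative sketch, not part of the patches above): the snippet below exercises
the behavior documented in the group.py notes, assuming a Spark build with these patches applied
and the 2.3-era pandas_udf API (PandasUDFType.GROUP_MAP). The data, column names, and function
name are made up for illustration.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.master("local[2]").appName("group-map-demo").getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# The UDF returns only 'v' and 'v_demeaned' and omits the grouping column 'id',
# so groupby().apply() is expected to prepend 'id' to the result (case 3 in the
# notes added to group.py above).
@pandas_udf('v double, v_demeaned double', PandasUDFType.GROUP_MAP)
def demean(pdf):
    v = pdf['v']
    return pd.DataFrame({'v': v, 'v_demeaned': v - v.mean()})

df.groupby('id').apply(demean).show()
# Expected columns with these patches applied: id, v, v_demeaned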