In [0]:
from pyspark.sql import Window
from pyspark.sql import functions as fn

In [0]:
# Four sample rows: two observations for each of the groups 'A' and 'B'.
data = [
  {'group': label, 'values': amount}
  for label, amount in [('A', 1), ('A', 2), ('B', 3), ('B', 4)]
]

In [0]:
# Build the Spark DataFrame from the sample rows; no schema is passed, so
# the column names come from the dict keys.
df = spark.createDataFrame(data=data)

In [0]:
# Render the sample DataFrame as a table. `display` is not imported in this
# notebook, so it is presumably supplied by the notebook environment.
display(df)

group,values
A,1
A,2
B,3
B,4


# groupBy and join

In [0]:
# Per-group aggregates: one row per 'group' holding the mean and the sum
# of 'values'.
df_grouped = df.groupBy('group'
).agg(
  fn.mean('values').alias('average'),
  fn.sum('values').alias('total'),
)

# df_grouped is derived from df, so df.group and df_grouped.group refer to
# the same underlying column. Alias both sides and build the join condition
# (and the final select) from alias-qualified names so that every 'group'
# reference unambiguously names one side of the join.
df_joined_groupings = df.alias('df').join(
  df_grouped.alias('df_grouped'),
  on = fn.col('df.group') == fn.col('df_grouped.group'),
  how = 'inner'
).select(
  fn.col('df.group'),
  'values',
  'average',
  'total'
)

display(df_joined_groupings)

group,values,average,total
A,1,1.5,3
A,2,1.5,3
B,3,3.5,7
B,4,3.5,7


In [0]:
# examine the plan for this method
# The join condition is written with alias-qualified columns so that it
# refers to the two DataFrames built in this expression ('df_grouped' on
# the left, 'df' on the right) and to nothing defined in another cell.
df.groupBy('group'
).agg(
  fn.mean('values').alias('average'),
  fn.sum('values').alias('total'),
).alias('df_grouped').join(
  df.alias('df'),
  on = fn.col('df_grouped.group') == fn.col('df.group'),
  how = 'inner'
).select(
  fn.col('df.group'),
  'values',
  'average',
  'total'
).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [group#778, values#779L, average#772, total#774L]
   +- SortMergeJoin [group#701], [group#778], Inner
      :- Sort [group#701 ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[group#701], functions=[finalmerge_avg(merge sum#796, count#797L) AS avg(values#702L)#771, finalmerge_sum(merge sum#799L) AS sum(values#702L)#773L])
      :     +- Exchange hashpartitioning(group#701, 200), ENSURE_REQUIREMENTS, [plan_id=2062]
      :        +- HashAggregate(keys=[group#701], functions=[partial_avg(values#702L) AS (sum#796, count#797L), partial_sum(values#702L) AS sum#799L])
      :           +- Filter isnotnull(group#701)
      :              +- Scan ExistingRDD[group#701,values#702L]
      +- Sort [group#778 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(group#778, 200), ENSURE_REQUIREMENTS, [plan_id=2066]
            +- Filter isnotnull(group#778)
               +- Scan ExistingRDD[group#778,values#779L]

# Window function

In [0]:
# One window per 'group' value: every row sees the aggregates of its group.
window = Window.partitionBy('group')

# Window expressions for the per-group mean and sum of 'values'.
group_average = fn.mean('values').over(window)
group_total = fn.sum('values').over(window)

# Append both aggregates as new columns; no join is needed because the
# original rows are kept as they are.
df_with_groupings = (
  df
  .withColumn('average', group_average)
  .withColumn('total', group_total)
)

display(df_with_groupings)

group,values,average,total
A,1,1.5,3
A,2,1.5,3
B,3,3.5,7
B,4,3.5,7


In [0]:
# examine the plan for this method
# Rebuild the windowed DataFrame with the current `window` spec, then print
# its physical plan.
windowed_for_plan = (
  df
  .withColumn('average', fn.mean('values').over(window))
  .withColumn('total', fn.sum('values').over(window))
)
windowed_for_plan.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [group#701, values#702L, avg(values#702L) windowspecdefinition(group#701, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS average#844, sum(values#702L) windowspecdefinition(group#701, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total#849L], [group#701]
   +- Sort [group#701 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(group#701, 200), ENSURE_REQUIREMENTS, [plan_id=2223]
         +- Scan ExistingRDD[group#701,values#702L]




# Two methods to calculate global averages

In [0]:
# here we will ignore the group column
# groupBy() is called with no keys, so the aggregation runs over the whole
# DataFrame instead of per group.
df_grouped = df.groupBy(
).agg(
  fn.mean('values').alias('average'),
  fn.sum('values').alias('total'),
)

# Attach the aggregate columns to every input row with an explicit cross
# join. An equality predicate such as values == values is not always true:
# it evaluates to NULL when 'values' is NULL, and an inner join would then
# silently drop those rows.
df_joined_groupings = df.crossJoin(
  df_grouped
).drop('group')

display(df_joined_groupings)

values,average,total
1,2.5,10
2,2.5,10
3,2.5,10
4,2.5,10


In [0]:
# A window with no partition columns spans every row, so the aggregates
# below are computed across the whole DataFrame.
window = Window.partitionBy()

# Window expressions for the overall mean and sum of 'values'.
overall_average = fn.mean('values').over(window)
overall_total = fn.sum('values').over(window)

# Append the global aggregates to each row, then drop the unused 'group'
# column.
df_with_groupings = (
  df
  .withColumn('average', overall_average)
  .withColumn('total', overall_total)
  .drop('group')
)

display(df_with_groupings)

values,average,total
1,2.5,10
2,2.5,10
3,2.5,10
4,2.5,10
