# Data aggregation

## Key-value RDD

Before we continue with the hands-on part, let us consider a schematic representation of word count using the `groupByKey` and `reduceByKey` approaches. This is a classic example that helps to understand the shuffle stage and the effect of the map-side combiner:

<img src="./debug/groupbykey.png">

<img src="./debug/reducebykey.png">
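
The effect of the map-side combiner can also be imitated with plain Scala collections. The following is only a sketch: the two partitions are made-up data, and the "shuffle" is simply the list of records that would leave the partitions, not what Spark does internally.

```scala
// Two hypothetical map-side partitions of (word, 1) pairs.
val wordPartitions: Seq[Seq[(String, Int)]] = Seq(
  Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1)),
  Seq(("b", 1), ("a", 1), ("b", 1))
)

// Helper: sum the values of all pairs that share a key.
def sumByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
  pairs.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }

// groupByKey-style: every pair is shuffled as-is and summed on the reduce side.
val shuffledGroup: Seq[(String, Int)] = wordPartitions.flatten
val countsGroup: Map[String, Int] = sumByKey(shuffledGroup)

// reduceByKey-style: pairs are pre-summed inside each partition (map-side combine),
// so at most one record per key and partition is shuffled.
val shuffledReduce: Seq[(String, Int)] = wordPartitions.flatMap(part => sumByKey(part).toSeq)
val countsReduce: Map[String, Int] = sumByKey(shuffledReduce)

println(s"groupByKey-style:  ${shuffledGroup.size} records shuffled, counts = $countsGroup")
println(s"reduceByKey-style: ${shuffledReduce.size} records shuffled, counts = $countsReduce")
```

Both variants produce the same counts, but in this toy data the first one moves 7 records across the shuffle and the second one only 4.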

### Example: Calculating mean

Let us now continue with a simple toy example of calculating a mean. We will consider a dataset of baby names (a small fraction of it) and will calculate the average age for each name.

To calculate a mean in a distributed setting, we can keep track of the sum of elements and the number of elements in each partition, and then combine those.
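
To see why we carry the sum and the count rather than a per-partition mean, consider the following minimal sketch in plain Scala (no Spark needed). The two partitions and their values are made up for illustration:

```scala
// Hypothetical partitions of unequal size.
val partitionA = Seq(1, 2, 3)
val partitionB = Seq(10)
val parts = Seq(partitionA, partitionB)

// Naive: average the per-partition means (wrong when partition sizes differ).
val meanOfMeans = parts.map(p => p.sum.toDouble / p.size).sum / parts.size

// Correct: carry (sum, count) per partition, add them component-wise, divide once.
val partials = parts.map(p => (p.sum, p.size))
val (totalSum, totalCount) = partials.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
val mean = totalSum.toDouble / totalCount

println(meanOfMeans) // 6.0
println(mean)        // 4.0
```

The pair (sum, count) can be merged in any order without losing information, which is exactly the property the aggregation buffer needs.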

This is easy to achieve with the `aggregateByKey` transformation, which takes the following 3 arguments:
  1. a zero value for the aggregation buffer within a partition. Since we are tracking the sum of elements and the number of elements in each partition, our zero is the tuple `(0, 0)`
  1. an `increment` function performing summation and counting within each partition
  1. a `combine` function to combine the results from all partitions

In [1]:
val babyNames = sc.parallelize(List(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)))

// initial (sum, count) buffer for each key within a partition
val zero = (0, 0)

// folds one value of the RDD into the (sum, count) buffer within a partition
def increment(buffer: (Int, Int), value: Int): (Int, Int) = {
    (buffer._1 + value, buffer._2 + 1)
}

// merges the (sum, count) buffers produced by different partitions
def combine(buffer1: (Int, Int), buffer2: (Int, Int)): (Int, Int) = {
    (buffer1._1 + buffer2._1, buffer1._2 + buffer2._2)
}

In [2]:
val after_aggregation = babyNames.aggregateByKey(zero)(increment,combine)
for (d <- after_aggregation.take(2)) {
    println(d)
}

(Abby,(9,2))
(David,(11,2))


In [3]:
val result = after_aggregation.mapValues(sum_and_count => 1.0 * sum_and_count._1 / sum_and_count._2)
for (d <- result.collect()) {
    println(d)
}

(Abby,4.5)
(David,5.5)


## Dataframe groupBy aggregations

If you are working with dataframes, there is no need to convert to an `RDD` or pair RDD to perform an aggregation.

Spark SQL has many powerful aggregates, and thanks to its optimizer it is easy to combine many aggregates into a single action/query. As with Pandas dataframes, `groupBy` returns a special grouped-data object (`RelationalGroupedDataset` in the Scala API since Spark 2.0, `GroupedData` in Spark 1.x) on which we can ask for a certain aggregation to be performed.

`min`, `max`, `avg`, and `sum` are all implemented as convenience functions directly on this object, and more can be specified by providing expressions to `agg`. Once you specify the aggregates you want to compute, you get the results back as a DataFrame.
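
As an illustration of combining several aggregates in one query, here is a minimal self-contained sketch. The column aliases, the variable names and the local master setting are assumptions made for this example; in a notebook with an existing session, `getOrCreate()` simply reuses it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, max, min}

// Assumption: a local session is acceptable for this toy example.
val spark = SparkSession.builder().appName("AggSketch").master("local[*]").getOrCreate()
import spark.implicits._

// The same toy data as in the rest of this section.
val namesDF = Seq(("David", 6), ("Abby", 4), ("David", 5), ("Abby", 5)).toDF("Name", "Age")

// Three aggregates computed per name in a single pass over the grouped data.
val stats = namesDF
  .groupBy("Name")
  .agg(min("Age").as("min_age"), max("Age").as("max_age"), count("Age").as("n"))
  .orderBy("Name")

stats.show()
```

For this data the result has one row per name: Abby with ages from 4 to 5 and David with ages from 5 to 6, each with a count of 2.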


In [4]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Aggregation").getOrCreate()
import spark.implicits._

val babyNamesDF = babyNames.toDF("Name","Age")
babyNamesDF.show()

+-----+---+
| Name|Age|
+-----+---+
|David|  6|
| Abby|  4|
|David|  5|
| Abby|  5|
+-----+---+



In [5]:
val result = babyNamesDF.groupBy("Name").avg("Age")
result.show()

+-----+--------+
| Name|avg(Age)|
+-----+--------+
| Abby|     4.5|
|David|     5.5|
+-----+--------+



In [6]:
babyNamesDF.describe().show()

+-------+-----+-----------------+
|summary| Name|              Age|
+-------+-----+-----------------+
|  count|    4|                4|
|   mean| null|              5.0|
| stddev| null|0.816496580927726|
|    min| Abby|                4|
|    max|David|                6|
+-------+-----+-----------------+



## Exercise

Recalculate the average age of children using the dataframe `agg` API.

Hint: you will need the following import:
```scala
import org.apache.spark.sql.functions.avg
```


## User-defined aggregation functions on Dataframes (UDAF)

We have seen that a number of basic aggregation functions are available in Spark to work on grouped data.
What if we need to solve a more advanced problem, e.g. if we work with Spark ML and are dealing with columns containing vectors?

The solution is to use a `UDAF`: you derive a class from `UserDefinedAggregateFunction` and override a set of methods to achieve the desired behavior. (Note that `UserDefinedAggregateFunction` is deprecated since Spark 3.0 in favor of the `Aggregator` class.) The following is an example UDAF that calculates (again) the mean of a collection of elements:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF() extends UserDefinedAggregateFunction {

  // Schema of the input column(s)
  def inputSchema: StructType = StructType(Array(StructField("item", DoubleType)))

  // Schema of the intermediate aggregation buffer
  def bufferSchema: StructType = StructType(Array(
    StructField("sum", DoubleType),
    StructField("cnt", LongType)
  ))

  // Data type of the returned value
  def dataType: DataType = DoubleType

  // The same input always produces the same output
  def deterministic: Boolean = true

  // Called to initialize the aggregation buffer of each group
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0 // set sum to zero
    buffer(1) = 0L  // set number of items to 0
  }

  // Called for each input row of a group
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    buffer(1) = buffer.getLong(1) + 1
  }

  // Merge two partial aggregates (buffer2 is merged into buffer1)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Called once all rows of a group have been processed
  def evaluate(buffer: Row): Double = {
    buffer.getDouble(0) / buffer.getLong(1).toDouble
  }

}
```

A more advanced example of a UDAF will be covered in the cluster portion of the exercise.

## Dataset Aggregator class

Similarly to UDAFs for dataframes, one can define advanced aggregators for the Dataset API.

Aggregators provide a mechanism for adding up all of the elements in a Dataset (or in each group of a grouped Dataset), returning a single result. An Aggregator is similar to a UDAF, but the interface is expressed in terms of JVM objects instead of as a Row. Any class that extends `Aggregator[A, B, C]` can be used, where:
  1. A - specifies the input type to the aggregator
  1. B - specifies the intermediate type during aggregation
  1. C - specifies the final type output by the aggregation
  

Below is a simple Aggregator that calculates an average:

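```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class AggData(name: String, age: Int)

class Average extends Aggregator[AggData, (Int, Int), Double] {
  // The initial (sum, count) value for aggregation within each partition.
  def zero: (Int, Int) = (0, 0)
  // Add an element to the running total; this is what we called `increment` before.
  def reduce(buffer: (Int, Int), value: AggData): (Int, Int) = (buffer._1 + value.age, buffer._2 + 1)
  // Merge intermediate values; this is what we called `combine` before.
  def merge(buffer1: (Int, Int), buffer2: (Int, Int)): (Int, Int) = (buffer1._1 + buffer2._1, buffer1._2 + buffer2._2)
  // Return the final result. Other transformations can be applied at this step as well.
  def finish(buffer: (Int, Int)): Double = buffer._1.toDouble / buffer._2
  // Encoders for the intermediate and the output types (required since Spark 2.0).
  def bufferEncoder: Encoder[(Int, Int)] = Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Turn the aggregator into a typed column that can be passed to `agg` or `select`.
val averageAge = new Average().toColumn
```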
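
To make the roles of the four methods more tangible, here is a small sketch in plain Scala that imitates how they fit together. `SimpleAggregator`, `Person`, `AverageAge` and `run` are hypothetical names invented for this illustration; they are not part of Spark, and the real execution is of course distributed and more involved.

```scala
// Hypothetical stand-in for the shape of the Aggregator contract (not the Spark class).
trait SimpleAggregator[A, B, C] {
  def zero: B
  def reduce(buffer: B, value: A): B
  def merge(b1: B, b2: B): B
  def finish(buffer: B): C

  // Fold each partition starting from `zero`, merge the partial buffers, then finish.
  def run(partitions: Seq[Seq[A]]): C =
    finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))
}

case class Person(name: String, age: Int)

// A = Person (input), B = (sum, count) (intermediate), C = Double (output)
object AverageAge extends SimpleAggregator[Person, (Int, Int), Double] {
  def zero: (Int, Int) = (0, 0)
  def reduce(buffer: (Int, Int), value: Person): (Int, Int) = (buffer._1 + value.age, buffer._2 + 1)
  def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(buffer: (Int, Int)): Double = buffer._1.toDouble / buffer._2
}

// Made-up split of the toy data into two partitions.
val agePartitions = Seq(
  Seq(Person("David", 6), Person("Abby", 4)),
  Seq(Person("David", 5), Person("Abby", 5))
)

println(AverageAge.run(agePartitions)) // 5.0
```

The result agrees with the overall mean age reported by `describe()` earlier in this section.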