# Distributed Cumulative Sum

### Getting `spark` up and running

In [None]:
// Load the Spark artifacts the notebook needs onto the kernel classpath.
// spark-core provides SparkContext and the RDD API used below.
classpath.add(
  "org.apache.spark" %% "spark-core" % "2.0.2"
);
// SparkSession, DataFrame and Dataset are published in the spark-sql module,
// so it has to be loaded as well for `org.apache.spark.sql._` to resolve.
classpath.add(
  "org.apache.spark" %% "spark-sql" % "2.0.2"
);

In [2]:
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

In [None]:
val spark = SparkSession.builder().master("local[*]").getOrCreate()

In [49]:
// Sorted input whose running total we want to compute
val data = Seq(1, 2, 3, 4, 5)

// The running total we expect to get back
val expected = Seq(1, 3, 6, 10, 15)

// On a local collection scanLeft does the job; it emits the seed (0) first,
// so that leading element is discarded
val local = data.scanLeft(0)(_ + _).tail

// The same values spread across partitions of an RDD, where a single
// scanLeft over the whole data set is not available
val rdd = spark.sparkContext.parallelize(data)

data: Seq[Int] = List(1, 2, 3, 4, 5)
expected: Seq[Int] = List(1, 3, 6, 10, 15)
local: Seq[Int] = List(1, 3, 6, 10, 15)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at parallelize at Main.scala:36

### Method 1

In [47]:
// Build a lookup of partition index -> total of the values in that partition.
// Each partition contributes exactly one (index, total) pair, so the collected
// array has one entry per partition and is cheap to bring back to the driver.
val x = {
  val totalsByPartition = rdd.mapPartitionsWithIndex { (partitionId, values) =>
    Iterator(partitionId -> values.sum)
  }
  totalsByPartition.collect().toMap
}

x: Map[Int, Int] = Map(0 -> 0, 5 -> 0, 1 -> 1, 6 -> 4, 2 -> 0, 7 -> 5, 3 -> 2, 4 -> 3)

In [48]:
rdd.mapPartitionsWithIndex { (partitionId, values) =>
    // Offset for this partition: add up the totals (looked up in x) of every
    // partition with a smaller index
    val offset = (0 until partitionId).foldLeft(0)((acc, earlier) => acc + x(earlier))

    // Running total seeded with that offset; scanLeft emits the seed itself
    // as its first element, so it is dropped
    val running = values.scanLeft(offset)(_ + _)
    running.drop(1)
}.collect()

res47: Array[Int] = Array(1, 3, 6, 10, 15)

### Method 2

In [45]:
// Build a lookup of partition index -> cumulative total up to and including
// that partition, computed once on the driver.
val x = {
  // One (index, total) pair per partition, collected to the driver
  val totalsByPartition = rdd.mapPartitionsWithIndex { (partitionId, values) =>
    Iterator(partitionId -> values.sum)
  }.collect()

  // Carry the running total forward while keeping each partition's own index.
  // The (0, 0) seed shares key 0 with the first partition's entry, which
  // replaces it when the pairs are turned into a Map.
  val cumulative = totalsByPartition.scanLeft((0, 0)) { (previous, current) =>
    (current._1, previous._2 + current._2)
  }
  cumulative.toMap
}

x: Map[Int, Int] = Map(0 -> 0, 5 -> 6, 1 -> 1, 6 -> 10, 2 -> 1, 7 -> 15, 3 -> 3, 4 -> 6)

In [46]:
rdd.mapPartitionsWithIndex { (partitionId, values) =>
    // x holds the cumulative total through each partition, so the entry for the
    // preceding partition is this partition's starting offset (0 when there is none)
    val offset = x.getOrElse(partitionId - 1, 0)

    // scanLeft emits the seed first; drop it to keep one output per input value
    val running = values.scanLeft(offset)(_ + _)
    running.drop(1)
}.collect()

res45: Array[Int] = Array(1, 3, 6, 10, 15)

Thanks to [jmorra](https://github.com/jmorra) for teaching me this.